diff mbox series

[swugenerator,v2,1/2] generator: Implement xz support

Message ID 20251208085756.2025575-1-ernestas.k@iconn-networks.com
State Changes Requested
Headers show
Series [swugenerator,v2,1/2] generator: Implement xz support | expand

Commit Message

Ernestas Kulik Dec. 8, 2025, 8:57 a.m. UTC
While swupdate is capable of decompressing XZ archives, swugenerator
currently does not allow its use in compression. This commit adds
support for it and refactors the compressed entry processing for
maintainability.
---
 swugenerator/generator.py | 78 ++++++++++++++++-----------------------
 1 file changed, 32 insertions(+), 46 deletions(-)

Comments

Mark Jonas Dec. 9, 2025, 5:28 p.m. UTC | #1
On Mon, Dec 8, 2025 at 9:59 AM Ernestas Kulik
<ernestas.k@iconn-networks.com> wrote:
>
> While swupdate is capable of decompressing XZ archives, swugenerator
> currently does not allow its use in compression. This commit adds
> support for it and refactors the compressed entry processing for
> maintainability.
> ---
>  swugenerator/generator.py | 78 ++++++++++++++++-----------------------
>  1 file changed, 32 insertions(+), 46 deletions(-)
>
> diff --git a/swugenerator/generator.py b/swugenerator/generator.py
> index 4a4ce8e..706b850 100644
> --- a/swugenerator/generator.py
> +++ b/swugenerator/generator.py
> @@ -64,10 +64,40 @@ class SWUGenerator:
>      def close(self):
>          self.temp.cleanup()
>          self.cpiofile.add_trailer()
>          self.out.close()
>
> +    def process_compressed_entry(self, entry, cmp, new):
> +        cmds = {
> +            "xz": ["xz", "-f", "-k", "-c"],
> +            "zlib": ["gzip", "-f", "-9", "-n", "-c", "--rsyncable"],
> +            "zstd": ["zstd", "-z", "-k", "-T0", "-f", "-c"],
> +        }
> +        cmd = cmds.get(cmp)
> +        if not cmd:
> +            logging.critical("Wrong compression algorithm: %s", cmp)
> +            sys.exit(1)
> +
> +        new_path = os.path.join(self.temp.name, new.newfilename) + "." + cmp
> +        new.newfilename = new.newfilename + "." + cmp
> +
> +        cmd.extend([new.fullfilename, ">", new_path])
> +
> +        try:
> +            subprocess.run(" ".join(cmd), shell=True, check=True, text=True)
> +        except subprocess.CalledProcessError:
> +            logging.critical(
> +                "Cannot compress %s with %s", entry["filename"], cmd
> +            )
> +            sys.exit(1)
> +
> +        new.fullfilename = new_path
> +
> +        if entry.get("type") == "ubivol":
> +            entry.setdefault("properties", {}) \
> +                 .update({ "decompressed-size": str(new.getsize()) })
> +
>      def process_entry(self, entry):
>          if "filename" not in entry:
>              return
>          new = None
>          for image in self.artifacts:
> @@ -81,56 +111,12 @@ class SWUGenerator:
>                  logging.critical("Artifact %s not found", entry["filename"])
>                  sys.exit(22)
>
>              new.newfilename = entry["filename"]
>
> -            if "compressed" in entry and not self.nocompress:
> -                cmp = entry["compressed"]
> -                if cmp not in ("zlib", "zstd"):
> -                    logging.critical("Wrong compression algorithm: %s", cmp)
> -                    sys.exit(1)
> -
> -                new_path = os.path.join(self.temp.name, new.newfilename) + "." + cmp
> -                new.newfilename = new.newfilename + "." + cmp
> -                if cmp == "zlib":
> -                    cmd = [
> -                        "gzip",
> -                        "-f",
> -                        "-9",
> -                        "-n",
> -                        "-c",
> -                        "--rsyncable",
> -                        new.fullfilename,
> -                        ">",
> -                        new_path,
> -                    ]
> -                else:
> -                    cmd = [
> -                        "zstd",
> -                        "-z",
> -                        "-k",
> -                        "-T0",
> -                        "-f",
> -                        "-c",
> -                        new.fullfilename,
> -                        ">",
> -                        new_path,
> -                    ]
> -
> -                try:
> -                    subprocess.run(" ".join(cmd), shell=True, check=True, text=True)
> -                except subprocess.CalledProcessError:
> -                    logging.critical(
> -                        "Cannot compress %s with %s", entry["filename"], cmd
> -                    )
> -                    sys.exit(1)
> -
> -                new.fullfilename = new_path
> -
> -                if entry.get("type") == "ubivol":
> -                    entry.setdefault("properties", {}) \
> -                         .update({ "decompressed-size": str(new.getsize()) })
> +            if not self.nocompress and (cmp := entry.get("compressed")):
> +                self.process_compressed_entry(entry, cmp, new)
>              # compression cannot be used with delta, because it has own compressor
>              elif ("type" in entry) and entry["type"] == "delta":
>                  cmd = [
>                      "zck",
>                      "-u",
> --
> 2.47.3
>

Reviewed-by: Mark Jonas <toertel@gmail.com>
diff mbox series

Patch

diff --git a/swugenerator/generator.py b/swugenerator/generator.py
index 4a4ce8e..706b850 100644
--- a/swugenerator/generator.py
+++ b/swugenerator/generator.py
@@ -64,10 +64,40 @@  class SWUGenerator:
     def close(self):
         self.temp.cleanup()
         self.cpiofile.add_trailer()
         self.out.close()
 
+    def process_compressed_entry(self, entry, cmp, new):
+        cmds = {
+            "xz": ["xz", "-f", "-k", "-c"],
+            "zlib": ["gzip", "-f", "-9", "-n", "-c", "--rsyncable"],
+            "zstd": ["zstd", "-z", "-k", "-T0", "-f", "-c"],
+        }
+        cmd = cmds.get(cmp)
+        if not cmd:
+            logging.critical("Wrong compression algorithm: %s", cmp)
+            sys.exit(1)
+
+        new_path = os.path.join(self.temp.name, new.newfilename) + "." + cmp
+        new.newfilename = new.newfilename + "." + cmp
+
+        cmd.extend([new.fullfilename, ">", new_path])
+
+        try:
+            subprocess.run(" ".join(cmd), shell=True, check=True, text=True)
+        except subprocess.CalledProcessError:
+            logging.critical(
+                "Cannot compress %s with %s", entry["filename"], cmd
+            )
+            sys.exit(1)
+
+        new.fullfilename = new_path
+
+        if entry.get("type") == "ubivol":
+            entry.setdefault("properties", {}) \
+                 .update({ "decompressed-size": str(new.getsize()) })
+
     def process_entry(self, entry):
         if "filename" not in entry:
             return
         new = None
         for image in self.artifacts:
@@ -81,56 +111,12 @@  class SWUGenerator:
                 logging.critical("Artifact %s not found", entry["filename"])
                 sys.exit(22)
 
             new.newfilename = entry["filename"]
 
-            if "compressed" in entry and not self.nocompress:
-                cmp = entry["compressed"]
-                if cmp not in ("zlib", "zstd"):
-                    logging.critical("Wrong compression algorithm: %s", cmp)
-                    sys.exit(1)
-
-                new_path = os.path.join(self.temp.name, new.newfilename) + "." + cmp
-                new.newfilename = new.newfilename + "." + cmp
-                if cmp == "zlib":
-                    cmd = [
-                        "gzip",
-                        "-f",
-                        "-9",
-                        "-n",
-                        "-c",
-                        "--rsyncable",
-                        new.fullfilename,
-                        ">",
-                        new_path,
-                    ]
-                else:
-                    cmd = [
-                        "zstd",
-                        "-z",
-                        "-k",
-                        "-T0",
-                        "-f",
-                        "-c",
-                        new.fullfilename,
-                        ">",
-                        new_path,
-                    ]
-
-                try:
-                    subprocess.run(" ".join(cmd), shell=True, check=True, text=True)
-                except subprocess.CalledProcessError:
-                    logging.critical(
-                        "Cannot compress %s with %s", entry["filename"], cmd
-                    )
-                    sys.exit(1)
-
-                new.fullfilename = new_path
-
-                if entry.get("type") == "ubivol":
-                    entry.setdefault("properties", {}) \
-                         .update({ "decompressed-size": str(new.getsize()) })
+            if not self.nocompress and (cmp := entry.get("compressed")):
+                self.process_compressed_entry(entry, cmp, new)
             # compression cannot be used with delta, because it has own compressor
             elif ("type" in entry) and entry["type"] == "delta":
                 cmd = [
                     "zck",
                     "-u",