diff mbox

[1/2] qemu-iotests: qcow2.py

Message ID 1333470742-20405-2-git-send-email-kwolf@redhat.com
State New
Headers show

Commit Message

Kevin Wolf April 3, 2012, 4:32 p.m. UTC
This adds a tool that is meant to inspect and edit qcow2 files in a
low-level way, that wouldn't be possible with qemu-img/io, for example
by adding yet unknown extensions or flags. This way we can test whether
qemu deals properly with future backwards compatible extensions.

For now, let's start with the image header and header extensions.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 tests/qemu-iotests/qcow2.py |  207 +++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 207 insertions(+), 0 deletions(-)
 create mode 100755 tests/qemu-iotests/qcow2.py
diff mbox

Patch

diff --git a/tests/qemu-iotests/qcow2.py b/tests/qemu-iotests/qcow2.py
new file mode 100755
index 0000000..bfb47e8
--- /dev/null
+++ b/tests/qemu-iotests/qcow2.py
@@ -0,0 +1,207 @@ 
+#!/usr/bin/env python
+
+import sys
+import struct
+import string
+
+class QcowHeaderExtension:
+
+    def __init__(self, magic, length, data):
+        self.magic  = magic
+        self.length = length
+        self.data   = data
+
+    @classmethod
+    def create(cls, magic, data):
+        return QcowHeaderExtension(magic, len(data), data)
+
+class QcowHeader:
+
+    uint32_t = 'I'
+    uint64_t = 'Q'
+
+    fields = [
+        # Version 2 header fields
+        [ uint32_t, '%#x',  'magic' ],
+        [ uint32_t, '%d',   'version' ],
+        [ uint64_t, '%#x',  'backing_file_offset' ],
+        [ uint32_t, '%#x',  'backing_file_size' ],
+        [ uint32_t, '%d',   'cluster_bits' ],
+        [ uint64_t, '%d',   'size' ],
+        [ uint32_t, '%d',   'crypt_method' ],
+        [ uint32_t, '%d',   'l1_size' ],
+        [ uint64_t, '%#x',  'l1_table_offset' ],
+        [ uint64_t, '%#x',  'refcount_table_offset' ],
+        [ uint32_t, '%d',   'refcount_table_clusters' ],
+        [ uint32_t, '%d',   'nb_snapshots' ],
+        [ uint64_t, '%#x',  'snapshot_offset' ],
+    ];
+
+    fmt = '>' + ''.join(field[0] for field in fields)
+
+    def __init__(self, fd):
+
+        buf_size = struct.calcsize(QcowHeader.fmt)
+
+        fd.seek(0)
+        buf = fd.read(buf_size)
+
+        header = struct.unpack(QcowHeader.fmt, buf)
+        self.__dict__ = dict((field[2], header[i])
+            for i, field in enumerate(QcowHeader.fields))
+
+        self.cluster_size = 1 << self.cluster_bits
+
+        fd.seek(self.get_header_length())
+        self.load_extensions(fd)
+
+        if self.backing_file_offset:
+            fd.seek(self.backing_file_offset)
+            self.backing_file = fd.read(self.backing_file_size)
+        else:
+            self.backing_file = None
+
+    def get_header_length(self):
+        if self.version == 2:
+            return 72
+        else:
+            raise Exception("version != 2 not supported")
+
+    def load_extensions(self, fd):
+        self.extensions = []
+
+        if self.backing_file_offset != 0:
+            end = min(self.cluster_size, self.backing_file_offset)
+        else:
+            end = self.cluster_size
+
+        while fd.tell() < end:
+            (magic, length) = struct.unpack('>II', fd.read(8))
+            if magic == 0:
+                break
+            else:
+                padded = (length + 7) & ~7
+                data = fd.read(padded)
+                self.extensions.append(QcowHeaderExtension(magic, length, data))
+
+    def update_extensions(self, fd):
+
+        fd.seek(self.get_header_length())
+        extensions = self.extensions
+        extensions.append(QcowHeaderExtension(0, 0, ""))
+        for ex in extensions:
+            buf = struct.pack('>II', ex.magic, ex.length)
+            fd.write(buf)
+            fd.write(ex.data)
+
+        if self.backing_file != None:
+            self.backing_file_offset = fd.tell()
+            fd.write(self.backing_file)
+
+        if fd.tell() > self.cluster_size:
+            raise Exception("I think I just broke the image...")
+
+
+    def update(self, fd):
+        header_bytes = self.get_header_length()
+
+        self.update_extensions(fd)
+
+        fd.seek(0)
+        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
+        buf = struct.pack(QcowHeader.fmt, *header)
+        buf = buf[0:header_bytes-1]
+        fd.write(buf)
+
+    def dump(self):
+        for f in QcowHeader.fields:
+            print "%-25s" % f[2], f[1] % self.__dict__[f[2]]
+        print ""
+
+    def dump_extensions(self):
+        for ex in self.extensions:
+
+            data = ex.data[:ex.length]
+            if all(c in string.printable for c in data):
+                data = "'%s'" % data
+            else:
+                data = "<binary>"
+
+            print "Header extension:"
+            print "%-25s %#x" % ("magic", ex.magic)
+            print "%-25s %d" % ("length", ex.length)
+            print "%-25s %s" % ("data", data)
+            print ""
+
+
+def cmd_dump_header(fd):
+    h = QcowHeader(fd)
+    h.dump()
+    h.dump_extensions()
+
+def cmd_add_header_ext(fd, magic, data):
+    try:
+        magic = int(magic, 0)
+    except:
+        print "'%s' is not a valid magic number" % magic
+        sys.exit(1)
+
+    h = QcowHeader(fd)
+    h.extensions.append(QcowHeaderExtension.create(magic, data))
+    h.update(fd)
+
+def cmd_del_header_ext(fd, magic):
+    try:
+        magic = int(magic, 0)
+    except:
+        print "'%s' is not a valid magic number" % magic
+        sys.exit(1)
+
+    h = QcowHeader(fd)
+    found = False
+
+    for ex in h.extensions:
+        if ex.magic == magic:
+            found = True
+            h.extensions.remove(ex)
+
+    if not found:
+        print "No such header extension"
+        return
+
+    h.update(fd)
+
+cmds = [
+    [ 'dump-header',    cmd_dump_header,    0, 'Dump image header and header extensions' ],
+    [ 'add-header-ext', cmd_add_header_ext, 2, 'Add a header extension' ],
+    [ 'del-header-ext', cmd_del_header_ext, 1, 'Delete a header extension' ],
+]
+
+def main(filename, cmd, args):
+    fd = open(filename, "r+b")
+    try:
+        for name, handler, num_args, desc in cmds:
+            if name != cmd:
+                continue
+            elif len(args) != num_args:
+                usage()
+                return
+            else:
+                handler(fd, *args)
+                return
+        print "Unknown command '%s'" % cmd
+    finally:
+        fd.close()
+
+def usage():
+    print "Usage: %s <file> <cmd> [<arg>, ...]" % sys.argv[0]
+    print ""
+    print "Supported commands:"
+    for name, handler, num_args, desc in cmds:
+        print "    %-20s - %s" % (name, desc)
+
+if len(sys.argv) < 3:
+    usage()
+    sys.exit(1)
+
+main(sys.argv[1], sys.argv[2], sys.argv[3:])