Patchwork [2/2] QMP: add server mode to QEMUMonitorProtocol

login
register
mail settings
Submitter Stefan Hajnoczi
Date May 26, 2011, 8:16 a.m.
Message ID <1306397775-22418-1-git-send-email-stefanha@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/97506/
State New
Headers show

Comments

Stefan Hajnoczi - May 26, 2011, 8:16 a.m.
The test-stream.py script performs several automated tests of the image
streaming QMP interface, including exercising both the incremental and
background streaming modes.

This should probably be ported to KVM-Autotest rather than reinventing
the wheel.

Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
This is an example of how I am using qmp.py, especially the new
QEMUMonitorProtocol(server=True) and QEMUMonitorProtocol.get_events(wait=True)
arguments.

I am not submitting this for merge.  Like the commit message says, this should
probably be ported to KVM-Autotest because it duplicates basic QEMU testing
infrastructure.

 test-stream.py |  145 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 145 insertions(+), 0 deletions(-)
 create mode 100644 test-stream.py

Patch

diff --git a/test-stream.py b/test-stream.py
new file mode 100644
index 0000000..eb9d9d6
--- /dev/null
+++ b/test-stream.py
@@ -0,0 +1,145 @@ 
+#!/usr/bin/env python
+import unittest
+import subprocess
+import re
+import os
+import sys; sys.path.append('QMP/')
+import qmp
+
+def qemu_img(*args):
+    devnull = open('/dev/null', 'r+')
+    return subprocess.call(['./qemu-img'] + list(args), stdin=devnull, stdout=devnull)
+
+class VM(object):
+    def __init__(self):
+        self._monitor_path = '/tmp/qemu-mon.%d' % os.getpid()
+        self._qemu_log_path = '/tmp/qemu-log.%d' % os.getpid()
+        self._args = ['x86_64-softmmu/qemu-system-x86_64',
+                      '-chardev', 'socket,id=mon,path=' + self._monitor_path,
+                      '-mon', 'chardev=mon,mode=control',
+                      '-nographic']
+        self._num_drives = 0
+
+    def add_drive(self, path):
+        self._args.append('-drive')
+        self._args.append('if=virtio,cache=none,file=%s,id=drive%d' % (path, self._num_drives))
+        self._num_drives += 1
+        return self
+
+    def launch(self):
+        devnull = open('/dev/null', 'rb')
+        qemulog = open(self._qemu_log_path, 'wb')
+        self._qmp = qmp.QEMUMonitorProtocol(self._monitor_path, server=True)
+        self._popen = subprocess.Popen(self._args, stdin=devnull, stdout=qemulog,
+                                       stderr=subprocess.STDOUT)
+        self._qmp.accept()
+
+    def shutdown(self):
+        self._qmp.cmd('quit')
+        self._popen.wait()
+        os.remove(self._monitor_path)
+        os.remove(self._qemu_log_path)
+
+    def qmp(self, cmd, **args):
+        return self._qmp.cmd(cmd, args=args)
+
+    def get_qmp_events(self, wait=False):
+        events = self._qmp.get_events(wait=wait)
+        self._qmp.clear_events()
+        return events
+
+index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
+
+class QMPTestCase(unittest.TestCase):
+    def dictpath(self, d, path):
+        """Traverse a path in a nested dict"""
+        for component in path.split('/'):
+            m = index_re.match(component)
+            if m:
+                component, idx = m.groups()
+                idx = int(idx)
+
+            if not isinstance(d, dict) or component not in d:
+                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
+            d = d[component]
+
+            if m:
+                if not isinstance(d, list):
+                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
+                try:
+                    d = d[idx]
+                except IndexError:
+                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
+        return d
+
+    def assert_qmp(self, d, path, value):
+        result = self.dictpath(d, path)
+        self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
+
+    def assert_no_active_streams(self):
+        result = self.vm.qmp('query-block-stream')
+        self.assert_qmp(result, 'return', [])
+
+class TestSingleDrive(QMPTestCase):
+    image_len = 1 * 1024 * 1024
+
+    def setUp(self):
+        qemu_img('create', 'backing.img', str(TestSingleDrive.image_len))
+        qemu_img('create', '-f', 'qed', '-o', 'backing_file=backing.img,copy_on_read=on', 'test.qed')
+        self.vm = VM().add_drive('test.qed')
+        self.vm.launch()
+
+    def tearDown(self):
+        self.vm.shutdown()
+        os.remove('test.qed')
+        os.remove('backing.img')
+
+    def test_stream_incremental(self):
+        self.assert_no_active_streams()
+
+        completed = False
+        while not completed:
+            result = self.vm.qmp('block_stream', device='drive0')
+            self.assert_qmp(result, 'return/device', 'drive0')
+            self.assert_qmp(result, 'return/len', self.image_len)
+
+            for event in self.vm.get_qmp_events():
+                if event['event'] == 'BLOCK_STREAM_COMPLETED':
+                    self.assert_qmp(event, 'data/device', 'drive0')
+                    self.assert_qmp(event, 'data/offset', self.image_len)
+                    self.assert_qmp(event, 'data/len', self.image_len)
+                    completed = True
+
+            # Compare result against query-block-stream output
+            if not completed:
+                query = self.vm.qmp('query-block-stream')
+                self.assert_qmp(query, 'return[0]/device', 'drive0')
+                self.assert_qmp(query, 'return[0]/offset',
+                                self.dictpath(result, 'return/offset'))
+                self.assert_qmp(query, 'return[0]/len', self.image_len)
+
+        self.assert_no_active_streams()
+
+    def test_stream_all(self):
+        self.assert_no_active_streams()
+
+        result = self.vm.qmp('block_stream', device='drive0', all=True)
+        self.assert_qmp(result, 'return', {})
+
+        completed = False
+        while not completed:
+            for event in self.vm.get_qmp_events(wait=True):
+                if event['event'] == 'BLOCK_STREAM_COMPLETED':
+                    self.assert_qmp(event, 'data/device', 'drive0')
+                    self.assert_qmp(event, 'data/offset', self.image_len)
+                    self.assert_qmp(event, 'data/len', self.image_len)
+                    completed = True
+
+        self.assert_no_active_streams()
+
+    def test_device_not_found(self):
+        result = self.vm.qmp('block_stream', device='nonexistent')
+        self.assert_qmp(result, 'error/class', 'DeviceNotFound')
+
+if __name__ == '__main__':
+    unittest.main()