Patchwork [10/13] qmp: add QMP support for stream commands

login
register
mail settings
Submitter Stefan Hajnoczi
Date June 14, 2011, 6:18 p.m.
Message ID <1308075511-4745-11-git-send-email-stefanha@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/100415/
State New
Headers show

Comments

Stefan Hajnoczi - June 14, 2011, 6:18 p.m.
From: Anthony Liguori <aliguori@us.ibm.com>

For leaf images with copy-on-read semantics, the stream commands allow the user
to populate local blocks by manually streaming them from the backing image.
Once all blocks have been streamed, the dependency on the original backing
image can be removed.  Therefore, stream commands can be used to implement
post-copy live block migration and rapid deployment.

The block_stream command can be used to stream a single cluster, to
start streaming the entire device, and to cancel an active stream.  It
is easiest to allow the block_stream command to manage streaming for the
entire device but a management tool could use single cluster mode to
throttle the I/O rate.

The command synopses are as follows:

block_stream
------------

Copy data from a backing file into a block device.

If the optional 'all' argument is true, this operation is performed in the
background until the entire backing file has been copied.  The status of
ongoing block_stream operations can be checked with query-block-stream.

Arguments:

- all:    copy entire device (json-bool, optional)
- stop:   stop copying to device (json-bool, optional)
- device: device name (json-string)

Return:

- device: device name (json-string)
- len:    size of the device, in bytes (json-int)
- offset: ending offset of the completed I/O, in bytes (json-int)

Examples:

-> { "execute": "block_stream", "arguments": { "device": "virtio0" } }
<- { "return":  { "device": "virtio0", "len": 10737418240, "offset": 512 } }

-> { "execute": "block_stream", "arguments": { "all": true, "device": "virtio0" } }
<- { "return": {} }

-> { "execute": "block_stream", "arguments": { "stop": true, "device": "virtio0" } }
<- { "return": {} }

query-block-stream
------------------

Show progress of ongoing block_stream operations.

Return a json-array of all operations.  If no operation is active then an empty
array will be returned.  Each operation is a json-object with the following
data:

- device: device name (json-string)
- len:    size of the device, in bytes (json-int)
- offset: ending offset of the completed I/O, in bytes (json-int)

Example:

-> { "execute": "query-block-stream" }
<- { "return":[
        { "device": "virtio0", "len": 10737418240, "offset": 709632}
     ]
   }

The naming of these commands is slightly odd but consistent with
existing block and query commands.  The block_stream command follows the
naming of block_resize, block_passwd, and others.  The
query-block-stream command follows the hyphenated naming of QMP query
commands like query-block.

Signed-off-by: Adam Litke <agl@us.ibm.com>
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 blockdev.c      |  270 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 blockdev.h      |    6 ++
 hmp-commands.hx |   18 ++++
 monitor.c       |   23 +++++
 monitor.h       |    1 +
 qerror.c        |    9 ++
 qerror.h        |    6 ++
 qmp-commands.hx |   68 ++++++++++++++
 8 files changed, 401 insertions(+), 0 deletions(-)

Patch

diff --git a/blockdev.c b/blockdev.c
index 9dbd2fa..ffbc45e 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -16,6 +16,7 @@ 
 #include "sysemu.h"
 #include "hw/qdev.h"
 #include "block_int.h"
+#include "qjson.h"
 
 static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);
 
@@ -50,6 +51,197 @@  static const int if_max_devs[IF_COUNT] = {
     [IF_SCSI] = 7,
 };
 
+typedef struct StreamState {
+    MonitorCompletion *cb;      /* completion for 'once' mode, else NULL */
+    void *cb_opaque;            /* argument passed to cb */
+    MonitorCompletion *cancel_cb; /* set by stream_stop(), run on free */
+    void *cancel_opaque;        /* argument passed to cancel_cb */
+    int64_t offset;             /* current position in block device */
+    bool running;               /* prevent re-entrancy */
+    bool once;                  /* only stream for one iteration */
+    BlockDriverState *bs;       /* device being streamed into */
+    QEMUTimer *timer;           /* runs stream_next_iteration() */
+    uint64_t stream_delay;      /* ns added when re-arming timer */
+    QLIST_ENTRY(StreamState) list; /* link in block_streams */
+} StreamState;
+
+static QLIST_HEAD(, StreamState) block_streams =
+    QLIST_HEAD_INITIALIZER(block_streams);
+
+static QObject *stream_get_qobject(StreamState *s)
+{
+    const char *name = bdrv_get_device_name(s->bs);
+    int64_t len = bdrv_getlength(s->bs);
+
+    return qobject_from_jsonf("{ 'device': %s, 'offset': %" PRId64 ", "
+                              "'len': %" PRId64 " }", name, s->offset, len);
+}
+
+static void stream_mon_event(StreamState *s, int ret)
+{
+    QObject *data = stream_get_qobject(s);
+
+    if (ret < 0) {
+        QError *qerror = qerror_from_args(QERR_STREAMING_ERROR,
+                                          strerror(-ret));
+
+        qdict_put(qobject_to_qdict(data), "error", qerror);
+    }
+
+    monitor_protocol_event(QEVENT_BLOCK_STREAM_COMPLETED, data);
+    qobject_decref(data);
+}
+
+static void stream_free(StreamState *s)
+{
+    QLIST_REMOVE(s, list);
+    /* Complete a pending stop request, if one was registered */
+    if (s->cancel_cb) {
+        s->cancel_cb(s->cancel_opaque, NULL);
+    }
+    /* Unschedule the timer before releasing it and the state */
+    qemu_del_timer(s->timer);
+    qemu_free_timer(s->timer);
+    qemu_free(s);               /* pairs with qemu_mallocz() */
+}
+
+static void stream_invoke_monitor_cb(StreamState *s)
+{
+    QObject *qobject = stream_get_qobject(s);
+
+    s->cb(s->cb_opaque, qobject);
+    qobject_decref(qobject);
+}
+
+static void stream_complete(StreamState *s, int ret)
+{
+    if (ret < 0) {
+        /* Error return if we have a callback, otherwise generate an event */
+        if (s->cb) {
+            qerror_report(QERR_STREAMING_ERROR, strerror(-ret));
+        } else {
+            stream_mon_event(s, ret);
+        }
+    } else {
+        /* Always generate event on successful completion */
+        stream_mon_event(s, ret);
+    }
+
+    if (s->cb) {
+        stream_invoke_monitor_cb(s);
+    }
+    stream_free(s);
+}
+
+static void stream_cb(void *opaque, int nb_sectors)
+{
+    StreamState *s = opaque;
+
+    if (nb_sectors < 0) {
+        stream_complete(s, nb_sectors);
+        return;
+    }
+
+    s->offset += nb_sectors * BDRV_SECTOR_SIZE;
+
+    if (s->offset == bdrv_getlength(s->bs)) {
+        bdrv_change_backing_file(s->bs, NULL, NULL);
+        stream_complete(s, 0);
+    } else if (s->once) {
+        assert(s->cb);
+        stream_invoke_monitor_cb(s);
+        s->running = false;
+    } else if (s->cancel_cb) {
+        stream_free(s);
+    } else {
+        qemu_mod_timer(s->timer, qemu_get_clock_ns(rt_clock) +
+                       s->stream_delay);
+    }
+}
+
+/* We can't call bdrv_aio_copy_backing() directly from the callback because
+ * that makes qemu_aio_flush() not complete until the streaming is completed.
+ * By delaying with a timer, we give qemu_aio_flush() a chance to complete.
+ */
+static void stream_next_iteration(void *opaque)
+{
+    StreamState *s = opaque;
+
+    bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE, stream_cb, s);
+}
+
+static StreamState *stream_find(const char *device)
+{
+    StreamState *s;
+
+    QLIST_FOREACH(s, &block_streams, list) {
+        if (strcmp(bdrv_get_device_name(s->bs), device) == 0) {
+            return s;
+        }
+    }
+    return NULL;
+}
+
+static StreamState *stream_start(const char *device, bool once,
+                                 MonitorCompletion cb, void *opaque)
+{
+    StreamState *s;
+    BlockDriverAIOCB *acb;
+
+    s = stream_find(device);
+    if (s && s->running) {
+        qerror_report(QERR_DEVICE_IN_USE, device);
+        return NULL;
+    }
+
+    /* Create a new stream, if necessary */
+    if (!s) {
+        BlockDriverState *bs = bdrv_find(device);
+        if (!bs) {
+            qerror_report(QERR_DEVICE_NOT_FOUND, device);
+            return NULL;
+        }
+
+        s = qemu_mallocz(sizeof(*s));
+        s->bs = bs;
+        s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s);
+        QLIST_INSERT_HEAD(&block_streams, s, list);
+    }
+
+    s->running = true;
+    s->once = once;
+    s->cb = once ? cb : NULL;
+    s->cb_opaque = opaque;
+    s->stream_delay = 0; /* FIXME make this configurable */
+
+    acb = bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE,
+                                stream_cb, s);
+    if (acb == NULL) {
+        stream_free(s);
+        qerror_report(QERR_NOT_SUPPORTED);
+        return NULL;
+    }
+    return s;
+}
+
+static int stream_stop(const char *device, MonitorCompletion *cb, void *opaque)
+{
+    StreamState *s = stream_find(device);
+
+    if (!s || s->once) {
+        qerror_report(QERR_DEVICE_NOT_ACTIVE, device);
+        return -1;
+    }
+    if (s->cancel_cb) {
+        qerror_report(QERR_DEVICE_IN_USE, device);
+        return -1;
+    }
+
+    s->cancel_cb = cb;
+    s->cancel_opaque = opaque;
+    return 0;
+}
+
 /*
  * We automatically delete the drive when a device using it gets
  * unplugged.  Questionable feature, but we can't just drop it.
@@ -654,6 +846,84 @@  out:
     return ret;
 }
 
+void monitor_print_block_stream(Monitor *mon, const QObject *data)
+{
+    QDict *stream;
+
+    assert(data);
+    stream = qobject_to_qdict(data);
+
+    monitor_printf(mon, "Streaming device %s: Completed %" PRId64 " of %"
+                   PRId64 " bytes\n", qdict_get_str(stream, "device"),
+                   qdict_get_int(stream, "offset"),
+                   qdict_get_int(stream, "len"));
+}
+
+void monitor_print_block_stream_info(Monitor *mon, const QObject *data)
+{
+    QList *streams;
+    QListEntry *entry;
+
+    assert(data);
+    streams = qobject_to_qlist(data);
+    assert(streams); /* we pass a list of stream objects to ourselves */
+
+    if (qlist_empty(streams)) {
+        monitor_printf(mon, "No active stream\n");
+        return;
+    }
+
+    QLIST_FOREACH_ENTRY(streams, entry) {
+        monitor_print_block_stream(mon, entry->value);
+    }
+}
+
+int do_block_stream_info(Monitor *mon, MonitorCompletion *cb, void *opaque)
+{
+    QList *streams = qlist_new();
+    StreamState *s;
+
+    QLIST_FOREACH(s, &block_streams, list) {
+        if (!s->once) {
+            qlist_append_obj(streams, stream_get_qobject(s));
+        }
+    }
+
+    cb(opaque, QOBJECT(streams));
+    QDECREF(streams);
+    return 0;
+}
+
+int do_block_stream(Monitor *mon, const QDict *params,
+                    MonitorCompletion cb, void *opaque)
+{
+    int all = qdict_get_try_bool(params, "all", false);
+    int stop = qdict_get_try_bool(params, "stop", false);
+    const char *device = qdict_get_str(params, "device");
+    StreamState *s;
+
+    if (all && stop) {
+        qerror_report(QERR_INVALID_PARAMETER, "stop' not allowed with 'all");
+        return -1;
+    }
+
+    if (stop) {
+        return stream_stop(device, cb, opaque);
+    } else if (all) {
+        s = stream_start(device, false, NULL, NULL);
+        if (!s) {
+            return -1;
+        }
+        cb(opaque, NULL);
+    } else {
+        s = stream_start(device, true, cb, opaque);
+        if (!s) {
+            return -1;
+        }
+    }
+    return 0;
+}
+
 static int eject_device(Monitor *mon, BlockDriverState *bs, int force)
 {
     if (!force) {
diff --git a/blockdev.h b/blockdev.h
index 3587786..e246c81 100644
--- a/blockdev.h
+++ b/blockdev.h
@@ -12,6 +12,7 @@ 
 
 #include "block.h"
 #include "qemu-queue.h"
+#include "monitor.h"
 
 void blockdev_mark_auto_del(BlockDriverState *bs);
 void blockdev_auto_del(BlockDriverState *bs);
@@ -65,5 +66,10 @@  int do_change_block(Monitor *mon, const char *device,
 int do_drive_del(Monitor *mon, const QDict *qdict, QObject **ret_data);
 int do_snapshot_blkdev(Monitor *mon, const QDict *qdict, QObject **ret_data);
 int do_block_resize(Monitor *mon, const QDict *qdict, QObject **ret_data);
+void monitor_print_block_stream(Monitor *mon, const QObject *data);
+void monitor_print_block_stream_info(Monitor *mon, const QObject *data);
+int do_block_stream_info(Monitor *mon, MonitorCompletion *cb, void *opaque);
+int do_block_stream(Monitor *mon, const QDict *params,
+                    MonitorCompletion cb, void *opaque);
 
 #endif
diff --git a/hmp-commands.hx b/hmp-commands.hx
index f6cc724..e78a1f8 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -38,6 +38,22 @@  Commit changes to the disk images (if -snapshot is used) or backing files.
 ETEXI
 
     {
+        .name       = "block_stream",
+        .args_type  = "all:-a,stop:-s,device:B",
+        .params     = "[-a] [-s] device",
+        .help       = "Stream data to a block device",
+        .user_print = monitor_print_block_stream,
+        .mhandler.cmd_async = do_block_stream,
+        .flags      = MONITOR_CMD_ASYNC,
+    },
+
+STEXI
+@item block_stream
+@findex block_stream
+Copy data from a backing file into a block device.
+ETEXI
+
+    {
         .name       = "q|quit",
         .args_type  = "",
         .params     = "",
@@ -1354,6 +1370,8 @@  show device tree
 show qdev device model list
 @item info roms
 show roms
+@item info block-stream
+show progress of ongoing block_stream operations
 @end table
 ETEXI
 
diff --git a/monitor.c b/monitor.c
index 6af6a4d..f9ee743 100644
--- a/monitor.c
+++ b/monitor.c
@@ -468,6 +468,9 @@  void monitor_protocol_event(MonitorEvent event, QObject *data)
         case QEVENT_SPICE_DISCONNECTED:
             event_name = "SPICE_DISCONNECTED";
             break;
+        case QEVENT_BLOCK_STREAM_COMPLETED:
+            event_name = "BLOCK_STREAM_COMPLETED";
+            break;
         default:
             abort();
             break;
@@ -3105,6 +3108,16 @@  static const mon_cmd_t info_cmds[] = {
         .mhandler.info = do_info_trace_events,
     },
 #endif
+     {
+        .name       = "block-stream",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show block streaming status",
+        .user_print = monitor_print_block_stream_info,
+        .mhandler.info_async = do_block_stream_info,
+        .flags      = MONITOR_CMD_ASYNC,
+
+    },
     {
         .name       = NULL,
     },
@@ -3247,6 +3260,16 @@  static const mon_cmd_t qmp_query_cmds[] = {
         .mhandler.info_async = do_info_balloon,
         .flags      = MONITOR_CMD_ASYNC,
     },
+    {
+        .name       = "block-stream",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show block streaming status",
+        .user_print = monitor_print_block_stream_info,
+        .mhandler.info_async = do_block_stream_info,
+        .flags      = MONITOR_CMD_ASYNC,
+
+    },
     { /* NULL */ },
 };
 
diff --git a/monitor.h b/monitor.h
index 4f2d328..7b916c7 100644
--- a/monitor.h
+++ b/monitor.h
@@ -35,6 +35,7 @@  typedef enum MonitorEvent {
     QEVENT_SPICE_CONNECTED,
     QEVENT_SPICE_INITIALIZED,
     QEVENT_SPICE_DISCONNECTED,
+    QEVENT_BLOCK_STREAM_COMPLETED,
     QEVENT_MAX,
 } MonitorEvent;
 
diff --git a/qerror.c b/qerror.c
index 64b41ca..83164c1 100644
--- a/qerror.c
+++ b/qerror.c
@@ -157,6 +157,10 @@  static const QErrorStringTable qerror_table[] = {
         .desc      = "No '%(bus)' bus found for device '%(device)'",
     },
     {
+        .error_fmt = QERR_NOT_SUPPORTED,
+        .desc      = "Operation is not supported",
+    },
+    {
         .error_fmt = QERR_OPEN_FILE_FAILED,
         .desc      = "Could not open '%(filename)'",
     },
@@ -213,6 +217,11 @@  static const QErrorStringTable qerror_table[] = {
         .error_fmt = QERR_VNC_SERVER_FAILED,
         .desc      = "Could not start VNC server on %(target)",
     },
+    {
+        .error_fmt = QERR_STREAMING_ERROR,
+        .desc      = "An error occurred during streaming: %(msg)",
+    },
+
     {}
 };
 
diff --git a/qerror.h b/qerror.h
index 173c84f..ad80982 100644
--- a/qerror.h
+++ b/qerror.h
@@ -142,6 +142,9 @@  QError *qobject_to_qerror(const QObject *obj);
 #define QERR_NO_BUS_FOR_DEVICE \
     "{ 'class': 'NoBusForDevice', 'data': { 'device': %s, 'bus': %s } }"
 
+#define QERR_NOT_SUPPORTED \
+    "{ 'class': 'NotSupported', 'data': {} }"
+
 #define QERR_OPEN_FILE_FAILED \
     "{ 'class': 'OpenFileFailed', 'data': { 'filename': %s } }"
 
@@ -187,4 +190,7 @@  QError *qobject_to_qerror(const QObject *obj);
 #define QERR_FEATURE_DISABLED \
     "{ 'class': 'FeatureDisabled', 'data': { 'name': %s } }"
 
+#define QERR_STREAMING_ERROR \
+    "{ 'class': 'StreamingError', 'data': { 'msg': %s } }"
+
 #endif /* QERROR_H */
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 92c5c3a..d8966bf 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -885,6 +885,51 @@  Example:
 EQMP
 
     {
+        .name       = "block_stream",
+        .args_type  = "all:-a,stop:-s,device:B",
+        .params     = "[-a] [-s] device",
+        .help       = "Copy data from a backing file into a block device",
+        .user_print = monitor_print_block_stream,
+        .mhandler.cmd_async = do_block_stream,
+        .flags      = MONITOR_CMD_ASYNC,
+    },
+
+SQMP
+block_stream
+------------
+
+Copy data from a backing file into a block device.
+
+If the optional 'all' argument is true, this operation is performed in the
+background until the entire backing file has been copied.  The status of
+ongoing block_stream operations can be checked with query-block-stream.
+
+Arguments:
+
+- all:    copy entire device (json-bool, optional)
+- stop:   stop copying to device (json-bool, optional)
+- device: device name (json-string)
+
+Return:
+
+- device: device name (json-string)
+- len:    size of the device, in bytes (json-int)
+- offset: ending offset of the completed I/O, in bytes (json-int)
+
+Examples:
+
+-> { "execute": "block_stream", "arguments": { "device": "virtio0" } }
+<- { "return":  { "device": "virtio0", "len": 10737418240, "offset": 512 } }
+
+-> { "execute": "block_stream", "arguments": { "all": true, "device": "virtio0" } }
+<- { "return": {} }
+
+-> { "execute": "block_stream", "arguments": { "stop": true, "device": "virtio0" } }
+<- { "return": {} }
+
+EQMP
+
+    {
         .name       = "qmp_capabilities",
         .args_type  = "",
         .params     = "",
@@ -1805,3 +1850,26 @@  Example:
 
 EQMP
 
+SQMP
+query-block-stream
+------------------
+
+Show progress of ongoing block_stream operations.
+
+Return a json-array of all operations.  If no operation is active then an empty
+array will be returned.  Each operation is a json-object with the following
+data:
+
+- device: device name (json-string)
+- len:    size of the device, in bytes (json-int)
+- offset: ending offset of the completed I/O, in bytes (json-int)
+
+Example:
+
+-> { "execute": "query-block-stream" }
+<- { "return":[
+        { "device": "virtio0", "len": 10737418240, "offset": 709632}
+     ]
+   }
+
+EQMP