[2/8] qmp: Add QMP support for stream commands

Message ID 1303910855-28999-3-git-send-email-stefanha@linux.vnet.ibm.com
State New

Commit Message

Stefan Hajnoczi April 27, 2011, 1:27 p.m. UTC
From: Anthony Liguori <aliguori@us.ibm.com>

For leaf images with copy on read semantics, the stream commands allow the user
to populate local blocks by manually streaming them from the backing image.
Once all blocks have been streamed, the dependency on the original backing
image can be removed.  Therefore, stream commands can be used to implement
post-copy live block migration and rapid deployment.

The stream command can be used to stream a single sector, to start streaming
the entire device, and to cancel an active stream.  It is easiest to allow the
stream command to manage streaming for the entire device but a management tool
could use single sector mode to throttle the I/O rate.  When a single sector is
streamed, the command returns an offset that can be used for a subsequent call.
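
As a rough illustration of the single-sector mode described above, the following sketch shows how a management tool might feed each returned offset into the next call until the device is fully streamed. It does not talk to QEMU: `mock_stream_one_sector()` and `stream_whole_device()` are hypothetical names, the mock only imitates the "offset" return field, and `SECTOR_SIZE` is assumed to equal `BDRV_SECTOR_SIZE` (512).

```c
#include <assert.h>
#include <stdint.h>

#define SECTOR_SIZE 512 /* assumption: same value as BDRV_SECTOR_SIZE */

/* Hypothetical stand-in for one single-sector "stream" call.  It returns the
 * ending offset of the completed I/O, like the "offset" return field. */
static int64_t mock_stream_one_sector(int64_t offset, int64_t len)
{
    int64_t end;

    /* The real command rejects unaligned offsets and offsets >= len. */
    assert(offset % SECTOR_SIZE == 0 && offset < len);

    end = offset + SECTOR_SIZE;
    return end < len ? end : len;
}

/* Stream a device of 'len' bytes one sector at a time and return the number
 * of calls issued.  A real tool would pause between calls to throttle I/O. */
static int64_t stream_whole_device(int64_t len)
{
    int64_t offset = 0;
    int64_t calls = 0;

    while (offset < len) {
        offset = mock_stream_one_sector(offset, len);
        calls++;
    }
    return calls;
}
```

The call count makes the cost of this mode visible: a 4096-byte device already needs eight round trips, so the rate limit comes entirely from how long the tool waits between calls.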

The command synopses are as follows:

stream
------

Stream data to a block device.

Arguments:

- all:    Stream the entire device (json-bool, optional)
- stop:   Stop streaming to the device (json-bool, optional)
- device: device name (json-string)
- offset: device offset in bytes (json-int, optional)

Return:

- device: The device name being streamed
- len:    The size of the device (in bytes)
- offset: The ending offset of the completed I/O (in bytes)

Examples:

-> { "execute": "stream", "arguments": { "device": "virtio0", "offset": 0 } }
<- { "return":  { "device": "virtio0", "len": 10737418240, "offset": 512 } }

-> { "execute": "stream", "arguments": { "all": true, "device": "virtio0" } }
<- { "return": {} }

-> { "execute": "stream", "arguments": { "stop": true, "device": "virtio0" } }
<- { "return": {} }

query-stream
------------

Show progress of ongoing stream operation

Return a json-array of all streams.  If no stream is active then an empty array
will be returned.  Each stream is a json-object with the following data:

- device: The device name being streamed
- len:    The size of the device (in bytes)
- offset: The ending offset of the completed I/O (in bytes)

Example:

-> { "execute": "query-stream" }
<- { "return":[
        { "device": "virtio0", "len": 10737418240, "offset": 709632}
     ]
   }
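
A client could turn the "offset" and "len" fields of a query-stream entry into a progress figure. The helper below is only a sketch of that arithmetic; `stream_progress_percent()` is a hypothetical name and is not part of this patch.

```c
#include <assert.h>
#include <stdint.h>

/* Percentage of the device streamed so far, computed from the "offset" and
 * "len" fields of one query-stream entry.  An empty device counts as done. */
static double stream_progress_percent(int64_t offset, int64_t len)
{
    if (len <= 0) {
        return 100.0;
    }
    assert(offset >= 0 && offset <= len);
    return 100.0 * (double)offset / (double)len;
}
```

For the example reply above (offset 709632 of 10737418240 bytes) this yields well under 0.01 percent.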

Signed-off-by: Adam Litke <agl@us.ibm.com>
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 blockdev.c      |  212 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 blockdev.h      |    5 ++
 hmp-commands.hx |   18 +++++
 monitor.c       |   20 +++++
 qerror.c        |    9 +++
 qerror.h        |    6 ++
 qmp-commands.hx |   64 +++++++++++++++++
 7 files changed, 334 insertions(+), 0 deletions(-)

Comments

Kevin Wolf April 29, 2011, 12:09 p.m. UTC | #1
On 27.04.2011 15:27, Stefan Hajnoczi wrote:
> From: Anthony Liguori <aliguori@us.ibm.com>
> 
> For leaf images with copy on read semantics, the stream commands allow the user
> to populate local blocks by manually streaming them from the backing image.
> Once all blocks have been streamed, the dependency on the original backing
> image can be removed.  Therefore, stream commands can be used to implement
> post-copy live block migration and rapid deployment.
> 
> The stream command can be used to stream a single sector, to start streaming
> the entire device, and to cancel an active stream.  It is easiest to allow the
> stream command to manage streaming for the entire device but a management tool
> could use single sector mode to throttle the I/O rate.  When a single sector is
> streamed, the command returns an offset that can be used for a subsequent call.

You mean literally single sectors? You're not interested in completing
the job in finite time, are you? ;-)

I would suggest adding a length argument for the all=false case, so that
management tools can choose more reasonable sizes.

Kevin

Stefan Hajnoczi May 6, 2011, 1:23 p.m. UTC | #2
On Fri, Apr 29, 2011 at 1:09 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> On 27.04.2011 15:27, Stefan Hajnoczi wrote:
>> From: Anthony Liguori <aliguori@us.ibm.com>
>>
>> For leaf images with copy on read semantics, the stream commands allow the user
>> to populate local blocks by manually streaming them from the backing image.
>> Once all blocks have been streamed, the dependency on the original backing
>> image can be removed.  Therefore, stream commands can be used to implement
>> post-copy live block migration and rapid deployment.
>>
>> The stream command can be used to stream a single sector, to start streaming
>> the entire device, and to cancel an active stream.  It is easiest to allow the
>> stream command to manage streaming for the entire device but a management tool
>> could use single sector mode to throttle the I/O rate.  When a single sector is
>> streamed, the command returns an offset that can be used for a subsequent call.
>
> You mean literally single sectors? You're not interested in completing
> the job in finite time, are you? ;-)
>
> I would suggest adding a length argument for the all=false case, so that
> management tools can choose more reasonable sizes.

Discussion on libvir-list suggests the same thing.  Let's take a
nb_sectors where 0=all.
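
One possible reading of the nb_sectors proposal, sketched as a pure helper: given a starting offset and a sector count, compute where the request would end, with 0 meaning "stream to the end of the device". The name `stream_request_end()`, the clamping behaviour, and `SECTOR_SIZE` (assumed equal to `BDRV_SECTOR_SIZE`) are illustrative assumptions, not code from this series.

```c
#include <assert.h>
#include <stdint.h>

#define SECTOR_SIZE 512 /* assumption: same value as BDRV_SECTOR_SIZE */

/* Hypothetical helper: ending byte offset of a request that starts at
 * 'offset' and covers 'nb_sectors' sectors.  nb_sectors == 0 means "all",
 * and the result is clamped to the device length 'len'. */
static int64_t stream_request_end(int64_t offset, int64_t nb_sectors,
                                  int64_t len)
{
    int64_t remaining_sectors;

    assert(offset >= 0 && offset < len && nb_sectors >= 0);

    if (nb_sectors == 0) {
        return len;
    }

    /* Compare in sectors so nb_sectors * SECTOR_SIZE cannot overflow. */
    remaining_sectors = (len - offset + SECTOR_SIZE - 1) / SECTOR_SIZE;
    if (nb_sectors >= remaining_sectors) {
        return len;
    }
    return offset + nb_sectors * SECTOR_SIZE;
}
```

With this shape a management tool could request, say, 2048 sectors (1 MiB) per call instead of a single sector.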

Stefan

Patch

diff --git a/blockdev.c b/blockdev.c
index 5429621..99c0726 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -16,6 +16,7 @@ 
 #include "sysemu.h"
 #include "hw/qdev.h"
 #include "block_int.h"
+#include "qjson.h"
 
 static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);
 
@@ -50,6 +51,144 @@  static const int if_max_devs[IF_COUNT] = {
     [IF_SCSI] = 7,
 };
 
+typedef struct StreamState {
+    MonitorCompletion *cb;
+    void *cb_opaque;
+    int64_t offset;
+    bool once;
+    bool cancel;
+    BlockDriverState *bs;
+    QEMUTimer *timer;
+    uint64_t stream_delay;
+} StreamState;
+
+static StreamState global_stream;
+static StreamState *active_stream;
+
+static QObject *stream_get_qobject(StreamState *s)
+{
+    const char *name = bdrv_get_device_name(s->bs);
+    int64_t len = bdrv_getlength(s->bs);
+
+    return qobject_from_jsonf("{ 'device': %s, 'offset': %" PRId64 ", "
+                              "'len': %" PRId64 " }", name, s->offset, len);
+}
+
+static void do_stream_cb(void *opaque, int ret)
+{
+    StreamState *s = opaque;
+
+    if (ret < 0) {
+        qerror_report(QERR_STREAMING_ERROR, strerror(-ret));
+        goto out;
+    }
+
+    s->offset += ret * BDRV_SECTOR_SIZE;
+
+    if (!s->once) {
+        if (s->offset == bdrv_getlength(s->bs)) {
+            bdrv_change_backing_file(s->bs, NULL, NULL);
+        } else if (!s->cancel) {
+            qemu_mod_timer(s->timer,
+                           qemu_get_clock_ns(rt_clock) + s->stream_delay);
+            return;
+        }
+    }
+
+out:
+    if (s->cb) {
+        s->cb(s->cb_opaque, stream_get_qobject(s));
+    }
+    qemu_del_timer(s->timer);
+    qemu_free_timer(s->timer);
+    active_stream = NULL;
+}
+
+/* We can't call bdrv_aio_stream() directly from the callback because that
+ * makes qemu_aio_flush() not complete until the streaming is completed.
+ * By delaying with a timer, we give qemu_aio_flush() a chance to complete.
+ */
+static void stream_next_iteration(void *opaque)
+{
+    StreamState *s = opaque;
+
+    bdrv_aio_stream(s->bs, s->offset / BDRV_SECTOR_SIZE, do_stream_cb, s);
+}
+
+static StreamState *stream_start(const char *device, int64_t offset, bool once,
+                                 MonitorCompletion cb, void *opaque)
+{
+    BlockDriverState *bs;
+    StreamState *s = &global_stream;
+    BlockDriverAIOCB *acb;
+
+    if (active_stream) {
+        qerror_report(QERR_DEVICE_IN_USE,
+                      bdrv_get_device_name(active_stream->bs));
+        return NULL;
+    }
+
+    bs = bdrv_find(device);
+    if (!bs) {
+        qerror_report(QERR_DEVICE_NOT_FOUND, device);
+        return NULL;
+    }
+
+    if (offset % BDRV_SECTOR_SIZE) {
+        qerror_report(QERR_INVALID_PARAMETER_VALUE,
+                      "offset", "a sector-aligned offset");
+        return NULL;
+    }
+
+    if (offset >= bdrv_getlength(bs)) {
+        qerror_report(QERR_INVALID_PARAMETER_VALUE,
+                      "offset", "an offset less than device length");
+        return NULL;
+    }
+
+    memset(s, 0, sizeof(*s));
+    if (once) {
+        s->cb = cb;
+        s->cb_opaque = opaque;
+        s->once = true;
+    }
+    s->offset = offset;
+    s->bs = bs;
+    s->stream_delay = 0; /* FIXME make this configurable */
+    s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s);
+
+    acb = bdrv_aio_stream(bs, offset / BDRV_SECTOR_SIZE, do_stream_cb, s);
+    if (acb == NULL) {
+        qemu_free_timer(s->timer);
+        qerror_report(QERR_NOT_SUPPORTED);
+        return NULL;
+    }
+
+    active_stream = s;
+
+    return s;
+}
+
+static int stream_stop(const char *device, MonitorCompletion *cb, void *opaque)
+{
+    if (!active_stream) {
+        qerror_report(QERR_STREAMING_ERROR, strerror(ESRCH));
+        return -1;
+    }
+
+    /*
+     * In case we want to support simultaneous streams in the future,
+     * require a device name to be specified when stopping a stream.
+     */
+    if (strcmp(device, bdrv_get_device_name(active_stream->bs))) {
+        qerror_report(QERR_DEVICE_NOT_FOUND, device);
+        return -1;
+    }
+
+    active_stream->cancel = true;
+    return 0;
+}
+
 /*
  * We automatically delete the drive when a device using it gets
  * unplugged.  Questionable feature, but we can't just drop it.
@@ -647,6 +786,79 @@  out:
     return ret;
 }
 
+void monitor_print_stream(Monitor *mon, const QObject *data)
+{
+    QList *streams;
+
+    if (data == NULL) {
+        return;
+    }
+
+    streams = qobject_to_qlist(data);
+    if (streams && !qlist_empty(streams)) {
+        /* Only print a single stream until multi-stream support is added */
+        QDict *qdict = qobject_to_qdict(qlist_peek(streams));
+        monitor_printf(mon, "Streaming device %s: Completed %" PRId64 " of %"
+                       PRId64 " bytes\n", qdict_get_str(qdict, "device"),
+                       qdict_get_int(qdict, "offset"),
+                       qdict_get_int(qdict, "len"));
+    } else {
+        monitor_printf(mon, "No active stream\n");
+    }
+}
+
+int do_stream_info(Monitor *mon, MonitorCompletion *cb, void *opaque)
+{
+    QList *streams = qlist_new();
+
+    if (active_stream) {
+        qlist_append_obj(streams, stream_get_qobject(active_stream));
+    }
+
+    cb(opaque, QOBJECT(streams));
+    return 0;
+}
+
+int do_stream(Monitor *mon, const QDict *params,
+              MonitorCompletion cb, void *opaque)
+{
+    int all = qdict_get_try_bool(params, "all", false);
+    int stop = qdict_get_try_bool(params, "stop", false);
+    const char *device = qdict_get_str(params, "device");
+    int64_t offset = 0;
+    StreamState *s;
+
+    if (all && stop) {
+        qerror_report(QERR_INVALID_PARAMETER, "stop' not allowed with 'all");
+        return -1;
+    }
+
+    if (stop) {
+        if (stream_stop(device, cb, opaque)) {
+            return -1;
+        }
+    } else if (all) {
+        s = stream_start(device, offset, false, NULL, NULL);
+        if (!s) {
+            return -1;
+        }
+    } else {
+        if (qdict_haskey(params, "offset")) {
+            offset = qdict_get_int(params, "offset");
+        }
+        s = stream_start(device, offset, true, cb, opaque);
+        if (!s) {
+            return -1;
+        }
+        /* This will complete asynchronously when the sector is streamed */
+        return 0;
+    }
+
+    /* Starting and stopping full device streams complete immediately */
+    cb(opaque, NULL);
+    return 0;
+}
+
 static int eject_device(Monitor *mon, BlockDriverState *bs, int force)
 {
     if (!force) {
diff --git a/blockdev.h b/blockdev.h
index 2c9e780..c1c4dfd 100644
--- a/blockdev.h
+++ b/blockdev.h
@@ -12,6 +12,7 @@ 
 
 #include "block.h"
 #include "qemu-queue.h"
+#include "monitor.h"
 
 void blockdev_mark_auto_del(BlockDriverState *bs);
 void blockdev_auto_del(BlockDriverState *bs);
@@ -64,5 +65,9 @@  int do_change_block(Monitor *mon, const char *device,
 int do_drive_del(Monitor *mon, const QDict *qdict, QObject **ret_data);
 int do_snapshot_blkdev(Monitor *mon, const QDict *qdict, QObject **ret_data);
 int do_block_resize(Monitor *mon, const QDict *qdict, QObject **ret_data);
+void monitor_print_stream(Monitor *mon, const QObject *data);
+int do_stream_info(Monitor *mon, MonitorCompletion *cb, void *opaque);
+int do_stream(Monitor *mon, const QDict *params,
+              MonitorCompletion cb, void *opaque);
 
 #endif
diff --git a/hmp-commands.hx b/hmp-commands.hx
index 834e6a8..1db477c 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -38,6 +38,22 @@  Commit changes to the disk images (if -snapshot is used) or backing files.
 ETEXI
 
     {
+        .name       = "stream",
+        .args_type  = "all:-a,stop:-s,device:B,offset:i?",
+        .params     = "[-a] [-s] device [offset]",
+        .help       = "Stream data to a block device",
+        .user_print = monitor_print_stream,
+        .mhandler.cmd_async = do_stream,
+        .flags      = MONITOR_CMD_ASYNC,
+    },
+
+STEXI
+@item stream
+@findex stream
+Stream data to a block device.
+ETEXI
+
+    {
         .name       = "q|quit",
         .args_type  = "",
         .params     = "",
@@ -1352,6 +1368,8 @@  show device tree
 show qdev device model list
 @item info roms
 show roms
+@item info stream
+show progress of ongoing stream operation
 @end table
 ETEXI
 
diff --git a/monitor.c b/monitor.c
index 5f3bc72..f325ede 100644
--- a/monitor.c
+++ b/monitor.c
@@ -3100,6 +3100,16 @@  static const mon_cmd_t info_cmds[] = {
         .mhandler.info = do_info_trace_events,
     },
 #endif
+    {
+        .name       = "stream",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show block streaming status",
+        .user_print = monitor_print_stream,
+        .mhandler.info_async = do_stream_info,
+        .flags      = MONITOR_CMD_ASYNC,
+
+    },
     {
         .name       = NULL,
     },
@@ -3242,6 +3252,16 @@  static const mon_cmd_t qmp_query_cmds[] = {
         .mhandler.info_async = do_info_balloon,
         .flags      = MONITOR_CMD_ASYNC,
     },
+    {
+        .name       = "stream",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show block streaming status",
+        .user_print = monitor_print_stream,
+        .mhandler.info_async = do_stream_info,
+        .flags      = MONITOR_CMD_ASYNC,
+
+    },
     { /* NULL */ },
 };
 
diff --git a/qerror.c b/qerror.c
index 4855604..5a4f0ba 100644
--- a/qerror.c
+++ b/qerror.c
@@ -157,6 +157,10 @@  static const QErrorStringTable qerror_table[] = {
         .desc      = "No '%(bus)' bus found for device '%(device)'",
     },
     {
+        .error_fmt = QERR_NOT_SUPPORTED,
+        .desc      = "Operation is not supported",
+    },
+    {
         .error_fmt = QERR_OPEN_FILE_FAILED,
         .desc      = "Could not open '%(filename)'",
     },
@@ -209,6 +213,11 @@  static const QErrorStringTable qerror_table[] = {
         .error_fmt = QERR_VNC_SERVER_FAILED,
         .desc      = "Could not start VNC server on %(target)",
     },
+    {
+        .error_fmt = QERR_STREAMING_ERROR,
+        .desc      = "An error occurred during streaming: %(msg)",
+    },
+
     {}
 };
 
diff --git a/qerror.h b/qerror.h
index df61d2c..cbe19cb 100644
--- a/qerror.h
+++ b/qerror.h
@@ -132,6 +132,9 @@  QError *qobject_to_qerror(const QObject *obj);
 #define QERR_NO_BUS_FOR_DEVICE \
     "{ 'class': 'NoBusForDevice', 'data': { 'device': %s, 'bus': %s } }"
 
+#define QERR_NOT_SUPPORTED \
+    "{ 'class': 'NotSupported', 'data': {} }"
+
 #define QERR_OPEN_FILE_FAILED \
     "{ 'class': 'OpenFileFailed', 'data': { 'filename': %s } }"
 
@@ -174,4 +177,7 @@  QError *qobject_to_qerror(const QObject *obj);
 #define QERR_FEATURE_DISABLED \
     "{ 'class': 'FeatureDisabled', 'data': { 'name': %s } }"
 
+#define QERR_STREAMING_ERROR \
+    "{ 'class': 'StreamingError', 'data': { 'msg': %s } }"
+
 #endif /* QERROR_H */
diff --git a/qmp-commands.hx b/qmp-commands.hx
index fbd98ee..c2bccd6 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -858,6 +858,48 @@  Example:
 EQMP
 
     {
+        .name       = "stream",
+        .args_type  = "all:-a,stop:-s,device:B,offset:i?",
+        .params     = "[-a] [-s] device [offset]",
+        .help       = "Stream data to a block device",
+        .user_print = monitor_print_stream,
+        .mhandler.cmd_async = do_stream,
+        .flags      = MONITOR_CMD_ASYNC,
+    },
+
+SQMP
+stream
+------
+
+Stream data to a block device.
+
+Arguments:
+
+- all:    Stream the entire device (json-bool, optional)
+- stop:   Stop streaming to the device (json-bool, optional)
+- device: device name (json-string)
+- offset: device offset in bytes (json-int, optional)
+
+Return:
+
+- device: The device name being streamed
+- len:    The size of the device (in bytes)
+- offset: The ending offset of the completed I/O (in bytes)
+
+Examples:
+
+-> { "execute": "stream", "arguments": { "device": "virtio0", "offset": 0 } }
+<- { "return":  { "device": "virtio0", "len": 10737418240, "offset": 512 } }
+
+-> { "execute": "stream", "arguments": { "all": true, "device": "virtio0" } }
+<- { "return": {} }
+
+-> { "execute": "stream", "arguments": { "stop": true, "device": "virtio0" } }
+<- { "return": {} }
+
+EQMP
+
+    {
         .name       = "qmp_capabilities",
         .args_type  = "",
         .params     = "",
@@ -1777,3 +1819,25 @@  Example:
 
 EQMP
 
+SQMP
+query-stream
+------------
+
+Show progress of ongoing stream operation
+
+Return a json-array of all streams.  If no stream is active then an empty array
+will be returned.  Each stream is a json-object with the following data:
+
+- device: The device name being streamed
+- len:    The size of the device (in bytes)
+- offset: The ending offset of the completed I/O (in bytes)
+
+Example:
+
+-> { "execute": "query-stream" }
+<- { "return":[
+        { "device": "virtio0", "len": 10737418240, "offset": 709632}
+     ]
+   }
+
+EQMP
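
The argument checks that do_stream() and stream_start() perform in the patch above can be summarised in one standalone function. This is a simplified sketch for reading along, not a replacement for the patch: `check_stream_args()` and the `stream_check` enum are hypothetical, device lookup and the "stream already active" check are left out, and `SECTOR_SIZE` is assumed to equal `BDRV_SECTOR_SIZE`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SECTOR_SIZE 512 /* assumption: same value as BDRV_SECTOR_SIZE */

enum stream_check {
    STREAM_OK,
    STREAM_ERR_ALL_AND_STOP, /* 'stop' not allowed with 'all' */
    STREAM_ERR_UNALIGNED,    /* offset is not sector-aligned */
    STREAM_ERR_PAST_END,     /* offset is not less than the device length */
};

/* Mirror the order of checks in the patch: reject all+stop, let 'stop'
 * through without looking at the offset, force offset 0 for 'all', then
 * validate the offset against the sector size and the device length. */
static enum stream_check check_stream_args(bool all, bool stop,
                                           int64_t offset, int64_t len)
{
    assert(len >= 0);

    if (all && stop) {
        return STREAM_ERR_ALL_AND_STOP;
    }
    if (stop) {
        return STREAM_OK;
    }
    if (all) {
        offset = 0; /* do_stream() ignores "offset" when "all" is set */
    }
    if (offset % SECTOR_SIZE) {
        return STREAM_ERR_UNALIGNED;
    }
    if (offset >= len) {
        return STREAM_ERR_PAST_END;
    }
    return STREAM_OK;
}
```

Note that an offset equal to the device length is rejected, so a caller looping in single-sector mode has to stop as soon as the returned offset reaches "len".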