Message ID | 1311774295-8696-9-git-send-email-stefanha@linux.vnet.ibm.com |
---|---|
State | New |
Headers | show |
On Wed, Jul 27, 2011 at 02:44:48PM +0100, Stefan Hajnoczi wrote: > For leaf images with copy-on-read semantics, the stream command allows > the user to populate the image file by copying data from the backing > file while the guest is running. Once all blocks have been streamed, > the dependency on the original backing file is removed. Therefore, > stream commands can be used to implement post-copy live block migration > and rapid deployment. > > The command synopsis is: > > block_stream > ------------ > > Copy data from a backing file into a block device. > > The block streaming operation is performed in the background until the > entire backing file has been copied. This command returns immediately > once streaming has started. The status of ongoing block streaming > operations can be checked with query-block-jobs. The operation can be > stopped before it has completed using the block_job_cancel command. > > If a base file is specified then sectors are not copied from that base > file and its backing chain. When streaming completes the image file > will have the base file as its backing file. This can be used to stream > a subset of the backing file chain instead of flattening the entire > image. > > On successful completion the image file is updated to drop the backing > file. > > Arguments: > > - device: device name (json-string) > - base: common backing file (json-string, optional) > > Errors: > > DeviceInUse: streaming is already active on this device > DeviceNotFound: device name is invalid > NotSupported: image streaming is not supported by this device > > Events: > > On completion the BLOCK_JOB_COMPLETED event is raised with the following > fields: > > - type: job type ("stream" for image streaming, json-string) > - device: device name (json-string) > - end: maximum progress value (json-int) > - position: current progress value (json-int) > - speed: rate limit, bytes per second (json-int) > - error: error message (json-string, only on error) > > The completion event is raised both on success and on failure. On > success position is equal to end. On failure position and end can be > used to indicate at which point the operation failed. > > On failure the error field contains a human-readable error message. > There are no semantics other than that streaming has failed and clients > should not try to interpret the error string. > > Examples: > > -> { "execute": "block_stream", "arguments": { "device": "virtio0" } } > <- { "return": {} } > > Signed-off-by: Adam Litke <agl@us.ibm.com> > Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> > --- > blockdev.c | 133 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ > blockdev.h | 1 + > hmp-commands.hx | 14 ++++++ > monitor.c | 3 + > monitor.h | 1 + > qerror.c | 8 +++ > qerror.h | 6 +++ > qmp-commands.hx | 64 ++++++++++++++++++++++++++ > 8 files changed, 230 insertions(+), 0 deletions(-) > > diff --git a/blockdev.c b/blockdev.c > index b337732..cd5e49c 100644 > --- a/blockdev.c > +++ b/blockdev.c > @@ -16,6 +16,7 @@ > #include "sysemu.h" > #include "hw/qdev.h" > #include "block_int.h" > +#include "qjson.h" > > static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives); > > @@ -50,6 +51,131 @@ static const int if_max_devs[IF_COUNT] = { > [IF_SCSI] = 7, > }; > > +typedef struct StreamState { > + int64_t offset; /* current position in block device */ > + BlockDriverState *bs; > + QEMUTimer *timer; > + QLIST_ENTRY(StreamState) list; > +} StreamState; > + > +static QLIST_HEAD(, StreamState) block_streams = > + QLIST_HEAD_INITIALIZER(block_streams); > + > +static QObject *stream_get_qobject(StreamState *s) > +{ > + const char *name = bdrv_get_device_name(s->bs); > + int64_t len = bdrv_getlength(s->bs); > + > + return qobject_from_jsonf("{ 'device': %s, 'type': 'stream', " > + "'offset': %" PRId64 ", 'len': %" PRId64 ", " > + "'speed': %" PRId64 " }", > + name, s->offset, len, (int64_t)0); > +} > + > +static void stream_mon_event(StreamState *s, int ret) > +{ > + QObject *data = stream_get_qobject(s); > + > + if (ret < 0) { > + QDict *qdict = qobject_to_qdict(data); > + > + qdict_put(qdict, "error", qstring_from_str(strerror(-ret))); > + } > + > + monitor_protocol_event(QEVENT_BLOCK_JOB_COMPLETED, data); > + qobject_decref(data); > +} > + > +static void stream_free(StreamState *s) > +{ > + QLIST_REMOVE(s, list); > + > + qemu_del_timer(s->timer); > + qemu_free_timer(s->timer); > + qemu_free(s); > +} > + > +static void stream_complete(StreamState *s, int ret) > +{ > + stream_mon_event(s, ret); > + stream_free(s); > +} > + > +static void stream_cb(void *opaque, int nb_sectors) > +{ > + StreamState *s = opaque; > + > + if (nb_sectors < 0) { > + stream_complete(s, nb_sectors); > + return; > + } > + > + s->offset += nb_sectors * BDRV_SECTOR_SIZE; > + > + if (s->offset == bdrv_getlength(s->bs)) { > + bdrv_change_backing_file(s->bs, NULL, NULL); > + stream_complete(s, 0); > + } else { > + qemu_mod_timer(s->timer, qemu_get_clock_ns(rt_clock)); > + } > +} > + > +/* We can't call bdrv_aio_stream() directly from the callback because that > + * makes qemu_aio_flush() not complete until the streaming is completed. > + * By delaying with a timer, we give qemu_aio_flush() a chance to complete. > + */ > +static void stream_next_iteration(void *opaque) > +{ > + StreamState *s = opaque; > + > + bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE, stream_cb, s); > +} The plan is to replace format specific code with the generic implementation in the future? > + > +static StreamState *stream_find(const char *device) > +{ > + StreamState *s; > + > + QLIST_FOREACH(s, &block_streams, list) { > + if (strcmp(bdrv_get_device_name(s->bs), device) == 0) { > + return s; > + } > + } > + return NULL; > +} > + > +static StreamState *stream_start(const char *device) > +{ > + StreamState *s; > + BlockDriverAIOCB *acb; > + BlockDriverState *bs; > + > + s = stream_find(device); > + if (s) { > + qerror_report(QERR_DEVICE_IN_USE, device); > + return NULL; > + } > + > + bs = bdrv_find(device); > + if (!bs) { > + qerror_report(QERR_DEVICE_NOT_FOUND, device); > + return NULL; > + } > + > + s = qemu_mallocz(sizeof(*s)); > + s->bs = bs; > + s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s); > + QLIST_INSERT_HEAD(&block_streams, s, list); Should increase refcount with drive_get_ref().
On Thu, Jul 28, 2011 at 4:53 PM, Marcelo Tosatti <mtosatti@redhat.com> wrote: > On Wed, Jul 27, 2011 at 02:44:48PM +0100, Stefan Hajnoczi wrote: >> For leaf images with copy-on-read semantics, the stream command allows >> the user to populate the image file by copying data from the backing >> file while the guest is running. Once all blocks have been streamed, >> the dependency on the original backing file is removed. Therefore, >> stream commands can be used to implement post-copy live block migration >> and rapid deployment. >> >> The command synopsis is: >> >> block_stream >> ------------ >> >> Copy data from a backing file into a block device. >> >> The block streaming operation is performed in the background until the >> entire backing file has been copied. This command returns immediately >> once streaming has started. The status of ongoing block streaming >> operations can be checked with query-block-jobs. The operation can be >> stopped before it has completed using the block_job_cancel command. >> >> If a base file is specified then sectors are not copied from that base >> file and its backing chain. When streaming completes the image file >> will have the base file as its backing file. This can be used to stream >> a subset of the backing file chain instead of flattening the entire >> image. >> >> On successful completion the image file is updated to drop the backing >> file. >> >> Arguments: >> >> - device: device name (json-string) >> - base: common backing file (json-string, optional) >> >> Errors: >> >> DeviceInUse: streaming is already active on this device >> DeviceNotFound: device name is invalid >> NotSupported: image streaming is not supported by this device >> >> Events: >> >> On completion the BLOCK_JOB_COMPLETED event is raised with the following >> fields: >> >> - type: job type ("stream" for image streaming, json-string) >> - device: device name (json-string) >> - end: maximum progress value (json-int) >> - position: current progress value (json-int) >> - speed: rate limit, bytes per second (json-int) >> - error: error message (json-string, only on error) >> >> The completion event is raised both on success and on failure. On >> success position is equal to end. On failure position and end can be >> used to indicate at which point the operation failed. >> >> On failure the error field contains a human-readable error message. >> There are no semantics other than that streaming has failed and clients >> should not try to interpret the error string. >> >> Examples: >> >> -> { "execute": "block_stream", "arguments": { "device": "virtio0" } } >> <- { "return": {} } >> >> Signed-off-by: Adam Litke <agl@us.ibm.com> >> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> >> --- >> blockdev.c | 133 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> blockdev.h | 1 + >> hmp-commands.hx | 14 ++++++ >> monitor.c | 3 + >> monitor.h | 1 + >> qerror.c | 8 +++ >> qerror.h | 6 +++ >> qmp-commands.hx | 64 ++++++++++++++++++++++++++ >> 8 files changed, 230 insertions(+), 0 deletions(-) >> >> diff --git a/blockdev.c b/blockdev.c >> index b337732..cd5e49c 100644 >> --- a/blockdev.c >> +++ b/blockdev.c >> @@ -16,6 +16,7 @@ >> #include "sysemu.h" >> #include "hw/qdev.h" >> #include "block_int.h" >> +#include "qjson.h" >> >> static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives); >> >> @@ -50,6 +51,131 @@ static const int if_max_devs[IF_COUNT] = { >> [IF_SCSI] = 7, >> }; >> >> +typedef struct StreamState { >> + int64_t offset; /* current position in block device */ >> + BlockDriverState *bs; >> + QEMUTimer *timer; >> + QLIST_ENTRY(StreamState) list; >> +} StreamState; >> + >> +static QLIST_HEAD(, StreamState) block_streams = >> + QLIST_HEAD_INITIALIZER(block_streams); >> + >> +static QObject *stream_get_qobject(StreamState *s) >> +{ >> + const char *name = bdrv_get_device_name(s->bs); >> + int64_t len = bdrv_getlength(s->bs); >> + >> + return qobject_from_jsonf("{ 'device': %s, 'type': 'stream', " >> + "'offset': %" PRId64 ", 'len': %" PRId64 ", " >> + "'speed': %" PRId64 " }", >> + name, s->offset, len, (int64_t)0); >> +} >> + >> +static void stream_mon_event(StreamState *s, int ret) >> +{ >> + QObject *data = stream_get_qobject(s); >> + >> + if (ret < 0) { >> + QDict *qdict = qobject_to_qdict(data); >> + >> + qdict_put(qdict, "error", qstring_from_str(strerror(-ret))); >> + } >> + >> + monitor_protocol_event(QEVENT_BLOCK_JOB_COMPLETED, data); >> + qobject_decref(data); >> +} >> + >> +static void stream_free(StreamState *s) >> +{ >> + QLIST_REMOVE(s, list); >> + >> + qemu_del_timer(s->timer); >> + qemu_free_timer(s->timer); >> + qemu_free(s); >> +} >> + >> +static void stream_complete(StreamState *s, int ret) >> +{ >> + stream_mon_event(s, ret); >> + stream_free(s); >> +} >> + >> +static void stream_cb(void *opaque, int nb_sectors) >> +{ >> + StreamState *s = opaque; >> + >> + if (nb_sectors < 0) { >> + stream_complete(s, nb_sectors); >> + return; >> + } >> + >> + s->offset += nb_sectors * BDRV_SECTOR_SIZE; >> + >> + if (s->offset == bdrv_getlength(s->bs)) { >> + bdrv_change_backing_file(s->bs, NULL, NULL); >> + stream_complete(s, 0); >> + } else { >> + qemu_mod_timer(s->timer, qemu_get_clock_ns(rt_clock)); >> + } >> +} >> + >> +/* We can't call bdrv_aio_stream() directly from the callback because that >> + * makes qemu_aio_flush() not complete until the streaming is completed. >> + * By delaying with a timer, we give qemu_aio_flush() a chance to complete. >> + */ >> +static void stream_next_iteration(void *opaque) >> +{ >> + StreamState *s = opaque; >> + >> + bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE, stream_cb, s); >> +} > > The plan is to replace format specific code with the generic > implementation in the future? I think Kevin has said this will not be merged into qemu.git. But I am sharing it as a reference implementation against which the libvirt API works. Next I will send out a stub implementation of the QMP/HMP block_stream APIs without any of the block driver-specific functionality. It may make sense to merge this into QEMU just to nail down the QMP/HMP interface, even if it does not do anything yet (we have errors for ENOTSUP). >> + >> +static StreamState *stream_find(const char *device) >> +{ >> + StreamState *s; >> + >> + QLIST_FOREACH(s, &block_streams, list) { >> + if (strcmp(bdrv_get_device_name(s->bs), device) == 0) { >> + return s; >> + } >> + } >> + return NULL; >> +} >> + >> +static StreamState *stream_start(const char *device) >> +{ >> + StreamState *s; >> + BlockDriverAIOCB *acb; >> + BlockDriverState *bs; >> + >> + s = stream_find(device); >> + if (s) { >> + qerror_report(QERR_DEVICE_IN_USE, device); >> + return NULL; >> + } >> + >> + bs = bdrv_find(device); >> + if (!bs) { >> + qerror_report(QERR_DEVICE_NOT_FOUND, device); >> + return NULL; >> + } >> + >> + s = qemu_mallocz(sizeof(*s)); >> + s->bs = bs; >> + s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s); >> + QLIST_INSERT_HEAD(&block_streams, s, list); > > Should increase refcount with drive_get_ref(). Yes thanks. Stefan
diff --git a/blockdev.c b/blockdev.c index b337732..cd5e49c 100644 --- a/blockdev.c +++ b/blockdev.c @@ -16,6 +16,7 @@ #include "sysemu.h" #include "hw/qdev.h" #include "block_int.h" +#include "qjson.h" static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives); @@ -50,6 +51,131 @@ static const int if_max_devs[IF_COUNT] = { [IF_SCSI] = 7, }; +typedef struct StreamState { + int64_t offset; /* current position in block device */ + BlockDriverState *bs; + QEMUTimer *timer; + QLIST_ENTRY(StreamState) list; +} StreamState; + +static QLIST_HEAD(, StreamState) block_streams = + QLIST_HEAD_INITIALIZER(block_streams); + +static QObject *stream_get_qobject(StreamState *s) +{ + const char *name = bdrv_get_device_name(s->bs); + int64_t len = bdrv_getlength(s->bs); + + return qobject_from_jsonf("{ 'device': %s, 'type': 'stream', " + "'offset': %" PRId64 ", 'len': %" PRId64 ", " + "'speed': %" PRId64 " }", + name, s->offset, len, (int64_t)0); +} + +static void stream_mon_event(StreamState *s, int ret) +{ + QObject *data = stream_get_qobject(s); + + if (ret < 0) { + QDict *qdict = qobject_to_qdict(data); + + qdict_put(qdict, "error", qstring_from_str(strerror(-ret))); + } + + monitor_protocol_event(QEVENT_BLOCK_JOB_COMPLETED, data); + qobject_decref(data); +} + +static void stream_free(StreamState *s) +{ + QLIST_REMOVE(s, list); + + qemu_del_timer(s->timer); + qemu_free_timer(s->timer); + qemu_free(s); +} + +static void stream_complete(StreamState *s, int ret) +{ + stream_mon_event(s, ret); + stream_free(s); +} + +static void stream_cb(void *opaque, int nb_sectors) +{ + StreamState *s = opaque; + + if (nb_sectors < 0) { + stream_complete(s, nb_sectors); + return; + } + + s->offset += nb_sectors * BDRV_SECTOR_SIZE; + + if (s->offset == bdrv_getlength(s->bs)) { + bdrv_change_backing_file(s->bs, NULL, NULL); + stream_complete(s, 0); + } else { + qemu_mod_timer(s->timer, qemu_get_clock_ns(rt_clock)); + } +} + +/* We can't call bdrv_aio_stream() directly from the callback because that + * makes qemu_aio_flush() not complete until the streaming is completed. + * By delaying with a timer, we give qemu_aio_flush() a chance to complete. + */ +static void stream_next_iteration(void *opaque) +{ + StreamState *s = opaque; + + bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE, stream_cb, s); +} + +static StreamState *stream_find(const char *device) +{ + StreamState *s; + + QLIST_FOREACH(s, &block_streams, list) { + if (strcmp(bdrv_get_device_name(s->bs), device) == 0) { + return s; + } + } + return NULL; +} + +static StreamState *stream_start(const char *device) +{ + StreamState *s; + BlockDriverAIOCB *acb; + BlockDriverState *bs; + + s = stream_find(device); + if (s) { + qerror_report(QERR_DEVICE_IN_USE, device); + return NULL; + } + + bs = bdrv_find(device); + if (!bs) { + qerror_report(QERR_DEVICE_NOT_FOUND, device); + return NULL; + } + + s = qemu_mallocz(sizeof(*s)); + s->bs = bs; + s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s); + QLIST_INSERT_HEAD(&block_streams, s, list); + + acb = bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE, + stream_cb, s); + if (acb == NULL) { + stream_free(s); + qerror_report(QERR_NOT_SUPPORTED); + return NULL; + } + return s; +} + /* * We automatically delete the drive when a device using it gets * unplugged. Questionable feature, but we can't just drop it. @@ -650,6 +776,13 @@ out: return ret; } +int do_block_stream(Monitor *mon, const QDict *params, QObject **ret_data) +{ + const char *device = qdict_get_str(params, "device"); + + return stream_start(device) ? 0 : -1; +} + static int eject_device(Monitor *mon, BlockDriverState *bs, int force) { if (!force) { diff --git a/blockdev.h b/blockdev.h index 3587786..f475aa8 100644 --- a/blockdev.h +++ b/blockdev.h @@ -65,5 +65,6 @@ int do_change_block(Monitor *mon, const char *device, int do_drive_del(Monitor *mon, const QDict *qdict, QObject **ret_data); int do_snapshot_blkdev(Monitor *mon, const QDict *qdict, QObject **ret_data); int do_block_resize(Monitor *mon, const QDict *qdict, QObject **ret_data); +int do_block_stream(Monitor *mon, const QDict *params, QObject **ret_data); #endif diff --git a/hmp-commands.hx b/hmp-commands.hx index cbaa9a0..9bf1025 100644 --- a/hmp-commands.hx +++ b/hmp-commands.hx @@ -38,6 +38,20 @@ Commit changes to the disk images (if -snapshot is used) or backing files. ETEXI { + .name = "block_stream", + .args_type = "device:B", + .params = "device", + .help = "Copy data from a backing file into a block device", + .mhandler.cmd_new = do_block_stream, + }, + +STEXI +@item block_stream +@findex block_stream +Copy data from a backing file into a block device. +ETEXI + + { .name = "q|quit", .args_type = "", .params = "", diff --git a/monitor.c b/monitor.c index 718935b..700b534 100644 --- a/monitor.c +++ b/monitor.c @@ -468,6 +468,9 @@ void monitor_protocol_event(MonitorEvent event, QObject *data) case QEVENT_SPICE_DISCONNECTED: event_name = "SPICE_DISCONNECTED"; break; + case QEVENT_BLOCK_JOB_COMPLETED: + event_name = "BLOCK_JOB_COMPLETED"; + break; default: abort(); break; diff --git a/monitor.h b/monitor.h index 4f2d328..135c927 100644 --- a/monitor.h +++ b/monitor.h @@ -35,6 +35,7 @@ typedef enum MonitorEvent { QEVENT_SPICE_CONNECTED, QEVENT_SPICE_INITIALIZED, QEVENT_SPICE_DISCONNECTED, + QEVENT_BLOCK_JOB_COMPLETED, QEVENT_MAX, } MonitorEvent; diff --git a/qerror.c b/qerror.c index 69c1bc9..c5bd197 100644 --- a/qerror.c +++ b/qerror.c @@ -162,6 +162,10 @@ static const QErrorStringTable qerror_table[] = { .desc = "No '%(bus)' bus found for device '%(device)'", }, { + .error_fmt = QERR_NOT_SUPPORTED, + .desc = "Operation is not supported", + }, + { .error_fmt = QERR_OPEN_FILE_FAILED, .desc = "Could not open '%(filename)'", }, @@ -230,6 +234,10 @@ static const QErrorStringTable qerror_table[] = { .error_fmt = QERR_QGA_COMMAND_FAILED, .desc = "Guest agent command failed, error was '%(message)'", }, + { + .error_fmt = QERR_STREAMING_ERROR, + .desc = "An error occurred during streaming: %(msg)", + }, {} }; diff --git a/qerror.h b/qerror.h index 8058456..ffe3190 100644 --- a/qerror.h +++ b/qerror.h @@ -139,6 +139,9 @@ QError *qobject_to_qerror(const QObject *obj); #define QERR_NO_BUS_FOR_DEVICE \ "{ 'class': 'NoBusForDevice', 'data': { 'device': %s, 'bus': %s } }" +#define QERR_NOT_SUPPORTED \ + "{ 'class': 'NotSupported', 'data': {} }" + #define QERR_OPEN_FILE_FAILED \ "{ 'class': 'OpenFileFailed', 'data': { 'filename': %s } }" @@ -193,4 +196,7 @@ QError *qobject_to_qerror(const QObject *obj); #define QERR_QGA_COMMAND_FAILED \ "{ 'class': 'QgaCommandFailed', 'data': { 'message': %s } }" +#define QERR_STREAMING_ERROR \ + "{ 'class': 'StreamingError', 'data': { 'msg': %s } }" + #endif /* QERROR_H */ diff --git a/qmp-commands.hx b/qmp-commands.hx index 54e313c..80402c7 100644 --- a/qmp-commands.hx +++ b/qmp-commands.hx @@ -945,6 +945,70 @@ Example: <- { "return": {} } EQMP + + { + .name = "block_stream", + .args_type = "device:B", + .params = "device", + .help = "Copy data from a backing file into a block device", + .mhandler.cmd_new = do_block_stream, + }, + +SQMP + +Copy data from a backing file into a block device. + +The block streaming operation is performed in the background until the entire +backing file has been copied. This command returns immediately once streaming +has started. The status of ongoing block streaming operations can be checked +with query-block-jobs. The operation can be stopped before it has completed +using the block_job_cancel command. + +If a base file is specified then sectors are not copied from that base file and +its backing chain. When streaming completes the image file will have the base +file as its backing file. This can be used to stream a subset of the backing +file chain instead of flattening the entire image. + +On successful completion the image file is updated to drop the backing file. + +Arguments: + +- device: device name (json-string) +- base: common backing file (json-string, optional) + +Errors: + +DeviceInUse: streaming is already active on this device +DeviceNotFound: device name is invalid +NotSupported: image streaming is not supported by this device + +Events: + +On completion the BLOCK_JOB_COMPLETED event is raised with the following +fields: + +- type: job type ("stream" for image streaming, json-string) +- device: device name (json-string) +- end: maximum progress value (json-int) +- position: current progress value (json-int) +- speed: rate limit, bytes per second (json-int) +- error: error message (json-string, only on error) + +The completion event is raised both on success and on failure. On +success position is equal to end. On failure position and end can be +used to indicate at which point the operation failed. + +On failure the error field contains a human-readable error message. There are +no semantics other than that streaming has failed and clients should not try +to interpret the error string. + +Examples: + +-> { "execute": "block_stream", "arguments": { "device": "virtio0" } } +<- { "return": {} } + +EQMP + { .name = "qmp_capabilities", .args_type = "",