@@ -16,6 +16,7 @@
#include "sysemu.h"
#include "hw/qdev.h"
#include "block_int.h"
+#include "qjson.h"
static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);
@@ -50,6 +51,144 @@ static const int if_max_devs[IF_COUNT] = {
[IF_SCSI] = 7,
};
+/*
+ * State of a block streaming operation.  A single instance (global_stream)
+ * is reused for every request; active_stream points at it while a stream
+ * is in progress.
+ */
+typedef struct StreamState {
+ /* Completion callback and its argument; only set for single-step requests */
+ MonitorCompletion *cb;
+ void *cb_opaque;
+ /* Current position in the device, in bytes */
+ int64_t offset;
+ /* True when only one streaming request is issued (single-step mode) */
+ bool once;
+ /* Set by stream_stop(); checked before the next iteration is scheduled */
+ bool cancel;
+ /* Device being streamed */
+ BlockDriverState *bs;
+ /* rt_clock timer that runs stream_next_iteration() */
+ QEMUTimer *timer;
+ /* Added to the rt_clock nanosecond reading when re-arming the timer */
+ uint64_t stream_delay;
+} StreamState;
+
+/* Backing storage for the one stream that can exist at a time */
+static StreamState global_stream;
+/* Non-NULL while a stream is running; reset by do_stream_cb() when it ends */
+static StreamState *active_stream;
+
+/*
+ * Build a status object for a stream: a dict holding the device name, the
+ * current byte offset and the device length as reported by bdrv_getlength().
+ */
+static QObject *stream_get_qobject(StreamState *s)
+{
+    const char *dev_name = bdrv_get_device_name(s->bs);
+    int64_t total_bytes = bdrv_getlength(s->bs);
+    QObject *status;
+
+    status = qobject_from_jsonf("{ 'device': %s, "
+                                "'offset': %" PRId64 ", "
+                                "'len': %" PRId64 " }",
+                                dev_name, s->offset, total_bytes);
+    return status;
+}
+
+/*
+ * Completion callback for one bdrv_aio_stream() request.
+ *
+ * @opaque: the StreamState the request belongs to
+ * @ret:    negative errno on failure, otherwise the amount of streamed data
+ *          in sectors (it is scaled by BDRV_SECTOR_SIZE before being added
+ *          to the byte offset)
+ *
+ * For a full-device stream the next iteration is scheduled through the timer
+ * unless the end of the device was reached or a stop was requested.  In every
+ * other case the stream is torn down: the completion callback (if any) is
+ * invoked, the timer is released and active_stream is cleared.
+ */
+static void do_stream_cb(void *opaque, int ret)
+{
+    StreamState *s = opaque;
+
+    if (ret < 0) {
+        qerror_report(QERR_STREAMING_ERROR, strerror(-ret));
+        goto out;
+    }
+
+    s->offset += ret * BDRV_SECTOR_SIZE;
+
+    if (!s->once) {
+        if (s->offset == bdrv_getlength(s->bs)) {
+            /* Whole device streamed: detach the backing file */
+            bdrv_change_backing_file(s->bs, NULL, NULL);
+        } else if (!s->cancel) {
+            qemu_mod_timer(s->timer,
+                           qemu_get_clock_ns(rt_clock) + s->stream_delay);
+            return;
+        }
+    }
+
+out:
+    if (s->cb) {
+        QObject *result = stream_get_qobject(s);
+
+        s->cb(s->cb_opaque, result);
+        /*
+         * Drop our reference so the object is not leaked.  This assumes the
+         * completion callback takes its own reference if it keeps the result.
+         */
+        qobject_decref(result);
+    }
+    qemu_del_timer(s->timer);
+    qemu_free_timer(s->timer);
+    active_stream = NULL;
+}
+
+/* We can't call bdrv_aio_stream() directly from the callback because that
+ * makes qemu_aio_flush() not complete until the streaming is completed.
+ * By delaying with a timer, we give qemu_aio_flush() a chance to complete.
+ *
+ * Timer handler: issues the next streaming request at the current offset.
+ * @opaque is the StreamState being processed.
+ */
+static void stream_next_iteration(void *opaque)
+{
+    StreamState *s = opaque;
+
+    if (!bdrv_aio_stream(s->bs, s->offset / BDRV_SECTOR_SIZE,
+                         do_stream_cb, s)) {
+        /*
+         * The request could not be started, so no completion will arrive.
+         * Finish the stream through the normal error path; otherwise
+         * active_stream would stay set and block every later request.
+         */
+        do_stream_cb(s, -ENOTSUP);
+    }
+}
+
+/*
+ * Start streaming on a block device.
+ *
+ * @device: name of the block device to stream
+ * @offset: byte offset to start at; must be non-negative, sector-aligned
+ *          and less than the device length
+ * @once:   true to issue a single request and report its result through @cb;
+ *          false to keep streaming until the end of the device or a stop
+ * @cb:     completion callback, only recorded when @once is true
+ * @opaque: argument passed to @cb
+ *
+ * Returns the stream state on success.  On failure an error is reported via
+ * qerror_report() and NULL is returned.  Only one stream may be active at a
+ * time.
+ */
+static StreamState *stream_start(const char *device, int64_t offset, bool once,
+                                 MonitorCompletion cb, void *opaque)
+{
+    BlockDriverState *bs;
+    StreamState *s = &global_stream;
+    BlockDriverAIOCB *acb;
+
+    if (active_stream) {
+        qerror_report(QERR_DEVICE_IN_USE,
+                      bdrv_get_device_name(active_stream->bs));
+        return NULL;
+    }
+
+    bs = bdrv_find(device);
+    if (!bs) {
+        qerror_report(QERR_DEVICE_NOT_FOUND, device);
+        return NULL;
+    }
+
+    /*
+     * A negative sector-aligned offset would pass both checks below and be
+     * turned into a bogus sector number, so reject it explicitly.
+     */
+    if (offset < 0) {
+        qerror_report(QERR_INVALID_PARAMETER_VALUE,
+                      "offset", "a non-negative offset");
+        return NULL;
+    }
+
+    if (offset % BDRV_SECTOR_SIZE) {
+        qerror_report(QERR_INVALID_PARAMETER_VALUE,
+                      "offset", "a sector-aligned offset");
+        return NULL;
+    }
+
+    if (offset >= bdrv_getlength(bs)) {
+        qerror_report(QERR_INVALID_PARAMETER_VALUE,
+                      "offset", "an offset less than device length");
+        return NULL;
+    }
+
+    memset(s, 0, sizeof(*s));
+    if (once) {
+        s->cb = cb;
+        s->cb_opaque = opaque;
+        s->once = true;
+    }
+    s->offset = offset;
+    s->bs = bs;
+    s->stream_delay = 0; /* FIXME make this configurable */
+    s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s);
+
+    acb = bdrv_aio_stream(bs, offset / BDRV_SECTOR_SIZE, do_stream_cb, s);
+    if (acb == NULL) {
+        /* The timer was never armed, so freeing it is sufficient */
+        qemu_free_timer(s->timer);
+        qerror_report(QERR_NOT_SUPPORTED);
+        return NULL;
+    }
+
+    active_stream = s;
+
+    return s;
+}
+
+/*
+ * Request cancellation of the active stream on @device.
+ *
+ * Only the cancel flag is set here; the stream itself is torn down the next
+ * time its completion callback runs.  @cb and @opaque are currently unused.
+ *
+ * Returns 0 on success, or -1 after reporting an error when no stream is
+ * active or @device does not match the device being streamed.
+ */
+static int stream_stop(const char *device, MonitorCompletion *cb, void *opaque)
+{
+    StreamState *running = active_stream;
+
+    if (running == NULL) {
+        qerror_report(QERR_STREAMING_ERROR, strerror(ESRCH));
+        return -1;
+    }
+
+    /*
+     * The device name is mandatory so that this interface keeps working
+     * if several simultaneous streams are supported later on.
+     */
+    if (strcmp(device, bdrv_get_device_name(running->bs)) != 0) {
+        qerror_report(QERR_DEVICE_NOT_FOUND, device);
+        return -1;
+    }
+
+    running->cancel = true;
+    return 0;
+}
+
/*
* We automatically delete the drive when a device using it gets
* unplugged. Questionable feature, but we can't just drop it.
@@ -647,6 +786,79 @@ out:
return ret;
}
+/*
+ * Human-monitor printer for streaming status.
+ *
+ * @mon:  monitor to print to
+ * @data: either a single stream dict (result of a single-step "stream"
+ *        command) or a list of stream dicts (result of "info stream");
+ *        NULL prints nothing
+ *
+ * Each stream dict carries the keys "device", "offset" and "len".
+ */
+void monitor_print_stream(Monitor *mon, const QObject *data)
+{
+    QDict *stream = NULL;
+    QList *streams;
+
+    if (data == NULL) {
+        return;
+    }
+
+    switch (qobject_type(data)) {
+    case QTYPE_QDICT:
+        /* Single-step "stream" completes with one dict, not a list */
+        stream = qobject_to_qdict(data);
+        break;
+    case QTYPE_QLIST:
+        streams = qobject_to_qlist(data);
+        if (streams && !qlist_empty(streams)) {
+            /* Only print a single stream until multi-stream support is added */
+            stream = qobject_to_qdict(qlist_peek(streams));
+        }
+        break;
+    default:
+        break;
+    }
+
+    if (stream) {
+        monitor_printf(mon, "Streaming device %s: Completed %" PRId64 " of %"
+                       PRId64 " bytes\n", qdict_get_str(stream, "device"),
+                       qdict_get_int(stream, "offset"),
+                       qdict_get_int(stream, "len"));
+    } else {
+        monitor_printf(mon, "No active stream\n");
+    }
+}
+
+/*
+ * Report the status of all streams through @cb.
+ *
+ * The result is a list with one status dict per stream; it is empty when no
+ * stream is active.  @cb is invoked before this function returns.
+ * Always returns 0.
+ */
+int do_stream_info(Monitor *mon, MonitorCompletion *cb, void *opaque)
+{
+    QList *streams = qlist_new();
+
+    if (active_stream) {
+        qlist_append_obj(streams, stream_get_qobject(active_stream));
+    }
+
+    cb(opaque, QOBJECT(streams));
+    /*
+     * Drop our reference so the list is not leaked.  This assumes the
+     * completion callback takes its own reference if it keeps the result.
+     */
+    QDECREF(streams);
+    return 0;
+}
+
+/*
+ * Monitor handler for the "stream" command.
+ *
+ * Depending on the "all" and "stop" flags in @params this starts a
+ * full-device stream, stops the active stream, or (with neither flag)
+ * streams a single step starting at the optional "offset".
+ *
+ * Starting or stopping a full-device stream completes immediately with an
+ * empty result.  A single step completes asynchronously: @cb is invoked from
+ * the streaming completion path.
+ *
+ * Returns 0 on success, -1 if an error was reported.
+ */
+int do_stream(Monitor *mon, const QDict *params,
+              MonitorCompletion cb, void *opaque)
+{
+    int whole_device = qdict_get_try_bool(params, "all", false);
+    int stop_requested = qdict_get_try_bool(params, "stop", false);
+    const char *device = qdict_get_str(params, "device");
+    int64_t offset = 0;
+
+    if (whole_device && stop_requested) {
+        qerror_report(QERR_INVALID_PARAMETER, "stop' not allowed with 'all");
+        return -1;
+    }
+
+    if (!whole_device && !stop_requested) {
+        /* Single step: this will complete asynchronously when it is streamed */
+        if (qdict_haskey(params, "offset")) {
+            offset = qdict_get_int(params, "offset");
+        }
+        return stream_start(device, offset, true, cb, opaque) ? 0 : -1;
+    }
+
+    if (stop_requested) {
+        if (stream_stop(device, cb, opaque) != 0) {
+            return -1;
+        }
+    } else if (stream_start(device, offset, false, NULL, NULL) == NULL) {
+        return -1;
+    }
+
+    /* Starting and stopping full device streams complete immediately */
+    cb(opaque, NULL);
+    return 0;
+}
+
static int eject_device(Monitor *mon, BlockDriverState *bs, int force)
{
if (!force) {
@@ -12,6 +12,7 @@
#include "block.h"
#include "qemu-queue.h"
+#include "monitor.h"
void blockdev_mark_auto_del(BlockDriverState *bs);
void blockdev_auto_del(BlockDriverState *bs);
@@ -64,5 +65,9 @@ int do_change_block(Monitor *mon, const char *device,
int do_drive_del(Monitor *mon, const QDict *qdict, QObject **ret_data);
int do_snapshot_blkdev(Monitor *mon, const QDict *qdict, QObject **ret_data);
int do_block_resize(Monitor *mon, const QDict *qdict, QObject **ret_data);
+/*
+ * Block streaming monitor interface: status printer, asynchronous status
+ * query and asynchronous "stream" command handler.
+ */
+void monitor_print_stream(Monitor *mon, const QObject *data);
+int do_stream_info(Monitor *mon, MonitorCompletion *cb, void *opaque);
+int do_stream(Monitor *mon, const QDict *params,
+ MonitorCompletion cb, void *opaque);
#endif
@@ -38,6 +38,22 @@ Commit changes to the disk images (if -snapshot is used) or backing files.
ETEXI
{
+ .name = "stream",
+ .args_type = "all:-a,stop:-s,device:B,offset:i?",
+ .params = "[-a] [-s] device [offset]",
+ .help = "Stream data to a block device",
+ .user_print = monitor_print_stream,
+ .mhandler.cmd_async = do_stream,
+ .flags = MONITOR_CMD_ASYNC,
+ },
+
+STEXI
+@item stream
+@findex stream
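+Stream data to block device @var{device}. By default a single streaming step
+is performed starting at @var{offset} (default 0). With @code{-a} the entire
+device is streamed in the background; with @code{-s} the running stream on
+@var{device} is stopped. @code{-a} and @code{-s} cannot be combined.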
+ETEXI
+
+ {
.name = "q|quit",
.args_type = "",
.params = "",
@@ -1352,6 +1368,8 @@ show device tree
show qdev device model list
@item info roms
show roms
+@item info stream
+show progress of ongoing stream operation
@end table
ETEXI
@@ -3100,6 +3100,16 @@ static const mon_cmd_t info_cmds[] = {
.mhandler.info = do_info_trace_events,
},
#endif
+ {
+ .name = "stream",
+ .args_type = "",
+ .params = "",
+ .help = "show block streaming status",
+ .user_print = monitor_print_stream,
+ .mhandler.info_async = do_stream_info,
+ .flags = MONITOR_CMD_ASYNC,
+
+ },
{
.name = NULL,
},
@@ -3242,6 +3252,16 @@ static const mon_cmd_t qmp_query_cmds[] = {
.mhandler.info_async = do_info_balloon,
.flags = MONITOR_CMD_ASYNC,
},
+ {
+ .name = "stream",
+ .args_type = "",
+ .params = "",
+ .help = "show block streaming status",
+ .user_print = monitor_print_stream,
+ .mhandler.info_async = do_stream_info,
+ .flags = MONITOR_CMD_ASYNC,
+
+ },
{ /* NULL */ },
};
@@ -157,6 +157,10 @@ static const QErrorStringTable qerror_table[] = {
.desc = "No '%(bus)' bus found for device '%(device)'",
},
{
+ .error_fmt = QERR_NOT_SUPPORTED,
+ .desc = "Operation is not supported",
+ },
+ {
.error_fmt = QERR_OPEN_FILE_FAILED,
.desc = "Could not open '%(filename)'",
},
@@ -209,6 +213,11 @@ static const QErrorStringTable qerror_table[] = {
.error_fmt = QERR_VNC_SERVER_FAILED,
.desc = "Could not start VNC server on %(target)",
},
+ {
+ .error_fmt = QERR_STREAMING_ERROR,
+ .desc = "An error occurred during streaming: %(msg)",
+ },
+
{}
};
@@ -132,6 +132,9 @@ QError *qobject_to_qerror(const QObject *obj);
#define QERR_NO_BUS_FOR_DEVICE \
"{ 'class': 'NoBusForDevice', 'data': { 'device': %s, 'bus': %s } }"
+/* Generic "operation not supported" error; carries no data fields */
+#define QERR_NOT_SUPPORTED \
+ "{ 'class': 'NotSupported', 'data': {} }"
+
#define QERR_OPEN_FILE_FAILED \
"{ 'class': 'OpenFileFailed', 'data': { 'filename': %s } }"
@@ -174,4 +177,7 @@ QError *qobject_to_qerror(const QObject *obj);
#define QERR_FEATURE_DISABLED \
"{ 'class': 'FeatureDisabled', 'data': { 'name': %s } }"
+/* Block streaming failure; 'msg' is a human-readable reason string */
+#define QERR_STREAMING_ERROR \
+ "{ 'class': 'StreamingError', 'data': { 'msg': %s } }"
+
#endif /* QERROR_H */
@@ -858,6 +858,48 @@ Example:
EQMP
{
+ .name = "stream",
+ .args_type = "all:-a,stop:-s,device:B,offset:i?",
+ .params = "[-a] [-s] device [offset]",
+ .help = "Stream data to a block device",
+ .user_print = monitor_print_stream,
+ .mhandler.cmd_async = do_stream,
+ .flags = MONITOR_CMD_ASYNC,
+ },
+
+SQMP
+stream
+------
+
+Stream data to a block device.
+
+Arguments:
+
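+- all: Stream the entire device in the background; cannot be combined with
+       "stop" (json-bool, optional)
+- stop: Stop the active stream on the device; cannot be combined with "all"
+        (json-bool, optional)
+- device: device name (json-string)
+- offset: device offset in bytes at which a single streaming step starts; must
+          be sector-aligned and less than the device length; ignored when
+          "all" or "stop" is given (json-int, optional, default 0)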
+
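+Return:
+
+When neither "all" nor "stop" is given, a json-object describing the completed
+streaming step with the following data; otherwise an empty json-object.
+
+- device: The device name being streamed
+- len: The size of the device (in bytes)
+- offset: The ending offset of the completed I/O (in bytes)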
+
+Examples:
+
+-> { "execute": "stream", "arguments": { "device": "virtio0", "offset": 0 } }
+<- { "return": { "device": "virtio0", "len": 10737418240, "offset": 512 } }
+
+-> { "execute": "stream", "arguments": { "all": true, "device": "virtio0" } }
+<- { "return": {} }
+
+-> { "execute": "stream", "arguments": { "stop": true, "device": "virtio0" } }
+<- { "return": {} }
+
+EQMP
+
+ {
.name = "qmp_capabilities",
.args_type = "",
.params = "",
@@ -1777,3 +1819,25 @@ Example:
EQMP
+SQMP
+query-stream
+------------
+
+Show progress of ongoing stream operation
+
+Return a json-array of all streams. If no stream is active then an empty array
+will be returned. Each stream is a json-object with the following data:
+
+- device: The device name being streamed
+- len: The size of the device (in bytes)
+- offset: The ending offset of the completed I/O (in bytes)
+
+Example:
+
+-> { "execute": "query-stream" }
+<- { "return":[
+ { "device": "virtio0", "len": 10737418240, "offset": 709632}
+ ]
+ }
+
+EQMP