diff mbox

[3/8] block: add image streaming block job

Message ID 1319728975-6069-4-git-send-email-stefanha@linux.vnet.ibm.com
State New
Headers show

Commit Message

Stefan Hajnoczi Oct. 27, 2011, 3:22 p.m. UTC
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 Makefile.objs  |    1 +
 block/stream.c |  135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 block_int.h    |    3 +
 trace-events   |    4 ++
 4 files changed, 143 insertions(+), 0 deletions(-)
 create mode 100644 block/stream.c

Comments

Marcelo Tosatti Nov. 1, 2011, 6:06 p.m. UTC | #1
On Thu, Oct 27, 2011 at 04:22:50PM +0100, Stefan Hajnoczi wrote:
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>  Makefile.objs  |    1 +
>  block/stream.c |  135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block_int.h    |    3 +
>  trace-events   |    4 ++
>  4 files changed, 143 insertions(+), 0 deletions(-)
>  create mode 100644 block/stream.c
> 
> diff --git a/Makefile.objs b/Makefile.objs
> index c290fd3..802db96 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -34,6 +34,7 @@ block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow
>  block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>  block-nested-y += qed-check.o
>  block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
> +block-nested-y += stream.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>  block-nested-$(CONFIG_CURL) += curl.o
> diff --git a/block/stream.c b/block/stream.c
> new file mode 100644
> index 0000000..8cdf566
> --- /dev/null
> +++ b/block/stream.c
> @@ -0,0 +1,135 @@
> +/*
> + * Image streaming
> + *
> + * Copyright IBM, Corp. 2011
> + *
> + * Authors:
> + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + *
> + */
> +
> +#include "trace.h"
> +#include "block_int.h"
> +
> +enum {
> +    /*
> +     * Size of data buffer for populating the image file.  This should be large
> +     * enough to process multiple clusters in a single call, so that populating
> +     * contiguous regions of the image is efficient.
> +     */
> +    STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */
> +};
> +
> +typedef struct StreamBlockJob {
> +    BlockJob common;
> +    BlockDriverState *base;
> +} StreamBlockJob;
> +
> +static int coroutine_fn stream_populate(BlockDriverState *bs,
> +                                        int64_t sector_num, int nb_sectors,
> +                                        void *buf)
> +{
> +    struct iovec iov = {
> +        .iov_base = buf,
> +        .iov_len  = nb_sectors * BDRV_SECTOR_SIZE,
> +    };
> +    QEMUIOVector qiov;
> +
> +    qemu_iovec_init_external(&qiov, &iov, 1);
> +
> +    /* Copy-on-read the unallocated clusters */
> +    return bdrv_co_readv(bs, sector_num, nb_sectors, &qiov);
> +}
> +
> +static int stream_one_iteration(StreamBlockJob *s, int64_t sector_num,
> +                                void *buf, int max_sectors, int *n)
> +{
> +    BlockDriverState *bs = s->common.bs;
> +    int ret;
> +
> +    trace_stream_one_iteration(s, sector_num, max_sectors);
> +
> +    ret = bdrv_is_allocated(bs, sector_num, max_sectors, n);
> +    if (ret < 0) {
> +        return ret;
> +    }

bdrv_is_allocated is still synchronous? If so, there should be at least
a plan to make it asynchronous.

> +    if (!ret) {
> +        ret = stream_populate(bs, sector_num, *n, buf);
> +    }
> +    return ret;
> +}
> +
> +static void coroutine_fn stream_run(void *opaque)
> +{
> +    StreamBlockJob *s = opaque;
> +    BlockDriverState *bs = s->common.bs;
> +    int64_t sector_num, end;
> +    int ret = 0;
> +    int n;
> +    void *buf;
> +
> +    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);
> +    s->common.len = bdrv_getlength(bs);
> +    bdrv_get_geometry(bs, (uint64_t *)&end);
> +
> +    bdrv_set_zero_detection(bs, true);
> +    bdrv_set_copy_on_read(bs, true);

Should distinguish between stream-initiated and user-initiated setting
of zero detection and copy-on-read (so that unsetting below does not
clear user settings).

> +
> +    for (sector_num = 0; sector_num < end; sector_num += n) {
> +        if (block_job_is_cancelled(&s->common)) {
> +            break;
> +        }

If cancellation is seen here in the last loop iteration,
bdrv_change_backing_file below should not be executed.

> +
> +        /* TODO rate-limit */
> +        /* Note that even when no rate limit is applied we need to yield with
> +         * no pending I/O here so that qemu_aio_flush() is able to return.
> +         */
> +        co_sleep_ns(rt_clock, 0);

How do you plan to implement rate limit?

> +
> +        ret = stream_one_iteration(s, sector_num, buf,
> +                                   STREAM_BUFFER_SIZE / BDRV_SECTOR_SIZE, &n);
> +        if (ret < 0) {
> +            break;
> +        }
> +
> +        /* Publish progress */
> +        s->common.offset += n * BDRV_SECTOR_SIZE;
> +    }
> +
> +    bdrv_set_copy_on_read(bs, false);
> +    bdrv_set_zero_detection(bs, false);
> +
> +    if (sector_num == end && ret == 0) {
> +        bdrv_change_backing_file(bs, NULL, NULL);
> +    }
> +
> +    qemu_vfree(buf);
> +    block_job_complete(&s->common, ret);
> +}
> +
> +static BlockJobType stream_job_type = {
> +    .instance_size = sizeof(StreamBlockJob),
> +    .job_type      = "stream",
> +};
> +
> +int stream_start(BlockDriverState *bs, BlockDriverState *base,
> +                 BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    StreamBlockJob *s;
> +    Coroutine *co;
> +
> +    if (bs->job) {
> +        return -EBUSY;
> +    }
> +
> +    s = block_job_create(&stream_job_type, bs, cb, opaque);
> +    s->base = base;
> +
> +    co = qemu_coroutine_create(stream_run);
> +    trace_stream_start(bs, base, s, co, opaque);
> +    qemu_coroutine_enter(co, s);
> +    return 0;
> +}

I'd like to see the shared base code implemented before this is merged.

On a related note, the maze of coroutine locks and waiting queues makes
it difficult to have a clear picture of the execution flow (perhaps that
is due to lack of familiarity with the use of coroutines in the block
code).
Stefan Hajnoczi Nov. 2, 2011, 3:43 p.m. UTC | #2
On Tue, Nov 1, 2011 at 6:06 PM, Marcelo Tosatti <mtosatti@redhat.com> wrote:
> On Thu, Oct 27, 2011 at 04:22:50PM +0100, Stefan Hajnoczi wrote:
>> +static int stream_one_iteration(StreamBlockJob *s, int64_t sector_num,
>> +                                void *buf, int max_sectors, int *n)
>> +{
>> +    BlockDriverState *bs = s->common.bs;
>> +    int ret;
>> +
>> +    trace_stream_one_iteration(s, sector_num, max_sectors);
>> +
>> +    ret = bdrv_is_allocated(bs, sector_num, max_sectors, n);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>
> bdrv_is_allocated is still synchronous? If so, there should be at least
> a plan to make it asynchronous.

Yes, that's a good discussion to have.  My thoughts are that
bdrv_is_allocated() should be executed in coroutine context.  The
semantics are a little tricky because of parallel requests:

1. If a write request is in progress when we do bdrv_is_allocated() we
might get back "unallocated" even though clusters are just being
allocated.
2. If a TRIM request is in progress when we do bdrv_is_allocated() we
might get back "allocated" even though clusters are just being
deallocated.

In order to be reliable the caller needs to be aware of parallel
requests.  I think it's correct to defer this problem to the caller.

In the case of image streaming we're not TRIM-safe, I haven't really
thought about it yet.  But we are safe against parallel write requests
because there is serialization to prevent copy-on-read requests from
racing with write requests.

>> +    if (!ret) {
>> +        ret = stream_populate(bs, sector_num, *n, buf);
>> +    }
>> +    return ret;
>> +}
>> +
>> +static void coroutine_fn stream_run(void *opaque)
>> +{
>> +    StreamBlockJob *s = opaque;
>> +    BlockDriverState *bs = s->common.bs;
>> +    int64_t sector_num, end;
>> +    int ret = 0;
>> +    int n;
>> +    void *buf;
>> +
>> +    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);
>> +    s->common.len = bdrv_getlength(bs);
>> +    bdrv_get_geometry(bs, (uint64_t *)&end);
>> +
>> +    bdrv_set_zero_detection(bs, true);
>> +    bdrv_set_copy_on_read(bs, true);
>
> Should distinguish between stream initiated and user initiated setting
> of zero detection and cor (so that unsetting below does not clear
> user settings).

For zero detection I agree.

For copy-on-read it is questionable since once streaming is complete
it does not make sense to have copy-on-read enabled.

I will address this in the next revision and think more about the
copy-on-read case.

>> +
>> +    for (sector_num = 0; sector_num < end; sector_num += n) {
>> +        if (block_job_is_cancelled(&s->common)) {
>> +            break;
>> +        }
>
> If cancellation is seen here in the last loop iteration,
> bdrv_change_backing_file below should not be executed.

I documented this case in the QMP API.  I'm not sure if it's possible
to guarantee that the operation isn't just completing as you cancel
it.  Any blocking point between completion of the last iteration and
completing the operation is vulnerable to missing the cancel.  It's
easier to explicitly say the operation might just have completed when
you canceled, rather than trying to protect the completion path.  Do
you think it's a problem to have these loose semantics that I
described?

>> +
>> +        /* TODO rate-limit */
>> +        /* Note that even when no rate limit is applied we need to yield with
>> +         * no pending I/O here so that qemu_aio_flush() is able to return.
>> +         */
>> +        co_sleep_ns(rt_clock, 0);
>
> How do you plan to implement rate limit?

It was implemented in the QED-specific image streaming series:

http://repo.or.cz/w/qemu/stefanha.git/commitdiff/22f2c09d2fcfe5e49ac4604fd23e4744f549a476

That implementation works fine and is small but I'd like to reuse the
migration speed limit, if possible.  That way we don't have 3
different rate-limiting implementations in QEMU :).

Stefan
Kevin Wolf Nov. 2, 2011, 4:43 p.m. UTC | #3
Am 02.11.2011 16:43, schrieb Stefan Hajnoczi:
> On Tue, Nov 1, 2011 at 6:06 PM, Marcelo Tosatti <mtosatti@redhat.com> wrote:
>> On Thu, Oct 27, 2011 at 04:22:50PM +0100, Stefan Hajnoczi wrote:
>>> +static int stream_one_iteration(StreamBlockJob *s, int64_t sector_num,
>>> +                                void *buf, int max_sectors, int *n)
>>> +{
>>> +    BlockDriverState *bs = s->common.bs;
>>> +    int ret;
>>> +
>>> +    trace_stream_one_iteration(s, sector_num, max_sectors);
>>> +
>>> +    ret = bdrv_is_allocated(bs, sector_num, max_sectors, n);
>>> +    if (ret < 0) {
>>> +        return ret;
>>> +    }
>>
>> bdrv_is_allocated is still synchronous? If so, there should be at least
>> a plan to make it asynchronous.
> 
> Yes, that's a good discussion to have.  My thoughts are that
> bdrv_is_allocated() should be executed in coroutine context.  

bdrv_is_allocated() isn't coroutine-safe. You need to introduce
bdrv_co_is_allocated and convert all drivers before you can do this. You
don't want to access the qcow2 metadata cache without holding the lock,
for example.

> The
> semantics are a little tricky because of parallel requests:
> 
> 1. If a write request is in progress when we do bdrv_is_allocated() we
> might get back "unallocated" even though clusters are just being
> allocated.
> 2. If a TRIM request is in progress when we do bdrv_is_allocated() we
> might get back "allocated" even though clusters are just being
> deallocated.
> 
> In order to be reliable the caller needs to be aware of parallel
> requests.  I think it's correct to defer this problem to the caller.

I agree.

> In the case of image streaming we're not TRIM-safe, I haven't really
> thought about it yet.  But we are safe against parallel write requests
> because there is serialization to prevent copy-on-read requests from
> racing with write requests.

I don't think it matters. If you lose a bdrv_discard, nothing bad has
happened. bdrv_discard means that you have undefined content afterwards.

Kevin
Marcelo Tosatti Nov. 3, 2011, 4:34 p.m. UTC | #4
On Wed, Nov 02, 2011 at 03:43:49PM +0000, Stefan Hajnoczi wrote:
> On Tue, Nov 1, 2011 at 6:06 PM, Marcelo Tosatti <mtosatti@redhat.com> wrote:
> > On Thu, Oct 27, 2011 at 04:22:50PM +0100, Stefan Hajnoczi wrote:
> >> +static int stream_one_iteration(StreamBlockJob *s, int64_t sector_num,
> >> +                                void *buf, int max_sectors, int *n)
> >> +{
> >> +    BlockDriverState *bs = s->common.bs;
> >> +    int ret;
> >> +
> >> +    trace_stream_one_iteration(s, sector_num, max_sectors);
> >> +
> >> +    ret = bdrv_is_allocated(bs, sector_num, max_sectors, n);
> >> +    if (ret < 0) {
> >> +        return ret;
> >> +    }
> >
> > bdrv_is_allocated is still synchronous? If so, there should be at least
> > a plan to make it asynchronous.
> 
> Yes, that's a good discussion to have.  My thoughts are that
> bdrv_is_allocated() should be executed in coroutine context.  The
> semantics are a little tricky because of parallel requests:
> 
> 1. If a write request is in progress when we do bdrv_is_allocated() we
> might get back "unallocated" even though clusters are just being
> allocated.
> 2. If a TRIM request is in progress when we do bdrv_is_allocated() we
> might get back "allocated" even though clusters are just being
> deallocated.
> 
> In order to be reliable the caller needs to be aware of parallel
> requests.  I think it's correct to defer this problem to the caller.
> 
> In the case of image streaming we're not TRIM-safe, I haven't really
> thought about it yet.  But we are safe against parallel write requests
> because there is serialization to prevent copy-on-read requests from
> racing with write requests.
> 
> >> +    if (!ret) {
> >> +        ret = stream_populate(bs, sector_num, *n, buf);
> >> +    }
> >> +    return ret;
> >> +}
> >> +
> >> +static void coroutine_fn stream_run(void *opaque)
> >> +{
> >> +    StreamBlockJob *s = opaque;
> >> +    BlockDriverState *bs = s->common.bs;
> >> +    int64_t sector_num, end;
> >> +    int ret = 0;
> >> +    int n;
> >> +    void *buf;
> >> +
> >> +    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);
> >> +    s->common.len = bdrv_getlength(bs);
> >> +    bdrv_get_geometry(bs, (uint64_t *)&end);
> >> +
> >> +    bdrv_set_zero_detection(bs, true);
> >> +    bdrv_set_copy_on_read(bs, true);
> >
> > Should distinguish between stream initiated and user initiated setting
> > of zero detection and cor (so that unsetting below does not clear
> > user settings).
> 
> For zero detection I agree.
> 
> For copy-on-read it is questionable since once streaming is complete
> it does not make sense to have copy-on-read enabled.
> 
> I will address this in the next revision and think more about the
> copy-on-read case.
> 
> >> +
> >> +    for (sector_num = 0; sector_num < end; sector_num += n) {
> >> +        if (block_job_is_cancelled(&s->common)) {
> >> +            break;
> >> +        }
> >
> > If cancellation is seen here in the last loop iteration,
> > bdrv_change_backing_file below should not be executed.
> 
> I documented this case in the QMP API.  I'm not sure if it's possible
> to guarantee that the operation isn't just completing as you cancel
> it.  Any blocking point between completion of the last iteration and
> completing the operation is vulnerable to missing the cancel.  It's
> easier to explicitly say the operation might just have completed when
> you canceled, rather than trying to protect the completion path.  Do
> you think it's a problem to have these loose semantics that I
> described?

No, that is ok. I'm referring to bdrv_change_backing_file() being
executed without the entire image being streamed.

"if (sector_num == end && ret == 0)" includes both all sectors being 
streamed and all sectors except the last iteration being streamed (due
to job cancelled break).

> >> +
> >> +        /* TODO rate-limit */
> >> +        /* Note that even when no rate limit is applied we need to yield with
> >> +         * no pending I/O here so that qemu_aio_flush() is able to return.
> >> +         */
> >> +        co_sleep_ns(rt_clock, 0);
> >
> > How do you plan to implement rate limit?
> 
> It was implemented in the QED-specific image streaming series:
> 
> http://repo.or.cz/w/qemu/stefanha.git/commitdiff/22f2c09d2fcfe5e49ac4604fd23e4744f549a476
> 
> That implementation works fine and is small but I'd like to reuse the
> migration speed limit, if possible.  That way we don't have 3
> different rate-limiting implementations in QEMU :).

One possibility would be to create a "virtual" block device for
streaming, sitting on top of the real block device. Then enforce block
I/O limits on the virtual block device, the guest would remain accessing
the real block device.
Stefan Hajnoczi Nov. 4, 2011, 8:03 a.m. UTC | #5
On Thu, Nov 03, 2011 at 02:34:24PM -0200, Marcelo Tosatti wrote:
> On Wed, Nov 02, 2011 at 03:43:49PM +0000, Stefan Hajnoczi wrote:
> > On Tue, Nov 1, 2011 at 6:06 PM, Marcelo Tosatti <mtosatti@redhat.com> wrote:
> > > On Thu, Oct 27, 2011 at 04:22:50PM +0100, Stefan Hajnoczi wrote:
> > >> +
> > >> +    for (sector_num = 0; sector_num < end; sector_num += n) {
> > >> +        if (block_job_is_cancelled(&s->common)) {
> > >> +            break;
> > >> +        }
> > >
> > > If cancellation is seen here in the last loop iteration,
> > > bdrv_change_backing_file below should not be executed.
> > 
> > I documented this case in the QMP API.  I'm not sure if it's possible
> > to guarantee that the operation isn't just completing as you cancel
> > it.  Any blocking point between completion of the last iteration and
> > completing the operation is vulnerable to missing the cancel.  It's
> > easier to explicitly say the operation might just have completed when
> > you canceled, rather than trying to protect the completion path.  Do
> > you think it's a problem to have these loose semantics that I
> > described?
> 
> No, that is ok. I'm referring to bdrv_change_backing_file() being
> executed without the entire image being streamed.
> 
> "if (sector_num == end && ret == 0)" includes both all sectors being 
> streamed and all sectors except the last iteration being streamed (due
> to job cancelled break).

I don't see the case you mention.  Here is the code again:

for (sector_num = 0; sector_num < end; sector_num += n) {
    if (block_job_is_cancelled(&s->common)) {
        break;
    }

If we are on the last iteration, then sector_num = end - m, where m > 0
and is the number of sectors we are about to stream.

If we are cancelled during this last iteration then sector_num == end - m.
Therefore the "if (sector_num == end && ret == 0)" case does not
evaluate to true.

The only way we can reach sector_num == end is by having successfully
streamed those last m sectors.  Why?  Because sector_num is a 0-based
index and not a 1-based index, so it excludes end.

> > >> +
> > >> +        /* TODO rate-limit */
> > >> +        /* Note that even when no rate limit is applied we need to yield with
> > >> +         * no pending I/O here so that qemu_aio_flush() is able to return.
> > >> +         */
> > >> +        co_sleep_ns(rt_clock, 0);
> > >
> > > How do you plan to implement rate limit?
> > 
> > It was implemented in the QED-specific image streaming series:
> > 
> > http://repo.or.cz/w/qemu/stefanha.git/commitdiff/22f2c09d2fcfe5e49ac4604fd23e4744f549a476
> > 
> > That implementation works fine and is small but I'd like to reuse the
> > migration speed limit, if possible.  That way we don't have 3
> > different rate-limiting implementations in QEMU :).
> 
> One possibility would be to create a "virtual" block device for
> streaming, sitting on top of the real block device. Then enforce block
> I/O limits on the virtual block device, the guest would remain accessing
> the real block device.

That's an interesting idea.  I have also experimented with rate-limiting
and it seems the common code is really small - the rate-limiting code is
quite short to begin with.  So I'm now tending to reimplementing it.

Stefan
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index c290fd3..802db96 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -34,6 +34,7 @@  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += qed-check.o
 block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-nested-y += stream.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block/stream.c b/block/stream.c
new file mode 100644
index 0000000..8cdf566
--- /dev/null
+++ b/block/stream.c
@@ -0,0 +1,135 @@ 
+/*
+ * Image streaming
+ *
+ * Copyright IBM, Corp. 2011
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ *
+ */
+
+#include "trace.h"
+#include "block_int.h"
+
+enum {
+    /*
+     * Size of data buffer for populating the image file.  This should be large
+     * enough to process multiple clusters in a single call, so that populating
+     * contiguous regions of the image is efficient.
+     */
+    STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */
+};
+
+typedef struct StreamBlockJob {
+    BlockJob common;
+    BlockDriverState *base;
+} StreamBlockJob;
+
+static int coroutine_fn stream_populate(BlockDriverState *bs,
+                                        int64_t sector_num, int nb_sectors,
+                                        void *buf)
+{
+    struct iovec iov = {
+        .iov_base = buf,
+        .iov_len  = nb_sectors * BDRV_SECTOR_SIZE,
+    };
+    QEMUIOVector qiov;
+
+    qemu_iovec_init_external(&qiov, &iov, 1);
+
+    /* Copy-on-read the unallocated clusters */
+    return bdrv_co_readv(bs, sector_num, nb_sectors, &qiov);
+}
+
+static int stream_one_iteration(StreamBlockJob *s, int64_t sector_num,
+                                void *buf, int max_sectors, int *n)
+{
+    BlockDriverState *bs = s->common.bs;
+    int ret;
+
+    trace_stream_one_iteration(s, sector_num, max_sectors);
+
+    ret = bdrv_is_allocated(bs, sector_num, max_sectors, n);
+    if (ret < 0) {
+        return ret;
+    }
+    if (!ret) {
+        ret = stream_populate(bs, sector_num, *n, buf);
+    }
+    return ret;
+}
+
+static void coroutine_fn stream_run(void *opaque)
+{
+    StreamBlockJob *s = opaque;
+    BlockDriverState *bs = s->common.bs;
+    int64_t sector_num, end;
+    int ret = 0;
+    int n;
+    void *buf;
+
+    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);
+    s->common.len = bdrv_getlength(bs);
+    bdrv_get_geometry(bs, (uint64_t *)&end);
+
+    bdrv_set_zero_detection(bs, true);
+    bdrv_set_copy_on_read(bs, true);
+
+    for (sector_num = 0; sector_num < end; sector_num += n) {
+        if (block_job_is_cancelled(&s->common)) {
+            break;
+        }
+
+        /* TODO rate-limit */
+        /* Note that even when no rate limit is applied we need to yield with
+         * no pending I/O here so that qemu_aio_flush() is able to return.
+         */
+        co_sleep_ns(rt_clock, 0);
+
+        ret = stream_one_iteration(s, sector_num, buf,
+                                   STREAM_BUFFER_SIZE / BDRV_SECTOR_SIZE, &n);
+        if (ret < 0) {
+            break;
+        }
+
+        /* Publish progress */
+        s->common.offset += n * BDRV_SECTOR_SIZE;
+    }
+
+    bdrv_set_copy_on_read(bs, false);
+    bdrv_set_zero_detection(bs, false);
+
+    if (sector_num == end && ret == 0) {
+        bdrv_change_backing_file(bs, NULL, NULL);
+    }
+
+    qemu_vfree(buf);
+    block_job_complete(&s->common, ret);
+}
+
+static BlockJobType stream_job_type = {
+    .instance_size = sizeof(StreamBlockJob),
+    .job_type      = "stream",
+};
+
+int stream_start(BlockDriverState *bs, BlockDriverState *base,
+                 BlockDriverCompletionFunc *cb, void *opaque)
+{
+    StreamBlockJob *s;
+    Coroutine *co;
+
+    if (bs->job) {
+        return -EBUSY;
+    }
+
+    s = block_job_create(&stream_job_type, bs, cb, opaque);
+    s->base = base;
+
+    co = qemu_coroutine_create(stream_run);
+    trace_stream_start(bs, base, s, co, opaque);
+    qemu_coroutine_enter(co, s);
+    return 0;
+}
diff --git a/block_int.h b/block_int.h
index 4da1b74..c454170 100644
--- a/block_int.h
+++ b/block_int.h
@@ -346,4 +346,7 @@  static inline bool block_job_is_cancelled(BlockJob *job)
     return job->cancel_cb;
 }
 
+int stream_start(BlockDriverState *bs, BlockDriverState *base,
+                 BlockDriverCompletionFunc *cb, void *opaque);
+
 #endif /* BLOCK_INT_H */
diff --git a/trace-events b/trace-events
index e3a1e6f..487d560 100644
--- a/trace-events
+++ b/trace-events
@@ -71,6 +71,10 @@  bdrv_co_writev(void *bs, int64_t sector_num, int nb_sector) "bs %p sector_num %"
 bdrv_co_io_em(void *bs, int64_t sector_num, int nb_sectors, int is_write, void *acb) "bs %p sector_num %"PRId64" nb_sectors %d is_write %d acb %p"
 bdrv_co_copy_on_readv(void *bs, int64_t sector_num, int nb_sectors, int64_t cluster_sector_num, int cluster_nb_sectors) "bs %p sector_num %"PRId64" nb_sectors %d cluster_sector_num %"PRId64" cluster_nb_sectors %d"
 
+# block/stream.c
+stream_one_iteration(void *s, int64_t sector_num, int max_sectors) "s %p sector_num %"PRId64" max_sectors %d"
+stream_start(void *bs, void *base, void *s, void *co, void *opaque) "bs %p base %p s %p co %p opaque %p"
+
 # hw/virtio-blk.c
 virtio_blk_req_complete(void *req, int status) "req %p status %d"
 virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"