diff mbox

[v2,09/12] mirror: switch mirror_iteration to AIO

Message ID 1358357479-7912-10-git-send-email-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Jan. 16, 2013, 5:31 p.m. UTC
There is really no change in the behavior of the job here, since
there is still a maximum of one in-flight I/O operation between
the source and the target.  However, this patch already introduces
the AIO callbacks (which are unmodified in the next patch)
and some of the logic to count in-flight operations and only
complete the job when there is none.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/mirror.c |  155 ++++++++++++++++++++++++++++++++++++++++++--------------
 trace-events   |    2 +
 2 files changed, 119 insertions(+), 38 deletions(-)

Comments

Kevin Wolf Jan. 21, 2013, 11:39 a.m. UTC | #1
Am 16.01.2013 18:31, schrieb Paolo Bonzini:
> There is really no change in the behavior of the job here, since
> there is still a maximum of one in-flight I/O operation between
> the source and the target.  However, this patch already introduces
> the AIO callbacks (which are unmodified in the next patch)
> and some of the logic to count in-flight operations and only
> complete the job when there is none.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/mirror.c |  155 ++++++++++++++++++++++++++++++++++++++++++--------------
>  trace-events   |    2 +
>  2 files changed, 119 insertions(+), 38 deletions(-)
> 
> diff --git a/block/mirror.c b/block/mirror.c
> index ab41340..75c550a 100644
> --- a/block/mirror.c
> +++ b/block/mirror.c
> @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob {
>      unsigned long *cow_bitmap;
>      HBitmapIter hbi;
>      uint8_t *buf;
> +
> +    int in_flight;
> +    int ret;
>  } MirrorBlockJob;
>  
> +typedef struct MirrorOp {
> +    MirrorBlockJob *s;
> +    QEMUIOVector qiov;
> +    struct iovec iov;
> +    int64_t sector_num;
> +    int nb_sectors;
> +} MirrorOp;
> +
>  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>                                              int error)
>  {
> @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>      }
>  }
>  
> -static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
> -                                         BlockErrorAction *p_action)
> +static void mirror_iteration_done(MirrorOp *op)
> +{
> +    MirrorBlockJob *s = op->s;
> +
> +    s->in_flight--;
> +    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
> +    g_slice_free(MirrorOp, op);
> +    qemu_coroutine_enter(s->common.co, NULL);

This doesn't check if the job coroutine is actually in a state where
it's valid to reenter.

Technically it might even be okay because reentering during a sleep is
allowed and as good as reentering during the new yield, and bdrv_flush()
is only called if s->in_flight == 0. Most other calls _should_ be okay
as well, but I'm not so sure about bdrv_drain_all(), especially once
.bdrv_drain exists.

As you can see, this is becoming very subtle, so I would prefer adding
some explicit bool s->may_reenter or something like that.

> +}

> @@ -177,28 +233,43 @@ static void coroutine_fn mirror_run(void *opaque)
>      }
>  
>      bdrv_dirty_iter_init(bs, &s->hbi);
> +    last_pause_ns = qemu_get_clock_ns(rt_clock);
>      for (;;) {
>          uint64_t delay_ns;
>          int64_t cnt;
>          bool should_complete;
>  
> +        if (s->ret < 0) {
> +            ret = s->ret;
> +            break;
> +        }
> +
>          cnt = bdrv_get_dirty_count(bs);
> -        if (cnt != 0) {
> -            BlockErrorAction action = BDRV_ACTION_REPORT;
> -            ret = mirror_iteration(s, &action);
> -            if (ret < 0 && action == BDRV_ACTION_REPORT) {
> -                goto immediate_exit;
> +
> +        /* Note that even when no rate limit is applied we need to yield
> +         * periodically with no pending I/O so that qemu_aio_flush() returns.
> +         * We do so every SLICE_TIME milliseconds, or when there is an error,

s/milli/nano/

> +         * or when the source is clean, whichever comes first.
> +         */
> +        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
> +            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
> +            if (s->in_flight > 0) {
> +                trace_mirror_yield(s, s->in_flight, cnt);
> +                qemu_coroutine_yield();
> +                continue;
> +            } else if (cnt != 0) {
> +                mirror_iteration(s);
> +                continue;
>              }
> -            cnt = bdrv_get_dirty_count(bs);
>          }
>  
>          should_complete = false;
> -        if (cnt == 0) {
> +        if (s->in_flight == 0 && cnt == 0) {
>              trace_mirror_before_flush(s);
>              ret = bdrv_flush(s->target);
>              if (ret < 0) {
>                  if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) {
> -                    goto immediate_exit;
> +                    break;

Is this an unrelated change?

>                  }
>              } else {
>                  /* We're out of the streaming phase.  From now on, if the job
> @@ -244,15 +315,12 @@ static void coroutine_fn mirror_run(void *opaque)
>                  delay_ns = 0;
>              }
>  
> -            /* Note that even when no rate limit is applied we need to yield
> -             * with no pending I/O here so that bdrv_drain_all() returns.
> -             */
>              block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>              if (block_job_is_cancelled(&s->common)) {
>                  break;
>              }
>          } else if (!should_complete) {
> -            delay_ns = (cnt == 0 ? SLICE_TIME : 0);
> +            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
>              block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>          } else if (cnt == 0) {

Why don't we need to check s->in_flight == 0 here?

>              /* The two disks are in sync.  Exit and report successful
> @@ -262,9 +330,20 @@ static void coroutine_fn mirror_run(void *opaque)
>              s->common.cancelled = false;
>              break;
>          }
> +        last_pause_ns = qemu_get_clock_ns(rt_clock);
>      }
>  
>  immediate_exit:
> +    if (s->in_flight > 0) {
> +        /* We get here only if something went wrong.  Either the job failed,
> +         * or it was cancelled prematurely so that we do not guarantee that
> +         * the target is a copy of the source.
> +         */
> +        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
> +        mirror_drain(s);
> +    }
> +
> +    assert(s->in_flight == 0);
>      qemu_vfree(s->buf);
>      g_free(s->cow_bitmap);
>      bdrv_set_dirty_tracking(bs, 0);

Kevin
Paolo Bonzini Jan. 21, 2013, 12:09 p.m. UTC | #2
Il 21/01/2013 12:39, Kevin Wolf ha scritto:
> Am 16.01.2013 18:31, schrieb Paolo Bonzini:
>> There is really no change in the behavior of the job here, since
>> there is still a maximum of one in-flight I/O operation between
>> the source and the target.  However, this patch already introduces
>> the AIO callbacks (which are unmodified in the next patch)
>> and some of the logic to count in-flight operations and only
>> complete the job when there is none.
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>  block/mirror.c |  155 ++++++++++++++++++++++++++++++++++++++++++--------------
>>  trace-events   |    2 +
>>  2 files changed, 119 insertions(+), 38 deletions(-)
>>
>> diff --git a/block/mirror.c b/block/mirror.c
>> index ab41340..75c550a 100644
>> --- a/block/mirror.c
>> +++ b/block/mirror.c
>> @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob {
>>      unsigned long *cow_bitmap;
>>      HBitmapIter hbi;
>>      uint8_t *buf;
>> +
>> +    int in_flight;
>> +    int ret;
>>  } MirrorBlockJob;
>>  
>> +typedef struct MirrorOp {
>> +    MirrorBlockJob *s;
>> +    QEMUIOVector qiov;
>> +    struct iovec iov;
>> +    int64_t sector_num;
>> +    int nb_sectors;
>> +} MirrorOp;
>> +
>>  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>                                              int error)
>>  {
>> @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>      }
>>  }
>>  
>> -static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
>> -                                         BlockErrorAction *p_action)
>> +static void mirror_iteration_done(MirrorOp *op)
>> +{
>> +    MirrorBlockJob *s = op->s;
>> +
>> +    s->in_flight--;
>> +    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
>> +    g_slice_free(MirrorOp, op);
>> +    qemu_coroutine_enter(s->common.co, NULL);
> 
> This doesn't check if the job coroutine is actually in a state where
> it's valid to reenter.
> 
> Technically it might even be okay because reentering during a sleep is
> allowed and as good as reentering during the new yield, and bdrv_flush()
> is only called if s->in_flight == 0. Most other calls _should_ be okay
> as well, but I'm not so sure about bdrv_drain_all(), especially once
> .bdrv_drain exists.

bdrv_drain_all is also called only if s->in_flight == 0, but I see
your point.  It is indeed quite subtle, but it's okay.

> As you can see, this is becoming very subtle, so I would prefer adding
> some explicit bool s->may_reenter or something like that.

The right boolean to test is already there, it's job->busy.  I can add a
new API block_job_yield/block_job_enter (where block_job_yield
resets/sets busy across the yield, and block_job_enter only enters if
!job->busy), but that would be a separate series IMO.

>> +}
> 
>> @@ -177,28 +233,43 @@ static void coroutine_fn mirror_run(void *opaque)
>>      }
>>  
>>      bdrv_dirty_iter_init(bs, &s->hbi);
>> +    last_pause_ns = qemu_get_clock_ns(rt_clock);
>>      for (;;) {
>>          uint64_t delay_ns;
>>          int64_t cnt;
>>          bool should_complete;
>>  
>> +        if (s->ret < 0) {
>> +            ret = s->ret;
>> +            break;
>> +        }
>> +
>>          cnt = bdrv_get_dirty_count(bs);
>> -        if (cnt != 0) {
>> -            BlockErrorAction action = BDRV_ACTION_REPORT;
>> -            ret = mirror_iteration(s, &action);
>> -            if (ret < 0 && action == BDRV_ACTION_REPORT) {
>> -                goto immediate_exit;
>> +
>> +        /* Note that even when no rate limit is applied we need to yield
>> +         * periodically with no pending I/O so that qemu_aio_flush() returns.
>> +         * We do so every SLICE_TIME milliseconds, or when there is an error,
> 
> s/milli/nano/
> 
>> +         * or when the source is clean, whichever comes first.
>> +         */
>> +        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
>> +            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
>> +            if (s->in_flight > 0) {
>> +                trace_mirror_yield(s, s->in_flight, cnt);
>> +                qemu_coroutine_yield();
>> +                continue;
>> +            } else if (cnt != 0) {
>> +                mirror_iteration(s);
>> +                continue;
>>              }
>> -            cnt = bdrv_get_dirty_count(bs);
>>          }
>>  
>>          should_complete = false;
>> -        if (cnt == 0) {
>> +        if (s->in_flight == 0 && cnt == 0) {
>>              trace_mirror_before_flush(s);
>>              ret = bdrv_flush(s->target);
>>              if (ret < 0) {
>>                  if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) {
>> -                    goto immediate_exit;
>> +                    break;
> 
> Is this an unrelated change?

Seems like a rebase hiccup.  Doesn't have any semantic change, I'll drop it.

>>                  }
>>              } else {
>>                  /* We're out of the streaming phase.  From now on, if the job
>> @@ -244,15 +315,12 @@ static void coroutine_fn mirror_run(void *opaque)
>>                  delay_ns = 0;
>>              }
>>  
>> -            /* Note that even when no rate limit is applied we need to yield
>> -             * with no pending I/O here so that bdrv_drain_all() returns.
>> -             */
>>              block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>>              if (block_job_is_cancelled(&s->common)) {
>>                  break;
>>              }
>>          } else if (!should_complete) {
>> -            delay_ns = (cnt == 0 ? SLICE_TIME : 0);
>> +            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
>>              block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>>          } else if (cnt == 0) {
> 
> Why don't we need to check s->in_flight == 0 here?

As above, should_complete is only set to true within an if(in_flight == 0).

Paolo

>>              /* The two disks are in sync.  Exit and report successful
>> @@ -262,9 +330,20 @@ static void coroutine_fn mirror_run(void *opaque)
>>              s->common.cancelled = false;
>>              break;
>>          }
>> +        last_pause_ns = qemu_get_clock_ns(rt_clock);
>>      }
>>  
>>  immediate_exit:
>> +    if (s->in_flight > 0) {
>> +        /* We get here only if something went wrong.  Either the job failed,
>> +         * or it was cancelled prematurely so that we do not guarantee that
>> +         * the target is a copy of the source.
>> +         */
>> +        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
>> +        mirror_drain(s);
>> +    }
>> +
>> +    assert(s->in_flight == 0);
>>      qemu_vfree(s->buf);
>>      g_free(s->cow_bitmap);
>>      bdrv_set_dirty_tracking(bs, 0);
> 
> Kevin
> 
>
Kevin Wolf Jan. 21, 2013, 12:15 p.m. UTC | #3
Am 21.01.2013 13:09, schrieb Paolo Bonzini:
> Il 21/01/2013 12:39, Kevin Wolf ha scritto:
>> Am 16.01.2013 18:31, schrieb Paolo Bonzini:
>>> There is really no change in the behavior of the job here, since
>>> there is still a maximum of one in-flight I/O operation between
>>> the source and the target.  However, this patch already introduces
>>> the AIO callbacks (which are unmodified in the next patch)
>>> and some of the logic to count in-flight operations and only
>>> complete the job when there is none.
>>>
>>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>>> ---
>>>  block/mirror.c |  155 ++++++++++++++++++++++++++++++++++++++++++--------------
>>>  trace-events   |    2 +
>>>  2 files changed, 119 insertions(+), 38 deletions(-)
>>>
>>> diff --git a/block/mirror.c b/block/mirror.c
>>> index ab41340..75c550a 100644
>>> --- a/block/mirror.c
>>> +++ b/block/mirror.c
>>> @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob {
>>>      unsigned long *cow_bitmap;
>>>      HBitmapIter hbi;
>>>      uint8_t *buf;
>>> +
>>> +    int in_flight;
>>> +    int ret;
>>>  } MirrorBlockJob;
>>>  
>>> +typedef struct MirrorOp {
>>> +    MirrorBlockJob *s;
>>> +    QEMUIOVector qiov;
>>> +    struct iovec iov;
>>> +    int64_t sector_num;
>>> +    int nb_sectors;
>>> +} MirrorOp;
>>> +
>>>  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>>                                              int error)
>>>  {
>>> @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>>      }
>>>  }
>>>  
>>> -static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
>>> -                                         BlockErrorAction *p_action)
>>> +static void mirror_iteration_done(MirrorOp *op)
>>> +{
>>> +    MirrorBlockJob *s = op->s;
>>> +
>>> +    s->in_flight--;
>>> +    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
>>> +    g_slice_free(MirrorOp, op);
>>> +    qemu_coroutine_enter(s->common.co, NULL);
>>
>> This doesn't check if the job coroutine is actually in a state where
>> it's valid to reenter.
>>
>> Technically it might even be okay because reentering during a sleep is
>> allowed and as good as reentering during the new yield, and bdrv_flush()
>> is only called if s->in_flight == 0. Most other calls _should_ be okay
>> as well, but I'm not so sure about bdrv_drain_all(), especially once
>> .bdrv_drain exists.
> 
> bdrv_drain_all is also called only if s->in_flight == 0 too, but I see
> your point.  It is indeed quite subtle, but it's okay.

Ah, yes, that's the part I missed. Looks correct indeed.

>> As you can see, this is becoming very subtle, so I would prefer adding
>> some explicit bool s->may_reenter or something like that.
> 
> The right boolean to test is already there, it's job->busy.  I can add a
> new API block_job_yield/block_job_enter (where block_job_yield
> resets/sets busy across the yield, and block_job_enter only enters if
> !job->busy), but that would be a separate series IMO.

Please put it on your todo list then. I think I can accept the current
state if I know that it will be improved soon, even though I'm not very
comfortable with it.

Kevin
diff mbox

Patch

diff --git a/block/mirror.c b/block/mirror.c
index ab41340..75c550a 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -33,8 +33,19 @@  typedef struct MirrorBlockJob {
     unsigned long *cow_bitmap;
     HBitmapIter hbi;
     uint8_t *buf;
+
+    int in_flight;
+    int ret;
 } MirrorBlockJob;
 
+typedef struct MirrorOp {
+    MirrorBlockJob *s;
+    QEMUIOVector qiov;
+    struct iovec iov;
+    int64_t sector_num;
+    int nb_sectors;
+} MirrorOp;
+
 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                             int error)
 {
@@ -48,15 +59,60 @@  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
     }
 }
 
-static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
-                                         BlockErrorAction *p_action)
+static void mirror_iteration_done(MirrorOp *op)
+{
+    MirrorBlockJob *s = op->s;
+
+    s->in_flight--;
+    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
+    g_slice_free(MirrorOp, op);
+    qemu_coroutine_enter(s->common.co, NULL);
+}
+
+static void mirror_write_complete(void *opaque, int ret)
+{
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
+    if (ret < 0) {
+        BlockDriverState *source = s->common.bs;
+        BlockErrorAction action;
+
+        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+        action = mirror_error_action(s, false, -ret);
+        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+            s->ret = ret;
+        }
+    }
+    mirror_iteration_done(op);
+}
+
+static void mirror_read_complete(void *opaque, int ret)
+{
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
+    if (ret < 0) {
+        BlockDriverState *source = s->common.bs;
+        BlockErrorAction action;
+
+        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+        action = mirror_error_action(s, true, -ret);
+        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+            s->ret = ret;
+        }
+
+        mirror_iteration_done(op);
+        return;
+    }
+    bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors,
+                    mirror_write_complete, op);
+}
+
+static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
 {
     BlockDriverState *source = s->common.bs;
-    BlockDriverState *target = s->target;
-    QEMUIOVector qiov;
-    int ret, nb_sectors, nb_sectors_chunk;
+    int nb_sectors, nb_sectors_chunk;
     int64_t end, sector_num, cluster_num;
-    struct iovec iov;
+    MirrorOp *op;
 
     s->sector_num = hbitmap_iter_next(&s->hbi);
     if (s->sector_num < 0) {
@@ -87,31 +143,30 @@  static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
 
     end = s->common.len >> BDRV_SECTOR_BITS;
     nb_sectors = MIN(nb_sectors, end - sector_num);
+
+    /* Allocate a MirrorOp that is used as an AIO callback.  */
+    op = g_slice_new(MirrorOp);
+    op->s = s;
+    op->iov.iov_base = s->buf;
+    op->iov.iov_len  = nb_sectors * 512;
+    op->sector_num = sector_num;
+    op->nb_sectors = nb_sectors;
+    qemu_iovec_init_external(&op->qiov, &op->iov, 1);
+
     bdrv_reset_dirty(source, sector_num, nb_sectors);
 
     /* Copy the dirty cluster.  */
-    iov.iov_base = s->buf;
-    iov.iov_len  = nb_sectors * 512;
-    qemu_iovec_init_external(&qiov, &iov, 1);
-
+    s->in_flight++;
     trace_mirror_one_iteration(s, sector_num, nb_sectors);
-    ret = bdrv_co_readv(source, sector_num, nb_sectors, &qiov);
-    if (ret < 0) {
-        *p_action = mirror_error_action(s, true, -ret);
-        goto fail;
-    }
-    ret = bdrv_co_writev(target, sector_num, nb_sectors, &qiov);
-    if (ret < 0) {
-        *p_action = mirror_error_action(s, false, -ret);
-        s->synced = false;
-        goto fail;
-    }
-    return 0;
+    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
+                   mirror_read_complete, op);
+}
 
-fail:
-    /* Try again later.  */
-    bdrv_set_dirty(source, sector_num, nb_sectors);
-    return ret;
+static void mirror_drain(MirrorBlockJob *s)
+{
+    while (s->in_flight > 0) {
+        qemu_coroutine_yield();
+    }
 }
 
 static void coroutine_fn mirror_run(void *opaque)
@@ -119,6 +174,7 @@  static void coroutine_fn mirror_run(void *opaque)
     MirrorBlockJob *s = opaque;
     BlockDriverState *bs = s->common.bs;
     int64_t sector_num, end, nb_sectors_chunk, length;
+    uint64_t last_pause_ns;
     BlockDriverInfo bdi;
     char backing_filename[1024];
     int ret = 0;
@@ -177,28 +233,43 @@  static void coroutine_fn mirror_run(void *opaque)
     }
 
     bdrv_dirty_iter_init(bs, &s->hbi);
+    last_pause_ns = qemu_get_clock_ns(rt_clock);
     for (;;) {
         uint64_t delay_ns;
         int64_t cnt;
         bool should_complete;
 
+        if (s->ret < 0) {
+            ret = s->ret;
+            break;
+        }
+
         cnt = bdrv_get_dirty_count(bs);
-        if (cnt != 0) {
-            BlockErrorAction action = BDRV_ACTION_REPORT;
-            ret = mirror_iteration(s, &action);
-            if (ret < 0 && action == BDRV_ACTION_REPORT) {
-                goto immediate_exit;
+
+        /* Note that even when no rate limit is applied we need to yield
+         * periodically with no pending I/O so that qemu_aio_flush() returns.
+         * We do so every SLICE_TIME nanoseconds, or when there is an error,
+         * or when the source is clean, whichever comes first.
+         */
+        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
+            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+            if (s->in_flight > 0) {
+                trace_mirror_yield(s, s->in_flight, cnt);
+                qemu_coroutine_yield();
+                continue;
+            } else if (cnt != 0) {
+                mirror_iteration(s);
+                continue;
             }
-            cnt = bdrv_get_dirty_count(bs);
         }
 
         should_complete = false;
-        if (cnt == 0) {
+        if (s->in_flight == 0 && cnt == 0) {
             trace_mirror_before_flush(s);
             ret = bdrv_flush(s->target);
             if (ret < 0) {
                 if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) {
-                    goto immediate_exit;
+                    break;
                 }
             } else {
                 /* We're out of the streaming phase.  From now on, if the job
@@ -244,15 +315,12 @@  static void coroutine_fn mirror_run(void *opaque)
                 delay_ns = 0;
             }
 
-            /* Note that even when no rate limit is applied we need to yield
-             * with no pending I/O here so that bdrv_drain_all() returns.
-             */
             block_job_sleep_ns(&s->common, rt_clock, delay_ns);
             if (block_job_is_cancelled(&s->common)) {
                 break;
             }
         } else if (!should_complete) {
-            delay_ns = (cnt == 0 ? SLICE_TIME : 0);
+            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
             block_job_sleep_ns(&s->common, rt_clock, delay_ns);
         } else if (cnt == 0) {
             /* The two disks are in sync.  Exit and report successful
@@ -262,9 +330,20 @@  static void coroutine_fn mirror_run(void *opaque)
             s->common.cancelled = false;
             break;
         }
+        last_pause_ns = qemu_get_clock_ns(rt_clock);
     }
 
 immediate_exit:
+    if (s->in_flight > 0) {
+        /* We get here only if something went wrong.  Either the job failed,
+         * or it was cancelled prematurely so that we do not guarantee that
+         * the target is a copy of the source.
+         */
+        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
+        mirror_drain(s);
+    }
+
+    assert(s->in_flight == 0);
     qemu_vfree(s->buf);
     g_free(s->cow_bitmap);
     bdrv_set_dirty_tracking(bs, 0);
diff --git a/trace-events b/trace-events
index b0a8085..4427d1f 100644
--- a/trace-events
+++ b/trace-events
@@ -85,6 +85,8 @@  mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64
 mirror_before_sleep(void *s, int64_t cnt, int synced) "s %p dirty count %"PRId64" synced %d"
 mirror_one_iteration(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d"
 mirror_cow(void *s, int64_t sector_num) "s %p sector_num %"PRId64
+mirror_iteration_done(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d"
+mirror_yield(void *s, int64_t cnt, int in_flight) "s %p dirty count %"PRId64" in_flight %d"
 
 # blockdev.c
 qmp_block_job_cancel(void *job) "job %p"