Patchwork [44/47] mirror: switch mirror_iteration to AIO

login
register
mail settings
Submitter Paolo Bonzini
Date July 24, 2012, 11:04 a.m.
Message ID <1343127865-16608-45-git-send-email-pbonzini@redhat.com>
Download mbox | patch
Permalink /patch/172876/
State New
Headers show

Comments

Paolo Bonzini - July 24, 2012, 11:04 a.m.
There is really no change in the behavior of the job here, since there
is still a maximum of one in-flight I/O operation between the source and
the target.  However, this patch already introduces moves the copy logic
from mirror_iteration to AIO callbacks; it also adds the logic to count
in-flight operations, and only complete the job after they have finished.

Some care is required in the error and cancellation cases, in order
to avoid access to dangling pointers (and consequent corruption).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/mirror.c |  161 ++++++++++++++++++++++++++++++++++++++++++--------------
 trace-events   |    2 +
 2 files changed, 123 insertions(+), 40 deletions(-)
Eric Blake - July 28, 2012, 1:46 p.m.
On 07/24/2012 05:04 AM, Paolo Bonzini wrote:
> There is really no change in the behavior of the job here, since there
> is still a maximum of one in-flight I/O operation between the source and
> the target.  However, this patch already introduces moves the copy logic

grammar: 'already introduces moves' is awkward, but I'm not sure what
you meant.

> from mirror_iteration to AIO callbacks; it also adds the logic to count
> in-flight operations, and only complete the job after they have finished.

s/complete/completes/

> 
> Some care is required in the error and cancellation cases, in order
> to avoid access to dangling pointers (and consequent corruption).
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/mirror.c |  161 ++++++++++++++++++++++++++++++++++++++++++--------------
>  trace-events   |    2 +
>  2 files changed, 123 insertions(+), 40 deletions(-)
> 
> diff --git a/block/mirror.c b/block/mirror.c
> index 81a600b..971c923 100644
> --- a/block/mirror.c
> +++ b/block/mirror.c
> @@ -17,7 +17,7 @@
>  #include "qemu/ratelimit.h"
>  #include "bitmap.h"
>  
> -#define SLICE_TIME 100000000ULL /* ns */
> +#define SLICE_TIME    100000000ULL /* ns */

Why the spurious respacing?
Paolo Bonzini - July 30, 2012, 1:41 p.m.
Il 28/07/2012 15:46, Eric Blake ha scritto:
> On 07/24/2012 05:04 AM, Paolo Bonzini wrote:
>> There is really no change in the behavior of the job here, since there
>> is still a maximum of one in-flight I/O operation between the source and
>> the target.  However, this patch already introduces moves the copy logic
> 
> grammar: 'already introduces moves' is awkward, but I'm not sure what
> you meant.
> 
>> from mirror_iteration to AIO callbacks; it also adds the logic to count
>> in-flight operations, and only complete the job after they have finished.
> 
> s/complete/completes/

Wow, I'm embarrassed...

>>
>> Some care is required in the error and cancellation cases, in order
>> to avoid access to dangling pointers (and consequent corruption).
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>  block/mirror.c |  161 ++++++++++++++++++++++++++++++++++++++++++--------------
>>  trace-events   |    2 +
>>  2 files changed, 123 insertions(+), 40 deletions(-)
>>
>> diff --git a/block/mirror.c b/block/mirror.c
>> index 81a600b..971c923 100644
>> --- a/block/mirror.c
>> +++ b/block/mirror.c
>> @@ -17,7 +17,7 @@
>>  #include "qemu/ratelimit.h"
>>  #include "bitmap.h"
>>  
>> -#define SLICE_TIME 100000000ULL /* ns */
>> +#define SLICE_TIME    100000000ULL /* ns */
> 
> Why the spurious respacing?

This patch was split from the one that introduces MAX_IN_FLIGHT.

 #define SLICE_TIME    100000000ULL /* ns */
+#define MAX_IN_FLIGHT 16

so the respacing belongs there.

Paolo

Patch

diff --git a/block/mirror.c b/block/mirror.c
index 81a600b..971c923 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -17,7 +17,7 @@ 
 #include "qemu/ratelimit.h"
 #include "bitmap.h"
 
-#define SLICE_TIME 100000000ULL /* ns */
+#define SLICE_TIME    100000000ULL /* ns */
 
 typedef struct MirrorBlockJob {
     BlockJob common;
@@ -33,17 +33,78 @@  typedef struct MirrorBlockJob {
     unsigned long *cow_bitmap;
     HBitmapIter hbi;
     uint8_t *buf;
+
+    int in_flight;
+    int ret;
 } MirrorBlockJob;
 
-static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
-                                         BlockErrorAction *p_action)
+typedef struct MirrorOp {
+    MirrorBlockJob *s;
+    QEMUIOVector qiov;
+    struct iovec iov;
+    int64_t sector_num;
+    int nb_sectors;
+} MirrorOp;
+
+static void mirror_iteration_done(MirrorOp *op)
 {
+    MirrorBlockJob *s = op->s;
+
+    s->in_flight--;
+    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
+    g_slice_free(MirrorOp, op);
+    qemu_coroutine_enter(s->common.co, NULL);
+}
+
+static void mirror_write_complete(void *opaque, int ret)
+{
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
     BlockDriverState *source = s->common.bs;
     BlockDriverState *target = s->target;
-    QEMUIOVector qiov;
-    int ret, nb_sectors, nb_sectors_chunk;
+    BlockErrorAction action;
+
+    if (ret < 0) {
+        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+        action = block_job_error_action(&s->common, target, s->on_target_error,
+                                        false, -ret);
+        s->synced = false;
+        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+            s->ret = ret;
+        }
+    }
+    mirror_iteration_done(op);
+}
+
+static void mirror_read_complete(void *opaque, int ret)
+{
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
+    BlockDriverState *source = s->common.bs;
+    BlockDriverState *target = s->target;
+    BlockErrorAction action;
+    if (ret < 0) {
+        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+        action = block_job_error_action(&s->common, source, s->on_source_error,
+                                        true, -ret);
+        s->synced = false;
+        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+            s->ret = ret;
+        }
+
+        mirror_iteration_done(op);
+        return;
+    }
+    bdrv_aio_writev(target, op->sector_num, &op->qiov, op->nb_sectors,
+                    mirror_write_complete, op);
+}
+
+static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
+{
+    BlockDriverState *source = s->common.bs;
+    int nb_sectors, nb_sectors_chunk;
     int64_t end, sector_num, cluster_num;
-    struct iovec iov;
+    MirrorOp *op;
 
     s->sector_num = hbitmap_iter_next(&s->hbi);
     if (s->sector_num < 0) {
@@ -74,34 +135,30 @@  static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
 
     end = s->common.len >> BDRV_SECTOR_BITS;
     nb_sectors = MIN(nb_sectors, end - sector_num);
+
+    /* Allocate a MirrorOp that is used as an AIO callback.  */
+    op = g_slice_new(MirrorOp);
+    op->s = s;
+    op->iov.iov_base = s->buf;
+    op->iov.iov_len  = nb_sectors * 512;
+    op->sector_num = sector_num;
+    op->nb_sectors = nb_sectors;
+    qemu_iovec_init_external(&op->qiov, &op->iov, 1);
+
     bdrv_reset_dirty(source, sector_num, nb_sectors);
 
     /* Copy the dirty cluster.  */
-    iov.iov_base = s->buf;
-    iov.iov_len  = nb_sectors * 512;
-    qemu_iovec_init_external(&qiov, &iov, 1);
-
+    s->in_flight++;
     trace_mirror_one_iteration(s, sector_num, nb_sectors);
-    ret = bdrv_co_readv(source, sector_num, nb_sectors, &qiov);
-    if (ret < 0) {
-        *p_action = block_job_error_action(&s->common, source,
-                                           s->on_source_error, true, -ret);
-        s->synced = false;
-        goto fail;
-    }
-    ret = bdrv_co_writev(target, sector_num, nb_sectors, &qiov);
-    if (ret < 0) {
-        *p_action = block_job_error_action(&s->common, target,
-                                           s->on_target_error, false, -ret);
-        s->synced = false;
-        goto fail;
-    }
-    return 0;
+    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
+                   mirror_read_complete, op);
+}
 
-fail:
-    /* Try again later.  */
-    bdrv_set_dirty(source, sector_num, nb_sectors);
-    return ret;
+static void mirror_drain(MirrorBlockJob *s)
+{
+    while (s->in_flight > 0) {
+        qemu_coroutine_yield();
+    }
 }
 
 static void coroutine_fn mirror_run(void *opaque)
@@ -109,6 +166,7 @@  static void coroutine_fn mirror_run(void *opaque)
     MirrorBlockJob *s = opaque;
     BlockDriverState *bs = s->common.bs;
     int64_t sector_num, end, nb_sectors_chunk, length;
+    uint64_t last_pause_ns;
     BlockDriverInfo bdi;
     char backing_filename[1024];
     int ret = 0;
@@ -168,22 +226,37 @@  static void coroutine_fn mirror_run(void *opaque)
     }
 
     bdrv_dirty_iter_init(bs, &s->hbi);
+    last_pause_ns = qemu_get_clock_ns(rt_clock);
     for (;;) {
         uint64_t delay_ns;
         int64_t cnt;
         bool should_complete;
 
+        if (s->ret < 0) {
+            ret = s->ret;
+            break;
+        }
+
         cnt = bdrv_get_dirty_count(bs);
-        if (cnt != 0) {
-            BlockErrorAction action = BDRV_ACTION_REPORT;
-            ret = mirror_iteration(s, &action);
-            if (ret < 0 && action == BDRV_ACTION_REPORT) {
-                break;
+
+        /* Note that even when no rate limit is applied we need to yield
+         * periodically with no pending I/O so that qemu_aio_flush() returns.
+         * We do so every SLICE_TIME milliseconds, or when there is an error,
+         * or when the source is clean, whichever comes first.
+         */
+        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
+            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+            if (s->in_flight > 0) {
+                trace_mirror_yield(s, s->in_flight, cnt);
+                qemu_coroutine_yield();
+                continue;
+            } else if (cnt != 0) {
+                mirror_iteration(s);
+                continue;
             }
-            cnt = bdrv_get_dirty_count(bs);
         }
 
-        if (cnt != 0) {
+        if (s->in_flight > 0 || cnt != 0) {
             should_complete = false;
         } else {
             trace_mirror_before_flush(s);
@@ -228,15 +301,12 @@  static void coroutine_fn mirror_run(void *opaque)
                 delay_ns = 0;
             }
 
-            /* Note that even when no rate limit is applied we need to yield
-             * with no pending I/O here so that qemu_aio_flush() returns.
-             */
             block_job_sleep_ns(&s->common, rt_clock, delay_ns);
             if (block_job_is_cancelled(&s->common)) {
                 break;
             }
         } else if (!should_complete) {
-            delay_ns = (cnt == 0 ? SLICE_TIME : 0);
+            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
             block_job_sleep_ns(&s->common, rt_clock, delay_ns);
         } else if (cnt == 0) {
             /* The two disks are in sync.  Exit and report successful
@@ -246,9 +316,20 @@  static void coroutine_fn mirror_run(void *opaque)
             s->common.cancelled = false;
             break;
         }
+        last_pause_ns = qemu_get_clock_ns(rt_clock);
+    }
+
+    if (s->in_flight > 0) {
+        /* We get here only if something went wrong.  Either the job failed,
+         * or it was cancelled prematurely so that we do not guarantee that
+         * the target is a copy of the source.
+         */
+        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
+        mirror_drain(s);
     }
 
 immediate_exit:
+    assert(s->in_flight == 0);
     g_free(s->buf);
     g_free(s->cow_bitmap);
     bdrv_set_dirty_tracking(bs, 0);
diff --git a/trace-events b/trace-events
index 6b504d8..fe20bd7 100644
--- a/trace-events
+++ b/trace-events
@@ -83,6 +83,8 @@  mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64
 mirror_before_sleep(void *s, int64_t cnt, int synced) "s %p dirty count %"PRId64" synced %d"
 mirror_one_iteration(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d"
 mirror_cow(void *s, int64_t sector_num) "s %p sector_num %"PRId64
+mirror_iteration_done(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d"
+mirror_yield(void *s, int64_t cnt, int in_flight) "s %p dirty count %"PRId64" in_flight %d"
 
 # blockdev.c
 qmp_block_job_cancel(void *job) "job %p"