Message ID | 1358357479-7912-10-git-send-email-pbonzini@redhat.com |
---|---|
State | New |
Headers | show |
Am 16.01.2013 18:31, schrieb Paolo Bonzini: > There is really no change in the behavior of the job here, since > there is still a maximum of one in-flight I/O operation between > the source and the target. However, this patch already introduces > the AIO callbacks (which are unmodified in the next patch) > and some of the logic to count in-flight operations and only > complete the job when there is none. > > Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> > --- > block/mirror.c | 155 ++++++++++++++++++++++++++++++++++++++++++-------------- > trace-events | 2 + > 2 files changed, 119 insertions(+), 38 deletions(-) > > diff --git a/block/mirror.c b/block/mirror.c > index ab41340..75c550a 100644 > --- a/block/mirror.c > +++ b/block/mirror.c > @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob { > unsigned long *cow_bitmap; > HBitmapIter hbi; > uint8_t *buf; > + > + int in_flight; > + int ret; > } MirrorBlockJob; > > +typedef struct MirrorOp { > + MirrorBlockJob *s; > + QEMUIOVector qiov; > + struct iovec iov; > + int64_t sector_num; > + int nb_sectors; > +} MirrorOp; > + > static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, > int error) > { > @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, > } > } > > -static int coroutine_fn mirror_iteration(MirrorBlockJob *s, > - BlockErrorAction *p_action) > +static void mirror_iteration_done(MirrorOp *op) > +{ > + MirrorBlockJob *s = op->s; > + > + s->in_flight--; > + trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors); > + g_slice_free(MirrorOp, op); > + qemu_coroutine_enter(s->common.co, NULL); This doesn't check if the job coroutine is actually in a state where it's valid to reenter. Technically it might even be okay because reentering during a sleep is allowed and as good as reentering during the new yield, and bdrv_flush() is only called if s->in_flight == 0. 
Most other calls _should_ be okay as well, but I'm not so sure about bdrv_drain_all(), especially once .bdrv_drain exists. As you can see, this is becoming very subtle, so I would prefer adding some explicit bool s->may_reenter or something like that. > +} > @@ -177,28 +233,43 @@ static void coroutine_fn mirror_run(void *opaque) > } > > bdrv_dirty_iter_init(bs, &s->hbi); > + last_pause_ns = qemu_get_clock_ns(rt_clock); > for (;;) { > uint64_t delay_ns; > int64_t cnt; > bool should_complete; > > + if (s->ret < 0) { > + ret = s->ret; > + break; > + } > + > cnt = bdrv_get_dirty_count(bs); > - if (cnt != 0) { > - BlockErrorAction action = BDRV_ACTION_REPORT; > - ret = mirror_iteration(s, &action); > - if (ret < 0 && action == BDRV_ACTION_REPORT) { > - goto immediate_exit; > + > + /* Note that even when no rate limit is applied we need to yield > + * periodically with no pending I/O so that qemu_aio_flush() returns. > + * We do so every SLICE_TIME milliseconds, or when there is an error, s/milli/nano/ > + * or when the source is clean, whichever comes first. > + */ > + if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME && > + s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) { > + if (s->in_flight > 0) { > + trace_mirror_yield(s, s->in_flight, cnt); > + qemu_coroutine_yield(); > + continue; > + } else if (cnt != 0) { > + mirror_iteration(s); > + continue; > } > - cnt = bdrv_get_dirty_count(bs); > } > > should_complete = false; > - if (cnt == 0) { > + if (s->in_flight == 0 && cnt == 0) { > trace_mirror_before_flush(s); > ret = bdrv_flush(s->target); > if (ret < 0) { > if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) { > - goto immediate_exit; > + break; Is this an unrelated change? > } > } else { > /* We're out of the streaming phase. 
From now on, if the job > @@ -244,15 +315,12 @@ static void coroutine_fn mirror_run(void *opaque) > delay_ns = 0; > } > > - /* Note that even when no rate limit is applied we need to yield > - * with no pending I/O here so that bdrv_drain_all() returns. > - */ > block_job_sleep_ns(&s->common, rt_clock, delay_ns); > if (block_job_is_cancelled(&s->common)) { > break; > } > } else if (!should_complete) { > - delay_ns = (cnt == 0 ? SLICE_TIME : 0); > + delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0); > block_job_sleep_ns(&s->common, rt_clock, delay_ns); > } else if (cnt == 0) { Why don't we need to check s->in_flight == 0 here? > /* The two disks are in sync. Exit and report successful > @@ -262,9 +330,20 @@ static void coroutine_fn mirror_run(void *opaque) > s->common.cancelled = false; > break; > } > + last_pause_ns = qemu_get_clock_ns(rt_clock); > } > > immediate_exit: > + if (s->in_flight > 0) { > + /* We get here only if something went wrong. Either the job failed, > + * or it was cancelled prematurely so that we do not guarantee that > + * the target is a copy of the source. > + */ > + assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common))); > + mirror_drain(s); > + } > + > + assert(s->in_flight == 0); > qemu_vfree(s->buf); > g_free(s->cow_bitmap); > bdrv_set_dirty_tracking(bs, 0); Kevin
Il 21/01/2013 12:39, Kevin Wolf ha scritto: > Am 16.01.2013 18:31, schrieb Paolo Bonzini: >> There is really no change in the behavior of the job here, since >> there is still a maximum of one in-flight I/O operation between >> the source and the target. However, this patch already introduces >> the AIO callbacks (which are unmodified in the next patch) >> and some of the logic to count in-flight operations and only >> complete the job when there is none. >> >> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> >> --- >> block/mirror.c | 155 ++++++++++++++++++++++++++++++++++++++++++-------------- >> trace-events | 2 + >> 2 files changed, 119 insertions(+), 38 deletions(-) >> >> diff --git a/block/mirror.c b/block/mirror.c >> index ab41340..75c550a 100644 >> --- a/block/mirror.c >> +++ b/block/mirror.c >> @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob { >> unsigned long *cow_bitmap; >> HBitmapIter hbi; >> uint8_t *buf; >> + >> + int in_flight; >> + int ret; >> } MirrorBlockJob; >> >> +typedef struct MirrorOp { >> + MirrorBlockJob *s; >> + QEMUIOVector qiov; >> + struct iovec iov; >> + int64_t sector_num; >> + int nb_sectors; >> +} MirrorOp; >> + >> static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >> int error) >> { >> @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >> } >> } >> >> -static int coroutine_fn mirror_iteration(MirrorBlockJob *s, >> - BlockErrorAction *p_action) >> +static void mirror_iteration_done(MirrorOp *op) >> +{ >> + MirrorBlockJob *s = op->s; >> + >> + s->in_flight--; >> + trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors); >> + g_slice_free(MirrorOp, op); >> + qemu_coroutine_enter(s->common.co, NULL); > > This doesn't check if the job coroutine is actually in a state where > it's valid to reenter. 
> > Technically it might even be okay because reentering during a sleep is > allowed and as good as reentering during the new yield, and bdrv_flush() > is only called if s->in_flight == 0. Most other calls _should_ be okay > as well, but I'm not so sure about bdrv_drain_all(), especially once > .bdrv_drain exists. bdrv_drain_all is also called only if s->in_flight == 0, but I see your point. It is indeed quite subtle, but it's okay. > As you can see, this is becoming very subtle, so I would prefer adding > some explicit bool s->may_reenter or something like that. The right boolean to test is already there, it's job->busy. I can add a new API block_job_yield/block_job_enter (where block_job_yield resets/sets busy across the yield, and block_job_enter only enters if !job->busy), but that would be a separate series IMO. >> +} > >> @@ -177,28 +233,43 @@ static void coroutine_fn mirror_run(void *opaque) >> } >> >> bdrv_dirty_iter_init(bs, &s->hbi); >> + last_pause_ns = qemu_get_clock_ns(rt_clock); >> for (;;) { >> uint64_t delay_ns; >> int64_t cnt; >> bool should_complete; >> >> + if (s->ret < 0) { >> + ret = s->ret; >> + break; >> + } >> + >> cnt = bdrv_get_dirty_count(bs); >> - if (cnt != 0) { >> - BlockErrorAction action = BDRV_ACTION_REPORT; >> - ret = mirror_iteration(s, &action); >> - if (ret < 0 && action == BDRV_ACTION_REPORT) { >> - goto immediate_exit; >> + >> + /* Note that even when no rate limit is applied we need to yield >> + * periodically with no pending I/O so that qemu_aio_flush() returns. >> + * We do so every SLICE_TIME milliseconds, or when there is an error, > > s/milli/nano/ > >> + * or when the source is clean, whichever comes first. 
>> + */ >> + if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME && >> + s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) { >> + if (s->in_flight > 0) { >> + trace_mirror_yield(s, s->in_flight, cnt); >> + qemu_coroutine_yield(); >> + continue; >> + } else if (cnt != 0) { >> + mirror_iteration(s); >> + continue; >> } >> - cnt = bdrv_get_dirty_count(bs); >> } >> >> should_complete = false; >> - if (cnt == 0) { >> + if (s->in_flight == 0 && cnt == 0) { >> trace_mirror_before_flush(s); >> ret = bdrv_flush(s->target); >> if (ret < 0) { >> if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) { >> - goto immediate_exit; >> + break; > > Is this an unrelated change? Seems like a rebase hiccup. Doesn't have any semantic change, I'll drop it. >> } >> } else { >> /* We're out of the streaming phase. From now on, if the job >> @@ -244,15 +315,12 @@ static void coroutine_fn mirror_run(void *opaque) >> delay_ns = 0; >> } >> >> - /* Note that even when no rate limit is applied we need to yield >> - * with no pending I/O here so that bdrv_drain_all() returns. >> - */ >> block_job_sleep_ns(&s->common, rt_clock, delay_ns); >> if (block_job_is_cancelled(&s->common)) { >> break; >> } >> } else if (!should_complete) { >> - delay_ns = (cnt == 0 ? SLICE_TIME : 0); >> + delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0); >> block_job_sleep_ns(&s->common, rt_clock, delay_ns); >> } else if (cnt == 0) { > > Why don't we need to check s->in_flight == 0 here? As above, should_complete is only set to true within an if(in_flight == 0). Paolo >> /* The two disks are in sync. Exit and report successful >> @@ -262,9 +330,20 @@ static void coroutine_fn mirror_run(void *opaque) >> s->common.cancelled = false; >> break; >> } >> + last_pause_ns = qemu_get_clock_ns(rt_clock); >> } >> >> immediate_exit: >> + if (s->in_flight > 0) { >> + /* We get here only if something went wrong. 
Either the job failed, >> + * or it was cancelled prematurely so that we do not guarantee that >> + * the target is a copy of the source. >> + */ >> + assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common))); >> + mirror_drain(s); >> + } >> + >> + assert(s->in_flight == 0); >> qemu_vfree(s->buf); >> g_free(s->cow_bitmap); >> bdrv_set_dirty_tracking(bs, 0); > > Kevin > >
Am 21.01.2013 13:09, schrieb Paolo Bonzini: > Il 21/01/2013 12:39, Kevin Wolf ha scritto: >> Am 16.01.2013 18:31, schrieb Paolo Bonzini: >>> There is really no change in the behavior of the job here, since >>> there is still a maximum of one in-flight I/O operation between >>> the source and the target. However, this patch already introduces >>> the AIO callbacks (which are unmodified in the next patch) >>> and some of the logic to count in-flight operations and only >>> complete the job when there is none. >>> >>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> >>> --- >>> block/mirror.c | 155 ++++++++++++++++++++++++++++++++++++++++++-------------- >>> trace-events | 2 + >>> 2 files changed, 119 insertions(+), 38 deletions(-) >>> >>> diff --git a/block/mirror.c b/block/mirror.c >>> index ab41340..75c550a 100644 >>> --- a/block/mirror.c >>> +++ b/block/mirror.c >>> @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob { >>> unsigned long *cow_bitmap; >>> HBitmapIter hbi; >>> uint8_t *buf; >>> + >>> + int in_flight; >>> + int ret; >>> } MirrorBlockJob; >>> >>> +typedef struct MirrorOp { >>> + MirrorBlockJob *s; >>> + QEMUIOVector qiov; >>> + struct iovec iov; >>> + int64_t sector_num; >>> + int nb_sectors; >>> +} MirrorOp; >>> + >>> static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >>> int error) >>> { >>> @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >>> } >>> } >>> >>> -static int coroutine_fn mirror_iteration(MirrorBlockJob *s, >>> - BlockErrorAction *p_action) >>> +static void mirror_iteration_done(MirrorOp *op) >>> +{ >>> + MirrorBlockJob *s = op->s; >>> + >>> + s->in_flight--; >>> + trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors); >>> + g_slice_free(MirrorOp, op); >>> + qemu_coroutine_enter(s->common.co, NULL); >> >> This doesn't check if the job coroutine is actually in a state where >> it's valid to reenter. 
>> >> Technically it might even be okay because reentering during a sleep is >> allowed and as good as reentering during the new yield, and bdrv_flush() >> is only called if s->in_flight == 0. Most other calls _should_ be okay >> as well, but I'm not so sure about bdrv_drain_all(), especially once >> .bdrv_drain exists. > > bdrv_drain_all is also called only if s->in_flight == 0 too, but I see > your point. It is indeed quite subtle, but it's okay. Ah, yes, that's the part I missed. Looks correct indeed. >> As you can see, this is becoming very subtle, so I would prefer adding >> some explicit bool s->may_reenter or something like that. > > The right boolean to test is already there, it's job->busy. I can add a > new API block_job_yield/block_job_enter (where block_job_yield > resets/sets busy across the yield, and block_job_enter only enters if > !job->busy), but that would be a separate series IMO. Please put it on your todo list then. I think I can accept the current state if I know that it will be improved soon, even though I'm not very comfortable with it. Kevin
diff --git a/block/mirror.c b/block/mirror.c index ab41340..75c550a 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -33,8 +33,19 @@ typedef struct MirrorBlockJob { unsigned long *cow_bitmap; HBitmapIter hbi; uint8_t *buf; + + int in_flight; + int ret; } MirrorBlockJob; +typedef struct MirrorOp { + MirrorBlockJob *s; + QEMUIOVector qiov; + struct iovec iov; + int64_t sector_num; + int nb_sectors; +} MirrorOp; + static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, int error) { @@ -48,15 +59,60 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, } } -static int coroutine_fn mirror_iteration(MirrorBlockJob *s, - BlockErrorAction *p_action) +static void mirror_iteration_done(MirrorOp *op) +{ + MirrorBlockJob *s = op->s; + + s->in_flight--; + trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors); + g_slice_free(MirrorOp, op); + qemu_coroutine_enter(s->common.co, NULL); +} + +static void mirror_write_complete(void *opaque, int ret) +{ + MirrorOp *op = opaque; + MirrorBlockJob *s = op->s; + if (ret < 0) { + BlockDriverState *source = s->common.bs; + BlockErrorAction action; + + bdrv_set_dirty(source, op->sector_num, op->nb_sectors); + action = mirror_error_action(s, false, -ret); + if (action == BDRV_ACTION_REPORT && s->ret >= 0) { + s->ret = ret; + } + } + mirror_iteration_done(op); +} + +static void mirror_read_complete(void *opaque, int ret) +{ + MirrorOp *op = opaque; + MirrorBlockJob *s = op->s; + if (ret < 0) { + BlockDriverState *source = s->common.bs; + BlockErrorAction action; + + bdrv_set_dirty(source, op->sector_num, op->nb_sectors); + action = mirror_error_action(s, true, -ret); + if (action == BDRV_ACTION_REPORT && s->ret >= 0) { + s->ret = ret; + } + + mirror_iteration_done(op); + return; + } + bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors, + mirror_write_complete, op); +} + +static void coroutine_fn mirror_iteration(MirrorBlockJob *s) { BlockDriverState *source = s->common.bs; 
- BlockDriverState *target = s->target; - QEMUIOVector qiov; - int ret, nb_sectors, nb_sectors_chunk; + int nb_sectors, nb_sectors_chunk; int64_t end, sector_num, cluster_num; - struct iovec iov; + MirrorOp *op; s->sector_num = hbitmap_iter_next(&s->hbi); if (s->sector_num < 0) { @@ -87,31 +143,30 @@ static int coroutine_fn mirror_iteration(MirrorBlockJob *s, end = s->common.len >> BDRV_SECTOR_BITS; nb_sectors = MIN(nb_sectors, end - sector_num); + + /* Allocate a MirrorOp that is used as an AIO callback. */ + op = g_slice_new(MirrorOp); + op->s = s; + op->iov.iov_base = s->buf; + op->iov.iov_len = nb_sectors * 512; + op->sector_num = sector_num; + op->nb_sectors = nb_sectors; + qemu_iovec_init_external(&op->qiov, &op->iov, 1); + bdrv_reset_dirty(source, sector_num, nb_sectors); /* Copy the dirty cluster. */ - iov.iov_base = s->buf; - iov.iov_len = nb_sectors * 512; - qemu_iovec_init_external(&qiov, &iov, 1); - + s->in_flight++; trace_mirror_one_iteration(s, sector_num, nb_sectors); - ret = bdrv_co_readv(source, sector_num, nb_sectors, &qiov); - if (ret < 0) { - *p_action = mirror_error_action(s, true, -ret); - goto fail; - } - ret = bdrv_co_writev(target, sector_num, nb_sectors, &qiov); - if (ret < 0) { - *p_action = mirror_error_action(s, false, -ret); - s->synced = false; - goto fail; - } - return 0; + bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors, + mirror_read_complete, op); +} -fail: - /* Try again later. 
*/ - bdrv_set_dirty(source, sector_num, nb_sectors); - return ret; +static void mirror_drain(MirrorBlockJob *s) +{ + while (s->in_flight > 0) { + qemu_coroutine_yield(); + } } static void coroutine_fn mirror_run(void *opaque) @@ -119,6 +174,7 @@ static void coroutine_fn mirror_run(void *opaque) MirrorBlockJob *s = opaque; BlockDriverState *bs = s->common.bs; int64_t sector_num, end, nb_sectors_chunk, length; + uint64_t last_pause_ns; BlockDriverInfo bdi; char backing_filename[1024]; int ret = 0; @@ -177,28 +233,43 @@ static void coroutine_fn mirror_run(void *opaque) } bdrv_dirty_iter_init(bs, &s->hbi); + last_pause_ns = qemu_get_clock_ns(rt_clock); for (;;) { uint64_t delay_ns; int64_t cnt; bool should_complete; + if (s->ret < 0) { + ret = s->ret; + break; + } + cnt = bdrv_get_dirty_count(bs); - if (cnt != 0) { - BlockErrorAction action = BDRV_ACTION_REPORT; - ret = mirror_iteration(s, &action); - if (ret < 0 && action == BDRV_ACTION_REPORT) { - goto immediate_exit; + + /* Note that even when no rate limit is applied we need to yield + * periodically with no pending I/O so that qemu_aio_flush() returns. + * We do so every SLICE_TIME milliseconds, or when there is an error, + * or when the source is clean, whichever comes first. + */ + if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME && + s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) { + if (s->in_flight > 0) { + trace_mirror_yield(s, s->in_flight, cnt); + qemu_coroutine_yield(); + continue; + } else if (cnt != 0) { + mirror_iteration(s); + continue; } - cnt = bdrv_get_dirty_count(bs); } should_complete = false; - if (cnt == 0) { + if (s->in_flight == 0 && cnt == 0) { trace_mirror_before_flush(s); ret = bdrv_flush(s->target); if (ret < 0) { if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) { - goto immediate_exit; + break; } } else { /* We're out of the streaming phase. 
From now on, if the job @@ -244,15 +315,12 @@ static void coroutine_fn mirror_run(void *opaque) delay_ns = 0; } - /* Note that even when no rate limit is applied we need to yield - * with no pending I/O here so that bdrv_drain_all() returns. - */ block_job_sleep_ns(&s->common, rt_clock, delay_ns); if (block_job_is_cancelled(&s->common)) { break; } } else if (!should_complete) { - delay_ns = (cnt == 0 ? SLICE_TIME : 0); + delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0); block_job_sleep_ns(&s->common, rt_clock, delay_ns); } else if (cnt == 0) { /* The two disks are in sync. Exit and report successful @@ -262,9 +330,20 @@ static void coroutine_fn mirror_run(void *opaque) s->common.cancelled = false; break; } + last_pause_ns = qemu_get_clock_ns(rt_clock); } immediate_exit: + if (s->in_flight > 0) { + /* We get here only if something went wrong. Either the job failed, + * or it was cancelled prematurely so that we do not guarantee that + * the target is a copy of the source. + */ + assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common))); + mirror_drain(s); + } + + assert(s->in_flight == 0); qemu_vfree(s->buf); g_free(s->cow_bitmap); bdrv_set_dirty_tracking(bs, 0); diff --git a/trace-events b/trace-events index b0a8085..4427d1f 100644 --- a/trace-events +++ b/trace-events @@ -85,6 +85,8 @@ mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64 mirror_before_sleep(void *s, int64_t cnt, int synced) "s %p dirty count %"PRId64" synced %d" mirror_one_iteration(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d" mirror_cow(void *s, int64_t sector_num) "s %p sector_num %"PRId64 +mirror_iteration_done(void *s, int64_t sector_num, int nb_sectors) "s %p sector_num %"PRId64" nb_sectors %d" +mirror_yield(void *s, int64_t cnt, int in_flight) "s %p dirty count %"PRId64" in_flight %d" # blockdev.c qmp_block_job_cancel(void *job) "job %p"
There is really no change in the behavior of the job here, since there is still a maximum of one in-flight I/O operation between the source and the target. However, this patch already introduces the AIO callbacks (which are unmodified in the next patch) and some of the logic to count in-flight operations and only complete the job when there is none. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> --- block/mirror.c | 155 ++++++++++++++++++++++++++++++++++++++++++-------------- trace-events | 2 + 2 files changed, 119 insertions(+), 38 deletions(-)