Message ID | 20180613181823.13618-3-mreitz@redhat.com |
---|---|
State | New |
Headers | show |
Series | block/mirror: Add active-sync mirroring | expand |
Am 13.06.2018 um 20:18 hat Max Reitz geschrieben: > In order to talk to the source BDS (and maybe in the future to the > target BDS as well) directly, we need to convert our existing AIO > requests into coroutine I/O requests. > > Signed-off-by: Max Reitz <mreitz@redhat.com> > Reviewed-by: Fam Zheng <famz@redhat.com> > --- > block/mirror.c | 152 +++++++++++++++++++++++++++++-------------------- > 1 file changed, 90 insertions(+), 62 deletions(-) > > diff --git a/block/mirror.c b/block/mirror.c > index f5e240611b..47122482ff 100644 > --- a/block/mirror.c > +++ b/block/mirror.c > @@ -78,6 +78,10 @@ typedef struct MirrorOp { > QEMUIOVector qiov; > int64_t offset; > uint64_t bytes; > + > + /* The pointee is set by mirror_co_read(), mirror_co_zero(), and > + * mirror_co_discard() before yielding for the first time */ > + int64_t *bytes_handled; Should we at least document that this becomes invalid after the MirrorOp coroutine has yielded for the first time because it points to a stack variable of the caller which returns then? > } MirrorOp; > > typedef enum MirrorMethod { > @@ -99,7 +103,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, > } > } > > -static void mirror_iteration_done(MirrorOp *op, int ret) > +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) > { > MirrorBlockJob *s = op->s; > struct iovec *iov; > @@ -136,9 +140,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) > } > } > > -static void mirror_write_complete(void *opaque, int ret) > +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) > { > - MirrorOp *op = opaque; > MirrorBlockJob *s = op->s; > > aio_context_acquire(blk_get_aio_context(s->common.blk)); > @@ -155,9 +158,8 @@ static void mirror_write_complete(void *opaque, int ret) > aio_context_release(blk_get_aio_context(s->common.blk)); > } One of the effects of this conversion is that we hold the AioContext lock for s->common.blk recursively multiple times. I don't think anything calls drain in such sections (which would hang), so it's probably okay, but it could turn out to be a trap in the long run. > -static void mirror_read_complete(void *opaque, int ret) > +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) > { > - MirrorOp *op = opaque; > MirrorBlockJob *s = op->s; > > aio_context_acquire(blk_get_aio_context(s->common.blk)); > @@ -172,8 +174,9 @@ static void mirror_read_complete(void *opaque, int ret) > > mirror_iteration_done(op, ret); > } else { > - blk_aio_pwritev(s->target, op->offset, &op->qiov, > - 0, mirror_write_complete, op); > + ret = blk_co_pwritev(s->target, op->offset, > + op->qiov.size, &op->qiov, 0); > + mirror_write_complete(op, ret); > } > aio_context_release(blk_get_aio_context(s->common.blk)); > } > @@ -230,60 +233,57 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) > s->waiting_for_io = false; > } > > -/* Submit async read while handling COW. > - * Returns: The number of bytes copied after and including offset, > - * excluding any bytes copied prior to offset due to alignment. > - * This will be @bytes if no alignment is necessary, or > - * (new_end - offset) if tail is rounded up or down due to > - * alignment or buffer limit. > +/* Perform a mirror copy operation. > + * > + * *op->bytes_handled is set to the number of bytes copied after and > + * including offset, excluding any bytes copied prior to offset due > + * to alignment. This will be op->bytes if no alignment is necessary, > + * or (new_end - op->offset) if the tail is rounded up or down due to > + * alignment or buffer limit. > */ > -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, > - uint64_t bytes) > +static void coroutine_fn mirror_co_read(void *opaque) > { > + MirrorOp *op = opaque; > + MirrorBlockJob *s = op->s; > BlockBackend *source = s->common.blk; > int nb_chunks; > uint64_t ret; > - MirrorOp *op; > uint64_t max_bytes; > > max_bytes = s->granularity * s->max_iov; > > /* We can only handle as much as buf_size at a time. */ > - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); > - assert(bytes); > - assert(bytes < BDRV_REQUEST_MAX_BYTES); > - ret = bytes; > + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); > + assert(op->bytes); > + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); > + *op->bytes_handled = op->bytes; > > if (s->cow_bitmap) { > - ret += mirror_cow_align(s, &offset, &bytes); > + *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes); > } Actually, if we factor out this block, call it from mirror_perform() and only assert the conditions here, we could get completely rid of that peculiar op->bytes_handled. Kevin
On 2018-06-14 17:22, Kevin Wolf wrote: > Am 13.06.2018 um 20:18 hat Max Reitz geschrieben: >> In order to talk to the source BDS (and maybe in the future to the >> target BDS as well) directly, we need to convert our existing AIO >> requests into coroutine I/O requests. >> >> Signed-off-by: Max Reitz <mreitz@redhat.com> >> Reviewed-by: Fam Zheng <famz@redhat.com> >> --- >> block/mirror.c | 152 +++++++++++++++++++++++++++++-------------------- >> 1 file changed, 90 insertions(+), 62 deletions(-) >> >> diff --git a/block/mirror.c b/block/mirror.c >> index f5e240611b..47122482ff 100644 >> --- a/block/mirror.c >> +++ b/block/mirror.c >> @@ -78,6 +78,10 @@ typedef struct MirrorOp { >> QEMUIOVector qiov; >> int64_t offset; >> uint64_t bytes; >> + >> + /* The pointee is set by mirror_co_read(), mirror_co_zero(), and >> + * mirror_co_discard() before yielding for the first time */ >> + int64_t *bytes_handled; > > Should we at least document that this becomes invalid after the > MirrorOp coroutine has yielded for the first time because it points to a > stack variable of the caller which returns then? Can do. >> } MirrorOp; >> >> typedef enum MirrorMethod { >> @@ -99,7 +103,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >> } >> } >> >> -static void mirror_iteration_done(MirrorOp *op, int ret) >> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) >> { >> MirrorBlockJob *s = op->s; >> struct iovec *iov; >> @@ -136,9 +140,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) >> } >> } >> >> -static void mirror_write_complete(void *opaque, int ret) >> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) >> { >> - MirrorOp *op = opaque; >> MirrorBlockJob *s = op->s; >> >> aio_context_acquire(blk_get_aio_context(s->common.blk)); >> @@ -155,9 +158,8 @@ static void mirror_write_complete(void *opaque, int ret) >> aio_context_release(blk_get_aio_context(s->common.blk)); >> } > > One of the effects of this conversion is that we hold the AioContext > lock for s->common.blk recursively multiple times. I don't think > anything calls drain in such sections (which would hang), so it's > probably okay, but it could turn out to be a trap in the long run. Sounds like something I'll be cleaning up in the planned followup. https://git.xanclic.moe/XanClic/qemu/commit/0d2f888b9f96634b9665 >> -static void mirror_read_complete(void *opaque, int ret) >> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) >> { >> - MirrorOp *op = opaque; >> MirrorBlockJob *s = op->s; >> >> aio_context_acquire(blk_get_aio_context(s->common.blk)); >> @@ -172,8 +174,9 @@ static void mirror_read_complete(void *opaque, int ret) >> >> mirror_iteration_done(op, ret); >> } else { >> - blk_aio_pwritev(s->target, op->offset, &op->qiov, >> - 0, mirror_write_complete, op); >> + ret = blk_co_pwritev(s->target, op->offset, >> + op->qiov.size, &op->qiov, 0); >> + mirror_write_complete(op, ret); >> } >> aio_context_release(blk_get_aio_context(s->common.blk)); >> } >> @@ -230,60 +233,57 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) >> s->waiting_for_io = false; >> } >> >> -/* Submit async read while handling COW. >> - * Returns: The number of bytes copied after and including offset, >> - * excluding any bytes copied prior to offset due to alignment. >> - * This will be @bytes if no alignment is necessary, or >> - * (new_end - offset) if tail is rounded up or down due to >> - * alignment or buffer limit. >> +/* Perform a mirror copy operation. >> + * >> + * *op->bytes_handled is set to the number of bytes copied after and >> + * including offset, excluding any bytes copied prior to offset due >> + * to alignment. This will be op->bytes if no alignment is necessary, >> + * or (new_end - op->offset) if the tail is rounded up or down due to >> + * alignment or buffer limit. >> */ >> -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, >> - uint64_t bytes) >> +static void coroutine_fn mirror_co_read(void *opaque) >> { >> + MirrorOp *op = opaque; >> + MirrorBlockJob *s = op->s; >> BlockBackend *source = s->common.blk; >> int nb_chunks; >> uint64_t ret; >> - MirrorOp *op; >> uint64_t max_bytes; >> >> max_bytes = s->granularity * s->max_iov; >> >> /* We can only handle as much as buf_size at a time. */ >> - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); >> - assert(bytes); >> - assert(bytes < BDRV_REQUEST_MAX_BYTES); >> - ret = bytes; >> + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); >> + assert(op->bytes); >> + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); >> + *op->bytes_handled = op->bytes; >> >> if (s->cow_bitmap) { >> - ret += mirror_cow_align(s, &offset, &bytes); >> + *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes); >> } > > Actually, if we factor out this block, call it from mirror_perform() and > only assert the conditions here, we could get completely rid of that > peculiar op->bytes_handled. That is definitely something I'll clean up in the followup: https://git.xanclic.moe/XanClic/qemu/commit/6cbe927e08dd1ec26a4c I'd rather leave it in this series and worry about it later. I gradually phased out more and more nice-to-have things from this series so it could get in at some point, and that apparently was one of those things. Max
diff --git a/block/mirror.c b/block/mirror.c index f5e240611b..47122482ff 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -78,6 +78,10 @@ typedef struct MirrorOp { QEMUIOVector qiov; int64_t offset; uint64_t bytes; + + /* The pointee is set by mirror_co_read(), mirror_co_zero(), and + * mirror_co_discard() before yielding for the first time */ + int64_t *bytes_handled; } MirrorOp; typedef enum MirrorMethod { @@ -99,7 +103,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, } } -static void mirror_iteration_done(MirrorOp *op, int ret) +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) { MirrorBlockJob *s = op->s; struct iovec *iov; @@ -136,9 +140,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) } } -static void mirror_write_complete(void *opaque, int ret) +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -155,9 +158,8 @@ static void mirror_write_complete(void *opaque, int ret) aio_context_release(blk_get_aio_context(s->common.blk)); } -static void mirror_read_complete(void *opaque, int ret) +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -172,8 +174,9 @@ static void mirror_read_complete(void *opaque, int ret) mirror_iteration_done(op, ret); } else { - blk_aio_pwritev(s->target, op->offset, &op->qiov, - 0, mirror_write_complete, op); + ret = blk_co_pwritev(s->target, op->offset, + op->qiov.size, &op->qiov, 0); + mirror_write_complete(op, ret); } aio_context_release(blk_get_aio_context(s->common.blk)); } @@ -230,60 +233,57 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) s->waiting_for_io = false; } -/* Submit async read while handling COW. - * Returns: The number of bytes copied after and including offset, - * excluding any bytes copied prior to offset due to alignment. - * This will be @bytes if no alignment is necessary, or - * (new_end - offset) if tail is rounded up or down due to - * alignment or buffer limit. +/* Perform a mirror copy operation. + * + * *op->bytes_handled is set to the number of bytes copied after and + * including offset, excluding any bytes copied prior to offset due + * to alignment. This will be op->bytes if no alignment is necessary, + * or (new_end - op->offset) if the tail is rounded up or down due to + * alignment or buffer limit. */ -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, - uint64_t bytes) +static void coroutine_fn mirror_co_read(void *opaque) { + MirrorOp *op = opaque; + MirrorBlockJob *s = op->s; BlockBackend *source = s->common.blk; int nb_chunks; uint64_t ret; - MirrorOp *op; uint64_t max_bytes; max_bytes = s->granularity * s->max_iov; /* We can only handle as much as buf_size at a time. */ - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); - assert(bytes); - assert(bytes < BDRV_REQUEST_MAX_BYTES); - ret = bytes; + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); + assert(op->bytes); + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); + *op->bytes_handled = op->bytes; if (s->cow_bitmap) { - ret += mirror_cow_align(s, &offset, &bytes); + *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes); } - assert(bytes <= s->buf_size); + /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ + assert(*op->bytes_handled <= UINT_MAX); + assert(op->bytes <= s->buf_size); /* The offset is granularity-aligned because: * 1) Caller passes in aligned values; * 2) mirror_cow_align is used only when target cluster is larger. */ - assert(QEMU_IS_ALIGNED(offset, s->granularity)); + assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); /* The range is sector-aligned, since bdrv_getlength() rounds up. */ - assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE)); - nb_chunks = DIV_ROUND_UP(bytes, s->granularity); + assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); + nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); while (s->buf_free_count < nb_chunks) { - trace_mirror_yield_in_flight(s, offset, s->in_flight); + trace_mirror_yield_in_flight(s, op->offset, s->in_flight); mirror_wait_for_io(s); } - /* Allocate a MirrorOp that is used as an AIO callback. */ - op = g_new(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; - /* Now make a QEMUIOVector taking enough granularity-sized chunks * from s->buf_free. */ qemu_iovec_init(&op->qiov, nb_chunks); while (nb_chunks-- > 0) { MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); - size_t remaining = bytes - op->qiov.size; + size_t remaining = op->bytes - op->qiov.size; QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); s->buf_free_count--; @@ -292,53 +292,81 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, /* Copy the dirty cluster. */ s->in_flight++; - s->bytes_in_flight += bytes; - trace_mirror_one_iteration(s, offset, bytes); + s->bytes_in_flight += op->bytes; + trace_mirror_one_iteration(s, op->offset, op->bytes); - blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op); - return ret; + ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0); + mirror_read_complete(op, ret); } -static void mirror_do_zero_or_discard(MirrorBlockJob *s, - int64_t offset, - uint64_t bytes, - bool is_discard) +static void coroutine_fn mirror_co_zero(void *opaque) { - MirrorOp *op; + MirrorOp *op = opaque; + int ret; - /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed - * so the freeing in mirror_iteration_done is nop. */ - op = g_new0(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + *op->bytes_handled = op->bytes; - s->in_flight++; - s->bytes_in_flight += bytes; - if (is_discard) { - blk_aio_pdiscard(s->target, offset, - op->bytes, mirror_write_complete, op); - } else { - blk_aio_pwrite_zeroes(s->target, offset, - op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0, - mirror_write_complete, op); - } + ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, + op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); + mirror_write_complete(op, ret); +} + +static void coroutine_fn mirror_co_discard(void *opaque) +{ + MirrorOp *op = opaque; + int ret; + + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + *op->bytes_handled = op->bytes; + + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); + mirror_write_complete(op, ret); } static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, unsigned bytes, MirrorMethod mirror_method) { + MirrorOp *op; + Coroutine *co; + int64_t bytes_handled = -1; + + op = g_new(MirrorOp, 1); + *op = (MirrorOp){ + .s = s, + .offset = offset, + .bytes = bytes, + .bytes_handled = &bytes_handled, + }; + switch (mirror_method) { case MIRROR_METHOD_COPY: - return mirror_do_read(s, offset, bytes); + co = qemu_coroutine_create(mirror_co_read, op); + break; case MIRROR_METHOD_ZERO: + co = qemu_coroutine_create(mirror_co_zero, op); + break; case MIRROR_METHOD_DISCARD: - mirror_do_zero_or_discard(s, offset, bytes, - mirror_method == MIRROR_METHOD_DISCARD); - return bytes; + co = qemu_coroutine_create(mirror_co_discard, op); + break; default: abort(); } + + qemu_coroutine_enter(co); + /* At this point, ownership of op has been moved to the coroutine + * and the object may already be freed */ + + /* Assert that this value has been set */ + assert(bytes_handled >= 0); + + /* Same assertion as in mirror_co_read() (and for mirror_co_read() + * and mirror_co_discard(), bytes_handled == op->bytes, which + * is the @bytes parameter given to this function) */ + assert(bytes_handled <= UINT_MAX); + return bytes_handled; } static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)