diff mbox series

[05/18] block/mirror: Convert to coroutines

Message ID 20170913181910.29688-6-mreitz@redhat.com
State New
Headers show
Series block/mirror: Add active-sync mirroring | expand

Commit Message

Max Reitz Sept. 13, 2017, 6:18 p.m. UTC
In order to talk to the source BDS (and maybe in the future to the
target BDS as well) directly, we need to convert our existing AIO
requests into coroutine I/O requests.

Signed-off-by: Max Reitz <mreitz@redhat.com>
---
 block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------
 1 file changed, 78 insertions(+), 56 deletions(-)

Comments

Fam Zheng Sept. 18, 2017, 6:02 a.m. UTC | #1
On Wed, 09/13 20:18, Max Reitz wrote:
> In order to talk to the source BDS (and maybe in the future to the
> target BDS as well) directly, we need to convert our existing AIO
> requests into coroutine I/O requests.
> 
> Signed-off-by: Max Reitz <mreitz@redhat.com>
> ---
>  block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------
>  1 file changed, 78 insertions(+), 56 deletions(-)
> 
> diff --git a/block/mirror.c b/block/mirror.c
> index 4664b0516f..2b3297aa61 100644
> --- a/block/mirror.c
> +++ b/block/mirror.c
> @@ -80,6 +80,9 @@ typedef struct MirrorOp {
>      QEMUIOVector qiov;
>      int64_t offset;
>      uint64_t bytes;
> +
> +    /* Set by mirror_co_read() before yielding for the first time */
> +    uint64_t bytes_copied;
>  } MirrorOp;
>  
>  typedef enum MirrorMethod {
> @@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>      }
>  }
>  
> -static void mirror_iteration_done(MirrorOp *op, int ret)
> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
>  {
>      MirrorBlockJob *s = op->s;
>      struct iovec *iov;
> @@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
>      }
>  }
>  
> -static void mirror_write_complete(void *opaque, int ret)
> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
>  {
> -    MirrorOp *op = opaque;
>      MirrorBlockJob *s = op->s;
>  
>      aio_context_acquire(blk_get_aio_context(s->common.blk));
> @@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
>      aio_context_release(blk_get_aio_context(s->common.blk));
>  }
>  
> -static void mirror_read_complete(void *opaque, int ret)
> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
>  {
> -    MirrorOp *op = opaque;
>      MirrorBlockJob *s = op->s;
>  
>      aio_context_acquire(blk_get_aio_context(s->common.blk));
> @@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret)
>  
>          mirror_iteration_done(op, ret);
>      } else {
> -        blk_aio_pwritev(s->target, op->offset, &op->qiov,
> -                        0, mirror_write_complete, op);
> +        int ret;
> +
> +        ret = blk_co_pwritev(s->target, op->offset,
> +                             op->qiov.size, &op->qiov, 0);
> +        mirror_write_complete(op, ret);
>      }
>      aio_context_release(blk_get_aio_context(s->common.blk));
>  }
> @@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s)
>   *          (new_end - offset) if tail is rounded up or down due to
>   *          alignment or buffer limit.
>   */
> -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
> -                               uint64_t bytes)
> +static void coroutine_fn mirror_co_read(void *opaque)
>  {
> +    MirrorOp *op = opaque;
> +    MirrorBlockJob *s = op->s;
>      BlockBackend *source = s->common.blk;
>      int nb_chunks;
>      uint64_t ret;
> -    MirrorOp *op;
>      uint64_t max_bytes;
>  
>      max_bytes = s->granularity * s->max_iov;
>  
>      /* We can only handle as much as buf_size at a time. */
> -    bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
> -    assert(bytes);
> -    assert(bytes < BDRV_REQUEST_MAX_BYTES);
> -    ret = bytes;
> +    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
> +    assert(op->bytes);
> +    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
> +    op->bytes_copied = op->bytes;
>  
>      if (s->cow_bitmap) {
> -        ret += mirror_cow_align(s, &offset, &bytes);
> +        op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes);
>      }
> -    assert(bytes <= s->buf_size);
> +    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
> +    assert(op->bytes_copied <= UINT_MAX);
> +    assert(op->bytes <= s->buf_size);
>      /* The offset is granularity-aligned because:
>       * 1) Caller passes in aligned values;
>       * 2) mirror_cow_align is used only when target cluster is larger. */
> -    assert(QEMU_IS_ALIGNED(offset, s->granularity));
> +    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
>      /* The range is sector-aligned, since bdrv_getlength() rounds up. */
> -    assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
> -    nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
> +    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
> +    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
>  
>      while (s->buf_free_count < nb_chunks) {
> -        trace_mirror_yield_in_flight(s, offset, s->in_flight);
> +        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
>          mirror_wait_for_io(s);
>      }
>  
> -    /* Allocate a MirrorOp that is used as an AIO callback.  */
> -    op = g_new(MirrorOp, 1);
> -    op->s = s;
> -    op->offset = offset;
> -    op->bytes = bytes;
> -
>      /* Now make a QEMUIOVector taking enough granularity-sized chunks
>       * from s->buf_free.
>       */
>      qemu_iovec_init(&op->qiov, nb_chunks);
>      while (nb_chunks-- > 0) {
>          MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
> -        size_t remaining = bytes - op->qiov.size;
> +        size_t remaining = op->bytes - op->qiov.size;
>  
>          QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
>          s->buf_free_count--;
> @@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
>  
>      /* Copy the dirty cluster.  */
>      s->in_flight++;
> -    s->bytes_in_flight += bytes;
> -    trace_mirror_one_iteration(s, offset, bytes);
> +    s->bytes_in_flight += op->bytes;
> +    trace_mirror_one_iteration(s, op->offset, op->bytes);
>  
> -    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
> -    return ret;
> +    ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
> +    mirror_read_complete(op, ret);
>  }
>  
> -static void mirror_do_zero_or_discard(MirrorBlockJob *s,
> -                                      int64_t offset,
> -                                      uint64_t bytes,
> -                                      bool is_discard)
> +static void coroutine_fn mirror_co_zero(void *opaque)
>  {
> -    MirrorOp *op;
> +    MirrorOp *op = opaque;
> +    int ret;
>  
> -    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
> -     * so the freeing in mirror_iteration_done is nop. */
> -    op = g_new0(MirrorOp, 1);
> -    op->s = s;
> -    op->offset = offset;
> -    op->bytes = bytes;
> +    op->s->in_flight++;
> +    op->s->bytes_in_flight += op->bytes;
>  
> -    s->in_flight++;
> -    s->bytes_in_flight += bytes;
> -    if (is_discard) {
> -        blk_aio_pdiscard(s->target, offset,
> -                         op->bytes, mirror_write_complete, op);
> -    } else {
> -        blk_aio_pwrite_zeroes(s->target, offset,
> -                              op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
> -                              mirror_write_complete, op);
> -    }
> +    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
> +                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
> +    mirror_write_complete(op, ret);
> +}
> +
> +static void coroutine_fn mirror_co_discard(void *opaque)
> +{
> +    MirrorOp *op = opaque;
> +    int ret;
> +
> +    op->s->in_flight++;
> +    op->s->bytes_in_flight += op->bytes;
> +
> +    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
> +    mirror_write_complete(op, ret);
>  }
>  
>  static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
>                                 unsigned bytes, MirrorMethod mirror_method)
>  {
> +    MirrorOp *op;
> +    Coroutine *co;
> +    unsigned ret = bytes;
> +
> +    op = g_new(MirrorOp, 1);
> +    *op = (MirrorOp){
> +        .s      = s,
> +        .offset = offset,
> +        .bytes  = bytes,
> +    };
> +
>      switch (mirror_method) {
>      case MIRROR_METHOD_COPY:
> -        return mirror_do_read(s, offset, bytes);
> +        co = qemu_coroutine_create(mirror_co_read, op);
> +        break;
>      case MIRROR_METHOD_ZERO:
> +        co = qemu_coroutine_create(mirror_co_zero, op);
> +        break;
>      case MIRROR_METHOD_DISCARD:
> -        mirror_do_zero_or_discard(s, offset, bytes,
> -                                  mirror_method == MIRROR_METHOD_DISCARD);
> -        return bytes;
> +        co = qemu_coroutine_create(mirror_co_discard, op);
> +        break;
>      default:
>          abort();
>      }
> +
> +    qemu_coroutine_enter(co);
> +
> +    if (mirror_method == MIRROR_METHOD_COPY) {
> +        /* Same assertion as in mirror_co_read() */
> +        assert(op->bytes_copied <= UINT_MAX);
> +        ret = op->bytes_copied;
> +    }

This special casing is a bit ugly. Can you just make mirror_co_zero and
mirror_co_discard set op->bytes_copied too? (and perhaps rename to
op->bytes_handled) If so the comment in MirrorOp needs an update too.

And is it better to initialize it to -1 before entering coroutine, then assert
it is != -1 afterwards?

> +
> +    return ret;
>  }
>  
>  static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
> -- 
> 2.13.5
> 

Fam
Max Reitz Sept. 18, 2017, 4:41 p.m. UTC | #2
On 2017-09-18 08:02, Fam Zheng wrote:
> On Wed, 09/13 20:18, Max Reitz wrote:
>> In order to talk to the source BDS (and maybe in the future to the
>> target BDS as well) directly, we need to convert our existing AIO
>> requests into coroutine I/O requests.
>>
>> Signed-off-by: Max Reitz <mreitz@redhat.com>
>> ---
>>  block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------
>>  1 file changed, 78 insertions(+), 56 deletions(-)
>>
>> diff --git a/block/mirror.c b/block/mirror.c
>> index 4664b0516f..2b3297aa61 100644
>> --- a/block/mirror.c
>> +++ b/block/mirror.c
>> @@ -80,6 +80,9 @@ typedef struct MirrorOp {
>>      QEMUIOVector qiov;
>>      int64_t offset;
>>      uint64_t bytes;
>> +
>> +    /* Set by mirror_co_read() before yielding for the first time */
>> +    uint64_t bytes_copied;
>>  } MirrorOp;
>>  
>>  typedef enum MirrorMethod {
>> @@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>      }
>>  }
>>  
>> -static void mirror_iteration_done(MirrorOp *op, int ret)
>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
>>  {
>>      MirrorBlockJob *s = op->s;
>>      struct iovec *iov;
>> @@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
>>      }
>>  }
>>  
>> -static void mirror_write_complete(void *opaque, int ret)
>> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
>>  {
>> -    MirrorOp *op = opaque;
>>      MirrorBlockJob *s = op->s;
>>  
>>      aio_context_acquire(blk_get_aio_context(s->common.blk));
>> @@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
>>      aio_context_release(blk_get_aio_context(s->common.blk));
>>  }
>>  
>> -static void mirror_read_complete(void *opaque, int ret)
>> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
>>  {
>> -    MirrorOp *op = opaque;
>>      MirrorBlockJob *s = op->s;
>>  
>>      aio_context_acquire(blk_get_aio_context(s->common.blk));
>> @@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret)
>>  
>>          mirror_iteration_done(op, ret);
>>      } else {
>> -        blk_aio_pwritev(s->target, op->offset, &op->qiov,
>> -                        0, mirror_write_complete, op);
>> +        int ret;
>> +
>> +        ret = blk_co_pwritev(s->target, op->offset,
>> +                             op->qiov.size, &op->qiov, 0);
>> +        mirror_write_complete(op, ret);
>>      }
>>      aio_context_release(blk_get_aio_context(s->common.blk));
>>  }
>> @@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s)
>>   *          (new_end - offset) if tail is rounded up or down due to
>>   *          alignment or buffer limit.
>>   */
>> -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
>> -                               uint64_t bytes)
>> +static void coroutine_fn mirror_co_read(void *opaque)
>>  {
>> +    MirrorOp *op = opaque;
>> +    MirrorBlockJob *s = op->s;
>>      BlockBackend *source = s->common.blk;
>>      int nb_chunks;
>>      uint64_t ret;
>> -    MirrorOp *op;
>>      uint64_t max_bytes;
>>  
>>      max_bytes = s->granularity * s->max_iov;
>>  
>>      /* We can only handle as much as buf_size at a time. */
>> -    bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
>> -    assert(bytes);
>> -    assert(bytes < BDRV_REQUEST_MAX_BYTES);
>> -    ret = bytes;
>> +    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
>> +    assert(op->bytes);
>> +    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
>> +    op->bytes_copied = op->bytes;
>>  
>>      if (s->cow_bitmap) {
>> -        ret += mirror_cow_align(s, &offset, &bytes);
>> +        op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes);
>>      }
>> -    assert(bytes <= s->buf_size);
>> +    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
>> +    assert(op->bytes_copied <= UINT_MAX);
>> +    assert(op->bytes <= s->buf_size);
>>      /* The offset is granularity-aligned because:
>>       * 1) Caller passes in aligned values;
>>       * 2) mirror_cow_align is used only when target cluster is larger. */
>> -    assert(QEMU_IS_ALIGNED(offset, s->granularity));
>> +    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
>>      /* The range is sector-aligned, since bdrv_getlength() rounds up. */
>> -    assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
>> -    nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
>> +    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
>> +    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
>>  
>>      while (s->buf_free_count < nb_chunks) {
>> -        trace_mirror_yield_in_flight(s, offset, s->in_flight);
>> +        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
>>          mirror_wait_for_io(s);
>>      }
>>  
>> -    /* Allocate a MirrorOp that is used as an AIO callback.  */
>> -    op = g_new(MirrorOp, 1);
>> -    op->s = s;
>> -    op->offset = offset;
>> -    op->bytes = bytes;
>> -
>>      /* Now make a QEMUIOVector taking enough granularity-sized chunks
>>       * from s->buf_free.
>>       */
>>      qemu_iovec_init(&op->qiov, nb_chunks);
>>      while (nb_chunks-- > 0) {
>>          MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
>> -        size_t remaining = bytes - op->qiov.size;
>> +        size_t remaining = op->bytes - op->qiov.size;
>>  
>>          QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
>>          s->buf_free_count--;
>> @@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
>>  
>>      /* Copy the dirty cluster.  */
>>      s->in_flight++;
>> -    s->bytes_in_flight += bytes;
>> -    trace_mirror_one_iteration(s, offset, bytes);
>> +    s->bytes_in_flight += op->bytes;
>> +    trace_mirror_one_iteration(s, op->offset, op->bytes);
>>  
>> -    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
>> -    return ret;
>> +    ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
>> +    mirror_read_complete(op, ret);
>>  }
>>  
>> -static void mirror_do_zero_or_discard(MirrorBlockJob *s,
>> -                                      int64_t offset,
>> -                                      uint64_t bytes,
>> -                                      bool is_discard)
>> +static void coroutine_fn mirror_co_zero(void *opaque)
>>  {
>> -    MirrorOp *op;
>> +    MirrorOp *op = opaque;
>> +    int ret;
>>  
>> -    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
>> -     * so the freeing in mirror_iteration_done is nop. */
>> -    op = g_new0(MirrorOp, 1);
>> -    op->s = s;
>> -    op->offset = offset;
>> -    op->bytes = bytes;
>> +    op->s->in_flight++;
>> +    op->s->bytes_in_flight += op->bytes;
>>  
>> -    s->in_flight++;
>> -    s->bytes_in_flight += bytes;
>> -    if (is_discard) {
>> -        blk_aio_pdiscard(s->target, offset,
>> -                         op->bytes, mirror_write_complete, op);
>> -    } else {
>> -        blk_aio_pwrite_zeroes(s->target, offset,
>> -                              op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
>> -                              mirror_write_complete, op);
>> -    }
>> +    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
>> +                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
>> +    mirror_write_complete(op, ret);
>> +}
>> +
>> +static void coroutine_fn mirror_co_discard(void *opaque)
>> +{
>> +    MirrorOp *op = opaque;
>> +    int ret;
>> +
>> +    op->s->in_flight++;
>> +    op->s->bytes_in_flight += op->bytes;
>> +
>> +    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
>> +    mirror_write_complete(op, ret);
>>  }
>>  
>>  static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
>>                                 unsigned bytes, MirrorMethod mirror_method)
>>  {
>> +    MirrorOp *op;
>> +    Coroutine *co;
>> +    unsigned ret = bytes;
>> +
>> +    op = g_new(MirrorOp, 1);
>> +    *op = (MirrorOp){
>> +        .s      = s,
>> +        .offset = offset,
>> +        .bytes  = bytes,
>> +    };
>> +
>>      switch (mirror_method) {
>>      case MIRROR_METHOD_COPY:
>> -        return mirror_do_read(s, offset, bytes);
>> +        co = qemu_coroutine_create(mirror_co_read, op);
>> +        break;
>>      case MIRROR_METHOD_ZERO:
>> +        co = qemu_coroutine_create(mirror_co_zero, op);
>> +        break;
>>      case MIRROR_METHOD_DISCARD:
>> -        mirror_do_zero_or_discard(s, offset, bytes,
>> -                                  mirror_method == MIRROR_METHOD_DISCARD);
>> -        return bytes;
>> +        co = qemu_coroutine_create(mirror_co_discard, op);
>> +        break;
>>      default:
>>          abort();
>>      }
>> +
>> +    qemu_coroutine_enter(co);
>> +
>> +    if (mirror_method == MIRROR_METHOD_COPY) {
>> +        /* Same assertion as in mirror_co_read() */
>> +        assert(op->bytes_copied <= UINT_MAX);
>> +        ret = op->bytes_copied;
>> +    }
> 
> This special casing is a bit ugly. Can you just make mirror_co_zero and
> mirror_co_discard set op->bytes_copied too? (and perhaps rename to
> op->bytes_handled) If so the comment in MirrorOp needs an update too.

Sure.

> And is it better to initialize it to -1 before entering coroutine, then assert
> it is != -1 afterwards?

Sounds good, will do.

Max
Kevin Wolf Oct. 10, 2017, 9:14 a.m. UTC | #3
Am 13.09.2017 um 20:18 hat Max Reitz geschrieben:
> In order to talk to the source BDS (and maybe in the future to the
> target BDS as well) directly, we need to convert our existing AIO
> requests into coroutine I/O requests.
> 
> Signed-off-by: Max Reitz <mreitz@redhat.com>

Please follow through with it and add a few patches that turn it into
natural coroutine code rather than just any coroutine code. I know I did
the same kind of half-assed conversion in qed, but mirror is code that
is actually used and that people look at for more than just a bad
example.

You'll probably notice more things when you do this, but the obvious
things would be changing mirror_co_read() into a mirror_co_copy() with
the former callbacks inlined; keeping op on the stack instead of
mallocing it in mirror_perform() and free it deep inside the nested
functions that used to be callbacks; and probably also cleaning up the
random calls to aio_context_acquire/release() that will now appear in
the middle of the function.

Anyway, that's for follow-up patches (though ideally in the same
series), so for this one you can have:

Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Max Reitz Oct. 11, 2017, 11:43 a.m. UTC | #4
On 2017-10-10 11:14, Kevin Wolf wrote:
> Am 13.09.2017 um 20:18 hat Max Reitz geschrieben:
>> In order to talk to the source BDS (and maybe in the future to the
>> target BDS as well) directly, we need to convert our existing AIO
>> requests into coroutine I/O requests.
>>
>> Signed-off-by: Max Reitz <mreitz@redhat.com>
> 
> Please follow through with it and add a few patches that turn it into
> natural coroutine code rather than just any coroutine code. I know I did
> the same kind of half-assed conversion in qed, but mirror is code that
> is actually used and that people look at for more than just a bad
> example.
> 
> You'll probably notice more things when you do this, but the obvious
> things would be changing mirror_co_read() into a mirror_co_copy() with
> the former callbacks inlined; keeping op on the stack instead of
> mallocing it in mirror_perform() and free it deep inside the nested
> functions that used to be callbacks; and probably also cleaning up the
> random calls to aio_context_acquire/release() that will now appear in
> the middle of the function.
> 
> Anyway, that's for follow-up patches (though ideally in the same
> series), so for this one you can have:
> 
> Reviewed-by: Kevin Wolf <kwolf@redhat.com>

Phew. :-)

I think I'll write the patches (while working on v2), but I'll send them
as a follow-up.

Max
diff mbox series

Patch

diff --git a/block/mirror.c b/block/mirror.c
index 4664b0516f..2b3297aa61 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -80,6 +80,9 @@  typedef struct MirrorOp {
     QEMUIOVector qiov;
     int64_t offset;
     uint64_t bytes;
+
+    /* Set by mirror_co_read() before yielding for the first time */
+    uint64_t bytes_copied;
 } MirrorOp;
 
 typedef enum MirrorMethod {
@@ -101,7 +104,7 @@  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
     }
 }
 
-static void mirror_iteration_done(MirrorOp *op, int ret)
+static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
 {
     MirrorBlockJob *s = op->s;
     struct iovec *iov;
@@ -138,9 +141,8 @@  static void mirror_iteration_done(MirrorOp *op, int ret)
     }
 }
 
-static void mirror_write_complete(void *opaque, int ret)
+static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
 {
-    MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
 
     aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -158,9 +160,8 @@  static void mirror_write_complete(void *opaque, int ret)
     aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
-static void mirror_read_complete(void *opaque, int ret)
+static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
 {
-    MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
 
     aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -176,8 +177,11 @@  static void mirror_read_complete(void *opaque, int ret)
 
         mirror_iteration_done(op, ret);
     } else {
-        blk_aio_pwritev(s->target, op->offset, &op->qiov,
-                        0, mirror_write_complete, op);
+        int ret;
+
+        ret = blk_co_pwritev(s->target, op->offset,
+                             op->qiov.size, &op->qiov, 0);
+        mirror_write_complete(op, ret);
     }
     aio_context_release(blk_get_aio_context(s->common.blk));
 }
@@ -242,53 +246,49 @@  static inline void mirror_wait_for_io(MirrorBlockJob *s)
  *          (new_end - offset) if tail is rounded up or down due to
  *          alignment or buffer limit.
  */
-static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
-                               uint64_t bytes)
+static void coroutine_fn mirror_co_read(void *opaque)
 {
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
     BlockBackend *source = s->common.blk;
     int nb_chunks;
     uint64_t ret;
-    MirrorOp *op;
     uint64_t max_bytes;
 
     max_bytes = s->granularity * s->max_iov;
 
     /* We can only handle as much as buf_size at a time. */
-    bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
-    assert(bytes);
-    assert(bytes < BDRV_REQUEST_MAX_BYTES);
-    ret = bytes;
+    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
+    assert(op->bytes);
+    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
+    op->bytes_copied = op->bytes;
 
     if (s->cow_bitmap) {
-        ret += mirror_cow_align(s, &offset, &bytes);
+        op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes);
     }
-    assert(bytes <= s->buf_size);
+    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
+    assert(op->bytes_copied <= UINT_MAX);
+    assert(op->bytes <= s->buf_size);
     /* The offset is granularity-aligned because:
      * 1) Caller passes in aligned values;
      * 2) mirror_cow_align is used only when target cluster is larger. */
-    assert(QEMU_IS_ALIGNED(offset, s->granularity));
+    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
-    assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
-    nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
+    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
+    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
 
     while (s->buf_free_count < nb_chunks) {
-        trace_mirror_yield_in_flight(s, offset, s->in_flight);
+        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
         mirror_wait_for_io(s);
     }
 
-    /* Allocate a MirrorOp that is used as an AIO callback.  */
-    op = g_new(MirrorOp, 1);
-    op->s = s;
-    op->offset = offset;
-    op->bytes = bytes;
-
     /* Now make a QEMUIOVector taking enough granularity-sized chunks
      * from s->buf_free.
      */
     qemu_iovec_init(&op->qiov, nb_chunks);
     while (nb_chunks-- > 0) {
         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
-        size_t remaining = bytes - op->qiov.size;
+        size_t remaining = op->bytes - op->qiov.size;
 
         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
         s->buf_free_count--;
@@ -297,53 +297,75 @@  static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
 
     /* Copy the dirty cluster.  */
     s->in_flight++;
-    s->bytes_in_flight += bytes;
-    trace_mirror_one_iteration(s, offset, bytes);
+    s->bytes_in_flight += op->bytes;
+    trace_mirror_one_iteration(s, op->offset, op->bytes);
 
-    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
-    return ret;
+    ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
+    mirror_read_complete(op, ret);
 }
 
-static void mirror_do_zero_or_discard(MirrorBlockJob *s,
-                                      int64_t offset,
-                                      uint64_t bytes,
-                                      bool is_discard)
+static void coroutine_fn mirror_co_zero(void *opaque)
 {
-    MirrorOp *op;
+    MirrorOp *op = opaque;
+    int ret;
 
-    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
-     * so the freeing in mirror_iteration_done is nop. */
-    op = g_new0(MirrorOp, 1);
-    op->s = s;
-    op->offset = offset;
-    op->bytes = bytes;
+    op->s->in_flight++;
+    op->s->bytes_in_flight += op->bytes;
 
-    s->in_flight++;
-    s->bytes_in_flight += bytes;
-    if (is_discard) {
-        blk_aio_pdiscard(s->target, offset,
-                         op->bytes, mirror_write_complete, op);
-    } else {
-        blk_aio_pwrite_zeroes(s->target, offset,
-                              op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
-                              mirror_write_complete, op);
-    }
+    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
+                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
+    mirror_write_complete(op, ret);
+}
+
+static void coroutine_fn mirror_co_discard(void *opaque)
+{
+    MirrorOp *op = opaque;
+    int ret;
+
+    op->s->in_flight++;
+    op->s->bytes_in_flight += op->bytes;
+
+    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
+    mirror_write_complete(op, ret);
 }
 
 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
                                unsigned bytes, MirrorMethod mirror_method)
 {
+    MirrorOp *op;
+    Coroutine *co;
+    unsigned ret = bytes;
+
+    op = g_new(MirrorOp, 1);
+    *op = (MirrorOp){
+        .s      = s,
+        .offset = offset,
+        .bytes  = bytes,
+    };
+
     switch (mirror_method) {
     case MIRROR_METHOD_COPY:
-        return mirror_do_read(s, offset, bytes);
+        co = qemu_coroutine_create(mirror_co_read, op);
+        break;
     case MIRROR_METHOD_ZERO:
+        co = qemu_coroutine_create(mirror_co_zero, op);
+        break;
     case MIRROR_METHOD_DISCARD:
-        mirror_do_zero_or_discard(s, offset, bytes,
-                                  mirror_method == MIRROR_METHOD_DISCARD);
-        return bytes;
+        co = qemu_coroutine_create(mirror_co_discard, op);
+        break;
     default:
         abort();
     }
+
+    qemu_coroutine_enter(co);
+
+    if (mirror_method == MIRROR_METHOD_COPY) {
+        /* Same assertion as in mirror_co_read() */
+        assert(op->bytes_copied <= UINT_MAX);
+        ret = op->bytes_copied;
+    }
+
+    return ret;
 }
 
 static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)