
[v6,1/3] linux-aio: fix submit aio as a batch

Message ID 1416900193-3763-2-git-send-email-ming.lei@canonical.com
State New

Commit Message

Ming Lei Nov. 25, 2014, 7:23 a.m. UTC
In the submit path, we can't complete requests directly,
otherwise "Co-routine re-entered recursively" may be triggered,
so this patch fixes the issue with the following ideas:

	- for -EAGAIN or partial completion, retry the submission
	in the following completion cb, which runs in BH context
	- for partial completion, also update the io queue
	- if the io queue is full, submit the queued requests
	immediately and return failure to the caller
	- for other failures, abort all queued requests in BH
	context; requests won't be allowed to be submitted until
	the abort is handled

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Ming Lei <ming.lei@canonical.com>
---
 block/linux-aio.c |  114 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 95 insertions(+), 19 deletions(-)

Comments

Stefan Hajnoczi Nov. 25, 2014, 1:08 p.m. UTC | #1
On Tue, Nov 25, 2014 at 03:23:11PM +0800, Ming Lei wrote:
> @@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
>  
>      aio_set_event_notifier(old_context, &s->e, NULL);
>      qemu_bh_delete(s->completion_bh);
> +    qemu_bh_delete(s->io_q.abort_bh);
>  }
>  
>  void laio_attach_aio_context(void *s_, AioContext *new_context)
>  {
>      struct qemu_laio_state *s = s_;
>  
> +    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
>      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
>  }

These functions are incomplete when ->aborting == true.  I can't think
of a reason why we are guaranteed never to hit that state, and fixing it
is easy.  Just add the following to the end of
laio_attach_aio_context():

if (s->io_q.aborting) {
    qemu_bh_schedule(s->io_q.abort_bh);
}
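
For reference, a sketch of how the end of laio_attach_aio_context() would
look with that check folded in (based on the function as modified by this
patch; only the final if block is new):

void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);

    /* if an abort was still pending when the old context was detached,
     * make sure it also gets handled in the new context */
    if (s->io_q.aborting) {
        qemu_bh_schedule(s->io_q.abort_bh);
    }
}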

Stefan
Ming Lei Nov. 25, 2014, 2:45 p.m. UTC | #2
On Tue, Nov 25, 2014 at 9:08 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> On Tue, Nov 25, 2014 at 03:23:11PM +0800, Ming Lei wrote:
>> @@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
>>
>>      aio_set_event_notifier(old_context, &s->e, NULL);
>>      qemu_bh_delete(s->completion_bh);
>> +    qemu_bh_delete(s->io_q.abort_bh);
>>  }
>>
>>  void laio_attach_aio_context(void *s_, AioContext *new_context)
>>  {
>>      struct qemu_laio_state *s = s_;
>>
>> +    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
>>      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
>>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
>>  }
>
> These functions are incomplete when ->aborting == true.  I can't think

Could you explain a bit more about when ->aborting is true during the attach callback?

> of a reason why we are guaranteed never to hit that state, and fixing it
> is easy.  Just add the following to the end of
> laio_attach_aio_context():
>
> if (s->io_q.aborting) {
>     qemu_bh_schedule(s->io_q.abort_bh);
> }

You mean the abort BH may not have a chance to run before it is deleted
in the detach callback?

If so, bdrv_drain_all() from bdrv_set_aio_context() should have
handled the pending BH, right?


Thanks,
Ming Lei
Stefan Hajnoczi Nov. 25, 2014, 4:18 p.m. UTC | #3
On Tue, Nov 25, 2014 at 10:45:05PM +0800, Ming Lei wrote:
> On Tue, Nov 25, 2014 at 9:08 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> > On Tue, Nov 25, 2014 at 03:23:11PM +0800, Ming Lei wrote:
> >> @@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
> >>
> >>      aio_set_event_notifier(old_context, &s->e, NULL);
> >>      qemu_bh_delete(s->completion_bh);
> >> +    qemu_bh_delete(s->io_q.abort_bh);
> >>  }
> >>
> >>  void laio_attach_aio_context(void *s_, AioContext *new_context)
> >>  {
> >>      struct qemu_laio_state *s = s_;
> >>
> >> +    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
> >>      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
> >>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
> >>  }
> >
> > These functions are incomplete when ->aborting == true.  I can't think
> 
> Could you explain a bit more about when ->aborting is true during the attach callback?
> 
> > of a reason why we are guaranteed never to hit that state, and fixing it
> > is easy.  Just add the following to the end of
> > laio_attach_aio_context():
> >
> > if (s->io_q.aborting) {
> >     qemu_bh_schedule(s->io_q.abort_bh);
> > }
> 
> You mean the abort BH may not have a chance to run before it is deleted
> in the detach callback?

Exactly.  Any time you schedule a BH you need to be aware of things that
may happen before the BH is invoked.

> If so, bdrv_drain_all() from bdrv_set_aio_context() should have
> handled the pending BH, right?

I'm not sure if it's good to make subtle assumptions like that.  If the
code changes they will break.

Since it is very easy to protect against this case (the code I posted
before), it seems worthwhile to be on the safe side.

Stefan
Ming Lei Nov. 26, 2014, 9:15 a.m. UTC | #4
On Wed, Nov 26, 2014 at 12:18 AM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>>
>> You mean the abort BH may not have a chance to run before it is deleted
>> in the detach callback?
>
> Exactly.  Any time you schedule a BH you need to be aware of things that
> may happen before the BH is invoked.
>
>> If so, bdrv_drain_all() from bdrv_set_aio_context() should have
>> handled the pending BH, right?
>
> I'm not sure if it's good to make subtle assumptions like that.  If the
> code changes they will break.

IMO, that should be the purpose of bdrv_drain_all(), at least from
its comment:

         /* ensure there are no in-flight requests */

If it changes in future, the general problem has to be considered.

> Since it is very easy to protect against this case (the code I posted
> before), it seems worthwhile to be on the safe side.

Given that this potential problem doesn't exist in the current tree, could you
agree to merge it first?

BTW, there isn't this sort of handling for 'completion_bh' in linux-aio either, :-)

Thanks,
Ming Lei
Kevin Wolf Nov. 26, 2014, 11:18 a.m. UTC | #5
Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
> In the submit path, we can't complete requests directly,
> otherwise "Co-routine re-entered recursively" may be triggered,
> so this patch fixes the issue with the following ideas:
> 
> 	- for -EAGAIN or partial completion, retry the submission
> 	in the following completion cb, which runs in BH context
> 	- for partial completion, also update the io queue
> 	- if the io queue is full, submit the queued requests
> 	immediately and return failure to the caller
> 	- for other failures, abort all queued requests in BH
> 	context; requests won't be allowed to be submitted until
> 	the abort is handled
> 
> Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Ming Lei <ming.lei@canonical.com>

This looks like a quite complex fix to this problem, and it introduces
new error cases, too (while aborting, requests fail now, but they really
should just be waiting).

I'm wondering if this is the time to convert the linux-aio interface to
coroutines finally. It wouldn't only be a performance optimisation, but
would potentially also simplify this code.
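
A very rough sketch of the shape such a conversion could take (the function
name and the laiocb->co field are hypothetical, not something in this patch
or in the current tree):

static int coroutine_fn laio_co_submit_sketch(struct qemu_laio_state *s,
                                              struct qemu_laiocb *laiocb)
{
    laiocb->co = qemu_coroutine_self();  /* remember which coroutine to wake */
    ioq_enqueue(s, &laiocb->iocb);       /* only queues; never completes the
                                          * request inline in the submit path */
    qemu_coroutine_yield();              /* re-entered later from the completion
                                          * BH, so recursive re-entry is avoided */
    return laiocb->ret;                  /* filled in by the completion path */
}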

>  block/linux-aio.c |  114 ++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 95 insertions(+), 19 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index d92513b..11ac828 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -38,11 +38,20 @@ struct qemu_laiocb {
>      QLIST_ENTRY(qemu_laiocb) node;
>  };
>  
> +/*
> + * TODO: support to batch I/O from multiple bs in one same
> + * AIO context, one important use case is multi-lun scsi,
> + * so in future the IO queue should be per AIO context.
> + */
>  typedef struct {
>      struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
>      unsigned int size;
>      unsigned int idx;
> +
> +    /* abort queued requests in BH context */
> +    QEMUBH *abort_bh;
> +    bool  aborting;

Two spaces.

>  } LaioQueue;
>  
>  struct qemu_laio_state {
> @@ -59,6 +68,8 @@ struct qemu_laio_state {
>      int event_max;
>  };
>  
> +static int ioq_submit(struct qemu_laio_state *s);
> +
>  static inline ssize_t io_event_ret(struct io_event *ev)
>  {
>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
> @@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
>      qemu_aio_unref(laiocb);
>  }
>  
> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
> +{
> +    if (s->io_q.idx) {
> +        ioq_submit(s);
> +    }
> +}
> +
>  /* The completion BH fetches completed I/O requests and invokes their
>   * callbacks.
>   *
> @@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
>  
>          qemu_laio_process_completion(s, laiocb);
>      }
> +
> +    qemu_laio_start_retry(s);
>  }

Why is qemu_laio_start_retry() a separate function? This is the only
caller.

>  static void qemu_laio_completion_cb(EventNotifier *e)
> @@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->size = MAX_QUEUED_IO;
>      io_q->idx = 0;
>      io_q->plugged = 0;
> +    io_q->aborting = false;
>  }
>  
> +/* Always return >= 0 and it means how many requests are submitted */
>  static int ioq_submit(struct qemu_laio_state *s)
>  {
> -    int ret, i = 0;
> +    int ret;
>      int len = s->io_q.idx;
>  
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> -
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    if (!len) {
> +        return 0;
> +    }
>  
> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>      if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        /* retry in following completion cb */
> +        if (ret == -EAGAIN) {
> +            return 0;
> +        }
> +
> +        /*
> +         * Abort in BH context for avoiding Co-routine re-entered,
> +         * and update io queue at that time
> +         */
> +        qemu_bh_schedule(s->io_q.abort_bh);
> +        s->io_q.aborting = true;
> +        ret = 0;
>      }
>  
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);

> +    /*
> +     * update io queue, and retry will be started automatically
> +     * in following completion cb for the remainder
> +     */
> +    if (ret > 0) {
> +        if (ret < len) {
> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
> +                    (len - ret) * sizeof(struct iocb *));
> +        }
> +        s->io_q.idx -= ret;
> +    }

Support for partly handled queues is nice, but a logically separate
change. Please move this to its own patch.

> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> +    return ret;
> +}
> +
> +static void ioq_abort_bh(void *opaque)
> +{
> +    struct qemu_laio_state *s = opaque;
> +    int i;
> +
> +    for (i = 0; i < s->io_q.idx; i++) {
> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +                                                  struct qemu_laiocb,
> +                                                  iocb);
> +        laiocb->ret = -EIO;

We shouldn't be throwing the real error code away.
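
One way to keep it would be to record the errno when the abort BH is
scheduled and use that value here; abort_ret is a hypothetical extra field,
not part of this patch:

    /* in ioq_submit(), on an io_submit() failure other than -EAGAIN */
    s->io_q.abort_ret = ret;             /* remember the real error code */
    qemu_bh_schedule(s->io_q.abort_bh);
    s->io_q.aborting = true;

    /* ...and in ioq_abort_bh(), instead of hard-coding -EIO */
    laiocb->ret = s->io_q.abort_ret;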

>          qemu_laio_process_completion(s, laiocb);
>      }
> -    return ret;
> +
> +    s->io_q.idx = 0;
> +    s->io_q.aborting = false;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
>  
> +    /* Request can't be allowed to submit until aborting is handled */
> +    if (unlikely(s->io_q.aborting)) {
> +        return -EIO;
> +    }
> +
> +    if (unlikely(idx == s->io_q.size)) {
> +        ioq_submit(s);
> +
> +        if (unlikely(s->io_q.aborting)) {
> +            return -EIO;
> +        }
> +        idx = s->io_q.idx;
> +    }
> +
> +    /* It has to return now if queue is still full */
> +    if (unlikely(idx == s->io_q.size)) {
> +        return -EAGAIN;
> +    }
> +
>      s->io_q.iocbs[idx++] = iocb;
>      s->io_q.idx = idx;
>  
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> +    /* submit immediately if queue depth is above 2/3 */
> +    if (idx > s->io_q.size * 2 / 3) {
>          ioq_submit(s);
>      }

This change looks independent as well, and it isn't mentioned in the
commit message. Why is it a good idea to use only 2/3 of the queue
instead of making use of its full length?

> +    return 0;
>  }

Kevin
Kevin Wolf Nov. 26, 2014, 2:48 p.m. UTC | #6
Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
> In the submit path, we can't complete request directly,
> otherwise "Co-routine re-entered recursively" may be caused,

One more thing: Can you write a qemu-iotests case to test this
code path? (Which should fail on master and be fixed with this patch,
obviously.)

Kevin
Stefan Hajnoczi Nov. 27, 2014, 4:56 p.m. UTC | #7
On Wed, Nov 26, 2014 at 05:15:44PM +0800, Ming Lei wrote:
> On Wed, Nov 26, 2014 at 12:18 AM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> >>
> >> You mean the abort BH may not have a chance to run before it is deleted
> >> in the detach callback?
> >
> > Exactly.  Any time you schedule a BH you need to be aware of things that
> > may happen before the BH is invoked.
> >
> >> If so, bdrv_drain_all() from bdrv_set_aio_context() should have
> >> handled the pending BH, right?
> >
> > I'm not sure if it's good to make subtle assumptions like that.  If the
> > code changes they will break.
> 
> IMO, that should be the purpose of bdrv_drain_all(), at least from
> its comment:
> 
>          /* ensure there are no in-flight requests */
> 
> If it changes in future, the general problem has to be considered.
> 
> > Since it is very easy to protect against this case (the code I posted
> > before), it seems worthwhile to be on the safe side.
> 
> Given that this potential problem doesn't exist in the current tree, could you
> agree to merge it first?
> 
> BTW, there isn't this sort of handling for 'completion_bh' in linux-aio either, :-)

That's incorrect, completion_bh is protected:

void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
    ^---- this will reschedule completion_bh if s->e has events pending

I am asking for a one-line if statement to avoid introducing a subtle
assumption that would be a pain to debug in the future.  It's so easy to
add that I'm against merging the patch without this protection.
Paolo Bonzini Nov. 27, 2014, 4:58 p.m. UTC | #8
On 27/11/2014 17:56, Stefan Hajnoczi wrote:
> I am asking for a one-line if statement to avoid introducing a
> subtle assumption that would be a pain to debug in the future.
> It's so easy to add that I'm against merging the patch without this
> protection.

Yeah, either that or an assertion must be there.

Paolo
Ming Lei Nov. 28, 2014, 2:16 a.m. UTC | #9
On Wed, Nov 26, 2014 at 7:18 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
>> In the submit path, we can't complete requests directly,
>> otherwise "Co-routine re-entered recursively" may be triggered,
>> so this patch fixes the issue with the following ideas:
>>
>>       - for -EAGAIN or partial completion, retry the submission
>>       in the following completion cb, which runs in BH context
>>       - for partial completion, also update the io queue
>>       - if the io queue is full, submit the queued requests
>>       immediately and return failure to the caller
>>       - for other failures, abort all queued requests in BH
>>       context; requests won't be allowed to be submitted until
>>       the abort is handled
>>
>> Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
>> Signed-off-by: Ming Lei <ming.lei@canonical.com>
>
> This looks like a quite complex fix to this problem, and it introduces
> new error cases, too (while aborting, requests fail now, but they really
> should just be waiting).

"Co-routine re-entered recursively" is only triggered when io_submit()
returns failure(either -EAGAIN or other error) or partial completion, so
this patch actually is for handling failure of io_submit() and making
linux-aio more reliably.

After the io queue was introduced, it became quite easy to trigger the failure,
especially with multi-queue, or when the VM driver sets a larger queue depth.
So all the cases (-EAGAIN, other failures and partial completion) have to be
considered too.

So it isn't that the fix is overly complex; the actual problem itself is complex.
Not to mention that previously linux-aio didn't handle -EAGAIN at all.
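
To make the failure mode concrete, here is a rough (and simplified) picture
of the recursion this patch avoids:

    coroutine X: bdrv_co_*() issues a request
      -> laio_submit() -> ioq_enqueue() -> ioq_submit()
         -> io_submit() fails, or completes only part of the queue
         -> old code calls qemu_laio_process_completion() right here
            -> the request's cb re-enters coroutine X, which is still
               running in the submit path
               => "Co-routine re-entered recursively"

    Deferring the completion/abort to a BH breaks this chain.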

>
> I'm wondering if this is the time to convert the linux-aio interface to
> coroutines finally. It wouldn't only be a performance optimisation, but
> would potentially also simplify this code.

Even with coroutines, the above io_submit() issues have to be considered
too.

>
>>  block/linux-aio.c |  114 ++++++++++++++++++++++++++++++++++++++++++++---------
>>  1 file changed, 95 insertions(+), 19 deletions(-)
>>
>> diff --git a/block/linux-aio.c b/block/linux-aio.c
>> index d92513b..11ac828 100644
>> --- a/block/linux-aio.c
>> +++ b/block/linux-aio.c
>> @@ -38,11 +38,20 @@ struct qemu_laiocb {
>>      QLIST_ENTRY(qemu_laiocb) node;
>>  };
>>
>> +/*
>> + * TODO: support to batch I/O from multiple bs in one same
>> + * AIO context, one important use case is multi-lun scsi,
>> + * so in future the IO queue should be per AIO context.
>> + */
>>  typedef struct {
>>      struct iocb *iocbs[MAX_QUEUED_IO];
>>      int plugged;
>>      unsigned int size;
>>      unsigned int idx;
>> +
>> +    /* abort queued requests in BH context */
>> +    QEMUBH *abort_bh;
>> +    bool  aborting;
>
> Two spaces.

OK.

>
>>  } LaioQueue;
>>
>>  struct qemu_laio_state {
>> @@ -59,6 +68,8 @@ struct qemu_laio_state {
>>      int event_max;
>>  };
>>
>> +static int ioq_submit(struct qemu_laio_state *s);
>> +
>>  static inline ssize_t io_event_ret(struct io_event *ev)
>>  {
>>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
>> @@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
>>      qemu_aio_unref(laiocb);
>>  }
>>
>> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
>> +{
>> +    if (s->io_q.idx) {
>> +        ioq_submit(s);
>> +    }
>> +}
>> +
>>  /* The completion BH fetches completed I/O requests and invokes their
>>   * callbacks.
>>   *
>> @@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
>>
>>          qemu_laio_process_completion(s, laiocb);
>>      }
>> +
>> +    qemu_laio_start_retry(s);
>>  }
>
> Why is qemu_laio_start_retry() a separate function? This is the only
> caller.

OK.

>
>>  static void qemu_laio_completion_cb(EventNotifier *e)
>> @@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
>>      io_q->size = MAX_QUEUED_IO;
>>      io_q->idx = 0;
>>      io_q->plugged = 0;
>> +    io_q->aborting = false;
>>  }
>>
>> +/* Always return >= 0 and it means how many requests are submitted */
>>  static int ioq_submit(struct qemu_laio_state *s)
>>  {
>> -    int ret, i = 0;
>> +    int ret;
>>      int len = s->io_q.idx;
>>
>> -    do {
>> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> -    } while (i++ < 3 && ret == -EAGAIN);
>> -
>> -    /* empty io queue */
>> -    s->io_q.idx = 0;
>> +    if (!len) {
>> +        return 0;
>> +    }
>>
>> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>>      if (ret < 0) {
>> -        i = 0;
>> -    } else {
>> -        i = ret;
>> +        /* retry in following completion cb */
>> +        if (ret == -EAGAIN) {
>> +            return 0;
>> +        }
>> +
>> +        /*
>> +         * Abort in BH context for avoiding Co-routine re-entered,
>> +         * and update io queue at that time
>> +         */
>> +        qemu_bh_schedule(s->io_q.abort_bh);
>> +        s->io_q.aborting = true;
>> +        ret = 0;
>>      }
>>
>> -    for (; i < len; i++) {
>> -        struct qemu_laiocb *laiocb =
>> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
>
>> +    /*
>> +     * update io queue, and retry will be started automatically
>> +     * in following completion cb for the remainder
>> +     */
>> +    if (ret > 0) {
>> +        if (ret < len) {
>> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
>> +                    (len - ret) * sizeof(struct iocb *));
>> +        }
>> +        s->io_q.idx -= ret;
>> +    }
>
> Support for partly handled queues is nice, but a logically separate
> change. Please move this to its own patch.

As I described above, the "Co-routine re-entered recursively" issue
is triggered by the current handling of partial completion.

>
>> -        laiocb->ret = (ret < 0) ? ret : -EIO;
>> +    return ret;
>> +}
>> +
>> +static void ioq_abort_bh(void *opaque)
>> +{
>> +    struct qemu_laio_state *s = opaque;
>> +    int i;
>> +
>> +    for (i = 0; i < s->io_q.idx; i++) {
>> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>> +                                                  struct qemu_laiocb,
>> +                                                  iocb);
>> +        laiocb->ret = -EIO;
>
> We shouldn't be throwing the real error code away.

OK, will do that.

>
>>          qemu_laio_process_completion(s, laiocb);
>>      }
>> -    return ret;
>> +
>> +    s->io_q.idx = 0;
>> +    s->io_q.aborting = false;
>>  }
>>
>> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>>  {
>>      unsigned int idx = s->io_q.idx;
>>
>> +    /* Request can't be allowed to submit until aborting is handled */
>> +    if (unlikely(s->io_q.aborting)) {
>> +        return -EIO;
>> +    }
>> +
>> +    if (unlikely(idx == s->io_q.size)) {
>> +        ioq_submit(s);
>> +
>> +        if (unlikely(s->io_q.aborting)) {
>> +            return -EIO;
>> +        }
>> +        idx = s->io_q.idx;
>> +    }
>> +
>> +    /* It has to return now if queue is still full */
>> +    if (unlikely(idx == s->io_q.size)) {
>> +        return -EAGAIN;
>> +    }
>> +
>>      s->io_q.iocbs[idx++] = iocb;
>>      s->io_q.idx = idx;
>>
>> -    /* submit immediately if queue is full */
>> -    if (idx == s->io_q.size) {
>> +    /* submit immediately if queue depth is above 2/3 */
>> +    if (idx > s->io_q.size * 2 / 3) {
>>          ioq_submit(s);
>>      }
>
> This change looks independent as well, and it isn't mentioned in the
> commit message. Why is it a good idea to use only 2/3 of the queue
> instead of making use of its full length?

When the queue is full, a new submission has to return failure, so submitting
before the queue fills up is meant to make linux-aio more reliable.
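
As a rough illustration (the concrete number is made up; the real limit is
MAX_QUEUED_IO): with a queue of 128 slots, the check

    /* submit once the queue is about two thirds full */
    if (idx > s->io_q.size * 2 / 3) {
        ioq_submit(s);
    }

kicks in at 86 queued iocbs, leaving roughly a third of the queue free so
that ioq_enqueue() rarely has to fail with -EAGAIN while an earlier
submission is still pending or being retried.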


Thanks,
Ming Lei
Ming Lei Nov. 28, 2014, 3:01 a.m. UTC | #10
On Fri, Nov 28, 2014 at 12:58 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
>
>
> On 27/11/2014 17:56, Stefan Hajnoczi wrote:
>> I am asking for a one-line if statement to avoid introducing a
>> subtle assumption that would be a pain to debug in the future.
>> It's so easy to add that I'm against merging the patch without this
>> protection.
>
> Yeah, either that or an assertion must be there.

OK.

Thanks,
Ming Lei

Patch

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..11ac828 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,20 @@  struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support to batch I/O from multiple bs in one same
+ * AIO context, one important use case is multi-lun scsi,
+ * so in future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* abort queued requests in BH context */
+    QEMUBH *abort_bh;
+    bool  aborting;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +68,8 @@  struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -91,6 +102,13 @@  static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_unref(laiocb);
 }
 
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
+}
+
 /* The completion BH fetches completed I/O requests and invokes their
  * callbacks.
  *
@@ -135,6 +153,8 @@  static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    qemu_laio_start_retry(s);
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +195,99 @@  static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+    io_q->aborting = false;
 }
 
+/* Always return >= 0 and it means how many requests are submitted */
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
+    int ret;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
-
-    /* empty io queue */
-    s->io_q.idx = 0;
+    if (!len) {
+        return 0;
+    }
 
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* retry in following completion cb */
+        if (ret == -EAGAIN) {
+            return 0;
+        }
+
+        /*
+         * Abort in BH context for avoiding Co-routine re-entered,
+         * and update io queue at that time
+         */
+        qemu_bh_schedule(s->io_q.abort_bh);
+        s->io_q.aborting = true;
+        ret = 0;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+    /*
+     * update io queue, and retry will be started automatically
+     * in following completion cb for the remainder
+     */
+    if (ret > 0) {
+        if (ret < len) {
+            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+                    (len - ret) * sizeof(struct iocb *));
+        }
+        s->io_q.idx -= ret;
+    }
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
+    return ret;
+}
+
+static void ioq_abort_bh(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    int i;
+
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = -EIO;
         qemu_laio_process_completion(s, laiocb);
     }
-    return ret;
+
+    s->io_q.idx = 0;
+    s->io_q.aborting = false;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    /* Request can't be allowed to submit until aborting is handled */
+    if (unlikely(s->io_q.aborting)) {
+        return -EIO;
+    }
+
+    if (unlikely(idx == s->io_q.size)) {
+        ioq_submit(s);
+
+        if (unlikely(s->io_q.aborting)) {
+            return -EIO;
+        }
+        idx = s->io_q.idx;
+    }
+
+    /* It has to return now if queue is still full */
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
         ioq_submit(s);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +353,9 @@  BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
     return &laiocb->common;
 
@@ -296,12 +370,14 @@  void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.abort_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }