diff mbox

[09/15] linux-aio: fix submit aio as a batch

Message ID 1406720388-18671-10-git-send-email-ming.lei@canonical.com
State New
Headers show

Commit Message

Ming Lei July 30, 2014, 11:39 a.m. UTC
In the enqueue path, we can't complete requests, otherwise a
"Co-routine re-entered recursively" error may be triggered, so this
patch fixes the issue with the ideas below:

	- for -EAGAIN or partial completion, retry the submission via
	a newly introduced event handler
	- for partial completion, also update the io queue
	- for any other failure, return the failure if in the enqueue path;
	otherwise, abort all queued I/O

Signed-off-by: Ming Lei <ming.lei@canonical.com>
---
 block/linux-aio.c |   90 ++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 68 insertions(+), 22 deletions(-)

Comments

Paolo Bonzini July 30, 2014, 1:59 p.m. UTC | #1
Il 30/07/2014 13:39, Ming Lei ha scritto:
> In the enqueue path, we can't complete request, otherwise
> "Co-routine re-entered recursively" may be caused, so this
> patch fixes the issue with below ideas:
> 
> 	- for -EAGAIN or partial completion, retry the submission by
> 	an introduced event handler
> 	- for part of completion, also update the io queue
> 	- for other failure, return the failure if in enqueue path,
> 	otherwise, abort all queued I/O
> 
> Signed-off-by: Ming Lei <ming.lei@canonical.com>
> ---
>  block/linux-aio.c |   90 ++++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 68 insertions(+), 22 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index 7ac7e8c..5eb9c92 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -51,6 +51,7 @@ struct qemu_laio_state {
>  
>      /* io queue for submit at batch */
>      LaioQueue io_q;
> +    EventNotifier retry;      /* handle -EAGAIN and partial completion */
>  };
>  
>  static inline ssize_t io_event_ret(struct io_event *ev)
> @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->plugged = 0;
>  }
>  
> -static int ioq_submit(struct qemu_laio_state *s)
> +static void abort_queue(struct qemu_laio_state *s)
> +{
> +    int i;
> +    for (i = 0; i < s->io_q.idx; i++) {
> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +                                                  struct qemu_laiocb,
> +                                                  iocb);
> +        laiocb->ret = -EIO;
> +        qemu_laio_process_completion(s, laiocb);
> +    }
> +}
> +
> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>  {
>      int ret, i = 0;
>      int len = s->io_q.idx;
> +    int j = 0;
>  
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> +    if (!len) {
> +        return 0;
> +    }
>  
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
> +    if (ret == -EAGAIN) {
> +        event_notifier_set(&s->retry);

Retrying immediately (and just doing a couple of system calls to waste
time) is not an improvement.  The right place to retry is in
qemu_laio_completion_cb, after io_getevents has been called and
presumably the queue depth has decreased.

If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it
will just work.  Then you can only go to out_free_aiocb if the queue is
full (independent of the "plug" state).

Paolo

> +        return 0;
> +    } else if (ret < 0) {
> +        if (enqueue) {
> +            return ret;
> +        }
>  
> -    if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        /* in non-queue path, all IOs have to be completed */
> +        abort_queue(s);
> +        ret = len;
> +    } else if (ret == 0) {
> +        goto out;
>      }
>  
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> -
> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> -        qemu_laio_process_completion(s, laiocb);
> +    for (i = ret; i < len; i++) {
> +        s->io_q.iocbs[j++] = s->io_q.iocbs[i];
>      }
> +
> + out:
> +    /* update io queue */
> +    s->io_q.idx -= ret;
> +
>      return ret;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static void ioq_submit_retry(EventNotifier *e)
> +{
> +    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, retry);
> +
> +    event_notifier_test_and_clear(e);
> +    ioq_submit(s, false);
> +}
> +
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
>  
> +    if (unlikely(idx == s->io_q.size)) {
> +        return -1;
> +    }
> +
>      s->io_q.iocbs[idx++] = iocb;
>      s->io_q.idx = idx;
>  
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> -        ioq_submit(s);
> +    /* submit immediately if queue depth is above 2/3 */
> +    if (idx > s->io_q.size * 2 / 3) {
> +        return ioq_submit(s, true);
>      }
> +
> +    return 0;
>  }
>  
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
> @@ -214,7 +250,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
>      }
>  
>      if (s->io_q.idx > 0) {
> -        ret = ioq_submit(s);
> +        ret = ioq_submit(s, false);
>      }
>  
>      return ret;
> @@ -258,7 +294,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
>              goto out_free_aiocb;
>          }
>      } else {
> -        ioq_enqueue(s, iocbs);
> +        if (ioq_enqueue(s, iocbs) < 0) {
> +            goto out_free_aiocb;
> +        }
>      }
>      return &laiocb->common;
>  
> @@ -272,6 +310,7 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
>      struct qemu_laio_state *s = s_;
>  
>      aio_set_event_notifier(old_context, &s->e, NULL);
> +    aio_set_event_notifier(old_context, &s->retry, NULL);
>  }
>  
>  void laio_attach_aio_context(void *s_, AioContext *new_context)
> @@ -279,6 +318,7 @@ void laio_attach_aio_context(void *s_, AioContext *new_context)
>      struct qemu_laio_state *s = s_;
>  
>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
> +    aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry);
>  }
>  
>  void *laio_init(void)
> @@ -295,9 +335,14 @@ void *laio_init(void)
>      }
>  
>      ioq_init(&s->io_q);
> +    if (event_notifier_init(&s->retry, false) < 0) {
> +        goto out_notifer_init;
> +    }
>  
>      return s;
>  
> +out_notifer_init:
> +    io_destroy(s->ctx);
>  out_close_efd:
>      event_notifier_cleanup(&s->e);
>  out_free_state:
> @@ -310,6 +355,7 @@ void laio_cleanup(void *s_)
>      struct qemu_laio_state *s = s_;
>  
>      event_notifier_cleanup(&s->e);
> +    event_notifier_cleanup(&s->retry);
>  
>      if (io_destroy(s->ctx) != 0) {
>          fprintf(stderr, "%s: destroy AIO context %p failed\n",
>
Ming Lei July 30, 2014, 5:32 p.m. UTC | #2
On Wed, Jul 30, 2014 at 9:59 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Il 30/07/2014 13:39, Ming Lei ha scritto:
>> In the enqueue path, we can't complete request, otherwise
>> "Co-routine re-entered recursively" may be caused, so this
>> patch fixes the issue with below ideas:
>>
>>       - for -EAGAIN or partial completion, retry the submission by
>>       an introduced event handler
>>       - for part of completion, also update the io queue
>>       - for other failure, return the failure if in enqueue path,
>>       otherwise, abort all queued I/O
>>
>> Signed-off-by: Ming Lei <ming.lei@canonical.com>
>> ---
>>  block/linux-aio.c |   90 ++++++++++++++++++++++++++++++++++++++++-------------
>>  1 file changed, 68 insertions(+), 22 deletions(-)
>>
>> diff --git a/block/linux-aio.c b/block/linux-aio.c
>> index 7ac7e8c..5eb9c92 100644
>> --- a/block/linux-aio.c
>> +++ b/block/linux-aio.c
>> @@ -51,6 +51,7 @@ struct qemu_laio_state {
>>
>>      /* io queue for submit at batch */
>>      LaioQueue io_q;
>> +    EventNotifier retry;      /* handle -EAGAIN and partial completion */
>>  };
>>
>>  static inline ssize_t io_event_ret(struct io_event *ev)
>> @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q)
>>      io_q->plugged = 0;
>>  }
>>
>> -static int ioq_submit(struct qemu_laio_state *s)
>> +static void abort_queue(struct qemu_laio_state *s)
>> +{
>> +    int i;
>> +    for (i = 0; i < s->io_q.idx; i++) {
>> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>> +                                                  struct qemu_laiocb,
>> +                                                  iocb);
>> +        laiocb->ret = -EIO;
>> +        qemu_laio_process_completion(s, laiocb);
>> +    }
>> +}
>> +
>> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>>  {
>>      int ret, i = 0;
>>      int len = s->io_q.idx;
>> +    int j = 0;
>>
>> -    do {
>> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> -    } while (i++ < 3 && ret == -EAGAIN);
>> +    if (!len) {
>> +        return 0;
>> +    }
>>
>> -    /* empty io queue */
>> -    s->io_q.idx = 0;
>> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> +    if (ret == -EAGAIN) {
>> +        event_notifier_set(&s->retry);
>
> Retrying immediately (and just doing a couple of system calls to waste
> time) is not an improvement.  The right place to retry is in
> qemu_laio_completion_cb, after io_getevents has been called and
> presumably the queue depth has decreased.

Good point.

>
> If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it

When will the queued I/O be submitted? That would definitely introduce
extra complexity.

It is a change for the !s->io_q.plugged case, and it isn't good to do that
in this patch, IMO.

> will just work.  Then you can only go to out_free_aiocb if the queue is
> full (independent of the "plug" state).


Thanks,
Paolo Bonzini July 30, 2014, 11:41 p.m. UTC | #3
Il 30/07/2014 19:32, Ming Lei ha scritto:
> On Wed, Jul 30, 2014 at 9:59 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>> Il 30/07/2014 13:39, Ming Lei ha scritto:
>>> In the enqueue path, we can't complete request, otherwise
>>> "Co-routine re-entered recursively" may be caused, so this
>>> patch fixes the issue with below ideas:
>>>
>>>       - for -EAGAIN or partial completion, retry the submission by
>>>       an introduced event handler
>>>       - for part of completion, also update the io queue
>>>       - for other failure, return the failure if in enqueue path,
>>>       otherwise, abort all queued I/O
>>>
>>> Signed-off-by: Ming Lei <ming.lei@canonical.com>
>>> ---
>>>  block/linux-aio.c |   90 ++++++++++++++++++++++++++++++++++++++++-------------
>>>  1 file changed, 68 insertions(+), 22 deletions(-)
>>>
>>> diff --git a/block/linux-aio.c b/block/linux-aio.c
>>> index 7ac7e8c..5eb9c92 100644
>>> --- a/block/linux-aio.c
>>> +++ b/block/linux-aio.c
>>> @@ -51,6 +51,7 @@ struct qemu_laio_state {
>>>
>>>      /* io queue for submit at batch */
>>>      LaioQueue io_q;
>>> +    EventNotifier retry;      /* handle -EAGAIN and partial completion */
>>>  };
>>>
>>>  static inline ssize_t io_event_ret(struct io_event *ev)
>>> @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q)
>>>      io_q->plugged = 0;
>>>  }
>>>
>>> -static int ioq_submit(struct qemu_laio_state *s)
>>> +static void abort_queue(struct qemu_laio_state *s)
>>> +{
>>> +    int i;
>>> +    for (i = 0; i < s->io_q.idx; i++) {
>>> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>>> +                                                  struct qemu_laiocb,
>>> +                                                  iocb);
>>> +        laiocb->ret = -EIO;
>>> +        qemu_laio_process_completion(s, laiocb);
>>> +    }
>>> +}
>>> +
>>> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>>>  {
>>>      int ret, i = 0;
>>>      int len = s->io_q.idx;
>>> +    int j = 0;
>>>
>>> -    do {
>>> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
>>> -    } while (i++ < 3 && ret == -EAGAIN);
>>> +    if (!len) {
>>> +        return 0;
>>> +    }
>>>
>>> -    /* empty io queue */
>>> -    s->io_q.idx = 0;
>>> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>>> +    if (ret == -EAGAIN) {
>>> +        event_notifier_set(&s->retry);
>>
>> Retrying immediately (and just doing a couple of system calls to waste
>> time) is not an improvement.  The right place to retry is in
>> qemu_laio_completion_cb, after io_getevents has been called and
>> presumably the queue depth has decreased.
> 
> Good point.
> 
>>
>> If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it
> 
> When will the queued I/O be submitted? That will introduce extra
> complexity definitely.

It will be submitted when qemu_laio_completion_cb is called.

> It is a change for !s->io_q.plugged case, and it isn't good to do that in
> this patch, IMO.

I agree with you that this series is doing too many things at
once.  You can submit separate series for 1) no-coroutine fast path, 2)
full queue, 3) multiqueue.  If you do things properly you won't have a
single conflict, since they affect respectively block.c,
block/linux-aio.c and hw/block/.

Paolo
diff mbox

Patch

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 7ac7e8c..5eb9c92 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -51,6 +51,7 @@  struct qemu_laio_state {
 
     /* io queue for submit at batch */
     LaioQueue io_q;
+    EventNotifier retry;      /* handle -EAGAIN and partial completion */
 };
 
 static inline ssize_t io_event_ret(struct io_event *ev)
@@ -154,45 +155,80 @@  static void ioq_init(LaioQueue *io_q)
     io_q->plugged = 0;
 }
 
-static int ioq_submit(struct qemu_laio_state *s)
+static void abort_queue(struct qemu_laio_state *s)
+{
+    int i;
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = -EIO;
+        qemu_laio_process_completion(s, laiocb);
+    }
+}
+
+static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
 {
     int ret, i = 0;
     int len = s->io_q.idx;
+    int j = 0;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
+    if (!len) {
+        return 0;
+    }
 
-    /* empty io queue */
-    s->io_q.idx = 0;
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
+    if (ret == -EAGAIN) {
+        event_notifier_set(&s->retry);
+        return 0;
+    } else if (ret < 0) {
+        if (enqueue) {
+            return ret;
+        }
 
-    if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* in non-queue path, all IOs have to be completed */
+        abort_queue(s);
+        ret = len;
+    } else if (ret == 0) {
+        goto out;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
-        laiocb->ret = (ret < 0) ? ret : -EIO;
-        qemu_laio_process_completion(s, laiocb);
+    for (i = ret; i < len; i++) {
+        s->io_q.iocbs[j++] = s->io_q.iocbs[i];
     }
+
+ out:
+    /* update io queue */
+    s->io_q.idx -= ret;
+
     return ret;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static void ioq_submit_retry(EventNotifier *e)
+{
+    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, retry);
+
+    event_notifier_test_and_clear(e);
+    ioq_submit(s, false);
+}
+
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    if (unlikely(idx == s->io_q.size)) {
+        return -1;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
-        ioq_submit(s);
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
+        return ioq_submit(s, true);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -214,7 +250,7 @@  int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
     }
 
     if (s->io_q.idx > 0) {
-        ret = ioq_submit(s);
+        ret = ioq_submit(s, false);
     }
 
     return ret;
@@ -258,7 +294,9 @@  BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
     return &laiocb->common;
 
@@ -272,6 +310,7 @@  void laio_detach_aio_context(void *s_, AioContext *old_context)
     struct qemu_laio_state *s = s_;
 
     aio_set_event_notifier(old_context, &s->e, NULL);
+    aio_set_event_notifier(old_context, &s->retry, NULL);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
@@ -279,6 +318,7 @@  void laio_attach_aio_context(void *s_, AioContext *new_context)
     struct qemu_laio_state *s = s_;
 
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
+    aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry);
 }
 
 void *laio_init(void)
@@ -295,9 +335,14 @@  void *laio_init(void)
     }
 
     ioq_init(&s->io_q);
+    if (event_notifier_init(&s->retry, false) < 0) {
+        goto out_notifer_init;
+    }
 
     return s;
 
+out_notifer_init:
+    io_destroy(s->ctx);
 out_close_efd:
     event_notifier_cleanup(&s->e);
 out_free_state:
@@ -310,6 +355,7 @@  void laio_cleanup(void *s_)
     struct qemu_laio_state *s = s_;
 
     event_notifier_cleanup(&s->e);
+    event_notifier_cleanup(&s->retry);
 
     if (io_destroy(s->ctx) != 0) {
         fprintf(stderr, "%s: destroy AIO context %p failed\n",