[v7,1/3] linux-aio: fix submit aio as a batch

Message ID: 1417424656-29714-2-git-send-email-ming.lei@canonical.com
State: New

Commit Message

Ming Lei Dec. 1, 2014, 9:04 a.m. UTC
In the submit path we can't complete requests directly, otherwise
a "Co-routine re-entered recursively" failure may be triggered,
so this patch fixes the issue with the following ideas:

	- for -EAGAIN or partial submission, retry the submission
	from the completion cb, which runs in BH context (the
	io_submit() semantics this relies on are sketched below)
	- for partial submission, update the io queue as well
	- if the io queue is full, submit the queued requests
	immediately and return failure to the caller
	- for any other failure, abort all queued requests in BH
	context; requests are not allowed to be submitted until the
	abort has been handled
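
For reference, the io_submit() behaviour the retry logic depends on
can be sketched as below; this is illustrative only and not part of
the patch (submit_prefix() is a made-up helper name).  io_submit()
may accept just a prefix of the iocb array: it returns how many
iocbs were queued, or a negative errno if none could be queued.

    #include <errno.h>
    #include <libaio.h>

    /* Illustrative: how a caller must interpret io_submit()'s
     * return value.  This mirrors what ioq_submit() does below.
     */
    static int submit_prefix(io_context_t ctx, struct iocb **iocbs, int len)
    {
        int ret = io_submit(ctx, len, iocbs);

        if (ret == -EAGAIN) {
            return 0;       /* nothing queued; keep the queue, retry later */
        }
        if (ret < 0) {
            return ret;     /* hard error: nothing queued, fail the batch */
        }
        /* ret iocbs (a prefix of the array) are now in flight; the
         * caller must keep iocbs[ret..len-1] and resubmit them later */
        return ret;
    }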

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Ming Lei <ming.lei@canonical.com>
---
 block/linux-aio.c |  116 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 97 insertions(+), 19 deletions(-)
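
The "abort in BH context" above relies on QEMU's bottom-half
machinery so the completion callbacks run outside the submitting
coroutine.  A hedged sketch of the pattern (struct state and
fail_all_queued() are made-up names; aio_bh_new() and
qemu_bh_schedule() are the real QEMUBH API):

    static void abort_bh(void *opaque)
    {
        struct state *s = opaque;

        /* Runs later from the event loop, not from the submit path,
         * so completing requests here cannot re-enter the coroutine
         * that issued them. */
        fail_all_queued(s);
    }

    /* set up once per AioContext */
    s->bh = aio_bh_new(ctx, abort_bh, s);

    /* from the failing submit path: defer, don't complete inline */
    qemu_bh_schedule(s->bh);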

Patch

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..53c5616 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,21 @@  struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-lun SCSI,
+ * so in the future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* abort queued requests in BH context */
+    QEMUBH *abort_bh;
+    bool aborting;
+    int abort_ret;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +69,8 @@  struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +147,11 @@  static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    /* Handle -EAGAIN or partial submission */
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +192,100 @@  static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+    io_q->aborting = false;
 }
 
+/* Always returns >= 0: the number of requests submitted */
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
+    int ret;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
-
-    /* empty io queue */
-    s->io_q.idx = 0;
+    if (!len) {
+        return 0;
+    }
 
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* retry in the next completion cb */
+        if (ret == -EAGAIN) {
+            return 0;
+        }
+
+        /*
+         * Abort in BH context to avoid "Co-routine re-entered
+         * recursively"; the io queue is updated there as well.
+         */
+        s->io_q.aborting = true;
+        s->io_q.abort_ret = ret;
+        qemu_bh_schedule(s->io_q.abort_bh);
+        ret = 0;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+    /*
+     * Update the io queue; the remainder will be retried
+     * automatically from the next completion cb.
+     */
+    if (ret > 0) {
+        if (ret < len) {
+            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+                    (len - ret) * sizeof(struct iocb *));
+        }
+        s->io_q.idx -= ret;
+    }
+
+    return ret;
+}
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
+static void ioq_abort_bh(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    int i;
+
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = s->io_q.abort_ret;
         qemu_laio_process_completion(s, laiocb);
     }
-    return ret;
+
+    s->io_q.idx = 0;
+    s->io_q.aborting = false;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    /* Requests can't be submitted until the abort has been handled */
+    if (unlikely(s->io_q.aborting)) {
+        return -EIO;
+    }
+
+    if (unlikely(idx == s->io_q.size)) {
+        ioq_submit(s);
+
+        if (unlikely(s->io_q.aborting)) {
+            return -EIO;
+        }
+        idx = s->io_q.idx;
+    }
+
+    /* Return now if the queue is still full */
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately once the queue is more than 2/3 full */
+    if (idx > s->io_q.size * 2 / 3) {
         ioq_submit(s);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +351,9 @@  BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
     return &laiocb->common;
 
@@ -296,14 +368,20 @@  void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.abort_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
+
+    if (s->io_q.aborting) {
+        qemu_bh_schedule(s->io_q.abort_bh);
+    }
 }
 
 void *laio_init(void)
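
For completeness, a sketch of how a block driver is expected to
drive the batched path; the loop and variable names are illustrative
(only the laio_* entry points and QEMU_AIO_READ come from QEMU, and
their signatures here follow this version of the tree):

    laio_io_plug(bs, aio_ctx);              /* start queueing iocbs */

    for (i = 0; i < nb_reqs; i++) {
        /* While plugged, laio_submit() goes through ioq_enqueue();
         * with this patch a failed enqueue makes it return NULL. */
        if (!laio_submit(bs, aio_ctx, fd, sector[i], qiov[i],
                         nb_sectors[i], cb, opaque, QEMU_AIO_READ)) {
            /* enqueue failed: -EIO while aborting, or -EAGAIN */
        }
    }

    laio_io_unplug(bs, aio_ctx, true);      /* flush via ioq_submit() */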