diff mbox

[1/4] linux-aio: queue requests that cannot be submitted

Message ID 1418223122-22481-2-git-send-email-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Dec. 10, 2014, 2:51 p.m. UTC
Keep a queue of requests that were not submitted; pass them to
the kernel when a completion is reported, unless the queue is
plugged.

The array of iocbs is rebuilt every time from scratch.  This
avoids keeping the iocbs array and list synchronized.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/linux-aio.c | 75 ++++++++++++++++++++++++-------------------------------
 1 file changed, 33 insertions(+), 42 deletions(-)

Comments

Kevin Wolf Dec. 11, 2014, 12:49 p.m. UTC | #1
Am 10.12.2014 um 15:51 hat Paolo Bonzini geschrieben:
> Keep a queue of requests that were not submitted; pass them to
> the kernel when a completion is reported, unless the queue is
> plugged.
> 
> The array of iocbs is rebuilt every time from scratch.  This
> avoids keeping the iocbs array and list synchronized.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/linux-aio.c | 75 ++++++++++++++++++++++++-------------------------------
>  1 file changed, 33 insertions(+), 42 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index d92513b..b6fbfd8 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -35,14 +35,13 @@ struct qemu_laiocb {
>      size_t nbytes;
>      QEMUIOVector *qiov;
>      bool is_read;
> -    QLIST_ENTRY(qemu_laiocb) node;
> +    QSIMPLEQ_ENTRY(qemu_laiocb) next;
>  };
>  
>  typedef struct {
> -    struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
> -    unsigned int size;
>      unsigned int idx;
> +    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
>  } LaioQueue;
>  
>  struct qemu_laio_state {
> @@ -59,6 +58,8 @@ struct qemu_laio_state {
>      int event_max;
>  };
>  
> +static int ioq_submit(struct qemu_laio_state *s);
> +
>  static inline ssize_t io_event_ret(struct io_event *ev)
>  {
>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
> @@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque)
>  
>          qemu_laio_process_completion(s, laiocb);
>      }
> +
> +    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
> +        ioq_submit(s);
> +    }
>  }
>  
>  static void qemu_laio_completion_cb(EventNotifier *e)
> @@ -172,52 +177,40 @@ static const AIOCBInfo laio_aiocb_info = {
>  
>  static void ioq_init(LaioQueue *io_q)
>  {
> -    io_q->size = MAX_QUEUED_IO;
> -    io_q->idx = 0;
> +    QSIMPLEQ_INIT(&io_q->pending);
>      io_q->plugged = 0;
> +    io_q->idx = 0;
>  }
>  
>  static int ioq_submit(struct qemu_laio_state *s)
>  {
> -    int ret, i = 0;
> -    int len = s->io_q.idx;
> -
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> +    int ret, i;
> +    int len = 0;
> +    struct qemu_laiocb *aiocb;
> +    struct iocb *iocbs[MAX_QUEUED_IO];
>  
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
> +        iocbs[len++] = &aiocb->iocb;
> +        if (len == MAX_QUEUED_IO) {
> +            break;
> +        }
> +    }
>  
> +    ret = io_submit(s->ctx, len, iocbs);
> +    if (ret == -EAGAIN) {
> +        ret = 0;
> +    }
>      if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        abort();
>      }

abort() doesn't feel right here.

>  
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> -
> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> -        qemu_laio_process_completion(s, laiocb);
> +    for (i = 0; i < ret; i++) {
> +        s->io_q.idx--;
> +        QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
>      }
>      return ret;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> -{
> -    unsigned int idx = s->io_q.idx;
> -
> -    s->io_q.iocbs[idx++] = iocb;
> -    s->io_q.idx = idx;
> -
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> -        ioq_submit(s);
> -    }
> -}
> -
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
>  {
>      struct qemu_laio_state *s = aio_ctx;
> @@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
>          return 0;
>      }
>  
> -    if (s->io_q.idx > 0) {
> +    if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
>          ret = ioq_submit(s);
>      }
>  
> @@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
>      }
>      io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
>  
> -    if (!s->io_q.plugged) {
> -        if (io_submit(s->ctx, 1, &iocbs) < 0) {
> -            goto out_free_aiocb;
> -        }
> -    } else {
> -        ioq_enqueue(s, iocbs);
> +    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
> +    s->io_q.idx++;
> +    if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {

More naturally written and more obviously correct as (!s->io_q,plugged ||
s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch
converts it to, so I won't spend much time thinking about whether this
version is actually right.

> +        ioq_submit(s);
>      }
>      return &laiocb->common;

Kevin
Paolo Bonzini Dec. 11, 2014, 12:52 p.m. UTC | #2
On 11/12/2014 13:49, Kevin Wolf wrote:
> > -    } else {
> > -        i = ret;
> > +        abort();
> >      }
> 
> abort() doesn't feel right here.

man doesn't suggest any error that can actually happen.

> > +    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
> > +    s->io_q.idx++;
> > +    if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {
> 
> More naturally written and more obviously correct as (!s->io_q,plugged ||
> s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch
> converts it to, so I won't spend much time thinking about whether this
> version is actually right.

Sort of.  If the queue is blocked due to -EAGAIN, I don't want to
io_submit every time an operation is queued, hence the ==.  The next
patch adds !s->io_q.blocked, so it can use the more natural and indeed
more obvious expression.

Paolo
Kevin Wolf Dec. 11, 2014, 1:02 p.m. UTC | #3
Am 11.12.2014 um 13:52 hat Paolo Bonzini geschrieben:
> 
> 
> On 11/12/2014 13:49, Kevin Wolf wrote:
> > > -    } else {
> > > -        i = ret;
> > > +        abort();
> > >      }
> > 
> > abort() doesn't feel right here.
> 
> man doesn't suggest any error that can actually happen.

Yes. I guess I just like to be on the safe side. I would be fine with
dropping requests on the floor and thereby breaking the block device in
this unlikely case if proper error handling is too hard, but killing the
qemu process generally makes me feel uncomfortable.

> > > +    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
> > > +    s->io_q.idx++;
> > > +    if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {
> > 
> > More naturally written and more obviously correct as (!s->io_q,plugged ||
> > s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch
> > converts it to, so I won't spend much time thinking about whether this
> > version is actually right.
> 
> Sort of.  If the queue is blocked due to -EAGAIN, I don't want to
> io_submit every time an operation is queued, hence the ==.  The next
> patch adds !s->io_q.blocked, so it can use the more natural and indeed
> more obvious expression.

Oh, I see. I didn't get that this was an optimisation.

Kevin
Paolo Bonzini Dec. 11, 2014, 1:07 p.m. UTC | #4
On 11/12/2014 14:02, Kevin Wolf wrote:
>> > > > -    } else {
>> > > > -        i = ret;
>> > > > +        abort();
>> > > >      }
> > > 
> > > abort() doesn't feel right here.
> > 
> > man doesn't suggest any error that can actually happen.
> 
> Yes. I guess I just like to be on the safe side. I would be fine with
> dropping requests on the floor and thereby breaking the block device in
> this unlikely case if proper error handling is too hard, but killing the
> qemu process generally makes me feel uncomfortable.

Hmm, since it shouldn't happen I prefer for it to fail right away and
make it easier to spot what happened in the core dump.

Paolo
diff mbox

Patch

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..b6fbfd8 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -35,14 +35,13 @@  struct qemu_laiocb {
     size_t nbytes;
     QEMUIOVector *qiov;
     bool is_read;
-    QLIST_ENTRY(qemu_laiocb) node;
+    QSIMPLEQ_ENTRY(qemu_laiocb) next;
 };
 
 typedef struct {
-    struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
-    unsigned int size;
     unsigned int idx;
+    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +58,8 @@  struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +136,10 @@  static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
+        ioq_submit(s);
+    }
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -172,52 +177,40 @@  static const AIOCBInfo laio_aiocb_info = {
 
 static void ioq_init(LaioQueue *io_q)
 {
-    io_q->size = MAX_QUEUED_IO;
-    io_q->idx = 0;
+    QSIMPLEQ_INIT(&io_q->pending);
     io_q->plugged = 0;
+    io_q->idx = 0;
 }
 
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
-    int len = s->io_q.idx;
-
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
+    int ret, i;
+    int len = 0;
+    struct qemu_laiocb *aiocb;
+    struct iocb *iocbs[MAX_QUEUED_IO];
 
-    /* empty io queue */
-    s->io_q.idx = 0;
+    QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
+        iocbs[len++] = &aiocb->iocb;
+        if (len == MAX_QUEUED_IO) {
+            break;
+        }
+    }
 
+    ret = io_submit(s->ctx, len, iocbs);
+    if (ret == -EAGAIN) {
+        ret = 0;
+    }
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        abort();
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
-        laiocb->ret = (ret < 0) ? ret : -EIO;
-        qemu_laio_process_completion(s, laiocb);
+    for (i = 0; i < ret; i++) {
+        s->io_q.idx--;
+        QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
     }
     return ret;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
-{
-    unsigned int idx = s->io_q.idx;
-
-    s->io_q.iocbs[idx++] = iocb;
-    s->io_q.idx = idx;
-
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
-        ioq_submit(s);
-    }
-}
-
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
 {
     struct qemu_laio_state *s = aio_ctx;
@@ -236,7 +229,7 @@  int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
         return 0;
     }
 
-    if (s->io_q.idx > 0) {
+    if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
         ret = ioq_submit(s);
     }
 
@@ -276,12 +269,10 @@  BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
     }
     io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
 
-    if (!s->io_q.plugged) {
-        if (io_submit(s->ctx, 1, &iocbs) < 0) {
-            goto out_free_aiocb;
-        }
-    } else {
-        ioq_enqueue(s, iocbs);
+    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
+    s->io_q.idx++;
+    if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {
+        ioq_submit(s);
     }
     return &laiocb->common;