Message ID | 1418223122-22481-2-git-send-email-pbonzini@redhat.com |
---|---|
State | New |
Headers | show |
Am 10.12.2014 um 15:51 hat Paolo Bonzini geschrieben: > Keep a queue of requests that were not submitted; pass them to > the kernel when a completion is reported, unless the queue is > plugged. > > The array of iocbs is rebuilt every time from scratch. This > avoids keeping the iocbs array and list synchronized. > > Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> > --- > block/linux-aio.c | 75 ++++++++++++++++++++++++------------------------------- > 1 file changed, 33 insertions(+), 42 deletions(-) > > diff --git a/block/linux-aio.c b/block/linux-aio.c > index d92513b..b6fbfd8 100644 > --- a/block/linux-aio.c > +++ b/block/linux-aio.c > @@ -35,14 +35,13 @@ struct qemu_laiocb { > size_t nbytes; > QEMUIOVector *qiov; > bool is_read; > - QLIST_ENTRY(qemu_laiocb) node; > + QSIMPLEQ_ENTRY(qemu_laiocb) next; > }; > > typedef struct { > - struct iocb *iocbs[MAX_QUEUED_IO]; > int plugged; > - unsigned int size; > unsigned int idx; > + QSIMPLEQ_HEAD(, qemu_laiocb) pending; > } LaioQueue; > > struct qemu_laio_state { > @@ -59,6 +58,8 @@ struct qemu_laio_state { > int event_max; > }; > > +static int ioq_submit(struct qemu_laio_state *s); > + > static inline ssize_t io_event_ret(struct io_event *ev) > { > return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res); > @@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque) > > qemu_laio_process_completion(s, laiocb); > } > + > + if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { > + ioq_submit(s); > + } > } > > static void qemu_laio_completion_cb(EventNotifier *e) > @@ -172,52 +177,40 @@ static const AIOCBInfo laio_aiocb_info = { > > static void ioq_init(LaioQueue *io_q) > { > - io_q->size = MAX_QUEUED_IO; > - io_q->idx = 0; > + QSIMPLEQ_INIT(&io_q->pending); > io_q->plugged = 0; > + io_q->idx = 0; > } > > static int ioq_submit(struct qemu_laio_state *s) > { > - int ret, i = 0; > - int len = s->io_q.idx; > - > - do { > - ret = io_submit(s->ctx, len, s->io_q.iocbs); > - } while (i++ < 3 && ret == -EAGAIN); > + int ret, i; > + int len = 0; > + struct qemu_laiocb *aiocb; > + struct iocb *iocbs[MAX_QUEUED_IO]; > > - /* empty io queue */ > - s->io_q.idx = 0; > + QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) { > + iocbs[len++] = &aiocb->iocb; > + if (len == MAX_QUEUED_IO) { > + break; > + } > + } > > + ret = io_submit(s->ctx, len, iocbs); > + if (ret == -EAGAIN) { > + ret = 0; > + } > if (ret < 0) { > - i = 0; > - } else { > - i = ret; > + abort(); > } abort() doesn't feel right here. > > - for (; i < len; i++) { > - struct qemu_laiocb *laiocb = > - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); > - > - laiocb->ret = (ret < 0) ? ret : -EIO; > - qemu_laio_process_completion(s, laiocb); > + for (i = 0; i < ret; i++) { > + s->io_q.idx--; > + QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next); > } > return ret; > } > > -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > -{ > - unsigned int idx = s->io_q.idx; > - > - s->io_q.iocbs[idx++] = iocb; > - s->io_q.idx = idx; > - > - /* submit immediately if queue is full */ > - if (idx == s->io_q.size) { > - ioq_submit(s); > - } > -} > - > void laio_io_plug(BlockDriverState *bs, void *aio_ctx) > { > struct qemu_laio_state *s = aio_ctx; > @@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) > return 0; > } > > - if (s->io_q.idx > 0) { > + if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) { > ret = ioq_submit(s); > } > > @@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, > } > io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e)); > > - if (!s->io_q.plugged) { > - if (io_submit(s->ctx, 1, &iocbs) < 0) { > - goto out_free_aiocb; > - } > - } else { > - ioq_enqueue(s, iocbs); > + QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); > + s->io_q.idx++; > + if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) { More naturally written and more obviously correct as (!s->io_q,plugged || s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch converts it to, so I won't spend much time thinking about whether this version is actually right. > + ioq_submit(s); > } > return &laiocb->common; Kevin
On 11/12/2014 13:49, Kevin Wolf wrote: > > - } else { > > - i = ret; > > + abort(); > > } > > abort() doesn't feel right here. man doesn't suggest any error that can actually happen. > > + QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); > > + s->io_q.idx++; > > + if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) { > > More naturally written and more obviously correct as (!s->io_q,plugged || > s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch > converts it to, so I won't spend much time thinking about whether this > version is actually right. Sort of. If the queue is blocked due to -EAGAIN, I don't want to io_submit every time an operation is queued, hence the ==. The next patch adds !s->io_q.blocked, so it can use the more natural and indeed more obvious expression. Paolo
Am 11.12.2014 um 13:52 hat Paolo Bonzini geschrieben: > > > On 11/12/2014 13:49, Kevin Wolf wrote: > > > - } else { > > > - i = ret; > > > + abort(); > > > } > > > > abort() doesn't feel right here. > > man doesn't suggest any error that can actually happen. Yes. I guess I just like to be on the safe side. I would be fine with dropping requests on the floor and thereby breaking the block device in this unlikely case if proper error handling is too hard, but killing the qemu process generally makes me feel uncomfortable. > > > + QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); > > > + s->io_q.idx++; > > > + if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) { > > > > More naturally written and more obviously correct as (!s->io_q,plugged || > > s->io_q.idx >= MAX_QUEUED_IO). Which happens to be what the next patch > > converts it to, so I won't spend much time thinking about whether this > > version is actually right. > > Sort of. If the queue is blocked due to -EAGAIN, I don't want to > io_submit every time an operation is queued, hence the ==. The next > patch adds !s->io_q.blocked, so it can use the more natural and indeed > more obvious expression. Oh, I see. I didn't get that this was an optimisation. Kevin
On 11/12/2014 14:02, Kevin Wolf wrote: >> > > > - } else { >> > > > - i = ret; >> > > > + abort(); >> > > > } > > > > > > abort() doesn't feel right here. > > > > man doesn't suggest any error that can actually happen. > > Yes. I guess I just like to be on the safe side. I would be fine with > dropping requests on the floor and thereby breaking the block device in > this unlikely case if proper error handling is too hard, but killing the > qemu process generally makes me feel uncomfortable. Hmm, since it shouldn't happen I prefer for it to fail right away and make it easier to spot what happened in the core dump. Paolo
diff --git a/block/linux-aio.c b/block/linux-aio.c index d92513b..b6fbfd8 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -35,14 +35,13 @@ struct qemu_laiocb { size_t nbytes; QEMUIOVector *qiov; bool is_read; - QLIST_ENTRY(qemu_laiocb) node; + QSIMPLEQ_ENTRY(qemu_laiocb) next; }; typedef struct { - struct iocb *iocbs[MAX_QUEUED_IO]; int plugged; - unsigned int size; unsigned int idx; + QSIMPLEQ_HEAD(, qemu_laiocb) pending; } LaioQueue; struct qemu_laio_state { @@ -59,6 +58,8 @@ struct qemu_laio_state { int event_max; }; +static int ioq_submit(struct qemu_laio_state *s); + static inline ssize_t io_event_ret(struct io_event *ev) { return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res); @@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque) qemu_laio_process_completion(s, laiocb); } + + if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { + ioq_submit(s); + } } static void qemu_laio_completion_cb(EventNotifier *e) @@ -172,52 +177,40 @@ static const AIOCBInfo laio_aiocb_info = { static void ioq_init(LaioQueue *io_q) { - io_q->size = MAX_QUEUED_IO; - io_q->idx = 0; + QSIMPLEQ_INIT(&io_q->pending); io_q->plugged = 0; + io_q->idx = 0; } static int ioq_submit(struct qemu_laio_state *s) { - int ret, i = 0; - int len = s->io_q.idx; - - do { - ret = io_submit(s->ctx, len, s->io_q.iocbs); - } while (i++ < 3 && ret == -EAGAIN); + int ret, i; + int len = 0; + struct qemu_laiocb *aiocb; + struct iocb *iocbs[MAX_QUEUED_IO]; - /* empty io queue */ - s->io_q.idx = 0; + QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) { + iocbs[len++] = &aiocb->iocb; + if (len == MAX_QUEUED_IO) { + break; + } + } + ret = io_submit(s->ctx, len, iocbs); + if (ret == -EAGAIN) { + ret = 0; + } if (ret < 0) { - i = 0; - } else { - i = ret; + abort(); } - for (; i < len; i++) { - struct qemu_laiocb *laiocb = - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); - - laiocb->ret = (ret < 0) ? ret : -EIO; - qemu_laio_process_completion(s, laiocb); + for (i = 0; i < ret; i++) { + s->io_q.idx--; + QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next); } return ret; } -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) -{ - unsigned int idx = s->io_q.idx; - - s->io_q.iocbs[idx++] = iocb; - s->io_q.idx = idx; - - /* submit immediately if queue is full */ - if (idx == s->io_q.size) { - ioq_submit(s); - } -} - void laio_io_plug(BlockDriverState *bs, void *aio_ctx) { struct qemu_laio_state *s = aio_ctx; @@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) return 0; } - if (s->io_q.idx > 0) { + if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) { ret = ioq_submit(s); } @@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, } io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e)); - if (!s->io_q.plugged) { - if (io_submit(s->ctx, 1, &iocbs) < 0) { - goto out_free_aiocb; - } - } else { - ioq_enqueue(s, iocbs); + QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); + s->io_q.idx++; + if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) { + ioq_submit(s); } return &laiocb->common;
Keep a queue of requests that were not submitted; pass them to the kernel when a completion is reported, unless the queue is plugged. The array of iocbs is rebuilt every time from scratch. This avoids keeping the iocbs array and list synchronized. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> --- block/linux-aio.c | 75 ++++++++++++++++++++++++------------------------------- 1 file changed, 33 insertions(+), 42 deletions(-)