Message ID | 1415286601-30715-2-git-send-email-ming.lei@canonical.com |
---|---|
State | New |
Headers | show |
> @@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque) > } > } > > +static void qemu_laio_start_retry(struct qemu_laio_state *s) > +{ > + if (s->io_q.idx) > + qemu_bh_schedule(s->io_q.retry); > +} > + > static void qemu_laio_completion_cb(EventNotifier *e) > { > struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); > @@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e) > if (event_notifier_test_and_clear(&s->e)) { > qemu_bh_schedule(s->completion_bh); > } > + qemu_laio_start_retry(s); I think you do not even need two bottom halves. Just call ioq_submit from completion_bh instead, after the call to io_getevents. > } > > static void laio_cancel(BlockAIOCB *blockacb) > @@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb) > } > > laiocb->common.cb(laiocb->common.opaque, laiocb->ret); > + > + /* check if there are requests in io queue */ > + qemu_laio_start_retry(laiocb->ctx); > } > > static const AIOCBInfo laio_aiocb_info = { > @@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q) > io_q->plugged = 0; > } > > -static int ioq_submit(struct qemu_laio_state *s) > +static void abort_queue(struct qemu_laio_state *s) > +{ > + int i; > + for (i = 0; i < s->io_q.idx; i++) { > + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], > + struct qemu_laiocb, > + iocb); > + laiocb->ret = -EIO; > + qemu_laio_process_completion(s, laiocb); > + } > +} > + > +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) > { > int ret, i = 0; > int len = s->io_q.idx; > + int j = 0; > > - do { > - ret = io_submit(s->ctx, len, s->io_q.iocbs); > - } while (i++ < 3 && ret == -EAGAIN); > + if (!len) { > + return 0; > + } > > - /* empty io queue */ > - s->io_q.idx = 0; > + ret = io_submit(s->ctx, len, s->io_q.iocbs); > + if (ret == -EAGAIN) { /* retry in following completion cb */ > + return 0; > + } else if (ret < 0) { > + if (enqueue) { > + return ret; > + } > > - if (ret < 0) { > - i = 0; > - } else { > - i 
= ret; > + /* in non-queue path, all IOs have to be completed */ > + abort_queue(s); > + ret = len; > + } else if (ret == 0) { > + goto out; No need for goto; just move the "for" loop inside this conditional. Or better, just use memmove. That is: if (ret < 0) { if (ret == -EAGAIN) { return 0; } if (enqueue) { return ret; } abort_queue(s); ret = len; } if (ret > 0) { memmove(...) s->io_q.idx -= ret; } return ret; > + * update io queue, for partial completion, retry will be > + * started automatically in following completion cb. > + */ > + s->io_q.idx -= ret; > + > return ret; > } > > -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > +static void ioq_submit_retry(void *opaque) > +{ > + struct qemu_laio_state *s = opaque; > + ioq_submit(s, false); > +} > + > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > { > unsigned int idx = s->io_q.idx; > > + if (unlikely(idx == s->io_q.size)) { > + return -1; return -EAGAIN? Thanks, Paolo > + } > + > s->io_q.iocbs[idx++] = iocb; > s->io_q.idx = idx; > > - /* submit immediately if queue is full */ > - if (idx == s->io_q.size) { > - ioq_submit(s); > + /* submit immediately if queue depth is above 2/3 */ > + if (idx > s->io_q.size * 2 / 3) { > + return ioq_submit(s, true); > } > + > + return 0; > } > > void laio_io_plug(BlockDriverState *bs, void *aio_ctx) > @@ -237,7 +290,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) > } > > if (s->io_q.idx > 0) { > - ret = ioq_submit(s); > + ret = ioq_submit(s, false); > } > > return ret; > @@ -281,7 +334,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, > goto out_free_aiocb; > } > } else { > - ioq_enqueue(s, iocbs); > + if (ioq_enqueue(s, iocbs) < 0) { > + goto out_free_aiocb; > + } > } > return &laiocb->common; > > @@ -296,12 +351,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) > > aio_set_event_notifier(old_context, &s->e, NULL); > qemu_bh_delete(s->completion_bh); > + 
qemu_bh_delete(s->io_q.retry); > } > > void laio_attach_aio_context(void *s_, AioContext *new_context) > { > struct qemu_laio_state *s = s_; > > + s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s); > s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); > aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); > } >
On Tue, Nov 18, 2014 at 10:18 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > > >> @@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque) >> } >> } >> >> +static void qemu_laio_start_retry(struct qemu_laio_state *s) >> +{ >> + if (s->io_q.idx) >> + qemu_bh_schedule(s->io_q.retry); >> +} >> + >> static void qemu_laio_completion_cb(EventNotifier *e) >> { >> struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); >> @@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e) >> if (event_notifier_test_and_clear(&s->e)) { >> qemu_bh_schedule(s->completion_bh); >> } >> + qemu_laio_start_retry(s); > > I think you do not even need two bottom halves. Just call ioq_submit > from completion_bh instead, after the call to io_getevents. Yes, that can save one BH, actually the patch was written when there wasn't completion BH, :-) > >> } >> >> static void laio_cancel(BlockAIOCB *blockacb) >> @@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb) >> } >> >> laiocb->common.cb(laiocb->common.opaque, laiocb->ret); >> + >> + /* check if there are requests in io queue */ >> + qemu_laio_start_retry(laiocb->ctx); >> } >> >> static const AIOCBInfo laio_aiocb_info = { >> @@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q) >> io_q->plugged = 0; >> } >> >> -static int ioq_submit(struct qemu_laio_state *s) >> +static void abort_queue(struct qemu_laio_state *s) >> +{ >> + int i; >> + for (i = 0; i < s->io_q.idx; i++) { >> + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], >> + struct qemu_laiocb, >> + iocb); >> + laiocb->ret = -EIO; >> + qemu_laio_process_completion(s, laiocb); >> + } >> +} >> + >> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) >> { >> int ret, i = 0; >> int len = s->io_q.idx; >> + int j = 0; >> >> - do { >> - ret = io_submit(s->ctx, len, s->io_q.iocbs); >> - } while (i++ < 3 && ret == -EAGAIN); >> + if (!len) { >> + return 0; >> + } >> >> - /* empty io queue */ >> - s->io_q.idx = 0; 
>> + ret = io_submit(s->ctx, len, s->io_q.iocbs); >> + if (ret == -EAGAIN) { /* retry in following completion cb */ >> + return 0; >> + } else if (ret < 0) { >> + if (enqueue) { >> + return ret; >> + } >> >> - if (ret < 0) { >> - i = 0; >> - } else { >> - i = ret; >> + /* in non-queue path, all IOs have to be completed */ >> + abort_queue(s); >> + ret = len; >> + } else if (ret == 0) { >> + goto out; > > No need for goto; just move the "for" loop inside this conditional. Or > better, just use memmove. That is: > > if (ret < 0) { > if (ret == -EAGAIN) { > return 0; > } > if (enqueue) { > return ret; > } > > abort_queue(s); > ret = len; > } > > if (ret > 0) { > memmove(...) > s->io_q.idx -= ret; > } > return ret; The above is better. >> + * update io queue, for partial completion, retry will be >> + * started automatically in following completion cb. >> + */ >> + s->io_q.idx -= ret; >> + >> return ret; >> } >> >> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) >> +static void ioq_submit_retry(void *opaque) >> +{ >> + struct qemu_laio_state *s = opaque; >> + ioq_submit(s, false); >> +} >> + >> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) >> { >> unsigned int idx = s->io_q.idx; >> >> + if (unlikely(idx == s->io_q.size)) { >> + return -1; > > return -EAGAIN? It means the io queue is full, so the code has to fail the current request. Thanks, Ming Lei
On 22/11/2014 13:16, Ming Lei wrote: > > > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > > > { > > > unsigned int idx = s->io_q.idx; > > > > > > + if (unlikely(idx == s->io_q.size)) { > > > + return -1; > > > > return -EAGAIN? > > It means the io queue is full, so the code has to fail the current > request. Right, but "-1" is "-EPERM" which doesn't make much sense. I think the right thing to do is to return EAGAIN, and let the owner figure it out. For example, a SCSI device might return a "BUSY" status code. Paolo
On Mon, Nov 24, 2014 at 5:47 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > > > On 22/11/2014 13:16, Ming Lei wrote: >> > > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) >> > > { >> > > unsigned int idx = s->io_q.idx; >> > > >> > > + if (unlikely(idx == s->io_q.size)) { >> > > + return -1; >> > >> > return -EAGAIN? >> >> It means the io queue is full, so the code has to fail the current >> request. > > Right, but "-1" is "-EPERM" which doesn't make much sense. I think the > right thing to do is to return EAGAIN, and let the owner figure it out. > For example, a SCSI device might return a "BUSY" status code. We can do that, but laio_submit() doesn't return the code to callers. Thanks, Ming Lei
On 24/11/2014 11:02, Ming Lei wrote: >> > Right, but "-1" is "-EPERM" which doesn't make much sense. I think the >> > right thing to do is to return EAGAIN, and let the owner figure it out. >> > For example, a SCSI device might return a "BUSY" status code. > We can do that, but laio_submit() doesn't return the code to callers. That can be fixed later. But not returning a sensible errno only makes things more complicated later on. Paolo
diff --git a/block/linux-aio.c b/block/linux-aio.c index d92513b..f66e8ad 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -38,11 +38,19 @@ struct qemu_laiocb { QLIST_ENTRY(qemu_laiocb) node; }; +/* + * TODO: support to batch I/O from multiple bs in one same + * AIO context, one important use case is multi-lun scsi, + * so in future the IO queue should be per AIO context. + */ typedef struct { struct iocb *iocbs[MAX_QUEUED_IO]; int plugged; unsigned int size; unsigned int idx; + + /* handle -EAGAIN and partial completion */ + QEMUBH *retry; } LaioQueue; struct qemu_laio_state { @@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque) } } +static void qemu_laio_start_retry(struct qemu_laio_state *s) +{ + if (s->io_q.idx) + qemu_bh_schedule(s->io_q.retry); +} + static void qemu_laio_completion_cb(EventNotifier *e) { struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); @@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e) if (event_notifier_test_and_clear(&s->e)) { qemu_bh_schedule(s->completion_bh); } + qemu_laio_start_retry(s); } static void laio_cancel(BlockAIOCB *blockacb) @@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb) } laiocb->common.cb(laiocb->common.opaque, laiocb->ret); + + /* check if there are requests in io queue */ + qemu_laio_start_retry(laiocb->ctx); } static const AIOCBInfo laio_aiocb_info = { @@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q) io_q->plugged = 0; } -static int ioq_submit(struct qemu_laio_state *s) +static void abort_queue(struct qemu_laio_state *s) +{ + int i; + for (i = 0; i < s->io_q.idx; i++) { + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], + struct qemu_laiocb, + iocb); + laiocb->ret = -EIO; + qemu_laio_process_completion(s, laiocb); + } +} + +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) { int ret, i = 0; int len = s->io_q.idx; + int j = 0; - do { - ret = io_submit(s->ctx, len, s->io_q.iocbs); - } while (i++ < 3 
&& ret == -EAGAIN); + if (!len) { + return 0; + } - /* empty io queue */ - s->io_q.idx = 0; + ret = io_submit(s->ctx, len, s->io_q.iocbs); + if (ret == -EAGAIN) { /* retry in following completion cb */ + return 0; + } else if (ret < 0) { + if (enqueue) { + return ret; + } - if (ret < 0) { - i = 0; - } else { - i = ret; + /* in non-queue path, all IOs have to be completed */ + abort_queue(s); + ret = len; + } else if (ret == 0) { + goto out; } - for (; i < len; i++) { - struct qemu_laiocb *laiocb = - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); - - laiocb->ret = (ret < 0) ? ret : -EIO; - qemu_laio_process_completion(s, laiocb); + for (i = ret; i < len; i++) { + s->io_q.iocbs[j++] = s->io_q.iocbs[i]; } + + out: + /* + * update io queue, for partial completion, retry will be + * started automatically in following completion cb. + */ + s->io_q.idx -= ret; + return ret; } -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) +static void ioq_submit_retry(void *opaque) +{ + struct qemu_laio_state *s = opaque; + ioq_submit(s, false); +} + +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) { unsigned int idx = s->io_q.idx; + if (unlikely(idx == s->io_q.size)) { + return -1; + } + s->io_q.iocbs[idx++] = iocb; s->io_q.idx = idx; - /* submit immediately if queue is full */ - if (idx == s->io_q.size) { - ioq_submit(s); + /* submit immediately if queue depth is above 2/3 */ + if (idx > s->io_q.size * 2 / 3) { + return ioq_submit(s, true); } + + return 0; } void laio_io_plug(BlockDriverState *bs, void *aio_ctx) @@ -237,7 +290,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) } if (s->io_q.idx > 0) { - ret = ioq_submit(s); + ret = ioq_submit(s, false); } return ret; @@ -281,7 +334,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, goto out_free_aiocb; } } else { - ioq_enqueue(s, iocbs); + if (ioq_enqueue(s, iocbs) < 0) { + goto out_free_aiocb; + } } return &laiocb->common; @@ -296,12 
+351,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) aio_set_event_notifier(old_context, &s->e, NULL); qemu_bh_delete(s->completion_bh); + qemu_bh_delete(s->io_q.retry); } void laio_attach_aio_context(void *s_, AioContext *new_context) { struct qemu_laio_state *s = s_; + s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s); s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); }
In the enqueue path, we can't complete the request, otherwise "Co-routine re-entered recursively" may be caused, so this patch fixes the issue with the ideas below: - for -EAGAIN or partial completion, retry the submission by scheduling a BH in the following completion cb - for partial completion, also update the io queue - for other failures, return the failure if in the enqueue path; otherwise, abort all queued I/O Signed-off-by: Ming Lei <ming.lei@canonical.com> --- block/linux-aio.c | 101 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 79 insertions(+), 22 deletions(-)