Message ID | 1406720388-18671-10-git-send-email-ming.lei@canonical.com |
---|---|
State | New |
Headers | show |
Il 30/07/2014 13:39, Ming Lei ha scritto: > In the enqueue path, we can't complete request, otherwise > "Co-routine re-entered recursively" may be caused, so this > patch fixes the issue with below ideas: > > - for -EAGAIN or partial completion, retry the submission by > an introduced event handler > - for part of completion, also update the io queue > - for other failure, return the failure if in enqueue path, > otherwise, abort all queued I/O > > Signed-off-by: Ming Lei <ming.lei@canonical.com> > --- > block/linux-aio.c | 90 ++++++++++++++++++++++++++++++++++++++++------------- > 1 file changed, 68 insertions(+), 22 deletions(-) > > diff --git a/block/linux-aio.c b/block/linux-aio.c > index 7ac7e8c..5eb9c92 100644 > --- a/block/linux-aio.c > +++ b/block/linux-aio.c > @@ -51,6 +51,7 @@ struct qemu_laio_state { > > /* io queue for submit at batch */ > LaioQueue io_q; > + EventNotifier retry; /* handle -EAGAIN and partial completion */ > }; > > static inline ssize_t io_event_ret(struct io_event *ev) > @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q) > io_q->plugged = 0; > } > > -static int ioq_submit(struct qemu_laio_state *s) > +static void abort_queue(struct qemu_laio_state *s) > +{ > + int i; > + for (i = 0; i < s->io_q.idx; i++) { > + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], > + struct qemu_laiocb, > + iocb); > + laiocb->ret = -EIO; > + qemu_laio_process_completion(s, laiocb); > + } > +} > + > +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) > { > int ret, i = 0; > int len = s->io_q.idx; > + int j = 0; > > - do { > - ret = io_submit(s->ctx, len, s->io_q.iocbs); > - } while (i++ < 3 && ret == -EAGAIN); > + if (!len) { > + return 0; > + } > > - /* empty io queue */ > - s->io_q.idx = 0; > + ret = io_submit(s->ctx, len, s->io_q.iocbs); > + if (ret == -EAGAIN) { > + event_notifier_set(&s->retry); Retrying immediately (and just doing a couple of system calls to waste time) is not an improvement. 
The right place to retry is in qemu_laio_completion_cb, after io_getevents has been called and presumably the queue depth has decreased. If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it will just work. Then you can only go to out_free_aiocb if the queue is full (independent of the "plug" state). Paolo > + return 0; > + } else if (ret < 0) { > + if (enqueue) { > + return ret; > + } > > - if (ret < 0) { > - i = 0; > - } else { > - i = ret; > + /* in non-queue path, all IOs have to be completed */ > + abort_queue(s); > + ret = len; > + } else if (ret == 0) { > + goto out; > } > > - for (; i < len; i++) { > - struct qemu_laiocb *laiocb = > - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); > - > - laiocb->ret = (ret < 0) ? ret : -EIO; > - qemu_laio_process_completion(s, laiocb); > + for (i = ret; i < len; i++) { > + s->io_q.iocbs[j++] = s->io_q.iocbs[i]; > } > + > + out: > + /* update io queue */ > + s->io_q.idx -= ret; > + > return ret; > } > > -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > +static void ioq_submit_retry(EventNotifier *e) > +{ > + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, retry); > + > + event_notifier_test_and_clear(e); > + ioq_submit(s, false); > +} > + > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > { > unsigned int idx = s->io_q.idx; > > + if (unlikely(idx == s->io_q.size)) { > + return -1; > + } > + > s->io_q.iocbs[idx++] = iocb; > s->io_q.idx = idx; > > - /* submit immediately if queue is full */ > - if (idx == s->io_q.size) { > - ioq_submit(s); > + /* submit immediately if queue depth is above 2/3 */ > + if (idx > s->io_q.size * 2 / 3) { > + return ioq_submit(s, true); > } > + > + return 0; > } > > void laio_io_plug(BlockDriverState *bs, void *aio_ctx) > @@ -214,7 +250,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) > } > > if (s->io_q.idx > 0) { > - ret = ioq_submit(s); > + ret = ioq_submit(s, false); > } > > 
return ret; > @@ -258,7 +294,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, > goto out_free_aiocb; > } > } else { > - ioq_enqueue(s, iocbs); > + if (ioq_enqueue(s, iocbs) < 0) { > + goto out_free_aiocb; > + } > } > return &laiocb->common; > > @@ -272,6 +310,7 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) > struct qemu_laio_state *s = s_; > > aio_set_event_notifier(old_context, &s->e, NULL); > + aio_set_event_notifier(old_context, &s->retry, NULL); > } > > void laio_attach_aio_context(void *s_, AioContext *new_context) > @@ -279,6 +318,7 @@ void laio_attach_aio_context(void *s_, AioContext *new_context) > struct qemu_laio_state *s = s_; > > aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); > + aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry); > } > > void *laio_init(void) > @@ -295,9 +335,14 @@ void *laio_init(void) > } > > ioq_init(&s->io_q); > + if (event_notifier_init(&s->retry, false) < 0) { > + goto out_notifer_init; > + } > > return s; > > +out_notifer_init: > + io_destroy(s->ctx); > out_close_efd: > event_notifier_cleanup(&s->e); > out_free_state: > @@ -310,6 +355,7 @@ void laio_cleanup(void *s_) > struct qemu_laio_state *s = s_; > > event_notifier_cleanup(&s->e); > + event_notifier_cleanup(&s->retry); > > if (io_destroy(s->ctx) != 0) { > fprintf(stderr, "%s: destroy AIO context %p failed\n", >
On Wed, Jul 30, 2014 at 9:59 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > Il 30/07/2014 13:39, Ming Lei ha scritto: >> In the enqueue path, we can't complete request, otherwise >> "Co-routine re-entered recursively" may be caused, so this >> patch fixes the issue with below ideas: >> >> - for -EAGAIN or partial completion, retry the submission by >> an introduced event handler >> - for part of completion, also update the io queue >> - for other failure, return the failure if in enqueue path, >> otherwise, abort all queued I/O >> >> Signed-off-by: Ming Lei <ming.lei@canonical.com> >> --- >> block/linux-aio.c | 90 ++++++++++++++++++++++++++++++++++++++++------------- >> 1 file changed, 68 insertions(+), 22 deletions(-) >> >> diff --git a/block/linux-aio.c b/block/linux-aio.c >> index 7ac7e8c..5eb9c92 100644 >> --- a/block/linux-aio.c >> +++ b/block/linux-aio.c >> @@ -51,6 +51,7 @@ struct qemu_laio_state { >> >> /* io queue for submit at batch */ >> LaioQueue io_q; >> + EventNotifier retry; /* handle -EAGAIN and partial completion */ >> }; >> >> static inline ssize_t io_event_ret(struct io_event *ev) >> @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q) >> io_q->plugged = 0; >> } >> >> -static int ioq_submit(struct qemu_laio_state *s) >> +static void abort_queue(struct qemu_laio_state *s) >> +{ >> + int i; >> + for (i = 0; i < s->io_q.idx; i++) { >> + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], >> + struct qemu_laiocb, >> + iocb); >> + laiocb->ret = -EIO; >> + qemu_laio_process_completion(s, laiocb); >> + } >> +} >> + >> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) >> { >> int ret, i = 0; >> int len = s->io_q.idx; >> + int j = 0; >> >> - do { >> - ret = io_submit(s->ctx, len, s->io_q.iocbs); >> - } while (i++ < 3 && ret == -EAGAIN); >> + if (!len) { >> + return 0; >> + } >> >> - /* empty io queue */ >> - s->io_q.idx = 0; >> + ret = io_submit(s->ctx, len, s->io_q.iocbs); >> + if (ret == -EAGAIN) { >> + 
event_notifier_set(&s->retry); > > Retrying immediately (and just doing a couple of system calls to waste > time) is not an improvement. The right place to retry is in > qemu_laio_completion_cb, after io_getevents has been called and > presumably the queue depth has decreased. Good point. > > If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it When will the queued I/O be submitted? That will introduce extra complexity definitely. It is a change for !s->io_q.plugged case, and it isn't good to do that in this patch, IMO. > will just work. Then you can only go to out_free_aiocb if the queue is > full (independent of the "plug" state). Thanks,
Il 30/07/2014 19:32, Ming Lei ha scritto: > On Wed, Jul 30, 2014 at 9:59 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: >> Il 30/07/2014 13:39, Ming Lei ha scritto: >>> In the enqueue path, we can't complete request, otherwise >>> "Co-routine re-entered recursively" may be caused, so this >>> patch fixes the issue with below ideas: >>> >>> - for -EAGAIN or partial completion, retry the submission by >>> an introduced event handler >>> - for part of completion, also update the io queue >>> - for other failure, return the failure if in enqueue path, >>> otherwise, abort all queued I/O >>> >>> Signed-off-by: Ming Lei <ming.lei@canonical.com> >>> --- >>> block/linux-aio.c | 90 ++++++++++++++++++++++++++++++++++++++++------------- >>> 1 file changed, 68 insertions(+), 22 deletions(-) >>> >>> diff --git a/block/linux-aio.c b/block/linux-aio.c >>> index 7ac7e8c..5eb9c92 100644 >>> --- a/block/linux-aio.c >>> +++ b/block/linux-aio.c >>> @@ -51,6 +51,7 @@ struct qemu_laio_state { >>> >>> /* io queue for submit at batch */ >>> LaioQueue io_q; >>> + EventNotifier retry; /* handle -EAGAIN and partial completion */ >>> }; >>> >>> static inline ssize_t io_event_ret(struct io_event *ev) >>> @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q) >>> io_q->plugged = 0; >>> } >>> >>> -static int ioq_submit(struct qemu_laio_state *s) >>> +static void abort_queue(struct qemu_laio_state *s) >>> +{ >>> + int i; >>> + for (i = 0; i < s->io_q.idx; i++) { >>> + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], >>> + struct qemu_laiocb, >>> + iocb); >>> + laiocb->ret = -EIO; >>> + qemu_laio_process_completion(s, laiocb); >>> + } >>> +} >>> + >>> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) >>> { >>> int ret, i = 0; >>> int len = s->io_q.idx; >>> + int j = 0; >>> >>> - do { >>> - ret = io_submit(s->ctx, len, s->io_q.iocbs); >>> - } while (i++ < 3 && ret == -EAGAIN); >>> + if (!len) { >>> + return 0; >>> + } >>> >>> - /* empty io queue */ >>> - s->io_q.idx = 
0; >>> + ret = io_submit(s->ctx, len, s->io_q.iocbs); >>> + if (ret == -EAGAIN) { >>> + event_notifier_set(&s->retry); >> >> Retrying immediately (and just doing a couple of system calls to waste >> time) is not an improvement. The right place to retry is in >> qemu_laio_completion_cb, after io_getevents has been called and >> presumably the queue depth has decreased. > > Good point. > >> >> If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it > > When will the queued I/O be submitted? That will introduce extra > complexity definitely. It will be submitted when qemu_laio_completion_cb is called. > It is a change for !s->io_q.plugged case, and it isn't good to do that in > this patch, IMO. I agree with you that this series is doing too many things at a single time. You can submit separate series for 1) no-coroutine fast path, 2) full queue, 3) multiqueue. If you do things properly you won't have a single conflict, since they affect respectively block.c, block/linux-aio.c and hw/block/. Paolo
diff --git a/block/linux-aio.c b/block/linux-aio.c index 7ac7e8c..5eb9c92 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -51,6 +51,7 @@ struct qemu_laio_state { /* io queue for submit at batch */ LaioQueue io_q; + EventNotifier retry; /* handle -EAGAIN and partial completion */ }; static inline ssize_t io_event_ret(struct io_event *ev) @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q) io_q->plugged = 0; } -static int ioq_submit(struct qemu_laio_state *s) +static void abort_queue(struct qemu_laio_state *s) +{ + int i; + for (i = 0; i < s->io_q.idx; i++) { + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], + struct qemu_laiocb, + iocb); + laiocb->ret = -EIO; + qemu_laio_process_completion(s, laiocb); + } +} + +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) { int ret, i = 0; int len = s->io_q.idx; + int j = 0; - do { - ret = io_submit(s->ctx, len, s->io_q.iocbs); - } while (i++ < 3 && ret == -EAGAIN); + if (!len) { + return 0; + } - /* empty io queue */ - s->io_q.idx = 0; + ret = io_submit(s->ctx, len, s->io_q.iocbs); + if (ret == -EAGAIN) { + event_notifier_set(&s->retry); + return 0; + } else if (ret < 0) { + if (enqueue) { + return ret; + } - if (ret < 0) { - i = 0; - } else { - i = ret; + /* in non-queue path, all IOs have to be completed */ + abort_queue(s); + ret = len; + } else if (ret == 0) { + goto out; } - for (; i < len; i++) { - struct qemu_laiocb *laiocb = - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb); - - laiocb->ret = (ret < 0) ? 
ret : -EIO; - qemu_laio_process_completion(s, laiocb); + for (i = ret; i < len; i++) { + s->io_q.iocbs[j++] = s->io_q.iocbs[i]; } + + out: + /* update io queue */ + s->io_q.idx -= ret; + return ret; } -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) +static void ioq_submit_retry(EventNotifier *e) +{ + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, retry); + + event_notifier_test_and_clear(e); + ioq_submit(s, false); +} + +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) { unsigned int idx = s->io_q.idx; + if (unlikely(idx == s->io_q.size)) { + return -1; + } + s->io_q.iocbs[idx++] = iocb; s->io_q.idx = idx; - /* submit immediately if queue is full */ - if (idx == s->io_q.size) { - ioq_submit(s); + /* submit immediately if queue depth is above 2/3 */ + if (idx > s->io_q.size * 2 / 3) { + return ioq_submit(s, true); } + + return 0; } void laio_io_plug(BlockDriverState *bs, void *aio_ctx) @@ -214,7 +250,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug) } if (s->io_q.idx > 0) { - ret = ioq_submit(s); + ret = ioq_submit(s, false); } return ret; @@ -258,7 +294,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, goto out_free_aiocb; } } else { - ioq_enqueue(s, iocbs); + if (ioq_enqueue(s, iocbs) < 0) { + goto out_free_aiocb; + } } return &laiocb->common; @@ -272,6 +310,7 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(old_context, &s->e, NULL); + aio_set_event_notifier(old_context, &s->retry, NULL); } void laio_attach_aio_context(void *s_, AioContext *new_context) @@ -279,6 +318,7 @@ void laio_attach_aio_context(void *s_, AioContext *new_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); + aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry); } void *laio_init(void) @@ -295,9 +335,14 @@ void *laio_init(void) } 
ioq_init(&s->io_q); + if (event_notifier_init(&s->retry, false) < 0) { + goto out_notifier_init; + } return s; +out_notifier_init: + io_destroy(s->ctx); out_close_efd: event_notifier_cleanup(&s->e); out_free_state: @@ -310,6 +355,7 @@ void laio_cleanup(void *s_) struct qemu_laio_state *s = s_; event_notifier_cleanup(&s->e); + event_notifier_cleanup(&s->retry); if (io_destroy(s->ctx) != 0) { fprintf(stderr, "%s: destroy AIO context %p failed\n",
In the enqueue path, we can't complete the request, otherwise "Co-routine re-entered recursively" may be caused, so this patch fixes the issue with the ideas below: - for -EAGAIN or partial completion, retry the submission via a newly introduced event handler - for partial completion, also update the io queue - for any other failure, return the failure if in the enqueue path; otherwise, abort all queued I/O Signed-off-by: Ming Lei <ming.lei@canonical.com> --- block/linux-aio.c | 90 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 22 deletions(-)