Message ID: 1416900193-3763-2-git-send-email-ming.lei@canonical.com
State: New
On Tue, Nov 25, 2014 at 03:23:11PM +0800, Ming Lei wrote:
> @@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
>
>      aio_set_event_notifier(old_context, &s->e, NULL);
>      qemu_bh_delete(s->completion_bh);
> +    qemu_bh_delete(s->io_q.abort_bh);
>  }
>
>  void laio_attach_aio_context(void *s_, AioContext *new_context)
>  {
>      struct qemu_laio_state *s = s_;
>
> +    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
>      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
>      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
>  }

These functions are incomplete when ->aborting == true.  I can't think
of a reason why we are guaranteed never to hit that state, and fixing it
is easy.  Just add the following to the end of laio_attach_aio_context():

    if (s->aborting) {
        qemu_bh_schedule(s->io_q.abort_bh);
    }

Stefan
On Tue, Nov 25, 2014 at 9:08 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> On Tue, Nov 25, 2014 at 03:23:11PM +0800, Ming Lei wrote:
>> @@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
>> [...]
>
> These functions are incomplete when ->aborting == true.  I can't think

Could you explain when ->aborting can be true during the attach callback?

> of a reason why we are guaranteed never to hit that state, and fixing it
> is easy.  Just add the following to the end of
> laio_attach_aio_context():
>
>     if (s->aborting) {
>         qemu_bh_schedule(s->io_q.abort_bh);
>     }

You mean the abort BH may not have a chance to run before its deletion
in the detach callback?

If so, bdrv_drain_all() from bdrv_set_aio_context() should have
handled the pending BH, right?

Thanks,
Ming Lei
On Tue, Nov 25, 2014 at 10:45:05PM +0800, Ming Lei wrote:
> On Tue, Nov 25, 2014 at 9:08 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> > On Tue, Nov 25, 2014 at 03:23:11PM +0800, Ming Lei wrote:
> > [...]
> > These functions are incomplete when ->aborting == true.
>
> Could you explain when ->aborting can be true during the attach callback?
>
> You mean the abort BH may not have a chance to run before its deletion
> in the detach callback?

Exactly.  Any time you schedule a BH you need to be aware of things that
may happen before the BH is invoked.

> If so, bdrv_drain_all() from bdrv_set_aio_context() should have
> handled the pending BH, right?

I'm not sure it's good to make subtle assumptions like that.  If the
code changes, they will break.

Since it is very easy to protect against this case (the code I posted
before), it seems worthwhile to be on the safe side.

Stefan
On Wed, Nov 26, 2014 at 12:18 AM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> You mean the abort BH may not have a chance to run before its deletion
>> in the detach callback?
>
> Exactly.  Any time you schedule a BH you need to be aware of things that
> may happen before the BH is invoked.
>
>> If so, bdrv_drain_all() from bdrv_set_aio_context() should have
>> handled the pending BH, right?
>
> I'm not sure it's good to make subtle assumptions like that.  If the
> code changes, they will break.

IMO, that is the purpose of bdrv_drain_all(), at least according to its
comment:

    /* ensure there are no in-flight requests */

If it changes in the future, the general problem has to be considered anyway.

> Since it is very easy to protect against this case (the code I posted
> before), it seems worthwhile to be on the safe side.

Given that the potential problem doesn't exist in the current tree, could
you agree to merging it first?

BTW, there isn't this sort of handling for 'completion_bh' in linux-aio
either, :-)

Thanks,
Ming Lei
Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
> In the submit path, we can't complete requests directly,
> otherwise "Co-routine re-entered recursively" may be caused,
> so this patch fixes the issue with the ideas below:
>
> - for -EAGAIN or partial completion, retry the submission
>   in the following completion cb, which runs in BH context
> - for partial completion, update the io queue too
> - for the case of a full io queue, submit queued requests
>   immediately and return failure to the caller
> - for other failures, abort all queued requests in BH
>   context; requests won't be allowed to submit until
>   aborting is handled
>
> Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Ming Lei <ming.lei@canonical.com>

This looks like a quite complex fix to this problem, and it introduces
new error cases, too (while aborting, requests fail now, but they really
should just be waiting).

I'm wondering if this is the time to finally convert the linux-aio
interface to coroutines.  It wouldn't only be a performance
optimisation, but would potentially also simplify this code.

>  block/linux-aio.c | 114 ++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 95 insertions(+), 19 deletions(-)
>
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index d92513b..11ac828 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -38,11 +38,20 @@ struct qemu_laiocb {
>      QLIST_ENTRY(qemu_laiocb) node;
>  };
>
> +/*
> + * TODO: support to batch I/O from multiple bs in one same
> + * AIO context, one important use case is multi-lun scsi,
> + * so in future the IO queue should be per AIO context.
> + */
>  typedef struct {
>      struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
>      unsigned int size;
>      unsigned int idx;
> +
>+     /* abort queued requests in BH context */
> +    QEMUBH *abort_bh;
> +    bool  aborting;

Two spaces.

>  } LaioQueue;
>
>  struct qemu_laio_state {
> @@ -59,6 +68,8 @@ struct qemu_laio_state {
>      int event_max;
>  };
>
> +static int ioq_submit(struct qemu_laio_state *s);
> +
>  static inline ssize_t io_event_ret(struct io_event *ev)
>  {
>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
> @@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
>      qemu_aio_unref(laiocb);
>  }
>
> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
> +{
> +    if (s->io_q.idx) {
> +        ioq_submit(s);
> +    }
> +}
> +
>  /* The completion BH fetches completed I/O requests and invokes their
>   * callbacks.
>   *
> @@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
>
>          qemu_laio_process_completion(s, laiocb);
>      }
> +
> +    qemu_laio_start_retry(s);
>  }

Why is qemu_laio_start_retry() a separate function?  This is the only
caller.

>  static void qemu_laio_completion_cb(EventNotifier *e)
> @@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->size = MAX_QUEUED_IO;
>      io_q->idx = 0;
>      io_q->plugged = 0;
> +    io_q->aborting = false;
>  }
>
> +/* Always return >= 0 and it means how many requests are submitted */
>  static int ioq_submit(struct qemu_laio_state *s)
>  {
> -    int ret, i = 0;
> +    int ret;
>      int len = s->io_q.idx;
>
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> -
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    if (!len) {
> +        return 0;
> +    }
>
> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>      if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        /* retry in following completion cb */
> +        if (ret == -EAGAIN) {
> +            return 0;
> +        }
> +
> +        /*
> +         * Abort in BH context for avoiding Co-routine re-entered,
> +         * and update io queue at that time
> +         */
> +        qemu_bh_schedule(s->io_q.abort_bh);
> +        s->io_q.aborting = true;
> +        ret = 0;
>      }
>
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> +    /*
> +     * update io queue, and retry will be started automatically
> +     * in following completion cb for the remainder
> +     */
> +    if (ret > 0) {
> +        if (ret < len) {
> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
> +                    (len - ret) * sizeof(struct iocb *));
> +        }
> +        s->io_q.idx -= ret;
> +    }

Support for partly handled queues is nice, but a logically separate
change.  Please move this to its own patch.

> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> +    return ret;
> +}
> +
> +static void ioq_abort_bh(void *opaque)
> +{
> +    struct qemu_laio_state *s = opaque;
> +    int i;
> +
> +    for (i = 0; i < s->io_q.idx; i++) {
> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +                                                  struct qemu_laiocb,
> +                                                  iocb);
> +        laiocb->ret = -EIO;

We shouldn't be throwing the real error code away.

>          qemu_laio_process_completion(s, laiocb);
>      }
> -    return ret;
> +
> +    s->io_q.idx = 0;
> +    s->io_q.aborting = false;
>  }
>
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
>
> +    /* Request can't be allowed to submit until aborting is handled */
> +    if (unlikely(s->io_q.aborting)) {
> +        return -EIO;
> +    }
> +
> +    if (unlikely(idx == s->io_q.size)) {
> +        ioq_submit(s);
> +
> +        if (unlikely(s->io_q.aborting)) {
> +            return -EIO;
> +        }
> +        idx = s->io_q.idx;
> +    }
> +
> +    /* It has to return now if queue is still full */
> +    if (unlikely(idx == s->io_q.size)) {
> +        return -EAGAIN;
> +    }
> +
>      s->io_q.iocbs[idx++] = iocb;
>      s->io_q.idx = idx;
>
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> +    /* submit immediately if queue depth is above 2/3 */
> +    if (idx > s->io_q.size * 2 / 3) {
>          ioq_submit(s);
>      }

This change looks independent as well.  It also isn't mentioned in the
commit message.  Why is it a good idea to use only 2/3 of the queue
instead of making use of its full length?

> +    return 0;
>  }

Kevin
Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
> In the submit path, we can't complete requests directly,
> otherwise "Co-routine re-entered recursively" may be caused,

One more thing: Can you write a qemu-iotests case to test this code
path?  (Which should fail on master and be fixed with this patch,
obviously.)

Kevin
On Wed, Nov 26, 2014 at 05:15:44PM +0800, Ming Lei wrote:
> On Wed, Nov 26, 2014 at 12:18 AM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> > [...]
> > Since it is very easy to protect against this case (the code I posted
> > before), it seems worthwhile to be on the safe side.
>
> Given that the potential problem doesn't exist in the current tree, could
> you agree to merging it first?
>
> BTW, there isn't this sort of handling for 'completion_bh' in linux-aio
> either, :-)

That's incorrect, completion_bh is protected:

  void laio_attach_aio_context(void *s_, AioContext *new_context)
  {
      struct qemu_laio_state *s = s_;

      s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
      aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
          ^---- this will reschedule completion_bh if s->e has events pending
  }

I am asking for a one-line if statement to avoid introducing a subtle
assumption that would be a pain to debug in the future.  It's so easy to
add that I'm against merging the patch without this protection.
On 27/11/2014 17:56, Stefan Hajnoczi wrote:
> I am asking for a one-line if statement to avoid introducing a
> subtle assumption that would be a pain to debug in the future.
> It's so easy to add that I'm against merging the patch without this
> protection.

Yeah, either that or an assertion must be there.

Paolo
On Wed, Nov 26, 2014 at 7:18 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
>> In the submit path, we can't complete requests directly,
>> otherwise "Co-routine re-entered recursively" may be caused,
>> [...]
>
> This looks like a quite complex fix to this problem, and it introduces
> new error cases, too (while aborting, requests fail now, but they really
> should just be waiting).

"Co-routine re-entered recursively" is only triggered when io_submit()
returns a failure (either -EAGAIN or another error) or a partial
completion, so this patch is really about handling io_submit() failures
and making linux-aio more reliable.

After the io queue was introduced, it became a bit easier to trigger the
failure, especially in the multi-queue case, or when the VM driver sets
a longer queue depth.  So all the cases (EAGAIN, other failures and
partial completion) have to be considered.

So it isn't a complex fix; the actual problem is complex.  Not to
mention that previously linux-aio didn't handle -EAGAIN at all.

> I'm wondering if this is the time to finally convert the linux-aio
> interface to coroutines.  It wouldn't only be a performance
> optimisation, but would potentially also simplify this code.

Even with coroutines, the above io_submit() issues have to be
considered too.

>> +    /* abort queued requests in BH context */
>> +    QEMUBH *abort_bh;
>> +    bool  aborting;
>
> Two spaces.

OK.

>> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
>> +{
>> +    if (s->io_q.idx) {
>> +        ioq_submit(s);
>> +    }
>> +}
>> [...]
>
> Why is qemu_laio_start_retry() a separate function?  This is the only
> caller.

OK.

>> +    if (ret > 0) {
>> +        if (ret < len) {
>> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
>> +                    (len - ret) * sizeof(struct iocb *));
>> +        }
>> +        s->io_q.idx -= ret;
>> +    }
>
> Support for partly handled queues is nice, but a logically separate
> change.  Please move this to its own patch.

As I described above, the issue of "Co-routine re-entered recursively"
is triggered by the current handling of partial completion.

>> +        laiocb->ret = -EIO;
>
> We shouldn't be throwing the real error code away.

OK, will do that.

>> -    /* submit immediately if queue is full */
>> -    if (idx == s->io_q.size) {
>> +    /* submit immediately if queue depth is above 2/3 */
>> +    if (idx > s->io_q.size * 2 / 3) {
>>          ioq_submit(s);
>>      }
>
> This change looks independent as well.  It also isn't mentioned in the
> commit message.  Why is it a good idea to use only 2/3 of the queue
> instead of making use of its full length?

When the queue is full, a new submission has to return failure, so the
threshold is there to make linux-aio more reliable.

Thanks,
Ming Lei
On Fri, Nov 28, 2014 at 12:58 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 27/11/2014 17:56, Stefan Hajnoczi wrote:
>> I am asking for a one-line if statement to avoid introducing a
>> subtle assumption that would be a pain to debug in the future.
>> It's so easy to add that I'm against merging the patch without this
>> protection.
>
> Yeah, either that or an assertion must be there.

OK.

Thanks,
Ming Lei
diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..11ac828 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,20 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support to batch I/O from multiple bs in one same
+ * AIO context, one important use case is multi-lun scsi,
+ * so in future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* abort queued requests in BH context */
+    QEMUBH *abort_bh;
+    bool  aborting;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +68,8 @@ struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_unref(laiocb);
 }
 
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
+}
+
 /* The completion BH fetches completed I/O requests and invokes their
  * callbacks.
  *
@@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    qemu_laio_start_retry(s);
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+    io_q->aborting = false;
 }
 
+/* Always return >= 0 and it means how many requests are submitted */
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
+    int ret;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
-
-    /* empty io queue */
-    s->io_q.idx = 0;
+    if (!len) {
+        return 0;
+    }
 
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* retry in following completion cb */
+        if (ret == -EAGAIN) {
+            return 0;
+        }
+
+        /*
+         * Abort in BH context for avoiding Co-routine re-entered,
+         * and update io queue at that time
+         */
+        qemu_bh_schedule(s->io_q.abort_bh);
+        s->io_q.aborting = true;
+        ret = 0;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+    /*
+     * update io queue, and retry will be started automatically
+     * in following completion cb for the remainder
+     */
+    if (ret > 0) {
+        if (ret < len) {
+            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+                    (len - ret) * sizeof(struct iocb *));
+        }
+        s->io_q.idx -= ret;
+    }
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
+    return ret;
+}
+
+static void ioq_abort_bh(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    int i;
+
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = -EIO;
         qemu_laio_process_completion(s, laiocb);
     }
-    return ret;
+
+    s->io_q.idx = 0;
+    s->io_q.aborting = false;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    /* Request can't be allowed to submit until aborting is handled */
+    if (unlikely(s->io_q.aborting)) {
+        return -EIO;
+    }
+
+    if (unlikely(idx == s->io_q.size)) {
+        ioq_submit(s);
+
+        if (unlikely(s->io_q.aborting)) {
+            return -EIO;
+        }
+        idx = s->io_q.idx;
+    }
+
+    /* It has to return now if queue is still full */
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
         ioq_submit(s);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +353,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
 
     return &laiocb->common;
@@ -296,12 +370,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.abort_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }