@@ -56,6 +56,7 @@ typedef struct {
} LaioQueue;
struct qemu_laio_state {
+ unsigned long pending;
io_context_t ctx;
EventNotifier e;
@@ -98,6 +99,7 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
}
}
}
+ s->pending--;
laiocb->common.cb(laiocb->common.opaque, ret);
qemu_aio_unref(laiocb);
@@ -179,6 +181,7 @@ static void laio_cancel(BlockAIOCB *blockacb)
return;
}
+ laiocb->ctx->pending--;
laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}
@@ -280,8 +283,13 @@ static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
s->io_q.iocbs[idx++] = iocb;
s->io_q.idx = idx;
- /* submit immediately if queue depth is above 2/3 */
- if (idx > s->io_q.size * 2 / 3) {
+ /*
+ * This is reached in two cases: queue not plugged but io_submit
+ * returned -EAGAIN, or queue plugged. In the latter case, start
+ * submitting some I/O if the queue is getting too full. In the
+ * former case, instead, wait until an I/O operation is completed.
+ */
+ if (s->io_q.plugged && unlikely(idx > s->io_q.size * 2 / 3)) {
ioq_submit(s);
}
@@ -346,15 +354,23 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
}
io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
- if (!s->io_q.plugged) {
- if (io_submit(s->ctx, 1, &iocbs) < 0) {
- goto out_free_aiocb;
- }
- } else {
- if (ioq_enqueue(s, iocbs) < 0) {
+ /* Switch to queue mode until -EAGAIN is handled */
+ if (!s->io_q.plugged && !s->io_q.idx) {
+ int ret = io_submit(s->ctx, 1, &iocbs);
+ if (ret >= 0) {
+ return &laiocb->common;
+ } else if (ret != -EAGAIN || (ret == -EAGAIN && !s->pending)) {
goto out_free_aiocb;
}
+ /*
+ * On -EAGAIN, queue the request only if there is pending
+ * I/O; it will be resubmitted when a pending request completes.
+ */
+ }
+ if (ioq_enqueue(s, iocbs) < 0) {
+ goto out_free_aiocb;
}
+ s->pending++;
return &laiocb->common;
out_free_aiocb: