
[RFC,1/3] aio-posix: add aio_set_poll_handler()

Message ID 1478711602-12620-2-git-send-email-stefanha@redhat.com
State New

Commit Message

Stefan Hajnoczi Nov. 9, 2016, 5:13 p.m. UTC
Poll handlers are executed for a certain amount of time before the event
loop polls file descriptors.  This can be used to keep the event loop
thread scheduled and may therefore recognize events faster than blocking
poll(2) calls.

This is an experimental feature to reduce I/O latency in high IOPS
scenarios.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 aio-posix.c         | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 include/block/aio.h |  16 +++++++
 2 files changed, 149 insertions(+)
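
For orientation, here is a minimal sketch of how a backend might use the
proposed aio_set_poll_handler() API.  MyBackend and all of its fields are
invented for illustration; polling itself only becomes active when the
QEMU_AIO_POLL_MAX_NS environment variable is set to a non-zero number of
nanoseconds (see aio_context_setup() in the patch below).

#include "block/aio.h"

/* Hypothetical backend state -- every field here is made up for illustration */
typedef struct MyBackend {
    AioContext *ctx;
    unsigned int in_flight;   /* requests submitted but not yet completed */
    unsigned int *ring_head;  /* completion index written by the producer */
    unsigned int last_head;   /* last completion index we consumed */
} MyBackend;

/* Cheap check run repeatedly by run_poll_handlers(): no syscall, just memory */
static bool my_backend_poll(void *opaque)
{
    MyBackend *b = opaque;

    return b->in_flight > 0 && *b->ring_head != b->last_head;
}

/* Invoked when my_backend_poll() returns true */
static void my_backend_complete(void *opaque)
{
    MyBackend *b = opaque;

    /* ... consume completions, advance b->last_head, decrement b->in_flight ... */
}

static void my_backend_attach(MyBackend *b)
{
    aio_set_poll_handler(b->ctx, my_backend_poll, my_backend_complete, b);
}

static void my_backend_detach(MyBackend *b)
{
    /* A NULL io_fn removes the handler registered for (poll_fn, opaque) */
    aio_set_poll_handler(b->ctx, my_backend_poll, NULL, b);
}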

Comments

Paolo Bonzini Nov. 9, 2016, 5:30 p.m. UTC | #1
On 09/11/2016 18:13, Stefan Hajnoczi wrote:
> Poll handlers are executed for a certain amount of time before the event
> loop polls file descriptors.  This can be used to keep the event loop
> thread scheduled and may therefore recognize events faster than blocking
> poll(2) calls.
> 
> This is an experimental feature to reduce I/O latency in high IOPS
> scenarios.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  aio-posix.c         | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  include/block/aio.h |  16 +++++++
>  2 files changed, 149 insertions(+)
> 
> diff --git a/aio-posix.c b/aio-posix.c
> index e13b9ab..933a972 100644
> --- a/aio-posix.c
> +++ b/aio-posix.c
> @@ -18,6 +18,7 @@
>  #include "block/block.h"
>  #include "qemu/queue.h"
>  #include "qemu/sockets.h"
> +#include "qemu/cutils.h"
>  #ifdef CONFIG_EPOLL_CREATE1
>  #include <sys/epoll.h>
>  #endif
> @@ -33,6 +34,19 @@ struct AioHandler
>      QLIST_ENTRY(AioHandler) node;
>  };
>  
> +struct AioPollHandler {
> +    QLIST_ENTRY(AioPollHandler) node;
> +
> +    AioPollFn *poll_fn;     /* check whether to invoke io_fn() */
> +    IOHandler *io_fn;       /* handler callback */
> +    void *opaque;           /* user-defined argument to callbacks */
> +
> +    bool deleted;
> +};
> +
> +/* How long to poll AioPollHandlers before monitoring file descriptors */
> +static int64_t aio_poll_max_ns;
> +
>  #ifdef CONFIG_EPOLL_CREATE1
>  
>  /* The fd number threashold to switch to epoll */
> @@ -264,8 +278,61 @@ void aio_set_event_notifier(AioContext *ctx,
>                         is_external, (IOHandler *)io_read, NULL, notifier);
>  }
>  
> +static AioPollHandler *find_aio_poll_handler(AioContext *ctx,
> +                                             AioPollFn *poll_fn,
> +                                             void *opaque)
> +{
> +    AioPollHandler *node;
> +
> +    QLIST_FOREACH(node, &ctx->aio_poll_handlers, node) {
> +        if (node->poll_fn == poll_fn &&
> +            node->opaque == opaque) {
> +            if (!node->deleted) {
> +                return node;
> +            }
> +        }
> +    }
> +
> +    return NULL;
> +}
> +
> +void aio_set_poll_handler(AioContext *ctx,
> +                          AioPollFn *poll_fn,
> +                          IOHandler *io_fn,
> +                          void *opaque)
> +{
> +    AioPollHandler *node;
> +
> +    node = find_aio_poll_handler(ctx, poll_fn, opaque);
> +    if (!io_fn) { /* remove */
> +        if (!node) {
> +            return;
> +        }
> +
> +        if (ctx->walking_poll_handlers) {
> +            node->deleted = true;
> +        } else {
> +            QLIST_REMOVE(node, node);
> +            g_free(node);
> +        }
> +    } else { /* add or update */
> +        if (!node) {
> +            node = g_new(AioPollHandler, 1);
> +            QLIST_INSERT_HEAD(&ctx->aio_poll_handlers, node, node);
> +        }
> +
> +        node->poll_fn = poll_fn;
> +        node->io_fn = io_fn;
> +        node->opaque = opaque;
> +    }
> +
> +    aio_notify(ctx);
> +}
> +
> +
>  bool aio_prepare(AioContext *ctx)
>  {
> +    /* TODO run poll handlers? */
>      return false;
>  }
>  
> @@ -400,6 +467,47 @@ static void add_pollfd(AioHandler *node)
>      npfd++;
>  }
>  
> +static bool run_poll_handlers(AioContext *ctx)
> +{
> +    int64_t start_time;
> +    unsigned int loop_count = 0;
> +    bool fired = false;
> +
> +    /* Is there any polling to be done? */

I think the question is not "is there any polling to be done" but rather
"is there anything that requires looking at a file descriptor".  If you
have e.g. an NBD device on the AioContext you cannot poll.  On the other
hand if all you have is bottom halves (which you can poll with
ctx->notified), AIO and virtio ioeventfds, you can poll.

In particular, testing for bottom halves is necessary to avoid incurring
extra latency on flushes, which use the thread pool.

Perhaps the poll handler could be a parameter to aio_set_event_notifier?
 run_poll_handlers can just set revents (to G_IO_IN for example) if the
polling handler returns true, and return true as well.  aio_poll can
then call aio_notify_accept and aio_dispatch, bypassing the poll system
call altogether.
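
A rough sketch of that idea, under the assumption that AioHandler grows an
io_poll member and aio_set_event_notifier() gains an AioPollFn parameter
(hypothetical shapes, not code from this series):

static bool run_poll_handlers(AioContext *ctx)
{
    AioHandler *node;
    bool fired = false;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_poll && node->io_poll(node->opaque)) {
            /* Report readiness through the normal dispatch path... */
            node->pfd.revents |= G_IO_IN;
            fired = true;
        }
    }

    /* ...so the caller can skip poll() entirely and go straight to
     * aio_notify_accept() + aio_dispatch(). */
    return fired;
}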

Thanks,

Paolo

> +    if (!QLIST_FIRST(&ctx->aio_poll_handlers)) {
> +        return false;
> +    }
> +
> +    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> +    while (!fired) {
> +        AioPollHandler *node;
> +        AioPollHandler *tmp;
> +
> +        QLIST_FOREACH_SAFE(node, &ctx->aio_poll_handlers, node, tmp) {
> +            ctx->walking_poll_handlers++;
> +            if (!node->deleted && node->poll_fn(node->opaque)) {
> +                node->io_fn(node->opaque);
> +                fired = true;
> +            }
> +            ctx->walking_poll_handlers--;
> +
> +            if (!ctx->walking_poll_handlers && node->deleted) {
> +                QLIST_REMOVE(node, node);
> +                g_free(node);
> +            }
> +        }
> +
> +        loop_count++;
> +        if ((loop_count % 1024) == 0 &&
> +            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time >
> +            aio_poll_max_ns) {
> +            break;
> +        }
> +    }
> +
> +    return fired;
> +}
> +
>  bool aio_poll(AioContext *ctx, bool blocking)
>  {
>      AioHandler *node;
> @@ -410,6 +518,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
>      aio_context_acquire(ctx);
>      progress = false;
>  
> +    if (aio_poll_max_ns &&
> +        /* see qemu_soonest_timeout() uint64_t hack */
> +        (uint64_t)aio_compute_timeout(ctx) > (uint64_t)aio_poll_max_ns) {
> +        if (run_poll_handlers(ctx)) {
> +            progress = true;
> +            blocking = false; /* poll again, don't block */
> +        }
> +    }
> +
>      /* aio_notify can avoid the expensive event_notifier_set if
>       * everything (file descriptors, bottom halves, timers) will
>       * be re-evaluated before the next blocking poll().  This is
> @@ -484,6 +601,22 @@ bool aio_poll(AioContext *ctx, bool blocking)
>  
>  void aio_context_setup(AioContext *ctx)
>  {
> +    if (!aio_poll_max_ns) {
> +        int64_t val;
> +        const char *env_str = getenv("QEMU_AIO_POLL_MAX_NS");
> +
> +        if (!env_str) {
> +            env_str = "0";
> +        }
> +
> +        if (!qemu_strtoll(env_str, NULL, 10, &val)) {
> +            aio_poll_max_ns = val;
> +        } else {
> +            fprintf(stderr, "Unable to parse QEMU_AIO_POLL_MAX_NS "
> +                            "environment variable\n");
> +        }
> +    }
> +
>  #ifdef CONFIG_EPOLL_CREATE1
>      assert(!ctx->epollfd);
>      ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
> diff --git a/include/block/aio.h b/include/block/aio.h
> index c7ae27c..2be1955 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -42,8 +42,10 @@ void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
>  void qemu_aio_unref(void *p);
>  void qemu_aio_ref(void *p);
>  
> +typedef struct AioPollHandler AioPollHandler;
>  typedef struct AioHandler AioHandler;
>  typedef void QEMUBHFunc(void *opaque);
> +typedef bool AioPollFn(void *opaque);
>  typedef void IOHandler(void *opaque);
>  
>  struct ThreadPool;
> @@ -64,6 +66,15 @@ struct AioContext {
>       */
>      int walking_handlers;
>  
> +    /* The list of registered AIO poll handlers */
> +    QLIST_HEAD(, AioPollHandler) aio_poll_handlers;
> +
> +    /* This is a simple lock used to protect the aio_poll_handlers list.
> +     * Specifically, it's used to ensure that no callbacks are removed while
> +     * we're walking and dispatching callbacks.
> +     */
> +    int walking_poll_handlers;
> +
>      /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
>       * accessed with atomic primitives.  If this field is 0, everything
>       * (file descriptors, bottom halves, timers) will be re-evaluated
> @@ -327,6 +338,11 @@ void aio_set_fd_handler(AioContext *ctx,
>                          IOHandler *io_write,
>                          void *opaque);
>  
> +void aio_set_poll_handler(AioContext *ctx,
> +                          AioPollFn *poll_fn,
> +                          IOHandler *io_fn,
> +                          void *opaque);
> +
>  /* Register an event notifier and associated callbacks.  Behaves very similarly
>   * to event_notifier_set_handler.  Unlike event_notifier_set_handler, these callbacks
>   * will be invoked when using aio_poll().
>
Stefan Hajnoczi Nov. 10, 2016, 10:17 a.m. UTC | #2
On Wed, Nov 09, 2016 at 06:30:11PM +0100, Paolo Bonzini wrote:

Thanks for the feedback.  I hope that Karl will be able to find a
QEMU_AIO_POLL_MAX_NS setting that improves the benchmark.  At that point
I'll send a new version of this series so we can iron out the details.

> > +static bool run_poll_handlers(AioContext *ctx)
> > +{
> > +    int64_t start_time;
> > +    unsigned int loop_count = 0;
> > +    bool fired = false;
> > +
> > +    /* Is there any polling to be done? */
> 
> I think the question is not "is there any polling to be done" but rather
> "is there anything that requires looking at a file descriptor".  If you
> have e.g. an NBD device on the AioContext you cannot poll.  On the other
> hand if all you have is bottom halves (which you can poll with
> ctx->notified), AIO and virtio ioeventfds, you can poll.

This is a good point.  Polling should only be done if all resources in
the AioContext benefit from polling - otherwise it adds latency to
resources that don't support polling.

Another thing: only poll if there is work to be done.  Linux AIO must
only poll the ring when there are >0 requests outstanding.  Currently it
always polls (doh!).
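
A guard of that shape might look like the following sketch; LinuxAioState's
real fields differ and laio_ring_has_completions() is a hypothetical helper:

static bool laio_poll_fn(void *opaque)
{
    LinuxAioState *s = opaque;

    if (s->in_flight == 0) {   /* hypothetical counter of outstanding requests */
        return false;          /* nothing outstanding: polling is pure overhead */
    }

    /* Peek at the userspace-visible completion ring without a syscall
     * (hypothetical helper standing in for the real ring check). */
    return laio_ring_has_completions(s);
}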

> In particular, testing for bottom halves is necessary to avoid incurring
> extra latency on flushes, which use the thread pool.

The current code uses a half-solution: it uses aio_compute_timeout() to
see if any existing BHs are ready to execute *before* beginning to poll.

Really we should poll BHs since they can be scheduled during the polling
loop.

> Perhaps the poll handler could be a parameter to aio_set_event_notifier?
>  run_poll_handlers can just set revents (to G_IO_IN for example) if the
> polling handler returns true, and return true as well.  aio_poll can
> then call aio_notify_accept and aio_dispatch, bypassing the poll system
> call altogether.

This is problematic.  The poll source != file descriptor, so there is a
race condition:

1. Guest increments virtqueue avail.idx

2. QEMU poll notices avail.idx update and marks fd.revents readable.

3. QEMU dispatches fd handler:

void virtio_queue_host_notifier_read(EventNotifier *n)
{
    VirtQueue *vq = container_of(n, VirtQueue, host_notifier);
    if (event_notifier_test_and_clear(n)) {
        virtio_queue_notify_vq(vq);
    }
}

4. Guest kicks virtqueue -> ioeventfd is signalled

Unfortunately polling is "too fast" and event_notifier_test_and_clear()
returns false; we won't process the virtqueue!

Pretending that polling is the same as fd monitoring only works when #4
happens before #3.  We have to solve this race condition.

The simplest solution is to get rid of the if statement (i.e. enable
spurious event processing).  Not sure if that has a drawback though.
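
Concretely, dropping the gate would look like this (a sketch of the change
just described, based on the function quoted above):

void virtio_queue_host_notifier_read(EventNotifier *n)
{
    VirtQueue *vq = container_of(n, VirtQueue, host_notifier);

    /* Clear the notifier if it fired, but process the virtqueue either way,
     * so a poll-driven dispatch that raced with the guest kick is not lost. */
    event_notifier_test_and_clear(n);
    virtio_queue_notify_vq(vq);
}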

Do you have a nicer solution in mind?
Paolo Bonzini Nov. 10, 2016, 1:20 p.m. UTC | #3
On Thursday, November 10, 2016 11:17:35 AM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> > I think the question is not "is there any polling to be done" but rather
> > "is there anything that requires looking at a file descriptor".  If you
> > have e.g. an NBD device on the AioContext you cannot poll.  On the other
> > hand if all you have is bottom halves (which you can poll with
> > ctx->notified), AIO and virtio ioeventfds, you can poll.
> 
> This is a good point.  Polling should only be done if all resources in
> the AioContext benefit from polling - otherwise it adds latency to
> resources that don't support polling.
> 
> Another thing: only poll if there is work to be done.  Linux AIO must
> only poll the ring when there are >0 requests outstanding.  Currently it
> always polls (doh!).

Good idea.  So the result of the polling callback could be one of ready, not
ready, and not active?  Or did you have in mind something else?
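
One way that tri-state result could be expressed (illustrative names only,
not an actual API proposal):

typedef enum {
    AIO_POLL_INACTIVE,   /* no outstanding work; skip this handler entirely */
    AIO_POLL_NOT_READY,  /* work is outstanding, but nothing has completed yet */
    AIO_POLL_READY,      /* invoke the handler's io_fn now */
} AioPollStatus;

/* AioPollFn would then return AioPollStatus instead of bool. */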

> > In particular, testing for bottom halves is necessary to avoid incurring
> > extra latency on flushes, which use the thread pool.
> 
> The current code uses a half-solution: it uses aio_compute_timeout() to
> see if any existing BHs are ready to execute *before* beginning to poll.
> 
> Really we should poll BHs since they can be scheduled during the polling
> loop.

We should do so for correctness (hopefully with just ctx->notified: there
should be no need to walk the BH list during polling).  However, the user
of the BH should activate polling "manually" by registering its own
polling handler: if there are no active polling handlers, just ignore
bottom halves and do the poll().

This is because there are always a handful of registered bottom halves, but
they are not necessarily "activatable" from other threads.  For example the
thread pool always has one BH but as you noticed for Linux AIO, it may not
have any pending requests.  So the thread pool would still have to register
with aio_set_poll_handler, even if it uses bottom halves internally for
the signaling.  I guess it would not need to register an associated IOHandler,
since it can just use aio_bh_poll.

A couple more random observations:

- you can pass the output of aio_compute_timeout(ctx) to run_poll_handlers,
like MIN((uint64_t)aio_compute_timeout(ctx), (uint64_t)aio_poll_max_ns).

- since we know that all resources are pollable, we don't need to poll() at
all if polling succeeds (though we do need aio_notify_accept()+aio_bh_poll()).
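
Put together, those two observations would turn the hook in aio_poll() into
something like this sketch (run_poll_handlers() taking a deadline is an
assumed signature change, not part of the posted patch):

    if (aio_poll_max_ns) {
        /* aio_compute_timeout() returns -1 for "no timeout"; the uint64_t casts
         * make that compare as "infinite", as in qemu_soonest_timeout(). */
        int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
                             (uint64_t)aio_poll_max_ns);

        if (max_ns > 0 && run_poll_handlers(ctx, max_ns)) {
            /* Everything on this context is pollable, so the poll() system call
             * can be skipped; aio_notify_accept() and aio_bh_poll() still run. */
            aio_notify_accept(ctx);
            progress = true;
            blocking = false;
        }
    }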

> > Perhaps the poll handler could be a parameter to aio_set_event_notifier?
> >  run_poll_handlers can just set revents (to G_IO_IN for example) if the
> > polling handler returns true, and return true as well.  aio_poll can
> > then call aio_notify_accept and aio_dispatch, bypassing the poll system
> > call altogether.
> 
> This is problematic.  The poll source != file descriptor so there is a
> race condition:
> 
> 1. Guest increments virtqueue avail.idx
> 2. QEMU poll notices avail.idx update and marks fd.revents readable.
> 3. QEMU dispatches fd handler:
> 4. Guest kicks virtqueue -> ioeventfd is signalled
> 
> Unfortunately polling is "too fast" and event_notifier_test_and_clear()
> returns false; we won't process the virtqueue!
> 
> Pretending that polling is the same as fd monitoring only works when #4
> happens before #3.  We have to solve this race condition.
> 
> The simplest solution is to get rid of the if statement (i.e. enable
> spurious event processing).  Not sure if that has a drawback though.
> Do you have a nicer solution in mind?

No, I don't.  Removing the if seems sensible, but I like the polling handler
more now that I know why it's there.  The event_notifier_test_and_clear does
add a small latency.

On one hand, because you need to check if *all* "resources"
support polling, you need a common definition of "resource" (e.g.
aio_set_fd_handler).  But on the other hand it would be nice to have
a streamlined polling callback.  I guess you could add something like
aio_set_handler that takes a struct with all interesting callbacks:

- in/out callbacks (for aio_set_fd_handler)
- polling handler
- polling callback

Then there would be simplified interfaces on top, such as aio_set_fd_handler,
aio_set_event_notifier and your own aio_set_poll_handler.
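
Such a combined registration might look like this sketch (AioHandlerOps and
aio_set_handler are illustrative names, not an existing QEMU API):

typedef struct AioHandlerOps {
    IOHandler *io_read;        /* fd became readable */
    IOHandler *io_write;       /* fd became writable */
    AioPollFn *io_poll;        /* cheap check: is there work, without poll()? */
    IOHandler *io_poll_ready;  /* run when io_poll() returns true */
} AioHandlerOps;

void aio_set_handler(AioContext *ctx, int fd, bool is_external,
                     const AioHandlerOps *ops, void *opaque);

/* The existing entry points would then become thin wrappers, e.g.: */
void aio_set_fd_handler(AioContext *ctx, int fd, bool is_external,
                        IOHandler *io_read, IOHandler *io_write, void *opaque)
{
    AioHandlerOps ops = { .io_read = io_read, .io_write = io_write };

    aio_set_handler(ctx, fd, is_external, &ops, opaque);
}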

Paolo
Fam Zheng Nov. 15, 2016, 8:14 p.m. UTC | #4
On Wed, 11/09 17:13, Stefan Hajnoczi wrote:
> +struct AioPollHandler {
> +    QLIST_ENTRY(AioPollHandler) node;
> +
> +    AioPollFn *poll_fn;     /* check whether to invoke io_fn() */
> +    IOHandler *io_fn;       /* handler callback */
> +    void *opaque;           /* user-defined argument to callbacks */
> +
> +    bool deleted;
> +};

<...>

> +    } else { /* add or update */
> +        if (!node) {
> +            node = g_new(AioPollHandler, 1);
> +            QLIST_INSERT_HEAD(&ctx->aio_poll_handlers, node, node);
> +        }
> +
> +        node->poll_fn = poll_fn;
> +        node->io_fn = io_fn;
> +        node->opaque = opaque;

Ouch, "deleted" is not initialzed and may cause the node to be removed at next
run_poll_handlers() call! :(

This is the cause of the jumpy numbers I saw; with it fixed I expect the
behavior to be much more consistent.
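
The minimal fix along those lines (a sketch, not part of the posted series)
is to zero-initialize the node when it is allocated in aio_set_poll_handler():

        if (!node) {
            node = g_new0(AioPollHandler, 1);  /* was g_new(): "deleted" now starts false */
            QLIST_INSERT_HEAD(&ctx->aio_poll_handlers, node, node);
        }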

Fam

> +    }
> +
> +    aio_notify(ctx);
> +}
> +
> +
>  bool aio_prepare(AioContext *ctx)
>  {
> +    /* TODO run poll handlers? */
>      return false;
>  }
>  
> @@ -400,6 +467,47 @@ static void add_pollfd(AioHandler *node)
>      npfd++;
>  }
>  
> +static bool run_poll_handlers(AioContext *ctx)
> +{
> +    int64_t start_time;
> +    unsigned int loop_count = 0;
> +    bool fired = false;
> +
> +    /* Is there any polling to be done? */
> +    if (!QLIST_FIRST(&ctx->aio_poll_handlers)) {
> +        return false;
> +    }
> +
> +    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> +    while (!fired) {
> +        AioPollHandler *node;
> +        AioPollHandler *tmp;
> +
> +        QLIST_FOREACH_SAFE(node, &ctx->aio_poll_handlers, node, tmp) {
> +            ctx->walking_poll_handlers++;
> +            if (!node->deleted && node->poll_fn(node->opaque)) {
> +                node->io_fn(node->opaque);
> +                fired = true;
> +            }
> +            ctx->walking_poll_handlers--;
> +
> +            if (!ctx->walking_poll_handlers && node->deleted) {
> +                QLIST_REMOVE(node, node);
> +                g_free(node);
> +            }
> +        }
> +
> +        loop_count++;
> +        if ((loop_count % 1024) == 0 &&
> +            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time >
> +            aio_poll_max_ns) {
> +            break;
> +        }
> +    }
> +
> +    return fired;
> +}
> +
>  bool aio_poll(AioContext *ctx, bool blocking)
>  {
>      AioHandler *node;
> @@ -410,6 +518,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
>      aio_context_acquire(ctx);
>      progress = false;
>  
> +    if (aio_poll_max_ns &&
> +        /* see qemu_soonest_timeout() uint64_t hack */
> +        (uint64_t)aio_compute_timeout(ctx) > (uint64_t)aio_poll_max_ns) {
> +        if (run_poll_handlers(ctx)) {
> +            progress = true;
> +            blocking = false; /* poll again, don't block */
> +        }
> +    }
> +
>      /* aio_notify can avoid the expensive event_notifier_set if
>       * everything (file descriptors, bottom halves, timers) will
>       * be re-evaluated before the next blocking poll().  This is
> @@ -484,6 +601,22 @@ bool aio_poll(AioContext *ctx, bool blocking)
>  
>  void aio_context_setup(AioContext *ctx)
>  {
> +    if (!aio_poll_max_ns) {
> +        int64_t val;
> +        const char *env_str = getenv("QEMU_AIO_POLL_MAX_NS");
> +
> +        if (!env_str) {
> +            env_str = "0";
> +        }
> +
> +        if (!qemu_strtoll(env_str, NULL, 10, &val)) {
> +            aio_poll_max_ns = val;
> +        } else {
> +            fprintf(stderr, "Unable to parse QEMU_AIO_POLL_MAX_NS "
> +                            "environment variable\n");
> +        }
> +    }
> +
>  #ifdef CONFIG_EPOLL_CREATE1
>      assert(!ctx->epollfd);
>      ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
> diff --git a/include/block/aio.h b/include/block/aio.h
> index c7ae27c..2be1955 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -42,8 +42,10 @@ void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
>  void qemu_aio_unref(void *p);
>  void qemu_aio_ref(void *p);
>  
> +typedef struct AioPollHandler AioPollHandler;
>  typedef struct AioHandler AioHandler;
>  typedef void QEMUBHFunc(void *opaque);
> +typedef bool AioPollFn(void *opaque);
>  typedef void IOHandler(void *opaque);
>  
>  struct ThreadPool;
> @@ -64,6 +66,15 @@ struct AioContext {
>       */
>      int walking_handlers;
>  
> +    /* The list of registered AIO poll handlers */
> +    QLIST_HEAD(, AioPollHandler) aio_poll_handlers;
> +
> +    /* This is a simple lock used to protect the aio_poll_handlers list.
> +     * Specifically, it's used to ensure that no callbacks are removed while
> +     * we're walking and dispatching callbacks.
> +     */
> +    int walking_poll_handlers;
> +
>      /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
>       * accessed with atomic primitives.  If this field is 0, everything
>       * (file descriptors, bottom halves, timers) will be re-evaluated
> @@ -327,6 +338,11 @@ void aio_set_fd_handler(AioContext *ctx,
>                          IOHandler *io_write,
>                          void *opaque);
>  
> +void aio_set_poll_handler(AioContext *ctx,
> +                          AioPollFn *poll_fn,
> +                          IOHandler *io_fn,
> +                          void *opaque);
> +
>  /* Register an event notifier and associated callbacks.  Behaves very similarly
>   * to event_notifier_set_handler.  Unlike event_notifier_set_handler, these callbacks
>   * will be invoked when using aio_poll().
> -- 
> 2.7.4
>

Patch

diff --git a/aio-posix.c b/aio-posix.c
index e13b9ab..933a972 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -18,6 +18,7 @@ 
 #include "block/block.h"
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
+#include "qemu/cutils.h"
 #ifdef CONFIG_EPOLL_CREATE1
 #include <sys/epoll.h>
 #endif
@@ -33,6 +34,19 @@  struct AioHandler
     QLIST_ENTRY(AioHandler) node;
 };
 
+struct AioPollHandler {
+    QLIST_ENTRY(AioPollHandler) node;
+
+    AioPollFn *poll_fn;     /* check whether to invoke io_fn() */
+    IOHandler *io_fn;       /* handler callback */
+    void *opaque;           /* user-defined argument to callbacks */
+
+    bool deleted;
+};
+
+/* How long to poll AioPollHandlers before monitoring file descriptors */
+static int64_t aio_poll_max_ns;
+
 #ifdef CONFIG_EPOLL_CREATE1
 
 /* The fd number threashold to switch to epoll */
@@ -264,8 +278,61 @@  void aio_set_event_notifier(AioContext *ctx,
                        is_external, (IOHandler *)io_read, NULL, notifier);
 }
 
+static AioPollHandler *find_aio_poll_handler(AioContext *ctx,
+                                             AioPollFn *poll_fn,
+                                             void *opaque)
+{
+    AioPollHandler *node;
+
+    QLIST_FOREACH(node, &ctx->aio_poll_handlers, node) {
+        if (node->poll_fn == poll_fn &&
+            node->opaque == opaque) {
+            if (!node->deleted) {
+                return node;
+            }
+        }
+    }
+
+    return NULL;
+}
+
+void aio_set_poll_handler(AioContext *ctx,
+                          AioPollFn *poll_fn,
+                          IOHandler *io_fn,
+                          void *opaque)
+{
+    AioPollHandler *node;
+
+    node = find_aio_poll_handler(ctx, poll_fn, opaque);
+    if (!io_fn) { /* remove */
+        if (!node) {
+            return;
+        }
+
+        if (ctx->walking_poll_handlers) {
+            node->deleted = true;
+        } else {
+            QLIST_REMOVE(node, node);
+            g_free(node);
+        }
+    } else { /* add or update */
+        if (!node) {
+            node = g_new(AioPollHandler, 1);
+            QLIST_INSERT_HEAD(&ctx->aio_poll_handlers, node, node);
+        }
+
+        node->poll_fn = poll_fn;
+        node->io_fn = io_fn;
+        node->opaque = opaque;
+    }
+
+    aio_notify(ctx);
+}
+
+
 bool aio_prepare(AioContext *ctx)
 {
+    /* TODO run poll handlers? */
     return false;
 }
 
@@ -400,6 +467,47 @@  static void add_pollfd(AioHandler *node)
     npfd++;
 }
 
+static bool run_poll_handlers(AioContext *ctx)
+{
+    int64_t start_time;
+    unsigned int loop_count = 0;
+    bool fired = false;
+
+    /* Is there any polling to be done? */
+    if (!QLIST_FIRST(&ctx->aio_poll_handlers)) {
+        return false;
+    }
+
+    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+    while (!fired) {
+        AioPollHandler *node;
+        AioPollHandler *tmp;
+
+        QLIST_FOREACH_SAFE(node, &ctx->aio_poll_handlers, node, tmp) {
+            ctx->walking_poll_handlers++;
+            if (!node->deleted && node->poll_fn(node->opaque)) {
+                node->io_fn(node->opaque);
+                fired = true;
+            }
+            ctx->walking_poll_handlers--;
+
+            if (!ctx->walking_poll_handlers && node->deleted) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+        }
+
+        loop_count++;
+        if ((loop_count % 1024) == 0 &&
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time >
+            aio_poll_max_ns) {
+            break;
+        }
+    }
+
+    return fired;
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
@@ -410,6 +518,15 @@  bool aio_poll(AioContext *ctx, bool blocking)
     aio_context_acquire(ctx);
     progress = false;
 
+    if (aio_poll_max_ns &&
+        /* see qemu_soonest_timeout() uint64_t hack */
+        (uint64_t)aio_compute_timeout(ctx) > (uint64_t)aio_poll_max_ns) {
+        if (run_poll_handlers(ctx)) {
+            progress = true;
+            blocking = false; /* poll again, don't block */
+        }
+    }
+
     /* aio_notify can avoid the expensive event_notifier_set if
      * everything (file descriptors, bottom halves, timers) will
      * be re-evaluated before the next blocking poll().  This is
@@ -484,6 +601,22 @@  bool aio_poll(AioContext *ctx, bool blocking)
 
 void aio_context_setup(AioContext *ctx)
 {
+    if (!aio_poll_max_ns) {
+        int64_t val;
+        const char *env_str = getenv("QEMU_AIO_POLL_MAX_NS");
+
+        if (!env_str) {
+            env_str = "0";
+        }
+
+        if (!qemu_strtoll(env_str, NULL, 10, &val)) {
+            aio_poll_max_ns = val;
+        } else {
+            fprintf(stderr, "Unable to parse QEMU_AIO_POLL_MAX_NS "
+                            "environment variable\n");
+        }
+    }
+
 #ifdef CONFIG_EPOLL_CREATE1
     assert(!ctx->epollfd);
     ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
diff --git a/include/block/aio.h b/include/block/aio.h
index c7ae27c..2be1955 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -42,8 +42,10 @@  void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
 void qemu_aio_unref(void *p);
 void qemu_aio_ref(void *p);
 
+typedef struct AioPollHandler AioPollHandler;
 typedef struct AioHandler AioHandler;
 typedef void QEMUBHFunc(void *opaque);
+typedef bool AioPollFn(void *opaque);
 typedef void IOHandler(void *opaque);
 
 struct ThreadPool;
@@ -64,6 +66,15 @@  struct AioContext {
      */
     int walking_handlers;
 
+    /* The list of registered AIO poll handlers */
+    QLIST_HEAD(, AioPollHandler) aio_poll_handlers;
+
+    /* This is a simple lock used to protect the aio_poll_handlers list.
+     * Specifically, it's used to ensure that no callbacks are removed while
+     * we're walking and dispatching callbacks.
+     */
+    int walking_poll_handlers;
+
     /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
      * accessed with atomic primitives.  If this field is 0, everything
      * (file descriptors, bottom halves, timers) will be re-evaluated
@@ -327,6 +338,11 @@  void aio_set_fd_handler(AioContext *ctx,
                         IOHandler *io_write,
                         void *opaque);
 
+void aio_set_poll_handler(AioContext *ctx,
+                          AioPollFn *poll_fn,
+                          IOHandler *io_fn,
+                          void *opaque);
+
 /* Register an event notifier and associated callbacks.  Behaves very similarly
  * to event_notifier_set_handler.  Unlike event_notifier_set_handler, these callbacks
  * will be invoked when using aio_poll().