diff mbox

[v3,2/7] posix-aio: Use QEMU poll interface

Message ID 1429160256-27231-3-git-send-email-famz@redhat.com
State New
Headers show

Commit Message

Fam Zheng April 16, 2015, 4:57 a.m. UTC
The AIO handler list is only modified by aio_set_fd_handler, so we can
easily add/delete poll fds there. Initialize a QEMUPoll and keep track
of all the fds, so we don't need to rebuild a GPollFD array for g_poll
in aio_poll.

Signed-off-by: Fam Zheng <famz@redhat.com>
---
 aio-posix.c         | 52 +++++++++++++++++++++-------------------------------
 async.c             |  5 +++--
 include/block/aio.h |  7 +++++--
 3 files changed, 29 insertions(+), 35 deletions(-)
diff mbox

Patch

diff --git a/aio-posix.c b/aio-posix.c
index cbd4c34..5c81fe6 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -17,6 +17,7 @@ 
 #include "block/block.h"
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
+#include "qemu/poll.h"
 
 struct AioHandler
 {
@@ -55,6 +56,7 @@  void aio_set_fd_handler(AioContext *ctx,
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write) {
         if (node) {
+            qemu_poll_del(ctx->qpoll, fd);
             g_source_remove_poll(&ctx->source, &node->pfd);
 
             /* If the lock is held, just mark the node as deleted */
@@ -71,13 +73,15 @@  void aio_set_fd_handler(AioContext *ctx,
             }
         }
     } else {
-        if (node == NULL) {
+        if (node) {
+            /* Remove the old */
+            qemu_poll_del(ctx->qpoll, fd);
+            g_source_remove_poll(&ctx->source, &node->pfd);
+        } else {
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
             QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
-
-            g_source_add_poll(&ctx->source, &node->pfd);
         }
         /* Update handler with latest information */
         node->io_read = io_read;
@@ -87,6 +91,8 @@  void aio_set_fd_handler(AioContext *ctx,
 
         node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
         node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
+        qemu_poll_add(ctx->qpoll, fd, node->pfd.events, node);
+        g_source_add_poll(&ctx->source, &node->pfd);
     }
 
     aio_notify(ctx);
@@ -191,6 +197,7 @@  bool aio_poll(AioContext *ctx, bool blocking)
     AioHandler *node;
     bool was_dispatching;
     int ret;
+    int i;
     bool progress;
 
     was_dispatching = ctx->dispatching;
@@ -208,38 +215,21 @@  bool aio_poll(AioContext *ctx, bool blocking)
      */
     aio_set_dispatching(ctx, !blocking);
 
-    ctx->walking_handlers++;
-
-    g_array_set_size(ctx->pollfds, 0);
-
-    /* fill pollfds */
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-        node->pollfds_idx = -1;
-        if (!node->deleted && node->pfd.events) {
-            GPollFD pfd = {
-                .fd = node->pfd.fd,
-                .events = node->pfd.events,
-            };
-            node->pollfds_idx = ctx->pollfds->len;
-            g_array_append_val(ctx->pollfds, pfd);
-        }
-    }
-
-    ctx->walking_handlers--;
-
     /* wait until next event */
-    ret = qemu_poll_ns((GPollFD *)ctx->pollfds->data,
-                         ctx->pollfds->len,
-                         blocking ? aio_compute_timeout(ctx) : 0);
+    ret = qemu_poll(ctx->qpoll, blocking ? aio_compute_timeout(ctx) : 0);
 
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-            if (node->pollfds_idx != -1) {
-                GPollFD *pfd = &g_array_index(ctx->pollfds, GPollFD,
-                                              node->pollfds_idx);
-                node->pfd.revents = pfd->revents;
-            }
+        int r;
+        g_array_set_size(ctx->events, ret);
+        r = qemu_poll_get_events(ctx->qpoll,
+                                (QEMUPollEvent *)ctx->events->data,
+                                ret);
+        assert(r == ret);
+        for (i = 0; i < r; i++) {
+            QEMUPollEvent *e = &g_array_index(ctx->events, QEMUPollEvent, i);
+            node = e->opaque;
+            node->pfd.revents = e->revents;
         }
     }
 
diff --git a/async.c b/async.c
index 2b51e87..dd5d62f 100644
--- a/async.c
+++ b/async.c
@@ -27,6 +27,7 @@ 
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 #include "qemu/atomic.h"
+#include "qemu/poll.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
@@ -230,7 +231,6 @@  aio_ctx_finalize(GSource     *source)
     event_notifier_cleanup(&ctx->notifier);
     rfifolock_destroy(&ctx->lock);
     qemu_mutex_destroy(&ctx->bh_lock);
-    g_array_free(ctx->pollfds, TRUE);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -299,10 +299,11 @@  AioContext *aio_context_new(Error **errp)
         return NULL;
     }
     g_source_set_can_recurse(&ctx->source, true);
+    ctx->qpoll = qemu_poll_new();
+    ctx->events = g_array_new(FALSE, FALSE, sizeof(QEMUPollEvent));
     aio_set_event_notifier(ctx, &ctx->notifier,
                            (EventNotifierHandler *)
                            event_notifier_test_and_clear);
-    ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
     rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
diff --git a/include/block/aio.h b/include/block/aio.h
index 7d1e26b..af7edba 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -82,8 +82,11 @@  struct AioContext {
     /* Used for aio_notify.  */
     EventNotifier notifier;
 
-    /* GPollFDs for aio_poll() */
-    GArray *pollfds;
+    /* qemu_poll context */
+    QEMUPoll *qpoll;
+
+    /* QEMUPollEvents for qemu_poll_get_events() */
+    GArray *events;
 
     /* Thread pool for performing work and receiving completion callbacks */
     struct ThreadPool *thread_pool;