diff mbox

[09/17] aio: prepare for introducing GSource-based dispatch

Message ID 1348577763-12920-10-git-send-email-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Sept. 25, 2012, 12:55 p.m. UTC
This adds a GPollFD to each AioHandler.  It will then be possible to
attach these GPollFDs to a GSource, and from there to the main loop.
aio_poll examines the GPollFDs and avoids calling select() if any of
them has events pending (similar to what it does if bottom halves are
available).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio.c      | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 qemu-aio.h |  7 ++++++
 2 files changed, 78 insertions(+), 11 deletions(-)

Comments

Anthony Liguori Sept. 25, 2012, 10:01 p.m. UTC | #1
Paolo Bonzini <pbonzini@redhat.com> writes:

> This adds a GPollFD to each AioHandler.  It will then be possible to
> attach these GPollFDs to a GSource, and from there to the main loop.
> aio_wait examines the GPollFDs and avoids calling select() if any
> is set (similar to what it does if bottom halves are available).
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  aio.c      | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
>  qemu-aio.h |  7 ++++++
>  2 file modificati, 78 inserzioni(+), 11 rimozioni(-)
>
> diff --git a/aio.c b/aio.c
> index 95ad467..c848a9f 100644
> --- a/aio.c
> +++ b/aio.c
> @@ -20,7 +20,7 @@
>  
>  struct AioHandler
>  {
> -    int fd;
> +    GPollFD pfd;
>      IOHandler *io_read;
>      IOHandler *io_write;
>      AioFlushHandler *io_flush;
> @@ -34,7 +34,7 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd)
>      AioHandler *node;
>  
>      QLIST_FOREACH(node, &ctx->aio_handlers, node) {
> -        if (node->fd == fd)
> +        if (node->pfd.fd == fd)
>              if (!node->deleted)
>                  return node;
>      }
> @@ -57,9 +57,10 @@ void aio_set_fd_handler(AioContext *ctx,
>      if (!io_read && !io_write) {
>          if (node) {
>              /* If the lock is held, just mark the node as deleted */
> -            if (ctx->walking_handlers)
> +            if (ctx->walking_handlers) {
>                  node->deleted = 1;
> -            else {
> +                node->pfd.revents = 0;
> +            } else {
>                  /* Otherwise, delete it for real.  We can't just mark it as
>                   * deleted because deleted nodes are only cleaned up after
>                   * releasing the walking_handlers lock.
> @@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx,
>          if (node == NULL) {
>              /* Alloc and insert if it's not already there */
>              node = g_malloc0(sizeof(AioHandler));
> -            node->fd = fd;
> +            node->pfd.fd = fd;
>              QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
>          }
>          /* Update handler with latest information */
> @@ -80,6 +81,10 @@ void aio_set_fd_handler(AioContext *ctx,
>          node->io_write = io_write;
>          node->io_flush = io_flush;
>          node->opaque = opaque;
> +
> +        node->pfd.events = G_IO_ERR;
> +        node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0);
> +        node->pfd.events |= (io_write ? G_IO_OUT : 0);
>      }

Should we even set G_IO_ERR?  I think that corresponds to exceptfd in
select() but we've never set that historically.  I know glib recommends
it but I don't think it's applicable to how we use it.

Moreover, the way you do dispatch, if G_IO_ERR did occur, we'd dispatch
both the read and write handlers which definitely isn't right.

I think it's easiest just to drop it.

>  }
>  
> @@ -93,6 +98,25 @@ void aio_set_event_notifier(AioContext *ctx,
>                         (AioFlushHandler *)io_flush, notifier);
>  }
>  
> +bool aio_pending(AioContext *ctx)
> +{
> +    AioHandler *node;
> +
> +    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
> +        int revents;
> +
> +        revents = node->pfd.revents & node->pfd.events;
> +        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
> +            return true;
> +        }
> +        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
> +            return true;
> +        }
> +    }
> +
> +    return false;
> +}
> +
>  bool aio_poll(AioContext *ctx, bool blocking)
>  {
>      static struct timeval tv0;
> @@ -114,6 +138,42 @@ bool aio_poll(AioContext *ctx, bool blocking)
>          progress = true;
>      }
>  
> +    /*
> +     * Then dispatch any pending callbacks from the GSource.
> +     *
> +     * We have to walk very carefully in case qemu_aio_set_fd_handler is
> +     * called while we're walking.
> +     */
> +    node = QLIST_FIRST(&ctx->aio_handlers);
> +    while (node) {
> +        AioHandler *tmp;
> +        int revents;
> +
> +        ctx->walking_handlers++;
> +
> +        revents = node->pfd.revents & node->pfd.events;
> +        node->pfd.revents &= ~revents;

This is interesting and I must admit I don't understand why it's
necessary.  What case are you trying to handle?

> +
> +        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
> +            node->io_read(node->opaque);
> +            progress = true;
> +        }
> +        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
> +            node->io_write(node->opaque);
> +            progress = true;
> +        }
> +
> +        tmp = node;
> +        node = QLIST_NEXT(node, node);
> +
> +        ctx->walking_handlers--;
> +
> +        if (!ctx->walking_handlers && tmp->deleted) {
> +            QLIST_REMOVE(tmp, node);
> +            g_free(tmp);
> +        }
> +    }
> +
>      if (progress && !blocking) {
>          return true;
>      }
> @@ -137,12 +197,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
>              busy = true;
>          }
>          if (!node->deleted && node->io_read) {
> -            FD_SET(node->fd, &rdfds);
> -            max_fd = MAX(max_fd, node->fd + 1);
> +            FD_SET(node->pfd.fd, &rdfds);
> +            max_fd = MAX(max_fd, node->pfd.fd + 1);
>          }
>          if (!node->deleted && node->io_write) {
> -            FD_SET(node->fd, &wrfds);
> -            max_fd = MAX(max_fd, node->fd + 1);
> +            FD_SET(node->pfd.fd, &wrfds);
> +            max_fd = MAX(max_fd, node->pfd.fd + 1);
>          }
>      }
>  
> @@ -167,12 +227,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
>              ctx->walking_handlers++;
>  
>              if (!node->deleted &&
> -                FD_ISSET(node->fd, &rdfds) &&
> +                FD_ISSET(node->pfd.fd, &rdfds) &&
>                  node->io_read) {
>                  node->io_read(node->opaque);
>              }
>              if (!node->deleted &&
> -                FD_ISSET(node->fd, &wrfds) &&
> +                FD_ISSET(node->pfd.fd, &wrfds) &&
>                  node->io_write) {
>                  node->io_write(node->opaque);
>              }
> diff --git a/qemu-aio.h b/qemu-aio.h
> index f19201e..ac24896 100644
> --- a/qemu-aio.h
> +++ b/qemu-aio.h
> @@ -133,6 +133,13 @@ void qemu_bh_delete(QEMUBH *bh);
>   * outstanding AIO operations have been completed or cancelled. */
>  void aio_flush(AioContext *ctx);
>  
> +/* Return whether there are any pending callbacks from the GSource
> + * attached to the AioContext.
> + *
> + * This is used internally in the implementation of the GSource.
> + */
> +bool aio_pending(AioContext *ctx);
> +
>  /* Progress in completing AIO work to occur.  This can issue new pending
>   * aio as a result of executing I/O completion or bh callbacks.
>   *
> -- 
> 1.7.12

Regards,

Anthony Liguori
Paolo Bonzini Sept. 26, 2012, 6:36 a.m. UTC | #2
Il 26/09/2012 00:01, Anthony Liguori ha scritto:
>> > +        node->pfd.events = G_IO_ERR;
>> > +        node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0);
>> > +        node->pfd.events |= (io_write ? G_IO_OUT : 0);
>> >      }
> Should we even set G_IO_ERR?  I think that corresponds to exceptfd

No, that would be G_IO_PRI.

> in select() but we've never set that historically.  I know glib recommends
> it but I don't think it's applicable to how we use it.
> 
> Moreover, the way you do dispatch, if G_IO_ERR did occur, we'd dispatch
> both the read and write handlers which definitely isn't right.

I'm not sure what gives POLLERR.  Probably a connect() that fails, and
in that case dispatching on the write handler would be okay.  But I was
not sure, and calling both is safe: handlers have to be ready for
spurious wakeups anyway.  Such wakeups already happen if qemu_aio_wait
dispatches from a VCPU thread before the main loop gets hold of the big
lock.

> I think it's easiest just to drop it.

That's indeed the case, since the current code never sets either
G_IO_HUP or G_IO_ERR.

Paolo
Paolo Bonzini Sept. 26, 2012, 6:48 a.m. UTC | #3
Il 26/09/2012 00:01, Anthony Liguori ha scritto:
> > +        revents = node->pfd.revents & node->pfd.events;
> > +        node->pfd.revents &= ~revents;
> 
> This is interesting and I must admit I don't understand why it's
> necessary.  What case are you trying to handle?

That's for the case where you got a write event for fd Y, but disabled
the write handler from the handler of fd X (called before fd Y).  But
what the current code does is just eat the event, so I can do the same
and set node->pfd.revents to 0.

Paolo
Blue Swirl Sept. 29, 2012, 11:28 a.m. UTC | #4
On Tue, Sep 25, 2012 at 12:55 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> This adds a GPollFD to each AioHandler.  It will then be possible to
> attach these GPollFDs to a GSource, and from there to the main loop.
> aio_wait examines the GPollFDs and avoids calling select() if any
> is set (similar to what it does if bottom halves are available).
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  aio.c      | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
>  qemu-aio.h |  7 ++++++
>  2 file modificati, 78 inserzioni(+), 11 rimozioni(-)
>
> diff --git a/aio.c b/aio.c
> index 95ad467..c848a9f 100644
> --- a/aio.c
> +++ b/aio.c
> @@ -20,7 +20,7 @@
>
>  struct AioHandler
>  {
> -    int fd;
> +    GPollFD pfd;
>      IOHandler *io_read;
>      IOHandler *io_write;
>      AioFlushHandler *io_flush;
> @@ -34,7 +34,7 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd)
>      AioHandler *node;
>
>      QLIST_FOREACH(node, &ctx->aio_handlers, node) {
> -        if (node->fd == fd)
> +        if (node->pfd.fd == fd)

Forgot to run checkpatch.pl?

>              if (!node->deleted)
>                  return node;
>      }
> @@ -57,9 +57,10 @@ void aio_set_fd_handler(AioContext *ctx,
>      if (!io_read && !io_write) {
>          if (node) {
>              /* If the lock is held, just mark the node as deleted */
> -            if (ctx->walking_handlers)
> +            if (ctx->walking_handlers) {
>                  node->deleted = 1;
> -            else {
> +                node->pfd.revents = 0;
> +            } else {
>                  /* Otherwise, delete it for real.  We can't just mark it as
>                   * deleted because deleted nodes are only cleaned up after
>                   * releasing the walking_handlers lock.
> @@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx,
>          if (node == NULL) {
>              /* Alloc and insert if it's not already there */
>              node = g_malloc0(sizeof(AioHandler));
> -            node->fd = fd;
> +            node->pfd.fd = fd;
>              QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
>          }
>          /* Update handler with latest information */
> @@ -80,6 +81,10 @@ void aio_set_fd_handler(AioContext *ctx,
>          node->io_write = io_write;
>          node->io_flush = io_flush;
>          node->opaque = opaque;
> +
> +        node->pfd.events = G_IO_ERR;
> +        node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0);
> +        node->pfd.events |= (io_write ? G_IO_OUT : 0);
>      }
>  }
>
> @@ -93,6 +98,25 @@ void aio_set_event_notifier(AioContext *ctx,
>                         (AioFlushHandler *)io_flush, notifier);
>  }
>
> +bool aio_pending(AioContext *ctx)
> +{
> +    AioHandler *node;
> +
> +    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
> +        int revents;
> +
> +        revents = node->pfd.revents & node->pfd.events;
> +        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
> +            return true;
> +        }
> +        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
> +            return true;
> +        }
> +    }
> +
> +    return false;
> +}
> +
>  bool aio_poll(AioContext *ctx, bool blocking)
>  {
>      static struct timeval tv0;
> @@ -114,6 +138,42 @@ bool aio_poll(AioContext *ctx, bool blocking)
>          progress = true;
>      }
>
> +    /*
> +     * Then dispatch any pending callbacks from the GSource.
> +     *
> +     * We have to walk very carefully in case qemu_aio_set_fd_handler is
> +     * called while we're walking.
> +     */
> +    node = QLIST_FIRST(&ctx->aio_handlers);
> +    while (node) {
> +        AioHandler *tmp;
> +        int revents;
> +
> +        ctx->walking_handlers++;
> +
> +        revents = node->pfd.revents & node->pfd.events;
> +        node->pfd.revents &= ~revents;
> +
> +        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
> +            node->io_read(node->opaque);
> +            progress = true;
> +        }
> +        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
> +            node->io_write(node->opaque);
> +            progress = true;
> +        }
> +
> +        tmp = node;
> +        node = QLIST_NEXT(node, node);
> +
> +        ctx->walking_handlers--;
> +
> +        if (!ctx->walking_handlers && tmp->deleted) {
> +            QLIST_REMOVE(tmp, node);
> +            g_free(tmp);
> +        }
> +    }
> +
>      if (progress && !blocking) {
>          return true;
>      }
> @@ -137,12 +197,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
>              busy = true;
>          }
>          if (!node->deleted && node->io_read) {
> -            FD_SET(node->fd, &rdfds);
> -            max_fd = MAX(max_fd, node->fd + 1);
> +            FD_SET(node->pfd.fd, &rdfds);
> +            max_fd = MAX(max_fd, node->pfd.fd + 1);
>          }
>          if (!node->deleted && node->io_write) {
> -            FD_SET(node->fd, &wrfds);
> -            max_fd = MAX(max_fd, node->fd + 1);
> +            FD_SET(node->pfd.fd, &wrfds);
> +            max_fd = MAX(max_fd, node->pfd.fd + 1);
>          }
>      }
>
> @@ -167,12 +227,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
>              ctx->walking_handlers++;
>
>              if (!node->deleted &&
> -                FD_ISSET(node->fd, &rdfds) &&
> +                FD_ISSET(node->pfd.fd, &rdfds) &&
>                  node->io_read) {
>                  node->io_read(node->opaque);
>              }
>              if (!node->deleted &&
> -                FD_ISSET(node->fd, &wrfds) &&
> +                FD_ISSET(node->pfd.fd, &wrfds) &&
>                  node->io_write) {
>                  node->io_write(node->opaque);
>              }
> diff --git a/qemu-aio.h b/qemu-aio.h
> index f19201e..ac24896 100644
> --- a/qemu-aio.h
> +++ b/qemu-aio.h
> @@ -133,6 +133,13 @@ void qemu_bh_delete(QEMUBH *bh);
>   * outstanding AIO operations have been completed or cancelled. */
>  void aio_flush(AioContext *ctx);
>
> +/* Return whether there are any pending callbacks from the GSource
> + * attached to the AioContext.
> + *
> + * This is used internally in the implementation of the GSource.
> + */
> +bool aio_pending(AioContext *ctx);
> +
>  /* Progress in completing AIO work to occur.  This can issue new pending
>   * aio as a result of executing I/O completion or bh callbacks.
>   *
> --
> 1.7.12
>
>
>
Paolo Bonzini Oct. 1, 2012, 6:40 a.m. UTC | #5
Il 29/09/2012 13:28, Blue Swirl ha scritto:
>> > +        if (node->pfd.fd == fd)
> Forgot to run checkpatch.pl?
> 

No, just ignored its result for an RFC.

Paolo
diff mbox

Patch

diff --git a/aio.c b/aio.c
index 95ad467..c848a9f 100644
--- a/aio.c
+++ b/aio.c
@@ -20,7 +20,7 @@ 
 
 struct AioHandler
 {
-    int fd;
+    GPollFD pfd;
     IOHandler *io_read;
     IOHandler *io_write;
     AioFlushHandler *io_flush;
@@ -34,7 +34,7 @@  static AioHandler *find_aio_handler(AioContext *ctx, int fd)
     AioHandler *node;
 
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-        if (node->fd == fd)
+        if (node->pfd.fd == fd)
             if (!node->deleted)
                 return node;
     }
@@ -57,9 +57,10 @@  void aio_set_fd_handler(AioContext *ctx,
     if (!io_read && !io_write) {
         if (node) {
             /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers)
+            if (ctx->walking_handlers) {
                 node->deleted = 1;
-            else {
+                node->pfd.revents = 0;
+            } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
                  * releasing the walking_handlers lock.
@@ -72,7 +73,7 @@  void aio_set_fd_handler(AioContext *ctx,
         if (node == NULL) {
             /* Alloc and insert if it's not already there */
             node = g_malloc0(sizeof(AioHandler));
-            node->fd = fd;
+            node->pfd.fd = fd;
             QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
         }
         /* Update handler with latest information */
@@ -80,6 +81,10 @@  void aio_set_fd_handler(AioContext *ctx,
         node->io_write = io_write;
         node->io_flush = io_flush;
         node->opaque = opaque;
+
+        node->pfd.events = G_IO_ERR;
+        node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0);
+        node->pfd.events |= (io_write ? G_IO_OUT : 0);
     }
 }
 
@@ -93,6 +98,25 @@  void aio_set_event_notifier(AioContext *ctx,
                        (AioFlushHandler *)io_flush, notifier);
 }
 
+bool aio_pending(AioContext *ctx)
+{
+    AioHandler *node;
+
+    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        int revents;
+
+        revents = node->pfd.revents & node->pfd.events;
+        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
+            return true;
+        }
+        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     static struct timeval tv0;
@@ -114,6 +138,42 @@  bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
+    /*
+     * Then dispatch any pending callbacks from the GSource.
+     *
+     * We have to walk very carefully in case qemu_aio_set_fd_handler is
+     * called while we're walking.
+     */
+    node = QLIST_FIRST(&ctx->aio_handlers);
+    while (node) {
+        AioHandler *tmp;
+        int revents;
+
+        ctx->walking_handlers++;
+
+        revents = node->pfd.revents & node->pfd.events;
+        node->pfd.revents &= ~revents;
+
+        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
+            node->io_read(node->opaque);
+            progress = true;
+        }
+        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
+            node->io_write(node->opaque);
+            progress = true;
+        }
+
+        tmp = node;
+        node = QLIST_NEXT(node, node);
+
+        ctx->walking_handlers--;
+
+        if (!ctx->walking_handlers && tmp->deleted) {
+            QLIST_REMOVE(tmp, node);
+            g_free(tmp);
+        }
+    }
+
     if (progress && !blocking) {
         return true;
     }
@@ -137,12 +197,12 @@  bool aio_poll(AioContext *ctx, bool blocking)
             busy = true;
         }
         if (!node->deleted && node->io_read) {
-            FD_SET(node->fd, &rdfds);
-            max_fd = MAX(max_fd, node->fd + 1);
+            FD_SET(node->pfd.fd, &rdfds);
+            max_fd = MAX(max_fd, node->pfd.fd + 1);
         }
         if (!node->deleted && node->io_write) {
-            FD_SET(node->fd, &wrfds);
-            max_fd = MAX(max_fd, node->fd + 1);
+            FD_SET(node->pfd.fd, &wrfds);
+            max_fd = MAX(max_fd, node->pfd.fd + 1);
         }
     }
 
@@ -167,12 +227,12 @@  bool aio_poll(AioContext *ctx, bool blocking)
             ctx->walking_handlers++;
 
             if (!node->deleted &&
-                FD_ISSET(node->fd, &rdfds) &&
+                FD_ISSET(node->pfd.fd, &rdfds) &&
                 node->io_read) {
                 node->io_read(node->opaque);
             }
             if (!node->deleted &&
-                FD_ISSET(node->fd, &wrfds) &&
+                FD_ISSET(node->pfd.fd, &wrfds) &&
                 node->io_write) {
                 node->io_write(node->opaque);
             }
diff --git a/qemu-aio.h b/qemu-aio.h
index f19201e..ac24896 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -133,6 +133,13 @@  void qemu_bh_delete(QEMUBH *bh);
  * outstanding AIO operations have been completed or cancelled. */
 void aio_flush(AioContext *ctx);
 
+/* Return whether there are any pending callbacks from the GSource
+ * attached to the AioContext.
+ *
+ * This is used internally in the implementation of the GSource.
+ */
+bool aio_pending(AioContext *ctx);
+
 /* Progress in completing AIO work to occur.  This can issue new pending
  * aio as a result of executing I/O completion or bh callbacks.
  *