| Message ID | 1348577763-12920-10-git-send-email-pbonzini@redhat.com |
|---|---|
| State | New |
| Headers | show |
Paolo Bonzini <pbonzini@redhat.com> writes: > This adds a GPollFD to each AioHandler. It will then be possible to > attach these GPollFDs to a GSource, and from there to the main loop. > aio_wait examines the GPollFDs and avoids calling select() if any > is set (similar to what it does if bottom halves are available). > > Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> > --- > aio.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++--------- > qemu-aio.h | 7 ++++++ > 2 file modificati, 78 inserzioni(+), 11 rimozioni(-) > > diff --git a/aio.c b/aio.c > index 95ad467..c848a9f 100644 > --- a/aio.c > +++ b/aio.c > @@ -20,7 +20,7 @@ > > struct AioHandler > { > - int fd; > + GPollFD pfd; > IOHandler *io_read; > IOHandler *io_write; > AioFlushHandler *io_flush; > @@ -34,7 +34,7 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd) > AioHandler *node; > > QLIST_FOREACH(node, &ctx->aio_handlers, node) { > - if (node->fd == fd) > + if (node->pfd.fd == fd) > if (!node->deleted) > return node; > } > @@ -57,9 +57,10 @@ void aio_set_fd_handler(AioContext *ctx, > if (!io_read && !io_write) { > if (node) { > /* If the lock is held, just mark the node as deleted */ > - if (ctx->walking_handlers) > + if (ctx->walking_handlers) { > node->deleted = 1; > - else { > + node->pfd.revents = 0; > + } else { > /* Otherwise, delete it for real. We can't just mark it as > * deleted because deleted nodes are only cleaned up after > * releasing the walking_handlers lock. 
> @@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx, > if (node == NULL) { > /* Alloc and insert if it's not already there */ > node = g_malloc0(sizeof(AioHandler)); > - node->fd = fd; > + node->pfd.fd = fd; > QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); > } > /* Update handler with latest information */ > @@ -80,6 +81,10 @@ void aio_set_fd_handler(AioContext *ctx, > node->io_write = io_write; > node->io_flush = io_flush; > node->opaque = opaque; > + > + node->pfd.events = G_IO_ERR; > + node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0); > + node->pfd.events |= (io_write ? G_IO_OUT : 0); > } Should we even set G_IO_ERR? I think that corresponds to exceptfd in select() but we've never set that historically. I know glib recommends it but I don't think it's applicable to how we use it. Moreover, the way you do dispatch, if G_IO_ERR did occur, we'd dispatch both the read and write handlers which definitely isn't right. I think it's easiest just to drop it. > } > > @@ -93,6 +98,25 @@ void aio_set_event_notifier(AioContext *ctx, > (AioFlushHandler *)io_flush, notifier); > } > > +bool aio_pending(AioContext *ctx) > +{ > + AioHandler *node; > + > + QLIST_FOREACH(node, &ctx->aio_handlers, node) { > + int revents; > + > + revents = node->pfd.revents & node->pfd.events; > + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { > + return true; > + } > + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { > + return true; > + } > + } > + > + return false; > +} > + > bool aio_poll(AioContext *ctx, bool blocking) > { > static struct timeval tv0; > @@ -114,6 +138,42 @@ bool aio_poll(AioContext *ctx, bool blocking) > progress = true; > } > > + /* > + * Then dispatch any pending callbacks from the GSource. > + * > + * We have to walk very carefully in case qemu_aio_set_fd_handler is > + * called while we're walking. 
> + */ > + node = QLIST_FIRST(&ctx->aio_handlers); > + while (node) { > + AioHandler *tmp; > + int revents; > + > + ctx->walking_handlers++; > + > + revents = node->pfd.revents & node->pfd.events; > + node->pfd.revents &= ~revents; This is interesting and I must admit I don't understand why it's necessary. What case are you trying to handle? > + > + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { > + node->io_read(node->opaque); > + progress = true; > + } > + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { > + node->io_write(node->opaque); > + progress = true; > + } > + > + tmp = node; > + node = QLIST_NEXT(node, node); > + > + ctx->walking_handlers--; > + > + if (!ctx->walking_handlers && tmp->deleted) { > + QLIST_REMOVE(tmp, node); > + g_free(tmp); > + } > + } > + > if (progress && !blocking) { > return true; > } > @@ -137,12 +197,12 @@ bool aio_poll(AioContext *ctx, bool blocking) > busy = true; > } > if (!node->deleted && node->io_read) { > - FD_SET(node->fd, &rdfds); > - max_fd = MAX(max_fd, node->fd + 1); > + FD_SET(node->pfd.fd, &rdfds); > + max_fd = MAX(max_fd, node->pfd.fd + 1); > } > if (!node->deleted && node->io_write) { > - FD_SET(node->fd, &wrfds); > - max_fd = MAX(max_fd, node->fd + 1); > + FD_SET(node->pfd.fd, &wrfds); > + max_fd = MAX(max_fd, node->pfd.fd + 1); > } > } > > @@ -167,12 +227,12 @@ bool aio_poll(AioContext *ctx, bool blocking) > ctx->walking_handlers++; > > if (!node->deleted && > - FD_ISSET(node->fd, &rdfds) && > + FD_ISSET(node->pfd.fd, &rdfds) && > node->io_read) { > node->io_read(node->opaque); > } > if (!node->deleted && > - FD_ISSET(node->fd, &wrfds) && > + FD_ISSET(node->pfd.fd, &wrfds) && > node->io_write) { > node->io_write(node->opaque); > } > diff --git a/qemu-aio.h b/qemu-aio.h > index f19201e..ac24896 100644 > --- a/qemu-aio.h > +++ b/qemu-aio.h > @@ -133,6 +133,13 @@ void qemu_bh_delete(QEMUBH *bh); > * outstanding AIO operations have been completed or cancelled. 
*/ > void aio_flush(AioContext *ctx); > > +/* Return whether there are any pending callbacks from the GSource > + * attached to the AioContext. > + * > + * This is used internally in the implementation of the GSource. > + */ > +bool aio_pending(AioContext *ctx); > + > /* Progress in completing AIO work to occur. This can issue new pending > * aio as a result of executing I/O completion or bh callbacks. > * > -- > 1.7.12 Regards, Anthony Liguori
Il 26/09/2012 00:01, Anthony Liguori ha scritto: >> > + node->pfd.events = G_IO_ERR; >> > + node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0); >> > + node->pfd.events |= (io_write ? G_IO_OUT : 0); >> > } > Should we even set G_IO_ERR? I think that corresponds to exceptfd No, that would be G_IO_PRI. > in select() but we've never set that historically. I know glib recommends > it but I don't think it's applicable to how we use it. > > Moreover, the way you do dispatch, if G_IO_ERR did occur, we'd dispatch > both the read and write handlers which definitely isn't right. I'm not sure what gives POLLERR. Probably a connect() that fails, and in that case dispatching on the write handler would be okay. But I was not sure and calling both is safe: handlers have to be ready for spurious wakeups anyway, it happens if qemu_aio_wait dispatches from a VCPU thread before the main loop gets hold of the big lock. > I think it's easiest just to drop it. That's indeed the case, since the current code never sets either G_IO_HUP or G_IO_ERR. Paolo
Il 26/09/2012 00:01, Anthony Liguori ha scritto: > > + revents = node->pfd.revents & node->pfd.events; > > + node->pfd.revents &= ~revents; > > This is interesting and I must admit I don't understand why it's > necessary. What case are you trying to handle? That's for the case where you got a write event for fd Y, but disabled the write handler from the handler of fd X (called before fd Y). But what the current code does is just eat the event, so I can do the same and set node->pfd.revents to 0. Paolo
On Tue, Sep 25, 2012 at 12:55 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > This adds a GPollFD to each AioHandler. It will then be possible to > attach these GPollFDs to a GSource, and from there to the main loop. > aio_wait examines the GPollFDs and avoids calling select() if any > is set (similar to what it does if bottom halves are available). > > Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> > --- > aio.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++--------- > qemu-aio.h | 7 ++++++ > 2 file modificati, 78 inserzioni(+), 11 rimozioni(-) > > diff --git a/aio.c b/aio.c > index 95ad467..c848a9f 100644 > --- a/aio.c > +++ b/aio.c > @@ -20,7 +20,7 @@ > > struct AioHandler > { > - int fd; > + GPollFD pfd; > IOHandler *io_read; > IOHandler *io_write; > AioFlushHandler *io_flush; > @@ -34,7 +34,7 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd) > AioHandler *node; > > QLIST_FOREACH(node, &ctx->aio_handlers, node) { > - if (node->fd == fd) > + if (node->pfd.fd == fd) Forgot to run checkpatch.pl? > if (!node->deleted) > return node; > } > @@ -57,9 +57,10 @@ void aio_set_fd_handler(AioContext *ctx, > if (!io_read && !io_write) { > if (node) { > /* If the lock is held, just mark the node as deleted */ > - if (ctx->walking_handlers) > + if (ctx->walking_handlers) { > node->deleted = 1; > - else { > + node->pfd.revents = 0; > + } else { > /* Otherwise, delete it for real. We can't just mark it as > * deleted because deleted nodes are only cleaned up after > * releasing the walking_handlers lock. 
> @@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx, > if (node == NULL) { > /* Alloc and insert if it's not already there */ > node = g_malloc0(sizeof(AioHandler)); > - node->fd = fd; > + node->pfd.fd = fd; > QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); > } > /* Update handler with latest information */ > @@ -80,6 +81,10 @@ void aio_set_fd_handler(AioContext *ctx, > node->io_write = io_write; > node->io_flush = io_flush; > node->opaque = opaque; > + > + node->pfd.events = G_IO_ERR; > + node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0); > + node->pfd.events |= (io_write ? G_IO_OUT : 0); > } > } > > @@ -93,6 +98,25 @@ void aio_set_event_notifier(AioContext *ctx, > (AioFlushHandler *)io_flush, notifier); > } > > +bool aio_pending(AioContext *ctx) > +{ > + AioHandler *node; > + > + QLIST_FOREACH(node, &ctx->aio_handlers, node) { > + int revents; > + > + revents = node->pfd.revents & node->pfd.events; > + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { > + return true; > + } > + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { > + return true; > + } > + } > + > + return false; > +} > + > bool aio_poll(AioContext *ctx, bool blocking) > { > static struct timeval tv0; > @@ -114,6 +138,42 @@ bool aio_poll(AioContext *ctx, bool blocking) > progress = true; > } > > + /* > + * Then dispatch any pending callbacks from the GSource. > + * > + * We have to walk very carefully in case qemu_aio_set_fd_handler is > + * called while we're walking. 
> + */ > + node = QLIST_FIRST(&ctx->aio_handlers); > + while (node) { > + AioHandler *tmp; > + int revents; > + > + ctx->walking_handlers++; > + > + revents = node->pfd.revents & node->pfd.events; > + node->pfd.revents &= ~revents; > + > + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { > + node->io_read(node->opaque); > + progress = true; > + } > + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { > + node->io_write(node->opaque); > + progress = true; > + } > + > + tmp = node; > + node = QLIST_NEXT(node, node); > + > + ctx->walking_handlers--; > + > + if (!ctx->walking_handlers && tmp->deleted) { > + QLIST_REMOVE(tmp, node); > + g_free(tmp); > + } > + } > + > if (progress && !blocking) { > return true; > } > @@ -137,12 +197,12 @@ bool aio_poll(AioContext *ctx, bool blocking) > busy = true; > } > if (!node->deleted && node->io_read) { > - FD_SET(node->fd, &rdfds); > - max_fd = MAX(max_fd, node->fd + 1); > + FD_SET(node->pfd.fd, &rdfds); > + max_fd = MAX(max_fd, node->pfd.fd + 1); > } > if (!node->deleted && node->io_write) { > - FD_SET(node->fd, &wrfds); > - max_fd = MAX(max_fd, node->fd + 1); > + FD_SET(node->pfd.fd, &wrfds); > + max_fd = MAX(max_fd, node->pfd.fd + 1); > } > } > > @@ -167,12 +227,12 @@ bool aio_poll(AioContext *ctx, bool blocking) > ctx->walking_handlers++; > > if (!node->deleted && > - FD_ISSET(node->fd, &rdfds) && > + FD_ISSET(node->pfd.fd, &rdfds) && > node->io_read) { > node->io_read(node->opaque); > } > if (!node->deleted && > - FD_ISSET(node->fd, &wrfds) && > + FD_ISSET(node->pfd.fd, &wrfds) && > node->io_write) { > node->io_write(node->opaque); > } > diff --git a/qemu-aio.h b/qemu-aio.h > index f19201e..ac24896 100644 > --- a/qemu-aio.h > +++ b/qemu-aio.h > @@ -133,6 +133,13 @@ void qemu_bh_delete(QEMUBH *bh); > * outstanding AIO operations have been completed or cancelled. */ > void aio_flush(AioContext *ctx); > > +/* Return whether there are any pending callbacks from the GSource > + * attached to the AioContext. 
> + * > + * This is used internally in the implementation of the GSource. > + */ > +bool aio_pending(AioContext *ctx); > + > /* Progress in completing AIO work to occur. This can issue new pending > * aio as a result of executing I/O completion or bh callbacks. > * > -- > 1.7.12 > > >
Il 29/09/2012 13:28, Blue Swirl ha scritto: >> > + if (node->pfd.fd == fd) > Forgot to run checkpatch.pl? > No, just ignored its result for an RFC. Paolo
diff --git a/aio.c b/aio.c index 95ad467..c848a9f 100644 --- a/aio.c +++ b/aio.c @@ -20,7 +20,7 @@ struct AioHandler { - int fd; + GPollFD pfd; IOHandler *io_read; IOHandler *io_write; AioFlushHandler *io_flush; @@ -34,7 +34,7 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd) AioHandler *node; QLIST_FOREACH(node, &ctx->aio_handlers, node) { - if (node->fd == fd) + if (node->pfd.fd == fd) if (!node->deleted) return node; } @@ -57,9 +57,10 @@ void aio_set_fd_handler(AioContext *ctx, if (!io_read && !io_write) { if (node) { /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) + if (ctx->walking_handlers) { node->deleted = 1; - else { + node->pfd.revents = 0; + } else { /* Otherwise, delete it for real. We can't just mark it as * deleted because deleted nodes are only cleaned up after * releasing the walking_handlers lock. @@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx, if (node == NULL) { /* Alloc and insert if it's not already there */ node = g_malloc0(sizeof(AioHandler)); - node->fd = fd; + node->pfd.fd = fd; QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); } /* Update handler with latest information */ @@ -80,6 +81,10 @@ void aio_set_fd_handler(AioContext *ctx, node->io_write = io_write; node->io_flush = io_flush; node->opaque = opaque; + + node->pfd.events = G_IO_ERR; + node->pfd.events |= (io_read ? G_IO_IN | G_IO_HUP : 0); + node->pfd.events |= (io_write ? 
G_IO_OUT : 0); } } @@ -93,6 +98,25 @@ void aio_set_event_notifier(AioContext *ctx, (AioFlushHandler *)io_flush, notifier); } +bool aio_pending(AioContext *ctx) +{ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + int revents; + + revents = node->pfd.revents & node->pfd.events; + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { + return true; + } + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { + return true; + } + } + + return false; +} + bool aio_poll(AioContext *ctx, bool blocking) { static struct timeval tv0; @@ -114,6 +138,42 @@ bool aio_poll(AioContext *ctx, bool blocking) progress = true; } + /* + * Then dispatch any pending callbacks from the GSource. + * + * We have to walk very carefully in case qemu_aio_set_fd_handler is + * called while we're walking. + */ + node = QLIST_FIRST(&ctx->aio_handlers); + while (node) { + AioHandler *tmp; + int revents; + + ctx->walking_handlers++; + + revents = node->pfd.revents & node->pfd.events; + node->pfd.revents &= ~revents; + + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { + node->io_read(node->opaque); + progress = true; + } + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + tmp = node; + node = QLIST_NEXT(node, node); + + ctx->walking_handlers--; + + if (!ctx->walking_handlers && tmp->deleted) { + QLIST_REMOVE(tmp, node); + g_free(tmp); + } + } + if (progress && !blocking) { return true; } @@ -137,12 +197,12 @@ bool aio_poll(AioContext *ctx, bool blocking) busy = true; } if (!node->deleted && node->io_read) { - FD_SET(node->fd, &rdfds); - max_fd = MAX(max_fd, node->fd + 1); + FD_SET(node->pfd.fd, &rdfds); + max_fd = MAX(max_fd, node->pfd.fd + 1); } if (!node->deleted && node->io_write) { - FD_SET(node->fd, &wrfds); - max_fd = MAX(max_fd, node->fd + 1); + FD_SET(node->pfd.fd, &wrfds); + max_fd = MAX(max_fd, node->pfd.fd + 1); } } @@ -167,12 +227,12 @@ bool aio_poll(AioContext *ctx, 
bool blocking) ctx->walking_handlers++; if (!node->deleted && - FD_ISSET(node->fd, &rdfds) && + FD_ISSET(node->pfd.fd, &rdfds) && node->io_read) { node->io_read(node->opaque); } if (!node->deleted && - FD_ISSET(node->fd, &wrfds) && + FD_ISSET(node->pfd.fd, &wrfds) && node->io_write) { node->io_write(node->opaque); } diff --git a/qemu-aio.h b/qemu-aio.h index f19201e..ac24896 100644 --- a/qemu-aio.h +++ b/qemu-aio.h @@ -133,6 +133,13 @@ void qemu_bh_delete(QEMUBH *bh); * outstanding AIO operations have been completed or cancelled. */ void aio_flush(AioContext *ctx); +/* Return whether there are any pending callbacks from the GSource + * attached to the AioContext. + * + * This is used internally in the implementation of the GSource. + */ +bool aio_pending(AioContext *ctx); + /* Progress in completing AIO work to occur. This can issue new pending * aio as a result of executing I/O completion or bh callbacks. *
This adds a GPollFD to each AioHandler. It will then be possible to attach these GPollFDs to a GSource, and from there to the main loop. aio_wait examines the GPollFDs and avoids calling select() if any is set (similar to what it does if bottom halves are available). Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> --- aio.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++--------- qemu-aio.h | 7 ++++++ 2 file modificati, 78 inserzioni(+), 11 rimozioni(-)