diff mbox

rbd: switch from pipe to QEMUBH completion notification

Message ID 1386257913-13700-1-git-send-email-stefanha@redhat.com
State New
Headers show

Commit Message

Stefan Hajnoczi Dec. 5, 2013, 3:38 p.m. UTC
rbd callbacks are called from non-QEMU threads.  Up until now a pipe was
used to signal completion back to the QEMU iothread.

The pipe writer code handles EAGAIN using select(2).  The select(2) API
is not scalable since fd_set size is static.  FD_SET() can write beyond
the end of fd_set if the file descriptor number is too high.  (QEMU's
main loop uses poll(2) to avoid this issue with select(2).)

Since the pipe itself is quite clumsy to use and QEMUBH is now
thread-safe, just schedule a BH from the rbd callback function.  This
way we can simplify I/O completion in addition to eliminating the
potential FD_SET() crash when file descriptor numbers become too high.

Crash scenario: QEMU already has 1024 file descriptors open.  Hotplug an
rbd drive and get the pipe writer to take the select(2) code path.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
Josh: This patch has not been tested.  I have just compiled it.

 block/rbd.c | 130 ++++++++++--------------------------------------------------
 1 file changed, 22 insertions(+), 108 deletions(-)

Comments

Josh Durgin Dec. 28, 2013, 1:07 a.m. UTC | #1
On 12/05/2013 07:38 AM, Stefan Hajnoczi wrote:
> rbd callbacks are called from non-QEMU threads.  Up until now a pipe was
> used to signal completion back to the QEMU iothread.
>
> The pipe writer code handles EAGAIN using select(2).  The select(2) API
> is not scalable since fd_set size is static.  FD_SET() can write beyond
> the end of fd_set if the file descriptor number is too high.  (QEMU's
> main loop uses poll(2) to avoid this issue with select(2).)
>
> Since the pipe itself is quite clumsy to use and QEMUBH is now
> thread-safe, just schedule a BH from the rbd callback function.  This
> way we can simplify I/O completion in addition to eliminating the
> potential FD_SET() crash when file descriptor numbers become too high.
>
> Crash scenario: QEMU already has 1024 file descriptors open.  Hotplug an
> rbd drive and get the pipe writer to take the select(2) code path.
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> Josh: This patch has not been tested.  I have just compiled it.

Sorry for the delay. This is a great simplification, and it passed my
tests. Thanks!

Reviewed- and Tested-by: Josh Durgin <josh.durgin@inktank.com>
Stefan Hajnoczi Jan. 2, 2014, 3:54 a.m. UTC | #2
On Thu, Dec 05, 2013 at 04:38:33PM +0100, Stefan Hajnoczi wrote:
> rbd callbacks are called from non-QEMU threads.  Up until now a pipe was
> used to signal completion back to the QEMU iothread.
> 
> The pipe writer code handles EAGAIN using select(2).  The select(2) API
> is not scalable since fd_set size is static.  FD_SET() can write beyond
> the end of fd_set if the file descriptor number is too high.  (QEMU's
> main loop uses poll(2) to avoid this issue with select(2).)
> 
> Since the pipe itself is quite clumsy to use and QEMUBH is now
> thread-safe, just schedule a BH from the rbd callback function.  This
> way we can simplify I/O completion in addition to eliminating the
> potential FD_SET() crash when file descriptor numbers become too high.
> 
> Crash scenario: QEMU already has 1024 file descriptors open.  Hotplug an
> rbd drive and get the pipe writer to take the select(2) code path.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> Josh: This patch has not been tested.  I have just compiled it.
> 
>  block/rbd.c | 130 ++++++++++--------------------------------------------------
>  1 file changed, 22 insertions(+), 108 deletions(-)

Applied to my block-next tree:
https://github.com/stefanha/qemu/commits/block-next

Stefan
diff mbox

Patch

diff --git a/block/rbd.c b/block/rbd.c
index 4a1ea5b..441e757 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -95,18 +95,13 @@  typedef struct RADOSCB {
 #define RBD_FD_WRITE 1
 
 typedef struct BDRVRBDState {
-    int fds[2];
     rados_t cluster;
     rados_ioctx_t io_ctx;
     rbd_image_t image;
     char name[RBD_MAX_IMAGE_NAME_SIZE];
     char *snap;
-    int event_reader_pos;
-    RADOSCB *event_rcb;
 } BDRVRBDState;
 
-static void rbd_aio_bh_cb(void *opaque);
-
 static int qemu_rbd_next_tok(char *dst, int dst_len,
                              char *src, char delim,
                              const char *name,
@@ -369,9 +364,8 @@  static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options,
 }
 
 /*
- * This aio completion is being called from qemu_rbd_aio_event_reader()
- * and runs in qemu context. It schedules a bh, but just in case the aio
- * was not cancelled before.
+ * This aio completion is being called from rbd_finish_bh() and runs in qemu
+ * BH context.
  */
 static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
@@ -401,36 +395,19 @@  static void qemu_rbd_complete_aio(RADOSCB *rcb)
             acb->ret = r;
         }
     }
-    /* Note that acb->bh can be NULL in case where the aio was cancelled */
-    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
-    qemu_bh_schedule(acb->bh);
-    g_free(rcb);
-}
 
-/*
- * aio fd read handler. It runs in the qemu context and calls the
- * completion handling of completed rados aio operations.
- */
-static void qemu_rbd_aio_event_reader(void *opaque)
-{
-    BDRVRBDState *s = opaque;
+    g_free(rcb);
 
-    ssize_t ret;
+    if (acb->cmd == RBD_AIO_READ) {
+        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    acb->status = 0;
 
-    do {
-        char *p = (char *)&s->event_rcb;
-
-        /* now read the rcb pointer that was sent from a non qemu thread */
-        ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
-                   sizeof(s->event_rcb) - s->event_reader_pos);
-        if (ret > 0) {
-            s->event_reader_pos += ret;
-            if (s->event_reader_pos == sizeof(s->event_rcb)) {
-                s->event_reader_pos = 0;
-                qemu_rbd_complete_aio(s->event_rcb);
-            }
-        }
-    } while (ret < 0 && errno == EINTR);
+    if (!acb->cancelled) {
+        qemu_aio_release(acb);
+    }
 }
 
 /* TODO Convert to fine grained options */
@@ -538,23 +515,9 @@  static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
 
     bs->read_only = (s->snap != NULL);
 
-    s->event_reader_pos = 0;
-    r = qemu_pipe(s->fds);
-    if (r < 0) {
-        error_report("error opening eventfd");
-        goto failed;
-    }
-    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
-    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
-    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
-                            NULL, s);
-
-
     qemu_opts_del(opts);
     return 0;
 
-failed:
-    rbd_close(s->image);
 failed_open:
     rados_ioctx_destroy(s->io_ctx);
 failed_shutdown:
@@ -569,10 +532,6 @@  static void qemu_rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
-    close(s->fds[0]);
-    close(s->fds[1]);
-    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL);
-
     rbd_close(s->image);
     rados_ioctx_destroy(s->io_ctx);
     g_free(s->snap);
@@ -600,34 +559,11 @@  static const AIOCBInfo rbd_aiocb_info = {
     .cancel = qemu_rbd_aio_cancel,
 };
 
-static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
+static void rbd_finish_bh(void *opaque)
 {
-    int ret = 0;
-    while (1) {
-        fd_set wfd;
-        int fd = s->fds[RBD_FD_WRITE];
-
-        /* send the op pointer to the qemu thread that is responsible
-           for the aio/op completion. Must do it in a qemu thread context */
-        ret = write(fd, (void *)&rcb, sizeof(rcb));
-        if (ret >= 0) {
-            break;
-        }
-        if (errno == EINTR) {
-            continue;
-        }
-        if (errno != EAGAIN) {
-            break;
-        }
-
-        FD_ZERO(&wfd);
-        FD_SET(fd, &wfd);
-        do {
-            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
-        } while (ret < 0 && errno == EINTR);
-    }
-
-    return ret;
+    RADOSCB *rcb = opaque;
+    qemu_bh_delete(rcb->acb->bh);
+    qemu_rbd_complete_aio(rcb);
 }
 
 /*
@@ -635,40 +571,18 @@  static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
  *
  * Note: this function is being called from a non qemu thread so
  * we need to be careful about what we do here. Generally we only
- * write to the block notification pipe, and do the rest of the
- * io completion handling from qemu_rbd_aio_event_reader() which
- * runs in a qemu context.
+ * schedule a BH, and do the rest of the io completion handling
+ * from rbd_finish_bh() which runs in a qemu context.
  */
 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 {
-    int ret;
+    RBDAIOCB *acb = rcb->acb;
+
     rcb->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
-    ret = qemu_rbd_send_pipe(rcb->s, rcb);
-    if (ret < 0) {
-        error_report("failed writing to acb->s->fds");
-        g_free(rcb);
-    }
-}
-
-/* Callback when all queued rbd_aio requests are complete */
 
-static void rbd_aio_bh_cb(void *opaque)
-{
-    RBDAIOCB *acb = opaque;
-
-    if (acb->cmd == RBD_AIO_READ) {
-        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
-    }
-    qemu_vfree(acb->bounce);
-    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-    acb->status = 0;
-
-    if (!acb->cancelled) {
-        qemu_aio_release(acb);
-    }
+    acb->bh = qemu_bh_new(rbd_finish_bh, rcb);
+    qemu_bh_schedule(acb->bh);
 }
 
 static int rbd_aio_discard_wrapper(rbd_image_t image,