diff mbox

[v5,06/11] dataplane: add Linux AIO request queue

Message ID 1354740430-22452-7-git-send-email-stefanha@redhat.com
State New
Headers show

Commit Message

Stefan Hajnoczi Dec. 5, 2012, 8:47 p.m. UTC
The IOQueue has a pool of iocb structs and a function to add new
read/write requests.  Multiple requests can be added before calling the
submit function to actually tell the host kernel to begin I/O.  This
allows callers to batch requests and submit them in one go.

The actual I/O is performed using Linux AIO.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 hw/dataplane/Makefile.objs |   2 +-
 hw/dataplane/ioq.c         | 118 +++++++++++++++++++++++++++++++++++++++++++++
 hw/dataplane/ioq.h         |  57 ++++++++++++++++++++++
 3 files changed, 176 insertions(+), 1 deletion(-)
 create mode 100644 hw/dataplane/ioq.c
 create mode 100644 hw/dataplane/ioq.h

Comments

Kevin Wolf Dec. 7, 2012, 2:21 p.m. UTC | #1
Am 05.12.2012 21:47, schrieb Stefan Hajnoczi:
> The IOQueue has a pool of iocb structs and a function to add new
> read/write requests.  Multiple requests can be added before calling the
> submit function to actually tell the host kernel to begin I/O.  This
> allows callers to batch requests and submit them in one go.
> 
> The actual I/O is performed using Linux AIO.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  hw/dataplane/Makefile.objs |   2 +-
>  hw/dataplane/ioq.c         | 118 +++++++++++++++++++++++++++++++++++++++++++++
>  hw/dataplane/ioq.h         |  57 ++++++++++++++++++++++
>  3 files changed, 176 insertions(+), 1 deletion(-)
>  create mode 100644 hw/dataplane/ioq.c
>  create mode 100644 hw/dataplane/ioq.h
> 
> diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
> index e26bd7d..abd408f 100644
> --- a/hw/dataplane/Makefile.objs
> +++ b/hw/dataplane/Makefile.objs
> @@ -1,3 +1,3 @@
>  ifeq ($(CONFIG_VIRTIO), y)
> -common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o
> +common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o ioq.o
>  endif
> diff --git a/hw/dataplane/ioq.c b/hw/dataplane/ioq.c
> new file mode 100644
> index 0000000..7adeb5d
> --- /dev/null
> +++ b/hw/dataplane/ioq.c
> @@ -0,0 +1,118 @@
> +/*
> + * Linux AIO request queue
> + *
> + * Copyright 2012 IBM, Corp.
> + * Copyright 2012 Red Hat, Inc. and/or its affiliates
> + *
> + * Authors:
> + *   Stefan Hajnoczi <stefanha@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "hw/dataplane/ioq.h"
> +
> +void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs)
> +{
> +    int rc;
> +
> +    ioq->fd = fd;
> +    ioq->max_reqs = max_reqs;
> +
> +    memset(&ioq->io_ctx, 0, sizeof ioq->io_ctx);
> +    rc = io_setup(max_reqs, &ioq->io_ctx);
> +    if (rc != 0) {
> +        fprintf(stderr, "ioq io_setup failed %d\n", rc);
> +        exit(1);
> +    }
> +
> +    rc = event_notifier_init(&ioq->io_notifier, 0);
> +    if (rc != 0) {
> +        fprintf(stderr, "ioq io event notifier creation failed %d\n", rc);
> +        exit(1);
> +    }
> +
> +    ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * max_reqs);
> +    ioq->freelist_idx = 0;
> +
> +    ioq->queue = g_malloc0(sizeof ioq->queue[0] * max_reqs);
> +    ioq->queue_idx = 0;
> +}
> +
> +void ioq_cleanup(IOQueue *ioq)
> +{
> +    g_free(ioq->freelist);
> +    g_free(ioq->queue);
> +
> +    event_notifier_cleanup(&ioq->io_notifier);
> +    io_destroy(ioq->io_ctx);
> +}
> +
> +EventNotifier *ioq_get_notifier(IOQueue *ioq)
> +{
> +    return &ioq->io_notifier;
> +}
> +
> +struct iocb *ioq_get_iocb(IOQueue *ioq)
> +{
> +    if (unlikely(ioq->freelist_idx == 0)) {
> +        fprintf(stderr, "ioq underflow\n");
> +        exit(1);
> +    }

Can this happen? If not, it should be an assertion. If it can, the error
handling code is wrong: we can't just exit qemu. Exiting is already not nice
in setup functions, but at runtime I think it's not acceptable.

> +    struct iocb *iocb = ioq->freelist[--ioq->freelist_idx];
> +    ioq->queue[ioq->queue_idx++] = iocb;
> +    return iocb;
> +}
> +
> +void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
> +{
> +    if (unlikely(ioq->freelist_idx == ioq->max_reqs)) {
> +        fprintf(stderr, "ioq overflow\n");
> +        exit(1);
> +    }

Same here.

Kevin
Stefan Hajnoczi Dec. 10, 2012, 1:05 p.m. UTC | #2
On Fri, Dec 07, 2012 at 03:21:22PM +0100, Kevin Wolf wrote:
> Am 05.12.2012 21:47, schrieb Stefan Hajnoczi:
> > The IOQueue has a pool of iocb structs and a function to add new
> > read/write requests.  Multiple requests can be added before calling the
> > submit function to actually tell the host kernel to begin I/O.  This
> > allows callers to batch requests and submit them in one go.
> > 
> > The actual I/O is performed using Linux AIO.
> > 
> > Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> > ---
> >  hw/dataplane/Makefile.objs |   2 +-
> >  hw/dataplane/ioq.c         | 118 +++++++++++++++++++++++++++++++++++++++++++++
> >  hw/dataplane/ioq.h         |  57 ++++++++++++++++++++++
> >  3 files changed, 176 insertions(+), 1 deletion(-)
> >  create mode 100644 hw/dataplane/ioq.c
> >  create mode 100644 hw/dataplane/ioq.h
> > 
> > diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
> > index e26bd7d..abd408f 100644
> > --- a/hw/dataplane/Makefile.objs
> > +++ b/hw/dataplane/Makefile.objs
> > @@ -1,3 +1,3 @@
> >  ifeq ($(CONFIG_VIRTIO), y)
> > -common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o
> > +common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o ioq.o
> >  endif
> > diff --git a/hw/dataplane/ioq.c b/hw/dataplane/ioq.c
> > new file mode 100644
> > index 0000000..7adeb5d
> > --- /dev/null
> > +++ b/hw/dataplane/ioq.c
> > @@ -0,0 +1,118 @@
> > +/*
> > + * Linux AIO request queue
> > + *
> > + * Copyright 2012 IBM, Corp.
> > + * Copyright 2012 Red Hat, Inc. and/or its affiliates
> > + *
> > + * Authors:
> > + *   Stefan Hajnoczi <stefanha@redhat.com>
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> > + *
> > + */
> > +
> > +#include "hw/dataplane/ioq.h"
> > +
> > +void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs)
> > +{
> > +    int rc;
> > +
> > +    ioq->fd = fd;
> > +    ioq->max_reqs = max_reqs;
> > +
> > +    memset(&ioq->io_ctx, 0, sizeof ioq->io_ctx);
> > +    rc = io_setup(max_reqs, &ioq->io_ctx);
> > +    if (rc != 0) {
> > +        fprintf(stderr, "ioq io_setup failed %d\n", rc);
> > +        exit(1);
> > +    }
> > +
> > +    rc = event_notifier_init(&ioq->io_notifier, 0);
> > +    if (rc != 0) {
> > +        fprintf(stderr, "ioq io event notifier creation failed %d\n", rc);
> > +        exit(1);
> > +    }
> > +
> > +    ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * max_reqs);
> > +    ioq->freelist_idx = 0;
> > +
> > +    ioq->queue = g_malloc0(sizeof ioq->queue[0] * max_reqs);
> > +    ioq->queue_idx = 0;
> > +}
> > +
> > +void ioq_cleanup(IOQueue *ioq)
> > +{
> > +    g_free(ioq->freelist);
> > +    g_free(ioq->queue);
> > +
> > +    event_notifier_cleanup(&ioq->io_notifier);
> > +    io_destroy(ioq->io_ctx);
> > +}
> > +
> > +EventNotifier *ioq_get_notifier(IOQueue *ioq)
> > +{
> > +    return &ioq->io_notifier;
> > +}
> > +
> > +struct iocb *ioq_get_iocb(IOQueue *ioq)
> > +{
> > +    if (unlikely(ioq->freelist_idx == 0)) {
> > +        fprintf(stderr, "ioq underflow\n");
> > +        exit(1);
> > +    }
> 
> Can this happen? If not, it should be an assertion. If it can, the error
> handling code is wrong: we can't just exit qemu. Exiting is already not nice
> in setup functions, but at runtime I think it's not acceptable.
> 
> > +    struct iocb *iocb = ioq->freelist[--ioq->freelist_idx];
> > +    ioq->queue[ioq->queue_idx++] = iocb;
> > +    return iocb;
> > +}
> > +
> > +void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
> > +{
> > +    if (unlikely(ioq->freelist_idx == ioq->max_reqs)) {
> > +        fprintf(stderr, "ioq overflow\n");
> > +        exit(1);
> > +    }
> 
> Same here.

The ioq is sized so that the guest cannot submit more than max_reqs requests,
because of the vring size.  Therefore neither the underflow nor the overflow
can happen, and I have changed both checks to asserts.

Stefan
diff mbox

Patch

diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
index e26bd7d..abd408f 100644
--- a/hw/dataplane/Makefile.objs
+++ b/hw/dataplane/Makefile.objs
@@ -1,3 +1,3 @@ 
 ifeq ($(CONFIG_VIRTIO), y)
-common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o
+common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o ioq.o
 endif
diff --git a/hw/dataplane/ioq.c b/hw/dataplane/ioq.c
new file mode 100644
index 0000000..7adeb5d
--- /dev/null
+++ b/hw/dataplane/ioq.c
@@ -0,0 +1,118 @@ 
+/*
+ * Linux AIO request queue
+ *
+ * Copyright 2012 IBM, Corp.
+ * Copyright 2012 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ *   Stefan Hajnoczi <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "hw/dataplane/ioq.h"
+
+void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs)
+{   /* Prepare ioq for AIO on fd with max_reqs slots; exit(1) on failure. */
+    int rc;
+
+    ioq->fd = fd;
+    ioq->max_reqs = max_reqs;
+
+    memset(&ioq->io_ctx, 0, sizeof ioq->io_ctx); /* zeroed before io_setup() */
+    rc = io_setup(max_reqs, &ioq->io_ctx);
+    if (rc != 0) {
+        fprintf(stderr, "ioq io_setup failed %d\n", rc);
+        exit(1);
+    }
+
+    rc = event_notifier_init(&ioq->io_notifier, 0);
+    if (rc != 0) {
+        fprintf(stderr, "ioq io event notifier creation failed %d\n", rc);
+        exit(1);
+    }
+
+    ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * max_reqs);
+    ioq->freelist_idx = 0; /* starts empty; filled through ioq_put_iocb() */
+
+    ioq->queue = g_malloc0(sizeof ioq->queue[0] * max_reqs);
+    ioq->queue_idx = 0; /* nothing queued for submission yet */
+}
+
+void ioq_cleanup(IOQueue *ioq)
+{   /* Release what ioq_init() set up; ioq itself is not freed. */
+    g_free(ioq->freelist); /* frees the pointer array only, not the iocbs */
+    g_free(ioq->queue);
+
+    event_notifier_cleanup(&ioq->io_notifier);
+    io_destroy(ioq->io_ctx);
+}
+
+EventNotifier *ioq_get_notifier(IOQueue *ioq)
+{   /* Expose the notifier whose fd ioq_rdwr() attaches to each request. */
+    return &ioq->io_notifier;
+}
+
+struct iocb *ioq_get_iocb(IOQueue *ioq)
+{   /* Pop a free iocb and append it to the pending submission queue. */
+    if (unlikely(ioq->freelist_idx == 0)) { /* no free iocb left */
+        fprintf(stderr, "ioq underflow\n");
+        exit(1);
+    }
+    struct iocb *iocb = ioq->freelist[--ioq->freelist_idx];
+    ioq->queue[ioq->queue_idx++] = iocb; /* NOTE(review): queue_idx unchecked */
+    return iocb;
+}
+
+void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
+{   /* Return iocb to the free list; exit(1) if the list is already full. */
+    if (unlikely(ioq->freelist_idx == ioq->max_reqs)) { /* max_reqs entries */
+        fprintf(stderr, "ioq overflow\n");
+        exit(1);
+    }
+    ioq->freelist[ioq->freelist_idx++] = iocb;
+}
+
+struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov,
+                      unsigned int count, long long offset)
+{   /* Queue (do not submit) a vectored read or write on ioq->fd at offset. */
+    struct iocb *iocb = ioq_get_iocb(ioq); /* also places it on ioq->queue */
+
+    if (read) {
+        io_prep_preadv(iocb, ioq->fd, iov, count, offset);
+    } else {
+        io_prep_pwritev(iocb, ioq->fd, iov, count, offset);
+    }
+    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
+    return iocb; /* the prepared iocb; I/O starts only after ioq_submit() */
+}
+
+int ioq_submit(IOQueue *ioq)
+{   /* Hand all queued iocbs to io_submit() in one call; return its result. */
+    int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
+    ioq->queue_idx = 0; /* reset unconditionally, whatever io_submit returned */
+    return rc;
+}
+
+int ioq_run_completion(IOQueue *ioq, IOQueueCompletion *completion,
+                       void *opaque)
+{   /* Reap finished requests, call completion for each, recycle the iocb. */
+    struct io_event events[ioq->max_reqs]; /* on-stack array of max_reqs */
+    int nevents, i;
+
+    nevents = io_getevents(ioq->io_ctx, 0, ioq->max_reqs, events, NULL);
+    if (unlikely(nevents < 0)) {
+        fprintf(stderr, "io_getevents failed %d\n", nevents);
+        exit(1);
+    }
+
+    for (i = 0; i < nevents; i++) {
+        ssize_t ret = ((uint64_t)events[i].res2 << 32) | events[i].res;
+
+        completion(events[i].obj, ret, opaque); /* runs before iocb is freed */
+        ioq_put_iocb(ioq, events[i].obj); /* iocb goes back on the free list */
+    }
+    return nevents; /* number of completions handled */
+}
diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
new file mode 100644
index 0000000..890db22
--- /dev/null
+++ b/hw/dataplane/ioq.h
@@ -0,0 +1,57 @@ 
+/*
+ * Linux AIO request queue
+ *
+ * Copyright 2012 IBM, Corp.
+ * Copyright 2012 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ *   Stefan Hajnoczi <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef IOQ_H
+#define IOQ_H
+
+#include <libaio.h>
+#include "event_notifier.h"
+
+typedef struct {
+    int fd;                         /* file descriptor requests are issued on */
+    unsigned int max_reqs;          /* max length of freelist and queue */
+
+    io_context_t io_ctx;            /* Linux AIO context */
+    EventNotifier io_notifier;      /* Linux AIO eventfd */
+
+    /* Requests can complete in any order so a free list is necessary to manage
+     * available iocbs.
+     */
+    struct iocb **freelist;         /* free iocbs */
+    unsigned int freelist_idx;      /* number of iocbs currently in freelist */
+
+    /* Multiple requests are queued up before submitting them all in one go */
+    struct iocb **queue;            /* queued iocbs */
+    unsigned int queue_idx;         /* number queued; ioq_submit() resets it */
+} IOQueue;
+
+void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs);
+void ioq_cleanup(IOQueue *ioq);
+EventNotifier *ioq_get_notifier(IOQueue *ioq);
+struct iocb *ioq_get_iocb(IOQueue *ioq);
+void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb);
+struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov,
+                      unsigned int count, long long offset);
+int ioq_submit(IOQueue *ioq);
+
+static inline unsigned int ioq_num_queued(IOQueue *ioq)
+{   /* Number of requests queued since the last ioq_submit(). */
+    return ioq->queue_idx;
+}
+
+typedef void IOQueueCompletion(struct iocb *iocb, ssize_t ret, void *opaque);
+int ioq_run_completion(IOQueue *ioq, IOQueueCompletion *completion,
+                       void *opaque);
+
+#endif /* IOQ_H */