Patchwork [v4,06/11] dataplane: add Linux AIO request queue

login
register
mail settings
Submitter Stefan Hajnoczi
Date Nov. 22, 2012, 3:16 p.m.
Message ID <1353597412-12232-7-git-send-email-stefanha@redhat.com>
Download mbox | patch
Permalink /patch/201076/
State New
Headers show

Comments

Stefan Hajnoczi - Nov. 22, 2012, 3:16 p.m.
The IOQueue has a pool of iocb structs and a function to add new
read/write requests.  Multiple requests can be added before calling the
submit function to actually tell the host kernel to begin I/O.  This
allows callers to batch requests and submit them in one go.

The actual I/O is performed using Linux AIO.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 hw/dataplane/Makefile.objs |   2 +-
 hw/dataplane/ioq.c         | 118 +++++++++++++++++++++++++++++++++++++++++++++
 hw/dataplane/ioq.h         |  57 ++++++++++++++++++++++
 3 files changed, 176 insertions(+), 1 deletion(-)
 create mode 100644 hw/dataplane/ioq.c
 create mode 100644 hw/dataplane/ioq.h

Patch

diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
index e26bd7d..abd408f 100644
--- a/hw/dataplane/Makefile.objs
+++ b/hw/dataplane/Makefile.objs
@@ -1,3 +1,3 @@ 
 ifeq ($(CONFIG_VIRTIO), y)
-common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o
+common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o event-poll.o ioq.o
 endif
diff --git a/hw/dataplane/ioq.c b/hw/dataplane/ioq.c
new file mode 100644
index 0000000..7adeb5d
--- /dev/null
+++ b/hw/dataplane/ioq.c
@@ -0,0 +1,118 @@ 
+/*
+ * Linux AIO request queue
+ *
+ * Copyright 2012 IBM, Corp.
+ * Copyright 2012 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ *   Stefan Hajnoczi <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "hw/dataplane/ioq.h"
+
+void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs)
+{
+    int rc;
+
+    ioq->fd = fd;
+    ioq->max_reqs = max_reqs;
+
+    memset(&ioq->io_ctx, 0, sizeof ioq->io_ctx);
+    rc = io_setup(max_reqs, &ioq->io_ctx);
+    if (rc != 0) {
+        fprintf(stderr, "ioq io_setup failed %d\n", rc);
+        exit(1);
+    }
+
+    rc = event_notifier_init(&ioq->io_notifier, 0);
+    if (rc != 0) {
+        fprintf(stderr, "ioq io event notifier creation failed %d\n", rc);
+        exit(1);
+    }
+
+    ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * max_reqs);
+    ioq->freelist_idx = 0;
+
+    ioq->queue = g_malloc0(sizeof ioq->queue[0] * max_reqs);
+    ioq->queue_idx = 0;
+}
+
+void ioq_cleanup(IOQueue *ioq)
+{
+    g_free(ioq->freelist);
+    g_free(ioq->queue);
+
+    event_notifier_cleanup(&ioq->io_notifier);
+    io_destroy(ioq->io_ctx);
+}
+
+EventNotifier *ioq_get_notifier(IOQueue *ioq)
+{
+    return &ioq->io_notifier;
+}
+
+struct iocb *ioq_get_iocb(IOQueue *ioq)
+{
+    if (unlikely(ioq->freelist_idx == 0)) {
+        fprintf(stderr, "ioq underflow\n");
+        exit(1);
+    }
+    struct iocb *iocb = ioq->freelist[--ioq->freelist_idx];
+    ioq->queue[ioq->queue_idx++] = iocb;
+    return iocb;
+}
+
+void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
+{
+    if (unlikely(ioq->freelist_idx == ioq->max_reqs)) {
+        fprintf(stderr, "ioq overflow\n");
+        exit(1);
+    }
+    ioq->freelist[ioq->freelist_idx++] = iocb;
+}
+
+struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov,
+                      unsigned int count, long long offset)
+{
+    struct iocb *iocb = ioq_get_iocb(ioq);
+
+    if (read) {
+        io_prep_preadv(iocb, ioq->fd, iov, count, offset);
+    } else {
+        io_prep_pwritev(iocb, ioq->fd, iov, count, offset);
+    }
+    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
+    return iocb;
+}
+
+int ioq_submit(IOQueue *ioq)
+{
+    int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
+    ioq->queue_idx = 0; /* reset */
+    return rc;
+}
+
+int ioq_run_completion(IOQueue *ioq, IOQueueCompletion *completion,
+                       void *opaque)
+{
+    struct io_event events[ioq->max_reqs];
+    int nevents, i;
+
+    nevents = io_getevents(ioq->io_ctx, 0, ioq->max_reqs, events, NULL);
+    if (unlikely(nevents < 0)) {
+        fprintf(stderr, "io_getevents failed %d\n", nevents);
+        exit(1);
+    }
+
+    for (i = 0; i < nevents; i++) {
+        ssize_t ret = ((uint64_t)events[i].res2 << 32) | events[i].res;
+
+        completion(events[i].obj, ret, opaque);
+        ioq_put_iocb(ioq, events[i].obj);
+    }
+    return nevents;
+}
diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
new file mode 100644
index 0000000..890db22
--- /dev/null
+++ b/hw/dataplane/ioq.h
@@ -0,0 +1,57 @@ 
+/*
+ * Linux AIO request queue
+ *
+ * Copyright 2012 IBM, Corp.
+ * Copyright 2012 Red Hat, Inc. and/or its affiliates
+ *
+ * Authors:
+ *   Stefan Hajnoczi <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef IOQ_H
+#define IOQ_H
+
+#include <libaio.h>
+#include "event_notifier.h"
+
+typedef struct {
+    int fd;                         /* file descriptor */
+    unsigned int max_reqs;          /* max length of freelist and queue */
+
+    io_context_t io_ctx;            /* Linux AIO context */
+    EventNotifier io_notifier;      /* Linux AIO eventfd */
+
+    /* Requests can complete in any order so a free list is necessary to manage
+     * available iocbs.
+     */
+    struct iocb **freelist;         /* free iocbs */
+    unsigned int freelist_idx;
+
+    /* Multiple requests are queued up before submitting them all in one go */
+    struct iocb **queue;            /* queued iocbs */
+    unsigned int queue_idx;
+} IOQueue;
+
+void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs);
+void ioq_cleanup(IOQueue *ioq);
+EventNotifier *ioq_get_notifier(IOQueue *ioq);
+struct iocb *ioq_get_iocb(IOQueue *ioq);
+void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb);
+struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov,
+                      unsigned int count, long long offset);
+int ioq_submit(IOQueue *ioq);
+
+static inline unsigned int ioq_num_queued(IOQueue *ioq)
+{
+    return ioq->queue_idx;
+}
+
+typedef void IOQueueCompletion(struct iocb *iocb, ssize_t ret, void *opaque);
+int ioq_run_completion(IOQueue *ioq, IOQueueCompletion *completion,
+                       void *opaque);
+
+#endif /* IOQ_H */