Patchwork [RFC,11/13] dataplane: use block layer for I/O

login
register
mail settings
Submitter Stefan Hajnoczi
Date June 14, 2013, 9:48 a.m.
Message ID <1371203313-26490-12-git-send-email-stefanha@redhat.com>
Download mbox | patch
Permalink /patch/251312/
State New
Headers show

Comments

Stefan Hajnoczi - June 14, 2013, 9:48 a.m.
Use the QEMU block layer instead of custom Linux AIO code.  A couple of
changes are necessary to make request processing work in this new
environment:

1. Replace ioq_*() calls with bdrv_aio_*().  We finally support
   asynchronous flush since we get it for free from the QEMU block
   layer!

2. Add a data buffer QEMUIOVector field to struct VirtIOBlockRequest.
   We previously relied on io_submit(2) semantics where the kernel
   copied in iovecs, and we therefore didn't keep them around.  The QEMU
   block layer *does* require that the iovecs are alive across the
   duration of the request.

Note that we still prevent block jobs, hot unplug, and live migration
since there is no synchronization with the main thread.  The next patch
series will introduce a mechanism to arbitrate block layer calls between
threads.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 hw/block/dataplane/virtio-blk.c | 288 +++++++++++++---------------------------
 1 file changed, 92 insertions(+), 196 deletions(-)

Patch

diff --git a/hw/block/dataplane/virtio-blk.c b/hw/block/dataplane/virtio-blk.c
index 2faed43..aeb3668 100644
--- a/hw/block/dataplane/virtio-blk.c
+++ b/hw/block/dataplane/virtio-blk.c
@@ -17,7 +17,6 @@ 
 #include "qemu/thread.h"
 #include "qemu/error-report.h"
 #include "hw/virtio/dataplane/vring.h"
-#include "ioq.h"
 #include "migration/migration.h"
 #include "block/block.h"
 #include "hw/virtio/virtio-blk.h"
@@ -34,11 +33,10 @@  enum {
 };
 
 typedef struct {
-    struct iocb iocb;               /* Linux AIO control block */
-    QEMUIOVector *inhdr;            /* iovecs for virtio_blk_inhdr */
+    VirtIOBlockDataPlane *dataplane;
+    QEMUIOVector data;              /* data buffer */
+    QEMUIOVector inhdr;             /* iovecs for virtio_blk_inhdr */
     unsigned int head;              /* vring descriptor index */
-    struct iovec *bounce_iov;       /* used if guest buffers are unaligned */
-    QEMUIOVector *read_qiov;        /* for read completion /w bounce buffer */
 } VirtIOBlockRequest;
 
 struct VirtIOBlockDataPlane {
@@ -48,7 +46,7 @@  struct VirtIOBlockDataPlane {
     QemuThread thread;
 
     VirtIOBlkConf *blk;
-    int fd;                         /* image file descriptor */
+    BlockDriverState *bs;           /* block device */
 
     VirtIODevice *vdev;
     Vring vring;                    /* virtqueue vring */
@@ -60,14 +58,8 @@  struct VirtIOBlockDataPlane {
      * use it).
      */
     AioContext *ctx;
-    EventNotifier io_notifier;      /* Linux AIO completion */
     EventNotifier host_notifier;    /* doorbell */
 
-    IOQueue ioqueue;                /* Linux AIO queue (should really be per
-                                       dataplane thread) */
-    VirtIOBlockRequest requests[REQ_MAX]; /* pool of requests, managed by the
-                                             queue */
-
     unsigned int num_reqs;
 
     Error *migration_blocker;
@@ -83,133 +75,113 @@  static void notify_guest(VirtIOBlockDataPlane *s)
     event_notifier_set(s->guest_notifier);
 }
 
-static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
+static void complete_request(VirtIOBlockRequest *req,
+                             unsigned char status,
+                             int len)
 {
-    VirtIOBlockDataPlane *s = opaque;
-    VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
-    struct virtio_blk_inhdr hdr;
-    int len;
-
-    if (likely(ret >= 0)) {
-        hdr.status = VIRTIO_BLK_S_OK;
-        len = ret;
-    } else {
-        hdr.status = VIRTIO_BLK_S_IOERR;
-        len = 0;
-    }
-
-    trace_virtio_blk_data_plane_complete_request(s, req->head, ret);
-
-    if (req->read_qiov) {
-        assert(req->bounce_iov);
-        qemu_iovec_from_buf(req->read_qiov, 0, req->bounce_iov->iov_base, len);
-        qemu_iovec_destroy(req->read_qiov);
-        g_slice_free(QEMUIOVector, req->read_qiov);
-    }
-
-    if (req->bounce_iov) {
-        qemu_vfree(req->bounce_iov->iov_base);
-        g_slice_free(struct iovec, req->bounce_iov);
-    }
+    VirtIOBlockDataPlane *s = req->dataplane;
+    unsigned int head = req->head;
+    struct virtio_blk_inhdr hdr = {
+        .status = status,
+    };
 
-    qemu_iovec_from_buf(req->inhdr, 0, &hdr, sizeof(hdr));
-    qemu_iovec_destroy(req->inhdr);
-    g_slice_free(QEMUIOVector, req->inhdr);
+    qemu_iovec_from_buf(&req->inhdr, 0, &hdr, sizeof(hdr));
+    qemu_iovec_destroy(&req->inhdr);
+    qemu_iovec_destroy(&req->data);
+    g_slice_free(VirtIOBlockRequest, req);
 
     /* According to the virtio specification len should be the number of bytes
      * written to, but for virtio-blk it seems to be the number of bytes
      * transferred plus the status bytes.
      */
-    vring_push(&s->vring, req->head, len + sizeof(hdr));
+    vring_push(&s->vring, head, len + sizeof(hdr));
+    notify_guest(s);
 
     s->num_reqs--;
 }
 
-static void complete_request_early(VirtIOBlockDataPlane *s, unsigned int head,
-                                   QEMUIOVector *inhdr, unsigned char status)
+static void request_cb(void *opaque, int ret)
 {
-    struct virtio_blk_inhdr hdr = {
-        .status = status,
-    };
+    VirtIOBlockRequest *req = opaque;
+    VirtIOBlockDataPlane *s = req->dataplane;
+    unsigned char status;
+    int len;
 
-    qemu_iovec_from_buf(inhdr, 0, &hdr, sizeof(hdr));
-    qemu_iovec_destroy(inhdr);
-    g_slice_free(QEMUIOVector, inhdr);
+    /* Completion must be invoked in our thread */
+    assert(*get_thread_aio_context() == s->ctx);
 
-    vring_push(&s->vring, head, sizeof(hdr));
-    notify_guest(s);
+    if (likely(ret >= 0)) {
+        status = VIRTIO_BLK_S_OK;
+        len = ret;
+    } else {
+        status = VIRTIO_BLK_S_IOERR;
+        len = 0;
+    }
+
+    trace_virtio_blk_data_plane_complete_request(s, req->head, ret);
+
+    complete_request(req, status, len);
 }
 
 /* Get disk serial number */
-static void do_get_id_cmd(VirtIOBlockDataPlane *s,
-                          struct iovec *iov, unsigned int iov_cnt,
-                          unsigned int head, QEMUIOVector *inhdr)
+static void do_get_id_cmd(VirtIOBlockRequest *req,
+                          struct iovec *iov, unsigned int iov_cnt)
 {
+    VirtIOBlockDataPlane *s = req->dataplane;
     char id[VIRTIO_BLK_ID_BYTES];
 
     /* Serial number not NUL-terminated when shorter than buffer */
     strncpy(id, s->blk->serial ? s->blk->serial : "", sizeof(id));
     iov_from_buf(iov, iov_cnt, 0, id, sizeof(id));
-    complete_request_early(s, head, inhdr, VIRTIO_BLK_S_OK);
+    complete_request(req, VIRTIO_BLK_S_OK, 0);
 }
 
-static int do_rdwr_cmd(VirtIOBlockDataPlane *s, bool read,
+static int do_rdwr_cmd(VirtIOBlockRequest *req, bool read,
                        struct iovec *iov, unsigned int iov_cnt,
-                       long long offset, unsigned int head,
-                       QEMUIOVector *inhdr)
+                       long long offset)
 {
-    struct iocb *iocb;
-    QEMUIOVector qiov;
-    struct iovec *bounce_iov = NULL;
-    QEMUIOVector *read_qiov = NULL;
-
-    qemu_iovec_init_external(&qiov, iov, iov_cnt);
-    if (!bdrv_qiov_is_aligned(s->blk->conf.bs, &qiov)) {
-        void *bounce_buffer = qemu_blockalign(s->blk->conf.bs, qiov.size);
-
-        if (read) {
-            /* Need to copy back from bounce buffer on completion */
-            read_qiov = g_slice_new(QEMUIOVector);
-            qemu_iovec_init(read_qiov, iov_cnt);
-            qemu_iovec_concat_iov(read_qiov, iov, iov_cnt, 0, qiov.size);
-        } else {
-            qemu_iovec_to_buf(&qiov, 0, bounce_buffer, qiov.size);
-        }
+    VirtIOBlockDataPlane *s = req->dataplane;
+    int64_t sector_num = offset / BDRV_SECTOR_SIZE;
+    int nb_sectors;
+    unsigned int i;
 
-        /* Redirect I/O to aligned bounce buffer */
-        bounce_iov = g_slice_new(struct iovec);
-        bounce_iov->iov_base = bounce_buffer;
-        bounce_iov->iov_len = qiov.size;
-        iov = bounce_iov;
-        iov_cnt = 1;
+    for (i = 0; i < iov_cnt; i++) {
+        qemu_iovec_add(&req->data, iov[i].iov_base, iov[i].iov_len);
     }
+    nb_sectors = req->data.size / BDRV_SECTOR_SIZE;
 
-    iocb = ioq_rdwr(&s->ioqueue, read, iov, iov_cnt, offset);
-
-    /* Fill in virtio block metadata needed for completion */
-    VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
-    req->head = head;
-    req->inhdr = inhdr;
-    req->bounce_iov = bounce_iov;
-    req->read_qiov = read_qiov;
+    if (read) {
+        bdrv_aio_readv(s->bs, sector_num, &req->data, nb_sectors,
+                       request_cb, req);
+    } else {
+        bdrv_aio_writev(s->bs, sector_num, &req->data, nb_sectors,
+                        request_cb, req);
+    }
     return 0;
 }
 
-static int process_request(IOQueue *ioq, struct iovec iov[],
+static int process_request(VirtIOBlockDataPlane *s, struct iovec iov[],
                            unsigned int out_num, unsigned int in_num,
                            unsigned int head)
 {
-    VirtIOBlockDataPlane *s = container_of(ioq, VirtIOBlockDataPlane, ioqueue);
+    VirtIOBlockRequest *req;
     struct iovec *in_iov = &iov[out_num];
     struct virtio_blk_outhdr outhdr;
-    QEMUIOVector *inhdr;
     size_t in_size;
 
+    req = g_slice_new(VirtIOBlockRequest);
+    req->dataplane = s;
+    req->head = head;
+    qemu_iovec_init(&req->data, out_num + in_num);
+    qemu_iovec_init(&req->inhdr, 1);
+
+    s->num_reqs++;
+
     /* Copy in outhdr */
     if (unlikely(iov_to_buf(iov, out_num, 0, &outhdr,
                             sizeof(outhdr)) != sizeof(outhdr))) {
         error_report("virtio-blk request outhdr too short");
-        return -EFAULT;
+        goto fault;
     }
     iov_discard_front(&iov, &out_num, sizeof(outhdr));
 
@@ -217,11 +189,9 @@  static int process_request(IOQueue *ioq, struct iovec iov[],
     in_size = iov_size(in_iov, in_num);
     if (in_size < sizeof(struct virtio_blk_inhdr)) {
         error_report("virtio_blk request inhdr too short");
-        return -EFAULT;
+        goto fault;
     }
-    inhdr = g_slice_new(QEMUIOVector);
-    qemu_iovec_init(inhdr, 1);
-    qemu_iovec_concat_iov(inhdr, in_iov, in_num,
+    qemu_iovec_concat_iov(&req->inhdr, in_iov, in_num,
             in_size - sizeof(struct virtio_blk_inhdr),
             sizeof(struct virtio_blk_inhdr));
     iov_discard_back(in_iov, &in_num, sizeof(struct virtio_blk_inhdr));
@@ -231,37 +201,37 @@  static int process_request(IOQueue *ioq, struct iovec iov[],
 
     switch (outhdr.type) {
     case VIRTIO_BLK_T_IN:
-        do_rdwr_cmd(s, true, in_iov, in_num, outhdr.sector * 512, head, inhdr);
+        do_rdwr_cmd(req, true, in_iov, in_num, outhdr.sector * 512);
         return 0;
 
     case VIRTIO_BLK_T_OUT:
-        do_rdwr_cmd(s, false, iov, out_num, outhdr.sector * 512, head, inhdr);
+        do_rdwr_cmd(req, false, iov, out_num, outhdr.sector * 512);
         return 0;
 
     case VIRTIO_BLK_T_SCSI_CMD:
         /* TODO support SCSI commands */
-        complete_request_early(s, head, inhdr, VIRTIO_BLK_S_UNSUPP);
+        complete_request(req, VIRTIO_BLK_S_UNSUPP, 0);
         return 0;
 
     case VIRTIO_BLK_T_FLUSH:
-        /* TODO fdsync not supported by Linux AIO, do it synchronously here! */
-        if (qemu_fdatasync(s->fd) < 0) {
-            complete_request_early(s, head, inhdr, VIRTIO_BLK_S_IOERR);
-        } else {
-            complete_request_early(s, head, inhdr, VIRTIO_BLK_S_OK);
-        }
+        bdrv_aio_flush(s->bs, request_cb, req);
         return 0;
 
     case VIRTIO_BLK_T_GET_ID:
-        do_get_id_cmd(s, in_iov, in_num, head, inhdr);
+        do_get_id_cmd(req, in_iov, in_num);
         return 0;
 
     default:
         error_report("virtio-blk unsupported request type %#x", outhdr.type);
-        qemu_iovec_destroy(inhdr);
-        g_slice_free(QEMUIOVector, inhdr);
-        return -EFAULT;
+        goto fault;
     }
+
+fault:
+    qemu_iovec_destroy(&req->data);
+    qemu_iovec_destroy(&req->inhdr);
+    g_slice_free(VirtIOBlockRequest, req);
+    s->num_reqs--;
+    return -EFAULT;
 }
 
 static int flush_true(EventNotifier *e)
@@ -273,20 +243,8 @@  static void handle_notify(EventNotifier *e)
 {
     VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
                                            host_notifier);
-
-    /* There is one array of iovecs into which all new requests are extracted
-     * from the vring.  Requests are read from the vring and the translated
-     * descriptors are written to the iovecs array.  The iovecs do not have to
-     * persist across handle_notify() calls because the kernel copies the
-     * iovecs on io_submit().
-     *
-     * Handling io_submit() EAGAIN may require storing the requests across
-     * handle_notify() calls until the kernel has sufficient resources to
-     * accept more I/O.  This is not implemented yet.
-     */
-    struct iovec iovec[VRING_MAX];
-    struct iovec *end = &iovec[VRING_MAX];
-    struct iovec *iov = iovec;
+    struct iovec iov[VRING_MAX];
+    struct iovec *end = &iov[VRING_MAX];
 
     /* When a request is read from the vring, the index of the first descriptor
      * (aka head) is returned so that the completed request can be pushed onto
@@ -297,7 +255,6 @@  static void handle_notify(EventNotifier *e)
      */
     int head;
     unsigned int out_num = 0, in_num = 0;
-    unsigned int num_queued;
 
     event_notifier_test_and_clear(&s->host_notifier);
     for (;;) {
@@ -313,11 +270,10 @@  static void handle_notify(EventNotifier *e)
             trace_virtio_blk_data_plane_process_request(s, out_num, in_num,
                                                         head);
 
-            if (process_request(&s->ioqueue, iov, out_num, in_num, head) < 0) {
+            if (process_request(s, iov, out_num, in_num, head) < 0) {
                 vring_set_broken(&s->vring);
                 break;
             }
-            iov += out_num + in_num;
         }
 
         if (likely(head == -EAGAIN)) { /* vring emptied */
@@ -327,58 +283,18 @@  static void handle_notify(EventNotifier *e)
             if (vring_enable_notification(s->vdev, &s->vring)) {
                 break;
             }
-        } else { /* head == -ENOBUFS or fatal error, iovecs[] is depleted */
-            /* Since there are no iovecs[] left, stop processing for now.  Do
-             * not re-enable guest->host notifies since the I/O completion
-             * handler knows to check for more vring descriptors anyway.
-             */
-            break;
-        }
-    }
-
-    num_queued = ioq_num_queued(&s->ioqueue);
-    if (num_queued > 0) {
-        s->num_reqs += num_queued;
-
-        int rc = ioq_submit(&s->ioqueue);
-        if (unlikely(rc < 0)) {
-            fprintf(stderr, "ioq_submit failed %d\n", rc);
-            exit(1);
+        } else {
+            return; /* Fatal error, leave ring in broken state */
         }
     }
 }
 
-static int flush_io(EventNotifier *e)
-{
-    VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
-                                           io_notifier);
-
-    return s->num_reqs > 0;
-}
-
-static void handle_io(EventNotifier *e)
-{
-    VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
-                                           io_notifier);
-
-    event_notifier_test_and_clear(&s->io_notifier);
-    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
-        notify_guest(s);
-    }
-
-    /* If there were more requests than iovecs, the vring will not be empty yet
-     * so check again.  There should now be enough resources to process more
-     * requests.
-     */
-    if (unlikely(vring_more_avail(&s->vring))) {
-        handle_notify(&s->host_notifier);
-    }
-}
-
 static void *data_plane_thread(void *opaque)
 {
     VirtIOBlockDataPlane *s = opaque;
 
+    *alloc_thread_aio_context() = s->ctx;
+
     do {
         aio_poll(s->ctx, true);
     } while (!s->stopping || s->num_reqs > 0);
@@ -399,7 +315,6 @@  bool virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *blk,
                                   VirtIOBlockDataPlane **dataplane)
 {
     VirtIOBlockDataPlane *s;
-    int fd;
 
     *dataplane = NULL;
 
@@ -418,20 +333,13 @@  bool virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *blk,
         return false;
     }
 
-    fd = raw_get_aio_fd(blk->conf.bs);
-    if (fd < 0) {
-        error_report("drive is incompatible with x-data-plane, "
-                     "use format=raw,cache=none,aio=native");
-        return false;
-    }
-
     s = g_new0(VirtIOBlockDataPlane, 1);
     s->vdev = vdev;
-    s->fd = fd;
     s->blk = blk;
+    s->bs = blk->conf.bs;
 
     /* Prevent block operations that conflict with data plane thread */
-    bdrv_set_in_use(blk->conf.bs, 1);
+    bdrv_set_in_use(s->bs, 1);
 
     error_setg(&s->migration_blocker,
             "x-data-plane does not support migration");
@@ -450,7 +358,7 @@  void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s)
     virtio_blk_data_plane_stop(s);
     migrate_del_blocker(s->migration_blocker);
     error_free(s->migration_blocker);
-    bdrv_set_in_use(s->blk->conf.bs, 0);
+    bdrv_set_in_use(s->bs, 0);
     g_free(s);
 }
 
@@ -459,7 +367,6 @@  void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(s->vdev)));
     VirtioBusClass *k = VIRTIO_BUS_GET_CLASS(qbus);
     VirtQueue *vq;
-    int i;
 
     if (s->started) {
         return;
@@ -488,14 +395,6 @@  void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
     s->host_notifier = *virtio_queue_get_host_notifier(vq);
     aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, flush_true);
 
-    /* Set up ioqueue */
-    ioq_init(&s->ioqueue, s->fd, REQ_MAX);
-    for (i = 0; i < ARRAY_SIZE(s->requests); i++) {
-        ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
-    }
-    s->io_notifier = *ioq_get_notifier(&s->ioqueue);
-    aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, flush_io);
-
     s->started = true;
     trace_virtio_blk_data_plane_start(s);
 
@@ -526,9 +425,6 @@  void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s)
         qemu_thread_join(&s->thread);
     }
 
-    aio_set_event_notifier(s->ctx, &s->io_notifier, NULL, NULL);
-    ioq_cleanup(&s->ioqueue);
-
     aio_set_event_notifier(s->ctx, &s->host_notifier, NULL, NULL);
     k->set_host_notifier(qbus->parent, 0, false);