Patchwork [4/8] qed: intelligent streaming implementation

login
register
mail settings
Submitter Stefan Hajnoczi
Date April 27, 2011, 1:27 p.m.
Message ID <1303910855-28999-5-git-send-email-stefanha@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/93048/
State New
Headers show

Comments

Stefan Hajnoczi - April 27, 2011, 1:27 p.m.
From: Anthony Liguori <aliguori@us.ibm.com>

Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>
---
 block/qed.c |  165 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 158 insertions(+), 7 deletions(-)

Patch

diff --git a/block/qed.c b/block/qed.c
index 7487683..56150c3 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -1222,11 +1222,11 @@  static void qed_aio_next_io(void *opaque, int ret)
                       io_fn, acb);
 }
 
-static BlockDriverAIOCB *qed_aio_setup(BlockDriverState *bs,
-                                       int64_t sector_num,
-                                       QEMUIOVector *qiov, int nb_sectors,
-                                       BlockDriverCompletionFunc *cb,
-                                       void *opaque, bool is_write)
+static QEDAIOCB *qed_aio_setup(BlockDriverState *bs,
+                               int64_t sector_num,
+                               QEMUIOVector *qiov, int nb_sectors,
+                               BlockDriverCompletionFunc *cb,
+                               void *opaque, bool is_write)
 {
     QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);
 
@@ -1242,8 +1242,22 @@  static BlockDriverAIOCB *qed_aio_setup(BlockDriverState *bs,
     acb->request.l2_table = NULL;
     qemu_iovec_init(&acb->cur_qiov, qiov->niov);
 
+    return acb;
+}
+
+static BlockDriverAIOCB *bdrv_qed_aio_setup(BlockDriverState *bs,
+                                            int64_t sector_num,
+                                            QEMUIOVector *qiov, int nb_sectors,
+                                            BlockDriverCompletionFunc *cb,
+                                            void *opaque, bool is_write)
+{
+    QEDAIOCB *acb;
+
+    acb = qed_aio_setup(bs, sector_num, qiov, nb_sectors,
+                        cb, opaque, is_write);
     /* Start request */
     qed_aio_next_io(acb, 0);
+
     return &acb->common;
 }
 
@@ -1253,7 +1267,8 @@  static BlockDriverAIOCB *bdrv_qed_aio_readv(BlockDriverState *bs,
                                             BlockDriverCompletionFunc *cb,
                                             void *opaque)
 {
-    return qed_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, false);
+    return bdrv_qed_aio_setup(bs, sector_num, qiov, nb_sectors,
+                              cb, opaque, false);
 }
 
 static BlockDriverAIOCB *bdrv_qed_aio_writev(BlockDriverState *bs,
@@ -1262,7 +1277,142 @@  static BlockDriverAIOCB *bdrv_qed_aio_writev(BlockDriverState *bs,
                                              BlockDriverCompletionFunc *cb,
                                              void *opaque)
 {
-    return qed_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, true);
+    return bdrv_qed_aio_setup(bs, sector_num, qiov, nb_sectors,
+                              cb, opaque, true);
+}
+
+typedef struct QEDStreamData {
+    QEDAIOCB *acb;
+    uint64_t offset;
+    QEMUIOVector qiov;
+    void *buffer;
+    size_t len;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+} QEDStreamData;
+
+static void qed_aio_stream_cb(void *opaque, int ret)
+{
+    QEDStreamData *stream_data = opaque;
+    QEDAIOCB *acb = stream_data->acb;
+
+    if (ret) {
+        ret = -EIO;
+    } else {
+        ret = (acb->end_pos - stream_data->offset) / BDRV_SECTOR_SIZE;
+    }
+
+    stream_data->cb(stream_data->opaque, ret);
+
+    qemu_iovec_destroy(&stream_data->qiov);
+    qemu_vfree(stream_data->buffer);
+    qemu_free(stream_data);
+}
+
+static void qed_stream_find_cluster_cb(void *opaque, int ret,
+                                       uint64_t offset, size_t len);
+
+/**
+ * Perform the next qed_find_cluster() from a BH
+ *
+ * This is necessary because we iterate over each cluster in turn.
+ * qed_find_cluster() may invoke its callback immediately without returning up
+ * the call stack, causing us to overflow the call stack.  By starting each
+ * iteration from a BH we guarantee that a fresh stack is used each time.
+ */
+static void qed_stream_next_cluster_bh(void *opaque)
+{
+    QEDStreamData *stream_data = opaque;
+    QEDAIOCB *acb = stream_data->acb;
+    BDRVQEDState *s = acb_to_s(acb);
+
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    acb->cur_pos += s->header.cluster_size;
+    acb->end_pos += s->header.cluster_size;
+
+    qed_find_cluster(s, &acb->request, acb->cur_pos,
+                     acb->end_pos - acb->cur_pos,
+                     qed_stream_find_cluster_cb, stream_data);
+}
+
+/**
+ * Search for an unallocated cluster adjusting the current request until we
+ * can use it to read an unallocated cluster.
+ *
+ * Callback from qed_find_cluster().
+ */
+static void qed_stream_find_cluster_cb(void *opaque, int ret,
+                                       uint64_t offset, size_t len)
+{
+    QEDStreamData *stream_data = opaque;
+    QEDAIOCB *acb = stream_data->acb;
+    BDRVQEDState *s = acb_to_s(acb);
+
+    if (ret < 0) {
+        qed_aio_complete(acb, ret);
+        return;
+    }
+
+    if (ret == QED_CLUSTER_FOUND ||
+        ret == QED_CLUSTER_ZERO) {
+        /* proceed to next cluster */
+
+        if (acb->end_pos == s->header.image_size) {
+            qed_aio_complete(acb, 0);
+            return;
+        }
+
+        acb->bh = qemu_bh_new(qed_stream_next_cluster_bh, stream_data);
+        qemu_bh_schedule(acb->bh);
+    } else {
+        /* found a hole, kick off request */
+        qed_aio_next_io(acb, 0);
+    }
+}
+
+static BlockDriverAIOCB *bdrv_qed_aio_stream(BlockDriverState *bs,
+                                             int64_t sector_num,
+                                             BlockDriverCompletionFunc *cb,
+                                             void *opaque)
+{
+    BDRVQEDState *s = bs->opaque;
+    QEDStreamData *stream_data;
+    QEDAIOCB *acb;
+    uint32_t cluster_size = s->header.cluster_size;
+    uint64_t start_cluster;
+    QEMUIOVector *qiov;
+
+    if (!(s->header.compat_features & QED_CF_COPY_ON_READ)) {
+        return NULL;
+    }
+
+    stream_data = qemu_mallocz(sizeof(*stream_data));
+
+    stream_data->cb = cb;
+    stream_data->opaque = opaque;
+    stream_data->len = cluster_size;
+    stream_data->buffer = qemu_blockalign(s->bs, cluster_size);
+    stream_data->offset = sector_num * BDRV_SECTOR_SIZE;
+
+    start_cluster = qed_start_of_cluster(s, stream_data->offset);
+    sector_num = start_cluster / BDRV_SECTOR_SIZE;
+
+    qiov = &stream_data->qiov;
+    qemu_iovec_init(qiov, 1);
+    qemu_iovec_add(qiov, stream_data->buffer, cluster_size);
+
+    acb = qed_aio_setup(bs, sector_num, qiov,
+                        cluster_size / BDRV_SECTOR_SIZE,
+                        qed_aio_stream_cb, stream_data, false);
+    stream_data->acb = acb;
+
+    qed_find_cluster(s, &acb->request, acb->cur_pos,
+                     acb->end_pos - acb->cur_pos,
+                     qed_stream_find_cluster_cb, stream_data);
+
+    return &acb->common;
 }
 
 static BlockDriverAIOCB *bdrv_qed_aio_flush(BlockDriverState *bs,
@@ -1412,6 +1562,7 @@  static BlockDriver bdrv_qed = {
     .bdrv_make_empty          = bdrv_qed_make_empty,
     .bdrv_aio_readv           = bdrv_qed_aio_readv,
     .bdrv_aio_writev          = bdrv_qed_aio_writev,
+    .bdrv_aio_stream          = bdrv_qed_aio_stream,
     .bdrv_aio_flush           = bdrv_qed_aio_flush,
     .bdrv_truncate            = bdrv_qed_truncate,
     .bdrv_getlength           = bdrv_qed_getlength,