Patchwork [05/10] qcow2: Use coroutines

login
register
mail settings
Submitter Kevin Wolf
Date July 26, 2011, 11:49 a.m.
Message ID <1311680948-7648-6-git-send-email-kwolf@redhat.com>
Download mbox | patch
Permalink /patch/106859/
State New
Headers show

Comments

Kevin Wolf - July 26, 2011, 11:49 a.m.
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/qcow2-cluster.c |   26 +++---
 block/qcow2.c         |  240 ++++++++++++++++++-------------------------------
 block/qcow2.h         |    5 +-
 3 files changed, 102 insertions(+), 169 deletions(-)
Stefan Hajnoczi - July 29, 2011, 1:20 p.m.
On Tue, Jul 26, 2011 at 12:49 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  block/qcow2-cluster.c |   26 +++---
>  block/qcow2.c         |  240 ++++++++++++++++++-------------------------------
>  block/qcow2.h         |    5 +-
>  3 files changed, 102 insertions(+), 169 deletions(-)

I like this.  The only thing that I would like to see is comments
explain what "lock" protects and why.

As I understand it lock is needed because the metadata accesses which
use bdrv_pread() and friends will yield.  So it is necessary to
prevent in-memory qcow2 metadata from getting corrupted during these
operations.

Stefan

Patch

diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 882f50a..81cf77d 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -697,12 +697,12 @@  err:
  * m->depends_on is set to NULL and the other fields in m are meaningless.
  *
  * If the cluster is newly allocated, m->nb_clusters is set to the number of
- * contiguous clusters that have been allocated. This may be 0 if the request
- * conflict with another write request in flight; in this case, m->depends_on
- * is set and the remaining fields of m are meaningless.
+ * contiguous clusters that have been allocated. In this case, the other
+ * fields of m are valid and contain information about the first allocated
+ * cluster.
  *
- * If m->nb_clusters is non-zero, the other fields of m are valid and contain
- * information about the first allocated cluster.
+ * If the request conflicts with another write request in flight, the coroutine
+ * is queued and will be reentered when the dependency has completed.
  *
  * Return 0 on success and -errno in error cases
  */
@@ -721,6 +721,7 @@  int qcow2_alloc_cluster_offset(BlockDriverState *bs, uint64_t offset,
         return ret;
     }
 
+again:
     nb_clusters = size_to_clusters(s, n_end << 9);
 
     nb_clusters = MIN(nb_clusters, s->l2_size - l2_index);
@@ -792,12 +793,12 @@  int qcow2_alloc_cluster_offset(BlockDriverState *bs, uint64_t offset,
             }
 
             if (nb_clusters == 0) {
-                /* Set dependency and wait for a callback */
-                m->depends_on = old_alloc;
-                m->nb_clusters = 0;
-                *num = 0;
-
-                goto out_wait_dependency;
+                /* Wait for the dependency to complete. We need to recheck
+                 * the free/allocated clusters when we continue. */
+                qemu_co_mutex_unlock(&s->lock);
+                qemu_co_queue_wait(&old_alloc->dependent_requests);
+                qemu_co_mutex_lock(&s->lock);
+                goto again;
             }
         }
     }
@@ -834,9 +835,6 @@  out:
 
     return 0;
 
-out_wait_dependency:
-    return qcow2_cache_put(bs, s->l2_table_cache, (void**) &l2_table);
-
 fail:
     qcow2_cache_put(bs, s->l2_table_cache, (void**) &l2_table);
 fail_put:
diff --git a/block/qcow2.c b/block/qcow2.c
index 48e1b95..f07d550 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -276,6 +276,9 @@  static int qcow2_open(BlockDriverState *bs, int flags)
         goto fail;
     }
 
+    /* Initialise locks */
+    qemu_co_mutex_init(&s->lock);
+
 #ifdef DEBUG_ALLOC
     qcow2_check_refcounts(bs);
 #endif
@@ -379,7 +382,6 @@  typedef struct QCowAIOCB {
     uint64_t cluster_offset;
     uint8_t *cluster_data;
     bool is_write;
-    BlockDriverAIOCB *hd_aiocb;
     QEMUIOVector hd_qiov;
     QEMUBH *bh;
     QCowL2Meta l2meta;
@@ -389,8 +391,6 @@  typedef struct QCowAIOCB {
 static void qcow2_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     QCowAIOCB *acb = container_of(blockacb, QCowAIOCB, common);
-    if (acb->hd_aiocb)
-        bdrv_aio_cancel(acb->hd_aiocb);
     qemu_aio_release(acb);
 }
 
@@ -399,46 +399,16 @@  static AIOPool qcow2_aio_pool = {
     .cancel             = qcow2_aio_cancel,
 };
 
-static void qcow2_aio_read_cb(void *opaque, int ret);
-static void qcow2_aio_write_cb(void *opaque, int ret);
-
-static void qcow2_aio_rw_bh(void *opaque)
-{
-    QCowAIOCB *acb = opaque;
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-
-    if (acb->is_write) {
-        qcow2_aio_write_cb(opaque, 0);
-    } else {
-        qcow2_aio_read_cb(opaque, 0);
-    }
-}
-
-static int qcow2_schedule_bh(QEMUBHFunc *cb, QCowAIOCB *acb)
-{
-    if (acb->bh)
-        return -EIO;
-
-    acb->bh = qemu_bh_new(cb, acb);
-    if (!acb->bh)
-        return -EIO;
-
-    qemu_bh_schedule(acb->bh);
-
-    return 0;
-}
-
-static void qcow2_aio_read_cb(void *opaque, int ret)
+/*
+ * Returns 0 when the request is completed successfully, 1 when there is still
+ * a part left to do and -errno in error cases.
+ */
+static int qcow2_aio_read_cb(QCowAIOCB *acb)
 {
-    QCowAIOCB *acb = opaque;
     BlockDriverState *bs = acb->common.bs;
     BDRVQcowState *s = bs->opaque;
     int index_in_cluster, n1;
-
-    acb->hd_aiocb = NULL;
-    if (ret < 0)
-        goto done;
+    int ret;
 
     /* post process the read buffer */
     if (!acb->cluster_offset) {
@@ -463,8 +433,7 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
 
     if (acb->remaining_sectors == 0) {
         /* request completed */
-        ret = 0;
-        goto done;
+        return 0;
     }
 
     /* prepare next AIO request */
@@ -477,7 +446,7 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
     ret = qcow2_get_cluster_offset(bs, acb->sector_num << 9,
         &acb->cur_nr_sectors, &acb->cluster_offset);
     if (ret < 0) {
-        goto done;
+        return ret;
     }
 
     index_in_cluster = acb->sector_num & (s->cluster_sectors - 1);
@@ -494,42 +463,35 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
                 acb->sector_num, acb->cur_nr_sectors);
             if (n1 > 0) {
                 BLKDBG_EVENT(bs->file, BLKDBG_READ_BACKING_AIO);
-                acb->hd_aiocb = bdrv_aio_readv(bs->backing_hd, acb->sector_num,
-                                    &acb->hd_qiov, n1, qcow2_aio_read_cb, acb);
-                if (acb->hd_aiocb == NULL) {
-                    ret = -EIO;
-                    goto done;
+                qemu_co_mutex_unlock(&s->lock);
+                ret = bdrv_co_readv(bs->backing_hd, acb->sector_num,
+                                    n1, &acb->hd_qiov);
+                qemu_co_mutex_lock(&s->lock);
+                if (ret < 0) {
+                    return ret;
                 }
-            } else {
-                ret = qcow2_schedule_bh(qcow2_aio_rw_bh, acb);
-                if (ret < 0)
-                    goto done;
             }
+            return 1;
         } else {
             /* Note: in this case, no need to wait */
             qemu_iovec_memset(&acb->hd_qiov, 0, 512 * acb->cur_nr_sectors);
-            ret = qcow2_schedule_bh(qcow2_aio_rw_bh, acb);
-            if (ret < 0)
-                goto done;
+            return 1;
         }
     } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
         /* add AIO support for compressed blocks ? */
         ret = qcow2_decompress_cluster(bs, acb->cluster_offset);
         if (ret < 0) {
-            goto done;
+            return ret;
         }
 
         qemu_iovec_from_buffer(&acb->hd_qiov,
             s->cluster_cache + index_in_cluster * 512,
             512 * acb->cur_nr_sectors);
 
-        ret = qcow2_schedule_bh(qcow2_aio_rw_bh, acb);
-        if (ret < 0)
-            goto done;
+        return 1;
     } else {
         if ((acb->cluster_offset & 511) != 0) {
-            ret = -EIO;
-            goto done;
+            return -EIO;
         }
 
         if (s->crypt_method) {
@@ -550,21 +512,17 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
         }
 
         BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
-        acb->hd_aiocb = bdrv_aio_readv(bs->file,
+        qemu_co_mutex_unlock(&s->lock);
+        ret = bdrv_co_readv(bs->file,
                             (acb->cluster_offset >> 9) + index_in_cluster,
-                            &acb->hd_qiov, acb->cur_nr_sectors,
-                            qcow2_aio_read_cb, acb);
-        if (acb->hd_aiocb == NULL) {
-            ret = -EIO;
-            goto done;
+                            acb->cur_nr_sectors, &acb->hd_qiov);
+        qemu_co_mutex_lock(&s->lock);
+        if (ret < 0) {
+            return ret;
         }
     }
 
-    return;
-done:
-    acb->common.cb(acb->common.opaque, ret);
-    qemu_iovec_destroy(&acb->hd_qiov);
-    qemu_aio_release(acb);
+    return 1;
 }
 
 static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
@@ -577,7 +535,6 @@  static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
     acb = qemu_aio_get(&qcow2_aio_pool, bs, cb, opaque);
     if (!acb)
         return NULL;
-    acb->hd_aiocb = NULL;
     acb->sector_num = sector_num;
     acb->qiov = qiov;
     acb->is_write = is_write;
@@ -589,70 +546,65 @@  static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
     acb->cur_nr_sectors = 0;
     acb->cluster_offset = 0;
     acb->l2meta.nb_clusters = 0;
-    QLIST_INIT(&acb->l2meta.dependent_requests);
+    qemu_co_queue_init(&acb->l2meta.dependent_requests);
     return acb;
 }
 
-static BlockDriverAIOCB *qcow2_aio_readv(BlockDriverState *bs,
-                                         int64_t sector_num,
-                                         QEMUIOVector *qiov, int nb_sectors,
-                                         BlockDriverCompletionFunc *cb,
-                                         void *opaque)
+static int qcow2_co_readv(BlockDriverState *bs, int64_t sector_num,
+                          int nb_sectors, QEMUIOVector *qiov)
 {
+    BDRVQcowState *s = bs->opaque;
     QCowAIOCB *acb;
     int ret;
 
-    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
-    if (!acb)
-        return NULL;
+    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, NULL, NULL, 0);
 
-    ret = qcow2_schedule_bh(qcow2_aio_rw_bh, acb);
-    if (ret < 0) {
-        qemu_iovec_destroy(&acb->hd_qiov);
-        qemu_aio_release(acb);
-        return NULL;
-    }
+    qemu_co_mutex_lock(&s->lock);
+    do {
+        ret = qcow2_aio_read_cb(acb);
+    } while (ret > 0);
+    qemu_co_mutex_unlock(&s->lock);
 
-    return &acb->common;
+    qemu_iovec_destroy(&acb->hd_qiov);
+    qemu_aio_release(acb);
+
+    return ret;
 }
 
-static void run_dependent_requests(QCowL2Meta *m)
+static void run_dependent_requests(BDRVQcowState *s, QCowL2Meta *m)
 {
-    QCowAIOCB *req;
-    QCowAIOCB *next;
-
     /* Take the request off the list of running requests */
     if (m->nb_clusters != 0) {
         QLIST_REMOVE(m, next_in_flight);
     }
 
     /* Restart all dependent requests */
-    QLIST_FOREACH_SAFE(req, &m->dependent_requests, next_depend, next) {
-        qcow2_aio_write_cb(req, 0);
+    if (!qemu_co_queue_empty(&m->dependent_requests)) {
+        qemu_co_mutex_unlock(&s->lock);
+        while(qemu_co_queue_next(&m->dependent_requests));
+        qemu_co_mutex_lock(&s->lock);
     }
-
-    /* Empty the list for the next part of the request */
-    QLIST_INIT(&m->dependent_requests);
 }
 
-static void qcow2_aio_write_cb(void *opaque, int ret)
+/*
+ * Returns 0 when the request is completed successfully, 1 when there is still
+ * a part left to do and -errno in error cases.
+ */
+static int qcow2_aio_write_cb(QCowAIOCB *acb)
 {
-    QCowAIOCB *acb = opaque;
     BlockDriverState *bs = acb->common.bs;
     BDRVQcowState *s = bs->opaque;
     int index_in_cluster;
     int n_end;
+    int ret;
 
-    acb->hd_aiocb = NULL;
-
-    if (ret >= 0) {
-        ret = qcow2_alloc_cluster_link_l2(bs, &acb->l2meta);
-    }
+    ret = qcow2_alloc_cluster_link_l2(bs, &acb->l2meta);
 
-    run_dependent_requests(&acb->l2meta);
+    run_dependent_requests(s, &acb->l2meta);
 
-    if (ret < 0)
-        goto done;
+    if (ret < 0) {
+        return ret;
+    }
 
     acb->remaining_sectors -= acb->cur_nr_sectors;
     acb->sector_num += acb->cur_nr_sectors;
@@ -660,8 +612,7 @@  static void qcow2_aio_write_cb(void *opaque, int ret)
 
     if (acb->remaining_sectors == 0) {
         /* request completed */
-        ret = 0;
-        goto done;
+        return 0;
     }
 
     index_in_cluster = acb->sector_num & (s->cluster_sectors - 1);
@@ -673,18 +624,10 @@  static void qcow2_aio_write_cb(void *opaque, int ret)
     ret = qcow2_alloc_cluster_offset(bs, acb->sector_num << 9,
         index_in_cluster, n_end, &acb->cur_nr_sectors, &acb->l2meta);
     if (ret < 0) {
-        goto done;
+        return ret;
     }
 
     acb->cluster_offset = acb->l2meta.cluster_offset;
-
-    /* Need to wait for another request? If so, we are done for now. */
-    if (acb->l2meta.nb_clusters == 0 && acb->l2meta.depends_on != NULL) {
-        QLIST_INSERT_HEAD(&acb->l2meta.depends_on->dependent_requests,
-            acb, next_depend);
-        return;
-    }
-
     assert((acb->cluster_offset & 511) == 0);
 
     qemu_iovec_reset(&acb->hd_qiov);
@@ -709,51 +652,40 @@  static void qcow2_aio_write_cb(void *opaque, int ret)
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
-    acb->hd_aiocb = bdrv_aio_writev(bs->file,
-                                    (acb->cluster_offset >> 9) + index_in_cluster,
-                                    &acb->hd_qiov, acb->cur_nr_sectors,
-                                    qcow2_aio_write_cb, acb);
-    if (acb->hd_aiocb == NULL) {
-        ret = -EIO;
-        goto fail;
+    qemu_co_mutex_unlock(&s->lock);
+    ret = bdrv_co_writev(bs->file,
+                         (acb->cluster_offset >> 9) + index_in_cluster,
+                         acb->cur_nr_sectors, &acb->hd_qiov);
+    qemu_co_mutex_lock(&s->lock);
+    if (ret < 0) {
+        return ret;
     }
 
-    return;
-
-fail:
-    if (acb->l2meta.nb_clusters != 0) {
-        QLIST_REMOVE(&acb->l2meta, next_in_flight);
-    }
-done:
-    acb->common.cb(acb->common.opaque, ret);
-    qemu_iovec_destroy(&acb->hd_qiov);
-    qemu_aio_release(acb);
+    return 1;
 }
 
-static BlockDriverAIOCB *qcow2_aio_writev(BlockDriverState *bs,
-                                          int64_t sector_num,
-                                          QEMUIOVector *qiov, int nb_sectors,
-                                          BlockDriverCompletionFunc *cb,
-                                          void *opaque)
+static int qcow2_co_writev(BlockDriverState *bs,
+                           int64_t sector_num,
+                           int nb_sectors,
+                           QEMUIOVector *qiov)
 {
     BDRVQcowState *s = bs->opaque;
     QCowAIOCB *acb;
     int ret;
 
+    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, NULL, NULL, 1);
     s->cluster_cache_offset = -1; /* disable compressed cache */
 
-    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
-    if (!acb)
-        return NULL;
+    qemu_co_mutex_lock(&s->lock);
+    do {
+        ret = qcow2_aio_write_cb(acb);
+    } while (ret > 0);
+    qemu_co_mutex_unlock(&s->lock);
 
-    ret = qcow2_schedule_bh(qcow2_aio_rw_bh, acb);
-    if (ret < 0) {
-        qemu_iovec_destroy(&acb->hd_qiov);
-        qemu_aio_release(acb);
-        return NULL;
-    }
+    qemu_iovec_destroy(&acb->hd_qiov);
+    qemu_aio_release(acb);
 
-    return &acb->common;
+    return ret;
 }
 
 static void qcow2_close(BlockDriverState *bs)
@@ -881,7 +813,7 @@  static int preallocate(BlockDriverState *bs)
 
     nb_sectors = bdrv_getlength(bs) >> 9;
     offset = 0;
-    QLIST_INIT(&meta.dependent_requests);
+    qemu_co_queue_init(&meta.dependent_requests);
     meta.cluster_offset = 0;
 
     while (nb_sectors) {
@@ -899,7 +831,7 @@  static int preallocate(BlockDriverState *bs)
 
         /* There are no dependent requests, but we need to remove our request
          * from the list of in-flight requests */
-        run_dependent_requests(&meta);
+        run_dependent_requests(bs->opaque, &meta);
 
         /* TODO Preallocate data if requested */
 
@@ -1387,8 +1319,8 @@  static BlockDriver bdrv_qcow2 = {
     .bdrv_set_key       = qcow2_set_key,
     .bdrv_make_empty    = qcow2_make_empty,
 
-    .bdrv_aio_readv     = qcow2_aio_readv,
-    .bdrv_aio_writev    = qcow2_aio_writev,
+    .bdrv_co_readv      = qcow2_co_readv,
+    .bdrv_co_writev     = qcow2_co_writev,
     .bdrv_aio_flush     = qcow2_aio_flush,
 
     .bdrv_discard           = qcow2_discard,
diff --git a/block/qcow2.h b/block/qcow2.h
index 6a0a21b..de23abe 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -26,6 +26,7 @@ 
 #define BLOCK_QCOW2_H
 
 #include "aes.h"
+#include "qemu-coroutine.h"
 
 //#define DEBUG_ALLOC
 //#define DEBUG_ALLOC2
@@ -114,6 +115,8 @@  typedef struct BDRVQcowState {
     int64_t free_cluster_index;
     int64_t free_byte_offset;
 
+    CoMutex lock;
+
     uint32_t crypt_method; /* current crypt method, 0 if no key yet */
     uint32_t crypt_method_header;
     AES_KEY aes_encrypt_key;
@@ -146,7 +149,7 @@  typedef struct QCowL2Meta
     int nb_available;
     int nb_clusters;
     struct QCowL2Meta *depends_on;
-    QLIST_HEAD(QCowAioDependencies, QCowAIOCB) dependent_requests;
+    CoQueue dependent_requests;
 
     QLIST_ENTRY(QCowL2Meta) next_in_flight;
 } QCowL2Meta;