diff mbox

[RFC,09/16] qcow2: Move COW and L2 update into own coroutine

Message ID 1347968442-8860-10-git-send-email-kwolf@redhat.com
State New
Headers show

Commit Message

Kevin Wolf Sept. 18, 2012, 11:40 a.m. UTC
This creates a separate coroutine for processing the COW and the L2
table update of allocating requests. The request itself can then
complete while the second part is still being processed.

We need a qemu_aio_flush() hook in order to ensure that these
coroutines for the second part aren't still running after bdrv_drain_all
(e.g. when the VM is stopped).

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block.c       |    5 +++
 block/qcow2.c |   89 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 block/qcow2.h |    8 +++++
 block_int.h   |    3 ++
 4 files changed, 93 insertions(+), 12 deletions(-)

Comments

Paolo Bonzini Sept. 18, 2012, 2:24 p.m. UTC | #1
Il 18/09/2012 13:40, Kevin Wolf ha scritto:
> +            qemu_co_mutex_unlock(&s->lock);
> +            qemu_co_rwlock_rdlock(&s->l2meta_flush);

Should this lock be taken in process_l2meta?  It's a bit easier to follow.

Paolo

> +            l2meta->is_written = true;
> +            co = qemu_coroutine_create(process_l2meta);
> +            qemu_coroutine_enter(co, &p);
Kevin Wolf Sept. 18, 2012, 2:44 p.m. UTC | #2
Am 18.09.2012 16:24, schrieb Paolo Bonzini:
> Il 18/09/2012 13:40, Kevin Wolf ha scritto:
>> +            qemu_co_mutex_unlock(&s->lock);
>> +            qemu_co_rwlock_rdlock(&s->l2meta_flush);
> 
> Should this lock be taken in process_l2meta?  It's a bit easier to follow.

I'm pretty sure there was a reason, but it isn't obvious any more. I
guess I should have put a comment there... Maybe it doesn't exist any
more, or maybe it's not that obvious.

The difference would be that while waiting for the lock, the original
write request could complete instead of waiting as well, and that the
lock is potentially taken only in a BH instead of immediately.

What happens if bdrv_aio_flush() and bdrv_aio_writev() are both in
flight? If the flush runs its stop_l2meta() after the write request has
signalled completion, but before the COW coroutine has started, it gets
the lock even though a COW must still be processed. I believe we could
then return a successful flush when the metadata isn't really on disk yet.

So if you agree, I think we need to leave it where it is.

Kevin
Paolo Bonzini Sept. 18, 2012, 2:59 p.m. UTC | #3
Il 18/09/2012 16:44, Kevin Wolf ha scritto:
>>> +            qemu_co_mutex_unlock(&s->lock);
>>> >> +            qemu_co_rwlock_rdlock(&s->l2meta_flush);
>> > 
>> > Should this lock be taken in process_l2meta?  It's a bit easier to follow.
> I'm pretty sure there was a reason, but it isn't obvious any more. I
> guess I should have put a comment there... Maybe it doesn't exist any
> more, or maybe it's not that obvious.
> 
> The difference would be that while waiting for the lock, the original
> write request could complete instead of waiting as well, and that the
> lock is potentially taken only in a BH instead of immediately.
> 
> What happens if bdrv_aio_flush() and bdrv_aio_writev() are both in
> flight? If the flush runs its stop_l2meta() after the write request has
> signalled completion, but before the COW coroutine has started, it gets
> the lock even though a COW must still be processed. I believe we could
> then return a successful flush when the metadata isn't really on disk yet.

Then you need two comments, because you also need to document that
process_l2meta expects the read lock to be taken.

> So if you agree, I think we need to leave it where it is.

We do, but there may be other alternatives:

1) Perhaps you can take the rdlock again in process_l2meta, and also add
an unlock here just after qemu_coroutine_enter?

2) Perhaps you can replace the rwlock with an explicit CoQueue, for
which an rwlock is just a wrapper.  In thread terms, that would be a
condition variable, which I find easier to reason about than an rwlock.  In
this case, the stop_l2meta would see that there is a pending write, and
yield.  The bottom half then can pick up the work.

Do you actually need the writers side of the lock, since you're already
taking s->lock immediately after calling stop_l2meta?  i.e. can
qcow2_{zero,discard}_clusters release the s->lock, or not?  If not,
you're just using the write side to kick all readers, and that would be
a point in favor of (2).

Paolo
diff mbox

Patch

diff --git a/block.c b/block.c
index e78039b..b852f3e 100644
--- a/block.c
+++ b/block.c
@@ -948,7 +948,12 @@  void bdrv_drain_all(void)
                 qemu_co_queue_restart_all(&bs->throttled_reqs);
                 busy = true;
             }
+
+            if (bs->drv && bs->drv->bdrv_drain) {
+                busy |= bs->drv->bdrv_drain(bs);
+            }
         }
+
     } while (busy);
 
     /* If requests are still pending there is a bug somewhere */
diff --git a/block/qcow2.c b/block/qcow2.c
index 2e32136..f9881d0 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -482,6 +482,7 @@  static int qcow2_open(BlockDriverState *bs, int flags)
 
     /* Initialise locks */
     qemu_co_mutex_init(&s->lock);
+    qemu_co_rwlock_init(&s->l2meta_flush);
 
     /* Repair image if dirty */
     if (!(flags & BDRV_O_CHECK) && !bs->read_only &&
@@ -751,6 +752,50 @@  static void run_dependent_requests(BDRVQcowState *s, QCowL2Meta *m)
     }
 }
 
+typedef struct ProcessL2Meta {
+    BlockDriverState *bs;
+    QCowL2Meta *m;
+} ProcessL2Meta;
+
+static void coroutine_fn process_l2meta(void *opaque)
+{
+    ProcessL2Meta *p = opaque;
+    QCowL2Meta *m = p->m;
+    BlockDriverState *bs = p->bs;
+    BDRVQcowState *s = bs->opaque;
+    int ret;
+
+    qemu_co_mutex_lock(&s->lock);
+
+    ret = qcow2_alloc_cluster_link_l2(bs, m);
+    if (ret < 0) {
+        /* FIXME */
+    }
+
+    run_dependent_requests(s, m);
+    g_free(m);
+
+    qemu_co_mutex_unlock(&s->lock);
+    qemu_co_rwlock_unlock(&s->l2meta_flush);
+}
+
+static inline coroutine_fn void stop_l2meta(BDRVQcowState *s)
+{
+    qemu_co_rwlock_wrlock(&s->l2meta_flush);
+}
+
+static inline coroutine_fn void resume_l2meta(BDRVQcowState *s)
+{
+    qemu_co_rwlock_unlock(&s->l2meta_flush);
+}
+
+static bool qcow2_drain(BlockDriverState *bs)
+{
+    BDRVQcowState *s = bs->opaque;
+
+    return !QLIST_EMPTY(&s->cluster_allocs);
+}
+
 static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
                            int64_t sector_num,
                            int remaining_sectors,
@@ -831,16 +876,21 @@  static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
         }
 
         if (l2meta != NULL) {
-            l2meta->is_written = true;
+            Coroutine *co;
+            ProcessL2Meta p = {
+                .bs = bs,
+                .m  = l2meta,
+            };
 
-            ret = qcow2_alloc_cluster_link_l2(bs, l2meta);
-            if (ret < 0) {
-                goto fail;
-            }
+            qemu_co_mutex_unlock(&s->lock);
+            qemu_co_rwlock_rdlock(&s->l2meta_flush);
+
+            l2meta->is_written = true;
+            co = qemu_coroutine_create(process_l2meta);
+            qemu_coroutine_enter(co, &p);
 
-            run_dependent_requests(s, l2meta);
-            g_free(l2meta);
             l2meta = NULL;
+            qemu_co_mutex_lock(&s->lock);
         }
 
         remaining_sectors -= cur_nr_sectors;
@@ -868,6 +918,11 @@  fail:
 static void qcow2_close(BlockDriverState *bs)
 {
     BDRVQcowState *s = bs->opaque;
+
+    while (qcow2_drain(bs)) {
+        qemu_aio_wait();
+    }
+
     g_free(s->l1_table);
 
     qcow2_cache_flush(bs, s->l2_table_cache);
@@ -1405,10 +1460,12 @@  static coroutine_fn int qcow2_co_write_zeroes(BlockDriverState *bs,
     }
 
     /* Whatever is left can use real zero clusters */
+    stop_l2meta(s);
     qemu_co_mutex_lock(&s->lock);
     ret = qcow2_zero_clusters(bs, sector_num << BDRV_SECTOR_BITS,
         nb_sectors);
     qemu_co_mutex_unlock(&s->lock);
+    resume_l2meta(s);
 
     return ret;
 }
@@ -1419,10 +1476,13 @@  static coroutine_fn int qcow2_co_discard(BlockDriverState *bs,
     int ret;
     BDRVQcowState *s = bs->opaque;
 
+    stop_l2meta(s);
     qemu_co_mutex_lock(&s->lock);
     ret = qcow2_discard_clusters(bs, sector_num << BDRV_SECTOR_BITS,
         nb_sectors);
     qemu_co_mutex_unlock(&s->lock);
+    resume_l2meta(s);
+
     return ret;
 }
 
@@ -1548,23 +1608,27 @@  static coroutine_fn int qcow2_co_flush_to_os(BlockDriverState *bs)
     BDRVQcowState *s = bs->opaque;
     int ret;
 
+    stop_l2meta(s);
     qemu_co_mutex_lock(&s->lock);
+
     ret = qcow2_cache_flush(bs, s->l2_table_cache);
     if (ret < 0) {
-        qemu_co_mutex_unlock(&s->lock);
-        return ret;
+        goto fail;
     }
 
     if (qcow2_need_accurate_refcounts(s)) {
         ret = qcow2_cache_flush(bs, s->refcount_block_cache);
         if (ret < 0) {
-            qemu_co_mutex_unlock(&s->lock);
-            return ret;
+            goto fail;
         }
     }
+
+    ret = 0;
+fail:
     qemu_co_mutex_unlock(&s->lock);
+    resume_l2meta(s);
 
-    return 0;
+    return ret;
 }
 
 static int64_t qcow2_vm_state_offset(BDRVQcowState *s)
@@ -1690,6 +1754,7 @@  static BlockDriver bdrv_qcow2 = {
     .bdrv_co_readv          = qcow2_co_readv,
     .bdrv_co_writev         = qcow2_co_writev,
     .bdrv_co_flush_to_os    = qcow2_co_flush_to_os,
+    .bdrv_drain             = qcow2_drain,
 
     .bdrv_co_write_zeroes   = qcow2_co_write_zeroes,
     .bdrv_co_discard        = qcow2_co_discard,
diff --git a/block/qcow2.h b/block/qcow2.h
index 504dbad..73dac17 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -162,6 +162,14 @@  typedef struct BDRVQcowState {
 
     CoMutex lock;
 
+    /*
+     * Only to be acquired while s->lock is not held.
+     *
+     * Readers: All l2meta coroutines that are in flight
+     * Writers: Anyone who requires l2meta to be flushed
+     */
+    CoRwlock l2meta_flush;
+
     uint32_t crypt_method; /* current crypt method, 0 if no key yet */
     uint32_t crypt_method_header;
     AES_KEY aes_encrypt_key;
diff --git a/block_int.h b/block_int.h
index 4452f6f..f259f52 100644
--- a/block_int.h
+++ b/block_int.h
@@ -198,6 +198,9 @@  struct BlockDriver {
      */
     int coroutine_fn (*bdrv_co_flush_to_os)(BlockDriverState *bs);
 
+    /** Returns true if the block device is still busy */
+    bool (*bdrv_drain)(BlockDriverState *bs);
+
     const char *protocol_name;
     int (*bdrv_truncate)(BlockDriverState *bs, int64_t offset);
     int64_t (*bdrv_getlength)(BlockDriverState *bs);