diff mbox

[RFC,02/12] block: Introduce bdrv_lock and bdrv_unlock API

Message ID 1432896805-23867-3-git-send-email-famz@redhat.com
State New
Headers show

Commit Message

Fam Zheng May 29, 2015, 10:53 a.m. UTC
For various purposes, BDS users call bdrv_drain or bdrv_drain_all to make sure
there are no pending requests duringA a series of operations on the BDS. But in
the middle of operations, the caller may 1) yield from a coroutine (mirror_run);
2) defer the next part of work to a BH (mirror_run); 3) call nested aio_poll
(qmp_transaction); etc..

This lock/unlock API is introduced to help assure above complications won't
spoil the purpose of the bdrv_drain(): bdrv_lock should help quiesce other
readers and writers in the beginning of such operations, and bdrv_unlock should
resume the blocked requests.

A notifier list is added to allow devices to cooperate with the lock and pause
themselves, for example, by not processing more requests on the NBD export.

Signed-off-by: Fam Zheng <famz@redhat.com>
---
 block.c                   | 10 +++++++
 block/io.c                | 69 +++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h     | 39 +++++++++++++++++++++++++++
 include/block/block_int.h |  5 ++++
 4 files changed, 123 insertions(+)

Comments

Eric Blake May 29, 2015, 4:57 p.m. UTC | #1
On 05/29/2015 04:53 AM, Fam Zheng wrote:
> For various purposes, BDS users call bdrv_drain or bdrv_drain_all to make sure
> there are no pending requests duringA a series of operations on the BDS. But in

s/duringA/during/

> the middle of operations, the caller may 1) yield from a coroutine (mirror_run);
> 2) defer the next part of work to a BH (mirror_run); 3) call nested aio_poll
> (qmp_transaction); etc..
> 
> This lock/unlock API is introduced to help assure above complications won't
> spoil the purpose of the bdrv_drain(): bdrv_lock should help quiesce other
> readers and writers in the beginning of such operations, and bdrv_unlock should
> resume the blocked requests.
> 
> A notifier list is added to allow devices to cooperate with the lock and pause
> themselves, for example, by not processing more requests on the NBD export.
> 
> Signed-off-by: Fam Zheng <famz@redhat.com>
> ---
>  block.c                   | 10 +++++++
>  block/io.c                | 69 +++++++++++++++++++++++++++++++++++++++++++++++
>  include/block/block.h     | 39 +++++++++++++++++++++++++++
>  include/block/block_int.h |  5 ++++
>  4 files changed, 123 insertions(+)
> 

I'll leave the technical review on this series to those more familiar
with coroutines.
diff mbox

Patch

diff --git a/block.c b/block.c
index e9f31b7..abda2f7 100644
--- a/block.c
+++ b/block.c
@@ -252,8 +252,10 @@  BlockDriverState *bdrv_new(void)
     bdrv_iostatus_disable(bs);
     notifier_list_init(&bs->close_notifiers);
     notifier_with_return_list_init(&bs->before_write_notifiers);
+    notifier_list_init(&bs->lock_notifiers);
     qemu_co_queue_init(&bs->throttled_reqs[0]);
     qemu_co_queue_init(&bs->throttled_reqs[1]);
+    qemu_co_queue_init(&bs->lock_queue);
     bs->refcnt = 1;
     bs->aio_context = qemu_get_aio_context();
 
@@ -1716,6 +1718,7 @@  void bdrv_close(BlockDriverState *bs)
 {
     BdrvAioNotifier *ban, *ban_next;
 
+    assert(!bdrv_is_locked(bs));
     if (bs->job) {
         block_job_cancel_sync(bs->job);
     }
@@ -1846,12 +1849,19 @@  static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
     /* job */
     bs_dest->job                = bs_src->job;
 
+    /* lock */
+    bs_dest->lock_owner         = bs_src->lock_owner;
+    bs_dest->lock_level         = bs_src->lock_level;
+    bs_dest->lock_queue         = bs_src->lock_queue;
+    bs_dest->lock_notifiers     = bs_src->lock_notifiers;
+
     /* keep the same entry in bdrv_states */
     bs_dest->device_list = bs_src->device_list;
     bs_dest->blk = bs_src->blk;
 
     memcpy(bs_dest->op_blockers, bs_src->op_blockers,
            sizeof(bs_dest->op_blockers));
+
 }
 
 /*
diff --git a/block/io.c b/block/io.c
index e394d92..9aa4b71 100644
--- a/block/io.c
+++ b/block/io.c
@@ -2601,3 +2601,72 @@  void bdrv_flush_io_queue(BlockDriverState *bs)
         bdrv_flush_io_queue(bs->file);
     }
 }
+
+static void bdrv_lock_notify(BlockDriverState *bs, bool locking)
+{
+    BdrvLockEvent event = (BdrvLockEvent) {
+        .bs = bs,
+        .locking = locking,
+    };
+    notifier_list_notify(&bs->lock_notifiers, &event);
+}
+
+void bdrv_lock(BlockDriverState *bs)
+{
+    Coroutine *self = qemu_coroutine_self();
+    bool notify = true;
+
+    /*
+     * XXX: eventually we only allow coroutine callers. For now, let's allow
+     * the exceptional non-coroutine callers to serialize by themselves, e.g.
+     * with BQL.
+     */
+    assert(qemu_in_coroutine()
+           || self == bs->lock_owner || bs->lock_level == 0);
+
+    if (bs->lock_level) {
+        if (self == bs->lock_owner) {
+            bs->lock_level++;
+            return;
+        } else {
+            qemu_co_queue_wait(&bs->lock_queue);
+            notify = false;
+        }
+    }
+    assert(bs->lock_level == 0);
+
+    if (notify) {
+        bdrv_lock_notify(bs, true);
+    }
+    bs->lock_level++;
+    bs->lock_owner = self;
+
+    bdrv_drain(bs);
+}
+
+void bdrv_unlock(BlockDriverState *bs)
+{
+    assert(bs->lock_level > 0);
+    if (!--bs->lock_level) {
+        if (!qemu_co_queue_empty(&bs->lock_queue)) {
+            /*
+             * XXX: do we need a BH to run lock_queue?
+             * If so, be careful of bdrv_set_aio_context().
+             **/
+            qemu_co_queue_next(&bs->lock_queue);
+        } else {
+            bdrv_lock_notify(bs, false);
+        }
+    }
+}
+
+bool bdrv_is_locked(BlockDriverState *bs)
+{
+    assert((bs->lock_level == 0) == qemu_co_queue_empty(&bs->lock_queue));
+    return !!bs->lock_level;
+}
+
+void bdrv_add_lock_unlock_notifier(BlockDriverState *bs, Notifier *notifier)
+{
+    notifier_list_add(&bs->lock_notifiers, notifier);
+}
diff --git a/include/block/block.h b/include/block/block.h
index c1c963e..068b01e 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -591,6 +591,45 @@  void bdrv_io_plug(BlockDriverState *bs);
 void bdrv_io_unplug(BlockDriverState *bs);
 void bdrv_flush_io_queue(BlockDriverState *bs);
 
+/**
+ * bdrv_lock:
+ *
+ * Begin a temporary exclusive accessing by locking the BDS.
+ *
+ * This lock is recursive: if bs is unlocked, the caller context will acquire
+ * the lock; otherwise if the caller is in the same coroutine context that
+ * already holds the lock, it will only add a recurse level; otherwise, this
+ * function will block until the lock is released by the other owner.
+ */
+void bdrv_lock(BlockDriverState *bs);
+
+/**
+ * bdrv_lock:
+ *
+ * Reduce the recurse level, or if it's the outermost unlock, release the lock.
+ */
+void bdrv_unlock(BlockDriverState *bs);
+
+/**
+ * bdrv_is_locked:
+ *
+ * Return if the bs is locked.
+ */
+bool bdrv_is_locked(BlockDriverState *bs);
+
+typedef struct {
+    BlockDriverState *bs;
+    bool locking;
+} BdrvLockEvent;
+
+/**
+ * bdrv_add_lock_unlock_notifier:
+ *
+ * Add a notifier that will get notified when bs is locked or unlocked, with a
+ * BdrvLockEvent data.
+ */
+void bdrv_add_lock_unlock_notifier(BlockDriverState *bs, Notifier *notifier);
+
 BlockAcctStats *bdrv_get_stats(BlockDriverState *bs);
 
 #endif
diff --git a/include/block/block_int.h b/include/block/block_int.h
index f004378..a742fea 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -433,6 +433,11 @@  struct BlockDriverState {
     /* threshold limit for writes, in bytes. "High water mark". */
     uint64_t write_threshold_offset;
     NotifierWithReturn write_threshold_notifier;
+
+    Coroutine    *lock_owner;
+    int          lock_level;
+    CoQueue      lock_queue;
+    NotifierList lock_notifiers;
 };