@@ -252,8 +252,10 @@ BlockDriverState *bdrv_new(void)
bdrv_iostatus_disable(bs);
notifier_list_init(&bs->close_notifiers);
notifier_with_return_list_init(&bs->before_write_notifiers);
+ notifier_list_init(&bs->lock_notifiers);
qemu_co_queue_init(&bs->throttled_reqs[0]);
qemu_co_queue_init(&bs->throttled_reqs[1]);
+ qemu_co_queue_init(&bs->lock_queue);
bs->refcnt = 1;
bs->aio_context = qemu_get_aio_context();
@@ -1716,6 +1718,7 @@ void bdrv_close(BlockDriverState *bs)
{
BdrvAioNotifier *ban, *ban_next;
+ assert(!bdrv_is_locked(bs));
if (bs->job) {
block_job_cancel_sync(bs->job);
}
@@ -1846,12 +1849,19 @@ static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
/* job */
bs_dest->job = bs_src->job;
+ /* lock */
+ bs_dest->lock_owner = bs_src->lock_owner;
+ bs_dest->lock_level = bs_src->lock_level;
+ bs_dest->lock_queue = bs_src->lock_queue;
+ bs_dest->lock_notifiers = bs_src->lock_notifiers;
+
/* keep the same entry in bdrv_states */
bs_dest->device_list = bs_src->device_list;
bs_dest->blk = bs_src->blk;
memcpy(bs_dest->op_blockers, bs_src->op_blockers,
sizeof(bs_dest->op_blockers));
+
}
/*
@@ -2601,3 +2601,72 @@ void bdrv_flush_io_queue(BlockDriverState *bs)
bdrv_flush_io_queue(bs->file);
}
}
+
+/* Tell registered listeners that @bs is being locked (or unlocked). */
+static void bdrv_lock_notify(BlockDriverState *bs, bool locking)
+{
+ BdrvLockEvent event;
+
+ event.bs = bs;
+ event.locking = locking;
+ notifier_list_notify(&bs->lock_notifiers, &event);
+}
+
+void bdrv_lock(BlockDriverState *bs)
+{
+ Coroutine *self = qemu_coroutine_self();
+ bool notify = true;
+
+ /*
+ * XXX: eventually we only allow coroutine callers. For now, let's allow
+ * the exceptional non-coroutine callers to serialize by themselves, e.g.
+ * with BQL.
+ */
+ assert(qemu_in_coroutine()
+ || self == bs->lock_owner || bs->lock_level == 0);
+
+ if (bs->lock_level) {
+ if (self == bs->lock_owner) {
+ /* Recursive acquisition by the current owner: bump the level. */
+ bs->lock_level++;
+ return;
+ } else {
+ /*
+ * Held by another coroutine: sleep until an unlock hands the
+ * lock over to us. In that case the "locked" notification sent
+ * by the previous owner is still in effect (bdrv_unlock does
+ * not send "unlocked" when there are waiters), so skip
+ * re-notifying below.
+ */
+ qemu_co_queue_wait(&bs->lock_queue);
+ notify = false;
+ }
+ }
+ /* Either never held, or fully released by the time we were woken. */
+ assert(bs->lock_level == 0);
+
+ if (notify) {
+ bdrv_lock_notify(bs, true);
+ }
+ bs->lock_level++;
+ bs->lock_owner = self;
+
+ /* Quiesce in-flight requests so the caller starts from a drained BDS. */
+ bdrv_drain(bs);
+}
+
+void bdrv_unlock(BlockDriverState *bs)
+{
+ /* Must only be called by a holder of the lock. */
+ assert(bs->lock_level > 0);
+ if (!--bs->lock_level) {
+ if (!qemu_co_queue_empty(&bs->lock_queue)) {
+ /*
+ * Hand the lock directly to a waiting coroutine; no "unlocked"
+ * notification is sent because the BDS stays locked, only the
+ * owner changes (see bdrv_lock).
+ *
+ * XXX: do we need a BH to run lock_queue?
+ * If so, be careful of bdrv_set_aio_context().
+ */
+ qemu_co_queue_next(&bs->lock_queue);
+ } else {
+ bdrv_lock_notify(bs, false);
+ }
+ }
+}
+
+bool bdrv_is_locked(BlockDriverState *bs)
+{
+ /*
+ * Invariant: coroutines only ever wait in lock_queue while the lock is
+ * held, so an unlocked BDS must have an empty queue. Note the converse
+ * does not hold -- the lock is routinely held with no waiters -- which
+ * is why this is an implication, not an equality: asserting
+ * (lock_level == 0) == queue_empty would fire on any uncontended
+ * bdrv_lock().
+ */
+ assert(bs->lock_level > 0 || qemu_co_queue_empty(&bs->lock_queue));
+ return !!bs->lock_level;
+}
+
+void bdrv_add_lock_unlock_notifier(BlockDriverState *bs, Notifier *notifier)
+{
+ /* @notifier will be invoked with a BdrvLockEvent on each lock/unlock. */
+ notifier_list_add(&bs->lock_notifiers, notifier);
+}
@@ -591,6 +591,45 @@ void bdrv_io_plug(BlockDriverState *bs);
void bdrv_io_unplug(BlockDriverState *bs);
void bdrv_flush_io_queue(BlockDriverState *bs);
+/**
+ * bdrv_lock:
+ *
+ * Begin a period of temporary exclusive access by locking the BDS.
+ *
+ * This lock is recursive: if bs is unlocked, the caller context will acquire
+ * the lock; otherwise if the caller is in the same coroutine context that
+ * already holds the lock, it will only add a recurse level; otherwise, this
+ * function will block until the lock is released by the other owner.
+ */
+void bdrv_lock(BlockDriverState *bs);
+
+/**
+ * bdrv_unlock:
+ *
+ * Reduce the recurse level, or if it's the outermost unlock, release the lock.
+ */
+void bdrv_unlock(BlockDriverState *bs);
+
+/**
+ * bdrv_is_locked:
+ *
+ * Return if the bs is locked.
+ */
+bool bdrv_is_locked(BlockDriverState *bs);
+
+typedef struct {
+ BlockDriverState *bs;
+ bool locking;
+} BdrvLockEvent;
+
+/**
+ * bdrv_add_lock_unlock_notifier:
+ *
+ * Add a notifier that will get notified when bs is locked or unlocked, with a
+ * BdrvLockEvent data.
+ */
+void bdrv_add_lock_unlock_notifier(BlockDriverState *bs, Notifier *notifier);
+
BlockAcctStats *bdrv_get_stats(BlockDriverState *bs);
#endif
@@ -433,6 +433,11 @@ struct BlockDriverState {
/* threshold limit for writes, in bytes. "High water mark". */
uint64_t write_threshold_offset;
NotifierWithReturn write_threshold_notifier;
+
+ Coroutine *lock_owner;
+ int lock_level;
+ CoQueue lock_queue;
+ NotifierList lock_notifiers;
};
For various purposes, BDS users call bdrv_drain or bdrv_drain_all to make sure there are no pending requests during a series of operations on the BDS. But in the middle of operations, the caller may 1) yield from a coroutine (mirror_run); 2) defer the next part of work to a BH (mirror_run); 3) call nested aio_poll (qmp_transaction); etc. This lock/unlock API is introduced to help ensure that the above complications won't spoil the purpose of the bdrv_drain(): bdrv_lock should help quiesce other readers and writers in the beginning of such operations, and bdrv_unlock should resume the blocked requests. A notifier list is added to allow devices to cooperate with the lock and pause themselves, for example, by not processing more requests on the NBD export. Signed-off-by: Fam Zheng <famz@redhat.com> --- block.c | 10 +++++++ block/io.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++ include/block/block.h | 39 +++++++++++++++++++++++++++ include/block/block_int.h | 5 ++++ 4 files changed, 123 insertions(+)