diff mbox series

[v2,5/6] coroutine/rwlock: Wake writers in preference to readers

Message ID 20210309144015.557477-6-david.edmondson@oracle.com
State New
Headers show
Series coroutine rwlock downgrade fix, minor VDI changes | expand

Commit Message

David Edmondson March 9, 2021, 2:40 p.m. UTC
A feature of the current rwlock is that if multiple coroutines hold a
reader lock, all must be runnable. The unlock implementation relies on
this, choosing to wake a single coroutine when the final read lock
holder exits the critical section, assuming that it will wake a
coroutine attempting to acquire a write lock.

The downgrade implementation violates this assumption by creating a
read lock owning coroutine that is exclusively runnable - any other
coroutines that are waiting to acquire a read lock are *not* made
runnable when the write lock holder converts its ownership to read
only.

As a result of this, a coroutine that downgrades a write lock can
later cause unlock to wake a coroutine that is attempting to acquire a
read lock rather than one aiming for a write lock, should the
coroutines be so ordered in the wait queue.

If the wait queue contains both read hopefuls and write hopefuls, any
read hopeful coroutine that is woken will immediately go back onto the
wait queue when it attempts to acquire the rwlock, due to the pending
write acquisition. At this point there are no coroutines holding
either read or write locks and no way for the coroutines in the queue
to be made runnable. A hang ensues.

Address this by using separate queues for coroutines attempting to
acquire read and write ownership of the rwlock. When unlocking, prefer
to make runnable a coroutine that is waiting for a write lock, but if
none is available, make all coroutines waiting to take a read lock
runnable.

Signed-off-by: David Edmondson <david.edmondson@oracle.com>
---
 include/qemu/coroutine.h   |  8 +++++---
 util/qemu-coroutine-lock.c | 24 +++++++++++++++---------
 2 files changed, 20 insertions(+), 12 deletions(-)
diff mbox series

Patch

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..3dfbf57faf 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -241,7 +241,8 @@  typedef struct CoRwlock {
     int pending_writer;
     int reader;
     CoMutex mutex;
-    CoQueue queue;
+    CoQueue rqueue;
+    CoQueue wqueue;
 } CoRwlock;
 
 /**
@@ -283,8 +284,9 @@  void qemu_co_rwlock_downgrade(CoRwlock *lock);
 void qemu_co_rwlock_wrlock(CoRwlock *lock);
 
 /**
- * Unlocks the read/write lock and schedules the next coroutine that was
- * waiting for this lock to be run.
+ * Unlocks the read/write lock and schedules the next coroutine that
+ * was waiting for this lock to be run, preferring to wake one
+ * attempting to take a write lock over those taking a read lock.
  */
 void qemu_co_rwlock_unlock(CoRwlock *lock);
 
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index eb73cf11dc..c05c143142 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -330,7 +330,8 @@  void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 void qemu_co_rwlock_init(CoRwlock *lock)
 {
     memset(lock, 0, sizeof(*lock));
-    qemu_co_queue_init(&lock->queue);
+    qemu_co_queue_init(&lock->rqueue);
+    qemu_co_queue_init(&lock->wqueue);
     qemu_co_mutex_init(&lock->mutex);
 }
 
@@ -341,7 +342,7 @@  void qemu_co_rwlock_rdlock(CoRwlock *lock)
     qemu_co_mutex_lock(&lock->mutex);
     /* For fairness, wait if a writer is in line.  */
     while (lock->pending_writer) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+        qemu_co_queue_wait(&lock->rqueue, &lock->mutex);
     }
     lock->reader++;
     qemu_co_mutex_unlock(&lock->mutex);
@@ -356,17 +357,22 @@  void qemu_co_rwlock_unlock(CoRwlock *lock)
 
     assert(qemu_in_coroutine());
     if (!lock->reader) {
-        /* The critical section started in qemu_co_rwlock_wrlock.  */
-        qemu_co_queue_restart_all(&lock->queue);
+        /* The critical section started in qemu_co_rwlock_wrlock or
+         * qemu_co_rwlock_upgrade.
+         */
+        qemu_co_queue_restart_all(&lock->wqueue);
+        qemu_co_queue_restart_all(&lock->rqueue);
     } else {
         self->locks_held--;
 
         qemu_co_mutex_lock(&lock->mutex);
         lock->reader--;
         assert(lock->reader >= 0);
-        /* Wakeup only one waiting writer */
-        if (!lock->reader) {
-            qemu_co_queue_next(&lock->queue);
+        /* If there are no remaining readers wake one waiting writer
+         * or all waiting readers.
+         */
+        if (!lock->reader && !qemu_co_queue_next(&lock->wqueue)) {
+            qemu_co_queue_restart_all(&lock->rqueue);
         }
     }
     qemu_co_mutex_unlock(&lock->mutex);
@@ -392,7 +398,7 @@  void qemu_co_rwlock_wrlock(CoRwlock *lock)
     qemu_co_mutex_lock(&lock->mutex);
     lock->pending_writer++;
     while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+        qemu_co_queue_wait(&lock->wqueue, &lock->mutex);
     }
     lock->pending_writer--;
 
@@ -411,7 +417,7 @@  void qemu_co_rwlock_upgrade(CoRwlock *lock)
     lock->reader--;
     lock->pending_writer++;
     while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+        qemu_co_queue_wait(&lock->wqueue, &lock->mutex);
     }
     lock->pending_writer--;