diff mbox series

[RFC,4/4] coroutine/rwlock: Wake writers in preference to readers

Message ID 20210309102157.365356-5-david.edmondson@oracle.com
State New
Headers show
Series coroutine rwlock downgrade fix, minor VDI changes | expand

Commit Message

David Edmondson March 9, 2021, 10:21 a.m. UTC
A feature of the current rwlock is that if multiple coroutines hold a
reader lock, all must be runnable. The unlock implementation relies on
this, choosing to wake a single coroutine when the final read lock
holder exits the critical section, assuming that it will wake a
coroutine attempting to acquire a write lock.

The downgrade implementation violates this assumption by creating a
read lock owning coroutine that is exclusively runnable - any other
coroutines that are waiting to acquire a read lock are *not* made
runnable when the write lock holder converts its ownership to read
only.

As a result of this, a coroutine that downgrades a write lock can
later cause unlock to wake a coroutine that is attempting to acquire a
read lock rather than one aiming for a write lock, should the
coroutines be so ordered in the wait queue.

If the wait queue contains both read hopefuls and write hopefuls, any
read hopeful coroutine that is woken will immediately go back onto the
wait queue when it attempts to acquire the rwlock, due to the pending
write acquisition. At this point there are no coroutines holding
either read or write locks and no way for the coroutines in the queue
to be made runnable. A hang ensues.

Address this by using separate queues for coroutines attempting to
acquire read and write ownership of the rwlock. When unlocking, prefer
to make runnable a coroutine that is waiting for a write lock, but if
none is available, make all coroutines waiting to take a read lock
runnable.

Signed-off-by: David Edmondson <david.edmondson@oracle.com>
---
 include/qemu/coroutine.h   |  8 +++++---
 util/qemu-coroutine-lock.c | 24 +++++++++++++++---------
 2 files changed, 20 insertions(+), 12 deletions(-)

Comments

Paolo Bonzini March 9, 2021, 10:59 a.m. UTC | #1
On 09/03/21 11:21, David Edmondson wrote:
> A feature of the current rwlock is that if multiple coroutines hold a
> reader lock, all must be runnable. The unlock implementation relies on
> this, choosing to wake a single coroutine when the final read lock
> holder exits the critical section, assuming that it will wake a
> coroutine attempting to acquire a write lock.
> 
> The downgrade implementation violates this assumption by creating a
> read lock owning coroutine that is exclusively runnable - any other
> coroutines that are waiting to acquire a read lock are *not* made
> runnable when the write lock holder converts its ownership to read
> only.
> 
> As a result of this, a coroutine that downgrades a write lock can
> later cause unlock to wake a coroutine that is attempting to acquire a
> read lock rather than one aiming for a write lock, should the
> coroutines be so ordered in the wait queue.
> 
> If the wait queue contains both read hopefuls and write hopefuls, any
> read hopeful coroutine that is woken will immediately go back onto the
> wait queue when it attempts to acquire the rwlock, due to the pending
> write acquisition. At this point there are no coroutines holding
> either read or write locks and no way for the coroutines in the queue
> to be made runnable. A hang ensues.
> 
> Address this by using separate queues for coroutines attempting to
> acquire read and write ownership of the rwlock. When unlocking, prefer
> to make runnable a coroutine that is waiting for a write lock, but if
> none is available, make all coroutines waiting to take a read lock
> runnable.
> 
> Signed-off-by: David Edmondson <david.edmondson@oracle.com>

This is certainly the simplest solution; I like it.  And if I understand 
it correctly, doing this instead in unlock:

         if (lock->reader || !qemu_co_queue_next(&lock->wqueue)) {
             qemu_co_queue_restart_all(&lock->rqueue);

would be incorrect because readers could starve writers.

Regarding this particular bug, do you think you could write a testcase too?

Thanks,

Paolo

> ---
>   include/qemu/coroutine.h   |  8 +++++---
>   util/qemu-coroutine-lock.c | 24 +++++++++++++++---------
>   2 files changed, 20 insertions(+), 12 deletions(-)
> 
> diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
> index 84eab6e3bf..3dfbf57faf 100644
> --- a/include/qemu/coroutine.h
> +++ b/include/qemu/coroutine.h
> @@ -241,7 +241,8 @@ typedef struct CoRwlock {
>       int pending_writer;
>       int reader;
>       CoMutex mutex;
> -    CoQueue queue;
> +    CoQueue rqueue;
> +    CoQueue wqueue;
>   } CoRwlock;
>   
>   /**
> @@ -283,8 +284,9 @@ void qemu_co_rwlock_downgrade(CoRwlock *lock);
>   void qemu_co_rwlock_wrlock(CoRwlock *lock);
>   
>   /**
> - * Unlocks the read/write lock and schedules the next coroutine that was
> - * waiting for this lock to be run.
> + * Unlocks the read/write lock and schedules the next coroutine that
> + * was waiting for this lock to be run, preferring to wake one
> + * attempting to take a write lock over those taking a read lock.
>    */
>   void qemu_co_rwlock_unlock(CoRwlock *lock);
>   
> diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
> index eb73cf11dc..c05c143142 100644
> --- a/util/qemu-coroutine-lock.c
> +++ b/util/qemu-coroutine-lock.c
> @@ -330,7 +330,8 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
>   void qemu_co_rwlock_init(CoRwlock *lock)
>   {
>       memset(lock, 0, sizeof(*lock));
> -    qemu_co_queue_init(&lock->queue);
> +    qemu_co_queue_init(&lock->rqueue);
> +    qemu_co_queue_init(&lock->wqueue);
>       qemu_co_mutex_init(&lock->mutex);
>   }
>   
> @@ -341,7 +342,7 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
>       qemu_co_mutex_lock(&lock->mutex);
>       /* For fairness, wait if a writer is in line.  */
>       while (lock->pending_writer) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +        qemu_co_queue_wait(&lock->rqueue, &lock->mutex);
>       }
>       lock->reader++;
>       qemu_co_mutex_unlock(&lock->mutex);
> @@ -356,17 +357,22 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
>   
>       assert(qemu_in_coroutine());
>       if (!lock->reader) {
> -        /* The critical section started in qemu_co_rwlock_wrlock.  */
> -        qemu_co_queue_restart_all(&lock->queue);
> +        /* The critical section started in qemu_co_rwlock_wrlock or
> +         * qemu_co_rwlock_upgrade.
> +         */
> +        qemu_co_queue_restart_all(&lock->wqueue);
> +        qemu_co_queue_restart_all(&lock->rqueue);
>       } else {
>           self->locks_held--;
>   
>           qemu_co_mutex_lock(&lock->mutex);
>           lock->reader--;
>           assert(lock->reader >= 0);
> -        /* Wakeup only one waiting writer */
> -        if (!lock->reader) {
> -            qemu_co_queue_next(&lock->queue);
> +        /* If there are no remaining readers wake one waiting writer
> +         * or all waiting readers.
> +         */
> +        if (!lock->reader && !qemu_co_queue_next(&lock->wqueue)) {
> +            qemu_co_queue_restart_all(&lock->rqueue);
>           }
>       }
>       qemu_co_mutex_unlock(&lock->mutex);
> @@ -392,7 +398,7 @@ void qemu_co_rwlock_wrlock(CoRwlock *lock)
>       qemu_co_mutex_lock(&lock->mutex);
>       lock->pending_writer++;
>       while (lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +        qemu_co_queue_wait(&lock->wqueue, &lock->mutex);
>       }
>       lock->pending_writer--;
>   
> @@ -411,7 +417,7 @@ void qemu_co_rwlock_upgrade(CoRwlock *lock)
>       lock->reader--;
>       lock->pending_writer++;
>       while (lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +        qemu_co_queue_wait(&lock->wqueue, &lock->mutex);
>       }
>       lock->pending_writer--;
>   
>
Paolo Bonzini March 9, 2021, 11:06 a.m. UTC | #2
On 09/03/21 11:21, David Edmondson wrote:
> -        /* The critical section started in qemu_co_rwlock_wrlock.  */
> -        qemu_co_queue_restart_all(&lock->queue);
> +        /* The critical section started in qemu_co_rwlock_wrlock or
> +         * qemu_co_rwlock_upgrade.
> +         */
> +        qemu_co_queue_restart_all(&lock->wqueue);
> +        qemu_co_queue_restart_all(&lock->rqueue);

Hmm, the devil is in the details---this is a thundering herd waiting to 
happen.  But fortunately this can be fixed while making the unlock 
primitive even simpler:

     if (lock->reader) {
          self->locks_held--;

          /* Read-side critical sections do not keep lock->mutex.  */
          qemu_co_mutex_lock(&lock->mutex);
          lock->reader--;
          assert(lock->reader >= 0);
      }

      /* If there are no remaining readers wake one waiting writer
       * or all waiting readers.
       */
      if (!lock->reader && !qemu_co_queue_next(&lock->wqueue)) {
          assert(!lock->pending_writer);
          qemu_co_queue_restart_all(&lock->rqueue);
      }

Thanks,

Paolo
David Edmondson March 9, 2021, 11:57 a.m. UTC | #3
On Tuesday, 2021-03-09 at 12:06:22 +01, Paolo Bonzini wrote:

> On 09/03/21 11:21, David Edmondson wrote:
>> -        /* The critical section started in qemu_co_rwlock_wrlock.  */
>> -        qemu_co_queue_restart_all(&lock->queue);
>> +        /* The critical section started in qemu_co_rwlock_wrlock or
>> +         * qemu_co_rwlock_upgrade.
>> +         */
>> +        qemu_co_queue_restart_all(&lock->wqueue);
>> +        qemu_co_queue_restart_all(&lock->rqueue);
>
> Hmm, the devil is in the details---this is a thundering herd waiting to 
> happen.  But fortunately this can be fixed while making the unlock 
> primitive even simpler:
>
>      if (lock->reader) {
>           self->locks_held--;
>
>           /* Read-side critical sections do not keep lock->mutex.  */
>           qemu_co_mutex_lock(&lock->mutex);
>           lock->reader--;
>           assert(lock->reader >= 0);
>       }
>
>       /* If there are no remaining readers wake one waiting writer
>        * or all waiting readers.
>        */
>       if (!lock->reader && !qemu_co_queue_next(&lock->wqueue)) {
>           assert(!lock->pending_writer);
>           qemu_co_queue_restart_all(&lock->rqueue);
>       }

That's a nice improvement; I agree. I'll roll it into another revision
and work on a test.

dme.
diff mbox series

Patch

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..3dfbf57faf 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -241,7 +241,8 @@  typedef struct CoRwlock {
     int pending_writer;
     int reader;
     CoMutex mutex;
-    CoQueue queue;
+    CoQueue rqueue;
+    CoQueue wqueue;
 } CoRwlock;
 
 /**
@@ -283,8 +284,9 @@  void qemu_co_rwlock_downgrade(CoRwlock *lock);
 void qemu_co_rwlock_wrlock(CoRwlock *lock);
 
 /**
- * Unlocks the read/write lock and schedules the next coroutine that was
- * waiting for this lock to be run.
+ * Unlocks the read/write lock and schedules the next coroutine that
+ * was waiting for this lock to be run, preferring to wake one
+ * attempting to take a write lock over those taking a read lock.
  */
 void qemu_co_rwlock_unlock(CoRwlock *lock);
 
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index eb73cf11dc..c05c143142 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -330,7 +330,8 @@  void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 void qemu_co_rwlock_init(CoRwlock *lock)
 {
     memset(lock, 0, sizeof(*lock));
-    qemu_co_queue_init(&lock->queue);
+    qemu_co_queue_init(&lock->rqueue);
+    qemu_co_queue_init(&lock->wqueue);
     qemu_co_mutex_init(&lock->mutex);
 }
 
@@ -341,7 +342,7 @@  void qemu_co_rwlock_rdlock(CoRwlock *lock)
     qemu_co_mutex_lock(&lock->mutex);
     /* For fairness, wait if a writer is in line.  */
     while (lock->pending_writer) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+        qemu_co_queue_wait(&lock->rqueue, &lock->mutex);
     }
     lock->reader++;
     qemu_co_mutex_unlock(&lock->mutex);
@@ -356,17 +357,22 @@  void qemu_co_rwlock_unlock(CoRwlock *lock)
 
     assert(qemu_in_coroutine());
     if (!lock->reader) {
-        /* The critical section started in qemu_co_rwlock_wrlock.  */
-        qemu_co_queue_restart_all(&lock->queue);
+        /* The critical section started in qemu_co_rwlock_wrlock or
+         * qemu_co_rwlock_upgrade.
+         */
+        qemu_co_queue_restart_all(&lock->wqueue);
+        qemu_co_queue_restart_all(&lock->rqueue);
     } else {
         self->locks_held--;
 
         qemu_co_mutex_lock(&lock->mutex);
         lock->reader--;
         assert(lock->reader >= 0);
-        /* Wakeup only one waiting writer */
-        if (!lock->reader) {
-            qemu_co_queue_next(&lock->queue);
+        /* If there are no remaining readers wake one waiting writer
+         * or all waiting readers.
+         */
+        if (!lock->reader && !qemu_co_queue_next(&lock->wqueue)) {
+            qemu_co_queue_restart_all(&lock->rqueue);
         }
     }
     qemu_co_mutex_unlock(&lock->mutex);
@@ -392,7 +398,7 @@  void qemu_co_rwlock_wrlock(CoRwlock *lock)
     qemu_co_mutex_lock(&lock->mutex);
     lock->pending_writer++;
     while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+        qemu_co_queue_wait(&lock->wqueue, &lock->mutex);
     }
     lock->pending_writer--;
 
@@ -411,7 +417,7 @@  void qemu_co_rwlock_upgrade(CoRwlock *lock)
     lock->reader--;
     lock->pending_writer++;
     while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+        qemu_co_queue_wait(&lock->wqueue, &lock->mutex);
     }
     lock->pending_writer--;