diff mbox

[6/6] coroutine-lock: make CoRwlock thread-safe and fair

Message ID 20170213181244.16297-7-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Feb. 13, 2017, 6:12 p.m. UTC
This adds a CoMutex around the existing CoQueue.  Because the write-side
can just take CoMutex, the old "writer" field is not necessary anymore.
Instead of removing it altogether, count the number of pending writers
during a read-side critical section and forbid further readers from
entering.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h   |  3 ++-
 util/qemu-coroutine-lock.c | 35 ++++++++++++++++++++++++-----------
 2 files changed, 26 insertions(+), 12 deletions(-)

Comments

Fam Zheng Feb. 15, 2017, 9:23 a.m. UTC | #1
On Mon, 02/13 19:12, Paolo Bonzini wrote:
> This adds a CoMutex around the existing CoQueue.  Because the write-side

s/CoQueue/CoRwlock/

> can just take CoMutex, the old "writer" field is not necessary anymore.
> Instead of removing it altogether, count the number of pending writers
> during a read-side critical section and forbid further readers from
> entering.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/qemu/coroutine.h   |  3 ++-
>  util/qemu-coroutine-lock.c | 35 ++++++++++++++++++++++++-----------
>  2 files changed, 26 insertions(+), 12 deletions(-)
> 
> diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
> index d2de268..e60beaf 100644
> --- a/include/qemu/coroutine.h
> +++ b/include/qemu/coroutine.h
> @@ -204,8 +204,9 @@ bool qemu_co_queue_empty(CoQueue *queue);
>  
>  
>  typedef struct CoRwlock {
> -    bool writer;
> +    int pending_writer;
>      int reader;
> +    CoMutex mutex;
>      CoQueue queue;
>  } CoRwlock;
>  
> diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
> index b0a554f..6328eed 100644
> --- a/util/qemu-coroutine-lock.c
> +++ b/util/qemu-coroutine-lock.c
> @@ -346,16 +346,22 @@ void qemu_co_rwlock_init(CoRwlock *lock)
>  {
>      memset(lock, 0, sizeof(*lock));
>      qemu_co_queue_init(&lock->queue);
> +    qemu_co_mutex_init(&lock->mutex);
>  }
>  
>  void qemu_co_rwlock_rdlock(CoRwlock *lock)
>  {
>      Coroutine *self = qemu_coroutine_self();
>  
> -    while (lock->writer) {
> -        qemu_co_queue_wait(&lock->queue, NULL);
> +    qemu_co_mutex_lock(&lock->mutex);
> +    /* For fairness, wait if a writer is in line.  */
> +    while (lock->pending_writer) {
> +        qemu_co_queue_wait(&lock->queue, &lock->mutex);
>      }
>      lock->reader++;
> +    qemu_co_mutex_unlock(&lock->mutex);
> +
> +    /* The rest of the read-side critical section is run without the mutex.  */
>      self->locks_held++;
>  }
>  
> @@ -364,10 +370,13 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
>      Coroutine *self = qemu_coroutine_self();
>  
>      assert(qemu_in_coroutine());
> -    if (lock->writer) {
> -        lock->writer = false;
> +    if (!lock->reader) {
> +        /* The critical section started in qemu_co_rwlock_wrlock.  */
>          qemu_co_queue_restart_all(&lock->queue);
>      } else {
> +        self->locks_held--;
> +
> +        qemu_co_mutex_lock(&lock->mutex);
>          lock->reader--;
>          assert(lock->reader >= 0);
>          /* Wakeup only one waiting writer */
> @@ -375,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
>              qemu_co_queue_next(&lock->queue);
>          }
>      }
> -    self->locks_held--;
> +    qemu_co_mutex_unlock(&lock->mutex);
>  }
>  
>  void qemu_co_rwlock_wrlock(CoRwlock *lock)
>  {
> -    Coroutine *self = qemu_coroutine_self();
> -
> -    while (lock->writer || lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, NULL);
> +    qemu_co_mutex_lock(&lock->mutex);
> +    lock->pending_writer++;
> +    while (lock->reader) {
> +        qemu_co_queue_wait(&lock->queue, &lock->mutex);
>      }
> -    lock->writer = true;
> -    self->locks_held++;
> +    lock->pending_writer--;
> +
> +    /* The rest of the write-side critical section is run with
> +     * the mutex taken, so that lock->reader remains zero.
> +     * There is no need to update self->locks_held.
> +     */

But is it still better to update self->locks_held anyway for the
'assert(!co->locks_held)' in qemu_coroutine_enter? Or is the same thing checked
elsewhere?

Fam
Paolo Bonzini Feb. 15, 2017, 11:20 a.m. UTC | #2
On 15/02/2017 10:23, Fam Zheng wrote:
> On Mon, 02/13 19:12, Paolo Bonzini wrote:
>> This adds a CoMutex around the existing CoQueue.  Because the write-side
> 
> s/CoQueue/CoRwlock/

No, I meant that CoRwlock has a CoQueue, and after this patch it is
wrapped by a CoMutex too.


>> @@ -375,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
>>              qemu_co_queue_next(&lock->queue);
>>          }
>>      }
>> -    self->locks_held--;
>> +    qemu_co_mutex_unlock(&lock->mutex);
>>  }
>>  
>>  void qemu_co_rwlock_wrlock(CoRwlock *lock)
>>  {
>> -    Coroutine *self = qemu_coroutine_self();
>> -
>> -    while (lock->writer || lock->reader) {
>> -        qemu_co_queue_wait(&lock->queue, NULL);
>> +    qemu_co_mutex_lock(&lock->mutex);
>> +    lock->pending_writer++;
>> +    while (lock->reader) {
>> +        qemu_co_queue_wait(&lock->queue, &lock->mutex);
>>      }
>> -    lock->writer = true;
>> -    self->locks_held++;
>> +    lock->pending_writer--;
>> +
>> +    /* The rest of the write-side critical section is run with
>> +     * the mutex taken, so that lock->reader remains zero.
>> +     * There is no need to update self->locks_held.
>> +     */
> 
> But is it still better to update self->locks_held anyway for the
> 'assert(!co->locks_held)' in qemu_coroutine_enter? Or is the same thing checked
> elsewhere?

self->locks_held is already incremented by the qemu_co_mutex_lock call
at the beginning of qemu_co_rwlock_wrlock.  It is then decremented in
qemu_co_rwlock_unlock.

For the read side, rdlock _unlocks_ lock->mutex before returning, so
self->locks_held must be incremented by rdlock and decremented by unlock.

Paolo
Fam Zheng Feb. 15, 2017, 11:37 a.m. UTC | #3
On Wed, 02/15 12:20, Paolo Bonzini wrote:
> 
> 
> On 15/02/2017 10:23, Fam Zheng wrote:
> > On Mon, 02/13 19:12, Paolo Bonzini wrote:
> >> This adds a CoMutex around the existing CoQueue.  Because the write-side
> > 
> > s/CoQueue/CoRwlock/
> 
> No, I meant that CoRwlock has a CoQueue, and after this patch it is
> wrapped by a CoMutex too.

OK.

> 
> 
> >> @@ -375,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
> >>              qemu_co_queue_next(&lock->queue);
> >>          }
> >>      }
> >> -    self->locks_held--;
> >> +    qemu_co_mutex_unlock(&lock->mutex);
> >>  }
> >>  
> >>  void qemu_co_rwlock_wrlock(CoRwlock *lock)
> >>  {
> >> -    Coroutine *self = qemu_coroutine_self();
> >> -
> >> -    while (lock->writer || lock->reader) {
> >> -        qemu_co_queue_wait(&lock->queue, NULL);
> >> +    qemu_co_mutex_lock(&lock->mutex);
> >> +    lock->pending_writer++;
> >> +    while (lock->reader) {
> >> +        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> >>      }
> >> -    lock->writer = true;
> >> -    self->locks_held++;
> >> +    lock->pending_writer--;
> >> +
> >> +    /* The rest of the write-side critical section is run with
> >> +     * the mutex taken, so that lock->reader remains zero.
> >> +     * There is no need to update self->locks_held.
> >> +     */
> > 
> > But is it still better to update self->locks_held anyway for the
> > 'assert(!co->locks_held)' in qemu_coroutine_enter? Or is the same thing checked
> > elsewhere?
> 
> self->locks_held is already incremented by the qemu_co_mutex_lock call
> at the beginning of qemu_co_rwlock_wrlock.  It is then decremented in
> qemu_co_rwlock_unlock.
> 
> For the read side, rdlock _unlocks_ lock->mutex before returning, so
> self->locks_held must be incremented by rdlock and decremented by unlock.

Makes sense.

Fam
diff mbox

Patch

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index d2de268..e60beaf 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -204,8 +204,9 @@  bool qemu_co_queue_empty(CoQueue *queue);
 
 
 typedef struct CoRwlock {
-    bool writer;
+    int pending_writer;
     int reader;
+    CoMutex mutex;
     CoQueue queue;
 } CoRwlock;
 
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index b0a554f..6328eed 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -346,16 +346,22 @@  void qemu_co_rwlock_init(CoRwlock *lock)
 {
     memset(lock, 0, sizeof(*lock));
     qemu_co_queue_init(&lock->queue);
+    qemu_co_mutex_init(&lock->mutex);
 }
 
 void qemu_co_rwlock_rdlock(CoRwlock *lock)
 {
     Coroutine *self = qemu_coroutine_self();
 
-    while (lock->writer) {
-        qemu_co_queue_wait(&lock->queue, NULL);
+    qemu_co_mutex_lock(&lock->mutex);
+    /* For fairness, wait if a writer is in line.  */
+    while (lock->pending_writer) {
+        qemu_co_queue_wait(&lock->queue, &lock->mutex);
     }
     lock->reader++;
+    qemu_co_mutex_unlock(&lock->mutex);
+
+    /* The rest of the read-side critical section is run without the mutex.  */
     self->locks_held++;
 }
 
@@ -364,10 +370,13 @@  void qemu_co_rwlock_unlock(CoRwlock *lock)
     Coroutine *self = qemu_coroutine_self();
 
     assert(qemu_in_coroutine());
-    if (lock->writer) {
-        lock->writer = false;
+    if (!lock->reader) {
+        /* The critical section started in qemu_co_rwlock_wrlock.  */
         qemu_co_queue_restart_all(&lock->queue);
     } else {
+        self->locks_held--;
+
+        qemu_co_mutex_lock(&lock->mutex);
         lock->reader--;
         assert(lock->reader >= 0);
         /* Wakeup only one waiting writer */
@@ -375,16 +384,20 @@  void qemu_co_rwlock_unlock(CoRwlock *lock)
             qemu_co_queue_next(&lock->queue);
         }
     }
-    self->locks_held--;
+    qemu_co_mutex_unlock(&lock->mutex);
 }
 
 void qemu_co_rwlock_wrlock(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
-
-    while (lock->writer || lock->reader) {
-        qemu_co_queue_wait(&lock->queue, NULL);
+    qemu_co_mutex_lock(&lock->mutex);
+    lock->pending_writer++;
+    while (lock->reader) {
+        qemu_co_queue_wait(&lock->queue, &lock->mutex);
     }
-    lock->writer = true;
-    self->locks_held++;
+    lock->pending_writer--;
+
+    /* The rest of the write-side critical section is run with
+     * the mutex taken, so that lock->reader remains zero.
+     * There is no need to update self->locks_held.
+     */
 }