[v2,for-2.5] rcu: Allow calling rcu_(un)register_thread() during synchronize_rcu()

Message ID 55B6E854.9090600@cn.fujitsu.com
State New

Commit Message

Wen Congyang July 28, 2015, 2:26 a.m. UTC
If rcu_(un)register_thread() is called together with synchronize_rcu(),
it will wait for synchronize_rcu() to finish. But while synchronize_rcu()
is only waiting for events, it is safe to modify the list registry.
We also use the lock rcu_gp_lock to ensure that synchronize_rcu() isn't
executed in more than one thread at the same time. Add a new mutex,
rcu_sync_lock, for that purpose and rename rcu_gp_lock to
rcu_registry_lock. Release rcu_registry_lock while synchronize_rcu()
waits for events.

Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 util/rcu.c | 48 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 35 insertions(+), 13 deletions(-)
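
The resulting lock ordering, shown as a minimal sketch (names as in
util/rcu.c; an illustration of the patch below, not a replacement for it):

    void synchronize_rcu(void)
    {
        qemu_mutex_lock(&rcu_sync_lock);      /* serializes writers */
        qemu_mutex_lock(&rcu_registry_lock);  /* protects &registry */
        /* ... bump rcu_gp_ctr and run wait_for_readers(), which may
         * temporarily drop rcu_registry_lock while it sleeps ... */
        qemu_mutex_unlock(&rcu_registry_lock);
        qemu_mutex_unlock(&rcu_sync_lock);
    }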

Comments

Paolo Bonzini July 28, 2015, 9:59 a.m. UTC | #1
On 28/07/2015 04:26, Wen Congyang wrote:
> If rcu_(un)register_thread() is called together with synchronize_rcu(),
> it will wait for synchronize_rcu() to finish.
> [...]

Ok, thanks.

Paolo
Wen Congyang July 28, 2015, 10:02 a.m. UTC | #2
On 07/28/2015 05:59 PM, Paolo Bonzini wrote:
> 
> On 28/07/2015 04:26, Wen Congyang wrote:
>> [...]
> 
> Ok, thanks.

I have a question about rcu: why do we call wait_for_readers() twice on a 32-bit host?

Thanks
Wen Congyang

Paolo Bonzini July 28, 2015, 10:18 a.m. UTC | #3
On 28/07/2015 12:02, Wen Congyang wrote:
> I have a question about rcu: why do we call wait_for_readers()
> twice on a 32-bit host?

Because there is a very small but non-zero probability of the counter
going up by exactly 2^31 periods (periods are stored in bits 1-31 so you
lose one bit) while the thread is sleeping.  This detail of the
implementation comes from URCU.

Paolo
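
For reference, with this scheme the grace-period bump in synchronize_rcu()
has roughly the following shape (a sketch reconstructed from this thread,
not the verbatim source; the sizeof-based 32-/64-bit split is an
assumption):

    if (sizeof(rcu_gp_ctr) < 8) {
        /* 32 bits: toggle the period bit twice, waiting in between, so
         * every reader must have been seen quiescent at least once.
         */
        atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
        wait_for_readers();
        atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
    } else {
        /* 64 bits: wraparound is not a practical concern, so a single
         * increment and a single wait suffice.
         */
        atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
    }
    wait_for_readers();
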
Wen Congyang July 28, 2015, 10:33 a.m. UTC | #4
On 07/28/2015 06:18 PM, Paolo Bonzini wrote:
> 
> 
> On 28/07/2015 12:02, Wen Congyang wrote:
>> I have a question about rcu: why do we call wait_for_readers()
>> twice on a 32-bit host?
> 
> Because there is a very small but non-zero probability of the counter
> going up by exactly 2^31 periods (periods are stored in bits 1-31 so you
> lose one bit) while the thread is sleeping.  This detail of the
> implementation comes from URCU.

Yes, so you use rcu_gp_ctr ^ RCU_GP_CTR instead of rcu_gp_ctr + RCU_GP_CTR.
The initial value is 1, so rcu_gp_ctr goes 1, 3, 1, 3, ... and never
becomes 0. I think calling wait_for_readers() once is enough.

Do I miss something?

Thanks
Wen Congyang

Paolo Bonzini July 28, 2015, 11:46 a.m. UTC | #5
On 28/07/2015 12:33, Wen Congyang wrote:
>> [...]
> 
> Yes, so you use rcu_gp_ctr ^ RCU_GP_CTR instead of rcu_gp_ctr + RCU_GP_CTR.
> The initial value is 1, so rcu_gp_ctr goes 1, 3, 1, 3, ... and never
> becomes 0. I think calling wait_for_readers() once is enough.
> 
> Do I miss something?

If you call it just once, you have the same problem as before.  In fact,
it's worse because instead of having an overflow every 2^31 periods, you
have one every 2 periods.  Instead, by checking that rcu_reader went
through 1 _and_ 3 (or that it was at least once 0, i.e. the thread was
quiescent), you are sure that the thread went through _at least one_
grace period.

Paolo
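
The check in question is rcu_gp_ongoing(), whose signature appears in the
patch context. A minimal sketch of it, assuming the util/rcu.c of the time:

    static inline int rcu_gp_ongoing(unsigned long *ctr)
    {
        unsigned long v = atomic_read(ctr);

        /* A reader holds up the grace period only if it is inside a
         * critical section (v != 0) that started in an older period
         * (v != rcu_gp_ctr).  A thread seen quiescent, or one that has
         * already observed the new period, is moved off &registry.
         */
        return v && (v != rcu_gp_ctr);
    }
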
Wen Congyang July 28, 2015, 1:29 p.m. UTC | #6
At 2015/7/28 19:46, Paolo Bonzini wrote:
>
> On 28/07/2015 12:33, Wen Congyang wrote:
>> [...]
>
> If you call it just once, you have the same problem as before.  In fact,
> it's worse because instead of having an overflow every 2^31 periods, you
> have one every 2 periods.  Instead, by checking that rcu_reader went
> through 1 _and_ 3 (or that it was at least once 0, i.e. the thread was
> quiescent), you are sure that the thread went through _at least one_
> grace period.

The overflow is acceptable. We only check whether rcu_reader.ctr is equal
to rcu_gp_ctr; if not, we wait for that thread to call rcu_read_unlock().
We don't care which one is bigger. If no thread calls synchronize_rcu(),
every thread's rcu_reader.ctr is 0 or rcu_gp_ctr. If one thread calls
synchronize_rcu(), every thread's rcu_reader.ctr is 0, old_rcu_gp_ctr, or
new_rcu_gp_ctr, and we only wait for the threads whose rcu_reader.ctr is
old_rcu_gp_ctr.

Thanks
Wen Congyang

Paolo Bonzini July 28, 2015, 4:14 p.m. UTC | #7
On 28/07/2015 15:29, Wen Congyang wrote:
>> [...]
> 
> The overflow is acceptable. We only check whether rcu_reader.ctr is equal
> to rcu_gp_ctr; if not, we wait for that thread to call rcu_read_unlock().
> We don't care which one is bigger. If no thread calls synchronize_rcu(),
> every thread's rcu_reader.ctr is 0 or rcu_gp_ctr. If one thread calls
> synchronize_rcu(), every thread's rcu_reader.ctr is 0, old_rcu_gp_ctr, or
> new_rcu_gp_ctr, and we only wait for the threads whose rcu_reader.ctr is
> old_rcu_gp_ctr.

Suppose you have

    reader                                        writer
 ---------------------------------------------------------------
    rcu_read_lock()
      ctr = atomic_read(&rcu_gp_ctr);
        ctr = 1
                                                  synchronize_rcu()
                                                     rcu_gp_ctr = 3
      atomic_xchg(&p_rcu_reader->ctr, ctr);
        rcu_reader.ctr = 1

We're in the critical section.

                                                  synchronize_rcu()
                                                     rcu_gp_ctr = 1

rcu_gp_ongoing(&p_rcu_reader->ctr) returns false, so synchronize_rcu()
exits.  But we're still in the critical section.

    p = atomic_rcu_read(&foo);
                                                  g_free(p);

Boom. :)  For more information see
http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.220.8810

Paolo
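
The window exploited in this scenario lies between the reader's two steps.
A simplified sketch of the reader side (nesting and wakeup handling elided;
see include/qemu/rcu.h for the real code):

    static inline void rcu_read_lock(void)
    {
        /* Step 1: sample the current grace period.  */
        unsigned long ctr = atomic_read(&rcu_gp_ctr);

        /* A writer may complete one or more whole synchronize_rcu()
         * calls here, between the sample and the publication below.
         */

        /* Step 2: publish the sampled period, marking this thread as
         * one that synchronize_rcu() must wait for.
         */
        atomic_xchg(&rcu_reader.ctr, ctr);
    }
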
Wen Congyang July 29, 2015, 1:11 a.m. UTC | #8
On 07/29/2015 12:14 AM, Paolo Bonzini wrote:
> 
> 
> On 28/07/2015 15:29, Wen Congyang wrote:
>> [...]
> 
> Suppose you have
> 
>     reader                                        writer
>  ---------------------------------------------------------------
>     rcu_read_lock()
>       ctr = atomic_read(&rcu_gp_ctr);
>         ctr = 1
>                                                   synchronize_rcu()
>                                                      rcu_gp_ctr = 3
>       atomic_xchg(&p_rcu_reader->ctr, ctr);
>         rcu_reader.ctr = 1
> 
> We're in the critical section.
> 
>                                                   synchronize_rcu()
>                                                      rcu_gp_ctr = 1
> 
> rcu_gp_ongoing(&p_rcu_reader->ctr) returns false, so synchronize_rcu()
> exits.  But we're still in the critical section.
> 
>     p = atomic_rcu_read(&foo);
>                                                   g_free(p);
> 
> Boom. :)  For more information see
> http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.220.8810

Thanks for your explanation. I understand it now. If we used
rcu_gp_ctr + RCU_GP_CTR on a 32-bit host, the problem would only occur
when synchronize_rcu() is called exactly 2^31 times while another thread
is in the middle of rcu_read_lock().

Thanks
Wen Congyang


Patch

diff --git a/util/rcu.c b/util/rcu.c
index cdcad67..4dd33df 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -47,7 +47,8 @@ 
 unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
 
 QemuEvent rcu_gp_event;
-static QemuMutex rcu_gp_lock;
+static QemuMutex rcu_registry_lock;
+static QemuMutex rcu_sync_lock;
 
 /*
  * Check whether a quiescent state was crossed between the beginning of
@@ -66,7 +67,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr)
  */
 __thread struct rcu_reader_data rcu_reader;
 
-/* Protected by rcu_gp_lock.  */
+/* Protected by rcu_registry_lock.  */
 typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
 static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
 
@@ -114,10 +115,26 @@ static void wait_for_readers(void)
             break;
         }
 
-        /* Wait for one thread to report a quiescent state and
-         * try again.
+        /* Wait for one thread to report a quiescent state and try again.
+         * Release rcu_registry_lock, so rcu_(un)register_thread() does
+         * not have to wait too long.
+         *
+         * rcu_register_thread() may add nodes to &registry; it will not
+         * wake up synchronize_rcu, but that is okay because at least one
+         * other thread must exit its RCU read-side critical section before
+         * synchronize_rcu is done.  The next iteration of the loop will
+         * move the new thread's rcu_reader from &registry to &qsreaders,
+         * because rcu_gp_ongoing() will return false.
+         *
+         * rcu_unregister_thread() may remove nodes from &qsreaders instead
+         * of &registry if it runs during qemu_event_wait.  That is okay;
+         * the node will then not be added back to &registry by QLIST_SWAP
+         * below.  The invariant is that the node is part of exactly one
+         * list when rcu_registry_lock is released.
          */
+        qemu_mutex_unlock(&rcu_registry_lock);
         qemu_event_wait(&rcu_gp_event);
+        qemu_mutex_lock(&rcu_registry_lock);
     }
 
     /* put back the reader list in the registry */
@@ -126,7 +143,8 @@ static void wait_for_readers(void)
 
 void synchronize_rcu(void)
 {
-    qemu_mutex_lock(&rcu_gp_lock);
+    qemu_mutex_lock(&rcu_sync_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
 
     if (!QLIST_EMPTY(&registry)) {
         /* In either case, the atomic_mb_set below blocks stores that free
@@ -149,7 +167,8 @@ void synchronize_rcu(void)
         wait_for_readers();
     }
 
-    qemu_mutex_unlock(&rcu_gp_lock);
+    qemu_mutex_unlock(&rcu_registry_lock);
+    qemu_mutex_unlock(&rcu_sync_lock);
 }
 
 
@@ -273,23 +292,24 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
 void rcu_register_thread(void)
 {
     assert(rcu_reader.ctr == 0);
-    qemu_mutex_lock(&rcu_gp_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
     QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
-    qemu_mutex_unlock(&rcu_gp_lock);
+    qemu_mutex_unlock(&rcu_registry_lock);
 }
 
 void rcu_unregister_thread(void)
 {
-    qemu_mutex_lock(&rcu_gp_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
     QLIST_REMOVE(&rcu_reader, node);
-    qemu_mutex_unlock(&rcu_gp_lock);
+    qemu_mutex_unlock(&rcu_registry_lock);
 }
 
 static void rcu_init_complete(void)
 {
     QemuThread thread;
 
-    qemu_mutex_init(&rcu_gp_lock);
+    qemu_mutex_init(&rcu_registry_lock);
+    qemu_mutex_init(&rcu_sync_lock);
     qemu_event_init(&rcu_gp_event, true);
 
     qemu_event_init(&rcu_call_ready_event, false);
@@ -306,12 +326,14 @@ static void rcu_init_complete(void)
 #ifdef CONFIG_POSIX
 static void rcu_init_lock(void)
 {
-    qemu_mutex_lock(&rcu_gp_lock);
+    qemu_mutex_lock(&rcu_sync_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
 }
 
 static void rcu_init_unlock(void)
 {
-    qemu_mutex_unlock(&rcu_gp_lock);
+    qemu_mutex_unlock(&rcu_registry_lock);
+    qemu_mutex_unlock(&rcu_sync_lock);
 }
 #endif