Message ID | 55B6E854.9090600@cn.fujitsu.com |
---|---|
State | New |
Headers | show |
On 28/07/2015 04:26, Wen Congyang wrote: > If rcu_(un)register_thread() is called together with synchronize_rcu(), > it will wait for the synchronize_rcu() to finish. But when synchronize_rcu() > waits for some events, we can modify the list registry. > We also use the lock rcu_gp_lock to assume that synchronize_rcu() isn't > executed in more than one thread at the same time. Add a new mutex lock > rcu_sync_lock to assume it and rename rcu_gp_lock to rcu_registry_lock. > Release rcu_registry_lock when synchronize_rcu() waits for some events. > > Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> > --- > util/rcu.c | 48 +++++++++++++++++++++++++++++++++++------------- > 1 file changed, 35 insertions(+), 13 deletions(-) > > diff --git a/util/rcu.c b/util/rcu.c > index cdcad67..4dd33df 100644 > --- a/util/rcu.c > +++ b/util/rcu.c > @@ -47,7 +47,8 @@ > unsigned long rcu_gp_ctr = RCU_GP_LOCKED; > > QemuEvent rcu_gp_event; > -static QemuMutex rcu_gp_lock; > +static QemuMutex rcu_registry_lock; > +static QemuMutex rcu_sync_lock; > > /* > * Check whether a quiescent state was crossed between the beginning of > @@ -66,7 +67,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr) > */ > __thread struct rcu_reader_data rcu_reader; > > -/* Protected by rcu_gp_lock. */ > +/* Protected by rcu_registry_lock. */ > typedef QLIST_HEAD(, rcu_reader_data) ThreadList; > static ThreadList registry = QLIST_HEAD_INITIALIZER(registry); > > @@ -114,10 +115,26 @@ static void wait_for_readers(void) > break; > } > > - /* Wait for one thread to report a quiescent state and > - * try again. > + /* Wait for one thread to report a quiescent state and try again. > + * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't > + * wait too much time. > + * > + * rcu_register_thread() may add nodes to ®istry; it will not > + * wake up synchronize_rcu, but that is okay because at least another > + * thread must exit its RCU read-side critical section before > + * synchronize_rcu is done. The next iteration of the loop will > + * move the new thread's rcu_reader from ®istry to &qsreaders, > + * because rcu_gp_ongoing() will return false. > + * > + * rcu_unregister_thread() may remove nodes from &qsreaders instead > + * of ®istry if it runs during qemu_event_wait. That's okay; > + * the node then will not be added back to ®istry by QLIST_SWAP > + * below. The invariant is that the node is part of one list when > + * rcu_registry_lock is released. > */ > + qemu_mutex_unlock(&rcu_registry_lock); > qemu_event_wait(&rcu_gp_event); > + qemu_mutex_lock(&rcu_registry_lock); > } > > /* put back the reader list in the registry */ > @@ -126,7 +143,8 @@ static void wait_for_readers(void) > > void synchronize_rcu(void) > { > - qemu_mutex_lock(&rcu_gp_lock); > + qemu_mutex_lock(&rcu_sync_lock); > + qemu_mutex_lock(&rcu_registry_lock); > > if (!QLIST_EMPTY(®istry)) { > /* In either case, the atomic_mb_set below blocks stores that free > @@ -149,7 +167,8 @@ void synchronize_rcu(void) > wait_for_readers(); > } > > - qemu_mutex_unlock(&rcu_gp_lock); > + qemu_mutex_unlock(&rcu_registry_lock); > + qemu_mutex_unlock(&rcu_sync_lock); > } > > > @@ -273,23 +292,24 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node)) > void rcu_register_thread(void) > { > assert(rcu_reader.ctr == 0); > - qemu_mutex_lock(&rcu_gp_lock); > + qemu_mutex_lock(&rcu_registry_lock); > QLIST_INSERT_HEAD(®istry, &rcu_reader, node); > - qemu_mutex_unlock(&rcu_gp_lock); > + qemu_mutex_unlock(&rcu_registry_lock); > } > > void rcu_unregister_thread(void) > { > - qemu_mutex_lock(&rcu_gp_lock); > + qemu_mutex_lock(&rcu_registry_lock); > QLIST_REMOVE(&rcu_reader, node); > - qemu_mutex_unlock(&rcu_gp_lock); > + qemu_mutex_unlock(&rcu_registry_lock); > } > > static void rcu_init_complete(void) > { > QemuThread thread; > > - qemu_mutex_init(&rcu_gp_lock); > + qemu_mutex_init(&rcu_registry_lock); > + qemu_mutex_init(&rcu_sync_lock); > qemu_event_init(&rcu_gp_event, true); > > qemu_event_init(&rcu_call_ready_event, false); > @@ -306,12 +326,14 @@ static void rcu_init_complete(void) > #ifdef CONFIG_POSIX > static void rcu_init_lock(void) > { > - qemu_mutex_lock(&rcu_gp_lock); > + qemu_mutex_lock(&rcu_sync_lock); > + qemu_mutex_lock(&rcu_registry_lock); > } > > static void rcu_init_unlock(void) > { > - qemu_mutex_unlock(&rcu_gp_lock); > + qemu_mutex_unlock(&rcu_registry_lock); > + qemu_mutex_unlock(&rcu_sync_lock); > } > #endif > > Ok, thanks. Paolo
On 07/28/2015 05:59 PM, Paolo Bonzini wrote: > > > On 28/07/2015 04:26, Wen Congyang wrote: >> If rcu_(un)register_thread() is called together with synchronize_rcu(), >> it will wait for the synchronize_rcu() to finish. But when synchronize_rcu() >> waits for some events, we can modify the list registry. >> We also use the lock rcu_gp_lock to assume that synchronize_rcu() isn't >> executed in more than one thread at the same time. Add a new mutex lock >> rcu_sync_lock to assume it and rename rcu_gp_lock to rcu_registry_lock. >> Release rcu_registry_lock when synchronize_rcu() waits for some events. >> >> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> >> --- >> util/rcu.c | 48 +++++++++++++++++++++++++++++++++++------------- >> 1 file changed, 35 insertions(+), 13 deletions(-) >> >> diff --git a/util/rcu.c b/util/rcu.c >> index cdcad67..4dd33df 100644 >> --- a/util/rcu.c >> +++ b/util/rcu.c >> @@ -47,7 +47,8 @@ >> unsigned long rcu_gp_ctr = RCU_GP_LOCKED; >> >> QemuEvent rcu_gp_event; >> -static QemuMutex rcu_gp_lock; >> +static QemuMutex rcu_registry_lock; >> +static QemuMutex rcu_sync_lock; >> >> /* >> * Check whether a quiescent state was crossed between the beginning of >> @@ -66,7 +67,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr) >> */ >> __thread struct rcu_reader_data rcu_reader; >> >> -/* Protected by rcu_gp_lock. */ >> +/* Protected by rcu_registry_lock. */ >> typedef QLIST_HEAD(, rcu_reader_data) ThreadList; >> static ThreadList registry = QLIST_HEAD_INITIALIZER(registry); >> >> @@ -114,10 +115,26 @@ static void wait_for_readers(void) >> break; >> } >> >> - /* Wait for one thread to report a quiescent state and >> - * try again. >> + /* Wait for one thread to report a quiescent state and try again. >> + * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't >> + * wait too much time. >> + * >> + * rcu_register_thread() may add nodes to ®istry; it will not >> + * wake up synchronize_rcu, but that is okay because at least another >> + * thread must exit its RCU read-side critical section before >> + * synchronize_rcu is done. The next iteration of the loop will >> + * move the new thread's rcu_reader from ®istry to &qsreaders, >> + * because rcu_gp_ongoing() will return false. >> + * >> + * rcu_unregister_thread() may remove nodes from &qsreaders instead >> + * of ®istry if it runs during qemu_event_wait. That's okay; >> + * the node then will not be added back to ®istry by QLIST_SWAP >> + * below. The invariant is that the node is part of one list when >> + * rcu_registry_lock is released. >> */ >> + qemu_mutex_unlock(&rcu_registry_lock); >> qemu_event_wait(&rcu_gp_event); >> + qemu_mutex_lock(&rcu_registry_lock); >> } >> >> /* put back the reader list in the registry */ >> @@ -126,7 +143,8 @@ static void wait_for_readers(void) >> >> void synchronize_rcu(void) >> { >> - qemu_mutex_lock(&rcu_gp_lock); >> + qemu_mutex_lock(&rcu_sync_lock); >> + qemu_mutex_lock(&rcu_registry_lock); >> >> if (!QLIST_EMPTY(®istry)) { >> /* In either case, the atomic_mb_set below blocks stores that free >> @@ -149,7 +167,8 @@ void synchronize_rcu(void) >> wait_for_readers(); >> } >> >> - qemu_mutex_unlock(&rcu_gp_lock); >> + qemu_mutex_unlock(&rcu_registry_lock); >> + qemu_mutex_unlock(&rcu_sync_lock); >> } >> >> >> @@ -273,23 +292,24 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node)) >> void rcu_register_thread(void) >> { >> assert(rcu_reader.ctr == 0); >> - qemu_mutex_lock(&rcu_gp_lock); >> + qemu_mutex_lock(&rcu_registry_lock); >> QLIST_INSERT_HEAD(®istry, &rcu_reader, node); >> - qemu_mutex_unlock(&rcu_gp_lock); >> + qemu_mutex_unlock(&rcu_registry_lock); >> } >> >> void rcu_unregister_thread(void) >> { >> - qemu_mutex_lock(&rcu_gp_lock); >> + qemu_mutex_lock(&rcu_registry_lock); >> QLIST_REMOVE(&rcu_reader, node); >> - qemu_mutex_unlock(&rcu_gp_lock); >> + qemu_mutex_unlock(&rcu_registry_lock); >> } >> >> static void rcu_init_complete(void) >> { >> QemuThread thread; >> >> - qemu_mutex_init(&rcu_gp_lock); >> + qemu_mutex_init(&rcu_registry_lock); >> + qemu_mutex_init(&rcu_sync_lock); >> qemu_event_init(&rcu_gp_event, true); >> >> qemu_event_init(&rcu_call_ready_event, false); >> @@ -306,12 +326,14 @@ static void rcu_init_complete(void) >> #ifdef CONFIG_POSIX >> static void rcu_init_lock(void) >> { >> - qemu_mutex_lock(&rcu_gp_lock); >> + qemu_mutex_lock(&rcu_sync_lock); >> + qemu_mutex_lock(&rcu_registry_lock); >> } >> >> static void rcu_init_unlock(void) >> { >> - qemu_mutex_unlock(&rcu_gp_lock); >> + qemu_mutex_unlock(&rcu_registry_lock); >> + qemu_mutex_unlock(&rcu_sync_lock); >> } >> #endif >> >> > > Ok, thanks. I have a question about rcu: while do we call wait_for_readers() twice for 32-bit host? Thanks Wen Congyang > > Paolo > . >
On 28/07/2015 12:02, Wen Congyang wrote: > I have a question about rcu: while do we call wait_for_readers() > twice for 32-bit host? Because there is a very small but non-zero probability of the counter going up by exactly 2^31 periods (periods are stored in bits 1-31 so you lose one bit) while the thread is sleeping. This detail of the implementation comes from URCU. Paolo
On 07/28/2015 06:18 PM, Paolo Bonzini wrote: > > > On 28/07/2015 12:02, Wen Congyang wrote: >> I have a question about rcu: while do we call wait_for_readers() >> twice for 32-bit host? > > Because there is a very small but non-zero probability of the counter > going up by exactly 2^31 periods (periods are stored in bits 1-31 so you > lose one bit) while the thread is sleeping. This detail of the > implementation comes from URCU. Yes, so you use rcu_gp_ctr ^ RCU_GP_CTR to instead of rcu_gp_ctr + RCU_GP_CTR. The initial value is 1, so rcu_gp_ctr is: 1, 3, 1, 3, ... The rcu_gp_ctr will never be 0. I think calling wait_for_readers() once is enough. Do I miss something? Thanks Wen Congyang > > Paolo >
On 28/07/2015 12:33, Wen Congyang wrote: > On 07/28/2015 06:18 PM, Paolo Bonzini wrote: >> >> >> On 28/07/2015 12:02, Wen Congyang wrote: >>> I have a question about rcu: while do we call wait_for_readers() >>> twice for 32-bit host? >> >> Because there is a very small but non-zero probability of the counter >> going up by exactly 2^31 periods (periods are stored in bits 1-31 so you >> lose one bit) while the thread is sleeping. This detail of the >> implementation comes from URCU. > > Yes, so you use rcu_gp_ctr ^ RCU_GP_CTR to instead of rcu_gp_ctr + RCU_GP_CTR. > The initial value is 1, so rcu_gp_ctr is: 1, 3, 1, 3, ... > The rcu_gp_ctr will never be 0. I think calling wait_for_readers() once is > enough. > > Do I miss something? If you call it just once, you have the same problem as before. In fact, it's worse because instead of having an overflow every 2^31 periods, you have one every 2 periods. Instead, by checking that rcu_reader went through 1 _and_ 3 (or that it was at least once 0, i.e. the thread was quiescent), you are sure that the thread went through _at least one_ grace period. Paolo
At 2015/7/28 19:46, Paolo Bonzini Wrote: > > > On 28/07/2015 12:33, Wen Congyang wrote: >> On 07/28/2015 06:18 PM, Paolo Bonzini wrote: >>> >>> >>> On 28/07/2015 12:02, Wen Congyang wrote: >>>> I have a question about rcu: while do we call wait_for_readers() >>>> twice for 32-bit host? >>> >>> Because there is a very small but non-zero probability of the counter >>> going up by exactly 2^31 periods (periods are stored in bits 1-31 so you >>> lose one bit) while the thread is sleeping. This detail of the >>> implementation comes from URCU. >> >> Yes, so you use rcu_gp_ctr ^ RCU_GP_CTR to instead of rcu_gp_ctr + RCU_GP_CTR. >> The initial value is 1, so rcu_gp_ctr is: 1, 3, 1, 3, ... >> The rcu_gp_ctr will never be 0. I think calling wait_for_readers() once is >> enough. >> >> Do I miss something? > > If you call it just once, you have the same problem as before. In fact, > it's worse because instead of having an overflow every 2^31 periods, you > have one every 2 periods. Instead, by checking that rcu_reader went > through 1 _and_ 3 (or that it was at least once 0, i.e. the thread was > quiescent), you are sure that the thread went through _at least one_ > grace period. The overflow is acceptable. We only compare if rcu_reader.ctr is equal than rcu_gp_ctr. If not, we should wait that thread to call rcu_read_unlock(). We don't care which is bigger. If no threads calls sync_rcu(), all threads rcu_read.ctr is 0 or rcu_gp_ctr. If one thread calls sync_rcu(), all threads rcu_read.ctr is 0, old_rcu_gp_ctr, or new_rcu_gp_ctr. We only wait the thread that's rcu_read.ctr is old_rcu_gp_ctr. Thanks Wen Congyang > > Paolo > >
On 28/07/2015 15:29, Wen Congyang wrote: >>> >> >> If you call it just once, you have the same problem as before. In fact, >> it's worse because instead of having an overflow every 2^31 periods, you >> have one every 2 periods. Instead, by checking that rcu_reader went >> through 1 _and_ 3 (or that it was at least once 0, i.e. the thread was >> quiescent), you are sure that the thread went through _at least one_ >> grace period. > > The overflow is acceptable. We only compare if rcu_reader.ctr is equal than > rcu_gp_ctr. If not, we should wait that thread to call rcu_read_unlock(). > We don't care which is bigger. If no threads calls sync_rcu(), all threads > rcu_read.ctr is 0 or rcu_gp_ctr. If one thread calls sync_rcu(), all > threads rcu_read.ctr is 0, old_rcu_gp_ctr, or new_rcu_gp_ctr. We only wait the > thread that's rcu_read.ctr is old_rcu_gp_ctr. Suppose you have reader writer --------------------------------------------------------------- rcu_read_lock() ctr = atomic_read(&rcu_gp_ctr); ctr = 1 synchronize_rcu() rcu_gp_ctr = 3 atomic_xchg(&p_rcu_reader->ctr, ctr); rcu_reader.ctr = 1 We're in the critical section. synchronize_rcu() rcu_gp_ctr = 1 rcu_gp_ongoing(&p_rcu_reader->ctr) returns false, so synchronize_rcu() exits. But we're still in the critical section. p = atomic_rcu_read(&foo); g_free(p); Boom. :) For more information see http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.220.8810 Paolo
On 07/29/2015 12:14 AM, Paolo Bonzini wrote: > > > On 28/07/2015 15:29, Wen Congyang wrote: >>>> >>> >>> If you call it just once, you have the same problem as before. In fact, >>> it's worse because instead of having an overflow every 2^31 periods, you >>> have one every 2 periods. Instead, by checking that rcu_reader went >>> through 1 _and_ 3 (or that it was at least once 0, i.e. the thread was >>> quiescent), you are sure that the thread went through _at least one_ >>> grace period. >> >> The overflow is acceptable. We only compare if rcu_reader.ctr is equal than >> rcu_gp_ctr. If not, we should wait that thread to call rcu_read_unlock(). >> We don't care which is bigger. If no threads calls sync_rcu(), all threads >> rcu_read.ctr is 0 or rcu_gp_ctr. If one thread calls sync_rcu(), all >> threads rcu_read.ctr is 0, old_rcu_gp_ctr, or new_rcu_gp_ctr. We only wait the >> thread that's rcu_read.ctr is old_rcu_gp_ctr. > > Suppose you have > > reader writer > --------------------------------------------------------------- > rcu_read_lock() > ctr = atomic_read(&rcu_gp_ctr); > ctr = 1 > synchronize_rcu() > rcu_gp_ctr = 3 > atomic_xchg(&p_rcu_reader->ctr, ctr); > rcu_reader.ctr = 1 > > We're in the critical section. > > synchronize_rcu() > rcu_gp_ctr = 1 > > rcu_gp_ongoing(&p_rcu_reader->ctr) returns false, so synchronize_rcu() > exits. But we're still in the critical section. > > p = atomic_rcu_read(&foo); > g_free(p); > > Boom. :) For more information see > http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.220.8810 Thanks for your explanation. I understand it now. If we use rcu_gp_ctr + RCU_GP_CTR for 32-bit host, the problem only exists when we call synchronize_rcu() 2^31 times while another thread calls rcu_read_lock(). Thanks Wen Congyang > > Paolo >
diff --git a/util/rcu.c b/util/rcu.c index cdcad67..4dd33df 100644 --- a/util/rcu.c +++ b/util/rcu.c @@ -47,7 +47,8 @@ unsigned long rcu_gp_ctr = RCU_GP_LOCKED; QemuEvent rcu_gp_event; -static QemuMutex rcu_gp_lock; +static QemuMutex rcu_registry_lock; +static QemuMutex rcu_sync_lock; /* * Check whether a quiescent state was crossed between the beginning of @@ -66,7 +67,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr) */ __thread struct rcu_reader_data rcu_reader; -/* Protected by rcu_gp_lock. */ +/* Protected by rcu_registry_lock. */ typedef QLIST_HEAD(, rcu_reader_data) ThreadList; static ThreadList registry = QLIST_HEAD_INITIALIZER(registry); @@ -114,10 +115,26 @@ static void wait_for_readers(void) break; } - /* Wait for one thread to report a quiescent state and - * try again. + /* Wait for one thread to report a quiescent state and try again. + * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't + * wait too much time. + * + * rcu_register_thread() may add nodes to ®istry; it will not + * wake up synchronize_rcu, but that is okay because at least another + * thread must exit its RCU read-side critical section before + * synchronize_rcu is done. The next iteration of the loop will + * move the new thread's rcu_reader from ®istry to &qsreaders, + * because rcu_gp_ongoing() will return false. + * + * rcu_unregister_thread() may remove nodes from &qsreaders instead + * of ®istry if it runs during qemu_event_wait. That's okay; + * the node then will not be added back to ®istry by QLIST_SWAP + * below. The invariant is that the node is part of one list when + * rcu_registry_lock is released. */ + qemu_mutex_unlock(&rcu_registry_lock); qemu_event_wait(&rcu_gp_event); + qemu_mutex_lock(&rcu_registry_lock); } /* put back the reader list in the registry */ @@ -126,7 +143,8 @@ static void wait_for_readers(void) void synchronize_rcu(void) { - qemu_mutex_lock(&rcu_gp_lock); + qemu_mutex_lock(&rcu_sync_lock); + qemu_mutex_lock(&rcu_registry_lock); if (!QLIST_EMPTY(®istry)) { /* In either case, the atomic_mb_set below blocks stores that free @@ -149,7 +167,8 @@ void synchronize_rcu(void) wait_for_readers(); } - qemu_mutex_unlock(&rcu_gp_lock); + qemu_mutex_unlock(&rcu_registry_lock); + qemu_mutex_unlock(&rcu_sync_lock); } @@ -273,23 +292,24 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node)) void rcu_register_thread(void) { assert(rcu_reader.ctr == 0); - qemu_mutex_lock(&rcu_gp_lock); + qemu_mutex_lock(&rcu_registry_lock); QLIST_INSERT_HEAD(®istry, &rcu_reader, node); - qemu_mutex_unlock(&rcu_gp_lock); + qemu_mutex_unlock(&rcu_registry_lock); } void rcu_unregister_thread(void) { - qemu_mutex_lock(&rcu_gp_lock); + qemu_mutex_lock(&rcu_registry_lock); QLIST_REMOVE(&rcu_reader, node); - qemu_mutex_unlock(&rcu_gp_lock); + qemu_mutex_unlock(&rcu_registry_lock); } static void rcu_init_complete(void) { QemuThread thread; - qemu_mutex_init(&rcu_gp_lock); + qemu_mutex_init(&rcu_registry_lock); + qemu_mutex_init(&rcu_sync_lock); qemu_event_init(&rcu_gp_event, true); qemu_event_init(&rcu_call_ready_event, false); @@ -306,12 +326,14 @@ static void rcu_init_complete(void) #ifdef CONFIG_POSIX static void rcu_init_lock(void) { - qemu_mutex_lock(&rcu_gp_lock); + qemu_mutex_lock(&rcu_sync_lock); + qemu_mutex_lock(&rcu_registry_lock); } static void rcu_init_unlock(void) { - qemu_mutex_unlock(&rcu_gp_lock); + qemu_mutex_unlock(&rcu_registry_lock); + qemu_mutex_unlock(&rcu_sync_lock); } #endif
If rcu_(un)register_thread() is called together with synchronize_rcu(), it will wait for the synchronize_rcu() to finish. But when synchronize_rcu() waits for some events, we can modify the list registry. We also use the lock rcu_gp_lock to assume that synchronize_rcu() isn't executed in more than one thread at the same time. Add a new mutex lock rcu_sync_lock to assume it and rename rcu_gp_lock to rcu_registry_lock. Release rcu_registry_lock when synchronize_rcu() waits for some events. Signed-off-by: Wen Congyang <wency@cn.fujitsu.com> --- util/rcu.c | 48 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 13 deletions(-)