diff mbox

[ovs-dev,v3] dpif-netdev: Remove PMD latency on seq_mutex

Message ID 1466707244-7196-1-git-send-email-fbl@redhat.com
State Changes Requested
Headers show

Commit Message

Flavio Leitner June 23, 2016, 6:40 p.m. UTC
The PMD thread needs to keep processing RX queues in order
to achieve maximum throughput. It also needs to sweep emc
cache and quiesce which use seq_mutex. That mutex can
eventually block the PMD thread causing latency spikes and
affecting the throughput.

Since there is no requirement for running those tasks at a
specific time, this patch extend seq API to allow tentative
locking instead.

Reported-by: Karl Rister <krister@redhat.com>
Co-authored-by: Karl Rister <krister@redhat.com>
Signed-off-by: Flavio Leitner <fbl@redhat.com>
---
 lib/dpif-netdev.c |  5 +++--
 lib/ovs-rcu.c     | 36 ++++++++++++++++++++++++++++++++++--
 lib/ovs-rcu.h     |  1 +
 lib/seq.c         | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
 lib/seq.h         |  5 +++++
 5 files changed, 89 insertions(+), 7 deletions(-)

v3:
   - addressed clang annotation feedbacks from Daniele
   - tested over 4 days without spikes or other issues

v2:
   - expanded SEQ API instead of using recursive lock.

Comments

Jarno Rajahalme June 29, 2016, 12:04 p.m. UTC | #1
> On Jun 23, 2016, at 11:40 AM, Flavio Leitner <fbl@redhat.com> wrote:
> 
> The PMD thread needs to keep processing RX queues in order
> to achieve maximum throughput. It also needs to sweep emc
> cache and quiesce which use seq_mutex. That mutex can
> eventually block the PMD thread causing latency spikes and
> affecting the throughput.
> 
> Since there is no requirement for running those tasks at a
> specific time, this patch extend seq API to allow tentative
> locking instead.

I guess the risk that quiescing would be skipped many times in a row is pretty low?

> 
> Reported-by: Karl Rister <krister@redhat.com>
> Co-authored-by: Karl Rister <krister@redhat.com>
> Signed-off-by: Flavio Leitner <fbl@redhat.com>
> ---
> lib/dpif-netdev.c |  5 +++--
> lib/ovs-rcu.c     | 36 ++++++++++++++++++++++++++++++++++--
> lib/ovs-rcu.h     |  1 +
> lib/seq.c         | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
> lib/seq.h         |  5 +++++
> 5 files changed, 89 insertions(+), 7 deletions(-)
> 
> v3:
>   - addressed clang annotation feedbacks from Daniele
>   - tested over 4 days without spikes or other issues
> 
> v2:
>   - expanded SEQ API instead of using recursive lock.
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 8d39d9e..8749108 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -2872,9 +2872,10 @@ reload:
> 
>             lc = 0;
> 
> -            emc_cache_slow_sweep(&pmd->flow_cache);
>             coverage_try_clear();
> -            ovsrcu_quiesce();
> +            if (!ovsrcu_try_quiesce()) {
> +                emc_cache_slow_sweep(&pmd->flow_cache);
> +            }
> 
>             atomic_read_relaxed(&pmd->change_seq, &seq);
>             if (seq != port_seq) {
> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
> index 0ff508e..4ec3dc0 100644
> --- a/lib/ovs-rcu.c
> +++ b/lib/ovs-rcu.c
> @@ -57,6 +57,7 @@ static struct guarded_list flushed_cbsets;
> static struct seq *flushed_cbsets_seq;
> 
> static void ovsrcu_init_module(void);
> +static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
> static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
> static void ovsrcu_unregister__(struct ovsrcu_perthread *);
> static bool ovsrcu_call_postponed(void);
> @@ -151,6 +152,27 @@ ovsrcu_quiesce(void)
>     ovsrcu_quiesced();
> }
> 
> +int
> +ovsrcu_try_quiesce(void)
> +{
> +    struct ovsrcu_perthread *perthread;
> +    int ret = -1;

Maybe the error case return should be EBUSY as documented for the trylock functions in lib/ovs-thread.h?

> +
> +    ovs_assert(!single_threaded());
> +    perthread = ovsrcu_perthread_get();

Is there a particular reason not to do also these only when the locking succeeds? E.g., 

{
    int error = seq_try_lock();
    if (error) {
        return error;
    }
    ...

> +    if (!seq_try_lock()) {
> +        perthread->seqno = seq_read_protected(global_seqno);
> +        if (perthread->cbset) {
> +            ovsrcu_flush_cbset__(perthread, true);
> +        }
> +        seq_change_protected(global_seqno);
> +        seq_unlock();
> +        ovsrcu_quiesced();
> +        ret = 0;
> +    }
> +    return ret;
> +}
> +
> bool
> ovsrcu_is_quiescent(void)
> {
> @@ -292,7 +314,7 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
> }
> 
> static void
> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> +ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
> {
>     struct ovsrcu_cbset *cbset = perthread->cbset;
> 
> @@ -300,11 +322,21 @@ ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
>         guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
>         perthread->cbset = NULL;
> 
> -        seq_change(flushed_cbsets_seq);
> +        if (protected) {
> +            seq_change_protected(flushed_cbsets_seq);
> +        } else {
> +            seq_change(flushed_cbsets_seq);
> +        }
>     }
> }
> 
> static void
> +ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> +{
> +    ovsrcu_flush_cbset__(perthread, false);
> +}
> +
> +static void
> ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
> {
>     if (perthread->cbset) {
> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
> index 8750ead..dc75749 100644
> --- a/lib/ovs-rcu.h
> +++ b/lib/ovs-rcu.h
> @@ -217,6 +217,7 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
> void ovsrcu_quiesce_start(void);
> void ovsrcu_quiesce_end(void);
> void ovsrcu_quiesce(void);
> +int ovsrcu_try_quiesce(void);
> bool ovsrcu_is_quiescent(void);
> 
> /* Synchronization.  Waits for all non-quiescent threads to quiesce at least
> diff --git a/lib/seq.c b/lib/seq.c
> index 57e0775..f2e630c 100644
> --- a/lib/seq.c
> +++ b/lib/seq.c
> @@ -100,6 +100,38 @@ seq_destroy(struct seq *seq)
>     ovs_mutex_unlock(&seq_mutex);
> }
> 
> +int
> +seq_try_lock(void)
> +{
> +    return ovs_mutex_trylock(&seq_mutex);
> +}
> +
> +void
> +seq_lock(void)
> +     OVS_ACQUIRES(seq_mutex)
> +{
> +    ovs_mutex_lock(&seq_mutex);
> +}
> +
> +void
> +seq_unlock(void)
> +    OVS_RELEASES(seq_mutex)
> +{
> +    ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Increments 'seq''s sequence number, waking up any threads that are waiting
> + * on 'seq'. */
> +void
> +seq_change_protected(struct seq *seq)
> +    OVS_REQUIRES(seq_mutex)
> +{
> +    COVERAGE_INC(seq_change);
> +
> +    seq->value = seq_next++;
> +    seq_wake_waiters(seq);
> +}
> +
> /* Increments 'seq''s sequence number, waking up any threads that are waiting
>  * on 'seq'. */
> void
> @@ -109,8 +141,7 @@ seq_change(struct seq *seq)
>     COVERAGE_INC(seq_change);
> 
>     ovs_mutex_lock(&seq_mutex);
> -    seq->value = seq_next++;
> -    seq_wake_waiters(seq);
> +    seq_change_protected(seq);
>     ovs_mutex_unlock(&seq_mutex);
> }
> 
> @@ -120,13 +151,25 @@ seq_change(struct seq *seq)
>  * when an object changes, even without an ability to lock the object.  See
>  * Usage in seq.h for details. */
> uint64_t
> +seq_read_protected(const struct seq *seq)
> +    OVS_REQUIRES(seq_mutex)
> +{
> +    return seq->value;
> +}
> +
> +/* Returns 'seq''s current sequence number (which could change immediately).
> + *
> + * seq_read() and seq_wait() can be used together to yield a race-free wakeup
> + * when an object changes, even without an ability to lock the object.  See
> + * Usage in seq.h for details. */
> +uint64_t
> seq_read(const struct seq *seq)
>     OVS_EXCLUDED(seq_mutex)
> {
>     uint64_t value;
> 
>     ovs_mutex_lock(&seq_mutex);
> -    value = seq->value;
> +    value = seq_read_protected(seq);
>     ovs_mutex_unlock(&seq_mutex);
> 
>     return value;
> diff --git a/lib/seq.h b/lib/seq.h
> index f3f4b53..221ab9a 100644
> --- a/lib/seq.h
> +++ b/lib/seq.h
> @@ -121,9 +121,14 @@
> struct seq *seq_create(void);
> void seq_destroy(struct seq *);
> void seq_change(struct seq *);
> +void seq_change_protected(struct seq *);
> +void seq_lock(void);
> +int seq_try_lock(void);
> +void seq_unlock(void);
> 
> /* For observers. */
> uint64_t seq_read(const struct seq *);
> +uint64_t seq_read_protected(const struct seq *);
> 
> void seq_wait_at(const struct seq *, uint64_t value, const char *where);
> #define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)
> -- 
> 2.5.5
> 
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
Flavio Leitner June 29, 2016, 11:58 p.m. UTC | #2
On Wed, Jun 29, 2016 at 05:04:06AM -0700, Jarno Rajahalme wrote:
> 
> > On Jun 23, 2016, at 11:40 AM, Flavio Leitner <fbl@redhat.com> wrote:
> > 
> > The PMD thread needs to keep processing RX queues in order
> > to achieve maximum throughput. It also needs to sweep emc
> > cache and quiesce which use seq_mutex. That mutex can
> > eventually block the PMD thread causing latency spikes and
> > affecting the throughput.
> > 
> > Since there is no requirement for running those tasks at a
> > specific time, this patch extend seq API to allow tentative
> > locking instead.
> 
> I guess the risk that quiescing would be skipped many times in a row is pretty low?

Yes.  But if that becomes a problem, it is easy to change the
code to try harder to quiesce.

> > Reported-by: Karl Rister <krister@redhat.com>
> > Co-authored-by: Karl Rister <krister@redhat.com>
> > Signed-off-by: Flavio Leitner <fbl@redhat.com>
> > ---
> > lib/dpif-netdev.c |  5 +++--
> > lib/ovs-rcu.c     | 36 ++++++++++++++++++++++++++++++++++--
> > lib/ovs-rcu.h     |  1 +
> > lib/seq.c         | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
> > lib/seq.h         |  5 +++++
> > 5 files changed, 89 insertions(+), 7 deletions(-)
> > 
> > v3:
> >   - addressed clang annotation feedbacks from Daniele
> >   - tested over 4 days without spikes or other issues
> > 
> > v2:
> >   - expanded SEQ API instead of using recursive lock.
> > 
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 8d39d9e..8749108 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -2872,9 +2872,10 @@ reload:
> > 
> >             lc = 0;
> > 
> > -            emc_cache_slow_sweep(&pmd->flow_cache);
> >             coverage_try_clear();
> > -            ovsrcu_quiesce();
> > +            if (!ovsrcu_try_quiesce()) {
> > +                emc_cache_slow_sweep(&pmd->flow_cache);
> > +            }
> > 
> >             atomic_read_relaxed(&pmd->change_seq, &seq);
> >             if (seq != port_seq) {
> > diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
> > index 0ff508e..4ec3dc0 100644
> > --- a/lib/ovs-rcu.c
> > +++ b/lib/ovs-rcu.c
> > @@ -57,6 +57,7 @@ static struct guarded_list flushed_cbsets;
> > static struct seq *flushed_cbsets_seq;
> > 
> > static void ovsrcu_init_module(void);
> > +static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
> > static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
> > static void ovsrcu_unregister__(struct ovsrcu_perthread *);
> > static bool ovsrcu_call_postponed(void);
> > @@ -151,6 +152,27 @@ ovsrcu_quiesce(void)
> >     ovsrcu_quiesced();
> > }
> > 
> > +int
> > +ovsrcu_try_quiesce(void)
> > +{
> > +    struct ovsrcu_perthread *perthread;
> > +    int ret = -1;
> 
> Maybe the error case return should be EBUSY as documented for the
> trylock functions in lib/ovs-thread.h?

I will fix that.

> 
> > +
> > +    ovs_assert(!single_threaded());
> > +    perthread = ovsrcu_perthread_get();
> 
> Is there a particular reason not to do also these only when the locking succeeds? E.g., 

The reason was to not hold the lock more than needed even
though the above lines should be fast enough today. Are you
saying that the code looks significantly better if we try
locking first at the beginning?

Thanks!
fbl

> 
> {
>     int error = seq_try_lock();
>     if (error) {
>         return error;
>     }
>     ...
> 
> > +    if (!seq_try_lock()) {
> > +        perthread->seqno = seq_read_protected(global_seqno);
> > +        if (perthread->cbset) {
> > +            ovsrcu_flush_cbset__(perthread, true);
> > +        }
> > +        seq_change_protected(global_seqno);
> > +        seq_unlock();
> > +        ovsrcu_quiesced();
> > +        ret = 0;
> > +    }
> > +    return ret;
> > +}
> > +
> > bool
> > ovsrcu_is_quiescent(void)
> > {
> > @@ -292,7 +314,7 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
> > }
> > 
> > static void
> > -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> > +ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
> > {
> >     struct ovsrcu_cbset *cbset = perthread->cbset;
> > 
> > @@ -300,11 +322,21 @@ ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> >         guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
> >         perthread->cbset = NULL;
> > 
> > -        seq_change(flushed_cbsets_seq);
> > +        if (protected) {
> > +            seq_change_protected(flushed_cbsets_seq);
> > +        } else {
> > +            seq_change(flushed_cbsets_seq);
> > +        }
> >     }
> > }
> > 
> > static void
> > +ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> > +{
> > +    ovsrcu_flush_cbset__(perthread, false);
> > +}
> > +
> > +static void
> > ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
> > {
> >     if (perthread->cbset) {
> > diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
> > index 8750ead..dc75749 100644
> > --- a/lib/ovs-rcu.h
> > +++ b/lib/ovs-rcu.h
> > @@ -217,6 +217,7 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
> > void ovsrcu_quiesce_start(void);
> > void ovsrcu_quiesce_end(void);
> > void ovsrcu_quiesce(void);
> > +int ovsrcu_try_quiesce(void);
> > bool ovsrcu_is_quiescent(void);
> > 
> > /* Synchronization.  Waits for all non-quiescent threads to quiesce at least
> > diff --git a/lib/seq.c b/lib/seq.c
> > index 57e0775..f2e630c 100644
> > --- a/lib/seq.c
> > +++ b/lib/seq.c
> > @@ -100,6 +100,38 @@ seq_destroy(struct seq *seq)
> >     ovs_mutex_unlock(&seq_mutex);
> > }
> > 
> > +int
> > +seq_try_lock(void)
> > +{
> > +    return ovs_mutex_trylock(&seq_mutex);
> > +}
> > +
> > +void
> > +seq_lock(void)
> > +     OVS_ACQUIRES(seq_mutex)
> > +{
> > +    ovs_mutex_lock(&seq_mutex);
> > +}
> > +
> > +void
> > +seq_unlock(void)
> > +    OVS_RELEASES(seq_mutex)
> > +{
> > +    ovs_mutex_unlock(&seq_mutex);
> > +}
> > +
> > +/* Increments 'seq''s sequence number, waking up any threads that are waiting
> > + * on 'seq'. */
> > +void
> > +seq_change_protected(struct seq *seq)
> > +    OVS_REQUIRES(seq_mutex)
> > +{
> > +    COVERAGE_INC(seq_change);
> > +
> > +    seq->value = seq_next++;
> > +    seq_wake_waiters(seq);
> > +}
> > +
> > /* Increments 'seq''s sequence number, waking up any threads that are waiting
> >  * on 'seq'. */
> > void
> > @@ -109,8 +141,7 @@ seq_change(struct seq *seq)
> >     COVERAGE_INC(seq_change);
> > 
> >     ovs_mutex_lock(&seq_mutex);
> > -    seq->value = seq_next++;
> > -    seq_wake_waiters(seq);
> > +    seq_change_protected(seq);
> >     ovs_mutex_unlock(&seq_mutex);
> > }
> > 
> > @@ -120,13 +151,25 @@ seq_change(struct seq *seq)
> >  * when an object changes, even without an ability to lock the object.  See
> >  * Usage in seq.h for details. */
> > uint64_t
> > +seq_read_protected(const struct seq *seq)
> > +    OVS_REQUIRES(seq_mutex)
> > +{
> > +    return seq->value;
> > +}
> > +
> > +/* Returns 'seq''s current sequence number (which could change immediately).
> > + *
> > + * seq_read() and seq_wait() can be used together to yield a race-free wakeup
> > + * when an object changes, even without an ability to lock the object.  See
> > + * Usage in seq.h for details. */
> > +uint64_t
> > seq_read(const struct seq *seq)
> >     OVS_EXCLUDED(seq_mutex)
> > {
> >     uint64_t value;
> > 
> >     ovs_mutex_lock(&seq_mutex);
> > -    value = seq->value;
> > +    value = seq_read_protected(seq);
> >     ovs_mutex_unlock(&seq_mutex);
> > 
> >     return value;
> > diff --git a/lib/seq.h b/lib/seq.h
> > index f3f4b53..221ab9a 100644
> > --- a/lib/seq.h
> > +++ b/lib/seq.h
> > @@ -121,9 +121,14 @@
> > struct seq *seq_create(void);
> > void seq_destroy(struct seq *);
> > void seq_change(struct seq *);
> > +void seq_change_protected(struct seq *);
> > +void seq_lock(void);
> > +int seq_try_lock(void);
> > +void seq_unlock(void);
> > 
> > /* For observers. */
> > uint64_t seq_read(const struct seq *);
> > +uint64_t seq_read_protected(const struct seq *);
> > 
> > void seq_wait_at(const struct seq *, uint64_t value, const char *where);
> > #define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)
> > -- 
> > 2.5.5
> > 
> > _______________________________________________
> > dev mailing list
> > dev@openvswitch.org
> > http://openvswitch.org/mailman/listinfo/dev
>
Jarno Rajahalme July 4, 2016, 10:48 a.m. UTC | #3
> On Jun 29, 2016, at 4:58 PM, Flavio Leitner <fbl@redhat.com> wrote:
> 
> On Wed, Jun 29, 2016 at 05:04:06AM -0700, Jarno Rajahalme wrote:
>> 
>>> On Jun 23, 2016, at 11:40 AM, Flavio Leitner <fbl@redhat.com> wrote:
>>> 
...
> 
>>> +
>>> +    ovs_assert(!single_threaded());
>>> +    perthread = ovsrcu_perthread_get();
>> 
>> Is there a particular reason not to do also these only when the locking succeeds? E.g., 
> 
> The reason was to not hold the lock more than needed even
> though the above lines should be fast enough today. Are you
> saying that the code looks significantly better if we try
> locking first at the beginning?

I guess in addition to aesthetics my thinking was not to do unnecessary things if the locking does not succeed, but your design argument is better, especially when we expect the locking to succeed most of the time.

  Jarno
Flavio Leitner July 5, 2016, 7:04 p.m. UTC | #4
On Mon, Jul 04, 2016 at 03:48:30AM -0700, Jarno Rajahalme wrote:
> 
> > On Jun 29, 2016, at 4:58 PM, Flavio Leitner <fbl@redhat.com> wrote:
> > 
> > On Wed, Jun 29, 2016 at 05:04:06AM -0700, Jarno Rajahalme wrote:
> >> 
> >>> On Jun 23, 2016, at 11:40 AM, Flavio Leitner <fbl@redhat.com> wrote:
> >>> 
> ...
> > 
> >>> +
> >>> +    ovs_assert(!single_threaded());
> >>> +    perthread = ovsrcu_perthread_get();
> >> 
> >> Is there a particular reason not to do also these only when the locking succeeds? E.g., 
> > 
> > The reason was to not hold the lock more than needed even
> > though the above lines should be fast enough today. Are you
> > saying that the code looks significantly better if we try
> > locking first at the beginning?
> 
> I guess in addition to aesthetics my thinking was not to do
> unnecessary things if the locking does not succeed, but your design
> argument is better, especially when we expect the locking to succeed
> most of the time.

I sent the v4 with the return code fixed:
http://openvswitch.org/pipermail/dev/2016-July/074505.html
Thanks for your review!
fbl
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 8d39d9e..8749108 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -2872,9 +2872,10 @@  reload:
 
             lc = 0;
 
-            emc_cache_slow_sweep(&pmd->flow_cache);
             coverage_try_clear();
-            ovsrcu_quiesce();
+            if (!ovsrcu_try_quiesce()) {
+                emc_cache_slow_sweep(&pmd->flow_cache);
+            }
 
             atomic_read_relaxed(&pmd->change_seq, &seq);
             if (seq != port_seq) {
diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
index 0ff508e..4ec3dc0 100644
--- a/lib/ovs-rcu.c
+++ b/lib/ovs-rcu.c
@@ -57,6 +57,7 @@  static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
 static void ovsrcu_init_module(void);
+static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
@@ -151,6 +152,27 @@  ovsrcu_quiesce(void)
     ovsrcu_quiesced();
 }
 
+int
+ovsrcu_try_quiesce(void)
+{
+    struct ovsrcu_perthread *perthread;
+    int ret = -1;
+
+    ovs_assert(!single_threaded());
+    perthread = ovsrcu_perthread_get();
+    if (!seq_try_lock()) {
+        perthread->seqno = seq_read_protected(global_seqno);
+        if (perthread->cbset) {
+            ovsrcu_flush_cbset__(perthread, true);
+        }
+        seq_change_protected(global_seqno);
+        seq_unlock();
+        ovsrcu_quiesced();
+        ret = 0;
+    }
+    return ret;
+}
+
 bool
 ovsrcu_is_quiescent(void)
 {
@@ -292,7 +314,7 @@  ovsrcu_postpone_thread(void *arg OVS_UNUSED)
 }
 
 static void
-ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
 {
     struct ovsrcu_cbset *cbset = perthread->cbset;
 
@@ -300,11 +322,21 @@  ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
         guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
         perthread->cbset = NULL;
 
-        seq_change(flushed_cbsets_seq);
+        if (protected) {
+            seq_change_protected(flushed_cbsets_seq);
+        } else {
+            seq_change(flushed_cbsets_seq);
+        }
     }
 }
 
 static void
+ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+{
+    ovsrcu_flush_cbset__(perthread, false);
+}
+
+static void
 ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
 {
     if (perthread->cbset) {
diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
index 8750ead..dc75749 100644
--- a/lib/ovs-rcu.h
+++ b/lib/ovs-rcu.h
@@ -217,6 +217,7 @@  void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
 void ovsrcu_quiesce_start(void);
 void ovsrcu_quiesce_end(void);
 void ovsrcu_quiesce(void);
+int ovsrcu_try_quiesce(void);
 bool ovsrcu_is_quiescent(void);
 
 /* Synchronization.  Waits for all non-quiescent threads to quiesce at least
diff --git a/lib/seq.c b/lib/seq.c
index 57e0775..f2e630c 100644
--- a/lib/seq.c
+++ b/lib/seq.c
@@ -100,6 +100,38 @@  seq_destroy(struct seq *seq)
     ovs_mutex_unlock(&seq_mutex);
 }
 
+int
+seq_try_lock(void)
+{
+    return ovs_mutex_trylock(&seq_mutex);
+}
+
+void
+seq_lock(void)
+     OVS_ACQUIRES(seq_mutex)
+{
+    ovs_mutex_lock(&seq_mutex);
+}
+
+void
+seq_unlock(void)
+    OVS_RELEASES(seq_mutex)
+{
+    ovs_mutex_unlock(&seq_mutex);
+}
+
+/* Increments 'seq''s sequence number, waking up any threads that are waiting
+ * on 'seq'. */
+void
+seq_change_protected(struct seq *seq)
+    OVS_REQUIRES(seq_mutex)
+{
+    COVERAGE_INC(seq_change);
+
+    seq->value = seq_next++;
+    seq_wake_waiters(seq);
+}
+
 /* Increments 'seq''s sequence number, waking up any threads that are waiting
  * on 'seq'. */
 void
@@ -109,8 +141,7 @@  seq_change(struct seq *seq)
     COVERAGE_INC(seq_change);
 
     ovs_mutex_lock(&seq_mutex);
-    seq->value = seq_next++;
-    seq_wake_waiters(seq);
+    seq_change_protected(seq);
     ovs_mutex_unlock(&seq_mutex);
 }
 
@@ -120,13 +151,25 @@  seq_change(struct seq *seq)
  * when an object changes, even without an ability to lock the object.  See
  * Usage in seq.h for details. */
 uint64_t
+seq_read_protected(const struct seq *seq)
+    OVS_REQUIRES(seq_mutex)
+{
+    return seq->value;
+}
+
+/* Returns 'seq''s current sequence number (which could change immediately).
+ *
+ * seq_read() and seq_wait() can be used together to yield a race-free wakeup
+ * when an object changes, even without an ability to lock the object.  See
+ * Usage in seq.h for details. */
+uint64_t
 seq_read(const struct seq *seq)
     OVS_EXCLUDED(seq_mutex)
 {
     uint64_t value;
 
     ovs_mutex_lock(&seq_mutex);
-    value = seq->value;
+    value = seq_read_protected(seq);
     ovs_mutex_unlock(&seq_mutex);
 
     return value;
diff --git a/lib/seq.h b/lib/seq.h
index f3f4b53..221ab9a 100644
--- a/lib/seq.h
+++ b/lib/seq.h
@@ -121,9 +121,14 @@ 
 struct seq *seq_create(void);
 void seq_destroy(struct seq *);
 void seq_change(struct seq *);
+void seq_change_protected(struct seq *);
+void seq_lock(void);
+int seq_try_lock(void);
+void seq_unlock(void);
 
 /* For observers. */
 uint64_t seq_read(const struct seq *);
+uint64_t seq_read_protected(const struct seq *);
 
 void seq_wait_at(const struct seq *, uint64_t value, const char *where);
 #define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)