diff mbox

[ovs-dev,v2] dpif-netdev: Remove PMD latency on seq_mutex

Message ID 1462387781-27467-1-git-send-email-fbl@redhat.com
State Changes Requested
Headers show

Commit Message

Flavio Leitner May 4, 2016, 6:49 p.m. UTC
The PMD thread needs to keep processing RX queues in order
to achieve maximum throughput. It also needs to sweep emc
cache and quiesce which use seq_mutex. That mutex can
eventually block the PMD thread causing latency spikes and
affecting the throughput.

Since there is no requirement for running those tasks at a
specific time, this patch extends the seq API to allow tentative
locking instead.

Reported-by: Karl Rister <krister@redhat.com>
Signed-off-by: Flavio Leitner <fbl@redhat.com>
---
 lib/dpif-netdev.c |  5 +++--
 lib/ovs-rcu.c     | 36 ++++++++++++++++++++++++++++++++++--
 lib/ovs-rcu.h     |  1 +
 lib/seq.c         | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
 lib/seq.h         |  5 +++++
 5 files changed, 89 insertions(+), 7 deletions(-)

Comments

Daniele Di Proietto May 20, 2016, 1:54 a.m. UTC | #1
Hi Flavio,

thanks for the patch, it looks good to me

Some minor comments inline

2016-05-04 11:49 GMT-07:00 Flavio Leitner <fbl@redhat.com>:

> The PMD thread needs to keep processing RX queues in order
> to achieve maximum throughput. It also needs to sweep emc
> cache and quiesce which use seq_mutex. That mutex can
> eventually block the PMD thread causing latency spikes and
> affecting the throughput.
>
> Since there is no requirement for running those tasks at a
> specific time, this patch extend seq API to allow tentative
> locking instead.
>
> Reported-by: Karl Rister <krister@redhat.com>
> Signed-off-by: Flavio Leitner <fbl@redhat.com>
> ---
>  lib/dpif-netdev.c |  5 +++--
>  lib/ovs-rcu.c     | 36 ++++++++++++++++++++++++++++++++++--
>  lib/ovs-rcu.h     |  1 +
>  lib/seq.c         | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
>  lib/seq.h         |  5 +++++
>  5 files changed, 89 insertions(+), 7 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 1e8a37c..cab3058 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -2698,9 +2698,10 @@ reload:
>
>              lc = 0;
>
> -            emc_cache_slow_sweep(&pmd->flow_cache);
>              coverage_try_clear();
> -            ovsrcu_quiesce();
> +            if (!ovsrcu_try_quiesce()) {
> +                emc_cache_slow_sweep(&pmd->flow_cache);
> +            }
>

>              atomic_read_relaxed(&pmd->change_seq, &seq);
>              if (seq != port_seq) {
> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
> index 0ff508e..4ec3dc0 100644
> --- a/lib/ovs-rcu.c
> +++ b/lib/ovs-rcu.c
> @@ -57,6 +57,7 @@ static struct guarded_list flushed_cbsets;
>  static struct seq *flushed_cbsets_seq;
>
>  static void ovsrcu_init_module(void);
> +static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
>  static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
>  static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>  static bool ovsrcu_call_postponed(void);
> @@ -151,6 +152,27 @@ ovsrcu_quiesce(void)
>      ovsrcu_quiesced();
>  }
>
> +int
> +ovsrcu_try_quiesce(void)
> +{
> +    struct ovsrcu_perthread *perthread;
> +    int ret = -1;
> +
> +    ovs_assert(!single_threaded());
>

I'm curious: why is the above necessary?


> +    perthread = ovsrcu_perthread_get();
> +    if (!seq_try_lock()) {
> +        perthread->seqno = seq_read_protected(global_seqno);
> +        if (perthread->cbset) {
> +            ovsrcu_flush_cbset__(perthread, true);
> +        }
> +        seq_change_protected(global_seqno);
> +        seq_unlock();
> +        ovsrcu_quiesced();
> +        ret = 0;
> +    }
> +    return ret;
> +}
> +
>  bool
>  ovsrcu_is_quiescent(void)
>  {
> @@ -292,7 +314,7 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>  }
>
>  static void
> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> +ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
>  {
>      struct ovsrcu_cbset *cbset = perthread->cbset;
>
> @@ -300,11 +322,21 @@ ovsrcu_flush_cbset(struct ovsrcu_perthread
> *perthread)
>          guarded_list_push_back(&flushed_cbsets, &cbset->list_node,
> SIZE_MAX);
>          perthread->cbset = NULL;
>
> -        seq_change(flushed_cbsets_seq);
> +        if (protected) {
> +            seq_change_protected(flushed_cbsets_seq);
> +        } else {
> +            seq_change(flushed_cbsets_seq);
> +        }
>      }
>  }
>
>  static void
> +ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> +{
> +    ovsrcu_flush_cbset__(perthread, false);
> +}
> +
> +static void
>  ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>  {
>      if (perthread->cbset) {
> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
> index 600be0b..e4dcd7d 100644
> --- a/lib/ovs-rcu.h
> +++ b/lib/ovs-rcu.h
> @@ -218,6 +218,7 @@ void ovsrcu_postpone__(void (*function)(void *aux),
> void *aux);
>  void ovsrcu_quiesce_start(void);
>  void ovsrcu_quiesce_end(void);
>  void ovsrcu_quiesce(void);
> +int ovsrcu_try_quiesce(void);
>  bool ovsrcu_is_quiescent(void);
>
>  /* Synchronization.  Waits for all non-quiescent threads to quiesce at
> least
> diff --git a/lib/seq.c b/lib/seq.c
> index 57e0775..9abebdb 100644
> --- a/lib/seq.c
> +++ b/lib/seq.c
> @@ -100,6 +100,38 @@ seq_destroy(struct seq *seq)
>      ovs_mutex_unlock(&seq_mutex);
>  }
>
> +int
> +seq_try_lock(void)
> +{
> +    return ovs_mutex_trylock(&seq_mutex);
> +}
> +
> +void
> +seq_lock(void)
> +     OVS_EXCLUDED(seq_mutex)
>

My clang (3.5) complains about this

OVS_ACQUIRES appear to work


> +{
> +    ovs_mutex_lock(&seq_mutex);
> +}
> +
> +void
> +seq_unlock(void)
> +    OVS_REQUIRES(seq_mutex)
>

My clang (3.5) complains about this

OVS_RELEASES appear to work


> +{
> +    ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Increments 'seq''s sequence number, waking up any threads that are
> waiting
> + * on 'seq'. */
> +void
> +seq_change_protected(struct seq *seq)
> +    OVS_REQUIRES(seq_mutex)
> +{
> +    COVERAGE_INC(seq_change);
> +
> +    seq->value = seq_next++;
> +    seq_wake_waiters(seq);
> +}
> +
>  /* Increments 'seq''s sequence number, waking up any threads that are
> waiting
>   * on 'seq'. */
>  void
> @@ -109,8 +141,7 @@ seq_change(struct seq *seq)
>      COVERAGE_INC(seq_change);
>
>      ovs_mutex_lock(&seq_mutex);
> -    seq->value = seq_next++;
> -    seq_wake_waiters(seq);
> +    seq_change_protected(seq);
>      ovs_mutex_unlock(&seq_mutex);
>  }
>
> @@ -120,13 +151,25 @@ seq_change(struct seq *seq)
>   * when an object changes, even without an ability to lock the object.
> See
>   * Usage in seq.h for details. */
>  uint64_t
> +seq_read_protected(const struct seq *seq)
> +    OVS_REQUIRES(seq_mutex)
> +{
> +    return seq->value;
> +}
> +
> +/* Returns 'seq''s current sequence number (which could change
> immediately).
> + *
> + * seq_read() and seq_wait() can be used together to yield a race-free
> wakeup
> + * when an object changes, even without an ability to lock the object.
> See
> + * Usage in seq.h for details. */
> +uint64_t
>  seq_read(const struct seq *seq)
>      OVS_EXCLUDED(seq_mutex)
>  {
>      uint64_t value;
>
>      ovs_mutex_lock(&seq_mutex);
> -    value = seq->value;
> +    value = seq_read_protected(seq);
>      ovs_mutex_unlock(&seq_mutex);
>
>      return value;
> diff --git a/lib/seq.h b/lib/seq.h
> index f3f4b53..221ab9a 100644
> --- a/lib/seq.h
> +++ b/lib/seq.h
> @@ -121,9 +121,14 @@
>  struct seq *seq_create(void);
>  void seq_destroy(struct seq *);
>  void seq_change(struct seq *);
> +void seq_change_protected(struct seq *);
> +void seq_lock(void);
> +int seq_try_lock(void);
> +void seq_unlock(void);
>
>  /* For observers. */
>  uint64_t seq_read(const struct seq *);
> +uint64_t seq_read_protected(const struct seq *);
>
>  void seq_wait_at(const struct seq *, uint64_t value, const char *where);
>  #define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)
> --
> 2.5.5
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
Flavio Leitner May 25, 2016, 12:52 a.m. UTC | #2
On Thu, May 19, 2016 at 06:54:39PM -0700, Daniele Di Proietto wrote:
> Hi Flavio,
> 
> thanks for the patch, it looks good to me

Don't apply this yet.  Although I could not see any issues
during 24 hours testing, Karl seems to have found a spike
which I would like to investigate further.

> Some minor comments inline

OK
 
> 2016-05-04 11:49 GMT-07:00 Flavio Leitner <fbl@redhat.com>:
> 
> > The PMD thread needs to keep processing RX queues in order
> > to achieve maximum throughput. It also needs to sweep emc
> > cache and quiesce which use seq_mutex. That mutex can
> > eventually block the PMD thread causing latency spikes and
> > affecting the throughput.
> >
> > Since there is no requirement for running those tasks at a
> > specific time, this patch extend seq API to allow tentative
> > locking instead.
> >
> > Reported-by: Karl Rister <krister@redhat.com>
> > Signed-off-by: Flavio Leitner <fbl@redhat.com>
> > ---
> >  lib/dpif-netdev.c |  5 +++--
> >  lib/ovs-rcu.c     | 36 ++++++++++++++++++++++++++++++++++--
> >  lib/ovs-rcu.h     |  1 +
> >  lib/seq.c         | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
> >  lib/seq.h         |  5 +++++
> >  5 files changed, 89 insertions(+), 7 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 1e8a37c..cab3058 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -2698,9 +2698,10 @@ reload:
> >
> >              lc = 0;
> >
> > -            emc_cache_slow_sweep(&pmd->flow_cache);
> >              coverage_try_clear();
> > -            ovsrcu_quiesce();
> > +            if (!ovsrcu_try_quiesce()) {
> > +                emc_cache_slow_sweep(&pmd->flow_cache);
> > +            }
> >
> 
> >              atomic_read_relaxed(&pmd->change_seq, &seq);
> >              if (seq != port_seq) {
> > diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
> > index 0ff508e..4ec3dc0 100644
> > --- a/lib/ovs-rcu.c
> > +++ b/lib/ovs-rcu.c
> > @@ -57,6 +57,7 @@ static struct guarded_list flushed_cbsets;
> >  static struct seq *flushed_cbsets_seq;
> >
> >  static void ovsrcu_init_module(void);
> > +static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
> >  static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
> >  static void ovsrcu_unregister__(struct ovsrcu_perthread *);
> >  static bool ovsrcu_call_postponed(void);
> > @@ -151,6 +152,27 @@ ovsrcu_quiesce(void)
> >      ovsrcu_quiesced();
> >  }
> >
> > +int
> > +ovsrcu_try_quiesce(void)
> > +{
> > +    struct ovsrcu_perthread *perthread;
> > +    int ret = -1;
> > +
> > +    ovs_assert(!single_threaded());
> >
> 
> I'm curious: why is the above necessary?

Because ovsrcu_quiesced() will run RCU callbacks for single
thread and I cannot guarantee that these calls won't use
seq lock.

> > +    perthread = ovsrcu_perthread_get();
> > +    if (!seq_try_lock()) {
> > +        perthread->seqno = seq_read_protected(global_seqno);
> > +        if (perthread->cbset) {
> > +            ovsrcu_flush_cbset__(perthread, true);
> > +        }
> > +        seq_change_protected(global_seqno);
> > +        seq_unlock();
> > +        ovsrcu_quiesced();
> > +        ret = 0;
> > +    }
> > +    return ret;
> > +}
> > +
> >  bool
> >  ovsrcu_is_quiescent(void)
> >  {
> > @@ -292,7 +314,7 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
> >  }
> >
> >  static void
> > -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> > +ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
> >  {
> >      struct ovsrcu_cbset *cbset = perthread->cbset;
> >
> > @@ -300,11 +322,21 @@ ovsrcu_flush_cbset(struct ovsrcu_perthread
> > *perthread)
> >          guarded_list_push_back(&flushed_cbsets, &cbset->list_node,
> > SIZE_MAX);
> >          perthread->cbset = NULL;
> >
> > -        seq_change(flushed_cbsets_seq);
> > +        if (protected) {
> > +            seq_change_protected(flushed_cbsets_seq);
> > +        } else {
> > +            seq_change(flushed_cbsets_seq);
> > +        }
> >      }
> >  }
> >
> >  static void
> > +ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> > +{
> > +    ovsrcu_flush_cbset__(perthread, false);
> > +}
> > +
> > +static void
> >  ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
> >  {
> >      if (perthread->cbset) {
> > diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
> > index 600be0b..e4dcd7d 100644
> > --- a/lib/ovs-rcu.h
> > +++ b/lib/ovs-rcu.h
> > @@ -218,6 +218,7 @@ void ovsrcu_postpone__(void (*function)(void *aux),
> > void *aux);
> >  void ovsrcu_quiesce_start(void);
> >  void ovsrcu_quiesce_end(void);
> >  void ovsrcu_quiesce(void);
> > +int ovsrcu_try_quiesce(void);
> >  bool ovsrcu_is_quiescent(void);
> >
> >  /* Synchronization.  Waits for all non-quiescent threads to quiesce at
> > least
> > diff --git a/lib/seq.c b/lib/seq.c
> > index 57e0775..9abebdb 100644
> > --- a/lib/seq.c
> > +++ b/lib/seq.c
> > @@ -100,6 +100,38 @@ seq_destroy(struct seq *seq)
> >      ovs_mutex_unlock(&seq_mutex);
> >  }
> >
> > +int
> > +seq_try_lock(void)
> > +{
> > +    return ovs_mutex_trylock(&seq_mutex);
> > +}
> > +
> > +void
> > +seq_lock(void)
> > +     OVS_EXCLUDED(seq_mutex)
> >
> 
> My clang (3.5) complains about this

Works with 3.7.0 :-)

> OVS_ACQUIRES appear to work

That's certainly better, I will fix it.

> > +{
> > +    ovs_mutex_lock(&seq_mutex);
> > +}
> > +
> > +void
> > +seq_unlock(void)
> > +    OVS_REQUIRES(seq_mutex)
> >
> 
> My clang (3.5) complains about this
> 
> OVS_RELEASES appear to work

Same here.

Thanks for your feedback!
fbl
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 1e8a37c..cab3058 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -2698,9 +2698,10 @@  reload:
 
             lc = 0;
 
-            emc_cache_slow_sweep(&pmd->flow_cache);
             coverage_try_clear();
-            ovsrcu_quiesce();
+            if (!ovsrcu_try_quiesce()) {
+                emc_cache_slow_sweep(&pmd->flow_cache);
+            }
 
             atomic_read_relaxed(&pmd->change_seq, &seq);
             if (seq != port_seq) {
diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
index 0ff508e..4ec3dc0 100644
--- a/lib/ovs-rcu.c
+++ b/lib/ovs-rcu.c
@@ -57,6 +57,7 @@  static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
 static void ovsrcu_init_module(void);
+static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
@@ -151,6 +152,27 @@  ovsrcu_quiesce(void)
     ovsrcu_quiesced();
 }
 
+int
+ovsrcu_try_quiesce(void)
+{
+    struct ovsrcu_perthread *perthread;
+    int ret = -1;
+
+    ovs_assert(!single_threaded());
+    perthread = ovsrcu_perthread_get();
+    if (!seq_try_lock()) {
+        perthread->seqno = seq_read_protected(global_seqno);
+        if (perthread->cbset) {
+            ovsrcu_flush_cbset__(perthread, true);
+        }
+        seq_change_protected(global_seqno);
+        seq_unlock();
+        ovsrcu_quiesced();
+        ret = 0;
+    }
+    return ret;
+}
+
 bool
 ovsrcu_is_quiescent(void)
 {
@@ -292,7 +314,7 @@  ovsrcu_postpone_thread(void *arg OVS_UNUSED)
 }
 
 static void
-ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
 {
     struct ovsrcu_cbset *cbset = perthread->cbset;
 
@@ -300,11 +322,21 @@  ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
         guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
         perthread->cbset = NULL;
 
-        seq_change(flushed_cbsets_seq);
+        if (protected) {
+            seq_change_protected(flushed_cbsets_seq);
+        } else {
+            seq_change(flushed_cbsets_seq);
+        }
     }
 }
 
 static void
+ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+{
+    ovsrcu_flush_cbset__(perthread, false);
+}
+
+static void
 ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
 {
     if (perthread->cbset) {
diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
index 600be0b..e4dcd7d 100644
--- a/lib/ovs-rcu.h
+++ b/lib/ovs-rcu.h
@@ -218,6 +218,7 @@  void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
 void ovsrcu_quiesce_start(void);
 void ovsrcu_quiesce_end(void);
 void ovsrcu_quiesce(void);
+int ovsrcu_try_quiesce(void);
 bool ovsrcu_is_quiescent(void);
 
 /* Synchronization.  Waits for all non-quiescent threads to quiesce at least
diff --git a/lib/seq.c b/lib/seq.c
index 57e0775..9abebdb 100644
--- a/lib/seq.c
+++ b/lib/seq.c
@@ -100,6 +100,38 @@  seq_destroy(struct seq *seq)
     ovs_mutex_unlock(&seq_mutex);
 }
 
+int
+seq_try_lock(void)
+{
+    return ovs_mutex_trylock(&seq_mutex);
+}
+
+void
+seq_lock(void)
+     OVS_EXCLUDED(seq_mutex)
+{
+    ovs_mutex_lock(&seq_mutex);
+}
+
+void
+seq_unlock(void)
+    OVS_REQUIRES(seq_mutex)
+{
+    ovs_mutex_unlock(&seq_mutex);
+}
+
+/* Increments 'seq''s sequence number, waking up any threads that are waiting
+ * on 'seq'. */
+void
+seq_change_protected(struct seq *seq)
+    OVS_REQUIRES(seq_mutex)
+{
+    COVERAGE_INC(seq_change);
+
+    seq->value = seq_next++;
+    seq_wake_waiters(seq);
+}
+
 /* Increments 'seq''s sequence number, waking up any threads that are waiting
  * on 'seq'. */
 void
@@ -109,8 +141,7 @@  seq_change(struct seq *seq)
     COVERAGE_INC(seq_change);
 
     ovs_mutex_lock(&seq_mutex);
-    seq->value = seq_next++;
-    seq_wake_waiters(seq);
+    seq_change_protected(seq);
     ovs_mutex_unlock(&seq_mutex);
 }
 
@@ -120,13 +151,25 @@  seq_change(struct seq *seq)
  * when an object changes, even without an ability to lock the object.  See
  * Usage in seq.h for details. */
 uint64_t
+seq_read_protected(const struct seq *seq)
+    OVS_REQUIRES(seq_mutex)
+{
+    return seq->value;
+}
+
+/* Returns 'seq''s current sequence number (which could change immediately).
+ *
+ * seq_read() and seq_wait() can be used together to yield a race-free wakeup
+ * when an object changes, even without an ability to lock the object.  See
+ * Usage in seq.h for details. */
+uint64_t
 seq_read(const struct seq *seq)
     OVS_EXCLUDED(seq_mutex)
 {
     uint64_t value;
 
     ovs_mutex_lock(&seq_mutex);
-    value = seq->value;
+    value = seq_read_protected(seq);
     ovs_mutex_unlock(&seq_mutex);
 
     return value;
diff --git a/lib/seq.h b/lib/seq.h
index f3f4b53..221ab9a 100644
--- a/lib/seq.h
+++ b/lib/seq.h
@@ -121,9 +121,14 @@ 
 struct seq *seq_create(void);
 void seq_destroy(struct seq *);
 void seq_change(struct seq *);
+void seq_change_protected(struct seq *);
+void seq_lock(void);
+int seq_try_lock(void);
+void seq_unlock(void);
 
 /* For observers. */
 uint64_t seq_read(const struct seq *);
+uint64_t seq_read_protected(const struct seq *);
 
 void seq_wait_at(const struct seq *, uint64_t value, const char *where);
 #define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)