
[ovs-dev,v1,19/23] dpif-netdev: Use lockless queue to manage offloads

Message ID 4f26bd9c6667264d961176f7e2be46c2a6fd492e.1612968146.git.grive@u256.net
State: New
Series: dpif-netdev: Parallel offload processing

Commit Message

Gaëtan Rivet Feb. 10, 2021, 3:34 p.m. UTC
The dataplane threads (PMDs) send offloading commands to a dedicated
offload management thread. The current implementation uses a
mutex-protected list, and benchmarks show high contention on the
queue in some cases.

Under high contention, the mutex more often makes the locking thread
yield and wait, incurring a syscall. This should be avoided in a
userland dataplane.

The mpsc-queue can be used instead. It uses fewer cycles and has
lower latency. Benchmarks show better behavior when multiple
revalidators and one or more PMDs write to a single queue while
another thread polls it.

One trade-off of the new scheme, however, is that the offload thread
is forced to poll the queue. Without a mutex, a cond_wait cannot be
used for signaling. The offload thread implements an exponential
backoff and sleeps in short increments when no data is available.
This makes the thread yield, at the price of some latency to process
offloads after an inactivity period.
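
Concretely, the waiting loop in the patch below reduces to the
following shape, with the constants taken from the patch; the sleep
length doubles from 1 ms up to a 64 ms cap:

    uint64_t backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;       /* 1 ms. */

    while (mpsc_queue_poll(queue, &node) == MPSC_QUEUE_EMPTY) {
        xnanosleep(backoff * 1E6);          /* Sleep 'backoff' ms. */
        if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
            backoff <<= 1;           /* 1, 2, 4, ... up to 64 ms. */
        }
    }

The worst-case added latency after an idle period is thus bounded by
DP_NETDEV_OFFLOAD_BACKOFF_MAX (64 ms).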

Signed-off-by: Gaetan Rivet <grive@u256.net>
Reviewed-by: Eli Britstein <elibr@nvidia.com>
---
 lib/dpif-netdev.c | 78 ++++++++++++++++++++++++++++-------------------
 1 file changed, 46 insertions(+), 32 deletions(-)

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 8b9115609..09d62a3d5 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -52,6 +52,7 @@ 
 #include "id-pool.h"
 #include "ipf.h"
 #include "mov-avg.h"
+#include "mpsc-queue.h"
 #include "netdev.h"
 #include "netdev-offload.h"
 #include "netdev-provider.h"
@@ -434,22 +435,19 @@  struct dp_offload_thread_item {
     size_t actions_len;
     long long int timestamp;
 
-    struct ovs_list node;
+    struct mpsc_queue_node node;
 };
 
 struct dp_offload_thread {
-    struct ovs_mutex mutex;
-    struct ovs_list list;
-    uint64_t enqueued_item;
+    struct mpsc_queue queue;
+    atomic_uint64_t enqueued_item;
     struct mov_avg_cma cma;
     struct mov_avg_ema ema;
-    pthread_cond_t cond;
 };
 
 static struct dp_offload_thread dp_offload_thread = {
-    .mutex = OVS_MUTEX_INITIALIZER,
-    .list  = OVS_LIST_INITIALIZER(&dp_offload_thread.list),
-    .enqueued_item = 0,
+    .queue = MPSC_QUEUE_INITIALIZER(&dp_offload_thread.queue),
+    .enqueued_item = ATOMIC_VAR_INIT(0),
     .cma = MOV_AVG_CMA_INITIALIZER,
     .ema = MOV_AVG_EMA_INITIALIZER(100),
 };
@@ -2649,11 +2647,8 @@  dp_netdev_free_flow_offload(struct dp_offload_thread_item *offload)
 static void
 dp_netdev_append_flow_offload(struct dp_offload_thread_item *offload)
 {
-    ovs_mutex_lock(&dp_offload_thread.mutex);
-    ovs_list_push_back(&dp_offload_thread.list, &offload->node);
-    dp_offload_thread.enqueued_item++;
-    xpthread_cond_signal(&dp_offload_thread.cond);
-    ovs_mutex_unlock(&dp_offload_thread.mutex);
+    mpsc_queue_insert(&dp_offload_thread.queue, &offload->node);
+    atomic_count_inc64(&dp_offload_thread.enqueued_item);
 }
 
 static int
@@ -2751,33 +2746,48 @@  err_free:
     return -1;
 }
 
+#define DP_NETDEV_OFFLOAD_BACKOFF_MIN 1
+#define DP_NETDEV_OFFLOAD_BACKOFF_MAX 64
 #define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
 
 static void *
 dp_netdev_flow_offload_main(void *data OVS_UNUSED)
 {
     struct dp_offload_thread_item *offload;
-    struct ovs_list *list;
+    enum mpsc_queue_poll_result poll_result;
+    struct mpsc_queue_node *node;
+    struct mpsc_queue *queue;
     long long int latency_us;
     long long int next_rcu;
     long long int now;
+    uint64_t backoff;
     const char *op;
     int ret;
 
+    queue = &dp_offload_thread.queue;
+    if (!mpsc_queue_acquire(queue)) {
+        VLOG_ERR("failed to register as consumer of the offload queue");
+        return NULL;
+    }
+
+sleep_until_next:
+    backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;
+    while ((poll_result = mpsc_queue_poll(queue, &node)) == MPSC_QUEUE_EMPTY) {
+        xnanosleep(backoff * 1E6);
+        if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
+            backoff <<= 1;
+        }
+    }
+
     next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-    for (;;) {
-        ovs_mutex_lock(&dp_offload_thread.mutex);
-        if (ovs_list_is_empty(&dp_offload_thread.list)) {
-            ovsrcu_quiesce_start();
-            ovs_mutex_cond_wait(&dp_offload_thread.cond,
-                                &dp_offload_thread.mutex);
-            ovsrcu_quiesce_end();
-            next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-        }
-        list = ovs_list_pop_front(&dp_offload_thread.list);
-        dp_offload_thread.enqueued_item--;
-        offload = CONTAINER_OF(list, struct dp_offload_thread_item, node);
-        ovs_mutex_unlock(&dp_offload_thread.mutex);
+
+    do {
+        while (poll_result == MPSC_QUEUE_RETRY) {
+            poll_result = mpsc_queue_poll(queue, &node);
+        }
+
+        offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
+        atomic_count_dec64(&dp_offload_thread.enqueued_item);
 
         switch (offload->op) {
         case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
@@ -2813,7 +2823,11 @@  dp_netdev_flow_offload_main(void *data OVS_UNUSED)
                 next_rcu += DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
             }
         }
-    }
+
+        poll_result = mpsc_queue_poll(queue, &node);
+    } while (poll_result != MPSC_QUEUE_EMPTY);
+
+    goto sleep_until_next;
 
     return NULL;
 }
@@ -2825,7 +2839,7 @@  queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
     struct dp_offload_thread_item *offload;
 
     if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
         ovs_thread_create("dp_netdev_flow_offload",
                           dp_netdev_flow_offload_main, NULL);
         ovsthread_once_done(&offload_thread_once);
@@ -2850,7 +2864,7 @@  queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
     }
 
     if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
         ovs_thread_create("dp_netdev_flow_offload",
                           dp_netdev_flow_offload_main, NULL);
         ovsthread_once_done(&offload_thread_once);
@@ -4295,8 +4309,8 @@  dpif_netdev_offload_stats_get(struct dpif *dpif,
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
-    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value =
-        dp_offload_thread.enqueued_item;
+    atomic_read_relaxed(&dp_offload_thread.enqueued_item,
+        &stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value);
     stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].value = nb_offloads;
     stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].value =
         mov_avg_cma(&dp_offload_thread.cma);