@@ -55,6 +55,7 @@
#include "id-pool.h"
#include "ipf.h"
#include "mov-avg.h"
+#include "mpsc-queue.h"
#include "netdev.h"
#include "netdev-offload.h"
#include "netdev-provider.h"
@@ -366,25 +367,22 @@ union dp_offload_thread_data {
};

struct dp_offload_thread_item {
-    struct ovs_list node;
+    struct mpsc_queue_node node;
    enum dp_offload_type type;
    long long int timestamp;
    union dp_offload_thread_data data[0];
};

struct dp_offload_thread {
-    struct ovs_mutex mutex;
-    struct ovs_list list;
-    uint64_t enqueued_item;
+    struct mpsc_queue queue;
+    atomic_uint64_t enqueued_item;
    struct mov_avg_cma cma;
    struct mov_avg_ema ema;
-    pthread_cond_t cond;
};

static struct dp_offload_thread dp_offload_thread = {
-    .mutex = OVS_MUTEX_INITIALIZER,
-    .list = OVS_LIST_INITIALIZER(&dp_offload_thread.list),
-    .enqueued_item = 0,
+    .queue = MPSC_QUEUE_INITIALIZER(&dp_offload_thread.queue),
+    .enqueued_item = ATOMIC_VAR_INIT(0),
    .cma = MOV_AVG_CMA_INITIALIZER,
    .ema = MOV_AVG_EMA_INITIALIZER(100),
};
@@ -2616,11 +2614,8 @@ dp_netdev_free_offload(struct dp_offload_thread_item *offload)
static void
dp_netdev_append_offload(struct dp_offload_thread_item *offload)
{
-    ovs_mutex_lock(&dp_offload_thread.mutex);
-    ovs_list_push_back(&dp_offload_thread.list, &offload->node);
-    dp_offload_thread.enqueued_item++;
-    xpthread_cond_signal(&dp_offload_thread.cond);
-    ovs_mutex_unlock(&dp_offload_thread.mutex);
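+    /* Producers may insert concurrently without taking a lock. */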
+    mpsc_queue_insert(&dp_offload_thread.queue, &offload->node);
+    atomic_count_inc64(&dp_offload_thread.enqueued_item);
}

static int
@@ -2765,58 +2760,68 @@ dp_offload_flush(struct dp_offload_thread_item *item)
    ovs_barrier_block(flush->barrier);
}

+#define DP_NETDEV_OFFLOAD_BACKOFF_MIN 1
+#define DP_NETDEV_OFFLOAD_BACKOFF_MAX 64
#define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */

static void *
dp_netdev_flow_offload_main(void *data OVS_UNUSED)
{
    struct dp_offload_thread_item *offload;
-    struct ovs_list *list;
+    struct mpsc_queue_node *node;
+    struct mpsc_queue *queue;
    long long int latency_us;
    long long int next_rcu;
    long long int now;
+    uint64_t backoff;

-    next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-    for (;;) {
-        ovs_mutex_lock(&dp_offload_thread.mutex);
-        if (ovs_list_is_empty(&dp_offload_thread.list)) {
-            ovsrcu_quiesce_start();
-            ovs_mutex_cond_wait(&dp_offload_thread.cond,
-                                &dp_offload_thread.mutex);
-            ovsrcu_quiesce_end();
-            next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-        }
-        list = ovs_list_pop_front(&dp_offload_thread.list);
-        dp_offload_thread.enqueued_item--;
-        offload = CONTAINER_OF(list, struct dp_offload_thread_item, node);
-        ovs_mutex_unlock(&dp_offload_thread.mutex);
-
-        switch (offload->type) {
-        case DP_OFFLOAD_FLOW:
-            dp_offload_flow(offload);
-            break;
-        case DP_OFFLOAD_FLUSH:
-            dp_offload_flush(offload);
-            break;
-        default:
-            OVS_NOT_REACHED();
+    queue = &dp_offload_thread.queue;
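+    /* Take single-consumer ownership of the queue before reading from it. */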
+    mpsc_queue_acquire(queue);
+
+    while (true) {
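+        /* No work pending: sleep with bounded exponential backoff (1-64 ms). */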
+        backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;
+        while (mpsc_queue_tail(queue) == NULL) {
+            xnanosleep(backoff * 1E6);
+            if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
+                backoff <<= 1;
+            }
        }

-        now = time_usec();
+        next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
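+        /* Process every pending item before waiting again. */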
+        MPSC_QUEUE_FOR_EACH_POP (node, queue) {
+            offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
+            atomic_count_dec64(&dp_offload_thread.enqueued_item);

-        latency_us = now - offload->timestamp;
-        mov_avg_cma_update(&dp_offload_thread.cma, latency_us);
-        mov_avg_ema_update(&dp_offload_thread.ema, latency_us);
+            switch (offload->type) {
+            case DP_OFFLOAD_FLOW:
+                dp_offload_flow(offload);
+                break;
+            case DP_OFFLOAD_FLUSH:
+                dp_offload_flush(offload);
+                break;
+            default:
+                OVS_NOT_REACHED();
+            }

-        dp_netdev_free_offload(offload);
+            now = time_usec();

-        /* Do RCU synchronization at fixed interval. */
-        if (now > next_rcu) {
-            ovsrcu_quiesce();
-            next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
+            latency_us = now - offload->timestamp;
+            mov_avg_cma_update(&dp_offload_thread.cma, latency_us);
+            mov_avg_ema_update(&dp_offload_thread.ema, latency_us);
+
+            dp_netdev_free_offload(offload);
+
+            /* Do RCU synchronization at fixed interval. */
+            if (now > next_rcu) {
+                ovsrcu_quiesce();
+                next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
+            }
        }
    }

+    OVS_NOT_REACHED();
+    mpsc_queue_release(queue);
+
    return NULL;
}
@@ -2827,7 +2832,7 @@ queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
    struct dp_offload_thread_item *offload;

    if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
        ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
        ovsthread_once_done(&offload_thread_once);
    }
@@ -2917,7 +2922,7 @@ queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
    }

    if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
        ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
        ovsthread_once_done(&offload_thread_once);
    }
@@ -2969,7 +2974,7 @@ dp_netdev_offload_flush_enqueue(struct dp_netdev *dp,
    struct dp_offload_flush_item *flush;

    if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
        ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
        ovsthread_once_done(&offload_thread_once);
    }
@@ -4389,8 +4394,8 @@ dpif_netdev_offload_stats_get(struct dpif *dpif,
    }
    ovs_mutex_unlock(&dp->port_mutex);

-    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value =
-        dp_offload_thread.enqueued_item;
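+    /* Relaxed read is sufficient: the count is only a statistics snapshot. */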
+    atomic_read_relaxed(&dp_offload_thread.enqueued_item,
+                        &stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value);
    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].value = nb_offloads;
    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].value =
        mov_avg_cma(&dp_offload_thread.cma);