@@ -52,6 +52,7 @@
#include "id-pool.h"
#include "ipf.h"
#include "mov-avg.h"
+#include "mpsc-queue.h"
#include "netdev.h"
#include "netdev-offload.h"
#include "netdev-provider.h"
@@ -434,22 +435,19 @@ struct dp_offload_thread_item {
size_t actions_len;
long long int timestamp;
- struct ovs_list node;
+ struct mpsc_queue_node node;
};
struct dp_offload_thread {
- struct ovs_mutex mutex;
- struct ovs_list list;
- uint64_t enqueued_item;
+ struct mpsc_queue queue;
+ atomic_uint64_t enqueued_item;
struct mov_avg_cma cma;
struct mov_avg_ema ema;
- pthread_cond_t cond;
};
static struct dp_offload_thread dp_offload_thread = {
- .mutex = OVS_MUTEX_INITIALIZER,
- .list = OVS_LIST_INITIALIZER(&dp_offload_thread.list),
- .enqueued_item = 0,
+ .queue = MPSC_QUEUE_INITIALIZER(&dp_offload_thread.queue),
+ .enqueued_item = ATOMIC_VAR_INIT(0),
.cma = MOV_AVG_CMA_INITIALIZER,
.ema = MOV_AVG_EMA_INITIALIZER(100),
};
@@ -2649,11 +2647,8 @@ dp_netdev_free_flow_offload(struct dp_offload_thread_item *offload)
static void
dp_netdev_append_flow_offload(struct dp_offload_thread_item *offload)
{
- ovs_mutex_lock(&dp_offload_thread.mutex);
- ovs_list_push_back(&dp_offload_thread.list, &offload->node);
- dp_offload_thread.enqueued_item++;
- xpthread_cond_signal(&dp_offload_thread.cond);
- ovs_mutex_unlock(&dp_offload_thread.mutex);
+ mpsc_queue_insert(&dp_offload_thread.queue, &offload->node);
+ atomic_count_inc64(&dp_offload_thread.enqueued_item);
}
static int
@@ -2751,33 +2746,48 @@ err_free:
return -1;
}
+#define DP_NETDEV_OFFLOAD_BACKOFF_MIN 1
+#define DP_NETDEV_OFFLOAD_BACKOFF_MAX 64
#define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
static void *
dp_netdev_flow_offload_main(void *data OVS_UNUSED)
{
struct dp_offload_thread_item *offload;
- struct ovs_list *list;
+ enum mpsc_queue_poll_result poll_result;
+ struct mpsc_queue_node *node;
+ struct mpsc_queue *queue;
long long int latency_us;
long long int next_rcu;
long long int now;
+ uint64_t backoff;
const char *op;
int ret;
+ queue = &dp_offload_thread.queue;
+ if (!mpsc_queue_acquire(queue)) {
+ VLOG_ERR("failed to register as consumer of the offload queue");
+ return NULL;
+ }
+
+sleep_until_next:
+ backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;
+ while ((poll_result = mpsc_queue_poll(queue, &node)) == MPSC_QUEUE_EMPTY) {
+ xnanosleep(backoff * 1E6);
+ if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
+ backoff <<= 1;
+ }
+ }
+
next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
- for (;;) {
- ovs_mutex_lock(&dp_offload_thread.mutex);
- if (ovs_list_is_empty(&dp_offload_thread.list)) {
- ovsrcu_quiesce_start();
- ovs_mutex_cond_wait(&dp_offload_thread.cond,
- &dp_offload_thread.mutex);
- ovsrcu_quiesce_end();
- next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
- }
- list = ovs_list_pop_front(&dp_offload_thread.list);
- dp_offload_thread.enqueued_item--;
- offload = CONTAINER_OF(list, struct dp_offload_thread_item, node);
- ovs_mutex_unlock(&dp_offload_thread.mutex);
+
+ do {
+ while (poll_result == MPSC_QUEUE_RETRY) {
+ poll_result = mpsc_queue_poll(queue, &node);
+ }
+
+ offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
+ atomic_count_dec64(&dp_offload_thread.enqueued_item);
switch (offload->op) {
case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
@@ -2813,7 +2823,11 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
next_rcu += DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
}
}
- }
+
+ poll_result = mpsc_queue_poll(queue, &node);
+ } while (poll_result != MPSC_QUEUE_EMPTY);
+
+ goto sleep_until_next;
return NULL;
}
@@ -2825,7 +2839,7 @@ queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
struct dp_offload_thread_item *offload;
if (ovsthread_once_start(&offload_thread_once)) {
- xpthread_cond_init(&dp_offload_thread.cond, NULL);
+ mpsc_queue_init(&dp_offload_thread.queue);
ovs_thread_create("dp_netdev_flow_offload",
dp_netdev_flow_offload_main, NULL);
ovsthread_once_done(&offload_thread_once);
@@ -2850,7 +2864,7 @@ queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
}
if (ovsthread_once_start(&offload_thread_once)) {
- xpthread_cond_init(&dp_offload_thread.cond, NULL);
+ mpsc_queue_init(&dp_offload_thread.queue);
ovs_thread_create("dp_netdev_flow_offload",
dp_netdev_flow_offload_main, NULL);
ovsthread_once_done(&offload_thread_once);
@@ -4295,8 +4309,8 @@ dpif_netdev_offload_stats_get(struct dpif *dpif,
}
ovs_mutex_unlock(&dp->port_mutex);
- stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value =
- dp_offload_thread.enqueued_item;
+ atomic_read_relaxed(&dp_offload_thread.enqueued_item,
+ &stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value);
stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].value = nb_offloads;
stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].value =
mov_avg_cma(&dp_offload_thread.cma);