@@ -166,6 +166,8 @@ lib_libopenvswitch_la_SOURCES = \
lib/memory.c \
lib/memory.h \
lib/meta-flow.c \
+ lib/mpsc-queue.c \
+ lib/mpsc-queue.h \
lib/multipath.c \
lib/multipath.h \
lib/namemap.c \
new file mode 100644
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "ovs-atomic.h"
+
+#include "mpsc-queue.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
+ *
+ * One atomic exchange operation is done per insertion. Removal in most cases
+ * will not require atomic operation and will use one atomic exchange to close
+ * the queue chain.
+ *
+ * Insertion
+ * =========
+ *
+ * The queue is implemented using a linked-list. Insertion is done at the
+ * back of the queue, by swapping the current end with the new node atomically,
+ * then pointing the previous end toward the new node. To follow Vyukov
+ * nomenclature, the end-node of the chain is called head. A producer will
+ * only manipulate the head.
+ *
+ * The head swap is atomic, however the link from the previous head to the new
+ * one is done in a separate operation. This means that the chain is
+ * momentarily broken, when the previous head still points to NULL and the
+ * current head has been inserted.
+ *
+ * Considering a series of insertions, the queue state will remain consistent
+ * and the insertions order is compatible with their precedence, thus the
+ * queue is serializable. However, because an insertion consists in two
+ * separate memory transactions, it is not linearizable.
+ *
+ * Removal
+ * =======
+ *
+ * The consumer must deal with the queue inconsistency. It will manipulate
+ * the tail of the queue and move it along the latest consumed elements.
+ * When an end of the chain of elements is found (the next pointer is NULL),
+ * the tail is compared with the head.
+ *
+ * If both point to different addresses, then the queue is in an inconsistent
+ * state: the tail cannot move forward as the next is NULL, but the head is not
+ * the last element in the chain: this can only happen if the chain is broken.
+ *
+ * In this case, the consumer must wait for the producer to finish writing the
+ * next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
+ *
+ * Removal is thus in most cases (when there are elements in the queue)
+ * accomplished without using atomics, until the last element of the queue.
+ * There, the head is atomically loaded. If the queue is in a consistent state,
+ * the head is moved back to the queue stub by inserting the stub in the queue:
+ * ending the queue is the same as an insertion, which is one atomic XCHG.
+ *
+ * Forward guarantees
+ * ==================
+ *
+ * Insertion and peeking are wait-free: they will execute in a known bounded
+ * number of instructions, regardless of the state of the queue.
+ *
+ * However, while removal consists in peeking and a constant write to
+ * update the tail, it can repeatedly fail until the queue become consistent.
+ * It is thus dependent on other threads progressing. This means that the
+ * queue forward progress is obstruction-free only. It has a potential for
+ * livelocking.
+ *
+ * The chain will remain broken as long as a producer is not finished writing
+ * its next pointer. If a producer is cancelled for example, the queue could
+ * remain broken for any future readings. This queue should either be used
+ * with cooperative threads or insertion must only be done outside cancellable
+ * sections.
+ *
+ * Performance
+ * ============
+ *
+ * In benchmarks this structure was better than alternatives such as:
+ *
+ *   * A reversed Treiber stack [2], using 1 CAS per operation
+ * and requiring reversal of the node list on removal.
+ *
+ *   * Michael-Scott lock-free queue [3], using 2 CAS per operation.
+ *
+ * While it is not linearizable, this queue is well-suited for message passing.
+ * If a proper hardware XCHG operation is used, it scales better than
+ * CAS-based implementations.
+ *
+ * References
+ * ==========
+ *
+ * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+ *
+ * [2]: R. K. Treiber. Systems programming: Coping with parallelism.
+ * Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
+ *
+ * [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and
+ * Blocking Concurrent Queue Algorithms
+ *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
+ *
+ */
+
+void
+mpsc_queue_init(struct mpsc_queue *queue)
+{
+ atomic_store_relaxed(&queue->head, &queue->stub);
+ atomic_store_relaxed(&queue->tail, &queue->stub);
+ atomic_store_relaxed(&queue->stub.next, NULL);
+
+ ovs_mutex_init(&queue->read_lock);
+}
+
+void
+mpsc_queue_destroy(struct mpsc_queue *queue)
+ OVS_EXCLUDED(queue->read_lock)
+{
+ ovs_mutex_destroy(&queue->read_lock);
+}
+
+enum mpsc_queue_poll_result
+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
+ OVS_REQUIRES(queue->read_lock)
+{
+ struct mpsc_queue_node *tail;
+ struct mpsc_queue_node *next;
+ struct mpsc_queue_node *head;
+
+ atomic_read_relaxed(&queue->tail, &tail);
+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+
+ if (tail == &queue->stub) {
+ if (next == NULL) {
+ return MPSC_QUEUE_EMPTY;
+ }
+
+ atomic_store_relaxed(&queue->tail, next);
+ tail = next;
+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+ }
+
+ if (next != NULL) {
+ atomic_store_relaxed(&queue->tail, next);
+ *node = tail;
+ return MPSC_QUEUE_ITEM;
+ }
+
+ atomic_read_explicit(&queue->head, &head, memory_order_acquire);
+ if (tail != head) {
+ return MPSC_QUEUE_RETRY;
+ }
+
+ mpsc_queue_insert(queue, &queue->stub);
+
+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+ if (next != NULL) {
+ atomic_store_relaxed(&queue->tail, next);
+ *node = tail;
+ return MPSC_QUEUE_ITEM;
+ }
+
+ return MPSC_QUEUE_EMPTY;
+}
+
+struct mpsc_queue_node *
+mpsc_queue_pop(struct mpsc_queue *queue)
+ OVS_REQUIRES(queue->read_lock)
+{
+ enum mpsc_queue_poll_result result;
+ struct mpsc_queue_node *node;
+
+ do {
+ result = mpsc_queue_poll(queue, &node);
+ if (result == MPSC_QUEUE_EMPTY) {
+ return NULL;
+ }
+ } while (result == MPSC_QUEUE_RETRY);
+
+ return node;
+}
+
+void
+mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+ OVS_REQUIRES(queue->read_lock)
+{
+ struct mpsc_queue_node *tail;
+
+ atomic_read_relaxed(&queue->tail, &tail);
+ atomic_store_relaxed(&node->next, tail);
+ atomic_store_relaxed(&queue->tail, node);
+}
+
+struct mpsc_queue_node *
+mpsc_queue_tail(struct mpsc_queue *queue)
+ OVS_REQUIRES(queue->read_lock)
+{
+ struct mpsc_queue_node *tail;
+ struct mpsc_queue_node *next;
+
+ atomic_read_relaxed(&queue->tail, &tail);
+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+
+ if (tail == &queue->stub) {
+ if (next == NULL) {
+ return NULL;
+ }
+
+ atomic_store_relaxed(&queue->tail, next);
+ tail = next;
+ }
+
+ return tail;
+}
+
+/* Get the next element of a node. */
+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
+ struct mpsc_queue_node *prev)
+ OVS_REQUIRES(queue->read_lock)
+{
+ struct mpsc_queue_node *next;
+
+ atomic_read_explicit(&prev->next, &next, memory_order_acquire);
+ if (next == &queue->stub) {
+ atomic_read_explicit(&next->next, &next, memory_order_acquire);
+ }
+ return next;
+}
+
+void
+mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+ struct mpsc_queue_node *prev;
+
+ atomic_store_relaxed(&node->next, NULL);
+ prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+ atomic_store_explicit(&prev->next, node, memory_order_release);
+}
new file mode 100644
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MPSC_QUEUE_H
+#define MPSC_QUEUE_H 1
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include <openvswitch/thread.h>
+#include <openvswitch/util.h>
+
+#include "ovs-atomic.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This data structure is a lockless queue implementation with
+ * the following properties:
+ *
+ * * Multi-producer: multiple threads can write concurrently.
+ * Insertion in the queue is thread-safe, no inter-thread
+ * synchronization is necessary.
+ *
+ * * Single-consumer: only a single thread can safely remove
+ * nodes from the queue. The queue must be 'acquired' using
+ * 'mpsc_queue_acquire()' before removing nodes.
+ *
+ * * Unbounded: the queue is backed by a linked-list and is not
+ * limited in number of elements.
+ *
+ * * Intrusive: queue elements are allocated as part of larger
+ * objects. Objects are retrieved by offset manipulation.
+ *
+ * * per-producer FIFO: Elements in the queue are kept in the
+ * order their producer inserted them. The consumer retrieves
+ *   them in the same insertion order. When multiple
+ * producers insert at the same time, either will proceed.
+ *
+ * This queue is well-suited for message passing between threads,
+ * where any number of thread can insert a message and a single
+ * thread is meant to receive and process it.
+ *
+ * Thread-safety
+ * =============
+ *
+ * The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.
+ * Once the queue is protected against concurrent reads, the thread can call
+ * the consumer API:
+ *
+ * * mpsc_queue_poll() to peek and return the tail of the queue
+ * * mpsc_queue_pop() to remove the tail of the queue
+ * * mpsc_queue_tail() to read the current tail
+ * * mpsc_queue_push_front() to enqueue an element safely at the tail
+ * * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,
+ * without removing them.
+ * * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while
+ * removing them.
+ *
+ * When a thread is finished with reading the queue, it can release the
+ * reader lock using 'mpsc_queue_release()'.
+ *
+ * Producers can always insert elements in the queue, even if no consumer
+ * acquired the reader lock. No inter-producer synchronization is needed.
+ *
+ * The consumer thread is also allowed to insert elements while it holds the
+ * reader lock.
+ *
+ * Producer threads must never be cancelled while writing to the queue.
+ * This will block the consumer, that will then lose any subsequent elements
+ * in the queue. Producers should ideally be cooperatively managed or
+ * the queue insertion should be within non-cancellable sections.
+ *
+ * Queue state
+ * ===========
+ *
+ * When polling the queue, three states can be observed: 'empty', 'non-empty',
+ * and 'inconsistent'. Three polling results are defined, respectively:
+ *
+ * * MPSC_QUEUE_EMPTY: the queue is empty.
+ * * MPSC_QUEUE_ITEM: an item was available and has been removed.
+ * * MPSC_QUEUE_RETRY: the queue is inconsistent.
+ *
+ * If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished
+ * writing to the queue and the list of nodes is not coherent. The consumer
+ * can retry shortly to check if the producer has finished.
+ *
+ * This behavior is the reason the removal function is called
+ * 'mpsc_queue_poll()'.
+ *
+ */
+
+struct mpsc_queue_node {
+ ATOMIC(struct mpsc_queue_node *) next;
+};
+
+struct mpsc_queue {
+ ATOMIC(struct mpsc_queue_node *) head;
+ ATOMIC(struct mpsc_queue_node *) tail;
+ struct mpsc_queue_node stub;
+ struct ovs_mutex read_lock;
+};
+
+#define MPSC_QUEUE_INITIALIZER(Q) { \
+ .head = ATOMIC_VAR_INIT(&(Q)->stub), \
+ .tail = ATOMIC_VAR_INIT(&(Q)->stub), \
+ .stub = { .next = ATOMIC_VAR_INIT(NULL) }, \
+ .read_lock = OVS_MUTEX_INITIALIZER, \
+}
+
+/* Consumer API. */
+
+/* Initialize the queue. Not necessary if 'MPSC_QUEUE_INITIALIZER' was used. */
+void mpsc_queue_init(struct mpsc_queue *queue);
+/* The reader lock must be released prior to destroying the queue. */
+void mpsc_queue_destroy(struct mpsc_queue *queue);
+
+/* Acquire and release the consumer lock. */
+#define mpsc_queue_acquire(q) do { \
+ ovs_mutex_lock(&(q)->read_lock); \
+ } while (0)
+#define mpsc_queue_release(q) do { \
+ ovs_mutex_unlock(&(q)->read_lock); \
+ } while (0)
+
+enum mpsc_queue_poll_result {
+ /* Queue is empty. */
+ MPSC_QUEUE_EMPTY,
+ /* Polling the queue returned an item. */
+ MPSC_QUEUE_ITEM,
+ /* Data has been enqueued but one or more producer thread have not
+ * finished writing it. The queue is in an inconsistent state.
+ * Retrying shortly, if the producer threads are still active, will
+ * return the data.
+ */
+ MPSC_QUEUE_RETRY,
+};
+
+/* Set 'node' to a removed item from the queue if 'MPSC_QUEUE_ITEM' is
+ * returned, otherwise 'node' is not set.
+ */
+enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,
+ struct mpsc_queue_node **node)
+ OVS_REQUIRES(queue->read_lock);
+
+/* Pop an element if there is any in the queue. */
+struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)
+ OVS_REQUIRES(queue->read_lock);
+
+/* Insert at the front of the queue. Only the consumer can do it. */
+void mpsc_queue_push_front(struct mpsc_queue *queue,
+ struct mpsc_queue_node *node)
+ OVS_REQUIRES(queue->read_lock);
+
+/* Get the current queue tail. */
+struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)
+ OVS_REQUIRES(queue->read_lock);
+
+/* Get the next element of a node. */
+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
+ struct mpsc_queue_node *prev)
+ OVS_REQUIRES(queue->read_lock);
+
+#define MPSC_QUEUE_FOR_EACH(node, queue) \
+ for (node = mpsc_queue_tail(queue); node != NULL; \
+ node = mpsc_queue_next((queue), node))
+
+#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \
+ for (node = mpsc_queue_pop(queue); node != NULL; \
+ node = mpsc_queue_pop(queue))
+
+/* Producer API. */
+
+void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);
+
+#endif /* MPSC_QUEUE_H */
@@ -465,6 +465,7 @@ tests_ovstest_SOURCES = \
tests/test-list.c \
tests/test-lockfile.c \
tests/test-multipath.c \
+ tests/test-mpsc-queue.c \
tests/test-netflow.c \
tests/test-odp.c \
tests/test-ofpbuf.c \
@@ -254,3 +254,8 @@ AT_SETUP([stopwatch module])
AT_CHECK([ovstest test-stopwatch], [0], [......
], [ignore])
AT_CLEANUP
+
+AT_SETUP([mpsc-queue module])
+AT_CHECK([ovstest test-mpsc-queue check], [0], [....
+])
+AT_CLEANUP
new file mode 100644
@@ -0,0 +1,772 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <assert.h>
+#include <getopt.h>
+#include <string.h>
+
+#include <config.h>
+
+#include "command-line.h"
+#include "guarded-list.h"
+#include "mpsc-queue.h"
+#include "openvswitch/list.h"
+#include "openvswitch/util.h"
+#include "openvswitch/vlog.h"
+#include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "ovstest.h"
+#include "timeval.h"
+#include "util.h"
+
+struct element {
+ union {
+ struct mpsc_queue_node mpscq;
+ struct ovs_list list;
+ } node;
+ uint64_t mark;
+};
+
+static void
+test_mpsc_queue_mark_element(struct mpsc_queue_node *node,
+ uint64_t mark,
+ unsigned int *counter)
+{
+ struct element *elem;
+
+ elem = CONTAINER_OF(node, struct element, node.mpscq);
+ elem->mark = mark;
+ *counter += 1;
+}
+
+static void
+test_mpsc_queue_insert(void)
+{
+ struct element elements[100];
+ struct mpsc_queue_node *node;
+ struct mpsc_queue queue;
+ unsigned int counter;
+ size_t i;
+
+ memset(elements, 0, sizeof(elements));
+ mpsc_queue_init(&queue);
+ mpsc_queue_acquire(&queue);
+
+ for (i = 0; i < ARRAY_SIZE(elements); i++) {
+ mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+ }
+
+ counter = 0;
+ while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+ test_mpsc_queue_mark_element(node, 1, &counter);
+ }
+
+ mpsc_queue_release(&queue);
+ mpsc_queue_destroy(&queue);
+
+ ovs_assert(counter == ARRAY_SIZE(elements));
+ for (i = 0; i < ARRAY_SIZE(elements); i++) {
+ ovs_assert(elements[i].mark == 1);
+ }
+
+ printf(".");
+}
+
+static void
+test_mpsc_queue_removal_fifo(void)
+{
+ struct element elements[100];
+ struct mpsc_queue_node *node;
+ struct mpsc_queue queue;
+ unsigned int counter;
+ size_t i;
+
+ memset(elements, 0, sizeof(elements));
+
+ mpsc_queue_init(&queue);
+ mpsc_queue_acquire(&queue);
+
+ for (i = 0; i < ARRAY_SIZE(elements); i++) {
+ mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+ }
+
+ /* Elements are in the same order in the list as they
+ * were declared / initialized.
+ */
+ counter = 0;
+ while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+ test_mpsc_queue_mark_element(node, counter, &counter);
+ }
+
+ /* The list is valid once extracted from the queue,
+ * the queue can be destroyed here.
+ */
+ mpsc_queue_release(&queue);
+ mpsc_queue_destroy(&queue);
+
+ for (i = 0; i < ARRAY_SIZE(elements) - 1; i++) {
+ struct element *e1, *e2;
+
+ e1 = &elements[i];
+ e2 = &elements[i + 1];
+
+ ovs_assert(e1->mark < e2->mark);
+ }
+
+ printf(".");
+}
+
+/* Partial insert:
+ *
+ * Those functions are 'mpsc_queue_insert()' divided in two parts.
+ * They serve to test the behavior of the queue when forcing the potential
+ * condition of a thread starting an insertion then yielding.
+ */
+static struct mpsc_queue_node *
+mpsc_queue_insert_begin(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+ struct mpsc_queue_node *prev;
+
+ atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
+ prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+ return prev;
+}
+
+static void
+mpsc_queue_insert_end(struct mpsc_queue_node *prev,
+ struct mpsc_queue_node *node)
+{
+ atomic_store_explicit(&prev->next, node, memory_order_release);
+}
+
+static void
+test_mpsc_queue_insert_partial(void)
+{
+ struct element elements[10];
+ struct mpsc_queue_node *prevs[ARRAY_SIZE(elements)];
+ struct mpsc_queue_node *node;
+ struct mpsc_queue queue, *q = &queue;
+ size_t i;
+
+ mpsc_queue_init(q);
+
+ /* Insert the first half of elements entirely,
+     * insert the second half of elements partially.
+ */
+ for (i = 0; i < ARRAY_SIZE(elements); i++) {
+ elements[i].mark = i;
+ if (i > ARRAY_SIZE(elements) / 2) {
+ prevs[i] = mpsc_queue_insert_begin(q, &elements[i].node.mpscq);
+ } else {
+ prevs[i] = NULL;
+ mpsc_queue_insert(q, &elements[i].node.mpscq);
+ }
+ }
+
+ mpsc_queue_acquire(q);
+
+ /* Verify that when the chain is broken, iterators will stop. */
+ i = 0;
+ MPSC_QUEUE_FOR_EACH (node, q) {
+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+ ovs_assert(e == &elements[i]);
+ i++;
+ }
+ ovs_assert(i < ARRAY_SIZE(elements));
+
+ for (i = 0; i < ARRAY_SIZE(elements); i++) {
+ if (prevs[i] != NULL) {
+ mpsc_queue_insert_end(prevs[i], &elements[i].node.mpscq);
+ }
+ }
+
+ i = 0;
+ MPSC_QUEUE_FOR_EACH (node, q) {
+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+ ovs_assert(e == &elements[i]);
+ i++;
+ }
+ ovs_assert(i == ARRAY_SIZE(elements));
+
+ MPSC_QUEUE_FOR_EACH_POP (node, q) {
+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+ ovs_assert(e->mark == (unsigned int)(e - elements));
+ }
+
+ mpsc_queue_release(q);
+ mpsc_queue_destroy(q);
+
+ printf(".");
+}
+
+static void
+test_mpsc_queue_push_front(void)
+{
+ struct mpsc_queue queue, *q = &queue;
+ struct mpsc_queue_node *node;
+ struct element elements[10];
+ size_t i;
+
+ mpsc_queue_init(q);
+ mpsc_queue_acquire(q);
+
+ ovs_assert(mpsc_queue_pop(q) == NULL);
+ mpsc_queue_push_front(q, &elements[0].node.mpscq);
+ node = mpsc_queue_pop(q);
+ ovs_assert(node == &elements[0].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == NULL);
+
+ mpsc_queue_push_front(q, &elements[0].node.mpscq);
+ mpsc_queue_push_front(q, &elements[1].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == NULL);
+
+ mpsc_queue_push_front(q, &elements[1].node.mpscq);
+ mpsc_queue_push_front(q, &elements[0].node.mpscq);
+ mpsc_queue_insert(q, &elements[2].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == &elements[2].node.mpscq);
+ ovs_assert(mpsc_queue_pop(q) == NULL);
+
+ for (i = 0; i < ARRAY_SIZE(elements); i++) {
+ elements[i].mark = i;
+ mpsc_queue_insert(q, &elements[i].node.mpscq);
+ }
+
+ node = mpsc_queue_pop(q);
+ mpsc_queue_push_front(q, node);
+ ovs_assert(mpsc_queue_pop(q) == node);
+ mpsc_queue_push_front(q, node);
+
+ i = 0;
+ MPSC_QUEUE_FOR_EACH (node, q) {
+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+ ovs_assert(e == &elements[i]);
+ i++;
+ }
+ ovs_assert(i == ARRAY_SIZE(elements));
+
+ MPSC_QUEUE_FOR_EACH_POP (node, q) {
+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+ ovs_assert(e->mark == (unsigned int)(e - elements));
+ }
+
+ mpsc_queue_release(q);
+ mpsc_queue_destroy(q);
+
+ printf(".");
+}
+
+static void
+run_tests(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+ /* Verify basic insertion. */
+ test_mpsc_queue_insert();
+ /* Test partial insertion. */
+ test_mpsc_queue_insert_partial();
+ /* Verify removal order is respected. */
+ test_mpsc_queue_removal_fifo();
+ /* Verify tail-end insertion works. */
+ test_mpsc_queue_push_front();
+ printf("\n");
+}
+
+static struct element *elements;
+static uint64_t *thread_working_ms; /* Measured work time. */
+
+static unsigned int n_threads;
+static unsigned int n_elems;
+
+static struct ovs_barrier barrier;
+static volatile bool working;
+
+static int
+elapsed(const struct timeval *start)
+{
+ struct timeval end;
+
+ xgettimeofday(&end);
+ return timeval_to_msec(&end) - timeval_to_msec(start);
+}
+
+static void
+print_result(const char *prefix, int reader_elapsed)
+{
+ uint64_t avg;
+ size_t i;
+
+ avg = 0;
+ for (i = 0; i < n_threads; i++) {
+ avg += thread_working_ms[i];
+ }
+ avg /= n_threads;
+ printf("%s: %6d", prefix, reader_elapsed);
+ for (i = 0; i < n_threads; i++) {
+ printf(" %6" PRIu64, thread_working_ms[i]);
+ }
+ printf(" %6" PRIu64 " ms\n", avg);
+}
+
+struct mpscq_aux {
+ struct mpsc_queue *queue;
+ atomic_uint thread_id;
+};
+
+static void *
+mpsc_queue_insert_thread(void *aux_)
+{
+ unsigned int n_elems_per_thread;
+ struct element *th_elements;
+ struct mpscq_aux *aux = aux_;
+ struct timeval start;
+ unsigned int id;
+ size_t i;
+
+ atomic_add(&aux->thread_id, 1u, &id);
+ n_elems_per_thread = n_elems / n_threads;
+ th_elements = &elements[id * n_elems_per_thread];
+
+ ovs_barrier_block(&barrier);
+ xgettimeofday(&start);
+
+ for (i = 0; i < n_elems_per_thread; i++) {
+ mpsc_queue_insert(aux->queue, &th_elements[i].node.mpscq);
+ }
+
+ thread_working_ms[id] = elapsed(&start);
+ ovs_barrier_block(&barrier);
+
+ working = false;
+
+ return NULL;
+}
+
+static void
+benchmark_mpsc_queue(void)
+{
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    struct timeval start;
+    unsigned int counter;
+    bool work_complete;
+    pthread_t *threads;
+    struct mpscq_aux aux;
+    uint64_t epoch;
+    size_t i;
+
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    mpsc_queue_init(&queue);
+
+    aux.queue = &queue;
+    atomic_store(&aux.thread_id, 0);
+
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("sc_queue_insert",
+                                       mpsc_queue_insert_thread, &aux);
+    }
+
+    mpsc_queue_acquire(&queue);
+    xgettimeofday(&start);
+
+    counter = 0;
+    epoch = 1;
+    do {
+        while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+            test_mpsc_queue_mark_element(node, epoch, &counter);
+        }
+        if (epoch == UINT64_MAX) {
+            epoch = 0;
+        }
+        epoch++;
+    } while (working);
+
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+    }
+
+    /* Elements might have been inserted before threads were joined. */
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, epoch, &counter);
+    }
+
+    print_result(" mpsc-queue", elapsed(&start));
+
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+
+    work_complete = true;
+    for (i = 0; i < n_elems; i++) {
+        if (elements[i].mark == 0) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            work_complete = false;
+        }
+    }
+    ovs_assert(work_complete);
+    ovs_assert(counter == n_elems);
+}
+
+#ifdef HAVE_PTHREAD_SPIN_LOCK
+#define spin_lock_type ovs_spin
+#define spin_lock_init(l) ovs_spin_init(l)
+#define spin_lock_destroy(l) ovs_spin_destroy(l)
+#define spin_lock(l) ovs_spin_lock(l)
+#define spin_unlock(l) ovs_spin_unlock(l)
+#else
+#define spin_lock_type ovs_mutex
+#define spin_lock_init(l) ovs_mutex_init(l)
+#define spin_lock_destroy(l) ovs_mutex_destroy(l)
+#define spin_lock(l) ovs_mutex_lock(l)
+#define spin_unlock(l) ovs_mutex_unlock(l)
+#endif
+
+struct list_aux {
+ struct ovs_list *list;
+ struct ovs_mutex *mutex;
+ struct spin_lock_type *spin;
+ atomic_uint thread_id;
+};
+
+static void *
+locked_list_insert_main(void *aux_)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+ unsigned int n_elems_per_thread;
+ struct element *th_elements;
+ struct list_aux *aux = aux_;
+ struct timeval start;
+ unsigned int id;
+ size_t i;
+
+ atomic_add(&aux->thread_id, 1u, &id);
+ n_elems_per_thread = n_elems / n_threads;
+ th_elements = &elements[id * n_elems_per_thread];
+
+ ovs_barrier_block(&barrier);
+ xgettimeofday(&start);
+
+ for (i = 0; i < n_elems_per_thread; i++) {
+ aux->mutex ? ovs_mutex_lock(aux->mutex)
+ : spin_lock(aux->spin);
+ ovs_list_push_front(aux->list, &th_elements[i].node.list);
+ aux->mutex ? ovs_mutex_unlock(aux->mutex)
+ : spin_unlock(aux->spin);
+ }
+
+ thread_working_ms[id] = elapsed(&start);
+ ovs_barrier_block(&barrier);
+
+ working = false;
+
+ return NULL;
+}
+
+static void
+benchmark_list(bool use_mutex)
+{
+ struct ovs_mutex mutex;
+ struct spin_lock_type spin;
+ struct ovs_list list;
+ struct element *elem;
+ struct timeval start;
+ unsigned int counter;
+ bool work_complete;
+ pthread_t *threads;
+ struct list_aux aux;
+ uint64_t epoch;
+ size_t i;
+
+ memset(elements, 0, n_elems * sizeof *elements);
+ memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+ use_mutex ? ovs_mutex_init(&mutex) : spin_lock_init(&spin);
+
+ ovs_list_init(&list);
+
+ aux.list = &list;
+ aux.mutex = use_mutex ? &mutex : NULL;
+ aux.spin = use_mutex ? NULL : &spin;
+ atomic_store(&aux.thread_id, 0);
+
+ for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+ ovs_list_push_front(&list, &elements[i].node.list);
+ }
+
+ working = true;
+
+ threads = xmalloc(n_threads * sizeof *threads);
+ ovs_barrier_init(&barrier, n_threads);
+
+ for (i = 0; i < n_threads; i++) {
+ threads[i] = ovs_thread_create("locked_list_insert",
+ locked_list_insert_main, &aux);
+ }
+
+ xgettimeofday(&start);
+
+ counter = 0;
+ epoch = 1;
+ do {
+ if (use_mutex) {
+ ovs_mutex_lock(&mutex);
+ LIST_FOR_EACH_POP (elem, node.list, &list) {
+ elem->mark = epoch;
+ counter++;
+ }
+ ovs_mutex_unlock(&mutex);
+ } else {
+ struct ovs_list *node = NULL;
+
+ spin_lock(&spin);
+ if (!ovs_list_is_empty(&list)) {
+ node = ovs_list_pop_front(&list);
+ }
+ spin_unlock(&spin);
+
+ if (!node) {
+ continue;
+ }
+
+ elem = CONTAINER_OF(node, struct element, node.list);
+ elem->mark = epoch;
+ counter++;
+ }
+ if (epoch == UINT64_MAX) {
+ epoch = 0;
+ }
+ epoch++;
+ } while (working);
+
+ for (i = 0; i < n_threads; i++) {
+ xpthread_join(threads[i], NULL);
+ }
+
+ /* Elements might have been inserted before threads were joined. */
+ LIST_FOR_EACH_POP (elem, node.list, &list) {
+ elem->mark = epoch;
+ counter++;
+ }
+
+ if (use_mutex) {
+ print_result(" list(mutex)", elapsed(&start));
+ } else {
+ print_result(" list(spin)", elapsed(&start));
+ }
+
+ use_mutex ? ovs_mutex_destroy(&mutex) : spin_lock_destroy(&spin);
+ ovs_barrier_destroy(&barrier);
+ free(threads);
+
+ work_complete = true;
+ for (i = 0; i < n_elems; i++) {
+ if (elements[i].mark == 0) {
+ printf("Element %" PRIuSIZE " was never consumed.\n", i);
+ work_complete = false;
+ }
+ }
+ ovs_assert(work_complete);
+ ovs_assert(counter == n_elems);
+}
+
+struct guarded_list_aux {
+ struct guarded_list *glist;
+ atomic_uint thread_id;
+};
+
+static void *
+guarded_list_insert_thread(void *aux_)
+{
+ unsigned int n_elems_per_thread;
+ struct element *th_elements;
+ struct guarded_list_aux *aux = aux_;
+ struct timeval start;
+ unsigned int id;
+ size_t i;
+
+ atomic_add(&aux->thread_id, 1u, &id);
+ n_elems_per_thread = n_elems / n_threads;
+ th_elements = &elements[id * n_elems_per_thread];
+
+ ovs_barrier_block(&barrier);
+ xgettimeofday(&start);
+
+ for (i = 0; i < n_elems_per_thread; i++) {
+ guarded_list_push_back(aux->glist, &th_elements[i].node.list, n_elems);
+ }
+
+ thread_working_ms[id] = elapsed(&start);
+ ovs_barrier_block(&barrier);
+
+ working = false;
+
+ return NULL;
+}
+
+static void
+benchmark_guarded_list(void)
+{
+ struct guarded_list_aux aux;
+ struct ovs_list extracted;
+ struct guarded_list glist;
+ struct element *elem;
+ struct timeval start;
+ unsigned int counter;
+ bool work_complete;
+ pthread_t *threads;
+ uint64_t epoch;
+ size_t i;
+
+ memset(elements, 0, n_elems * sizeof *elements);
+ memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+ guarded_list_init(&glist);
+ ovs_list_init(&extracted);
+
+ aux.glist = &glist;
+ atomic_store(&aux.thread_id, 0);
+
+ for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+ guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
+ }
+
+ working = true;
+
+ threads = xmalloc(n_threads * sizeof *threads);
+ ovs_barrier_init(&barrier, n_threads);
+
+ for (i = 0; i < n_threads; i++) {
+ threads[i] = ovs_thread_create("guarded_list_insert",
+ guarded_list_insert_thread, &aux);
+ }
+
+ xgettimeofday(&start);
+
+ counter = 0;
+ epoch = 1;
+ do {
+ guarded_list_pop_all(&glist, &extracted);
+ LIST_FOR_EACH_POP (elem, node.list, &extracted) {
+ elem->mark = epoch;
+ counter++;
+ }
+ if (epoch == UINT64_MAX) {
+ epoch = 0;
+ }
+ epoch++;
+ } while (working);
+
+ for (i = 0; i < n_threads; i++) {
+ xpthread_join(threads[i], NULL);
+ }
+
+ /* Elements might have been inserted before threads were joined. */
+ guarded_list_pop_all(&glist, &extracted);
+ LIST_FOR_EACH_POP (elem, node.list, &extracted) {
+ elem->mark = epoch;
+ counter++;
+ }
+
+ print_result("guarded list", elapsed(&start));
+
+ ovs_barrier_destroy(&barrier);
+ free(threads);
+ guarded_list_destroy(&glist);
+
+ work_complete = true;
+ for (i = 0; i < n_elems; i++) {
+ if (elements[i].mark == 0) {
+ printf("Element %" PRIuSIZE " was never consumed.\n", i);
+ work_complete = false;
+ }
+ }
+ ovs_assert(work_complete);
+ ovs_assert(counter == n_elems);
+}
+
+static void
+run_benchmarks(struct ovs_cmdl_context *ctx)
+{
+ long int l_threads;
+ long int l_elems;
+ size_t i;
+
+ ovsrcu_quiesce_start();
+
+ l_elems = strtol(ctx->argv[1], NULL, 10);
+ l_threads = strtol(ctx->argv[2], NULL, 10);
+ ovs_assert(l_elems > 0 && l_threads > 0);
+
+ n_elems = l_elems;
+ n_threads = l_threads;
+
+ elements = xcalloc(n_elems, sizeof *elements);
+ thread_working_ms = xcalloc(n_threads, sizeof *thread_working_ms);
+
+ printf("Benchmarking n=%u on 1 + %u threads.\n", n_elems, n_threads);
+
+ printf(" type\\thread: Reader ");
+ for (i = 0; i < n_threads; i++) {
+ printf(" %3" PRIuSIZE " ", i + 1);
+ }
+ printf(" Avg\n");
+
+ benchmark_mpsc_queue();
+#ifdef HAVE_PTHREAD_SPIN_LOCK
+ benchmark_list(false);
+#endif
+ benchmark_list(true);
+ benchmark_guarded_list();
+
+ free(thread_working_ms);
+ free(elements);
+}
+
+static const struct ovs_cmdl_command commands[] = {
+ {"check", NULL, 0, 0, run_tests, OVS_RO},
+ {"benchmark", "<nb elem> <nb threads>", 2, 2, run_benchmarks, OVS_RO},
+ {NULL, NULL, 0, 0, NULL, OVS_RO},
+};
+
+static void
+test_mpsc_queue_main(int argc, char *argv[])
+{
+ struct ovs_cmdl_context ctx = {
+ .argc = argc - optind,
+ .argv = argv + optind,
+ };
+
+ vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);
+
+ set_program_name(argv[0]);
+ ovs_cmdl_run_command(&ctx, commands);
+}
+
+OVSTEST_REGISTER("test-mpsc-queue", test_mpsc_queue_main);