diff mbox series

[ovs-dev,v1,10/23] llring: Add lockless MPMC bounded queue structure

Message ID 80b89f3f0806b3124d3bb23754f99395d20308e0.1612968146.git.grive@u256.net
State New
Headers show
Series dpif-netdev: Parallel offload processing | expand

Commit Message

Gaƫtan Rivet Feb. 10, 2021, 3:33 p.m. UTC
Add a lockless multi-producer/multi-consumer, array-based,
non-intrusive, bounded queue that will fail on overflow.

Each operation (enqueue, dequeue) uses a CAS(). As such, both producer
and consumer sides guarantee lock-free forward progress. If the queue
is full, enqueuing will fail.  Conversely, if the queue is empty,
dequeueing will fail.

The bound of the queue are restricted to power-of-twos, to allow simpler
overflow on unsigned position markers.

Signed-off-by: Gaetan Rivet <grive@u256.net>
Reviewed-by: Eli Britstein <elibr@nvidia.com>
---
 lib/automake.mk |   2 +
 lib/llring.c    | 153 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/llring.h    |  76 ++++++++++++++++++++++++
 3 files changed, 231 insertions(+)
 create mode 100644 lib/llring.c
 create mode 100644 lib/llring.h
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index cbdda460a..45948e519 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -156,6 +156,8 @@  lib_libopenvswitch_la_SOURCES = \
 	lib/learn.h \
 	lib/learning-switch.c \
 	lib/learning-switch.h \
+	lib/llring.c \
+	lib/llring.h \
 	lib/lockfile.c \
 	lib/lockfile.h \
 	lib/mac-learning.c \
diff --git a/lib/llring.c b/lib/llring.c
new file mode 100644
index 000000000..1fb930017
--- /dev/null
+++ b/lib/llring.c
@@ -0,0 +1,153 @@ 
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "ovs-atomic.h"
+
+#include "llring.h"
+
+/* A queue element.
+ * Calling 'llring_create' will allocate an array of such elements,
+ * that will hold the inserted data.
+ */
+struct llring_node {
+    atomic_uint32_t seq;
+    uint32_t data;
+};
+
+/* A ring description.
+ * The head and tail of the ring are padded to avoid false-sharing,
+ * which improves slightly multi-thread performance, at the cost
+ * of some memory.
+ */
+struct llring {
+    PADDED_MEMBERS(CACHE_LINE_SIZE, atomic_uint32_t head;);
+    PADDED_MEMBERS(CACHE_LINE_SIZE, atomic_uint32_t tail;);
+    uint32_t mask;
+    struct llring_node nodes[0];
+};
+
+struct llring *
+llring_create(uint32_t size)
+{
+    struct llring *r;
+    uint32_t i;
+
+    if (size < 2 || !IS_POW2(size)) {
+        return NULL;
+    }
+
+    r = xmalloc(sizeof *r + size * sizeof r->nodes[0]);
+
+    r->mask = size - 1;
+    for (i = 0; i < size; i++) {
+        atomic_store_relaxed(&r->nodes[i].seq, i);
+    }
+    atomic_store_relaxed(&r->head, 0);
+    atomic_store_relaxed(&r->tail, 0);
+
+    return r;
+}
+
+void
+llring_destroy(struct llring *r)
+{
+    free(r);
+}
+
+bool
+llring_enqueue(struct llring *r, uint32_t data)
+{
+    struct llring_node *node;
+    uint32_t pos;
+
+    atomic_read_relaxed(&r->head, &pos);
+    while (true) {
+        int64_t diff;
+        uint32_t seq;
+
+        node = &r->nodes[pos & r->mask];
+        atomic_read_explicit(&node->seq, &seq, memory_order_acquire);
+        diff = (int64_t) seq - (int64_t) pos;
+
+        if (diff < 0) {
+            /* Current ring[head].seq is from previous ring generation,
+             * ring is full and enqueue fails. */
+            return false;
+        }
+
+        if (diff == 0) {
+            /* If head == ring[head].seq, then the slot is free,
+             * attempt to take it by moving the head, if no one moved it since.
+             */
+            if (atomic_compare_exchange_weak_explicit(&r->head, &pos, pos + 1,
+                                                      memory_order_relaxed,
+                                                      memory_order_relaxed)) {
+                break;
+            }
+        } else {
+            /* Someone changed the head since last read, retry. */
+            atomic_read_relaxed(&r->head, &pos);
+        }
+    }
+
+    node->data = data;
+    atomic_store_explicit(&node->seq, pos + 1, memory_order_release);
+    return true;
+}
+
+bool
+llring_dequeue(struct llring *r, uint32_t *data)
+{
+    struct llring_node *node;
+    uint32_t pos;
+
+    atomic_read_relaxed(&r->tail, &pos);
+    while (true) {
+        int64_t diff;
+        uint32_t seq;
+
+        node = &r->nodes[pos & r->mask];
+        atomic_read_explicit(&node->seq, &seq, memory_order_acquire);
+        diff = (int64_t) seq - (int64_t) (pos + 1);
+
+        if (diff < 0) {
+            /* Current ring[tail + 1].seq is from previous ring generation,
+             * ring is empty and dequeue fails. */
+            return false;
+        }
+
+        if (diff == 0) {
+            /* If tail + 1 == ring[tail + 1].seq, then the slot is allocated,
+             * attempt to free it by moving the tail, if no one moved it since.
+             */
+            if (atomic_compare_exchange_weak_explicit(&r->tail, &pos, pos + 1,
+                                                      memory_order_relaxed,
+                                                      memory_order_relaxed)) {
+                break;
+            }
+        } else {
+            /* Someone changed the tail since last read, retry. */
+            atomic_read_relaxed(&r->tail, &pos);
+        }
+    }
+
+    *data = node->data;
+    /* Advance the slot to next gen by adding r->mask + 1 to its sequence. */
+    atomic_store_explicit(&node->seq, pos + r->mask + 1, memory_order_release);
+    return true;
+}
diff --git a/lib/llring.h b/lib/llring.h
new file mode 100644
index 000000000..f97baa343
--- /dev/null
+++ b/lib/llring.h
@@ -0,0 +1,76 @@ 
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include "ovs-atomic.h"
+
+/* Bounded lockless queue
+ * ======================
+ *
+ * A lockless FIFO queue bounded to a known size.
+ * Each operation (insert, remove) uses one CAS().
+ *
+ * The structure is:
+ *
+ *   Multi-producer: multiple threads can write to it
+ *   concurrently.
+ *
+ *   Multi-consumer: multiple threads can read from it
+ *   concurrently.
+ *
+ *   Bounded: the queue is backed by external memory.
+ *   No new allocation is made on insertion, only the
+ *   used elements in the queue are marked as such.
+ *   The boundary of the queue is defined as the size given
+ *   at init, which must be a power of two.
+ *
+ *   Failing: when an operation (enqueue, dequeue) cannot
+ *   be performed due to the queue being full/empty, the
+ *   operation immediately fails, instead of waiting on
+ *   a state change.
+ *
+ *   Non-intrusive: queue elements are allocated prior to
+ *   initialization.  Data is shallow-copied to those
+ *   allocated elements.
+ *
+ * Thread safety
+ * =============
+ *
+ * The queue is thread-safe for MPMC case.
+ * No lock is taken by the queue.  The queue guarantees
+ * lock-free forward progress for each of its operations.
+ *
+ */
+
+/* Create a circular lockless ring.
+ * The 'size' parameter must be a power-of-two higher than 2,
+ * otherwise allocation will fail.
+ */
+struct llring;
+struct llring *llring_create(uint32_t size);
+
+/* Free a lockless ring. */
+void llring_destroy(struct llring *r);
+
+/* 'data' is copied to the latest free slot in the queue. */
+bool llring_enqueue(struct llring *r, uint32_t data);
+
+/* The value within the oldest slot taken in the queue is copied
+ * to the address pointed by 'data'.
+ */
+bool llring_dequeue(struct llring *r, uint32_t *data);