get:
Show a patch.

patch:
Update a patch.

put:
Update a patch.

GET /api/patches/1525723/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 1525723,
    "url": "http://patchwork.ozlabs.org/api/patches/1525723/?format=api",
    "web_url": "http://patchwork.ozlabs.org/project/openvswitch/patch/a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net/",
    "project": {
        "id": 47,
        "url": "http://patchwork.ozlabs.org/api/projects/47/?format=api",
        "name": "Open vSwitch",
        "link_name": "openvswitch",
        "list_id": "ovs-dev.openvswitch.org",
        "list_email": "ovs-dev@openvswitch.org",
        "web_url": "http://openvswitch.org/",
        "scm_url": "git@github.com:openvswitch/ovs.git",
        "webscm_url": "https://github.com/openvswitch/ovs",
        "list_archive_url": "",
        "list_archive_url_format": "",
        "commit_url_format": ""
    },
    "msgid": "<a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net>",
    "list_archive_url": null,
    "date": "2021-09-08T09:47:36",
    "name": "[ovs-dev,v5,12/27] mpsc-queue: Module for lock-free message passing",
    "commit_ref": "5396ba5b21c4eae9726f4726786e53d317ccfdce",
    "pull_url": null,
    "state": "accepted",
    "archived": false,
    "hash": "bfa204f40b2c922fc1225ab1560bc4b5a6a8935c",
    "submitter": {
        "id": 78795,
        "url": "http://patchwork.ozlabs.org/api/people/78795/?format=api",
        "name": "Gaetan Rivet",
        "email": "grive@u256.net"
    },
    "delegate": null,
    "mbox": "http://patchwork.ozlabs.org/project/openvswitch/patch/a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net/mbox/",
    "series": [
        {
            "id": 261424,
            "url": "http://patchwork.ozlabs.org/api/series/261424/?format=api",
            "web_url": "http://patchwork.ozlabs.org/project/openvswitch/list/?series=261424",
            "date": "2021-09-08T09:47:24",
            "name": "dpif-netdev: Parallel offload processing",
            "version": 5,
            "mbox": "http://patchwork.ozlabs.org/series/261424/mbox/"
        }
    ],
    "comments": "http://patchwork.ozlabs.org/api/patches/1525723/comments/",
    "check": "warning",
    "checks": "http://patchwork.ozlabs.org/api/patches/1525723/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<ovs-dev-bounces@openvswitch.org>",
        "X-Original-To": [
            "incoming@patchwork.ozlabs.org",
            "ovs-dev@openvswitch.org"
        ],
        "Delivered-To": [
            "patchwork-incoming@bilbo.ozlabs.org",
            "ovs-dev@lists.linuxfoundation.org"
        ],
        "Authentication-Results": [
            "ozlabs.org;\n\tdkim=fail reason=\"signature verification failed\" (2048-bit key;\n unprotected) header.d=u256.net header.i=@u256.net header.a=rsa-sha256\n header.s=fm2 header.b=Rb2k+LYw;\n\tdkim=fail reason=\"signature verification failed\" (2048-bit key;\n unprotected) header.d=messagingengine.com header.i=@messagingengine.com\n header.a=rsa-sha256 header.s=fm3 header.b=QdY2uD2B;\n\tdkim-atps=neutral",
            "ozlabs.org;\n spf=pass (sender SPF authorized) smtp.mailfrom=openvswitch.org\n (client-ip=140.211.166.138; helo=smtp1.osuosl.org;\n envelope-from=ovs-dev-bounces@openvswitch.org; receiver=<UNKNOWN>)",
            "smtp4.osuosl.org (amavisd-new);\n dkim=pass (2048-bit key) header.d=u256.net header.b=\"Rb2k+LYw\";\n dkim=pass (2048-bit key) header.d=messagingengine.com\n header.b=\"QdY2uD2B\""
        ],
        "Received": [
            "from smtp1.osuosl.org (smtp1.osuosl.org [140.211.166.138])\n\t(using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)\n\t key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest\n SHA256)\n\t(No client certificate requested)\n\tby ozlabs.org (Postfix) with ESMTPS id 4H4HR41fpWz9sW8\n\tfor <incoming@patchwork.ozlabs.org>; Wed,  8 Sep 2021 19:49:20 +1000 (AEST)",
            "from localhost (localhost [127.0.0.1])\n\tby smtp1.osuosl.org (Postfix) with ESMTP id 4CC8483F0C;\n\tWed,  8 Sep 2021 09:49:18 +0000 (UTC)",
            "from smtp1.osuosl.org ([127.0.0.1])\n\tby localhost (smtp1.osuosl.org [127.0.0.1]) (amavisd-new, port 10024)\n\twith ESMTP id Li3P0W4W0n11; Wed,  8 Sep 2021 09:49:11 +0000 (UTC)",
            "from lists.linuxfoundation.org (lf-lists.osuosl.org\n [IPv6:2605:bc80:3010:104::8cd3:938])\n\tby smtp1.osuosl.org (Postfix) with ESMTPS id 7BA7483E58;\n\tWed,  8 Sep 2021 09:49:07 +0000 (UTC)",
            "from lf-lists.osuosl.org (localhost [127.0.0.1])\n\tby lists.linuxfoundation.org (Postfix) with ESMTP id 39541C0030;\n\tWed,  8 Sep 2021 09:49:05 +0000 (UTC)",
            "from smtp4.osuosl.org (smtp4.osuosl.org [140.211.166.137])\n by lists.linuxfoundation.org (Postfix) with ESMTP id 19791C0020\n for <ovs-dev@openvswitch.org>; Wed,  8 Sep 2021 09:49:03 +0000 (UTC)",
            "from localhost (localhost [127.0.0.1])\n by smtp4.osuosl.org (Postfix) with ESMTP id 2C626406C3\n for <ovs-dev@openvswitch.org>; Wed,  8 Sep 2021 09:48:24 +0000 (UTC)",
            "from smtp4.osuosl.org ([127.0.0.1])\n by localhost (smtp4.osuosl.org [127.0.0.1]) (amavisd-new, port 10024)\n with ESMTP id O5tR8M7Rqfgs for <ovs-dev@openvswitch.org>;\n Wed,  8 Sep 2021 09:48:21 +0000 (UTC)",
            "from wout3-smtp.messagingengine.com (wout3-smtp.messagingengine.com\n [64.147.123.19])\n by smtp4.osuosl.org (Postfix) with ESMTPS id 45C6840794\n for <ovs-dev@openvswitch.org>; Wed,  8 Sep 2021 09:48:21 +0000 (UTC)",
            "from compute5.internal (compute5.nyi.internal [10.202.2.45])\n by mailout.west.internal (Postfix) with ESMTP id 8EF9A32009D5;\n Wed,  8 Sep 2021 05:48:20 -0400 (EDT)",
            "from mailfrontend2 ([10.202.2.163])\n by compute5.internal (MEProxy); Wed, 08 Sep 2021 05:48:20 -0400",
            "by mail.messagingengine.com (Postfix) with ESMTPA; Wed,\n 8 Sep 2021 05:48:19 -0400 (EDT)"
        ],
        "X-Virus-Scanned": [
            "amavisd-new at osuosl.org",
            "amavisd-new at osuosl.org"
        ],
        "X-Greylist": "from auto-whitelisted by SQLgrey-1.8.0",
        "DKIM-Signature": [
            "v=1; a=rsa-sha256; c=relaxed/relaxed; d=u256.net; h=from\n :to:cc:subject:date:message-id:in-reply-to:references\n :mime-version:content-transfer-encoding; s=fm2; bh=yhJtu1g26vkv8\n OyhYUY/gKFutzdKMMVNBqytIqHaUxE=; b=Rb2k+LYwWTo7wEDYfbeCsmz7n/GMo\n KR22YZGsZAV2rcqvVTUWsLjVM06ySM3dGdnDinJWs/HUqNiwD7IMxyk2BUTNdck9\n U3p5VwNWZVK1cnmCD8EUkH2S2DX7IGm8d40K3eQsiVktjDuTqLbIc6VHoFhf5QfU\n YqJuv4qH5JlZLum8xHM820pAsj3f9hgIwkAjLda4zJKZikITIW4ye2G7ADcLVa2p\n 7voAX7ZdursWlGi6ZDH2lXzsXRXxg/3Dn0IUIakN1SzN9a1ZH41rHBvR7aKMhneM\n iAcMZUmaKKtakjRHrb/G3ZSKjBGRe0xsRS8mJGo+QXCdAy+c42Fxg7Vyg==",
            "v=1; a=rsa-sha256; c=relaxed/relaxed; d=\n messagingengine.com; h=cc:content-transfer-encoding:date:from\n :in-reply-to:message-id:mime-version:references:subject:to\n :x-me-proxy:x-me-proxy:x-me-sender:x-me-sender:x-sasl-enc; s=\n fm3; bh=yhJtu1g26vkv8OyhYUY/gKFutzdKMMVNBqytIqHaUxE=; b=QdY2uD2B\n 6BIMU/TNCxL4CWCtwk7cgRunQJ0YFRDM/R/zvEXQ5ncdCmjwWl7Jbi76m8/J60LB\n BlO53CT3zhRl2FVzhUqUIWhzEuA/UxqKqL5KIFWgN5kgexlr/GTwYjvafARD/H0M\n z2lOCNjuSDMg5b1CrkvSBXeru250GT5jgfwX/IcROKVoEHlLHW4GO+Lz4ELSpTn1\n hSSkGeNgLe2DSYv1vhOdLXgYMO3+qYrrtAPHA2XhuKQtTo2idU+KwiQsoBk9HwWP\n fn8Nm3O3v50WY+w3xtLtBOFL897KtpYc04cEH2lr15dYFHlPAHmQLUDANq4yheIG\n XoA9wyXlRFbxPw=="
        ],
        "X-ME-Sender": "<xms:5IY4YRzhBWzadmApsveL7xWxm02j8jzCqPwYM9wkcwzr1rBwkQdw6w>\n <xme:5IY4YRQdhlyTCLcAGuLjQAuYhBb_KqFS2zyRv433dGYnNFmOMnTO6NwTfGnp9zbQx\n ulqCJAWkHDBPDD5hMI>",
        "X-ME-Received": "\n <xmr:5IY4YbWxCgGd9RGl6PUlxLTX7Iow31ziAQDRWFByKBhx6axIFGkbIKohAjvvWXsE37nsEkQKMcBzNUTM68Vda8vQzA>",
        "X-ME-Proxy-Cause": "\n gggruggvucftvghtrhhoucdtuddrgedvtddrudefjedgudekucetufdoteggodetrfdotf\n fvucfrrhhofhhilhgvmecuhfgrshhtofgrihhlpdfqfgfvpdfurfetoffkrfgpnffqhgen\n uceurghilhhouhhtmecufedttdenucesvcftvggtihhpihgvnhhtshculddquddttddmne\n cujfgurhephffvufffkffojghfggfgsedtkeertdertddtnecuhfhrohhmpefirggvthgr\n nhcutfhivhgvthcuoehgrhhivhgvsehuvdehiedrnhgvtheqnecuggftrfgrthhtvghrnh\n eptedthefhudeugffgffelhffffedtkeefudefjedvgffhgfdttdfgueeuhfffgfejnecu\n ffhomhgrihhnpegrphgrtghhvgdrohhrghdpuddtvdegtghorhgvshdrnhgvthdprhhotg\n hhvghsthgvrhdrvgguuhenucevlhhushhtvghrufhiiigvpedtnecurfgrrhgrmhepmhgr\n ihhlfhhrohhmpehgrhhivhgvsehuvdehiedrnhgvth",
        "X-ME-Proxy": "<xmx:5IY4YTi09RoQQR4vIWJ-GEG60k9uUq124F_D-uNJxkFECzn3cQiZVQ>\n <xmx:5IY4YTDQayr0eWlZN7J3Q8LRyOmHhOfliRcwT6hHKiaTIIM3dSIP2g>\n <xmx:5IY4YcJgGfrTcEc3Uphzkn7tu86CwsdJyPlp7UsUUGrNBylZ58fLcQ>\n <xmx:5IY4YY4OCgRMQnz4ZQ_P6A-m9Ythwu8jguyGm3Bza7HPiMYO_YGIHA>",
        "From": "Gaetan Rivet <grive@u256.net>",
        "To": "ovs-dev@openvswitch.org",
        "Date": "Wed,  8 Sep 2021 11:47:36 +0200",
        "Message-Id": "\n <a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net>",
        "X-Mailer": "git-send-email 2.31.1",
        "In-Reply-To": "<cover.1631094144.git.grive@u256.net>",
        "References": "<cover.1631094144.git.grive@u256.net>",
        "MIME-Version": "1.0",
        "Cc": "Eli Britstein <elibr@nvidia.com>,\n Maxime Coquelin <maxime.coquelin@redhat.com>",
        "Subject": "[ovs-dev] [PATCH v5 12/27] mpsc-queue: Module for lock-free message\n\tpassing",
        "X-BeenThere": "ovs-dev@openvswitch.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "<ovs-dev.openvswitch.org>",
        "List-Unsubscribe": "<https://mail.openvswitch.org/mailman/options/ovs-dev>,\n <mailto:ovs-dev-request@openvswitch.org?subject=unsubscribe>",
        "List-Archive": "<http://mail.openvswitch.org/pipermail/ovs-dev/>",
        "List-Post": "<mailto:ovs-dev@openvswitch.org>",
        "List-Help": "<mailto:ovs-dev-request@openvswitch.org?subject=help>",
        "List-Subscribe": "<https://mail.openvswitch.org/mailman/listinfo/ovs-dev>,\n <mailto:ovs-dev-request@openvswitch.org?subject=subscribe>",
        "Content-Type": "text/plain; charset=\"us-ascii\"",
        "Content-Transfer-Encoding": "7bit",
        "Errors-To": "ovs-dev-bounces@openvswitch.org",
        "Sender": "\"dev\" <ovs-dev-bounces@openvswitch.org>"
    },
    "content": "Add a lockless multi-producer/single-consumer (MPSC), linked-list based,\nintrusive, unbounded queue that does not require deferred memory\nmanagement.\n\nThe queue is designed to improve the specific MPSC setup.  A benchmark\naccompanies the unit tests to measure the difference in this configuration.\nA single reader thread polls the queue while N writers enqueue elements\nas fast as possible.  The mpsc-queue is compared against the regular ovs-list\nas well as the guarded list.  The latter usually offers a slight improvement\nby batching the element removal, however the mpsc-queue is faster.\n\nThe average is of each producer threads time:\n\n   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1\n   Benchmarking n=3000000 on 1 + 1 threads.\n    type\\thread:  Reader      1    Avg\n     mpsc-queue:     167    167    167 ms\n     list(spin):      89     80     80 ms\n    list(mutex):     745    745    745 ms\n   guarded list:     788    788    788 ms\n\n   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2\n   Benchmarking n=3000000 on 1 + 2 threads.\n    type\\thread:  Reader      1      2    Avg\n     mpsc-queue:      98     97     94     95 ms\n     list(spin):     185    171    173    172 ms\n    list(mutex):     203    199    203    201 ms\n   guarded list:     269    269    188    228 ms\n\n   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3\n   Benchmarking n=3000000 on 1 + 3 threads.\n    type\\thread:  Reader      1      2      3    Avg\n     mpsc-queue:      76     76     65     76     72 ms\n     list(spin):     246    110    240    238    196 ms\n    list(mutex):     542    541    541    539    540 ms\n   guarded list:     535    535    507    511    517 ms\n\n   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4\n   Benchmarking n=3000000 on 1 + 4 threads.\n    type\\thread:  Reader      1      2      3      4    Avg\n     mpsc-queue:      73     68     68     68     68     68 ms\n     list(spin):     294    275    279    277    282    278 ms\n    list(mutex):     346    309    287    345    302    310 ms\n   guarded list:     378    319    334    378    351    345 ms\n\nSigned-off-by: Gaetan Rivet <grive@u256.net>\nReviewed-by: Eli Britstein <elibr@nvidia.com>\nReviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>\n---\n lib/automake.mk         |   2 +\n lib/mpsc-queue.c        | 251 +++++++++++++\n lib/mpsc-queue.h        | 190 ++++++++++\n tests/automake.mk       |   1 +\n tests/library.at        |   5 +\n tests/test-mpsc-queue.c | 772 ++++++++++++++++++++++++++++++++++++++++\n 6 files changed, 1221 insertions(+)\n create mode 100644 lib/mpsc-queue.c\n create mode 100644 lib/mpsc-queue.h\n create mode 100644 tests/test-mpsc-queue.c",
    "diff": "diff --git a/lib/automake.mk b/lib/automake.mk\nindex 804c8da6f..098337078 100644\n--- a/lib/automake.mk\n+++ b/lib/automake.mk\n@@ -180,6 +180,8 @@ lib_libopenvswitch_la_SOURCES = \\\n \tlib/memory.h \\\n \tlib/meta-flow.c \\\n \tlib/mov-avg.h \\\n+\tlib/mpsc-queue.c \\\n+\tlib/mpsc-queue.h \\\n \tlib/multipath.c \\\n \tlib/multipath.h \\\n \tlib/namemap.c \\\ndiff --git a/lib/mpsc-queue.c b/lib/mpsc-queue.c\nnew file mode 100644\nindex 000000000..4e99c94f7\n--- /dev/null\n+++ b/lib/mpsc-queue.c\n@@ -0,0 +1,251 @@\n+/*\n+ * Copyright (c) 2021 NVIDIA Corporation.\n+ *\n+ * Licensed under the Apache License, Version 2.0 (the \"License\");\n+ * you may not use this file except in compliance with the License.\n+ * You may obtain a copy of the License at:\n+ *\n+ *     http://www.apache.org/licenses/LICENSE-2.0\n+ *\n+ * Unless required by applicable law or agreed to in writing, software\n+ * distributed under the License is distributed on an \"AS IS\" BASIS,\n+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n+ * See the License for the specific language governing permissions and\n+ * limitations under the License.\n+ */\n+\n+#include <config.h>\n+\n+#include \"ovs-atomic.h\"\n+\n+#include \"mpsc-queue.h\"\n+\n+/* Multi-producer, single-consumer queue\n+ * =====================================\n+ *\n+ * This an implementation of the MPSC queue described by Dmitri Vyukov [1].\n+ *\n+ * One atomic exchange operation is done per insertion.  Removal in most cases\n+ * will not require atomic operation and will use one atomic exchange to close\n+ * the queue chain.\n+ *\n+ * Insertion\n+ * =========\n+ *\n+ * The queue is implemented using a linked-list.  Insertion is done at the\n+ * back of the queue, by swapping the current end with the new node atomically,\n+ * then pointing the previous end toward the new node.  To follow Vyukov\n+ * nomenclature, the end-node of the chain is called head.  A producer will\n+ * only manipulate the head.\n+ *\n+ * The head swap is atomic, however the link from the previous head to the new\n+ * one is done in a separate operation.  This means that the chain is\n+ * momentarily broken, when the previous head still points to NULL and the\n+ * current head has been inserted.\n+ *\n+ * Considering a series of insertions, the queue state will remain consistent\n+ * and the insertions order is compatible with their precedence, thus the\n+ * queue is serializable.  However, because an insertion consists in two\n+ * separate memory transactions, it is not linearizable.\n+ *\n+ * Removal\n+ * =======\n+ *\n+ * The consumer must deal with the queue inconsistency.  It will manipulate\n+ * the tail of the queue and move it along the latest consumed elements.\n+ * When an end of the chain of elements is found (the next pointer is NULL),\n+ * the tail is compared with the head.\n+ *\n+ * If both points to different addresses, then the queue is in an inconsistent\n+ * state: the tail cannot move forward as the next is NULL, but the head is not\n+ * the last element in the chain: this can only happen if the chain is broken.\n+ *\n+ * In this case, the consumer must wait for the producer to finish writing the\n+ * next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.\n+ *\n+ * Removal is thus in most cases (when there are elements in the queue)\n+ * accomplished without using atomics, until the last element of the queue.\n+ * There, the head is atomically loaded. If the queue is in a consistent state,\n+ * the head is moved back to the queue stub by inserting the stub in the queue:\n+ * ending the queue is the same as an insertion, which is one atomic XCHG.\n+ *\n+ * Forward guarantees\n+ * ==================\n+ *\n+ * Insertion and peeking are wait-free: they will execute in a known bounded\n+ * number of instructions, regardless of the state of the queue.\n+ *\n+ * However, while removal consists in peeking and a constant write to\n+ * update the tail, it can repeatedly fail until the queue become consistent.\n+ * It is thus dependent on other threads progressing.  This means that the\n+ * queue forward progress is obstruction-free only.  It has a potential for\n+ * livelocking.\n+ *\n+ * The chain will remain broken as long as a producer is not finished writing\n+ * its next pointer.  If a producer is cancelled for example, the queue could\n+ * remain broken for any future readings.  This queue should either be used\n+ * with cooperative threads or insertion must only be done outside cancellable\n+ * sections.\n+ *\n+ * Performances\n+ * ============\n+ *\n+ * In benchmarks this structure was better than alternatives such as:\n+ *\n+ *   * A reversed Treiber stack [2], using 1 CAS per operations\n+ *     and requiring reversal of the node list on removal.\n+ *\n+ *   * Michael-Scott lock-free queue [3], using 2 CAS per operations.\n+ *\n+ * While it is not linearizable, this queue is well-suited for message passing.\n+ * If a proper hardware XCHG operation is used, it scales better than\n+ * CAS-based implementations.\n+ *\n+ * References\n+ * ==========\n+ *\n+ * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue\n+ *\n+ * [2]: R. K. Treiber. Systems programming: Coping with parallelism.\n+ *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.\n+ *\n+ * [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and\n+ *      Blocking Concurrent Queue Algorithms\n+ * [3]: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html\n+ *\n+ */\n+\n+void\n+mpsc_queue_init(struct mpsc_queue *queue)\n+{\n+    atomic_store_relaxed(&queue->head, &queue->stub);\n+    atomic_store_relaxed(&queue->tail, &queue->stub);\n+    atomic_store_relaxed(&queue->stub.next, NULL);\n+\n+    ovs_mutex_init(&queue->read_lock);\n+}\n+\n+void\n+mpsc_queue_destroy(struct mpsc_queue *queue)\n+    OVS_EXCLUDED(queue->read_lock)\n+{\n+    ovs_mutex_destroy(&queue->read_lock);\n+}\n+\n+enum mpsc_queue_poll_result\n+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)\n+    OVS_REQUIRES(queue->read_lock)\n+{\n+    struct mpsc_queue_node *tail;\n+    struct mpsc_queue_node *next;\n+    struct mpsc_queue_node *head;\n+\n+    atomic_read_relaxed(&queue->tail, &tail);\n+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+\n+    if (tail == &queue->stub) {\n+        if (next == NULL) {\n+            return MPSC_QUEUE_EMPTY;\n+        }\n+\n+        atomic_store_relaxed(&queue->tail, next);\n+        tail = next;\n+        atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+    }\n+\n+    if (next != NULL) {\n+        atomic_store_relaxed(&queue->tail, next);\n+        *node = tail;\n+        return MPSC_QUEUE_ITEM;\n+    }\n+\n+    atomic_read_explicit(&queue->head, &head, memory_order_acquire);\n+    if (tail != head) {\n+        return MPSC_QUEUE_RETRY;\n+    }\n+\n+    mpsc_queue_insert(queue, &queue->stub);\n+\n+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+    if (next != NULL) {\n+        atomic_store_relaxed(&queue->tail, next);\n+        *node = tail;\n+        return MPSC_QUEUE_ITEM;\n+    }\n+\n+    return MPSC_QUEUE_EMPTY;\n+}\n+\n+struct mpsc_queue_node *\n+mpsc_queue_pop(struct mpsc_queue *queue)\n+    OVS_REQUIRES(queue->read_lock)\n+{\n+    enum mpsc_queue_poll_result result;\n+    struct mpsc_queue_node *node;\n+\n+    do {\n+        result = mpsc_queue_poll(queue, &node);\n+        if (result == MPSC_QUEUE_EMPTY) {\n+            return NULL;\n+        }\n+    } while (result == MPSC_QUEUE_RETRY);\n+\n+    return node;\n+}\n+\n+void\n+mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)\n+    OVS_REQUIRES(queue->read_lock)\n+{\n+    struct mpsc_queue_node *tail;\n+\n+    atomic_read_relaxed(&queue->tail, &tail);\n+    atomic_store_relaxed(&node->next, tail);\n+    atomic_store_relaxed(&queue->tail, node);\n+}\n+\n+struct mpsc_queue_node *\n+mpsc_queue_tail(struct mpsc_queue *queue)\n+    OVS_REQUIRES(queue->read_lock)\n+{\n+    struct mpsc_queue_node *tail;\n+    struct mpsc_queue_node *next;\n+\n+    atomic_read_relaxed(&queue->tail, &tail);\n+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+\n+    if (tail == &queue->stub) {\n+        if (next == NULL) {\n+            return NULL;\n+        }\n+\n+        atomic_store_relaxed(&queue->tail, next);\n+        tail = next;\n+    }\n+\n+    return tail;\n+}\n+\n+/* Get the next element of a node. */\n+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,\n+                                        struct mpsc_queue_node *prev)\n+    OVS_REQUIRES(queue->read_lock)\n+{\n+    struct mpsc_queue_node *next;\n+\n+    atomic_read_explicit(&prev->next, &next, memory_order_acquire);\n+    if (next == &queue->stub) {\n+        atomic_read_explicit(&next->next, &next, memory_order_acquire);\n+    }\n+    return next;\n+}\n+\n+void\n+mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)\n+{\n+    struct mpsc_queue_node *prev;\n+\n+    atomic_store_relaxed(&node->next, NULL);\n+    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);\n+    atomic_store_explicit(&prev->next, node, memory_order_release);\n+}\ndiff --git a/lib/mpsc-queue.h b/lib/mpsc-queue.h\nnew file mode 100644\nindex 000000000..8c7109621\n--- /dev/null\n+++ b/lib/mpsc-queue.h\n@@ -0,0 +1,190 @@\n+/*\n+ * Copyright (c) 2021 NVIDIA Corporation.\n+ *\n+ * Licensed under the Apache License, Version 2.0 (the \"License\");\n+ * you may not use this file except in compliance with the License.\n+ * You may obtain a copy of the License at:\n+ *\n+ *     http://www.apache.org/licenses/LICENSE-2.0\n+ *\n+ * Unless required by applicable law or agreed to in writing, software\n+ * distributed under the License is distributed on an \"AS IS\" BASIS,\n+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n+ * See the License for the specific language governing permissions and\n+ * limitations under the License.\n+ */\n+\n+#ifndef MPSC_QUEUE_H\n+#define MPSC_QUEUE_H 1\n+\n+#include <stdbool.h>\n+#include <stdint.h>\n+#include <stddef.h>\n+\n+#include <openvswitch/thread.h>\n+#include <openvswitch/util.h>\n+\n+#include \"ovs-atomic.h\"\n+\n+/* Multi-producer, single-consumer queue\n+ * =====================================\n+ *\n+ * This data structure is a lockless queue implementation with\n+ * the following properties:\n+ *\n+ *  * Multi-producer: multiple threads can write concurrently.\n+ *    Insertion in the queue is thread-safe, no inter-thread\n+ *    synchronization is necessary.\n+ *\n+ *  * Single-consumer: only a single thread can safely remove\n+ *    nodes from the queue.  The queue must be 'acquired' using\n+ *    'mpsc_queue_acquire()' before removing nodes.\n+ *\n+ *  * Unbounded: the queue is backed by a linked-list and is not\n+ *    limited in number of elements.\n+ *\n+ *  * Intrusive: queue elements are allocated as part of larger\n+ *    objects.  Objects are retrieved by offset manipulation.\n+ *\n+ *  * per-producer FIFO: Elements in the queue are kept in the\n+ *    order their producer inserted them.  The consumer retrieves\n+ *    them in in the same insertion order.  When multiple\n+ *    producers insert at the same time, either will proceed.\n+ *\n+ * This queue is well-suited for message passing between threads,\n+ * where any number of thread can insert a message and a single\n+ * thread is meant to receive and process it.\n+ *\n+ * Thread-safety\n+ * =============\n+ *\n+ *  The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.\n+ *  Once the queue is protected against concurrent reads, the thread can call\n+ *  the consumer API:\n+ *\n+ *      * mpsc_queue_poll() to peek and return the tail of the queue\n+ *      * mpsc_queue_pop() to remove the tail of the queue\n+ *      * mpsc_queue_tail() to read the current tail\n+ *      * mpsc_queue_push_front() to enqueue an element safely at the tail\n+ *      * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,\n+ *        without removing them.\n+ *      * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while\n+ *        removing them.\n+ *\n+ *  When a thread is finished with reading the queue, it can release the\n+ *  reader lock using 'mpsc_queue_release()'.\n+ *\n+ *  Producers can always insert elements in the queue, even if no consumer\n+ *  acquired the reader lock.  No inter-producer synchronization is needed.\n+ *\n+ *  The consumer thread is also allowed to insert elements while it holds the\n+ *  reader lock.\n+ *\n+ *  Producer threads must never be cancelled while writing to the queue.\n+ *  This will block the consumer, that will then lose any subsequent elements\n+ *  in the queue.  Producers should ideally be cooperatively managed or\n+ *  the queue insertion should be within non-cancellable sections.\n+ *\n+ * Queue state\n+ * ===========\n+ *\n+ *  When polling the queue, three states can be observed: 'empty', 'non-empty',\n+ *  and 'inconsistent'.  Three polling results are defined, respectively:\n+ *\n+ *   * MPSC_QUEUE_EMPTY: the queue is empty.\n+ *   * MPSC_QUEUE_ITEM: an item was available and has been removed.\n+ *   * MPSC_QUEUE_RETRY: the queue is inconsistent.\n+ *\n+ *  If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished\n+ *  writing to the queue and the list of nodes is not coherent.  The consumer\n+ *  can retry shortly to check if the producer has finished.\n+ *\n+ *  This behavior is the reason the removal function is called\n+ *  'mpsc_queue_poll()'.\n+ *\n+ */\n+\n+struct mpsc_queue_node {\n+    ATOMIC(struct mpsc_queue_node *) next;\n+};\n+\n+struct mpsc_queue {\n+    ATOMIC(struct mpsc_queue_node *) head;\n+    ATOMIC(struct mpsc_queue_node *) tail;\n+    struct mpsc_queue_node stub;\n+    struct ovs_mutex read_lock;\n+};\n+\n+#define MPSC_QUEUE_INITIALIZER(Q) { \\\n+    .head = ATOMIC_VAR_INIT(&(Q)->stub), \\\n+    .tail = ATOMIC_VAR_INIT(&(Q)->stub), \\\n+    .stub = { .next = ATOMIC_VAR_INIT(NULL) }, \\\n+    .read_lock = OVS_MUTEX_INITIALIZER, \\\n+}\n+\n+/* Consumer API. */\n+\n+/* Initialize the queue. Not necessary is 'MPSC_QUEUE_INITIALIZER' was used. */\n+void mpsc_queue_init(struct mpsc_queue *queue);\n+/* The reader lock must be released prior to destroying the queue. */\n+void mpsc_queue_destroy(struct mpsc_queue *queue);\n+\n+/* Acquire and release the consumer lock. */\n+#define mpsc_queue_acquire(q) do { \\\n+        ovs_mutex_lock(&(q)->read_lock); \\\n+    } while (0)\n+#define mpsc_queue_release(q) do { \\\n+        ovs_mutex_unlock(&(q)->read_lock); \\\n+    } while (0)\n+\n+enum mpsc_queue_poll_result {\n+    /* Queue is empty. */\n+    MPSC_QUEUE_EMPTY,\n+    /* Polling the queue returned an item. */\n+    MPSC_QUEUE_ITEM,\n+    /* Data has been enqueued but one or more producer thread have not\n+     * finished writing it. The queue is in an inconsistent state.\n+     * Retrying shortly, if the producer threads are still active, will\n+     * return the data.\n+     */\n+    MPSC_QUEUE_RETRY,\n+};\n+\n+/* Set 'node' to a removed item from the queue if 'MPSC_QUEUE_ITEM' is\n+ * returned, otherwise 'node' is not set.\n+ */\n+enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,\n+                                            struct mpsc_queue_node **node)\n+    OVS_REQUIRES(queue->read_lock);\n+\n+/* Pop an element if there is any in the queue. */\n+struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)\n+    OVS_REQUIRES(queue->read_lock);\n+\n+/* Insert at the front of the queue. Only the consumer can do it. */\n+void mpsc_queue_push_front(struct mpsc_queue *queue,\n+                           struct mpsc_queue_node *node)\n+    OVS_REQUIRES(queue->read_lock);\n+\n+/* Get the current queue tail. */\n+struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)\n+    OVS_REQUIRES(queue->read_lock);\n+\n+/* Get the next element of a node. */\n+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,\n+                                        struct mpsc_queue_node *prev)\n+    OVS_REQUIRES(queue->read_lock);\n+\n+#define MPSC_QUEUE_FOR_EACH(node, queue) \\\n+    for (node = mpsc_queue_tail(queue); node != NULL; \\\n+         node = mpsc_queue_next((queue), node))\n+\n+#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \\\n+    for (node = mpsc_queue_pop(queue); node != NULL; \\\n+         node = mpsc_queue_pop(queue))\n+\n+/* Producer API. */\n+\n+void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);\n+\n+#endif /* MPSC_QUEUE_H */\ndiff --git a/tests/automake.mk b/tests/automake.mk\nindex 977779765..8fdec27ef 100644\n--- a/tests/automake.mk\n+++ b/tests/automake.mk\n@@ -472,6 +472,7 @@ tests_ovstest_SOURCES = \\\n \ttests/test-list.c \\\n \ttests/test-lockfile.c \\\n \ttests/test-multipath.c \\\n+\ttests/test-mpsc-queue.c \\\n \ttests/test-netflow.c \\\n \ttests/test-odp.c \\\n \ttests/test-ofpbuf.c \\\ndiff --git a/tests/library.at b/tests/library.at\nindex 42a5ce1aa..661e95727 100644\n--- a/tests/library.at\n+++ b/tests/library.at\n@@ -265,3 +265,8 @@ AT_SKIP_IF([test \"$IS_WIN32\" = \"yes\"])\n AT_SKIP_IF([test \"$IS_BSD\" = \"yes\"])\n AT_CHECK([ovstest test-netlink-policy ll_addr], [0])\n AT_CLEANUP\n+\n+AT_SETUP([mpsc-queue module])\n+AT_CHECK([ovstest test-mpsc-queue check], [0], [....\n+])\n+AT_CLEANUP\ndiff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c\nnew file mode 100644\nindex 000000000..a38bf9e6d\n--- /dev/null\n+++ b/tests/test-mpsc-queue.c\n@@ -0,0 +1,772 @@\n+/*\n+ * Copyright (c) 2021 NVIDIA Corporation.\n+ *\n+ * Licensed under the Apache License, Version 2.0 (the \"License\");\n+ * you may not use this file except in compliance with the License.\n+ * You may obtain a copy of the License at:\n+ *\n+ *     http://www.apache.org/licenses/LICENSE-2.0\n+ *\n+ * Unless required by applicable law or agreed to in writing, software\n+ * distributed under the License is distributed on an \"AS IS\" BASIS,\n+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n+ * See the License for the specific language governing permissions and\n+ * limitations under the License.\n+ */\n+\n+#undef NDEBUG\n+#include <assert.h>\n+#include <getopt.h>\n+#include <string.h>\n+\n+#include <config.h>\n+\n+#include \"command-line.h\"\n+#include \"guarded-list.h\"\n+#include \"mpsc-queue.h\"\n+#include \"openvswitch/list.h\"\n+#include \"openvswitch/util.h\"\n+#include \"openvswitch/vlog.h\"\n+#include \"ovs-rcu.h\"\n+#include \"ovs-thread.h\"\n+#include \"ovstest.h\"\n+#include \"timeval.h\"\n+#include \"util.h\"\n+\n+struct element {\n+    union {\n+        struct mpsc_queue_node mpscq;\n+        struct ovs_list list;\n+    } node;\n+    uint64_t mark;\n+};\n+\n+static void\n+test_mpsc_queue_mark_element(struct mpsc_queue_node *node,\n+                             uint64_t mark,\n+                             unsigned int *counter)\n+{\n+    struct element *elem;\n+\n+    elem = CONTAINER_OF(node, struct element, node.mpscq);\n+    elem->mark = mark;\n+    *counter += 1;\n+}\n+\n+static void\n+test_mpsc_queue_insert(void)\n+{\n+    struct element elements[100];\n+    struct mpsc_queue_node *node;\n+    struct mpsc_queue queue;\n+    unsigned int counter;\n+    size_t i;\n+\n+    memset(elements, 0, sizeof(elements));\n+    mpsc_queue_init(&queue);\n+    mpsc_queue_acquire(&queue);\n+\n+    for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);\n+    }\n+\n+    counter = 0;\n+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+        test_mpsc_queue_mark_element(node, 1, &counter);\n+    }\n+\n+    mpsc_queue_release(&queue);\n+    mpsc_queue_destroy(&queue);\n+\n+    ovs_assert(counter == ARRAY_SIZE(elements));\n+    for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+        ovs_assert(elements[i].mark == 1);\n+    }\n+\n+    printf(\".\");\n+}\n+\n+static void\n+test_mpsc_queue_removal_fifo(void)\n+{\n+    struct element elements[100];\n+    struct mpsc_queue_node *node;\n+    struct mpsc_queue queue;\n+    unsigned int counter;\n+    size_t i;\n+\n+    memset(elements, 0, sizeof(elements));\n+\n+    mpsc_queue_init(&queue);\n+    mpsc_queue_acquire(&queue);\n+\n+    for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);\n+    }\n+\n+    /* Elements are in the same order in the list as they\n+     * were declared / initialized.\n+     */\n+    counter = 0;\n+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+        test_mpsc_queue_mark_element(node, counter, &counter);\n+    }\n+\n+    /* The list is valid once extracted from the queue,\n+     * the queue can be destroyed here.\n+     */\n+    mpsc_queue_release(&queue);\n+    mpsc_queue_destroy(&queue);\n+\n+    for (i = 0; i < ARRAY_SIZE(elements) - 1; i++) {\n+        struct element *e1, *e2;\n+\n+        e1 = &elements[i];\n+        e2 = &elements[i + 1];\n+\n+        ovs_assert(e1->mark < e2->mark);\n+    }\n+\n+    printf(\".\");\n+}\n+\n+/* Partial insert:\n+ *\n+ * Those functions are 'mpsc_queue_insert()' divided in two parts.\n+ * They serve to test the behavior of the queue when forcing the potential\n+ * condition of a thread starting an insertion then yielding.\n+ */\n+static struct mpsc_queue_node *\n+mpsc_queue_insert_begin(struct mpsc_queue *queue, struct mpsc_queue_node *node)\n+{\n+    struct mpsc_queue_node *prev;\n+\n+    atomic_store_explicit(&node->next, NULL, memory_order_relaxed);\n+    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);\n+    return prev;\n+}\n+\n+static void\n+mpsc_queue_insert_end(struct mpsc_queue_node *prev,\n+                      struct mpsc_queue_node *node)\n+{\n+    atomic_store_explicit(&prev->next, node, memory_order_release);\n+}\n+\n+static void\n+test_mpsc_queue_insert_partial(void)\n+{\n+    struct element elements[10];\n+    struct mpsc_queue_node *prevs[ARRAY_SIZE(elements)];\n+    struct mpsc_queue_node *node;\n+    struct mpsc_queue queue, *q = &queue;\n+    size_t i;\n+\n+    mpsc_queue_init(q);\n+\n+    /* Insert the first half of elements entirely,\n+     * insert the second hald of elements partially.\n+     */\n+    for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+        elements[i].mark = i;\n+        if (i > ARRAY_SIZE(elements) / 2) {\n+            prevs[i] = mpsc_queue_insert_begin(q, &elements[i].node.mpscq);\n+        } else {\n+            prevs[i] = NULL;\n+            mpsc_queue_insert(q, &elements[i].node.mpscq);\n+        }\n+    }\n+\n+    mpsc_queue_acquire(q);\n+\n+    /* Verify that when the chain is broken, iterators will stop. */\n+    i = 0;\n+    MPSC_QUEUE_FOR_EACH (node, q) {\n+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+        ovs_assert(e == &elements[i]);\n+        i++;\n+    }\n+    ovs_assert(i < ARRAY_SIZE(elements));\n+\n+    for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+        if (prevs[i] != NULL) {\n+            mpsc_queue_insert_end(prevs[i], &elements[i].node.mpscq);\n+        }\n+    }\n+\n+    i = 0;\n+    MPSC_QUEUE_FOR_EACH (node, q) {\n+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+        ovs_assert(e == &elements[i]);\n+        i++;\n+    }\n+    ovs_assert(i == ARRAY_SIZE(elements));\n+\n+    MPSC_QUEUE_FOR_EACH_POP (node, q) {\n+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+        ovs_assert(e->mark == (unsigned int)(e - elements));\n+    }\n+\n+    mpsc_queue_release(q);\n+    mpsc_queue_destroy(q);\n+\n+    printf(\".\");\n+}\n+\n+static void\n+test_mpsc_queue_push_front(void)\n+{\n+    struct mpsc_queue queue, *q = &queue;\n+    struct mpsc_queue_node *node;\n+    struct element elements[10];\n+    size_t i;\n+\n+    mpsc_queue_init(q);\n+    mpsc_queue_acquire(q);\n+\n+    ovs_assert(mpsc_queue_pop(q) == NULL);\n+    mpsc_queue_push_front(q, &elements[0].node.mpscq);\n+    node = mpsc_queue_pop(q);\n+    ovs_assert(node == &elements[0].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == NULL);\n+\n+    mpsc_queue_push_front(q, &elements[0].node.mpscq);\n+    mpsc_queue_push_front(q, &elements[1].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == NULL);\n+\n+    mpsc_queue_push_front(q, &elements[1].node.mpscq);\n+    mpsc_queue_push_front(q, &elements[0].node.mpscq);\n+    mpsc_queue_insert(q, &elements[2].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == &elements[2].node.mpscq);\n+    ovs_assert(mpsc_queue_pop(q) == NULL);\n+\n+    for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+        elements[i].mark = i;\n+        mpsc_queue_insert(q, &elements[i].node.mpscq);\n+    }\n+\n+    node = mpsc_queue_pop(q);\n+    mpsc_queue_push_front(q, node);\n+    ovs_assert(mpsc_queue_pop(q) == node);\n+    mpsc_queue_push_front(q, node);\n+\n+    i = 0;\n+    MPSC_QUEUE_FOR_EACH (node, q) {\n+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+        ovs_assert(e == &elements[i]);\n+        i++;\n+    }\n+    ovs_assert(i == ARRAY_SIZE(elements));\n+\n+    MPSC_QUEUE_FOR_EACH_POP (node, q) {\n+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+        ovs_assert(e->mark == (unsigned int)(e - elements));\n+    }\n+\n+    mpsc_queue_release(q);\n+    mpsc_queue_destroy(q);\n+\n+    printf(\".\");\n+}\n+\n+static void\n+run_tests(struct ovs_cmdl_context *ctx OVS_UNUSED)\n+{\n+    /* Verify basic insertion. */\n+    test_mpsc_queue_insert();\n+    /* Test partial insertion. */\n+    test_mpsc_queue_insert_partial();\n+    /* Verify removal order is respected. */\n+    test_mpsc_queue_removal_fifo();\n+    /* Verify tail-end insertion works. */\n+    test_mpsc_queue_push_front();\n+    printf(\"\\n\");\n+}\n+\f\n+static struct element *elements;\n+static uint64_t *thread_working_ms; /* Measured work time. */\n+\n+static unsigned int n_threads;\n+static unsigned int n_elems;\n+\n+static struct ovs_barrier barrier;\n+static volatile bool working;\n+\n+static int\n+elapsed(const struct timeval *start)\n+{\n+    struct timeval end;\n+\n+    xgettimeofday(&end);\n+    return timeval_to_msec(&end) - timeval_to_msec(start);\n+}\n+\n+static void\n+print_result(const char *prefix, int reader_elapsed)\n+{\n+    uint64_t avg;\n+    size_t i;\n+\n+    avg = 0;\n+    for (i = 0; i < n_threads; i++) {\n+        avg += thread_working_ms[i];\n+    }\n+    avg /= n_threads;\n+    printf(\"%s:  %6d\", prefix, reader_elapsed);\n+    for (i = 0; i < n_threads; i++) {\n+        printf(\" %6\" PRIu64, thread_working_ms[i]);\n+    }\n+    printf(\" %6\" PRIu64 \" ms\\n\", avg);\n+}\n+\n+struct mpscq_aux {\n+    struct mpsc_queue *queue;\n+    atomic_uint thread_id;\n+};\n+\n+static void *\n+mpsc_queue_insert_thread(void *aux_)\n+{\n+    unsigned int n_elems_per_thread;\n+    struct element *th_elements;\n+    struct mpscq_aux *aux = aux_;\n+    struct timeval start;\n+    unsigned int id;\n+    size_t i;\n+\n+    atomic_add(&aux->thread_id, 1u, &id);\n+    n_elems_per_thread = n_elems / n_threads;\n+    th_elements = &elements[id * n_elems_per_thread];\n+\n+    ovs_barrier_block(&barrier);\n+    xgettimeofday(&start);\n+\n+    for (i = 0; i < n_elems_per_thread; i++) {\n+        mpsc_queue_insert(aux->queue, &th_elements[i].node.mpscq);\n+    }\n+\n+    thread_working_ms[id] = elapsed(&start);\n+    ovs_barrier_block(&barrier);\n+\n+    working = false;\n+\n+    return NULL;\n+}\n+\n+static void\n+benchmark_mpsc_queue(void)\n+{\n+    struct mpsc_queue_node *node;\n+    struct mpsc_queue queue;\n+    struct timeval start;\n+    unsigned int counter;\n+    bool work_complete;\n+    pthread_t *threads;\n+    struct mpscq_aux aux;\n+    uint64_t epoch;\n+    size_t i;\n+\n+    memset(elements, 0, n_elems & sizeof *elements);\n+    memset(thread_working_ms, 0, n_threads & sizeof *thread_working_ms);\n+\n+    mpsc_queue_init(&queue);\n+\n+    aux.queue = &queue;\n+    atomic_store(&aux.thread_id, 0);\n+\n+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {\n+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);\n+    }\n+\n+    working = true;\n+\n+    threads = xmalloc(n_threads * sizeof *threads);\n+    ovs_barrier_init(&barrier, n_threads);\n+\n+    for (i = 0; i < n_threads; i++) {\n+        threads[i] = ovs_thread_create(\"sc_queue_insert\",\n+                                       mpsc_queue_insert_thread, &aux);\n+    }\n+\n+    mpsc_queue_acquire(&queue);\n+    xgettimeofday(&start);\n+\n+    counter = 0;\n+    epoch = 1;\n+    do {\n+        while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+            test_mpsc_queue_mark_element(node, epoch, &counter);\n+        }\n+        if (epoch == UINT64_MAX) {\n+            epoch = 0;\n+        }\n+        epoch++;\n+    } while (working);\n+\n+    for (i = 0; i < n_threads; i++) {\n+        xpthread_join(threads[i], NULL);\n+    }\n+\n+    /* Elements might have been inserted before threads were joined. */\n+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+        test_mpsc_queue_mark_element(node, epoch, &counter);\n+    }\n+\n+    print_result(\"  mpsc-queue\", elapsed(&start));\n+\n+    mpsc_queue_release(&queue);\n+    mpsc_queue_destroy(&queue);\n+    ovs_barrier_destroy(&barrier);\n+    free(threads);\n+\n+    work_complete = true;\n+    for (i = 0; i < n_elems; i++) {\n+        if (elements[i].mark == 0) {\n+            printf(\"Element %\" PRIuSIZE \" was never consumed.\\n\", i);\n+            work_complete = false;\n+        }\n+    }\n+    ovs_assert(work_complete);\n+    ovs_assert(counter == n_elems);\n+}\n+\n+#ifdef HAVE_PTHREAD_SPIN_LOCK\n+#define spin_lock_type       ovs_spin\n+#define spin_lock_init(l)    ovs_spin_init(l)\n+#define spin_lock_destroy(l) ovs_spin_destroy(l)\n+#define spin_lock(l)         ovs_spin_lock(l)\n+#define spin_unlock(l)       ovs_spin_unlock(l)\n+#else\n+#define spin_lock_type       ovs_mutex\n+#define spin_lock_init(l)    ovs_mutex_init(l)\n+#define spin_lock_destroy(l) ovs_mutex_destroy(l)\n+#define spin_lock(l)         ovs_mutex_lock(l)\n+#define spin_unlock(l)       ovs_mutex_unlock(l)\n+#endif\n+\n+struct list_aux {\n+    struct ovs_list *list;\n+    struct ovs_mutex *mutex;\n+    struct spin_lock_type *spin;\n+    atomic_uint thread_id;\n+};\n+\n+static void *\n+locked_list_insert_main(void *aux_)\n+    OVS_NO_THREAD_SAFETY_ANALYSIS\n+{\n+    unsigned int n_elems_per_thread;\n+    struct element *th_elements;\n+    struct list_aux *aux = aux_;\n+    struct timeval start;\n+    unsigned int id;\n+    size_t i;\n+\n+    atomic_add(&aux->thread_id, 1u, &id);\n+    n_elems_per_thread = n_elems / n_threads;\n+    th_elements = &elements[id * n_elems_per_thread];\n+\n+    ovs_barrier_block(&barrier);\n+    xgettimeofday(&start);\n+\n+    for (i = 0; i < n_elems_per_thread; i++) {\n+        aux->mutex ? ovs_mutex_lock(aux->mutex)\n+                   : spin_lock(aux->spin);\n+        ovs_list_push_front(aux->list, &th_elements[i].node.list);\n+        aux->mutex ? ovs_mutex_unlock(aux->mutex)\n+                   : spin_unlock(aux->spin);\n+    }\n+\n+    thread_working_ms[id] = elapsed(&start);\n+    ovs_barrier_block(&barrier);\n+\n+    working = false;\n+\n+    return NULL;\n+}\n+\n+static void\n+benchmark_list(bool use_mutex)\n+{\n+    struct ovs_mutex mutex;\n+    struct spin_lock_type spin;\n+    struct ovs_list list;\n+    struct element *elem;\n+    struct timeval start;\n+    unsigned int counter;\n+    bool work_complete;\n+    pthread_t *threads;\n+    struct list_aux aux;\n+    uint64_t epoch;\n+    size_t i;\n+\n+    memset(elements, 0, n_elems * sizeof *elements);\n+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);\n+\n+    use_mutex ? ovs_mutex_init(&mutex) : spin_lock_init(&spin);\n+\n+    ovs_list_init(&list);\n+\n+    aux.list = &list;\n+    aux.mutex = use_mutex ? &mutex : NULL;\n+    aux.spin = use_mutex ? NULL : &spin;\n+    atomic_store(&aux.thread_id, 0);\n+\n+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {\n+        ovs_list_push_front(&list, &elements[i].node.list);\n+    }\n+\n+    working = true;\n+\n+    threads = xmalloc(n_threads * sizeof *threads);\n+    ovs_barrier_init(&barrier, n_threads);\n+\n+    for (i = 0; i < n_threads; i++) {\n+        threads[i] = ovs_thread_create(\"locked_list_insert\",\n+                                       locked_list_insert_main, &aux);\n+    }\n+\n+    xgettimeofday(&start);\n+\n+    counter = 0;\n+    epoch = 1;\n+    do {\n+        if (use_mutex) {\n+            ovs_mutex_lock(&mutex);\n+            LIST_FOR_EACH_POP (elem, node.list, &list) {\n+                elem->mark = epoch;\n+                counter++;\n+            }\n+            ovs_mutex_unlock(&mutex);\n+        } else {\n+            struct ovs_list *node = NULL;\n+\n+            spin_lock(&spin);\n+            if (!ovs_list_is_empty(&list)) {\n+                node = ovs_list_pop_front(&list);\n+            }\n+            spin_unlock(&spin);\n+\n+            if (!node) {\n+                continue;\n+            }\n+\n+            elem = CONTAINER_OF(node, struct element, node.list);\n+            elem->mark = epoch;\n+            counter++;\n+        }\n+        if (epoch == UINT64_MAX) {\n+            epoch = 0;\n+        }\n+        epoch++;\n+    } while (working);\n+\n+    for (i = 0; i < n_threads; i++) {\n+        xpthread_join(threads[i], NULL);\n+    }\n+\n+    /* Elements might have been inserted before threads were joined. */\n+    LIST_FOR_EACH_POP (elem, node.list, &list) {\n+        elem->mark = epoch;\n+        counter++;\n+    }\n+\n+    if (use_mutex) {\n+        print_result(\" list(mutex)\", elapsed(&start));\n+    } else {\n+        print_result(\"  list(spin)\", elapsed(&start));\n+    }\n+\n+    use_mutex ? ovs_mutex_destroy(&mutex) : spin_lock_destroy(&spin);\n+    ovs_barrier_destroy(&barrier);\n+    free(threads);\n+\n+    work_complete = true;\n+    for (i = 0; i < n_elems; i++) {\n+        if (elements[i].mark == 0) {\n+            printf(\"Element %\" PRIuSIZE \" was never consumed.\\n\", i);\n+            work_complete = false;\n+        }\n+    }\n+    ovs_assert(work_complete);\n+    ovs_assert(counter == n_elems);\n+}\n+\n+struct guarded_list_aux {\n+    struct guarded_list *glist;\n+    atomic_uint thread_id;\n+};\n+\n+static void *\n+guarded_list_insert_thread(void *aux_)\n+{\n+    unsigned int n_elems_per_thread;\n+    struct element *th_elements;\n+    struct guarded_list_aux *aux = aux_;\n+    struct timeval start;\n+    unsigned int id;\n+    size_t i;\n+\n+    atomic_add(&aux->thread_id, 1u, &id);\n+    n_elems_per_thread = n_elems / n_threads;\n+    th_elements = &elements[id * n_elems_per_thread];\n+\n+    ovs_barrier_block(&barrier);\n+    xgettimeofday(&start);\n+\n+    for (i = 0; i < n_elems_per_thread; i++) {\n+        guarded_list_push_back(aux->glist, &th_elements[i].node.list, n_elems);\n+    }\n+\n+    thread_working_ms[id] = elapsed(&start);\n+    ovs_barrier_block(&barrier);\n+\n+    working = false;\n+\n+    return NULL;\n+}\n+\n+static void\n+benchmark_guarded_list(void)\n+{\n+    struct guarded_list_aux aux;\n+    struct ovs_list extracted;\n+    struct guarded_list glist;\n+    struct element *elem;\n+    struct timeval start;\n+    unsigned int counter;\n+    bool work_complete;\n+    pthread_t *threads;\n+    uint64_t epoch;\n+    size_t i;\n+\n+    memset(elements, 0, n_elems * sizeof *elements);\n+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);\n+\n+    guarded_list_init(&glist);\n+    ovs_list_init(&extracted);\n+\n+    aux.glist = &glist;\n+    atomic_store(&aux.thread_id, 0);\n+\n+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {\n+        guarded_list_push_back(&glist, &elements[i].node.list, n_elems);\n+    }\n+\n+    working = true;\n+\n+    threads = xmalloc(n_threads * sizeof *threads);\n+    ovs_barrier_init(&barrier, n_threads);\n+\n+    for (i = 0; i < n_threads; i++) {\n+        threads[i] = ovs_thread_create(\"guarded_list_insert\",\n+                                       guarded_list_insert_thread, &aux);\n+    }\n+\n+    xgettimeofday(&start);\n+\n+    counter = 0;\n+    epoch = 1;\n+    do {\n+        guarded_list_pop_all(&glist, &extracted);\n+        LIST_FOR_EACH_POP (elem, node.list, &extracted) {\n+            elem->mark = epoch;\n+            counter++;\n+        }\n+        if (epoch == UINT64_MAX) {\n+            epoch = 0;\n+        }\n+        epoch++;\n+    } while (working);\n+\n+    for (i = 0; i < n_threads; i++) {\n+        xpthread_join(threads[i], NULL);\n+    }\n+\n+    /* Elements might have been inserted before threads were joined. */\n+    guarded_list_pop_all(&glist, &extracted);\n+    LIST_FOR_EACH_POP (elem, node.list, &extracted) {\n+        elem->mark = epoch;\n+        counter++;\n+    }\n+\n+    print_result(\"guarded list\", elapsed(&start));\n+\n+    ovs_barrier_destroy(&barrier);\n+    free(threads);\n+    guarded_list_destroy(&glist);\n+\n+    work_complete = true;\n+    for (i = 0; i < n_elems; i++) {\n+        if (elements[i].mark == 0) {\n+            printf(\"Element %\" PRIuSIZE \" was never consumed.\\n\", i);\n+            work_complete = false;\n+        }\n+    }\n+    ovs_assert(work_complete);\n+    ovs_assert(counter == n_elems);\n+}\n+\n+static void\n+run_benchmarks(struct ovs_cmdl_context *ctx)\n+{\n+    long int l_threads;\n+    long int l_elems;\n+    size_t i;\n+\n+    ovsrcu_quiesce_start();\n+\n+    l_elems = strtol(ctx->argv[1], NULL, 10);\n+    l_threads = strtol(ctx->argv[2], NULL, 10);\n+    ovs_assert(l_elems > 0 && l_threads > 0);\n+\n+    n_elems = l_elems;\n+    n_threads = l_threads;\n+\n+    elements = xcalloc(n_elems, sizeof *elements);\n+    thread_working_ms = xcalloc(n_threads, sizeof *thread_working_ms);\n+\n+    printf(\"Benchmarking n=%u on 1 + %u threads.\\n\", n_elems, n_threads);\n+\n+    printf(\" type\\\\thread:  Reader \");\n+    for (i = 0; i < n_threads; i++) {\n+        printf(\"   %3\" PRIuSIZE \" \", i + 1);\n+    }\n+    printf(\"   Avg\\n\");\n+\n+    benchmark_mpsc_queue();\n+#ifdef HAVE_PTHREAD_SPIN_LOCK\n+    benchmark_list(false);\n+#endif\n+    benchmark_list(true);\n+    benchmark_guarded_list();\n+\n+    free(thread_working_ms);\n+    free(elements);\n+}\n+\f\n+static const struct ovs_cmdl_command commands[] = {\n+    {\"check\", NULL, 0, 0, run_tests, OVS_RO},\n+    {\"benchmark\", \"<nb elem> <nb threads>\", 2, 2, run_benchmarks, OVS_RO},\n+    {NULL, NULL, 0, 0, NULL, OVS_RO},\n+};\n+\n+static void\n+test_mpsc_queue_main(int argc, char *argv[])\n+{\n+    struct ovs_cmdl_context ctx = {\n+        .argc = argc - optind,\n+        .argv = argv + optind,\n+    };\n+\n+    vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);\n+\n+    set_program_name(argv[0]);\n+    ovs_cmdl_run_command(&ctx, commands);\n+}\n+\n+OVSTEST_REGISTER(\"test-mpsc-queue\", test_mpsc_queue_main);\n",
    "prefixes": [
        "ovs-dev",
        "v5",
        "12/27"
    ]
}