Patch Detail
get:
Show a patch.
patch:
Update a patch.
put:
Update a patch.
GET /api/patches/1525723/?format=api
http://patchwork.ozlabs.org/api/patches/1525723/?format=api", "web_url": "http://patchwork.ozlabs.org/project/openvswitch/patch/a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net/", "project": { "id": 47, "url": "http://patchwork.ozlabs.org/api/projects/47/?format=api", "name": "Open vSwitch", "link_name": "openvswitch", "list_id": "ovs-dev.openvswitch.org", "list_email": "ovs-dev@openvswitch.org", "web_url": "http://openvswitch.org/", "scm_url": "git@github.com:openvswitch/ovs.git", "webscm_url": "https://github.com/openvswitch/ovs", "list_archive_url": "", "list_archive_url_format": "", "commit_url_format": "" }, "msgid": "<a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net>", "list_archive_url": null, "date": "2021-09-08T09:47:36", "name": "[ovs-dev,v5,12/27] mpsc-queue: Module for lock-free message passing", "commit_ref": "5396ba5b21c4eae9726f4726786e53d317ccfdce", "pull_url": null, "state": "accepted", "archived": false, "hash": "bfa204f40b2c922fc1225ab1560bc4b5a6a8935c", "submitter": { "id": 78795, "url": "http://patchwork.ozlabs.org/api/people/78795/?format=api", "name": "Gaetan Rivet", "email": "grive@u256.net" }, "delegate": null, "mbox": "http://patchwork.ozlabs.org/project/openvswitch/patch/a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net/mbox/", "series": [ { "id": 261424, "url": "http://patchwork.ozlabs.org/api/series/261424/?format=api", "web_url": "http://patchwork.ozlabs.org/project/openvswitch/list/?series=261424", "date": "2021-09-08T09:47:24", "name": "dpif-netdev: Parallel offload processing", "version": 5, "mbox": "http://patchwork.ozlabs.org/series/261424/mbox/" } ], "comments": "http://patchwork.ozlabs.org/api/patches/1525723/comments/", "check": "warning", "checks": "http://patchwork.ozlabs.org/api/patches/1525723/checks/", "tags": {}, "related": [], "headers": { "Return-Path": "<ovs-dev-bounces@openvswitch.org>", "X-Original-To": [ "incoming@patchwork.ozlabs.org", "ovs-dev@openvswitch.org" ], "Delivered-To": [ "patchwork-incoming@bilbo.ozlabs.org", "ovs-dev@lists.linuxfoundation.org" ], "Authentication-Results": [ "ozlabs.org;\n\tdkim=fail reason=\"signature verification failed\" (2048-bit key;\n unprotected) header.d=u256.net header.i=@u256.net header.a=rsa-sha256\n header.s=fm2 header.b=Rb2k+LYw;\n\tdkim=fail reason=\"signature verification failed\" (2048-bit key;\n unprotected) header.d=messagingengine.com header.i=@messagingengine.com\n header.a=rsa-sha256 header.s=fm3 header.b=QdY2uD2B;\n\tdkim-atps=neutral", "ozlabs.org;\n spf=pass (sender SPF authorized) smtp.mailfrom=openvswitch.org\n (client-ip=140.211.166.138; helo=smtp1.osuosl.org;\n envelope-from=ovs-dev-bounces@openvswitch.org; receiver=<UNKNOWN>)", "smtp4.osuosl.org (amavisd-new);\n dkim=pass (2048-bit key) header.d=u256.net header.b=\"Rb2k+LYw\";\n dkim=pass (2048-bit key) header.d=messagingengine.com\n header.b=\"QdY2uD2B\"" ], "Received": [ "from smtp1.osuosl.org (smtp1.osuosl.org [140.211.166.138])\n\t(using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)\n\t key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest\n SHA256)\n\t(No client certificate requested)\n\tby ozlabs.org (Postfix) with ESMTPS id 4H4HR41fpWz9sW8\n\tfor <incoming@patchwork.ozlabs.org>; Wed, 8 Sep 2021 19:49:20 +1000 (AEST)", "from localhost (localhost [127.0.0.1])\n\tby smtp1.osuosl.org (Postfix) with ESMTP id 4CC8483F0C;\n\tWed, 8 Sep 2021 09:49:18 +0000 (UTC)", "from smtp1.osuosl.org ([127.0.0.1])\n\tby localhost (smtp1.osuosl.org [127.0.0.1]) (amavisd-new, port 10024)\n\twith ESMTP id Li3P0W4W0n11; Wed, 8 Sep 2021 09:49:11 +0000 (UTC)", "from lists.linuxfoundation.org (lf-lists.osuosl.org\n [IPv6:2605:bc80:3010:104::8cd3:938])\n\tby smtp1.osuosl.org (Postfix) with ESMTPS id 7BA7483E58;\n\tWed, 8 Sep 2021 09:49:07 +0000 (UTC)", "from lf-lists.osuosl.org (localhost [127.0.0.1])\n\tby lists.linuxfoundation.org (Postfix) with ESMTP id 39541C0030;\n\tWed, 8 Sep 2021 09:49:05 +0000 (UTC)", "from smtp4.osuosl.org (smtp4.osuosl.org [140.211.166.137])\n by lists.linuxfoundation.org (Postfix) with ESMTP id 19791C0020\n for <ovs-dev@openvswitch.org>; Wed, 8 Sep 2021 09:49:03 +0000 (UTC)", "from localhost (localhost [127.0.0.1])\n by smtp4.osuosl.org (Postfix) with ESMTP id 2C626406C3\n for <ovs-dev@openvswitch.org>; Wed, 8 Sep 2021 09:48:24 +0000 (UTC)", "from smtp4.osuosl.org ([127.0.0.1])\n by localhost (smtp4.osuosl.org [127.0.0.1]) (amavisd-new, port 10024)\n with ESMTP id O5tR8M7Rqfgs for <ovs-dev@openvswitch.org>;\n Wed, 8 Sep 2021 09:48:21 +0000 (UTC)", "from wout3-smtp.messagingengine.com (wout3-smtp.messagingengine.com\n [64.147.123.19])\n by smtp4.osuosl.org (Postfix) with ESMTPS id 45C6840794\n for <ovs-dev@openvswitch.org>; Wed, 8 Sep 2021 09:48:21 +0000 (UTC)", "from compute5.internal (compute5.nyi.internal [10.202.2.45])\n by mailout.west.internal (Postfix) with ESMTP id 8EF9A32009D5;\n Wed, 8 Sep 2021 05:48:20 -0400 (EDT)", "from mailfrontend2 ([10.202.2.163])\n by compute5.internal (MEProxy); Wed, 08 Sep 2021 05:48:20 -0400", "by mail.messagingengine.com (Postfix) with ESMTPA; Wed,\n 8 Sep 2021 05:48:19 -0400 (EDT)" ], "X-Virus-Scanned": [ "amavisd-new at osuosl.org", "amavisd-new at osuosl.org" ], "X-Greylist": "from auto-whitelisted by SQLgrey-1.8.0", "DKIM-Signature": [ "v=1; a=rsa-sha256; c=relaxed/relaxed; d=u256.net; h=from\n :to:cc:subject:date:message-id:in-reply-to:references\n :mime-version:content-transfer-encoding; s=fm2; bh=yhJtu1g26vkv8\n OyhYUY/gKFutzdKMMVNBqytIqHaUxE=; b=Rb2k+LYwWTo7wEDYfbeCsmz7n/GMo\n KR22YZGsZAV2rcqvVTUWsLjVM06ySM3dGdnDinJWs/HUqNiwD7IMxyk2BUTNdck9\n U3p5VwNWZVK1cnmCD8EUkH2S2DX7IGm8d40K3eQsiVktjDuTqLbIc6VHoFhf5QfU\n YqJuv4qH5JlZLum8xHM820pAsj3f9hgIwkAjLda4zJKZikITIW4ye2G7ADcLVa2p\n 7voAX7ZdursWlGi6ZDH2lXzsXRXxg/3Dn0IUIakN1SzN9a1ZH41rHBvR7aKMhneM\n iAcMZUmaKKtakjRHrb/G3ZSKjBGRe0xsRS8mJGo+QXCdAy+c42Fxg7Vyg==", "v=1; a=rsa-sha256; c=relaxed/relaxed; d=\n messagingengine.com; h=cc:content-transfer-encoding:date:from\n :in-reply-to:message-id:mime-version:references:subject:to\n :x-me-proxy:x-me-proxy:x-me-sender:x-me-sender:x-sasl-enc; s=\n fm3; bh=yhJtu1g26vkv8OyhYUY/gKFutzdKMMVNBqytIqHaUxE=; b=QdY2uD2B\n 6BIMU/TNCxL4CWCtwk7cgRunQJ0YFRDM/R/zvEXQ5ncdCmjwWl7Jbi76m8/J60LB\n BlO53CT3zhRl2FVzhUqUIWhzEuA/UxqKqL5KIFWgN5kgexlr/GTwYjvafARD/H0M\n z2lOCNjuSDMg5b1CrkvSBXeru250GT5jgfwX/IcROKVoEHlLHW4GO+Lz4ELSpTn1\n hSSkGeNgLe2DSYv1vhOdLXgYMO3+qYrrtAPHA2XhuKQtTo2idU+KwiQsoBk9HwWP\n fn8Nm3O3v50WY+w3xtLtBOFL897KtpYc04cEH2lr15dYFHlPAHmQLUDANq4yheIG\n XoA9wyXlRFbxPw==" ], "X-ME-Sender": "<xms:5IY4YRzhBWzadmApsveL7xWxm02j8jzCqPwYM9wkcwzr1rBwkQdw6w>\n <xme:5IY4YRQdhlyTCLcAGuLjQAuYhBb_KqFS2zyRv433dGYnNFmOMnTO6NwTfGnp9zbQx\n ulqCJAWkHDBPDD5hMI>", "X-ME-Received": "\n <xmr:5IY4YbWxCgGd9RGl6PUlxLTX7Iow31ziAQDRWFByKBhx6axIFGkbIKohAjvvWXsE37nsEkQKMcBzNUTM68Vda8vQzA>", "X-ME-Proxy-Cause": "\n gggruggvucftvghtrhhoucdtuddrgedvtddrudefjedgudekucetufdoteggodetrfdotf\n fvucfrrhhofhhilhgvmecuhfgrshhtofgrihhlpdfqfgfvpdfurfetoffkrfgpnffqhgen\n uceurghilhhouhhtmecufedttdenucesvcftvggtihhpihgvnhhtshculddquddttddmne\n cujfgurhephffvufffkffojghfggfgsedtkeertdertddtnecuhfhrohhmpefirggvthgr\n nhcutfhivhgvthcuoehgrhhivhgvsehuvdehiedrnhgvtheqnecuggftrfgrthhtvghrnh\n eptedthefhudeugffgffelhffffedtkeefudefjedvgffhgfdttdfgueeuhfffgfejnecu\n ffhomhgrihhnpegrphgrtghhvgdrohhrghdpuddtvdegtghorhgvshdrnhgvthdprhhotg\n hhvghsthgvrhdrvgguuhenucevlhhushhtvghrufhiiigvpedtnecurfgrrhgrmhepmhgr\n ihhlfhhrohhmpehgrhhivhgvsehuvdehiedrnhgvth", "X-ME-Proxy": "<xmx:5IY4YTi09RoQQR4vIWJ-GEG60k9uUq124F_D-uNJxkFECzn3cQiZVQ>\n <xmx:5IY4YTDQayr0eWlZN7J3Q8LRyOmHhOfliRcwT6hHKiaTIIM3dSIP2g>\n <xmx:5IY4YcJgGfrTcEc3Uphzkn7tu86CwsdJyPlp7UsUUGrNBylZ58fLcQ>\n <xmx:5IY4YY4OCgRMQnz4ZQ_P6A-m9Ythwu8jguyGm3Bza7HPiMYO_YGIHA>", "From": "Gaetan Rivet <grive@u256.net>", "To": "ovs-dev@openvswitch.org", "Date": "Wed, 8 Sep 2021 11:47:36 +0200", "Message-Id": "\n <a45007f0565d960d59b5dbd85368b848c7472675.1631094144.git.grive@u256.net>", "X-Mailer": "git-send-email 2.31.1", "In-Reply-To": "<cover.1631094144.git.grive@u256.net>", "References": "<cover.1631094144.git.grive@u256.net>", "MIME-Version": "1.0", "Cc": "Eli Britstein <elibr@nvidia.com>,\n Maxime Coquelin <maxime.coquelin@redhat.com>", "Subject": "[ovs-dev] [PATCH v5 12/27] mpsc-queue: Module for lock-free message\n\tpassing", "X-BeenThere": "ovs-dev@openvswitch.org", "X-Mailman-Version": "2.1.15", "Precedence": "list", "List-Id": "<ovs-dev.openvswitch.org>", "List-Unsubscribe": "<https://mail.openvswitch.org/mailman/options/ovs-dev>,\n <mailto:ovs-dev-request@openvswitch.org?subject=unsubscribe>", "List-Archive": "<http://mail.openvswitch.org/pipermail/ovs-dev/>", "List-Post": "<mailto:ovs-dev@openvswitch.org>", "List-Help": "<mailto:ovs-dev-request@openvswitch.org?subject=help>", "List-Subscribe": "<https://mail.openvswitch.org/mailman/listinfo/ovs-dev>,\n <mailto:ovs-dev-request@openvswitch.org?subject=subscribe>", "Content-Type": "text/plain; charset=\"us-ascii\"", "Content-Transfer-Encoding": "7bit", "Errors-To": "ovs-dev-bounces@openvswitch.org", "Sender": "\"dev\" <ovs-dev-bounces@openvswitch.org>" }, "content": "Add a lockless multi-producer/single-consumer (MPSC), linked-list based,\nintrusive, unbounded queue that does not require deferred memory\nmanagement.\n\nThe queue is designed to improve the specific MPSC setup. A benchmark\naccompanies the unit tests to measure the difference in this configuration.\nA single reader thread polls the queue while N writers enqueue elements\nas fast as possible. The mpsc-queue is compared against the regular ovs-list\nas well as the guarded list. The latter usually offers a slight improvement\nby batching the element removal, however the mpsc-queue is faster.\n\nThe average is of each producer threads time:\n\n $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1\n Benchmarking n=3000000 on 1 + 1 threads.\n type\\thread: Reader 1 Avg\n mpsc-queue: 167 167 167 ms\n list(spin): 89 80 80 ms\n list(mutex): 745 745 745 ms\n guarded list: 788 788 788 ms\n\n $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2\n Benchmarking n=3000000 on 1 + 2 threads.\n type\\thread: Reader 1 2 Avg\n mpsc-queue: 98 97 94 95 ms\n list(spin): 185 171 173 172 ms\n list(mutex): 203 199 203 201 ms\n guarded list: 269 269 188 228 ms\n\n $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3\n Benchmarking n=3000000 on 1 + 3 threads.\n type\\thread: Reader 1 2 3 Avg\n mpsc-queue: 76 76 65 76 72 ms\n list(spin): 246 110 240 238 196 ms\n list(mutex): 542 541 541 539 540 ms\n guarded list: 535 535 507 511 517 ms\n\n $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4\n Benchmarking n=3000000 on 1 + 4 threads.\n type\\thread: Reader 1 2 3 4 Avg\n mpsc-queue: 73 68 68 68 68 68 ms\n list(spin): 294 275 279 277 282 278 ms\n list(mutex): 346 309 287 345 302 310 ms\n guarded list: 378 319 334 378 351 345 ms\n\nSigned-off-by: Gaetan Rivet <grive@u256.net>\nReviewed-by: Eli Britstein <elibr@nvidia.com>\nReviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>\n---\n lib/automake.mk | 2 +\n lib/mpsc-queue.c | 251 +++++++++++++\n lib/mpsc-queue.h | 190 ++++++++++\n tests/automake.mk | 1 +\n tests/library.at | 5 +\n tests/test-mpsc-queue.c | 772 ++++++++++++++++++++++++++++++++++++++++\n 6 files changed, 1221 insertions(+)\n create mode 100644 lib/mpsc-queue.c\n create mode 100644 lib/mpsc-queue.h\n create mode 100644 tests/test-mpsc-queue.c", "diff": "diff --git a/lib/automake.mk b/lib/automake.mk\nindex 804c8da6f..098337078 100644\n--- a/lib/automake.mk\n+++ b/lib/automake.mk\n@@ -180,6 +180,8 @@ lib_libopenvswitch_la_SOURCES = \\\n \tlib/memory.h \\\n \tlib/meta-flow.c \\\n \tlib/mov-avg.h \\\n+\tlib/mpsc-queue.c \\\n+\tlib/mpsc-queue.h \\\n \tlib/multipath.c \\\n \tlib/multipath.h \\\n \tlib/namemap.c \\\ndiff --git a/lib/mpsc-queue.c b/lib/mpsc-queue.c\nnew file mode 100644\nindex 000000000..4e99c94f7\n--- /dev/null\n+++ b/lib/mpsc-queue.c\n@@ -0,0 +1,251 @@\n+/*\n+ * Copyright (c) 2021 NVIDIA Corporation.\n+ *\n+ * Licensed under the Apache License, Version 2.0 (the \"License\");\n+ * you may not use this file except in compliance with the License.\n+ * You may obtain a copy of the License at:\n+ *\n+ * http://www.apache.org/licenses/LICENSE-2.0\n+ *\n+ * Unless required by applicable law or agreed to in writing, software\n+ * distributed under the License is distributed on an \"AS IS\" BASIS,\n+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n+ * See the License for the specific language governing permissions and\n+ * limitations under the License.\n+ */\n+\n+#include <config.h>\n+\n+#include \"ovs-atomic.h\"\n+\n+#include \"mpsc-queue.h\"\n+\n+/* Multi-producer, single-consumer queue\n+ * =====================================\n+ *\n+ * This an implementation of the MPSC queue described by Dmitri Vyukov [1].\n+ *\n+ * One atomic exchange operation is done per insertion. Removal in most cases\n+ * will not require atomic operation and will use one atomic exchange to close\n+ * the queue chain.\n+ *\n+ * Insertion\n+ * =========\n+ *\n+ * The queue is implemented using a linked-list. Insertion is done at the\n+ * back of the queue, by swapping the current end with the new node atomically,\n+ * then pointing the previous end toward the new node. To follow Vyukov\n+ * nomenclature, the end-node of the chain is called head. A producer will\n+ * only manipulate the head.\n+ *\n+ * The head swap is atomic, however the link from the previous head to the new\n+ * one is done in a separate operation. This means that the chain is\n+ * momentarily broken, when the previous head still points to NULL and the\n+ * current head has been inserted.\n+ *\n+ * Considering a series of insertions, the queue state will remain consistent\n+ * and the insertions order is compatible with their precedence, thus the\n+ * queue is serializable. However, because an insertion consists in two\n+ * separate memory transactions, it is not linearizable.\n+ *\n+ * Removal\n+ * =======\n+ *\n+ * The consumer must deal with the queue inconsistency. It will manipulate\n+ * the tail of the queue and move it along the latest consumed elements.\n+ * When an end of the chain of elements is found (the next pointer is NULL),\n+ * the tail is compared with the head.\n+ *\n+ * If both points to different addresses, then the queue is in an inconsistent\n+ * state: the tail cannot move forward as the next is NULL, but the head is not\n+ * the last element in the chain: this can only happen if the chain is broken.\n+ *\n+ * In this case, the consumer must wait for the producer to finish writing the\n+ * next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.\n+ *\n+ * Removal is thus in most cases (when there are elements in the queue)\n+ * accomplished without using atomics, until the last element of the queue.\n+ * There, the head is atomically loaded. If the queue is in a consistent state,\n+ * the head is moved back to the queue stub by inserting the stub in the queue:\n+ * ending the queue is the same as an insertion, which is one atomic XCHG.\n+ *\n+ * Forward guarantees\n+ * ==================\n+ *\n+ * Insertion and peeking are wait-free: they will execute in a known bounded\n+ * number of instructions, regardless of the state of the queue.\n+ *\n+ * However, while removal consists in peeking and a constant write to\n+ * update the tail, it can repeatedly fail until the queue become consistent.\n+ * It is thus dependent on other threads progressing. This means that the\n+ * queue forward progress is obstruction-free only. It has a potential for\n+ * livelocking.\n+ *\n+ * The chain will remain broken as long as a producer is not finished writing\n+ * its next pointer. If a producer is cancelled for example, the queue could\n+ * remain broken for any future readings. This queue should either be used\n+ * with cooperative threads or insertion must only be done outside cancellable\n+ * sections.\n+ *\n+ * Performances\n+ * ============\n+ *\n+ * In benchmarks this structure was better than alternatives such as:\n+ *\n+ * * A reversed Treiber stack [2], using 1 CAS per operations\n+ * and requiring reversal of the node list on removal.\n+ *\n+ * * Michael-Scott lock-free queue [3], using 2 CAS per operations.\n+ *\n+ * While it is not linearizable, this queue is well-suited for message passing.\n+ * If a proper hardware XCHG operation is used, it scales better than\n+ * CAS-based implementations.\n+ *\n+ * References\n+ * ==========\n+ *\n+ * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue\n+ *\n+ * [2]: R. K. Treiber. Systems programming: Coping with parallelism.\n+ * Technical Report RJ 5118, IBM Almaden Research Center, April 1986.\n+ *\n+ * [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and\n+ * Blocking Concurrent Queue Algorithms\n+ * [3]: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html\n+ *\n+ */\n+\n+void\n+mpsc_queue_init(struct mpsc_queue *queue)\n+{\n+ atomic_store_relaxed(&queue->head, &queue->stub);\n+ atomic_store_relaxed(&queue->tail, &queue->stub);\n+ atomic_store_relaxed(&queue->stub.next, NULL);\n+\n+ ovs_mutex_init(&queue->read_lock);\n+}\n+\n+void\n+mpsc_queue_destroy(struct mpsc_queue *queue)\n+ OVS_EXCLUDED(queue->read_lock)\n+{\n+ ovs_mutex_destroy(&queue->read_lock);\n+}\n+\n+enum mpsc_queue_poll_result\n+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)\n+ OVS_REQUIRES(queue->read_lock)\n+{\n+ struct mpsc_queue_node *tail;\n+ struct mpsc_queue_node *next;\n+ struct mpsc_queue_node *head;\n+\n+ atomic_read_relaxed(&queue->tail, &tail);\n+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+\n+ if (tail == &queue->stub) {\n+ if (next == NULL) {\n+ return MPSC_QUEUE_EMPTY;\n+ }\n+\n+ atomic_store_relaxed(&queue->tail, next);\n+ tail = next;\n+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+ }\n+\n+ if (next != NULL) {\n+ atomic_store_relaxed(&queue->tail, next);\n+ *node = tail;\n+ return MPSC_QUEUE_ITEM;\n+ }\n+\n+ atomic_read_explicit(&queue->head, &head, memory_order_acquire);\n+ if (tail != head) {\n+ return MPSC_QUEUE_RETRY;\n+ }\n+\n+ mpsc_queue_insert(queue, &queue->stub);\n+\n+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+ if (next != NULL) {\n+ atomic_store_relaxed(&queue->tail, next);\n+ *node = tail;\n+ return MPSC_QUEUE_ITEM;\n+ }\n+\n+ return MPSC_QUEUE_EMPTY;\n+}\n+\n+struct mpsc_queue_node *\n+mpsc_queue_pop(struct mpsc_queue *queue)\n+ OVS_REQUIRES(queue->read_lock)\n+{\n+ enum mpsc_queue_poll_result result;\n+ struct mpsc_queue_node *node;\n+\n+ do {\n+ result = mpsc_queue_poll(queue, &node);\n+ if (result == MPSC_QUEUE_EMPTY) {\n+ return NULL;\n+ }\n+ } while (result == MPSC_QUEUE_RETRY);\n+\n+ return node;\n+}\n+\n+void\n+mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)\n+ OVS_REQUIRES(queue->read_lock)\n+{\n+ struct mpsc_queue_node *tail;\n+\n+ atomic_read_relaxed(&queue->tail, &tail);\n+ atomic_store_relaxed(&node->next, tail);\n+ atomic_store_relaxed(&queue->tail, node);\n+}\n+\n+struct mpsc_queue_node *\n+mpsc_queue_tail(struct mpsc_queue *queue)\n+ OVS_REQUIRES(queue->read_lock)\n+{\n+ struct mpsc_queue_node *tail;\n+ struct mpsc_queue_node *next;\n+\n+ atomic_read_relaxed(&queue->tail, &tail);\n+ atomic_read_explicit(&tail->next, &next, memory_order_acquire);\n+\n+ if (tail == &queue->stub) {\n+ if (next == NULL) {\n+ return NULL;\n+ }\n+\n+ atomic_store_relaxed(&queue->tail, next);\n+ tail = next;\n+ }\n+\n+ return tail;\n+}\n+\n+/* Get the next element of a node. */\n+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,\n+ struct mpsc_queue_node *prev)\n+ OVS_REQUIRES(queue->read_lock)\n+{\n+ struct mpsc_queue_node *next;\n+\n+ atomic_read_explicit(&prev->next, &next, memory_order_acquire);\n+ if (next == &queue->stub) {\n+ atomic_read_explicit(&next->next, &next, memory_order_acquire);\n+ }\n+ return next;\n+}\n+\n+void\n+mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)\n+{\n+ struct mpsc_queue_node *prev;\n+\n+ atomic_store_relaxed(&node->next, NULL);\n+ prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);\n+ atomic_store_explicit(&prev->next, node, memory_order_release);\n+}\ndiff --git a/lib/mpsc-queue.h b/lib/mpsc-queue.h\nnew file mode 100644\nindex 000000000..8c7109621\n--- /dev/null\n+++ b/lib/mpsc-queue.h\n@@ -0,0 +1,190 @@\n+/*\n+ * Copyright (c) 2021 NVIDIA Corporation.\n+ *\n+ * Licensed under the Apache License, Version 2.0 (the \"License\");\n+ * you may not use this file except in compliance with the License.\n+ * You may obtain a copy of the License at:\n+ *\n+ * http://www.apache.org/licenses/LICENSE-2.0\n+ *\n+ * Unless required by applicable law or agreed to in writing, software\n+ * distributed under the License is distributed on an \"AS IS\" BASIS,\n+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n+ * See the License for the specific language governing permissions and\n+ * limitations under the License.\n+ */\n+\n+#ifndef MPSC_QUEUE_H\n+#define MPSC_QUEUE_H 1\n+\n+#include <stdbool.h>\n+#include <stdint.h>\n+#include <stddef.h>\n+\n+#include <openvswitch/thread.h>\n+#include <openvswitch/util.h>\n+\n+#include \"ovs-atomic.h\"\n+\n+/* Multi-producer, single-consumer queue\n+ * =====================================\n+ *\n+ * This data structure is a lockless queue implementation with\n+ * the following properties:\n+ *\n+ * * Multi-producer: multiple threads can write concurrently.\n+ * Insertion in the queue is thread-safe, no inter-thread\n+ * synchronization is necessary.\n+ *\n+ * * Single-consumer: only a single thread can safely remove\n+ * nodes from the queue. The queue must be 'acquired' using\n+ * 'mpsc_queue_acquire()' before removing nodes.\n+ *\n+ * * Unbounded: the queue is backed by a linked-list and is not\n+ * limited in number of elements.\n+ *\n+ * * Intrusive: queue elements are allocated as part of larger\n+ * objects. Objects are retrieved by offset manipulation.\n+ *\n+ * * per-producer FIFO: Elements in the queue are kept in the\n+ * order their producer inserted them. The consumer retrieves\n+ * them in in the same insertion order. When multiple\n+ * producers insert at the same time, either will proceed.\n+ *\n+ * This queue is well-suited for message passing between threads,\n+ * where any number of thread can insert a message and a single\n+ * thread is meant to receive and process it.\n+ *\n+ * Thread-safety\n+ * =============\n+ *\n+ * The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.\n+ * Once the queue is protected against concurrent reads, the thread can call\n+ * the consumer API:\n+ *\n+ * * mpsc_queue_poll() to peek and return the tail of the queue\n+ * * mpsc_queue_pop() to remove the tail of the queue\n+ * * mpsc_queue_tail() to read the current tail\n+ * * mpsc_queue_push_front() to enqueue an element safely at the tail\n+ * * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,\n+ * without removing them.\n+ * * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while\n+ * removing them.\n+ *\n+ * When a thread is finished with reading the queue, it can release the\n+ * reader lock using 'mpsc_queue_release()'.\n+ *\n+ * Producers can always insert elements in the queue, even if no consumer\n+ * acquired the reader lock. No inter-producer synchronization is needed.\n+ *\n+ * The consumer thread is also allowed to insert elements while it holds the\n+ * reader lock.\n+ *\n+ * Producer threads must never be cancelled while writing to the queue.\n+ * This will block the consumer, that will then lose any subsequent elements\n+ * in the queue. Producers should ideally be cooperatively managed or\n+ * the queue insertion should be within non-cancellable sections.\n+ *\n+ * Queue state\n+ * ===========\n+ *\n+ * When polling the queue, three states can be observed: 'empty', 'non-empty',\n+ * and 'inconsistent'. Three polling results are defined, respectively:\n+ *\n+ * * MPSC_QUEUE_EMPTY: the queue is empty.\n+ * * MPSC_QUEUE_ITEM: an item was available and has been removed.\n+ * * MPSC_QUEUE_RETRY: the queue is inconsistent.\n+ *\n+ * If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished\n+ * writing to the queue and the list of nodes is not coherent. The consumer\n+ * can retry shortly to check if the producer has finished.\n+ *\n+ * This behavior is the reason the removal function is called\n+ * 'mpsc_queue_poll()'.\n+ *\n+ */\n+\n+struct mpsc_queue_node {\n+ ATOMIC(struct mpsc_queue_node *) next;\n+};\n+\n+struct mpsc_queue {\n+ ATOMIC(struct mpsc_queue_node *) head;\n+ ATOMIC(struct mpsc_queue_node *) tail;\n+ struct mpsc_queue_node stub;\n+ struct ovs_mutex read_lock;\n+};\n+\n+#define MPSC_QUEUE_INITIALIZER(Q) { \\\n+ .head = ATOMIC_VAR_INIT(&(Q)->stub), \\\n+ .tail = ATOMIC_VAR_INIT(&(Q)->stub), \\\n+ .stub = { .next = ATOMIC_VAR_INIT(NULL) }, \\\n+ .read_lock = OVS_MUTEX_INITIALIZER, \\\n+}\n+\n+/* Consumer API. */\n+\n+/* Initialize the queue. Not necessary is 'MPSC_QUEUE_INITIALIZER' was used. */\n+void mpsc_queue_init(struct mpsc_queue *queue);\n+/* The reader lock must be released prior to destroying the queue. */\n+void mpsc_queue_destroy(struct mpsc_queue *queue);\n+\n+/* Acquire and release the consumer lock. */\n+#define mpsc_queue_acquire(q) do { \\\n+ ovs_mutex_lock(&(q)->read_lock); \\\n+ } while (0)\n+#define mpsc_queue_release(q) do { \\\n+ ovs_mutex_unlock(&(q)->read_lock); \\\n+ } while (0)\n+\n+enum mpsc_queue_poll_result {\n+ /* Queue is empty. */\n+ MPSC_QUEUE_EMPTY,\n+ /* Polling the queue returned an item. */\n+ MPSC_QUEUE_ITEM,\n+ /* Data has been enqueued but one or more producer thread have not\n+ * finished writing it. The queue is in an inconsistent state.\n+ * Retrying shortly, if the producer threads are still active, will\n+ * return the data.\n+ */\n+ MPSC_QUEUE_RETRY,\n+};\n+\n+/* Set 'node' to a removed item from the queue if 'MPSC_QUEUE_ITEM' is\n+ * returned, otherwise 'node' is not set.\n+ */\n+enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,\n+ struct mpsc_queue_node **node)\n+ OVS_REQUIRES(queue->read_lock);\n+\n+/* Pop an element if there is any in the queue. */\n+struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)\n+ OVS_REQUIRES(queue->read_lock);\n+\n+/* Insert at the front of the queue. Only the consumer can do it. */\n+void mpsc_queue_push_front(struct mpsc_queue *queue,\n+ struct mpsc_queue_node *node)\n+ OVS_REQUIRES(queue->read_lock);\n+\n+/* Get the current queue tail. */\n+struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)\n+ OVS_REQUIRES(queue->read_lock);\n+\n+/* Get the next element of a node. */\n+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,\n+ struct mpsc_queue_node *prev)\n+ OVS_REQUIRES(queue->read_lock);\n+\n+#define MPSC_QUEUE_FOR_EACH(node, queue) \\\n+ for (node = mpsc_queue_tail(queue); node != NULL; \\\n+ node = mpsc_queue_next((queue), node))\n+\n+#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \\\n+ for (node = mpsc_queue_pop(queue); node != NULL; \\\n+ node = mpsc_queue_pop(queue))\n+\n+/* Producer API. */\n+\n+void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);\n+\n+#endif /* MPSC_QUEUE_H */\ndiff --git a/tests/automake.mk b/tests/automake.mk\nindex 977779765..8fdec27ef 100644\n--- a/tests/automake.mk\n+++ b/tests/automake.mk\n@@ -472,6 +472,7 @@ tests_ovstest_SOURCES = \\\n \ttests/test-list.c \\\n \ttests/test-lockfile.c \\\n \ttests/test-multipath.c \\\n+\ttests/test-mpsc-queue.c \\\n \ttests/test-netflow.c \\\n \ttests/test-odp.c \\\n \ttests/test-ofpbuf.c \\\ndiff --git a/tests/library.at b/tests/library.at\nindex 42a5ce1aa..661e95727 100644\n--- a/tests/library.at\n+++ b/tests/library.at\n@@ -265,3 +265,8 @@ AT_SKIP_IF([test \"$IS_WIN32\" = \"yes\"])\n AT_SKIP_IF([test \"$IS_BSD\" = \"yes\"])\n AT_CHECK([ovstest test-netlink-policy ll_addr], [0])\n AT_CLEANUP\n+\n+AT_SETUP([mpsc-queue module])\n+AT_CHECK([ovstest test-mpsc-queue check], [0], [....\n+])\n+AT_CLEANUP\ndiff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c\nnew file mode 100644\nindex 000000000..a38bf9e6d\n--- /dev/null\n+++ b/tests/test-mpsc-queue.c\n@@ -0,0 +1,772 @@\n+/*\n+ * Copyright (c) 2021 NVIDIA Corporation.\n+ *\n+ * Licensed under the Apache License, Version 2.0 (the \"License\");\n+ * you may not use this file except in compliance with the License.\n+ * You may obtain a copy of the License at:\n+ *\n+ * http://www.apache.org/licenses/LICENSE-2.0\n+ *\n+ * Unless required by applicable law or agreed to in writing, software\n+ * distributed under the License is distributed on an \"AS IS\" BASIS,\n+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n+ * See the License for the specific language governing permissions and\n+ * limitations under the License.\n+ */\n+\n+#undef NDEBUG\n+#include <assert.h>\n+#include <getopt.h>\n+#include <string.h>\n+\n+#include <config.h>\n+\n+#include \"command-line.h\"\n+#include \"guarded-list.h\"\n+#include \"mpsc-queue.h\"\n+#include \"openvswitch/list.h\"\n+#include \"openvswitch/util.h\"\n+#include \"openvswitch/vlog.h\"\n+#include \"ovs-rcu.h\"\n+#include \"ovs-thread.h\"\n+#include \"ovstest.h\"\n+#include \"timeval.h\"\n+#include \"util.h\"\n+\n+struct element {\n+ union {\n+ struct mpsc_queue_node mpscq;\n+ struct ovs_list list;\n+ } node;\n+ uint64_t mark;\n+};\n+\n+static void\n+test_mpsc_queue_mark_element(struct mpsc_queue_node *node,\n+ uint64_t mark,\n+ unsigned int *counter)\n+{\n+ struct element *elem;\n+\n+ elem = CONTAINER_OF(node, struct element, node.mpscq);\n+ elem->mark = mark;\n+ *counter += 1;\n+}\n+\n+static void\n+test_mpsc_queue_insert(void)\n+{\n+ struct element elements[100];\n+ struct mpsc_queue_node *node;\n+ struct mpsc_queue queue;\n+ unsigned int counter;\n+ size_t i;\n+\n+ memset(elements, 0, sizeof(elements));\n+ mpsc_queue_init(&queue);\n+ mpsc_queue_acquire(&queue);\n+\n+ for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+ mpsc_queue_insert(&queue, &elements[i].node.mpscq);\n+ }\n+\n+ counter = 0;\n+ while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+ test_mpsc_queue_mark_element(node, 1, &counter);\n+ }\n+\n+ mpsc_queue_release(&queue);\n+ mpsc_queue_destroy(&queue);\n+\n+ ovs_assert(counter == ARRAY_SIZE(elements));\n+ for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+ ovs_assert(elements[i].mark == 1);\n+ }\n+\n+ printf(\".\");\n+}\n+\n+static void\n+test_mpsc_queue_removal_fifo(void)\n+{\n+ struct element elements[100];\n+ struct mpsc_queue_node *node;\n+ struct mpsc_queue queue;\n+ unsigned int counter;\n+ size_t i;\n+\n+ memset(elements, 0, sizeof(elements));\n+\n+ mpsc_queue_init(&queue);\n+ mpsc_queue_acquire(&queue);\n+\n+ for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+ mpsc_queue_insert(&queue, &elements[i].node.mpscq);\n+ }\n+\n+ /* Elements are in the same order in the list as they\n+ * were declared / initialized.\n+ */\n+ counter = 0;\n+ while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+ test_mpsc_queue_mark_element(node, counter, &counter);\n+ }\n+\n+ /* The list is valid once extracted from the queue,\n+ * the queue can be destroyed here.\n+ */\n+ mpsc_queue_release(&queue);\n+ mpsc_queue_destroy(&queue);\n+\n+ for (i = 0; i < ARRAY_SIZE(elements) - 1; i++) {\n+ struct element *e1, *e2;\n+\n+ e1 = &elements[i];\n+ e2 = &elements[i + 1];\n+\n+ ovs_assert(e1->mark < e2->mark);\n+ }\n+\n+ printf(\".\");\n+}\n+\n+/* Partial insert:\n+ *\n+ * Those functions are 'mpsc_queue_insert()' divided in two parts.\n+ * They serve to test the behavior of the queue when forcing the potential\n+ * condition of a thread starting an insertion then yielding.\n+ */\n+static struct mpsc_queue_node *\n+mpsc_queue_insert_begin(struct mpsc_queue *queue, struct mpsc_queue_node *node)\n+{\n+ struct mpsc_queue_node *prev;\n+\n+ atomic_store_explicit(&node->next, NULL, memory_order_relaxed);\n+ prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);\n+ return prev;\n+}\n+\n+static void\n+mpsc_queue_insert_end(struct mpsc_queue_node *prev,\n+ struct mpsc_queue_node *node)\n+{\n+ atomic_store_explicit(&prev->next, node, memory_order_release);\n+}\n+\n+static void\n+test_mpsc_queue_insert_partial(void)\n+{\n+ struct element elements[10];\n+ struct mpsc_queue_node *prevs[ARRAY_SIZE(elements)];\n+ struct mpsc_queue_node *node;\n+ struct mpsc_queue queue, *q = &queue;\n+ size_t i;\n+\n+ mpsc_queue_init(q);\n+\n+ /* Insert the first half of elements entirely,\n+ * insert the second hald of elements partially.\n+ */\n+ for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+ elements[i].mark = i;\n+ if (i > ARRAY_SIZE(elements) / 2) {\n+ prevs[i] = mpsc_queue_insert_begin(q, &elements[i].node.mpscq);\n+ } else {\n+ prevs[i] = NULL;\n+ mpsc_queue_insert(q, &elements[i].node.mpscq);\n+ }\n+ }\n+\n+ mpsc_queue_acquire(q);\n+\n+ /* Verify that when the chain is broken, iterators will stop. */\n+ i = 0;\n+ MPSC_QUEUE_FOR_EACH (node, q) {\n+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+ ovs_assert(e == &elements[i]);\n+ i++;\n+ }\n+ ovs_assert(i < ARRAY_SIZE(elements));\n+\n+ for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+ if (prevs[i] != NULL) {\n+ mpsc_queue_insert_end(prevs[i], &elements[i].node.mpscq);\n+ }\n+ }\n+\n+ i = 0;\n+ MPSC_QUEUE_FOR_EACH (node, q) {\n+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+ ovs_assert(e == &elements[i]);\n+ i++;\n+ }\n+ ovs_assert(i == ARRAY_SIZE(elements));\n+\n+ MPSC_QUEUE_FOR_EACH_POP (node, q) {\n+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+ ovs_assert(e->mark == (unsigned int)(e - elements));\n+ }\n+\n+ mpsc_queue_release(q);\n+ mpsc_queue_destroy(q);\n+\n+ printf(\".\");\n+}\n+\n+static void\n+test_mpsc_queue_push_front(void)\n+{\n+ struct mpsc_queue queue, *q = &queue;\n+ struct mpsc_queue_node *node;\n+ struct element elements[10];\n+ size_t i;\n+\n+ mpsc_queue_init(q);\n+ mpsc_queue_acquire(q);\n+\n+ ovs_assert(mpsc_queue_pop(q) == NULL);\n+ mpsc_queue_push_front(q, &elements[0].node.mpscq);\n+ node = mpsc_queue_pop(q);\n+ ovs_assert(node == &elements[0].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == NULL);\n+\n+ mpsc_queue_push_front(q, &elements[0].node.mpscq);\n+ mpsc_queue_push_front(q, &elements[1].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == NULL);\n+\n+ mpsc_queue_push_front(q, &elements[1].node.mpscq);\n+ mpsc_queue_push_front(q, &elements[0].node.mpscq);\n+ mpsc_queue_insert(q, &elements[2].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == &elements[2].node.mpscq);\n+ ovs_assert(mpsc_queue_pop(q) == NULL);\n+\n+ for (i = 0; i < ARRAY_SIZE(elements); i++) {\n+ elements[i].mark = i;\n+ mpsc_queue_insert(q, &elements[i].node.mpscq);\n+ }\n+\n+ node = mpsc_queue_pop(q);\n+ mpsc_queue_push_front(q, node);\n+ ovs_assert(mpsc_queue_pop(q) == node);\n+ mpsc_queue_push_front(q, node);\n+\n+ i = 0;\n+ MPSC_QUEUE_FOR_EACH (node, q) {\n+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+ ovs_assert(e == &elements[i]);\n+ i++;\n+ }\n+ ovs_assert(i == ARRAY_SIZE(elements));\n+\n+ MPSC_QUEUE_FOR_EACH_POP (node, q) {\n+ struct element *e = CONTAINER_OF(node, struct element, node.mpscq);\n+ ovs_assert(e->mark == (unsigned int)(e - elements));\n+ }\n+\n+ mpsc_queue_release(q);\n+ mpsc_queue_destroy(q);\n+\n+ printf(\".\");\n+}\n+\n+static void\n+run_tests(struct ovs_cmdl_context *ctx OVS_UNUSED)\n+{\n+ /* Verify basic insertion. */\n+ test_mpsc_queue_insert();\n+ /* Test partial insertion. */\n+ test_mpsc_queue_insert_partial();\n+ /* Verify removal order is respected. */\n+ test_mpsc_queue_removal_fifo();\n+ /* Verify tail-end insertion works. */\n+ test_mpsc_queue_push_front();\n+ printf(\"\\n\");\n+}\n+\f\n+static struct element *elements;\n+static uint64_t *thread_working_ms; /* Measured work time. */\n+\n+static unsigned int n_threads;\n+static unsigned int n_elems;\n+\n+static struct ovs_barrier barrier;\n+static volatile bool working;\n+\n+static int\n+elapsed(const struct timeval *start)\n+{\n+ struct timeval end;\n+\n+ xgettimeofday(&end);\n+ return timeval_to_msec(&end) - timeval_to_msec(start);\n+}\n+\n+static void\n+print_result(const char *prefix, int reader_elapsed)\n+{\n+ uint64_t avg;\n+ size_t i;\n+\n+ avg = 0;\n+ for (i = 0; i < n_threads; i++) {\n+ avg += thread_working_ms[i];\n+ }\n+ avg /= n_threads;\n+ printf(\"%s: %6d\", prefix, reader_elapsed);\n+ for (i = 0; i < n_threads; i++) {\n+ printf(\" %6\" PRIu64, thread_working_ms[i]);\n+ }\n+ printf(\" %6\" PRIu64 \" ms\\n\", avg);\n+}\n+\n+struct mpscq_aux {\n+ struct mpsc_queue *queue;\n+ atomic_uint thread_id;\n+};\n+\n+static void *\n+mpsc_queue_insert_thread(void *aux_)\n+{\n+ unsigned int n_elems_per_thread;\n+ struct element *th_elements;\n+ struct mpscq_aux *aux = aux_;\n+ struct timeval start;\n+ unsigned int id;\n+ size_t i;\n+\n+ atomic_add(&aux->thread_id, 1u, &id);\n+ n_elems_per_thread = n_elems / n_threads;\n+ th_elements = &elements[id * n_elems_per_thread];\n+\n+ ovs_barrier_block(&barrier);\n+ xgettimeofday(&start);\n+\n+ for (i = 0; i < n_elems_per_thread; i++) {\n+ mpsc_queue_insert(aux->queue, &th_elements[i].node.mpscq);\n+ }\n+\n+ thread_working_ms[id] = elapsed(&start);\n+ ovs_barrier_block(&barrier);\n+\n+ working = false;\n+\n+ return NULL;\n+}\n+\n+static void\n+benchmark_mpsc_queue(void)\n+{\n+ struct mpsc_queue_node *node;\n+ struct mpsc_queue queue;\n+ struct timeval start;\n+ unsigned int counter;\n+ bool work_complete;\n+ pthread_t *threads;\n+ struct mpscq_aux aux;\n+ uint64_t epoch;\n+ size_t i;\n+\n+ memset(elements, 0, n_elems & sizeof *elements);\n+ memset(thread_working_ms, 0, n_threads & sizeof *thread_working_ms);\n+\n+ mpsc_queue_init(&queue);\n+\n+ aux.queue = &queue;\n+ atomic_store(&aux.thread_id, 0);\n+\n+ for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {\n+ mpsc_queue_insert(&queue, &elements[i].node.mpscq);\n+ }\n+\n+ working = true;\n+\n+ threads = xmalloc(n_threads * sizeof *threads);\n+ ovs_barrier_init(&barrier, n_threads);\n+\n+ for (i = 0; i < n_threads; i++) {\n+ threads[i] = ovs_thread_create(\"sc_queue_insert\",\n+ mpsc_queue_insert_thread, &aux);\n+ }\n+\n+ mpsc_queue_acquire(&queue);\n+ xgettimeofday(&start);\n+\n+ counter = 0;\n+ epoch = 1;\n+ do {\n+ while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+ test_mpsc_queue_mark_element(node, epoch, &counter);\n+ }\n+ if (epoch == UINT64_MAX) {\n+ epoch = 0;\n+ }\n+ epoch++;\n+ } while (working);\n+\n+ for (i = 0; i < n_threads; i++) {\n+ xpthread_join(threads[i], NULL);\n+ }\n+\n+ /* Elements might have been inserted before threads were joined. */\n+ while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {\n+ test_mpsc_queue_mark_element(node, epoch, &counter);\n+ }\n+\n+ print_result(\" mpsc-queue\", elapsed(&start));\n+\n+ mpsc_queue_release(&queue);\n+ mpsc_queue_destroy(&queue);\n+ ovs_barrier_destroy(&barrier);\n+ free(threads);\n+\n+ work_complete = true;\n+ for (i = 0; i < n_elems; i++) {\n+ if (elements[i].mark == 0) {\n+ printf(\"Element %\" PRIuSIZE \" was never consumed.\\n\", i);\n+ work_complete = false;\n+ }\n+ }\n+ ovs_assert(work_complete);\n+ ovs_assert(counter == n_elems);\n+}\n+\n+#ifdef HAVE_PTHREAD_SPIN_LOCK\n+#define spin_lock_type ovs_spin\n+#define spin_lock_init(l) ovs_spin_init(l)\n+#define spin_lock_destroy(l) ovs_spin_destroy(l)\n+#define spin_lock(l) ovs_spin_lock(l)\n+#define spin_unlock(l) ovs_spin_unlock(l)\n+#else\n+#define spin_lock_type ovs_mutex\n+#define spin_lock_init(l) ovs_mutex_init(l)\n+#define spin_lock_destroy(l) ovs_mutex_destroy(l)\n+#define spin_lock(l) ovs_mutex_lock(l)\n+#define spin_unlock(l) ovs_mutex_unlock(l)\n+#endif\n+\n+struct list_aux {\n+ struct ovs_list *list;\n+ struct ovs_mutex *mutex;\n+ struct spin_lock_type *spin;\n+ atomic_uint thread_id;\n+};\n+\n+static void *\n+locked_list_insert_main(void *aux_)\n+ OVS_NO_THREAD_SAFETY_ANALYSIS\n+{\n+ unsigned int n_elems_per_thread;\n+ struct element *th_elements;\n+ struct list_aux *aux = aux_;\n+ struct timeval start;\n+ unsigned int id;\n+ size_t i;\n+\n+ atomic_add(&aux->thread_id, 1u, &id);\n+ n_elems_per_thread = n_elems / n_threads;\n+ th_elements = &elements[id * n_elems_per_thread];\n+\n+ ovs_barrier_block(&barrier);\n+ xgettimeofday(&start);\n+\n+ for (i = 0; i < n_elems_per_thread; i++) {\n+ aux->mutex ? ovs_mutex_lock(aux->mutex)\n+ : spin_lock(aux->spin);\n+ ovs_list_push_front(aux->list, &th_elements[i].node.list);\n+ aux->mutex ? ovs_mutex_unlock(aux->mutex)\n+ : spin_unlock(aux->spin);\n+ }\n+\n+ thread_working_ms[id] = elapsed(&start);\n+ ovs_barrier_block(&barrier);\n+\n+ working = false;\n+\n+ return NULL;\n+}\n+\n+static void\n+benchmark_list(bool use_mutex)\n+{\n+ struct ovs_mutex mutex;\n+ struct spin_lock_type spin;\n+ struct ovs_list list;\n+ struct element *elem;\n+ struct timeval start;\n+ unsigned int counter;\n+ bool work_complete;\n+ pthread_t *threads;\n+ struct list_aux aux;\n+ uint64_t epoch;\n+ size_t i;\n+\n+ memset(elements, 0, n_elems * sizeof *elements);\n+ memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);\n+\n+ use_mutex ? ovs_mutex_init(&mutex) : spin_lock_init(&spin);\n+\n+ ovs_list_init(&list);\n+\n+ aux.list = &list;\n+ aux.mutex = use_mutex ? &mutex : NULL;\n+ aux.spin = use_mutex ? NULL : &spin;\n+ atomic_store(&aux.thread_id, 0);\n+\n+ for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {\n+ ovs_list_push_front(&list, &elements[i].node.list);\n+ }\n+\n+ working = true;\n+\n+ threads = xmalloc(n_threads * sizeof *threads);\n+ ovs_barrier_init(&barrier, n_threads);\n+\n+ for (i = 0; i < n_threads; i++) {\n+ threads[i] = ovs_thread_create(\"locked_list_insert\",\n+ locked_list_insert_main, &aux);\n+ }\n+\n+ xgettimeofday(&start);\n+\n+ counter = 0;\n+ epoch = 1;\n+ do {\n+ if (use_mutex) {\n+ ovs_mutex_lock(&mutex);\n+ LIST_FOR_EACH_POP (elem, node.list, &list) {\n+ elem->mark = epoch;\n+ counter++;\n+ }\n+ ovs_mutex_unlock(&mutex);\n+ } else {\n+ struct ovs_list *node = NULL;\n+\n+ spin_lock(&spin);\n+ if (!ovs_list_is_empty(&list)) {\n+ node = ovs_list_pop_front(&list);\n+ }\n+ spin_unlock(&spin);\n+\n+ if (!node) {\n+ continue;\n+ }\n+\n+ elem = CONTAINER_OF(node, struct element, node.list);\n+ elem->mark = epoch;\n+ counter++;\n+ }\n+ if (epoch == UINT64_MAX) {\n+ epoch = 0;\n+ }\n+ epoch++;\n+ } while (working);\n+\n+ for (i = 0; i < n_threads; i++) {\n+ xpthread_join(threads[i], NULL);\n+ }\n+\n+ /* Elements might have been inserted before threads were joined. */\n+ LIST_FOR_EACH_POP (elem, node.list, &list) {\n+ elem->mark = epoch;\n+ counter++;\n+ }\n+\n+ if (use_mutex) {\n+ print_result(\" list(mutex)\", elapsed(&start));\n+ } else {\n+ print_result(\" list(spin)\", elapsed(&start));\n+ }\n+\n+ use_mutex ? ovs_mutex_destroy(&mutex) : spin_lock_destroy(&spin);\n+ ovs_barrier_destroy(&barrier);\n+ free(threads);\n+\n+ work_complete = true;\n+ for (i = 0; i < n_elems; i++) {\n+ if (elements[i].mark == 0) {\n+ printf(\"Element %\" PRIuSIZE \" was never consumed.\\n\", i);\n+ work_complete = false;\n+ }\n+ }\n+ ovs_assert(work_complete);\n+ ovs_assert(counter == n_elems);\n+}\n+\n+struct guarded_list_aux {\n+ struct guarded_list *glist;\n+ atomic_uint thread_id;\n+};\n+\n+static void *\n+guarded_list_insert_thread(void *aux_)\n+{\n+ unsigned int n_elems_per_thread;\n+ struct element *th_elements;\n+ struct guarded_list_aux *aux = aux_;\n+ struct timeval start;\n+ unsigned int id;\n+ size_t i;\n+\n+ atomic_add(&aux->thread_id, 1u, &id);\n+ n_elems_per_thread = n_elems / n_threads;\n+ th_elements = &elements[id * n_elems_per_thread];\n+\n+ ovs_barrier_block(&barrier);\n+ xgettimeofday(&start);\n+\n+ for (i = 0; i < n_elems_per_thread; i++) {\n+ guarded_list_push_back(aux->glist, &th_elements[i].node.list, n_elems);\n+ }\n+\n+ thread_working_ms[id] = elapsed(&start);\n+ ovs_barrier_block(&barrier);\n+\n+ working = false;\n+\n+ return NULL;\n+}\n+\n+static void\n+benchmark_guarded_list(void)\n+{\n+ struct guarded_list_aux aux;\n+ struct ovs_list extracted;\n+ struct guarded_list glist;\n+ struct element *elem;\n+ struct timeval start;\n+ unsigned int counter;\n+ bool work_complete;\n+ pthread_t *threads;\n+ uint64_t epoch;\n+ size_t i;\n+\n+ memset(elements, 0, n_elems * sizeof *elements);\n+ memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);\n+\n+ guarded_list_init(&glist);\n+ ovs_list_init(&extracted);\n+\n+ aux.glist = &glist;\n+ atomic_store(&aux.thread_id, 0);\n+\n+ for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {\n+ guarded_list_push_back(&glist, &elements[i].node.list, n_elems);\n+ }\n+\n+ working = true;\n+\n+ threads = xmalloc(n_threads * sizeof *threads);\n+ ovs_barrier_init(&barrier, n_threads);\n+\n+ for (i = 0; i < n_threads; i++) {\n+ threads[i] = ovs_thread_create(\"guarded_list_insert\",\n+ guarded_list_insert_thread, &aux);\n+ }\n+\n+ xgettimeofday(&start);\n+\n+ counter = 0;\n+ epoch = 1;\n+ do {\n+ guarded_list_pop_all(&glist, &extracted);\n+ LIST_FOR_EACH_POP (elem, node.list, &extracted) {\n+ elem->mark = epoch;\n+ counter++;\n+ }\n+ if (epoch == UINT64_MAX) {\n+ epoch = 0;\n+ }\n+ epoch++;\n+ } while (working);\n+\n+ for (i = 0; i < n_threads; i++) {\n+ xpthread_join(threads[i], NULL);\n+ }\n+\n+ /* Elements might have been inserted before threads were joined. */\n+ guarded_list_pop_all(&glist, &extracted);\n+ LIST_FOR_EACH_POP (elem, node.list, &extracted) {\n+ elem->mark = epoch;\n+ counter++;\n+ }\n+\n+ print_result(\"guarded list\", elapsed(&start));\n+\n+ ovs_barrier_destroy(&barrier);\n+ free(threads);\n+ guarded_list_destroy(&glist);\n+\n+ work_complete = true;\n+ for (i = 0; i < n_elems; i++) {\n+ if (elements[i].mark == 0) {\n+ printf(\"Element %\" PRIuSIZE \" was never consumed.\\n\", i);\n+ work_complete = false;\n+ }\n+ }\n+ ovs_assert(work_complete);\n+ ovs_assert(counter == n_elems);\n+}\n+\n+static void\n+run_benchmarks(struct ovs_cmdl_context *ctx)\n+{\n+ long int l_threads;\n+ long int l_elems;\n+ size_t i;\n+\n+ ovsrcu_quiesce_start();\n+\n+ l_elems = strtol(ctx->argv[1], NULL, 10);\n+ l_threads = strtol(ctx->argv[2], NULL, 10);\n+ ovs_assert(l_elems > 0 && l_threads > 0);\n+\n+ n_elems = l_elems;\n+ n_threads = l_threads;\n+\n+ elements = xcalloc(n_elems, sizeof *elements);\n+ thread_working_ms = xcalloc(n_threads, sizeof *thread_working_ms);\n+\n+ printf(\"Benchmarking n=%u on 1 + %u threads.\\n\", n_elems, n_threads);\n+\n+ printf(\" type\\\\thread: Reader \");\n+ for (i = 0; i < n_threads; i++) {\n+ printf(\" %3\" PRIuSIZE \" \", i + 1);\n+ }\n+ printf(\" Avg\\n\");\n+\n+ benchmark_mpsc_queue();\n+#ifdef HAVE_PTHREAD_SPIN_LOCK\n+ benchmark_list(false);\n+#endif\n+ benchmark_list(true);\n+ benchmark_guarded_list();\n+\n+ free(thread_working_ms);\n+ free(elements);\n+}\n+\f\n+static const struct ovs_cmdl_command commands[] = {\n+ {\"check\", NULL, 0, 0, run_tests, OVS_RO},\n+ {\"benchmark\", \"<nb elem> <nb threads>\", 2, 2, run_benchmarks, OVS_RO},\n+ {NULL, NULL, 0, 0, NULL, OVS_RO},\n+};\n+\n+static void\n+test_mpsc_queue_main(int argc, char *argv[])\n+{\n+ struct ovs_cmdl_context ctx = {\n+ .argc = argc - optind,\n+ .argv = argv + optind,\n+ };\n+\n+ vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);\n+\n+ set_program_name(argv[0]);\n+ ovs_cmdl_run_command(&ctx, commands);\n+}\n+\n+OVSTEST_REGISTER(\"test-mpsc-queue\", test_mpsc_queue_main);\n", "prefixes": [ "ovs-dev", "v5", "12/27" ] }{ "id": 1525723, "url": "