Show a patch.

Update a patch.

Update a patch.

GET /api/patches/1524985/
Content-Type: application/json
Vary: Accept

    "id": 1524985,
    "url": "",
    "web_url": "",
    "project": {
        "id": 68,
        "url": "",
        "name": "Open Virtual Network development",
        "link_name": "ovn",
        "list_id": "",
        "list_email": "",
        "web_url": "",
        "scm_url": "",
        "webscm_url": "",
        "list_archive_url": "",
        "list_archive_url_format": "",
        "commit_url_format": ""
    "msgid": "<>",
    "list_archive_url": null,
    "date": "2021-09-06T14:59:47",
    "name": "[ovs-dev,v3,2/2] Add support for configuring parallelization via unixctl",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": false,
    "hash": "6ae5060ed8cc2873753be13930e87d6835ac85fe",
    "submitter": {
        "id": 71996,
        "url": "",
        "name": "Anton Ivanov",
        "email": ""
    "delegate": null,
    "mbox": "",
    "series": [
            "id": 261146,
            "url": "",
            "web_url": "",
            "date": "2021-09-06T14:59:46",
            "name": "[ovs-dev,v3,1/2] Make changes to the parallel processing API to allow pool sizing",
            "version": 3,
            "mbox": ""
    "comments": "",
    "check": "fail",
    "checks": "",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<>",
        "X-Original-To": [
        "Delivered-To": [
        "Authentication-Results": ";\n spf=pass (sender SPF authorized)\n (client-ip=2605:bc80:3010::138;;\n; receiver=<UNKNOWN>)",
        "Received": [
            "from ( [IPv6:2605:bc80:3010::138])\n\t(using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)\n\t key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest\n SHA256)\n\t(No client certificate requested)\n\tby (Postfix) with ESMTPS id 4H3BQb4Q8Sz9sW5\n\tfor <>; Tue,  7 Sep 2021 01:00:07 +1000 (AEST)",
            "from localhost (localhost [])\n\tby (Postfix) with ESMTP id 2D00181B53;\n\tMon,  6 Sep 2021 15:00:05 +0000 (UTC)",
            "from ([])\n\tby localhost ( []) (amavisd-new, port 10024)\n\twith ESMTP id 1PBS5iSOJ3Hv; Mon,  6 Sep 2021 15:00:03 +0000 (UTC)",
            "from ( [])\n\tby (Postfix) with ESMTPS id 7E98080F85;\n\tMon,  6 Sep 2021 15:00:02 +0000 (UTC)",
            "from (localhost [])\n\tby (Postfix) with ESMTP id 552AEC001D;\n\tMon,  6 Sep 2021 15:00:02 +0000 (UTC)",
            "from ( [])\n by (Postfix) with ESMTP id 7B341C0021\n for <>; Mon,  6 Sep 2021 14:59:58 +0000 (UTC)",
            "from localhost (localhost [])\n by (Postfix) with ESMTP id 5EF09401C3\n for <>; Mon,  6 Sep 2021 14:59:58 +0000 (UTC)",
            "from ([])\n by localhost ( []) (amavisd-new, port 10024)\n with ESMTP id MtDJH8I9BxyX for <>;\n Mon,  6 Sep 2021 14:59:57 +0000 (UTC)",
            "from ( [])\n by (Postfix) with ESMTPS id C7AB140109\n for <>; Mon,  6 Sep 2021 14:59:56 +0000 (UTC)",
            "from ([]\n\n by with esmtps\n (TLS1.3:ECDHE_RSA_AES_256_GCM_SHA384:256)\n (Exim 4.92) (envelope-from <>)\n id 1mNG6E-0001Pn-Dv\n for; Mon, 06 Sep 2021 14:59:54 +0000",
            "from ([])\n by with esmtp (Exim 4.92)\n (envelope-from <>)\n id 1mNG6A-0004ph-Pv; Mon, 06 Sep 2021 15:59:52 +0100"
        "X-Virus-Scanned": [
            "amavisd-new at",
            "amavisd-new at"
        "X-Greylist": "from auto-whitelisted by SQLgrey-1.8.0",
        "From": "",
        "To": "",
        "Date": "Mon,  6 Sep 2021 15:59:47 +0100",
        "Message-Id": "<>",
        "X-Mailer": "git-send-email 2.20.1",
        "In-Reply-To": "<>",
        "References": "<>",
        "MIME-Version": "1.0",
        "X-Clacks-Overhead": "GNU Terry Pratchett",
        "Cc": "Anton Ivanov <>",
        "Subject": "[ovs-dev] [OVN Patch v3 2/2] Add support for configuring\n\tparallelization via unixctl",
        "X-BeenThere": "",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "<>",
        "List-Unsubscribe": "<>,\n <>",
        "List-Archive": "<>",
        "List-Post": "<>",
        "List-Help": "<>",
        "List-Subscribe": "<>,\n <>",
        "Content-Type": "text/plain; charset=\"us-ascii\"",
        "Content-Transfer-Encoding": "7bit",
        "Errors-To": "",
        "Sender": "\"dev\" <>"
    "content": "From: Anton Ivanov <>\n\nSigned-off-by: Anton Ivanov <>\n---\n lib/ovn-parallel-hmap.c | 209 ++++++++++++++++++++++++++++++++++++++--\n lib/ovn-parallel-hmap.h |  63 +++++++++++-\n northd/ovn-northd.c     |  26 ++---\n tests/     |  16 ++-\n 4 files changed, 283 insertions(+), 31 deletions(-)",
    "diff": "diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c\nindex 30de457b5..8a055b2c6 100644\n--- a/lib/ovn-parallel-hmap.c\n+++ b/lib/ovn-parallel-hmap.c\n@@ -33,6 +33,7 @@\n #include \"ovs-thread.h\"\n #include \"ovs-numa.h\"\n #include \"random.h\"\n+#include \"unixctl.h\"\n \n VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);\n \n@@ -46,6 +47,7 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);\n  */\n static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);\n static bool can_parallelize = false;\n+static bool should_parallelize = false;\n \n /* This is set only in the process of exit and the set is\n  * accompanied by a fence. It does not need to be atomic or be\n@@ -85,6 +87,19 @@ ovn_stop_parallel_processing(struct worker_pool *pool)\n     return pool->workers_must_exit;\n }\n \n+bool\n+ovn_set_parallel_processing(bool enable)\n+{\n+    should_parallelize = enable;\n+    return can_parallelize;\n+}\n+\n+bool\n+ovn_get_parallel_processing(void)\n+{\n+    return can_parallelize && should_parallelize;\n+}\n+\n bool\n ovn_can_parallelize_hashes(bool force_parallel)\n {\n@@ -110,6 +125,7 @@ destroy_pool(struct worker_pool *pool) {\n     sem_close(pool->done);\n     sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);\n     sem_unlink(sem_name);\n+    free(pool->name);\n     free(pool);\n }\n \n@@ -120,6 +136,10 @@ ovn_resize_pool(struct worker_pool *pool, int size)\n \n     ovs_assert(pool != NULL);\n \n+    if (!pool->is_mutable) {\n+        return false;\n+    }\n+\n     if (!size) {\n         size = pool_size;\n     }\n@@ -159,7 +179,8 @@ cleanup:\n \n \n struct worker_pool *\n-ovn_add_worker_pool(void *(*start)(void *), int size)\n+ovn_add_worker_pool(void *(*start)(void *), int size, char *name,\n+                    bool is_mutable)\n {\n     struct worker_pool *new_pool = NULL;\n     bool test = false;\n@@ -187,6 +208,8 @@ ovn_add_worker_pool(void *(*start)(void *), int size)\n         new_pool = xmalloc(sizeof(struct worker_pool));\n         new_pool->size = size;\n         new_pool->start = start;\n+        new_pool->is_mutable = is_mutable;\n+        new_pool->name = xstrdup(name);\n         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);\n         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);\n         if (new_pool->done == SEM_FAILED) {\n@@ -219,6 +242,7 @@ cleanup:\n         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);\n         sem_unlink(sem_name);\n     }\n+    free(new_pool->name);\n     ovs_mutex_unlock(&init_mutex);\n     return NULL;\n }\n@@ -267,13 +291,9 @@ ovn_fast_hmap_size_for(struct hmap *hmap, int size)\n /* Run a thread pool which uses a callback function to process results\n  */\n void\n-ovn_run_pool_callback(struct worker_pool *pool,\n-                      void *fin_result, void *result_frags,\n-                      void (*helper_func)(struct worker_pool *pool,\n-                                          void *fin_result,\n-                                          void *result_frags, int index))\n+ovn_start_pool(struct worker_pool *pool)\n {\n-    int index, completed;\n+    int index;\n \n     /* Ensure that all worker threads see the same data as the\n      * main thread.\n@@ -284,8 +304,19 @@ ovn_run_pool_callback(struct worker_pool *pool,\n     for (index = 0; index < pool->size; index++) {\n         sem_post(pool->controls[index].fire);\n     }\n+}\n+\n \n-    completed = 0;\n+/* Run a thread pool which uses a callback function to process results\n+ */\n+void\n+ovn_complete_pool_callback(struct worker_pool *pool,\n+                      void *fin_result, void *result_frags,\n+                      void (*helper_func)(struct worker_pool *pool,\n+                                          void *fin_result,\n+                                          void *result_frags, int index))\n+{\n+    int index, completed = 0;\n \n     do {\n         bool test;\n@@ -327,6 +358,18 @@ ovn_run_pool_callback(struct worker_pool *pool,\n         }\n     } while (completed < pool->size);\n }\n+/* Run a thread pool which uses a callback function to process results\n+ */\n+void\n+ovn_run_pool_callback(struct worker_pool *pool,\n+                      void *fin_result, void *result_frags,\n+                      void (*helper_func)(struct worker_pool *pool,\n+                                          void *fin_result,\n+                                          void *result_frags, int index))\n+{\n+    start_pool(pool);\n+    complete_pool_callback(pool, fin_result, result_frags, helper_func);\n+}\n \n /* Run a thread pool - basic, does not do results processing.\n  */\n@@ -373,6 +416,28 @@ ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)\n     inc->n = 0;\n }\n \n+/* Run a thread pool which gathers results in an array\n+ * of hashes. Merge results.\n+ */\n+void\n+ovn_complete_pool_hash(struct worker_pool *pool,\n+                  struct hmap *result,\n+                  struct hmap *result_frags)\n+{\n+    complete_pool_callback(pool, result, result_frags, merge_hash_results);\n+}\n+\n+/* Run a thread pool which gathers results in an array of lists.\n+ * Merge results.\n+ */\n+void\n+ovn_complete_pool_list(struct worker_pool *pool,\n+                  struct ovs_list *result,\n+                  struct ovs_list *result_frags)\n+{\n+    complete_pool_callback(pool, result, result_frags, merge_list_results);\n+}\n+\n /* Run a thread pool which gathers results in an array\n  * of hashes. Merge results.\n  */\n@@ -486,7 +551,7 @@ static struct worker_control *alloc_controls(int size)\n \n static void\n worker_pool_hook(void *aux OVS_UNUSED) {\n-    static struct worker_pool *pool;\n+    struct worker_pool *pool;\n     char sem_name[256];\n \n     /* All workers must honour the must_exit flag and check for it regularly.\n@@ -564,4 +629,130 @@ merge_hash_results(struct worker_pool *pool OVS_UNUSED,\n     hmap_destroy(&res_frags[index]);\n }\n \n+static void\n+ovn_thread_pool_resize_pool(struct unixctl_conn *conn, int argc OVS_UNUSED,\n+                            const char *argv[], void *unused OVS_UNUSED)\n+{\n+\n+    struct worker_pool *pool;\n+    int value;\n+\n+    if (!str_to_int(argv[2], 10, &value)) {\n+        unixctl_command_reply_error(conn, \"invalid argument\");\n+        return;\n+    }\n+\n+    if (value > 0) {\n+        pool_size = value;\n+    }\n+    LIST_FOR_EACH (pool, list_node, &worker_pools) {\n+        if (strcmp(pool->name, argv[1]) == 0) {\n+            resize_pool(pool, value);\n+            unixctl_command_reply_error(conn, NULL);\n+        }\n+    }\n+    unixctl_command_reply_error(conn, \"pool not found\");\n+}\n+\n+static void\n+ovn_thread_pool_list_pools(struct unixctl_conn *conn, int argc OVS_UNUSED,\n+                           const char *argv[] OVS_UNUSED,\n+                           void *unused OVS_UNUSED)\n+{\n+\n+    char *reply = NULL;\n+    char *new_reply;\n+    char buf[256];\n+    struct worker_pool *pool;\n+\n+    LIST_FOR_EACH (pool, list_node, &worker_pools) {\n+        snprintf(buf, 255, \"%s : %d\\n\", pool->name, pool->size);\n+        if (reply) {\n+            new_reply = xmalloc(strlen(reply) + strlen(buf) + 1);\n+            ovs_strlcpy(new_reply, reply, strlen(reply));\n+            strcat(new_reply, buf);\n+            free(reply);\n+        }\n+        reply = new_reply;\n+    }\n+    unixctl_command_reply(conn, reply);\n+}\n+\n+static void\n+ovn_thread_pool_set_parallel_on(struct unixctl_conn *conn, int argc OVS_UNUSED,\n+                                const char *argv[], void *unused OVS_UNUSED)\n+{\n+    int value;\n+    bool result;\n+    if (!str_to_int(argv[1], 10, &value)) {\n+        unixctl_command_reply_error(conn, \"invalid argument\");\n+        return;\n+    }\n+\n+    if (!ovn_can_parallelize_hashes(true)) {\n+        unixctl_command_reply_error(conn, \"cannot enable parallel processing\");\n+        return;\n+    }\n+\n+    if (value > 0) {\n+        /* Change default pool size */\n+        ovs_mutex_lock(&init_mutex);\n+        pool_size = value;\n+        ovs_mutex_unlock(&init_mutex);\n+    }\n+\n+    result = ovn_set_parallel_processing(true);\n+    unixctl_command_reply(conn, result ? \"enabled\" : \"disabled\");\n+}\n+\n+static void\n+ovn_thread_pool_set_parallel_off(struct unixctl_conn *conn,\n+                                 int argc OVS_UNUSED,\n+                                 const char *argv[] OVS_UNUSED,\n+                                 void *unused OVS_UNUSED)\n+{\n+    ovn_set_parallel_processing(false);\n+    unixctl_command_reply(conn, NULL);\n+}\n+\n+static void\n+ovn_thread_pool_parallel_status(struct unixctl_conn *conn, int argc OVS_UNUSED,\n+                                const char *argv[] OVS_UNUSED,\n+                                void *unused OVS_UNUSED)\n+{\n+    char status[256];\n+\n+    sprintf(status, \"%s, default pool size %d\",\n+            get_parallel_processing() ? \"active\" : \"inactive\",\n+            pool_size);\n+\n+    unixctl_command_reply(conn, status);\n+}\n+\n+void\n+ovn_parallel_thread_pools_init(void)\n+{\n+    bool test = false;\n+\n+    if (atomic_compare_exchange_strong(\n+            &initial_pool_setup,\n+            &test,\n+            true)) {\n+        ovs_mutex_lock(&init_mutex);\n+        setup_worker_pools(false);\n+        ovs_mutex_unlock(&init_mutex);\n+    }\n+\n+    unixctl_command_register(\"thread-pool/set-parallel-on\", \"N\", 1, 1,\n+                             ovn_thread_pool_set_parallel_on, NULL);\n+    unixctl_command_register(\"thread-pool/set-parallel-off\", \"\", 0, 0,\n+                             ovn_thread_pool_set_parallel_off, NULL);\n+    unixctl_command_register(\"thread-pool/status\", \"\", 0, 0,\n+                             ovn_thread_pool_parallel_status, NULL);\n+    unixctl_command_register(\"thread-pool/list\", \"\", 0, 0,\n+                             ovn_thread_pool_list_pools, NULL);\n+    unixctl_command_register(\"thread-pool/reload-pool\", \"Pool Threads\", 2, 2,\n+                             ovn_thread_pool_resize_pool, NULL);\n+}\n+\n #endif\ndiff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h\nindex 4708f41f2..403d37736 100644\n--- a/lib/ovn-parallel-hmap.h\n+++ b/lib/ovn-parallel-hmap.h\n@@ -33,6 +33,7 @@ extern \"C\" {\n #include \"openvswitch/hmap.h\"\n #include \"openvswitch/thread.h\"\n #include \"ovs-atomic.h\"\n+#include \"unixctl.h\"\n \n /* Process this include only if OVS does not supply parallel definitions\n  */\n@@ -93,6 +94,8 @@ struct worker_pool {\n     sem_t *done; /* Work completion semaphorew. */\n     void *(*start)(void *); /* Work function. */\n     bool workers_must_exit; /* Pool to be destroyed flag. */\n+    char *name; /* Name to be used in cli commands */\n+    bool is_mutable; /* Can the pool be reloaded with different params */\n };\n \n /* Add a worker pool for thread function start() which expects a pointer to\n@@ -101,7 +104,9 @@ struct worker_pool {\n  * size uses system defaults.\n  */\n \n-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);\n+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *),\n+                                        int size, char *name,\n+                                        bool is_mutable);\n \n /* Setting this to true will make all processing threads exit */\n \n@@ -149,6 +154,35 @@ void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,\n                            void *fin_result, void *result_frags, int index));\n \n \n+/* Start a pool. Do not wait for any results. They will be collected\n+ * using the _complete_ functions.\n+ */\n+void ovn_start_pool(struct worker_pool *pool);\n+\n+/* Complete a pool run started using start_pool();\n+ * Merge results from hash frags into a final hash result.\n+ * The hash frags must be pre-sized to the same size.\n+ */\n+\n+void ovn_complete_pool_hash(struct worker_pool *pool,\n+                       struct hmap *result, struct hmap *result_frags);\n+\n+/* Complete a pool run started using start_pool();\n+ * Merge results from list frags into a final list result.\n+ */\n+\n+void ovn_complete_pool_list(struct worker_pool *pool,\n+                       struct ovs_list *result, struct ovs_list *result_frags);\n+\n+/* Complete a pool run started using start_pool();\n+ * Call a callback function to perform processing of results.\n+ */\n+\n+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,\n+                           void *result_frags,\n+                           void (*helper_func)(struct worker_pool *pool,\n+                           void *fin_result, void *result_frags, int index));\n+\n /* Returns the first node in 'hmap' in the bucket in which the given 'hash'\n  * would land, or a null pointer if that bucket is empty. */\n \n@@ -259,10 +293,16 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)\n \n bool ovn_can_parallelize_hashes(bool force_parallel);\n \n+bool ovn_set_parallel_processing(bool enable);\n+\n+bool ovn_get_parallel_processing(void);\n+\n void ovn_destroy_pool(struct worker_pool *pool);\n \n bool ovn_resize_pool(struct worker_pool *pool, int size);\n \n+void ovn_parallel_thread_pools_init(void);\n+\n /* Use the OVN library functions for stuff which OVS has not defined\n  * If OVS has defined these, they will still compile using the OVN\n  * local names, but will be dropped by the linker in favour of the OVS\n@@ -273,9 +313,16 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);\n \n #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)\n \n+#define set_parallel_processing(enable) ovn_set_parallel_processing(enable)\n+\n+#define get_parallel_processing() ovn_get_parallel_processing()\n+\n+#define enable_parallel_processing() ovn_enable_parallel_processing()\n+\n #define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)\n \n-#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)\n+#define add_worker_pool(start, size, name, is_mutable) \\\n+        ovn_add_worker_pool(start, size, name, is_mutable)\n \n #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)\n \n@@ -296,10 +343,22 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);\n #define run_pool_callback(pool, fin_result, result_frags, helper_func) \\\n     ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)\n \n+#define start_pool(pool) ovn_start_pool(pool)\n+\n+#define complete_pool_hash(pool, result, result_frags) \\\n+    ovn_complete_pool_hash(pool, result, result_frags)\n+\n+#define complete_pool_list(pool, result, result_frags) \\\n+    ovn_complete_pool_list(pool, result, result_frags)\n+\n+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \\\n+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)\n+\n #define destroy_pool(pool) ovn_destroy_pool(pool)\n \n #define resize_pool(pool, size) ovn_resize_pool(pool, size)\n \n+#define parallel_thread_pools_init() ovn_parallel_thread_pools_init()\n \n #ifdef __clang__\n #pragma clang diagnostic pop\ndiff --git a/northd/ovn-northd.c b/northd/ovn-northd.c\nindex 324800c32..2f7728646 100644\n--- a/northd/ovn-northd.c\n+++ b/northd/ovn-northd.c\n@@ -4358,7 +4358,6 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,\n /* If this option is 'true' northd will combine logical flows that differ by\n  * logical datapath only by creating a datapath group. */\n static bool use_logical_dp_groups = false;\n-static bool use_parallel_build = true;\n \n static struct hashrow_locks lflow_locks;\n \n@@ -4395,7 +4394,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,\n                    nullable_xstrdup(ctrl_meter),\n                    ovn_lflow_hint(stage_hint), where);\n     hmapx_add(&lflow->od_group, od);\n-    if (!use_parallel_build) {\n+    if (!get_parallel_processing()) {\n         hmap_insert(lflow_map, &lflow->hmap_node, hash);\n     } else {\n         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);\n@@ -4414,7 +4413,7 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,\n     struct ovn_lflow *lflow;\n \n     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));\n-    if (use_logical_dp_groups && use_parallel_build) {\n+    if (use_logical_dp_groups && get_parallel_processing()) {\n         lock_hash_row(&lflow_locks, hash);\n         lflow = do_ovn_lflow_add(lflow_map, od, hash, stage, priority, match,\n                                  actions, io_port, stage_hint, where,\n@@ -4454,7 +4453,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,\n         return false;\n     }\n \n-    if (use_parallel_build) {\n+    if (get_parallel_processing()) {\n         lock_hash_row(&lflow_locks, hash);\n         hmapx_add(&lflow_ref->od_group, od);\n         unlock_hash_row(&lflow_locks, hash);\n@@ -12919,7 +12918,8 @@ static void\n init_lflows_thread_pool(void)\n {\n     if (!pool_init_done) {\n-        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);\n+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0,\n+                                            \"lflows\", true);\n         pool_init_done = true;\n     }\n }\n@@ -12951,14 +12951,11 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,\n \n     char *svc_check_match = xasprintf(\"eth.dst == %s\", svc_monitor_mac);\n \n-    if (use_parallel_build) {\n+    if (get_parallel_processing()) {\n         init_lflows_thread_pool();\n-        if (!can_parallelize_hashes(false)) {\n-            use_parallel_build = false;\n-        }\n     }\n \n-    if (use_parallel_build) {\n+    if (get_parallel_processing()) {\n         struct hmap *lflow_segs;\n         struct lswitch_flow_build_info *lsiv;\n         int index;\n@@ -13154,7 +13151,7 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,\n     struct hmap lflows;\n \n     fast_hmap_size_for(&lflows, max_seen_lflow_size);\n-    if (use_parallel_build) {\n+    if (get_parallel_processing()) {\n         update_hashrow_locks(&lflows, &lflow_locks);\n     }\n     build_lswitch_and_lrouter_flows(datapaths, ports,\n@@ -14226,10 +14223,6 @@ ovnnb_db_run(struct northd_context *ctx,\n     northd_probe_interval_nb = get_probe_interval(ovnnb_db, nb);\n     northd_probe_interval_sb = get_probe_interval(ovnsb_db, nb);\n \n-    use_parallel_build =\n-        (smap_get_bool(&nb->options, \"use_parallel_build\", false) &&\n-         can_parallelize_hashes(false));\n-\n     use_logical_dp_groups = smap_get_bool(&nb->options,\n                                           \"use_logical_dp_groups\", true);\n     use_ct_inv_match = smap_get_bool(&nb->options,\n@@ -15144,8 +15137,9 @@ main(int argc, char *argv[])\n \n     daemonize_complete();\n \n+    ovn_parallel_thread_pools_init();\n+\n     init_hash_row_locks(&lflow_locks);\n-    use_parallel_build = can_parallelize_hashes(false);\n \n     /* We want to detect (almost) all changes to the ovn-nb db. */\n     struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(\ndiff --git a/tests/ b/tests/\nindex f06f2e68e..958ce18b0 100644\n--- a/tests/\n+++ b/tests/\n@@ -179,6 +179,18 @@ ovn_start_northd() {\n     test -d \"$ovs_base/$name\" || mkdir \"$ovs_base/$name\"\n     as $name start_daemon $NORTHD_TYPE $northd_args -vjsonrpc \\\n                --ovnnb-db=$OVN_NB_DB --ovnsb-db=$OVN_SB_DB\n+    if test -z \"$USE_PARALLEL_THREADS\" ; then\n+        USE_PARALLEL_THREADS=0\n+    fi\n+\n+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then\n+        case ${NORTHD_TYPE:=ovn-northd} in\n+            ovn-northd) ovs-appctl --timeout=10 --target northd$suffix/ovn-northd \\\n+                            thread-pool/set-parallel-on $USE_PARALLEL_THREADS\n+            ;;\n+        esac\n+    fi\n+\n }\n \n # ovn_start [--backup-northd=none|paused] [AZ]\n@@ -252,10 +264,6 @@ ovn_start () {\n     else\n         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false\n     fi\n-\n-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then\n-        ovn-nbctl set NB_Global . options:use_parallel_build=true\n-    fi\n }\n \n # Interconnection networks.\n",
    "prefixes": [