
[ovs-dev,v4] northd: add configuration option for enabling parallelization

Message ID 20220518100755.1087444-1-xsimonar@redhat.com
State Superseded
Series [ovs-dev,v4] northd: add configuration option for enabling parallelization

Checks

Context Check Description
ovsrobot/apply-robot warning apply and check: warning
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/github-robot-_ovn-kubernetes success github build: passed

Commit Message

Xavier Simonart May 18, 2022, 10:07 a.m. UTC
This patch changes the way northd lflow build parallelization is enabled
and allows runtime changes to the number of threads.
Before this patch, the following was needed to use parallelization:
- enable parallelization through use_parallel_build in NBDB
- use --dummy-numa to select the number of threads.
This second part was needed because otherwise as many threads as there are
cores in the system were used, while parallelization showed performance
improvements only up to around 4 (or maybe 8) threads.

With this patch, the number of threads used for lflow parallel build can be
specified either:
- at startup, using --n-threads=<N> as ovn-northd command line option
- using unixctl
If the number of threads specified is > 1, then parallelization is enabled.
If the number is 1, parallelization is disabled.
If the number is < 1, parallelization is disabled at startup and a warning
is logged.
If the number is > 256, parallelization is enabled (with 256 threads) and
a warning is logged.
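
For illustration, a minimal example of the startup option described above
(a hedged sketch: only the --n-threads flag comes from this patch; the
default database connections are assumed):

  # Build logical flows with 4 threads. A value > 256 would be clamped to
  # 256 and a value < 1 raised to 1 (parallelization disabled), with a
  # warning logged in both cases.
  ovn-northd --n-threads=4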

The following unixctl commands have been added (illustrated below):
- set-n-threads <N>: set the number of threads used.
- get-n-threads: returns the number of threads used.
If the number of threads is within <2-256> bounds, parallelization is enabled.
If the number of threads is 1, parallelization is disabled.
Otherwise, an error is returned.

Note that if set-n-threads fails for any reason (e.g. failure to set up a
semaphore), parallelization is disabled, and get-n-threads will return 1.
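
For example (a hedged sketch; the full unixctl command names come from the
patch below, and the target name ovn-northd is assumed to be the default):

  # Enable parallelization with 4 threads.
  ovn-appctl -t ovn-northd parallel-build/set-n-threads 4
  # Query the current thread count; prints "4".
  ovn-appctl -t ovn-northd parallel-build/get-n-threads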

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
Signed-off-by: Xavier Simonart <xsimonar@redhat.com>

---
v2:  - handled Dumitru's comments
     - added missing mutex_destroy
     - fixed issue when use_logical_dp_group is enabled after northd startup
     - rebased on top of main
v3:
     - fix mutex_destroy issue
v4:
     - handled Mark's comments
     - rebased on top of main
---
 NEWS                      |   7 +
 lib/ovn-parallel-hmap.c   | 291 +++++++++++++++++++++-----------------
 lib/ovn-parallel-hmap.h   |  30 ++--
 northd/northd.c           | 176 ++++++++++++-----------
 northd/northd.h           |   1 +
 northd/ovn-northd-ddlog.c |   6 -
 northd/ovn-northd.8.xml   |  70 +++++----
 northd/ovn-northd.c       |  68 ++++++++-
 tests/ovn-macros.at       |  59 ++------
 tests/ovn-northd.at       | 109 ++++++++++++++
 10 files changed, 495 insertions(+), 322 deletions(-)

Comments

0-day Robot May 18, 2022, 10:19 a.m. UTC | #1
Bleep bloop.  Greetings Xavier Simonart, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line lacks whitespace around operator
WARNING: Line lacks whitespace around operator
#1073 FILE: northd/ovn-northd.c:533:
  --n-threads=N             specify number of threads\n\

Lines checked: 1406, Warnings: 2, Errors: 0


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Dumitru Ceara May 18, 2022, 12:59 p.m. UTC | #2
On 5/18/22 12:07, Xavier Simonart wrote:
> This patch changes the way northd lflow build parallelization is enabled
> and allows runtime changes to the number of threads.
> Before this patch, the following was needed to use parallelization:
> - enable parallelization through use_parallel_build in NBDB
> - use --dummy-numa to select the number of threads.
> This second part was needed because otherwise as many threads as there are
> cores in the system were used, while parallelization showed performance
> improvements only up to around 4 (or maybe 8) threads.
> 
> With this patch, the number of threads used for lflow parallel build can be
> specified either:
> - at startup, using --n-threads=<N> as ovn-northd command line option
> - using unixctl
> If the number of threads specified is > 1, then parallelization is enabled.
> If the number is 1, parallelization is disabled.
> If the number is < 1, parallelization is disabled at startup and a warning
> is logged.
> If the number is > 256, parallelization is enabled (with 256 threads) and
> a warning is logged.
> 
> The following unixctl commands have been added:
> - set-n-threads <N>: set the number of threads used.
> - get-n-threads: returns the number of threads used.
> If the number of threads is within <2-256> bounds, parallelization is enabled.
> If the number of threads is 1, parallelization is disabled.
> Otherwise, an error is returned.
> 
> Note that if set-n-threads fails for any reason (e.g. failure to set up a
> semaphore), parallelization is disabled, and get-n-threads will return 1.
> 
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
> Signed-off-by: Xavier Simonart <xsimonar@redhat.com>
> 
> ---
> v2:  - handled Dumitru's comments
>      - added missing mutex_destroy
>      - fixed issue when use_logical_dp_group is enabled after northd startup
>      - rebased on top of main
> v3:
>      - fix mutex_destroy issue
> v4:
>      - handled Mark's comments
>      - rebased on top of main
> ---
>  NEWS                      |   7 +
>  lib/ovn-parallel-hmap.c   | 291 +++++++++++++++++++++-----------------
>  lib/ovn-parallel-hmap.h   |  30 ++--
>  northd/northd.c           | 176 ++++++++++++-----------
>  northd/northd.h           |   1 +
>  northd/ovn-northd-ddlog.c |   6 -
>  northd/ovn-northd.8.xml   |  70 +++++----
>  northd/ovn-northd.c       |  68 ++++++++-
>  tests/ovn-macros.at       |  59 ++------
>  tests/ovn-northd.at       | 109 ++++++++++++++
>  10 files changed, 495 insertions(+), 322 deletions(-)
> 

Hi Xavier,

Sorry, I should've mentioned this earlier but I forgot.  Can you please
also add an option to ovn-ctl to allow specifying the number of threads?

Something along the lines of:

ovn-ctl start_northd .. --ovn-northd-n-threads=42

This would allow CMSs that use ovn-ctl [0] to enable parallelization.
For ovn-org/ovn-kubernetes in particular, we could even make the
ovn-kubernetes CI run in the ovn-org/ovn repo run both with and without
northd parallelization.
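
A hypothetical sketch of what the forwarding could look like (option and
variable names here are made up for illustration; only --n-threads itself
is defined by this patch):

  # Hypothetical ovn-ctl-style snippet forwarding a thread count to northd.
  ovn_northd_n_threads=${OVN_NORTHD_N_THREADS:-1}
  northd_args=
  if [ "$ovn_northd_n_threads" -gt 1 ]; then
      northd_args="--n-threads=$ovn_northd_n_threads"
  fi
  ovn-northd $northd_args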

Overall the code looks good, but given that we probably also want the
ovn-ctl change, I also left a few very small style comments below.

Thanks,
Dumitru

> diff --git a/NEWS b/NEWS
> index 244824e3f..6e489df32 100644
> --- a/NEWS
> +++ b/NEWS
> @@ -9,6 +9,13 @@ Post v22.03.0
>      implicit drop behavior on logical switches with ACLs applied.
>    - Support (LSP.options:qos_min_rate) to guarantee minimal bandwidth available
>      for a logical port.
> +  - Changed the way to enable northd parallelization.
> +    Removed support for:
> +    - use_parallel_build in NBDB.
> +    - --dummy-numa in northd cmdline.
> +    Added support for:
> +    -  --n-threads=<N> in northd cmdline.
> +    - set-n-threads/get-n-threads unixctls.
>  
>  OVN v22.03.0 - 11 Mar 2022
>  --------------------------
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> index 7edc4c0b6..c4d8cee16 100644
> --- a/lib/ovn-parallel-hmap.c
> +++ b/lib/ovn-parallel-hmap.c
> @@ -38,14 +38,10 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>  
>  #ifndef OVS_HAS_PARALLEL_HMAP
>  
> -#define WORKER_SEM_NAME "%x-%p-%x"
> +#define WORKER_SEM_NAME "%x-%p-%"PRIxSIZE
>  #define MAIN_SEM_NAME "%x-%p-main"
>  
> -/* These are accessed under mutex inside add_worker_pool().
> - * They do not need to be atomic.
> - */
>  static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> -static bool can_parallelize = false;
>  
>  /* This is set only in the process of exit and the set is
>   * accompanied by a fence. It does not need to be atomic or be
> @@ -57,18 +53,18 @@ static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>  
>  static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>  
> -static int pool_size;
> +static size_t pool_size = 1;
>  
>  static int sembase;
>  
>  static void worker_pool_hook(void *aux OVS_UNUSED);
> -static void setup_worker_pools(bool force);
> +static void setup_worker_pools(void);
>  static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>                                 void *fin_result, void *result_frags,
> -                               int index);
> +                               size_t index);
>  static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>                                 void *fin_result, void *result_frags,
> -                               int index);
> +                               size_t index);
>  
>  bool
>  ovn_stop_parallel_processing(void)
> @@ -76,107 +72,184 @@ ovn_stop_parallel_processing(void)
>      return workers_must_exit;
>  }
>  
> -bool
> -ovn_can_parallelize_hashes(bool force_parallel)
> +size_t
> +ovn_get_worker_pool_size(void)
>  {
> -    bool test = false;
> +    return pool_size;
> +}
>  
> -    if (atomic_compare_exchange_strong(
> -            &initial_pool_setup,
> -            &test,
> -            true)) {
> -        ovs_mutex_lock(&init_mutex);
> -        setup_worker_pools(force_parallel);
> -        ovs_mutex_unlock(&init_mutex);
> +static void
> +stop_controls(struct worker_pool *pool)
> +{
> +    if (pool->controls) {
> +        workers_must_exit = true;
> +
> +        /* unlock threads */

Nit: /* Unlock threads. */

> +        for (size_t i = 0; i < pool->size ; i++) {
> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_post(pool->controls[i].fire);
> +            }
> +        }
> +
> +        /* Wait completion */

Nit: /* Wait for completion. */

> +        for (size_t i = 0; i < pool->size ; i++) {
> +            if (pool->controls[i].worker) {
> +                pthread_join(pool->controls[i].worker, NULL);
> +                pool->controls[i].worker = 0;
> +            }
> +        }
> +        workers_must_exit = false;
>      }
> -    return can_parallelize;
>  }
>  
> -struct worker_pool *
> -ovn_add_worker_pool(void *(*start)(void *))
> +static void
> +free_controls(struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    if (pool->controls) {
> +        /* Close/unlink semaphores */

Nit: /* Close/unlink semaphores. */

> +        for (size_t i = 0; i < pool->size; i++) {
> +            ovs_mutex_destroy(&pool->controls[i].mutex);
> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_close(pool->controls[i].fire);
> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +                sem_unlink(sem_name);
> +            } else {
> +               /* This and following controls are not initialized */

Nit: indent needs one extra space.

> +                break;
> +            }
> +        }
> +        free(pool->controls);
> +        pool->controls = NULL;
> +    }
> +}
> +
> +static void
> +free_pool(struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    stop_controls(pool);
> +    free_controls(pool);
> +    if (pool->done != SEM_FAILED) {
> +        sem_close(pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +        sem_unlink(sem_name);
> +    }
> +    free(pool);
> +}
> +
> +static int
> +init_controls(struct worker_pool *pool)
>  {
> -    struct worker_pool *new_pool = NULL;
>      struct worker_control *new_control;
> +    char sem_name[256];
> +
> +    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
> +    for (size_t i = 0; i < pool->size ; i++) {
> +        pool->controls[i].fire = SEM_FAILED;
> +    }
> +    for (size_t i = 0; i < pool->size; i++) {
> +        new_control = &pool->controls[i];
> +        new_control->id = i;
> +        new_control->done = pool->done;
> +        new_control->data = NULL;
> +        new_control->pool = pool;
> +        new_control->worker = 0;
> +        ovs_mutex_init(&new_control->mutex);
> +        new_control->finished = ATOMIC_VAR_INIT(false);
> +        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +        if (new_control->fire == SEM_FAILED) {
> +            free_controls(pool);
> +            return -1;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static void
> +init_threads(struct worker_pool *pool, void *(*start)(void *))
> +{
> +    for (size_t i = 0; i < pool_size; i++) {
> +        pool->controls[i].worker =
> +            ovs_thread_create("worker pool helper", start, &pool->controls[i]);
> +    }
> +    ovs_list_push_back(&worker_pools, &pool->list_node);
> +}
> +
> +enum pool_update_status
> +ovn_update_worker_pool(size_t requested_pool_size,
> +                       struct worker_pool **pool, void *(*start)(void *))
> +{
>      bool test = false;
> -    int i;
>      char sem_name[256];
>  
> -    /* Belt and braces - initialize the pool system just in case if
> -     * if it is not yet initialized.
> -     */
> +    if (requested_pool_size == pool_size) {
> +        return POOL_UNCHANGED;
> +    }
> +
>      if (atomic_compare_exchange_strong(
>              &initial_pool_setup,
>              &test,
>              true)) {
>          ovs_mutex_lock(&init_mutex);
> -        setup_worker_pools(false);
> +        setup_worker_pools();
>          ovs_mutex_unlock(&init_mutex);
>      }
> -
>      ovs_mutex_lock(&init_mutex);
> -    if (can_parallelize) {
> -        new_pool = xmalloc(sizeof(struct worker_pool));
> -        new_pool->size = pool_size;
> -        new_pool->controls = NULL;
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> -        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -        if (new_pool->done == SEM_FAILED) {
> -            goto cleanup;
> -        }
> -
> -        new_pool->controls =
> -            xmalloc(sizeof(struct worker_control) * new_pool->size);
> -
> -        for (i = 0; i < new_pool->size; i++) {
> -            new_control = &new_pool->controls[i];
> -            new_control->id = i;
> -            new_control->done = new_pool->done;
> -            new_control->data = NULL;
> -            ovs_mutex_init(&new_control->mutex);
> -            new_control->finished = ATOMIC_VAR_INIT(false);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -            if (new_control->fire == SEM_FAILED) {
> +    pool_size = requested_pool_size;
> +    VLOG_INFO("Setting thread count to %"PRIuSIZE, pool_size);
> +
> +    if (*pool == NULL) {
> +        if (pool_size > 1) {
> +            VLOG_INFO("Creating new pool with size %"PRIuSIZE, pool_size);
> +            *pool = xmalloc(sizeof(struct worker_pool));
> +            (*pool)->size = pool_size;
> +            (*pool)->controls = NULL;
> +            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
> +            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +            if ((*pool)->done == SEM_FAILED) {
>                  goto cleanup;
>              }
> +            if (init_controls(*pool) == -1) {
> +                goto cleanup;
> +            }
> +            init_threads(*pool, start);
>          }
> -
> -        for (i = 0; i < pool_size; i++) {
> -            new_pool->controls[i].worker =
> -                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> +    } else {
> +        if (pool_size > 1) {
> +            VLOG_INFO("Changing size of existing pool to %"PRIuSIZE,
> +                      pool_size);
> +            stop_controls(*pool);
> +            free_controls(*pool);
> +            ovs_list_remove(&(*pool)->list_node);
> +            (*pool)->size = pool_size;
> +            if (init_controls(*pool) == -1) {
> +                goto cleanup;
> +            }
> +            init_threads(*pool, start);
> +        } else {
> +            VLOG_INFO("Deleting existing pool");
> +            worker_pool_hook(NULL);
> +            *pool = NULL;
>          }
> -        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>      }
>      ovs_mutex_unlock(&init_mutex);
> -    return new_pool;
> -cleanup:
> +    return POOL_UPDATED;
>  
> +cleanup:
>      /* Something went wrong when opening semaphores. In this case
>       * it is better to shut off parallel procesing altogether
>       */
> -
> -    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> -    can_parallelize = false;
> -    if (new_pool->controls) {
> -        for (i = 0; i < new_pool->size; i++) {
> -            if (new_pool->controls[i].fire != SEM_FAILED) {
> -                sem_close(new_pool->controls[i].fire);
> -                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -                sem_unlink(sem_name);
> -                break; /* semaphores past this one are uninitialized */
> -            }
> -        }
> -    }
> -    if (new_pool->done != SEM_FAILED) {
> -        sem_close(new_pool->done);
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> -        sem_unlink(sem_name);
> -    }
> +    VLOG_ERR("Failed to initialize parallel processing: %s",
> +             ovs_strerror(errno));
> +    free_pool(*pool);
> +    *pool = NULL;
> +    pool_size = 1;
>      ovs_mutex_unlock(&init_mutex);
> -    return NULL;
> +    return POOL_UPDATE_FAILED;
>  }
>  
> -
>  /* Initializes 'hmap' as an empty hash table with mask N. */
>  void
>  ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> @@ -225,9 +298,9 @@ ovn_run_pool_callback(struct worker_pool *pool,
>                        void *fin_result, void *result_frags,
>                        void (*helper_func)(struct worker_pool *pool,
>                                            void *fin_result,
> -                                          void *result_frags, int index))
> +                                          void *result_frags, size_t index))
>  {
> -    int index, completed;
> +    size_t index, completed;
>  
>      /* Ensure that all worker threads see the same data as the
>       * main thread.
> @@ -367,9 +440,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>  
>  static void
>  worker_pool_hook(void *aux OVS_UNUSED) {
> -    int i;
>      static struct worker_pool *pool;
> -    char sem_name[256];
>  
>      workers_must_exit = true;
>  
> @@ -380,55 +451,15 @@ worker_pool_hook(void *aux OVS_UNUSED) {
>       */
>      atomic_thread_fence(memory_order_acq_rel);
>  
> -    /* Wake up the workers after the must_exit flag has been set */
> -
> -    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_post(pool->controls[i].fire);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            pthread_join(pool->controls[i].worker, NULL);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_close(pool->controls[i].fire);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> -            sem_unlink(sem_name);
> -        }
> -        sem_close(pool->done);
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> -        sem_unlink(sem_name);
> +    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
> +        ovs_list_remove(&pool->list_node);
> +        free_pool(pool);
>      }
>  }
>  
>  static void
> -setup_worker_pools(bool force) {
> -    int cores, nodes;
> -
> -    ovs_numa_init();
> -    nodes = ovs_numa_get_n_numas();
> -    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> -        nodes = 1;
> -    }
> -    cores = ovs_numa_get_n_cores();
> -
> -    /* If there is no NUMA config, use 4 cores.
> -     * If there is NUMA config use half the cores on
> -     * one node so that the OS does not start pushing
> -     * threads to other nodes.
> -     */
> -    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> -        /* If there is no NUMA we can try the ovs-threads routine.
> -         * It falls back to sysconf and/or affinity mask.
> -         */
> -        cores = count_cpu_cores();
> -        pool_size = cores;
> -    } else {
> -        pool_size = cores / nodes;
> -    }
> -    if ((pool_size < 4) && force) {
> -        pool_size = 4;
> -    }
> -    can_parallelize = (pool_size >= 3);
> +setup_worker_pools(void)
> +{
>      fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>      sembase = random_uint32();
>  }
> @@ -436,7 +467,7 @@ setup_worker_pools(bool force) {
>  static void
>  merge_list_results(struct worker_pool *pool OVS_UNUSED,
>                     void *fin_result, void *result_frags,
> -                   int index)
> +                   size_t index)
>  {
>      struct ovs_list *result = (struct ovs_list *)fin_result;
>      struct ovs_list *res_frags = (struct ovs_list *)result_frags;
> @@ -450,7 +481,7 @@ merge_list_results(struct worker_pool *pool OVS_UNUSED,
>  static void
>  merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>                     void *fin_result, void *result_frags,
> -                   int index)
> +                   size_t index)
>  {
>      struct hmap *result = (struct hmap *)fin_result;
>      struct hmap *res_frags = (struct hmap *)result_frags;
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> index 0f7d68770..72b31b489 100644
> --- a/lib/ovn-parallel-hmap.h
> +++ b/lib/ovn-parallel-hmap.h
> @@ -81,21 +81,30 @@ struct worker_control {
>      sem_t *done; /* Work completion semaphore - sem_post on completion. */
>      struct ovs_mutex mutex; /* Guards the data. */
>      void *data; /* Pointer to data to be processed. */
> -    void *workload; /* back-pointer to the worker pool structure. */
>      pthread_t worker;
> +    struct worker_pool *pool;
>  };
>  
>  struct worker_pool {
> -    int size;   /* Number of threads in the pool. */
> +    size_t size;   /* Number of threads in the pool. */
>      struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>      struct worker_control *controls; /* "Handles" in this pool. */
>      sem_t *done; /* Work completion semaphorew. */
>  };
>  
> -/* Add a worker pool for thread function start() which expects a pointer to
> - * a worker_control structure as an argument. */
> +/* Return pool size; bigger than 1 means parallelization has been enabled. */
> +size_t ovn_get_worker_pool_size(void);
>  
> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +enum pool_update_status {
> +     POOL_UNCHANGED,     /* no change to pool */
> +     POOL_UPDATED,       /* pool has been updated */
> +     POOL_UPDATE_FAILED, /* pool update failed; parallelization disabled */
> +};

Nit: I would add an empty line here.

> +/* Add/delete a worker pool for thread function start() which expects a pointer
> + * to a worker_control structure as an argument. Returns the update status. */
> +enum pool_update_status ovn_update_worker_pool(size_t requested_pool_size,
> +                                               struct worker_pool **,
> +                                               void *(*start)(void *));
>  
>  /* Setting this to true will make all processing threads exit */
>  
> @@ -140,7 +149,8 @@ void ovn_run_pool_list(struct worker_pool *pool,
>  void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>                             void *result_frags,
>                             void (*helper_func)(struct worker_pool *pool,
> -                           void *fin_result, void *result_frags, int index));
> +                           void *fin_result, void *result_frags,
> +                           size_t index));
>  
>  
>  /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> @@ -251,17 +261,17 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>      hrl->row_locks = NULL;
>  }
>  
> -bool ovn_can_parallelize_hashes(bool force_parallel);
> -
>  /* Use the OVN library functions for stuff which OVS has not defined
>   * If OVS has defined these, they will still compile using the OVN
>   * local names, but will be dropped by the linker in favour of the OVS
>   * supplied functions.
>   */
> +#define update_worker_pool(requested_pool_size, existing_pool, func) \
> +    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
>  
> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> +#define get_worker_pool_size() ovn_get_worker_pool_size()
>  
> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>  
>  #define stop_parallel_processing() ovn_stop_parallel_processing()
>  
> diff --git a/northd/northd.c b/northd/northd.c
> index 67c39df88..48426c801 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -59,6 +59,7 @@
>  VLOG_DEFINE_THIS_MODULE(northd);
>  
>  static bool controller_event_en;
> +static bool lflow_hash_lock_initialized = false;
>  
>  static bool check_lsp_is_up;
>  
> @@ -4740,7 +4741,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
>  /* If this option is 'true' northd will combine logical flows that differ by
>   * logical datapath only by creating a datapath group. */
>  static bool use_logical_dp_groups = false;
> -static bool use_parallel_build = false;
> +
> +enum {
> +    STATE_NULL,               /* parallelization is off */
> +    STATE_INIT_HASH_SIZES,    /* parallelization is on; hashes sizing needed */
> +    STATE_USE_PARALLELIZATION /* parallelization is on */
> +};
> +static int parallelization_state = STATE_NULL;
>  
>  static void
>  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
> @@ -4759,7 +4766,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>      lflow->ctrl_meter = ctrl_meter;
>      lflow->dpg = NULL;
>      lflow->where = where;
> -    if (use_parallel_build && use_logical_dp_groups) {
> +    if ((parallelization_state != STATE_NULL)
> +        && use_logical_dp_groups) {
>          ovs_mutex_init(&lflow->odg_lock);
>      }
>  }
> @@ -4773,7 +4781,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
>          return false;
>      }
>  
> -    if (use_parallel_build) {
> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>          ovs_mutex_lock(&lflow_ref->odg_lock);
>          hmapx_add(&lflow_ref->od_group, od);
>          ovs_mutex_unlock(&lflow_ref->odg_lock);
> @@ -4803,9 +4811,23 @@ static struct ovs_mutex lflow_hash_locks[LFLOW_HASH_LOCK_MASK + 1];
>  static void
>  lflow_hash_lock_init(void)
>  {
> -    for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> -        ovs_mutex_init(&lflow_hash_locks[i]);
> +    if (!lflow_hash_lock_initialized) {
> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> +            ovs_mutex_init(&lflow_hash_locks[i]);
> +        }
> +        lflow_hash_lock_initialized = true;
> +    }
> +}
> +
> +static void
> +lflow_hash_lock_destroy(void)
> +{
> +    if (lflow_hash_lock_initialized) {
> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> +            ovs_mutex_destroy(&lflow_hash_locks[i]);
> +        }
>      }
> +    lflow_hash_lock_initialized = false;
>  }
>  
>  /* This thread-local var is used for parallel lflow building when dp-groups is
> @@ -4853,7 +4875,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
>                     nullable_xstrdup(ctrl_meter),
>                     ovn_lflow_hint(stage_hint), where);
>      hmapx_add(&lflow->od_group, od);
> -    if (!use_parallel_build) {
> +    if (parallelization_state != STATE_USE_PARALLELIZATION) {
>          hmap_insert(lflow_map, &lflow->hmap_node, hash);
>      } else {
>          hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> @@ -4896,7 +4918,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
>      struct ovn_lflow *lflow;
>  
>      ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
> -    if (use_logical_dp_groups && use_parallel_build) {
> +    if (use_logical_dp_groups
> +        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
>          lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
>                                      match, actions, io_port, stage_hint, where,
>                                      ctrl_meter);
> @@ -4982,6 +5005,10 @@ static void
>  ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
>  {
>      if (lflow) {
> +        if ((parallelization_state != STATE_NULL)
> +            && use_logical_dp_groups) {
> +            ovs_mutex_destroy(&lflow->odg_lock);
> +        }
>          if (lflows) {
>              hmap_remove(lflows, &lflow->hmap_node);
>          }
> @@ -13925,15 +13952,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>                                        &lsi->actions);
>  }
>  
> -struct lflows_thread_pool {
> -    struct worker_pool *pool;
> -};
> -
>  static void *
>  build_lflows_thread(void *arg)
>  {
>      struct worker_control *control = (struct worker_control *) arg;
> -    struct lflows_thread_pool *workload;
>      struct lswitch_flow_build_info *lsi;
>  
>      struct ovn_datapath *od;
> @@ -13944,17 +13966,16 @@ build_lflows_thread(void *arg)
>  
>      while (!stop_parallel_processing()) {
>          wait_for_work(control);
> -        workload = (struct lflows_thread_pool *) control->workload;
>          lsi = (struct lswitch_flow_build_info *) control->data;
>          if (stop_parallel_processing()) {
>              return NULL;
>          }
>          thread_lflow_counter = 0;
> -        if (lsi && workload) {
> +        if (lsi) {
>              /* Iterate over bucket ThreadID, ThreadID+size, ... */
>              for (bnum = control->id;
>                      bnum <= lsi->datapaths->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
>                      if (stop_parallel_processing()) {
> @@ -13965,7 +13986,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->ports->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>                      if (stop_parallel_processing()) {
> @@ -13976,7 +13997,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->lbs->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>                      if (stop_parallel_processing()) {
> @@ -13997,7 +14018,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->igmp_groups->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (
>                          igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> @@ -14016,39 +14037,13 @@ build_lflows_thread(void *arg)
>      return NULL;
>  }
>  
> -static bool pool_init_done = false;
> -static struct lflows_thread_pool *build_lflows_pool = NULL;
> -
> -static void
> -init_lflows_thread_pool(void)
> -{
> -    int index;
> -
> -    if (!pool_init_done) {
> -        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> -        pool_init_done = true;
> -        if (pool) {
> -            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
> -            build_lflows_pool->pool = pool;
> -            for (index = 0; index < build_lflows_pool->pool->size; index++) {
> -                build_lflows_pool->pool->controls[index].workload =
> -                    build_lflows_pool;
> -            }
> -        }
> -    }
> -}
> -
> -/* TODO: replace hard cutoffs by configurable via commands. These are
> - * temporary defines to determine single-thread to multi-thread processing
> - * cutoff.
> - * Setting to 1 forces "all parallel" lflow build.
> - */
> +static struct worker_pool *build_lflows_pool = NULL;
>  
>  static void
>  noop_callback(struct worker_pool *pool OVS_UNUSED,
>                void *fin_result OVS_UNUSED,
>                void *result_frags OVS_UNUSED,
> -              int index OVS_UNUSED)
> +              size_t index OVS_UNUSED)
>  {
>      /* Do nothing */
>  }
> @@ -14088,28 +14083,21 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>  
>      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>  
> -    if (use_parallel_build) {
> -        init_lflows_thread_pool();
> -        if (!can_parallelize_hashes(false)) {
> -            use_parallel_build = false;
> -        }
> -    }
> -
> -    if (use_parallel_build) {
> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>          struct hmap *lflow_segs;
>          struct lswitch_flow_build_info *lsiv;
>          int index;
>  
> -        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
>          if (use_logical_dp_groups) {
>              lflow_segs = NULL;
>          } else {
> -            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
>          }
>  
>          /* Set up "work chunks" for each thread to work on. */
>  
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              if (use_logical_dp_groups) {
>                  /* if dp_groups are in use we lock a shared lflows hash
>                   * on a per-bucket level instead of merging hash frags */
> @@ -14132,19 +14120,19 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>              ds_init(&lsiv[index].match);
>              ds_init(&lsiv[index].actions);
>  
> -            build_lflows_pool->pool->controls[index].data = &lsiv[index];
> +            build_lflows_pool->controls[index].data = &lsiv[index];
>          }
>  
>          /* Run thread pool. */
>          if (use_logical_dp_groups) {
> -            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
> +            run_pool_callback(build_lflows_pool, NULL, NULL,
>                                noop_callback);
> -            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
> +            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
>          } else {
> -            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> +            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
>          }
>  
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              ds_destroy(&lsiv[index].match);
>              ds_destroy(&lsiv[index].actions);
>          }
> @@ -14280,8 +14268,39 @@ ovn_sb_set_lflow_logical_dp_group(
>  }
>  
>  static ssize_t max_seen_lflow_size = 128;
> -static bool needs_parallel_init = true;
> -static bool reset_parallel = false;
> +
> +void run_update_worker_pool(int n_threads)
> +{
> +    /* If number of threads has been updated (or initially set),
> +     * update the worker pool. */
> +    if (update_worker_pool(n_threads, &build_lflows_pool,
> +                           build_lflows_thread) != POOL_UNCHANGED) {
> +        /* worker pool was updated */
> +        if (get_worker_pool_size() <= 1) {
> +            /* destroy potentially created lflow_hash_lock */
> +            lflow_hash_lock_destroy();
> +            parallelization_state = STATE_NULL;
> +        } else if (parallelization_state != STATE_USE_PARALLELIZATION) {
> +            if (use_logical_dp_groups) {
> +                lflow_hash_lock_init();
> +                parallelization_state = STATE_INIT_HASH_SIZES;
> +            } else {
> +                parallelization_state = STATE_USE_PARALLELIZATION;
> +            }
> +        }
> +    }
> +}
> +
> +static void worker_pool_init_for_ldp(void)
> +{
> +    /* If parallelization is enabled, make sure locks are initialized
> +     * when ldp are used.
> +     */
> +    if (parallelization_state != STATE_NULL) {
> +        lflow_hash_lock_init();
> +        parallelization_state = STATE_INIT_HASH_SIZES;
> +    }
> +}
>  
>  static void
>  build_mcast_groups(struct lflow_input *data,
> @@ -14302,30 +14321,18 @@ void build_lflows(struct lflow_input *input_data,
>      build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
>                         &mcast_groups, &igmp_groups);
>  
> -    if (reset_parallel) {
> -        /* Parallel build was disabled before, we need to
> -         * re-enable it. */
> -        use_parallel_build = true;
> -        reset_parallel = false;
> -    }
> -
>      fast_hmap_size_for(&lflows, max_seen_lflow_size);
>  
> -    if (use_parallel_build && use_logical_dp_groups &&
> -        needs_parallel_init) {
> -        lflow_hash_lock_init();
> -        needs_parallel_init = false;
> -        /* Disable parallel build on first run with dp_groups
> -         * to determine the correct sizing of hashes. */
> -        use_parallel_build = false;
> -        reset_parallel = true;
> -    }
>      build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
>                                      input_data->port_groups, &lflows,
>                                      &mcast_groups, &igmp_groups,
>                                      input_data->meter_groups, input_data->lbs,
>                                      input_data->bfd_connections);
>  
> +    if (parallelization_state == STATE_INIT_HASH_SIZES) {
> +        parallelization_state = STATE_USE_PARALLELIZATION;
> +    }
> +
>      /* Parallel build may result in a suboptimal hash. Resize the
>       * hash to a correct size before doing lookups */
>  
> @@ -15567,12 +15574,13 @@ ovnnb_db_run(struct northd_input *input_data,
>  
>      smap_destroy(&options);
>  
> -    use_parallel_build =
> -        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
> -         can_parallelize_hashes(false));
> -
> +    bool old_use_ldp = use_logical_dp_groups;
>      use_logical_dp_groups = smap_get_bool(&nb->options,
>                                            "use_logical_dp_groups", true);
> +    if (use_logical_dp_groups && !old_use_ldp) {
> +        worker_pool_init_for_ldp();
> +    }
> +
>      use_ct_inv_match = smap_get_bool(&nb->options,
>                                       "use_ct_inv_match", true);
>  
> diff --git a/northd/northd.h b/northd/northd.h
> index 2d804a22e..fe8dad03a 100644
> --- a/northd/northd.h
> +++ b/northd/northd.h
> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data,
>                       struct hmap *bfd_connections, struct hmap *ports);
>  void bfd_cleanup_connections(struct lflow_input *input_data,
>                               struct hmap *bfd_map);
> +void run_update_worker_pool(int n_threads);
>  
>  #endif /* NORTHD_H */
> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
> index 718d052cc..e9afda4c6 100644
> --- a/northd/ovn-northd-ddlog.c
> +++ b/northd/ovn-northd-ddlog.c
> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>          SSL_OPTION_ENUMS,
>          OPT_DRY_RUN,
>          OPT_DDLOG_RECORD,
> -        OPT_DUMMY_NUMA,
>      };
>      static const struct option long_options[] = {
>          {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>          OVN_DAEMON_LONG_OPTIONS,
>          VLOG_LONG_OPTIONS,
>          STREAM_SSL_LONG_OPTIONS,
> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>          {NULL, 0, NULL, 0},
>      };
>      char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>              *pause = true;
>              break;
>  
> -        case OPT_DUMMY_NUMA:
> -            ovs_numa_set_dummy(optarg);
> -            break;
> -
>          case OPT_DDLOG_RECORD:
>              record_file = optarg;
>              break;
> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
> index 73ecfbc2d..077bd1f41 100644
> --- a/northd/ovn-northd.8.xml
> +++ b/northd/ovn-northd.8.xml
> @@ -68,53 +68,26 @@
>            restarting a process or disturbing a running system.
>          </p>
>        </dd>
> -      <dt><code>--dummy-numa</code></dt>
> +      <dt><code>n-threads N</code></dt>
>        <dd>
> -        <p>
> -          Typically, OVS uses sysfs to determine the number of NUMA nodes and
> -          CPU cores that are available on a machine. The parallelization code
> -          in OVN uses this information to determine if there are enough
> -          resources to use parallelization. The current algorithm enables
> -          parallelization if the total number of CPU cores divided by the
> -          number of NUMA nodes is greater than or equal to four.
> -        </p>
> -
>          <p>
>            In certain situations, it may be desirable to enable parallelization
> -          on a system that otherwise would not have it allowed. The
> -          <code>--dummy-numa</code> option allows for you to fake the NUMA
> -          nodes and cores that OVS thinks your system has. The syntax consists
> -          of using numbers to represent the NUMA node IDs. The number of times
> -          that a NUMA node ID appears represents how many CPU cores that NUMA
> -          node contains. So for instance, if you did the following:
> -        </p>
> -
> -        <p>
> -          <code>--dummy-numa=0,0,0,0</code>
> -        </p>
> -
> -        <p>
> -          it would make OVS assume that you have a single NUMA node with ID 0,
> -          and that NUMA node consists of four CPU cores. Similarly, you could
> -          do:
> -        </p>
> -
> -        <p>
> -          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
> +          on a system to decrease latency (at the potential cost of increasing
> +          CPU usage).
>          </p>
>  
>          <p>
> -          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
> -          with six CPU cores.
> +          This option will cause ovn-northd to use N threads when building
> +          logical flows, when N is within [2-256].
> +          If N is 1, parallelization is disabled (default behavior).
> +          If N is less than 1, then N is set to 1, parallelization is disabled
> +          and a warning is logged.
> +          If N is more than 256, then N is set to 256, parallelization is
> +          enabled (with 256 threads) and a warning is logged.
>          </p>
>  
>          <p>
> -          Currently, the only affect this option has is on whether
> -          parallelization can be enabled in ovn-northd. There are no NUMA node
> -          or CPU core-specific actions performed by OVN. Setting
> -          <code>--dummy-numa</code> in ovn-northd does not affect how other OVS
> -          processes on the system (such as ovs-vswitchd) count the number of
> -          NUMA nodes and CPU cores; this setting is local to ovn-northd.
> +          ovn-northd-ddlog does not support this option.
>          </p>
>        </dd>
>      </dl>
> @@ -210,6 +183,27 @@
>          except for the northbound database client.
>        </p>
>        </dd>
> +
> +
> +      <dt><code>set-n-threads N</code></dt>
> +      <dd>
> +      <p>
> +        Set the number of threads used for building logical flows.
> +        When N is within [2-256], parallelization is enabled.
> +        When N is 1 parallelization is disabled.
> +        When N is less than 1 or more than 256, an error is returned.
> +        If ovn-northd fails to start parallelization (e.g. fails to set
> +        up semaphores), parallelization is disabled and an error is returned.
> +      </p>
> +      </dd>
> +
> +      <dt><code>get-n-threads</code></dt>
> +      <dd>
> +      <p>
> +        Return the number of threads used for building logical flows.
> +      </p>
> +      </dd>
> +
>        </dl>
>      </p>
>  
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 0a0f85010..b7363b16d 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -41,6 +41,7 @@
>  #include "unixctl.h"
>  #include "util.h"
>  #include "openvswitch/vlog.h"
> +#include "lib/ovn-parallel-hmap.h"
>  
>  VLOG_DEFINE_THIS_MODULE(ovn_northd);
>  
> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume;
>  static unixctl_cb_func ovn_northd_is_paused;
>  static unixctl_cb_func ovn_northd_status;
>  static unixctl_cb_func cluster_state_reset_cmd;
> +static unixctl_cb_func ovn_northd_set_thread_count_cmd;
> +static unixctl_cb_func ovn_northd_get_thread_count_cmd;
>  
>  struct northd_state {
>      bool had_lock;
>      bool paused;
>  };
>  
> +#define OVN_MAX_SUPPORTED_THREADS 256
> +
>  static const char *ovnnb_db;
>  static const char *ovnsb_db;
>  static const char *unixctl_path;
> @@ -525,7 +530,7 @@ Options:\n\
>    --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
>                              (default: %s)\n\
>    --dry-run                 start in paused state (do not commit db changes)\n\
> -  --dummy-numa              override default NUMA node and CPU core discovery\n\
> +  --n-threads=N             specify number of threads\n\
>    --unixctl=SOCKET          override default control socket name\n\
>    -h, --help                display this help message\n\
>    -o, --options             list available options\n\
> @@ -538,14 +543,14 @@ Options:\n\
>  
>  static void
>  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> -              bool *paused)
> +              bool *paused, int *n_threads)
>  {
>      enum {
>          OVN_DAEMON_OPTION_ENUMS,
>          VLOG_OPTION_ENUMS,
>          SSL_OPTION_ENUMS,
>          OPT_DRY_RUN,
> -        OPT_DUMMY_NUMA,
> +        OPT_N_THREADS,
>      };
>      static const struct option long_options[] = {
>          {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>          {"options", no_argument, NULL, 'o'},
>          {"version", no_argument, NULL, 'V'},
>          {"dry-run", no_argument, NULL, OPT_DRY_RUN},
> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
> +        {"n-threads", required_argument, NULL, OPT_N_THREADS},
>          OVN_DAEMON_LONG_OPTIONS,
>          VLOG_LONG_OPTIONS,
>          STREAM_SSL_LONG_OPTIONS,
> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>              ovn_print_version(0, 0);
>              exit(EXIT_SUCCESS);
>  
> -        case OPT_DUMMY_NUMA:
> -            ovs_numa_set_dummy(optarg);
> +        case OPT_N_THREADS:
> +            *n_threads = strtoul(optarg, NULL, 10);
> +            if (*n_threads < 1) {
> +                *n_threads = 1;
> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
> +                    "set to : [%s]", *n_threads, optarg);
> +            }
> +            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
> +                *n_threads = OVN_MAX_SUPPORTED_THREADS;
> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
> +                    "set to : [%s]", *n_threads, optarg);
> +            }
> +            if (*n_threads != 1) {
> +                VLOG_INFO("Using %d threads", *n_threads);
> +            }
>              break;
>  
>          case OPT_DRY_RUN:
> @@ -668,6 +686,7 @@ main(int argc, char *argv[])
>      struct unixctl_server *unixctl;
>      int retval;
>      bool exiting;
> +    int n_threads = 1;
>      struct northd_state state = {
>          .had_lock = false,
>          .paused = false
> @@ -677,7 +696,7 @@ main(int argc, char *argv[])
>      ovs_cmdl_proctitle_init(argc, argv);
>      ovn_set_program_name(argv[0]);
>      service_start(&argc, &argv);
> -    parse_options(argc, argv, &state.paused);
> +    parse_options(argc, argv, &state.paused, &n_threads);
>  
>      daemonize_start(false);
>  
> @@ -704,6 +723,12 @@ main(int argc, char *argv[])
>      unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
>                               cluster_state_reset_cmd,
>                               &reset_ovnnb_idl_min_index);
> +    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 1,
> +                             ovn_northd_set_thread_count_cmd,
> +                             NULL);
> +    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
> +                             ovn_northd_get_thread_count_cmd,
> +                             NULL);
>  
>      daemonize_complete();
>  
> @@ -761,6 +786,8 @@ main(int argc, char *argv[])
>      unsigned int ovnnb_cond_seqno = UINT_MAX;
>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>  
> +    run_update_worker_pool(n_threads);
> +
>      /* Main loop. */
>      exiting = false;
>  
> @@ -1017,3 +1044,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
>      poll_immediate_wake();
>      unixctl_command_reply(conn, NULL);
>  }
> +
> +static void
> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> +               const char *argv[], void *aux OVS_UNUSED)
> +{
> +    int n_threads = atoi(argv[1]);
> +
> +    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
> +        struct ds s = DS_EMPTY_INITIALIZER;
> +        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
> +        unixctl_command_reply_error(conn, ds_cstr(&s));
> +        ds_destroy(&s);
> +    } else {
> +        run_update_worker_pool(n_threads);
> +        unixctl_command_reply(conn, NULL);
> +    }
> +}
> +
> +static void
> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> +               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
> +{
> +    struct ds s = DS_EMPTY_INITIALIZER;
> +    ds_put_format(&s, "%"PRIuSIZE"\n", get_worker_pool_size());
> +    unixctl_command_reply(conn, ds_cstr(&s));
> +    ds_destroy(&s);
> +}
> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
> index d78315c75..c6f0f6251 100644
> --- a/tests/ovn-macros.at
> +++ b/tests/ovn-macros.at
> @@ -170,8 +170,8 @@ ovn_start_northd() {
>          ovn-northd-ddlog) northd_args="$northd_args --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
>      esac
>  
> -    if test X$NORTHD_DUMMY_NUMA = Xyes; then
> -        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
> +    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> +        northd_args="$northd_args --n-threads=4"
>      fi
>  
>      local name=${d_prefix}northd${suffix}
> @@ -252,10 +252,6 @@ ovn_start () {
>      else
>          ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
>      fi
> -
> -    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> -        ovn-nbctl set NB_Global . options:use_parallel_build=true
> -    fi
>  }
>  
>  # Interconnection networks.
> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS
>  
>  m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
>  
> -# Use --dummy-numa if system has low cores and we want to force parallelization
> -m4_define([NORTHD_DUMMY_NUMA],
> -  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
> -     then
> -       echo "yes"
> -     else
> -       echo "no"
> -     fi)
> -])
> -
>  # Defines a versions of a test with all combinations of northd and
>  # datapath groups.
>  m4_define([OVN_FOR_EACH_NORTHD],
> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD],
>  # Some tests aren't prepared for dp groups to be enabled.
>  m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
>    [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
> -     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
> -])])])
> -
> -# Test parallelization with dp groups enabled and disabled
> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> -    [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [no])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> -    [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -# Use --dummy-numa if system has low cores
> -m4_define([HOST_HAS_LOW_CORES], [
> -    if test $(nproc) -lt 4; then
> -        :
> -        $1
> -    else
> -        :
> -        $2
> -    fi
> -])
> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
> +
> +# Some tests aren't prepared for ddlog to be enabled.
> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
> +  [m4_foreach([NORTHD_TYPE], [ovn-northd],
> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
>  
> -m4_define([NORTHD_PARALLELIZATION], [
> -    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
> -])
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 17e37fb77..1a9d5ef76 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -7174,3 +7174,112 @@ ct_next(ct_state=new|trk);
>  
>  AT_CLEANUP
>  ])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization unixctl])
> +ovn_start
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [4
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 0], [2], [],
> +  [invalid n_threads: 0
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads -1], [2], [],
> +  [invalid n_threads: -1
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 300], [2], [],
> +  [invalid n_threads: 300
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], [2], [],
> +  ["parallel-build/set-n-threads" command requires at least 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 2], [2], [],
> +  ["parallel-build/set-n-threads" command takes at most 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CLEANUP
> +])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization runtime])
> +ovn_start
> +
> +add_switch_ports() {
> +    for port in $(seq $1 $2); do
> +        logical_switch_port=lsp${port}
> +        check ovn-nbctl lsp-add ls1 $logical_switch_port
> +        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
> +    done
> +}
> +
> +# Build some rather heavy config and modify number of threads in the middle
> +check ovn-nbctl ls-add ls1
> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
> +
> +check ovn-nbctl lr-add lr1
> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router options:router-port=lrp0 addresses=dynamic
> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
> +add_switch_ports 1 50
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 51 100
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +add_switch_ports 101 150
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 151 200
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +add_switch_ports 201 250
> +check ovn-nbctl --wait=sb sync
> +
> +# Run 3 times: once with parallelization enabled, once with it disabled, and once while changing it
> +# Compare the flows produced by the three runs
> +# Ignore IP/MAC addresses
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
> +
> +# Restart with 1 thread
> +for port in $(seq 1 250); do
> +    logical_switch_port=lsp${port}
> +    check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
> +AT_CHECK([diff flows1 flows2])
> +
> +# Restart with 8 threads
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +for port in $(seq 1 250); do
> +    logical_switch_port=lsp${port}
> +    check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
> +AT_CHECK([diff flows1 flows3])
> +
> +AT_CLEANUP
> +])
Dumitru Ceara May 18, 2022, 1 p.m. UTC | #3
On 5/18/22 14:59, Dumitru Ceara wrote:
> On 5/18/22 12:07, Xavier Simonart wrote:
>> This patch is intended to change the way to enable northd lflow build
>> parallelization, as well as enable runtime change of number of threads.
>> Before this patch, the following was needed to use parallelization:
>> - enable parallelization through use_parallel_build in NBDB
>> - use --dummy-numa to select number of threads.
>> This second part was needed as otherwise as many threads as cores in the system
>> were used, while parallelization showed some performance improvement only until
>> using around 4 (or maybe 8) threads.
>>
>> With this patch, the number of threads used for lflow parallel build can be
>> specified either:
>> - at startup, using --n-threads=<N> as ovn-northd command line option
>> - using unixctl
>> If the number of threads specified is > 1, then parallelization is enabled.
>> If the number is 1, parallelization is disabled.
>> If the number is < 1, parallelization is disabled at startup and a warning
>> is logged.
>> If the number is > 256, parallelization is enabled (with 256 threads) and
>> a warning is logged.
>>
>> The following unixctl have been added:
>> - set-n-threads <N>: set the number of treads used.
>> - get-n-threads: returns the number of threads used
>> If the number of threads is within <2-256> bounds, parallelization is enabled.
>> If the number of thread is 1, parallelization is disabled.
>> Otherwise an error is thrown.
>>
>> Note that, if set-n-threads failed for any reason (e.g. failure to setup some
>> semaphore), parallelization is disabled, and get-n-thread will return 1.
>>
>> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
>> Signed-off-by: Xavier Simonart <xsimonar@redhat.com>
>>
>> ---
>> v2:  - handled Dumitru's comments
>>      - added missing mutex_destroy
>>      - fixed issue when use_logical_dp_group is enabled after northd startup
>>      - rebased on top of main
>> v3:
>>      - fix mutex_destroy issue
>> v4:
>>      - handled Mark's comments
>>      - rebased on top of main
>> ---
>>  NEWS                      |   7 +
>>  lib/ovn-parallel-hmap.c   | 291 +++++++++++++++++++++-----------------
>>  lib/ovn-parallel-hmap.h   |  30 ++--
>>  northd/northd.c           | 176 ++++++++++++-----------
>>  northd/northd.h           |   1 +
>>  northd/ovn-northd-ddlog.c |   6 -
>>  northd/ovn-northd.8.xml   |  70 +++++----
>>  northd/ovn-northd.c       |  68 ++++++++-
>>  tests/ovn-macros.at       |  59 ++------
>>  tests/ovn-northd.at       | 109 ++++++++++++++
>>  10 files changed, 495 insertions(+), 322 deletions(-)
>>
> 
> Hi Xavier,
> 
> Sorry, I should've mentioned this earlier but I forgot.  Can you please
> also add an option to ovn-ctl to allow specifying the number of threads?
> 
> Something along the lines of:
> 
> ovn-ctl start_northd .. --ovn-northd-n-threads=42
> 
> This would allow CMSs that use ovn-ctl [0] to enable parallelization.
> For ovn-org/ovn-kubernetes in particular, we could even make the
> ovn-kubernetes CI run in the ovn-org/ovn repo exercise both modes,
> with and without northd parallelization.
> 
> Overall the code looks good but given that we probably also want the
> ovn-ctl change I also left a few very small style comments below.
> 
> Thanks,
> Dumitru

[0]
https://github.com/ovn-org/ovn-kubernetes/blob/4d79168271dcb22e8f2a580f5e75e5e38232224b/dist/images/ovnkube.sh#L857
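
For illustration, the plumbing being requested could look roughly like the
sketch below (shell, like ovn-ctl itself). The flag name follows the example
above; the variable name, default, and surrounding structure are assumptions,
not actual ovn-ctl code:

    #!/bin/sh
    # Hypothetical wrapper sketch (not actual ovn-ctl code): translate a
    # CMS-facing --ovn-northd-n-threads flag into ovn-northd's new
    # --n-threads option.
    OVN_NORTHD_N_THREADS=1    # assumed default: parallelization disabled

    for arg in "$@"; do
        case $arg in
            --ovn-northd-n-threads=*)
                OVN_NORTHD_N_THREADS=${arg#*=} ;;
        esac
    done

    # ovn-northd itself warns about and clamps out-of-range startup values.
    exec ovn-northd --n-threads="$OVN_NORTHD_N_THREADS"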

> 
>> diff --git a/NEWS b/NEWS
>> index 244824e3f..6e489df32 100644
>> --- a/NEWS
>> +++ b/NEWS
>> @@ -9,6 +9,13 @@ Post v22.03.0
>>      implicit drop behavior on logical switches with ACLs applied.
>>    - Support (LSP.options:qos_min_rate) to guarantee minimal bandwidth available
>>      for a logical port.
>> +  - Changed the way to enable northd parallelization.
>> +    Removed support for:
>> +    - use_parallel_build in NBDB.
>> +    - --dummy-numa in northd cmdline.
>> +    Added support for:
>> +    - --n-threads=<N> in northd cmdline.
>> +    - set-n-threads/get-n-threads unixctls.
>>  
>>  OVN v22.03.0 - 11 Mar 2022
>>  --------------------------
>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>> index 7edc4c0b6..c4d8cee16 100644
>> --- a/lib/ovn-parallel-hmap.c
>> +++ b/lib/ovn-parallel-hmap.c
>> @@ -38,14 +38,10 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>>  
>>  #ifndef OVS_HAS_PARALLEL_HMAP
>>  
>> -#define WORKER_SEM_NAME "%x-%p-%x"
>> +#define WORKER_SEM_NAME "%x-%p-%"PRIxSIZE
>>  #define MAIN_SEM_NAME "%x-%p-main"
>>  
>> -/* These are accessed under mutex inside add_worker_pool().
>> - * They do not need to be atomic.
>> - */
>>  static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>> -static bool can_parallelize = false;
>>  
>>  /* This is set only in the process of exit and the set is
>>   * accompanied by a fence. It does not need to be atomic or be
>> @@ -57,18 +53,18 @@ static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>>  
>>  static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>>  
>> -static int pool_size;
>> +static size_t pool_size = 1;
>>  
>>  static int sembase;
>>  
>>  static void worker_pool_hook(void *aux OVS_UNUSED);
>> -static void setup_worker_pools(bool force);
>> +static void setup_worker_pools(void);
>>  static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>                                 void *fin_result, void *result_frags,
>> -                               int index);
>> +                               size_t index);
>>  static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>                                 void *fin_result, void *result_frags,
>> -                               int index);
>> +                               size_t index);
>>  
>>  bool
>>  ovn_stop_parallel_processing(void)
>> @@ -76,107 +72,184 @@ ovn_stop_parallel_processing(void)
>>      return workers_must_exit;
>>  }
>>  
>> -bool
>> -ovn_can_parallelize_hashes(bool force_parallel)
>> +size_t
>> +ovn_get_worker_pool_size(void)
>>  {
>> -    bool test = false;
>> +    return pool_size;
>> +}
>>  
>> -    if (atomic_compare_exchange_strong(
>> -            &initial_pool_setup,
>> -            &test,
>> -            true)) {
>> -        ovs_mutex_lock(&init_mutex);
>> -        setup_worker_pools(force_parallel);
>> -        ovs_mutex_unlock(&init_mutex);
>> +static void
>> +stop_controls(struct worker_pool *pool)
>> +{
>> +    if (pool->controls) {
>> +        workers_must_exit = true;
>> +
>> +        /* unlock threads */
> 
> Nit: /* Unlock threads. */
> 
>> +        for (size_t i = 0; i < pool->size ; i++) {
>> +            if (pool->controls[i].fire != SEM_FAILED) {
>> +                sem_post(pool->controls[i].fire);
>> +            }
>> +        }
>> +
>> +        /* Wait completion */
> 
> Nit: /* Wait for completion. */
> 
>> +        for (size_t i = 0; i < pool->size ; i++) {
>> +            if (pool->controls[i].worker) {
>> +                pthread_join(pool->controls[i].worker, NULL);
>> +                pool->controls[i].worker = 0;
>> +            }
>> +        }
>> +        workers_must_exit = false;
>>      }
>> -    return can_parallelize;
>>  }
>>  
>> -struct worker_pool *
>> -ovn_add_worker_pool(void *(*start)(void *))
>> +static void
>> +free_controls(struct worker_pool *pool)
>> +{
>> +    char sem_name[256];
>> +    if (pool->controls) {
>> +        /* Close/unlink semaphores */
> 
> Nit: /* Close/unlink semaphores. */
> 
>> +        for (size_t i = 0; i < pool->size; i++) {
>> +            ovs_mutex_destroy(&pool->controls[i].mutex);
>> +            if (pool->controls[i].fire != SEM_FAILED) {
>> +                sem_close(pool->controls[i].fire);
>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> +                sem_unlink(sem_name);
>> +            } else {
>> +               /* This and following controls are not initialized */
> 
> Nit: indent needs one extra space.
> 
>> +                break;
>> +            }
>> +        }
>> +        free(pool->controls);
>> +        pool->controls = NULL;
>> +    }
>> +}
>> +
>> +static void
>> +free_pool(struct worker_pool *pool)
>> +{
>> +    char sem_name[256];
>> +    stop_controls(pool);
>> +    free_controls(pool);
>> +    if (pool->done != SEM_FAILED) {
>> +        sem_close(pool->done);
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> +        sem_unlink(sem_name);
>> +    }
>> +    free(pool);
>> +}
>> +
>> +static int
>> +init_controls(struct worker_pool *pool)
>>  {
>> -    struct worker_pool *new_pool = NULL;
>>      struct worker_control *new_control;
>> +    char sem_name[256];
>> +
>> +    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
>> +    for (size_t i = 0; i < pool->size ; i++) {
>> +        pool->controls[i].fire = SEM_FAILED;
>> +    }
>> +    for (size_t i = 0; i < pool->size; i++) {
>> +        new_control = &pool->controls[i];
>> +        new_control->id = i;
>> +        new_control->done = pool->done;
>> +        new_control->data = NULL;
>> +        new_control->pool = pool;
>> +        new_control->worker = 0;
>> +        ovs_mutex_init(&new_control->mutex);
>> +        new_control->finished = ATOMIC_VAR_INIT(false);
>> +        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> +        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +        if (new_control->fire == SEM_FAILED) {
>> +            free_controls(pool);
>> +            return -1;
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +
>> +static void
>> +init_threads(struct worker_pool *pool, void *(*start)(void *))
>> +{
>> +    for (size_t i = 0; i < pool_size; i++) {
>> +        pool->controls[i].worker =
>> +            ovs_thread_create("worker pool helper", start, &pool->controls[i]);
>> +    }
>> +    ovs_list_push_back(&worker_pools, &pool->list_node);
>> +}
>> +
>> +enum pool_update_status
>> +ovn_update_worker_pool(size_t requested_pool_size,
>> +                       struct worker_pool **pool, void *(*start)(void *))
>> +{
>>      bool test = false;
>> -    int i;
>>      char sem_name[256];
>>  
>> -    /* Belt and braces - initialize the pool system just in case if
>> -     * if it is not yet initialized.
>> -     */
>> +    if (requested_pool_size == pool_size) {
>> +        return POOL_UNCHANGED;
>> +    }
>> +
>>      if (atomic_compare_exchange_strong(
>>              &initial_pool_setup,
>>              &test,
>>              true)) {
>>          ovs_mutex_lock(&init_mutex);
>> -        setup_worker_pools(false);
>> +        setup_worker_pools();
>>          ovs_mutex_unlock(&init_mutex);
>>      }
>> -
>>      ovs_mutex_lock(&init_mutex);
>> -    if (can_parallelize) {
>> -        new_pool = xmalloc(sizeof(struct worker_pool));
>> -        new_pool->size = pool_size;
>> -        new_pool->controls = NULL;
>> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> -        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> -        if (new_pool->done == SEM_FAILED) {
>> -            goto cleanup;
>> -        }
>> -
>> -        new_pool->controls =
>> -            xmalloc(sizeof(struct worker_control) * new_pool->size);
>> -
>> -        for (i = 0; i < new_pool->size; i++) {
>> -            new_control = &new_pool->controls[i];
>> -            new_control->id = i;
>> -            new_control->done = new_pool->done;
>> -            new_control->data = NULL;
>> -            ovs_mutex_init(&new_control->mutex);
>> -            new_control->finished = ATOMIC_VAR_INIT(false);
>> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> -            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> -            if (new_control->fire == SEM_FAILED) {
>> +    pool_size = requested_pool_size;
>> +    VLOG_INFO("Setting thread count to %"PRIuSIZE, pool_size);
>> +
>> +    if (*pool == NULL) {
>> +        if (pool_size > 1) {
>> +            VLOG_INFO("Creating new pool with size %"PRIuSIZE, pool_size);
>> +            *pool = xmalloc(sizeof(struct worker_pool));
>> +            (*pool)->size = pool_size;
>> +            (*pool)->controls = NULL;
>> +            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
>> +            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +            if ((*pool)->done == SEM_FAILED) {
>>                  goto cleanup;
>>              }
>> +            if (init_controls(*pool) == -1) {
>> +                goto cleanup;
>> +            }
>> +            init_threads(*pool, start);
>>          }
>> -
>> -        for (i = 0; i < pool_size; i++) {
>> -            new_pool->controls[i].worker =
>> -                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>> +    } else {
>> +        if (pool_size > 1) {
>> +            VLOG_INFO("Changing size of existing pool to %"PRIuSIZE,
>> +                      pool_size);
>> +            stop_controls(*pool);
>> +            free_controls(*pool);
>> +            ovs_list_remove(&(*pool)->list_node);
>> +            (*pool)->size = pool_size;
>> +            if (init_controls(*pool) == -1) {
>> +                goto cleanup;
>> +            }
>> +            init_threads(*pool, start);
>> +        } else {
>> +            VLOG_INFO("Deleting existing pool");
>> +            worker_pool_hook(NULL);
>> +            *pool = NULL;
>>          }
>> -        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>      }
>>      ovs_mutex_unlock(&init_mutex);
>> -    return new_pool;
>> -cleanup:
>> +    return POOL_UPDATED;
>>  
>> +cleanup:
>>      /* Something went wrong when opening semaphores. In this case
>> +     * it is better to shut off parallel processing altogether
>>       */
>> -
>> -    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>> -    can_parallelize = false;
>> -    if (new_pool->controls) {
>> -        for (i = 0; i < new_pool->size; i++) {
>> -            if (new_pool->controls[i].fire != SEM_FAILED) {
>> -                sem_close(new_pool->controls[i].fire);
>> -                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> -                sem_unlink(sem_name);
>> -                break; /* semaphores past this one are uninitialized */
>> -            }
>> -        }
>> -    }
>> -    if (new_pool->done != SEM_FAILED) {
>> -        sem_close(new_pool->done);
>> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> -        sem_unlink(sem_name);
>> -    }
>> +    VLOG_ERR("Failed to initialize parallel processing: %s",
>> +             ovs_strerror(errno));
>> +    free_pool(*pool);
>> +    *pool = NULL;
>> +    pool_size = 1;
>>      ovs_mutex_unlock(&init_mutex);
>> -    return NULL;
>> +    return POOL_UPDATE_FAILED;
>>  }
>>  
>> -
>>  /* Initializes 'hmap' as an empty hash table with mask N. */
>>  void
>>  ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>> @@ -225,9 +298,9 @@ ovn_run_pool_callback(struct worker_pool *pool,
>>                        void *fin_result, void *result_frags,
>>                        void (*helper_func)(struct worker_pool *pool,
>>                                            void *fin_result,
>> -                                          void *result_frags, int index))
>> +                                          void *result_frags, size_t index))
>>  {
>> -    int index, completed;
>> +    size_t index, completed;
>>  
>>      /* Ensure that all worker threads see the same data as the
>>       * main thread.
>> @@ -367,9 +440,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>>  
>>  static void
>>  worker_pool_hook(void *aux OVS_UNUSED) {
>> -    int i;
>>      static struct worker_pool *pool;
>> -    char sem_name[256];
>>  
>>      workers_must_exit = true;
>>  
>> @@ -380,55 +451,15 @@ worker_pool_hook(void *aux OVS_UNUSED) {
>>       */
>>      atomic_thread_fence(memory_order_acq_rel);
>>  
>> -    /* Wake up the workers after the must_exit flag has been set */
>> -
>> -    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>> -        for (i = 0; i < pool->size ; i++) {
>> -            sem_post(pool->controls[i].fire);
>> -        }
>> -        for (i = 0; i < pool->size ; i++) {
>> -            pthread_join(pool->controls[i].worker, NULL);
>> -        }
>> -        for (i = 0; i < pool->size ; i++) {
>> -            sem_close(pool->controls[i].fire);
>> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> -            sem_unlink(sem_name);
>> -        }
>> -        sem_close(pool->done);
>> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> -        sem_unlink(sem_name);
>> +    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
>> +        ovs_list_remove(&pool->list_node);
>> +        free_pool(pool);
>>      }
>>  }
>>  
>>  static void
>> -setup_worker_pools(bool force) {
>> -    int cores, nodes;
>> -
>> -    ovs_numa_init();
>> -    nodes = ovs_numa_get_n_numas();
>> -    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>> -        nodes = 1;
>> -    }
>> -    cores = ovs_numa_get_n_cores();
>> -
>> -    /* If there is no NUMA config, use 4 cores.
>> -     * If there is NUMA config use half the cores on
>> -     * one node so that the OS does not start pushing
>> -     * threads to other nodes.
>> -     */
>> -    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>> -        /* If there is no NUMA we can try the ovs-threads routine.
>> -         * It falls back to sysconf and/or affinity mask.
>> -         */
>> -        cores = count_cpu_cores();
>> -        pool_size = cores;
>> -    } else {
>> -        pool_size = cores / nodes;
>> -    }
>> -    if ((pool_size < 4) && force) {
>> -        pool_size = 4;
>> -    }
>> -    can_parallelize = (pool_size >= 3);
>> +setup_worker_pools(void)
>> +{
>>      fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>>      sembase = random_uint32();
>>  }
>> @@ -436,7 +467,7 @@ setup_worker_pools(bool force) {
>>  static void
>>  merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>                     void *fin_result, void *result_frags,
>> -                   int index)
>> +                   size_t index)
>>  {
>>      struct ovs_list *result = (struct ovs_list *)fin_result;
>>      struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>> @@ -450,7 +481,7 @@ merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>  static void
>>  merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>                     void *fin_result, void *result_frags,
>> -                   int index)
>> +                   size_t index)
>>  {
>>      struct hmap *result = (struct hmap *)fin_result;
>>      struct hmap *res_frags = (struct hmap *)result_frags;
>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>> index 0f7d68770..72b31b489 100644
>> --- a/lib/ovn-parallel-hmap.h
>> +++ b/lib/ovn-parallel-hmap.h
>> @@ -81,21 +81,30 @@ struct worker_control {
>>      sem_t *done; /* Work completion semaphore - sem_post on completion. */
>>      struct ovs_mutex mutex; /* Guards the data. */
>>      void *data; /* Pointer to data to be processed. */
>> -    void *workload; /* back-pointer to the worker pool structure. */
>>      pthread_t worker;
>> +    struct worker_pool *pool;
>>  };
>>  
>>  struct worker_pool {
>> -    int size;   /* Number of threads in the pool. */
>> +    size_t size;   /* Number of threads in the pool. */
>>      struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>>      struct worker_control *controls; /* "Handles" in this pool. */
>>      sem_t *done; /* Work completion semaphorew. */
>>  };
>>  
>> -/* Add a worker pool for thread function start() which expects a pointer to
>> - * a worker_control structure as an argument. */
>> +/* Return pool size; bigger than 1 means parallelization has been enabled. */
>> +size_t ovn_get_worker_pool_size(void);
>>  
>> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>> +enum pool_update_status {
>> +     POOL_UNCHANGED,     /* no change to pool */
>> +     POOL_UPDATED,       /* pool has been updated */
>> +     POOL_UPDATE_FAILED, /* pool update failed; parallelization disabled */
>> +};
> 
> Nit: I would add an empty line here.
> 
>> +/* Add/delete a worker pool for thread function start() which expects a pointer
>> + * to a worker_control structure as an argument. Returns pool update status. */
>> +enum pool_update_status ovn_update_worker_pool(size_t requested_pool_size,
>> +                                               struct worker_pool **,
>> +                                               void *(*start)(void *));
>>  
>>  /* Setting this to true will make all processing threads exit */
>>  
>> @@ -140,7 +149,8 @@ void ovn_run_pool_list(struct worker_pool *pool,
>>  void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>>                             void *result_frags,
>>                             void (*helper_func)(struct worker_pool *pool,
>> -                           void *fin_result, void *result_frags, int index));
>> +                           void *fin_result, void *result_frags,
>> +                           size_t index));
>>  
>>  
>>  /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
>> @@ -251,17 +261,17 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>>      hrl->row_locks = NULL;
>>  }
>>  
>> -bool ovn_can_parallelize_hashes(bool force_parallel);
>> -
>>  /* Use the OVN library functions for stuff which OVS has not defined
>>   * If OVS has defined these, they will still compile using the OVN
>>   * local names, but will be dropped by the linker in favour of the OVS
>>   * supplied functions.
>>   */
>> +#define update_worker_pool(requested_pool_size, existing_pool, func) \
>> +    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
>>  
>> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>> +#define get_worker_pool_size() ovn_get_worker_pool_size()
>>  
>> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>>  
>>  #define stop_parallel_processing() ovn_stop_parallel_processing()
>>  
>> diff --git a/northd/northd.c b/northd/northd.c
>> index 67c39df88..48426c801 100644
>> --- a/northd/northd.c
>> +++ b/northd/northd.c
>> @@ -59,6 +59,7 @@
>>  VLOG_DEFINE_THIS_MODULE(northd);
>>  
>>  static bool controller_event_en;
>> +static bool lflow_hash_lock_initialized = false;
>>  
>>  static bool check_lsp_is_up;
>>  
>> @@ -4740,7 +4741,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
>>  /* If this option is 'true' northd will combine logical flows that differ by
>>   * logical datapath only by creating a datapath group. */
>>  static bool use_logical_dp_groups = false;
>> -static bool use_parallel_build = false;
>> +
>> +enum {
>> +    STATE_NULL,               /* parallelization is off */
>> +    STATE_INIT_HASH_SIZES,    /* parallelization is on; hashes sizing needed */
>> +    STATE_USE_PARALLELIZATION /* parallelization is on */
>> +};
>> +static int parallelization_state = STATE_NULL;
>>  
>>  static void
>>  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>> @@ -4759,7 +4766,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>>      lflow->ctrl_meter = ctrl_meter;
>>      lflow->dpg = NULL;
>>      lflow->where = where;
>> -    if (use_parallel_build && use_logical_dp_groups) {
>> +    if ((parallelization_state != STATE_NULL)
>> +        && use_logical_dp_groups) {
>>          ovs_mutex_init(&lflow->odg_lock);
>>      }
>>  }
>> @@ -4773,7 +4781,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
>>          return false;
>>      }
>>  
>> -    if (use_parallel_build) {
>> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>>          ovs_mutex_lock(&lflow_ref->odg_lock);
>>          hmapx_add(&lflow_ref->od_group, od);
>>          ovs_mutex_unlock(&lflow_ref->odg_lock);
>> @@ -4803,9 +4811,23 @@ static struct ovs_mutex lflow_hash_locks[LFLOW_HASH_LOCK_MASK + 1];
>>  static void
>>  lflow_hash_lock_init(void)
>>  {
>> -    for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>> -        ovs_mutex_init(&lflow_hash_locks[i]);
>> +    if (!lflow_hash_lock_initialized) {
>> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>> +            ovs_mutex_init(&lflow_hash_locks[i]);
>> +        }
>> +        lflow_hash_lock_initialized = true;
>> +    }
>> +}
>> +
>> +static void
>> +lflow_hash_lock_destroy(void)
>> +{
>> +    if (lflow_hash_lock_initialized) {
>> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>> +            ovs_mutex_destroy(&lflow_hash_locks[i]);
>> +        }
>>      }
>> +    lflow_hash_lock_initialized = false;
>>  }
>>  
>>  /* This thread-local var is used for parallel lflow building when dp-groups is
>> @@ -4853,7 +4875,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
>>                     nullable_xstrdup(ctrl_meter),
>>                     ovn_lflow_hint(stage_hint), where);
>>      hmapx_add(&lflow->od_group, od);
>> -    if (!use_parallel_build) {
>> +    if (parallelization_state != STATE_USE_PARALLELIZATION) {
>>          hmap_insert(lflow_map, &lflow->hmap_node, hash);
>>      } else {
>>          hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
>> @@ -4896,7 +4918,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
>>      struct ovn_lflow *lflow;
>>  
>>      ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
>> -    if (use_logical_dp_groups && use_parallel_build) {
>> +    if (use_logical_dp_groups
>> +        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
>>          lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
>>                                      match, actions, io_port, stage_hint, where,
>>                                      ctrl_meter);
>> @@ -4982,6 +5005,10 @@ static void
>>  ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
>>  {
>>      if (lflow) {
>> +        if ((parallelization_state != STATE_NULL)
>> +            && use_logical_dp_groups) {
>> +            ovs_mutex_destroy(&lflow->odg_lock);
>> +        }
>>          if (lflows) {
>>              hmap_remove(lflows, &lflow->hmap_node);
>>          }
>> @@ -13925,15 +13952,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>>                                        &lsi->actions);
>>  }
>>  
>> -struct lflows_thread_pool {
>> -    struct worker_pool *pool;
>> -};
>> -
>>  static void *
>>  build_lflows_thread(void *arg)
>>  {
>>      struct worker_control *control = (struct worker_control *) arg;
>> -    struct lflows_thread_pool *workload;
>>      struct lswitch_flow_build_info *lsi;
>>  
>>      struct ovn_datapath *od;
>> @@ -13944,17 +13966,16 @@ build_lflows_thread(void *arg)
>>  
>>      while (!stop_parallel_processing()) {
>>          wait_for_work(control);
>> -        workload = (struct lflows_thread_pool *) control->workload;
>>          lsi = (struct lswitch_flow_build_info *) control->data;
>>          if (stop_parallel_processing()) {
>>              return NULL;
>>          }
>>          thread_lflow_counter = 0;
>> -        if (lsi && workload) {
>> +        if (lsi) {
>>              /* Iterate over bucket ThreadID, ThreadID+size, ... */
>>              for (bnum = control->id;
>>                      bnum <= lsi->datapaths->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
>>                      if (stop_parallel_processing()) {
>> @@ -13965,7 +13986,7 @@ build_lflows_thread(void *arg)
>>              }
>>              for (bnum = control->id;
>>                      bnum <= lsi->ports->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>>                      if (stop_parallel_processing()) {
>> @@ -13976,7 +13997,7 @@ build_lflows_thread(void *arg)
>>              }
>>              for (bnum = control->id;
>>                      bnum <= lsi->lbs->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>>                      if (stop_parallel_processing()) {
>> @@ -13997,7 +14018,7 @@ build_lflows_thread(void *arg)
>>              }
>>              for (bnum = control->id;
>>                      bnum <= lsi->igmp_groups->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (
>>                          igmp_group, hmap_node, bnum, lsi->igmp_groups) {
>> @@ -14016,39 +14037,13 @@ build_lflows_thread(void *arg)
>>      return NULL;
>>  }
>>  
>> -static bool pool_init_done = false;
>> -static struct lflows_thread_pool *build_lflows_pool = NULL;
>> -
>> -static void
>> -init_lflows_thread_pool(void)
>> -{
>> -    int index;
>> -
>> -    if (!pool_init_done) {
>> -        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
>> -        pool_init_done = true;
>> -        if (pool) {
>> -            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
>> -            build_lflows_pool->pool = pool;
>> -            for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> -                build_lflows_pool->pool->controls[index].workload =
>> -                    build_lflows_pool;
>> -            }
>> -        }
>> -    }
>> -}
>> -
>> -/* TODO: replace hard cutoffs by configurable via commands. These are
>> - * temporary defines to determine single-thread to multi-thread processing
>> - * cutoff.
>> - * Setting to 1 forces "all parallel" lflow build.
>> - */
>> +static struct worker_pool *build_lflows_pool = NULL;
>>  
>>  static void
>>  noop_callback(struct worker_pool *pool OVS_UNUSED,
>>                void *fin_result OVS_UNUSED,
>>                void *result_frags OVS_UNUSED,
>> -              int index OVS_UNUSED)
>> +              size_t index OVS_UNUSED)
>>  {
>>      /* Do nothing */
>>  }
>> @@ -14088,28 +14083,21 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>>  
>>      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>>  
>> -    if (use_parallel_build) {
>> -        init_lflows_thread_pool();
>> -        if (!can_parallelize_hashes(false)) {
>> -            use_parallel_build = false;
>> -        }
>> -    }
>> -
>> -    if (use_parallel_build) {
>> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>>          struct hmap *lflow_segs;
>>          struct lswitch_flow_build_info *lsiv;
>>          int index;
>>  
>> -        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
>> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
>>          if (use_logical_dp_groups) {
>>              lflow_segs = NULL;
>>          } else {
>> -            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
>> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
>>          }
>>  
>>          /* Set up "work chunks" for each thread to work on. */
>>  
>> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +        for (index = 0; index < build_lflows_pool->size; index++) {
>>              if (use_logical_dp_groups) {
>>                  /* if dp_groups are in use we lock a shared lflows hash
>>                   * on a per-bucket level instead of merging hash frags */
>> @@ -14132,19 +14120,19 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>>              ds_init(&lsiv[index].match);
>>              ds_init(&lsiv[index].actions);
>>  
>> -            build_lflows_pool->pool->controls[index].data = &lsiv[index];
>> +            build_lflows_pool->controls[index].data = &lsiv[index];
>>          }
>>  
>>          /* Run thread pool. */
>>          if (use_logical_dp_groups) {
>> -            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
>> +            run_pool_callback(build_lflows_pool, NULL, NULL,
>>                                noop_callback);
>> -            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
>> +            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
>>          } else {
>> -            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
>> +            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
>>          }
>>  
>> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +        for (index = 0; index < build_lflows_pool->size; index++) {
>>              ds_destroy(&lsiv[index].match);
>>              ds_destroy(&lsiv[index].actions);
>>          }
>> @@ -14280,8 +14268,39 @@ ovn_sb_set_lflow_logical_dp_group(
>>  }
>>  
>>  static ssize_t max_seen_lflow_size = 128;
>> -static bool needs_parallel_init = true;
>> -static bool reset_parallel = false;
>> +
>> +void run_update_worker_pool(int n_threads)
>> +{
>> +    /* If number of threads has been updated (or initially set),
>> +     * update the worker pool. */
>> +    if (update_worker_pool(n_threads, &build_lflows_pool,
>> +                           build_lflows_thread) != POOL_UNCHANGED) {
>> +        /* worker pool was updated */
>> +        if (get_worker_pool_size() <= 1) {
>> +            /* destroy potentially created lflow_hash_lock */
>> +            lflow_hash_lock_destroy();
>> +            parallelization_state = STATE_NULL;
>> +        } else if (parallelization_state != STATE_USE_PARALLELIZATION) {
>> +            if (use_logical_dp_groups) {
>> +                lflow_hash_lock_init();
>> +                parallelization_state = STATE_INIT_HASH_SIZES;
>> +            } else {
>> +                parallelization_state = STATE_USE_PARALLELIZATION;
>> +            }
>> +        }
>> +    }
>> +}
>> +
>> +static void worker_pool_init_for_ldp(void)
>> +{
>> +    /* If parallelization is enabled, make sure locks are initialized
>> +     * when ldp are used.
>> +     */
>> +    if (parallelization_state != STATE_NULL) {
>> +        lflow_hash_lock_init();
>> +        parallelization_state = STATE_INIT_HASH_SIZES;
>> +    }
>> +}
>>  
>>  static void
>>  build_mcast_groups(struct lflow_input *data,
>> @@ -14302,30 +14321,18 @@ void build_lflows(struct lflow_input *input_data,
>>      build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
>>                         &mcast_groups, &igmp_groups);
>>  
>> -    if (reset_parallel) {
>> -        /* Parallel build was disabled before, we need to
>> -         * re-enable it. */
>> -        use_parallel_build = true;
>> -        reset_parallel = false;
>> -    }
>> -
>>      fast_hmap_size_for(&lflows, max_seen_lflow_size);
>>  
>> -    if (use_parallel_build && use_logical_dp_groups &&
>> -        needs_parallel_init) {
>> -        lflow_hash_lock_init();
>> -        needs_parallel_init = false;
>> -        /* Disable parallel build on first run with dp_groups
>> -         * to determine the correct sizing of hashes. */
>> -        use_parallel_build = false;
>> -        reset_parallel = true;
>> -    }
>>      build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
>>                                      input_data->port_groups, &lflows,
>>                                      &mcast_groups, &igmp_groups,
>>                                      input_data->meter_groups, input_data->lbs,
>>                                      input_data->bfd_connections);
>>  
>> +    if (parallelization_state == STATE_INIT_HASH_SIZES) {
>> +        parallelization_state = STATE_USE_PARALLELIZATION;
>> +    }
>> +
>>      /* Parallel build may result in a suboptimal hash. Resize the
>>       * hash to a correct size before doing lookups */
>>  
>> @@ -15567,12 +15574,13 @@ ovnnb_db_run(struct northd_input *input_data,
>>  
>>      smap_destroy(&options);
>>  
>> -    use_parallel_build =
>> -        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
>> -         can_parallelize_hashes(false));
>> -
>> +    bool old_use_ldp = use_logical_dp_groups;
>>      use_logical_dp_groups = smap_get_bool(&nb->options,
>>                                            "use_logical_dp_groups", true);
>> +    if (use_logical_dp_groups && !old_use_ldp) {
>> +        worker_pool_init_for_ldp();
>> +    }
>> +
>>      use_ct_inv_match = smap_get_bool(&nb->options,
>>                                       "use_ct_inv_match", true);
>>  
>> diff --git a/northd/northd.h b/northd/northd.h
>> index 2d804a22e..fe8dad03a 100644
>> --- a/northd/northd.h
>> +++ b/northd/northd.h
>> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data,
>>                       struct hmap *bfd_connections, struct hmap *ports);
>>  void bfd_cleanup_connections(struct lflow_input *input_data,
>>                               struct hmap *bfd_map);
>> +void run_update_worker_pool(int n_threads);
>>  
>>  #endif /* NORTHD_H */
>> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
>> index 718d052cc..e9afda4c6 100644
>> --- a/northd/ovn-northd-ddlog.c
>> +++ b/northd/ovn-northd-ddlog.c
>> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>>          SSL_OPTION_ENUMS,
>>          OPT_DRY_RUN,
>>          OPT_DDLOG_RECORD,
>> -        OPT_DUMMY_NUMA,
>>      };
>>      static const struct option long_options[] = {
>>          {"ovnsb-db", required_argument, NULL, 'd'},
>> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>>          OVN_DAEMON_LONG_OPTIONS,
>>          VLOG_LONG_OPTIONS,
>>          STREAM_SSL_LONG_OPTIONS,
>> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>>          {NULL, 0, NULL, 0},
>>      };
>>      char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
>> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>>              *pause = true;
>>              break;
>>  
>> -        case OPT_DUMMY_NUMA:
>> -            ovs_numa_set_dummy(optarg);
>> -            break;
>> -
>>          case OPT_DDLOG_RECORD:
>>              record_file = optarg;
>>              break;
>> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
>> index 73ecfbc2d..077bd1f41 100644
>> --- a/northd/ovn-northd.8.xml
>> +++ b/northd/ovn-northd.8.xml
>> @@ -68,53 +68,26 @@
>>            restarting a process or disturbing a running system.
>>          </p>
>>        </dd>
>> -      <dt><code>--dummy-numa</code></dt>
>> +      <dt><code>--n-threads N</code></dt>
>>        <dd>
>> -        <p>
>> -          Typically, OVS uses sysfs to determine the number of NUMA nodes and
>> -          CPU cores that are available on a machine. The parallelization code
>> -          in OVN uses this information to determine if there are enough
>> -          resources to use parallelization. The current algorithm enables
>> -          parallelization if the total number of CPU cores divided by the
>> -          number of NUMA nodes is greater than or equal to four.
>> -        </p>
>> -
>>          <p>
>>            In certain situations, it may be desirable to enable parallelization
>> -          on a system that otherwise would not have it allowed. The
>> -          <code>--dummy-numa</code> option allows for you to fake the NUMA
>> -          nodes and cores that OVS thinks your system has. The syntax consists
>> -          of using numbers to represent the NUMA node IDs. The number of times
>> -          that a NUMA node ID appears represents how many CPU cores that NUMA
>> -          node contains. So for instance, if you did the following:
>> -        </p>
>> -
>> -        <p>
>> -          <code>--dummy-numa=0,0,0,0</code>
>> -        </p>
>> -
>> -        <p>
>> -          it would make OVS assume that you have a single NUMA node with ID 0,
>> -          and that NUMA node consists of four CPU cores. Similarly, you could
>> -          do:
>> -        </p>
>> -
>> -        <p>
>> -          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
>> +          on a system to decrease latency (at the potential cost of increasing
>> +          CPU usage).
>>          </p>
>>  
>>          <p>
>> -          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
>> -          with six CPU cores.
>> +          This option will cause ovn-northd to use N threads when building
>> +          logical flows, where N is within [2-256].
>> +          If N is 1, parallelization is disabled (default behavior).
>> +          If N is less than 1, then N is set to 1, parallelization is disabled
>> +          and a warning is logged.
>> +          If N is more than 256, then N is set to 256, parallelization is
>> +          enabled (with 256 threads) and a warning is logged.
>>          </p>
>>  
>>          <p>
>> -          Currently, the only affect this option has is on whether
>> -          parallelization can be enabled in ovn-northd. There are no NUMA node
>> -          or CPU core-specific actions performed by OVN. Setting
>> -          <code>--dummy-numa</code> in ovn-northd does not affect how other OVS
>> -          processes on the system (such as ovs-vswitchd) count the number of
>> -          NUMA nodes and CPU cores; this setting is local to ovn-northd.
>> +          ovn-northd-ddlog does not support this option.
>>          </p>
>>        </dd>
>>      </dl>
>> @@ -210,6 +183,27 @@
>>          except for the northbound database client.
>>        </p>
>>        </dd>
>> +
>> +
>> +      <dt><code>set-n-threads N</code></dt>
>> +      <dd>
>> +      <p>
>> +        Set the number of threads used for building logical flows.
>> +        When N is within [2-256], parallelization is enabled.
>> +        When N is 1, parallelization is disabled.
>> +        When N is less than 1 or more than 256, an error is returned.
>> +        If ovn-northd fails to start parallelization (e.g. it fails to set
>> +        up semaphores), parallelization is disabled and an error is returned.
>> +      </p>
>> +      </dd>
>> +
>> +      <dt><code>get-n-threads</code></dt>
>> +      <dd>
>> +      <p>
>> +        Return the number of threads used for building logical flows.
>> +      </p>
>> +      </dd>
>> +
>>        </dl>
>>      </p>
>>  
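
As a concrete illustration of the commands documented above (the command
names come from the unixctl registrations in ovn-northd.c below; the -t
target assumes the default ovn-northd pidfile/socket name):

    # Enable parallelization with 4 threads, inspect, then disable again.
    ovn-appctl -t ovn-northd parallel-build/set-n-threads 4
    ovn-appctl -t ovn-northd parallel-build/get-n-threads    # prints "4"
    ovn-appctl -t ovn-northd parallel-build/set-n-threads 1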
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index 0a0f85010..b7363b16d 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -41,6 +41,7 @@
>>  #include "unixctl.h"
>>  #include "util.h"
>>  #include "openvswitch/vlog.h"
>> +#include "lib/ovn-parallel-hmap.h"
>>  
>>  VLOG_DEFINE_THIS_MODULE(ovn_northd);
>>  
>> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume;
>>  static unixctl_cb_func ovn_northd_is_paused;
>>  static unixctl_cb_func ovn_northd_status;
>>  static unixctl_cb_func cluster_state_reset_cmd;
>> +static unixctl_cb_func ovn_northd_set_thread_count_cmd;
>> +static unixctl_cb_func ovn_northd_get_thread_count_cmd;
>>  
>>  struct northd_state {
>>      bool had_lock;
>>      bool paused;
>>  };
>>  
>> +#define OVN_MAX_SUPPORTED_THREADS 256
>> +
>>  static const char *ovnnb_db;
>>  static const char *ovnsb_db;
>>  static const char *unixctl_path;
>> @@ -525,7 +530,7 @@ Options:\n\
>>    --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
>>                              (default: %s)\n\
>>    --dry-run                 start in paused state (do not commit db changes)\n\
>> -  --dummy-numa              override default NUMA node and CPU core discovery\n\
>> +  --n-threads=N             specify number of threads\n\
>>    --unixctl=SOCKET          override default control socket name\n\
>>    -h, --help                display this help message\n\
>>    -o, --options             list available options\n\
>> @@ -538,14 +543,14 @@ Options:\n\
>>  
>>  static void
>>  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>> -              bool *paused)
>> +              bool *paused, int *n_threads)
>>  {
>>      enum {
>>          OVN_DAEMON_OPTION_ENUMS,
>>          VLOG_OPTION_ENUMS,
>>          SSL_OPTION_ENUMS,
>>          OPT_DRY_RUN,
>> -        OPT_DUMMY_NUMA,
>> +        OPT_N_THREADS,
>>      };
>>      static const struct option long_options[] = {
>>          {"ovnsb-db", required_argument, NULL, 'd'},
>> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>>          {"options", no_argument, NULL, 'o'},
>>          {"version", no_argument, NULL, 'V'},
>>          {"dry-run", no_argument, NULL, OPT_DRY_RUN},
>> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>> +        {"n-threads", required_argument, NULL, OPT_N_THREADS},
>>          OVN_DAEMON_LONG_OPTIONS,
>>          VLOG_LONG_OPTIONS,
>>          STREAM_SSL_LONG_OPTIONS,
>> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>>              ovn_print_version(0, 0);
>>              exit(EXIT_SUCCESS);
>>  
>> -        case OPT_DUMMY_NUMA:
>> -            ovs_numa_set_dummy(optarg);
>> +        case OPT_N_THREADS:
>> +            *n_threads = strtoul(optarg, NULL, 10);
>> +            if (*n_threads < 1) {
>> +                *n_threads = 1;
>> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
>> +                    "set to : [%s]", *n_threads, optarg);
>> +            }
>> +            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
>> +                *n_threads = OVN_MAX_SUPPORTED_THREADS;
>> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
>> +                    "set to : [%s]", *n_threads, optarg);
>> +            }
>> +            if (*n_threads != 1) {
>> +                VLOG_INFO("Using %d threads", *n_threads);
>> +            }
>>              break;
>>  
>>          case OPT_DRY_RUN:
>> @@ -668,6 +686,7 @@ main(int argc, char *argv[])
>>      struct unixctl_server *unixctl;
>>      int retval;
>>      bool exiting;
>> +    int n_threads = 1;
>>      struct northd_state state = {
>>          .had_lock = false,
>>          .paused = false
>> @@ -677,7 +696,7 @@ main(int argc, char *argv[])
>>      ovs_cmdl_proctitle_init(argc, argv);
>>      ovn_set_program_name(argv[0]);
>>      service_start(&argc, &argv);
>> -    parse_options(argc, argv, &state.paused);
>> +    parse_options(argc, argv, &state.paused, &n_threads);
>>  
>>      daemonize_start(false);
>>  
>> @@ -704,6 +723,12 @@ main(int argc, char *argv[])
>>      unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
>>                               cluster_state_reset_cmd,
>>                               &reset_ovnnb_idl_min_index);
>> +    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 1,
>> +                             ovn_northd_set_thread_count_cmd,
>> +                             NULL);
>> +    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
>> +                             ovn_northd_get_thread_count_cmd,
>> +                             NULL);
>>  
>>      daemonize_complete();
>>  
>> @@ -761,6 +786,8 @@ main(int argc, char *argv[])
>>      unsigned int ovnnb_cond_seqno = UINT_MAX;
>>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>>  
>> +    run_update_worker_pool(n_threads);
>> +
>>      /* Main loop. */
>>      exiting = false;
>>  
>> @@ -1017,3 +1044,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
>>      poll_immediate_wake();
>>      unixctl_command_reply(conn, NULL);
>>  }
>> +
>> +static void
>> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
>> +               const char *argv[], void *aux OVS_UNUSED)
>> +{
>> +    int n_threads = atoi(argv[1]);
>> +
>> +    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
>> +        struct ds s = DS_EMPTY_INITIALIZER;
>> +        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
>> +        unixctl_command_reply_error(conn, ds_cstr(&s));
>> +        ds_destroy(&s);
>> +    } else {
>> +        run_update_worker_pool(n_threads);
>> +        unixctl_command_reply(conn, NULL);
>> +    }
>> +}
>> +
>> +static void
>> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
>> +               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
>> +{
>> +    struct ds s = DS_EMPTY_INITIALIZER;
>> +    ds_put_format(&s, "%"PRIuSIZE"\n", get_worker_pool_size());
>> +    unixctl_command_reply(conn, ds_cstr(&s));
>> +    ds_destroy(&s);
>> +}
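
Note the asymmetry visible in this file: the startup option clamps
out-of-range values with a warning, while the unixctl handler rejects them.
The expected interaction, mirroring the tests below:

    ovn-appctl -t ovn-northd parallel-build/set-n-threads 0    # rejected: invalid n_threads: 0
    ovn-appctl -t ovn-northd parallel-build/set-n-threads 300  # rejected: invalid n_threads: 300
    ovn-appctl -t ovn-northd parallel-build/set-n-threads 256  # accepted: OVN_MAX_SUPPORTED_THREADS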
>> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
>> index d78315c75..c6f0f6251 100644
>> --- a/tests/ovn-macros.at
>> +++ b/tests/ovn-macros.at
>> @@ -170,8 +170,8 @@ ovn_start_northd() {
>>          ovn-northd-ddlog) northd_args="$northd_args --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
>>      esac
>>  
>> -    if test X$NORTHD_DUMMY_NUMA = Xyes; then
>> -        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
>> +    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
>> +        northd_args="$northd_args --n-threads=4"
>>      fi
>>  
>>      local name=${d_prefix}northd${suffix}
>> @@ -252,10 +252,6 @@ ovn_start () {
>>      else
>>          ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
>>      fi
>> -
>> -    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
>> -        ovn-nbctl set NB_Global . options:use_parallel_build=true
>> -    fi
>>  }
>>  
>>  # Interconnection networks.
>> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS
>>  
>>  m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
>>  
>> -# Use --dummy-numa if system has low cores and we want to force parallelization
>> -m4_define([NORTHD_DUMMY_NUMA],
>> -  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
>> -     then
>> -       echo "yes"
>> -     else
>> -       echo "no"
>> -     fi)
>> -])
>> -
>>  # Defines a versions of a test with all combinations of northd and
>>  # datapath groups.
>>  m4_define([OVN_FOR_EACH_NORTHD],
>> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD],
>>  # Some tests aren't prepared for dp groups to be enabled.
>>  m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
>>    [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
>> -     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
>> -])])])
>> -
>> -# Test parallelization with dp groups enabled and disabled
>> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
>> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
>> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
>> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
>> -    [[NORTHD_USE_PARALLELIZATION], [yes]
>> -])]])
>> -
>> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
>> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
>> -m4_pushdef(NORTHD_DUMMY_NUMA, [no])
>> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
>> -    [[NORTHD_USE_PARALLELIZATION], [yes]
>> -])]])
>> -
>> -# Use --dummy-numa if system has low cores
>> -m4_define([HOST_HAS_LOW_CORES], [
>> -    if test $(nproc) -lt 4; then
>> -        :
>> -        $1
>> -    else
>> -        :
>> -        $2
>> -    fi
>> -])
>> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
>> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
>> +])])])])
>> +
>> +# Some tests aren't prepared for ddlog to be enabled.
>> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
>> +  [m4_foreach([NORTHD_TYPE], [ovn-northd],
>> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
>> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
>> +])])])])
>>  
>> -m4_define([NORTHD_PARALLELIZATION], [
>> -    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
>> -])
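
With the old NUMA/dummy-numa machinery gone, NORTHD_USE_PARALLELIZATION is
now just another axis of the standard test matrix. Assuming a built OVN
tree, the new tests below can be run on their own with the usual autotest
keyword selection:

    make check TESTSUITEFLAGS='-k northd-parallelization'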
>> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
>> index 17e37fb77..1a9d5ef76 100644
>> --- a/tests/ovn-northd.at
>> +++ b/tests/ovn-northd.at
>> @@ -7174,3 +7174,112 @@ ct_next(ct_state=new|trk);
>>  
>>  AT_CLEANUP
>>  ])
>> +
>> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
>> +AT_SETUP([northd-parallelization unixctl])
>> +ovn_start
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
>> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
>> +])
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
>> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [4
>> +])
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
>> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 0], [2], [],
>> +  [invalid n_threads: 0
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads -1], [2], [],
>> +  [invalid n_threads: -1
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 300], [2], [],
>> +  [invalid n_threads: 300
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], [2], [],
>> +  ["parallel-build/set-n-threads" command requires at least 1 arguments
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 2], [2], [],
>> +  ["parallel-build/set-n-threads" command takes at most 1 arguments
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CLEANUP
>> +])
>> +
>> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
>> +AT_SETUP([northd-parallelization runtime])
>> +ovn_start
>> +
>> +add_switch_ports() {
>> +    for port in $(seq $1 $2); do
>> +        logical_switch_port=lsp${port}
>> +        check ovn-nbctl lsp-add ls1 $logical_switch_port
>> +        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
>> +    done
>> +}
>> +
>> +# Build some rather heavy config and modify number of threads in the middle
>> +check ovn-nbctl ls-add ls1
>> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
>> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
>> +
>> +check ovn-nbctl lr-add lr1
>> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router options:router-port=lrp0 addresses=dynamic
>> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
>> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
>> +add_switch_ports 1 50
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
>> +add_switch_ports 51 100
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
>> +add_switch_ports 101 150
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
>> +add_switch_ports 151 200
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
>> +add_switch_ports 201 250
>> +check ovn-nbctl --wait=sb sync
>> +
>> +# Run 3 times: one with parallelization enabled, one with disabled, and one while changing
>> +# Compare the flows produced by the three runs
>> +# Ignore IP/MAC addresses
>> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
>> +
>> +# Restart with 1 thread
>> +for port in $(seq 1 250); do
>> +    logical_switch_port=lsp${port}
>> +    check ovn-nbctl lsp-del $logical_switch_port
>> +done
>> +add_switch_ports 1 250
>> +check ovn-nbctl --wait=sb sync
>> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
>> +AT_CHECK([diff flows1 flows2])
>> +
>> +# Restart with with 8 threads
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
>> +for port in $(seq 1 250); do
>> +    logical_switch_port=lsp${port}
>> +    check ovn-nbctl lsp-del $logical_switch_port
>> +done
>> +add_switch_ports 1 250
>> +check ovn-nbctl --wait=sb sync
>> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
>> +AT_CHECK([diff flows1 flows3])
>> +
>> +AT_CLEANUP
>> +])
>
Patch

diff --git a/NEWS b/NEWS
index 244824e3f..6e489df32 100644
--- a/NEWS
+++ b/NEWS
@@ -9,6 +9,13 @@  Post v22.03.0
     implicit drop behavior on logical switches with ACLs applied.
   - Support (LSP.options:qos_min_rate) to guarantee minimal bandwidth available
     for a logical port.
+  - Changed the way to enable northd parallelization.
+    Removed support for:
+    - use_parallel_build in NBDB.
+    - --dummy-numa in northd cmdline.
+    Added support for:
+    - --n-threads=<N> in northd cmdline.
+    - set-n-threads/get-n-threads unixctls.
 
 OVN v22.03.0 - 11 Mar 2022
 --------------------------
diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index 7edc4c0b6..c4d8cee16 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -38,14 +38,10 @@  VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
 
 #ifndef OVS_HAS_PARALLEL_HMAP
 
-#define WORKER_SEM_NAME "%x-%p-%x"
+#define WORKER_SEM_NAME "%x-%p-%"PRIxSIZE
 #define MAIN_SEM_NAME "%x-%p-main"
 
-/* These are accessed under mutex inside add_worker_pool().
- * They do not need to be atomic.
- */
 static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
-static bool can_parallelize = false;
 
 /* This is set only in the process of exit and the set is
  * accompanied by a fence. It does not need to be atomic or be
@@ -57,18 +53,18 @@  static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
 
 static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
 
-static int pool_size;
+static size_t pool_size = 1;
 
 static int sembase;
 
 static void worker_pool_hook(void *aux OVS_UNUSED);
-static void setup_worker_pools(bool force);
+static void setup_worker_pools(void);
 static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
                                void *fin_result, void *result_frags,
-                               int index);
+                               size_t index);
 static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
                                void *fin_result, void *result_frags,
-                               int index);
+                               size_t index);
 
 bool
 ovn_stop_parallel_processing(void)
@@ -76,107 +72,184 @@  ovn_stop_parallel_processing(void)
     return workers_must_exit;
 }
 
-bool
-ovn_can_parallelize_hashes(bool force_parallel)
+size_t
+ovn_get_worker_pool_size(void)
 {
-    bool test = false;
+    return pool_size;
+}
 
-    if (atomic_compare_exchange_strong(
-            &initial_pool_setup,
-            &test,
-            true)) {
-        ovs_mutex_lock(&init_mutex);
-        setup_worker_pools(force_parallel);
-        ovs_mutex_unlock(&init_mutex);
+static void
+stop_controls(struct worker_pool *pool)
+{
+    if (pool->controls) {
+        workers_must_exit = true;
+
+        /* Unblock the worker threads. */
+        for (size_t i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_post(pool->controls[i].fire);
+            }
+        }
+
+        /* Wait for completion. */
+        for (size_t i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].worker) {
+                pthread_join(pool->controls[i].worker, NULL);
+                pool->controls[i].worker = 0;
+            }
+        }
+        workers_must_exit = false;
     }
-    return can_parallelize;
 }
 
-struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+static void
+free_controls(struct worker_pool *pool)
+{
+    char sem_name[256];
+    if (pool->controls) {
+        /* Close/unlink semaphores */
+        for (size_t i = 0; i < pool->size; i++) {
+            ovs_mutex_destroy(&pool->controls[i].mutex);
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_close(pool->controls[i].fire);
+                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
+                sem_unlink(sem_name);
+            } else {
+                /* This and the following controls are not initialized. */
+                break;
+            }
+        }
+        free(pool->controls);
+        pool->controls = NULL;
+    }
+}
+
+static void
+free_pool(struct worker_pool *pool)
+{
+    char sem_name[256];
+    stop_controls(pool);
+    free_controls(pool);
+    if (pool->done != SEM_FAILED) {
+        sem_close(pool->done);
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+        sem_unlink(sem_name);
+    }
+    free(pool);
+}
+
+static int
+init_controls(struct worker_pool *pool)
 {
-    struct worker_pool *new_pool = NULL;
     struct worker_control *new_control;
+    char sem_name[256];
+
+    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
+    for (size_t i = 0; i < pool->size ; i++) {
+        pool->controls[i].fire = SEM_FAILED;
+    }
+    for (size_t i = 0; i < pool->size; i++) {
+        new_control = &pool->controls[i];
+        new_control->id = i;
+        new_control->done = pool->done;
+        new_control->data = NULL;
+        new_control->pool = pool;
+        new_control->worker = 0;
+        ovs_mutex_init(&new_control->mutex);
+        new_control->finished = ATOMIC_VAR_INIT(false);
+        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
+        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+        if (new_control->fire == SEM_FAILED) {
+            free_controls(pool);
+            return -1;
+        }
+    }
+    return 0;
+}
+
+static void
+init_threads(struct worker_pool *pool, void *(*start)(void *))
+{
+    for (size_t i = 0; i < pool_size; i++) {
+        pool->controls[i].worker =
+            ovs_thread_create("worker pool helper", start, &pool->controls[i]);
+    }
+    ovs_list_push_back(&worker_pools, &pool->list_node);
+}
+
+enum pool_update_status
+ovn_update_worker_pool(size_t requested_pool_size,
+                       struct worker_pool **pool, void *(*start)(void *))
+{
     bool test = false;
-    int i;
     char sem_name[256];
 
-    /* Belt and braces - initialize the pool system just in case if
-     * if it is not yet initialized.
-     */
+    if (requested_pool_size == pool_size) {
+        return POOL_UNCHANGED;
+    }
+
     if (atomic_compare_exchange_strong(
             &initial_pool_setup,
             &test,
             true)) {
         ovs_mutex_lock(&init_mutex);
-        setup_worker_pools(false);
+        setup_worker_pools();
         ovs_mutex_unlock(&init_mutex);
     }
-
     ovs_mutex_lock(&init_mutex);
-    if (can_parallelize) {
-        new_pool = xmalloc(sizeof(struct worker_pool));
-        new_pool->size = pool_size;
-        new_pool->controls = NULL;
-        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
-        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-        if (new_pool->done == SEM_FAILED) {
-            goto cleanup;
-        }
-
-        new_pool->controls =
-            xmalloc(sizeof(struct worker_control) * new_pool->size);
-
-        for (i = 0; i < new_pool->size; i++) {
-            new_control = &new_pool->controls[i];
-            new_control->id = i;
-            new_control->done = new_pool->done;
-            new_control->data = NULL;
-            ovs_mutex_init(&new_control->mutex);
-            new_control->finished = ATOMIC_VAR_INIT(false);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-            if (new_control->fire == SEM_FAILED) {
+    pool_size = requested_pool_size;
+    VLOG_INFO("Setting thread count to %"PRIuSIZE, pool_size);
+
+    if (*pool == NULL) {
+        if (pool_size > 1) {
+            VLOG_INFO("Creating new pool with size %"PRIuSIZE, pool_size);
+            *pool = xmalloc(sizeof(struct worker_pool));
+            (*pool)->size = pool_size;
+            (*pool)->controls = NULL;
+            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
+            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+            if ((*pool)->done == SEM_FAILED) {
                 goto cleanup;
             }
+            if (init_controls(*pool) == -1) {
+                goto cleanup;
+            }
+            init_threads(*pool, start);
         }
-
-        for (i = 0; i < pool_size; i++) {
-            new_pool->controls[i].worker =
-                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+    } else {
+        if (pool_size > 1) {
+            VLOG_INFO("Changing size of existing pool to %"PRIuSIZE,
+                      pool_size);
+            stop_controls(*pool);
+            free_controls(*pool);
+            ovs_list_remove(&(*pool)->list_node);
+            (*pool)->size = pool_size;
+            if (init_controls(*pool) == -1) {
+                goto cleanup;
+            }
+            init_threads(*pool, start);
+        } else {
+            VLOG_INFO("Deleting existing pool");
+            worker_pool_hook(NULL);
+            *pool = NULL;
         }
-        ovs_list_push_back(&worker_pools, &new_pool->list_node);
     }
     ovs_mutex_unlock(&init_mutex);
-    return new_pool;
-cleanup:
+    return POOL_UPDATED;
 
+cleanup:
     /* Something went wrong when opening semaphores. In this case
      * it is better to shut off parallel procesing altogether
      */
-
-    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
-    can_parallelize = false;
-    if (new_pool->controls) {
-        for (i = 0; i < new_pool->size; i++) {
-            if (new_pool->controls[i].fire != SEM_FAILED) {
-                sem_close(new_pool->controls[i].fire);
-                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-                sem_unlink(sem_name);
-                break; /* semaphores past this one are uninitialized */
-            }
-        }
-    }
-    if (new_pool->done != SEM_FAILED) {
-        sem_close(new_pool->done);
-        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
-        sem_unlink(sem_name);
-    }
+    VLOG_ERR("Failed to initialize parallel processing: %s",
+             ovs_strerror(errno));
+    free_pool(*pool);
+    *pool = NULL;
+    pool_size = 1;
     ovs_mutex_unlock(&init_mutex);
-    return NULL;
+    return POOL_UPDATE_FAILED;
 }
 
-
 /* Initializes 'hmap' as an empty hash table with mask N. */
 void
 ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
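
The fire/done handshake that the setup and cleanup paths above manage can be hard to follow inside the diff. Below is a minimal, self-contained sketch of the same named-semaphore pattern: the main thread posts each worker's private "fire" semaphore to hand out work, then waits on a shared "done" semaphore. All names, counts, and the immediate sem_unlink() are illustrative choices for the demo, not code from the patch.

    /* Toy fire/done handshake with POSIX named semaphores.
     * Build with: cc -pthread demo.c.  Illustrative only. */
    #include <assert.h>
    #include <fcntl.h>
    #include <pthread.h>
    #include <semaphore.h>
    #include <stdio.h>
    #include <sys/stat.h>
    #include <unistd.h>

    #define N_WORKERS 4

    struct control {
        sem_t *fire;   /* Main posts here to start this worker. */
        sem_t *done;   /* Worker posts here when finished. */
        int id;
    };

    static void *
    worker(void *arg)
    {
        struct control *c = arg;

        sem_wait(c->fire);              /* Block until work is handed out. */
        printf("worker %d: chunk done\n", c->id);
        sem_post(c->done);              /* Tell main this chunk finished. */
        return NULL;
    }

    int
    main(void)
    {
        struct control ctl[N_WORKERS];
        pthread_t tid[N_WORKERS];
        char name[64];
        sem_t *done;

        snprintf(name, sizeof name, "/demo-%d-main", (int) getpid());
        done = sem_open(name, O_CREAT, S_IRWXU, 0);
        assert(done != SEM_FAILED);
        sem_unlink(name);               /* Name gone; sem lives until closed. */

        for (int i = 0; i < N_WORKERS; i++) {
            snprintf(name, sizeof name, "/demo-%d-%d", (int) getpid(), i);
            ctl[i].fire = sem_open(name, O_CREAT, S_IRWXU, 0);
            assert(ctl[i].fire != SEM_FAILED);
            sem_unlink(name);
            ctl[i].done = done;
            ctl[i].id = i;
            pthread_create(&tid[i], NULL, worker, &ctl[i]);
        }
        for (int i = 0; i < N_WORKERS; i++) {   /* Hand out all chunks. */
            sem_post(ctl[i].fire);
        }
        for (int i = 0; i < N_WORKERS; i++) {   /* Wait for N_WORKERS posts. */
            sem_wait(done);
        }
        for (int i = 0; i < N_WORKERS; i++) {
            pthread_join(tid[i], NULL);
            sem_close(ctl[i].fire);
        }
        sem_close(done);
        return 0;
    }
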
@@ -225,9 +298,9 @@  ovn_run_pool_callback(struct worker_pool *pool,
                       void *fin_result, void *result_frags,
                       void (*helper_func)(struct worker_pool *pool,
                                           void *fin_result,
-                                          void *result_frags, int index))
+                                          void *result_frags, size_t index))
 {
-    int index, completed;
+    size_t index, completed;
 
     /* Ensure that all worker threads see the same data as the
      * main thread.
@@ -367,9 +440,7 @@  ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
 
 static void
 worker_pool_hook(void *aux OVS_UNUSED) {
-    int i;
     static struct worker_pool *pool;
-    char sem_name[256];
 
     workers_must_exit = true;
 
@@ -380,55 +451,15 @@  worker_pool_hook(void *aux OVS_UNUSED) {
      */
     atomic_thread_fence(memory_order_acq_rel);
 
-    /* Wake up the workers after the must_exit flag has been set */
-
-    LIST_FOR_EACH (pool, list_node, &worker_pools) {
-        for (i = 0; i < pool->size ; i++) {
-            sem_post(pool->controls[i].fire);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            pthread_join(pool->controls[i].worker, NULL);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            sem_close(pool->controls[i].fire);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
-            sem_unlink(sem_name);
-        }
-        sem_close(pool->done);
-        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
-        sem_unlink(sem_name);
+    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
+        ovs_list_remove(&pool->list_node);
+        free_pool(pool);
     }
 }
 
 static void
-setup_worker_pools(bool force) {
-    int cores, nodes;
-
-    ovs_numa_init();
-    nodes = ovs_numa_get_n_numas();
-    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
-        nodes = 1;
-    }
-    cores = ovs_numa_get_n_cores();
-
-    /* If there is no NUMA config, use 4 cores.
-     * If there is NUMA config use half the cores on
-     * one node so that the OS does not start pushing
-     * threads to other nodes.
-     */
-    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
-        /* If there is no NUMA we can try the ovs-threads routine.
-         * It falls back to sysconf and/or affinity mask.
-         */
-        cores = count_cpu_cores();
-        pool_size = cores;
-    } else {
-        pool_size = cores / nodes;
-    }
-    if ((pool_size < 4) && force) {
-        pool_size = 4;
-    }
-    can_parallelize = (pool_size >= 3);
+setup_worker_pools(void)
+{
     fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
     sembase = random_uint32();
 }
@@ -436,7 +467,7 @@  setup_worker_pools(bool force) {
 static void
 merge_list_results(struct worker_pool *pool OVS_UNUSED,
                    void *fin_result, void *result_frags,
-                   int index)
+                   size_t index)
 {
     struct ovs_list *result = (struct ovs_list *)fin_result;
     struct ovs_list *res_frags = (struct ovs_list *)result_frags;
@@ -450,7 +481,7 @@  merge_list_results(struct worker_pool *pool OVS_UNUSED,
 static void
 merge_hash_results(struct worker_pool *pool OVS_UNUSED,
                    void *fin_result, void *result_frags,
-                   int index)
+                   size_t index)
 {
     struct hmap *result = (struct hmap *)fin_result;
     struct hmap *res_frags = (struct hmap *)result_frags;
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 0f7d68770..72b31b489 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -81,21 +81,30 @@  struct worker_control {
     sem_t *done; /* Work completion semaphore - sem_post on completion. */
     struct ovs_mutex mutex; /* Guards the data. */
     void *data; /* Pointer to data to be processed. */
-    void *workload; /* back-pointer to the worker pool structure. */
     pthread_t worker;
+    struct worker_pool *pool;
 };
 
 struct worker_pool {
-    int size;   /* Number of threads in the pool. */
+    size_t size;   /* Number of threads in the pool. */
     struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
     struct worker_control *controls; /* "Handles" in this pool. */
     sem_t *done; /* Work completion semaphorew. */
 };
 
-/* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+/* Return the pool size; greater than 1 means parallelization is enabled. */
+size_t ovn_get_worker_pool_size(void);
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+enum pool_update_status {
+     POOL_UNCHANGED,     /* no change to pool */
+     POOL_UPDATED,       /* pool has been updated */
+     POOL_UPDATE_FAILED, /* pool update failed; parallelization disabled */
+};
+/* Add/delete a worker pool for thread function start() which expects a pointer
+ * to a worker_control structure as an argument. Returns the update status. */
+enum pool_update_status ovn_update_worker_pool(size_t requested_pool_size,
+                                               struct worker_pool **,
+                                               void *(*start)(void *));
 
 /* Setting this to true will make all processing threads exit */
 
@@ -140,7 +149,8 @@  void ovn_run_pool_list(struct worker_pool *pool,
 void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
                            void *result_frags,
                            void (*helper_func)(struct worker_pool *pool,
-                           void *fin_result, void *result_frags, int index));
+                           void *fin_result, void *result_frags,
+                           size_t index));
 
 
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
@@ -251,17 +261,17 @@  static inline void init_hash_row_locks(struct hashrow_locks *hrl)
     hrl->row_locks = NULL;
 }
 
-bool ovn_can_parallelize_hashes(bool force_parallel);
-
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
  * supplied functions.
  */
+#define update_worker_pool(requested_pool_size, existing_pool, func) \
+    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
 
-#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
+#define get_worker_pool_size() ovn_get_worker_pool_size()
 
-#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
+#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
 
 #define stop_parallel_processing() ovn_stop_parallel_processing()
 
diff --git a/northd/northd.c b/northd/northd.c
index 67c39df88..48426c801 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -59,6 +59,7 @@ 
 VLOG_DEFINE_THIS_MODULE(northd);
 
 static bool controller_event_en;
+static bool lflow_hash_lock_initialized = false;
 
 static bool check_lsp_is_up;
 
@@ -4740,7 +4741,13 @@  ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
-static bool use_parallel_build = false;
+
+enum {
+    STATE_NULL,               /* parallelization is off */
+    STATE_INIT_HASH_SIZES,    /* parallelization is on; hash sizing needed */
+    STATE_USE_PARALLELIZATION /* parallelization is on */
+};
+static int parallelization_state = STATE_NULL;
 
 static void
 ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
@@ -4759,7 +4766,8 @@  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
     lflow->ctrl_meter = ctrl_meter;
     lflow->dpg = NULL;
     lflow->where = where;
-    if (use_parallel_build && use_logical_dp_groups) {
+    if ((parallelization_state != STATE_NULL)
+        && use_logical_dp_groups) {
         ovs_mutex_init(&lflow->odg_lock);
     }
 }
@@ -4773,7 +4781,7 @@  ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
         return false;
     }
 
-    if (use_parallel_build) {
+    if (parallelization_state == STATE_USE_PARALLELIZATION) {
         ovs_mutex_lock(&lflow_ref->odg_lock);
         hmapx_add(&lflow_ref->od_group, od);
         ovs_mutex_unlock(&lflow_ref->odg_lock);
@@ -4803,9 +4811,23 @@  static struct ovs_mutex lflow_hash_locks[LFLOW_HASH_LOCK_MASK + 1];
 static void
 lflow_hash_lock_init(void)
 {
-    for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
-        ovs_mutex_init(&lflow_hash_locks[i]);
+    if (!lflow_hash_lock_initialized) {
+        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
+            ovs_mutex_init(&lflow_hash_locks[i]);
+        }
+        lflow_hash_lock_initialized = true;
+    }
+}
+
+static void
+lflow_hash_lock_destroy(void)
+{
+    if (lflow_hash_lock_initialized) {
+        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
+            ovs_mutex_destroy(&lflow_hash_locks[i]);
+        }
     }
+    lflow_hash_lock_initialized = false;
 }
 
 /* This thread-local var is used for parallel lflow building when dp-groups is
@@ -4853,7 +4875,7 @@  do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
                    nullable_xstrdup(ctrl_meter),
                    ovn_lflow_hint(stage_hint), where);
     hmapx_add(&lflow->od_group, od);
-    if (!use_parallel_build) {
+    if (parallelization_state != STATE_USE_PARALLELIZATION) {
         hmap_insert(lflow_map, &lflow->hmap_node, hash);
     } else {
         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4896,7 +4918,8 @@  ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
     struct ovn_lflow *lflow;
 
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
-    if (use_logical_dp_groups && use_parallel_build) {
+    if (use_logical_dp_groups
+        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
         lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
                                     match, actions, io_port, stage_hint, where,
                                     ctrl_meter);
@@ -4982,6 +5005,10 @@  static void
 ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
 {
     if (lflow) {
+        if ((parallelization_state != STATE_NULL)
+            && use_logical_dp_groups) {
+            ovs_mutex_destroy(&lflow->odg_lock);
+        }
         if (lflows) {
             hmap_remove(lflows, &lflow->hmap_node);
         }
@@ -13925,15 +13952,10 @@  build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                       &lsi->actions);
 }
 
-struct lflows_thread_pool {
-    struct worker_pool *pool;
-};
-
 static void *
 build_lflows_thread(void *arg)
 {
     struct worker_control *control = (struct worker_control *) arg;
-    struct lflows_thread_pool *workload;
     struct lswitch_flow_build_info *lsi;
 
     struct ovn_datapath *od;
@@ -13944,17 +13966,16 @@  build_lflows_thread(void *arg)
 
     while (!stop_parallel_processing()) {
         wait_for_work(control);
-        workload = (struct lflows_thread_pool *) control->workload;
         lsi = (struct lswitch_flow_build_info *) control->data;
         if (stop_parallel_processing()) {
             return NULL;
         }
         thread_lflow_counter = 0;
-        if (lsi && workload) {
+        if (lsi) {
             /* Iterate over bucket ThreadID, ThreadID+size, ... */
             for (bnum = control->id;
                     bnum <= lsi->datapaths->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
                     if (stop_parallel_processing()) {
@@ -13965,7 +13986,7 @@  build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->ports->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
                     if (stop_parallel_processing()) {
@@ -13976,7 +13997,7 @@  build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->lbs->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
                     if (stop_parallel_processing()) {
@@ -13997,7 +14018,7 @@  build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->igmp_groups->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (
                         igmp_group, hmap_node, bnum, lsi->igmp_groups) {
@@ -14016,39 +14037,13 @@  build_lflows_thread(void *arg)
     return NULL;
 }
 
-static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
-
-static void
-init_lflows_thread_pool(void)
-{
-    int index;
-
-    if (!pool_init_done) {
-        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
-        pool_init_done = true;
-        if (pool) {
-            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
-            build_lflows_pool->pool = pool;
-            for (index = 0; index < build_lflows_pool->pool->size; index++) {
-                build_lflows_pool->pool->controls[index].workload =
-                    build_lflows_pool;
-            }
-        }
-    }
-}
-
-/* TODO: replace hard cutoffs by configurable via commands. These are
- * temporary defines to determine single-thread to multi-thread processing
- * cutoff.
- * Setting to 1 forces "all parallel" lflow build.
- */
+static struct worker_pool *build_lflows_pool = NULL;
 
 static void
 noop_callback(struct worker_pool *pool OVS_UNUSED,
               void *fin_result OVS_UNUSED,
               void *result_frags OVS_UNUSED,
-              int index OVS_UNUSED)
+              size_t index OVS_UNUSED)
 {
     /* Do nothing */
 }
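
The bucket arithmetic above ("Iterate over bucket ThreadID, ThreadID+size, ...") is what lets the threads share one hmap without locking: worker i visits buckets i, i+size, i+2*size, and so on, so no two workers ever touch the same bucket. Below is a self-contained illustration of the striding over a plain bucket index range; the hmap itself is not reproduced, and the sizes are arbitrary demo values.

    /* Lock-free partitioning by bucket striding, as in build_lflows_thread. */
    #include <pthread.h>
    #include <stddef.h>
    #include <stdio.h>

    #define N_BUCKETS 16
    #define POOL_SIZE 4

    struct control {
        size_t id;          /* Worker index within the pool. */
        size_t pool_size;   /* Total number of workers. */
    };

    static void *
    worker(void *arg)
    {
        struct control *c = arg;

        /* Start at own id, step by pool size: the bucket sets of any
         * two workers are disjoint, so no locking is needed. */
        for (size_t bnum = c->id; bnum < N_BUCKETS; bnum += c->pool_size) {
            printf("worker %zu -> bucket %zu\n", c->id, bnum);
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t tid[POOL_SIZE];
        struct control ctl[POOL_SIZE];

        for (size_t i = 0; i < POOL_SIZE; i++) {
            ctl[i].id = i;
            ctl[i].pool_size = POOL_SIZE;
            pthread_create(&tid[i], NULL, worker, &ctl[i]);
        }
        for (size_t i = 0; i < POOL_SIZE; i++) {
            pthread_join(tid[i], NULL);
        }
        return 0;
    }
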
@@ -14088,28 +14083,21 @@  build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    if (use_parallel_build) {
-        init_lflows_thread_pool();
-        if (!can_parallelize_hashes(false)) {
-            use_parallel_build = false;
-        }
-    }
-
-    if (use_parallel_build) {
+    if (parallelization_state == STATE_USE_PARALLELIZATION) {
         struct hmap *lflow_segs;
         struct lswitch_flow_build_info *lsiv;
         int index;
 
-        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
         if (use_logical_dp_groups) {
             lflow_segs = NULL;
         } else {
-            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
         }
 
         /* Set up "work chunks" for each thread to work on. */
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             if (use_logical_dp_groups) {
                 /* if dp_groups are in use we lock a shared lflows hash
                  * on a per-bucket level instead of merging hash frags */
@@ -14132,19 +14120,19 @@  build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);
 
-            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+            build_lflows_pool->controls[index].data = &lsiv[index];
         }
 
         /* Run thread pool. */
         if (use_logical_dp_groups) {
-            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
+            run_pool_callback(build_lflows_pool, NULL, NULL,
                               noop_callback);
-            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
+            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
         } else {
-            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
         }
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             ds_destroy(&lsiv[index].match);
             ds_destroy(&lsiv[index].actions);
         }
@@ -14280,8 +14268,39 @@  ovn_sb_set_lflow_logical_dp_group(
 }
 
 static ssize_t max_seen_lflow_size = 128;
-static bool needs_parallel_init = true;
-static bool reset_parallel = false;
+
+void
+run_update_worker_pool(int n_threads)
+{
+    /* If number of threads has been updated (or initially set),
+     * update the worker pool. */
+    if (update_worker_pool(n_threads, &build_lflows_pool,
+                           build_lflows_thread) != POOL_UNCHANGED) {
+        /* worker pool was updated */
+        if (get_worker_pool_size() <= 1) {
+            /* Destroy lflow hash locks, if they were created. */
+            lflow_hash_lock_destroy();
+            parallelization_state = STATE_NULL;
+        } else if (parallelization_state != STATE_USE_PARALLELIZATION) {
+            if (use_logical_dp_groups) {
+                lflow_hash_lock_init();
+                parallelization_state = STATE_INIT_HASH_SIZES;
+            } else {
+                parallelization_state = STATE_USE_PARALLELIZATION;
+            }
+        }
+    }
+}
+
+static void
+worker_pool_init_for_ldp(void)
+{
+    /* If parallelization is enabled, make sure locks are initialized
+     * when logical datapath groups are used.
+     */
+    if (parallelization_state != STATE_NULL) {
+        lflow_hash_lock_init();
+        parallelization_state = STATE_INIT_HASH_SIZES;
+    }
+}
 
 static void
 build_mcast_groups(struct lflow_input *data,
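
Taken together, run_update_worker_pool() and build_lflows() implement a small state machine: with dp-groups enabled, the first build after enabling parallelization stays single-threaded so that max_seen_lflow_size can size the shared hash, and only subsequent builds run in parallel. The sketch below is a condensed, self-contained model of those transitions; it is simplified (for example, it ignores that an already-parallel pool resized to more than one thread stays in STATE_USE_PARALLELIZATION).

    /* Condensed model of the parallelization/dp-groups state machine. */
    #include <stdbool.h>
    #include <stdio.h>

    enum state {
        STATE_NULL,                 /* Parallelization off. */
        STATE_INIT_HASH_SIZES,      /* On; next build sizes hashes serially. */
        STATE_USE_PARALLELIZATION,  /* On; builds run in parallel. */
    };

    static enum state
    after_pool_update(int n_threads, bool dp_groups)
    {
        if (n_threads <= 1) {
            return STATE_NULL;
        }
        /* With dp-groups, one serial sizing pass comes first. */
        return dp_groups ? STATE_INIT_HASH_SIZES : STATE_USE_PARALLELIZATION;
    }

    static enum state
    after_build(enum state s)
    {
        /* The sizing pass promotes itself once it has run. */
        return s == STATE_INIT_HASH_SIZES ? STATE_USE_PARALLELIZATION : s;
    }

    int
    main(void)
    {
        enum state s = after_pool_update(4, true);

        printf("after update: %d\n", s);    /* 1 = STATE_INIT_HASH_SIZES */
        s = after_build(s);
        printf("after build:  %d\n", s);    /* 2 = STATE_USE_PARALLELIZATION */
        return 0;
    }
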
@@ -14302,30 +14321,18 @@  void build_lflows(struct lflow_input *input_data,
     build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
                        &mcast_groups, &igmp_groups);
 
-    if (reset_parallel) {
-        /* Parallel build was disabled before, we need to
-         * re-enable it. */
-        use_parallel_build = true;
-        reset_parallel = false;
-    }
-
     fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
-    if (use_parallel_build && use_logical_dp_groups &&
-        needs_parallel_init) {
-        lflow_hash_lock_init();
-        needs_parallel_init = false;
-        /* Disable parallel build on first run with dp_groups
-         * to determine the correct sizing of hashes. */
-        use_parallel_build = false;
-        reset_parallel = true;
-    }
     build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
                                     input_data->port_groups, &lflows,
                                     &mcast_groups, &igmp_groups,
                                     input_data->meter_groups, input_data->lbs,
                                     input_data->bfd_connections);
 
+    if (parallelization_state == STATE_INIT_HASH_SIZES) {
+        parallelization_state = STATE_USE_PARALLELIZATION;
+    }
+
     /* Parallel build may result in a suboptimal hash. Resize the
      * hash to a correct size before doing lookups */
 
@@ -15567,12 +15574,13 @@  ovnnb_db_run(struct northd_input *input_data,
 
     smap_destroy(&options);
 
-    use_parallel_build =
-        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
-         can_parallelize_hashes(false));
-
+    bool old_use_ldp = use_logical_dp_groups;
     use_logical_dp_groups = smap_get_bool(&nb->options,
                                           "use_logical_dp_groups", true);
+    if (use_logical_dp_groups && !old_use_ldp) {
+        worker_pool_init_for_ldp();
+    }
+
     use_ct_inv_match = smap_get_bool(&nb->options,
                                      "use_ct_inv_match", true);
 
diff --git a/northd/northd.h b/northd/northd.h
index 2d804a22e..fe8dad03a 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -107,5 +107,6 @@  void build_bfd_table(struct lflow_input *input_data,
                      struct hmap *bfd_connections, struct hmap *ports);
 void bfd_cleanup_connections(struct lflow_input *input_data,
                              struct hmap *bfd_map);
+void run_update_worker_pool(int n_threads);
 
 #endif /* NORTHD_H */
diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
index 718d052cc..e9afda4c6 100644
--- a/northd/ovn-northd-ddlog.c
+++ b/northd/ovn-northd-ddlog.c
@@ -1084,7 +1084,6 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
         SSL_OPTION_ENUMS,
         OPT_DRY_RUN,
         OPT_DDLOG_RECORD,
-        OPT_DUMMY_NUMA,
     };
     static const struct option long_options[] = {
         {"ovnsb-db", required_argument, NULL, 'd'},
@@ -1098,7 +1097,6 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
         OVN_DAEMON_LONG_OPTIONS,
         VLOG_LONG_OPTIONS,
         STREAM_SSL_LONG_OPTIONS,
-        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
         {NULL, 0, NULL, 0},
     };
     char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
@@ -1155,10 +1153,6 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
             *pause = true;
             break;
 
-        case OPT_DUMMY_NUMA:
-            ovs_numa_set_dummy(optarg);
-            break;
-
         case OPT_DDLOG_RECORD:
             record_file = optarg;
             break;
diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
index 73ecfbc2d..077bd1f41 100644
--- a/northd/ovn-northd.8.xml
+++ b/northd/ovn-northd.8.xml
@@ -68,53 +68,26 @@ 
           restarting a process or disturbing a running system.
         </p>
       </dd>
-      <dt><code>--dummy-numa</code></dt>
+      <dt><code>--n-threads N</code></dt>
       <dd>
-        <p>
-          Typically, OVS uses sysfs to determine the number of NUMA nodes and
-          CPU cores that are available on a machine. The parallelization code
-          in OVN uses this information to determine if there are enough
-          resources to use parallelization. The current algorithm enables
-          parallelization if the total number of CPU cores divided by the
-          number of NUMA nodes is greater than or equal to four.
-        </p>
-
         <p>
           In certain situations, it may be desirable to enable parallelization
-          on a system that otherwise would not have it allowed. The
-          <code>--dummy-numa</code> option allows for you to fake the NUMA
-          nodes and cores that OVS thinks your system has. The syntax consists
-          of using numbers to represent the NUMA node IDs. The number of times
-          that a NUMA node ID appears represents how many CPU cores that NUMA
-          node contains. So for instance, if you did the following:
-        </p>
-
-        <p>
-          <code>--dummy-numa=0,0,0,0</code>
-        </p>
-
-        <p>
-          it would make OVS assume that you have a single NUMA node with ID 0,
-          and that NUMA node consists of four CPU cores. Similarly, you could
-          do:
-        </p>
-
-        <p>
-          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
+          on a system to decrease latency (at the potential cost of increasing
+          CPU usage).
         </p>
 
         <p>
-          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
-          with six CPU cores.
+          This option causes ovn-northd to use N threads when building
+          logical flows, where N is within [2-256].
+          If N is 1, parallelization is disabled (default behavior).
+          If N is less than 1, then N is set to 1, parallelization is disabled
+          and a warning is logged.
+          If N is more than 256, then N is set to 256, parallelization is
+          enabled (with 256 threads) and a warning is logged.
         </p>
 
         <p>
-          Currently, the only affect this option has is on whether
-          parallelization can be enabled in ovn-northd. There are no NUMA node
-          or CPU core-specific actions performed by OVN. Setting
-          <code>--dummy-numa</code> in ovn-northd does not affect how other OVS
-          processes on the system (such as ovs-vswitchd) count the number of
-          NUMA nodes and CPU cores; this setting is local to ovn-northd.
+          ovn-northd-ddlog does not support this option.
         </p>
       </dd>
     </dl>
@@ -210,6 +183,27 @@ 
         except for the northbound database client.
       </p>
       </dd>
+
+      <dt><code>set-n-threads N</code></dt>
+      <dd>
+      <p>
+        Set the number of threads used for building logical flows.
+        When N is within [2-256], parallelization is enabled.
+        When N is 1, parallelization is disabled.
+        When N is less than 1 or more than 256, an error is returned.
+        If ovn-northd fails to start parallelization (e.g., fails to set up
+        semaphores), parallelization is disabled and an error is returned.
+      </p>
+      </dd>
+
+      <dt><code>get-n-threads</code></dt>
+      <dd>
+      <p>
+        Return the number of threads used for building logical flows.
+      </p>
+      </dd>
+
       </dl>
     </p>
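
The clamping rules this man page section describes are self-contained enough to model directly. The sketch below mirrors the documented behavior (values below 1 become 1, values above 256 become 256, both with a warning); MAX_THREADS is an illustrative local macro standing in for northd's OVN_MAX_SUPPORTED_THREADS, not part of any public API.

    /* Stand-alone model of the documented --n-threads clamping. */
    #include <stdio.h>
    #include <stdlib.h>

    #define MAX_THREADS 256     /* Mirrors OVN_MAX_SUPPORTED_THREADS. */

    static int
    clamp_n_threads(const char *arg)
    {
        int n = atoi(arg);

        if (n < 1) {
            fprintf(stderr, "warning: '%s' out of range, using 1\n", arg);
            return 1;           /* Parallelization disabled. */
        }
        if (n > MAX_THREADS) {
            fprintf(stderr, "warning: '%s' out of range, using %d\n",
                    arg, MAX_THREADS);
            return MAX_THREADS;
        }
        return n;
    }

    int
    main(void)
    {
        printf("%d %d %d\n", clamp_n_threads("-1"), clamp_n_threads("4"),
               clamp_n_threads("300"));     /* Prints: 1 4 256 */
        return 0;
    }
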
 
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 0a0f85010..b7363b16d 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -41,6 +41,7 @@ 
 #include "unixctl.h"
 #include "util.h"
 #include "openvswitch/vlog.h"
+#include "lib/ovn-parallel-hmap.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_northd);
 
@@ -50,12 +51,16 @@  static unixctl_cb_func ovn_northd_resume;
 static unixctl_cb_func ovn_northd_is_paused;
 static unixctl_cb_func ovn_northd_status;
 static unixctl_cb_func cluster_state_reset_cmd;
+static unixctl_cb_func ovn_northd_set_thread_count_cmd;
+static unixctl_cb_func ovn_northd_get_thread_count_cmd;
 
 struct northd_state {
     bool had_lock;
     bool paused;
 };
 
+#define OVN_MAX_SUPPORTED_THREADS 256
+
 static const char *ovnnb_db;
 static const char *ovnsb_db;
 static const char *unixctl_path;
@@ -525,7 +530,7 @@  Options:\n\
   --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
                             (default: %s)\n\
   --dry-run                 start in paused state (do not commit db changes)\n\
-  --dummy-numa              override default NUMA node and CPU core discovery\n\
+  --n-threads=N             specify number of threads\n\
   --unixctl=SOCKET          override default control socket name\n\
   -h, --help                display this help message\n\
   -o, --options             list available options\n\
@@ -538,14 +543,14 @@  Options:\n\
 
 static void
 parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
-              bool *paused)
+              bool *paused, int *n_threads)
 {
     enum {
         OVN_DAEMON_OPTION_ENUMS,
         VLOG_OPTION_ENUMS,
         SSL_OPTION_ENUMS,
         OPT_DRY_RUN,
-        OPT_DUMMY_NUMA,
+        OPT_N_THREADS,
     };
     static const struct option long_options[] = {
         {"ovnsb-db", required_argument, NULL, 'd'},
@@ -555,7 +560,7 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
         {"options", no_argument, NULL, 'o'},
         {"version", no_argument, NULL, 'V'},
         {"dry-run", no_argument, NULL, OPT_DRY_RUN},
-        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
+        {"n-threads", required_argument, NULL, OPT_N_THREADS},
         OVN_DAEMON_LONG_OPTIONS,
         VLOG_LONG_OPTIONS,
         STREAM_SSL_LONG_OPTIONS,
@@ -611,8 +616,21 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
             ovn_print_version(0, 0);
             exit(EXIT_SUCCESS);
 
-        case OPT_DUMMY_NUMA:
-            ovs_numa_set_dummy(optarg);
+        case OPT_N_THREADS:
+            *n_threads = strtoul(optarg, NULL, 10);
+            if (*n_threads < 1) {
+                *n_threads = 1;
+                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
+                          "set to: [%s]", *n_threads, optarg);
+            }
+            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
+                *n_threads = OVN_MAX_SUPPORTED_THREADS;
+                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
+                          "set to: [%s]", *n_threads, optarg);
+            }
+            if (*n_threads != 1) {
+                VLOG_INFO("Using %d threads", *n_threads);
+            }
             break;
 
         case OPT_DRY_RUN:
@@ -668,6 +686,7 @@  main(int argc, char *argv[])
     struct unixctl_server *unixctl;
     int retval;
     bool exiting;
+    int n_threads = 1;
     struct northd_state state = {
         .had_lock = false,
         .paused = false
@@ -677,7 +696,7 @@  main(int argc, char *argv[])
     ovs_cmdl_proctitle_init(argc, argv);
     ovn_set_program_name(argv[0]);
     service_start(&argc, &argv);
-    parse_options(argc, argv, &state.paused);
+    parse_options(argc, argv, &state.paused, &n_threads);
 
     daemonize_start(false);
 
@@ -704,6 +723,12 @@  main(int argc, char *argv[])
     unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
                              cluster_state_reset_cmd,
                              &reset_ovnnb_idl_min_index);
+    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 1,
+                             ovn_northd_set_thread_count_cmd,
+                             NULL);
+    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
+                             ovn_northd_get_thread_count_cmd,
+                             NULL);
 
     daemonize_complete();
 
@@ -761,6 +786,8 @@  main(int argc, char *argv[])
     unsigned int ovnnb_cond_seqno = UINT_MAX;
     unsigned int ovnsb_cond_seqno = UINT_MAX;
 
+    run_update_worker_pool(n_threads);
+
     /* Main loop. */
     exiting = false;
 
@@ -1017,3 +1044,30 @@  cluster_state_reset_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
     poll_immediate_wake();
     unixctl_command_reply(conn, NULL);
 }
+
+static void
+ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
+               const char *argv[], void *aux OVS_UNUSED)
+{
+    int n_threads = atoi(argv[1]);
+
+    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
+        struct ds s = DS_EMPTY_INITIALIZER;
+        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
+        unixctl_command_reply_error(conn, ds_cstr(&s));
+        ds_destroy(&s);
+    } else {
+        run_update_worker_pool(n_threads);
+        unixctl_command_reply(conn, NULL);
+    }
+}
+
+static void
+ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
+               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    struct ds s = DS_EMPTY_INITIALIZER;
+    ds_put_format(&s, "%"PRIuSIZE"\n", get_worker_pool_size());
+    unixctl_command_reply(conn, ds_cstr(&s));
+    ds_destroy(&s);
+}
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index d78315c75..c6f0f6251 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -170,8 +170,8 @@  ovn_start_northd() {
         ovn-northd-ddlog) northd_args="$northd_args --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
     esac
 
-    if test X$NORTHD_DUMMY_NUMA = Xyes; then
-        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+        northd_args="$northd_args --n-threads=4"
     fi
 
     local name=${d_prefix}northd${suffix}
@@ -252,10 +252,6 @@  ovn_start () {
     else
         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
     fi
-
-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
-        ovn-nbctl set NB_Global . options:use_parallel_build=true
-    fi
 }
 
 # Interconnection networks.
@@ -749,16 +745,6 @@  OVS_END_SHELL_HELPERS
 
 m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
 
-# Use --dummy-numa if system has low cores and we want to force parallelization
-m4_define([NORTHD_DUMMY_NUMA],
-  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
-     then
-       echo "yes"
-     else
-       echo "no"
-     fi)
-])
-
 # Defines a versions of a test with all combinations of northd and
 # datapath groups.
 m4_define([OVN_FOR_EACH_NORTHD],
@@ -770,35 +756,14 @@  m4_define([OVN_FOR_EACH_NORTHD],
 # Some tests aren't prepared for dp groups to be enabled.
 m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
   [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
-     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
-])])])
-
-# Test parallelization with dp groups enabled and disabled
-m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
-m4_pushdef([NORTHD_TYPE], [ovn_northd])
-m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
-[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
-    [[NORTHD_USE_PARALLELIZATION], [yes]
-])]])
-
-m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
-m4_pushdef([NORTHD_TYPE], [ovn_northd])
-m4_pushdef(NORTHD_DUMMY_NUMA, [no])
-[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
-    [[NORTHD_USE_PARALLELIZATION], [yes]
-])]])
-
-# Use --dummy-numa if system has low cores
-m4_define([HOST_HAS_LOW_CORES], [
-    if test $(nproc) -lt 4; then
-        :
-        $1
-    else
-        :
-        $2
-    fi
-])
+     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
+       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
+])])])])
+
+# Some tests aren't prepared for ddlog to be enabled.
+m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
+  [m4_foreach([NORTHD_TYPE], [ovn-northd],
+     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
+       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
+])])])])
 
-m4_define([NORTHD_PARALLELIZATION], [
-    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
-])
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 17e37fb77..1a9d5ef76 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -7174,3 +7174,112 @@  ct_next(ct_state=new|trk);
 
 AT_CLEANUP
 ])
+
+OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
+AT_SETUP([northd-parallelization unixctl])
+ovn_start
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
+OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
+])
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
+OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [4
+])
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
+OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 0], [2], [],
+  [invalid n_threads: 0
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads -1], [2], [],
+  [invalid n_threads: -1
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 300], [2], [],
+  [invalid n_threads: 300
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], [2], [],
+  ["parallel-build/set-n-threads" command requires at least 1 arguments
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 2], [2], [],
+  ["parallel-build/set-n-threads" command takes at most 1 arguments
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CLEANUP
+])
+
+OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
+AT_SETUP([northd-parallelization runtime])
+ovn_start
+
+add_switch_ports() {
+    for port in $(seq $1 $2); do
+        logical_switch_port=lsp${port}
+        check ovn-nbctl lsp-add ls1 $logical_switch_port
+        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
+    done
+}
+
+# Build a rather heavy config and modify the number of threads along the way
+check ovn-nbctl ls-add ls1
+check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
+check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
+
+check ovn-nbctl lr-add lr1
+check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router options:router-port=lrp0 addresses=dynamic
+check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
+check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
+add_switch_ports 1 50
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
+add_switch_ports 51 100
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
+add_switch_ports 101 150
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
+add_switch_ports 151 200
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
+add_switch_ports 201 250
+check ovn-nbctl --wait=sb sync
+
+# Run 3 times: once with parallelization enabled, once with it disabled, and
+# once while changing the number of threads.
+# Compare the flows produced by the three runs, ignoring IP/MAC addresses.
+ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
+
+# Rebuild with 1 thread
+for port in $(seq 1 250); do
+    logical_switch_port=lsp${port}
+    check ovn-nbctl lsp-del $logical_switch_port
+done
+add_switch_ports 1 250
+check ovn-nbctl --wait=sb sync
+ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
+AT_CHECK([diff flows1 flows2])
+
+# Rebuild with 8 threads
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
+for port in $(seq 1 250); do
+    logical_switch_port=lsp${port}
+    check ovn-nbctl lsp-del $logical_switch_port
+done
+add_switch_ports 1 250
+check ovn-nbctl --wait=sb sync
+ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
+AT_CHECK([diff flows1 flows3])
+
+AT_CLEANUP
+])