
[ovs-dev] northd: add configuration option for enabling parallelization

Message ID 20220429163126.442843-1-xsimonar@redhat.com
State Superseded
Headers show
Series [ovs-dev] northd: add configuration option for enabling parallelization

Checks

Context Check Description
ovsrobot/apply-robot warning apply and check: warning
ovsrobot/github-robot-_Build_and_Test fail github build: failed
ovsrobot/github-robot-_ovn-kubernetes fail github build: failed

Commit Message

Xavier Simonart April 29, 2022, 4:31 p.m. UTC
This patch changes the way northd lflow build parallelization is enabled
and makes it possible to change the number of threads at runtime.
Before this patch, the following was needed to use parallelization:
- enable parallelization through use_parallel_build in NBDB
- use --dummy-numa to select the number of threads.
This second step was needed because otherwise as many threads as cores in the
system were used, while parallelization showed a performance improvement only
up to around 4 (or maybe 8) threads.

With this patch, the number of threads used for lflow parallel build can be
specified either:
- at startup, using --n-threads=<N> as an ovn-northd command line option
- using unixctl
If the number of threads specified is > 1, then parallelization is enabled.
If the number is 1, parallelization is disabled.
If the number is < 1 or > 256, parallelization is disabled at startup and
a warning is logged.

The following unixctl commands have been added:
- set-n-threads <N>: set the number of threads used.
- get-n-threads: return the number of threads used.
If the number of threads is within the <2-256> bounds, parallelization is
enabled.
If the number of threads is 1, parallelization is disabled.
Otherwise an error is returned.

Note that, if set-n-threads fails for any reason (e.g. failure to set up some
semaphore), parallelization is disabled, and get-n-threads will return 1.
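
As an illustration, the intended usage looks as follows (a sketch based on the
option and unixctl commands added by this patch, not verified output):

  # Start ovn-northd with 4 lflow build threads.
  $ ovn-northd --n-threads=4 ...

  # Change the thread count at runtime, then read it back.
  $ ovn-appctl -t ovn-northd parallel-build/set-n-threads 8
  $ ovn-appctl -t ovn-northd parallel-build/get-n-threads
  8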

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
Signed-off-by: Xavier Simonart <xsimonar@redhat.com>
---
 lib/ovn-parallel-hmap.c   | 273 +++++++++++++++++++++-----------------
 lib/ovn-parallel-hmap.h   |  19 +--
 northd/northd.c           | 148 ++++++++++-----------
 northd/northd.h           |   1 +
 northd/ovn-northd-ddlog.c |   6 -
 northd/ovn-northd.8.xml   |  68 +++++-----
 northd/ovn-northd.c       |  68 +++++++++-
 tests/ovn-macros.at       |  59 ++------
 tests/ovn-northd.at       | 109 +++++++++++++++
 9 files changed, 444 insertions(+), 307 deletions(-)

Comments

0-day Robot April 29, 2022, 4:39 p.m. UTC | #1
Bleep bloop.  Greetings Xavier Simonart, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line lacks whitespace around operator
WARNING: Line lacks whitespace around operator
#939 FILE: northd/ovn-northd.c:533:
  --n-threads=N             specify number of threads\n\

Lines checked: 1272, Warnings: 2, Errors: 0


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Dumitru Ceara May 12, 2022, 11:35 a.m. UTC | #2
On 4/29/22 18:31, Xavier Simonart wrote:
> This patch changes the way northd lflow build parallelization is enabled
> and makes it possible to change the number of threads at runtime.
> Before this patch, the following was needed to use parallelization:
> - enable parallelization through use_parallel_build in NBDB
> - use --dummy-numa to select the number of threads.
> This second step was needed because otherwise as many threads as cores in the
> system were used, while parallelization showed a performance improvement only
> up to around 4 (or maybe 8) threads.
> 
> With this patch, the number of threads used for lflow parallel build can be
> specified either:
> - at startup, using --n-threads=<N> as an ovn-northd command line option
> - using unixctl
> If the number of threads specified is > 1, then parallelization is enabled.
> If the number is 1, parallelization is disabled.
> If the number is < 1 or > 256, parallelization is disabled at startup and
> a warning is logged.
> 
> The following unixctl commands have been added:
> - set-n-threads <N>: set the number of threads used.
> - get-n-threads: return the number of threads used.
> If the number of threads is within the <2-256> bounds, parallelization is
> enabled.
> If the number of threads is 1, parallelization is disabled.
> Otherwise an error is returned.
> 
> Note that, if set-n-threads fails for any reason (e.g. failure to set up some
> semaphore), parallelization is disabled, and get-n-threads will return 1.
> 
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
> Signed-off-by: Xavier Simonart <xsimonar@redhat.com>
> ---

Hi Xavier,

Thanks for working on this, I think it's already a nice improvement on
the previous interface for parallel lflow build!

We probably also need a NEWS entry.
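
For instance, something along these lines (wording and placement are just a
suggestion):

  - ovn-northd: Added the --n-threads=N command line option and the
    parallel-build/set-n-threads and parallel-build/get-n-threads unixctl
    commands to configure the number of threads used for parallel lflow
    builds; the use_parallel_build NB option and --dummy-numa are no
    longer used.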

I have a few more minor comments below.

Thanks,
Dumitru

>  lib/ovn-parallel-hmap.c   | 273 +++++++++++++++++++++-----------------
>  lib/ovn-parallel-hmap.h   |  19 +--
>  northd/northd.c           | 148 ++++++++++-----------
>  northd/northd.h           |   1 +
>  northd/ovn-northd-ddlog.c |   6 -
>  northd/ovn-northd.8.xml   |  68 +++++-----
>  northd/ovn-northd.c       |  68 +++++++++-
>  tests/ovn-macros.at       |  59 ++------
>  tests/ovn-northd.at       | 109 +++++++++++++++
>  9 files changed, 444 insertions(+), 307 deletions(-)
> 
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> index 7edc4c0b6..0f5140d25 100644
> --- a/lib/ovn-parallel-hmap.c
> +++ b/lib/ovn-parallel-hmap.c
> @@ -41,11 +41,7 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>  #define WORKER_SEM_NAME "%x-%p-%x"
>  #define MAIN_SEM_NAME "%x-%p-main"
>  
> -/* These are accessed under mutex inside add_worker_pool().
> - * They do not need to be atomic.
> - */
>  static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> -static bool can_parallelize = false;
>  
>  /* This is set only in the process of exit and the set is
>   * accompanied by a fence. It does not need to be atomic or be
> @@ -57,12 +53,12 @@ static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>  
>  static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>  
> -static int pool_size;
> +static int pool_size = 1;

Everywhere in the ovn-parallel-hmap.[hc] files we define the pool size
as a signed integer.  But it never needs to be signed: the pool size is
always >= 1.

Now that you're refactoring and improving all this code, would it be
possible to change the pool sizes to size_t wherever appropriate (e.g.,
in 'struct worker_pool' too)?
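
For example, a minimal sketch of this suggestion (reviewer's idea, not part of
the patch; the other struct members stay as in the current header):

  struct worker_pool {
      size_t size;   /* Number of threads in the pool, always >= 1. */
      struct ovs_list list_node;
      struct worker_control *controls;
      sem_t *done;   /* Work completion semaphore. */
  };

  static size_t pool_size = 1;

  /* ... and the loop indices follow suit: */
  for (size_t i = 0; i < pool->size; i++) {
      sem_post(pool->controls[i].fire);
  }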

>  
>  static int sembase;
>  
>  static void worker_pool_hook(void *aux OVS_UNUSED);
> -static void setup_worker_pools(bool force);
> +static void setup_worker_pools(void);
>  static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>                                 void *fin_result, void *result_frags,
>                                 int index);
> @@ -76,107 +72,182 @@ ovn_stop_parallel_processing(void)
>      return workers_must_exit;
>  }
>  
> -bool
> -ovn_can_parallelize_hashes(bool force_parallel)
> +int
> +ovn_get_worker_pool_size(void)
>  {
> -    bool test = false;
> +    return pool_size;
> +}
>  
> -    if (atomic_compare_exchange_strong(
> -            &initial_pool_setup,
> -            &test,
> -            true)) {
> -        ovs_mutex_lock(&init_mutex);
> -        setup_worker_pools(force_parallel);
> -        ovs_mutex_unlock(&init_mutex);
> +static void
> +stop_controls(struct worker_pool *pool)
> +{
> +    if (pool->controls) {
> +        workers_must_exit = true;
> +
> +        /* unlock threads */
> +        for (int i = 0; i < pool->size ; i++) {

If we change pool->size to be size_t, 'i' can be size_t too (and in a
few more places below).

> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_post(pool->controls[i].fire);
> +            }
> +        }
> +
> +        /* Wait completion */
> +        for (int i = 0; i < pool->size ; i++) {
> +            if (pool->controls[i].worker) {
> +                pthread_join(pool->controls[i].worker, NULL);
> +                pool->controls[i].worker = 0;
> +            }
> +        }
> +        workers_must_exit = false;
> +    }
> +}
> +
> +static void
> +free_controls(struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    if (pool->controls) {
> +        /* Close/unlink semaphores */
> +        for (int i = 0; i < pool->size; i++) {
> +            ovs_mutex_destroy(&pool->controls[i].mutex);
> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_close(pool->controls[i].fire);
> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +                sem_unlink(sem_name);
> +            } else {
> +               /* This and following controls are not initialized */
> +                break;
> +            }
> +        }
> +        free(pool->controls);
> +        pool->controls = NULL;
>      }
> -    return can_parallelize;
>  }
>  
> -struct worker_pool *
> -ovn_add_worker_pool(void *(*start)(void *))
> +static void
> +free_pool(struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    stop_controls(pool);
> +    free_controls(pool);
> +    if (pool->done != SEM_FAILED) {
> +        sem_close(pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +        sem_unlink(sem_name);
> +    }
> +    free(pool);
> +}
> +
> +static int
> +init_controls(struct worker_pool *pool)
>  {
> -    struct worker_pool *new_pool = NULL;
>      struct worker_control *new_control;
> +    char sem_name[256];
> +
> +    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
> +    for (int i = 0; i < pool->size ; i++) {
> +        pool->controls[i].fire = SEM_FAILED;
> +    }
> +    for (int i = 0; i < pool->size; i++) {
> +        new_control = &pool->controls[i];
> +        new_control->id = i;
> +        new_control->done = pool->done;
> +        new_control->data = NULL;
> +        new_control->pool = pool;
> +        new_control->worker = 0;
> +        ovs_mutex_init(&new_control->mutex);
> +        new_control->finished = ATOMIC_VAR_INIT(false);
> +        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +        if (new_control->fire == SEM_FAILED) {
> +            free_controls(pool);
> +            return -1;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static void
> +init_threads(struct worker_pool *pool, void *(*start)(void *))
> +{
> +    for (int i = 0; i < pool_size; i++) {
> +        pool->controls[i].worker =
> +            ovs_thread_create("worker pool helper", start, &pool->controls[i]);
> +    }
> +    ovs_list_push_back(&worker_pools, &pool->list_node);
> +}
> +
> +bool
> +ovn_update_worker_pool(int requested_pool_size,
> +                       struct worker_pool **pool, void *(*start)(void *))
> +{
>      bool test = false;
> -    int i;
>      char sem_name[256];
>  
> -    /* Belt and braces - initialize the pool system just in case if
> -     * if it is not yet initialized.
> -     */
> +    if (requested_pool_size == pool_size) {
> +        return false;
> +    }
> +
>      if (atomic_compare_exchange_strong(
>              &initial_pool_setup,
>              &test,
>              true)) {
>          ovs_mutex_lock(&init_mutex);
> -        setup_worker_pools(false);
> +        setup_worker_pools();
>          ovs_mutex_unlock(&init_mutex);
>      }
> -
>      ovs_mutex_lock(&init_mutex);
> -    if (can_parallelize) {
> -        new_pool = xmalloc(sizeof(struct worker_pool));
> -        new_pool->size = pool_size;
> -        new_pool->controls = NULL;
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> -        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -        if (new_pool->done == SEM_FAILED) {
> -            goto cleanup;
> -        }
> -
> -        new_pool->controls =
> -            xmalloc(sizeof(struct worker_control) * new_pool->size);
> -
> -        for (i = 0; i < new_pool->size; i++) {
> -            new_control = &new_pool->controls[i];
> -            new_control->id = i;
> -            new_control->done = new_pool->done;
> -            new_control->data = NULL;
> -            ovs_mutex_init(&new_control->mutex);
> -            new_control->finished = ATOMIC_VAR_INIT(false);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -            if (new_control->fire == SEM_FAILED) {
> +    pool_size = requested_pool_size;
> +    VLOG_DBG("setting thread count to %d", pool_size);

This can be VLOG_INFO, it's useful information and we should probably
always display it.

> +
> +    if (*pool == NULL) {
> +        if (pool_size > 1) {
> +            VLOG_DBG("Creating new pool with size %d", pool_size);

Same here.

> +            *pool = xmalloc(sizeof(struct worker_pool));
> +            (*pool)->size = pool_size;
> +            (*pool)->controls = NULL;
> +            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
> +            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +            if ((*pool)->done == SEM_FAILED) {
>                  goto cleanup;
>              }
> +            if (init_controls(*pool) == -1) {
> +                goto cleanup;
> +            }
> +            init_threads(*pool, start);
>          }
> -
> -        for (i = 0; i < pool_size; i++) {
> -            new_pool->controls[i].worker =
> -                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> +    } else {
> +        if (pool_size > 1) {
> +            VLOG_DBG("Changing size of existing pool to %d", pool_size);

This too.

> +            stop_controls(*pool);
> +            free_controls(*pool);
> +            ovs_list_remove(&(*pool)->list_node);
> +            (*pool)->size = pool_size;
> +            if (init_controls(*pool) == -1) {
> +                goto cleanup;
> +            }
> +            init_threads(*pool, start);
> +        } else {
> +            VLOG_DBG("Deleting existing pool");

Here too.

> +            worker_pool_hook(NULL);
> +            *pool = NULL;
>          }
> -        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>      }
>      ovs_mutex_unlock(&init_mutex);
> -    return new_pool;
> -cleanup:
> +    return true;
>  
> +cleanup:
>      /* Something went wrong when opening semaphores. In this case
>       * it is better to shut off parallel procesing altogether
>       */
> -
>      VLOG_INFO("Failed to initialize parallel processing, error %d", errno);

This should be VLOG_ERR in my opinion.

> -    can_parallelize = false;
> -    if (new_pool->controls) {
> -        for (i = 0; i < new_pool->size; i++) {
> -            if (new_pool->controls[i].fire != SEM_FAILED) {
> -                sem_close(new_pool->controls[i].fire);
> -                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -                sem_unlink(sem_name);
> -                break; /* semaphores past this one are uninitialized */
> -            }
> -        }
> -    }
> -    if (new_pool->done != SEM_FAILED) {
> -        sem_close(new_pool->done);
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> -        sem_unlink(sem_name);
> -    }
> +    free_pool(*pool);
> +    *pool = NULL;
> +    pool_size = 1;
>      ovs_mutex_unlock(&init_mutex);
> -    return NULL;
> +    return true;
>  }
>  
> -
>  /* Initializes 'hmap' as an empty hash table with mask N. */
>  void
>  ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> @@ -367,9 +438,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>  
>  static void
>  worker_pool_hook(void *aux OVS_UNUSED) {
> -    int i;
>      static struct worker_pool *pool;
> -    char sem_name[256];
>  
>      workers_must_exit = true;
>  
> @@ -380,55 +449,15 @@ worker_pool_hook(void *aux OVS_UNUSED) {
>       */
>      atomic_thread_fence(memory_order_acq_rel);
>  
> -    /* Wake up the workers after the must_exit flag has been set */
> -
> -    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_post(pool->controls[i].fire);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            pthread_join(pool->controls[i].worker, NULL);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_close(pool->controls[i].fire);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> -            sem_unlink(sem_name);
> -        }
> -        sem_close(pool->done);
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> -        sem_unlink(sem_name);
> +    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
> +        ovs_list_remove(&pool->list_node);
> +        free_pool(pool);
>      }
>  }
>  
>  static void
> -setup_worker_pools(bool force) {
> -    int cores, nodes;
> -
> -    ovs_numa_init();
> -    nodes = ovs_numa_get_n_numas();
> -    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> -        nodes = 1;
> -    }
> -    cores = ovs_numa_get_n_cores();
> -
> -    /* If there is no NUMA config, use 4 cores.
> -     * If there is NUMA config use half the cores on
> -     * one node so that the OS does not start pushing
> -     * threads to other nodes.
> -     */
> -    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> -        /* If there is no NUMA we can try the ovs-threads routine.
> -         * It falls back to sysconf and/or affinity mask.
> -         */
> -        cores = count_cpu_cores();
> -        pool_size = cores;
> -    } else {
> -        pool_size = cores / nodes;
> -    }
> -    if ((pool_size < 4) && force) {
> -        pool_size = 4;
> -    }
> -    can_parallelize = (pool_size >= 3);
> +setup_worker_pools(void)
> +{
>      fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>      sembase = random_uint32();
>  }
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> index 0f7d68770..dc00ad635 100644
> --- a/lib/ovn-parallel-hmap.h
> +++ b/lib/ovn-parallel-hmap.h
> @@ -81,8 +81,8 @@ struct worker_control {
>      sem_t *done; /* Work completion semaphore - sem_post on completion. */
>      struct ovs_mutex mutex; /* Guards the data. */
>      void *data; /* Pointer to data to be processed. */
> -    void *workload; /* back-pointer to the worker pool structure. */
>      pthread_t worker;
> +    struct worker_pool *pool;
>  };
>  
>  struct worker_pool {
> @@ -92,10 +92,13 @@ struct worker_pool {
>      sem_t *done; /* Work completion semaphorew. */
>  };
>  
> -/* Add a worker pool for thread function start() which expects a pointer to
> - * a worker_control structure as an argument. */
> +/* Return pool size; bigger than 1 means parallelization has been enabled */

Nit: missing '.' at the end of the comment.

> +int ovn_get_worker_pool_size(void);
>  
> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +/* Add/delete a worker pool for thread function start() which expects a pointer
> + * to a worker_control structure as an argument. Return true if updated */
> +bool ovn_update_worker_pool(int requested_pool_size, struct worker_pool **,
> +                            void *(*start)(void *));
>  
>  /* Setting this to true will make all processing threads exit */
>  
> @@ -251,17 +254,17 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>      hrl->row_locks = NULL;
>  }
>  
> -bool ovn_can_parallelize_hashes(bool force_parallel);
> -
>  /* Use the OVN library functions for stuff which OVS has not defined
>   * If OVS has defined these, they will still compile using the OVN
>   * local names, but will be dropped by the linker in favour of the OVS
>   * supplied functions.
>   */
> +#define update_worker_pool(requested_pool_size, existing_pool, func) \
> +    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
>  
> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> +#define get_worker_pool_size() ovn_get_worker_pool_size()
>  
> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>  
>  #define stop_parallel_processing() ovn_stop_parallel_processing()
>  
> diff --git a/northd/northd.c b/northd/northd.c
> index a56666297..a113349cd 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -59,6 +59,7 @@
>  VLOG_DEFINE_THIS_MODULE(northd);
>  
>  static bool controller_event_en;
> +static bool lflow_hash_lock_initialized = false;
>  
>  static bool check_lsp_is_up;
>  
> @@ -4673,7 +4674,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
>  /* If this option is 'true' northd will combine logical flows that differ by
>   * logical datapath only by creating a datapath group. */
>  static bool use_logical_dp_groups = false;
> -static bool use_parallel_build = false;
> +
> +enum {
> +    STATE_NULL,
> +    STATE_INIT_HASH_SIZES,
> +    STATE_USE_PARALLELIZATION
> +};
> +static int parallelization_state = STATE_NULL;
>  
>  static void
>  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
> @@ -4692,7 +4699,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>      lflow->ctrl_meter = ctrl_meter;
>      lflow->dpg = NULL;
>      lflow->where = where;
> -    if (use_parallel_build && use_logical_dp_groups) {
> +    if ((parallelization_state == STATE_USE_PARALLELIZATION)
> +        && use_logical_dp_groups) {
>          ovs_mutex_init(&lflow->odg_lock);
>      }
>  }
> @@ -4706,7 +4714,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
>          return false;
>      }
>  
> -    if (use_parallel_build) {
> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>          ovs_mutex_lock(&lflow_ref->odg_lock);
>          hmapx_add(&lflow_ref->od_group, od);
>          ovs_mutex_unlock(&lflow_ref->odg_lock);
> @@ -4739,6 +4747,18 @@ lflow_hash_lock_init(void)
>      for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>          ovs_mutex_init(&lflow_hash_locks[i]);
>      }
> +    lflow_hash_lock_initialized = true;
> +}
> +
> +static void
> +lflow_hash_lock_destroy(void)
> +{
> +    if (lflow_hash_lock_initialized) {
> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> +            ovs_mutex_destroy(&lflow_hash_locks[i]);
> +        }
> +    }
> +    lflow_hash_lock_initialized = false;
>  }
>  
>  /* This thread-local var is used for parallel lflow building when dp-groups is
> @@ -4786,7 +4806,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
>                     nullable_xstrdup(ctrl_meter),
>                     ovn_lflow_hint(stage_hint), where);
>      hmapx_add(&lflow->od_group, od);
> -    if (!use_parallel_build) {
> +    if (parallelization_state != STATE_USE_PARALLELIZATION) {
>          hmap_insert(lflow_map, &lflow->hmap_node, hash);
>      } else {
>          hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> @@ -4829,7 +4849,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
>      struct ovn_lflow *lflow;
>  
>      ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
> -    if (use_logical_dp_groups && use_parallel_build) {
> +    if (use_logical_dp_groups
> +        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
>          lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
>                                      match, actions, io_port, stage_hint, where,
>                                      ctrl_meter);
> @@ -13708,15 +13729,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>                                        &lsi->actions);
>  }
>  
> -struct lflows_thread_pool {
> -    struct worker_pool *pool;
> -};
> -
>  static void *
>  build_lflows_thread(void *arg)
>  {
>      struct worker_control *control = (struct worker_control *) arg;
> -    struct lflows_thread_pool *workload;
>      struct lswitch_flow_build_info *lsi;
>  
>      struct ovn_datapath *od;
> @@ -13727,17 +13743,16 @@ build_lflows_thread(void *arg)
>  
>      while (!stop_parallel_processing()) {
>          wait_for_work(control);
> -        workload = (struct lflows_thread_pool *) control->workload;
>          lsi = (struct lswitch_flow_build_info *) control->data;
>          if (stop_parallel_processing()) {
>              return NULL;
>          }
>          thread_lflow_counter = 0;
> -        if (lsi && workload) {
> +        if (lsi) {
>              /* Iterate over bucket ThreadID, ThreadID+size, ... */
>              for (bnum = control->id;
>                      bnum <= lsi->datapaths->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
>                      if (stop_parallel_processing()) {
> @@ -13748,7 +13763,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->ports->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>                      if (stop_parallel_processing()) {
> @@ -13759,7 +13774,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->lbs->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>                      if (stop_parallel_processing()) {
> @@ -13780,7 +13795,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->igmp_groups->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (
>                          igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> @@ -13799,33 +13814,7 @@ build_lflows_thread(void *arg)
>      return NULL;
>  }
>  
> -static bool pool_init_done = false;
> -static struct lflows_thread_pool *build_lflows_pool = NULL;
> -
> -static void
> -init_lflows_thread_pool(void)
> -{
> -    int index;
> -
> -    if (!pool_init_done) {
> -        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> -        pool_init_done = true;
> -        if (pool) {
> -            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
> -            build_lflows_pool->pool = pool;
> -            for (index = 0; index < build_lflows_pool->pool->size; index++) {
> -                build_lflows_pool->pool->controls[index].workload =
> -                    build_lflows_pool;
> -            }
> -        }
> -    }
> -}
> -
> -/* TODO: replace hard cutoffs by configurable via commands. These are
> - * temporary defines to determine single-thread to multi-thread processing
> - * cutoff.
> - * Setting to 1 forces "all parallel" lflow build.
> - */
> +static struct worker_pool *build_lflows_pool = NULL;
>  
>  static void
>  noop_callback(struct worker_pool *pool OVS_UNUSED,
> @@ -13871,28 +13860,21 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>  
>      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>  
> -    if (use_parallel_build) {
> -        init_lflows_thread_pool();
> -        if (!can_parallelize_hashes(false)) {
> -            use_parallel_build = false;
> -        }
> -    }
> -
> -    if (use_parallel_build) {
> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>          struct hmap *lflow_segs;
>          struct lswitch_flow_build_info *lsiv;
>          int index;
>  
> -        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
>          if (use_logical_dp_groups) {
>              lflow_segs = NULL;
>          } else {
> -            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
>          }
>  
>          /* Set up "work chunks" for each thread to work on. */
>  
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              if (use_logical_dp_groups) {
>                  /* if dp_groups are in use we lock a shared lflows hash
>                   * on a per-bucket level instead of merging hash frags */
> @@ -13915,19 +13897,19 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>              ds_init(&lsiv[index].match);
>              ds_init(&lsiv[index].actions);
>  
> -            build_lflows_pool->pool->controls[index].data = &lsiv[index];
> +            build_lflows_pool->controls[index].data = &lsiv[index];
>          }
>  
>          /* Run thread pool. */
>          if (use_logical_dp_groups) {
> -            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
> +            run_pool_callback(build_lflows_pool, NULL, NULL,
>                                noop_callback);
> -            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
> +            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
>          } else {
> -            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> +            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
>          }
>  
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              ds_destroy(&lsiv[index].match);
>              ds_destroy(&lsiv[index].actions);
>          }
> @@ -14063,8 +14045,21 @@ ovn_sb_set_lflow_logical_dp_group(
>  }
>  
>  static ssize_t max_seen_lflow_size = 128;
> -static bool needs_parallel_init = true;
> -static bool reset_parallel = false;
> +
> +void run_update_worker_pool(int n_threads)
> +{
> +    /* If number of threads has been updated (or initially set),
> +     * update the worker pool */

Nit: missing '.' at the end of the comment.

> +    if (update_worker_pool(n_threads,
> +        &build_lflows_pool, build_lflows_thread)) {

Indentation looks a bit weird to me here.

> +        /* worker pool was updated */
> +        if (get_worker_pool_size() <= 1) {
> +            /* destroy potentially created lflow_hash_lock */
> +            lflow_hash_lock_destroy();
> +            parallelization_state = STATE_NULL;
> +        }
> +    }
> +}
>  
>  static void
>  build_mcast_groups(struct lflow_input *data,
> @@ -14085,24 +14080,23 @@ void build_lflows(struct lflow_input *input_data,
>      build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
>                         &mcast_groups, &igmp_groups);
>  
> -    if (reset_parallel) {
> -        /* Parallel build was disabled before, we need to
> -         * re-enable it. */
> -        use_parallel_build = true;
> -        reset_parallel = false;
> +    if (get_worker_pool_size() > 1) {

Can we avoid relying on get_worker_pool_size() here and instead set the
state directly in run_update_worker_pool?

Something like:

In run_update_worker_pool(), based on the value returned by
get_worker_pool_size() after update_worker_pool(), set the state to:

a. STATE_INIT_HASH_SIZES if pool_size > 1 and also call
lflow_hash_lock_init().
b. STATE_NULL if pool_size <= 1 and also call lflow_hash_lock_destroy().

> +        /* Disable parallel build on first run with dp_groups
> +         * to determine the correct sizing of hashes.
> +         */
> +        if (parallelization_state != STATE_USE_PARALLELIZATION) {
> +            if ((parallelization_state == STATE_NULL)
> +                && (use_logical_dp_groups)) {
> +                lflow_hash_lock_init();
> +                parallelization_state = STATE_INIT_HASH_SIZES;
> +            } else {
> +                parallelization_state = STATE_USE_PARALLELIZATION;
> +            }
> +        }
>      }
>  
>      fast_hmap_size_for(&lflows, max_seen_lflow_size);
>  
> -    if (use_parallel_build && use_logical_dp_groups &&
> -        needs_parallel_init) {
> -        lflow_hash_lock_init();
> -        needs_parallel_init = false;
> -        /* Disable parallel build on first run with dp_groups
> -         * to determine the correct sizing of hashes. */
> -        use_parallel_build = false;
> -        reset_parallel = true;
> -    }
>      build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
>                                      input_data->port_groups, &lflows,
>                                      &mcast_groups, &igmp_groups,

Then at this point it should be safe to "advance" state from
STATE_INIT_HASH_SIZES to STATE_USE_PARALLELIZATION.  If I'm reading the
code correctly, this would ensure next runs will use parallel lflow build.
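
Something like this sketch, combining both points (untested, and the
use_logical_dp_groups condition from the current code is left out):

  void run_update_worker_pool(int n_threads)
  {
      if (update_worker_pool(n_threads, &build_lflows_pool,
                             build_lflows_thread)) {
          if (get_worker_pool_size() > 1) {
              lflow_hash_lock_init();
              parallelization_state = STATE_INIT_HASH_SIZES;
          } else {
              lflow_hash_lock_destroy();
              parallelization_state = STATE_NULL;
          }
      }
  }

and then in build_lflows(), once build_lswitch_and_lrouter_flows() has run:

  if (parallelization_state == STATE_INIT_HASH_SIZES) {
      parallelization_state = STATE_USE_PARALLELIZATION;
  }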

> @@ -15350,10 +15344,6 @@ ovnnb_db_run(struct northd_input *input_data,
>  
>      smap_destroy(&options);
>  
> -    use_parallel_build =
> -        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
> -         can_parallelize_hashes(false));
> -
>      use_logical_dp_groups = smap_get_bool(&nb->options,
>                                            "use_logical_dp_groups", true);
>      use_ct_inv_match = smap_get_bool(&nb->options,
> diff --git a/northd/northd.h b/northd/northd.h
> index 2d804a22e..fe8dad03a 100644
> --- a/northd/northd.h
> +++ b/northd/northd.h
> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data,
>                       struct hmap *bfd_connections, struct hmap *ports);
>  void bfd_cleanup_connections(struct lflow_input *input_data,
>                               struct hmap *bfd_map);
> +void run_update_worker_pool(int n_threads);
>  
>  #endif /* NORTHD_H */
> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
> index 718d052cc..e9afda4c6 100644
> --- a/northd/ovn-northd-ddlog.c
> +++ b/northd/ovn-northd-ddlog.c
> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>          SSL_OPTION_ENUMS,
>          OPT_DRY_RUN,
>          OPT_DDLOG_RECORD,
> -        OPT_DUMMY_NUMA,
>      };
>      static const struct option long_options[] = {
>          {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>          OVN_DAEMON_LONG_OPTIONS,
>          VLOG_LONG_OPTIONS,
>          STREAM_SSL_LONG_OPTIONS,
> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>          {NULL, 0, NULL, 0},
>      };
>      char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>              *pause = true;
>              break;
>  
> -        case OPT_DUMMY_NUMA:
> -            ovs_numa_set_dummy(optarg);
> -            break;
> -
>          case OPT_DDLOG_RECORD:
>              record_file = optarg;
>              break;
> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
> index 0fe350d0e..b6a5e9121 100644
> --- a/northd/ovn-northd.8.xml
> +++ b/northd/ovn-northd.8.xml
> @@ -68,53 +68,24 @@
>            restarting a process or disturbing a running system.
>          </p>
>        </dd>
> -      <dt><code>--dummy-numa</code></dt>
> +      <dt><code>n-threads N</code></dt>
>        <dd>
> -        <p>
> -          Typically, OVS uses sysfs to determine the number of NUMA nodes and
> -          CPU cores that are available on a machine. The parallelization code
> -          in OVN uses this information to determine if there are enough
> -          resources to use parallelization. The current algorithm enables
> -          parallelization if the total number of CPU cores divided by the
> -          number of NUMA nodes is greater than or equal to four.
> -        </p>
> -
>          <p>
>            In certain situations, it may be desirable to enable parallelization
> -          on a system that otherwise would not have it allowed. The
> -          <code>--dummy-numa</code> option allows for you to fake the NUMA
> -          nodes and cores that OVS thinks your system has. The syntax consists
> -          of using numbers to represent the NUMA node IDs. The number of times
> -          that a NUMA node ID appears represents how many CPU cores that NUMA
> -          node contains. So for instance, if you did the following:
> -        </p>
> -
> -        <p>
> -          <code>--dummy-numa=0,0,0,0</code>
> -        </p>
> -
> -        <p>
> -          it would make OVS assume that you have a single NUMA node with ID 0,
> -          and that NUMA node consists of four CPU cores. Similarly, you could
> -          do:
> -        </p>
> -
> -        <p>
> -          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
> +          on a system to decrease latency (at the potential cost of increasing
> +          CPU usage).
>          </p>
>  
>          <p>
> -          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
> -          with six CPU cores.
> +          This option will cause ovn-northd to use N threads when building
> +          logical flows, when N is within [2-256].
> +          If N is 1, parallelization is disabled (default behavior).
> +          If N is less than 1 or more than 256, then N is set to 1,
> +          parallelization is disabled and a warning is logged.
>          </p>
>  
>          <p>
> -          Currently, the only affect this option has is on whether
> -          parallelization can be enabled in ovn-northd. There are no NUMA node
> -          or CPU core-specific actions performed by OVN. Setting
> -          <code>--dummy-numa</code> in ovn-northd does not affect how other OVS
> -          processes on the system (such as ovs-vswitchd) count the number of
> -          NUMA nodes and CPU cores; this setting is local to ovn-northd.
> +          ovn-northd-ddlog does not support this option.
>          </p>
>        </dd>
>      </dl>
> @@ -210,6 +181,27 @@
>          except for the northbound database client.
>        </p>
>        </dd>
> +
> +
> +      <dt><code>set-n-threads N</code></dt>
> +      <dd>
> +      <p>
> +        Set the number of threads used for building logical flows.
> +        When N is within [2-256], parallelization is enabled.
> +        When N is 1 parallelization is disabled.
> +        When N is less than 1 or more than 256, an error is returned.
> +        If ovn-northd fails to start parallelization (e.g. fails to set up
> +        semaphores), parallelization is disabled and an error is returned.
> +      </p>
> +      </dd>
> +
> +      <dt><code>get-n-threads</code></dt>
> +      <dd>
> +      <p>
> +        Return the number of threads used for building logical flows.
> +      </p>
> +      </dd>
> +
>        </dl>
>      </p>
>  
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 45b120697..8d60376eb 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -41,6 +41,7 @@
>  #include "unixctl.h"
>  #include "util.h"
>  #include "openvswitch/vlog.h"
> +#include "lib/ovn-parallel-hmap.h"
>  
>  VLOG_DEFINE_THIS_MODULE(ovn_northd);
>  
> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume;
>  static unixctl_cb_func ovn_northd_is_paused;
>  static unixctl_cb_func ovn_northd_status;
>  static unixctl_cb_func cluster_state_reset_cmd;
> +static unixctl_cb_func ovn_northd_set_thread_count_cmd;
> +static unixctl_cb_func ovn_northd_get_thread_count_cmd;
>  
>  struct northd_state {
>      bool had_lock;
>      bool paused;
>  };
>  
> +#define OVN_MAX_SUPPORTED_THREADS 256
> +
>  static const char *ovnnb_db;
>  static const char *ovnsb_db;
>  static const char *unixctl_path;
> @@ -525,7 +530,7 @@ Options:\n\
>    --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
>                              (default: %s)\n\
>    --dry-run                 start in paused state (do not commit db changes)\n\
> -  --dummy-numa              override default NUMA node and CPU core discovery\n\
> +  --n-threads=N             specify number of threads\n\
>    --unixctl=SOCKET          override default control socket name\n\
>    -h, --help                display this help message\n\
>    -o, --options             list available options\n\
> @@ -538,14 +543,14 @@ Options:\n\
>  
>  static void
>  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> -              bool *paused)
> +              bool *paused, int *n_threads)
>  {
>      enum {
>          OVN_DAEMON_OPTION_ENUMS,
>          VLOG_OPTION_ENUMS,
>          SSL_OPTION_ENUMS,
>          OPT_DRY_RUN,
> -        OPT_DUMMY_NUMA,
> +        OPT_N_THREADS,
>      };
>      static const struct option long_options[] = {
>          {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>          {"options", no_argument, NULL, 'o'},
>          {"version", no_argument, NULL, 'V'},
>          {"dry-run", no_argument, NULL, OPT_DRY_RUN},
> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
> +        {"n-threads", required_argument, NULL, OPT_N_THREADS},
>          OVN_DAEMON_LONG_OPTIONS,
>          VLOG_LONG_OPTIONS,
>          STREAM_SSL_LONG_OPTIONS,
> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>              ovn_print_version(0, 0);
>              exit(EXIT_SUCCESS);
>  
> -        case OPT_DUMMY_NUMA:
> -            ovs_numa_set_dummy(optarg);
> +        case OPT_N_THREADS:
> +            *n_threads = atoi(optarg);

I'd use strtoul() instead.
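
E.g., something along these lines (a sketch only; assumes <stdlib.h> and
<errno.h> are available):

  case OPT_N_THREADS: {
      char *endptr;
      unsigned long n;

      errno = 0;
      n = strtoul(optarg, &endptr, 10);
      /* Reject a leading '-' explicitly: strtoul() would silently negate
       * and wrap it to a huge value instead of failing. */
      if (errno || endptr == optarg || *endptr != '\0'
          || optarg[0] == '-' || n < 1) {
          n = 1;
          VLOG_WARN("Setting n_threads to %lu as --n-threads option was "
                    "set to : [%s]", n, optarg);
      } else if (n > OVN_MAX_SUPPORTED_THREADS) {
          n = OVN_MAX_SUPPORTED_THREADS;
          VLOG_WARN("Setting n_threads to %lu as --n-threads option was "
                    "set to : [%s]", n, optarg);
      }
      *n_threads = n;
      break;
  }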

> +            if (*n_threads < 1) {
> +                *n_threads = 1;
> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
> +                    "set to : [%s]", *n_threads, optarg);
> +            }
> +            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
> +                *n_threads = OVN_MAX_SUPPORTED_THREADS;
> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was "
> +                    "set to : [%s]", *n_threads, optarg);
> +            }
> +            if (*n_threads != 1) {
> +                VLOG_INFO("Using %d threads", *n_threads);
> +            }
>              break;
>  
>          case OPT_DRY_RUN:
> @@ -668,6 +686,7 @@ main(int argc, char *argv[])
>      struct unixctl_server *unixctl;
>      int retval;
>      bool exiting;
> +    int n_threads = 1;
>      struct northd_state state = {
>          .had_lock = false,
>          .paused = false
> @@ -677,7 +696,7 @@ main(int argc, char *argv[])
>      ovs_cmdl_proctitle_init(argc, argv);
>      ovn_set_program_name(argv[0]);
>      service_start(&argc, &argv);
> -    parse_options(argc, argv, &state.paused);
> +    parse_options(argc, argv, &state.paused, &n_threads);
>  
>      daemonize_start(false);
>  
> @@ -704,6 +723,12 @@ main(int argc, char *argv[])
>      unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
>                               cluster_state_reset_cmd,
>                               &reset_ovnnb_idl_min_index);
> +    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 1,
> +                             ovn_northd_set_thread_count_cmd,
> +                             NULL);
> +    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
> +                             ovn_northd_get_thread_count_cmd,
> +                             NULL);
>  
>      daemonize_complete();
>  
> @@ -753,6 +778,8 @@ main(int argc, char *argv[])
>      unsigned int ovnnb_cond_seqno = UINT_MAX;
>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>  
> +    run_update_worker_pool(n_threads);
> +
>      /* Main loop. */
>      exiting = false;
>  
> @@ -1009,3 +1036,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
>      poll_immediate_wake();
>      unixctl_command_reply(conn, NULL);
>  }
> +
> +static void
> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> +               const char *argv[], void *aux OVS_UNUSED)
> +{
> +    int n_threads = atoi(argv[1]);
> +
> +    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
> +        struct ds s = DS_EMPTY_INITIALIZER;
> +        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
> +        unixctl_command_reply_error(conn, ds_cstr(&s));
> +        ds_destroy(&s);
> +    } else {
> +        run_update_worker_pool(n_threads);
> +        unixctl_command_reply(conn, NULL);
> +    }
> +}
> +
> +static void
> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> +               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
> +{
> +    struct ds s = DS_EMPTY_INITIALIZER;
> +    ds_put_format(&s, "%d\n", get_worker_pool_size());
> +    unixctl_command_reply(conn, ds_cstr(&s));
> +    ds_destroy(&s);
> +}
> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
> index d78315c75..c6f0f6251 100644
> --- a/tests/ovn-macros.at
> +++ b/tests/ovn-macros.at
> @@ -170,8 +170,8 @@ ovn_start_northd() {
>          ovn-northd-ddlog) northd_args="$northd_args --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
>      esac
>  
> -    if test X$NORTHD_DUMMY_NUMA = Xyes; then
> -        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
> +    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> +        northd_args="$northd_args --n-threads=4"
>      fi
>  
>      local name=${d_prefix}northd${suffix}
> @@ -252,10 +252,6 @@ ovn_start () {
>      else
>          ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
>      fi
> -
> -    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> -        ovn-nbctl set NB_Global . options:use_parallel_build=true
> -    fi
>  }
>  
>  # Interconnection networks.
> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS
>  
>  m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
>  
> -# Use --dummy-numa if system has low cores and we want to force parallelization
> -m4_define([NORTHD_DUMMY_NUMA],
> -  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
> -     then
> -       echo "yes"
> -     else
> -       echo "no"
> -     fi)
> -])
> -
>  # Defines a versions of a test with all combinations of northd and
>  # datapath groups.
>  m4_define([OVN_FOR_EACH_NORTHD],
> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD],
>  # Some tests aren't prepared for dp groups to be enabled.
>  m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
>    [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
> -     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
> -])])])
> -
> -# Test parallelization with dp groups enabled and disabled
> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> -    [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [no])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> -    [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -# Use --dummy-numa if system has low cores
> -m4_define([HOST_HAS_LOW_CORES], [
> -    if test $(nproc) -lt 4; then
> -        :
> -        $1
> -    else
> -        :
> -        $2
> -    fi
> -])
> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
> +
> +# Some tests aren't prepared for ddlog to be enabled.
> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
> +  [m4_foreach([NORTHD_TYPE], [ovn-northd],
> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
>  
> -m4_define([NORTHD_PARALLELIZATION], [
> -    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
> -])
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 69ad85533..798bb5552 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -7158,3 +7158,112 @@ ct_next(ct_state=new|trk);
>  
>  AT_CLEANUP
>  ])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization unixctl])
> +ovn_start
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [4
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 0], [2], [],
> +  [invalid n_threads: 0
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads -1], [2], [],
> +  [invalid n_threads: -1
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 300], [2], [],
> +  [invalid n_threads: 300
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], [2], [],
> +  ["parallel-build/set-n-threads" command requires at least 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 2], [2], [],
> +  ["parallel-build/set-n-threads" command takes at most 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CLEANUP
> +])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization runtime])
> +ovn_start
> +
> +add_switch_ports() {
> +    for port in $(seq $1 $2); do
> +        logical_switch_port=lsp${port}
> +        check ovn-nbctl lsp-add ls1 $logical_switch_port
> +        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
> +    done
> +}
> +
> +# Build some rather heavy config and modify number of threads in the middle
> +check ovn-nbctl ls-add ls1
> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
> +
> +check ovn-nbctl lr-add lr1
> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router options:router-port=lrp0 addresses=dynamic
> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
> +add_switch_ports 1 50
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 51 100
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +add_switch_ports 101 150
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 151 200
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +add_switch_ports 201 250
> +check ovn-nbctl --wait=sb sync
> +
> +# Run 3 times: once with parallelization enabled, once with it disabled, and once while changing it
> +# Compare the flows produced by the three runs
> +# Ignore IP/MAC addresses
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
> +
> +# Restart with 1 thread
> +for port in $(seq 1 250); do
> +    logical_switch_port=lsp${port}
> +    check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
> +AT_CHECK([diff flows1 flows2])
> +
> +# Restart with 8 threads
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +for port in $(seq 1 250); do
> +    logical_switch_port=lsp${port}
> +    check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
> +AT_CHECK([diff flows1 flows3])
> +
> +AT_CLEANUP
> +])

Patch

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index 7edc4c0b6..0f5140d25 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -41,11 +41,7 @@  VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
 #define WORKER_SEM_NAME "%x-%p-%x"
 #define MAIN_SEM_NAME "%x-%p-main"
 
-/* These are accessed under mutex inside add_worker_pool().
- * They do not need to be atomic.
- */
 static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
-static bool can_parallelize = false;
 
 /* This is set only in the process of exit and the set is
  * accompanied by a fence. It does not need to be atomic or be
@@ -57,12 +53,12 @@  static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
 
 static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
 
-static int pool_size;
+static int pool_size = 1;
 
 static int sembase;
 
 static void worker_pool_hook(void *aux OVS_UNUSED);
-static void setup_worker_pools(bool force);
+static void setup_worker_pools(void);
 static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
                                void *fin_result, void *result_frags,
                                int index);
@@ -76,107 +72,182 @@  ovn_stop_parallel_processing(void)
     return workers_must_exit;
 }
 
-bool
-ovn_can_parallelize_hashes(bool force_parallel)
+int
+ovn_get_worker_pool_size(void)
 {
-    bool test = false;
+    return pool_size;
+}
 
-    if (atomic_compare_exchange_strong(
-            &initial_pool_setup,
-            &test,
-            true)) {
-        ovs_mutex_lock(&init_mutex);
-        setup_worker_pools(force_parallel);
-        ovs_mutex_unlock(&init_mutex);
+static void
+stop_controls(struct worker_pool *pool)
+{
+    if (pool->controls) {
+        workers_must_exit = true;
+
+        /* unlock threads */
+        for (int i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_post(pool->controls[i].fire);
+            }
+        }
+
+        /* Wait completion */
+        for (int i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].worker) {
+                pthread_join(pool->controls[i].worker, NULL);
+                pool->controls[i].worker = 0;
+            }
+        }
+        workers_must_exit = false;
+    }
+}
+
+static void
+free_controls(struct worker_pool *pool)
+{
+    char sem_name[256];
+    if (pool->controls) {
+        /* Close/unlink semaphores */
+        for (int i = 0; i < pool->size; i++) {
+            ovs_mutex_destroy(&pool->controls[i].mutex);
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_close(pool->controls[i].fire);
+                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
+                sem_unlink(sem_name);
+            } else {
+               /* This and following controls are not initialized */
+                break;
+            }
+        }
+        free(pool->controls);
+        pool->controls = NULL;
     }
-    return can_parallelize;
 }
 
-struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+static void
+free_pool(struct worker_pool *pool)
+{
+    char sem_name[256];
+    stop_controls(pool);
+    free_controls(pool);
+    if (pool->done != SEM_FAILED) {
+        sem_close(pool->done);
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+        sem_unlink(sem_name);
+    }
+    free(pool);
+}
+
+static int
+init_controls(struct worker_pool *pool)
 {
-    struct worker_pool *new_pool = NULL;
     struct worker_control *new_control;
+    char sem_name[256];
+
+    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
+    for (int i = 0; i < pool->size ; i++) {
+        pool->controls[i].fire = SEM_FAILED;
+    }
+    for (int i = 0; i < pool->size; i++) {
+        new_control = &pool->controls[i];
+        new_control->id = i;
+        new_control->done = pool->done;
+        new_control->data = NULL;
+        new_control->pool = pool;
+        new_control->worker = 0;
+        ovs_mutex_init(&new_control->mutex);
+        new_control->finished = ATOMIC_VAR_INIT(false);
+        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
+        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+        if (new_control->fire == SEM_FAILED) {
+            free_controls(pool);
+            return -1;
+        }
+    }
+    return 0;
+}
+
+static void
+init_threads(struct worker_pool *pool, void *(*start)(void *))
+{
+    for (int i = 0; i < pool->size; i++) {
+        pool->controls[i].worker =
+            ovs_thread_create("worker pool helper", start, &pool->controls[i]);
+    }
+    ovs_list_push_back(&worker_pools, &pool->list_node);
+}
+
+bool
+ovn_update_worker_pool(int requested_pool_size,
+                       struct worker_pool **pool, void *(*start)(void *))
+{
     bool test = false;
-    int i;
     char sem_name[256];
 
-    /* Belt and braces - initialize the pool system just in case if
-     * if it is not yet initialized.
-     */
+    if (requested_pool_size == pool_size) {
+        return false;
+    }
+
     if (atomic_compare_exchange_strong(
             &initial_pool_setup,
             &test,
             true)) {
         ovs_mutex_lock(&init_mutex);
-        setup_worker_pools(false);
+        setup_worker_pools();
         ovs_mutex_unlock(&init_mutex);
     }
-
     ovs_mutex_lock(&init_mutex);
-    if (can_parallelize) {
-        new_pool = xmalloc(sizeof(struct worker_pool));
-        new_pool->size = pool_size;
-        new_pool->controls = NULL;
-        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
-        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-        if (new_pool->done == SEM_FAILED) {
-            goto cleanup;
-        }
-
-        new_pool->controls =
-            xmalloc(sizeof(struct worker_control) * new_pool->size);
-
-        for (i = 0; i < new_pool->size; i++) {
-            new_control = &new_pool->controls[i];
-            new_control->id = i;
-            new_control->done = new_pool->done;
-            new_control->data = NULL;
-            ovs_mutex_init(&new_control->mutex);
-            new_control->finished = ATOMIC_VAR_INIT(false);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-            if (new_control->fire == SEM_FAILED) {
+    pool_size = requested_pool_size;
+    VLOG_DBG("setting thread count to %d", pool_size);
+
+    if (*pool == NULL) {
+        if (pool_size > 1) {
+            VLOG_DBG("Creating new pool with size %d", pool_size);
+            *pool = xmalloc(sizeof(struct worker_pool));
+            (*pool)->size = pool_size;
+            (*pool)->controls = NULL;
+            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
+            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+            if ((*pool)->done == SEM_FAILED) {
                 goto cleanup;
             }
+            if (init_controls(*pool) == -1) {
+                goto cleanup;
+            }
+            init_threads(*pool, start);
         }
-
-        for (i = 0; i < pool_size; i++) {
-            new_pool->controls[i].worker =
-                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+    } else {
+        if (pool_size > 1) {
+            VLOG_DBG("Changing size of existing pool to %d", pool_size);
+            stop_controls(*pool);
+            free_controls(*pool);
+            ovs_list_remove(&(*pool)->list_node);
+            (*pool)->size = pool_size;
+            if (init_controls(*pool) == -1) {
+                goto cleanup;
+            }
+            init_threads(*pool, start);
+        } else {
+            VLOG_DBG("Deleting existing pool");
+            worker_pool_hook(NULL);
+            *pool = NULL;
         }
-        ovs_list_push_back(&worker_pools, &new_pool->list_node);
     }
     ovs_mutex_unlock(&init_mutex);
-    return new_pool;
-cleanup:
+    return true;
 
+cleanup:
     /* Something went wrong when opening semaphores. In this case
      * it is better to shut off parallel processing altogether
      */
-
     VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
-    can_parallelize = false;
-    if (new_pool->controls) {
-        for (i = 0; i < new_pool->size; i++) {
-            if (new_pool->controls[i].fire != SEM_FAILED) {
-                sem_close(new_pool->controls[i].fire);
-                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-                sem_unlink(sem_name);
-                break; /* semaphores past this one are uninitialized */
-            }
-        }
-    }
-    if (new_pool->done != SEM_FAILED) {
-        sem_close(new_pool->done);
-        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
-        sem_unlink(sem_name);
-    }
+    free_pool(*pool);
+    *pool = NULL;
+    pool_size = 1;
     ovs_mutex_unlock(&init_mutex);
-    return NULL;
+    return true;
 }
 
-
 /* Initializes 'hmap' as an empty hash table with mask N. */
 void
 ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
@@ -367,9 +438,7 @@  ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
 
 static void
 worker_pool_hook(void *aux OVS_UNUSED) {
-    int i;
     static struct worker_pool *pool;
-    char sem_name[256];
 
     workers_must_exit = true;
 
@@ -380,55 +449,15 @@  worker_pool_hook(void *aux OVS_UNUSED) {
      */
     atomic_thread_fence(memory_order_acq_rel);
 
-    /* Wake up the workers after the must_exit flag has been set */
-
-    LIST_FOR_EACH (pool, list_node, &worker_pools) {
-        for (i = 0; i < pool->size ; i++) {
-            sem_post(pool->controls[i].fire);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            pthread_join(pool->controls[i].worker, NULL);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            sem_close(pool->controls[i].fire);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
-            sem_unlink(sem_name);
-        }
-        sem_close(pool->done);
-        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
-        sem_unlink(sem_name);
+    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
+        ovs_list_remove(&pool->list_node);
+        free_pool(pool);
     }
 }
 
 static void
-setup_worker_pools(bool force) {
-    int cores, nodes;
-
-    ovs_numa_init();
-    nodes = ovs_numa_get_n_numas();
-    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
-        nodes = 1;
-    }
-    cores = ovs_numa_get_n_cores();
-
-    /* If there is no NUMA config, use 4 cores.
-     * If there is NUMA config use half the cores on
-     * one node so that the OS does not start pushing
-     * threads to other nodes.
-     */
-    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
-        /* If there is no NUMA we can try the ovs-threads routine.
-         * It falls back to sysconf and/or affinity mask.
-         */
-        cores = count_cpu_cores();
-        pool_size = cores;
-    } else {
-        pool_size = cores / nodes;
-    }
-    if ((pool_size < 4) && force) {
-        pool_size = 4;
-    }
-    can_parallelize = (pool_size >= 3);
+setup_worker_pools(void)
+{
     fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
     sembase = random_uint32();
 }
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 0f7d68770..dc00ad635 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -81,8 +81,8 @@  struct worker_control {
     sem_t *done; /* Work completion semaphore - sem_post on completion. */
     struct ovs_mutex mutex; /* Guards the data. */
     void *data; /* Pointer to data to be processed. */
-    void *workload; /* back-pointer to the worker pool structure. */
     pthread_t worker;
+    struct worker_pool *pool; /* Back-pointer to the worker pool structure. */
 };
 
 struct worker_pool {
@@ -92,10 +92,13 @@  struct worker_pool {
     sem_t *done; /* Work completion semaphore. */
 };
 
-/* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+/* Return pool size; greater than 1 means parallelization is enabled. */
+int ovn_get_worker_pool_size(void);
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+/* Add or delete a worker pool for thread function start(), which expects a
+ * pointer to a worker_control structure as an argument. Returns true if the
+ * pool was updated. */
+bool ovn_update_worker_pool(int requested_pool_size, struct worker_pool **,
+                            void *(*start)(void *));
 
 /* Setting this to true will make all processing threads exit */
 
@@ -251,17 +254,17 @@  static inline void init_hash_row_locks(struct hashrow_locks *hrl)
     hrl->row_locks = NULL;
 }
 
-bool ovn_can_parallelize_hashes(bool force_parallel);
-
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
  * supplied functions.
  */
+#define update_worker_pool(requested_pool_size, existing_pool, func) \
+    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
 
-#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
+#define get_worker_pool_size() ovn_get_worker_pool_size()
 
-#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
+#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
 
 #define stop_parallel_processing() ovn_stop_parallel_processing()
 
diff --git a/northd/northd.c b/northd/northd.c
index a56666297..a113349cd 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -59,6 +59,7 @@ 
 VLOG_DEFINE_THIS_MODULE(northd);
 
 static bool controller_event_en;
+static bool lflow_hash_lock_initialized = false;
 
 static bool check_lsp_is_up;
 
@@ -4673,7 +4674,13 @@  ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
-static bool use_parallel_build = false;
+
+enum {
+    STATE_NULL,
+    STATE_INIT_HASH_SIZES,
+    STATE_USE_PARALLELIZATION
+};
+static int parallelization_state = STATE_NULL;
 
 static void
 ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
@@ -4692,7 +4699,8 @@  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
     lflow->ctrl_meter = ctrl_meter;
     lflow->dpg = NULL;
     lflow->where = where;
-    if (use_parallel_build && use_logical_dp_groups) {
+    if ((parallelization_state == STATE_USE_PARALLELIZATION)
+        && use_logical_dp_groups) {
         ovs_mutex_init(&lflow->odg_lock);
     }
 }
@@ -4706,7 +4714,7 @@  ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
         return false;
     }
 
-    if (use_parallel_build) {
+    if (parallelization_state == STATE_USE_PARALLELIZATION) {
         ovs_mutex_lock(&lflow_ref->odg_lock);
         hmapx_add(&lflow_ref->od_group, od);
         ovs_mutex_unlock(&lflow_ref->odg_lock);
@@ -4739,6 +4747,18 @@  lflow_hash_lock_init(void)
     for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
         ovs_mutex_init(&lflow_hash_locks[i]);
     }
+    lflow_hash_lock_initialized = true;
+}
+
+static void
+lflow_hash_lock_destroy(void)
+{
+    if (lflow_hash_lock_initialized) {
+        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
+            ovs_mutex_destroy(&lflow_hash_locks[i]);
+        }
+    }
+    lflow_hash_lock_initialized = false;
 }
 
 /* This thread-local var is used for parallel lflow building when dp-groups is
@@ -4786,7 +4806,7 @@  do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
                    nullable_xstrdup(ctrl_meter),
                    ovn_lflow_hint(stage_hint), where);
     hmapx_add(&lflow->od_group, od);
-    if (!use_parallel_build) {
+    if (parallelization_state != STATE_USE_PARALLELIZATION) {
         hmap_insert(lflow_map, &lflow->hmap_node, hash);
     } else {
         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4829,7 +4849,8 @@  ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
     struct ovn_lflow *lflow;
 
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
-    if (use_logical_dp_groups && use_parallel_build) {
+    if (use_logical_dp_groups
+        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
         lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
                                     match, actions, io_port, stage_hint, where,
                                     ctrl_meter);
@@ -13708,15 +13729,10 @@  build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                       &lsi->actions);
 }
 
-struct lflows_thread_pool {
-    struct worker_pool *pool;
-};
-
 static void *
 build_lflows_thread(void *arg)
 {
     struct worker_control *control = (struct worker_control *) arg;
-    struct lflows_thread_pool *workload;
     struct lswitch_flow_build_info *lsi;
 
     struct ovn_datapath *od;
@@ -13727,17 +13743,16 @@  build_lflows_thread(void *arg)
 
     while (!stop_parallel_processing()) {
         wait_for_work(control);
-        workload = (struct lflows_thread_pool *) control->workload;
         lsi = (struct lswitch_flow_build_info *) control->data;
         if (stop_parallel_processing()) {
             return NULL;
         }
         thread_lflow_counter = 0;
-        if (lsi && workload) {
+        if (lsi) {
             /* Iterate over bucket ThreadID, ThreadID+size, ... */
             for (bnum = control->id;
                     bnum <= lsi->datapaths->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
                     if (stop_parallel_processing()) {
@@ -13748,7 +13763,7 @@  build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->ports->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
                     if (stop_parallel_processing()) {
@@ -13759,7 +13774,7 @@  build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->lbs->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
                     if (stop_parallel_processing()) {
@@ -13780,7 +13795,7 @@  build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->igmp_groups->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (
                         igmp_group, hmap_node, bnum, lsi->igmp_groups) {
@@ -13799,33 +13814,7 @@  build_lflows_thread(void *arg)
     return NULL;
 }
 
-static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
-
-static void
-init_lflows_thread_pool(void)
-{
-    int index;
-
-    if (!pool_init_done) {
-        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
-        pool_init_done = true;
-        if (pool) {
-            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
-            build_lflows_pool->pool = pool;
-            for (index = 0; index < build_lflows_pool->pool->size; index++) {
-                build_lflows_pool->pool->controls[index].workload =
-                    build_lflows_pool;
-            }
-        }
-    }
-}
-
-/* TODO: replace hard cutoffs by configurable via commands. These are
- * temporary defines to determine single-thread to multi-thread processing
- * cutoff.
- * Setting to 1 forces "all parallel" lflow build.
- */
+static struct worker_pool *build_lflows_pool = NULL;
 
 static void
 noop_callback(struct worker_pool *pool OVS_UNUSED,
@@ -13871,28 +13860,21 @@  build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    if (use_parallel_build) {
-        init_lflows_thread_pool();
-        if (!can_parallelize_hashes(false)) {
-            use_parallel_build = false;
-        }
-    }
-
-    if (use_parallel_build) {
+    if (parallelization_state == STATE_USE_PARALLELIZATION) {
         struct hmap *lflow_segs;
         struct lswitch_flow_build_info *lsiv;
         int index;
 
-        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
         if (use_logical_dp_groups) {
             lflow_segs = NULL;
         } else {
-            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
         }
 
         /* Set up "work chunks" for each thread to work on. */
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             if (use_logical_dp_groups) {
                 /* if dp_groups are in use we lock a shared lflows hash
                  * on a per-bucket level instead of merging hash frags */
@@ -13915,19 +13897,19 @@  build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);
 
-            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+            build_lflows_pool->controls[index].data = &lsiv[index];
         }
 
         /* Run thread pool. */
         if (use_logical_dp_groups) {
-            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
+            run_pool_callback(build_lflows_pool, NULL, NULL,
                               noop_callback);
-            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
+            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
         } else {
-            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
         }
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             ds_destroy(&lsiv[index].match);
             ds_destroy(&lsiv[index].actions);
         }
@@ -14063,8 +14045,21 @@  ovn_sb_set_lflow_logical_dp_group(
 }
 
 static ssize_t max_seen_lflow_size = 128;
-static bool needs_parallel_init = true;
-static bool reset_parallel = false;
+
+void
+run_update_worker_pool(int n_threads)
+{
+    /* If the number of threads has been updated (or initially set),
+     * update the worker pool. */
+    if (update_worker_pool(n_threads,
+        &build_lflows_pool, build_lflows_thread)) {
+        /* The worker pool was updated. */
+        if (get_worker_pool_size() <= 1) {
+            /* Destroy the lflow hash locks, if they were created. */
+            lflow_hash_lock_destroy();
+            parallelization_state = STATE_NULL;
+        }
+    }
+}
 
 static void
 build_mcast_groups(struct lflow_input *data,
@@ -14085,24 +14080,23 @@  void build_lflows(struct lflow_input *input_data,
     build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
                        &mcast_groups, &igmp_groups);
 
-    if (reset_parallel) {
-        /* Parallel build was disabled before, we need to
-         * re-enable it. */
-        use_parallel_build = true;
-        reset_parallel = false;
+    if (get_worker_pool_size() > 1) {
+        /* On the first run with dp_groups, disable parallel build to
+         * determine the correct sizing of hashes. */
+        if (parallelization_state != STATE_USE_PARALLELIZATION) {
+            if ((parallelization_state == STATE_NULL)
+                && (use_logical_dp_groups)) {
+                lflow_hash_lock_init();
+                parallelization_state = STATE_INIT_HASH_SIZES;
+            } else {
+                parallelization_state = STATE_USE_PARALLELIZATION;
+            }
+        }
     }
 
     fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
-    if (use_parallel_build && use_logical_dp_groups &&
-        needs_parallel_init) {
-        lflow_hash_lock_init();
-        needs_parallel_init = false;
-        /* Disable parallel build on first run with dp_groups
-         * to determine the correct sizing of hashes. */
-        use_parallel_build = false;
-        reset_parallel = true;
-    }
     build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
                                     input_data->port_groups, &lflows,
                                     &mcast_groups, &igmp_groups,
@@ -15350,10 +15344,6 @@  ovnnb_db_run(struct northd_input *input_data,
 
     smap_destroy(&options);
 
-    use_parallel_build =
-        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
-         can_parallelize_hashes(false));
-
     use_logical_dp_groups = smap_get_bool(&nb->options,
                                           "use_logical_dp_groups", true);
     use_ct_inv_match = smap_get_bool(&nb->options,
diff --git a/northd/northd.h b/northd/northd.h
index 2d804a22e..fe8dad03a 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -107,5 +107,6 @@  void build_bfd_table(struct lflow_input *input_data,
                      struct hmap *bfd_connections, struct hmap *ports);
 void bfd_cleanup_connections(struct lflow_input *input_data,
                              struct hmap *bfd_map);
+void run_update_worker_pool(int n_threads);
 
 #endif /* NORTHD_H */
diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
index 718d052cc..e9afda4c6 100644
--- a/northd/ovn-northd-ddlog.c
+++ b/northd/ovn-northd-ddlog.c
@@ -1084,7 +1084,6 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
         SSL_OPTION_ENUMS,
         OPT_DRY_RUN,
         OPT_DDLOG_RECORD,
-        OPT_DUMMY_NUMA,
     };
     static const struct option long_options[] = {
         {"ovnsb-db", required_argument, NULL, 'd'},
@@ -1098,7 +1097,6 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
         OVN_DAEMON_LONG_OPTIONS,
         VLOG_LONG_OPTIONS,
         STREAM_SSL_LONG_OPTIONS,
-        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
         {NULL, 0, NULL, 0},
     };
     char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
@@ -1155,10 +1153,6 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
             *pause = true;
             break;
 
-        case OPT_DUMMY_NUMA:
-            ovs_numa_set_dummy(optarg);
-            break;
-
         case OPT_DDLOG_RECORD:
             record_file = optarg;
             break;
diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
index 0fe350d0e..b6a5e9121 100644
--- a/northd/ovn-northd.8.xml
+++ b/northd/ovn-northd.8.xml
@@ -68,53 +68,24 @@ 
           restarting a process or disturbing a running system.
         </p>
       </dd>
-      <dt><code>--dummy-numa</code></dt>
+      <dt><code>--n-threads=N</code></dt>
       <dd>
-        <p>
-          Typically, OVS uses sysfs to determine the number of NUMA nodes and
-          CPU cores that are available on a machine. The parallelization code
-          in OVN uses this information to determine if there are enough
-          resources to use parallelization. The current algorithm enables
-          parallelization if the total number of CPU cores divided by the
-          number of NUMA nodes is greater than or equal to four.
-        </p>
-
         <p>
           In certain situations, it may be desirable to enable parallelization
-          on a system that otherwise would not have it allowed. The
-          <code>--dummy-numa</code> option allows for you to fake the NUMA
-          nodes and cores that OVS thinks your system has. The syntax consists
-          of using numbers to represent the NUMA node IDs. The number of times
-          that a NUMA node ID appears represents how many CPU cores that NUMA
-          node contains. So for instance, if you did the following:
-        </p>
-
-        <p>
-          <code>--dummy-numa=0,0,0,0</code>
-        </p>
-
-        <p>
-          it would make OVS assume that you have a single NUMA node with ID 0,
-          and that NUMA node consists of four CPU cores. Similarly, you could
-          do:
-        </p>
-
-        <p>
-          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
+          on a system to decrease latency (at the potential cost of increasing
+          CPU usage).
         </p>
 
         <p>
-          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
-          with six CPU cores.
+          This option causes ovn-northd to use N threads when building
+          logical flows, for N within [2-256].
+          If N is 1, parallelization is disabled (the default behavior).
+          If N is less than 1, N is set to 1, parallelization is disabled
+          and a warning is logged.
+          If N is greater than 256, N is set to 256 and a warning is
+          logged.
         </p>
 
         <p>
-          Currently, the only affect this option has is on whether
-          parallelization can be enabled in ovn-northd. There are no NUMA node
-          or CPU core-specific actions performed by OVN. Setting
-          <code>--dummy-numa</code> in ovn-northd does not affect how other OVS
-          processes on the system (such as ovs-vswitchd) count the number of
-          NUMA nodes and CPU cores; this setting is local to ovn-northd.
+          ovn-northd-ddlog does not support this option.
         </p>
       </dd>
     </dl>
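
For illustration, enabling four lflow build threads at startup could look
like the following (the database socket paths here are assumptions, not
fixed by this patch):

    # Start ovn-northd with 4 lflow build threads.
    ovn-northd --n-threads=4 \
        --ovnnb-db=unix:/var/run/ovn/ovnnb_db.sock \
        --ovnsb-db=unix:/var/run/ovn/ovnsb_db.sock

Omitting --n-threads keeps the default of a single thread, i.e.
parallelization stays disabled.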
@@ -210,6 +181,27 @@ 
         except for the northbound database client.
       </p>
       </dd>
+
+      <dt><code>parallel-build/set-n-threads N</code></dt>
+      <dd>
+      <p>
+        Set the number of threads used for building logical flows.
+        When N is within [2-256], parallelization is enabled.
+        When N is 1, parallelization is disabled.
+        When N is less than 1 or more than 256, an error is returned.
+        If ovn-northd fails to start parallelization (e.g. it fails to
+        set up semaphores), parallelization is disabled and an error is
+        returned.
+      </p>
+      </dd>
+
+      <dt><code>parallel-build/get-n-threads</code></dt>
+      <dd>
+      <p>
+        Return the number of threads used for building logical flows.
+      </p>
+      </dd>
+
       </dl>
     </p>
 
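The same knob is exposed at runtime through the unixctl commands documented
above; a minimal session could look like:

    # Enable parallelization with 4 threads, then read the value back.
    ovn-appctl -t ovn-northd parallel-build/set-n-threads 4
    ovn-appctl -t ovn-northd parallel-build/get-n-threads
    4

    # Return to single-threaded flow building.
    ovn-appctl -t ovn-northd parallel-build/set-n-threads 1
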
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 45b120697..8d60376eb 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -41,6 +41,7 @@ 
 #include "unixctl.h"
 #include "util.h"
 #include "openvswitch/vlog.h"
+#include "lib/ovn-parallel-hmap.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_northd);
 
@@ -50,12 +51,16 @@  static unixctl_cb_func ovn_northd_resume;
 static unixctl_cb_func ovn_northd_is_paused;
 static unixctl_cb_func ovn_northd_status;
 static unixctl_cb_func cluster_state_reset_cmd;
+static unixctl_cb_func ovn_northd_set_thread_count_cmd;
+static unixctl_cb_func ovn_northd_get_thread_count_cmd;
 
 struct northd_state {
     bool had_lock;
     bool paused;
 };
 
+#define OVN_MAX_SUPPORTED_THREADS 256
+
 static const char *ovnnb_db;
 static const char *ovnsb_db;
 static const char *unixctl_path;
@@ -525,7 +530,7 @@  Options:\n\
   --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
                             (default: %s)\n\
   --dry-run                 start in paused state (do not commit db changes)\n\
-  --dummy-numa              override default NUMA node and CPU core discovery\n\
+  --n-threads=N             specify number of threads\n\
   --unixctl=SOCKET          override default control socket name\n\
   -h, --help                display this help message\n\
   -o, --options             list available options\n\
@@ -538,14 +543,14 @@  Options:\n\
 
 static void
 parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
-              bool *paused)
+              bool *paused, int *n_threads)
 {
     enum {
         OVN_DAEMON_OPTION_ENUMS,
         VLOG_OPTION_ENUMS,
         SSL_OPTION_ENUMS,
         OPT_DRY_RUN,
-        OPT_DUMMY_NUMA,
+        OPT_N_THREADS,
     };
     static const struct option long_options[] = {
         {"ovnsb-db", required_argument, NULL, 'd'},
@@ -555,7 +560,7 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
         {"options", no_argument, NULL, 'o'},
         {"version", no_argument, NULL, 'V'},
         {"dry-run", no_argument, NULL, OPT_DRY_RUN},
-        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
+        {"n-threads", required_argument, NULL, OPT_N_THREADS},
         OVN_DAEMON_LONG_OPTIONS,
         VLOG_LONG_OPTIONS,
         STREAM_SSL_LONG_OPTIONS,
@@ -611,8 +616,21 @@  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
             ovn_print_version(0, 0);
             exit(EXIT_SUCCESS);
 
-        case OPT_DUMMY_NUMA:
-            ovs_numa_set_dummy(optarg);
+        case OPT_N_THREADS:
+            *n_threads = atoi(optarg);
+            if (*n_threads < 1) {
+                *n_threads = 1;
+                VLOG_WARN("Setting n_threads to %d as --n-threads option "
+                          "was set to: [%s]", *n_threads, optarg);
+            }
+            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
+                *n_threads = OVN_MAX_SUPPORTED_THREADS;
+                VLOG_WARN("Setting n_threads to %d as --n-threads option "
+                          "was set to: [%s]", *n_threads, optarg);
+            }
+            if (*n_threads != 1) {
+                VLOG_INFO("Using %d threads", *n_threads);
+            }
             break;
 
         case OPT_DRY_RUN:
@@ -668,6 +686,7 @@  main(int argc, char *argv[])
     struct unixctl_server *unixctl;
     int retval;
     bool exiting;
+    int n_threads = 1;
     struct northd_state state = {
         .had_lock = false,
         .paused = false
@@ -677,7 +696,7 @@  main(int argc, char *argv[])
     ovs_cmdl_proctitle_init(argc, argv);
     ovn_set_program_name(argv[0]);
     service_start(&argc, &argv);
-    parse_options(argc, argv, &state.paused);
+    parse_options(argc, argv, &state.paused, &n_threads);
 
     daemonize_start(false);
 
@@ -704,6 +723,12 @@  main(int argc, char *argv[])
     unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
                              cluster_state_reset_cmd,
                              &reset_ovnnb_idl_min_index);
+    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 1,
+                             ovn_northd_set_thread_count_cmd,
+                             NULL);
+    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
+                             ovn_northd_get_thread_count_cmd,
+                             NULL);
 
     daemonize_complete();
 
@@ -753,6 +778,8 @@  main(int argc, char *argv[])
     unsigned int ovnnb_cond_seqno = UINT_MAX;
     unsigned int ovnsb_cond_seqno = UINT_MAX;
 
+    run_update_worker_pool(n_threads);
+
     /* Main loop. */
     exiting = false;
 
@@ -1009,3 +1036,30 @@  cluster_state_reset_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
     poll_immediate_wake();
     unixctl_command_reply(conn, NULL);
 }
+
+static void
+ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
+               const char *argv[], void *aux OVS_UNUSED)
+{
+    int n_threads = atoi(argv[1]);
+
+    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
+        struct ds s = DS_EMPTY_INITIALIZER;
+        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
+        unixctl_command_reply_error(conn, ds_cstr(&s));
+        ds_destroy(&s);
+    } else {
+        run_update_worker_pool(n_threads);
+        unixctl_command_reply(conn, NULL);
+    }
+}
+
+static void
+ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
+               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    struct ds s = DS_EMPTY_INITIALIZER;
+    ds_put_format(&s, "%d\n", get_worker_pool_size());
+    unixctl_command_reply(conn, ds_cstr(&s));
+    ds_destroy(&s);
+}
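
Out-of-range requests are rejected by the handler above with an error
reply, as the new tests below exercise:

    ovn-appctl -t ovn-northd parallel-build/set-n-threads 300
    invalid n_threads: 300
    ovn-appctl: ovn-northd: server returned an error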
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index d78315c75..c6f0f6251 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -170,8 +170,8 @@  ovn_start_northd() {
         ovn-northd-ddlog) northd_args="$northd_args --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
     esac
 
-    if test X$NORTHD_DUMMY_NUMA = Xyes; then
-        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+        northd_args="$northd_args --n-threads=4"
     fi
 
     local name=${d_prefix}northd${suffix}
@@ -252,10 +252,6 @@  ovn_start () {
     else
         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
     fi
-
-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
-        ovn-nbctl set NB_Global . options:use_parallel_build=true
-    fi
 }
 
 # Interconnection networks.
@@ -749,16 +745,6 @@  OVS_END_SHELL_HELPERS
 
 m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
 
-# Use --dummy-numa if system has low cores and we want to force parallelization
-m4_define([NORTHD_DUMMY_NUMA],
-  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
-     then
-       echo "yes"
-     else
-       echo "no"
-     fi)
-])
-
 # Defines versions of a test with all combinations of northd and
 # datapath groups.
 m4_define([OVN_FOR_EACH_NORTHD],
@@ -770,35 +756,14 @@  m4_define([OVN_FOR_EACH_NORTHD],
 # Some tests aren't prepared for dp groups to be enabled.
 m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
   [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
-     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
-])])])
-
-# Test parallelization with dp groups enabled and disabled
-m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
-m4_pushdef([NORTHD_TYPE], [ovn_northd])
-m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
-[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
-    [[NORTHD_USE_PARALLELIZATION], [yes]
-])]])
-
-m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
-m4_pushdef([NORTHD_TYPE], [ovn_northd])
-m4_pushdef(NORTHD_DUMMY_NUMA, [no])
-[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
-    [[NORTHD_USE_PARALLELIZATION], [yes]
-])]])
-
-# Use --dummy-numa if system has low cores
-m4_define([HOST_HAS_LOW_CORES], [
-    if test $(nproc) -lt 4; then
-        :
-        $1
-    else
-        :
-        $2
-    fi
-])
+     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
+       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
+])])])])
+
+# Some tests aren't prepared for ddlog to be enabled.
+m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
+  [m4_foreach([NORTHD_TYPE], [ovn-northd],
+     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
+       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
+])])])])
 
-m4_define([NORTHD_PARALLELIZATION], [
-    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
-])
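
With the NUMA-based machinery removed, the tests toggle parallelization
purely via --n-threads; the new test group added below can be selected by
keyword, e.g. (assuming the usual autotest invocation):

    # Run only the northd-parallelization tests.
    make check TESTSUITEFLAGS='-k northd-parallelization'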
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 69ad85533..798bb5552 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -7158,3 +7158,112 @@  ct_next(ct_state=new|trk);
 
 AT_CLEANUP
 ])
+
+OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
+AT_SETUP([northd-parallelization unixctl])
+ovn_start
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
+OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
+])
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
+OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [4
+])
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
+OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 0], [2], [],
+  [invalid n_threads: 0
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads -1], [2], [],
+  [invalid n_threads: -1
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 300], [2], [],
+  [invalid n_threads: 300
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], [2], [],
+  ["parallel-build/set-n-threads" command requires at least 1 arguments
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 2], [2], [],
+  ["parallel-build/set-n-threads" command takes at most 1 arguments
+ovn-appctl: ovn-northd: server returned an error
+])
+
+AT_CLEANUP
+])
+
+OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
+AT_SETUP([northd-parallelization runtime])
+ovn_start
+
+add_switch_ports() {
+    for port in $(seq $1 $2); do
+        logical_switch_port=lsp${port}
+        check ovn-nbctl lsp-add ls1 $logical_switch_port
+        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
+    done
+}
+
+# Build a rather heavy config and modify the number of threads in the middle.
+check ovn-nbctl ls-add ls1
+check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
+check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
+
+check ovn-nbctl lr-add lr1
+check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router options:router-port=lrp0 addresses=dynamic
+check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
+check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
+add_switch_ports 1 50
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
+add_switch_ports 51 100
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
+add_switch_ports 101 150
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
+add_switch_ports 151 200
+
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
+add_switch_ports 201 250
+check ovn-nbctl --wait=sb sync
+
+# Run 3 times: once with parallelization enabled, once with it disabled,
+# and once while changing the number of threads.
+# Compare the flows produced by the three runs, ignoring IP/MAC addresses.
+ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
+
+# Restart with 1 thread
+for port in $(seq 1 250); do
+    logical_switch_port=lsp${port}
+    check ovn-nbctl lsp-del $logical_switch_port
+done
+add_switch_ports 1 250
+check ovn-nbctl --wait=sb sync
+ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
+AT_CHECK([diff flows1 flows2])
+
+# Restart with 8 threads
+check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
+for port in $(seq 1 250); do
+    logical_switch_port=lsp${port}
+    check ovn-nbctl lsp-del $logical_switch_port
+done
+add_switch_ports 1 250
+check ovn-nbctl --wait=sb sync
+ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
+AT_CHECK([diff flows1 flows3])
+
+AT_CLEANUP
+])