[ovs-dev,v4,2/2] Add support for configuring parallelization via unixctl

Message ID 20210921154827.25940-2-anton.ivanov@cambridgegreys.com
State Not Applicable
Series [ovs-dev,v4,1/2] Make changes to the parallel processing API to allow pool sizing

Checks

Context               Check   Description
ovsrobot/apply-robot  fail    apply and check: fail

Commit Message

Anton Ivanov Sept. 21, 2021, 3:48 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

libs: add configuration support to parallel-hmap.[c,h]
northd: add support for configuring parallelization to northd

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/ovn-parallel-hmap.c | 185 ++++++++++++++++++++++++++++++++++++++--
 lib/ovn-parallel-hmap.h |  63 +++++++++++++-
 northd/northd.c         |  30 +++----
 northd/northd.h         |   2 -
 northd/ovn-northd.c     |   5 +-
 tests/ovn-macros.at     |  16 +++-
 6 files changed, 263 insertions(+), 38 deletions(-)
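
For reference, the unixctl interface added here can be driven with ovs-appctl
once the daemon is running. A minimal sketch of a session (the target name
ovn-northd is illustrative; the test macros below use northd$suffix/ovn-northd,
so adjust the target to your setup):

  # Enable parallel processing and set the default pool size to 4 threads.
  ovs-appctl -t ovn-northd thread-pool/set-parallel-on 4
  # Show whether parallel processing is active and the default pool size.
  ovs-appctl -t ovn-northd thread-pool/status
  # List registered pools with their sizes, e.g. "lflows : 4".
  ovs-appctl -t ovn-northd thread-pool/list
  # Resize a mutable pool by name.
  ovs-appctl -t ovn-northd thread-pool/reload-pool lflows 8
  # Disable parallel processing again.
  ovs-appctl -t ovn-northd thread-pool/set-parallel-off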

Comments

0-day Robot Sept. 27, 2021, 2:44 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


git-am:
error: sha1 information is lacking or useless (northd/northd.c).
error: could not build fake ancestor
hint: Use 'git am --show-current-patch' to see the failed patch
Patch failed at 0001 Add support for configuring parallelization via unixctl
When you have resolved this problem, run "git am --continue".
If you prefer to skip this patch, run "git am --skip" instead.
To restore the original branch and stop patching, run "git am --abort".


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Han Zhou Oct. 25, 2021, 3:05 a.m. UTC | #2
On Tue, Sep 21, 2021 at 8:48 AM <anton.ivanov@cambridgegreys.com> wrote:
>
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> libs: add configuration support to parallel-hmap.[c,h]
> northd: add support for configuring parallelization to northd

Hi Anton,

This patch seems to replace the NB option use_parallel_build with unixctl
command configuration. Could you explain the motivation for this? I feel
that the NB option is better because, with HA, we only need to set it in
one place for all northds.
BTW, there is no documentation change for the NB option, if it is supposed
to be removed.

Thanks,
Han
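
For concreteness, the two configuration paths being compared, both drawn from
hunks in this patch (the appctl target name is illustrative):

  # NB option (removed by this patch): set once in the northbound DB and
  # picked up by every northd.
  ovn-nbctl set NB_Global . options:use_parallel_build=true

  # unixctl command (added by this patch): set per running daemon.
  ovs-appctl -t ovn-northd thread-pool/set-parallel-on 4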

Anton Ivanov Oct. 25, 2021, 6:20 p.m. UTC | #3
Hi Han,

I had requests to move this to the CLI. This also allows a unified format if we add parallelization to ovsdb or the controller.

On 25 October 2021 04:05:39 BST, Han Zhou <hzhou@ovn.org> wrote:
>Hi Anton,
>
>This patch seems to replace the NB option use_parallel_build with unixctl
>command configuration. Could you explain the motivation for this? I feel
>that the NB option is better because, with HA, we only need to set it in
>one place for all northds.
>BTW, there is no documentation change for the NB option, if it is supposed
>to be removed.
>
>Thanks,
>Han

Patch

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index 1b3883441..6a6488a17 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -33,6 +33,7 @@ 
 #include "ovs-thread.h"
 #include "ovs-numa.h"
 #include "random.h"
+#include "unixctl.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
 
@@ -46,6 +47,7 @@  VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
  */
 static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
 static bool can_parallelize = false;
+static bool should_parallelize = false;
 
 /* This is set only in the process of exit and the set is
  * accompanied by a fence. It does not need to be atomic or be
@@ -83,7 +85,7 @@  static void *standard_helper_thread(void *arg);
 
 struct worker_pool *ovn_add_standard_pool(int size)
 {
-    return add_worker_pool(standard_helper_thread, size);
+    return add_worker_pool(standard_helper_thread, size, "default", true);
 }
 
 bool
@@ -92,6 +94,19 @@  ovn_stop_parallel_processing(struct worker_pool *pool)
     return pool->workers_must_exit;
 }
 
+bool
+ovn_set_parallel_processing(bool enable)
+{
+    should_parallelize = enable;
+    return can_parallelize;
+}
+
+bool
+ovn_get_parallel_processing(void)
+{
+    return can_parallelize && should_parallelize;
+}
+
 bool
 ovn_can_parallelize_hashes(bool force_parallel)
 {
@@ -117,6 +132,7 @@  destroy_pool(struct worker_pool *pool) {
     sem_close(pool->done);
     sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
     sem_unlink(sem_name);
+    free(pool->name);
     free(pool);
 }
 
@@ -127,6 +143,10 @@  ovn_resize_pool(struct worker_pool *pool, int size)
 
     ovs_assert(pool != NULL);
 
+    if (!pool->is_mutable) {
+        return false;
+    }
+
     if (!size) {
         size = pool_size;
     }
@@ -166,7 +186,8 @@  cleanup:
 
 
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *), int size)
+ovn_add_worker_pool(void *(*start)(void *), int size, char *name,
+                    bool is_mutable)
 {
     struct worker_pool *new_pool = NULL;
     bool test = false;
@@ -194,6 +215,8 @@  ovn_add_worker_pool(void *(*start)(void *), int size)
         new_pool = xmalloc(sizeof(struct worker_pool));
         new_pool->size = size;
         new_pool->start = start;
+        new_pool->is_mutable = is_mutable;
+        new_pool->name = xstrdup(name);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
@@ -226,6 +249,7 @@  cleanup:
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         sem_unlink(sem_name);
     }
+    free(new_pool->name);
     ovs_mutex_unlock(&init_mutex);
     return NULL;
 }
@@ -342,8 +366,7 @@  ovn_complete_pool_callback(struct worker_pool *pool,
         }
     } while (completed < pool->size);
 }
-
-/* Complete a thread pool which uses a callback function to process results
+/* Run a thread pool which uses a callback function to process results
  */
 void
 ovn_run_pool_callback(struct worker_pool *pool,
@@ -352,8 +375,8 @@  ovn_run_pool_callback(struct worker_pool *pool,
                                           void *fin_result,
                                           void *result_frags, int index))
 {
-    ovn_start_pool(pool);
-    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func);
+    start_pool(pool);
+    complete_pool_callback(pool, fin_result, result_frags, helper_func);
 }
 
 /* Run a thread pool - basic, does not do results processing.
@@ -401,6 +424,28 @@  ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
     inc->n = 0;
 }
 
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+void
+ovn_complete_pool_hash(struct worker_pool *pool,
+                  struct hmap *result,
+                  struct hmap *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Run a thread pool which gathers results in an array of lists.
+ * Merge results.
+ */
+void
+ovn_complete_pool_list(struct worker_pool *pool,
+                  struct ovs_list *result,
+                  struct ovs_list *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
 /* Run a thread pool which gathers results in an array
  * of hashes. Merge results.
  */
@@ -514,7 +559,7 @@  static struct worker_control *alloc_controls(int size)
 
 static void
 worker_pool_hook(void *aux OVS_UNUSED) {
-    static struct worker_pool *pool;
+    struct worker_pool *pool;
     char sem_name[256];
 
     /* All workers must honour the must_exit flag and check for it regularly.
@@ -628,4 +673,130 @@  standard_helper_thread(void *arg)
 }
 
 
+static void
+ovn_thread_pool_resize_pool(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                            const char *argv[], void *unused OVS_UNUSED)
+{
+
+    struct worker_pool *pool;
+    int value;
+
+    if (!str_to_int(argv[2], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+
+    if (value > 0) {
+        pool_size = value;
+    }
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        if (strcmp(pool->name, argv[1]) == 0) {
+            resize_pool(pool, value);
+            unixctl_command_reply_error(conn, NULL);
+        }
+    }
+    unixctl_command_reply_error(conn, "pool not found");
+}
+
+static void
+ovn_thread_pool_list_pools(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                           const char *argv[] OVS_UNUSED,
+                           void *unused OVS_UNUSED)
+{
+
+    char *reply = NULL;
+    char *new_reply;
+    char buf[256];
+    struct worker_pool *pool;
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        snprintf(buf, 255, "%s : %d\n", pool->name, pool->size);
+        if (reply) {
+            new_reply = xmalloc(strlen(reply) + strlen(buf) + 1);
+            ovs_strlcpy(new_reply, reply, strlen(reply));
+            strcat(new_reply, buf);
+            free(reply);
+        }
+        reply = new_reply;
+    }
+    unixctl_command_reply(conn, reply);
+}
+
+static void
+ovn_thread_pool_set_parallel_on(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[], void *unused OVS_UNUSED)
+{
+    int value;
+    bool result;
+    if (!str_to_int(argv[1], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+
+    if (!ovn_can_parallelize_hashes(true)) {
+        unixctl_command_reply_error(conn, "cannot enable parallel processing");
+        return;
+    }
+
+    if (value > 0) {
+        /* Change default pool size */
+        ovs_mutex_lock(&init_mutex);
+        pool_size = value;
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    result = ovn_set_parallel_processing(true);
+    unixctl_command_reply(conn, result ? "enabled" : "disabled");
+}
+
+static void
+ovn_thread_pool_set_parallel_off(struct unixctl_conn *conn,
+                                 int argc OVS_UNUSED,
+                                 const char *argv[] OVS_UNUSED,
+                                 void *unused OVS_UNUSED)
+{
+    ovn_set_parallel_processing(false);
+    unixctl_command_reply(conn, NULL);
+}
+
+static void
+ovn_thread_pool_parallel_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *unused OVS_UNUSED)
+{
+    char status[256];
+
+    snprintf(status, sizeof status, "%s, default pool size %d",
+             get_parallel_processing() ? "active" : "inactive",
+             pool_size);
+
+    unixctl_command_reply(conn, status);
+}
+
+void
+ovn_parallel_thread_pools_init(void)
+{
+    bool test = false;
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(false);
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    unixctl_command_register("thread-pool/set-parallel-on", "N", 1, 1,
+                             ovn_thread_pool_set_parallel_on, NULL);
+    unixctl_command_register("thread-pool/set-parallel-off", "", 0, 0,
+                             ovn_thread_pool_set_parallel_off, NULL);
+    unixctl_command_register("thread-pool/status", "", 0, 0,
+                             ovn_thread_pool_parallel_status, NULL);
+    unixctl_command_register("thread-pool/list", "", 0, 0,
+                             ovn_thread_pool_list_pools, NULL);
+    unixctl_command_register("thread-pool/reload-pool", "Pool Threads", 2, 2,
+                             ovn_thread_pool_resize_pool, NULL);
+}
+
 #endif
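
For reviewers, a quick usage sketch (not part of the patch): the commands
registered above are driven through ovn-appctl.  The thread counts below are
illustrative, and "lflows" is the pool name that northd registers further
down; thread-pool/list only reports a pool once it has been created.

    ovn-appctl -t ovn-northd thread-pool/set-parallel-on 4
    ovn-appctl -t ovn-northd thread-pool/status   # "active, default pool size 4"
    ovn-appctl -t ovn-northd thread-pool/list     # "lflows : 4"
    ovn-appctl -t ovn-northd thread-pool/reload-pool lflows 8
    ovn-appctl -t ovn-northd thread-pool/set-parallel-off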
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 4cdd5c4e5..9c0a69cb1 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -33,6 +33,7 @@  extern "C" {
 #include "openvswitch/hmap.h"
 #include "openvswitch/thread.h"
 #include "ovs-atomic.h"
+#include "unixctl.h"
 
 /* Process this include only if OVS does not supply parallel definitions
  */
@@ -93,6 +94,8 @@  struct worker_pool {
     sem_t *done; /* Work completion semaphore. */
     void *(*start)(void *); /* Work function. */
     bool workers_must_exit; /* Pool to be destroyed flag. */
+    char *name; /* Name to be used in CLI commands. */
+    bool is_mutable; /* Can the pool be reloaded with different parameters? */
 };
 
 
@@ -109,7 +112,9 @@  struct helper_data {
  * size uses system defaults.
  */
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *),
+                                        int size, char *name,
+                                        bool is_mutable);
 
 struct worker_pool *ovn_add_standard_pool(int size);
 
@@ -188,6 +193,35 @@  void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
                            void *fin_result, void *result_frags, int index));
 
 
+/* Start a pool. Do not wait for any results. They will be collected
+ * using the _complete_ functions.
+ */
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a pool run started using start_pool().
+ * Merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+                       struct hmap *result, struct hmap *result_frags);
+
+/* Complete a pool run started using start_pool().
+ * Merge results from list frags into a final list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result, struct ovs_list *result_frags);
+
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
  * would land, or a null pointer if that bucket is empty. */
 
@@ -298,10 +332,16 @@  static inline void init_hash_row_locks(struct hashrow_locks *hrl)
 
 bool ovn_can_parallelize_hashes(bool force_parallel);
 
+bool ovn_set_parallel_processing(bool enable);
+
+bool ovn_get_parallel_processing(void);
+
 void ovn_destroy_pool(struct worker_pool *pool);
 
 bool ovn_resize_pool(struct worker_pool *pool, int size);
 
+void ovn_parallel_thread_pools_init(void);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -312,9 +352,16 @@  bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
 
+#define set_parallel_processing(enable) ovn_set_parallel_processing(enable)
+
+#define get_parallel_processing() ovn_get_parallel_processing()
+
 #define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
 
-#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
+#define add_worker_pool(start, size, name, is_mutable) \
+        ovn_add_worker_pool(start, size, name, is_mutable)
 
 #define add_standard_pool(start, size) ovn_add_standard_pool(start, size)
 
@@ -339,6 +386,17 @@  bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define start_pool(pool) ovn_start_pool(pool)
 
 #define complete_pool_hash(pool, result, result_frags) \
     ovn_complete_pool_hash(pool, result, result_frags)
 
@@ -352,6 +410,7 @@  bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define resize_pool(pool, size) ovn_resize_pool(pool, size)
 
+#define parallel_thread_pools_init() ovn_parallel_thread_pools_init()
 
 #ifdef __clang__
 #pragma clang diagnostic pop
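
To make the intended calling convention concrete, here is a minimal consumer
sketch (not part of the patch).  It assumes the worker plumbing from patch
1/2 of this series (wait_for_work(), post_completed_work(), and the
control->pool/control->data fields); "example_worker" and
"example_parallel_run" are hypothetical names:

    #include "lib/ovn-parallel-hmap.h"
    #include "openvswitch/list.h"
    #include "util.h"

    static void *
    example_worker(void *arg)
    {
        struct worker_control *control = arg;

        while (!stop_parallel_processing(control->pool)) {
            wait_for_work(control);
            if (stop_parallel_processing(control->pool)) {
                return NULL;
            }
            /* control->data points at this worker's private ovs_list
             * fragment; a real worker appends its results there. */
            post_completed_work(control);
        }
        return NULL;
    }

    static void
    example_parallel_run(void)
    {
        /* 0 sizes the pool from system defaults; "example" is the name
         * shown by thread-pool/list; true lets thread-pool/reload-pool
         * resize it. */
        static struct worker_pool *pool;
        struct ovs_list results, *frags;
        int i;

        if (!pool) {
            pool = add_worker_pool(example_worker, 0, "example", true);
        }

        frags = xmalloc(sizeof *frags * pool->size);
        for (i = 0; i < pool->size; i++) {
            ovs_list_init(&frags[i]);
            pool->controls[i].data = &frags[i];
        }

        ovs_list_init(&results);
        start_pool(pool);                          /* fire workers, don't block */
        complete_pool_list(pool, &results, frags); /* wait, then merge frags */
        free(frags);
    }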
diff --git a/northd/northd.c b/northd/northd.c
index 7724d27e9..d6401fe62 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -4279,7 +4279,6 @@  ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
-static bool use_parallel_build = true;
 
 static void
 ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
@@ -4298,7 +4297,7 @@  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
     lflow->ctrl_meter = ctrl_meter;
     lflow->dpg = NULL;
     lflow->where = where;
-    if (use_parallel_build && use_logical_dp_groups) {
+    if (get_parallel_processing() && use_logical_dp_groups) {
         ovs_mutex_init(&lflow->odg_lock);
     }
 }
@@ -4370,7 +4369,7 @@  do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
                    nullable_xstrdup(ctrl_meter),
                    ovn_lflow_hint(stage_hint), where);
     hmapx_add(&lflow->od_group, od);
-    if (!use_parallel_build) {
+    if (!get_parallel_processing()) {
         hmap_insert(lflow_map, &lflow->hmap_node, hash);
     } else {
         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4441,7 +4440,7 @@  ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
     struct ovn_lflow *lflow;
 
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
-    if (use_logical_dp_groups && use_parallel_build) {
+    if (use_logical_dp_groups && get_parallel_processing()) {
         lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
                                     match, actions, io_port, stage_hint, where,
                                     ctrl_meter);
@@ -4479,7 +4478,7 @@  ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
         return false;
     }
 
-    if (use_parallel_build && use_logical_dp_groups) {
+    if (get_parallel_processing() && use_logical_dp_groups) {
         ovs_mutex_lock(&lflow_ref->odg_lock);
         hmapx_add(&lflow_ref->od_group, od);
         ovs_mutex_unlock(&lflow_ref->odg_lock);
@@ -12962,7 +12961,8 @@  init_lflows_thread_pool(void)
 {
 
     if (!pool_init_done) {
-        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0,
+                                            "lflows", true);
         pool_init_done = true;
     }
 }
@@ -12978,14 +12978,11 @@  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         init_lflows_thread_pool();
-        if (!can_parallelize_hashes(false)) {
-            use_parallel_build = false;
-        }
     }
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         struct hmap *lflow_segs;
         struct lswitch_flow_build_info *lsiv;
         int index;
@@ -13185,19 +13182,19 @@  build_lflows(struct northd_context *ctx, struct hmap *datapaths,
     if (reset_parallel) {
         /* Parallel build was disabled before, we need to
          * re-enable it. */
-        use_parallel_build = true;
+        set_parallel_processing(true);
         reset_parallel = false;
     }
 
     fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
-    if (use_parallel_build && use_logical_dp_groups &&
+    if (get_parallel_processing() && use_logical_dp_groups &&
         needs_parallel_init) {
         ovs_rwlock_init(&flowtable_lock);
         needs_parallel_init = false;
         /* Disable parallel build on first run with dp_groups
          * to determine the correct sizing of hashes. */
-        use_parallel_build = false;
+        set_parallel_processing(false);
         reset_parallel = true;
     }
     build_lswitch_and_lrouter_flows(datapaths, ports,
@@ -14279,10 +14276,6 @@  ovnnb_db_run(struct northd_context *ctx,
     ovsdb_idl_set_probe_interval(ctx->ovnnb_idl, northd_probe_interval_nb);
     ovsdb_idl_set_probe_interval(ctx->ovnsb_idl, northd_probe_interval_sb);
 
-    use_parallel_build =
-        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
-         can_parallelize_hashes(false));
-
     use_logical_dp_groups = smap_get_bool(&nb->options,
                                           "use_logical_dp_groups", true);
     use_ct_inv_match = smap_get_bool(&nb->options,
@@ -14652,7 +14645,6 @@  ovn_db_run(struct northd_context *ctx,
     ovs_list_init(&lr_list);
     hmap_init(&datapaths);
     hmap_init(&ports);
-    use_parallel_build = ctx->use_parallel_build;
 
     int64_t start_time = time_wall_msec();
     stopwatch_start(OVNNB_DB_RUN_STOPWATCH_NAME, time_msec());
diff --git a/northd/northd.h b/northd/northd.h
index ffa2bbb4e..5cbd183ef 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -27,8 +27,6 @@  struct northd_context {
     struct ovsdb_idl_index *sbrec_ha_chassis_grp_by_name;
     struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp;
     struct ovsdb_idl_index *sbrec_ip_mcast_by_dp;
-
-    bool use_parallel_build;
 };
 
 void
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 42c0ad644..fb1268661 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -65,8 +65,6 @@  static const char *ssl_private_key_file;
 static const char *ssl_certificate_file;
 static const char *ssl_ca_cert_file;
 
-static bool use_parallel_build = true;
-
 static const char *rbac_chassis_auth[] =
     {"name"};
 static const char *rbac_chassis_update[] =
@@ -622,7 +620,7 @@  main(int argc, char *argv[])
 
     daemonize_complete();
 
-    use_parallel_build = can_parallelize_hashes(false);
+    ovn_parallel_thread_pools_init();
 
     /* We want to detect (almost) all changes to the ovn-nb db. */
     struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
@@ -941,7 +939,6 @@  main(int argc, char *argv[])
                 .sbrec_ha_chassis_grp_by_name = sbrec_ha_chassis_grp_by_name,
                 .sbrec_mcast_group_by_name_dp = sbrec_mcast_group_by_name_dp,
                 .sbrec_ip_mcast_by_dp = sbrec_ip_mcast_by_dp,
-                .use_parallel_build = use_parallel_build,
             };
 
             if (!state.had_lock && ovsdb_idl_has_lock(ovnsb_idl_loop.idl)) {
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index f06f2e68e..958ce18b0 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -179,6 +179,18 @@  ovn_start_northd() {
     test -d "$ovs_base/$name" || mkdir "$ovs_base/$name"
     as $name start_daemon $NORTHD_TYPE $northd_args -vjsonrpc \
                --ovnnb-db=$OVN_NB_DB --ovnsb-db=$OVN_SB_DB
+    if test -z "$USE_PARALLEL_THREADS"; then
+        USE_PARALLEL_THREADS=0
+    fi
+
+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+        case ${NORTHD_TYPE:=ovn-northd} in
+            ovn-northd) ovs-appctl --timeout=10 --target northd$suffix/ovn-northd \
+                            thread-pool/set-parallel-on $USE_PARALLEL_THREADS
+            ;;
+        esac
+    fi
 }
 
 # ovn_start [--backup-northd=none|paused] [AZ]
@@ -252,10 +264,6 @@  ovn_start () {
     else
         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
     fi
-
-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
-        ovn-nbctl set NB_Global . options:use_parallel_build=true
-    fi
 }
 
 # Interconnection networks.
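
With the hunks above, testsuite parallelization is controlled purely by
environment variables rather than an NB option, e.g. (an illustrative
invocation; the -k keyword is only an example):

    NORTHD_USE_PARALLELIZATION=yes USE_PARALLEL_THREADS=4 \
        make check TESTSUITEFLAGS='-k ovn-northd'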