diff mbox series

[ovs-dev,v3,2/2] Add support for configuring parallelization via unixctl

Message ID 20210906145947.18521-2-anton.ivanov@cambridgegreys.com
State Superseded
Headers show
Series [ovs-dev,v3,1/2] Make changes to the parallel processing API to allow pool sizing | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test fail github build: failed
ovsrobot/github-robot-_ovn-kubernetes fail github build: failed

Commit Message

Anton Ivanov Sept. 6, 2021, 2:59 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/ovn-parallel-hmap.c | 209 ++++++++++++++++++++++++++++++++++++++--
 lib/ovn-parallel-hmap.h |  63 +++++++++++-
 northd/ovn-northd.c     |  26 ++---
 tests/ovn-macros.at     |  16 ++-
 4 files changed, 283 insertions(+), 31 deletions(-)
diff mbox series

Patch

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index 30de457b5..8a055b2c6 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -33,6 +33,7 @@ 
 #include "ovs-thread.h"
 #include "ovs-numa.h"
 #include "random.h"
+#include "unixctl.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
 
@@ -46,6 +47,7 @@  VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
  */
 static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
 static bool can_parallelize = false;
+static bool should_parallelize = false;
 
 /* This is set only in the process of exit and the set is
  * accompanied by a fence. It does not need to be atomic or be
@@ -85,6 +87,19 @@  ovn_stop_parallel_processing(struct worker_pool *pool)
     return pool->workers_must_exit;
 }
 
+bool
+ovn_set_parallel_processing(bool enable)
+{
+    should_parallelize = enable;    /* Remember the caller's request. */
+    return can_parallelize;         /* Capability flag, not 'enable'. */
+}
+
+bool
+ovn_get_parallel_processing(void)
+{
+    return can_parallelize && should_parallelize; /* Both must be set. */
+}
+
 bool
 ovn_can_parallelize_hashes(bool force_parallel)
 {
@@ -110,6 +125,7 @@  destroy_pool(struct worker_pool *pool) {
     sem_close(pool->done);
     sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
     sem_unlink(sem_name);
+    free(pool->name);
     free(pool);
 }
 
@@ -120,6 +136,10 @@  ovn_resize_pool(struct worker_pool *pool, int size)
 
     ovs_assert(pool != NULL);
 
+    if (!pool->is_mutable) {
+        return false;
+    }
+
     if (!size) {
         size = pool_size;
     }
@@ -159,7 +179,8 @@  cleanup:
 
 
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *), int size)
+ovn_add_worker_pool(void *(*start)(void *), int size, char *name,
+                    bool is_mutable)
 {
     struct worker_pool *new_pool = NULL;
     bool test = false;
@@ -187,6 +208,8 @@  ovn_add_worker_pool(void *(*start)(void *), int size)
         new_pool = xmalloc(sizeof(struct worker_pool));
         new_pool->size = size;
         new_pool->start = start;
+        new_pool->is_mutable = is_mutable;
+        new_pool->name = xstrdup(name);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
@@ -219,6 +242,7 @@  cleanup:
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         sem_unlink(sem_name);
     }
+    free(new_pool->name);
     ovs_mutex_unlock(&init_mutex);
     return NULL;
 }
@@ -267,13 +291,9 @@  ovn_fast_hmap_size_for(struct hmap *hmap, int size)
 /* Run a thread pool which uses a callback function to process results
  */
 void
-ovn_run_pool_callback(struct worker_pool *pool,
-                      void *fin_result, void *result_frags,
-                      void (*helper_func)(struct worker_pool *pool,
-                                          void *fin_result,
-                                          void *result_frags, int index))
+ovn_start_pool(struct worker_pool *pool)
 {
-    int index, completed;
+    int index;
 
     /* Ensure that all worker threads see the same data as the
      * main thread.
@@ -284,8 +304,19 @@  ovn_run_pool_callback(struct worker_pool *pool,
     for (index = 0; index < pool->size; index++) {
         sem_post(pool->controls[index].fire);
     }
+}
+
 
-    completed = 0;
+/* Run a thread pool which uses a callback function to process results
+ */
+void
+ovn_complete_pool_callback(struct worker_pool *pool,
+                      void *fin_result, void *result_frags,
+                      void (*helper_func)(struct worker_pool *pool,
+                                          void *fin_result,
+                                          void *result_frags, int index))
+{
+    int index, completed = 0;
 
     do {
         bool test;
@@ -327,6 +358,18 @@  ovn_run_pool_callback(struct worker_pool *pool,
         }
     } while (completed < pool->size);
 }
+/* Run a thread pool which uses a callback function to process results:
+ * calls start_pool(), then complete_pool_callback() with 'helper_func'. */
+void
+ovn_run_pool_callback(struct worker_pool *pool,
+                      void *fin_result, void *result_frags,
+                      void (*helper_func)(struct worker_pool *pool,
+                                          void *fin_result,
+                                          void *result_frags, int index))
+{
+    start_pool(pool);
+    complete_pool_callback(pool, fin_result, result_frags, helper_func);
+}
 
 /* Run a thread pool - basic, does not do results processing.
  */
@@ -373,6 +416,28 @@  ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
     inc->n = 0;
 }
 
+/* Complete a pool run started using start_pool(): passes 'result' and
+ * 'result_frags' to complete_pool_callback() with merge_hash_results.
+ */
+void
+ovn_complete_pool_hash(struct worker_pool *pool,
+                  struct hmap *result,
+                  struct hmap *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Complete a pool run started using start_pool(): passes 'result' and
+ * 'result_frags' to complete_pool_callback() with merge_list_results.
+ */
+void
+ovn_complete_pool_list(struct worker_pool *pool,
+                  struct ovs_list *result,
+                  struct ovs_list *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
 /* Run a thread pool which gathers results in an array
  * of hashes. Merge results.
  */
@@ -486,7 +551,7 @@  static struct worker_control *alloc_controls(int size)
 
 static void
 worker_pool_hook(void *aux OVS_UNUSED) {
-    static struct worker_pool *pool;
+    struct worker_pool *pool;
     char sem_name[256];
 
     /* All workers must honour the must_exit flag and check for it regularly.
@@ -564,4 +629,130 @@  merge_hash_results(struct worker_pool *pool OVS_UNUSED,
     hmap_destroy(&res_frags[index]);
 }
 
+static void
+ovn_thread_pool_resize_pool(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                            const char *argv[], void *unused OVS_UNUSED)
+{
+    /* Resize pool argv[1] to argv[2] threads (0 = default); reply once. */
+    struct worker_pool *pool;
+    int value;
+
+    if (!str_to_int(argv[2], 10, &value) || value < 0) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        if (strcmp(pool->name, argv[1]) == 0) {
+            if (resize_pool(pool, value)) {
+                unixctl_command_reply(conn, NULL);
+            } else {
+                unixctl_command_reply_error(conn, "cannot resize pool");
+            }
+            return;
+        }
+    }
+    unixctl_command_reply_error(conn, "pool not found");
+}
+
+static void
+ovn_thread_pool_list_pools(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                           const char *argv[] OVS_UNUSED,
+                           void *unused OVS_UNUSED)
+{
+    /* unixctl handler: replies with one "<name> : <size>" line per pool. */
+    char *reply = NULL;
+    char *new_reply;
+    struct worker_pool *pool;
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        /* Build the longer text before freeing the old one; this also
+         * covers the first pool, when 'reply' is still NULL. */
+        new_reply = xasprintf("%s%s : %d\n", reply ? reply : "",
+                              pool->name, pool->size);
+        free(reply);
+        reply = new_reply;
+    }
+    /* With no pools 'reply' stays NULL. */
+    unixctl_command_reply(conn, reply);
+    /* unixctl does not take ownership of the text, so release it here. */
+    free(reply);
+}
+
+static void
+ovn_thread_pool_set_parallel_on(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[], void *unused OVS_UNUSED)
+{   /* unixctl handler registered as "thread-pool/set-parallel-on". */
+    int value;      /* argv[1] parsed as a base-10 integer. */
+    bool result;    /* What ovn_set_parallel_processing() returned. */
+    if (!str_to_int(argv[1], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+    /* 'true' is the force_parallel argument; a false result aborts here. */
+    if (!ovn_can_parallelize_hashes(true)) {
+        unixctl_command_reply_error(conn, "cannot enable parallel processing");
+        return;
+    }
+
+    if (value > 0) {
+        /* Change default pool size; zero or negative values keep it. */
+        ovs_mutex_lock(&init_mutex);
+        pool_size = value;
+        ovs_mutex_unlock(&init_mutex);
+    }
+    /* The reply text follows the returned flag, not merely the request. */
+    result = ovn_set_parallel_processing(true);
+    unixctl_command_reply(conn, result ? "enabled" : "disabled");
+}
+
+static void
+ovn_thread_pool_set_parallel_off(struct unixctl_conn *conn,
+                                 int argc OVS_UNUSED,
+                                 const char *argv[] OVS_UNUSED,
+                                 void *unused OVS_UNUSED)
+{   /* unixctl handler registered as "thread-pool/set-parallel-off". */
+    ovn_set_parallel_processing(false);     /* Return value is ignored. */
+    unixctl_command_reply(conn, NULL);      /* Plain reply, no body text. */
+}
+
+static void
+ovn_thread_pool_parallel_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *unused OVS_UNUSED)
+{
+    char status[256];   /* Holds one short word plus one formatted int. */
+    /* Reports the active/inactive state and the default pool size. */
+    sprintf(status, "%s, default pool size %d",
+            get_parallel_processing() ? "active" : "inactive",
+            pool_size);
+
+    unixctl_command_reply(conn, status);
+}
+
+void
+ovn_parallel_thread_pools_init(void)
+{
+    bool test = false;  /* Expected old value for the compare-exchange. */
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {    /* Flag flipped from false to true by this call. */
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(false);
+        ovs_mutex_unlock(&init_mutex);
+    }
+    /* Register the five thread-pool/ unixctl commands and their handlers. */
+    unixctl_command_register("thread-pool/set-parallel-on", "N", 1, 1,
+                             ovn_thread_pool_set_parallel_on, NULL);
+    unixctl_command_register("thread-pool/set-parallel-off", "", 0, 0,
+                             ovn_thread_pool_set_parallel_off, NULL);
+    unixctl_command_register("thread-pool/status", "", 0, 0,
+                             ovn_thread_pool_parallel_status, NULL);
+    unixctl_command_register("thread-pool/list", "", 0, 0,
+                             ovn_thread_pool_list_pools, NULL);
+    unixctl_command_register("thread-pool/reload-pool", "Pool Threads", 2, 2,
+                             ovn_thread_pool_resize_pool, NULL);
+}
+
 #endif
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 4708f41f2..403d37736 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -33,6 +33,7 @@  extern "C" {
 #include "openvswitch/hmap.h"
 #include "openvswitch/thread.h"
 #include "ovs-atomic.h"
+#include "unixctl.h"
 
 /* Process this include only if OVS does not supply parallel definitions
  */
@@ -93,6 +94,8 @@  struct worker_pool {
     sem_t *done; /* Work completion semaphorew. */
     void *(*start)(void *); /* Work function. */
     bool workers_must_exit; /* Pool to be destroyed flag. */
+    char *name; /* Name to be used in cli commands */
+    bool is_mutable; /* Can the pool be reloaded with different params */
 };
 
 /* Add a worker pool for thread function start() which expects a pointer to
@@ -101,7 +104,9 @@  struct worker_pool {
  * size uses system defaults.
  */
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *),
+                                        int size, char *name,
+                                        bool is_mutable);
 
 /* Setting this to true will make all processing threads exit */
 
@@ -149,6 +154,35 @@  void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
                            void *fin_result, void *result_frags, int index));
 
 
+/* Start a pool. Do not wait for any results. They will be collected
+ * using the _complete_ functions.
+ */
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+                       struct hmap *result, struct hmap *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from list frags into a final list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Call a callback function to perform processing of results.
+ */
+
+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
+                           void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                           void *fin_result, void *result_frags, int index));
+
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
  * would land, or a null pointer if that bucket is empty. */
 
@@ -259,10 +293,16 @@  static inline void init_hash_row_locks(struct hashrow_locks *hrl)
 
 bool ovn_can_parallelize_hashes(bool force_parallel);
 
+bool ovn_set_parallel_processing(bool enable);
+
+bool ovn_get_parallel_processing(void);
+
 void ovn_destroy_pool(struct worker_pool *pool);
 
 bool ovn_resize_pool(struct worker_pool *pool, int size);
 
+void ovn_parallel_thread_pools_init(void);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -273,9 +313,16 @@  bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
 
+#define set_parallel_processing(enable) ovn_set_parallel_processing(enable)
+
+#define get_parallel_processing() ovn_get_parallel_processing()
+
+#define enable_parallel_processing() ovn_enable_parallel_processing()
+
 #define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
 
-#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
+#define add_worker_pool(start, size, name, is_mutable) \
+        ovn_add_worker_pool(start, size, name, is_mutable)
 
 #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
 
@@ -296,10 +343,22 @@  bool ovn_resize_pool(struct worker_pool *pool, int size);
 #define run_pool_callback(pool, fin_result, result_frags, helper_func) \
     ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
 
+#define start_pool(pool) ovn_start_pool(pool)
+
+#define complete_pool_hash(pool, result, result_frags) \
+    ovn_complete_pool_hash(pool, result, result_frags)
+
+#define complete_pool_list(pool, result, result_frags) \
+    ovn_complete_pool_list(pool, result, result_frags)
+
+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)
+
 #define destroy_pool(pool) ovn_destroy_pool(pool)
 
 #define resize_pool(pool, size) ovn_resize_pool(pool, size)
 
+#define parallel_thread_pools_init() ovn_parallel_thread_pools_init()
 
 #ifdef __clang__
 #pragma clang diagnostic pop
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 324800c32..2f7728646 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -4358,7 +4358,6 @@  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
-static bool use_parallel_build = true;
 
 static struct hashrow_locks lflow_locks;
 
@@ -4395,7 +4394,7 @@  do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
                    nullable_xstrdup(ctrl_meter),
                    ovn_lflow_hint(stage_hint), where);
     hmapx_add(&lflow->od_group, od);
-    if (!use_parallel_build) {
+    if (!get_parallel_processing()) {
         hmap_insert(lflow_map, &lflow->hmap_node, hash);
     } else {
         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4414,7 +4413,7 @@  ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
     struct ovn_lflow *lflow;
 
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
-    if (use_logical_dp_groups && use_parallel_build) {
+    if (use_logical_dp_groups && get_parallel_processing()) {
         lock_hash_row(&lflow_locks, hash);
         lflow = do_ovn_lflow_add(lflow_map, od, hash, stage, priority, match,
                                  actions, io_port, stage_hint, where,
@@ -4454,7 +4453,7 @@  ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
         return false;
     }
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         lock_hash_row(&lflow_locks, hash);
         hmapx_add(&lflow_ref->od_group, od);
         unlock_hash_row(&lflow_locks, hash);
@@ -12919,7 +12918,8 @@  static void
 init_lflows_thread_pool(void)
 {
     if (!pool_init_done) {
-        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0,
+                                            "lflows", true);
         pool_init_done = true;
     }
 }
@@ -12951,14 +12951,11 @@  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         init_lflows_thread_pool();
-        if (!can_parallelize_hashes(false)) {
-            use_parallel_build = false;
-        }
     }
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         struct hmap *lflow_segs;
         struct lswitch_flow_build_info *lsiv;
         int index;
@@ -13154,7 +13151,7 @@  build_lflows(struct northd_context *ctx, struct hmap *datapaths,
     struct hmap lflows;
 
     fast_hmap_size_for(&lflows, max_seen_lflow_size);
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         update_hashrow_locks(&lflows, &lflow_locks);
     }
     build_lswitch_and_lrouter_flows(datapaths, ports,
@@ -14226,10 +14223,6 @@  ovnnb_db_run(struct northd_context *ctx,
     northd_probe_interval_nb = get_probe_interval(ovnnb_db, nb);
     northd_probe_interval_sb = get_probe_interval(ovnsb_db, nb);
 
-    use_parallel_build =
-        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
-         can_parallelize_hashes(false));
-
     use_logical_dp_groups = smap_get_bool(&nb->options,
                                           "use_logical_dp_groups", true);
     use_ct_inv_match = smap_get_bool(&nb->options,
@@ -15144,8 +15137,9 @@  main(int argc, char *argv[])
 
     daemonize_complete();
 
+    ovn_parallel_thread_pools_init();
+
     init_hash_row_locks(&lflow_locks);
-    use_parallel_build = can_parallelize_hashes(false);
 
     /* We want to detect (almost) all changes to the ovn-nb db. */
     struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index f06f2e68e..958ce18b0 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -179,6 +179,18 @@  ovn_start_northd() {
     test -d "$ovs_base/$name" || mkdir "$ovs_base/$name"
     as $name start_daemon $NORTHD_TYPE $northd_args -vjsonrpc \
                --ovnnb-db=$OVN_NB_DB --ovnsb-db=$OVN_SB_DB
+    if test -z "$USE_PARALLEL_THREADS" ; then
+        USE_PARALLEL_THREADS=0
+    fi
+
+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+        case ${NORTHD_TYPE:=ovn-northd} in
+            ovn-northd) ovs-appctl --timeout=10 --target northd$suffix/ovn-northd \
+                            thread-pool/set-parallel-on $USE_PARALLEL_THREADS
+            ;;
+        esac
+    fi
+
 }
 
 # ovn_start [--backup-northd=none|paused] [AZ]
@@ -252,10 +264,6 @@  ovn_start () {
     else
         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
     fi
-
-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
-        ovn-nbctl set NB_Global . options:use_parallel_build=true
-    fi
 }
 
 # Interconnection networks.