[ovs-dev,v4,3/9] ovn-northd: introduce parallel lflow build

Message ID 20200925095807.19358-4-anton.ivanov@cambridgegreys.com
State Superseded
Series [ovs-dev,v4,1/9] ovn-libs: Add support for parallel processing

Commit Message

Anton Ivanov Sept. 25, 2020, 9:58 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

1. Add support for parallel lflow build.
2. Move combined lflow generation to be built in parallel.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 northd/ovn-northd.c | 197 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 171 insertions(+), 26 deletions(-)

Comments

Mark Michelson Oct. 13, 2020, 7:33 p.m. UTC | #1
On 9/25/20 5:58 AM, anton.ivanov@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> 1. Add support for parallel lflow build.
> 2. Move combined lflow generation to be built in parallel.
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---
>   northd/ovn-northd.c | 197 ++++++++++++++++++++++++++++++++++++++------
>   1 file changed, 171 insertions(+), 26 deletions(-)

[...]

> +            lsiv[index].svc_check_match = svc_check_match;
> +
> +            /* This cast is needed. While you can initialize without a
> +             * cast, you cannot assign a whole struct at once without one,
> +             * and that is what DS_EMPTY_INITIALIZER expands to at present. */
> +
> +            lsiv[index].match = (struct ds) DS_EMPTY_INITIALIZER;
> +            lsiv[index].actions = (struct ds) DS_EMPTY_INITIALIZER;

You can avoid the cast (and the need for the comment) by calling 
ds_init() instead of using the initializer macro.
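
I.e., something like:

    ds_init(&lsiv[index].match);
    ds_init(&lsiv[index].actions);

ds_init() leaves the string NULL with zero length and allocation, which
is exactly the state DS_EMPTY_INITIALIZER produces, so both the cast and
the explanatory comment can go away.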

Patch

diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 66b6c2985..225f4ca8e 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -48,6 +48,7 @@ 
 #include "unixctl.h"
 #include "util.h"
 #include "uuid.h"
+#include "fasthmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_northd);
@@ -4189,7 +4190,7 @@  ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
     ovn_lflow_init(lflow, od, stage, priority,
                    xstrdup(match), xstrdup(actions),
                    ovn_lflow_hint(stage_hint), where);
-    hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
 }
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
@@ -11331,6 +11332,8 @@  struct lswitch_flow_build_info {
 
 /* Helper function to combine all lflow generation which is iterated by
  * datapath.
+ * Invoked by the parallel build over a "chunk" of work, or by the
+ * single-threaded build over a chunk initialized to contain "all" work.
  */
 
 static void
@@ -11357,6 +11360,8 @@  build_lswitch_and_lrouter_iterate_by_od(
 }
 
 /* Helper function to combine all lflow generation which is iterated by port.
+ * Invoked by the parallel build over a "chunk" of work, or by the
+ * single-threaded build over a chunk initialized to contain "all" work.
  */
 
 static void
@@ -11379,6 +11384,85 @@  build_lswitch_and_lrouter_iterate_by_op(
                                                  &lsi->actions);
 }
 
+struct lflows_thread_pool {
+    struct worker_pool *pool;
+};
+
+static void *build_lflows_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflows_thread_pool *workload;
+    struct lswitch_flow_build_info *lsi;
+
+    struct ovn_datapath *od;
+    struct ovn_port *op;
+    int bnum;
+
+    while (!seize_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lflows_thread_pool *) control->workload;
+        lsi = (struct lswitch_flow_build_info *) control->data;
+        if (lsi && workload) {
+            /* Iterate over buckets ThreadID, ThreadID + size, ... */
+            for (bnum = control->id;
+                    bnum <= lsi->datapaths->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        od, key_node, bnum, lsi->datapaths) {
+                    if (seize_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
+                }
+            }
+            for (bnum = control->id;
+                    bnum <= lsi->ports->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        op, key_node, bnum, lsi->ports) {
+                    if (seize_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
+                }
+            }
+            atomic_store_relaxed(&control->finished, true);
+            atomic_thread_fence(memory_order_release);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
+static struct lflows_thread_pool *build_lflows_pool = NULL;
+
+static void init_lflows_thread_pool(void)
+{
+    int index;
+
+    if (!build_lflows_pool) {
+        build_lflows_pool =
+            xmalloc(sizeof(struct lflows_thread_pool));
+        build_lflows_pool->pool =
+            add_worker_pool(build_lflows_thread);
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            build_lflows_pool->pool->controls[index].workload =
+                build_lflows_pool;
+        }
+    }
+}
+
+/* TODO: replace these hard cutoffs with values configurable via
+ * commands. They are temporary defines that determine the cutoff
+ * between single-threaded and multi-threaded processing.
+ * Setting them to 1 forces an "all parallel" lflow build.
+ */
+
+#define OD_CUTOFF 1
+#define OP_CUTOFF 1
+
 static void
 build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                     struct hmap *port_groups, struct hmap *lflows,
@@ -11386,35 +11470,89 @@  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                     struct shash *meter_groups,
                     struct hmap *lbs)
 {
+    char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    struct ovn_datapath *od;
-    struct ovn_port *op;
+    if (hmap_count(datapaths) > OD_CUTOFF || hmap_count(ports) > OP_CUTOFF) {
 
-    char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
+        struct hmap *lflow_segs;
+        struct lswitch_flow_build_info *lsiv;
+        int index;
 
-    struct lswitch_flow_build_info lsi = {
-        .datapaths = datapaths,
-        .ports = ports,
-        .port_groups = port_groups,
-        .lflows = lflows,
-        .mcgroups = mcgroups,
-        .igmp_groups = igmp_groups,
-        .meter_groups = meter_groups,
-        .lbs = lbs,
-        .svc_check_match = svc_check_match,
-        .match = DS_EMPTY_INITIALIZER,
-        .actions = DS_EMPTY_INITIALIZER,
-    };
+        init_lflows_thread_pool();
+        lsiv = xmalloc(
+            sizeof(struct lswitch_flow_build_info) *
+                build_lflows_pool->pool->size);
+        lflow_segs = xmalloc(
+            sizeof(struct hmap) * build_lflows_pool->pool->size);
 
-    /* Combined build - all lflow generation from lswitch and lrouter
-     * will move here and will be reogranized by iterator type.
-     */
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
-    }
-    HMAP_FOR_EACH (op, key_node, ports) {
-        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+        /* Set up "work chunks" for each thread to work on. */
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            fast_hmap_init(&lflow_segs[index], lflows->mask);
+
+            lsiv[index].datapaths = datapaths;
+            lsiv[index].ports = ports;
+            lsiv[index].port_groups = port_groups;
+            lsiv[index].lflows = &lflow_segs[index];
+            lsiv[index].mcgroups = mcgroups;
+            lsiv[index].igmp_groups = igmp_groups;
+            lsiv[index].meter_groups = meter_groups;
+            lsiv[index].lbs = lbs;
+            lsiv[index].svc_check_match = svc_check_match;
+
+            /* This cast is needed. While you can initialize without a
+             * cast, you cannot assign a whole struct at once without one,
+             * and that is what DS_EMPTY_INITIALIZER expands to at present. */
+
+            lsiv[index].match = (struct ds) DS_EMPTY_INITIALIZER;
+            lsiv[index].actions = (struct ds) DS_EMPTY_INITIALIZER;
+
+            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+        }
+
+        /* Run thread pool. */
+
+        run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            ds_destroy(&lsiv[index].match);
+            ds_destroy(&lsiv[index].actions);
+        }
+
+        free(lflow_segs);
+        free(lsiv);
+    } else {
+        struct ovn_datapath *od;
+        struct ovn_port *op;
+
+        struct lswitch_flow_build_info lsi = {
+            .datapaths = datapaths,
+            .ports = ports,
+            .port_groups = port_groups,
+            .lflows = lflows,
+            .mcgroups = mcgroups,
+            .igmp_groups = igmp_groups,
+            .meter_groups = meter_groups,
+            .lbs = lbs,
+            .svc_check_match = svc_check_match,
+            .match = DS_EMPTY_INITIALIZER,
+            .actions = DS_EMPTY_INITIALIZER,
+        };
+
+
+        /* Converged build - all lflow generation from lswitch and lrouter
+         * will move here and will be reorganized by iterator type.
+         */
+        HMAP_FOR_EACH (od, key_node, datapaths) {
+            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
+        }
+        HMAP_FOR_EACH (op, key_node, ports) {
+            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+        }
+        ds_destroy(&lsi.match);
+        ds_destroy(&lsi.actions);
     }
+
     free(svc_check_match);
 
     /* Legacy lswitch build - to be migrated. */
@@ -11425,6 +11563,7 @@  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
     build_lrouter_flows(datapaths, ports, lflows, meter_groups, lbs);
 }
 
+static ssize_t max_seen_lflow_size = 128;
 
 /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
  * constructing their contents based on the OVN_NB database. */
@@ -11435,12 +11574,18 @@  build_lflows(struct northd_context *ctx, struct hmap *datapaths,
              struct shash *meter_groups,
              struct hmap *lbs)
 {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap lflows;
+
+    fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
     build_lswitch_and_lrouter_flows(datapaths, ports,
                                     port_groups, &lflows, mcgroups,
                                     igmp_groups, meter_groups, lbs);
 
+    if (hmap_count(&lflows) > max_seen_lflow_size) {
+        max_seen_lflow_size = hmap_count(&lflows);
+    }
+
     /* Push changes to the Logical_Flow table to database. */
     const struct sbrec_logical_flow *sbflow, *next_sbflow;
     SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
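
As a side note for readers following the series: the distribution scheme
in build_lflows_thread() above is plain bucket striding, where thread i
visits hash buckets i, i + N, i + 2N, ... for a pool of N threads, so
the threads cover disjoint buckets and no locking is needed while
iterating. A minimal standalone sketch of the same loop shape (invented
names, not code from the patch):

    #include <pthread.h>
    #include <stdio.h>

    #define N_BUCKETS 16
    #define N_THREADS 4

    struct stride_args {
        int id;                 /* This thread's index. */
    };

    static void *
    stride_worker(void *arg_)
    {
        struct stride_args *arg = arg_;

        /* Start at our own index and step by the pool size, exactly as
         * the bnum loops in the patch do. */
        for (int bnum = arg->id; bnum < N_BUCKETS; bnum += N_THREADS) {
            printf("thread %d handles bucket %d\n", arg->id, bnum);
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t threads[N_THREADS];
        struct stride_args args[N_THREADS];

        for (int i = 0; i < N_THREADS; i++) {
            args[i].id = i;
            pthread_create(&threads[i], NULL, stride_worker, &args[i]);
        }
        for (int i = 0; i < N_THREADS; i++) {
            pthread_join(threads[i], NULL);
        }
        return 0;
    }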