
[ovs-dev,v7,02/14] ovn-northd: introduce parallel lflow build

Message ID: 20201125110216.7944-3-anton.ivanov@cambridgegreys.com
State: Superseded
Series: [ovs-dev,v7,01/14] ovn-libs: Add support for parallel processing

Commit Message

Anton Ivanov Nov. 25, 2020, 11:02 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

1. Add support for parallel lflow build.
2. Move combined lflow generation to be built in parallel.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 northd/ovn-northd.c | 201 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 172 insertions(+), 29 deletions(-)
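
The heart of the change is how work is divided: each worker thread owns
a strided subset of the hash-map buckets, walking buckets control->id,
control->id + size, control->id + 2 * size, and so on up to the hmap
mask, so every datapath and port is visited by exactly one thread and
the shared input maps need no locking. Each thread also writes lflows
into its own private map segment, which run_pool_hash() merges
afterwards, so inserts are lock-free as well. A minimal standalone
sketch of the striding (generic, with hypothetical names):

    /* Sketch: worker "id" out of a pool of "size" threads visits a
     * disjoint, strided subset of the n_buckets hash buckets. */
    static void
    walk_strided(int id, int size, int n_buckets,
                 void (*visit)(int bucket, void *aux), void *aux)
    {
        for (int bnum = id; bnum < n_buckets; bnum += size) {
            visit(bnum, aux);
        }
    }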

Comments

Numan Siddique Dec. 21, 2020, 12:06 p.m. UTC | #1
On Wed, Nov 25, 2020 at 4:32 PM <anton.ivanov@cambridgegreys.com> wrote:
>
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> 1. Add support for parallel lflow build.
> 2. Move combined lflow generation to be built in parallel.
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Hi Anton,

Sorry for the delay in reviews. Now that we have branched and
released v20.12.0, I think these patches can be considered.
Can you please repost the patches, rebased and with parallel
building disabled by default, plus a configuration option to
enable it? Something like
  - ovn-nbctl set NB_Global . enable_parallel_lflow_build=true (or a
better name if you have one in mind)
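
For illustration, a sketch of how ovn-northd could read such a knob
once per main-loop iteration, assuming the option is stored in the
NB_Global "options" column (the option name is only the suggestion
above and the helper is hypothetical):

    /* Hypothetical helper: read the proposed knob via lib/smap.h and
     * the generated NB IDL accessors. */
    static bool
    use_parallel_build(struct northd_context *ctx)
    {
        const struct nbrec_nb_global *nb =
            nbrec_nb_global_first(ctx->ovnnb_idl);
        return nb && smap_get_bool(&nb->options,
                                   "enable_parallel_lflow_build", false);
    }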

Recently Ilya added the datapath groups feature, which is disabled by
default [1]. I think we can take a similar approach to begin with
and flip the default later.

[1] - https://github.com/ovn-org/ovn/commit/44c323a077af3709a111a6156850fd77f9302f5e

Thanks
Numan

> [...]
Anton Ivanov Dec. 21, 2020, 12:29 p.m. UTC | #2
On 21/12/2020 12:06, Numan Siddique wrote:
> On Wed, Nov 25, 2020 at 4:32 PM <anton.ivanov@cambridgegreys.com> wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> 1. Add support for parallel lflow build.
>> 2. Move combined lflow generation to be built in parallel.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> Hi Anton,
>
> Sorry for the delay in reviews. Now that we have branched and
> released v20.12.0, I think these patches can be considered.
> Can you please repost the patches, rebased and with parallel
> building disabled by default, plus a configuration option to
> enable it? Something like
>    - ovn-nbctl set NB_Global . enable_parallel_lflow_build=true (or a
> better name if you have one in mind)
>
> Recently Ilya added the datapath groups feature, which is disabled by
> default [1]. I think we can take a similar approach to begin with
> and flip the default later.

Cool.

I am on PTO, as I have some days to burn; I will have a look at it the
moment I am back at my desk.

A.

>
> [1] - https://github.com/ovn-org/ovn/commit/44c323a077af3709a111a6156850fd77f9302f5e
>
> Thanks
> Numan
>
>> [...]
Anton Ivanov Jan. 5, 2021, 11:21 a.m. UTC | #3
On 21/12/2020 12:06, Numan Siddique wrote:
> On Wed, Nov 25, 2020 at 4:32 PM <anton.ivanov@cambridgegreys.com> wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> 1. Add support for parallel lflow build.
>> 2. Move combined lflow generation to be built in parallel.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> Hi Anton,
>
> Sorry for the delay in reviews. Now that we have branched and
> released v20.12.0, I think these patches can be considered.
> Can you please repost the patches, rebased and with parallel
> building disabled by default, plus a configuration option to
> enable it? Something like
>    - ovn-nbctl set NB_Global . enable_parallel_lflow_build=true (or a
> better name if you have one in mind)
>
> Recently Ilya added the datapath groups feature, which is disabled by
> default [1]. I think we can take a similar approach to begin with
> and flip the default later.

I went through that changeset. It is still possible to process in parallel with DGs. Non-trivial, but doable. It will require fine-grained locking on "slices" of the lflow table, selected by hash modulo the number of slices. It looks like this can be contained 100% inside ovn_lflow_add_at().
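
A rough sketch of what such slice locking could look like, contained in
the insert path (a sketch only: the helper name is hypothetical, the
slice count is a fixed power of two, and the locks would need
ovs_mutex_init() at pool setup; with DGs the same lock would also have
to cover the duplicate lookup that precedes the insert):

    #define LFLOW_LOCK_SLICES 64    /* hypothetical; power of two */

    /* One lock per slice of the shared lflow map. */
    static struct ovs_mutex lflow_locks[LFLOW_LOCK_SLICES];

    static void
    lflow_insert_locked(struct hmap *lflow_map, struct ovn_lflow *lflow)
    {
        uint32_t hash = ovn_lflow_hash(lflow);
        struct ovs_mutex *lock =
            &lflow_locks[hash & (LFLOW_LOCK_SLICES - 1)];

        /* Inserts that hash into the same slice serialize; inserts
         * into different slices proceed in parallel. */
        ovs_mutex_lock(lock);
        hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
        ovs_mutex_unlock(lock);
    }

Note this relies on the map being pre-sized (fast_hmap_size_for()) and
on hmap_insert_fast() never expanding it, since per-slice locks alone
would not make a concurrent resize safe.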

As for running in parallel with DGs off (the default at present), this is mostly identical to the old version of the patch.

I am pretty sure that it is possible to merge both use cases and lock only when DGs are in use.
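
Merging the two cases could then amount to taking the slice lock
conditionally (reusing the locks from the sketch above; again
hypothetical):

    /* Serialize only when datapath groups are enabled; without them
     * each thread writes to its own private map segment. */
    static void
    lflow_insert_maybe_locked(struct hmap *map, struct ovn_lflow *lflow,
                              bool dp_groups_enabled)
    {
        uint32_t hash = ovn_lflow_hash(lflow);

        if (!dp_groups_enabled) {
            hmap_insert_fast(map, &lflow->hmap_node, hash);
            return;
        }

        struct ovs_mutex *lock =
            &lflow_locks[hash & (LFLOW_LOCK_SLICES - 1)];
        ovs_mutex_lock(lock);
        hmap_insert_fast(map, &lflow->hmap_node, hash);
        ovs_mutex_unlock(lock);
    }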

In either case, we should finish the cleanup of build_lswitch_flows and build_lrouter_flows first.

These functions are still at 656 and 953 lines respectively. They need to be split into chunks which iterate by od, op, lb, etc., prior to going parallel.
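
The target shape mirrors what the combined build in this patch already
does: each former monolithic loop body becomes a helper keyed by its
iterator type, which build_lswitch_and_lrouter_iterate_by_od()/_by_op()
can then chain. A sketch of the direction (the helper name is
illustrative, not an existing function):

    /* Hypothetical per-od helper split out of the monolithic
     * build_lrouter_flows() loop; the same body then serves both the
     * single-threaded and the parallel iteration schemes. */
    static void
    build_lrouter_flows_by_od(struct ovn_datapath *od,
                              struct lswitch_flow_build_info *lsi)
    {
        /* ... per-datapath lrouter lflow generation moved here ... */
    }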

I will do the next version of that first.

A.

>
> [1] - https://github.com/ovn-org/ovn/commit/44c323a077af3709a111a6156850fd77f9302f5e
>
> Thanks
> Numan
>
>> [...]
Anton Ivanov Jan. 6, 2021, 4:44 p.m. UTC | #4
On 05/01/2021 11:21, Anton Ivanov wrote:
>
> On 21/12/2020 12:06, Numan Siddique wrote:
>> On Wed, Nov 25, 2020 at 4:32 PM <anton.ivanov@cambridgegreys.com> wrote:
>>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>>
>>> 1. Add support for parallel lflow build.
>>> 2. Move combined lflow generation to be built in parallel.
>>>
>>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> Hi Anton,
>>
>> Sorry for the delay in reviews. Now that we have branched and
>> released v20.12.0, I think these patches can be considered.
>> Can you please repost the patches, rebased and with parallel
>> building disabled by default, plus a configuration option to
>> enable it? Something like
>>    - ovn-nbctl set NB_Global . enable_parallel_lflow_build=true (or a
>> better name if you have one in mind)
>>
>> Recently Ilya added the datapath groups feature, which is disabled by
>> default [1]. I think we can take a similar approach to begin with
>> and flip the default later.
>
> I went through that changeset. It is still possible to process in parallel with DGs. Non-trivial, but doable. It will require fine-grained locking on "slices" of the lflow table, selected by hash modulo the number of slices. It looks like this can be contained 100% inside ovn_lflow_add_at().
>
> As for running in parallel with DGs off (the default at present), this is mostly identical to the old version of the patch.
>
> I am pretty sure that it is possible to merge both use cases and lock only when DGs are in use.
>
> In either case, we should finish the cleanup of build_lswitch_flows and build_lrouter_flows first.
>
> These functions are still at 656 and 953 lines respectively. They need to be split into chunks which iterate by od, op, lb, etc., prior to going parallel.
>
> I will do the next version of that first.

Hi Numan, hi all,

I sent the "prereqs" series yesterday and I now have a working version for parallel processing both with and without DPGs.

I will run some scale tests to measure the performance impact of parallel processing when using DPGs, to see whether there is a point in having that at all.

Once I have some numbers, I will submit the patch. Probably tomorrow.

It will be incremental on top of the 11-patch series which I sent yesterday.

A.

>
> A.
>
>>
>> [1] - https://github.com/ovn-org/ovn/commit/44c323a077af3709a111a6156850fd77f9302f5e
>>
>> Thanks
>> Numan
>>
>>> [...]

Patch

diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 47a177c99..076a6d27f 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -49,6 +49,7 @@ 
 #include "unixctl.h"
 #include "util.h"
 #include "uuid.h"
+#include "fasthmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_northd);
@@ -4152,7 +4153,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
     ovn_lflow_init(lflow, od, stage, priority,
                    xstrdup(match), xstrdup(actions),
                    ovn_lflow_hint(stage_hint), where);
-    hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
 }
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
@@ -11100,6 +11101,8 @@ struct lswitch_flow_build_info {
 
 /* Helper function to combine all lflow generation which is iterated by
  * datapath.
+ * Invoked by parallel build over a "chunk" of work or by single threaded
+ * build over a chunk which is initialized to contain "all" work.
  */
 
 static void
@@ -11134,6 +11137,8 @@ build_lswitch_and_lrouter_iterate_by_od(struct ovn_datapath *od,
 }
 
 /* Helper function to combine all lflow generation which is iterated by port.
+ * Invoked by parallel build over a "chunk" of work or by single threaded
+ * build over a chunk which is initialized to contain "all" work.
  */
 
 static void
@@ -11161,6 +11166,88 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                             &lsi->match, &lsi->actions);
 }
 
+struct lflows_thread_pool {
+    struct worker_pool *pool;
+};
+
+static void *build_lflows_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflows_thread_pool *workload;
+    struct lswitch_flow_build_info *lsi;
+
+    struct ovn_datapath *od;
+    struct ovn_port *op;
+    int bnum;
+
+    while (!cease_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lflows_thread_pool *) control->workload;
+        lsi = (struct lswitch_flow_build_info *) control->data;
+        if (lsi && workload) {
+            /* Iterate over bucket ThreadID, ThreadID+size, ... */
+            for (bnum = control->id;
+                    bnum <= lsi->datapaths->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        od, key_node, bnum, lsi->datapaths) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
+                }
+            }
+            for (bnum = control->id;
+                    bnum <= lsi->ports->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        op, key_node, bnum, lsi->ports) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
+                }
+            }
+            atomic_store_relaxed(&control->finished, true);
+            atomic_thread_fence(memory_order_release);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
+static bool pool_init_done = false;
+static struct lflows_thread_pool *build_lflows_pool = NULL;
+
+static void init_lflows_thread_pool(void)
+{
+    int index;
+
+    if (!pool_init_done) {
+        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        pool_init_done = true;
+        if (pool) {
+            build_lflows_pool =
+                xmalloc(sizeof(struct lflows_thread_pool));
+            build_lflows_pool->pool = pool;
+            for (index = 0; index < build_lflows_pool->pool->size; index++) {
+                build_lflows_pool->pool->controls[index].workload =
+                    build_lflows_pool;
+            }
+        }
+    }
+}
+
+/* TODO: replace hard cutoffs with values configurable via commands. These
+ * are temporary defines that determine the single-thread to multi-thread
+ * processing cutoff.
+ * Setting to 1 forces an "all parallel" lflow build.
+ */
+
+#define OD_CUTOFF 1
+#define OP_CUTOFF 1
+
 static void
 build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct hmap *port_groups, struct hmap *lflows,
@@ -11168,38 +11255,87 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct hmap *igmp_groups,
                                 struct shash *meter_groups, struct hmap *lbs)
 {
-    struct ovn_datapath *od;
-    struct ovn_port *op;
-
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    struct lswitch_flow_build_info lsi = {
-        .datapaths = datapaths,
-        .ports = ports,
-        .port_groups = port_groups,
-        .lflows = lflows,
-        .mcgroups = mcgroups,
-        .igmp_groups = igmp_groups,
-        .meter_groups = meter_groups,
-        .lbs = lbs,
-        .svc_check_match = svc_check_match,
-        .match = DS_EMPTY_INITIALIZER,
-        .actions = DS_EMPTY_INITIALIZER,
-    };
+    init_lflows_thread_pool();
 
-    /* Combined build - all lflow generation from lswitch and lrouter
-     * will move here and will be reogranized by iterator type.
-     */
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
-    }
-    HMAP_FOR_EACH (op, key_node, ports) {
-        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+    if (build_lflows_pool &&
+        (hmap_count(datapaths) > OD_CUTOFF || hmap_count(ports) > OP_CUTOFF)) {
+
+        struct hmap *lflow_segs;
+        struct lswitch_flow_build_info *lsiv;
+        int index;
+
+        lsiv = xmalloc(
+            sizeof(struct lswitch_flow_build_info) *
+                build_lflows_pool->pool->size);
+        lflow_segs = xmalloc(
+            sizeof(struct hmap) * build_lflows_pool->pool->size);
+
+        /* Set up "work chunks" for each thread to work on. */
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            fast_hmap_init(&lflow_segs[index], lflows->mask);
+
+            lsiv[index].datapaths = datapaths;
+            lsiv[index].ports = ports;
+            lsiv[index].port_groups = port_groups;
+            lsiv[index].lflows = &lflow_segs[index];
+            lsiv[index].mcgroups = mcgroups;
+            lsiv[index].igmp_groups = igmp_groups;
+            lsiv[index].meter_groups = meter_groups;
+            lsiv[index].lbs = lbs;
+            lsiv[index].svc_check_match = svc_check_match;
+            ds_init(&lsiv[index].match);
+            ds_init(&lsiv[index].actions);
+
+            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+        }
+
+        /* Run thread pool. */
+
+        run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            ds_destroy(&lsiv[index].match);
+            ds_destroy(&lsiv[index].actions);
+        }
+
+        free(lflow_segs);
+        free(lsiv);
+    } else {
+        struct ovn_datapath *od;
+        struct ovn_port *op;
+
+        struct lswitch_flow_build_info lsi = {
+            .datapaths = datapaths,
+            .ports = ports,
+            .port_groups = port_groups,
+            .lflows = lflows,
+            .mcgroups = mcgroups,
+            .igmp_groups = igmp_groups,
+            .meter_groups = meter_groups,
+            .lbs = lbs,
+            .svc_check_match = svc_check_match,
+            .match = DS_EMPTY_INITIALIZER,
+            .actions = DS_EMPTY_INITIALIZER,
+        };
+
+
+        /* Converged build - all lflow generation from lswitch and lrouter
+         * will move here and will be reorganized by iterator type.
+         */
+        HMAP_FOR_EACH (od, key_node, datapaths) {
+            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
+        }
+        HMAP_FOR_EACH (op, key_node, ports) {
+            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+        }
+        ds_destroy(&lsi.match);
+        ds_destroy(&lsi.actions);
     }
-    free(svc_check_match);
 
-    ds_destroy(&lsi.match);
-    ds_destroy(&lsi.actions);
+    free(svc_check_match);
 
     /* Legacy lswitch build - to be migrated. */
     build_lswitch_flows(datapaths, ports, lflows, mcgroups,
@@ -11209,6 +11345,7 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
     build_lrouter_flows(datapaths, ports, lflows, meter_groups, lbs);
 }
 
+static ssize_t max_seen_lflow_size = 128;
 
 /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
  * constructing their contents based on the OVN_NB database. */
@@ -11219,12 +11356,18 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
              struct shash *meter_groups,
              struct hmap *lbs)
 {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap lflows;
+
+    fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
     build_lswitch_and_lrouter_flows(datapaths, ports,
                                     port_groups, &lflows, mcgroups,
                                     igmp_groups, meter_groups, lbs);
 
+    if (hmap_count(&lflows) > max_seen_lflow_size) {
+        max_seen_lflow_size = hmap_count(&lflows);
+    }
+
     /* Push changes to the Logical_Flow table to database. */
     const struct sbrec_logical_flow *sbflow, *next_sbflow;
     SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {