
[ovs-dev,v12,3/4] ovn-northd: Introduce parallel lflow build

Message ID 20210115153001.24426-3-anton.ivanov@cambridgegreys.com
State Changes Requested
Series [ovs-dev,v12,1/4] ovn-libs: Add support for parallel processing

Commit Message

Anton Ivanov Jan. 15, 2021, 3:30 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Datapaths, ports, igmp groups and load balancers can now
be iterated over in parallel in order to speed up the lflow
generation. This decreases the time needed to generate the
logical flows by a factor of 4+ on a 6 core/12 thread CPU
without datapath groups - from 0.8-1 microseconds per flow
down to 0.2-0.3 microseconds per flow on average.

The decrease in time to compute lflows with datapath groups
enabled is ~2 times for the same hardware - from an average of
2.4 microseconds per flow to 1.2 microseconds per flow.

Tested on an 8 node, 400 pod K8s simulation resulting
in > 6K flows.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 northd/ovn-northd.c | 334 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 281 insertions(+), 53 deletions(-)

Comments

0-day Robot Jan. 15, 2021, 4:04 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line is 80 characters long (recommended limit is 79)
#155 FILE: northd/ovn-northd.c:11527:
                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {

WARNING: Line is 85 characters long (recommended limit is 79)
#304 FILE: northd/ovn-northd.c:11638:
            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);

WARNING: Line is 82 characters long (recommended limit is 79)
#338 FILE: northd/ovn-northd.c:11670:
            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);

Lines checked: 449, Warnings: 3, Errors: 0


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Dumitru Ceara Jan. 25, 2021, 8:56 p.m. UTC | #2
On 1/15/21 4:30 PM, anton.ivanov@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> Datapaths, ports, igmp groups and load balancers can now
> be iterated over in parallel in order to speed up the lflow
> generation. This decreases the time needed to generate the
> logical flows by a factor of 4+ on a 6 core/12 thread CPU
> without datapath groups - from 0.8-1 microseconds per flow
> down to 0.2-0.3 microseconds per flow on average.
> 
> The decrease in time to compute lflows with datapath groups
> enabled is ~2 times for the same hardware - from an average of
> 2.4 microseconds per flow to 1.2 microseconds per flow.
> 
> Tested for on an 8 node, 400 pod K8 simulation resulting
> in > 6K flows.
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---

Hi Anton,

Again, not a full review, but some initial remarks, most of them I think 
related to the fact that we're leaking the fast-hmap abstraction details.

>   northd/ovn-northd.c | 334 +++++++++++++++++++++++++++++++++++++-------
>   1 file changed, 281 insertions(+), 53 deletions(-)
> 
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index dda033543..53e0cc50d 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -37,6 +37,7 @@
>   #include "lib/ovn-sb-idl.h"
>   #include "lib/ovn-util.h"
>   #include "lib/lb.h"
> +#include "lib/fasthmap.h"
>   #include "ovn/actions.h"
>   #include "ovn/logical-fields.h"
>   #include "packets.h"
> @@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>   /* If this option is 'true' northd will combine logical flows that differs by
>    * logical datapath only by creating a datapah group. */
>   static bool use_logical_dp_groups = false;
> +static bool use_parallel_build = true;
> +
> +static struct ovs_mutex *slice_locks = NULL;

I'm not very sure why the locking details have to be known by the user 
of the fasthmap.  Why doesn't the fasthmap library take care of 
(re)allocating just the right amount of locks it needs?

> +
> +/* Adds a row with the specified contents to the Logical_Flow table.
> + * Version to use when locking is required.
> + */
> +static void
> +do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
> +                        struct ovn_datapath *od,
> +                        uint32_t hash, struct ovn_lflow *lflow)
> +{
> +
> +    struct ovn_lflow *old_lflow;
> +
> +    if (shared && use_logical_dp_groups) {
> +        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
> +        if (old_lflow) {
> +            ovn_lflow_destroy(NULL, lflow);
> +            hmapx_add(&old_lflow->od_group, od);
> +            return;
> +        }
> +    }
> +
> +    hmapx_add(&lflow->od_group, od);
> +    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> +}
> +
>   
>   /* Adds a row with the specified contents to the Logical_Flow table. */
>   static void
> @@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>   {
>       ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
>   
> -    struct ovn_lflow *old_lflow, *lflow;
> +    struct ovn_lflow *lflow;
>       uint32_t hash;
>   
>       lflow = xmalloc(sizeof *lflow);
> @@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>                      ovn_lflow_hint(stage_hint), where);
>   
>       hash = ovn_lflow_hash(lflow);
> -    if (shared && use_logical_dp_groups) {
> -        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
> -        if (old_lflow) {
> -            ovn_lflow_destroy(NULL, lflow);
> -            hmapx_add(&old_lflow->od_group, od);
> -            return;
> -        }
> -    }
>   
> -    hmapx_add(&lflow->od_group, od);
> -    hmap_insert(lflow_map, &lflow->hmap_node, hash);
> +    if (use_logical_dp_groups && use_parallel_build) {
> +        ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);

We're using hmap internals and the fasthmap user has to make sure it 
locked the correct slice lock.

If slice_locks were something internal to the fasthmap, an API to get 
the corresponding lock, e.g., based on 'hash' would hide these 
implementation details.
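
For illustration, the kind of helper that would hide those details might look
roughly like the sketch below (the function names and their placement are
hypothetical, not something this series defines); the lock array would then be
owned, sized and resized by the fasthmap library rather than by ovn-northd:

    /* Hypothetical sketch only -- not part of the posted series.  The
     * per-bucket lock array lives inside fasthmap; callers only ever
     * pass the hash they are about to insert under. */
    #include <stdint.h>
    #include "openvswitch/hmap.h"
    #include "ovs-thread.h"

    static struct ovs_mutex *slice_locks;  /* One lock per hmap bucket. */

    void
    fast_hmap_lock_hash(const struct hmap *hmap, uint32_t hash)
    {
        ovs_mutex_lock(&slice_locks[hash & hmap->mask]);
    }

    void
    fast_hmap_unlock_hash(const struct hmap *hmap, uint32_t hash)
    {
        ovs_mutex_unlock(&slice_locks[hash & hmap->mask]);
    }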

> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
> +        ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
> +    } else {
> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
> +    }
>   }
>   
>   /* Adds a row with the specified contents to the Logical_Flow table. */
> @@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
>       }
>   }
>   
> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
> +
>   /* Ingress table 19: Destination lookup, unicast handling (priority 50), */
>   static void
>   build_lswitch_ip_unicast_lookup(struct ovn_port *op,
> @@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>                                           &op->nbsp->header_);
>               } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
>                   if (lsp_is_enabled(op->nbsp)) {
> +                    ovs_mutex_lock(&mcgroup_mutex);
>                       ovn_multicast_add(mcgroups, &mc_unknown, op);
> +                    ovs_mutex_unlock(&mcgroup_mutex);
>                       op->od->has_unknown = true;
>                   }
>               } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
> @@ -11676,6 +11706,122 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>                                   &lsi->match, &lsi->actions);
>   }
>   
> +struct lflows_thread_pool {
> +    struct worker_pool *pool;
> +};
> +
> +static void *build_lflows_thread(void *arg)
> +{
> +    struct worker_control *control = (struct worker_control *) arg;
> +    struct lflows_thread_pool *workload;
> +    struct lswitch_flow_build_info *lsi;
> +
> +    struct ovn_datapath *od;
> +    struct ovn_port *op;
> +    struct ovn_northd_lb *lb;
> +    struct ovn_igmp_group *igmp_group;
> +    int bnum;
> +
> +    while (!stop_parallel_processing()) {
> +        sem_wait(&control->fire);

This feels a bit error prone.  Can all this be abstracted away in a 
fasthmap API that executes callbacks for all fast-hmap inputs 
(lsi->datapaths, lsi->ports, lsi->lbs, etc.) that have been defined by 
the user code?
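
The shape of such an API might be roughly the following (declaration-only
sketch; the names and signatures are purely illustrative, not something this
series defines):

    /* Purely illustrative -- the caller registers each input hmap once,
     * with the offset of the hmap_node inside the contained object and a
     * per-object callback; the library then owns the sem_wait() loop and
     * the bucket striding. */
    #include <stddef.h>
    #include "openvswitch/hmap.h"
    #include "lib/fasthmap.h"

    typedef void (*fast_hmap_input_cb)(void *obj, void *aux);

    void fast_hmap_register_input(struct worker_pool *pool,
                                  const struct hmap *input,
                                  size_t node_offset,  /* offsetof(T, node) */
                                  fast_hmap_input_cb cb);

    void fast_hmap_run_inputs(struct worker_pool *pool, void *aux);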

> +        workload = (struct lflows_thread_pool *) control->workload;
> +        lsi = (struct lswitch_flow_build_info *) control->data;
> +        if (lsi && workload) {
> +            /* Iterate over bucket ThreadID, ThreadID+size, ... */
> +            for (bnum = control->id;
> +                    bnum <= lsi->datapaths->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
> +                    if (stop_parallel_processing()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
> +                }
> +            }
> +            for (bnum = control->id;
> +                    bnum <= lsi->ports->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
> +                    if (stop_parallel_processing()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
> +                }
> +            }
> +            for (bnum = control->id;
> +                    bnum <= lsi->lbs->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
> +                    if (stop_parallel_processing()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
> +                                                         &lsi->match,
> +                                                         &lsi->actions);
> +                }
> +            }
> +            for (bnum = control->id;
> +                    bnum <= lsi->igmp_groups->mask;
> +                    bnum += workload->pool->size)
> +            {
> +                HMAP_FOR_EACH_IN_PARALLEL (
> +                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> +                    if (stop_parallel_processing()) {
> +                        return NULL;
> +                    }
> +                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
> +                                                    &lsi->match,
> +                                                    &lsi->actions);
> +                }
> +            }
> +            atomic_store_relaxed(&control->finished, true);
> +            atomic_thread_fence(memory_order_acq_rel);

Especially this seems error prone and it would be better if it's 
implemented inside fasthmap.c where we actually read &control->finished.
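
A sketch of what that could look like (the helper name is hypothetical): the
store, the fence and the semaphore post all move into one call exported by
fasthmap.c, so worker code never touches control->finished directly:

    /* Hypothetical helper -- not in the posted series. */
    #include <semaphore.h>
    #include "lib/fasthmap.h"
    #include "ovs-atomic.h"

    void
    post_completed_work(struct worker_control *control)
    {
        /* Intended to publish the worker's result fragment before the
         * main thread observes 'finished'. */
        atomic_store_relaxed(&control->finished, true);
        atomic_thread_fence(memory_order_acq_rel);
        sem_post(control->done);
    }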

> +        }
> +        sem_post(control->done);

Same here.

> +     }
> +    return NULL;
> +}
> +
> +static bool pool_init_done = false;
> +static struct lflows_thread_pool *build_lflows_pool = NULL;
> +
> +static void init_lflows_thread_pool(void)
> +{
> +    int index;
> +
> +    if (!pool_init_done) {
> +        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> +        pool_init_done = true;
> +        if (pool) {
> +            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
> +            build_lflows_pool->pool = pool;
> +            for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +                build_lflows_pool->pool->controls[index].workload =
> +                    build_lflows_pool;
> +            }
> +        }

As mentioned on patch 1/4, if pool == NULL, build_lflows_pool is always 
NULL and ovn-northd crashes when use_parallel_build == true.
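
One possible guard (a sketch only, assuming that falling back to the
single-threaded build is the desired behaviour when no pool can be created):

    static void
    init_lflows_thread_pool(void)
    {
        if (pool_init_done) {
            return;
        }
        pool_init_done = true;

        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
        if (!pool) {
            /* Pool creation failed; drop back to the single-threaded
             * build rather than leaving build_lflows_pool == NULL. */
            use_parallel_build = false;
            return;
        }

        build_lflows_pool = xmalloc(sizeof *build_lflows_pool);
        build_lflows_pool->pool = pool;
        for (int index = 0; index < pool->size; index++) {
            pool->controls[index].workload = build_lflows_pool;
        }
    }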

> +    }
> +}
> +
> +/* TODO: replace hard cutoffs by configurable via commands. These are
> + * temporary defines to determine single-thread to multi-thread processing
> + * cutoff.
> + * Setting to 1 forces "all parallel" lflow build.
> + */
> +
> +static void
> +noop_callback(struct worker_pool *pool OVS_UNUSED,
> +              void *fin_result OVS_UNUSED,
> +              void *result_frags OVS_UNUSED,
> +              int index OVS_UNUSED)
> +{
> +    /* Do nothing */
> +}
> +
> +
>   static void
>   build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>                                   struct hmap *port_groups, struct hmap *lflows,
> @@ -11684,53 +11830,108 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>                                   struct shash *meter_groups, struct hmap *lbs,
>                                   struct hmap *bfd_connections)
>   {
> -    struct ovn_datapath *od;
> -    struct ovn_port *op;
> -    struct ovn_northd_lb *lb;
> -    struct ovn_igmp_group *igmp_group;
>   
>       char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>   
> -    struct lswitch_flow_build_info lsi = {
> -        .datapaths = datapaths,
> -        .ports = ports,
> -        .port_groups = port_groups,
> -        .lflows = lflows,
> -        .mcgroups = mcgroups,
> -        .igmp_groups = igmp_groups,
> -        .meter_groups = meter_groups,
> -        .lbs = lbs,
> -        .bfd_connections = bfd_connections,
> -        .svc_check_match = svc_check_match,
> -        .match = DS_EMPTY_INITIALIZER,
> -        .actions = DS_EMPTY_INITIALIZER,
> -    };
> +    if (use_parallel_build) {
> +        init_lflows_thread_pool();
> +        struct hmap *lflow_segs;
> +        struct lswitch_flow_build_info *lsiv;
> +        int index;
>   
> -    /* Combined build - all lflow generation from lswitch and lrouter
> -     * will move here and will be reogranized by iterator type.
> -     */
> -    HMAP_FOR_EACH (od, key_node, datapaths) {
> -        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
> -    }
> -    HMAP_FOR_EACH (op, key_node, ports) {
> -        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
> -    }
> -    HMAP_FOR_EACH (lb, hmap_node, lbs) {
> -        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
> -                                             &lsi.actions,
> -                                             &lsi.match);
> -    }
> -    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> -        build_lswitch_ip_mcast_igmp_mld(igmp_group,
> -                                        lsi.lflows,
> -                                        &lsi.actions,
> -                                        &lsi.match);
> -    }
> -    free(svc_check_match);
> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
> +        if (use_logical_dp_groups) {
> +            lflow_segs = NULL;
> +        } else {
> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
> +        }
> +
> +        /* Set up "work chunks" for each thread to work on. */
>   
> -    ds_destroy(&lsi.match);
> -    ds_destroy(&lsi.actions);
> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +            if (use_logical_dp_groups) {
> +                /* if dp_groups are in use we lock a shared lflows hash
> +                 * on a per-bucket level instead of merging hash frags */
> +                lsiv[index].lflows = lflows;
> +            } else {
> +                fast_hmap_init(&lflow_segs[index], lflows->mask);

I guess I should've mentioned this on patch 1/4 but now it became more 
obvious that fast_hmap_*() APIs work on 'struct hmap' objects directly. 
  It makes it very easy to end up mixing thread safe (i.e., fast_hmap*) 
and regular hmap_*() APIs which might lead to unexpected results.
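
One way to make such mixing harder (just a sketch of the idea, not what this
series does) would be to give the concurrent map its own type, so that plain
hmap_*() calls no longer accept it without an explicit cast:

    /* Hypothetical wrapper -- not part of the posted patch; shown only
     * to illustrate keeping the two API families apart. */
    #include <stdint.h>
    #include "openvswitch/hmap.h"

    struct fast_hmap {
        struct hmap hmap;      /* Only fast_hmap_*() should touch this. */
    };

    void fast_hmap_init(struct fast_hmap *fh, size_t mask);
    void fast_hmap_insert(struct fast_hmap *fh, struct hmap_node *node,
                          uint32_t hash);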

> +                lsiv[index].lflows = &lflow_segs[index];
> +            }
> +
> +            lsiv[index].datapaths = datapaths;
> +            lsiv[index].ports = ports;
> +            lsiv[index].port_groups = port_groups;
> +            lsiv[index].mcgroups = mcgroups;
> +            lsiv[index].igmp_groups = igmp_groups;
> +            lsiv[index].meter_groups = meter_groups;
> +            lsiv[index].lbs = lbs;
> +            lsiv[index].bfd_connections = bfd_connections;
> +            lsiv[index].svc_check_match = svc_check_match;
> +            ds_init(&lsiv[index].match);
> +            ds_init(&lsiv[index].actions);
> +
> +            build_lflows_pool->pool->controls[index].data = &lsiv[index];
> +        }
> +
> +        /* Run thread pool. */
> +        if (use_logical_dp_groups) {
> +            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
> +        } else {
> +            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> +        }
>   
> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +            ds_destroy(&lsiv[index].match);
> +            ds_destroy(&lsiv[index].actions);
> +        }
> +        free(lflow_segs);
> +        free(lsiv);
> +    } else {
> +        struct ovn_datapath *od;
> +        struct ovn_port *op;
> +        struct ovn_northd_lb *lb;
> +        struct ovn_igmp_group *igmp_group;
> +        struct lswitch_flow_build_info lsi = {
> +            .datapaths = datapaths,
> +            .ports = ports,
> +            .port_groups = port_groups,
> +            .lflows = lflows,
> +            .mcgroups = mcgroups,
> +            .igmp_groups = igmp_groups,
> +            .meter_groups = meter_groups,
> +            .lbs = lbs,
> +            .bfd_connections = bfd_connections,
> +            .svc_check_match = svc_check_match,
> +            .match = DS_EMPTY_INITIALIZER,
> +            .actions = DS_EMPTY_INITIALIZER,
> +        };
> +
> +        /* Combined build - all lflow generation from lswitch and lrouter
> +         * will move here and will be reogranized by iterator type.
> +         */
> +        HMAP_FOR_EACH (od, key_node, datapaths) {
> +            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
> +        }
> +        HMAP_FOR_EACH (op, key_node, ports) {
> +            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
> +        }
> +        HMAP_FOR_EACH (lb, hmap_node, lbs) {
> +            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
> +                                                 &lsi.actions,
> +                                                 &lsi.match);
> +        }
> +        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> +            build_lswitch_ip_mcast_igmp_mld(igmp_group,
> +                                            lsi.lflows,
> +                                            &lsi.actions,
> +                                            &lsi.match);
> +        }
> +
> +        ds_destroy(&lsi.match);
> +        ds_destroy(&lsi.actions);
> +    }
> +
> +    free(svc_check_match);
>       build_lswitch_flows(datapaths, lflows);
>   }
>   
> @@ -11801,6 +12002,25 @@ ovn_sb_set_lflow_logical_dp_group(
>       sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
>   }
>   
> +static ssize_t max_seen_lflow_size = 128;
> +
> +static ssize_t recent_lflow_map_mask = 0;

I think this can also be part of the fast-hmap implementation.

> +
> +static void update_lock_array(struct hmap *lflows)
> +{
> +    int i;
> +    if (recent_lflow_map_mask != lflows->mask) {
> +        if (slice_locks) {
> +            free(slice_locks);
> +        }
> +        slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> +        recent_lflow_map_mask = lflows->mask;
> +        for (i = 0; i <= lflows->mask; i++) {
> +            ovs_mutex_init(&slice_locks[i]);
> +        }
> +    }
> +}

Same here.

Regards,
Dumitru
Anton Ivanov Jan. 25, 2021, 9:10 p.m. UTC | #3
On 25/01/2021 20:56, Dumitru Ceara wrote:
> On 1/15/21 4:30 PM, anton.ivanov@cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> Datapaths, ports, igmp groups and load balancers can now
>> be iterated over in parallel in order to speed up the lflow
>> generation. This decreases the time needed to generate the
>> logical flows by a factor of 4+ on a 6 core/12 thread CPU
>> without datapath groups - from 0.8-1 microseconds per flow
>> down to 0.2-0.3 microseconds per flow on average.
>>
>> The decrease in time to compute lflows with datapath groups
>> enabled is ~2 times for the same hardware - from an average of
>> 2.4 microseconds per flow to 1.2 microseconds per flow.
>>
>> Tested for on an 8 node, 400 pod K8 simulation resulting
>> in > 6K flows.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> ---
> 
> Hi Anton,
> 
> Again, not a full review, but some initial remarks, most of them I think 
> related to the fact that we're leaking the fast-hmap abstraction details.
> 
>>   northd/ovn-northd.c | 334 +++++++++++++++++++++++++++++++++++++-------
>>   1 file changed, 281 insertions(+), 53 deletions(-)
>>
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index dda033543..53e0cc50d 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -37,6 +37,7 @@
>>   #include "lib/ovn-sb-idl.h"
>>   #include "lib/ovn-util.h"
>>   #include "lib/lb.h"
>> +#include "lib/fasthmap.h"
>>   #include "ovn/actions.h"
>>   #include "ovn/logical-fields.h"
>>   #include "packets.h"
>> @@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct 
>> ovn_datapath *od,
>>   /* If this option is 'true' northd will combine logical flows that 
>> differs by
>>    * logical datapath only by creating a datapah group. */
>>   static bool use_logical_dp_groups = false;
>> +static bool use_parallel_build = true;
>> +
>> +static struct ovs_mutex *slice_locks = NULL;
> 
> I'm not very sure why the locking details have to be known by the user 
> of the fasthmap.  Why doesn't the fasthmap library take care of 
> (re)allocating just the right amount of locks it needs?
> 
>> +
>> +/* Adds a row with the specified contents to the Logical_Flow table.
>> + * Version to use when locking is required.
>> + */
>> +static void
>> +do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
>> +                        struct ovn_datapath *od,
>> +                        uint32_t hash, struct ovn_lflow *lflow)
>> +{
>> +
>> +    struct ovn_lflow *old_lflow;
>> +
>> +    if (shared && use_logical_dp_groups) {
>> +        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> +        if (old_lflow) {
>> +            ovn_lflow_destroy(NULL, lflow);
>> +            hmapx_add(&old_lflow->od_group, od);
>> +            return;
>> +        }
>> +    }
>> +
>> +    hmapx_add(&lflow->od_group, od);
>> +    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
>> +}
>> +
>>   /* Adds a row with the specified contents to the Logical_Flow table. */
>>   static void
>> @@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct 
>> ovn_datapath *od,
>>   {
>>       ovs_assert(ovn_stage_to_datapath_type(stage) == 
>> ovn_datapath_get_type(od));
>> -    struct ovn_lflow *old_lflow, *lflow;
>> +    struct ovn_lflow *lflow;
>>       uint32_t hash;
>>       lflow = xmalloc(sizeof *lflow);
>> @@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, 
>> struct ovn_datapath *od,
>>                      ovn_lflow_hint(stage_hint), where);
>>       hash = ovn_lflow_hash(lflow);
>> -    if (shared && use_logical_dp_groups) {
>> -        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> -        if (old_lflow) {
>> -            ovn_lflow_destroy(NULL, lflow);
>> -            hmapx_add(&old_lflow->od_group, od);
>> -            return;
>> -        }
>> -    }
>> -    hmapx_add(&lflow->od_group, od);
>> -    hmap_insert(lflow_map, &lflow->hmap_node, hash);
>> +    if (use_logical_dp_groups && use_parallel_build) {
>> +        ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);
> 
> We're using hmap internals and the fasthmap user has to make sure it 
> locked the correct slice lock.
> 
> If slice_locks were something internal to the fasthmap, an API to get 
> the corresponding lock, e.g., based on 'hash' would hide these 
> implementation details.

I can add the lock-on-slice facility there.

Though for now it has only one use case - datapath groups. The idea 
elsewhere is to use a lockless "produce a result fragment" strategy in 
each worker thread and have the main thread merge the fragments.
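
In other words (illustrative sketch only; the actual merge belongs to the
fasthmap code from patch 1/4): each worker fills its own per-thread hmap
without any locking, and the main thread folds the fragments into the final
map once the workers have finished, roughly like:

    /* Illustrative only: fold one per-thread fragment into the final
     * map, leaving the fragment empty.  Field access mirrors what the
     * macros in openvswitch/hmap.h already do. */
    #include "openvswitch/hmap.h"

    static void
    merge_fragment(struct hmap *result, struct hmap *fragment)
    {
        for (size_t i = 0; i <= fragment->mask; i++) {
            struct hmap_node *node = fragment->buckets[i];
            while (node) {
                struct hmap_node *next = node->next;
                hmap_insert_fast(result, node, node->hash);
                node = next;
            }
            fragment->buckets[i] = NULL;
        }
        fragment->n = 0;
    }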

> 
>> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> +        ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
>> +    } else {
>> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> +    }
>>   }
>>   /* Adds a row with the specified contents to the Logical_Flow table. */
>> @@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct 
>> ovn_igmp_group *igmp_group,
>>       }
>>   }
>> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
>> +
>>   /* Ingress table 19: Destination lookup, unicast handling (priority 
>> 50), */
>>   static void
>>   build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>> @@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port 
>> *op,
>>                                           &op->nbsp->header_);
>>               } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
>>                   if (lsp_is_enabled(op->nbsp)) {
>> +                    ovs_mutex_lock(&mcgroup_mutex);
>>                       ovn_multicast_add(mcgroups, &mc_unknown, op);
>> +                    ovs_mutex_unlock(&mcgroup_mutex);
>>                       op->od->has_unknown = true;
>>                   }
>>               } else if 
>> (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
>> @@ -11676,6 +11706,122 @@ 
>> build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>>                                   &lsi->match, &lsi->actions);
>>   }
>> +struct lflows_thread_pool {
>> +    struct worker_pool *pool;
>> +};
>> +
>> +static void *build_lflows_thread(void *arg)
>> +{
>> +    struct worker_control *control = (struct worker_control *) arg;
>> +    struct lflows_thread_pool *workload;
>> +    struct lswitch_flow_build_info *lsi;
>> +
>> +    struct ovn_datapath *od;
>> +    struct ovn_port *op;
>> +    struct ovn_northd_lb *lb;
>> +    struct ovn_igmp_group *igmp_group;
>> +    int bnum;
>> +
>> +    while (!stop_parallel_processing()) {
>> +        sem_wait(&control->fire);
> 
> This feels a bit error prone.  Can all this be abstracted away in a 
> fasthmap API that executes callbacks for all fast-hmap inputs 
> (lsi->datapaths, lsi->ports, lsi->lbs, etc.) that have been defined by 
> the user code?

I need to get my head around that. The stop part can be abstracted.

The rest, not so much - they are iterators similar to HMAP_FOR_EACH.

> 
>> +        workload = (struct lflows_thread_pool *) control->workload;
>> +        lsi = (struct lswitch_flow_build_info *) control->data;
>> +        if (lsi && workload) {
>> +            /* Iterate over bucket ThreadID, ThreadID+size, ... */
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->datapaths->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, 
>> lsi->datapaths) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->ports->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, 
>> lsi->ports) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->lbs->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, 
>> lsi->lbs) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_arp_nd_service_monitor(lb, 
>> lsi->lflows,
>> +                                                         &lsi->match,
>> +                                                         &lsi->actions);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->igmp_groups->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (
>> +                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_ip_mcast_igmp_mld(igmp_group, 
>> lsi->lflows,
>> +                                                    &lsi->match,
>> +                                                    &lsi->actions);
>> +                }
>> +            }
>> +            atomic_store_relaxed(&control->finished, true);
>> +            atomic_thread_fence(memory_order_acq_rel);
> 
> Especially this seems error prone and it would be better if it's 
> implemented inside fasthmap.c where we actually read &control->finished.

That's a different thread :) You cannot read it there.

You read control->finished in the master thread where you collect the 
results; you set control->finished and do the barrier in the worker 
threads.
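
To make the two halves concrete (rough sketch; the reading half is assumed to
live in fasthmap.c, as discussed):

    /* Worker thread, at the end of its slice of the work: */
    atomic_store_relaxed(&control->finished, true);
    atomic_thread_fence(memory_order_acq_rel);
    sem_post(control->done);

    /* Main thread, per worker, while collecting results: */
    bool finished;
    atomic_read_relaxed(&control->finished, &finished);
    if (finished) {
        /* Intended to pair with the worker's store + fence: once
         * 'finished' reads true, the worker's fragment is treated as
         * complete and can be merged. */
    }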


> 
>> +        }
>> +        sem_post(control->done);
> 
> Same here.
> 
>> +     }
>> +    return NULL;
>> +}
>> +
>> +static bool pool_init_done = false;
>> +static struct lflows_thread_pool *build_lflows_pool = NULL;
>> +
>> +static void init_lflows_thread_pool(void)
>> +{
>> +    int index;
>> +
>> +    if (!pool_init_done) {
>> +        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
>> +        pool_init_done = true;
>> +        if (pool) {
>> +            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
>> +            build_lflows_pool->pool = pool;
>> +            for (index = 0; index < build_lflows_pool->pool->size; 
>> index++) {
>> +                build_lflows_pool->pool->controls[index].workload =
>> +                    build_lflows_pool;
>> +            }
>> +        }
> 
> As mentioned on patch 1/4, if pool == NULL, build_lflows_pool is always 
> NULL and ovn-northd crashes when use_parallel_build == true.
> 
>> +    }
>> +}
>> +
>> +/* TODO: replace hard cutoffs by configurable via commands. These are
>> + * temporary defines to determine single-thread to multi-thread 
>> processing
>> + * cutoff.
>> + * Setting to 1 forces "all parallel" lflow build.
>> + */
>> +
>> +static void
>> +noop_callback(struct worker_pool *pool OVS_UNUSED,
>> +              void *fin_result OVS_UNUSED,
>> +              void *result_frags OVS_UNUSED,
>> +              int index OVS_UNUSED)
>> +{
>> +    /* Do nothing */
>> +}
>> +
>> +
>>   static void
>>   build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap 
>> *ports,
>>                                   struct hmap *port_groups, struct 
>> hmap *lflows,
>> @@ -11684,53 +11830,108 @@ build_lswitch_and_lrouter_flows(struct hmap 
>> *datapaths, struct hmap *ports,
>>                                   struct shash *meter_groups, struct 
>> hmap *lbs,
>>                                   struct hmap *bfd_connections)
>>   {
>> -    struct ovn_datapath *od;
>> -    struct ovn_port *op;
>> -    struct ovn_northd_lb *lb;
>> -    struct ovn_igmp_group *igmp_group;
>>       char *svc_check_match = xasprintf("eth.dst == %s", 
>> svc_monitor_mac);
>> -    struct lswitch_flow_build_info lsi = {
>> -        .datapaths = datapaths,
>> -        .ports = ports,
>> -        .port_groups = port_groups,
>> -        .lflows = lflows,
>> -        .mcgroups = mcgroups,
>> -        .igmp_groups = igmp_groups,
>> -        .meter_groups = meter_groups,
>> -        .lbs = lbs,
>> -        .bfd_connections = bfd_connections,
>> -        .svc_check_match = svc_check_match,
>> -        .match = DS_EMPTY_INITIALIZER,
>> -        .actions = DS_EMPTY_INITIALIZER,
>> -    };
>> +    if (use_parallel_build) {
>> +        init_lflows_thread_pool();
>> +        struct hmap *lflow_segs;
>> +        struct lswitch_flow_build_info *lsiv;
>> +        int index;
>> -    /* Combined build - all lflow generation from lswitch and lrouter
>> -     * will move here and will be reogranized by iterator type.
>> -     */
>> -    HMAP_FOR_EACH (od, key_node, datapaths) {
>> -        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> -    }
>> -    HMAP_FOR_EACH (op, key_node, ports) {
>> -        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> -    }
>> -    HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> -        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> -                                             &lsi.actions,
>> -                                             &lsi.match);
>> -    }
>> -    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> -        build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> -                                        lsi.lflows,
>> -                                        &lsi.actions,
>> -                                        &lsi.match);
>> -    }
>> -    free(svc_check_match);
>> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
>> +        if (use_logical_dp_groups) {
>> +            lflow_segs = NULL;
>> +        } else {
>> +            lflow_segs = xcalloc(sizeof(*lflow_segs), 
>> build_lflows_pool->pool->size);
>> +        }
>> +
>> +        /* Set up "work chunks" for each thread to work on. */
>> -    ds_destroy(&lsi.match);
>> -    ds_destroy(&lsi.actions);
>> +        for (index = 0; index < build_lflows_pool->pool->size; 
>> index++) {
>> +            if (use_logical_dp_groups) {
>> +                /* if dp_groups are in use we lock a shared lflows hash
>> +                 * on a per-bucket level instead of merging hash 
>> frags */
>> +                lsiv[index].lflows = lflows;
>> +            } else {
>> +                fast_hmap_init(&lflow_segs[index], lflows->mask);
> 
> I guess I should've mentioned this on patch 1/4 but now it became more 
> obvious that fast_hmap_*() APIs work on 'struct hmap' objects directly. 
>   It makes it very easy to end up mixing thread safe (i.e., fast_hmap*) 
> and regular hmap_*() APIs which might lead to unexpected results.
> 
>> +                lsiv[index].lflows = &lflow_segs[index];
>> +            }
>> +
>> +            lsiv[index].datapaths = datapaths;
>> +            lsiv[index].ports = ports;
>> +            lsiv[index].port_groups = port_groups;
>> +            lsiv[index].mcgroups = mcgroups;
>> +            lsiv[index].igmp_groups = igmp_groups;
>> +            lsiv[index].meter_groups = meter_groups;
>> +            lsiv[index].lbs = lbs;
>> +            lsiv[index].bfd_connections = bfd_connections;
>> +            lsiv[index].svc_check_match = svc_check_match;
>> +            ds_init(&lsiv[index].match);
>> +            ds_init(&lsiv[index].actions);
>> +
>> +            build_lflows_pool->pool->controls[index].data = 
>> &lsiv[index];
>> +        }
>> +
>> +        /* Run thread pool. */
>> +        if (use_logical_dp_groups) {
>> +            run_pool_callback(build_lflows_pool->pool, NULL, NULL, 
>> noop_callback);
>> +        } else {
>> +            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
>> +        }
>> +        for (index = 0; index < build_lflows_pool->pool->size; 
>> index++) {
>> +            ds_destroy(&lsiv[index].match);
>> +            ds_destroy(&lsiv[index].actions);
>> +        }
>> +        free(lflow_segs);
>> +        free(lsiv);
>> +    } else {
>> +        struct ovn_datapath *od;
>> +        struct ovn_port *op;
>> +        struct ovn_northd_lb *lb;
>> +        struct ovn_igmp_group *igmp_group;
>> +        struct lswitch_flow_build_info lsi = {
>> +            .datapaths = datapaths,
>> +            .ports = ports,
>> +            .port_groups = port_groups,
>> +            .lflows = lflows,
>> +            .mcgroups = mcgroups,
>> +            .igmp_groups = igmp_groups,
>> +            .meter_groups = meter_groups,
>> +            .lbs = lbs,
>> +            .bfd_connections = bfd_connections,
>> +            .svc_check_match = svc_check_match,
>> +            .match = DS_EMPTY_INITIALIZER,
>> +            .actions = DS_EMPTY_INITIALIZER,
>> +        };
>> +
>> +        /* Combined build - all lflow generation from lswitch and 
>> lrouter
>> +         * will move here and will be reogranized by iterator type.
>> +         */
>> +        HMAP_FOR_EACH (od, key_node, datapaths) {
>> +            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> +        }
>> +        HMAP_FOR_EACH (op, key_node, ports) {
>> +            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> +        }
>> +        HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> +            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> +                                                 &lsi.actions,
>> +                                                 &lsi.match);
>> +        }
>> +        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> +            build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> +                                            lsi.lflows,
>> +                                            &lsi.actions,
>> +                                            &lsi.match);
>> +        }
>> +
>> +        ds_destroy(&lsi.match);
>> +        ds_destroy(&lsi.actions);
>> +    }
>> +
>> +    free(svc_check_match);
>>       build_lswitch_flows(datapaths, lflows);
>>   }
>> @@ -11801,6 +12002,25 @@ ovn_sb_set_lflow_logical_dp_group(
>>       sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
>>   }
>> +static ssize_t max_seen_lflow_size = 128;
>> +
>> +static ssize_t recent_lflow_map_mask = 0;
> 
> I think this can also be part of the fast-hmap implementation.
> 
>> +
>> +static void update_lock_array(struct hmap *lflows)
>> +{
>> +    int i;
>> +    if (recent_lflow_map_mask != lflows->mask) {
>> +        if (slice_locks) {
>> +            free(slice_locks);
>> +        }
>> +        slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask 
>> + 1);
>> +        recent_lflow_map_mask = lflows->mask;
>> +        for (i = 0; i <= lflows->mask; i++) {
>> +            ovs_mutex_init(&slice_locks[i]);
>> +        }
>> +    }
>> +}
> 
> Same here.
> 
> Regards,
> Dumitru
> 
>
Dumitru Ceara Jan. 25, 2021, 9:15 p.m. UTC | #4
On 1/25/21 10:10 PM, Anton Ivanov wrote:

[...]

>>> +            atomic_store_relaxed(&control->finished, true);
>>> +            atomic_thread_fence(memory_order_acq_rel);
>>
>> Especially this seems error prone and it would be better if it's 
>> implemented inside fasthmap.c where we actually read &control->finished.
> 
> That's a different thread :) You cannot read it there.
> 

I meant inside the fasthmap.c file, not in the function where we read 
the variable :)  Sorry for the confusion.
Anton Ivanov Jan. 25, 2021, 9:51 p.m. UTC | #5
On 25/01/2021 21:15, Dumitru Ceara wrote:
> On 1/25/21 10:10 PM, Anton Ivanov wrote:
>
> [...]
>
>>>> + atomic_store_relaxed(&control->finished, true);
>>>> +            atomic_thread_fence(memory_order_acq_rel);
>>>
>>> Especially this seems error prone and it would be better if it's 
>>> implemented inside fasthmap.c where we actually read 
>>> &control->finished.
>>
>> That's a different thread :) You cannot read it there.
>>
>
> I meant inside the fasthmap.c file, not in the function where we read 
> the variable :)  Sorry for the confusion.
>
>
>
Ah, OK.

I will abstract these snippets.

A.

Patch

diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index dda033543..53e0cc50d 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -37,6 +37,7 @@ 
 #include "lib/ovn-sb-idl.h"
 #include "lib/ovn-util.h"
 #include "lib/lb.h"
+#include "lib/fasthmap.h"
 #include "ovn/actions.h"
 #include "ovn/logical-fields.h"
 #include "packets.h"
@@ -4174,6 +4175,34 @@  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differs by
  * logical datapath only by creating a datapah group. */
 static bool use_logical_dp_groups = false;
+static bool use_parallel_build = true;
+
+static struct ovs_mutex *slice_locks = NULL;
+
+/* Adds a row with the specified contents to the Logical_Flow table.
+ * Version to use when locking is required.
+ */
+static void
+do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
+                        struct ovn_datapath *od,
+                        uint32_t hash, struct ovn_lflow *lflow)
+{
+
+    struct ovn_lflow *old_lflow;
+
+    if (shared && use_logical_dp_groups) {
+        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
+        if (old_lflow) {
+            ovn_lflow_destroy(NULL, lflow);
+            hmapx_add(&old_lflow->od_group, od);
+            return;
+        }
+    }
+
+    hmapx_add(&lflow->od_group, od);
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
+}
+
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
 static void
@@ -4184,7 +4213,7 @@  ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
 {
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
 
-    struct ovn_lflow *old_lflow, *lflow;
+    struct ovn_lflow *lflow;
     uint32_t hash;
 
     lflow = xmalloc(sizeof *lflow);
@@ -4196,17 +4225,14 @@  ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
                    ovn_lflow_hint(stage_hint), where);
 
     hash = ovn_lflow_hash(lflow);
-    if (shared && use_logical_dp_groups) {
-        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
-        if (old_lflow) {
-            ovn_lflow_destroy(NULL, lflow);
-            hmapx_add(&old_lflow->od_group, od);
-            return;
-        }
-    }
 
-    hmapx_add(&lflow->od_group, od);
-    hmap_insert(lflow_map, &lflow->hmap_node, hash);
+    if (use_logical_dp_groups && use_parallel_build) {
+        ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);
+        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
+        ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
+    } else {
+        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
+    }
 }
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
@@ -7348,6 +7374,8 @@  build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
     }
 }
 
+static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
+
 /* Ingress table 19: Destination lookup, unicast handling (priority 50), */
 static void
 build_lswitch_ip_unicast_lookup(struct ovn_port *op,
@@ -7386,7 +7414,9 @@  build_lswitch_ip_unicast_lookup(struct ovn_port *op,
                                         &op->nbsp->header_);
             } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
                 if (lsp_is_enabled(op->nbsp)) {
+                    ovs_mutex_lock(&mcgroup_mutex);
                     ovn_multicast_add(mcgroups, &mc_unknown, op);
+                    ovs_mutex_unlock(&mcgroup_mutex);
                     op->od->has_unknown = true;
                 }
             } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
@@ -11676,6 +11706,122 @@  build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                 &lsi->match, &lsi->actions);
 }
 
+struct lflows_thread_pool {
+    struct worker_pool *pool;
+};
+
+static void *build_lflows_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflows_thread_pool *workload;
+    struct lswitch_flow_build_info *lsi;
+
+    struct ovn_datapath *od;
+    struct ovn_port *op;
+    struct ovn_northd_lb *lb;
+    struct ovn_igmp_group *igmp_group;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        sem_wait(&control->fire);
+        workload = (struct lflows_thread_pool *) control->workload;
+        lsi = (struct lswitch_flow_build_info *) control->data;
+        if (lsi && workload) {
+            /* Iterate over bucket ThreadID, ThreadID+size, ... */
+            for (bnum = control->id;
+                    bnum <= lsi->datapaths->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
+                }
+            }
+            for (bnum = control->id;
+                    bnum <= lsi->ports->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
+                }
+            }
+            for (bnum = control->id;
+                    bnum <= lsi->lbs->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
+                                                         &lsi->match,
+                                                         &lsi->actions);
+                }
+            }
+            for (bnum = control->id;
+                    bnum <= lsi->igmp_groups->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
+                                                    &lsi->match,
+                                                    &lsi->actions);
+                }
+            }
+            atomic_store_relaxed(&control->finished, true);
+            atomic_thread_fence(memory_order_acq_rel);
+        }
+        sem_post(control->done);
+     }
+    return NULL;
+}
+
+static bool pool_init_done = false;
+static struct lflows_thread_pool *build_lflows_pool = NULL;
+
+static void init_lflows_thread_pool(void)
+{
+    int index;
+
+    if (!pool_init_done) {
+        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        pool_init_done = true;
+        if (pool) {
+            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
+            build_lflows_pool->pool = pool;
+            for (index = 0; index < build_lflows_pool->pool->size; index++) {
+                build_lflows_pool->pool->controls[index].workload =
+                    build_lflows_pool;
+            }
+        }
+    }
+}
+
+/* TODO: replace hard cutoffs by configurable via commands. These are
+ * temporary defines to determine single-thread to multi-thread processing
+ * cutoff.
+ * Setting to 1 forces "all parallel" lflow build.
+ */
+
+static void
+noop_callback(struct worker_pool *pool OVS_UNUSED,
+              void *fin_result OVS_UNUSED,
+              void *result_frags OVS_UNUSED,
+              int index OVS_UNUSED)
+{
+    /* Do nothing */
+}
+
+
 static void
 build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct hmap *port_groups, struct hmap *lflows,
@@ -11684,53 +11830,108 @@  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct shash *meter_groups, struct hmap *lbs,
                                 struct hmap *bfd_connections)
 {
-    struct ovn_datapath *od;
-    struct ovn_port *op;
-    struct ovn_northd_lb *lb;
-    struct ovn_igmp_group *igmp_group;
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    struct lswitch_flow_build_info lsi = {
-        .datapaths = datapaths,
-        .ports = ports,
-        .port_groups = port_groups,
-        .lflows = lflows,
-        .mcgroups = mcgroups,
-        .igmp_groups = igmp_groups,
-        .meter_groups = meter_groups,
-        .lbs = lbs,
-        .bfd_connections = bfd_connections,
-        .svc_check_match = svc_check_match,
-        .match = DS_EMPTY_INITIALIZER,
-        .actions = DS_EMPTY_INITIALIZER,
-    };
+    if (use_parallel_build) {
+        init_lflows_thread_pool();
+        struct hmap *lflow_segs;
+        struct lswitch_flow_build_info *lsiv;
+        int index;
 
-    /* Combined build - all lflow generation from lswitch and lrouter
-     * will move here and will be reogranized by iterator type.
-     */
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
-    }
-    HMAP_FOR_EACH (op, key_node, ports) {
-        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
-    }
-    HMAP_FOR_EACH (lb, hmap_node, lbs) {
-        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
-                                             &lsi.actions,
-                                             &lsi.match);
-    }
-    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
-        build_lswitch_ip_mcast_igmp_mld(igmp_group,
-                                        lsi.lflows,
-                                        &lsi.actions,
-                                        &lsi.match);
-    }
-    free(svc_check_match);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        if (use_logical_dp_groups) {
+            lflow_segs = NULL;
+        } else {
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+        }
+
+        /* Set up "work chunks" for each thread to work on. */
 
-    ds_destroy(&lsi.match);
-    ds_destroy(&lsi.actions);
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            if (use_logical_dp_groups) {
+                /* if dp_groups are in use we lock a shared lflows hash
+                 * on a per-bucket level instead of merging hash frags */
+                lsiv[index].lflows = lflows;
+            } else {
+                fast_hmap_init(&lflow_segs[index], lflows->mask);
+                lsiv[index].lflows = &lflow_segs[index];
+            }
+
+            lsiv[index].datapaths = datapaths;
+            lsiv[index].ports = ports;
+            lsiv[index].port_groups = port_groups;
+            lsiv[index].mcgroups = mcgroups;
+            lsiv[index].igmp_groups = igmp_groups;
+            lsiv[index].meter_groups = meter_groups;
+            lsiv[index].lbs = lbs;
+            lsiv[index].bfd_connections = bfd_connections;
+            lsiv[index].svc_check_match = svc_check_match;
+            ds_init(&lsiv[index].match);
+            ds_init(&lsiv[index].actions);
+
+            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+        }
+
+        /* Run thread pool. */
+        if (use_logical_dp_groups) {
+            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+        } else {
+            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+        }
 
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            ds_destroy(&lsiv[index].match);
+            ds_destroy(&lsiv[index].actions);
+        }
+        free(lflow_segs);
+        free(lsiv);
+    } else {
+        struct ovn_datapath *od;
+        struct ovn_port *op;
+        struct ovn_northd_lb *lb;
+        struct ovn_igmp_group *igmp_group;
+        struct lswitch_flow_build_info lsi = {
+            .datapaths = datapaths,
+            .ports = ports,
+            .port_groups = port_groups,
+            .lflows = lflows,
+            .mcgroups = mcgroups,
+            .igmp_groups = igmp_groups,
+            .meter_groups = meter_groups,
+            .lbs = lbs,
+            .bfd_connections = bfd_connections,
+            .svc_check_match = svc_check_match,
+            .match = DS_EMPTY_INITIALIZER,
+            .actions = DS_EMPTY_INITIALIZER,
+        };
+
+        /* Combined build - all lflow generation from lswitch and lrouter
+         * will move here and will be reogranized by iterator type.
+         */
+        HMAP_FOR_EACH (od, key_node, datapaths) {
+            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
+        }
+        HMAP_FOR_EACH (op, key_node, ports) {
+            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+        }
+        HMAP_FOR_EACH (lb, hmap_node, lbs) {
+            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
+                                                 &lsi.actions,
+                                                 &lsi.match);
+        }
+        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
+            build_lswitch_ip_mcast_igmp_mld(igmp_group,
+                                            lsi.lflows,
+                                            &lsi.actions,
+                                            &lsi.match);
+        }
+
+        ds_destroy(&lsi.match);
+        ds_destroy(&lsi.actions);
+    }
+
+    free(svc_check_match);
     build_lswitch_flows(datapaths, lflows);
 }
 
@@ -11801,6 +12002,25 @@  ovn_sb_set_lflow_logical_dp_group(
     sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
 }
 
+static ssize_t max_seen_lflow_size = 128;
+
+static ssize_t recent_lflow_map_mask = 0;
+
+static void update_lock_array(struct hmap *lflows)
+{
+    int i;
+    if (recent_lflow_map_mask != lflows->mask) {
+        if (slice_locks) {
+            free(slice_locks);
+        }
+        slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
+        recent_lflow_map_mask = lflows->mask;
+        for (i = 0; i <= lflows->mask; i++) {
+            ovs_mutex_init(&slice_locks[i]);
+        }
+    }
+}
+
 /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
  * constructing their contents based on the OVN_NB database. */
 static void
@@ -11810,13 +12030,21 @@  build_lflows(struct northd_context *ctx, struct hmap *datapaths,
              struct shash *meter_groups,
              struct hmap *lbs, struct hmap *bfd_connections)
 {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap lflows;
 
+    fast_hmap_size_for(&lflows, max_seen_lflow_size);
+    if (use_parallel_build) {
+        update_lock_array(&lflows);
+    }
     build_lswitch_and_lrouter_flows(datapaths, ports,
                                     port_groups, &lflows, mcgroups,
                                     igmp_groups, meter_groups, lbs,
                                     bfd_connections);
 
+    if (hmap_count(&lflows) > max_seen_lflow_size) {
+        max_seen_lflow_size = hmap_count(&lflows);
+    }
+
     /* Collecting all unique datapath groups. */
     struct hmap dp_groups = HMAP_INITIALIZER(&dp_groups);
     struct hmapx single_dp_lflows = HMAPX_INITIALIZER(&single_dp_lflows);