diff mbox series

[ovs-dev,v14,1/3] ovn-libs: Add support for parallel processing

Message ID 20210212144952.27215-1-anton.ivanov@cambridgegreys.com
State New
Headers show
Series [ovs-dev,v14,1/3] ovn-libs: Add support for parallel processing | expand

Commit Message

Anton Ivanov Feb. 12, 2021, 2:49 p.m. UTC
This adds a set of functions and macros intended to process
hashes in parallel.

The principles of operation are documented in the fasthmap.h

If these one day go into the OVS tree, the OVS tree versions
would be used in preference.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/automake.mk         |   2 +
 lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
 lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
 3 files changed, 742 insertions(+)
 create mode 100644 lib/ovn-parallel-hmap.c
 create mode 100644 lib/ovn-parallel-hmap.h

Comments

0-day Robot Feb. 12, 2021, 3:27 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line is 83 characters long (recommended limit is 79)
#187 FILE: lib/ovn-parallel-hmap.c:146:
            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);

WARNING: Line has trailing whitespace
#466 FILE: lib/ovn-parallel-hmap.c:425:
    } 

ERROR: Improper whitespace around control block
#562 FILE: lib/ovn-parallel-hmap.h:60:
#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \

WARNING: Line has trailing whitespace
#736 FILE: lib/ovn-parallel-hmap.h:234:
    hrl->row_locks = NULL;   

Lines checked: 790, Warnings: 3, Errors: 1


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Mark Michelson Feb. 18, 2021, 10:36 p.m. UTC | #2
Hi Anton, in short:

Acked-by: Mark Michelson <mmichels@redhat.com>

I think this can go in as-is. My only question is with regards to how 
dp_groups are handled. The current patch makes it so that when dp_groups 
are used, we parallelize the work. However, instead of each thread 
creating discrete segments of lflows, each thread shares a reference to 
the same large lflow hmap. This means the threads have to lock the 
pertinent hash row each time they add a lflow to the hmap. This makes 
sense since the entire lflow hmap needs to be taken into account when 
determining if a flow is repeated between different datapaths.

I'm curious if this contention might result in poorer parallization 
speedup. Without having done any profiling, my assumption is that this 
shouldn't perform worse than single-threaded code, but that it likely 
isn't performing that much better either. Because this isn't actively 
*harming* performance, that's why I went ahead and acked the patch series.

Once we have run some performance tests, it would be worth determining 
if the algorithm used for dp_groups could be improved. I have a couple 
of thoughts:

1) It might just make sense to disable parallelization when dp_groups 
are enabled. This certainly wouldn't help performance, but it would 
simplify northd somewhat. This might be an OK tradeoff if the 
performance improvement from parallelization with dp_groups is 
insignificant.

2) We could fully parallelize lflow table creation just like we do when 
dp_groups are disabled: split the lflow hmap into segments for each 
worker to fill in, and then merge them together once all threads are 
complete. Then after merging, we could iterate over the completed lflow 
hmap and determine the dp_groups at that point, rather than when 
inserting the lflow into the hmap. This extra hmap traversal might slow 
things down some, but uncontended parallelization could more than offset 
that cost.

What do you think about these ideas? I bring them up because OVN is 
trending towards eventually having dp_groups enabled and no longer being 
disable-able.

On 2/12/21 9:49 AM, Anton Ivanov wrote:
> This adds a set of functions and macros intended to process
> hashes in parallel.
> 
> The principles of operation are documented in the fasthmap.h
> 
> If these one day go into the OVS tree, the OVS tree versions
> would be used in preference.
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---
>   lib/automake.mk         |   2 +
>   lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>   lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>   3 files changed, 742 insertions(+)
>   create mode 100644 lib/ovn-parallel-hmap.c
>   create mode 100644 lib/ovn-parallel-hmap.h
> 
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 250c7aefa..781be2109 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>   	lib/expr.c \
>   	lib/extend-table.h \
>   	lib/extend-table.c \
> +	lib/ovn-parallel-hmap.h \
> +	lib/ovn-parallel-hmap.c \
>   	lib/ip-mcast-index.c \
>   	lib/ip-mcast-index.h \
>   	lib/mcast-group-index.c \
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> new file mode 100644
> index 000000000..06aa95aba
> --- /dev/null
> +++ b/lib/ovn-parallel-hmap.c
> @@ -0,0 +1,455 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <fcntl.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <semaphore.h>
> +#include "fatal-signal.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovn-parallel-hmap.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "ovs-numa.h"
> +#include "random.h"
> +
> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
> +
> +#ifndef OVS_HAS_PARALLEL_HMAP
> +
> +#define WORKER_SEM_NAME "%x-%p-%x"
> +#define MAIN_SEM_NAME "%x-%p-main"
> +
> +/* These are accessed under mutex inside add_worker_pool().
> + * They do not need to be atomic.
> + */
> +
> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> +static bool can_parallelize = false;
> +
> +/* This is set only in the process of exit and the set is
> + * accompanied by a fence. It does not need to be atomic or be
> + * accessed under a lock.
> + */
> +
> +static bool workers_must_exit = false;
> +
> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static int pool_size;
> +
> +static int sembase;
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED);
> +static void setup_worker_pools(bool force);
> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index);
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index);
> +
> +bool ovn_stop_parallel_processing(void)
> +{
> +    return workers_must_exit;
> +}
> +
> +bool ovn_can_parallelize_hashes(bool force_parallel)
> +{
> +    bool test = false;
> +
> +    if (atomic_compare_exchange_strong(
> +            &initial_pool_setup,
> +            &test,
> +            true)) {
> +        ovs_mutex_lock(&init_mutex);
> +        setup_worker_pools(force_parallel);
> +        ovs_mutex_unlock(&init_mutex);
> +    }
> +    return can_parallelize;
> +}
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> +
> +    struct worker_pool *new_pool = NULL;
> +    struct worker_control *new_control;
> +    bool test = false;
> +    int i;
> +    char sem_name[256];
> +
> +
> +    /* Belt and braces - initialize the pool system just in case if
> +     * if it is not yet initialized.
> +     */
> +
> +    if (atomic_compare_exchange_strong(
> +            &initial_pool_setup,
> +            &test,
> +            true)) {
> +        ovs_mutex_lock(&init_mutex);
> +        setup_worker_pools(false);
> +        ovs_mutex_unlock(&init_mutex);
> +    }
> +
> +    ovs_mutex_lock(&init_mutex);
> +    if (can_parallelize) {
> +        new_pool = xmalloc(sizeof(struct worker_pool));
> +        new_pool->size = pool_size;
> +        new_pool->controls = NULL;
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +        if (new_pool->done == SEM_FAILED) {
> +            goto cleanup;
> +        }
> +
> +        new_pool->controls =
> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
> +
> +        for (i = 0; i < new_pool->size; i++) {
> +            new_control = &new_pool->controls[i];
> +            new_control->id = i;
> +            new_control->done = new_pool->done;
> +            new_control->data = NULL;
> +            ovs_mutex_init(&new_control->mutex);
> +            new_control->finished = ATOMIC_VAR_INIT(false);
> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +            if (new_control->fire == SEM_FAILED) {
> +                goto cleanup;
> +            }
> +        }
> +
> +        for (i = 0; i < pool_size; i++) {
> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> +        }
> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +cleanup:
> +
> +    /* Something went wrong when opening semaphores. In this case
> +     * it is better to shut off parallel procesing altogether
> +     */
> +
> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> +    can_parallelize = false;
> +    if (new_pool->controls) {
> +        for (i = 0; i < new_pool->size; i++) {
> +            if (new_pool->controls[i].fire != SEM_FAILED) {
> +                sem_close(new_pool->controls[i].fire);
> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> +                sem_unlink(sem_name);
> +                break; /* semaphores past this one are uninitialized */
> +            }
> +        }
> +    }
> +    if (new_pool->done != SEM_FAILED) {
> +        sem_close(new_pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> +        sem_unlink(sem_name);
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return NULL;
> +}
> +
> +
> +/* Initializes 'hmap' as an empty hash table with mask N. */
> +void
> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> +{
> +    size_t i;
> +
> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> +    hmap->one = NULL;
> +    hmap->mask = mask;
> +    hmap->n = 0;
> +    for (i = 0; i <= hmap->mask; i++) {
> +        hmap->buckets[i] = NULL;
> +    }
> +}
> +
> +/* Initializes 'hmap' as an empty hash table of size X.
> + * Intended for use in parallel processing so that all
> + * fragments used to store results in a parallel job
> + * are the same size.
> + */
> +void
> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> +{
> +    size_t mask;
> +    mask = size / 2;
> +    mask |= mask >> 1;
> +    mask |= mask >> 2;
> +    mask |= mask >> 4;
> +    mask |= mask >> 8;
> +    mask |= mask >> 16;
> +#if SIZE_MAX > UINT32_MAX
> +    mask |= mask >> 32;
> +#endif
> +
> +    /* If we need to dynamically allocate buckets we might as well allocate at
> +     * least 4 of them. */
> +    mask |= (mask & 1) << 1;
> +
> +    fast_hmap_init(hmap, mask);
> +}
> +
> +/* Run a thread pool which uses a callback function to process results
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool,
> +                           void *fin_result, void *result_frags,
> +                           void (*helper_func)(struct worker_pool *pool,
> +                                               void *fin_result,
> +                                               void *result_frags, int index))
> +{
> +    int index, completed;
> +
> +    /* Ensure that all worker threads see the same data as the
> +     * main thread.
> +     */
> +
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Start workers */
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        /* Note - we do not loop on semaphore until it reaches
> +         * zero, but on pool size/remaining workers.
> +         * This is by design. If the inner loop can handle
> +         * completion for more than one worker within an iteration
> +         * it will do so to ensure no additional iterations and
> +         * waits once all of them are done.
> +         *
> +         * This may result in us having an initial positive value
> +         * of the semaphore when the pool is invoked the next time.
> +         * This is harmless - the loop will spin up a couple of times
> +         * doing nothing while the workers are processing their data
> +         * slices.
> +         */
> +        sem_wait(pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            /* If the worker has marked its data chunk as complete,
> +             * invoke the helper function to combine the results of
> +             * this worker into the main result.
> +             *
> +             * The worker must invoke an appropriate memory fence
> +             * (most likely acq_rel) to ensure that the main thread
> +             * sees all of the results produced by the worker.
> +             */
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                if (helper_func) {
> +                    (helper_func)(pool, fin_result, result_frags, index);
> +                }
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Run a thread pool - basic, does not do results processing.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool)
> +{
> +    run_pool_callback(pool, NULL, NULL, NULL);
> +}
> +
> +/* Brute force merge of a hashmap into another hashmap.
> + * Intended for use in parallel processing. The destination
> + * hashmap MUST be the same size as the one being merged.
> + *
> + * This can be achieved by pre-allocating them to correct size
> + * and using hmap_insert_fast() instead of hmap_insert()
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> +{
> +    size_t i;
> +
> +    ovs_assert(inc->mask == dest->mask);
> +
> +    if (!inc->n) {
> +        /* Request to merge an empty frag, nothing to do */
> +        return;
> +    }
> +
> +    for (i = 0; i <= dest->mask; i++) {
> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> +        if (*inc_bucket != NULL) {
> +            struct hmap_node *last_node = *inc_bucket;
> +            while (last_node->next != NULL) {
> +                last_node = last_node->next;
> +            }
> +            last_node->next = *dest_bucket;
> +            *dest_bucket = *inc_bucket;
> +            *inc_bucket = NULL;
> +        }
> +    }
> +    dest->n += inc->n;
> +    inc->n = 0;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of hashes. Merge results.
> + */
> +
> +
> +void ovn_run_pool_hash(
> +        struct worker_pool *pool,
> +        struct hmap *result,
> +        struct hmap *result_frags)
> +{
> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
> +}
> +
> +/* Run a thread pool which gathers results in an array of lists.
> + * Merge results.
> + */
> +void ovn_run_pool_list(
> +        struct worker_pool *pool,
> +        struct ovs_list *result,
> +        struct ovs_list *result_frags)
> +{
> +    run_pool_callback(pool, result, result_frags, merge_list_results);
> +}
> +
> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
> +{
> +    int i;
> +    if (hrl->mask != lflows->mask) {
> +        if (hrl->row_locks) {
> +            free(hrl->row_locks);
> +        }
> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> +        hrl->mask = lflows->mask;
> +        for (i = 0; i <= lflows->mask; i++) {
> +            ovs_mutex_init(&hrl->row_locks[i]);
> +        }
> +    }
> +}
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> +    int i;
> +    static struct worker_pool *pool;
> +    char sem_name[256];
> +
> +    workers_must_exit = true;
> +
> +    /* All workers must honour the must_exit flag and check for it regularly.
> +     * We can make it atomic and check it via atomics in workers, but that
> +     * is not really necessary as it is set just once - when the program
> +     * terminates. So we use a fence which is invoked before exiting instead.
> +     */
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Wake up the workers after the must_exit flag has been set */
> +
> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_post(pool->controls[i].fire);
> +        }
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_close(pool->controls[i].fire);
> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +            sem_unlink(sem_name);
> +        }
> +        sem_close(pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +        sem_unlink(sem_name);
> +    }
> +}
> +
> +static void setup_worker_pools(bool force) {
> +    int cores, nodes;
> +
> +    nodes = ovs_numa_get_n_numas();
> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> +        nodes = 1;
> +    }
> +    cores = ovs_numa_get_n_cores();
> +
> +    /* If there is no NUMA config, use 4 cores.
> +     * If there is NUMA config use half the cores on
> +     * one node so that the OS does not start pushing
> +     * threads to other nodes.
> +     */
> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> +        /* If there is no NUMA we can try the ovs-threads routine.
> +         * It falls back to sysconf and/or affinity mask.
> +         */
> +        cores = count_cpu_cores();
> +        pool_size = cores;
> +    } else {
> +        pool_size = cores / nodes;
> +    }
> +    if ((pool_size < 4) && force) {
> +        pool_size = 4;
> +    }
> +    can_parallelize = (pool_size >= 3);
> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> +    sembase = random_uint32();
> +}
> +
> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index)
> +{
> +    struct ovs_list *result = (struct ovs_list *)fin_result;
> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
> +
> +    if (!ovs_list_is_empty(&res_frags[index])) {
> +        ovs_list_splice(result->next,
> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
> +    }
> +}
> +
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index)
> +{
> +    struct hmap *result = (struct hmap *)fin_result;
> +    struct hmap *res_frags = (struct hmap *)result_frags;
> +
> +    fast_hmap_merge(result, &res_frags[index]);
> +    hmap_destroy(&res_frags[index]);
> +}
> +
> +#endif
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> new file mode 100644
> index 000000000..71ad17fb0
> --- /dev/null
> +++ b/lib/ovn-parallel-hmap.h
> @@ -0,0 +1,285 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVN_PARALLEL_HMAP
> +#define OVN_PARALLEL_HMAP 1
> +
> +/* if the parallel macros are defined by hmap.h or any other ovs define
> + * we skip over the ovn specific definitions.
> + */
> +
> +#ifdef  __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdbool.h>
> +#include <stdlib.h>
> +#include <semaphore.h>
> +#include "openvswitch/util.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovs-atomic.h"
> +
> +/* Process this include only if OVS does not supply parallel definitions
> + */
> +
> +#ifdef OVS_HAS_PARALLEL_HMAP
> +
> +#include "parallel-hmap.h"
> +
> +#else
> +
> +
> +#ifdef __clang__
> +#pragma clang diagnostic push
> +#pragma clang diagnostic ignored "-Wthread-safety"
> +#endif
> +
> +
> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> + * of parallel processing.
> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
> + * and will iterate hash buckets ThreadID, ThreadID + step,
> + * ThreadID + step * 2, etc. The actual macro accepts
> + * ThreadID + step * i as the JOBID parameter.
> + */
> +
> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> +       || ((NODE = NULL), false); \
> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> +
> +/* We do not have a SAFE version of the macro, because the hash size is not
> + * atomic and hash removal operations would need to be wrapped with
> + * locks. This will defeat most of the benefits from doing anything in
> + * parallel.
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> + * each thread should store them in a temporary list result instead, merging
> + * the lists into a combined result at the end */
> +
> +/* Work "Handle" */
> +
> +struct worker_control {
> +    int id; /* Used as a modulo when iterating over a hash. */
> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> +    struct ovs_mutex mutex; /* Guards the data. */
> +    void *data; /* Pointer to data to be processed. */
> +    void *workload; /* back-pointer to the worker pool structure. */
> +};
> +
> +struct worker_pool {
> +    int size;   /* Number of threads in the pool. */
> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> +    struct worker_control *controls; /* "Handles" in this pool. */
> +    sem_t *done; /* Work completion semaphorew. */
> +};
> +
> +/* Add a worker pool for thread function start() which expects a pointer to
> + * a worker_control structure as an argument. */
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +
> +/* Setting this to true will make all processing threads exit */
> +
> +bool ovn_stop_parallel_processing(void);
> +
> +/* Build a hmap pre-sized for size elements */
> +
> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> +
> +/* Build a hmap with a mask equals to size */
> +
> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> +
> +/* Brute-force merge a hmap into hmap.
> + * Dest and inc have to have the same mask. The merge is performed
> + * by extending the element list for bucket N in the dest hmap with the list
> + * from bucket N in inc.
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> +
> +/* Run a pool, without any default processing of results.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool);
> +
> +/* Run a pool, merge results from hash frags into a final hash result.
> + * The hash frags must be pre-sized to the same size.
> + */
> +
> +void ovn_run_pool_hash(struct worker_pool *pool,
> +                       struct hmap *result, struct hmap *result_frags);
> +/* Run a pool, merge results from list frags into a final list result.
> + */
> +
> +void ovn_run_pool_list(struct worker_pool *pool,
> +                       struct ovs_list *result, struct ovs_list *result_frags);
> +
> +/* Run a pool, call a callback function to perform processing of results.
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> +                    void *result_frags,
> +                    void (*helper_func)(struct worker_pool *pool,
> +                        void *fin_result, void *result_frags, int index));
> +
> +
> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> + * would land, or a null pointer if that bucket is empty. */
> +
> +static inline struct hmap_node *
> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> +{
> +    return hmap->buckets[num];
> +}
> +
> +static inline struct hmap_node *
> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> +{
> +    size_t i;
> +    for (i = start; i <= hmap->mask; i+= pool_size) {
> +        struct hmap_node *node = hmap->buckets[i];
> +        if (node) {
> +            return node;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Returns the first node in 'hmap', as expected by thread with job_id
> + * for parallel processing in arbitrary order, or a null pointer if
> + * the slice of 'hmap' for that job_id is empty. */
> +static inline struct hmap_node *
> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> +{
> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> +}
> +
> +/* Returns the next node in the slice of 'hmap' following 'node',
> + * in arbitrary order, or a * null pointer if 'node' is the last node in
> + * the 'hmap' slice.
> + *
> + */
> +static inline struct hmap_node *
> +parallel_hmap_next(const struct hmap *hmap,
> +                   const struct hmap_node *node, ssize_t pool_size)
> +{
> +    return (node->next
> +            ? node->next
> +            : parallel_hmap_next__(hmap,
> +                (node->hash & hmap->mask) + pool_size, pool_size));
> +}
> +
> +static inline void post_completed_work(struct worker_control *control)
> +{
> +    atomic_thread_fence(memory_order_acq_rel);
> +    atomic_store_relaxed(&control->finished, true);
> +    sem_post(control->done);
> +}
> +
> +static inline void wait_for_work(struct worker_control *control)
> +{
> +    sem_wait(control->fire);
> +}
> +
> +/* Hash per-row locking support - to be used only in conjunction
> + * with fast hash inserts. Normal hash inserts may resize the hash
> + * rendering the locking invalid.
> + */
> +
> +struct hashrow_locks {
> +    ssize_t mask;
> +    struct ovs_mutex *row_locks;
> +};
> +
> +/* Update an hash row locks structure to match the current hash size */
> +
> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
> +
> +/* Lock a hash row */
> +
> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> +{
> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
> +}
> +
> +/* Unlock a hash row */
> +
> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> +{
> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
> +}
> +/* Init the row locks structure */
> +
> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
> +{
> +    hrl->mask = 0;
> +    hrl->row_locks = NULL;
> +}
> +
> +bool ovn_can_parallelize_hashes(bool force_parallel);
> +
> +/* Use the OVN library functions for stuff which OVS has not defined
> + * If OVS has defined these, they will still compile using the OVN
> + * local names, but will be dropped by the linker in favour of the OVS
> + * supplied functions.
> + */
> +
> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> +
> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> +
> +#define stop_parallel_processing() ovn_stop_parallel_processing()
> +
> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> +
> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> +
> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> +
> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> +
> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> +
> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> +
> +#define run_pool_hash(pool, result, result_frags) \
> +    ovn_run_pool_hash(pool, result, result_frags)
> +
> +#define run_pool_list(pool, result, result_frags) \
> +    ovn_run_pool_list(pool, result, result_frags)
> +
> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
> +
> +
> +
> +#ifdef __clang__
> +#pragma clang diagnostic pop
> +#endif
> +
> +#endif
> +
> +#ifdef  __cplusplus
> +}
> +#endif
> +
> +
> +#endif /* lib/fasthmap.h */
>
Anton Ivanov Feb. 19, 2021, 7:34 a.m. UTC | #3
On 18/02/2021 22:36, Mark Michelson wrote:
> Hi Anton, in short:
>
> Acked-by: Mark Michelson <mmichels@redhat.com>
>
> I think this can go in as-is. My only question is with regards to how 
> dp_groups are handled. The current patch makes it so that when 
> dp_groups are used, we parallelize the work. However, instead of each 
> thread creating discrete segments of lflows, each thread shares a 
> reference to the same large lflow hmap. This means the threads have to 
> lock the pertinent hash row each time they add a lflow to the hmap. 
> This makes sense since the entire lflow hmap needs to be taken into 
> account when determining if a flow is repeated between different 
> datapaths.
>
> I'm curious if this contention might result in poorer parallization 
> speedup.


It does. According to Newman's results the speedup is ~ 2 times instead 
of 4+


> Without having done any profiling, my assumption is that this 
> shouldn't perform worse than single-threaded code, but that it likely 
> isn't performing that much better either.

See above.

> Because this isn't actively *harming* performance, that's why I went 
> ahead and acked the patch series.
>
> Once we have run some performance tests, it would be worth determining 
> if the algorithm used for dp_groups could be improved. I have a couple 
> of thoughts:
>
> 1) It might just make sense to disable parallelization when dp_groups 
> are enabled. This certainly wouldn't help performance, but it would 
> simplify northd somewhat. This might be an OK tradeoff if the 
> performance improvement from parallelization with dp_groups is 
> insignificant.

Actually, it looks like it is still faster if you have at least 8 
threads to throw at it.

>
> 2) We could fully parallelize lflow table creation just like we do 
> when dp_groups are disabled: split the lflow hmap into segments for 
> each worker to fill in, and then merge them together once all threads 
> are complete. Then after merging, we could iterate over the completed 
> lflow hmap and determine the dp_groups at that point, rather than when 
> inserting the lflow into the hmap. This extra hmap traversal might 
> slow things down some, but uncontended parallelization could more than 
> offset that cost.

I would suggest leaving it as is. It does not lock full table - it locks 
"hash rows". So the contention is not as bad as one would expect and it 
still provides a reasonable gain.

>
> What do you think about these ideas? I bring them up because OVN is 
> trending towards eventually having dp_groups enabled and no longer 
> being disable-able.
>
> On 2/12/21 9:49 AM, Anton Ivanov wrote:
>> This adds a set of functions and macros intended to process
>> hashes in parallel.
>>
>> The principles of operation are documented in the fasthmap.h
>>
>> If these one day go into the OVS tree, the OVS tree versions
>> would be used in preference.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> ---
>>   lib/automake.mk         |   2 +
>>   lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>>   lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>>   3 files changed, 742 insertions(+)
>>   create mode 100644 lib/ovn-parallel-hmap.c
>>   create mode 100644 lib/ovn-parallel-hmap.h
>>
>> diff --git a/lib/automake.mk b/lib/automake.mk
>> index 250c7aefa..781be2109 100644
>> --- a/lib/automake.mk
>> +++ b/lib/automake.mk
>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>       lib/expr.c \
>>       lib/extend-table.h \
>>       lib/extend-table.c \
>> +    lib/ovn-parallel-hmap.h \
>> +    lib/ovn-parallel-hmap.c \
>>       lib/ip-mcast-index.c \
>>       lib/ip-mcast-index.h \
>>       lib/mcast-group-index.c \
>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>> new file mode 100644
>> index 000000000..06aa95aba
>> --- /dev/null
>> +++ b/lib/ovn-parallel-hmap.c
>> @@ -0,0 +1,455 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +#include <stdint.h>
>> +#include <string.h>
>> +#include <stdlib.h>
>> +#include <fcntl.h>
>> +#include <unistd.h>
>> +#include <errno.h>
>> +#include <semaphore.h>
>> +#include "fatal-signal.h"
>> +#include "util.h"
>> +#include "openvswitch/vlog.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovn-parallel-hmap.h"
>> +#include "ovs-atomic.h"
>> +#include "ovs-thread.h"
>> +#include "ovs-numa.h"
>> +#include "random.h"
>> +
>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>> +
>> +#ifndef OVS_HAS_PARALLEL_HMAP
>> +
>> +#define WORKER_SEM_NAME "%x-%p-%x"
>> +#define MAIN_SEM_NAME "%x-%p-main"
>> +
>> +/* These are accessed under mutex inside add_worker_pool().
>> + * They do not need to be atomic.
>> + */
>> +
>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>> +static bool can_parallelize = false;
>> +
>> +/* This is set only in the process of exit and the set is
>> + * accompanied by a fence. It does not need to be atomic or be
>> + * accessed under a lock.
>> + */
>> +
>> +static bool workers_must_exit = false;
>> +
>> +static struct ovs_list worker_pools = 
>> OVS_LIST_INITIALIZER(&worker_pools);
>> +
>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>> +
>> +static int pool_size;
>> +
>> +static int sembase;
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>> +static void setup_worker_pools(bool force);
>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index);
>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index);
>> +
>> +bool ovn_stop_parallel_processing(void)
>> +{
>> +    return workers_must_exit;
>> +}
>> +
>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>> +{
>> +    bool test = false;
>> +
>> +    if (atomic_compare_exchange_strong(
>> +            &initial_pool_setup,
>> +            &test,
>> +            true)) {
>> +        ovs_mutex_lock(&init_mutex);
>> +        setup_worker_pools(force_parallel);
>> +        ovs_mutex_unlock(&init_mutex);
>> +    }
>> +    return can_parallelize;
>> +}
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>> +
>> +    struct worker_pool *new_pool = NULL;
>> +    struct worker_control *new_control;
>> +    bool test = false;
>> +    int i;
>> +    char sem_name[256];
>> +
>> +
>> +    /* Belt and braces - initialize the pool system just in case if
>> +     * if it is not yet initialized.
>> +     */
>> +
>> +    if (atomic_compare_exchange_strong(
>> +            &initial_pool_setup,
>> +            &test,
>> +            true)) {
>> +        ovs_mutex_lock(&init_mutex);
>> +        setup_worker_pools(false);
>> +        ovs_mutex_unlock(&init_mutex);
>> +    }
>> +
>> +    ovs_mutex_lock(&init_mutex);
>> +    if (can_parallelize) {
>> +        new_pool = xmalloc(sizeof(struct worker_pool));
>> +        new_pool->size = pool_size;
>> +        new_pool->controls = NULL;
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +        if (new_pool->done == SEM_FAILED) {
>> +            goto cleanup;
>> +        }
>> +
>> +        new_pool->controls =
>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>> +
>> +        for (i = 0; i < new_pool->size; i++) {
>> +            new_control = &new_pool->controls[i];
>> +            new_control->id = i;
>> +            new_control->done = new_pool->done;
>> +            new_control->data = NULL;
>> +            ovs_mutex_init(&new_control->mutex);
>> +            new_control->finished = ATOMIC_VAR_INIT(false);
>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 
>> 0);
>> +            if (new_control->fire == SEM_FAILED) {
>> +                goto cleanup;
>> +            }
>> +        }
>> +
>> +        for (i = 0; i < pool_size; i++) {
>> +            ovs_thread_create("worker pool helper", start, 
>> &new_pool->controls[i]);
>> +        }
>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>> +    }
>> +    ovs_mutex_unlock(&init_mutex);
>> +    return new_pool;
>> +cleanup:
>> +
>> +    /* Something went wrong when opening semaphores. In this case
>> +     * it is better to shut off parallel procesing altogether
>> +     */
>> +
>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", 
>> errno);
>> +    can_parallelize = false;
>> +    if (new_pool->controls) {
>> +        for (i = 0; i < new_pool->size; i++) {
>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
>> +                sem_close(new_pool->controls[i].fire);
>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, 
>> new_pool, i);
>> +                sem_unlink(sem_name);
>> +                break; /* semaphores past this one are uninitialized */
>> +            }
>> +        }
>> +    }
>> +    if (new_pool->done != SEM_FAILED) {
>> +        sem_close(new_pool->done);
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> +        sem_unlink(sem_name);
>> +    }
>> +    ovs_mutex_unlock(&init_mutex);
>> +    return NULL;
>> +}
>> +
>> +
>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>> +void
>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>> +{
>> +    size_t i;
>> +
>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>> +    hmap->one = NULL;
>> +    hmap->mask = mask;
>> +    hmap->n = 0;
>> +    for (i = 0; i <= hmap->mask; i++) {
>> +        hmap->buckets[i] = NULL;
>> +    }
>> +}
>> +
>> +/* Initializes 'hmap' as an empty hash table of size X.
>> + * Intended for use in parallel processing so that all
>> + * fragments used to store results in a parallel job
>> + * are the same size.
>> + */
>> +void
>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>> +{
>> +    size_t mask;
>> +    mask = size / 2;
>> +    mask |= mask >> 1;
>> +    mask |= mask >> 2;
>> +    mask |= mask >> 4;
>> +    mask |= mask >> 8;
>> +    mask |= mask >> 16;
>> +#if SIZE_MAX > UINT32_MAX
>> +    mask |= mask >> 32;
>> +#endif
>> +
>> +    /* If we need to dynamically allocate buckets we might as well 
>> allocate at
>> +     * least 4 of them. */
>> +    mask |= (mask & 1) << 1;
>> +
>> +    fast_hmap_init(hmap, mask);
>> +}
>> +
>> +/* Run a thread pool which uses a callback function to process results
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool,
>> +                           void *fin_result, void *result_frags,
>> +                           void (*helper_func)(struct worker_pool 
>> *pool,
>> +                                               void *fin_result,
>> +                                               void *result_frags, 
>> int index))
>> +{
>> +    int index, completed;
>> +
>> +    /* Ensure that all worker threads see the same data as the
>> +     * main thread.
>> +     */
>> +
>> +    atomic_thread_fence(memory_order_acq_rel);
>> +
>> +    /* Start workers */
>> +
>> +    for (index = 0; index < pool->size; index++) {
>> +        sem_post(pool->controls[index].fire);
>> +    }
>> +
>> +    completed = 0;
>> +
>> +    do {
>> +        bool test;
>> +        /* Note - we do not loop on semaphore until it reaches
>> +         * zero, but on pool size/remaining workers.
>> +         * This is by design. If the inner loop can handle
>> +         * completion for more than one worker within an iteration
>> +         * it will do so to ensure no additional iterations and
>> +         * waits once all of them are done.
>> +         *
>> +         * This may result in us having an initial positive value
>> +         * of the semaphore when the pool is invoked the next time.
>> +         * This is harmless - the loop will spin up a couple of times
>> +         * doing nothing while the workers are processing their data
>> +         * slices.
>> +         */
>> +        sem_wait(pool->done);
>> +        for (index = 0; index < pool->size; index++) {
>> +            test = true;
>> +            /* If the worker has marked its data chunk as complete,
>> +             * invoke the helper function to combine the results of
>> +             * this worker into the main result.
>> +             *
>> +             * The worker must invoke an appropriate memory fence
>> +             * (most likely acq_rel) to ensure that the main thread
>> +             * sees all of the results produced by the worker.
>> +             */
>> +            if (atomic_compare_exchange_weak(
>> +                    &pool->controls[index].finished,
>> +                    &test,
>> +                    false)) {
>> +                if (helper_func) {
>> +                    (helper_func)(pool, fin_result, result_frags, 
>> index);
>> +                }
>> +                completed++;
>> +                pool->controls[index].data = NULL;
>> +            }
>> +        }
>> +    } while (completed < pool->size);
>> +}
>> +
>> +/* Run a thread pool - basic, does not do results processing.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool)
>> +{
>> +    run_pool_callback(pool, NULL, NULL, NULL);
>> +}
>> +
>> +/* Brute force merge of a hashmap into another hashmap.
>> + * Intended for use in parallel processing. The destination
>> + * hashmap MUST be the same size as the one being merged.
>> + *
>> + * This can be achieved by pre-allocating them to correct size
>> + * and using hmap_insert_fast() instead of hmap_insert()
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>> +{
>> +    size_t i;
>> +
>> +    ovs_assert(inc->mask == dest->mask);
>> +
>> +    if (!inc->n) {
>> +        /* Request to merge an empty frag, nothing to do */
>> +        return;
>> +    }
>> +
>> +    for (i = 0; i <= dest->mask; i++) {
>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>> +        if (*inc_bucket != NULL) {
>> +            struct hmap_node *last_node = *inc_bucket;
>> +            while (last_node->next != NULL) {
>> +                last_node = last_node->next;
>> +            }
>> +            last_node->next = *dest_bucket;
>> +            *dest_bucket = *inc_bucket;
>> +            *inc_bucket = NULL;
>> +        }
>> +    }
>> +    dest->n += inc->n;
>> +    inc->n = 0;
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array
>> + * of hashes. Merge results.
>> + */
>> +
>> +
>> +void ovn_run_pool_hash(
>> +        struct worker_pool *pool,
>> +        struct hmap *result,
>> +        struct hmap *result_frags)
>> +{
>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array of lists.
>> + * Merge results.
>> + */
>> +void ovn_run_pool_list(
>> +        struct worker_pool *pool,
>> +        struct ovs_list *result,
>> +        struct ovs_list *result_frags)
>> +{
>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
>> +}
>> +
>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct 
>> hashrow_locks *hrl)
>> +{
>> +    int i;
>> +    if (hrl->mask != lflows->mask) {
>> +        if (hrl->row_locks) {
>> +            free(hrl->row_locks);
>> +        }
>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), 
>> lflows->mask + 1);
>> +        hrl->mask = lflows->mask;
>> +        for (i = 0; i <= lflows->mask; i++) {
>> +            ovs_mutex_init(&hrl->row_locks[i]);
>> +        }
>> +    }
>> +}
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>> +    int i;
>> +    static struct worker_pool *pool;
>> +    char sem_name[256];
>> +
>> +    workers_must_exit = true;
>> +
>> +    /* All workers must honour the must_exit flag and check for it 
>> regularly.
>> +     * We can make it atomic and check it via atomics in workers, 
>> but that
>> +     * is not really necessary as it is set just once - when the 
>> program
>> +     * terminates. So we use a fence which is invoked before exiting 
>> instead.
>> +     */
>> +    atomic_thread_fence(memory_order_acq_rel);
>> +
>> +    /* Wake up the workers after the must_exit flag has been set */
>> +
>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>> +        for (i = 0; i < pool->size ; i++) {
>> +            sem_post(pool->controls[i].fire);
>> +        }
>> +        for (i = 0; i < pool->size ; i++) {
>> +            sem_close(pool->controls[i].fire);
>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> +            sem_unlink(sem_name);
>> +        }
>> +        sem_close(pool->done);
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> +        sem_unlink(sem_name);
>> +    }
>> +}
>> +
>> +static void setup_worker_pools(bool force) {
>> +    int cores, nodes;
>> +
>> +    nodes = ovs_numa_get_n_numas();
>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>> +        nodes = 1;
>> +    }
>> +    cores = ovs_numa_get_n_cores();
>> +
>> +    /* If there is no NUMA config, use 4 cores.
>> +     * If there is NUMA config use half the cores on
>> +     * one node so that the OS does not start pushing
>> +     * threads to other nodes.
>> +     */
>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>> +        /* If there is no NUMA we can try the ovs-threads routine.
>> +         * It falls back to sysconf and/or affinity mask.
>> +         */
>> +        cores = count_cpu_cores();
>> +        pool_size = cores;
>> +    } else {
>> +        pool_size = cores / nodes;
>> +    }
>> +    if ((pool_size < 4) && force) {
>> +        pool_size = 4;
>> +    }
>> +    can_parallelize = (pool_size >= 3);
>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>> +    sembase = random_uint32();
>> +}
>> +
>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index)
>> +{
>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>> +
>> +    if (!ovs_list_is_empty(&res_frags[index])) {
>> +        ovs_list_splice(result->next,
>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
>> +    }
>> +}
>> +
>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index)
>> +{
>> +    struct hmap *result = (struct hmap *)fin_result;
>> +    struct hmap *res_frags = (struct hmap *)result_frags;
>> +
>> +    fast_hmap_merge(result, &res_frags[index]);
>> +    hmap_destroy(&res_frags[index]);
>> +}
>> +
>> +#endif
>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>> new file mode 100644
>> index 000000000..71ad17fb0
>> --- /dev/null
>> +++ b/lib/ovn-parallel-hmap.h
>> @@ -0,0 +1,285 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#ifndef OVN_PARALLEL_HMAP
>> +#define OVN_PARALLEL_HMAP 1
>> +
>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>> + * we skip over the ovn specific definitions.
>> + */
>> +
>> +#ifdef  __cplusplus
>> +extern "C" {
>> +#endif
>> +
>> +#include <stdbool.h>
>> +#include <stdlib.h>
>> +#include <semaphore.h>
>> +#include "openvswitch/util.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovs-atomic.h"
>> +
>> +/* Process this include only if OVS does not supply parallel 
>> definitions
>> + */
>> +
>> +#ifdef OVS_HAS_PARALLEL_HMAP
>> +
>> +#include "parallel-hmap.h"
>> +
>> +#else
>> +
>> +
>> +#ifdef __clang__
>> +#pragma clang diagnostic push
>> +#pragma clang diagnostic ignored "-Wthread-safety"
>> +#endif
>> +
>> +
>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>> + * of parallel processing.
>> + * Each worker thread has a different ThreadID in the range of 
>> 0..POOL_SIZE
>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>> + * ThreadID + step * 2, etc. The actual macro accepts
>> + * ThreadID + step * i as the JOBID parameter.
>> + */
>> +
>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), 
>> MEMBER); \
>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>> +       || ((NODE = NULL), false); \
>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), 
>> MEMBER))
>> +
>> +/* We do not have a SAFE version of the macro, because the hash size 
>> is not
>> + * atomic and hash removal operations would need to be wrapped with
>> + * locks. This will defeat most of the benefits from doing anything in
>> + * parallel.
>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove 
>> elements,
>> + * each thread should store them in a temporary list result instead, 
>> merging
>> + * the lists into a combined result at the end */
>> +
>> +/* Work "Handle" */
>> +
>> +struct worker_control {
>> +    int id; /* Used as a modulo when iterating over a hash. */
>> +    atomic_bool finished; /* Set to true after achunk of work is 
>> complete. */
>> +    sem_t *fire; /* Work start semaphore - sem_post starts the 
>> worker. */
>> +    sem_t *done; /* Work completion semaphore - sem_post on 
>> completion. */
>> +    struct ovs_mutex mutex; /* Guards the data. */
>> +    void *data; /* Pointer to data to be processed. */
>> +    void *workload; /* back-pointer to the worker pool structure. */
>> +};
>> +
>> +struct worker_pool {
>> +    int size;   /* Number of threads in the pool. */
>> +    struct ovs_list list_node; /* List of pools - used in 
>> cleanup/exit. */
>> +    struct worker_control *controls; /* "Handles" in this pool. */
>> +    sem_t *done; /* Work completion semaphorew. */
>> +};
>> +
>> +/* Add a worker pool for thread function start() which expects a 
>> pointer to
>> + * a worker_control structure as an argument. */
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>> +
>> +/* Setting this to true will make all processing threads exit */
>> +
>> +bool ovn_stop_parallel_processing(void);
>> +
>> +/* Build a hmap pre-sized for size elements */
>> +
>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>> +
>> +/* Build a hmap with a mask equals to size */
>> +
>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>> +
>> +/* Brute-force merge a hmap into hmap.
>> + * Dest and inc have to have the same mask. The merge is performed
>> + * by extending the element list for bucket N in the dest hmap with 
>> the list
>> + * from bucket N in inc.
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>> +
>> +/* Run a pool, without any default processing of results.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool);
>> +
>> +/* Run a pool, merge results from hash frags into a final hash result.
>> + * The hash frags must be pre-sized to the same size.
>> + */
>> +
>> +void ovn_run_pool_hash(struct worker_pool *pool,
>> +                       struct hmap *result, struct hmap *result_frags);
>> +/* Run a pool, merge results from list frags into a final list result.
>> + */
>> +
>> +void ovn_run_pool_list(struct worker_pool *pool,
>> +                       struct ovs_list *result, struct ovs_list 
>> *result_frags);
>> +
>> +/* Run a pool, call a callback function to perform processing of 
>> results.
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>> +                    void *result_frags,
>> +                    void (*helper_func)(struct worker_pool *pool,
>> +                        void *fin_result, void *result_frags, int 
>> index));
>> +
>> +
>> +/* Returns the first node in 'hmap' in the bucket in which the given 
>> 'hash'
>> + * would land, or a null pointer if that bucket is empty. */
>> +
>> +static inline struct hmap_node *
>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>> +{
>> +    return hmap->buckets[num];
>> +}
>> +
>> +static inline struct hmap_node *
>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t 
>> pool_size)
>> +{
>> +    size_t i;
>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>> +        struct hmap_node *node = hmap->buckets[i];
>> +        if (node) {
>> +            return node;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>> + * for parallel processing in arbitrary order, or a null pointer if
>> + * the slice of 'hmap' for that job_id is empty. */
>> +static inline struct hmap_node *
>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t 
>> pool_size)
>> +{
>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>> +}
>> +
>> +/* Returns the next node in the slice of 'hmap' following 'node',
>> + * in arbitrary order, or a * null pointer if 'node' is the last 
>> node in
>> + * the 'hmap' slice.
>> + *
>> + */
>> +static inline struct hmap_node *
>> +parallel_hmap_next(const struct hmap *hmap,
>> +                   const struct hmap_node *node, ssize_t pool_size)
>> +{
>> +    return (node->next
>> +            ? node->next
>> +            : parallel_hmap_next__(hmap,
>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>> +}
>> +
>> +static inline void post_completed_work(struct worker_control *control)
>> +{
>> +    atomic_thread_fence(memory_order_acq_rel);
>> +    atomic_store_relaxed(&control->finished, true);
>> +    sem_post(control->done);
>> +}
>> +
>> +static inline void wait_for_work(struct worker_control *control)
>> +{
>> +    sem_wait(control->fire);
>> +}
>> +
>> +/* Hash per-row locking support - to be used only in conjunction
>> + * with fast hash inserts. Normal hash inserts may resize the hash
>> + * rendering the locking invalid.
>> + */
>> +
>> +struct hashrow_locks {
>> +    ssize_t mask;
>> +    struct ovs_mutex *row_locks;
>> +};
>> +
>> +/* Update an hash row locks structure to match the current hash size */
>> +
>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct 
>> hashrow_locks *hrl);
>> +
>> +/* Lock a hash row */
>> +
>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t 
>> hash)
>> +{
>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
>> +}
>> +
>> +/* Unlock a hash row */
>> +
>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, 
>> uint32_t hash)
>> +{
>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
>> +}
>> +/* Init the row locks structure */
>> +
>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>> +{
>> +    hrl->mask = 0;
>> +    hrl->row_locks = NULL;
>> +}
>> +
>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>> +
>> +/* Use the OVN library functions for stuff which OVS has not defined
>> + * If OVS has defined these, they will still compile using the OVN
>> + * local names, but will be dropped by the linker in favour of the OVS
>> + * supplied functions.
>> + */
>> +
>> +#define update_hashrow_locks(lflows, hrl) 
>> ovn_update_hashrow_locks(lflows, hrl)
>> +
>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>> +
>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>> +
>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>> +
>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, 
>> size)
>> +
>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>> +
>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>> +
>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>> +
>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
>> +
>> +#define run_pool_hash(pool, result, result_frags) \
>> +    ovn_run_pool_hash(pool, result, result_frags)
>> +
>> +#define run_pool_list(pool, result, result_frags) \
>> +    ovn_run_pool_list(pool, result, result_frags)
>> +
>> +#define run_pool_callback(pool, fin_result, result_frags, 
>> helper_func) \
>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>> +
>> +
>> +
>> +#ifdef __clang__
>> +#pragma clang diagnostic pop
>> +#endif
>> +
>> +#endif
>> +
>> +#ifdef  __cplusplus
>> +}
>> +#endif
>> +
>> +
>> +#endif /* lib/fasthmap.h */
>>
>
>
Numan Siddique Feb. 25, 2021, 12:41 p.m. UTC | #4
On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov
<anton.ivanov@cambridgegreys.com> wrote:
>
> This adds a set of functions and macros intended to process
> hashes in parallel.
>
> The principles of operation are documented in the fasthmap.h
>
> If these one day go into the OVS tree, the OVS tree versions
> would be used in preference.
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Hi Anton,

I still see problems with this patch set.

If I apply the first 2 patches and run the tests, most of the test
cases fail or hang.

When configured with gcc and address sanitizer,  the test case fails -
45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
localnet ports with same tags (ovn.at:2970): FAILED
(ovs-macros.at:219) and I see the below in the testsuite.log

*********
clean up OVN
../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
ovn.at:3091: wait succeeded immediately
../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
ovn.at:3091: wait succeeded immediately
../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
ovn.at:3091: wait succeeded after 1 seconds
../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
ovn.at:3091: wait succeeded quickly

main: clean up vswitch
../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid
../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit --cleanup
ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
ovn.at:3091: wait succeeded quickly
../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
ovn.at:3091: wait succeeded immediately
Address Sanitizer reported errors in: asan.2986645
=================================================================
==2986645==ERROR: AddressSanitizer: SEGV on unknown address
0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80
T2)
==2986645==The signal is caused by a READ memory access.
../../tests/ovs-macros.at:219: hard failure
45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
localnet ports with same tags (ovn.at:2970): FAILED
(ovs-macros.at:219)

******

The same is observed for many test cases.

When I configure clang and run the tests, all the tests pass, but I
see lot of coredumps.
I think I had reported this earlier too.

Here is the backtrace
-----
#0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
/lib64/libpthread.so.0
[Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))]
Missing separate debuginfos, use: dnf debuginfo-install
glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
zlib-1.2.11-23.fc33.x86_64
(gdb) bt
#0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
/lib64/libpthread.so.0
#1  0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at
../lib/ovn-parallel-hmap.h:193
#2  build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807
#3  0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at
../lib/ovs-thread.c:383
#4  0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0
#5  0x00001528ad00e903 in clone () from /lib64/libc.so.6
----


I'm sorry but something is not right. Instead of using semaphores, why
can't we use
'struct latch' for each worker and use it to synchronize between the main thread
and the workers ?

The usage of the function - can_parallelize_hashes() in ovn-northd.c is very
confusing to me.

I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only once
and for the subsequent calls, this function will be a no-op (due to
atomic_compare_exchange_strong()).

Since this function is called before the workers are started, what is
the need to use
atomic_compare_exchange_strong() ?

Let's say we want to add another config option -
force_northd_parallel, and if the user
toggles the value between the runs, then the below code in
ovnnb_db_run() in patch 3

---
use_parallel_build = smap_get_bool(&nb->options,
"use_parallel_build", false) &&
ovn_can_parallelize_hashes(false);
--
 will have no effect since setup_worker_pools() is not called later
once the atomic
bool initial_pool_setup is set to true.

I think we should provide the option to toggle this value at run time.
Also the patch 3 should
add an option to configure the force_parallel.

> ---
>  lib/automake.mk         |   2 +
>  lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>  lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>  3 files changed, 742 insertions(+)
>  create mode 100644 lib/ovn-parallel-hmap.c
>  create mode 100644 lib/ovn-parallel-hmap.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 250c7aefa..781be2109 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>         lib/expr.c \
>         lib/extend-table.h \
>         lib/extend-table.c \
> +       lib/ovn-parallel-hmap.h \
> +       lib/ovn-parallel-hmap.c \
>         lib/ip-mcast-index.c \
>         lib/ip-mcast-index.h \
>         lib/mcast-group-index.c \
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> new file mode 100644
> index 000000000..06aa95aba
> --- /dev/null
> +++ b/lib/ovn-parallel-hmap.c
> @@ -0,0 +1,455 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <fcntl.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <semaphore.h>
> +#include "fatal-signal.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovn-parallel-hmap.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "ovs-numa.h"
> +#include "random.h"
> +
> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
> +
> +#ifndef OVS_HAS_PARALLEL_HMAP
> +
> +#define WORKER_SEM_NAME "%x-%p-%x"
> +#define MAIN_SEM_NAME "%x-%p-main"
> +
> +/* These are accessed under mutex inside add_worker_pool().
> + * They do not need to be atomic.
> + */
> +
> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> +static bool can_parallelize = false;
> +
> +/* This is set only in the process of exit and the set is
> + * accompanied by a fence. It does not need to be atomic or be
> + * accessed under a lock.
> + */
> +
> +static bool workers_must_exit = false;
> +
> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static int pool_size;
> +
> +static int sembase;
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED);
> +static void setup_worker_pools(bool force);
> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index);
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index);
> +
> +bool ovn_stop_parallel_processing(void)
> +{
> +    return workers_must_exit;
> +}
> +
> +bool ovn_can_parallelize_hashes(bool force_parallel)
> +{
> +    bool test = false;
> +
> +    if (atomic_compare_exchange_strong(
> +            &initial_pool_setup,
> +            &test,
> +            true)) {
> +        ovs_mutex_lock(&init_mutex);
> +        setup_worker_pools(force_parallel);
> +        ovs_mutex_unlock(&init_mutex);
> +    }
> +    return can_parallelize;
> +}
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> +
> +    struct worker_pool *new_pool = NULL;
> +    struct worker_control *new_control;
> +    bool test = false;
> +    int i;
> +    char sem_name[256];
> +
> +
> +    /* Belt and braces - initialize the pool system just in case if
> +     * if it is not yet initialized.
> +     */
> +
> +    if (atomic_compare_exchange_strong(
> +            &initial_pool_setup,
> +            &test,
> +            true)) {
> +        ovs_mutex_lock(&init_mutex);
> +        setup_worker_pools(false);
> +        ovs_mutex_unlock(&init_mutex);
> +    }
> +
> +    ovs_mutex_lock(&init_mutex);
> +    if (can_parallelize) {
> +        new_pool = xmalloc(sizeof(struct worker_pool));
> +        new_pool->size = pool_size;
> +        new_pool->controls = NULL;
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +        if (new_pool->done == SEM_FAILED) {
> +            goto cleanup;
> +        }
> +
> +        new_pool->controls =
> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
> +
> +        for (i = 0; i < new_pool->size; i++) {
> +            new_control = &new_pool->controls[i];
> +            new_control->id = i;
> +            new_control->done = new_pool->done;
> +            new_control->data = NULL;
> +            ovs_mutex_init(&new_control->mutex);
> +            new_control->finished = ATOMIC_VAR_INIT(false);
> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +            if (new_control->fire == SEM_FAILED) {
> +                goto cleanup;
> +            }
> +        }
> +
> +        for (i = 0; i < pool_size; i++) {
> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> +        }
> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +cleanup:
> +
> +    /* Something went wrong when opening semaphores. In this case
> +     * it is better to shut off parallel procesing altogether
> +     */
> +
> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> +    can_parallelize = false;
> +    if (new_pool->controls) {
> +        for (i = 0; i < new_pool->size; i++) {
> +            if (new_pool->controls[i].fire != SEM_FAILED) {
> +                sem_close(new_pool->controls[i].fire);
> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> +                sem_unlink(sem_name);
> +                break; /* semaphores past this one are uninitialized */
> +            }
> +        }
> +    }
> +    if (new_pool->done != SEM_FAILED) {
> +        sem_close(new_pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> +        sem_unlink(sem_name);
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return NULL;
> +}
> +
> +
> +/* Initializes 'hmap' as an empty hash table with mask N. */
> +void
> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> +{
> +    size_t i;
> +
> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> +    hmap->one = NULL;
> +    hmap->mask = mask;
> +    hmap->n = 0;
> +    for (i = 0; i <= hmap->mask; i++) {
> +        hmap->buckets[i] = NULL;
> +    }
> +}
> +
> +/* Initializes 'hmap' as an empty hash table of size X.
> + * Intended for use in parallel processing so that all
> + * fragments used to store results in a parallel job
> + * are the same size.
> + */
> +void
> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> +{
> +    size_t mask;
> +    mask = size / 2;
> +    mask |= mask >> 1;
> +    mask |= mask >> 2;
> +    mask |= mask >> 4;
> +    mask |= mask >> 8;
> +    mask |= mask >> 16;
> +#if SIZE_MAX > UINT32_MAX
> +    mask |= mask >> 32;
> +#endif
> +
> +    /* If we need to dynamically allocate buckets we might as well allocate at
> +     * least 4 of them. */
> +    mask |= (mask & 1) << 1;
> +
> +    fast_hmap_init(hmap, mask);
> +}
> +
> +/* Run a thread pool which uses a callback function to process results
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool,
> +                           void *fin_result, void *result_frags,
> +                           void (*helper_func)(struct worker_pool *pool,
> +                                               void *fin_result,
> +                                               void *result_frags, int index))
> +{
> +    int index, completed;
> +
> +    /* Ensure that all worker threads see the same data as the
> +     * main thread.
> +     */
> +
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Start workers */
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        /* Note - we do not loop on semaphore until it reaches
> +         * zero, but on pool size/remaining workers.
> +         * This is by design. If the inner loop can handle
> +         * completion for more than one worker within an iteration
> +         * it will do so to ensure no additional iterations and
> +         * waits once all of them are done.
> +         *
> +         * This may result in us having an initial positive value
> +         * of the semaphore when the pool is invoked the next time.
> +         * This is harmless - the loop will spin up a couple of times
> +         * doing nothing while the workers are processing their data
> +         * slices.
> +         */
> +        sem_wait(pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            /* If the worker has marked its data chunk as complete,
> +             * invoke the helper function to combine the results of
> +             * this worker into the main result.
> +             *
> +             * The worker must invoke an appropriate memory fence
> +             * (most likely acq_rel) to ensure that the main thread
> +             * sees all of the results produced by the worker.
> +             */
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                if (helper_func) {
> +                    (helper_func)(pool, fin_result, result_frags, index);
> +                }
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Run a thread pool - basic, does not do results processing.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool)
> +{
> +    run_pool_callback(pool, NULL, NULL, NULL);
> +}
> +
> +/* Brute force merge of a hashmap into another hashmap.
> + * Intended for use in parallel processing. The destination
> + * hashmap MUST be the same size as the one being merged.
> + *
> + * This can be achieved by pre-allocating them to correct size
> + * and using hmap_insert_fast() instead of hmap_insert()
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> +{
> +    size_t i;
> +
> +    ovs_assert(inc->mask == dest->mask);
> +
> +    if (!inc->n) {
> +        /* Request to merge an empty frag, nothing to do */
> +        return;
> +    }
> +
> +    for (i = 0; i <= dest->mask; i++) {
> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> +        if (*inc_bucket != NULL) {
> +            struct hmap_node *last_node = *inc_bucket;
> +            while (last_node->next != NULL) {
> +                last_node = last_node->next;
> +            }
> +            last_node->next = *dest_bucket;
> +            *dest_bucket = *inc_bucket;
> +            *inc_bucket = NULL;
> +        }
> +    }
> +    dest->n += inc->n;
> +    inc->n = 0;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of hashes. Merge results.
> + */
> +
> +
> +void ovn_run_pool_hash(
> +        struct worker_pool *pool,
> +        struct hmap *result,
> +        struct hmap *result_frags)
> +{
> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
> +}
> +
> +/* Run a thread pool which gathers results in an array of lists.
> + * Merge results.
> + */
> +void ovn_run_pool_list(
> +        struct worker_pool *pool,
> +        struct ovs_list *result,
> +        struct ovs_list *result_frags)
> +{
> +    run_pool_callback(pool, result, result_frags, merge_list_results);
> +}
> +
> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
> +{
> +    int i;
> +    if (hrl->mask != lflows->mask) {
> +        if (hrl->row_locks) {
> +            free(hrl->row_locks);
> +        }
> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> +        hrl->mask = lflows->mask;
> +        for (i = 0; i <= lflows->mask; i++) {
> +            ovs_mutex_init(&hrl->row_locks[i]);
> +        }
> +    }
> +}
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> +    int i;
> +    static struct worker_pool *pool;
> +    char sem_name[256];
> +
> +    workers_must_exit = true;
> +
> +    /* All workers must honour the must_exit flag and check for it regularly.
> +     * We can make it atomic and check it via atomics in workers, but that
> +     * is not really necessary as it is set just once - when the program
> +     * terminates. So we use a fence which is invoked before exiting instead.
> +     */
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Wake up the workers after the must_exit flag has been set */
> +
> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_post(pool->controls[i].fire);
> +        }
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_close(pool->controls[i].fire);
> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +            sem_unlink(sem_name);
> +        }
> +        sem_close(pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +        sem_unlink(sem_name);
> +    }
> +}
> +
> +static void setup_worker_pools(bool force) {
> +    int cores, nodes;
> +
> +    nodes = ovs_numa_get_n_numas();
> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> +        nodes = 1;
> +    }
> +    cores = ovs_numa_get_n_cores();
> +
> +    /* If there is no NUMA config, use 4 cores.
> +     * If there is NUMA config use half the cores on
> +     * one node so that the OS does not start pushing
> +     * threads to other nodes.
> +     */
> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> +        /* If there is no NUMA we can try the ovs-threads routine.
> +         * It falls back to sysconf and/or affinity mask.
> +         */
> +        cores = count_cpu_cores();
> +        pool_size = cores;
> +    } else {
> +        pool_size = cores / nodes;
> +    }
> +    if ((pool_size < 4) && force) {
> +        pool_size = 4;
> +    }
> +    can_parallelize = (pool_size >= 3);
> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> +    sembase = random_uint32();
> +}
> +
> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index)
> +{
> +    struct ovs_list *result = (struct ovs_list *)fin_result;
> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
> +
> +    if (!ovs_list_is_empty(&res_frags[index])) {
> +        ovs_list_splice(result->next,
> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
> +    }
> +}
> +
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index)
> +{
> +    struct hmap *result = (struct hmap *)fin_result;
> +    struct hmap *res_frags = (struct hmap *)result_frags;
> +
> +    fast_hmap_merge(result, &res_frags[index]);
> +    hmap_destroy(&res_frags[index]);
> +}
> +
> +#endif
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> new file mode 100644
> index 000000000..71ad17fb0
> --- /dev/null
> +++ b/lib/ovn-parallel-hmap.h
> @@ -0,0 +1,285 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVN_PARALLEL_HMAP
> +#define OVN_PARALLEL_HMAP 1
> +
> +/* if the parallel macros are defined by hmap.h or any other ovs define
> + * we skip over the ovn specific definitions.
> + */
> +
> +#ifdef  __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdbool.h>
> +#include <stdlib.h>
> +#include <semaphore.h>
> +#include "openvswitch/util.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovs-atomic.h"
> +
> +/* Process this include only if OVS does not supply parallel definitions
> + */
> +
> +#ifdef OVS_HAS_PARALLEL_HMAP
> +
> +#include "parallel-hmap.h"
> +
> +#else
> +
> +
> +#ifdef __clang__
> +#pragma clang diagnostic push
> +#pragma clang diagnostic ignored "-Wthread-safety"
> +#endif

I think you missed addressing my comment I provided in v13 to
add some comments on why this is required.


> +
> +
> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> + * of parallel processing.
> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
> + * and will iterate hash buckets ThreadID, ThreadID + step,
> + * ThreadID + step * 2, etc. The actual macro accepts
> + * ThreadID + step * i as the JOBID parameter.
> + */
> +
> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> +       || ((NODE = NULL), false); \
> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> +
> +/* We do not have a SAFE version of the macro, because the hash size is not
> + * atomic and hash removal operations would need to be wrapped with
> + * locks. This will defeat most of the benefits from doing anything in
> + * parallel.
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> + * each thread should store them in a temporary list result instead, merging
> + * the lists into a combined result at the end */
> +
> +/* Work "Handle" */
> +
> +struct worker_control {
> +    int id; /* Used as a modulo when iterating over a hash. */
> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> +    struct ovs_mutex mutex; /* Guards the data. */
> +    void *data; /* Pointer to data to be processed. */
> +    void *workload; /* back-pointer to the worker pool structure. */
> +};
> +
> +struct worker_pool {
> +    int size;   /* Number of threads in the pool. */
> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> +    struct worker_control *controls; /* "Handles" in this pool. */
> +    sem_t *done; /* Work completion semaphorew. */
> +};
> +
> +/* Add a worker pool for thread function start() which expects a pointer to
> + * a worker_control structure as an argument. */
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +
> +/* Setting this to true will make all processing threads exit */
> +
> +bool ovn_stop_parallel_processing(void);
> +
> +/* Build a hmap pre-sized for size elements */
> +
> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> +
> +/* Build a hmap with a mask equals to size */
> +
> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> +
> +/* Brute-force merge a hmap into hmap.
> + * Dest and inc have to have the same mask. The merge is performed
> + * by extending the element list for bucket N in the dest hmap with the list
> + * from bucket N in inc.
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> +
> +/* Run a pool, without any default processing of results.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool);
> +
> +/* Run a pool, merge results from hash frags into a final hash result.
> + * The hash frags must be pre-sized to the same size.
> + */
> +
> +void ovn_run_pool_hash(struct worker_pool *pool,
> +                       struct hmap *result, struct hmap *result_frags);
> +/* Run a pool, merge results from list frags into a final list result.
> + */
> +
> +void ovn_run_pool_list(struct worker_pool *pool,
> +                       struct ovs_list *result, struct ovs_list *result_frags);
> +
> +/* Run a pool, call a callback function to perform processing of results.
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> +                    void *result_frags,
> +                    void (*helper_func)(struct worker_pool *pool,
> +                        void *fin_result, void *result_frags, int index));
> +
> +
> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> + * would land, or a null pointer if that bucket is empty. */
> +
> +static inline struct hmap_node *
> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> +{
> +    return hmap->buckets[num];
> +}
> +
> +static inline struct hmap_node *
> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> +{
> +    size_t i;
> +    for (i = start; i <= hmap->mask; i+= pool_size) {
> +        struct hmap_node *node = hmap->buckets[i];
> +        if (node) {
> +            return node;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Returns the first node in 'hmap', as expected by thread with job_id
> + * for parallel processing in arbitrary order, or a null pointer if
> + * the slice of 'hmap' for that job_id is empty. */
> +static inline struct hmap_node *
> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> +{
> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> +}
> +
> +/* Returns the next node in the slice of 'hmap' following 'node',
> + * in arbitrary order, or a * null pointer if 'node' is the last node in
> + * the 'hmap' slice.
> + *
> + */
> +static inline struct hmap_node *
> +parallel_hmap_next(const struct hmap *hmap,
> +                   const struct hmap_node *node, ssize_t pool_size)
> +{
> +    return (node->next
> +            ? node->next
> +            : parallel_hmap_next__(hmap,
> +                (node->hash & hmap->mask) + pool_size, pool_size));
> +}
> +
> +static inline void post_completed_work(struct worker_control *control)
> +{
> +    atomic_thread_fence(memory_order_acq_rel);
> +    atomic_store_relaxed(&control->finished, true);
> +    sem_post(control->done);
> +}
> +
> +static inline void wait_for_work(struct worker_control *control)
> +{
> +    sem_wait(control->fire);
> +}
> +
> +/* Hash per-row locking support - to be used only in conjunction
> + * with fast hash inserts. Normal hash inserts may resize the hash
> + * rendering the locking invalid.
> + */
> +
> +struct hashrow_locks {
> +    ssize_t mask;
> +    struct ovs_mutex *row_locks;
> +};
> +
> +/* Update an hash row locks structure to match the current hash size */
> +
> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
> +
> +/* Lock a hash row */
> +
> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> +{
> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
> +}
> +
> +/* Unlock a hash row */
> +
> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> +{
> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
> +}
> +/* Init the row locks structure */
> +
> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
> +{
> +    hrl->mask = 0;
> +    hrl->row_locks = NULL;
> +}
> +
> +bool ovn_can_parallelize_hashes(bool force_parallel);
> +
> +/* Use the OVN library functions for stuff which OVS has not defined
> + * If OVS has defined these, they will still compile using the OVN
> + * local names, but will be dropped by the linker in favour of the OVS
> + * supplied functions.
> + */
> +
> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> +
> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> +
> +#define stop_parallel_processing() ovn_stop_parallel_processing()
> +
> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> +
> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> +
> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> +
> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> +
> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> +
> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> +
> +#define run_pool_hash(pool, result, result_frags) \
> +    ovn_run_pool_hash(pool, result, result_frags)
> +
> +#define run_pool_list(pool, result, result_frags) \
> +    ovn_run_pool_list(pool, result, result_frags)
> +
> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
> +
> +

In my opinion, we can switch over to making use of OVS APIs for fast_hmap
once the patches in OVS are merged.  Until then I think we should just assume
that these functions are part of OVN lib and consume them directly.

It's possible that the function names could change when those patches
land in OVS.

Thanks
Numan

> +
> +#ifdef __clang__
> +#pragma clang diagnostic pop
> +#endif
> +
> +#endif
> +
> +#ifdef  __cplusplus
> +}
> +#endif
> +
> +
> +#endif /* lib/fasthmap.h */
> --
> 2.20.1
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
Anton Ivanov Feb. 25, 2021, 1:27 p.m. UTC | #5
On 25/02/2021 12:41, Numan Siddique wrote:
> On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov
> <anton.ivanov@cambridgegreys.com> wrote:
>> This adds a set of functions and macros intended to process
>> hashes in parallel.
>>
>> The principles of operation are documented in the fasthmap.h
>>
>> If these one day go into the OVS tree, the OVS tree versions
>> would be used in preference.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> Hi Anton,
>
> I still see problems with this patch set.
>
> If I apply the first 2 patches and run the tests, most of the test
> cases fail or hang.
>
> When configured with gcc and address sanitizer,  the test case fails -
> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
> localnet ports with same tags (ovn.at:2970): FAILED
> (ovs-macros.at:219) and I see the below in the testsuite.log
>
> *********
> clean up OVN
> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> ovn.at:3091: wait succeeded immediately
> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> ovn.at:3091: wait succeeded immediately
> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> ovn.at:3091: wait succeeded after 1 seconds
> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> ovn.at:3091: wait succeeded quickly
>
> main: clean up vswitch
> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid
> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit --cleanup
> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> ovn.at:3091: wait succeeded quickly
> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> ovn.at:3091: wait succeeded immediately
> Address Sanitizer reported errors in: asan.2986645
> =================================================================
> ==2986645==ERROR: AddressSanitizer: SEGV on unknown address
> 0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80
> T2)
> ==2986645==The signal is caused by a READ memory access.
> ../../tests/ovs-macros.at:219: hard failure
> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
> localnet ports with same tags (ovn.at:2970): FAILED
> (ovs-macros.at:219)
>
> ******
>
> The same is observed for many test cases.
>
> When I configure clang and run the tests, all the tests pass, but I
> see lot of coredumps.
> I think I had reported this earlier too.

I have absolutely no idea how you get this. Seriously.

I have been unable to reproduce this

The only thing that comes to mind is to "fuzz" the semaphore creation and introduce random failures there to see if the error logic works. I can't see any logical problems in it via code inspection.

>
> Here is the backtrace
> -----
> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
> /lib64/libpthread.so.0
> [Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))]
> Missing separate debuginfos, use: dnf debuginfo-install
> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
> zlib-1.2.11-23.fc33.x86_64
> (gdb) bt
> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
> /lib64/libpthread.so.0
> #1  0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at
> ../lib/ovn-parallel-hmap.h:193
> #2  build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807
> #3  0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at
> ../lib/ovs-thread.c:383
> #4  0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0
> #5  0x00001528ad00e903 in clone () from /lib64/libc.so.6
> ----
>
>
> I'm sorry but something is not right. Instead of using semaphores, why
> can't we use
> 'struct latch' for each worker and use it to synchronize between the main thread
> and the workers ?

We can't because this will conflict with the main poll loop.

You need to rewrite the entire northd processing logic and the IO logic to use latch here. Or establish parallel logic - part of the joy of using "thead once" to do poll magic.

None of these are a realistic option. I'd rather try to understand exactly what happens on your setup and what makes a sem_open() fail and why that is not handled properly by the error checking code.

Can you send the results of "sysctl kernel.sem" please? That's the only way I know to limit semaphores and the usual limits on Linux are in the 1G range.

>
> The usage of the function - can_parallelize_hashes() in ovn-northd.c is very
> confusing to me.
>
> I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only once
> and for the subsequent calls, this function will be a no-op (due to
> atomic_compare_exchange_strong()).
>
> Since this function is called before the workers are started, what is
> the need to use
> atomic_compare_exchange_strong() ?

We have ended there over time. There was a point where there was no option and no initial invocation wasn't and we were invoking the set-up on first processing.

In fact that is how it is invoked in the OVS codebase port for the parallelized monitors. I would actually like to keep this as an option.

>
> Let's say we want to add another config option -
> force_northd_parallel, and if the user
> toggles the value between the runs, then the below code in
> ovnnb_db_run() in patch 3
>
> ---
> use_parallel_build = smap_get_bool(&nb->options,
> "use_parallel_build", false) &&
> ovn_can_parallelize_hashes(false);
> --
>   will have no effect since setup_worker_pools() is not called later
> once the atomic
> bool initial_pool_setup is set to true.
>
> I think we should provide the option to toggle this value at run time.
> Also the patch 3 should
> add an option to configure the force_parallel.
>
>> ---
>>   lib/automake.mk         |   2 +
>>   lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>>   lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>>   3 files changed, 742 insertions(+)
>>   create mode 100644 lib/ovn-parallel-hmap.c
>>   create mode 100644 lib/ovn-parallel-hmap.h
>>
>> diff --git a/lib/automake.mk b/lib/automake.mk
>> index 250c7aefa..781be2109 100644
>> --- a/lib/automake.mk
>> +++ b/lib/automake.mk
>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>          lib/expr.c \
>>          lib/extend-table.h \
>>          lib/extend-table.c \
>> +       lib/ovn-parallel-hmap.h \
>> +       lib/ovn-parallel-hmap.c \
>>          lib/ip-mcast-index.c \
>>          lib/ip-mcast-index.h \
>>          lib/mcast-group-index.c \
>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>> new file mode 100644
>> index 000000000..06aa95aba
>> --- /dev/null
>> +++ b/lib/ovn-parallel-hmap.c
>> @@ -0,0 +1,455 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +#include <stdint.h>
>> +#include <string.h>
>> +#include <stdlib.h>
>> +#include <fcntl.h>
>> +#include <unistd.h>
>> +#include <errno.h>
>> +#include <semaphore.h>
>> +#include "fatal-signal.h"
>> +#include "util.h"
>> +#include "openvswitch/vlog.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovn-parallel-hmap.h"
>> +#include "ovs-atomic.h"
>> +#include "ovs-thread.h"
>> +#include "ovs-numa.h"
>> +#include "random.h"
>> +
>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>> +
>> +#ifndef OVS_HAS_PARALLEL_HMAP
>> +
>> +#define WORKER_SEM_NAME "%x-%p-%x"
>> +#define MAIN_SEM_NAME "%x-%p-main"
>> +
>> +/* These are accessed under mutex inside add_worker_pool().
>> + * They do not need to be atomic.
>> + */
>> +
>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>> +static bool can_parallelize = false;
>> +
>> +/* This is set only in the process of exit and the set is
>> + * accompanied by a fence. It does not need to be atomic or be
>> + * accessed under a lock.
>> + */
>> +
>> +static bool workers_must_exit = false;
>> +
>> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>> +
>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>> +
>> +static int pool_size;
>> +
>> +static int sembase;
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>> +static void setup_worker_pools(bool force);
>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index);
>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index);
>> +
>> +bool ovn_stop_parallel_processing(void)
>> +{
>> +    return workers_must_exit;
>> +}
>> +
>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>> +{
>> +    bool test = false;
>> +
>> +    if (atomic_compare_exchange_strong(
>> +            &initial_pool_setup,
>> +            &test,
>> +            true)) {
>> +        ovs_mutex_lock(&init_mutex);
>> +        setup_worker_pools(force_parallel);
>> +        ovs_mutex_unlock(&init_mutex);
>> +    }
>> +    return can_parallelize;
>> +}
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>> +
>> +    struct worker_pool *new_pool = NULL;
>> +    struct worker_control *new_control;
>> +    bool test = false;
>> +    int i;
>> +    char sem_name[256];
>> +
>> +
>> +    /* Belt and braces - initialize the pool system just in case if
>> +     * if it is not yet initialized.
>> +     */
>> +
>> +    if (atomic_compare_exchange_strong(
>> +            &initial_pool_setup,
>> +            &test,
>> +            true)) {
>> +        ovs_mutex_lock(&init_mutex);
>> +        setup_worker_pools(false);
>> +        ovs_mutex_unlock(&init_mutex);
>> +    }
>> +
>> +    ovs_mutex_lock(&init_mutex);
>> +    if (can_parallelize) {
>> +        new_pool = xmalloc(sizeof(struct worker_pool));
>> +        new_pool->size = pool_size;
>> +        new_pool->controls = NULL;
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +        if (new_pool->done == SEM_FAILED) {
>> +            goto cleanup;
>> +        }
>> +
>> +        new_pool->controls =
>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>> +
>> +        for (i = 0; i < new_pool->size; i++) {
>> +            new_control = &new_pool->controls[i];
>> +            new_control->id = i;
>> +            new_control->done = new_pool->done;
>> +            new_control->data = NULL;
>> +            ovs_mutex_init(&new_control->mutex);
>> +            new_control->finished = ATOMIC_VAR_INIT(false);
>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +            if (new_control->fire == SEM_FAILED) {
>> +                goto cleanup;
>> +            }
>> +        }
>> +
>> +        for (i = 0; i < pool_size; i++) {
>> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>> +        }
>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>> +    }
>> +    ovs_mutex_unlock(&init_mutex);
>> +    return new_pool;
>> +cleanup:
>> +
>> +    /* Something went wrong when opening semaphores. In this case
>> +     * it is better to shut off parallel procesing altogether
>> +     */
>> +
>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>> +    can_parallelize = false;
>> +    if (new_pool->controls) {
>> +        for (i = 0; i < new_pool->size; i++) {
>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
>> +                sem_close(new_pool->controls[i].fire);
>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> +                sem_unlink(sem_name);
>> +                break; /* semaphores past this one are uninitialized */
>> +            }
>> +        }
>> +    }
>> +    if (new_pool->done != SEM_FAILED) {
>> +        sem_close(new_pool->done);
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> +        sem_unlink(sem_name);
>> +    }
>> +    ovs_mutex_unlock(&init_mutex);
>> +    return NULL;
>> +}
>> +
>> +
>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>> +void
>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>> +{
>> +    size_t i;
>> +
>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>> +    hmap->one = NULL;
>> +    hmap->mask = mask;
>> +    hmap->n = 0;
>> +    for (i = 0; i <= hmap->mask; i++) {
>> +        hmap->buckets[i] = NULL;
>> +    }
>> +}
>> +
>> +/* Initializes 'hmap' as an empty hash table of size X.
>> + * Intended for use in parallel processing so that all
>> + * fragments used to store results in a parallel job
>> + * are the same size.
>> + */
>> +void
>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>> +{
>> +    size_t mask;
>> +    mask = size / 2;
>> +    mask |= mask >> 1;
>> +    mask |= mask >> 2;
>> +    mask |= mask >> 4;
>> +    mask |= mask >> 8;
>> +    mask |= mask >> 16;
>> +#if SIZE_MAX > UINT32_MAX
>> +    mask |= mask >> 32;
>> +#endif
>> +
>> +    /* If we need to dynamically allocate buckets we might as well allocate at
>> +     * least 4 of them. */
>> +    mask |= (mask & 1) << 1;
>> +
>> +    fast_hmap_init(hmap, mask);
>> +}
>> +
>> +/* Run a thread pool which uses a callback function to process results
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool,
>> +                           void *fin_result, void *result_frags,
>> +                           void (*helper_func)(struct worker_pool *pool,
>> +                                               void *fin_result,
>> +                                               void *result_frags, int index))
>> +{
>> +    int index, completed;
>> +
>> +    /* Ensure that all worker threads see the same data as the
>> +     * main thread.
>> +     */
>> +
>> +    atomic_thread_fence(memory_order_acq_rel);
>> +
>> +    /* Start workers */
>> +
>> +    for (index = 0; index < pool->size; index++) {
>> +        sem_post(pool->controls[index].fire);
>> +    }
>> +
>> +    completed = 0;
>> +
>> +    do {
>> +        bool test;
>> +        /* Note - we do not loop on semaphore until it reaches
>> +         * zero, but on pool size/remaining workers.
>> +         * This is by design. If the inner loop can handle
>> +         * completion for more than one worker within an iteration
>> +         * it will do so to ensure no additional iterations and
>> +         * waits once all of them are done.
>> +         *
>> +         * This may result in us having an initial positive value
>> +         * of the semaphore when the pool is invoked the next time.
>> +         * This is harmless - the loop will spin up a couple of times
>> +         * doing nothing while the workers are processing their data
>> +         * slices.
>> +         */
>> +        sem_wait(pool->done);
>> +        for (index = 0; index < pool->size; index++) {
>> +            test = true;
>> +            /* If the worker has marked its data chunk as complete,
>> +             * invoke the helper function to combine the results of
>> +             * this worker into the main result.
>> +             *
>> +             * The worker must invoke an appropriate memory fence
>> +             * (most likely acq_rel) to ensure that the main thread
>> +             * sees all of the results produced by the worker.
>> +             */
>> +            if (atomic_compare_exchange_weak(
>> +                    &pool->controls[index].finished,
>> +                    &test,
>> +                    false)) {
>> +                if (helper_func) {
>> +                    (helper_func)(pool, fin_result, result_frags, index);
>> +                }
>> +                completed++;
>> +                pool->controls[index].data = NULL;
>> +            }
>> +        }
>> +    } while (completed < pool->size);
>> +}
>> +
>> +/* Run a thread pool - basic, does not do results processing.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool)
>> +{
>> +    run_pool_callback(pool, NULL, NULL, NULL);
>> +}
>> +
>> +/* Brute force merge of a hashmap into another hashmap.
>> + * Intended for use in parallel processing. The destination
>> + * hashmap MUST be the same size as the one being merged.
>> + *
>> + * This can be achieved by pre-allocating them to correct size
>> + * and using hmap_insert_fast() instead of hmap_insert()
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>> +{
>> +    size_t i;
>> +
>> +    ovs_assert(inc->mask == dest->mask);
>> +
>> +    if (!inc->n) {
>> +        /* Request to merge an empty frag, nothing to do */
>> +        return;
>> +    }
>> +
>> +    for (i = 0; i <= dest->mask; i++) {
>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>> +        if (*inc_bucket != NULL) {
>> +            struct hmap_node *last_node = *inc_bucket;
>> +            while (last_node->next != NULL) {
>> +                last_node = last_node->next;
>> +            }
>> +            last_node->next = *dest_bucket;
>> +            *dest_bucket = *inc_bucket;
>> +            *inc_bucket = NULL;
>> +        }
>> +    }
>> +    dest->n += inc->n;
>> +    inc->n = 0;
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array
>> + * of hashes. Merge results.
>> + */
>> +
>> +
>> +void ovn_run_pool_hash(
>> +        struct worker_pool *pool,
>> +        struct hmap *result,
>> +        struct hmap *result_frags)
>> +{
>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array of lists.
>> + * Merge results.
>> + */
>> +void ovn_run_pool_list(
>> +        struct worker_pool *pool,
>> +        struct ovs_list *result,
>> +        struct ovs_list *result_frags)
>> +{
>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
>> +}
>> +
>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>> +{
>> +    int i;
>> +    if (hrl->mask != lflows->mask) {
>> +        if (hrl->row_locks) {
>> +            free(hrl->row_locks);
>> +        }
>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
>> +        hrl->mask = lflows->mask;
>> +        for (i = 0; i <= lflows->mask; i++) {
>> +            ovs_mutex_init(&hrl->row_locks[i]);
>> +        }
>> +    }
>> +}
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>> +    int i;
>> +    static struct worker_pool *pool;
>> +    char sem_name[256];
>> +
>> +    workers_must_exit = true;
>> +
>> +    /* All workers must honour the must_exit flag and check for it regularly.
>> +     * We can make it atomic and check it via atomics in workers, but that
>> +     * is not really necessary as it is set just once - when the program
>> +     * terminates. So we use a fence which is invoked before exiting instead.
>> +     */
>> +    atomic_thread_fence(memory_order_acq_rel);
>> +
>> +    /* Wake up the workers after the must_exit flag has been set */
>> +
>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>> +        for (i = 0; i < pool->size ; i++) {
>> +            sem_post(pool->controls[i].fire);
>> +        }
>> +        for (i = 0; i < pool->size ; i++) {
>> +            sem_close(pool->controls[i].fire);
>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> +            sem_unlink(sem_name);
>> +        }
>> +        sem_close(pool->done);
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> +        sem_unlink(sem_name);
>> +    }
>> +}
>> +
>> +static void setup_worker_pools(bool force) {
>> +    int cores, nodes;
>> +
>> +    nodes = ovs_numa_get_n_numas();
>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>> +        nodes = 1;
>> +    }
>> +    cores = ovs_numa_get_n_cores();
>> +
>> +    /* If there is no NUMA config, use 4 cores.
>> +     * If there is NUMA config use half the cores on
>> +     * one node so that the OS does not start pushing
>> +     * threads to other nodes.
>> +     */
>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>> +        /* If there is no NUMA we can try the ovs-threads routine.
>> +         * It falls back to sysconf and/or affinity mask.
>> +         */
>> +        cores = count_cpu_cores();
>> +        pool_size = cores;
>> +    } else {
>> +        pool_size = cores / nodes;
>> +    }
>> +    if ((pool_size < 4) && force) {
>> +        pool_size = 4;
>> +    }
>> +    can_parallelize = (pool_size >= 3);
>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>> +    sembase = random_uint32();
>> +}
>> +
>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index)
>> +{
>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>> +
>> +    if (!ovs_list_is_empty(&res_frags[index])) {
>> +        ovs_list_splice(result->next,
>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
>> +    }
>> +}
>> +
>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>> +                               void *fin_result, void *result_frags,
>> +                               int index)
>> +{
>> +    struct hmap *result = (struct hmap *)fin_result;
>> +    struct hmap *res_frags = (struct hmap *)result_frags;
>> +
>> +    fast_hmap_merge(result, &res_frags[index]);
>> +    hmap_destroy(&res_frags[index]);
>> +}
>> +
>> +#endif
>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>> new file mode 100644
>> index 000000000..71ad17fb0
>> --- /dev/null
>> +++ b/lib/ovn-parallel-hmap.h
>> @@ -0,0 +1,285 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#ifndef OVN_PARALLEL_HMAP
>> +#define OVN_PARALLEL_HMAP 1
>> +
>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>> + * we skip over the ovn specific definitions.
>> + */
>> +
>> +#ifdef  __cplusplus
>> +extern "C" {
>> +#endif
>> +
>> +#include <stdbool.h>
>> +#include <stdlib.h>
>> +#include <semaphore.h>
>> +#include "openvswitch/util.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovs-atomic.h"
>> +
>> +/* Process this include only if OVS does not supply parallel definitions
>> + */
>> +
>> +#ifdef OVS_HAS_PARALLEL_HMAP
>> +
>> +#include "parallel-hmap.h"
>> +
>> +#else
>> +
>> +
>> +#ifdef __clang__
>> +#pragma clang diagnostic push
>> +#pragma clang diagnostic ignored "-Wthread-safety"
>> +#endif
> I think you missed addressing my comment I provided in v13 to
> add some comments on why this is required.
>
>
>> +
>> +
>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>> + * of parallel processing.
>> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>> + * ThreadID + step * 2, etc. The actual macro accepts
>> + * ThreadID + step * i as the JOBID parameter.
>> + */
>> +
>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>> +       || ((NODE = NULL), false); \
>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
>> +
>> +/* We do not have a SAFE version of the macro, because the hash size is not
>> + * atomic and hash removal operations would need to be wrapped with
>> + * locks. This will defeat most of the benefits from doing anything in
>> + * parallel.
>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
>> + * each thread should store them in a temporary list result instead, merging
>> + * the lists into a combined result at the end */
>> +
>> +/* Work "Handle" */
>> +
>> +struct worker_control {
>> +    int id; /* Used as a modulo when iterating over a hash. */
>> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
>> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
>> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
>> +    struct ovs_mutex mutex; /* Guards the data. */
>> +    void *data; /* Pointer to data to be processed. */
>> +    void *workload; /* back-pointer to the worker pool structure. */
>> +};
>> +
>> +struct worker_pool {
>> +    int size;   /* Number of threads in the pool. */
>> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>> +    struct worker_control *controls; /* "Handles" in this pool. */
>> +    sem_t *done; /* Work completion semaphorew. */
>> +};
>> +
>> +/* Add a worker pool for thread function start() which expects a pointer to
>> + * a worker_control structure as an argument. */
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>> +
>> +/* Setting this to true will make all processing threads exit */
>> +
>> +bool ovn_stop_parallel_processing(void);
>> +
>> +/* Build a hmap pre-sized for size elements */
>> +
>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>> +
>> +/* Build a hmap with a mask equals to size */
>> +
>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>> +
>> +/* Brute-force merge a hmap into hmap.
>> + * Dest and inc have to have the same mask. The merge is performed
>> + * by extending the element list for bucket N in the dest hmap with the list
>> + * from bucket N in inc.
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>> +
>> +/* Run a pool, without any default processing of results.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool);
>> +
>> +/* Run a pool, merge results from hash frags into a final hash result.
>> + * The hash frags must be pre-sized to the same size.
>> + */
>> +
>> +void ovn_run_pool_hash(struct worker_pool *pool,
>> +                       struct hmap *result, struct hmap *result_frags);
>> +/* Run a pool, merge results from list frags into a final list result.
>> + */
>> +
>> +void ovn_run_pool_list(struct worker_pool *pool,
>> +                       struct ovs_list *result, struct ovs_list *result_frags);
>> +
>> +/* Run a pool, call a callback function to perform processing of results.
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>> +                    void *result_frags,
>> +                    void (*helper_func)(struct worker_pool *pool,
>> +                        void *fin_result, void *result_frags, int index));
>> +
>> +
>> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
>> + * would land, or a null pointer if that bucket is empty. */
>> +
>> +static inline struct hmap_node *
>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>> +{
>> +    return hmap->buckets[num];
>> +}
>> +
>> +static inline struct hmap_node *
>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
>> +{
>> +    size_t i;
>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>> +        struct hmap_node *node = hmap->buckets[i];
>> +        if (node) {
>> +            return node;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>> + * for parallel processing in arbitrary order, or a null pointer if
>> + * the slice of 'hmap' for that job_id is empty. */
>> +static inline struct hmap_node *
>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
>> +{
>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>> +}
>> +
>> +/* Returns the next node in the slice of 'hmap' following 'node',
>> + * in arbitrary order, or a * null pointer if 'node' is the last node in
>> + * the 'hmap' slice.
>> + *
>> + */
>> +static inline struct hmap_node *
>> +parallel_hmap_next(const struct hmap *hmap,
>> +                   const struct hmap_node *node, ssize_t pool_size)
>> +{
>> +    return (node->next
>> +            ? node->next
>> +            : parallel_hmap_next__(hmap,
>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>> +}
>> +
>> +static inline void post_completed_work(struct worker_control *control)
>> +{
>> +    atomic_thread_fence(memory_order_acq_rel);
>> +    atomic_store_relaxed(&control->finished, true);
>> +    sem_post(control->done);
>> +}
>> +
>> +static inline void wait_for_work(struct worker_control *control)
>> +{
>> +    sem_wait(control->fire);
>> +}
>> +
>> +/* Hash per-row locking support - to be used only in conjunction
>> + * with fast hash inserts. Normal hash inserts may resize the hash
>> + * rendering the locking invalid.
>> + */
>> +
>> +struct hashrow_locks {
>> +    ssize_t mask;
>> +    struct ovs_mutex *row_locks;
>> +};
>> +
>> +/* Update an hash row locks structure to match the current hash size */
>> +
>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
>> +
>> +/* Lock a hash row */
>> +
>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>> +{
>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
>> +}
>> +
>> +/* Unlock a hash row */
>> +
>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>> +{
>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
>> +}
>> +/* Init the row locks structure */
>> +
>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>> +{
>> +    hrl->mask = 0;
>> +    hrl->row_locks = NULL;
>> +}
>> +
>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>> +
>> +/* Use the OVN library functions for stuff which OVS has not defined
>> + * If OVS has defined these, they will still compile using the OVN
>> + * local names, but will be dropped by the linker in favour of the OVS
>> + * supplied functions.
>> + */
>> +
>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>> +
>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>> +
>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>> +
>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>> +
>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
>> +
>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>> +
>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>> +
>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>> +
>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
>> +
>> +#define run_pool_hash(pool, result, result_frags) \
>> +    ovn_run_pool_hash(pool, result, result_frags)
>> +
>> +#define run_pool_list(pool, result, result_frags) \
>> +    ovn_run_pool_list(pool, result, result_frags)
>> +
>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>> +
>> +
> In my opinion, we can switch over to making use of OVS APIs for fast_hmap
> once the patches in OVS are merged.  Until then I think we should just assume
> that these functions are part of OVN lib and consume them directly.
>
> It's possible that the function names could change when those patches
> land in OVS.
>
> Thanks
> Numan
>
>> +
>> +#ifdef __clang__
>> +#pragma clang diagnostic pop
>> +#endif
>> +
>> +#endif
>> +
>> +#ifdef  __cplusplus
>> +}
>> +#endif
>> +
>> +
>> +#endif /* lib/fasthmap.h */
>> --
>> 2.20.1
>>
>> _______________________________________________
>> dev mailing list
>> dev@openvswitch.org
>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>
Anton Ivanov Feb. 25, 2021, 2:03 p.m. UTC | #6
Found the most likely culprit.

This is similar to this: https://bugzilla.redhat.com/show_bug.cgi?id=663584
this: https://bugzilla.redhat.com/show_bug.cgi?id=1554955
and god knows how many others.

Selinux is "securing" your semaphores.

A.

On 25/02/2021 13:27, Anton Ivanov wrote:
> 
> On 25/02/2021 12:41, Numan Siddique wrote:
>> On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov
>> <anton.ivanov@cambridgegreys.com> wrote:
>>> This adds a set of functions and macros intended to process
>>> hashes in parallel.
>>>
>>> The principles of operation are documented in the fasthmap.h
>>>
>>> If these one day go into the OVS tree, the OVS tree versions
>>> would be used in preference.
>>>
>>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> Hi Anton,
>>
>> I still see problems with this patch set.
>>
>> If I apply the first 2 patches and run the tests, most of the test
>> cases fail or hang.
>>
>> When configured with gcc and address sanitizer,  the test case fails -
>> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
>> localnet ports with same tags (ovn.at:2970): FAILED
>> (ovs-macros.at:219) and I see the below in the testsuite.log
>>
>> *********
>> clean up OVN
>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>> ovn.at:3091: wait succeeded immediately
>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>> ovn.at:3091: wait succeeded immediately
>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>> ovn.at:3091: wait succeeded after 1 seconds
>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>> ovn.at:3091: wait succeeded quickly
>>
>> main: clean up vswitch
>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid
>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit --cleanup
>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>> ovn.at:3091: wait succeeded quickly
>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>> ovn.at:3091: wait succeeded immediately
>> Address Sanitizer reported errors in: asan.2986645
>> =================================================================
>> ==2986645==ERROR: AddressSanitizer: SEGV on unknown address
>> 0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80
>> T2)
>> ==2986645==The signal is caused by a READ memory access.
>> ../../tests/ovs-macros.at:219: hard failure
>> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
>> localnet ports with same tags (ovn.at:2970): FAILED
>> (ovs-macros.at:219)
>>
>> ******
>>
>> The same is observed for many test cases.
>>
>> When I configure clang and run the tests, all the tests pass, but I
>> see lot of coredumps.
>> I think I had reported this earlier too.
> 
> I have absolutely no idea how you get this. Seriously.
> 
> I have been unable to reproduce this
> 
> The only thing that comes to mind is to "fuzz" the semaphore creation and introduce random failures there to see if the error logic works. I can't see any logical problems in it via code inspection.
> 
>>
>> Here is the backtrace
>> -----
>> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
>> /lib64/libpthread.so.0
>> [Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))]
>> Missing separate debuginfos, use: dnf debuginfo-install
>> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
>> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
>> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
>> zlib-1.2.11-23.fc33.x86_64
>> (gdb) bt
>> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
>> /lib64/libpthread.so.0
>> #1  0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at
>> ../lib/ovn-parallel-hmap.h:193
>> #2  build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807
>> #3  0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at
>> ../lib/ovs-thread.c:383
>> #4  0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0
>> #5  0x00001528ad00e903 in clone () from /lib64/libc.so.6
>> ----
>>
>>
>> I'm sorry but something is not right. Instead of using semaphores, why
>> can't we use
>> 'struct latch' for each worker and use it to synchronize between the main thread
>> and the workers ?
> 
> We can't because this will conflict with the main poll loop.
> 
> You need to rewrite the entire northd processing logic and the IO logic to use latch here. Or establish parallel logic - part of the joy of using "thead once" to do poll magic.
> 
> None of these are a realistic option. I'd rather try to understand exactly what happens on your setup and what makes a sem_open() fail and why that is not handled properly by the error checking code.
> 
> Can you send the results of "sysctl kernel.sem" please? That's the only way I know to limit semaphores and the usual limits on Linux are in the 1G range.
> 
>>
>> The usage of the function - can_parallelize_hashes() in ovn-northd.c is very
>> confusing to me.
>>
>> I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only once
>> and for the subsequent calls, this function will be a no-op (due to
>> atomic_compare_exchange_strong()).
>>
>> Since this function is called before the workers are started, what is
>> the need to use
>> atomic_compare_exchange_strong() ?
> 
> We have ended there over time. There was a point where there was no option and no initial invocation wasn't and we were invoking the set-up on first processing.
> 
> In fact that is how it is invoked in the OVS codebase port for the parallelized monitors. I would actually like to keep this as an option.
> 
>>
>> Let's say we want to add another config option -
>> force_northd_parallel, and if the user
>> toggles the value between the runs, then the below code in
>> ovnnb_db_run() in patch 3
>>
>> ---
>> use_parallel_build = smap_get_bool(&nb->options,
>> "use_parallel_build", false) &&
>> ovn_can_parallelize_hashes(false);
>> -- 
>>   will have no effect since setup_worker_pools() is not called later
>> once the atomic
>> bool initial_pool_setup is set to true.
>>
>> I think we should provide the option to toggle this value at run time.
>> Also the patch 3 should
>> add an option to configure the force_parallel.
>>
>>> ---
>>>   lib/automake.mk         |   2 +
>>>   lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>>>   lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>>>   3 files changed, 742 insertions(+)
>>>   create mode 100644 lib/ovn-parallel-hmap.c
>>>   create mode 100644 lib/ovn-parallel-hmap.h
>>>
>>> diff --git a/lib/automake.mk b/lib/automake.mk
>>> index 250c7aefa..781be2109 100644
>>> --- a/lib/automake.mk
>>> +++ b/lib/automake.mk
>>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>>          lib/expr.c \
>>>          lib/extend-table.h \
>>>          lib/extend-table.c \
>>> +       lib/ovn-parallel-hmap.h \
>>> +       lib/ovn-parallel-hmap.c \
>>>          lib/ip-mcast-index.c \
>>>          lib/ip-mcast-index.h \
>>>          lib/mcast-group-index.c \
>>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>>> new file mode 100644
>>> index 000000000..06aa95aba
>>> --- /dev/null
>>> +++ b/lib/ovn-parallel-hmap.c
>>> @@ -0,0 +1,455 @@
>>> +/*
>>> + * Copyright (c) 2020 Red Hat, Inc.
>>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#include <config.h>
>>> +#include <stdint.h>
>>> +#include <string.h>
>>> +#include <stdlib.h>
>>> +#include <fcntl.h>
>>> +#include <unistd.h>
>>> +#include <errno.h>
>>> +#include <semaphore.h>
>>> +#include "fatal-signal.h"
>>> +#include "util.h"
>>> +#include "openvswitch/vlog.h"
>>> +#include "openvswitch/hmap.h"
>>> +#include "openvswitch/thread.h"
>>> +#include "ovn-parallel-hmap.h"
>>> +#include "ovs-atomic.h"
>>> +#include "ovs-thread.h"
>>> +#include "ovs-numa.h"
>>> +#include "random.h"
>>> +
>>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>>> +
>>> +#ifndef OVS_HAS_PARALLEL_HMAP
>>> +
>>> +#define WORKER_SEM_NAME "%x-%p-%x"
>>> +#define MAIN_SEM_NAME "%x-%p-main"
>>> +
>>> +/* These are accessed under mutex inside add_worker_pool().
>>> + * They do not need to be atomic.
>>> + */
>>> +
>>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>>> +static bool can_parallelize = false;
>>> +
>>> +/* This is set only in the process of exit and the set is
>>> + * accompanied by a fence. It does not need to be atomic or be
>>> + * accessed under a lock.
>>> + */
>>> +
>>> +static bool workers_must_exit = false;
>>> +
>>> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>>> +
>>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>>> +
>>> +static int pool_size;
>>> +
>>> +static int sembase;
>>> +
>>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>>> +static void setup_worker_pools(bool force);
>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>> +                               void *fin_result, void *result_frags,
>>> +                               int index);
>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>> +                               void *fin_result, void *result_frags,
>>> +                               int index);
>>> +
>>> +bool ovn_stop_parallel_processing(void)
>>> +{
>>> +    return workers_must_exit;
>>> +}
>>> +
>>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>>> +{
>>> +    bool test = false;
>>> +
>>> +    if (atomic_compare_exchange_strong(
>>> +            &initial_pool_setup,
>>> +            &test,
>>> +            true)) {
>>> +        ovs_mutex_lock(&init_mutex);
>>> +        setup_worker_pools(force_parallel);
>>> +        ovs_mutex_unlock(&init_mutex);
>>> +    }
>>> +    return can_parallelize;
>>> +}
>>> +
>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>>> +
>>> +    struct worker_pool *new_pool = NULL;
>>> +    struct worker_control *new_control;
>>> +    bool test = false;
>>> +    int i;
>>> +    char sem_name[256];
>>> +
>>> +
>>> +    /* Belt and braces - initialize the pool system just in case if
>>> +     * if it is not yet initialized.
>>> +     */
>>> +
>>> +    if (atomic_compare_exchange_strong(
>>> +            &initial_pool_setup,
>>> +            &test,
>>> +            true)) {
>>> +        ovs_mutex_lock(&init_mutex);
>>> +        setup_worker_pools(false);
>>> +        ovs_mutex_unlock(&init_mutex);
>>> +    }
>>> +
>>> +    ovs_mutex_lock(&init_mutex);
>>> +    if (can_parallelize) {
>>> +        new_pool = xmalloc(sizeof(struct worker_pool));
>>> +        new_pool->size = pool_size;
>>> +        new_pool->controls = NULL;
>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>> +        if (new_pool->done == SEM_FAILED) {
>>> +            goto cleanup;
>>> +        }
>>> +
>>> +        new_pool->controls =
>>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>>> +
>>> +        for (i = 0; i < new_pool->size; i++) {
>>> +            new_control = &new_pool->controls[i];
>>> +            new_control->id = i;
>>> +            new_control->done = new_pool->done;
>>> +            new_control->data = NULL;
>>> +            ovs_mutex_init(&new_control->mutex);
>>> +            new_control->finished = ATOMIC_VAR_INIT(false);
>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>> +            if (new_control->fire == SEM_FAILED) {
>>> +                goto cleanup;
>>> +            }
>>> +        }
>>> +
>>> +        for (i = 0; i < pool_size; i++) {
>>> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>>> +        }
>>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>> +    }
>>> +    ovs_mutex_unlock(&init_mutex);
>>> +    return new_pool;
>>> +cleanup:
>>> +
>>> +    /* Something went wrong when opening semaphores. In this case
>>> +     * it is better to shut off parallel procesing altogether
>>> +     */
>>> +
>>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>>> +    can_parallelize = false;
>>> +    if (new_pool->controls) {
>>> +        for (i = 0; i < new_pool->size; i++) {
>>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
>>> +                sem_close(new_pool->controls[i].fire);
>>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>> +                sem_unlink(sem_name);
>>> +                break; /* semaphores past this one are uninitialized */
>>> +            }
>>> +        }
>>> +    }
>>> +    if (new_pool->done != SEM_FAILED) {
>>> +        sem_close(new_pool->done);
>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>> +        sem_unlink(sem_name);
>>> +    }
>>> +    ovs_mutex_unlock(&init_mutex);
>>> +    return NULL;
>>> +}
>>> +
>>> +
>>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>>> +void
>>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>>> +{
>>> +    size_t i;
>>> +
>>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>>> +    hmap->one = NULL;
>>> +    hmap->mask = mask;
>>> +    hmap->n = 0;
>>> +    for (i = 0; i <= hmap->mask; i++) {
>>> +        hmap->buckets[i] = NULL;
>>> +    }
>>> +}
>>> +
>>> +/* Initializes 'hmap' as an empty hash table of size X.
>>> + * Intended for use in parallel processing so that all
>>> + * fragments used to store results in a parallel job
>>> + * are the same size.
>>> + */
>>> +void
>>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>>> +{
>>> +    size_t mask;
>>> +    mask = size / 2;
>>> +    mask |= mask >> 1;
>>> +    mask |= mask >> 2;
>>> +    mask |= mask >> 4;
>>> +    mask |= mask >> 8;
>>> +    mask |= mask >> 16;
>>> +#if SIZE_MAX > UINT32_MAX
>>> +    mask |= mask >> 32;
>>> +#endif
>>> +
>>> +    /* If we need to dynamically allocate buckets we might as well allocate at
>>> +     * least 4 of them. */
>>> +    mask |= (mask & 1) << 1;
>>> +
>>> +    fast_hmap_init(hmap, mask);
>>> +}
>>> +
>>> +/* Run a thread pool which uses a callback function to process results
>>> + */
>>> +
>>> +void ovn_run_pool_callback(struct worker_pool *pool,
>>> +                           void *fin_result, void *result_frags,
>>> +                           void (*helper_func)(struct worker_pool *pool,
>>> +                                               void *fin_result,
>>> +                                               void *result_frags, int index))
>>> +{
>>> +    int index, completed;
>>> +
>>> +    /* Ensure that all worker threads see the same data as the
>>> +     * main thread.
>>> +     */
>>> +
>>> +    atomic_thread_fence(memory_order_acq_rel);
>>> +
>>> +    /* Start workers */
>>> +
>>> +    for (index = 0; index < pool->size; index++) {
>>> +        sem_post(pool->controls[index].fire);
>>> +    }
>>> +
>>> +    completed = 0;
>>> +
>>> +    do {
>>> +        bool test;
>>> +        /* Note - we do not loop on semaphore until it reaches
>>> +         * zero, but on pool size/remaining workers.
>>> +         * This is by design. If the inner loop can handle
>>> +         * completion for more than one worker within an iteration
>>> +         * it will do so to ensure no additional iterations and
>>> +         * waits once all of them are done.
>>> +         *
>>> +         * This may result in us having an initial positive value
>>> +         * of the semaphore when the pool is invoked the next time.
>>> +         * This is harmless - the loop will spin up a couple of times
>>> +         * doing nothing while the workers are processing their data
>>> +         * slices.
>>> +         */
>>> +        sem_wait(pool->done);
>>> +        for (index = 0; index < pool->size; index++) {
>>> +            test = true;
>>> +            /* If the worker has marked its data chunk as complete,
>>> +             * invoke the helper function to combine the results of
>>> +             * this worker into the main result.
>>> +             *
>>> +             * The worker must invoke an appropriate memory fence
>>> +             * (most likely acq_rel) to ensure that the main thread
>>> +             * sees all of the results produced by the worker.
>>> +             */
>>> +            if (atomic_compare_exchange_weak(
>>> +                    &pool->controls[index].finished,
>>> +                    &test,
>>> +                    false)) {
>>> +                if (helper_func) {
>>> +                    (helper_func)(pool, fin_result, result_frags, index);
>>> +                }
>>> +                completed++;
>>> +                pool->controls[index].data = NULL;
>>> +            }
>>> +        }
>>> +    } while (completed < pool->size);
>>> +}
>>> +
>>> +/* Run a thread pool - basic, does not do results processing.
>>> + */
>>> +
>>> +void ovn_run_pool(struct worker_pool *pool)
>>> +{
>>> +    run_pool_callback(pool, NULL, NULL, NULL);
>>> +}
>>> +
>>> +/* Brute force merge of a hashmap into another hashmap.
>>> + * Intended for use in parallel processing. The destination
>>> + * hashmap MUST be the same size as the one being merged.
>>> + *
>>> + * This can be achieved by pre-allocating them to correct size
>>> + * and using hmap_insert_fast() instead of hmap_insert()
>>> + */
>>> +
>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>>> +{
>>> +    size_t i;
>>> +
>>> +    ovs_assert(inc->mask == dest->mask);
>>> +
>>> +    if (!inc->n) {
>>> +        /* Request to merge an empty frag, nothing to do */
>>> +        return;
>>> +    }
>>> +
>>> +    for (i = 0; i <= dest->mask; i++) {
>>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>>> +        if (*inc_bucket != NULL) {
>>> +            struct hmap_node *last_node = *inc_bucket;
>>> +            while (last_node->next != NULL) {
>>> +                last_node = last_node->next;
>>> +            }
>>> +            last_node->next = *dest_bucket;
>>> +            *dest_bucket = *inc_bucket;
>>> +            *inc_bucket = NULL;
>>> +        }
>>> +    }
>>> +    dest->n += inc->n;
>>> +    inc->n = 0;
>>> +}
>>> +
>>> +/* Run a thread pool which gathers results in an array
>>> + * of hashes. Merge results.
>>> + */
>>> +
>>> +
>>> +void ovn_run_pool_hash(
>>> +        struct worker_pool *pool,
>>> +        struct hmap *result,
>>> +        struct hmap *result_frags)
>>> +{
>>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
>>> +}
>>> +
>>> +/* Run a thread pool which gathers results in an array of lists.
>>> + * Merge results.
>>> + */
>>> +void ovn_run_pool_list(
>>> +        struct worker_pool *pool,
>>> +        struct ovs_list *result,
>>> +        struct ovs_list *result_frags)
>>> +{
>>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
>>> +}
>>> +
>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>>> +{
>>> +    int i;
>>> +    if (hrl->mask != lflows->mask) {
>>> +        if (hrl->row_locks) {
>>> +            free(hrl->row_locks);
>>> +        }
>>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
>>> +        hrl->mask = lflows->mask;
>>> +        for (i = 0; i <= lflows->mask; i++) {
>>> +            ovs_mutex_init(&hrl->row_locks[i]);
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>>> +    int i;
>>> +    static struct worker_pool *pool;
>>> +    char sem_name[256];
>>> +
>>> +    workers_must_exit = true;
>>> +
>>> +    /* All workers must honour the must_exit flag and check for it regularly.
>>> +     * We can make it atomic and check it via atomics in workers, but that
>>> +     * is not really necessary as it is set just once - when the program
>>> +     * terminates. So we use a fence which is invoked before exiting instead.
>>> +     */
>>> +    atomic_thread_fence(memory_order_acq_rel);
>>> +
>>> +    /* Wake up the workers after the must_exit flag has been set */
>>> +
>>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>>> +        for (i = 0; i < pool->size ; i++) {
>>> +            sem_post(pool->controls[i].fire);
>>> +        }
>>> +        for (i = 0; i < pool->size ; i++) {
>>> +            sem_close(pool->controls[i].fire);
>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>>> +            sem_unlink(sem_name);
>>> +        }
>>> +        sem_close(pool->done);
>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>>> +        sem_unlink(sem_name);
>>> +    }
>>> +}
>>> +
>>> +static void setup_worker_pools(bool force) {
>>> +    int cores, nodes;
>>> +
>>> +    nodes = ovs_numa_get_n_numas();
>>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>>> +        nodes = 1;
>>> +    }
>>> +    cores = ovs_numa_get_n_cores();
>>> +
>>> +    /* If there is no NUMA config, use 4 cores.
>>> +     * If there is NUMA config use half the cores on
>>> +     * one node so that the OS does not start pushing
>>> +     * threads to other nodes.
>>> +     */
>>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>>> +        /* If there is no NUMA we can try the ovs-threads routine.
>>> +         * It falls back to sysconf and/or affinity mask.
>>> +         */
>>> +        cores = count_cpu_cores();
>>> +        pool_size = cores;
>>> +    } else {
>>> +        pool_size = cores / nodes;
>>> +    }
>>> +    if ((pool_size < 4) && force) {
>>> +        pool_size = 4;
>>> +    }
>>> +    can_parallelize = (pool_size >= 3);
>>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>>> +    sembase = random_uint32();
>>> +}
>>> +
>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>> +                               void *fin_result, void *result_frags,
>>> +                               int index)
>>> +{
>>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
>>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>>> +
>>> +    if (!ovs_list_is_empty(&res_frags[index])) {
>>> +        ovs_list_splice(result->next,
>>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
>>> +    }
>>> +}
>>> +
>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>> +                               void *fin_result, void *result_frags,
>>> +                               int index)
>>> +{
>>> +    struct hmap *result = (struct hmap *)fin_result;
>>> +    struct hmap *res_frags = (struct hmap *)result_frags;
>>> +
>>> +    fast_hmap_merge(result, &res_frags[index]);
>>> +    hmap_destroy(&res_frags[index]);
>>> +}
>>> +
>>> +#endif
>>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>>> new file mode 100644
>>> index 000000000..71ad17fb0
>>> --- /dev/null
>>> +++ b/lib/ovn-parallel-hmap.h
>>> @@ -0,0 +1,285 @@
>>> +/*
>>> + * Copyright (c) 2020 Red Hat, Inc.
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#ifndef OVN_PARALLEL_HMAP
>>> +#define OVN_PARALLEL_HMAP 1
>>> +
>>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>>> + * we skip over the ovn specific definitions.
>>> + */
>>> +
>>> +#ifdef  __cplusplus
>>> +extern "C" {
>>> +#endif
>>> +
>>> +#include <stdbool.h>
>>> +#include <stdlib.h>
>>> +#include <semaphore.h>
>>> +#include "openvswitch/util.h"
>>> +#include "openvswitch/hmap.h"
>>> +#include "openvswitch/thread.h"
>>> +#include "ovs-atomic.h"
>>> +
>>> +/* Process this include only if OVS does not supply parallel definitions
>>> + */
>>> +
>>> +#ifdef OVS_HAS_PARALLEL_HMAP
>>> +
>>> +#include "parallel-hmap.h"
>>> +
>>> +#else
>>> +
>>> +
>>> +#ifdef __clang__
>>> +#pragma clang diagnostic push
>>> +#pragma clang diagnostic ignored "-Wthread-safety"
>>> +#endif
>> I think you missed addressing my comment I provided in v13 to
>> add some comments on why this is required.
>>
>>
>>> +
>>> +
>>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>>> + * of parallel processing.
>>> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
>>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>>> + * ThreadID + step * 2, etc. The actual macro accepts
>>> + * ThreadID + step * i as the JOBID parameter.
>>> + */
>>> +
>>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
>>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>>> +       || ((NODE = NULL), false); \
>>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
>>> +
>>> +/* We do not have a SAFE version of the macro, because the hash size is not
>>> + * atomic and hash removal operations would need to be wrapped with
>>> + * locks. This will defeat most of the benefits from doing anything in
>>> + * parallel.
>>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
>>> + * each thread should store them in a temporary list result instead, merging
>>> + * the lists into a combined result at the end */
>>> +
>>> +/* Work "Handle" */
>>> +
>>> +struct worker_control {
>>> +    int id; /* Used as a modulo when iterating over a hash. */
>>> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
>>> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
>>> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
>>> +    struct ovs_mutex mutex; /* Guards the data. */
>>> +    void *data; /* Pointer to data to be processed. */
>>> +    void *workload; /* back-pointer to the worker pool structure. */
>>> +};
>>> +
>>> +struct worker_pool {
>>> +    int size;   /* Number of threads in the pool. */
>>> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>>> +    struct worker_control *controls; /* "Handles" in this pool. */
>>> +    sem_t *done; /* Work completion semaphorew. */
>>> +};
>>> +
>>> +/* Add a worker pool for thread function start() which expects a pointer to
>>> + * a worker_control structure as an argument. */
>>> +
>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>>> +
>>> +/* Setting this to true will make all processing threads exit */
>>> +
>>> +bool ovn_stop_parallel_processing(void);
>>> +
>>> +/* Build a hmap pre-sized for size elements */
>>> +
>>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>>> +
>>> +/* Build a hmap with a mask equals to size */
>>> +
>>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>>> +
>>> +/* Brute-force merge a hmap into hmap.
>>> + * Dest and inc have to have the same mask. The merge is performed
>>> + * by extending the element list for bucket N in the dest hmap with the list
>>> + * from bucket N in inc.
>>> + */
>>> +
>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>>> +
>>> +/* Run a pool, without any default processing of results.
>>> + */
>>> +
>>> +void ovn_run_pool(struct worker_pool *pool);
>>> +
>>> +/* Run a pool, merge results from hash frags into a final hash result.
>>> + * The hash frags must be pre-sized to the same size.
>>> + */
>>> +
>>> +void ovn_run_pool_hash(struct worker_pool *pool,
>>> +                       struct hmap *result, struct hmap *result_frags);
>>> +/* Run a pool, merge results from list frags into a final list result.
>>> + */
>>> +
>>> +void ovn_run_pool_list(struct worker_pool *pool,
>>> +                       struct ovs_list *result, struct ovs_list *result_frags);
>>> +
>>> +/* Run a pool, call a callback function to perform processing of results.
>>> + */
>>> +
>>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>>> +                    void *result_frags,
>>> +                    void (*helper_func)(struct worker_pool *pool,
>>> +                        void *fin_result, void *result_frags, int index));
>>> +
>>> +
>>> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
>>> + * would land, or a null pointer if that bucket is empty. */
>>> +
>>> +static inline struct hmap_node *
>>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>>> +{
>>> +    return hmap->buckets[num];
>>> +}
>>> +
>>> +static inline struct hmap_node *
>>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
>>> +{
>>> +    size_t i;
>>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>>> +        struct hmap_node *node = hmap->buckets[i];
>>> +        if (node) {
>>> +            return node;
>>> +        }
>>> +    }
>>> +    return NULL;
>>> +}
>>> +
>>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>>> + * for parallel processing in arbitrary order, or a null pointer if
>>> + * the slice of 'hmap' for that job_id is empty. */
>>> +static inline struct hmap_node *
>>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
>>> +{
>>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>>> +}
>>> +
>>> +/* Returns the next node in the slice of 'hmap' following 'node',
>>> + * in arbitrary order, or a * null pointer if 'node' is the last node in
>>> + * the 'hmap' slice.
>>> + *
>>> + */
>>> +static inline struct hmap_node *
>>> +parallel_hmap_next(const struct hmap *hmap,
>>> +                   const struct hmap_node *node, ssize_t pool_size)
>>> +{
>>> +    return (node->next
>>> +            ? node->next
>>> +            : parallel_hmap_next__(hmap,
>>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>>> +}
>>> +
>>> +static inline void post_completed_work(struct worker_control *control)
>>> +{
>>> +    atomic_thread_fence(memory_order_acq_rel);
>>> +    atomic_store_relaxed(&control->finished, true);
>>> +    sem_post(control->done);
>>> +}
>>> +
>>> +static inline void wait_for_work(struct worker_control *control)
>>> +{
>>> +    sem_wait(control->fire);
>>> +}
>>> +
>>> +/* Hash per-row locking support - to be used only in conjunction
>>> + * with fast hash inserts. Normal hash inserts may resize the hash
>>> + * rendering the locking invalid.
>>> + */
>>> +
>>> +struct hashrow_locks {
>>> +    ssize_t mask;
>>> +    struct ovs_mutex *row_locks;
>>> +};
>>> +
>>> +/* Update an hash row locks structure to match the current hash size */
>>> +
>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
>>> +
>>> +/* Lock a hash row */
>>> +
>>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>> +{
>>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
>>> +}
>>> +
>>> +/* Unlock a hash row */
>>> +
>>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>> +{
>>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
>>> +}
>>> +/* Init the row locks structure */
>>> +
>>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>>> +{
>>> +    hrl->mask = 0;
>>> +    hrl->row_locks = NULL;
>>> +}
>>> +
>>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>>> +
>>> +/* Use the OVN library functions for stuff which OVS has not defined
>>> + * If OVS has defined these, they will still compile using the OVN
>>> + * local names, but will be dropped by the linker in favour of the OVS
>>> + * supplied functions.
>>> + */
>>> +
>>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>>> +
>>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>>> +
>>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>>> +
>>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>>> +
>>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
>>> +
>>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>>> +
>>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>>> +
>>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>>> +
>>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
>>> +
>>> +#define run_pool_hash(pool, result, result_frags) \
>>> +    ovn_run_pool_hash(pool, result, result_frags)
>>> +
>>> +#define run_pool_list(pool, result, result_frags) \
>>> +    ovn_run_pool_list(pool, result, result_frags)
>>> +
>>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
>>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>>> +
>>> +
>> In my opinion, we can switch over to making use of OVS APIs for fast_hmap
>> once the patches in OVS are merged.  Until then I think we should just assume
>> that these functions are part of OVN lib and consume them directly.
>>
>> It's possible that the function names could change when those patches
>> land in OVS.
>>
>> Thanks
>> Numan
>>
>>> +
>>> +#ifdef __clang__
>>> +#pragma clang diagnostic pop
>>> +#endif
>>> +
>>> +#endif
>>> +
>>> +#ifdef  __cplusplus
>>> +}
>>> +#endif
>>> +
>>> +
>>> +#endif /* lib/fasthmap.h */
>>> -- 
>>> 2.20.1
>>>
>>> _______________________________________________
>>> dev mailing list
>>> dev@openvswitch.org
>>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>>
Numan Siddique Feb. 25, 2021, 2:29 p.m. UTC | #7
On Thu, Feb 25, 2021 at 7:33 PM Anton Ivanov
<anton.ivanov@cambridgegreys.com> wrote:
>
> Found the most likely culprit.
>
> This is similar to this: https://bugzilla.redhat.com/show_bug.cgi?id=663584
> this: https://bugzilla.redhat.com/show_bug.cgi?id=1554955
> and god knows how many others.
>
> Selinux is "securing" your semaphores.

I disabled selinux (permissive) and I still see the same behavior


>
> A.
>
> On 25/02/2021 13:27, Anton Ivanov wrote:
> >
> > On 25/02/2021 12:41, Numan Siddique wrote:
> >> On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov
> >> <anton.ivanov@cambridgegreys.com> wrote:
> >>> This adds a set of functions and macros intended to process
> >>> hashes in parallel.
> >>>
> >>> The principles of operation are documented in the fasthmap.h
> >>>
> >>> If these one day go into the OVS tree, the OVS tree versions
> >>> would be used in preference.
> >>>
> >>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> >> Hi Anton,
> >>
> >> I still see problems with this patch set.
> >>
> >> If I apply the first 2 patches and run the tests, most of the test
> >> cases fail or hang.
> >>
> >> When configured with gcc and address sanitizer,  the test case fails -
> >> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
> >> localnet ports with same tags (ovn.at:2970): FAILED
> >> (ovs-macros.at:219) and I see the below in the testsuite.log
> >>
> >> *********
> >> clean up OVN
> >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
> >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
> >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> >> ovn.at:3091: wait succeeded immediately
> >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
> >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
> >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> >> ovn.at:3091: wait succeeded immediately
> >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
> >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
> >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> >> ovn.at:3091: wait succeeded after 1 seconds
> >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
> >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
> >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> >> ovn.at:3091: wait succeeded quickly
> >>
> >> main: clean up vswitch
> >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid
> >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit --cleanup
> >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> >> ovn.at:3091: wait succeeded quickly
> >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
> >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
> >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
> >> ovn.at:3091: wait succeeded immediately
> >> Address Sanitizer reported errors in: asan.2986645
> >> =================================================================
> >> ==2986645==ERROR: AddressSanitizer: SEGV on unknown address
> >> 0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80
> >> T2)
> >> ==2986645==The signal is caused by a READ memory access.
> >> ../../tests/ovs-macros.at:219: hard failure
> >> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
> >> localnet ports with same tags (ovn.at:2970): FAILED
> >> (ovs-macros.at:219)
> >>
> >> ******
> >>
> >> The same is observed for many test cases.
> >>
> >> When I configure clang and run the tests, all the tests pass, but I
> >> see lot of coredumps.
> >> I think I had reported this earlier too.
> >
> > I have absolutely no idea how you get this. Seriously.
> >
> > I have been unable to reproduce this
> >
> > The only thing that comes to mind is to "fuzz" the semaphore creation and introduce random failures there to see if the error logic works. I can't see any logical problems in it via code inspection.
> >
> >>
> >> Here is the backtrace
> >> -----
> >> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
> >> /lib64/libpthread.so.0
> >> [Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))]
> >> Missing separate debuginfos, use: dnf debuginfo-install
> >> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
> >> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
> >> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
> >> zlib-1.2.11-23.fc33.x86_64
> >> (gdb) bt
> >> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
> >> /lib64/libpthread.so.0
> >> #1  0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at
> >> ../lib/ovn-parallel-hmap.h:193
> >> #2  build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807
> >> #3  0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at
> >> ../lib/ovs-thread.c:383
> >> #4  0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0
> >> #5  0x00001528ad00e903 in clone () from /lib64/libc.so.6
> >> ----
> >>
> >>
> >> I'm sorry but something is not right. Instead of using semaphores, why
> >> can't we use
> >> 'struct latch' for each worker and use it to synchronize between the main thread
> >> and the workers ?
> >
> > We can't because this will conflict with the main poll loop.
> >
> > You need to rewrite the entire northd processing logic and the IO logic to use latch here. Or establish parallel logic - part of the joy of using "thead once" to do poll magic.
> >
> > None of these are a realistic option. I'd rather try to understand exactly what happens on your setup and what makes a sem_open() fail and why that is not handled properly by the error checking code.
> >
> > Can you send the results of "sysctl kernel.sem" please? That's the only way I know to limit semaphores and the usual limits on Linux are in the 1G range.

Here's the output

sysctl kernel.sem
kernel.sem = 32000 1024000000 500 32000

I ran the same tests on the rhel 8 system, I don't see any crashes.


Thanks
Numan
> >
> >>
> >> The usage of the function - can_parallelize_hashes() in ovn-northd.c is very
> >> confusing to me.
> >>
> >> I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only once
> >> and for the subsequent calls, this function will be a no-op (due to
> >> atomic_compare_exchange_strong()).
> >>
> >> Since this function is called before the workers are started, what is
> >> the need to use
> >> atomic_compare_exchange_strong() ?
> >
> > We have ended there over time. There was a point where there was no option and no initial invocation wasn't and we were invoking the set-up on first processing.
> >
> > In fact that is how it is invoked in the OVS codebase port for the parallelized monitors. I would actually like to keep this as an option.
> >
> >>
> >> Let's say we want to add another config option -
> >> force_northd_parallel, and if the user
> >> toggles the value between the runs, then the below code in
> >> ovnnb_db_run() in patch 3
> >>
> >> ---
> >> use_parallel_build = smap_get_bool(&nb->options,
> >> "use_parallel_build", false) &&
> >> ovn_can_parallelize_hashes(false);
> >> --
> >>   will have no effect since setup_worker_pools() is not called later
> >> once the atomic
> >> bool initial_pool_setup is set to true.
> >>
> >> I think we should provide the option to toggle this value at run time.
> >> Also the patch 3 should
> >> add an option to configure the force_parallel.
> >>
> >>> ---
> >>>   lib/automake.mk         |   2 +
> >>>   lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
> >>>   lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
> >>>   3 files changed, 742 insertions(+)
> >>>   create mode 100644 lib/ovn-parallel-hmap.c
> >>>   create mode 100644 lib/ovn-parallel-hmap.h
> >>>
> >>> diff --git a/lib/automake.mk b/lib/automake.mk
> >>> index 250c7aefa..781be2109 100644
> >>> --- a/lib/automake.mk
> >>> +++ b/lib/automake.mk
> >>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
> >>>          lib/expr.c \
> >>>          lib/extend-table.h \
> >>>          lib/extend-table.c \
> >>> +       lib/ovn-parallel-hmap.h \
> >>> +       lib/ovn-parallel-hmap.c \
> >>>          lib/ip-mcast-index.c \
> >>>          lib/ip-mcast-index.h \
> >>>          lib/mcast-group-index.c \
> >>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> >>> new file mode 100644
> >>> index 000000000..06aa95aba
> >>> --- /dev/null
> >>> +++ b/lib/ovn-parallel-hmap.c
> >>> @@ -0,0 +1,455 @@
> >>> +/*
> >>> + * Copyright (c) 2020 Red Hat, Inc.
> >>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> >>> + *
> >>> + * Licensed under the Apache License, Version 2.0 (the "License");
> >>> + * you may not use this file except in compliance with the License.
> >>> + * You may obtain a copy of the License at:
> >>> + *
> >>> + *     http://www.apache.org/licenses/LICENSE-2.0
> >>> + *
> >>> + * Unless required by applicable law or agreed to in writing, software
> >>> + * distributed under the License is distributed on an "AS IS" BASIS,
> >>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >>> + * See the License for the specific language governing permissions and
> >>> + * limitations under the License.
> >>> + */
> >>> +
> >>> +#include <config.h>
> >>> +#include <stdint.h>
> >>> +#include <string.h>
> >>> +#include <stdlib.h>
> >>> +#include <fcntl.h>
> >>> +#include <unistd.h>
> >>> +#include <errno.h>
> >>> +#include <semaphore.h>
> >>> +#include "fatal-signal.h"
> >>> +#include "util.h"
> >>> +#include "openvswitch/vlog.h"
> >>> +#include "openvswitch/hmap.h"
> >>> +#include "openvswitch/thread.h"
> >>> +#include "ovn-parallel-hmap.h"
> >>> +#include "ovs-atomic.h"
> >>> +#include "ovs-thread.h"
> >>> +#include "ovs-numa.h"
> >>> +#include "random.h"
> >>> +
> >>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
> >>> +
> >>> +#ifndef OVS_HAS_PARALLEL_HMAP
> >>> +
> >>> +#define WORKER_SEM_NAME "%x-%p-%x"
> >>> +#define MAIN_SEM_NAME "%x-%p-main"
> >>> +
> >>> +/* These are accessed under mutex inside add_worker_pool().
> >>> + * They do not need to be atomic.
> >>> + */
> >>> +
> >>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> >>> +static bool can_parallelize = false;
> >>> +
> >>> +/* This is set only in the process of exit and the set is
> >>> + * accompanied by a fence. It does not need to be atomic or be
> >>> + * accessed under a lock.
> >>> + */
> >>> +
> >>> +static bool workers_must_exit = false;
> >>> +
> >>> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> >>> +
> >>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> >>> +
> >>> +static int pool_size;
> >>> +
> >>> +static int sembase;
> >>> +
> >>> +static void worker_pool_hook(void *aux OVS_UNUSED);
> >>> +static void setup_worker_pools(bool force);
> >>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> >>> +                               void *fin_result, void *result_frags,
> >>> +                               int index);
> >>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> >>> +                               void *fin_result, void *result_frags,
> >>> +                               int index);
> >>> +
> >>> +bool ovn_stop_parallel_processing(void)
> >>> +{
> >>> +    return workers_must_exit;
> >>> +}
> >>> +
> >>> +bool ovn_can_parallelize_hashes(bool force_parallel)
> >>> +{
> >>> +    bool test = false;
> >>> +
> >>> +    if (atomic_compare_exchange_strong(
> >>> +            &initial_pool_setup,
> >>> +            &test,
> >>> +            true)) {
> >>> +        ovs_mutex_lock(&init_mutex);
> >>> +        setup_worker_pools(force_parallel);
> >>> +        ovs_mutex_unlock(&init_mutex);
> >>> +    }
> >>> +    return can_parallelize;
> >>> +}
> >>> +
> >>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> >>> +
> >>> +    struct worker_pool *new_pool = NULL;
> >>> +    struct worker_control *new_control;
> >>> +    bool test = false;
> >>> +    int i;
> >>> +    char sem_name[256];
> >>> +
> >>> +
> >>> +    /* Belt and braces - initialize the pool system just in case if
> >>> +     * if it is not yet initialized.
> >>> +     */
> >>> +
> >>> +    if (atomic_compare_exchange_strong(
> >>> +            &initial_pool_setup,
> >>> +            &test,
> >>> +            true)) {
> >>> +        ovs_mutex_lock(&init_mutex);
> >>> +        setup_worker_pools(false);
> >>> +        ovs_mutex_unlock(&init_mutex);
> >>> +    }
> >>> +
> >>> +    ovs_mutex_lock(&init_mutex);
> >>> +    if (can_parallelize) {
> >>> +        new_pool = xmalloc(sizeof(struct worker_pool));
> >>> +        new_pool->size = pool_size;
> >>> +        new_pool->controls = NULL;
> >>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> >>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> >>> +        if (new_pool->done == SEM_FAILED) {
> >>> +            goto cleanup;
> >>> +        }
> >>> +
> >>> +        new_pool->controls =
> >>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
> >>> +
> >>> +        for (i = 0; i < new_pool->size; i++) {
> >>> +            new_control = &new_pool->controls[i];
> >>> +            new_control->id = i;
> >>> +            new_control->done = new_pool->done;
> >>> +            new_control->data = NULL;
> >>> +            ovs_mutex_init(&new_control->mutex);
> >>> +            new_control->finished = ATOMIC_VAR_INIT(false);
> >>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> >>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> >>> +            if (new_control->fire == SEM_FAILED) {
> >>> +                goto cleanup;
> >>> +            }
> >>> +        }
> >>> +
> >>> +        for (i = 0; i < pool_size; i++) {
> >>> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> >>> +        }
> >>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> >>> +    }
> >>> +    ovs_mutex_unlock(&init_mutex);
> >>> +    return new_pool;
> >>> +cleanup:
> >>> +
> >>> +    /* Something went wrong when opening semaphores. In this case
> >>> +     * it is better to shut off parallel procesing altogether
> >>> +     */
> >>> +
> >>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> >>> +    can_parallelize = false;
> >>> +    if (new_pool->controls) {
> >>> +        for (i = 0; i < new_pool->size; i++) {
> >>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
> >>> +                sem_close(new_pool->controls[i].fire);
> >>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> >>> +                sem_unlink(sem_name);
> >>> +                break; /* semaphores past this one are uninitialized */
> >>> +            }
> >>> +        }
> >>> +    }
> >>> +    if (new_pool->done != SEM_FAILED) {
> >>> +        sem_close(new_pool->done);
> >>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> >>> +        sem_unlink(sem_name);
> >>> +    }
> >>> +    ovs_mutex_unlock(&init_mutex);
> >>> +    return NULL;
> >>> +}
> >>> +
> >>> +
> >>> +/* Initializes 'hmap' as an empty hash table with mask N. */
> >>> +void
> >>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> >>> +{
> >>> +    size_t i;
> >>> +
> >>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> >>> +    hmap->one = NULL;
> >>> +    hmap->mask = mask;
> >>> +    hmap->n = 0;
> >>> +    for (i = 0; i <= hmap->mask; i++) {
> >>> +        hmap->buckets[i] = NULL;
> >>> +    }
> >>> +}
> >>> +
> >>> +/* Initializes 'hmap' as an empty hash table of size X.
> >>> + * Intended for use in parallel processing so that all
> >>> + * fragments used to store results in a parallel job
> >>> + * are the same size.
> >>> + */
> >>> +void
> >>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> >>> +{
> >>> +    size_t mask;
> >>> +    mask = size / 2;
> >>> +    mask |= mask >> 1;
> >>> +    mask |= mask >> 2;
> >>> +    mask |= mask >> 4;
> >>> +    mask |= mask >> 8;
> >>> +    mask |= mask >> 16;
> >>> +#if SIZE_MAX > UINT32_MAX
> >>> +    mask |= mask >> 32;
> >>> +#endif
> >>> +
> >>> +    /* If we need to dynamically allocate buckets we might as well allocate at
> >>> +     * least 4 of them. */
> >>> +    mask |= (mask & 1) << 1;
> >>> +
> >>> +    fast_hmap_init(hmap, mask);
> >>> +}
> >>> +
> >>> +/* Run a thread pool which uses a callback function to process results
> >>> + */
> >>> +
> >>> +void ovn_run_pool_callback(struct worker_pool *pool,
> >>> +                           void *fin_result, void *result_frags,
> >>> +                           void (*helper_func)(struct worker_pool *pool,
> >>> +                                               void *fin_result,
> >>> +                                               void *result_frags, int index))
> >>> +{
> >>> +    int index, completed;
> >>> +
> >>> +    /* Ensure that all worker threads see the same data as the
> >>> +     * main thread.
> >>> +     */
> >>> +
> >>> +    atomic_thread_fence(memory_order_acq_rel);
> >>> +
> >>> +    /* Start workers */
> >>> +
> >>> +    for (index = 0; index < pool->size; index++) {
> >>> +        sem_post(pool->controls[index].fire);
> >>> +    }
> >>> +
> >>> +    completed = 0;
> >>> +
> >>> +    do {
> >>> +        bool test;
> >>> +        /* Note - we do not loop on semaphore until it reaches
> >>> +         * zero, but on pool size/remaining workers.
> >>> +         * This is by design. If the inner loop can handle
> >>> +         * completion for more than one worker within an iteration
> >>> +         * it will do so to ensure no additional iterations and
> >>> +         * waits once all of them are done.
> >>> +         *
> >>> +         * This may result in us having an initial positive value
> >>> +         * of the semaphore when the pool is invoked the next time.
> >>> +         * This is harmless - the loop will spin up a couple of times
> >>> +         * doing nothing while the workers are processing their data
> >>> +         * slices.
> >>> +         */
> >>> +        sem_wait(pool->done);
> >>> +        for (index = 0; index < pool->size; index++) {
> >>> +            test = true;
> >>> +            /* If the worker has marked its data chunk as complete,
> >>> +             * invoke the helper function to combine the results of
> >>> +             * this worker into the main result.
> >>> +             *
> >>> +             * The worker must invoke an appropriate memory fence
> >>> +             * (most likely acq_rel) to ensure that the main thread
> >>> +             * sees all of the results produced by the worker.
> >>> +             */
> >>> +            if (atomic_compare_exchange_weak(
> >>> +                    &pool->controls[index].finished,
> >>> +                    &test,
> >>> +                    false)) {
> >>> +                if (helper_func) {
> >>> +                    (helper_func)(pool, fin_result, result_frags, index);
> >>> +                }
> >>> +                completed++;
> >>> +                pool->controls[index].data = NULL;
> >>> +            }
> >>> +        }
> >>> +    } while (completed < pool->size);
> >>> +}
> >>> +
> >>> +/* Run a thread pool - basic, does not do results processing.
> >>> + */
> >>> +
> >>> +void ovn_run_pool(struct worker_pool *pool)
> >>> +{
> >>> +    run_pool_callback(pool, NULL, NULL, NULL);
> >>> +}
> >>> +
> >>> +/* Brute force merge of a hashmap into another hashmap.
> >>> + * Intended for use in parallel processing. The destination
> >>> + * hashmap MUST be the same size as the one being merged.
> >>> + *
> >>> + * This can be achieved by pre-allocating them to correct size
> >>> + * and using hmap_insert_fast() instead of hmap_insert()
> >>> + */
> >>> +
> >>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> >>> +{
> >>> +    size_t i;
> >>> +
> >>> +    ovs_assert(inc->mask == dest->mask);
> >>> +
> >>> +    if (!inc->n) {
> >>> +        /* Request to merge an empty frag, nothing to do */
> >>> +        return;
> >>> +    }
> >>> +
> >>> +    for (i = 0; i <= dest->mask; i++) {
> >>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> >>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> >>> +        if (*inc_bucket != NULL) {
> >>> +            struct hmap_node *last_node = *inc_bucket;
> >>> +            while (last_node->next != NULL) {
> >>> +                last_node = last_node->next;
> >>> +            }
> >>> +            last_node->next = *dest_bucket;
> >>> +            *dest_bucket = *inc_bucket;
> >>> +            *inc_bucket = NULL;
> >>> +        }
> >>> +    }
> >>> +    dest->n += inc->n;
> >>> +    inc->n = 0;
> >>> +}
> >>> +
> >>> +/* Run a thread pool which gathers results in an array
> >>> + * of hashes. Merge results.
> >>> + */
> >>> +
> >>> +
> >>> +void ovn_run_pool_hash(
> >>> +        struct worker_pool *pool,
> >>> +        struct hmap *result,
> >>> +        struct hmap *result_frags)
> >>> +{
> >>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
> >>> +}
> >>> +
> >>> +/* Run a thread pool which gathers results in an array of lists.
> >>> + * Merge results.
> >>> + */
> >>> +void ovn_run_pool_list(
> >>> +        struct worker_pool *pool,
> >>> +        struct ovs_list *result,
> >>> +        struct ovs_list *result_frags)
> >>> +{
> >>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
> >>> +}
> >>> +
> >>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
> >>> +{
> >>> +    int i;
> >>> +    if (hrl->mask != lflows->mask) {
> >>> +        if (hrl->row_locks) {
> >>> +            free(hrl->row_locks);
> >>> +        }
> >>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> >>> +        hrl->mask = lflows->mask;
> >>> +        for (i = 0; i <= lflows->mask; i++) {
> >>> +            ovs_mutex_init(&hrl->row_locks[i]);
> >>> +        }
> >>> +    }
> >>> +}
> >>> +
> >>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> >>> +    int i;
> >>> +    static struct worker_pool *pool;
> >>> +    char sem_name[256];
> >>> +
> >>> +    workers_must_exit = true;
> >>> +
> >>> +    /* All workers must honour the must_exit flag and check for it regularly.
> >>> +     * We can make it atomic and check it via atomics in workers, but that
> >>> +     * is not really necessary as it is set just once - when the program
> >>> +     * terminates. So we use a fence which is invoked before exiting instead.
> >>> +     */
> >>> +    atomic_thread_fence(memory_order_acq_rel);
> >>> +
> >>> +    /* Wake up the workers after the must_exit flag has been set */
> >>> +
> >>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> >>> +        for (i = 0; i < pool->size ; i++) {
> >>> +            sem_post(pool->controls[i].fire);
> >>> +        }
> >>> +        for (i = 0; i < pool->size ; i++) {
> >>> +            sem_close(pool->controls[i].fire);
> >>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> >>> +            sem_unlink(sem_name);
> >>> +        }
> >>> +        sem_close(pool->done);
> >>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> >>> +        sem_unlink(sem_name);
> >>> +    }
> >>> +}
> >>> +
> >>> +static void setup_worker_pools(bool force) {
> >>> +    int cores, nodes;
> >>> +
> >>> +    nodes = ovs_numa_get_n_numas();
> >>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> >>> +        nodes = 1;
> >>> +    }
> >>> +    cores = ovs_numa_get_n_cores();
> >>> +
> >>> +    /* If there is no NUMA config, use 4 cores.
> >>> +     * If there is NUMA config use half the cores on
> >>> +     * one node so that the OS does not start pushing
> >>> +     * threads to other nodes.
> >>> +     */
> >>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> >>> +        /* If there is no NUMA we can try the ovs-threads routine.
> >>> +         * It falls back to sysconf and/or affinity mask.
> >>> +         */
> >>> +        cores = count_cpu_cores();
> >>> +        pool_size = cores;
> >>> +    } else {
> >>> +        pool_size = cores / nodes;
> >>> +    }
> >>> +    if ((pool_size < 4) && force) {
> >>> +        pool_size = 4;
> >>> +    }
> >>> +    can_parallelize = (pool_size >= 3);
> >>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> >>> +    sembase = random_uint32();
> >>> +}
> >>> +
> >>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> >>> +                               void *fin_result, void *result_frags,
> >>> +                               int index)
> >>> +{
> >>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
> >>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
> >>> +
> >>> +    if (!ovs_list_is_empty(&res_frags[index])) {
> >>> +        ovs_list_splice(result->next,
> >>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
> >>> +    }
> >>> +}
> >>> +
> >>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> >>> +                               void *fin_result, void *result_frags,
> >>> +                               int index)
> >>> +{
> >>> +    struct hmap *result = (struct hmap *)fin_result;
> >>> +    struct hmap *res_frags = (struct hmap *)result_frags;
> >>> +
> >>> +    fast_hmap_merge(result, &res_frags[index]);
> >>> +    hmap_destroy(&res_frags[index]);
> >>> +}
> >>> +
> >>> +#endif
> >>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> >>> new file mode 100644
> >>> index 000000000..71ad17fb0
> >>> --- /dev/null
> >>> +++ b/lib/ovn-parallel-hmap.h
> >>> @@ -0,0 +1,285 @@
> >>> +/*
> >>> + * Copyright (c) 2020 Red Hat, Inc.
> >>> + *
> >>> + * Licensed under the Apache License, Version 2.0 (the "License");
> >>> + * you may not use this file except in compliance with the License.
> >>> + * You may obtain a copy of the License at:
> >>> + *
> >>> + *     http://www.apache.org/licenses/LICENSE-2.0
> >>> + *
> >>> + * Unless required by applicable law or agreed to in writing, software
> >>> + * distributed under the License is distributed on an "AS IS" BASIS,
> >>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >>> + * See the License for the specific language governing permissions and
> >>> + * limitations under the License.
> >>> + */
> >>> +
> >>> +#ifndef OVN_PARALLEL_HMAP
> >>> +#define OVN_PARALLEL_HMAP 1
> >>> +
> >>> +/* if the parallel macros are defined by hmap.h or any other ovs define
> >>> + * we skip over the ovn specific definitions.
> >>> + */
> >>> +
> >>> +#ifdef  __cplusplus
> >>> +extern "C" {
> >>> +#endif
> >>> +
> >>> +#include <stdbool.h>
> >>> +#include <stdlib.h>
> >>> +#include <semaphore.h>
> >>> +#include "openvswitch/util.h"
> >>> +#include "openvswitch/hmap.h"
> >>> +#include "openvswitch/thread.h"
> >>> +#include "ovs-atomic.h"
> >>> +
> >>> +/* Process this include only if OVS does not supply parallel definitions
> >>> + */
> >>> +
> >>> +#ifdef OVS_HAS_PARALLEL_HMAP
> >>> +
> >>> +#include "parallel-hmap.h"
> >>> +
> >>> +#else
> >>> +
> >>> +
> >>> +#ifdef __clang__
> >>> +#pragma clang diagnostic push
> >>> +#pragma clang diagnostic ignored "-Wthread-safety"
> >>> +#endif
> >> I think you missed addressing my comment I provided in v13 to
> >> add some comments on why this is required.
> >>
> >>
> >>> +
> >>> +
> >>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> >>> + * of parallel processing.
> >>> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
> >>> + * and will iterate hash buckets ThreadID, ThreadID + step,
> >>> + * ThreadID + step * 2, etc. The actual macro accepts
> >>> + * ThreadID + step * i as the JOBID parameter.
> >>> + */
> >>> +
> >>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> >>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> >>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> >>> +       || ((NODE = NULL), false); \
> >>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> >>> +
> >>> +/* We do not have a SAFE version of the macro, because the hash size is not
> >>> + * atomic and hash removal operations would need to be wrapped with
> >>> + * locks. This will defeat most of the benefits from doing anything in
> >>> + * parallel.
> >>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> >>> + * each thread should store them in a temporary list result instead, merging
> >>> + * the lists into a combined result at the end */
> >>> +
> >>> +/* Work "Handle" */
> >>> +
> >>> +struct worker_control {
> >>> +    int id; /* Used as a modulo when iterating over a hash. */
> >>> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
> >>> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
> >>> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> >>> +    struct ovs_mutex mutex; /* Guards the data. */
> >>> +    void *data; /* Pointer to data to be processed. */
> >>> +    void *workload; /* back-pointer to the worker pool structure. */
> >>> +};
> >>> +
> >>> +struct worker_pool {
> >>> +    int size;   /* Number of threads in the pool. */
> >>> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> >>> +    struct worker_control *controls; /* "Handles" in this pool. */
> >>> +    sem_t *done; /* Work completion semaphorew. */
> >>> +};
> >>> +
> >>> +/* Add a worker pool for thread function start() which expects a pointer to
> >>> + * a worker_control structure as an argument. */
> >>> +
> >>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> >>> +
> >>> +/* Setting this to true will make all processing threads exit */
> >>> +
> >>> +bool ovn_stop_parallel_processing(void);
> >>> +
> >>> +/* Build a hmap pre-sized for size elements */
> >>> +
> >>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> >>> +
> >>> +/* Build a hmap with a mask equals to size */
> >>> +
> >>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> >>> +
> >>> +/* Brute-force merge a hmap into hmap.
> >>> + * Dest and inc have to have the same mask. The merge is performed
> >>> + * by extending the element list for bucket N in the dest hmap with the list
> >>> + * from bucket N in inc.
> >>> + */
> >>> +
> >>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> >>> +
> >>> +/* Run a pool, without any default processing of results.
> >>> + */
> >>> +
> >>> +void ovn_run_pool(struct worker_pool *pool);
> >>> +
> >>> +/* Run a pool, merge results from hash frags into a final hash result.
> >>> + * The hash frags must be pre-sized to the same size.
> >>> + */
> >>> +
> >>> +void ovn_run_pool_hash(struct worker_pool *pool,
> >>> +                       struct hmap *result, struct hmap *result_frags);
> >>> +/* Run a pool, merge results from list frags into a final list result.
> >>> + */
> >>> +
> >>> +void ovn_run_pool_list(struct worker_pool *pool,
> >>> +                       struct ovs_list *result, struct ovs_list *result_frags);
> >>> +
> >>> +/* Run a pool, call a callback function to perform processing of results.
> >>> + */
> >>> +
> >>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> >>> +                    void *result_frags,
> >>> +                    void (*helper_func)(struct worker_pool *pool,
> >>> +                        void *fin_result, void *result_frags, int index));
> >>> +
> >>> +
> >>> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> >>> + * would land, or a null pointer if that bucket is empty. */
> >>> +
> >>> +static inline struct hmap_node *
> >>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> >>> +{
> >>> +    return hmap->buckets[num];
> >>> +}
> >>> +
> >>> +static inline struct hmap_node *
> >>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> >>> +{
> >>> +    size_t i;
> >>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
> >>> +        struct hmap_node *node = hmap->buckets[i];
> >>> +        if (node) {
> >>> +            return node;
> >>> +        }
> >>> +    }
> >>> +    return NULL;
> >>> +}
> >>> +
> >>> +/* Returns the first node in 'hmap', as expected by thread with job_id
> >>> + * for parallel processing in arbitrary order, or a null pointer if
> >>> + * the slice of 'hmap' for that job_id is empty. */
> >>> +static inline struct hmap_node *
> >>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> >>> +{
> >>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> >>> +}
> >>> +
> >>> +/* Returns the next node in the slice of 'hmap' following 'node',
> >>> + * in arbitrary order, or a * null pointer if 'node' is the last node in
> >>> + * the 'hmap' slice.
> >>> + *
> >>> + */
> >>> +static inline struct hmap_node *
> >>> +parallel_hmap_next(const struct hmap *hmap,
> >>> +                   const struct hmap_node *node, ssize_t pool_size)
> >>> +{
> >>> +    return (node->next
> >>> +            ? node->next
> >>> +            : parallel_hmap_next__(hmap,
> >>> +                (node->hash & hmap->mask) + pool_size, pool_size));
> >>> +}
> >>> +
> >>> +static inline void post_completed_work(struct worker_control *control)
> >>> +{
> >>> +    atomic_thread_fence(memory_order_acq_rel);
> >>> +    atomic_store_relaxed(&control->finished, true);
> >>> +    sem_post(control->done);
> >>> +}
> >>> +
> >>> +static inline void wait_for_work(struct worker_control *control)
> >>> +{
> >>> +    sem_wait(control->fire);
> >>> +}
> >>> +
> >>> +/* Hash per-row locking support - to be used only in conjunction
> >>> + * with fast hash inserts. Normal hash inserts may resize the hash
> >>> + * rendering the locking invalid.
> >>> + */
> >>> +
> >>> +struct hashrow_locks {
> >>> +    ssize_t mask;
> >>> +    struct ovs_mutex *row_locks;
> >>> +};
> >>> +
> >>> +/* Update an hash row locks structure to match the current hash size */
> >>> +
> >>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
> >>> +
> >>> +/* Lock a hash row */
> >>> +
> >>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> >>> +{
> >>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
> >>> +}
> >>> +
> >>> +/* Unlock a hash row */
> >>> +
> >>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> >>> +{
> >>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
> >>> +}
> >>> +/* Init the row locks structure */
> >>> +
> >>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
> >>> +{
> >>> +    hrl->mask = 0;
> >>> +    hrl->row_locks = NULL;
> >>> +}
> >>> +
> >>> +bool ovn_can_parallelize_hashes(bool force_parallel);
> >>> +
> >>> +/* Use the OVN library functions for stuff which OVS has not defined
> >>> + * If OVS has defined these, they will still compile using the OVN
> >>> + * local names, but will be dropped by the linker in favour of the OVS
> >>> + * supplied functions.
> >>> + */
> >>> +
> >>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> >>> +
> >>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> >>> +
> >>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
> >>> +
> >>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> >>> +
> >>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> >>> +
> >>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> >>> +
> >>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> >>> +
> >>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> >>> +
> >>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> >>> +
> >>> +#define run_pool_hash(pool, result, result_frags) \
> >>> +    ovn_run_pool_hash(pool, result, result_frags)
> >>> +
> >>> +#define run_pool_list(pool, result, result_frags) \
> >>> +    ovn_run_pool_list(pool, result, result_frags)
> >>> +
> >>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
> >>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
> >>> +
> >>> +
> >> In my opinion, we can switch over to making use of OVS APIs for fast_hmap
> >> once the patches in OVS are merged.  Until then I think we should just assume
> >> that these functions are part of OVN lib and consume them directly.
> >>
> >> It's possible that the function names could change when those patches
> >> land in OVS.
> >>
> >> Thanks
> >> Numan
> >>
> >>> +
> >>> +#ifdef __clang__
> >>> +#pragma clang diagnostic pop
> >>> +#endif
> >>> +
> >>> +#endif
> >>> +
> >>> +#ifdef  __cplusplus
> >>> +}
> >>> +#endif
> >>> +
> >>> +
> >>> +#endif /* lib/fasthmap.h */
> >>> --
> >>> 2.20.1
> >>>
> >>> _______________________________________________
> >>> dev mailing list
> >>> dev@openvswitch.org
> >>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> >>>
>
> --
> Anton R. Ivanov
> Cambridgegreys Limited. Registered in England. Company Number 10273661
> https://www.cambridgegreys.com/
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Anton Ivanov Feb. 25, 2021, 2:33 p.m. UTC | #8
On 25/02/2021 14:29, Numan Siddique wrote:
> On Thu, Feb 25, 2021 at 7:33 PM Anton Ivanov
> <anton.ivanov@cambridgegreys.com> wrote:
>> Found the most likely culprit.
>>
>> This is similar to this: https://bugzilla.redhat.com/show_bug.cgi?id=663584
>> this: https://bugzilla.redhat.com/show_bug.cgi?id=1554955
>> and god knows how many others.
>>
>> Selinux is "securing" your semaphores.
> I disabled selinux (permissive) and I still see the same behavior

I have set-up a couple of VMs to run it with different CPU numbers/settings and it is always passing, no cores.

A.

>
>
>> A.
>>
>> On 25/02/2021 13:27, Anton Ivanov wrote:
>>> On 25/02/2021 12:41, Numan Siddique wrote:
>>>> On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov
>>>> <anton.ivanov@cambridgegreys.com> wrote:
>>>>> This adds a set of functions and macros intended to process
>>>>> hashes in parallel.
>>>>>
>>>>> The principles of operation are documented in the fasthmap.h
>>>>>
>>>>> If these one day go into the OVS tree, the OVS tree versions
>>>>> would be used in preference.
>>>>>
>>>>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>>> Hi Anton,
>>>>
>>>> I still see problems with this patch set.
>>>>
>>>> If I apply the first 2 patches and run the tests, most of the test
>>>> cases fail or hang.
>>>>
>>>> When configured with gcc and address sanitizer,  the test case fails -
>>>> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
>>>> localnet ports with same tags (ovn.at:2970): FAILED
>>>> (ovs-macros.at:219) and I see the below in the testsuite.log
>>>>
>>>> *********
>>>> clean up OVN
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded immediately
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded immediately
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded after 1 seconds
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded quickly
>>>>
>>>> main: clean up vswitch
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit --cleanup
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded quickly
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded immediately
>>>> Address Sanitizer reported errors in: asan.2986645
>>>> =================================================================
>>>> ==2986645==ERROR: AddressSanitizer: SEGV on unknown address
>>>> 0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80
>>>> T2)
>>>> ==2986645==The signal is caused by a READ memory access.
>>>> ../../tests/ovs-macros.at:219: hard failure
>>>> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
>>>> localnet ports with same tags (ovn.at:2970): FAILED
>>>> (ovs-macros.at:219)
>>>>
>>>> ******
>>>>
>>>> The same is observed for many test cases.
>>>>
>>>> When I configure clang and run the tests, all the tests pass, but I
>>>> see lot of coredumps.
>>>> I think I had reported this earlier too.
>>> I have absolutely no idea how you get this. Seriously.
>>>
>>> I have been unable to reproduce this
>>>
>>> The only thing that comes to mind is to "fuzz" the semaphore creation and introduce random failures there to see if the error logic works. I can't see any logical problems in it via code inspection.
>>>
>>>> Here is the backtrace
>>>> -----
>>>> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
>>>> /lib64/libpthread.so.0
>>>> [Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))]
>>>> Missing separate debuginfos, use: dnf debuginfo-install
>>>> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
>>>> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
>>>> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
>>>> zlib-1.2.11-23.fc33.x86_64
>>>> (gdb) bt
>>>> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
>>>> /lib64/libpthread.so.0
>>>> #1  0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at
>>>> ../lib/ovn-parallel-hmap.h:193
>>>> #2  build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807
>>>> #3  0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at
>>>> ../lib/ovs-thread.c:383
>>>> #4  0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0
>>>> #5  0x00001528ad00e903 in clone () from /lib64/libc.so.6
>>>> ----
>>>>
>>>>
>>>> I'm sorry but something is not right. Instead of using semaphores, why
>>>> can't we use
>>>> 'struct latch' for each worker and use it to synchronize between the main thread
>>>> and the workers ?
>>> We can't because this will conflict with the main poll loop.
>>>
>>> You need to rewrite the entire northd processing logic and the IO logic to use latch here. Or establish parallel logic - part of the joy of using "thead once" to do poll magic.
>>>
>>> None of these are a realistic option. I'd rather try to understand exactly what happens on your setup and what makes a sem_open() fail and why that is not handled properly by the error checking code.
>>>
>>> Can you send the results of "sysctl kernel.sem" please? That's the only way I know to limit semaphores and the usual limits on Linux are in the 1G range.
> Here's the output
>
> sysctl kernel.sem
> kernel.sem = 32000 1024000000 500 32000
>
> I ran the same tests on the rhel 8 system, I don't see any crashes.
>
>
> Thanks
> Numan
>>>> The usage of the function - can_parallelize_hashes() in ovn-northd.c is very
>>>> confusing to me.
>>>>
>>>> I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only once
>>>> and for the subsequent calls, this function will be a no-op (due to
>>>> atomic_compare_exchange_strong()).
>>>>
>>>> Since this function is called before the workers are started, what is
>>>> the need to use
>>>> atomic_compare_exchange_strong() ?
>>> We have ended there over time. There was a point where there was no option and no initial invocation wasn't and we were invoking the set-up on first processing.
>>>
>>> In fact that is how it is invoked in the OVS codebase port for the parallelized monitors. I would actually like to keep this as an option.
>>>
>>>> Let's say we want to add another config option -
>>>> force_northd_parallel, and if the user
>>>> toggles the value between the runs, then the below code in
>>>> ovnnb_db_run() in patch 3
>>>>
>>>> ---
>>>> use_parallel_build = smap_get_bool(&nb->options,
>>>> "use_parallel_build", false) &&
>>>> ovn_can_parallelize_hashes(false);
>>>> --
>>>>    will have no effect since setup_worker_pools() is not called later
>>>> once the atomic
>>>> bool initial_pool_setup is set to true.
>>>>
>>>> I think we should provide the option to toggle this value at run time.
>>>> Also the patch 3 should
>>>> add an option to configure the force_parallel.
>>>>
>>>>> ---
>>>>>    lib/automake.mk         |   2 +
>>>>>    lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>>>>>    lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>>>>>    3 files changed, 742 insertions(+)
>>>>>    create mode 100644 lib/ovn-parallel-hmap.c
>>>>>    create mode 100644 lib/ovn-parallel-hmap.h
>>>>>
>>>>> diff --git a/lib/automake.mk b/lib/automake.mk
>>>>> index 250c7aefa..781be2109 100644
>>>>> --- a/lib/automake.mk
>>>>> +++ b/lib/automake.mk
>>>>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>>>>           lib/expr.c \
>>>>>           lib/extend-table.h \
>>>>>           lib/extend-table.c \
>>>>> +       lib/ovn-parallel-hmap.h \
>>>>> +       lib/ovn-parallel-hmap.c \
>>>>>           lib/ip-mcast-index.c \
>>>>>           lib/ip-mcast-index.h \
>>>>>           lib/mcast-group-index.c \
>>>>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>>>>> new file mode 100644
>>>>> index 000000000..06aa95aba
>>>>> --- /dev/null
>>>>> +++ b/lib/ovn-parallel-hmap.c
>>>>> @@ -0,0 +1,455 @@
>>>>> +/*
>>>>> + * Copyright (c) 2020 Red Hat, Inc.
>>>>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>>>>> + *
>>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>>> + * you may not use this file except in compliance with the License.
>>>>> + * You may obtain a copy of the License at:
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +
>>>>> +#include <config.h>
>>>>> +#include <stdint.h>
>>>>> +#include <string.h>
>>>>> +#include <stdlib.h>
>>>>> +#include <fcntl.h>
>>>>> +#include <unistd.h>
>>>>> +#include <errno.h>
>>>>> +#include <semaphore.h>
>>>>> +#include "fatal-signal.h"
>>>>> +#include "util.h"
>>>>> +#include "openvswitch/vlog.h"
>>>>> +#include "openvswitch/hmap.h"
>>>>> +#include "openvswitch/thread.h"
>>>>> +#include "ovn-parallel-hmap.h"
>>>>> +#include "ovs-atomic.h"
>>>>> +#include "ovs-thread.h"
>>>>> +#include "ovs-numa.h"
>>>>> +#include "random.h"
>>>>> +
>>>>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>>>>> +
>>>>> +#ifndef OVS_HAS_PARALLEL_HMAP
>>>>> +
>>>>> +#define WORKER_SEM_NAME "%x-%p-%x"
>>>>> +#define MAIN_SEM_NAME "%x-%p-main"
>>>>> +
>>>>> +/* These are accessed under mutex inside add_worker_pool().
>>>>> + * They do not need to be atomic.
>>>>> + */
>>>>> +
>>>>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>>>>> +static bool can_parallelize = false;
>>>>> +
>>>>> +/* This is set only in the process of exit and the set is
>>>>> + * accompanied by a fence. It does not need to be atomic or be
>>>>> + * accessed under a lock.
>>>>> + */
>>>>> +
>>>>> +static bool workers_must_exit = false;
>>>>> +
>>>>> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>>>>> +
>>>>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>>>>> +
>>>>> +static int pool_size;
>>>>> +
>>>>> +static int sembase;
>>>>> +
>>>>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>>>>> +static void setup_worker_pools(bool force);
>>>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index);
>>>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index);
>>>>> +
>>>>> +bool ovn_stop_parallel_processing(void)
>>>>> +{
>>>>> +    return workers_must_exit;
>>>>> +}
>>>>> +
>>>>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>>>>> +{
>>>>> +    bool test = false;
>>>>> +
>>>>> +    if (atomic_compare_exchange_strong(
>>>>> +            &initial_pool_setup,
>>>>> +            &test,
>>>>> +            true)) {
>>>>> +        ovs_mutex_lock(&init_mutex);
>>>>> +        setup_worker_pools(force_parallel);
>>>>> +        ovs_mutex_unlock(&init_mutex);
>>>>> +    }
>>>>> +    return can_parallelize;
>>>>> +}
>>>>> +
>>>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>>>>> +
>>>>> +    struct worker_pool *new_pool = NULL;
>>>>> +    struct worker_control *new_control;
>>>>> +    bool test = false;
>>>>> +    int i;
>>>>> +    char sem_name[256];
>>>>> +
>>>>> +
>>>>> +    /* Belt and braces - initialize the pool system just in case if
>>>>> +     * if it is not yet initialized.
>>>>> +     */
>>>>> +
>>>>> +    if (atomic_compare_exchange_strong(
>>>>> +            &initial_pool_setup,
>>>>> +            &test,
>>>>> +            true)) {
>>>>> +        ovs_mutex_lock(&init_mutex);
>>>>> +        setup_worker_pools(false);
>>>>> +        ovs_mutex_unlock(&init_mutex);
>>>>> +    }
>>>>> +
>>>>> +    ovs_mutex_lock(&init_mutex);
>>>>> +    if (can_parallelize) {
>>>>> +        new_pool = xmalloc(sizeof(struct worker_pool));
>>>>> +        new_pool->size = pool_size;
>>>>> +        new_pool->controls = NULL;
>>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>>>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>>>> +        if (new_pool->done == SEM_FAILED) {
>>>>> +            goto cleanup;
>>>>> +        }
>>>>> +
>>>>> +        new_pool->controls =
>>>>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>>>>> +
>>>>> +        for (i = 0; i < new_pool->size; i++) {
>>>>> +            new_control = &new_pool->controls[i];
>>>>> +            new_control->id = i;
>>>>> +            new_control->done = new_pool->done;
>>>>> +            new_control->data = NULL;
>>>>> +            ovs_mutex_init(&new_control->mutex);
>>>>> +            new_control->finished = ATOMIC_VAR_INIT(false);
>>>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>>>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>>>> +            if (new_control->fire == SEM_FAILED) {
>>>>> +                goto cleanup;
>>>>> +            }
>>>>> +        }
>>>>> +
>>>>> +        for (i = 0; i < pool_size; i++) {
>>>>> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>>>>> +        }
>>>>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>>>> +    }
>>>>> +    ovs_mutex_unlock(&init_mutex);
>>>>> +    return new_pool;
>>>>> +cleanup:
>>>>> +
>>>>> +    /* Something went wrong when opening semaphores. In this case
>>>>> +     * it is better to shut off parallel procesing altogether
>>>>> +     */
>>>>> +
>>>>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>>>>> +    can_parallelize = false;
>>>>> +    if (new_pool->controls) {
>>>>> +        for (i = 0; i < new_pool->size; i++) {
>>>>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
>>>>> +                sem_close(new_pool->controls[i].fire);
>>>>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>>>> +                sem_unlink(sem_name);
>>>>> +                break; /* semaphores past this one are uninitialized */
>>>>> +            }
>>>>> +        }
>>>>> +    }
>>>>> +    if (new_pool->done != SEM_FAILED) {
>>>>> +        sem_close(new_pool->done);
>>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>>>> +        sem_unlink(sem_name);
>>>>> +    }
>>>>> +    ovs_mutex_unlock(&init_mutex);
>>>>> +    return NULL;
>>>>> +}
>>>>> +
>>>>> +
>>>>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>>>>> +void
>>>>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>>>>> +{
>>>>> +    size_t i;
>>>>> +
>>>>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>>>>> +    hmap->one = NULL;
>>>>> +    hmap->mask = mask;
>>>>> +    hmap->n = 0;
>>>>> +    for (i = 0; i <= hmap->mask; i++) {
>>>>> +        hmap->buckets[i] = NULL;
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +/* Initializes 'hmap' as an empty hash table of size X.
>>>>> + * Intended for use in parallel processing so that all
>>>>> + * fragments used to store results in a parallel job
>>>>> + * are the same size.
>>>>> + */
>>>>> +void
>>>>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>>>>> +{
>>>>> +    size_t mask;
>>>>> +    mask = size / 2;
>>>>> +    mask |= mask >> 1;
>>>>> +    mask |= mask >> 2;
>>>>> +    mask |= mask >> 4;
>>>>> +    mask |= mask >> 8;
>>>>> +    mask |= mask >> 16;
>>>>> +#if SIZE_MAX > UINT32_MAX
>>>>> +    mask |= mask >> 32;
>>>>> +#endif
>>>>> +
>>>>> +    /* If we need to dynamically allocate buckets we might as well allocate at
>>>>> +     * least 4 of them. */
>>>>> +    mask |= (mask & 1) << 1;
>>>>> +
>>>>> +    fast_hmap_init(hmap, mask);
>>>>> +}
>>>>> +
>>>>> +/* Run a thread pool which uses a callback function to process results
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_callback(struct worker_pool *pool,
>>>>> +                           void *fin_result, void *result_frags,
>>>>> +                           void (*helper_func)(struct worker_pool *pool,
>>>>> +                                               void *fin_result,
>>>>> +                                               void *result_frags, int index))
>>>>> +{
>>>>> +    int index, completed;
>>>>> +
>>>>> +    /* Ensure that all worker threads see the same data as the
>>>>> +     * main thread.
>>>>> +     */
>>>>> +
>>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>>> +
>>>>> +    /* Start workers */
>>>>> +
>>>>> +    for (index = 0; index < pool->size; index++) {
>>>>> +        sem_post(pool->controls[index].fire);
>>>>> +    }
>>>>> +
>>>>> +    completed = 0;
>>>>> +
>>>>> +    do {
>>>>> +        bool test;
>>>>> +        /* Note - we do not loop on semaphore until it reaches
>>>>> +         * zero, but on pool size/remaining workers.
>>>>> +         * This is by design. If the inner loop can handle
>>>>> +         * completion for more than one worker within an iteration
>>>>> +         * it will do so to ensure no additional iterations and
>>>>> +         * waits once all of them are done.
>>>>> +         *
>>>>> +         * This may result in us having an initial positive value
>>>>> +         * of the semaphore when the pool is invoked the next time.
>>>>> +         * This is harmless - the loop will spin up a couple of times
>>>>> +         * doing nothing while the workers are processing their data
>>>>> +         * slices.
>>>>> +         */
>>>>> +        sem_wait(pool->done);
>>>>> +        for (index = 0; index < pool->size; index++) {
>>>>> +            test = true;
>>>>> +            /* If the worker has marked its data chunk as complete,
>>>>> +             * invoke the helper function to combine the results of
>>>>> +             * this worker into the main result.
>>>>> +             *
>>>>> +             * The worker must invoke an appropriate memory fence
>>>>> +             * (most likely acq_rel) to ensure that the main thread
>>>>> +             * sees all of the results produced by the worker.
>>>>> +             */
>>>>> +            if (atomic_compare_exchange_weak(
>>>>> +                    &pool->controls[index].finished,
>>>>> +                    &test,
>>>>> +                    false)) {
>>>>> +                if (helper_func) {
>>>>> +                    (helper_func)(pool, fin_result, result_frags, index);
>>>>> +                }
>>>>> +                completed++;
>>>>> +                pool->controls[index].data = NULL;
>>>>> +            }
>>>>> +        }
>>>>> +    } while (completed < pool->size);
>>>>> +}
>>>>> +
>>>>> +/* Run a thread pool - basic, does not do results processing.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool(struct worker_pool *pool)
>>>>> +{
>>>>> +    run_pool_callback(pool, NULL, NULL, NULL);
>>>>> +}
>>>>> +
>>>>> +/* Brute force merge of a hashmap into another hashmap.
>>>>> + * Intended for use in parallel processing. The destination
>>>>> + * hashmap MUST be the same size as the one being merged.
>>>>> + *
>>>>> + * This can be achieved by pre-allocating them to correct size
>>>>> + * and using hmap_insert_fast() instead of hmap_insert()
>>>>> + */
>>>>> +
>>>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>>>>> +{
>>>>> +    size_t i;
>>>>> +
>>>>> +    ovs_assert(inc->mask == dest->mask);
>>>>> +
>>>>> +    if (!inc->n) {
>>>>> +        /* Request to merge an empty frag, nothing to do */
>>>>> +        return;
>>>>> +    }
>>>>> +
>>>>> +    for (i = 0; i <= dest->mask; i++) {
>>>>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>>>>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>>>>> +        if (*inc_bucket != NULL) {
>>>>> +            struct hmap_node *last_node = *inc_bucket;
>>>>> +            while (last_node->next != NULL) {
>>>>> +                last_node = last_node->next;
>>>>> +            }
>>>>> +            last_node->next = *dest_bucket;
>>>>> +            *dest_bucket = *inc_bucket;
>>>>> +            *inc_bucket = NULL;
>>>>> +        }
>>>>> +    }
>>>>> +    dest->n += inc->n;
>>>>> +    inc->n = 0;
>>>>> +}
>>>>> +
>>>>> +/* Run a thread pool which gathers results in an array
>>>>> + * of hashes. Merge results.
>>>>> + */
>>>>> +
>>>>> +
>>>>> +void ovn_run_pool_hash(
>>>>> +        struct worker_pool *pool,
>>>>> +        struct hmap *result,
>>>>> +        struct hmap *result_frags)
>>>>> +{
>>>>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
>>>>> +}
>>>>> +
>>>>> +/* Run a thread pool which gathers results in an array of lists.
>>>>> + * Merge results.
>>>>> + */
>>>>> +void ovn_run_pool_list(
>>>>> +        struct worker_pool *pool,
>>>>> +        struct ovs_list *result,
>>>>> +        struct ovs_list *result_frags)
>>>>> +{
>>>>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
>>>>> +}
>>>>> +
>>>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>>>>> +{
>>>>> +    int i;
>>>>> +    if (hrl->mask != lflows->mask) {
>>>>> +        if (hrl->row_locks) {
>>>>> +            free(hrl->row_locks);
>>>>> +        }
>>>>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
>>>>> +        hrl->mask = lflows->mask;
>>>>> +        for (i = 0; i <= lflows->mask; i++) {
>>>>> +            ovs_mutex_init(&hrl->row_locks[i]);
>>>>> +        }
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>>>>> +    int i;
>>>>> +    static struct worker_pool *pool;
>>>>> +    char sem_name[256];
>>>>> +
>>>>> +    workers_must_exit = true;
>>>>> +
>>>>> +    /* All workers must honour the must_exit flag and check for it regularly.
>>>>> +     * We can make it atomic and check it via atomics in workers, but that
>>>>> +     * is not really necessary as it is set just once - when the program
>>>>> +     * terminates. So we use a fence which is invoked before exiting instead.
>>>>> +     */
>>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>>> +
>>>>> +    /* Wake up the workers after the must_exit flag has been set */
>>>>> +
>>>>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>>>>> +        for (i = 0; i < pool->size ; i++) {
>>>>> +            sem_post(pool->controls[i].fire);
>>>>> +        }
>>>>> +        for (i = 0; i < pool->size ; i++) {
>>>>> +            sem_close(pool->controls[i].fire);
>>>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>>>>> +            sem_unlink(sem_name);
>>>>> +        }
>>>>> +        sem_close(pool->done);
>>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>>>>> +        sem_unlink(sem_name);
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +static void setup_worker_pools(bool force) {
>>>>> +    int cores, nodes;
>>>>> +
>>>>> +    nodes = ovs_numa_get_n_numas();
>>>>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>>>>> +        nodes = 1;
>>>>> +    }
>>>>> +    cores = ovs_numa_get_n_cores();
>>>>> +
>>>>> +    /* If there is no NUMA config, use 4 cores.
>>>>> +     * If there is NUMA config use half the cores on
>>>>> +     * one node so that the OS does not start pushing
>>>>> +     * threads to other nodes.
>>>>> +     */
>>>>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>>>>> +        /* If there is no NUMA we can try the ovs-threads routine.
>>>>> +         * It falls back to sysconf and/or affinity mask.
>>>>> +         */
>>>>> +        cores = count_cpu_cores();
>>>>> +        pool_size = cores;
>>>>> +    } else {
>>>>> +        pool_size = cores / nodes;
>>>>> +    }
>>>>> +    if ((pool_size < 4) && force) {
>>>>> +        pool_size = 4;
>>>>> +    }
>>>>> +    can_parallelize = (pool_size >= 3);
>>>>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>>>>> +    sembase = random_uint32();
>>>>> +}
>>>>> +
>>>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index)
>>>>> +{
>>>>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
>>>>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>>>>> +
>>>>> +    if (!ovs_list_is_empty(&res_frags[index])) {
>>>>> +        ovs_list_splice(result->next,
>>>>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index)
>>>>> +{
>>>>> +    struct hmap *result = (struct hmap *)fin_result;
>>>>> +    struct hmap *res_frags = (struct hmap *)result_frags;
>>>>> +
>>>>> +    fast_hmap_merge(result, &res_frags[index]);
>>>>> +    hmap_destroy(&res_frags[index]);
>>>>> +}
>>>>> +
>>>>> +#endif
>>>>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>>>>> new file mode 100644
>>>>> index 000000000..71ad17fb0
>>>>> --- /dev/null
>>>>> +++ b/lib/ovn-parallel-hmap.h
>>>>> @@ -0,0 +1,285 @@
>>>>> +/*
>>>>> + * Copyright (c) 2020 Red Hat, Inc.
>>>>> + *
>>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>>> + * you may not use this file except in compliance with the License.
>>>>> + * You may obtain a copy of the License at:
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +
>>>>> +#ifndef OVN_PARALLEL_HMAP
>>>>> +#define OVN_PARALLEL_HMAP 1
>>>>> +
>>>>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>>>>> + * we skip over the ovn specific definitions.
>>>>> + */
>>>>> +
>>>>> +#ifdef  __cplusplus
>>>>> +extern "C" {
>>>>> +#endif
>>>>> +
>>>>> +#include <stdbool.h>
>>>>> +#include <stdlib.h>
>>>>> +#include <semaphore.h>
>>>>> +#include "openvswitch/util.h"
>>>>> +#include "openvswitch/hmap.h"
>>>>> +#include "openvswitch/thread.h"
>>>>> +#include "ovs-atomic.h"
>>>>> +
>>>>> +/* Process this include only if OVS does not supply parallel definitions
>>>>> + */
>>>>> +
>>>>> +#ifdef OVS_HAS_PARALLEL_HMAP
>>>>> +
>>>>> +#include "parallel-hmap.h"
>>>>> +
>>>>> +#else
>>>>> +
>>>>> +
>>>>> +#ifdef __clang__
>>>>> +#pragma clang diagnostic push
>>>>> +#pragma clang diagnostic ignored "-Wthread-safety"
>>>>> +#endif
>>>> I think you missed addressing my comment I provided in v13 to
>>>> add some comments on why this is required.
>>>>
>>>>
>>>>> +
>>>>> +
>>>>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>>>>> + * of parallel processing.
>>>>> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
>>>>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>>>>> + * ThreadID + step * 2, etc. The actual macro accepts
>>>>> + * ThreadID + step * i as the JOBID parameter.
>>>>> + */
>>>>> +
>>>>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>>>>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
>>>>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>>>>> +       || ((NODE = NULL), false); \
>>>>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
>>>>> +
>>>>> +/* We do not have a SAFE version of the macro, because the hash size is not
>>>>> + * atomic and hash removal operations would need to be wrapped with
>>>>> + * locks. This will defeat most of the benefits from doing anything in
>>>>> + * parallel.
>>>>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
>>>>> + * each thread should store them in a temporary list result instead, merging
>>>>> + * the lists into a combined result at the end */
>>>>> +
>>>>> +/* Work "Handle" */
>>>>> +
>>>>> +struct worker_control {
>>>>> +    int id; /* Used as a modulo when iterating over a hash. */
>>>>> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
>>>>> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
>>>>> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
>>>>> +    struct ovs_mutex mutex; /* Guards the data. */
>>>>> +    void *data; /* Pointer to data to be processed. */
>>>>> +    void *workload; /* back-pointer to the worker pool structure. */
>>>>> +};
>>>>> +
>>>>> +struct worker_pool {
>>>>> +    int size;   /* Number of threads in the pool. */
>>>>> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>>>>> +    struct worker_control *controls; /* "Handles" in this pool. */
>>>>> +    sem_t *done; /* Work completion semaphorew. */
>>>>> +};
>>>>> +
>>>>> +/* Add a worker pool for thread function start() which expects a pointer to
>>>>> + * a worker_control structure as an argument. */
>>>>> +
>>>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>>>>> +
>>>>> +/* Setting this to true will make all processing threads exit */
>>>>> +
>>>>> +bool ovn_stop_parallel_processing(void);
>>>>> +
>>>>> +/* Build a hmap pre-sized for size elements */
>>>>> +
>>>>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>>>>> +
>>>>> +/* Build a hmap with a mask equals to size */
>>>>> +
>>>>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>>>>> +
>>>>> +/* Brute-force merge a hmap into hmap.
>>>>> + * Dest and inc have to have the same mask. The merge is performed
>>>>> + * by extending the element list for bucket N in the dest hmap with the list
>>>>> + * from bucket N in inc.
>>>>> + */
>>>>> +
>>>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>>>>> +
>>>>> +/* Run a pool, without any default processing of results.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool(struct worker_pool *pool);
>>>>> +
>>>>> +/* Run a pool, merge results from hash frags into a final hash result.
>>>>> + * The hash frags must be pre-sized to the same size.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_hash(struct worker_pool *pool,
>>>>> +                       struct hmap *result, struct hmap *result_frags);
>>>>> +/* Run a pool, merge results from list frags into a final list result.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_list(struct worker_pool *pool,
>>>>> +                       struct ovs_list *result, struct ovs_list *result_frags);
>>>>> +
>>>>> +/* Run a pool, call a callback function to perform processing of results.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>>>>> +                    void *result_frags,
>>>>> +                    void (*helper_func)(struct worker_pool *pool,
>>>>> +                        void *fin_result, void *result_frags, int index));
>>>>> +
>>>>> +
>>>>> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
>>>>> + * would land, or a null pointer if that bucket is empty. */
>>>>> +
>>>>> +static inline struct hmap_node *
>>>>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>>>>> +{
>>>>> +    return hmap->buckets[num];
>>>>> +}
>>>>> +
>>>>> +static inline struct hmap_node *
>>>>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
>>>>> +{
>>>>> +    size_t i;
>>>>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>>>>> +        struct hmap_node *node = hmap->buckets[i];
>>>>> +        if (node) {
>>>>> +            return node;
>>>>> +        }
>>>>> +    }
>>>>> +    return NULL;
>>>>> +}
>>>>> +
>>>>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>>>>> + * for parallel processing in arbitrary order, or a null pointer if
>>>>> + * the slice of 'hmap' for that job_id is empty. */
>>>>> +static inline struct hmap_node *
>>>>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
>>>>> +{
>>>>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>>>>> +}
>>>>> +
>>>>> +/* Returns the next node in the slice of 'hmap' following 'node',
>>>>> + * in arbitrary order, or a * null pointer if 'node' is the last node in
>>>>> + * the 'hmap' slice.
>>>>> + *
>>>>> + */
>>>>> +static inline struct hmap_node *
>>>>> +parallel_hmap_next(const struct hmap *hmap,
>>>>> +                   const struct hmap_node *node, ssize_t pool_size)
>>>>> +{
>>>>> +    return (node->next
>>>>> +            ? node->next
>>>>> +            : parallel_hmap_next__(hmap,
>>>>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>>>>> +}
>>>>> +
>>>>> +static inline void post_completed_work(struct worker_control *control)
>>>>> +{
>>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>>> +    atomic_store_relaxed(&control->finished, true);
>>>>> +    sem_post(control->done);
>>>>> +}
>>>>> +
>>>>> +static inline void wait_for_work(struct worker_control *control)
>>>>> +{
>>>>> +    sem_wait(control->fire);
>>>>> +}
>>>>> +
>>>>> +/* Hash per-row locking support - to be used only in conjunction
>>>>> + * with fast hash inserts. Normal hash inserts may resize the hash
>>>>> + * rendering the locking invalid.
>>>>> + */
>>>>> +
>>>>> +struct hashrow_locks {
>>>>> +    ssize_t mask;
>>>>> +    struct ovs_mutex *row_locks;
>>>>> +};
>>>>> +
>>>>> +/* Update an hash row locks structure to match the current hash size */
>>>>> +
>>>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
>>>>> +
>>>>> +/* Lock a hash row */
>>>>> +
>>>>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>>>> +{
>>>>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
>>>>> +}
>>>>> +
>>>>> +/* Unlock a hash row */
>>>>> +
>>>>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>>>> +{
>>>>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
>>>>> +}
>>>>> +/* Init the row locks structure */
>>>>> +
>>>>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>>>>> +{
>>>>> +    hrl->mask = 0;
>>>>> +    hrl->row_locks = NULL;
>>>>> +}
>>>>> +
>>>>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>>>>> +
>>>>> +/* Use the OVN library functions for stuff which OVS has not defined
>>>>> + * If OVS has defined these, they will still compile using the OVN
>>>>> + * local names, but will be dropped by the linker in favour of the OVS
>>>>> + * supplied functions.
>>>>> + */
>>>>> +
>>>>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>>>>> +
>>>>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>>>>> +
>>>>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>>>>> +
>>>>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>>>>> +
>>>>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
>>>>> +
>>>>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>>>>> +
>>>>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>>>>> +
>>>>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>>>>> +
>>>>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
>>>>> +
>>>>> +#define run_pool_hash(pool, result, result_frags) \
>>>>> +    ovn_run_pool_hash(pool, result, result_frags)
>>>>> +
>>>>> +#define run_pool_list(pool, result, result_frags) \
>>>>> +    ovn_run_pool_list(pool, result, result_frags)
>>>>> +
>>>>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
>>>>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>>>>> +
>>>>> +
>>>> In my opinion, we can switch over to making use of OVS APIs for fast_hmap
>>>> once the patches in OVS are merged.  Until then I think we should just assume
>>>> that these functions are part of OVN lib and consume them directly.
>>>>
>>>> It's possible that the function names could change when those patches
>>>> land in OVS.
>>>>
>>>> Thanks
>>>> Numan
>>>>
>>>>> +
>>>>> +#ifdef __clang__
>>>>> +#pragma clang diagnostic pop
>>>>> +#endif
>>>>> +
>>>>> +#endif
>>>>> +
>>>>> +#ifdef  __cplusplus
>>>>> +}
>>>>> +#endif
>>>>> +
>>>>> +
>>>>> +#endif /* lib/fasthmap.h */
>>>>> --
>>>>> 2.20.1
>>>>>
>>>>> _______________________________________________
>>>>> dev mailing list
>>>>> dev@openvswitch.org
>>>>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>>>>
>> --
>> Anton R. Ivanov
>> Cambridgegreys Limited. Registered in England. Company Number 10273661
>> https://www.cambridgegreys.com/
>>
>> _______________________________________________
>> dev mailing list
>> dev@openvswitch.org
>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Ilya Maximets Feb. 25, 2021, 3:18 p.m. UTC | #9
On 2/12/21 3:49 PM, Anton Ivanov wrote:
> This adds a set of functions and macros intended to process
> hashes in parallel.
> 
> The principles of operation are documented in the fasthmap.h
> 
> If these one day go into the OVS tree, the OVS tree versions
> would be used in preference.
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Hi, Anton.  Not a full review, just one comment below.

> +static inline void wait_for_work(struct worker_control *control)
> +{
> +    sem_wait(control->fire);
> +}

Shouldn't we check the result of sem_wait?  If it failed with
EINTR we likely need to call it again, otherwise worker thread
might start working on the incomplete or uninitialized data
and crash.

We may also add assertion for EINVAL just to catch it earlier.

Best regards, Ilya Maximets.
Anton Ivanov Feb. 25, 2021, 4:03 p.m. UTC | #10
Good one.

I will add handling for that.

I think we can just restart the call if it is it an EINTR.

A.

On 25/02/2021 15:18, Ilya Maximets wrote:
> On 2/12/21 3:49 PM, Anton Ivanov wrote:
>> This adds a set of functions and macros intended to process
>> hashes in parallel.
>>
>> The principles of operation are documented in the fasthmap.h
>>
>> If these one day go into the OVS tree, the OVS tree versions
>> would be used in preference.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> Hi, Anton.  Not a full review, just one comment below.
>
>> +static inline void wait_for_work(struct worker_control *control)
>> +{
>> +    sem_wait(control->fire);
>> +}
> Shouldn't we check the result of sem_wait?  If it failed with
> EINTR we likely need to call it again, otherwise worker thread
> might start working on the incomplete or uninitialized data
> and crash.
>
> We may also add assertion for EINVAL just to catch it earlier.
>
> Best regards, Ilya Maximets.
>
Numan Siddique Feb. 25, 2021, 5:46 p.m. UTC | #11
On Thu, Feb 25, 2021 at 9:33 PM Anton Ivanov
<anton.ivanov@cambridgegreys.com> wrote:
>
> Good one.
>
> I will add handling for that.
>
> I think we can just restart the call if it is it an EINTR.

The patches need rebase. Can you please rebase to the latest master
and post them
after addressing the review comments from me and Ilya.

I will try to run it on my other machine.

Thanks
Numan

>
> A.
>
> On 25/02/2021 15:18, Ilya Maximets wrote:
> > On 2/12/21 3:49 PM, Anton Ivanov wrote:
> >> This adds a set of functions and macros intended to process
> >> hashes in parallel.
> >>
> >> The principles of operation are documented in the fasthmap.h
> >>
> >> If these one day go into the OVS tree, the OVS tree versions
> >> would be used in preference.
> >>
> >> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> > Hi, Anton.  Not a full review, just one comment below.
> >
> >> +static inline void wait_for_work(struct worker_control *control)
> >> +{
> >> +    sem_wait(control->fire);
> >> +}
> > Shouldn't we check the result of sem_wait?  If it failed with
> > EINTR we likely need to call it again, otherwise worker thread
> > might start working on the incomplete or uninitialized data
> > and crash.
> >
> > We may also add assertion for EINVAL just to catch it earlier.
> >
> > Best regards, Ilya Maximets.
> >
> --
> Anton R. Ivanov
> Cambridgegreys Limited. Registered in England. Company Number 10273661
> https://www.cambridgegreys.com/
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 250c7aefa..781be2109 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -13,6 +13,8 @@  lib_libovn_la_SOURCES = \
 	lib/expr.c \
 	lib/extend-table.h \
 	lib/extend-table.c \
+	lib/ovn-parallel-hmap.h \
+	lib/ovn-parallel-hmap.c \
 	lib/ip-mcast-index.c \
 	lib/ip-mcast-index.h \
 	lib/mcast-group-index.c \
diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
new file mode 100644
index 000000000..06aa95aba
--- /dev/null
+++ b/lib/ovn-parallel-hmap.c
@@ -0,0 +1,455 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovn-parallel-hmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+#include "random.h"
+
+VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
+
+#ifndef OVS_HAS_PARALLEL_HMAP
+
+#define WORKER_SEM_NAME "%x-%p-%x"
+#define MAIN_SEM_NAME "%x-%p-main"
+
+/* These are accessed under mutex inside add_worker_pool().
+ * They do not need to be atomic.
+ */
+
+static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
+static bool can_parallelize = false;
+
+/* This is set only in the process of exit and the set is
+ * accompanied by a fence. It does not need to be atomic or be
+ * accessed under a lock.
+ */
+
+static bool workers_must_exit = false;
+
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static int pool_size;
+
+static int sembase;
+
+static void worker_pool_hook(void *aux OVS_UNUSED);
+static void setup_worker_pools(bool force);
+static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index);
+static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index);
+
+bool ovn_stop_parallel_processing(void)
+{
+    return workers_must_exit;
+}
+
+bool ovn_can_parallelize_hashes(bool force_parallel)
+{
+    bool test = false;
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(force_parallel);
+        ovs_mutex_unlock(&init_mutex);
+    }
+    return can_parallelize;
+}
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
+
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    bool test = false;
+    int i;
+    char sem_name[256];
+
+
+    /* Belt and braces - initialize the pool system just in case if
+     * if it is not yet initialized.
+     */
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(false);
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    ovs_mutex_lock(&init_mutex);
+    if (can_parallelize) {
+        new_pool = xmalloc(sizeof(struct worker_pool));
+        new_pool->size = pool_size;
+        new_pool->controls = NULL;
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
+        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+        if (new_pool->done == SEM_FAILED) {
+            goto cleanup;
+        }
+
+        new_pool->controls =
+            xmalloc(sizeof(struct worker_control) * new_pool->size);
+
+        for (i = 0; i < new_pool->size; i++) {
+            new_control = &new_pool->controls[i];
+            new_control->id = i;
+            new_control->done = new_pool->done;
+            new_control->data = NULL;
+            ovs_mutex_init(&new_control->mutex);
+            new_control->finished = ATOMIC_VAR_INIT(false);
+            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
+            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+            if (new_control->fire == SEM_FAILED) {
+                goto cleanup;
+            }
+        }
+
+        for (i = 0; i < pool_size; i++) {
+            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+        }
+        ovs_list_push_back(&worker_pools, &new_pool->list_node);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+cleanup:
+
+    /* Something went wrong when opening semaphores. In this case
+     * it is better to shut off parallel procesing altogether
+     */
+
+    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+    can_parallelize = false;
+    if (new_pool->controls) {
+        for (i = 0; i < new_pool->size; i++) {
+            if (new_pool->controls[i].fire != SEM_FAILED) {
+                sem_close(new_pool->controls[i].fire);
+                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
+                sem_unlink(sem_name);
+                break; /* semaphores past this one are uninitialized */
+            }
+        }
+    }
+    if (new_pool->done != SEM_FAILED) {
+        sem_close(new_pool->done);
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
+        sem_unlink(sem_name);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return NULL;
+}
+
+
+/* Initializes 'hmap' as an empty hash table with mask N. */
+void
+ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t i;
+
+    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+    for (i = 0; i <= hmap->mask; i++) {
+        hmap->buckets[i] = NULL;
+    }
+}
+
+/* Initializes 'hmap' as an empty hash table of size X.
+ * Intended for use in parallel processing so that all
+ * fragments used to store results in a parallel job
+ * are the same size.
+ */
+void
+ovn_fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask;
+    mask = size / 2;
+    mask |= mask >> 1;
+    mask |= mask >> 2;
+    mask |= mask >> 4;
+    mask |= mask >> 8;
+    mask |= mask >> 16;
+#if SIZE_MAX > UINT32_MAX
+    mask |= mask >> 32;
+#endif
+
+    /* If we need to dynamically allocate buckets we might as well allocate at
+     * least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Run a thread pool which uses a callback function to process results
+ */
+
+void ovn_run_pool_callback(struct worker_pool *pool,
+                           void *fin_result, void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                                               void *fin_result,
+                                               void *result_frags, int index))
+{
+    int index, completed;
+
+    /* Ensure that all worker threads see the same data as the
+     * main thread.
+     */
+
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Start workers */
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        /* Note - we do not loop on semaphore until it reaches
+         * zero, but on pool size/remaining workers.
+         * This is by design. If the inner loop can handle
+         * completion for more than one worker within an iteration
+         * it will do so to ensure no additional iterations and
+         * waits once all of them are done.
+         *
+         * This may result in us having an initial positive value
+         * of the semaphore when the pool is invoked the next time.
+         * This is harmless - the loop will spin up a couple of times
+         * doing nothing while the workers are processing their data
+         * slices.
+         */
+        sem_wait(pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* If the worker has marked its data chunk as complete,
+             * invoke the helper function to combine the results of
+             * this worker into the main result.
+             *
+             * The worker must invoke an appropriate memory fence
+             * (most likely acq_rel) to ensure that the main thread
+             * sees all of the results produced by the worker.
+             */
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                if (helper_func) {
+                    (helper_func)(pool, fin_result, result_frags, index);
+                }
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Run a thread pool - basic, does not do results processing.
+ */
+
+void ovn_run_pool(struct worker_pool *pool)
+{
+    run_pool_callback(pool, NULL, NULL, NULL);
+}
+
+/* Brute force merge of a hashmap into another hashmap.
+ * Intended for use in parallel processing. The destination
+ * hashmap MUST be the same size as the one being merged.
+ *
+ * This can be achieved by pre-allocating them to correct size
+ * and using hmap_insert_fast() instead of hmap_insert()
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t i;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (!inc->n) {
+        /* Request to merge an empty frag, nothing to do */
+        return;
+    }
+
+    for (i = 0; i <= dest->mask; i++) {
+        struct hmap_node **dest_bucket = &dest->buckets[i];
+        struct hmap_node **inc_bucket = &inc->buckets[i];
+        if (*inc_bucket != NULL) {
+            struct hmap_node *last_node = *inc_bucket;
+            while (last_node->next != NULL) {
+                last_node = last_node->next;
+            }
+            last_node->next = *dest_bucket;
+            *dest_bucket = *inc_bucket;
+            *inc_bucket = NULL;
+        }
+    }
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+
+
+void ovn_run_pool_hash(
+        struct worker_pool *pool,
+        struct hmap *result,
+        struct hmap *result_frags)
+{
+    run_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Run a thread pool which gathers results in an array of lists.
+ * Merge results.
+ */
+void ovn_run_pool_list(
+        struct worker_pool *pool,
+        struct ovs_list *result,
+        struct ovs_list *result_frags)
+{
+    run_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
+void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
+{
+    int i;
+    if (hrl->mask != lflows->mask) {
+        if (hrl->row_locks) {
+            free(hrl->row_locks);
+        }
+        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
+        hrl->mask = lflows->mask;
+        for (i = 0; i <= lflows->mask; i++) {
+            ovs_mutex_init(&hrl->row_locks[i]);
+        }
+    }
+}
+
+static void worker_pool_hook(void *aux OVS_UNUSED) {
+    int i;
+    static struct worker_pool *pool;
+    char sem_name[256];
+
+    workers_must_exit = true;
+
+    /* All workers must honour the must_exit flag and check for it regularly.
+     * We can make it atomic and check it via atomics in workers, but that
+     * is not really necessary as it is set just once - when the program
+     * terminates. So we use a fence which is invoked before exiting instead.
+     */
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Wake up the workers after the must_exit flag has been set */
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size ; i++) {
+            sem_post(pool->controls[i].fire);
+        }
+        for (i = 0; i < pool->size ; i++) {
+            sem_close(pool->controls[i].fire);
+            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
+            sem_unlink(sem_name);
+        }
+        sem_close(pool->done);
+        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+        sem_unlink(sem_name);
+    }
+}
+
+static void setup_worker_pools(bool force) {
+    int cores, nodes;
+
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+
+    /* If there is no NUMA config, use 4 cores.
+     * If there is NUMA config use half the cores on
+     * one node so that the OS does not start pushing
+     * threads to other nodes.
+     */
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        /* If there is no NUMA we can try the ovs-threads routine.
+         * It falls back to sysconf and/or affinity mask.
+         */
+        cores = count_cpu_cores();
+        pool_size = cores;
+    } else {
+        pool_size = cores / nodes;
+    }
+    if ((pool_size < 4) && force) {
+        pool_size = 4;
+    } 
+    can_parallelize = (pool_size >= 3);
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+    sembase = random_uint32();
+}
+
+static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index)
+{
+    struct ovs_list *result = (struct ovs_list *)fin_result;
+    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
+
+    if (!ovs_list_is_empty(&res_frags[index])) {
+        ovs_list_splice(result->next,
+                ovs_list_front(&res_frags[index]), &res_frags[index]);
+    }
+}
+
+static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
+                               void *fin_result, void *result_frags,
+                               int index)
+{
+    struct hmap *result = (struct hmap *)fin_result;
+    struct hmap *res_frags = (struct hmap *)result_frags;
+
+    fast_hmap_merge(result, &res_frags[index]);
+    hmap_destroy(&res_frags[index]);
+}
+
+#endif
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
new file mode 100644
index 000000000..71ad17fb0
--- /dev/null
+++ b/lib/ovn-parallel-hmap.h
@@ -0,0 +1,285 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVN_PARALLEL_HMAP
+#define OVN_PARALLEL_HMAP 1
+
+/* if the parallel macros are defined by hmap.h or any other ovs define
+ * we skip over the ovn specific definitions.
+ */
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+/* Process this include only if OVS does not supply parallel definitions
+ */
+
+#ifdef OVS_HAS_PARALLEL_HMAP
+
+#include "parallel-hmap.h"
+
+#else
+
+
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wthread-safety"
+#endif
+
+
+/* A version of the HMAP_FOR_EACH macro intended for iterating as part
+ * of parallel processing.
+ * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
+ * and will iterate hash buckets ThreadID, ThreadID + step,
+ * ThreadID + step * 2, etc. The actual macro accepts
+ * ThreadID + step * i as the JOBID parameter.
+ */
+
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+       || ((NODE = NULL), false); \
+       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
+
+/* We do not have a SAFE version of the macro, because the hash size is not
+ * atomic and hash removal operations would need to be wrapped with
+ * locks. This will defeat most of the benefits from doing anything in
+ * parallel.
+ * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
+ * each thread should store them in a temporary list result instead, merging
+ * the lists into a combined result at the end */
+
+/* Work "Handle" */
+
+struct worker_control {
+    int id; /* Used as a modulo when iterating over a hash. */
+    atomic_bool finished; /* Set to true after achunk of work is complete. */
+    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
+    sem_t *done; /* Work completion semaphore - sem_post on completion. */
+    struct ovs_mutex mutex; /* Guards the data. */
+    void *data; /* Pointer to data to be processed. */
+    void *workload; /* back-pointer to the worker pool structure. */
+};
+
+struct worker_pool {
+    int size;   /* Number of threads in the pool. */
+    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
+    struct worker_control *controls; /* "Handles" in this pool. */
+    sem_t *done; /* Work completion semaphorew. */
+};
+
+/* Add a worker pool for thread function start() which expects a pointer to
+ * a worker_control structure as an argument. */
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+
+/* Setting this to true will make all processing threads exit */
+
+bool ovn_stop_parallel_processing(void);
+
+/* Build a hmap pre-sized for size elements */
+
+void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
+
+/* Build a hmap with a mask equals to size */
+
+void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
+
+/* Brute-force merge a hmap into hmap.
+ * Dest and inc have to have the same mask. The merge is performed
+ * by extending the element list for bucket N in the dest hmap with the list
+ * from bucket N in inc.
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+
+/* Run a pool, without any default processing of results.
+ */
+
+void ovn_run_pool(struct worker_pool *pool);
+
+/* Run a pool, merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_run_pool_hash(struct worker_pool *pool,
+                       struct hmap *result, struct hmap *result_frags);
+/* Run a pool, merge results from list frags into a final list result.
+ */
+
+void ovn_run_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Run a pool, call a callback function to perform processing of results.
+ */
+
+void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
+                    void *result_frags,
+                    void (*helper_func)(struct worker_pool *pool,
+                        void *fin_result, void *result_frags, int index));
+
+
+/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
+ * would land, or a null pointer if that bucket is empty. */
+
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    return hmap->buckets[num];
+}
+
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t i;
+    for (i = start; i <= hmap->mask; i+= pool_size) {
+        struct hmap_node *node = hmap->buckets[i];
+        if (node) {
+            return node;
+        }
+    }
+    return NULL;
+}
+
+/* Returns the first node in 'hmap', as expected by thread with job_id
+ * for parallel processing in arbitrary order, or a null pointer if
+ * the slice of 'hmap' for that job_id is empty. */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    return parallel_hmap_next__(hmap, job_id, pool_size);
+}
+
+/* Returns the next node in the slice of 'hmap' following 'node',
+ * in arbitrary order, or a * null pointer if 'node' is the last node in
+ * the 'hmap' slice.
+ *
+ */
+static inline struct hmap_node *
+parallel_hmap_next(const struct hmap *hmap,
+                   const struct hmap_node *node, ssize_t pool_size)
+{
+    return (node->next
+            ? node->next
+            : parallel_hmap_next__(hmap,
+                (node->hash & hmap->mask) + pool_size, pool_size));
+}
+
+static inline void post_completed_work(struct worker_control *control)
+{
+    atomic_thread_fence(memory_order_acq_rel);
+    atomic_store_relaxed(&control->finished, true);
+    sem_post(control->done);
+}
+
+static inline void wait_for_work(struct worker_control *control)
+{
+    sem_wait(control->fire);
+}
+
+/* Hash per-row locking support - to be used only in conjunction
+ * with fast hash inserts. Normal hash inserts may resize the hash
+ * rendering the locking invalid.
+ */
+
+struct hashrow_locks {
+    ssize_t mask;
+    struct ovs_mutex *row_locks;
+};
+
+/* Update an hash row locks structure to match the current hash size */
+
+void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
+
+/* Lock a hash row */
+
+static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
+{
+    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
+}
+
+/* Unlock a hash row */
+
+static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
+{
+    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
+}
+/* Init the row locks structure */
+
+static inline void init_hash_row_locks(struct hashrow_locks *hrl)
+{
+    hrl->mask = 0;
+    hrl->row_locks = NULL;   
+}
+
+bool ovn_can_parallelize_hashes(bool force_parallel);
+
+/* Use the OVN library functions for stuff which OVS has not defined
+ * If OVS has defined these, they will still compile using the OVN
+ * local names, but will be dropped by the linker in favour of the OVS
+ * supplied functions.
+ */
+
+#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
+
+#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
+
+#define stop_parallel_processing() ovn_stop_parallel_processing()
+
+#define add_worker_pool(start) ovn_add_worker_pool(start)
+
+#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
+
+#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
+
+#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
+
+#define ovn_run_pool(pool) ovn_run_pool(pool)
+
+#define run_pool_hash(pool, result, result_frags) \
+    ovn_run_pool_hash(pool, result, result_frags)
+
+#define run_pool_list(pool, result, result_frags) \
+    ovn_run_pool_list(pool, result, result_frags)
+
+#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
+
+
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#endif
+
+#endif
+
+#ifdef  __cplusplus
+}
+#endif
+
+
+#endif /* lib/fasthmap.h */