diff mbox series

[ovs-dev,v11,1/4] ovn-libs: Add support for parallel processing

Message ID 20210114164155.939-1-anton.ivanov@cambridgegreys.com
State Superseded
Headers show
Series [ovs-dev,v11,1/4] ovn-libs: Add support for parallel processing | expand

Commit Message

Anton Ivanov Jan. 14, 2021, 4:41 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

This adds a set of functions and macros intended to process
hashes in parallel.

The principles of operation are documented in the fasthmap.h

If these one day go into the OVS tree, the OVS tree versions
would be used in preference.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/automake.mk |   2 +
 lib/fasthmap.c  | 326 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/fasthmap.h  | 206 ++++++++++++++++++++++++++++++
 3 files changed, 534 insertions(+)
 create mode 100644 lib/fasthmap.c
 create mode 100644 lib/fasthmap.h

Comments

Numan Siddique Jan. 15, 2021, 9:55 a.m. UTC | #1
On Thu, Jan 14, 2021 at 10:12 PM <anton.ivanov@cambridgegreys.com> wrote:

> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> This adds a set of functions and macros intended to process
> hashes in parallel.
>
> The principles of operation are documented in the fasthmap.h
>
> If these one day go into the OVS tree, the OVS tree versions
> would be used in preference.
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---
>

Hi Anton,

I tested v10 of your series and I found some interesting results.

---------
 ------------------------------------------------------------------------------------------------
| DP groups  |    Parallel hmaps | ovn_db_run time in sec |    No of lflows
 |
 ------------------------------------------------------------------------------------------------
|    NO          |    NO                    |        ~13.5
    |   1499325        |
 ------------------------------------------------------------------------------------------------
|    NO          |    YES                  |         ~9.7
     |   1499325        |
 ------------------------------------------------------------------------------------------------
|    YES        |    NO                    |        ~3.1
    |   123360          |
 ------------------------------------------------------------------------------------------------
|    YES        |     YES                 |         ~1.1
   |   123360          |
 ------------------------------------------------------------------------------------------------

I see huge improvement with dp groups enabled and parallel processing
enabled.

Did you see the same behavior ?

I ran the test on Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz with 64 cores
with ~180GB RAM.

Few small nit comments below

Thanks
Numan


 lib/automake.mk |   2 +
>  lib/fasthmap.c  | 326 ++++++++++++++++++++++++++++++++++++++++++++++++
>  lib/fasthmap.h  | 206 ++++++++++++++++++++++++++++++
>  3 files changed, 534 insertions(+)
>  create mode 100644 lib/fasthmap.c
>  create mode 100644 lib/fasthmap.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 250c7aefa..d7e4b20cf 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>         lib/expr.c \
>         lib/extend-table.h \
>         lib/extend-table.c \
> +       lib/fasthmap.h \
> +       lib/fasthmap.c \
>         lib/ip-mcast-index.c \
>         lib/ip-mcast-index.h \
>         lib/mcast-group-index.c \
> diff --git a/lib/fasthmap.c b/lib/fasthmap.c
> new file mode 100644
> index 000000000..2be93e6a8
> --- /dev/null
> +++ b/lib/fasthmap.c
> @@ -0,0 +1,326 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#include <semaphore.h>
> +#include "fatal-signal.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "fasthmap.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "ovs-numa.h"
> +
> +VLOG_DEFINE_THIS_MODULE(fasthmap);
> +
> +
> +/* These are accessed under mutex inside ovn_add_worker_pool().
> + * They do not need to be atomic.
> + */
> +
> +static bool worker_pool_setup = false;
> +static bool can_parallelize = false;
> +
> +/* This is set only in the process of exit and the set is
> + * accompanied by a fence. It does not need to be atomic or be
> + * accessed under a lock.
> + */
> +
> +static bool workers_must_exit = false;
> +
> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static int pool_size;
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> +    int i;
> +    static struct worker_pool *pool;
> +    workers_must_exit = true;
> +    /* All workers must honour the must_exit flag and check for it
> regularly.
> +     * We can make it atomic and check it via atomics in workers, but that
> +     * is not really necessary as it is set just once - when the program
> +     * terminates. So we use a fence which is invoked before exiting
> instead.
> +     */
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Wake up the workers after the must_exit flag has been set */
> +
> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_post(&pool->controls[i].fire);
> +        }
> +    }
> +}
> +
> +static void setup_worker_pools(void) {
> +    int cores, nodes;
> +
> +    nodes = ovs_numa_get_n_numas();
> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> +        nodes = 1;
> +    }
> +    cores = ovs_numa_get_n_cores();
> +
> +    /* If there is no NUMA config, use 4 cores.
> +     * If there is NUMA config use half the cores on
> +     * one node so that the OS does not start pushing
> +     * threads to other nodes.
> +     */
> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> +        /* If there is no NUMA we can try the ovs-threads routine.
> +         * It falls back to sysconf and/or affinity mask.
> +         */
> +        cores = count_cpu_cores();
> +        pool_size = cores;
> +    } else {
> +        pool_size = cores / nodes;
> +    }
> +    if (pool_size > 16) {
> +        pool_size = 16;
> +    }
> +    can_parallelize = (pool_size >= 3);
> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> +}
> +
> +bool ovn_cease_fire(void)
>

This function name seems a bit odd to me ? Can it be renamed to a better
name ?
May be ovn_abort_pool() ?



> +{
> +    return workers_must_exit;
> +}
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> +
> +    struct worker_pool *new_pool = NULL;
> +    struct worker_control *new_control;
> +    int i;
> +
> +    ovs_mutex_lock(&init_mutex);
> +
> +    if (!worker_pool_setup) {
> +        setup_worker_pools();
> +        worker_pool_setup = true;
> +    }
> +
> +    if (can_parallelize) {
> +        new_pool = xmalloc(sizeof(struct worker_pool));
> +        new_pool->size = pool_size;
> +        sem_init(&new_pool->done, 0, 0);
> +
> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> +
> +        new_pool->controls =
> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
> +
> +        for (i = 0; i < new_pool->size; i++) {
> +            new_control = &new_pool->controls[i];
> +            sem_init(&new_control->fire, 0, 0);
> +            new_control->id = i;
> +            new_control->done = &new_pool->done;
> +            new_control->data = NULL;
> +            ovs_mutex_init(&new_control->mutex);
> +            new_control->finished = ATOMIC_VAR_INIT(false);
> +        }
> +
> +        for (i = 0; i < pool_size; i++) {
> +            ovs_thread_create("worker pool helper", start,
> &new_pool->controls[i]);
> +        }
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +}
> +
> +
> +/* Initializes 'hmap' as an empty hash table with mask N. */
> +void
> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> +{
> +    size_t i;
> +
> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> +    hmap->one = NULL;
> +    hmap->mask = mask;
> +    hmap->n = 0;
> +    for (i = 0; i <= hmap->mask; i++) {
> +        hmap->buckets[i] = NULL;
> +    }
> +}
> +
> +/* Initializes 'hmap' as an empty hash table of size X.
> + * Intended for use in parallel processing so that all
> + * fragments used to store results in a parallel job
> + * are the same size.
> + */
> +void
> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> +{
> +    size_t mask;
> +    mask = size / 2;
> +    mask |= mask >> 1;
> +    mask |= mask >> 2;
> +    mask |= mask >> 4;
> +    mask |= mask >> 8;
> +    mask |= mask >> 16;
> +#if SIZE_MAX > UINT32_MAX
> +    mask |= mask >> 32;
> +#endif
> +
> +    /* If we need to dynamically allocate buckets we might as well
> allocate at
> +     * least 4 of them. */
> +    mask |= (mask & 1) << 1;
> +
> +    fast_hmap_init(hmap, mask);
> +}
> +
> +/* Run a thread pool which uses a callback function to process results
> + */
> +
> +void ovn_run_pool_callback(
> +        struct worker_pool *pool,
> +        void *fin_result,
> +        void *result_frags,
> +        void (*helper_func)(struct worker_pool *pool,
> +            void *fin_result, void *result_frags, int index))
>

Indentation seems a bit odd to me in many places. Can the function
parameters be aligned
properly like the rest of the existing code ?



> +{
> +    int index, completed;
> +
> +    /* Ensure that all worker threads see the same data as the
> +     * main thread.
> +     */
> +
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Start workers */
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(&pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        /* Note - we do not loop on semaphore until it reaches
> +         * zero, but on pool size/remaining workers.
> +         * This is by design. If the inner loop can handle
> +         * completion for more than one worker within an iteration
> +         * it will do so to ensure no additional iterations and
> +         * waits once all of them are done.
> +         *
> +         * This may result in us having an initial positive value
> +         * of the semaphore when the pool is invoked the next time.
> +         * This is harmless - the loop will spin up a couple of times
> +         * doing nothing while the workers are processing their data
> +         * slices.
> +         */
> +        sem_wait(&pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            /* If the worker has marked its data chunk as complete,
> +             * invoke the helper function to combine the results of
> +             * this worker into the main result.
> +             *
> +             * The worker must invoke an appropriate memory fence
> +             * (most likely acq_rel) to ensure that the main thread
> +             * sees all of the results produced by the worker.
> +             */
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                if (helper_func) {
> +                    (helper_func)(pool, fin_result, result_frags, index);
> +                }
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Run a thread pool - basic, does not do results processing.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool)
> +{
> +    ovn_run_pool_callback(pool, NULL, NULL, NULL);
> +}
> +
> +/* Brute force merge of a hashmap into another hashmap.
> + * Intended for use in parallel processing. The destination
> + * hashmap MUST be the same size as the one being merged.
> + *
> + * This can be achieved by pre-allocating them to correct size
> + * and using hmap_insert_fast() instead of hmap_insert()
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> +{
> +    size_t i;
> +
> +    ovs_assert(inc->mask == dest->mask);
> +
> +    if (!inc->n) {
> +        /* Request to merge an empty frag, nothing to do */
> +        return;
> +    }
> +
> +    for (i = 0; i <= dest->mask; i++) {
> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> +        if (*inc_bucket != NULL) {
> +            struct hmap_node *last_node = *inc_bucket;
> +            while (last_node->next != NULL) {
> +                last_node = last_node->next;
> +            }
> +            last_node->next = *dest_bucket;
> +            *dest_bucket = *inc_bucket;
> +            *inc_bucket = NULL;
> +        }
> +    }
> +    dest->n += inc->n;
> +    inc->n = 0;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of hashes. Merge results.
> + */
> +
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +        void *fin_result, void *result_frags, int index)
>

Same here. Can you please move the fin_result param and align with the
first param in the first line ?

Like

static void
merge_hash_results(struct worker_pool *pool OVS_UNUSED,
                                  void *fin_result, void *result_frags, int
index)


> +{
> +    struct hmap *result = (struct hmap *)fin_result;
> +    struct hmap *res_frags = (struct hmap *)result_frags;
> +
> +    fast_hmap_merge(result, &res_frags[index]);
> +    hmap_destroy(&res_frags[index]);
> +}
> +
> +
> +void ovn_run_pool_hash(
> +        struct worker_pool *pool,
> +        struct hmap *result,
> +        struct hmap *result_frags)
> +{
> +    ovn_run_pool_callback(pool, result, result_frags, merge_hash_results);
> +}
> diff --git a/lib/fasthmap.h b/lib/fasthmap.h
> new file mode 100644
> index 000000000..2a28553d5
> --- /dev/null
> +++ b/lib/fasthmap.h
> @@ -0,0 +1,206 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVN_PARALLEL_HMAP
> +#define OVN_PARALLEL_HMAP 1
> +
> +/* Process this include only if OVS does not supply parallel definitions
> + */
> +
> +#ifndef OVS_HAS_PARALLEL_HMAP
> +
> +/* if the parallel macros are defined by hmap.h or any other ovs define
> + * we skip over the ovn specific definitions.
> + */
> +
> +#ifdef  __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdbool.h>
> +#include <stdlib.h>
> +#include <semaphore.h>
> +#include "openvswitch/util.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovs-atomic.h"
> +
> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> + * of parallel processing.
> + * Each worker thread has a different ThreadID in the range of
> 0..POOL_SIZE
> + * and will iterate hash buckets ThreadID, ThreadID + step,
> + * ThreadID + step * 2, etc. The actual macro accepts
> + * ThreadID + step * i as the JOBID parameter.
> + */
> +
> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID),
> MEMBER); \
> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> +       || ((NODE = NULL), false); \
> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER),
> MEMBER))
> +
> +/* We do not have a SAFE version of the macro, because the hash size is
> not
> + * atomic and hash removal operations would need to be wrapped with
> + * locks. This will defeat most of the benefits from doing anything in
> + * parallel.
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> + * each thread should store them in a temporary list result instead,
> merging
> + * the lists into a combined result at the end */
> +
> +/* Work "Handle" */
> +
> +struct worker_control {
> +    int id; /* Used as a modulo when iterating over a hash. */
> +    atomic_bool finished; /* Set to true after achunk of work is
> complete. */
> +    sem_t fire; /* Work start semaphore - sem_post starts the worker. */
> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> +    struct ovs_mutex mutex; /* Guards the data. */
> +    void *data; /* Pointer to data to be processed. */
> +    void *workload; /* back-pointer to the worker pool structure. */
> +};
> +
> +struct worker_pool {
> +    int size;   /* Number of threads in the pool. */
> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> +    struct worker_control *controls; /* "Handles" in this pool. */
> +    sem_t done; /* Work completion semaphorew. */
> +};
> +
> +/* Add a worker pool for thread function start() which expects a pointer
> to
> + * a worker_control structure as an argument. */
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +
> +/* Setting this to true will make all processing threads exit */
> +
> +bool ovn_cease_fire(void);
> +
> +/* Build a hmap pre-sized for size elements */
> +
> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> +
> +/* Build a hmap with a mask equals to size */
> +
> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> +
> +/* Brute-force merge a hmap into hmap.
> + * Dest and inc have to have the same mask. The merge is performed
> + * by extending the element list for bucket N in the dest hmap with the
> list
> + * from bucket N in inc.
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> +
> +/* Run a pool, without any default processing of results.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool);
> +
> +/* Run a pool, merge results from hash frags into a final hash result.
> + * The hash frags must be pre-sized to the same size.
> + */
> +
> +void ovn_run_pool_hash(struct worker_pool *pool,
> +                    struct hmap *result, struct hmap *result_frags);
> +
> +/* Run a pool, call a callback function to perform processing of results.
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> +                    void *result_frags,
> +                    void (*helper_func)(struct worker_pool *pool,
> +                        void *fin_result, void *result_frags, int index));
> +
> +
> +/* Returns the first node in 'hmap' in the bucket in which the given
> 'hash'
> + * would land, or a null pointer if that bucket is empty. */
> +
> +static inline struct hmap_node *
> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> +{
> +    return hmap->buckets[num];
> +}
> +
> +static inline struct hmap_node *
> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t
> pool_size)
> +{
> +    size_t i;
> +    for (i = start; i <= hmap->mask; i+= pool_size) {
> +        struct hmap_node *node = hmap->buckets[i];
> +        if (node) {
> +            return node;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Returns the first node in 'hmap', as expected by thread with job_id
> + * for parallel processing in arbitrary order, or a null pointer if
> + * the slice of 'hmap' for that job_id is empty. */
> +static inline struct hmap_node *
> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t
> pool_size)
> +{
> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> +}
> +
> +/* Returns the next node in the slice of 'hmap' following 'node',
> + * in arbitrary order, or a * null pointer if 'node' is the last node in
> + * the 'hmap' slice.
> + *
> + */
> +static inline struct hmap_node *
> +parallel_hmap_next(const struct hmap *hmap,
> +                        const struct hmap_node *node, ssize_t pool_size)
> +{
> +    return (node->next
> +            ? node->next
> +            : parallel_hmap_next__(hmap,
> +                (node->hash & hmap->mask) + pool_size, pool_size));
> +}
> +
> +/* Use the OVN library functions for stuff which OVS has not defined
> + * If OVS has defined these, they will still compile using the OVN
> + * local names, but will be dropped by the linker in favour of the OVS
> + * supplied functions.
> + */
> +
> +#define cease_fire() ovn_cease_fire()
> +
> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> +
> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> +
> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> +
> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> +
> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> +
> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> +
> +#define run_pool_hash(pool, result, result_frags) \
> +    ovn_run_pool_hash(pool, result, result_frags)
> +
> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
> +
> +#ifdef  __cplusplus
> +}
> +#endif
> +
> +#endif
> +
> +#endif /* lib/fasthmap.h */
> --
> 2.20.1
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
>
Anton Ivanov Jan. 15, 2021, 10:20 a.m. UTC | #2
On 15/01/2021 09:55, Numan Siddique wrote:
>
>
> On Thu, Jan 14, 2021 at 10:12 PM <anton.ivanov@cambridgegreys.com 
> <mailto:anton.ivanov@cambridgegreys.com>> wrote:
>
>     From: Anton Ivanov <anton.ivanov@cambridgegreys.com
>     <mailto:anton.ivanov@cambridgegreys.com>>
>
>     This adds a set of functions and macros intended to process
>     hashes in parallel.
>
>     The principles of operation are documented in the fasthmap.h
>
>     If these one day go into the OVS tree, the OVS tree versions
>     would be used in preference.
>
>     Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com
>     <mailto:anton.ivanov@cambridgegreys.com>>
>     ---
>
>
> Hi Anton,
>
> I tested v10 of your series and I found some interesting results.
>
> ---------
>  ------------------------------------------------------------------------------------------------
> | DP groups  |    Parallel hmaps | ovn_db_run time in sec |  No of 
> lflows  |
>  ------------------------------------------------------------------------------------------------
> |    NO          |    NO                    |        ~13.5             
>       |   1499325        |
>  ------------------------------------------------------------------------------------------------
> |    NO          |    YES                  |         ~9.7             
>      |   1499325        |
>  ------------------------------------------------------------------------------------------------
> |    YES        |    NO                    |        ~3.1               
>     |   123360          |
>  ------------------------------------------------------------------------------------------------
> |    YES        |     YES                 |         ~1.1               
>    |   123360          |
>  ------------------------------------------------------------------------------------------------ 
>
>
> I see huge improvement with dp groups enabled and parallel processing 
> enabled.
>
> Did you see the same behavior ?

I see slightly better improvement for the No-Yes case. I get ~ x4 there.

My results form the YES - YES case are very similar. I get ~ x2 there.

I am working with much smaller flow numbers though. Approximately one 
order of magnitude less. In my set-up DP Groups delivers lower "flow 
compression" too - it is ~ x2 if memory serves me right.

>
> I ran the test on Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz with 64 
> cores with ~180GB RAM.


That is much more multi-threaded than my Ryzens. Mine are 6/12.

I have tested it starting from 4 cores (it has effect even there).

>
> Few small nit comments below
>
> Thanks
> Numan
>
>
>      lib/automake.mk <http://automake.mk> |  2 +
>      lib/fasthmap.c  | 326
>     ++++++++++++++++++++++++++++++++++++++++++++++++
>      lib/fasthmap.h  | 206 ++++++++++++++++++++++++++++++
>      3 files changed, 534 insertions(+)
>      create mode 100644 lib/fasthmap.c
>      create mode 100644 lib/fasthmap.h
>
>     diff --git a/lib/automake.mk <http://automake.mk>
>     b/lib/automake.mk <http://automake.mk>
>     index 250c7aefa..d7e4b20cf 100644
>     --- a/lib/automake.mk <http://automake.mk>
>     +++ b/lib/automake.mk <http://automake.mk>
>     @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>             lib/expr.c \
>             lib/extend-table.h \
>             lib/extend-table.c \
>     +       lib/fasthmap.h \
>     +       lib/fasthmap.c \
>             lib/ip-mcast-index.c \
>             lib/ip-mcast-index.h \
>             lib/mcast-group-index.c \
>     diff --git a/lib/fasthmap.c b/lib/fasthmap.c
>     new file mode 100644
>     index 000000000..2be93e6a8
>     --- /dev/null
>     +++ b/lib/fasthmap.c
>     @@ -0,0 +1,326 @@
>     +/*
>     + * Copyright (c) 2020 Red Hat, Inc.
>     + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira,
>     Inc.
>     + *
>     + * Licensed under the Apache License, Version 2.0 (the "License");
>     + * you may not use this file except in compliance with the License.
>     + * You may obtain a copy of the License at:
>     + *
>     + * http://www.apache.org/licenses/LICENSE-2.0
>     <http://www.apache.org/licenses/LICENSE-2.0>
>     + *
>     + * Unless required by applicable law or agreed to in writing,
>     software
>     + * distributed under the License is distributed on an "AS IS" BASIS,
>     + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
>     or implied.
>     + * See the License for the specific language governing
>     permissions and
>     + * limitations under the License.
>     + */
>     +
>     +#include <config.h>
>     +#include <stdint.h>
>     +#include <string.h>
>     +#include <stdlib.h>
>     +#include <unistd.h>
>     +#include <semaphore.h>
>     +#include "fatal-signal.h"
>     +#include "util.h"
>     +#include "openvswitch/vlog.h"
>     +#include "openvswitch/hmap.h"
>     +#include "openvswitch/thread.h"
>     +#include "fasthmap.h"
>     +#include "ovs-atomic.h"
>     +#include "ovs-thread.h"
>     +#include "ovs-numa.h"
>     +
>     +VLOG_DEFINE_THIS_MODULE(fasthmap);
>     +
>     +
>     +/* These are accessed under mutex inside ovn_add_worker_pool().
>     + * They do not need to be atomic.
>     + */
>     +
>     +static bool worker_pool_setup = false;
>     +static bool can_parallelize = false;
>     +
>     +/* This is set only in the process of exit and the set is
>     + * accompanied by a fence. It does not need to be atomic or be
>     + * accessed under a lock.
>     + */
>     +
>     +static bool workers_must_exit = false;
>     +
>     +static struct ovs_list worker_pools =
>     OVS_LIST_INITIALIZER(&worker_pools);
>     +
>     +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>     +
>     +static int pool_size;
>     +
>     +static void worker_pool_hook(void *aux OVS_UNUSED) {
>     +    int i;
>     +    static struct worker_pool *pool;
>     +    workers_must_exit = true;
>     +    /* All workers must honour the must_exit flag and check for
>     it regularly.
>     +     * We can make it atomic and check it via atomics in workers,
>     but that
>     +     * is not really necessary as it is set just once - when the
>     program
>     +     * terminates. So we use a fence which is invoked before
>     exiting instead.
>     +     */
>     +    atomic_thread_fence(memory_order_acq_rel);
>     +
>     +    /* Wake up the workers after the must_exit flag has been set */
>     +
>     +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>     +        for (i = 0; i < pool->size ; i++) {
>     +            sem_post(&pool->controls[i].fire);
>     +        }
>     +    }
>     +}
>     +
>     +static void setup_worker_pools(void) {
>     +    int cores, nodes;
>     +
>     +    nodes = ovs_numa_get_n_numas();
>     +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>     +        nodes = 1;
>     +    }
>     +    cores = ovs_numa_get_n_cores();
>     +
>     +    /* If there is no NUMA config, use 4 cores.
>     +     * If there is NUMA config use half the cores on
>     +     * one node so that the OS does not start pushing
>     +     * threads to other nodes.
>     +     */
>     +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>     +        /* If there is no NUMA we can try the ovs-threads routine.
>     +         * It falls back to sysconf and/or affinity mask.
>     +         */
>     +        cores = count_cpu_cores();
>     +        pool_size = cores;
>     +    } else {
>     +        pool_size = cores / nodes;
>     +    }
>     +    if (pool_size > 16) {
>     +        pool_size = 16;
>     +    }
>     +    can_parallelize = (pool_size >= 3);
>     +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>     +}
>     +
>     +bool ovn_cease_fire(void)
>
>
> This function name seems a bit odd to me ? Can it be renamed to a 
> better name ?
> May be ovn_abort_pool() ?

OK. I will remove the multiple rocket launcher joke :)

>
>     +{
>     +    return workers_must_exit;
>     +}
>     +
>     +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>     +
>     +    struct worker_pool *new_pool = NULL;
>     +    struct worker_control *new_control;
>     +    int i;
>     +
>     +    ovs_mutex_lock(&init_mutex);
>     +
>     +    if (!worker_pool_setup) {
>     +        setup_worker_pools();
>     +        worker_pool_setup = true;
>     +    }
>     +
>     +    if (can_parallelize) {
>     +        new_pool = xmalloc(sizeof(struct worker_pool));
>     +        new_pool->size = pool_size;
>     +        sem_init(&new_pool->done, 0, 0);
>     +
>     +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>     +
>     +        new_pool->controls =
>     +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>     +
>     +        for (i = 0; i < new_pool->size; i++) {
>     +            new_control = &new_pool->controls[i];
>     +            sem_init(&new_control->fire, 0, 0);
>     +            new_control->id = i;
>     +            new_control->done = &new_pool->done;
>     +            new_control->data = NULL;
>     +            ovs_mutex_init(&new_control->mutex);
>     +            new_control->finished = ATOMIC_VAR_INIT(false);
>     +        }
>     +
>     +        for (i = 0; i < pool_size; i++) {
>     +            ovs_thread_create("worker pool helper", start,
>     &new_pool->controls[i]);
>     +        }
>     +    }
>     +    ovs_mutex_unlock(&init_mutex);
>     +    return new_pool;
>     +}
>     +
>     +
>     +/* Initializes 'hmap' as an empty hash table with mask N. */
>     +void
>     +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>     +{
>     +    size_t i;
>     +
>     +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask +
>     1));
>     +    hmap->one = NULL;
>     +    hmap->mask = mask;
>     +    hmap->n = 0;
>     +    for (i = 0; i <= hmap->mask; i++) {
>     +        hmap->buckets[i] = NULL;
>     +    }
>     +}
>     +
>     +/* Initializes 'hmap' as an empty hash table of size X.
>     + * Intended for use in parallel processing so that all
>     + * fragments used to store results in a parallel job
>     + * are the same size.
>     + */
>     +void
>     +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>     +{
>     +    size_t mask;
>     +    mask = size / 2;
>     +    mask |= mask >> 1;
>     +    mask |= mask >> 2;
>     +    mask |= mask >> 4;
>     +    mask |= mask >> 8;
>     +    mask |= mask >> 16;
>     +#if SIZE_MAX > UINT32_MAX
>     +    mask |= mask >> 32;
>     +#endif
>     +
>     +    /* If we need to dynamically allocate buckets we might as
>     well allocate at
>     +     * least 4 of them. */
>     +    mask |= (mask & 1) << 1;
>     +
>     +    fast_hmap_init(hmap, mask);
>     +}
>     +
>     +/* Run a thread pool which uses a callback function to process
>     results
>     + */
>     +
>     +void ovn_run_pool_callback(
>     +        struct worker_pool *pool,
>     +        void *fin_result,
>     +        void *result_frags,
>     +        void (*helper_func)(struct worker_pool *pool,
>     +            void *fin_result, void *result_frags, int index))
>
>
> Indentation seems a bit odd to me in many places. Can the function 
> parameters be aligned
> properly like the rest of the existing code ?

They were, probably an artefact of them being renamed and me forgetting 
to fix the indentation after that.

Thanks for noticing. Will do so.

>
>     +{
>     +    int index, completed;
>     +
>     +    /* Ensure that all worker threads see the same data as the
>     +     * main thread.
>     +     */
>     +
>     +    atomic_thread_fence(memory_order_acq_rel);
>     +
>     +    /* Start workers */
>     +
>     +    for (index = 0; index < pool->size; index++) {
>     +        sem_post(&pool->controls[index].fire);
>     +    }
>     +
>     +    completed = 0;
>     +
>     +    do {
>     +        bool test;
>     +        /* Note - we do not loop on semaphore until it reaches
>     +         * zero, but on pool size/remaining workers.
>     +         * This is by design. If the inner loop can handle
>     +         * completion for more than one worker within an iteration
>     +         * it will do so to ensure no additional iterations and
>     +         * waits once all of them are done.
>     +         *
>     +         * This may result in us having an initial positive value
>     +         * of the semaphore when the pool is invoked the next time.
>     +         * This is harmless - the loop will spin up a couple of times
>     +         * doing nothing while the workers are processing their data
>     +         * slices.
>     +         */
>     +        sem_wait(&pool->done);
>     +        for (index = 0; index < pool->size; index++) {
>     +            test = true;
>     +            /* If the worker has marked its data chunk as complete,
>     +             * invoke the helper function to combine the results of
>     +             * this worker into the main result.
>     +             *
>     +             * The worker must invoke an appropriate memory fence
>     +             * (most likely acq_rel) to ensure that the main thread
>     +             * sees all of the results produced by the worker.
>     +             */
>     +            if (atomic_compare_exchange_weak(
>     +                    &pool->controls[index].finished,
>     +                    &test,
>     +                    false)) {
>     +                if (helper_func) {
>     +                    (helper_func)(pool, fin_result, result_frags,
>     index);
>     +                }
>     +                completed++;
>     +                pool->controls[index].data = NULL;
>     +            }
>     +        }
>     +    } while (completed < pool->size);
>     +}
>     +
>     +/* Run a thread pool - basic, does not do results processing.
>     + */
>     +
>     +void ovn_run_pool(struct worker_pool *pool)
>     +{
>     +    ovn_run_pool_callback(pool, NULL, NULL, NULL);
>     +}
>     +
>     +/* Brute force merge of a hashmap into another hashmap.
>     + * Intended for use in parallel processing. The destination
>     + * hashmap MUST be the same size as the one being merged.
>     + *
>     + * This can be achieved by pre-allocating them to correct size
>     + * and using hmap_insert_fast() instead of hmap_insert()
>     + */
>     +
>     +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>     +{
>     +    size_t i;
>     +
>     +    ovs_assert(inc->mask == dest->mask);
>     +
>     +    if (!inc->n) {
>     +        /* Request to merge an empty frag, nothing to do */
>     +        return;
>     +    }
>     +
>     +    for (i = 0; i <= dest->mask; i++) {
>     +        struct hmap_node **dest_bucket = &dest->buckets[i];
>     +        struct hmap_node **inc_bucket = &inc->buckets[i];
>     +        if (*inc_bucket != NULL) {
>     +            struct hmap_node *last_node = *inc_bucket;
>     +            while (last_node->next != NULL) {
>     +                last_node = last_node->next;
>     +            }
>     +            last_node->next = *dest_bucket;
>     +            *dest_bucket = *inc_bucket;
>     +            *inc_bucket = NULL;
>     +        }
>     +    }
>     +    dest->n += inc->n;
>     +    inc->n = 0;
>     +}
>     +
>     +/* Run a thread pool which gathers results in an array
>     + * of hashes. Merge results.
>     + */
>     +
>     +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>     +        void *fin_result, void *result_frags, int index)
>
>
> Same here. Can you please move the fin_result param and align with the 
> first param in the first line ?
>
> Like
>
> static void
> merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>                                   void *fin_result, void 
> *result_frags, int index)
>
>     +{
>     +    struct hmap *result = (struct hmap *)fin_result;
>     +    struct hmap *res_frags = (struct hmap *)result_frags;
>     +
>     +    fast_hmap_merge(result, &res_frags[index]);
>     +    hmap_destroy(&res_frags[index]);
>     +}
>     +
>     +
>     +void ovn_run_pool_hash(
>     +        struct worker_pool *pool,
>     +        struct hmap *result,
>     +        struct hmap *result_frags)
>     +{
>     +    ovn_run_pool_callback(pool, result, result_frags,
>     merge_hash_results);
>     +}
>     diff --git a/lib/fasthmap.h b/lib/fasthmap.h
>     new file mode 100644
>     index 000000000..2a28553d5
>     --- /dev/null
>     +++ b/lib/fasthmap.h
>     @@ -0,0 +1,206 @@
>     +/*
>     + * Copyright (c) 2020 Red Hat, Inc.
>     + *
>     + * Licensed under the Apache License, Version 2.0 (the "License");
>     + * you may not use this file except in compliance with the License.
>     + * You may obtain a copy of the License at:
>     + *
>     + * http://www.apache.org/licenses/LICENSE-2.0
>     <http://www.apache.org/licenses/LICENSE-2.0>
>     + *
>     + * Unless required by applicable law or agreed to in writing,
>     software
>     + * distributed under the License is distributed on an "AS IS" BASIS,
>     + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
>     or implied.
>     + * See the License for the specific language governing
>     permissions and
>     + * limitations under the License.
>     + */
>     +
>     +#ifndef OVN_PARALLEL_HMAP
>     +#define OVN_PARALLEL_HMAP 1
>     +
>     +/* Process this include only if OVS does not supply parallel
>     definitions
>     + */
>     +
>     +#ifndef OVS_HAS_PARALLEL_HMAP
>     +
>     +/* if the parallel macros are defined by hmap.h or any other ovs
>     define
>     + * we skip over the ovn specific definitions.
>     + */
>     +
>     +#ifdef  __cplusplus
>     +extern "C" {
>     +#endif
>     +
>     +#include <stdbool.h>
>     +#include <stdlib.h>
>     +#include <semaphore.h>
>     +#include "openvswitch/util.h"
>     +#include "openvswitch/hmap.h"
>     +#include "openvswitch/thread.h"
>     +#include "ovs-atomic.h"
>     +
>     +/* A version of the HMAP_FOR_EACH macro intended for iterating as
>     part
>     + * of parallel processing.
>     + * Each worker thread has a different ThreadID in the range of
>     0..POOL_SIZE
>     + * and will iterate hash buckets ThreadID, ThreadID + step,
>     + * ThreadID + step * 2, etc. The actual macro accepts
>     + * ThreadID + step * i as the JOBID parameter.
>     + */
>     +
>     +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>     +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP,
>     JOBID), MEMBER); \
>     +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>     +       || ((NODE = NULL), false); \
>     +       ASSIGN_CONTAINER(NODE,
>     hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
>     +
>     +/* We do not have a SAFE version of the macro, because the hash
>     size is not
>     + * atomic and hash removal operations would need to be wrapped with
>     + * locks. This will defeat most of the benefits from doing
>     anything in
>     + * parallel.
>     + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove
>     elements,
>     + * each thread should store them in a temporary list result
>     instead, merging
>     + * the lists into a combined result at the end */
>     +
>     +/* Work "Handle" */
>     +
>     +struct worker_control {
>     +    int id; /* Used as a modulo when iterating over a hash. */
>     +    atomic_bool finished; /* Set to true after achunk of work is
>     complete. */
>     +    sem_t fire; /* Work start semaphore - sem_post starts the
>     worker. */
>     +    sem_t *done; /* Work completion semaphore - sem_post on
>     completion. */
>     +    struct ovs_mutex mutex; /* Guards the data. */
>     +    void *data; /* Pointer to data to be processed. */
>     +    void *workload; /* back-pointer to the worker pool structure. */
>     +};
>     +
>     +struct worker_pool {
>     +    int size;   /* Number of threads in the pool. */
>     +    struct ovs_list list_node; /* List of pools - used in
>     cleanup/exit. */
>     +    struct worker_control *controls; /* "Handles" in this pool. */
>     +    sem_t done; /* Work completion semaphorew. */
>     +};
>     +
>     +/* Add a worker pool for thread function start() which expects a
>     pointer to
>     + * a worker_control structure as an argument. */
>     +
>     +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>     +
>     +/* Setting this to true will make all processing threads exit */
>     +
>     +bool ovn_cease_fire(void);
>     +
>     +/* Build a hmap pre-sized for size elements */
>     +
>     +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>     +
>     +/* Build a hmap with a mask equals to size */
>     +
>     +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>     +
>     +/* Brute-force merge a hmap into hmap.
>     + * Dest and inc have to have the same mask. The merge is performed
>     + * by extending the element list for bucket N in the dest hmap
>     with the list
>     + * from bucket N in inc.
>     + */
>     +
>     +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>     +
>     +/* Run a pool, without any default processing of results.
>     + */
>     +
>     +void ovn_run_pool(struct worker_pool *pool);
>     +
>     +/* Run a pool, merge results from hash frags into a final hash
>     result.
>     + * The hash frags must be pre-sized to the same size.
>     + */
>     +
>     +void ovn_run_pool_hash(struct worker_pool *pool,
>     +                    struct hmap *result, struct hmap *result_frags);
>     +
>     +/* Run a pool, call a callback function to perform processing of
>     results.
>     + */
>     +
>     +void ovn_run_pool_callback(struct worker_pool *pool, void
>     *fin_result,
>     +                    void *result_frags,
>     +                    void (*helper_func)(struct worker_pool *pool,
>     +                        void *fin_result, void *result_frags, int
>     index));
>     +
>     +
>     +/* Returns the first node in 'hmap' in the bucket in which the
>     given 'hash'
>     + * would land, or a null pointer if that bucket is empty. */
>     +
>     +static inline struct hmap_node *
>     +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>     +{
>     +    return hmap->buckets[num];
>     +}
>     +
>     +static inline struct hmap_node *
>     +parallel_hmap_next__(const struct hmap *hmap, size_t start,
>     size_t pool_size)
>     +{
>     +    size_t i;
>     +    for (i = start; i <= hmap->mask; i+= pool_size) {
>     +        struct hmap_node *node = hmap->buckets[i];
>     +        if (node) {
>     +            return node;
>     +        }
>     +    }
>     +    return NULL;
>     +}
>     +
>     +/* Returns the first node in 'hmap', as expected by thread with
>     job_id
>     + * for parallel processing in arbitrary order, or a null pointer if
>     + * the slice of 'hmap' for that job_id is empty. */
>     +static inline struct hmap_node *
>     +parallel_hmap_first(const struct hmap *hmap, size_t job_id,
>     size_t pool_size)
>     +{
>     +    return parallel_hmap_next__(hmap, job_id, pool_size);
>     +}
>     +
>     +/* Returns the next node in the slice of 'hmap' following 'node',
>     + * in arbitrary order, or a * null pointer if 'node' is the last
>     node in
>     + * the 'hmap' slice.
>     + *
>     + */
>     +static inline struct hmap_node *
>     +parallel_hmap_next(const struct hmap *hmap,
>     +                        const struct hmap_node *node, ssize_t
>     pool_size)
>     +{
>     +    return (node->next
>     +            ? node->next
>     +            : parallel_hmap_next__(hmap,
>     +                (node->hash & hmap->mask) + pool_size, pool_size));
>     +}
>     +
>     +/* Use the OVN library functions for stuff which OVS has not defined
>     + * If OVS has defined these, they will still compile using the OVN
>     + * local names, but will be dropped by the linker in favour of
>     the OVS
>     + * supplied functions.
>     + */
>     +
>     +#define cease_fire() ovn_cease_fire()
>     +
>     +#define add_worker_pool(start) ovn_add_worker_pool(start)
>     +
>     +#define fast_hmap_size_for(hmap, size)
>     ovn_fast_hmap_size_for(hmap, size)
>     +
>     +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>     +
>     +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>     +
>     +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>     +
>     +#define ovn_run_pool(pool) ovn_run_pool(pool)
>     +
>     +#define run_pool_hash(pool, result, result_frags) \
>     +    ovn_run_pool_hash(pool, result, result_frags)
>     +
>     +#define run_pool_callback(pool, fin_result, result_frags,
>     helper_func) \
>     +    ovn_run_pool_callback(pool, fin_result, result_frags,
>     helper_func)
>     +
>     +#ifdef  __cplusplus
>     +}
>     +#endif
>     +
>     +#endif
>     +
>     +#endif /* lib/fasthmap.h */
>     -- 
>     2.20.1
>
>     _______________________________________________
>     dev mailing list
>     dev@openvswitch.org <mailto:dev@openvswitch.org>
>     https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>     <https://mail.openvswitch.org/mailman/listinfo/ovs-dev>
>
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 250c7aefa..d7e4b20cf 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -13,6 +13,8 @@  lib_libovn_la_SOURCES = \
 	lib/expr.c \
 	lib/extend-table.h \
 	lib/extend-table.c \
+	lib/fasthmap.h \
+	lib/fasthmap.c \
 	lib/ip-mcast-index.c \
 	lib/ip-mcast-index.h \
 	lib/mcast-group-index.c \
diff --git a/lib/fasthmap.c b/lib/fasthmap.c
new file mode 100644
index 000000000..2be93e6a8
--- /dev/null
+++ b/lib/fasthmap.c
@@ -0,0 +1,326 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "fasthmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+
+VLOG_DEFINE_THIS_MODULE(fasthmap);
+
+
+/* These are accessed under mutex inside ovn_add_worker_pool().
+ * They do not need to be atomic.
+ */
+
+static bool worker_pool_setup = false;
+static bool can_parallelize = false;
+
+/* This is set only in the process of exit and the set is
+ * accompanied by a fence. It does not need to be atomic or be
+ * accessed under a lock.
+ */
+
+static bool workers_must_exit = false;
+
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static int pool_size;
+
+static void worker_pool_hook(void *aux OVS_UNUSED) {
+    int i;
+    static struct worker_pool *pool;
+    workers_must_exit = true;
+    /* All workers must honour the must_exit flag and check for it regularly.
+     * We can make it atomic and check it via atomics in workers, but that
+     * is not really necessary as it is set just once - when the program
+     * terminates. So we use a fence which is invoked before exiting instead.
+     */
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Wake up the workers after the must_exit flag has been set */
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size ; i++) {
+            sem_post(&pool->controls[i].fire);
+        }
+    }
+}
+
+static void setup_worker_pools(void) {
+    int cores, nodes;
+
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+
+    /* If there is no NUMA config, use 4 cores.
+     * If there is NUMA config use half the cores on
+     * one node so that the OS does not start pushing
+     * threads to other nodes.
+     */
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        /* If there is no NUMA we can try the ovs-threads routine.
+         * It falls back to sysconf and/or affinity mask.
+         */
+        cores = count_cpu_cores();
+        pool_size = cores;
+    } else {
+        pool_size = cores / nodes;
+    }
+    if (pool_size > 16) {
+        pool_size = 16;
+    }
+    can_parallelize = (pool_size >= 3);
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+}
+
+bool ovn_cease_fire(void)
+{
+    return workers_must_exit;
+}
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
+
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    int i;
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (!worker_pool_setup) {
+        setup_worker_pools();
+        worker_pool_setup = true;
+    }
+
+    if (can_parallelize) {
+        new_pool = xmalloc(sizeof(struct worker_pool));
+        new_pool->size = pool_size;
+        sem_init(&new_pool->done, 0, 0);
+
+        ovs_list_push_back(&worker_pools, &new_pool->list_node);
+
+        new_pool->controls =
+            xmalloc(sizeof(struct worker_control) * new_pool->size);
+
+        for (i = 0; i < new_pool->size; i++) {
+            new_control = &new_pool->controls[i];
+            sem_init(&new_control->fire, 0, 0);
+            new_control->id = i;
+            new_control->done = &new_pool->done;
+            new_control->data = NULL;
+            ovs_mutex_init(&new_control->mutex);
+            new_control->finished = ATOMIC_VAR_INIT(false);
+        }
+
+        for (i = 0; i < pool_size; i++) {
+            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+        }
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+}
+
+
+/* Initializes 'hmap' as an empty hash table with mask N. */
+void
+ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t i;
+
+    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+    for (i = 0; i <= hmap->mask; i++) {
+        hmap->buckets[i] = NULL;
+    }
+}
+
+/* Initializes 'hmap' as an empty hash table of size X.
+ * Intended for use in parallel processing so that all
+ * fragments used to store results in a parallel job
+ * are the same size.
+ */
+void
+ovn_fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask;
+    mask = size / 2;
+    mask |= mask >> 1;
+    mask |= mask >> 2;
+    mask |= mask >> 4;
+    mask |= mask >> 8;
+    mask |= mask >> 16;
+#if SIZE_MAX > UINT32_MAX
+    mask |= mask >> 32;
+#endif
+
+    /* If we need to dynamically allocate buckets we might as well allocate at
+     * least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Run a thread pool which uses a callback function to process results
+ */
+
+void ovn_run_pool_callback(
+        struct worker_pool *pool,
+        void *fin_result,
+        void *result_frags,
+        void (*helper_func)(struct worker_pool *pool,
+            void *fin_result, void *result_frags, int index))
+{
+    int index, completed;
+
+    /* Ensure that all worker threads see the same data as the
+     * main thread.
+     */
+
+    atomic_thread_fence(memory_order_acq_rel);
+
+    /* Start workers */
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        /* Note - we do not loop on semaphore until it reaches
+         * zero, but on pool size/remaining workers.
+         * This is by design. If the inner loop can handle
+         * completion for more than one worker within an iteration
+         * it will do so to ensure no additional iterations and
+         * waits once all of them are done.
+         *
+         * This may result in us having an initial positive value
+         * of the semaphore when the pool is invoked the next time.
+         * This is harmless - the loop will spin up a couple of times
+         * doing nothing while the workers are processing their data
+         * slices.
+         */
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* If the worker has marked its data chunk as complete,
+             * invoke the helper function to combine the results of
+             * this worker into the main result.
+             *
+             * The worker must invoke an appropriate memory fence
+             * (most likely acq_rel) to ensure that the main thread
+             * sees all of the results produced by the worker.
+             */
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                if (helper_func) {
+                    (helper_func)(pool, fin_result, result_frags, index);
+                }
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Run a thread pool - basic, does not do results processing.
+ */
+
+void ovn_run_pool(struct worker_pool *pool)
+{
+    ovn_run_pool_callback(pool, NULL, NULL, NULL);
+}
+
+/* Brute force merge of a hashmap into another hashmap.
+ * Intended for use in parallel processing. The destination
+ * hashmap MUST be the same size as the one being merged.
+ *
+ * This can be achieved by pre-allocating them to correct size
+ * and using hmap_insert_fast() instead of hmap_insert()
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t i;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (!inc->n) {
+        /* Request to merge an empty frag, nothing to do */
+        return;
+    }
+
+    for (i = 0; i <= dest->mask; i++) {
+        struct hmap_node **dest_bucket = &dest->buckets[i];
+        struct hmap_node **inc_bucket = &inc->buckets[i];
+        if (*inc_bucket != NULL) {
+            struct hmap_node *last_node = *inc_bucket;
+            while (last_node->next != NULL) {
+                last_node = last_node->next;
+            }
+            last_node->next = *dest_bucket;
+            *dest_bucket = *inc_bucket;
+            *inc_bucket = NULL;
+        }
+    }
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+
+static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
+        void *fin_result, void *result_frags, int index)
+{
+    struct hmap *result = (struct hmap *)fin_result;
+    struct hmap *res_frags = (struct hmap *)result_frags;
+
+    fast_hmap_merge(result, &res_frags[index]);
+    hmap_destroy(&res_frags[index]);
+}
+
+
+void ovn_run_pool_hash(
+        struct worker_pool *pool,
+        struct hmap *result,
+        struct hmap *result_frags)
+{
+    ovn_run_pool_callback(pool, result, result_frags, merge_hash_results);
+}
diff --git a/lib/fasthmap.h b/lib/fasthmap.h
new file mode 100644
index 000000000..2a28553d5
--- /dev/null
+++ b/lib/fasthmap.h
@@ -0,0 +1,206 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVN_PARALLEL_HMAP
+#define OVN_PARALLEL_HMAP 1
+
+/* Process this include only if OVS does not supply parallel definitions
+ */
+
+#ifndef OVS_HAS_PARALLEL_HMAP
+
+/* if the parallel macros are defined by hmap.h or any other ovs define
+ * we skip over the ovn specific definitions.
+ */
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+/* A version of the HMAP_FOR_EACH macro intended for iterating as part
+ * of parallel processing.
+ * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
+ * and will iterate hash buckets ThreadID, ThreadID + step,
+ * ThreadID + step * 2, etc. The actual macro accepts
+ * ThreadID + step * i as the JOBID parameter.
+ */
+
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+       || ((NODE = NULL), false); \
+       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
+
+/* We do not have a SAFE version of the macro, because the hash size is not
+ * atomic and hash removal operations would need to be wrapped with
+ * locks. This will defeat most of the benefits from doing anything in
+ * parallel.
+ * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
+ * each thread should store them in a temporary list result instead, merging
+ * the lists into a combined result at the end */
+
+/* Work "Handle" */
+
+struct worker_control {
+    int id; /* Used as a modulo when iterating over a hash. */
+    atomic_bool finished; /* Set to true after achunk of work is complete. */
+    sem_t fire; /* Work start semaphore - sem_post starts the worker. */
+    sem_t *done; /* Work completion semaphore - sem_post on completion. */
+    struct ovs_mutex mutex; /* Guards the data. */
+    void *data; /* Pointer to data to be processed. */
+    void *workload; /* back-pointer to the worker pool structure. */
+};
+
+struct worker_pool {
+    int size;   /* Number of threads in the pool. */
+    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
+    struct worker_control *controls; /* "Handles" in this pool. */
+    sem_t done; /* Work completion semaphorew. */
+};
+
+/* Add a worker pool for thread function start() which expects a pointer to
+ * a worker_control structure as an argument. */
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+
+/* Setting this to true will make all processing threads exit */
+
+bool ovn_cease_fire(void);
+
+/* Build a hmap pre-sized for size elements */
+
+void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
+
+/* Build a hmap with a mask equals to size */
+
+void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
+
+/* Brute-force merge a hmap into hmap.
+ * Dest and inc have to have the same mask. The merge is performed
+ * by extending the element list for bucket N in the dest hmap with the list
+ * from bucket N in inc.
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+
+/* Run a pool, without any default processing of results.
+ */
+
+void ovn_run_pool(struct worker_pool *pool);
+
+/* Run a pool, merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_run_pool_hash(struct worker_pool *pool,
+                    struct hmap *result, struct hmap *result_frags);
+
+/* Run a pool, call a callback function to perform processing of results.
+ */
+
+void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
+                    void *result_frags,
+                    void (*helper_func)(struct worker_pool *pool,
+                        void *fin_result, void *result_frags, int index));
+
+
+/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
+ * would land, or a null pointer if that bucket is empty. */
+
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    return hmap->buckets[num];
+}
+
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t i;
+    for (i = start; i <= hmap->mask; i+= pool_size) {
+        struct hmap_node *node = hmap->buckets[i];
+        if (node) {
+            return node;
+        }
+    }
+    return NULL;
+}
+
+/* Returns the first node in 'hmap', as expected by thread with job_id
+ * for parallel processing in arbitrary order, or a null pointer if
+ * the slice of 'hmap' for that job_id is empty. */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    return parallel_hmap_next__(hmap, job_id, pool_size);
+}
+
+/* Returns the next node in the slice of 'hmap' following 'node',
+ * in arbitrary order, or a * null pointer if 'node' is the last node in
+ * the 'hmap' slice.
+ *
+ */
+static inline struct hmap_node *
+parallel_hmap_next(const struct hmap *hmap,
+                        const struct hmap_node *node, ssize_t pool_size)
+{
+    return (node->next
+            ? node->next
+            : parallel_hmap_next__(hmap,
+                (node->hash & hmap->mask) + pool_size, pool_size));
+}
+
+/* Use the OVN library functions for stuff which OVS has not defined
+ * If OVS has defined these, they will still compile using the OVN
+ * local names, but will be dropped by the linker in favour of the OVS
+ * supplied functions.
+ */
+
+#define cease_fire() ovn_cease_fire()
+
+#define add_worker_pool(start) ovn_add_worker_pool(start)
+
+#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
+
+#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
+
+#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
+
+#define ovn_run_pool(pool) ovn_run_pool(pool)
+
+#define run_pool_hash(pool, result, result_frags) \
+    ovn_run_pool_hash(pool, result, result_frags)
+
+#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
+
+#ifdef  __cplusplus
+}
+#endif
+
+#endif
+
+#endif /* lib/fasthmap.h */