
[ovs-dev,1/2] Add support for parallel processing of hashes

Message ID 20210129133741.27211-1-anton.ivanov@cambridgegreys.com
State Superseded
Series [ovs-dev,1/2] Add support for parallel processing of hashes

Commit Message

Anton Ivanov Jan. 29, 2021, 1:37 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Adds the functionality needed to walk a hash in parallel, where thread
N out of a pool of size M is responsible for processing all elements
in buckets N, N+M, N+2M, etc.
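
To illustrate the intended use, a worker function passed to
add_worker_pool() could look roughly like this (a sketch only;
process() and the pool size 'm' are placeholders, not part of this
patch):

    static void *
    example_worker(void *arg)
    {
        struct worker_control *control = arg;
        size_t m = 4;    /* Pool size; a real worker would obtain this
                          * from its pool - placeholder value. */

        while (!stop_parallel_processing()) {
            wait_for_work(control);
            if (stop_parallel_processing()) {
                break;
            }
            if (control->data) {
                struct hmap *input = control->data;
                struct hmap_node *node;

                /* Thread 'id' visits buckets id, id + m, id + 2m, ... */
                for (node = parallel_hmap_first(input, control->id, m);
                     node;
                     node = parallel_hmap_next(input, node, m)) {
                    process(node);    /* Placeholder for the real work. */
                }
            }
            post_completed_work(control);
        }
        return NULL;
    }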

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 include/openvswitch/hmap.h |   2 +
 lib/automake.mk            |   2 +
 lib/parallel-hmap.c        | 410 +++++++++++++++++++++++++++++++++++++
 lib/parallel-hmap.h        | 238 +++++++++++++++++++++
 4 files changed, 652 insertions(+)
 create mode 100644 lib/parallel-hmap.c
 create mode 100644 lib/parallel-hmap.h

Comments

0-day Robot Jan. 29, 2021, 2:42 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some errors that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line is 83 characters long (recommended limit is 79)
#240 FILE: lib/parallel-hmap.c:189:
            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);

ERROR: Inappropriate spacing around cast
#406 FILE: lib/parallel-hmap.c:355:
    struct hmap *result = (struct hmap *)fin_result;

ERROR: Inappropriate spacing around cast
#407 FILE: lib/parallel-hmap.c:356:
    struct hmap *res_frags = (struct hmap *)result_frags;

ERROR: Inappropriate spacing around cast
#430 FILE: lib/parallel-hmap.c:379:
    struct ovs_list *result = (struct ovs_list *)fin_result;

ERROR: Inappropriate spacing around cast
#431 FILE: lib/parallel-hmap.c:380:
    struct ovs_list *res_frags = (struct ovs_list *)result_frags;

ERROR: Improper whitespace around control block
#515 FILE: lib/parallel-hmap.h:48:
#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \

WARNING: Line is 80 characters long (recommended limit is 79)
#597 FILE: lib/parallel-hmap.h:130:
                                           void *fin_result, void *result_frags,

WARNING: Line has trailing whitespace
#691 FILE: lib/parallel-hmap.h:224:
    hrl->row_locks = NULL;   

Lines checked: 708, Warnings: 3, Errors: 5


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Numan Siddique Feb. 10, 2021, 3:57 p.m. UTC | #2
On Fri, Jan 29, 2021 at 7:08 PM <anton.ivanov@cambridgegreys.com> wrote:
>
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> Adds functionality needed to walk a hash in parallel where thread ID
> N out of a pool sized M is responsible for processing all elements
> in buckets N, N+M, N+M*2, etc
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Hi Anton,

Please see below for a few comments.

Thanks
Numan

> ---
>  include/openvswitch/hmap.h |   2 +
>  lib/automake.mk            |   2 +
>  lib/parallel-hmap.c        | 410 +++++++++++++++++++++++++++++++++++++
>  lib/parallel-hmap.h        | 238 +++++++++++++++++++++
>  4 files changed, 652 insertions(+)
>  create mode 100644 lib/parallel-hmap.c
>  create mode 100644 lib/parallel-hmap.h
>
> diff --git a/include/openvswitch/hmap.h b/include/openvswitch/hmap.h
> index 4e001cc69..6aed568e1 100644
> --- a/include/openvswitch/hmap.h
> +++ b/include/openvswitch/hmap.h
> @@ -17,6 +17,8 @@
>  #ifndef HMAP_H
>  #define HMAP_H 1
>
> +#define OVS_HAS_PARALLEL_HMAP 1
> +
>  #include <stdbool.h>
>  #include <stdlib.h>
>  #include "openvswitch/util.h"
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 39afbff9d..99f7bb8d4 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -122,6 +122,8 @@ lib_libopenvswitch_la_SOURCES = \
>         lib/dynamic-string.c \
>         lib/entropy.c \
>         lib/entropy.h \
> +       lib/parallel-hmap.h \
> +       lib/parallel-hmap.c \
>         lib/fat-rwlock.c \
>         lib/fat-rwlock.h \
>         lib/fatal-signal.c \
> diff --git a/lib/parallel-hmap.c b/lib/parallel-hmap.c
> new file mode 100644
> index 000000000..a923a6142
> --- /dev/null
> +++ b/lib/parallel-hmap.c
> @@ -0,0 +1,410 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#include <semaphore.h>
> +#include <fcntl.h>
> +#include "fatal-signal.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "hmapx.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "ovs-numa.h"
> +#include "random.h"
> +#include "parallel-hmap.h"
> +
> +VLOG_DEFINE_THIS_MODULE(parallel_hmap);
> +
> +#define WORKER_SEM_NAME "%x-%p-%x"
> +#define MAIN_SEM_NAME "%x-%p-main"
> +
> +
> +/* These are accessed under mutex inside add_worker_pool().
> + * They do not need to be atomic.
> + */
> +
> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> +static bool can_parallelize = false;
> +
> +/* This is set only in the process of exit and the set is
> + * accompanied by a fence. It does not need to be atomic or be
> + * accessed under a lock.
> + */
> +
> +static bool workers_must_exit = false;
> +
> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static int pool_size;
> +
> +static int sembase;
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED)
> +{
> +    int i;
> +    struct worker_pool *pool;
> +    char sem_name[256];
> +
> +    workers_must_exit = true;
> +
> +    /* All workers must honour the must_exit flag and check for it regularly.
> +     * We can make it atomic and check it via atomics in workers, but that
> +     * is not really necessary as it is set just once - when the program
> +     * terminates. So we use a fence which is invoked before exiting instead.
> +     */
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Wake up the workers after the must_exit flag has been set */
> +
> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_post(pool->controls[i].fire);
> +        }
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_close(pool->controls[i].fire);
> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +            sem_unlink(sem_name);
> +        }
> +        sem_close(pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +        sem_unlink(sem_name);
> +    }
> +}
> +
> +static void setup_worker_pools(void)
> +{
> +    int cores, nodes;
> +
> +    nodes = ovs_numa_get_n_numas();
> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> +        nodes = 1;
> +    }
> +    cores = ovs_numa_get_n_cores();
> +
> +    /* If there is no NUMA config, fall back to the core count
> +     * reported by the ovs-threads routine below.
> +     * If there is NUMA config, use the cores of a single node so
> +     * that the OS does not start pushing threads to other nodes.
> +     */
> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> +        /* If there is no NUMA we can try the ovs-threads routine.
> +         * It falls back to sysconf and/or affinity mask.
> +         */
> +        cores = count_cpu_cores();
> +        pool_size = cores;
> +    } else {
> +        pool_size = cores / nodes;
> +    }
> +    if (pool_size > 16) {
> +        pool_size = 16;
> +    }
> +    can_parallelize = (pool_size >= 3);
> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> +    sembase = random_uint32();
> +}
> +
> +bool stop_parallel_processing(void)
> +{
> +    return workers_must_exit;
> +}
> +
> +bool can_parallelize_hashes(void)
> +{
> +    bool test = false;
> +
> +    if (atomic_compare_exchange_strong(
> +            &initial_pool_setup,
> +            &test,
> +            true)) {
> +        ovs_mutex_lock(&init_mutex);
> +        setup_worker_pools();
> +        ovs_mutex_unlock(&init_mutex);
> +    }
> +    return can_parallelize;
> +}

Can you please define the public functions first and then the static
functions? That is the recommended way per the coding guidelines - see
the "Functions" section of
https://github.com/ovn-org/ovn/blob/master/Documentation/internals/contributing/coding-style.rst

I would suggest having a function - ovn_fast_processing_init(bool
force_parallelize) - which does the job of 'can_parallelize_hashes()'.

If force_parallelize is true, then the function would set
can_parallelize to true and set the pool size to a default value
(maybe 4). Feel free to change the function name to a more appropriate
one if you prefer.

A user of the library can then call can_parallelize_hashes() to know
if parallelization can be enabled or not.
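
Roughly something like this (an untested sketch; the name and the
default pool size of 4 are only suggestions):

    void
    ovn_fast_processing_init(bool force_parallelize)
    {
        bool test = false;

        if (atomic_compare_exchange_strong(&initial_pool_setup,
                                           &test, true)) {
            ovs_mutex_lock(&init_mutex);
            setup_worker_pools();
            if (force_parallelize) {
                /* Override the topology-based heuristics. */
                can_parallelize = true;
                pool_size = 4;    /* Suggested default. */
            }
            ovs_mutex_unlock(&init_mutex);
        }
    }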

I would also suggest prefixing the names of the public functions with
ovn_fastp to indicate that they are related to hmap parallelization.
For example, add_worker_pool() could become ovn_fastp_setup_worker_pool().

> +
> +struct worker_pool *add_worker_pool(void *(*start)(void *))
> +{
> +
> +    struct worker_pool *new_pool = NULL;
> +    struct worker_control *new_control;
> +    bool test = false;
> +    int i;
> +    char sem_name[256];
> +
> +
> +    if (atomic_compare_exchange_strong(
> +            &initial_pool_setup,
> +            &test,
> +            true)) {
> +        ovs_mutex_lock(&init_mutex);
> +        setup_worker_pools();
> +        ovs_mutex_unlock(&init_mutex);
> +    }
> +
> +    ovs_mutex_lock(&init_mutex);
> +    if (can_parallelize) {
> +        new_pool = xmalloc(sizeof(struct worker_pool));
> +        new_pool->size = pool_size;
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +
> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> +
> +        new_pool->controls =
> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
> +
> +        for (i = 0; i < new_pool->size; i++) {
> +            new_control = &new_pool->controls[i];
> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +            new_control->id = i;
> +            new_control->done = new_pool->done;
> +            new_control->data = NULL;
> +            ovs_mutex_init(&new_control->mutex);
> +            new_control->finished = ATOMIC_VAR_INIT(false);
> +        }
> +
> +        for (i = 0; i < pool_size; i++) {
> +            ovs_thread_create("worker pool helper", start,
> +                              &new_pool->controls[i]);
> +        }
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +}
> +
> +
> +/* Initializes 'hmap' as an empty hash table with mask N. */
> +void
> +fast_hmap_init(struct hmap *hmap, ssize_t mask)
> +{
> +    size_t i;
> +
> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> +    hmap->one = NULL;
> +    hmap->mask = mask;
> +    hmap->n = 0;
> +    for (i = 0; i <= hmap->mask; i++) {
> +        hmap->buckets[i] = NULL;
> +    }
> +}
> +
> +/* Initializes 'hmap' as an empty hash table of size X.
> + * Intended for use in parallel processing so that all
> + * fragments used to store results in a parallel job
> + * are the same size.
> + */
> +void
> +fast_hmap_size_for(struct hmap *hmap, int size)
> +{
> +    size_t mask;
> +    mask = size / 2;
> +    mask |= mask >> 1;
> +    mask |= mask >> 2;
> +    mask |= mask >> 4;
> +    mask |= mask >> 8;
> +    mask |= mask >> 16;
> +#if SIZE_MAX > UINT32_MAX
> +    mask |= mask >> 32;
> +#endif
> +
> +    /* If we need to dynamically allocate buckets we might as well allocate at
> +     * least 4 of them. */
> +    mask |= (mask & 1) << 1;
> +
> +    fast_hmap_init(hmap, mask);
> +}
> +
> +/* Run a thread pool which uses a callback function to process results
> + */
> +
> +void run_pool_callback(struct worker_pool *pool,
> +                       void *fin_result, void *result_frags,
> +                       void (*helper_func)(struct worker_pool *pool,
> +                                           void *fin_result,
> +                                           void *result_frags, int index))
> +{
> +    int index, completed;
> +
> +    /* Ensure that all worker threads see the same data as the
> +     * main thread.
> +     */
> +
> +    atomic_thread_fence(memory_order_acq_rel);
> +
> +    /* Start workers */
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        /* Note - we do not loop on semaphore until it reaches
> +         * zero, but on pool size/remaining workers.
> +         * This is by design. If the inner loop can handle
> +         * completion for more than one worker within an iteration
> +         * it will do so to ensure no additional iterations and
> +         * waits once all of them are done.
> +         *
> +         * This may result in us having an initial positive value
> +         * of the semaphore when the pool is invoked the next time.
> +         * This is harmless - the loop will spin up a couple of times
> +         * doing nothing while the workers are processing their data
> +         * slices.
> +         */
> +        sem_wait(pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            /* If the worker has marked its data chunk as complete,
> +             * invoke the helper function to combine the results of
> +             * this worker into the main result.
> +             *
> +             * The worker must invoke an appropriate memory fence
> +             * (most likely acq_rel) to ensure that the main thread
> +             * sees all of the results produced by the worker.
> +             */
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                if (helper_func) {
> +                    (helper_func)(pool, fin_result, result_frags, index);
> +                }
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Run a thread pool - basic, does not do results processing.
> + */
> +
> +void run_pool(struct worker_pool *pool)
> +{
> +    run_pool_callback(pool, NULL, NULL, NULL);
> +}
> +
> +/* Brute force merge of a hashmap into another hashmap.
> + * Intended for use in parallel processing. The destination
> + * hashmap MUST be the same size as the one being merged.
> + *
> + * This can be achieved by pre-allocating them to correct size
> + * and using hmap_insert_fast() instead of hmap_insert()
> + */
> +
> +void fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> +{
> +    size_t i;
> +
> +    ovs_assert(inc->mask == dest->mask);
> +
> +    if (!inc->n) {
> +        /* Request to merge an empty frag, nothing to do */
> +        return;
> +    }
> +
> +    for (i = 0; i <= dest->mask; i++) {
> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> +        if (*inc_bucket != NULL) {
> +            struct hmap_node *last_node = *inc_bucket;
> +            while (last_node->next != NULL) {
> +                last_node = last_node->next;
> +            }
> +            last_node->next = *dest_bucket;
> +            *dest_bucket = *inc_bucket;
> +            *inc_bucket = NULL;
> +        }
> +    }
> +    dest->n += inc->n;
> +    inc->n = 0;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of hashes. Merge results.
> + */
> +
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index)
> +{
> +    struct hmap *result = (struct hmap *) fin_result;
> +    struct hmap *res_frags = (struct hmap *) result_frags;
> +
> +    fast_hmap_merge(result, &res_frags[index]);
> +    hmap_destroy(&res_frags[index]);
> +}
> +
> +
> +void run_pool_hash(
> +        struct worker_pool *pool,
> +        struct hmap *result,
> +        struct hmap *result_frags)
> +{
> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
> +}
> +
> +/* Run a thread pool which gathers results in an array of lists.
> + * Merge results.
> + */
> +
> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags,
> +                               int index)
> +{
> +    struct ovs_list *result = (struct ovs_list *) fin_result;
> +    struct ovs_list *res_frags = (struct ovs_list *) result_frags;
> +
> +    if (!ovs_list_is_empty(&res_frags[index])) {
> +        ovs_list_splice(result->next,
> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
> +    }
> +}
> +
> +
> +void run_pool_list(
> +        struct worker_pool *pool,
> +        struct ovs_list *result,
> +        struct ovs_list *result_frags)
> +{
> +    run_pool_callback(pool, result, result_frags, merge_list_results);
> +}
> +
> +void update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
> +{
> +    int i;
> +    if (hrl->mask != lflows->mask) {
> +        if (hrl->row_locks) {
> +            free(hrl->row_locks);
> +        }
> +        hrl->row_locks = xcalloc(lflows->mask + 1, sizeof(struct ovs_mutex));
> +        hrl->mask = lflows->mask;
> +        for (i = 0; i <= lflows->mask; i++) {
> +            ovs_mutex_init(&hrl->row_locks[i]);
> +        }
> +    }
> +}
> diff --git a/lib/parallel-hmap.h b/lib/parallel-hmap.h
> new file mode 100644
> index 000000000..771bf583a
> --- /dev/null
> +++ b/lib/parallel-hmap.h
> @@ -0,0 +1,238 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVS_PARALLEL_HMAP
> +#define OVS_PARALLEL_HMAP 1
> +
> +/* If the parallel macros are defined by hmap.h or any other OVS
> + * header, we skip over the OVN-specific definitions.
> + */
> +
> +#ifdef  __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdbool.h>
> +#include <stdlib.h>
> +#include <semaphore.h>
> +#include "openvswitch/util.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovs-atomic.h"
> +
> +#ifdef __clang__
> +#pragma clang diagnostic push
> +#pragma clang diagnostic ignored "-Wthread-safety"
> +#endif

Why is this required? If I comment it out, I see compilation errors
with clang.

I would suggest adding a comment on why it is required, if it is.
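
If it is only there to quiet the thread-safety analysis for the locking
helpers, a comment along these lines would do (my guess at the reason -
please verify):

    /* The clang thread-safety analysis is disabled for this header
     * because helpers such as lock_hash_row() return while still
     * holding a mutex that is selected from an array at run time,
     * which the analysis cannot model without annotations. */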

> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> + * of parallel processing.
> + * Each worker thread has a different ThreadID in the range 0..POOL_SIZE-1
> + * and will iterate hash buckets ThreadID, ThreadID + step,
> + * ThreadID + step * 2, etc. The actual macro accepts
> + * ThreadID + step * i as the JOBID parameter.
> + */
> +
> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> +       || ((NODE = NULL), false); \
> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> +
> +/* We do not have a SAFE version of the macro, because the hash size is not
> + * atomic and hash removal operations would need to be wrapped with
> + * locks. This would defeat most of the benefits of doing anything in
> + * parallel.
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> + * each thread should store them in a temporary list result instead, merging
> + * the lists into a combined result at the end. */
> +
> +/* Work "Handle" */
> +
> +struct worker_control {
> +    int id; /* Used as a modulo when iterating over a hash. */
> +    atomic_bool finished; /* Set to true after a chunk of work is done. */
> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> +    struct ovs_mutex mutex; /* Guards the data. */
> +    void *data; /* Pointer to data to be processed. */
> +    void *workload; /* Back-pointer to the worker pool structure. */
> +};
> +
> +struct worker_pool {
> +    int size;   /* Number of threads in the pool. */
> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> +    struct worker_control *controls; /* "Handles" in this pool. */
> +    sem_t *done; /* Work completion semaphore. */
> +};
> +
> +/* Add a worker pool for thread function start() which expects a pointer to
> + * a worker_control structure as an argument. */
> +
> +struct worker_pool *add_worker_pool(void *(*start)(void *));
> +
> +/* Setting this to true will make all processing threads exit */
> +
> +bool stop_parallel_processing(void);
> +
> +/* Build a hmap pre-sized for size elements */
> +
> +void fast_hmap_size_for(struct hmap *hmap, int size);
> +
> +/* Build a hmap with a mask equal to size */
> +
> +void fast_hmap_init(struct hmap *hmap, ssize_t size);
> +
> +/* Brute-force merge a hmap into hmap.
> + * Dest and inc have to have the same mask. The merge is performed
> + * by extending the element list for bucket N in the dest hmap with the list
> + * from bucket N in inc.
> + */
> +
> +void fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> +
> +/* Run a pool, without any default processing of results.
> + */
> +
> +void run_pool(struct worker_pool *pool);
> +
> +/* Run a pool, merge results from hash frags into a final hash result.
> + * The hash frags must be pre-sized to the same size.
> + */
> +
> +void run_pool_hash(struct worker_pool *pool,
> +                   struct hmap *result, struct hmap *result_frags);
> +
> +/* Run a pool, merge results from list frags into a final list result.
> + */
> +
> +void run_pool_list(struct worker_pool *pool,
> +                    struct ovs_list *result, struct ovs_list *result_frags);
> +
> +/* Run a pool, call a callback function to perform processing of results.
> + */
> +
> +void run_pool_callback(struct worker_pool *pool, void *fin_result,
> +                       void *result_frags,
> +                       void (*helper_func)(struct worker_pool *pool,
> +                                           void *fin_result,
> +                                           void *result_frags,
> +                                           int index));
> +
> +
> +/* Returns the first node in bucket number 'num' in 'hmap',
> + * or a null pointer if that bucket is empty. */
> +
> +static inline struct hmap_node *
> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> +{
> +    return hmap->buckets[num];
> +}
> +
> +static inline struct hmap_node *
> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> +{
> +    size_t i;
> +    for (i = start; i <= hmap->mask; i += pool_size) {
> +        struct hmap_node *node = hmap->buckets[i];
> +        if (node) {
> +            return node;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Returns the first node in 'hmap', as expected by thread with job_id
> + * for parallel processing in arbitrary order, or a null pointer if
> + * the slice of 'hmap' for that job_id is empty. */
> +static inline struct hmap_node *
> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> +{
> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> +}
> +
> +/* Returns the next node in the slice of 'hmap' following 'node',
> + * in arbitrary order, or a null pointer if 'node' is the last node
> + * in the 'hmap' slice. */
> +static inline struct hmap_node *
> +parallel_hmap_next(const struct hmap *hmap,
> +                   const struct hmap_node *node, ssize_t pool_size)
> +{
> +    return (node->next
> +            ? node->next
> +            : parallel_hmap_next__(hmap,
> +                (node->hash & hmap->mask) + pool_size, pool_size));
> +}
> +
> +static inline void post_completed_work(struct worker_control *control)
> +{
> +    atomic_thread_fence(memory_order_acq_rel);
> +    atomic_store_relaxed(&control->finished, true);
> +    sem_post(control->done);
> +}
> +
> +static inline void wait_for_work(struct worker_control *control)
> +{
> +    sem_wait(control->fire);
> +}
> +
> +/* Hash per-row locking support - to be used only in conjunction
> + * with fast hash inserts. Normal hash inserts may resize the hash
> + * rendering the locking invalid.
> + */
> +
> +struct hashrow_locks {
> +    ssize_t mask;
> +    struct ovs_mutex *row_locks;
> +};
> +
> +/* Update a hash row locks structure to match the current hash size */
> +
> +void update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
> +
> +/* Lock a hash row */
> +
> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> +{
> +    ovs_mutex_lock(&hrl->row_locks[hash & hrl->mask]);
> +}
> +
> +/* Unlock a hash row */
> +
> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> +{
> +    ovs_mutex_unlock(&hrl->row_locks[hash & hrl->mask]);
> +}
> +
> +/* Init the row locks structure */
> +
> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
> +{
> +    hrl->mask = 0;
> +    hrl->row_locks = NULL;
> +}
> +
> +bool can_parallelize_hashes(void);
> +
> +#ifdef __clang__
> +#pragma clang diagnostic pop
> +#endif
> +
> +#ifdef  __cplusplus
> +}
> +#endif
> +
> +
> +#endif /* lib/parallel-hmap.h */
> --
> 2.20.1
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>