[ovs-dev,v4,1/9] ovn-libs: Add support for parallel processing

Message ID 20200925095807.19358-2-anton.ivanov@cambridgegreys.com
State Superseded
Series [ovs-dev,v4,1/9] ovn-libs: Add support for parallel processing

Commit Message

Anton Ivanov Sept. 25, 2020, 9:57 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

This adds a set of functions and macros intended to process
hashes in parallel.

The principles of operation are documented in fasthmap.h.

If these one day go into the OVS tree, the OVS tree versions
would be used in preference.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/automake.mk |   2 +
 lib/fasthmap.c  | 373 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/fasthmap.h  | 217 ++++++++++++++++++++++++++++
 3 files changed, 592 insertions(+)
 create mode 100644 lib/fasthmap.c
 create mode 100644 lib/fasthmap.h
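
For orientation, here is a minimal sketch (not part of the patch) of how a
worker thread is meant to drive this API. 'struct my_item' and
example_worker() are hypothetical names, and the sketch assumes the
dispatcher fills in the 'workload' back-pointer, which this version of the
patch declares but does not set:

    /* Hypothetical worker: wait for 'fire', walk this thread's slice of
     * the input hmap, then signal completion on 'done'. */
    struct my_item {
        struct hmap_node node;
        /* ... payload ... */
    };

    static void *
    example_worker(void *arg)
    {
        struct worker_control *control = arg;
        struct worker_pool *pool = control->workload; /* assumed set. */

        while (!seize_fire()) {
            sem_wait(&control->fire);      /* Block until work is posted. */
            if (seize_fire()) {
                break;                     /* Fatal-signal hook woke us. */
            }

            /* The dispatcher stores the input in control->data before
             * posting 'fire'. Each worker visits buckets id, id + size,
             * id + 2 * size, ... */
            struct hmap *input = control->data;
            for (size_t bucket = control->id; bucket <= input->mask;
                 bucket += pool->size) {
                struct my_item *item;
                HMAP_FOR_EACH_IN_PARALLEL (item, node, bucket, input) {
                    /* ... process 'item' into a per-thread result ... */
                }
            }

            atomic_store(&control->finished, true);
            sem_post(control->done);       /* Wake the dispatcher. */
        }
        return NULL;
    }

The dispatcher side is then one of the run_pool*() calls discussed below.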

Comments

0-day Robot Sept. 25, 2020, 11:01 a.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
ERROR: Improper whitespace around control block
#464 FILE: lib/fasthmap.h:44:
#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \

Lines checked: 640, Warnings: 0, Errors: 1


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Mark Michelson Oct. 13, 2020, 7:33 p.m. UTC | #2
On 9/25/20 5:57 AM, anton.ivanov@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> This adds a set of functions and macros intended to process
> hashes in parallel.
> 
> The principles of operation are documented in the fasthmap.h
> 
> If these one day go into the OVS tree, the OVS tree versions
> would be used in preference.
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---
>   lib/automake.mk |   2 +
>   lib/fasthmap.c  | 373 ++++++++++++++++++++++++++++++++++++++++++++++++
>   lib/fasthmap.h  | 217 ++++++++++++++++++++++++++++
>   3 files changed, 592 insertions(+)
>   create mode 100644 lib/fasthmap.c
>   create mode 100644 lib/fasthmap.h
> 
> diff --git a/lib/automake.mk b/lib/automake.mk
> index f3e9c8818..c9bf7e021 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>   	lib/expr.c \
>   	lib/extend-table.h \
>   	lib/extend-table.c \
> +    lib/fasthmap.h \
> +    lib/fasthmap.c \

Looks like you may have used different whitespace for the change here. 
It appears you used spaces instead of a tab.

>   	lib/ip-mcast-index.c \
>   	lib/ip-mcast-index.h \
>   	lib/mcast-group-index.c \
> diff --git a/lib/fasthmap.c b/lib/fasthmap.c
> new file mode 100644
> index 000000000..38b06d74a
> --- /dev/null
> +++ b/lib/fasthmap.c
> @@ -0,0 +1,373 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <semaphore.h>
> +#include "fatal-signal.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "fasthmap.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "ovs-numa.h"
> +
> +VLOG_DEFINE_THIS_MODULE(fasthmap);
> +
> +
> +static bool worker_pool_setup = false;
> +static bool workers_must_exit = false;
> +
> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static int pool_size;
> +
> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> +    int i;
> +    static struct worker_pool *pool;
> +    workers_must_exit = true; /* all workers must honour this flag */
> +    atomic_thread_fence(memory_order_release);
> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> +        for (i = 0; i < pool->size ; i++) {
> +            sem_post(&pool->controls[i].fire);
> +        }
> +    }
> +}
> +
> +static void setup_worker_pools(void) {
> +    int cores, nodes;
> +
> +    nodes = ovs_numa_get_n_numas();
> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> +        nodes = 1;
> +    }
> +    cores = ovs_numa_get_n_cores();
> +
> +    /* If there is no NUMA config, use 4 cores.
> +     * If there is NUMA config use half the cores on
> +     * one node so that the OS does not start pushing
> +     * threads to other nodes.
> +     */
> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> +        pool_size = 4;
> +    } else {
> +        pool_size = cores / nodes / 2;
> +    }
> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> +    worker_pool_setup = true;
> +}
> +
> +bool ovn_seize_fire(void)
> +{
> +    return workers_must_exit;
> +}
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> +
> +    struct worker_pool *new_pool = NULL;
> +    struct worker_control *new_control;
> +    int i;
> +
> +    ovs_mutex_lock(&init_mutex);
> +
> +    if (!worker_pool_setup) {
> +         setup_worker_pools();
> +    }
> +
> +    new_pool = xmalloc(sizeof(struct worker_pool));
> +    new_pool->size = pool_size;
> +    sem_init(&new_pool->done, 0, 0);
> +
> +    ovs_list_push_back(&worker_pools, &new_pool->list_node);
> +
> +    new_pool->controls =
> +        xmalloc(sizeof(struct worker_control) * new_pool->size);
> +
> +    for (i = 0; i < new_pool->size; i++) {
> +        new_control = &new_pool->controls[i];
> +        sem_init(&new_control->fire, 0, 0);
> +        new_control->id = i;
> +        new_control->done = &new_pool->done;
> +        new_control->data = NULL;
> +        ovs_mutex_init(&new_control->mutex);
> +        new_control->finished = ATOMIC_VAR_INIT(false);
> +    }
> +
> +    for (i = 0; i < pool_size; i++) {
> +        ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +}
> +
> +
> +/* Initializes 'hmap' as an empty hash table with mask N. */
> +void
> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> +{
> +    size_t i;
> +
> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> +    hmap->one = NULL;
> +    hmap->mask = mask;
> +    hmap->n = 0;
> +    for (i = 0; i <= hmap->mask; i++) {
> +        hmap->buckets[i] = NULL;
> +    }
> +}
> +
> +/* Initializes 'hmap' as an empty hash table of size X.
> + * Intended for use in parallel processing so that all
> + * fragments used to store results in a parallel job
> + * are the same size.
> + */
> +void
> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> +{
> +    size_t mask;
> +    mask = size / 2;
> +    mask |= mask >> 1;
> +    mask |= mask >> 2;
> +    mask |= mask >> 4;
> +    mask |= mask >> 8;
> +    mask |= mask >> 16;
> +#if SIZE_MAX > UINT32_MAX
> +    mask |= mask >> 32;
> +#endif
> +
> +    /* If we need to dynamically allocate buckets we might as well allocate at
> +     * least 4 of them. */
> +    mask |= (mask & 1) << 1;
> +
> +    fast_hmap_init(hmap, mask);
> +}
> +
> +/* Run a thread pool - basic, does not do results processing.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool)
> +{
> +    int index, completed;
> +
> +    atomic_thread_fence(memory_order_release);
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(&pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        sem_wait(&pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Brute force merge of a hashmap into another hashmap.
> + * Intended for use in parallel processing. The destination
> + * hashmap MUST be the same size as the one being merged.
> + *
> + * This can be achieved by pre-allocating them to correct size
> + * and using hmap_insert_fast() instead of hmap_insert()
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> +{
> +    size_t i;
> +
> +    ovs_assert(inc->mask == dest->mask);
> +
> +    if (!inc->n) {
> +        /* Request to merge an empty frag, nothing to do */
> +        return;
> +    }
> +
> +    for (i = 0; i <= dest->mask; i++) {
> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> +        if (*inc_bucket != NULL) {
> +            struct hmap_node *last_node = *inc_bucket;
> +            while (last_node->next != NULL) {
> +                last_node = last_node->next;
> +            }
> +            last_node->next = *dest_bucket;
> +            *dest_bucket = *inc_bucket;
> +            *inc_bucket = NULL;
> +        }
> +    }
> +    dest->n += inc->n;
> +    inc->n = 0;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of hashes. Merge results.
> + */
> +
> +void ovn_run_pool_hash(
> +        struct worker_pool *pool,
> +        struct hmap *result,
> +        struct hmap *result_frags)
> +{
> +    int index, completed;
> +
> +    atomic_thread_fence(memory_order_release);
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(&pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        sem_wait(&pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                fast_hmap_merge(result, &result_frags[index]);
> +                hmap_destroy(&result_frags[index]);
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +void ovn_merge_lists(struct ovs_list **dest, struct ovs_list *inc)
> +{
> +    struct ovs_list *last, *first;
> +    if (inc == NULL) {
> +        return;
> +    }
> +
> +    if (* dest == NULL) {
> +        * dest = inc;
> +        return;
> +    }
> +
> +    if (ovs_list_is_empty(inc)) {
> +        return;
> +    }
> +
> +    if (ovs_list_is_empty(*dest)) {
> +        * dest = inc;
> +        return;
> +    }
> +
> +
> +    last = inc->prev;
> +    /* first element is not the list pointer itself, it is the ->next */
> +    first = inc->next;
> +
> +    (*dest)->prev->next = first;
> +    first->prev = (*dest)->prev;
> +
> +    (*dest)->prev = last;
> +    last->next = *dest;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of lists. Merge results
> + */
> +
> +void ovn_run_pool_list(
> +        struct worker_pool *pool,
> +        struct ovs_list **result,
> +        struct ovs_list **result_frags)
> +{
> +    int index, completed;
> +
> +    atomic_thread_fence(memory_order_release);
> +
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(&pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +
> +    do {
> +        bool test;
> +        sem_wait(&pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                merge_lists(result, result_frags[index]);
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Run a thread pool which uses a callback function to process results
> + */
> +
> +void ovn_run_pool_callback(
> +        struct worker_pool *pool,
> +        void *fin_result,
> +        void (*helper_func)(
> +            struct worker_pool *pool, void *fin_result, int index))
> +{
> +    int index, completed;
> +
> +    atomic_thread_fence(memory_order_release);
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(&pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        sem_wait(&pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            if (atomic_compare_exchange_weak(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                (helper_func)(pool, fin_result, index);
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}

2 notes:
* ovn_run_pool_list and ovn_run_pool_callback are never actually used in 
this patch series.
* There's a lot of code repeated between ovn_run_pool_hash, 
ovn_run_pool_list, and ovn_run_pool_callback.

I think it's safe to say that ovn_run_pool_list could be removed unless 
there's a specific improvement you have in mind that would use it.

If there's no immediate need to keep ovn_run_pool_list around, then I 
think you should also remove ovn_run_pool_callback as well.

If, however, ovn_run_pool_list should stick around, then I think both 
ovn_run_pool_list and ovn_run_pool_hash should be implemented using 
ovn_run_pool_callback. This way, a lot of the repeated code is eliminated.
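
As an illustration of the suggested refactor (the aux struct is invented
here, since ovn_run_pool_callback() carries only a single fin_result
pointer), ovn_run_pool_hash() could collapse to roughly:

    struct hash_merge_aux {
        struct hmap *result;
        struct hmap *result_frags;
    };

    static void
    merge_hash_frag(struct worker_pool *pool OVS_UNUSED,
                    void *fin_result, int index)
    {
        struct hash_merge_aux *aux = fin_result;

        fast_hmap_merge(aux->result, &aux->result_frags[index]);
        hmap_destroy(&aux->result_frags[index]);
    }

    void ovn_run_pool_hash(
            struct worker_pool *pool,
            struct hmap *result,
            struct hmap *result_frags)
    {
        struct hash_merge_aux aux = { result, result_frags };

        ovn_run_pool_callback(pool, &aux, merge_hash_frag);
    }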

> diff --git a/lib/fasthmap.h b/lib/fasthmap.h
> new file mode 100644
> index 000000000..ccf005475
> --- /dev/null
> +++ b/lib/fasthmap.h
> @@ -0,0 +1,217 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef HMAP_HAS_PARALLEL_MACROS
> +#define HMAP_HAS_PARALLEL_MACROS 1
> +
> +/* if the parallel macros are defined by hmap.h or any other ovs define
> + * we skip over the ovn specific definitions.
> + */
> +
> +#ifdef  __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdbool.h>
> +#include <stdlib.h>
> +#include <semaphore.h>
> +#include "openvswitch/util.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovs-atomic.h"
> +
> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> + * of parallel processing.
> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
> + * and will iterate hash buckets ThreadID, ThreadID + step,
> + * ThreadID + step * 2, etc. The actual macro accepts
> + * ThreadID + step * i as the JOBID parameter.
> + */
> +
> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> +       || ((NODE = NULL), false); \
> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> +
> +/* We do not have a SAFE version of the macro, because the hash size is not
> + * atomic and hash removal operations would need to be wrapped with
> + * locks. This will defeat most of the benefits from doing anything in
> + * parallel.
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> + * each thread should store them in a temporary list result instead, merging
> + * the lists into a combined result at the end */
> +
> +/* Work "Handle" */
> +
> +struct worker_control {
> +    int id; /* Used as a modulo when iterating over a hash. */
> +    atomic_bool finished; /* Set to true after a chunk of work is complete. */
> +    sem_t fire; /* Work start semaphore - sem_post starts the worker. */
> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> +    struct ovs_mutex mutex; /* Guards the data. */

This mutex is never used in this series. Can it be removed?

> +    void *data; /* Pointer to data to be processed. */
> +    void *workload; /* back-pointer to the worker pool structure. */
> +};
> +
> +struct worker_pool {
> +    int size;   /* Number of threads in the pool. */
> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> +    struct worker_control *controls; /* "Handles" in this pool. */
> +    sem_t done; /* Work completion semaphore. */
> +};
> +
> +/* Add a worker pool for thread function start() which expects a pointer to
> + * a worker_control structure as an argument. */
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +
> +/* Setting this to true will make all processing threads exit */
> +
> +bool ovn_seize_fire(void);

Was this intended to be ovn_cease_fire()? Or is there something that's 
being seized that I'm not understanding here?

> +
> +/* Build a hmap pre-sized for size elements */
> +
> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);

This should probably take a size_t instead of an int.

> +
> +/* Build a hmap with a mask equal to size */
> +
> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);

This should probably take a size_t instead of an ssize_t

> +
> +/* Brute-force merge a hmap into hmap.
> + * Dest and inc have to have the same mask. The merge is performed
> + * by extending the element list for bucket N in the dest hmap with the list
> + * from bucket N in inc.
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> +
> +/* Merge two lists.
> + * It is possible to achieve the same functionality using ovs_list_splice().
> + * This ensures the splicing is exactly for tail of dest to head of inc.
> + */
> +
> +void ovn_merge_lists(struct ovs_list **dest, struct ovs_list *inc);

Hm, I think ovs_list_push_back_all() achieves the same result.

> +
> +/* Run a pool, without any default processing of results.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool);
> +
> +/* Run a pool, merge results from hash frags into a final hash result.
> + * The hash frags must be pre-sized to the same size.
> + */
> +
> +void ovn_run_pool_hash(struct worker_pool *pool,
> +                    struct hmap *result, struct hmap *result_frags);
> +
> +/* Run a pool, merge results from lists into a final combined list.
> + * The hash frags must be pre-sized to the same size.
> + */
> +
> +void ovn_run_pool_list(struct worker_pool *pool,
> +                    struct ovs_list **result, struct ovs_list **result_frags);
> +
> +/* Run a pool, call a callback function to perform processing of results.
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> +                    void (*helper_func)(struct worker_pool *pool,
> +                                    void *fin_result, int index));
> +
> +
> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> + * would land, or a null pointer if that bucket is empty. */
> +
> +static inline struct hmap_node *
> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> +{
> +    return hmap->buckets[num];
> +}
> +
> +static inline struct hmap_node *
> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> +{
> +    size_t i;
> +    for (i = start; i <= hmap->mask; i+= pool_size) {
> +        struct hmap_node *node = hmap->buckets[i];
> +        if (node) {
> +            return node;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Returns the first node in 'hmap', as expected by thread with job_id
> + * for parallel processing in arbitrary order, or a null pointer if
> + * the slice of 'hmap' for that job_id is empty. */
> +static inline struct hmap_node *
> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> +{
> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> +}
> +
> +/* Returns the next node in the slice of 'hmap' following 'node',
> + * in arbitrary order, or a * null pointer if 'node' is the last node in
> + * the 'hmap' slice.
> + *
> + */
> +static inline struct hmap_node *
> +parallel_hmap_next(const struct hmap *hmap,
> +                        const struct hmap_node *node, ssize_t pool_size)
> +{
> +    return (node->next
> +            ? node->next
> +            : parallel_hmap_next__(hmap,
> +                (node->hash & hmap->mask) + pool_size, pool_size));
> +}
> +
> +/* Use the OVN library functions for stuff which OVS has not defined
> + * If OVS has defined these, they will still compile using the OVN
> + * local names, but will be dropped by the linker in favour of the OVS
> + * supplied functions.
> + */
> +
> +#define seize_fire() ovn_seize_fire()
> +
> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> +
> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> +
> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> +
> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> +
> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> +
> +#define merge_lists(dest, inc) ovn_merge_lists(dest, inc)
> +
> +#define ovn_run_pool(pool) ovn_run_pool(pool)

Was the macro here supposed to be named run_pool instead of ovn_run_pool?

> +
> +#define run_pool_hash(pool, result, result_frags) \
> +    ovn_run_pool_hash(pool, result, result_frags)
> +
> +#define run_pool_list(pool, result, result_frags) \
> +    ovn_run_pool_list(pool, result, result_frags)
> +
> +#define run_pool_callback(pool, fin_result, helper_func) \
> +    ovn_run_pool_callback(pool, fin_result, helper_func)
> +
> +#ifdef  __cplusplus
> +}
> +#endif
> +
> +#endif /* lib/fast-hmap.h */
>
Anton Ivanov Oct. 13, 2020, 8:18 p.m. UTC | #3
On 13/10/2020 20:33, Mark Michelson wrote:
> On 9/25/20 5:57 AM, anton.ivanov@cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> This adds a set of functions and macros intended to process
>> hashes in parallel.
>>
>> The principles of operation are documented in the fasthmap.h
>>
>> If these one day go into the OVS tree, the OVS tree versions
>> would be used in preference.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> ---
>>   lib/automake.mk |   2 +
>>   lib/fasthmap.c  | 373 ++++++++++++++++++++++++++++++++++++++++++++++++
>>   lib/fasthmap.h  | 217 ++++++++++++++++++++++++++++
>>   3 files changed, 592 insertions(+)
>>   create mode 100644 lib/fasthmap.c
>>   create mode 100644 lib/fasthmap.h
>>
>> diff --git a/lib/automake.mk b/lib/automake.mk
>> index f3e9c8818..c9bf7e021 100644
>> --- a/lib/automake.mk
>> +++ b/lib/automake.mk
>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>       lib/expr.c \
>>       lib/extend-table.h \
>>       lib/extend-table.c \
>> +    lib/fasthmap.h \
>> +    lib/fasthmap.c \
> 
> Looks like you may have used different whitespace for the change here. 
> It appears you used spaces instead of a tab.
> 
>>       lib/ip-mcast-index.c \
>>       lib/ip-mcast-index.h \
>>       lib/mcast-group-index.c \
>> diff --git a/lib/fasthmap.c b/lib/fasthmap.c
>> new file mode 100644
>> index 000000000..38b06d74a
>> --- /dev/null
>> +++ b/lib/fasthmap.c
>> @@ -0,0 +1,373 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +#include <stdint.h>
>> +#include <string.h>
>> +#include <semaphore.h>
>> +#include "fatal-signal.h"
>> +#include "util.h"
>> +#include "openvswitch/vlog.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "fasthmap.h"
>> +#include "ovs-atomic.h"
>> +#include "ovs-thread.h"
>> +#include "ovs-numa.h"
>> +
>> +VLOG_DEFINE_THIS_MODULE(fasthmap);
>> +
>> +
>> +static bool worker_pool_setup = false;
>> +static bool workers_must_exit = false;
>> +
>> +static struct ovs_list worker_pools = 
>> OVS_LIST_INITIALIZER(&worker_pools);
>> +
>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>> +
>> +static int pool_size;
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>> +    int i;
>> +    static struct worker_pool *pool;
>> +    workers_must_exit = true; /* all workers must honour this flag */
>> +    atomic_thread_fence(memory_order_release);
>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>> +        for (i = 0; i < pool->size ; i++) {
>> +            sem_post(&pool->controls[i].fire);
>> +        }
>> +    }
>> +}
>> +
>> +static void setup_worker_pools(void) {
>> +    int cores, nodes;
>> +
>> +    nodes = ovs_numa_get_n_numas();
>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>> +        nodes = 1;
>> +    }
>> +    cores = ovs_numa_get_n_cores();
>> +
>> +    /* If there is no NUMA config, use 4 cores.
>> +     * If there is NUMA config use half the cores on
>> +     * one node so that the OS does not start pushing
>> +     * threads to other nodes.
>> +     */
>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>> +        pool_size = 4;
>> +    } else {
>> +        pool_size = cores / nodes / 2;
>> +    }
>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>> +    worker_pool_setup = true;
>> +}
>> +
>> +bool ovn_seize_fire(void)
>> +{
>> +    return workers_must_exit;
>> +}
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>> +
>> +    struct worker_pool *new_pool = NULL;
>> +    struct worker_control *new_control;
>> +    int i;
>> +
>> +    ovs_mutex_lock(&init_mutex);
>> +
>> +    if (!worker_pool_setup) {
>> +         setup_worker_pools();
>> +    }
>> +
>> +    new_pool = xmalloc(sizeof(struct worker_pool));
>> +    new_pool->size = pool_size;
>> +    sem_init(&new_pool->done, 0, 0);
>> +
>> +    ovs_list_push_back(&worker_pools, &new_pool->list_node);
>> +
>> +    new_pool->controls =
>> +        xmalloc(sizeof(struct worker_control) * new_pool->size);
>> +
>> +    for (i = 0; i < new_pool->size; i++) {
>> +        new_control = &new_pool->controls[i];
>> +        sem_init(&new_control->fire, 0, 0);
>> +        new_control->id = i;
>> +        new_control->done = &new_pool->done;
>> +        new_control->data = NULL;
>> +        ovs_mutex_init(&new_control->mutex);
>> +        new_control->finished = ATOMIC_VAR_INIT(false);
>> +    }
>> +
>> +    for (i = 0; i < pool_size; i++) {
>> +        ovs_thread_create("worker pool helper", start, 
>> &new_pool->controls[i]);
>> +    }
>> +    ovs_mutex_unlock(&init_mutex);
>> +    return new_pool;
>> +}
>> +
>> +
>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>> +void
>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>> +{
>> +    size_t i;
>> +
>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>> +    hmap->one = NULL;
>> +    hmap->mask = mask;
>> +    hmap->n = 0;
>> +    for (i = 0; i <= hmap->mask; i++) {
>> +        hmap->buckets[i] = NULL;
>> +    }
>> +}
>> +
>> +/* Initializes 'hmap' as an empty hash table of size X.
>> + * Intended for use in parallel processing so that all
>> + * fragments used to store results in a parallel job
>> + * are the same size.
>> + */
>> +void
>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>> +{
>> +    size_t mask;
>> +    mask = size / 2;
>> +    mask |= mask >> 1;
>> +    mask |= mask >> 2;
>> +    mask |= mask >> 4;
>> +    mask |= mask >> 8;
>> +    mask |= mask >> 16;
>> +#if SIZE_MAX > UINT32_MAX
>> +    mask |= mask >> 32;
>> +#endif
>> +
>> +    /* If we need to dynamically allocate buckets we might as well 
>> allocate at
>> +     * least 4 of them. */
>> +    mask |= (mask & 1) << 1;
>> +
>> +    fast_hmap_init(hmap, mask);
>> +}
>> +
>> +/* Run a thread pool - basic, does not do results processing.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool)
>> +{
>> +    int index, completed;
>> +
>> +    atomic_thread_fence(memory_order_release);
>> +
>> +    for (index = 0; index < pool->size; index++) {
>> +        sem_post(&pool->controls[index].fire);
>> +    }
>> +
>> +    completed = 0;
>> +
>> +    do {
>> +        bool test;
>> +        sem_wait(&pool->done);
>> +        for (index = 0; index < pool->size; index++) {
>> +            test = true;
>> +            if (atomic_compare_exchange_weak(
>> +                    &pool->controls[index].finished,
>> +                    &test,
>> +                    false)) {
>> +                completed++;
>> +                pool->controls[index].data = NULL;
>> +            }
>> +        }
>> +    } while (completed < pool->size);
>> +}
>> +
>> +/* Brute force merge of a hashmap into another hashmap.
>> + * Intended for use in parallel processing. The destination
>> + * hashmap MUST be the same size as the one being merged.
>> + *
>> + * This can be achieved by pre-allocating them to correct size
>> + * and using hmap_insert_fast() instead of hmap_insert()
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>> +{
>> +    size_t i;
>> +
>> +    ovs_assert(inc->mask == dest->mask);
>> +
>> +    if (!inc->n) {
>> +        /* Request to merge an empty frag, nothing to do */
>> +        return;
>> +    }
>> +
>> +    for (i = 0; i <= dest->mask; i++) {
>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>> +        if (*inc_bucket != NULL) {
>> +            struct hmap_node *last_node = *inc_bucket;
>> +            while (last_node->next != NULL) {
>> +                last_node = last_node->next;
>> +            }
>> +            last_node->next = *dest_bucket;
>> +            *dest_bucket = *inc_bucket;
>> +            *inc_bucket = NULL;
>> +        }
>> +    }
>> +    dest->n += inc->n;
>> +    inc->n = 0;
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array
>> + * of hashes. Merge results.
>> + */
>> +
>> +void ovn_run_pool_hash(
>> +        struct worker_pool *pool,
>> +        struct hmap *result,
>> +        struct hmap *result_frags)
>> +{
>> +    int index, completed;
>> +
>> +    atomic_thread_fence(memory_order_release);
>> +
>> +    for (index = 0; index < pool->size; index++) {
>> +        sem_post(&pool->controls[index].fire);
>> +    }
>> +
>> +    completed = 0;
>> +
>> +    do {
>> +        bool test;
>> +        sem_wait(&pool->done);
>> +        for (index = 0; index < pool->size; index++) {
>> +            test = true;
>> +            if (atomic_compare_exchange_weak(
>> +                    &pool->controls[index].finished,
>> +                    &test,
>> +                    false)) {
>> +                fast_hmap_merge(result, &result_frags[index]);
>> +                hmap_destroy(&result_frags[index]);
>> +                completed++;
>> +                pool->controls[index].data = NULL;
>> +            }
>> +        }
>> +    } while (completed < pool->size);
>> +}
>> +
>> +void ovn_merge_lists(struct ovs_list **dest, struct ovs_list *inc)
>> +{
>> +    struct ovs_list *last, *first;
>> +    if (inc == NULL) {
>> +        return;
>> +    }
>> +
>> +    if (* dest == NULL) {
>> +        * dest = inc;
>> +        return;
>> +    }
>> +
>> +    if (ovs_list_is_empty(inc)) {
>> +        return;
>> +    }
>> +
>> +    if (ovs_list_is_empty(*dest)) {
>> +        * dest = inc;
>> +        return;
>> +    }
>> +
>> +
>> +    last = inc->prev;
>> +    /* first element is not the list pointer itself, it is the ->next */
>> +    first = inc->next;
>> +
>> +    (*dest)->prev->next = first;
>> +    first->prev = (*dest)->prev;
>> +
>> +    (*dest)->prev = last;
>> +    last->next = *dest;
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array
>> + * of lists. Merge results
>> + */
>> +
>> +void ovn_run_pool_list(
>> +        struct worker_pool *pool,
>> +        struct ovs_list **result,
>> +        struct ovs_list **result_frags)
>> +{
>> +    int index, completed;
>> +
>> +    atomic_thread_fence(memory_order_release);
>> +
>> +
>> +    for (index = 0; index < pool->size; index++) {
>> +        sem_post(&pool->controls[index].fire);
>> +    }
>> +
>> +    completed = 0;
>> +
>> +
>> +    do {
>> +        bool test;
>> +        sem_wait(&pool->done);
>> +        for (index = 0; index < pool->size; index++) {
>> +            test = true;
>> +            if (atomic_compare_exchange_weak(
>> +                    &pool->controls[index].finished,
>> +                    &test,
>> +                    false)) {
>> +                merge_lists(result, result_frags[index]);
>> +                completed++;
>> +                pool->controls[index].data = NULL;
>> +            }
>> +        }
>> +    } while (completed < pool->size);
>> +}
>> +
>> +/* Run a thread pool which uses a callback function to process results
>> + */
>> +
>> +void ovn_run_pool_callback(
>> +        struct worker_pool *pool,
>> +        void *fin_result,
>> +        void (*helper_func)(
>> +            struct worker_pool *pool, void *fin_result, int index))
>> +{
>> +    int index, completed;
>> +
>> +    atomic_thread_fence(memory_order_release);
>> +
>> +    for (index = 0; index < pool->size; index++) {
>> +        sem_post(&pool->controls[index].fire);
>> +    }
>> +
>> +    completed = 0;
>> +
>> +    do {
>> +        bool test;
>> +        sem_wait(&pool->done);
>> +        for (index = 0; index < pool->size; index++) {
>> +            test = true;
>> +            if (atomic_compare_exchange_weak(
>> +                    &pool->controls[index].finished,
>> +                    &test,
>> +                    false)) {
>> +                (helper_func)(pool, fin_result, index);
>> +                completed++;
>> +                pool->controls[index].data = NULL;
>> +            }
>> +        }
>> +    } while (completed < pool->size);
>> +}
> 
> 2 notes:
> * ovn_run_pool_list and ovn_run_pool_callback are never actually used in 
> this patch series.

Callback is needed later for doing the SB LFLOW iteration over the 
database - that ends up as two lists - one for lflows to be deleted, one 
for sbflows to be deleted.

The list variant is unused in OVN for now - it was used in the parallel
notifications patch for ovsdb.
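
A rough sketch of that lflow use case (all names hypothetical): each
worker would leave its two per-thread lists in control->data, and a small
callback would splice them into the combined result:

    struct flow_removals {
        struct ovs_list *lflows_to_delete;
        struct ovs_list *sbflows_to_delete;
    };

    static void
    gather_removals(struct worker_pool *pool, void *fin_result, int index)
    {
        struct flow_removals *combined = fin_result;
        struct flow_removals *frag = pool->controls[index].data;

        merge_lists(&combined->lflows_to_delete, frag->lflows_to_delete);
        merge_lists(&combined->sbflows_to_delete, frag->sbflows_to_delete);
    }

    /* ... */
    struct flow_removals combined = { NULL, NULL };
    run_pool_callback(pool, &combined, gather_removals);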

> * There's a lot of code repeated between ovn_run_pool_hash, 
> ovn_run_pool_list, and ovn_run_pool_callback.

Yes, the sole difference is the "completion" method.

> 
> I think it's safe to say that ovn_run_pool_list could be removed unless 
> there's a specific improvement you have in mind that would use it.

List - yes. There is no requirement in the OVN code unless I figure out
how to parallelize the initial datapath walk. I have not figured that
one out yet. It is tough.

If I figure it out it will be a list though - the products there are 
lists - north only, south only and both.

> 
> If there's no immediate need to keep ovn_run_pool_list around, then I 
> think you should also remove ovn_run_pool_callback as well.
> 
> If, however, ovn_run_pool_list should stick around, then I think both 
> ovn_run_pool_list and ovn_run_pool_hash should be implemented using 
> ovn_run_pool_callback. This way, a lot of the repeated code is eliminated.
> 
>> diff --git a/lib/fasthmap.h b/lib/fasthmap.h
>> new file mode 100644
>> index 000000000..ccf005475
>> --- /dev/null
>> +++ b/lib/fasthmap.h
>> @@ -0,0 +1,217 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#ifndef HMAP_HAS_PARALLEL_MACROS
>> +#define HMAP_HAS_PARALLEL_MACROS 1
>> +
>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>> + * we skip over the ovn specific definitions.
>> + */
>> +
>> +#ifdef  __cplusplus
>> +extern "C" {
>> +#endif
>> +
>> +#include <stdbool.h>
>> +#include <stdlib.h>
>> +#include <semaphore.h>
>> +#include "openvswitch/util.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovs-atomic.h"
>> +
>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>> + * of parallel processing.
>> + * Each worker thread has a different ThreadID in the range of 
>> 0..POOL_SIZE
>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>> + * ThreadID + step * 2, etc. The actual macro accepts
>> + * ThreadID + step * i as the JOBID parameter.
>> + */
>> +
>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), 
>> MEMBER); \
>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>> +       || ((NODE = NULL), false); \
>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), 
>> MEMBER))
>> +
>> +/* We do not have a SAFE version of the macro, because the hash size 
>> is not
>> + * atomic and hash removal operations would need to be wrapped with
>> + * locks. This will defeat most of the benefits from doing anything in
>> + * parallel.
>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove 
>> elements,
>> + * each thread should store them in a temporary list result instead, 
>> merging
>> + * the lists into a combined result at the end */
>> +
>> +/* Work "Handle" */
>> +
>> +struct worker_control {
>> +    int id; /* Used as a modulo when iterating over a hash. */
>> +    atomic_bool finished; /* Set to true after a chunk of work is 
>> complete. */
>> +    sem_t fire; /* Work start semaphore - sem_post starts the worker. */
>> +    sem_t *done; /* Work completion semaphore - sem_post on 
>> completion. */
>> +    struct ovs_mutex mutex; /* Guards the data. */
> 
> This mutex is never used in this series. Can it be removed?

I think so. The data is never changed in-flight while one of the sides
is active. It was put there with that in mind, but it ended up being
unused. Well spotted.

> 
>> +    void *data; /* Pointer to data to be processed. */
>> +    void *workload; /* back-pointer to the worker pool structure. */
>> +};
>> +
>> +struct worker_pool {
>> +    int size;   /* Number of threads in the pool. */
>> +    struct ovs_list list_node; /* List of pools - used in 
>> cleanup/exit. */
>> +    struct worker_control *controls; /* "Handles" in this pool. */
>> +    sem_t done; /* Work completion semaphore. */
>> +};
>> +
>> +/* Add a worker pool for thread function start() which expects a 
>> pointer to
>> + * a worker_control structure as an argument. */
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>> +
>> +/* Setting this to true will make all processing threads exit */
>> +
>> +bool ovn_seize_fire(void);
> 
> Was this intended to be ovn_cease_fire()? Or is there something that's 
> being seized that I'm not understanding here?

yes :) Brain not in gear. Gearbox seized instead of ceased.


> 
>> +
>> +/* Build a hmap pre-sized for size elements */
>> +
>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> 
> This should probably take a size_t instead of an int.

Yes, fair point.

> 
>> +
>> +/* Build a hmap with a mask equal to size */
>> +
>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> 
> This should probably take a size_t instead of an ssize_t

Yes.

> 
>> +
>> +/* Brute-force merge a hmap into hmap.
>> + * Dest and inc have to have the same mask. The merge is performed
>> + * by extending the element list for bucket N in the dest hmap with 
>> the list
>> + * from bucket N in inc.
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>> +
>> +/* Merge two lists.
>> + * It is possible to achieve the same functionality using 
>> ovs_list_splice().
>> + * This ensures the splicing is exactly for tail of dest to head of inc.
>> + */
>> +
>> +void ovn_merge_lists(struct ovs_list **dest, struct ovs_list *inc);
> 
> Hm, I think ovs_list_push_back_all() achieves the same result.


I noted this potential use much later, after I had already implemented
it. I have not had the time to check whether the results are absolutely
identical.

I can check it and reuse the existing code.
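
For reference, a minimal sketch of that check (function name invented),
assuming ovs_list_push_back_all() keeps its OVS semantics of splicing all
of 'src' onto the tail of 'dst':

    #include "openvswitch/list.h"

    static void
    check_merge_equivalence(void)
    {
        struct ovs_list dest = OVS_LIST_INITIALIZER(&dest);
        struct ovs_list inc = OVS_LIST_INITIALIZER(&inc);

        /* ... populate both lists ... */

        /* For two non-empty lists this should leave 'dest' in the same
         * state as merge_lists(): tail of 'dest' joined to head of
         * 'inc', with 'inc' empty afterwards. merge_lists() additionally
         * accepts NULL and can reseat the destination pointer. */
        ovs_list_push_back_all(&dest, &inc);
    }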

> 
>> +
>> +/* Run a pool, without any default processing of results.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool);
>> +
>> +/* Run a pool, merge results from hash frags into a final hash result.
>> + * The hash frags must be pre-sized to the same size.
>> + */
>> +
>> +void ovn_run_pool_hash(struct worker_pool *pool,
>> +                    struct hmap *result, struct hmap *result_frags);
>> +
>> +/* Run a pool, merge results from lists into a final combined list.
>> + * The hash frags must be pre-sized to the same size.
>> + */
>> +
>> +void ovn_run_pool_list(struct worker_pool *pool,
>> +                    struct ovs_list **result, struct ovs_list 
>> **result_frags);
>> +
>> +/* Run a pool, call a callback function to perform processing of 
>> results.
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>> +                    void (*helper_func)(struct worker_pool *pool,
>> +                                    void *fin_result, int index));
>> +
>> +
>> +/* Returns the first node in 'hmap' in the bucket in which the given 
>> 'hash'
>> + * would land, or a null pointer if that bucket is empty. */
>> +
>> +static inline struct hmap_node *
>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>> +{
>> +    return hmap->buckets[num];
>> +}
>> +
>> +static inline struct hmap_node *
>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t 
>> pool_size)
>> +{
>> +    size_t i;
>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>> +        struct hmap_node *node = hmap->buckets[i];
>> +        if (node) {
>> +            return node;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>> + * for parallel processing in arbitrary order, or a null pointer if
>> + * the slice of 'hmap' for that job_id is empty. */
>> +static inline struct hmap_node *
>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t 
>> pool_size)
>> +{
>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>> +}
>> +
>> +/* Returns the next node in the slice of 'hmap' following 'node',
>> + * in arbitrary order, or a * null pointer if 'node' is the last node in
>> + * the 'hmap' slice.
>> + *
>> + */
>> +static inline struct hmap_node *
>> +parallel_hmap_next(const struct hmap *hmap,
>> +                        const struct hmap_node *node, ssize_t pool_size)
>> +{
>> +    return (node->next
>> +            ? node->next
>> +            : parallel_hmap_next__(hmap,
>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>> +}
>> +
>> +/* Use the OVN library functions for stuff which OVS has not defined
>> + * If OVS has defined these, they will still compile using the OVN
>> + * local names, but will be dropped by the linker in favour of the OVS
>> + * supplied functions.
>> + */
>> +
>> +#define seize_fire() ovn_seize_fire()
>> +
>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>> +
>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, 
>> size)
>> +
>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>> +
>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>> +
>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>> +
>> +#define merge_lists(dest, inc) ovn_merge_lists(dest, inc)
>> +
>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> 
> Was the macro here supposed to be named run_pool instead of ovn_run_pool?

The macros I want to add to OVS are called run_pool.

This is a way of having "local" ovn copies for the time being until the
two codebases catch up. Once (and if) they do, the linker will throw
away the ovn_-prefixed copies at link time, as they are not called
anywhere.
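
A sketch of the intended mechanics (my_worker_func and example() are
hypothetical, not code from the series):

    /* OVS headers first; if hmap.h ever defines HMAP_HAS_PARALLEL_MACROS,
     * the guard at the top of fasthmap.h skips all the OVN definitions. */
    #include "openvswitch/hmap.h"
    #include "fasthmap.h"

    extern void *my_worker_func(void *);  /* hypothetical thread function */

    void
    example(void)
    {
        /* Call sites use the short names either way; today they expand
         * to the ovn_-prefixed functions in lib/fasthmap.c. */
        struct worker_pool *pool = add_worker_pool(my_worker_func);
        run_pool(pool);
    }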

> 
>> +
>> +#define run_pool_hash(pool, result, result_frags) \
>> +    ovn_run_pool_hash(pool, result, result_frags)
>> +
>> +#define run_pool_list(pool, result, result_frags) \
>> +    ovn_run_pool_list(pool, result, result_frags)
>> +
>> +#define run_pool_callback(pool, fin_result, helper_func) \
>> +    ovn_run_pool_callback(pool, fin_result, helper_func)
>> +
>> +#ifdef  __cplusplus
>> +}
>> +#endif
>> +
>> +#endif /* lib/fast-hmap.h */
>>
> 
> 
Brgds,
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index f3e9c8818..c9bf7e021 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -13,6 +13,8 @@  lib_libovn_la_SOURCES = \
 	lib/expr.c \
 	lib/extend-table.h \
 	lib/extend-table.c \
+    lib/fasthmap.h \
+    lib/fasthmap.c \
 	lib/ip-mcast-index.c \
 	lib/ip-mcast-index.h \
 	lib/mcast-group-index.c \
diff --git a/lib/fasthmap.c b/lib/fasthmap.c
new file mode 100644
index 000000000..38b06d74a
--- /dev/null
+++ b/lib/fasthmap.c
@@ -0,0 +1,373 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "fasthmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+
+VLOG_DEFINE_THIS_MODULE(fasthmap);
+
+
+static bool worker_pool_setup = false;
+static bool workers_must_exit = false;
+
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static int pool_size;
+
+static void worker_pool_hook(void *aux OVS_UNUSED) {
+    int i;
+    static struct worker_pool *pool;
+    workers_must_exit = true; /* all workers must honour this flag */
+    atomic_thread_fence(memory_order_release);
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size ; i++) {
+            sem_post(&pool->controls[i].fire);
+        }
+    }
+}
+
+static void setup_worker_pools(void) {
+    int cores, nodes;
+
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+
+    /* If there is no NUMA config, use 4 cores.
+     * If there is NUMA config use half the cores on
+     * one node so that the OS does not start pushing
+     * threads to other nodes.
+     */
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        pool_size = 4;
+    } else {
+        pool_size = cores / nodes / 2;
+    }
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+    worker_pool_setup = true;
+}
+
+bool ovn_seize_fire(void)
+{
+    return workers_must_exit;
+}
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
+
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    int i;
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (!worker_pool_setup) {
+         setup_worker_pools();
+    }
+
+    new_pool = xmalloc(sizeof(struct worker_pool));
+    new_pool->size = pool_size;
+    sem_init(&new_pool->done, 0, 0);
+
+    ovs_list_push_back(&worker_pools, &new_pool->list_node);
+
+    new_pool->controls =
+        xmalloc(sizeof(struct worker_control) * new_pool->size);
+
+    for (i = 0; i < new_pool->size; i++) {
+        new_control = &new_pool->controls[i];
+        sem_init(&new_control->fire, 0, 0);
+        new_control->id = i;
+        new_control->done = &new_pool->done;
+        new_control->data = NULL;
+        ovs_mutex_init(&new_control->mutex);
+        new_control->finished = ATOMIC_VAR_INIT(false);
+    }
+
+    for (i = 0; i < pool_size; i++) {
+        ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+}
+
+
+/* Initializes 'hmap' as an empty hash table with mask N. */
+void
+ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t i;
+
+    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+    for (i = 0; i <= hmap->mask; i++) {
+        hmap->buckets[i] = NULL;
+    }
+}
+
+/* Initializes 'hmap' as an empty hash table of size X.
+ * Intended for use in parallel processing so that all
+ * fragments used to store results in a parallel job
+ * are the same size.
+ */
+void
+ovn_fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask;
+    mask = size / 2;
+    mask |= mask >> 1;
+    mask |= mask >> 2;
+    mask |= mask >> 4;
+    mask |= mask >> 8;
+    mask |= mask >> 16;
+#if SIZE_MAX > UINT32_MAX
+    mask |= mask >> 32;
+#endif
+
+    /* If we need to dynamically allocate buckets we might as well allocate at
+     * least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Run a thread pool - basic, does not do results processing.
+ */
+
+void ovn_run_pool(struct worker_pool *pool)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Brute force merge of a hashmap into another hashmap.
+ * Intended for use in parallel processing. The destination
+ * hashmap MUST be the same size as the one being merged.
+ *
+ * This can be achieved by pre-allocating them to correct size
+ * and using hmap_insert_fast() instead of hmap_insert()
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t i;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (!inc->n) {
+        /* Request to merge an empty frag, nothing to do */
+        return;
+    }
+
+    for (i = 0; i <= dest->mask; i++) {
+        struct hmap_node **dest_bucket = &dest->buckets[i];
+        struct hmap_node **inc_bucket = &inc->buckets[i];
+        if (*inc_bucket != NULL) {
+            struct hmap_node *last_node = *inc_bucket;
+            while (last_node->next != NULL) {
+                last_node = last_node->next;
+            }
+            last_node->next = *dest_bucket;
+            *dest_bucket = *inc_bucket;
+            *inc_bucket = NULL;
+        }
+    }
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+
+void ovn_run_pool_hash(
+        struct worker_pool *pool,
+        struct hmap *result,
+        struct hmap *result_frags)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                fast_hmap_merge(result, &result_frags[index]);
+                hmap_destroy(&result_frags[index]);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+void ovn_merge_lists(struct ovs_list **dest, struct ovs_list *inc)
+{
+    struct ovs_list *last, *first;
+    if (inc == NULL) {
+        return;
+    }
+
+    if (* dest == NULL) {
+        * dest = inc;
+        return;
+    }
+
+    if (ovs_list_is_empty(inc)) {
+        return;
+    }
+
+    if (ovs_list_is_empty(*dest)) {
+        * dest = inc;
+        return;
+    }
+
+
+    last = inc->prev;
+    /* first element is not the list pointer itself, it is the ->next */
+    first = inc->next;
+
+    (*dest)->prev->next = first;
+    first->prev = (*dest)->prev;
+
+    (*dest)->prev = last;
+    last->next = *dest;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of lists. Merge results
+ */
+
+void ovn_run_pool_list(
+        struct worker_pool *pool,
+        struct ovs_list **result,
+        struct ovs_list **result_frags)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                merge_lists(result, result_frags[index]);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Run a thread pool which uses a callback function to process results
+ */
+
+void ovn_run_pool_callback(
+        struct worker_pool *pool,
+        void *fin_result,
+        void (*helper_func)(
+            struct worker_pool *pool, void *fin_result, int index))
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                (helper_func)(pool, fin_result, index);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
diff --git a/lib/fasthmap.h b/lib/fasthmap.h
new file mode 100644
index 000000000..ccf005475
--- /dev/null
+++ b/lib/fasthmap.h
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HMAP_HAS_PARALLEL_MACROS
+#define HMAP_HAS_PARALLEL_MACROS 1
+
+/* If the parallel macros are defined by hmap.h or any other OVS header,
+ * we skip over the OVN-specific definitions.
+ */
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+/* A version of the HMAP_FOR_EACH macro intended for iterating as part
+ * of parallel processing.
+ * Each worker thread has a different ThreadID in the range
+ * 0..POOL_SIZE - 1 and iterates hash buckets ThreadID, ThreadID + step,
+ * ThreadID + step * 2, etc. The actual macro accepts
+ * ThreadID + step * i as the JOBID parameter.
+ */
+
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+    for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+         (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+         || ((NODE = NULL), false); \
+         ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
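+
+/* An illustrative sketch of the intended use (not part of this patch;
+ * "ctl", "map", "rec" and process() are assumptions):
+ *
+ *     struct record *rec;
+ *     size_t bucket;
+ *
+ *     for (bucket = ctl->id; bucket <= map->mask; bucket += pool_size) {
+ *         HMAP_FOR_EACH_IN_PARALLEL (rec, hmap_node, bucket, map) {
+ *             process(rec);
+ *         }
+ *     }
+ */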
+
+/* We do not have a SAFE version of this macro, because the hash size is
+ * not atomic and hash removal operations would need to be wrapped in
+ * locks, which would defeat most of the benefit of doing anything in
+ * parallel.
+ * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
+ * each thread should store them in a temporary list instead, merging the
+ * lists into a combined result at the end; see the sketch below. */
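+
+/* A minimal sketch of that pattern (all names are illustrative and not
+ * part of this patch):
+ *
+ *     HMAP_FOR_EACH_IN_PARALLEL (rec, hmap_node, bucket, map) {
+ *         if (should_remove(rec)) {
+ *             ovs_list_push_back(my_frag, &rec->list_node);
+ *         }
+ *     }
+ *
+ * After run_pool_list() merges the fragments, a single thread can walk
+ * the combined list and call hmap_remove() on each element safely. */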
+
+/* Work "Handle" */
+
+struct worker_control {
+    int id; /* Used as a modulo when iterating over a hash. */
+    atomic_bool finished; /* Set to true after a chunk of work is complete. */
+    sem_t fire; /* Work start semaphore - sem_post starts the worker. */
+    sem_t *done; /* Work completion semaphore - sem_post on completion. */
+    struct ovs_mutex mutex; /* Guards the data. */
+    void *data; /* Pointer to data to be processed. */
+    void *workload; /* Back-pointer to the worker pool structure. */
+};
+
+struct worker_pool {
+    int size;   /* Number of threads in the pool. */
+    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
+    struct worker_control *controls; /* "Handles" in this pool. */
+    sem_t done; /* Work completion semaphore. */
+};
+
+/* Add a worker pool for thread function start() which expects a pointer to
+ * a worker_control structure as an argument. */
+
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
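+
+/* One plausible shape for such a start() function, sketched from the
+ * handshake described in struct worker_control; process_slice() is an
+ * assumed helper, not part of this patch:
+ *
+ *     static void *
+ *     worker_thread(void *arg)
+ *     {
+ *         struct worker_control *ctl = arg;
+ *
+ *         while (!seize_fire()) {
+ *             sem_wait(&ctl->fire);               // wait for work
+ *             if (seize_fire()) {
+ *                 break;
+ *             }
+ *             process_slice(ctl->data, ctl->id);
+ *             atomic_store(&ctl->finished, true);
+ *             sem_post(ctl->done);                // report completion
+ *         }
+ *         return NULL;
+ *     }
+ */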
+
+/* Returns true when all processing threads are required to exit. */
+
+bool ovn_seize_fire(void);
+
+/* Build an hmap pre-sized for 'size' elements. */
+
+void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
+
+/* Build an hmap with a mask equal to 'size'. */
+
+void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
+
+/* Brute-force merge of one hmap into another.
+ * 'dest' and 'inc' must have the same mask. The merge is performed by
+ * extending the element list for bucket N in the 'dest' hmap with the
+ * list from bucket N in 'inc'.
+ */
+
+void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+
+/* Merge two lists.
+ * The same functionality can be achieved with ovs_list_splice(); this
+ * helper guarantees that 'inc' is spliced exactly between the tail of
+ * 'dest' and its head.
+ */
+
+void ovn_merge_lists(struct ovs_list **dest, struct ovs_list *inc);
+
+/* Run a pool without any default processing of results. */
+
+void ovn_run_pool(struct worker_pool *pool);
+
+/* Run a pool and merge results from the hash fragments into a final
+ * hash result. The fragments must be pre-sized to the same mask as
+ * 'result'.
+ */
+
+void ovn_run_pool_hash(struct worker_pool *pool,
+                    struct hmap *result, struct hmap *result_frags);
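+
+/* A possible call sequence (a sketch; "expected_elements" and the wiring
+ * of controls[i].data to the worker threads are assumptions):
+ *
+ *     struct hmap *frags = xmalloc(sizeof *frags * pool->size);
+ *     struct hmap combined;
+ *     int i;
+ *
+ *     fast_hmap_size_for(&combined, expected_elements);
+ *     for (i = 0; i < pool->size; i++) {
+ *         fast_hmap_init(&frags[i], combined.mask);
+ *         pool->controls[i].data = &frags[i];
+ *     }
+ *     run_pool_hash(pool, &combined, frags);
+ *     free(frags);
+ */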
+
+/* Run a pool and merge results from the list fragments into a final
+ * combined list.
+ */
+
+void ovn_run_pool_list(struct worker_pool *pool,
+                    struct ovs_list **result, struct ovs_list **result_frags);
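+
+/* A compact sketch of the list variant (names are placeholders):
+ *
+ *     struct ovs_list *combined = NULL;
+ *     struct ovs_list **frags = xcalloc(pool->size, sizeof *frags);
+ *
+ *     // ... have each worker build its fragment and expose it via
+ *     // frags[i] before the pool is fired ...
+ *     run_pool_list(pool, &combined, frags);
+ *     free(frags);
+ */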
+
+/* Run a pool, calling a callback function to process each worker's
+ * result.
+ */
+
+void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
+                    void (*helper_func)(struct worker_pool *pool,
+                                    void *fin_result, int index));
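+
+/* A possible helper_func, sketched under the assumption that each worker
+ * left an ovs_list in controls[index].data:
+ *
+ *     static void
+ *     collect_result(struct worker_pool *pool, void *fin_result, int index)
+ *     {
+ *         struct ovs_list *combined = fin_result;
+ *         struct ovs_list *frag = pool->controls[index].data;
+ *
+ *         if (frag && !ovs_list_is_empty(frag)) {
+ *             // Splicing in front of the head sentinel appends the
+ *             // fragment at the tail of the combined list.
+ *             ovs_list_splice(combined, frag->next, frag);
+ *         }
+ *     }
+ */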
+
+
+/* Returns the first node in bucket number 'num' of 'hmap', or a null
+ * pointer if that bucket is empty. */
+
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    return hmap->buckets[num];
+}
+
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t i;
+    for (i = start; i <= hmap->mask; i += pool_size) {
+        struct hmap_node *node = hmap->buckets[i];
+        if (node) {
+            return node;
+        }
+    }
+    return NULL;
+}
+
+/* Returns the first node in 'hmap', as expected by thread with job_id
+ * for parallel processing in arbitrary order, or a null pointer if
+ * the slice of 'hmap' for that job_id is empty. */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    return parallel_hmap_next__(hmap, job_id, pool_size);
+}
+
+/* Returns the next node in the slice of 'hmap' following 'node',
+ * in arbitrary order, or a null pointer if 'node' is the last node in
+ * the 'hmap' slice. */
+static inline struct hmap_node *
+parallel_hmap_next(const struct hmap *hmap,
+                   const struct hmap_node *node, size_t pool_size)
+{
+    return (node->next
+            ? node->next
+            : parallel_hmap_next__(hmap,
+                (node->hash & hmap->mask) + pool_size, pool_size));
+}
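+
+/* Taken together, these helpers let a worker walk its whole slice
+ * without an explicit bucket loop; an illustrative sketch (names are
+ * assumptions):
+ *
+ *     struct hmap_node *node;
+ *
+ *     for (node = parallel_hmap_first(map, job_id, pool_size); node;
+ *          node = parallel_hmap_next(map, node, pool_size)) {
+ *         struct record *rec = CONTAINER_OF(node, struct record, hmap_node);
+ *         process(rec);
+ *     }
+ */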
+
+/* Use the OVN library functions for functionality which OVS has not
+ * defined. If OVS has defined these, they will still compile using the
+ * OVN local names, but will be dropped by the linker in favour of the
+ * OVS-supplied functions.
+ */
+
+#define seize_fire() ovn_seize_fire()
+
+#define add_worker_pool(start) ovn_add_worker_pool(start)
+
+#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
+
+#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
+
+#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+#define hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
+
+#define merge_lists(dest, inc) ovn_merge_lists(dest, inc)
+
+#define run_pool(pool) ovn_run_pool(pool)
+
+#define run_pool_hash(pool, result, result_frags) \
+    ovn_run_pool_hash(pool, result, result_frags)
+
+#define run_pool_list(pool, result, result_frags) \
+    ovn_run_pool_list(pool, result, result_frags)
+
+#define run_pool_callback(pool, fin_result, helper_func) \
+    ovn_run_pool_callback(pool, fin_result, helper_func)
+
+#ifdef  __cplusplus
+}
+#endif
+
+#endif /* lib/fasthmap.h */