diff mbox series

[ovs-dev,1/2] Add support for parallel processing of hashes

Message ID 20200706083650.29443-2-anton.ivanov@cambridgegreys.com
State New
Headers show
Series [ovs-dev,1/2] Add support for parallel processing of hashes | expand

Commit Message

Anton Ivanov July 6, 2020, 8:36 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Adds functionality needed to walk a hash in parallel where thread ID
N out of a pool sized M is responcible for processing all elements
in buckets N, N+M, N+M*2, etc

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/automake.mk |   2 +
 lib/fasthmap.c  | 367 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/fasthmap.h  | 146 +++++++++++++++++++
 3 files changed, 515 insertions(+)
 create mode 100644 lib/fasthmap.c
 create mode 100644 lib/fasthmap.h

Comments

0-day Robot July 6, 2020, 9:15 a.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
ERROR: Improper whitespace around control block
#443 FILE: lib/fasthmap.h:33:
#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \

ERROR: Improper whitespace around control block
#451 FILE: lib/fasthmap.h:41:
#define HMAP_FOR_EACH_IN_PARALLEL_SAFE(NODE, NEXT, MEMBER, JOBID, HMAP) \

ERROR: Improper whitespace around control block
#452 FILE: lib/fasthmap.h:42:
    HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, MEMBER, JOBID, HMAP, (void) 0)

ERROR: Improper whitespace around control block
#454 FILE: lib/fasthmap.h:44:
#define HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, \

Lines checked: 559, Warnings: 0, Errors: 4


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 86940ccd2..dc1f8c29e 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -95,6 +95,8 @@  lib_libopenvswitch_la_SOURCES = \
 	lib/dynamic-string.c \
 	lib/entropy.c \
 	lib/entropy.h \
+	lib/fasthmap.h \
+	lib/fasthmap.c \
 	lib/fat-rwlock.c \
 	lib/fat-rwlock.h \
 	lib/fatal-signal.c \
diff --git a/lib/fasthmap.c b/lib/fasthmap.c
new file mode 100644
index 000000000..23f6e3cb3
--- /dev/null
+++ b/lib/fasthmap.c
@@ -0,0 +1,367 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "fasthmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+
+VLOG_DEFINE_THIS_MODULE(fasthmap);
+
+
+static bool worker_pool_setup = false;
+static bool workers_must_exit = false;
+
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static int pool_size;
+
+static void worker_pool_hook(void *aux OVS_UNUSED) {
+    int i;
+    static struct worker_pool *pool;
+    workers_must_exit = true; /* all workers must honour this flag */
+    atomic_thread_fence(memory_order_release);
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size ; i++) {
+            sem_post(&pool->controls[i].fire);
+        }
+    }
+}
+
+static void setup_worker_pools(void) {
+    int cores, nodes;
+
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        pool_size = 4;
+    } else {
+        pool_size = cores / nodes;
+    }
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+    worker_pool_setup = true;
+}
+
+bool seize_fire(void)
+{
+    return workers_must_exit;
+}
+
+struct worker_pool *add_worker_pool(void *(*start)(void *)){
+
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    int i;
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (!worker_pool_setup) {
+         setup_worker_pools();
+    }
+
+    new_pool = xmalloc(sizeof(struct worker_pool));
+    new_pool->size = pool_size;
+    sem_init(&new_pool->done, 0, 0);
+
+    ovs_list_push_back(&worker_pools, &new_pool->list_node);
+
+    new_pool->controls =
+        xmalloc(sizeof(struct worker_control) * new_pool->size);
+
+    for (i = 0; i < new_pool->size; i++) {
+        new_control = &new_pool->controls[i];
+        sem_init(&new_control->fire, 0, 0);
+        new_control->id = i;
+        new_control->done = &new_pool->done;
+        new_control->data = NULL;
+        ovs_mutex_init(&new_control->mutex);
+        new_control->finished = ATOMIC_VAR_INIT(false);
+    }
+
+    for (i = 0; i < pool_size; i++) {
+        ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+}
+
+
+/* Initializes 'hmap' as an empty hash table with mask N. */
+void
+fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t i;
+
+    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+    for (i = 0; i <= hmap->mask; i++) {
+        hmap->buckets[i] = NULL;
+    }
+}
+
+/* Initializes 'hmap' as an empty hash table of size X.
+ * Intended for use in parallel processing so that all
+ * fragments used to store results in a parallel job
+ * are the same size.
+ */
+void
+fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask;
+    mask = size / 2;
+    mask |= mask >> 1;
+    mask |= mask >> 2;
+    mask |= mask >> 4;
+    mask |= mask >> 8;
+    mask |= mask >> 16;
+#if SIZE_MAX > UINT32_MAX
+    mask |= mask >> 32;
+#endif
+
+    /* If we need to dynamically allocate buckets we might as well allocate at
+     * least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Run a thread pool - basic, does not do results processing.
+ */
+
+void run_pool(struct worker_pool *pool)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Brute force merge of a hashmap into another hashmap.
+ * Intended for use in parallel processing. The destination
+ * hashmap MUST be the same size as the one being merged.
+ *
+ * This can be achieved by pre-allocating them to correct size
+ * and using hmap_insert_fast() instead of hmap_insert()
+ */
+
+void fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t i;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (!inc->n) {
+        /* Request to merge an empty frag, nothing to do */
+        return;
+    }
+
+    for (i = 0; i <= dest->mask; i++) {
+        struct hmap_node **dest_bucket = &dest->buckets[i];
+        struct hmap_node **inc_bucket = &inc->buckets[i];
+        if (*inc_bucket != NULL) {
+            struct hmap_node *last_node = *inc_bucket;
+            while (last_node->next != NULL) {
+                last_node = last_node->next;
+            }
+            last_node->next = *dest_bucket;
+            *dest_bucket = *inc_bucket;
+            *inc_bucket = NULL;
+        }
+    }
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+
+void run_pool_hash(
+        struct worker_pool *pool,
+        struct hmap *result,
+        struct hmap *result_frags)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                fast_hmap_merge(result, &result_frags[index]);
+                hmap_destroy(&result_frags[index]);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+void merge_lists(struct ovs_list **dest, struct ovs_list *inc)
+{
+    struct ovs_list *last, *first;
+    if (inc == NULL) {
+        return;
+    }
+
+    if (* dest == NULL) {
+        * dest = inc;
+        return;
+    }
+
+    if (ovs_list_is_empty(inc)) {
+        return;
+    }
+
+    if (ovs_list_is_empty(*dest)) {
+        * dest = inc;
+        return;
+    }
+
+
+    last = inc->prev;
+    /* first element is not the list pointer itself, it is the ->next */
+    first = inc->next;
+
+    (*dest)->prev->next = first;
+    first->prev = (*dest)->prev;
+
+    (*dest)->prev = last;
+    last->next = *dest;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of lists. Merge results
+ */
+
+void run_pool_list(
+        struct worker_pool *pool,
+        struct ovs_list **result,
+        struct ovs_list **result_frags)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                merge_lists(result, result_frags[index]);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Run a thread pool which uses a callback function to process results
+ */
+
+void run_pool_callback(
+        struct worker_pool *pool,
+        void *fin_result,
+        void (*helper_func)(
+            struct worker_pool *pool, void *fin_result, int index))
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            if (atomic_compare_exchange_weak(
+                    &pool->controls[index].finished,
+                    &test,
+                    false)) {
+                (helper_func)(pool, fin_result, index);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
diff --git a/lib/fasthmap.h b/lib/fasthmap.h
new file mode 100644
index 000000000..947900801
--- /dev/null
+++ b/lib/fasthmap.h
@@ -0,0 +1,146 @@ 
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FAST_HMAP_H
+#define FAST_HMAP_H 1
+
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+       || ((NODE = NULL), false); \
+       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
+
+/* Safe when NODE may be freed (not needed when NODE may be removed from the
+ * hash map but its members remain accessible and intact). */
+#define HMAP_FOR_EACH_IN_PARALLEL_SAFE(NODE, NEXT, MEMBER, JOBID, HMAP) \
+    HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, MEMBER, JOBID, HMAP, (void) 0)
+
+#define HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, \
+    MEMBER, JOBID, HMAP, ...)\
+    for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER), \
+           __VA_ARGS__;   \
+         ((NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER))               \
+          || ((NODE = NULL), false)                                     \
+          ? INIT_CONTAINER(NEXT, hmap_next_in_bucket(&(NODE)->MEMBER),  \
+          MEMBER), 1 : 0);                                              \
+         (NODE) = (NEXT))
+
+struct worker_control {
+    int id;
+    int size;
+    atomic_bool finished;
+    sem_t fire;
+    sem_t *done;
+    struct ovs_mutex mutex;
+    void *data;
+    void *workload;
+};
+
+struct worker_pool {
+    int size;
+    struct ovs_list list_node;
+    struct worker_control *controls;
+    sem_t done;
+};
+
+struct worker_pool *add_worker_pool(void *(*start)(void *));
+
+bool seize_fire(void);
+void fast_hmap_size_for(struct hmap *hmap, int size);
+void fast_hmap_init(struct hmap *hmap, ssize_t size);
+void fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+void hmap_merge(struct hmap *dest, struct hmap *inc);
+void merge_lists(struct ovs_list **dest, struct ovs_list *inc);
+
+void run_pool(
+    struct worker_pool *pool);
+void run_pool_hash(
+    struct worker_pool *pool, struct hmap *result, struct hmap *result_frags);
+void run_pool_list(
+    struct worker_pool *pool, struct ovs_list **result,
+    struct ovs_list **result_frags);
+void run_pool_callback(
+        struct worker_pool *pool,
+        void *fin_result,
+        void (*helper_func)(
+            struct worker_pool *pool, void *fin_result, int index));
+
+
+/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
+ * would land, or a null pointer if that bucket is empty. */
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    return hmap->buckets[num];
+}
+
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t i;
+    for (i = start; i <= hmap->mask; i+= pool_size) {
+        struct hmap_node *node = hmap->buckets[i];
+        if (node) {
+            return node;
+        }
+    }
+    return NULL;
+}
+
+/* Returns the first node in 'hmap', as expected by thread with job_id
+ * for parallel processing in arbitrary order, or a null pointer if
+ * the slice of 'hmap' for that job_id is empty. */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    return parallel_hmap_next__(hmap, job_id, pool_size);
+}
+
+/* Returns the next node in the slice of 'hmap' following 'node',
+ * in arbitrary order, or a * null pointer if 'node' is the last node in
+ * the 'hmap' slice.
+ *
+ */
+static inline struct hmap_node *
+parallel_hmap_next(
+        const struct hmap *hmap,
+        const struct hmap_node *node,
+        ssize_t pool_size)
+{
+    return (node->next
+            ? node->next
+            : parallel_hmap_next__(hmap,
+                (node->hash & hmap->mask) + pool_size, pool_size));
+}
+
+#ifdef  __cplusplus
+}
+#endif
+
+#endif /* lib/fast-hmap.h */