From patchwork Mon Jul 6 08:36:49 2020
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Anton Ivanov
X-Patchwork-Id: 1323446
From: anton.ivanov@cambridgegreys.com
To: dev@openvswitch.org
Date: Mon, 6 Jul 2020 09:36:49 +0100
Message-Id: <20200706083650.29443-2-anton.ivanov@cambridgegreys.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20200706083650.29443-1-anton.ivanov@cambridgegreys.com>
References: <20200706083650.29443-1-anton.ivanov@cambridgegreys.com>
Cc: Anton Ivanov
Subject: [ovs-dev] [PATCH 1/2] Add support for parallel processing of hashes
List-Subscribe: , Errors-To: ovs-dev-bounces@openvswitch.org Sender: "dev" From: Anton Ivanov Adds functionality needed to walk a hash in parallel where thread ID N out of a pool sized M is responcible for processing all elements in buckets N, N+M, N+M*2, etc Signed-off-by: Anton Ivanov --- lib/automake.mk | 2 + lib/fasthmap.c | 367 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/fasthmap.h | 146 +++++++++++++++++++ 3 files changed, 515 insertions(+) create mode 100644 lib/fasthmap.c create mode 100644 lib/fasthmap.h diff --git a/lib/automake.mk b/lib/automake.mk index 86940ccd2..dc1f8c29e 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -95,6 +95,8 @@ lib_libopenvswitch_la_SOURCES = \ lib/dynamic-string.c \ lib/entropy.c \ lib/entropy.h \ + lib/fasthmap.h \ + lib/fasthmap.c \ lib/fat-rwlock.c \ lib/fat-rwlock.h \ lib/fatal-signal.c \ diff --git a/lib/fasthmap.c b/lib/fasthmap.c new file mode 100644 index 000000000..23f6e3cb3 --- /dev/null +++ b/lib/fasthmap.c @@ -0,0 +1,367 @@ +/* + * Copyright (c) 2020 Red Hat, Inc. + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "fatal-signal.h" +#include "util.h" +#include "openvswitch/vlog.h" +#include "openvswitch/hmap.h" +#include "openvswitch/thread.h" +#include "fasthmap.h" +#include "ovs-atomic.h" +#include "ovs-thread.h" +#include "ovs-numa.h" + +VLOG_DEFINE_THIS_MODULE(fasthmap); + + +static bool worker_pool_setup = false; +static bool workers_must_exit = false; + +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools); + +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; + +static int pool_size; + +static void worker_pool_hook(void *aux OVS_UNUSED) { + int i; + static struct worker_pool *pool; + workers_must_exit = true; /* all workers must honour this flag */ + atomic_thread_fence(memory_order_release); + LIST_FOR_EACH (pool, list_node, &worker_pools) { + for (i = 0; i < pool->size ; i++) { + sem_post(&pool->controls[i].fire); + } + } +} + +static void setup_worker_pools(void) { + int cores, nodes; + + nodes = ovs_numa_get_n_numas(); + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { + nodes = 1; + } + cores = ovs_numa_get_n_cores(); + if (cores == OVS_CORE_UNSPEC || cores <= 0) { + pool_size = 4; + } else { + pool_size = cores / nodes; + } + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); + worker_pool_setup = true; +} + +bool seize_fire(void) +{ + return workers_must_exit; +} + +struct worker_pool *add_worker_pool(void *(*start)(void *)){ + + struct worker_pool *new_pool = NULL; + struct worker_control *new_control; + int i; + + ovs_mutex_lock(&init_mutex); + + if (!worker_pool_setup) { + setup_worker_pools(); + } + + new_pool = xmalloc(sizeof(struct worker_pool)); + new_pool->size = pool_size; + sem_init(&new_pool->done, 0, 0); + + ovs_list_push_back(&worker_pools, &new_pool->list_node); + + new_pool->controls = + xmalloc(sizeof(struct worker_control) * 
new_pool->size); + + for (i = 0; i < new_pool->size; i++) { + new_control = &new_pool->controls[i]; + sem_init(&new_control->fire, 0, 0); + new_control->id = i; + new_control->done = &new_pool->done; + new_control->data = NULL; + ovs_mutex_init(&new_control->mutex); + new_control->finished = ATOMIC_VAR_INIT(false); + } + + for (i = 0; i < pool_size; i++) { + ovs_thread_create("worker pool helper", start, &new_pool->controls[i]); + } + ovs_mutex_unlock(&init_mutex); + return new_pool; +} + + +/* Initializes 'hmap' as an empty hash table with mask N. */ +void +fast_hmap_init(struct hmap *hmap, ssize_t mask) +{ + size_t i; + + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1)); + hmap->one = NULL; + hmap->mask = mask; + hmap->n = 0; + for (i = 0; i <= hmap->mask; i++) { + hmap->buckets[i] = NULL; + } +} + +/* Initializes 'hmap' as an empty hash table of size X. + * Intended for use in parallel processing so that all + * fragments used to store results in a parallel job + * are the same size. + */ +void +fast_hmap_size_for(struct hmap *hmap, int size) +{ + size_t mask; + mask = size / 2; + mask |= mask >> 1; + mask |= mask >> 2; + mask |= mask >> 4; + mask |= mask >> 8; + mask |= mask >> 16; +#if SIZE_MAX > UINT32_MAX + mask |= mask >> 32; +#endif + + /* If we need to dynamically allocate buckets we might as well allocate at + * least 4 of them. */ + mask |= (mask & 1) << 1; + + fast_hmap_init(hmap, mask); +} + +/* Run a thread pool - basic, does not do results processing. + */ + +void run_pool(struct worker_pool *pool) +{ + int index, completed; + + atomic_thread_fence(memory_order_release); + + for (index = 0; index < pool->size; index++) { + sem_post(&pool->controls[index].fire); + } + + completed = 0; + + do { + bool test; + sem_wait(&pool->done); + for (index = 0; index < pool->size; index++) { + test = true; + if (atomic_compare_exchange_weak( + &pool->controls[index].finished, + &test, + false)) { + completed++; + pool->controls[index].data = NULL; + } + } + } while (completed < pool->size); +} + +/* Brute force merge of a hashmap into another hashmap. + * Intended for use in parallel processing. The destination + * hashmap MUST be the same size as the one being merged. + * + * This can be achieved by pre-allocating them to correct size + * and using hmap_insert_fast() instead of hmap_insert() + */ + +void fast_hmap_merge(struct hmap *dest, struct hmap *inc) +{ + size_t i; + + ovs_assert(inc->mask == dest->mask); + + if (!inc->n) { + /* Request to merge an empty frag, nothing to do */ + return; + } + + for (i = 0; i <= dest->mask; i++) { + struct hmap_node **dest_bucket = &dest->buckets[i]; + struct hmap_node **inc_bucket = &inc->buckets[i]; + if (*inc_bucket != NULL) { + struct hmap_node *last_node = *inc_bucket; + while (last_node->next != NULL) { + last_node = last_node->next; + } + last_node->next = *dest_bucket; + *dest_bucket = *inc_bucket; + *inc_bucket = NULL; + } + } + dest->n += inc->n; + inc->n = 0; +} + +/* Run a thread pool which gathers results in an array + * of hashes. Merge results. 
+ */ + +void run_pool_hash( + struct worker_pool *pool, + struct hmap *result, + struct hmap *result_frags) +{ + int index, completed; + + atomic_thread_fence(memory_order_release); + + for (index = 0; index < pool->size; index++) { + sem_post(&pool->controls[index].fire); + } + + completed = 0; + + do { + bool test; + sem_wait(&pool->done); + for (index = 0; index < pool->size; index++) { + test = true; + if (atomic_compare_exchange_weak( + &pool->controls[index].finished, + &test, + false)) { + fast_hmap_merge(result, &result_frags[index]); + hmap_destroy(&result_frags[index]); + completed++; + pool->controls[index].data = NULL; + } + } + } while (completed < pool->size); +} + +void merge_lists(struct ovs_list **dest, struct ovs_list *inc) +{ + struct ovs_list *last, *first; + if (inc == NULL) { + return; + } + + if (* dest == NULL) { + * dest = inc; + return; + } + + if (ovs_list_is_empty(inc)) { + return; + } + + if (ovs_list_is_empty(*dest)) { + * dest = inc; + return; + } + + + last = inc->prev; + /* first element is not the list pointer itself, it is the ->next */ + first = inc->next; + + (*dest)->prev->next = first; + first->prev = (*dest)->prev; + + (*dest)->prev = last; + last->next = *dest; +} + +/* Run a thread pool which gathers results in an array + * of lists. Merge results + */ + +void run_pool_list( + struct worker_pool *pool, + struct ovs_list **result, + struct ovs_list **result_frags) +{ + int index, completed; + + atomic_thread_fence(memory_order_release); + + + for (index = 0; index < pool->size; index++) { + sem_post(&pool->controls[index].fire); + } + + completed = 0; + + + do { + bool test; + sem_wait(&pool->done); + for (index = 0; index < pool->size; index++) { + test = true; + if (atomic_compare_exchange_weak( + &pool->controls[index].finished, + &test, + false)) { + merge_lists(result, result_frags[index]); + completed++; + pool->controls[index].data = NULL; + } + } + } while (completed < pool->size); +} + +/* Run a thread pool which uses a callback function to process results + */ + +void run_pool_callback( + struct worker_pool *pool, + void *fin_result, + void (*helper_func)( + struct worker_pool *pool, void *fin_result, int index)) +{ + int index, completed; + + atomic_thread_fence(memory_order_release); + + for (index = 0; index < pool->size; index++) { + sem_post(&pool->controls[index].fire); + } + + completed = 0; + + do { + bool test; + sem_wait(&pool->done); + for (index = 0; index < pool->size; index++) { + test = true; + if (atomic_compare_exchange_weak( + &pool->controls[index].finished, + &test, + false)) { + (helper_func)(pool, fin_result, index); + completed++; + pool->controls[index].data = NULL; + } + } + } while (completed < pool->size); +} diff --git a/lib/fasthmap.h b/lib/fasthmap.h new file mode 100644 index 000000000..947900801 --- /dev/null +++ b/lib/fasthmap.h @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2020 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FAST_HMAP_H +#define FAST_HMAP_H 1 + + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "openvswitch/util.h" +#include "openvswitch/hmap.h" +#include "openvswitch/thread.h" +#include "ovs-atomic.h" + +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \ + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ + || ((NODE = NULL), false); \ + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER)) + +/* Safe when NODE may be freed (not needed when NODE may be removed from the + * hash map but its members remain accessible and intact). */ +#define HMAP_FOR_EACH_IN_PARALLEL_SAFE(NODE, NEXT, MEMBER, JOBID, HMAP) \ + HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, MEMBER, JOBID, HMAP, (void) 0) + +#define HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, \ + MEMBER, JOBID, HMAP, ...)\ + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER), \ + __VA_ARGS__; \ + ((NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ + || ((NODE = NULL), false) \ + ? INIT_CONTAINER(NEXT, hmap_next_in_bucket(&(NODE)->MEMBER), \ + MEMBER), 1 : 0); \ + (NODE) = (NEXT)) + +struct worker_control { + int id; + int size; + atomic_bool finished; + sem_t fire; + sem_t *done; + struct ovs_mutex mutex; + void *data; + void *workload; +}; + +struct worker_pool { + int size; + struct ovs_list list_node; + struct worker_control *controls; + sem_t done; +}; + +struct worker_pool *add_worker_pool(void *(*start)(void *)); + +bool seize_fire(void); +void fast_hmap_size_for(struct hmap *hmap, int size); +void fast_hmap_init(struct hmap *hmap, ssize_t size); +void fast_hmap_merge(struct hmap *dest, struct hmap *inc); +void hmap_merge(struct hmap *dest, struct hmap *inc); +void merge_lists(struct ovs_list **dest, struct ovs_list *inc); + +void run_pool( + struct worker_pool *pool); +void run_pool_hash( + struct worker_pool *pool, struct hmap *result, struct hmap *result_frags); +void run_pool_list( + struct worker_pool *pool, struct ovs_list **result, + struct ovs_list **result_frags); +void run_pool_callback( + struct worker_pool *pool, + void *fin_result, + void (*helper_func)( + struct worker_pool *pool, void *fin_result, int index)); + + +/* Returns the first node in 'hmap' in the bucket in which the given 'hash' + * would land, or a null pointer if that bucket is empty. */ +static inline struct hmap_node * +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num) +{ + return hmap->buckets[num]; +} + +static inline struct hmap_node * +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size) +{ + size_t i; + for (i = start; i <= hmap->mask; i+= pool_size) { + struct hmap_node *node = hmap->buckets[i]; + if (node) { + return node; + } + } + return NULL; +} + +/* Returns the first node in 'hmap', as expected by thread with job_id + * for parallel processing in arbitrary order, or a null pointer if + * the slice of 'hmap' for that job_id is empty. */ +static inline struct hmap_node * +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size) +{ + return parallel_hmap_next__(hmap, job_id, pool_size); +} + +/* Returns the next node in the slice of 'hmap' following 'node', + * in arbitrary order, or a * null pointer if 'node' is the last node in + * the 'hmap' slice. + * + */ +static inline struct hmap_node * +parallel_hmap_next( + const struct hmap *hmap, + const struct hmap_node *node, + ssize_t pool_size) +{ + return (node->next + ? 
node->next + : parallel_hmap_next__(hmap, + (node->hash & hmap->mask) + pool_size, pool_size)); +} + +#ifdef __cplusplus +} +#endif + +#endif /* lib/fast-hmap.h */ From patchwork Mon Jul 6 08:36:50 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Ivanov X-Patchwork-Id: 1323447 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (sender SPF authorized) smtp.mailfrom=openvswitch.org (client-ip=140.211.166.138; helo=whitealder.osuosl.org; envelope-from=ovs-dev-bounces@openvswitch.org; receiver=) Authentication-Results: ozlabs.org; dmarc=none (p=none dis=none) header.from=cambridgegreys.com Received: from whitealder.osuosl.org (smtp1.osuosl.org [140.211.166.138]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 4B0f811JVwz9sSn for ; Mon, 6 Jul 2020 18:37:21 +1000 (AEST) Received: from localhost (localhost [127.0.0.1]) by whitealder.osuosl.org (Postfix) with ESMTP id 7477088330; Mon, 6 Jul 2020 08:37:19 +0000 (UTC) X-Virus-Scanned: amavisd-new at osuosl.org Received: from whitealder.osuosl.org ([127.0.0.1]) by localhost (.osuosl.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id b+Kn89OgOVif; Mon, 6 Jul 2020 08:37:10 +0000 (UTC) Received: from lists.linuxfoundation.org (lf-lists.osuosl.org [140.211.9.56]) by whitealder.osuosl.org (Postfix) with ESMTP id 6D81C8847B; Mon, 6 Jul 2020 08:37:10 +0000 (UTC) Received: from lf-lists.osuosl.org (localhost [127.0.0.1]) by lists.linuxfoundation.org (Postfix) with ESMTP id 53EF1C08A9; Mon, 6 Jul 2020 08:37:10 +0000 (UTC) X-Original-To: dev@openvswitch.org Delivered-To: ovs-dev@lists.linuxfoundation.org Received: from silver.osuosl.org (smtp3.osuosl.org [140.211.166.136]) by lists.linuxfoundation.org (Postfix) with ESMTP id 5160DC016F for ; Mon, 6 Jul 2020 08:37:09 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by silver.osuosl.org (Postfix) with ESMTP id 40E46227B1 for ; Mon, 6 Jul 2020 08:37:09 +0000 (UTC) X-Virus-Scanned: amavisd-new at osuosl.org Received: from silver.osuosl.org ([127.0.0.1]) by localhost (.osuosl.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id CxONTcyHU3S5 for ; Mon, 6 Jul 2020 08:37:04 +0000 (UTC) X-Greylist: from auto-whitelisted by SQLgrey-1.7.6 Received: from www.kot-begemot.co.uk (ivanoab7.miniserver.com [37.128.132.42]) by silver.osuosl.org (Postfix) with ESMTPS id 8456A226E9 for ; Mon, 6 Jul 2020 08:37:04 +0000 (UTC) Received: from tun252.jain.kot-begemot.co.uk ([192.168.18.6] helo=jain.kot-begemot.co.uk) by www.kot-begemot.co.uk with esmtps (TLS1.3:ECDHE_RSA_AES_256_GCM_SHA384:256) (Exim 4.92) (envelope-from ) id 1jsMcX-0006AM-Sz for dev@openvswitch.org; Mon, 06 Jul 2020 08:37:03 +0000 Received: from jain.kot-begemot.co.uk ([192.168.3.3]) by jain.kot-begemot.co.uk with esmtp (Exim 4.92) (envelope-from ) id 1jsMcU-0007xN-LC; Mon, 06 Jul 2020 09:37:00 +0100 From: anton.ivanov@cambridgegreys.com To: dev@openvswitch.org Date: Mon, 6 Jul 2020 09:36:50 +0100 Message-Id: <20200706083650.29443-3-anton.ivanov@cambridgegreys.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200706083650.29443-1-anton.ivanov@cambridgegreys.com> References: <20200706083650.29443-1-anton.ivanov@cambridgegreys.com> MIME-Version: 1.0 X-Clacks-Overhead: GNU Terry Pratchett Cc: Anton Ivanov Subject: [ovs-dev] [PATCH 2/2] Run monitor processing in parallel 

From: Anton Ivanov

Monitor processing performs a full walk of all rows in all tables
referenced in a monitor.  The rows are internally represented as a hash
map, so this operation can be run in parallel, with thread M out of N
walking hash buckets M, M+N, M+2N, and so on.

Running inter-thread IPC for only a handful of datum values is unlikely
to be efficient, hence there is a cut-off: tables smaller than the
cut-off value are processed in the main thread, while larger tables are
processed by multiple threads.

Signed-off-by: Anton Ivanov
---
 lib/ovsdb-idl.c     |  31 ++
 lib/ovsdb-idl.h     |   6 +
 ovsdb/monitor.c     | 358 ++++++++++++++++++++++++++++++++++++++++----
 ovsdb/ovsdb-idlc.in |  26 ++++
 4 files changed, 394 insertions(+), 27 deletions(-)

diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index 0a18261fc..0720e7a68 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -45,6 +45,7 @@
 #include "svec.h"
 #include "util.h"
 #include "uuid.h"
+#include "fasthmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
@@ -3543,6 +3544,24 @@ ovsdb_idl_first_row(const struct ovsdb_idl *idl,
     return next_real_row(table, hmap_first(&table->rows));
 }
 
+/* Returns a row in 'table_class''s table slice in 'idl', or a null pointer
+ * if that table slice is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at all. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_first_row(const struct ovsdb_idl *idl,
+                             const struct ovsdb_idl_table_class *table_class,
+                             ssize_t job_id,
+                             ssize_t pool_size)
+{
+    struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl,
+                                                               table_class);
+    return next_real_row(
+        table, parallel_hmap_first(&table->rows, job_id, pool_size));
+}
+
 /* Returns a row following 'row' within its table, or a null pointer if 'row'
  * is the last row in its table. */
 const struct ovsdb_idl_row *
@@ -3553,6 +3572,18 @@ ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
 }
 
+/* Returns a row following 'row' within its table slice, or a null pointer
+ * if 'row' is the last row in its table slice. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_next_row(
+    const struct ovsdb_idl_row *row, ssize_t pool_size)
+{
+    struct ovsdb_idl_table *table = row->table;
+
+    return next_real_row(table,
+        parallel_hmap_next(&table->rows, &row->hmap_node, pool_size));
+}
+
 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
  * transaction has changed 'column''s value, the modified value is returned.
* diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h index c56cd19b1..935db0dbe 100644 --- a/lib/ovsdb-idl.h +++ b/lib/ovsdb-idl.h @@ -235,6 +235,12 @@ const struct ovsdb_idl_row *ovsdb_idl_first_row( const struct ovsdb_idl *, const struct ovsdb_idl_table_class *); const struct ovsdb_idl_row *ovsdb_idl_next_row(const struct ovsdb_idl_row *); +const struct ovsdb_idl_row *parallel_ovsdb_idl_first_row( + const struct ovsdb_idl *, const struct ovsdb_idl_table_class *, + ssize_t job_id, ssize_t pool); +const struct ovsdb_idl_row *parallel_ovsdb_idl_next_row( + const struct ovsdb_idl_row *, ssize_t pool); + const struct ovsdb_datum *ovsdb_idl_read(const struct ovsdb_idl_row *, const struct ovsdb_idl_column *); const struct ovsdb_datum *ovsdb_idl_get(const struct ovsdb_idl_row *, diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c index 1c66b428e..d950aef38 100644 --- a/ovsdb/monitor.c +++ b/ovsdb/monitor.c @@ -37,6 +37,7 @@ #include "jsonrpc-server.h" #include "monitor.h" #include "util.h" +#include "fasthmap.h" #include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(ovsdb_monitor); @@ -1085,7 +1086,6 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name, const struct uuid *row_uuid) { char uuid[UUID_LEN + 1]; - /* Create JSON object for transaction overall. */ if (!*json) { *json = json_object_create(); @@ -1102,6 +1102,140 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name, json_object_put(*table_json, uuid, row_json); } +/* We cannot reuse the same multi-threaded conventions as for the cond + * update (further down) as the calling conventions and data supplied + * to update generation are quite different + * + * In addition to this, key parts of the code are dependent on container + * macros making it impossible to reuse processing code between the two. + * + * The only part which we can reuse is json result processing. 
+ */ + +/* To debug parallel processing set this to zero */ + +#define PARALLEL_CUT_OFF 64 + +struct mon_json_result { + struct ovs_list list_node; + const char *table_name; + const struct uuid *row_uuid; + struct json *row_json; +}; + +struct monitor_compose_update_info { + struct ovsdb_monitor_change_set_for_table *mcst; + const struct ovsdb_monitor_session_condition *condition; + unsigned long int *changed; + struct ovs_list results; + compose_row_update_cb_func row_update; + bool initial; +}; + +struct monitor_compose_update_pool { + void (*row_helper_func)(struct ovsdb_monitor_row *row, + struct monitor_compose_update_info *mci); + struct worker_pool *pool; +}; + + +static void ovsdb_monitor_compose_update_process_one_row ( + struct ovsdb_monitor_row *row, + struct monitor_compose_update_info *mci) +{ + struct json *row_json; + struct mon_json_result *temp; + + row_json = (*mci->row_update)( + mci->mcst->mt, mci->condition, OVSDB_MONITOR_ROW, row, + mci->initial, mci->changed, mci->mcst->n_columns); + if (row_json) { + temp = xmalloc(sizeof(struct mon_json_result)); + temp->table_name = mci->mcst->mt->table->schema->name; + temp->row_uuid = &row->uuid; + temp->row_json = row_json; + ovs_list_push_back(&mci->results, &temp->list_node); + } +} + + +static void *monitor_compose_update_thread(void *arg) { + struct worker_control *control = (struct worker_control *) arg; + struct monitor_compose_update_pool *workload; + struct monitor_compose_update_info *mci; + struct ovsdb_monitor_row *row, *next; + struct hmap *table_rows; + int bnum; + + + while (!seize_fire()) { + sem_wait(&control->fire); + if (seize_fire()) { + return NULL; + } + workload = (struct monitor_compose_update_pool *) control->workload; + mci = (struct monitor_compose_update_info *) control->data; + if (mci && workload) { + table_rows = (struct hmap *) &mci->mcst->rows; + for (bnum = control->id; + bnum <= table_rows->mask; + bnum += workload->pool->size) + { + HMAP_FOR_EACH_IN_PARALLEL_SAFE ( + row, next, hmap_node, bnum, table_rows) { + if (seize_fire()) { + return NULL; + } + (workload->row_helper_func)(row, control->data); + } + } + atomic_store_relaxed(&control->finished, true); + atomic_thread_fence(memory_order_release); + } + sem_post(control->done); + } + return NULL; +} + + +static void + ovsdb_monitor_compose_cond_change_update_generate_json( + struct json **json, struct json **table_json, + struct ovs_list *temp_result) { + struct mon_json_result * temp; + + LIST_FOR_EACH_POP (temp, list_node, temp_result) { + ovsdb_monitor_add_json_row(json, + temp->table_name, + table_json, + temp->row_json, + temp->row_uuid); + free(temp); + } +} + + +static struct monitor_compose_update_pool *monitor_compose_pool = NULL; + +static void init_compose_pool(void) { + + int index; + + if (!monitor_compose_pool) { + monitor_compose_pool = + xmalloc(sizeof (struct monitor_compose_update_pool)); + monitor_compose_pool->pool = + add_worker_pool(monitor_compose_update_thread); + monitor_compose_pool->row_helper_func = + ovsdb_monitor_compose_update_process_one_row; + + for (index = 0; index < monitor_compose_pool->pool->size; index++) { + monitor_compose_pool->pool->controls[index].workload = + monitor_compose_pool; + } + } +} + /* Constructs and returns JSON for a object (as described in * RFC 7047) for all the outstanding changes within 'monitor', starting from * 'transaction'. 
*/ @@ -1114,31 +1248,174 @@ ovsdb_monitor_compose_update( { struct json *json; size_t max_columns = ovsdb_monitor_max_columns(dbmon); - unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns)); json = NULL; struct ovsdb_monitor_change_set_for_table *mcst; LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) { struct ovsdb_monitor_row *row, *next; struct json *table_json = NULL; - struct ovsdb_monitor_table *mt = mcst->mt; - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) { - struct json *row_json; - row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row, - initial, changed, mcst->n_columns); - if (row_json) { - ovsdb_monitor_add_json_row(&json, mt->table->schema->name, - &table_json, row_json, - &row->uuid); + if (hmap_count(&mcst->rows) < PARALLEL_CUT_OFF) { + struct monitor_compose_update_info mci; + mci.mcst = mcst; + mci.condition = condition; + mci.initial = initial; + mci.changed = xmalloc(bitmap_n_bytes(max_columns)); + mci.row_update = row_update; + ovs_list_init(&mci.results); + + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) { + ovsdb_monitor_compose_update_process_one_row( + row, &mci); + } + if (!ovs_list_is_empty(&mci.results)) { + ovsdb_monitor_compose_cond_change_update_generate_json( + &json, &table_json, &mci.results); + } + free(mci.changed); + } else { + struct monitor_compose_update_info *mci; + struct ovs_list *combined_result = NULL; + struct ovs_list **results = NULL; + int index; + + init_compose_pool(); + + mci = xmalloc(sizeof(struct monitor_compose_update_info) * + monitor_compose_pool->pool->size); + results = xmalloc(sizeof(struct ovs_list *) * + monitor_compose_pool->pool->size); + + for (index = 0; + index < monitor_compose_pool->pool->size; index++) { + + mci[index].mcst = mcst; + mci[index].condition = condition; + mci[index].initial = initial; + mci[index].changed = xmalloc(bitmap_n_bytes(max_columns)); + mci[index].row_update = row_update; + ovs_list_init(&mci[index].results); + results[index] = &mci[index].results; + monitor_compose_pool->pool->controls[index].data = &mci[index]; + } + + run_pool_list( + monitor_compose_pool->pool, + &combined_result, + results); + ovsdb_monitor_compose_cond_change_update_generate_json( + &json, &table_json, combined_result); + + free(results); + for (index = 0; + index < monitor_compose_pool->pool->size; index++) { + free(mci[index].changed); } + free(mci); + } } - free(changed); return json; } + +struct monitor_cond_change_info { + struct ovsdb_monitor_table *mt; + struct ovsdb_monitor_session_condition *condition; + unsigned long int *changed; + struct ovs_list results; +}; + +struct monitor_cond_change_pool { + void (*row_helper_func)(struct ovsdb_row *row, + struct monitor_cond_change_info *mi); + struct worker_pool *pool; +}; + +static void *monitor_cond_change_thread(void *arg) { + struct worker_control *control = (struct worker_control *) arg; + struct monitor_cond_change_pool *workload; + struct monitor_cond_change_info *mi; + struct ovsdb_row *row; + struct hmap *table_rows; + int bnum; + + + while (!seize_fire()) { + sem_wait(&control->fire); + if (seize_fire()) { + return NULL; + } + workload = (struct monitor_cond_change_pool *) control->workload; + mi = (struct monitor_cond_change_info *) control->data; + if (mi && workload) { + table_rows = (struct hmap *) &mi->mt->table->rows; + for (bnum = control->id; + bnum <= mi->mt->table->rows.mask; + bnum += workload->pool->size) + { + HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum, table_rows) { + if 
(seize_fire()) { + return NULL; + } + (workload->row_helper_func)(row, control->data); + } + } + atomic_store_relaxed(&control->finished, true); + atomic_thread_fence(memory_order_release); + } + sem_post(control->done); + } + return NULL; +} + +static void ovsdb_monitor_cond_change_process_one_row ( + struct ovsdb_row *row, + struct monitor_cond_change_info *mi) +{ + struct json *row_json; + struct mon_json_result *temp; + + row_json = ovsdb_monitor_compose_row_update2( + mi->mt, + mi->condition, + OVSDB_ROW, + row, + false, + mi->changed, + mi->mt->n_columns); + if (row_json) { + temp = xmalloc(sizeof(struct mon_json_result)); + temp->table_name = mi->mt->table->schema->name; + temp->row_uuid = ovsdb_row_get_uuid(row); + temp->row_json = row_json; + ovs_list_push_back(&mi->results, &temp->list_node); + } +} + +static struct monitor_cond_change_pool *monitor_cond_pool = NULL; + +static void init_cond_pool(void) { + + int index; + + if (!monitor_cond_pool) { + monitor_cond_pool = + xmalloc(sizeof (struct monitor_cond_change_pool)); + monitor_cond_pool->pool = + add_worker_pool(monitor_cond_change_thread); + monitor_cond_pool->row_helper_func = + ovsdb_monitor_cond_change_process_one_row; + + for (index = 0; index < monitor_cond_pool->pool->size; index++) { + monitor_cond_pool->pool->controls[index].workload = + monitor_cond_pool; + } + } +} + + static struct json* ovsdb_monitor_compose_cond_change_update( struct ovsdb_monitor *dbmon, @@ -1147,13 +1424,33 @@ ovsdb_monitor_compose_cond_change_update( struct shash_node *node; struct json *json = NULL; size_t max_columns = ovsdb_monitor_max_columns(dbmon); - unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns)); + int index; + + init_cond_pool(); SHASH_FOR_EACH (node, &dbmon->tables) { struct ovsdb_monitor_table *mt = node->data; struct ovsdb_row *row; struct json *table_json = NULL; struct ovsdb_condition *old_condition, *new_condition; + struct monitor_cond_change_info *mi; + struct ovs_list *combined_result = NULL; + struct ovs_list **results = NULL; + + + mi = xmalloc(sizeof(struct monitor_cond_change_info) * + monitor_cond_pool->pool->size); + results = xmalloc(sizeof(struct ovs_list *) * + monitor_cond_pool->pool->size); + + for (index = 0; index < monitor_cond_pool->pool->size; index++) { + mi[index].changed = xmalloc(bitmap_n_bytes(max_columns)); + mi[index].condition = condition; + mi[index].mt = mt; + ovs_list_init(&mi[index].results); + results[index] = &mi[index].results; + monitor_cond_pool->pool->controls[index].data = &mi[index]; + } if (!ovsdb_monitor_get_table_conditions(mt, condition, @@ -1163,24 +1460,31 @@ ovsdb_monitor_compose_cond_change_update( /* Nothing to update on this table */ continue; } - - /* Iterate over all rows in table */ - HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { - struct json *row_json; - - row_json = ovsdb_monitor_compose_row_update2(mt, condition, - OVSDB_ROW, row, - false, changed, - mt->n_columns); - if (row_json) { - ovsdb_monitor_add_json_row(&json, mt->table->schema->name, - &table_json, row_json, - ovsdb_row_get_uuid(row)); + if (hmap_count(&mt->table->rows) < PARALLEL_CUT_OFF) { + /* Iterate over all rows in table - single threaded */ + HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { + ovsdb_monitor_cond_change_process_one_row(row, &mi[0]); + } + if (!ovs_list_is_empty(&mi[0].results)) { + ovsdb_monitor_compose_cond_change_update_generate_json( + &json, &table_json, &mi[0].results); } + } else { + run_pool_list( + monitor_cond_pool->pool, + &combined_result, + results); + 
ovsdb_monitor_compose_cond_change_update_generate_json( + &json, &table_json, combined_result); } ovsdb_monitor_table_condition_updated(mt, condition); + + free(results); + for (index = 0; index < monitor_cond_pool->pool->size; index++) { + free(mi[index].changed); + } + free(mi); } - free(changed); return json; } diff --git a/ovsdb/ovsdb-idlc.in b/ovsdb/ovsdb-idlc.in index 698fe25f3..db3ff5a60 100755 --- a/ovsdb/ovsdb-idlc.in +++ b/ovsdb/ovsdb-idlc.in @@ -254,6 +254,8 @@ const struct %(s)s *%(s)s_get_for_uuid(const struct ovsdb_idl *, const struct uu const struct %(s)s *%(s)s_table_get_for_uuid(const struct %(s)s_table *, const struct uuid *); const struct %(s)s *%(s)s_first(const struct ovsdb_idl *); const struct %(s)s *%(s)s_next(const struct %(s)s *); +const struct %(s)s *%(s)s_parallel_first(const struct ovsdb_idl *, ssize_t job_id, ssize_t pool); +const struct %(s)s *%(s)s_parallel_next(const struct %(s)s *, ssize_t pool); #define %(S)s_FOR_EACH(ROW, IDL) \\ for ((ROW) = %(s)s_first(IDL); \\ (ROW); \\ @@ -262,6 +264,10 @@ const struct %(s)s *%(s)s_next(const struct %(s)s *); for ((ROW) = %(s)s_first(IDL); \\ (ROW) ? ((NEXT) = %(s)s_next(ROW), 1) : 0; \\ (ROW) = (NEXT)) +#define %(S)s_PARALLEL_FOR_EACH(ROW, IDL, JOBID, POOL) \\ + for ((ROW) = %(s)s_parallel_first(IDL, JOBID, POOL); \\ + (ROW); \\ + (ROW) = %(s)s_parallel_next(ROW, POOL)) unsigned int %(s)s_get_seqno(const struct ovsdb_idl *); unsigned int %(s)s_row_get_seqno(const struct %(s)s *row, enum ovsdb_idl_change change); @@ -692,6 +698,18 @@ const struct %(s)s * return %(s)s_cast(ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s)); } +/* Returns a row in table "%(t)s" in 'idl', or a null pointer if that + * table is empty. + * + * Database tables are internally maintained as hash tables, so adding or + * removing rows while traversing the same table can cause some rows to be + * visited twice or not at apply. */ +const struct %(s)s * +%(s)s_parallel_first(const struct ovsdb_idl *idl, ssize_t job_id, ssize_t pool) +{ + return %(s)s_cast(parallel_ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s, job_id, pool)); +} + /* Returns a row following 'row' within its table, or a null pointer if 'row' * is the last row in its table. */ const struct %(s)s * @@ -700,6 +718,14 @@ const struct %(s)s * return %(s)s_cast(ovsdb_idl_next_row(&row->header_)); } +/* Returns a row following 'row' within its table slice, or a null pointer if 'row' + * is the last row in its table slice. */ +const struct %(s)s * +%(s)s_parallel_next(const struct %(s)s *row, ssize_t pool) +{ + return %(s)s_cast(parallel_ovsdb_idl_next_row(&row->header_, pool)); +} + unsigned int %(s)s_get_seqno(const struct ovsdb_idl *idl) { return ovsdb_idl_table_get_seqno(idl, &%(p)stable_%(tl)s);
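
To make the calling convention concrete, here is a minimal sketch (not part
of the series) of a consumer of the fasthmap API from patch 1/2.  It follows
the same dispatcher/worker protocol that ovsdb/monitor.c uses above: each
worker blocks on its 'fire' semaphore, walks the bucket slice it owns with
HMAP_FOR_EACH_IN_PARALLEL, marks itself 'finished' and posts 'done', and
run_pool_hash() merges the per-worker hash fragments into a single result.
All identifiers prefixed with "example_" are hypothetical and exist only for
illustration.

/* Illustrative sketch only, not part of the patch.  Doubles every value in
 * 'input' and collects the copies in 'result' using the proposed worker
 * pool API.  All "example_" names are hypothetical. */

#include <config.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdlib.h>
#include "fasthmap.h"
#include "openvswitch/hmap.h"
#include "ovs-atomic.h"
#include "util.h"

struct example_node {
    struct hmap_node hmap_node;
    int value;
};

struct example_shared {
    struct worker_pool *pool;   /* Created once by add_worker_pool(). */
    struct hmap *input;         /* Read-only input, shared by all workers. */
};

static void *
example_worker(void *arg)
{
    struct worker_control *control = arg;

    while (!seize_fire()) {
        sem_wait(&control->fire);
        if (seize_fire()) {
            return NULL;
        }
        struct example_shared *shared = control->workload;
        struct hmap *frag = control->data;      /* This worker's fragment. */

        if (shared && frag) {
            size_t bnum;

            /* Worker N out of a pool of size M owns buckets N, N+M,
             * N+2M, ... of the input hash. */
            for (bnum = control->id; bnum <= shared->input->mask;
                 bnum += shared->pool->size) {
                struct example_node *node;

                HMAP_FOR_EACH_IN_PARALLEL (node, hmap_node, bnum,
                                           shared->input) {
                    struct example_node *copy = xmalloc(sizeof *copy);

                    copy->value = node->value * 2;
                    hmap_insert_fast(frag, &copy->hmap_node,
                                     node->hmap_node.hash);
                }
            }
            atomic_store_relaxed(&control->finished, true);
            atomic_thread_fence(memory_order_release);
        }
        sem_post(control->done);
    }
    return NULL;
}

static void
example_parallel_double(struct hmap *input, struct hmap *result)
{
    static struct example_shared shared;
    struct hmap *frags;
    int i;

    if (!shared.pool) {
        shared.pool = add_worker_pool(example_worker);
    }
    shared.input = input;

    /* 'result' and every fragment must be sized identically so that
     * fast_hmap_merge() can relink whole buckets. */
    fast_hmap_size_for(result, hmap_count(input));
    frags = xmalloc(sizeof *frags * shared.pool->size);
    for (i = 0; i < shared.pool->size; i++) {
        fast_hmap_size_for(&frags[i], hmap_count(input));
        shared.pool->controls[i].data = &frags[i];
        shared.pool->controls[i].workload = &shared;
    }

    /* Fires all workers, waits for each to finish, then merges and
     * destroys the fragments. */
    run_pool_hash(shared.pool, result, frags);
    free(frags);
}

As in ovsdb/monitor.c above, a real caller would likely keep a
single-threaded path for tables below a cut-off such as PARALLEL_CUT_OFF,
since firing the whole pool for a handful of rows costs more in IPC than it
saves.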