From patchwork Fri Sep 3 14:28:14 2021
X-Patchwork-Submitter: Anton Ivanov
X-Patchwork-Id: 1524349
From: anton.ivanov@cambridgegreys.com
To: ovs-dev@openvswitch.org
Cc: Anton Ivanov
Date: Fri, 3 Sep 2021 15:28:14 +0100
Message-Id: <20210903142814.25574-1-anton.ivanov@cambridgegreys.com>
X-Mailer: git-send-email 2.20.1
Subject: [ovs-dev] [OVN Patch v2] Make changes to the parallel processing API to allow pool sizing

From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

1. Make the pool size user-definable.
2. Expose pool destruction.
3. Make pools resizable at runtime.
Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/ovn-parallel-hmap.c | 202 ++++++++++++++++++++++++++++++----------
 lib/ovn-parallel-hmap.h |  23 ++++-
 northd/ovn-northd.c     |  58 +++++-------
 ovs                     |   2 +-
 4 files changed, 194 insertions(+), 91 deletions(-)

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index b8c7ac786..30de457b5 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -51,7 +51,6 @@ static bool can_parallelize = false;
  * accompanied by a fence. It does not need to be atomic or be
  * accessed under a lock. */
-static bool workers_must_exit = false;
 
 static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
 
@@ -70,10 +69,20 @@ static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
                                void *fin_result, void *result_frags,
                                int index);
 
+static bool init_control(struct worker_control *control, int id,
+                         struct worker_pool *pool);
+
+static void cleanup_control(struct worker_pool *pool, int id);
+
+static void free_controls(struct worker_pool *pool);
+
+static struct worker_control *alloc_controls(int size);
+
 bool
-ovn_stop_parallel_processing(void)
+ovn_stop_parallel_processing(struct worker_pool *pool)
 {
-    return workers_must_exit;
+    return pool->workers_must_exit;
 }
 
 bool
@@ -92,11 +101,67 @@ ovn_can_parallelize_hashes(bool force_parallel)
     return can_parallelize;
 }
 
+
+void
+destroy_pool(struct worker_pool *pool) {
+    char sem_name[256];
+
+    free_controls(pool);
+    sem_close(pool->done);
+    sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+    sem_unlink(sem_name);
+    free(pool);
+}
+
+bool
+ovn_resize_pool(struct worker_pool *pool, int size)
+{
+    int i;
+
+    ovs_assert(pool != NULL);
+
+    if (!size) {
+        size = pool_size;
+    }
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (can_parallelize) {
+        free_controls(pool);
+        pool->size = size;
+
+        /* Allocate new control structures. */
+
+        pool->controls = alloc_controls(size);
+        pool->workers_must_exit = false;
+
+        for (i = 0; i < pool->size; i++) {
+            if (!init_control(&pool->controls[i], i, pool)) {
+                goto cleanup;
+            }
+        }
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return true;
+cleanup:
+
+    /* Something went wrong when opening semaphores. In this case
+     * it is better to shut off parallel processing altogether.
+     */
+
+    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+    can_parallelize = false;
+    free_controls(pool);
+
+    ovs_mutex_unlock(&init_mutex);
+    return false;
+}
+
+
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+ovn_add_worker_pool(void *(*start)(void *), int size)
 {
     struct worker_pool *new_pool = NULL;
-    struct worker_control *new_control;
     bool test = false;
     int i;
     char sem_name[256];
@@ -113,38 +178,29 @@ ovn_add_worker_pool(void *(*start)(void *))
         ovs_mutex_unlock(&init_mutex);
     }
 
+    if (!size) {
+        size = pool_size;
+    }
+
     ovs_mutex_lock(&init_mutex);
     if (can_parallelize) {
         new_pool = xmalloc(sizeof(struct worker_pool));
-        new_pool->size = pool_size;
-        new_pool->controls = NULL;
+        new_pool->size = size;
+        new_pool->start = start;
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
             goto cleanup;
         }
 
-        new_pool->controls =
-            xmalloc(sizeof(struct worker_control) * new_pool->size);
+        new_pool->controls = alloc_controls(size);
+        new_pool->workers_must_exit = false;
 
         for (i = 0; i < new_pool->size; i++) {
-            new_control = &new_pool->controls[i];
-            new_control->id = i;
-            new_control->done = new_pool->done;
-            new_control->data = NULL;
-            ovs_mutex_init(&new_control->mutex);
-            new_control->finished = ATOMIC_VAR_INIT(false);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-            if (new_control->fire == SEM_FAILED) {
+            if (!init_control(&new_pool->controls[i], i, new_pool)) {
                 goto cleanup;
             }
         }
-
-        for (i = 0; i < pool_size; i++) {
-            new_pool->controls[i].worker =
-                ovs_thread_create("worker pool helper", start,
-                                  &new_pool->controls[i]);
-        }
         ovs_list_push_back(&worker_pools, &new_pool->list_node);
     }
     ovs_mutex_unlock(&init_mutex);
@@ -157,16 +213,7 @@ cleanup:
     VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
     can_parallelize = false;
 
-    if (new_pool->controls) {
-        for (i = 0; i < new_pool->size; i++) {
-            if (new_pool->controls[i].fire != SEM_FAILED) {
-                sem_close(new_pool->controls[i].fire);
-                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-                sem_unlink(sem_name);
-                break; /* semaphores past this one are uninitialized */
-            }
-        }
-    }
+    free_controls(new_pool);
 
     if (new_pool->done != SEM_FAILED) {
         sem_close(new_pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
@@ -176,7 +223,6 @@ cleanup:
     return NULL;
 }
 
-
 /* Initializes 'hmap' as an empty hash table with mask N. */
 void
 ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
@@ -365,14 +411,84 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
     }
 }
 
+static bool
+init_control(struct worker_control *control, int id,
+             struct worker_pool *pool)
+{
+    char sem_name[256];
+    control->id = id;
+    control->done = pool->done;
+    control->data = NULL;
+    ovs_mutex_init(&control->mutex);
+    control->finished = ATOMIC_VAR_INIT(false);
+    sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+    control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+    control->pool = pool;
+    control->worker = 0;
+    if (control->fire == SEM_FAILED) {
+        return false;
+    }
+    control->worker =
+        ovs_thread_create("worker pool helper", pool->start, control);
+    return true;
+}
+
 static void
-worker_pool_hook(void *aux OVS_UNUSED) {
+cleanup_control(struct worker_pool *pool, int id)
+{
+    char sem_name[256];
+    struct worker_control *control = &pool->controls[id];
+
+    if (control->fire != SEM_FAILED) {
+        sem_close(control->fire);
+        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+        sem_unlink(sem_name);
+    }
+}
+
+static void
+free_controls(struct worker_pool *pool)
+{
     int i;
+    if (pool->controls) {
+        pool->workers_must_exit = true;
+        for (i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_post(pool->controls[i].fire);
+            }
+        }
+        for (i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].worker) {
+                pthread_join(pool->controls[i].worker, NULL);
+                pool->controls[i].worker = 0;
+            }
+        }
+        for (i = 0; i < pool->size; i++) {
+            cleanup_control(pool, i);
+        }
+        free(pool->controls);
+        pool->controls = NULL;
+        pool->workers_must_exit = false;
+    }
+}
+
+static struct worker_control *alloc_controls(int size)
+{
+    int i;
+    struct worker_control *controls =
+        xcalloc(sizeof(struct worker_control), size);
+
+    for (i = 0; i < size ; i++) {
+        controls[i].fire = SEM_FAILED;
+    }
+    return controls;
+}
+
+static void
+worker_pool_hook(void *aux OVS_UNUSED) {
     static struct worker_pool *pool;
     char sem_name[256];
 
-    workers_must_exit = true;
-
     /* All workers must honour the must_exit flag and check for it regularly.
      * We can make it atomic and check it via atomics in workers, but that
      * is not really necessary as it is set just once - when the program
@@ -383,17 +499,7 @@ worker_pool_hook(void *aux OVS_UNUSED) {
 
     /* Wake up the workers after the must_exit flag has been set */
     LIST_FOR_EACH (pool, list_node, &worker_pools) {
-        for (i = 0; i < pool->size ; i++) {
-            sem_post(pool->controls[i].fire);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            pthread_join(pool->controls[i].worker, NULL);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            sem_close(pool->controls[i].fire);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
-            sem_unlink(sem_name);
-        }
+        free_controls(pool);
         sem_close(pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
         sem_unlink(sem_name);
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 2df132ea8..4708f41f2 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -83,6 +83,7 @@ struct worker_control {
     void *data; /* Pointer to data to be processed. */
     void *workload; /* back-pointer to the worker pool structure. */
     pthread_t worker;
+    struct worker_pool *pool;
 };
 
 struct worker_pool {
@@ -90,16 +91,21 @@ struct worker_pool {
     struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
     struct worker_control *controls; /* "Handles" in this pool. */
     sem_t *done; /* Work completion semaphorew. */
+    void *(*start)(void *); /* Work function. */
+    bool workers_must_exit; /* Pool to be destroyed flag. */
 };
 
 /* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+ * a worker_control structure as an argument.
+ * If size is non-zero, it is used for pool sizing. If size is zero, pool
+ * size uses system defaults.
+ */
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
 
 /* Setting this to true will make all processing threads exit */
 
-bool ovn_stop_parallel_processing(void);
+bool ovn_stop_parallel_processing(struct worker_pool *pool);
 
 /* Build a hmap pre-sized for size elements */
 
@@ -253,6 +259,10 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
 
 bool ovn_can_parallelize_hashes(bool force_parallel);
 
+void ovn_destroy_pool(struct worker_pool *pool);
+
+bool ovn_resize_pool(struct worker_pool *pool, int size);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -263,9 +273,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 
 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
 
-#define stop_parallel_processing() ovn_stop_parallel_processing()
+#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
 
-#define add_worker_pool(start) ovn_add_worker_pool(start)
+#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
 
 #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
 
@@ -286,6 +296,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 #define run_pool_callback(pool, fin_result, result_frags, helper_func) \
     ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
 
+#define destroy_pool(pool) ovn_destroy_pool(pool)
+
+#define resize_pool(pool, size) ovn_resize_pool(pool, size)
 
 #ifdef __clang__
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index ee761cef0..324800c32 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -12828,16 +12828,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                             &lsi->actions);
 }
 
-struct lflows_thread_pool {
-    struct worker_pool *pool;
-};
-
-
 static void *
 build_lflows_thread(void *arg)
 {
     struct worker_control *control = (struct worker_control *) arg;
-    struct lflows_thread_pool *workload;
     struct lswitch_flow_build_info *lsi;
 
     struct ovn_datapath *od;
@@ -12846,21 +12840,21 @@ build_lflows_thread(void *arg)
     struct ovn_igmp_group *igmp_group;
     int bnum;
 
-    while (!stop_parallel_processing()) {
+
+    while (!stop_parallel_processing(control->pool)) {
         wait_for_work(control);
-        workload = (struct lflows_thread_pool *) control->workload;
         lsi = (struct lswitch_flow_build_info *) control->data;
-        if (stop_parallel_processing()) {
+        if (stop_parallel_processing(control->pool)) {
             return NULL;
         }
-        if (lsi && workload) {
+        if (lsi) {
             /* Iterate over bucket ThreadID, ThreadID+size, ... */
             for (bnum = control->id; bnum <= lsi->datapaths->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_od(od, lsi);
@@ -12868,10 +12862,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id; bnum <= lsi->ports->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_op(op, lsi);
@@ -12879,10 +12873,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id; bnum <= lsi->lbs->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
@@ -12900,11 +12894,11 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id; bnum <= lsi->igmp_groups->mask;
-                 bnum += workload->pool->size)
+                 bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (
                         igmp_group, hmap_node, bnum, lsi->igmp_groups) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
@@ -12919,24 +12913,14 @@ build_lflows_thread(void *arg)
 }
 
 static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
+static struct worker_pool *build_lflows_pool = NULL;
 
 static void
 init_lflows_thread_pool(void)
 {
-    int index;
-
     if (!pool_init_done) {
-        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
         pool_init_done = true;
-        if (pool) {
-            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
-            build_lflows_pool->pool = pool;
-            for (index = 0; index < build_lflows_pool->pool->size; index++) {
-                build_lflows_pool->pool->controls[index].workload =
-                    build_lflows_pool;
-            }
-        }
     }
 }
 
@@ -12979,16 +12963,16 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
         struct lswitch_flow_build_info *lsiv;
         int index;
 
-        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
         if (use_logical_dp_groups) {
             lflow_segs = NULL;
         } else {
-            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
         }
 
         /* Set up "work chunks" for each thread to work on. */
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             if (use_logical_dp_groups) {
                 /* if dp_groups are in use we lock a shared lflows hash
                  * on a per-bucket level instead of merging hash frags */
@@ -13010,17 +12994,17 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);
 
-            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+            build_lflows_pool->controls[index].data = &lsiv[index];
         }
 
         /* Run thread pool. */
         if (use_logical_dp_groups) {
-            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+            run_pool_callback(build_lflows_pool, NULL, NULL, noop_callback);
         } else {
-            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
         }
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             ds_destroy(&lsiv[index].match);
             ds_destroy(&lsiv[index].actions);
         }
diff --git a/ovs b/ovs
index 748010ff3..50e5523b9 160000
--- a/ovs
+++ b/ovs
@@ -1 +1 @@
-Subproject commit 748010ff304b7cd2c43f4eb98a554433f0df07f9
+Subproject commit 50e5523b9b2b154e5fafc5acdcdec85e9cc5a330