From patchwork Mon Jul  5 13:42:28 2021
X-Patchwork-Submitter: Anton Ivanov
X-Patchwork-Id: 1500769
From: anton.ivanov@cambridgegreys.com
To: ovs-dev@openvswitch.org
Cc: Anton Ivanov
Date: Mon, 5 Jul 2021 14:42:28 +0100
Message-Id: <20210705134228.30197-2-anton.ivanov@cambridgegreys.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20210705134228.30197-1-anton.ivanov@cambridgegreys.com>
References: <20210705134228.30197-1-anton.ivanov@cambridgegreys.com>
X-Clacks-Overhead: GNU Terry Pratchett
Subject: [ovs-dev] [PATCH v2 2/2] ovsdb: Run monitor processing in parallel

From: Anton Ivanov

Monitor processing performs a full walk of all rows in all tables
referenced in a monitor.  The rows are internally represented as a hash
map, so this operation can be run in parallel: thread M out of N walks
hash buckets M, M+N, M+2N, and so on.

Running inter-thread IPC for only a handful of datum values is unlikely
to be efficient, hence there is a cut-off.
Tables smaller than the cut-off value are processed in the main thread;
larger tables are processed by multiple threads.  The cut-off value of 8
was chosen based on experimentation on machines with between 4 and 12
cores.

Signed-off-by: Anton Ivanov
---
 lib/ovsdb-idl.c     |  30 ++++
 lib/ovsdb-idl.h     |   6 +
 ovsdb/monitor.c     | 387 ++++++++++++++++++++++++++++++++++++++++----
 ovsdb/ovsdb-idlc.in |  26 +++
 4 files changed, 414 insertions(+), 35 deletions(-)

diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index 2198c69c6..bf4fd6db6 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -46,6 +46,7 @@
 #include "svec.h"
 #include "util.h"
 #include "uuid.h"
+#include "parallel-hmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
@@ -2375,6 +2376,24 @@ ovsdb_idl_first_row(const struct ovsdb_idl *idl,
     return next_real_row(table, hmap_first(&table->rows));
 }
 
+/* Returns a row in 'table_class''s table slice in 'idl', or a null pointer
+ * if that table slice is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at all. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_first_row(const struct ovsdb_idl *idl,
+                             const struct ovsdb_idl_table_class *table_class,
+                             ssize_t job_id,
+                             ssize_t pool_size)
+{
+    struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl,
+                                                               table_class);
+    return next_real_row(
+        table, parallel_hmap_first(&table->rows, job_id, pool_size));
+}
+
 /* Returns a row following 'row' within its table, or a null pointer if 'row'
  * is the last row in its table. */
 const struct ovsdb_idl_row *
@@ -2385,6 +2404,17 @@ ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
 }
 
+/* Returns a row following 'row' within its table slice, or a null pointer
+ * if 'row' is the last row in its table slice. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_next_row(const struct ovsdb_idl_row *row, ssize_t pool_size)
+{
+    struct ovsdb_idl_table *table = row->table;
+
+    return next_real_row(
+        table, parallel_hmap_next(&table->rows, &row->hmap_node, pool_size));
+}
+
 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
  * transaction has changed 'column''s value, the modified value is returned.
  *
diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h
index d93483245..0d0b68f0f 100644
--- a/lib/ovsdb-idl.h
+++ b/lib/ovsdb-idl.h
@@ -236,6 +236,12 @@ const struct ovsdb_idl_row *ovsdb_idl_first_row(
     const struct ovsdb_idl *, const struct ovsdb_idl_table_class *);
 const struct ovsdb_idl_row *ovsdb_idl_next_row(const struct ovsdb_idl_row *);
 
+const struct ovsdb_idl_row *parallel_ovsdb_idl_first_row(
+    const struct ovsdb_idl *, const struct ovsdb_idl_table_class *,
+    ssize_t job_id, ssize_t pool);
+const struct ovsdb_idl_row *parallel_ovsdb_idl_next_row(
+    const struct ovsdb_idl_row *, ssize_t pool);
+
 const struct ovsdb_datum *ovsdb_idl_read(const struct ovsdb_idl_row *,
                                          const struct ovsdb_idl_column *);
 const struct ovsdb_datum *ovsdb_idl_get(const struct ovsdb_idl_row *,
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 532dedcb6..acf9e5798 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -36,6 +36,7 @@
 #include "jsonrpc-server.h"
 #include "monitor.h"
 #include "util.h"
+#include "parallel-hmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
@@ -1084,7 +1085,6 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
                            const struct uuid *row_uuid)
 {
     char uuid[UUID_LEN + 1];
-
     /* Create JSON object for transaction overall. */
     if (!*json) {
         *json = json_object_create();
@@ -1101,6 +1101,142 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
     json_object_put(*table_json, uuid, row_json);
 }
 
+/* We cannot reuse the same multi-threaded conventions as for the cond
+ * update (further down), as the calling conventions and the data supplied
+ * to update generation are quite different.
+ *
+ * In addition to this, key parts of the code depend on container
+ * macros, making it impossible to reuse processing code between the two.
+ *
+ * The only part which we can reuse is JSON result processing.
+ */
+
+/* The "cut-off" point determined by using debug timings seems to be
+ * somewhere around processing hashes with 4-8 elements.  That is
+ * surprisingly low.  JSON serialization looks like the primary suspect.
+ * If that improves, this should be increased accordingly.
+ */
+
+#define PARALLEL_CUT_OFF_A 8
+
+struct mon_json_result {
+    struct ovs_list list_node;
+    const char *table_name;
+    const struct uuid *row_uuid;
+    struct json *row_json;
+};
+
+struct monitor_compose_update_info {
+    struct ovsdb_monitor_change_set_for_table *mcst;
+    const struct ovsdb_monitor_session_condition *condition;
+    unsigned long int *changed;
+    struct ovs_list *results;
+    compose_row_update_cb_func row_update;
+    bool initial;
+};
+
+struct monitor_compose_update_pool {
+    void (*row_helper_func)(struct ovsdb_monitor_row *row,
+                            struct monitor_compose_update_info *mci);
+    struct worker_pool *pool;
+};
+
+static void
+ovsdb_monitor_compose_update_process_one_row(
+    struct ovsdb_monitor_row *row,
+    struct monitor_compose_update_info *mci)
+{
+    struct json *row_json;
+    struct mon_json_result *temp;
+
+    row_json = (*mci->row_update)(mci->mcst->mt, mci->condition,
+                                  OVSDB_MONITOR_ROW, row, mci->initial,
+                                  mci->changed, mci->mcst->n_columns);
+    if (row_json) {
+        temp = xmalloc(sizeof(struct mon_json_result));
+        temp->table_name = mci->mcst->mt->table->schema->name;
+        temp->row_uuid = &row->uuid;
+        temp->row_json = row_json;
+        ovs_list_push_back(mci->results, &temp->list_node);
+    }
+}
+
+static void *
+monitor_compose_update_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct monitor_compose_update_pool *workload;
+    struct monitor_compose_update_info *mci;
+    struct ovsdb_monitor_row *row;
+    struct hmap *table_rows;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        workload = (struct monitor_compose_update_pool *) control->workload;
+        mci = (struct monitor_compose_update_info *) control->data;
+        if (mci && workload) {
+            table_rows = (struct hmap *) &mci->mcst->rows;
+            for (bnum = control->id;
+                 bnum <= table_rows->mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum, table_rows) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    (workload->row_helper_func)(row, control->data);
+                }
+            }
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static void
+ovsdb_monitor_compose_cond_change_update_generate_json(
+    struct json **json, struct json **table_json,
+    struct ovs_list *temp_result)
+{
+    struct mon_json_result *temp;
+
+    LIST_FOR_EACH_POP (temp, list_node, temp_result) {
+        ovsdb_monitor_add_json_row(json,
+                                   temp->table_name,
+                                   table_json,
+                                   temp->row_json,
+                                   temp->row_uuid);
+        free(temp);
+    }
+}
+
+static struct monitor_compose_update_pool *monitor_compose_pool = NULL;
+
+static void
+init_compose_pool(void)
+{
+    int index;
+
+    if (can_parallelize_hashes(false) && (!monitor_compose_pool)) {
+        monitor_compose_pool = xmalloc(sizeof(*monitor_compose_pool));
+        monitor_compose_pool->pool =
+            add_worker_pool(monitor_compose_update_thread);
+        monitor_compose_pool->row_helper_func =
+            ovsdb_monitor_compose_update_process_one_row;
+
+        for (index = 0; index < monitor_compose_pool->pool->size; index++) {
+            monitor_compose_pool->pool->controls[index].workload =
+                monitor_compose_pool;
+        }
+    }
+}
+
 /* Constructs and returns JSON for a <table-updates> object (as described in
  * RFC 7047) for all the outstanding changes within 'monitor', starting from
  * 'transaction'. */
@@ -1113,31 +1249,169 @@ ovsdb_monitor_compose_update(
 {
     struct json *json;
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
-    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
     json = NULL;
     struct ovsdb_monitor_change_set_for_table *mcst;
     LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
         struct ovsdb_monitor_row *row, *next;
         struct json *table_json = NULL;
-        struct ovsdb_monitor_table *mt = mcst->mt;
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
-            struct json *row_json;
-            row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
-                                     initial, changed, mcst->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           &row->uuid);
+        if (hmap_count(&mcst->rows) > PARALLEL_CUT_OFF_A &&
+            can_parallelize_hashes(false)) {
+
+            struct monitor_compose_update_info *mci;
+            struct ovs_list combined_result =
+                OVS_LIST_INITIALIZER(&combined_result);
+            struct ovs_list *results = NULL;
+            int index;
+
+            init_compose_pool();
+            mci = xcalloc(monitor_compose_pool->pool->size, sizeof(*mci));
+            results = xcalloc(monitor_compose_pool->pool->size,
+                              sizeof(*results));
+
+            for (index = 0;
+                 index < monitor_compose_pool->pool->size; index++) {
+
+                mci[index].mcst = mcst;
+                mci[index].condition = condition;
+                mci[index].initial = initial;
+                mci[index].changed = xmalloc(bitmap_n_bytes(max_columns));
+                mci[index].row_update = row_update;
+                ovs_list_init(&results[index]);
+                mci[index].results = &results[index];
+                monitor_compose_pool->pool->controls[index].data = &mci[index];
             }
+
+            run_pool_list(monitor_compose_pool->pool,
+                          &combined_result, results);
+
+            ovsdb_monitor_compose_cond_change_update_generate_json(
+                &json, &table_json, &combined_result);
+
+            free(results);
+            for (index = 0;
+                 index < monitor_compose_pool->pool->size; index++) {
+                free(mci[index].changed);
+            }
+            free(mci);
+        } else {
+            struct monitor_compose_update_info mci;
+            struct ovs_list results = OVS_LIST_INITIALIZER(&results);
+
+            mci.mcst = mcst;
+            mci.condition = condition;
+            mci.initial = initial;
+            mci.changed = xmalloc(bitmap_n_bytes(max_columns));
+            mci.row_update = row_update;
+            mci.results = &results;
+
+            HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+                ovsdb_monitor_compose_update_process_one_row(row, &mci);
+            }
+            if (!ovs_list_is_empty(mci.results)) {
+                ovsdb_monitor_compose_cond_change_update_generate_json(
+                    &json, &table_json, mci.results);
+            }
+            free(mci.changed);
         }
     }
-    free(changed);
     return json;
 }
 
+#define PARALLEL_CUT_OFF_B 8
+
+struct monitor_cond_change_info {
+    struct ovsdb_monitor_table *mt;
+    struct ovsdb_monitor_session_condition *condition;
+    unsigned long int *changed;
+    struct ovs_list *results;
+};
+
+struct monitor_cond_change_pool {
+    void (*row_helper_func)(struct ovsdb_row *row,
+                            struct monitor_cond_change_info *mi);
+    struct worker_pool *pool;
+};
+
+static void *
+monitor_cond_change_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct monitor_cond_change_pool *workload;
+    struct monitor_cond_change_info *mi;
+    struct ovsdb_row *row;
+    struct hmap *table_rows;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        workload = (struct monitor_cond_change_pool *) control->workload;
+        mi = (struct monitor_cond_change_info *) control->data;
+        if (mi && workload) {
+            table_rows = (struct hmap *) &mi->mt->table->rows;
+            for (bnum = control->id;
+                 bnum <= mi->mt->table->rows.mask;
+                 bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum, table_rows) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    (workload->row_helper_func)(row, control->data);
+                }
+            }
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static void
+ovsdb_monitor_cond_change_process_one_row(
+    struct ovsdb_row *row,
+    struct monitor_cond_change_info *mi)
+{
+    struct json *row_json;
+    struct mon_json_result *temp;
+
+    row_json = ovsdb_monitor_compose_row_update2(
+        mi->mt,
+        mi->condition,
+        OVSDB_ROW,
+        row,
+        false,
+        mi->changed,
+        mi->mt->n_columns);
+    if (row_json) {
+        temp = xmalloc(sizeof(struct mon_json_result));
+        temp->table_name = mi->mt->table->schema->name;
+        temp->row_uuid = ovsdb_row_get_uuid(row);
+        temp->row_json = row_json;
+        ovs_list_push_back(mi->results, &temp->list_node);
+    }
+}
+
+static struct monitor_cond_change_pool *monitor_cond_pool = NULL;
+
+static void
+init_cond_pool(void)
+{
+    int index;
+
+    if (can_parallelize_hashes(false) && (!monitor_cond_pool)) {
+        monitor_cond_pool = xmalloc(sizeof(*monitor_cond_pool));
+        monitor_cond_pool->pool = add_worker_pool(monitor_cond_change_thread);
+        monitor_cond_pool->row_helper_func =
+            ovsdb_monitor_cond_change_process_one_row;
+
+        for (index = 0; index < monitor_cond_pool->pool->size; index++) {
+            monitor_cond_pool->pool->controls[index].workload =
+                monitor_cond_pool;
+        }
+    }
+}
+
 static struct json*
 ovsdb_monitor_compose_cond_change_update(
     struct ovsdb_monitor *dbmon,
@@ -1146,40 +1420,83 @@ ovsdb_monitor_compose_cond_change_update(
     struct shash_node *node;
     struct json *json = NULL;
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
-    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
-        struct ovsdb_row *row;
         struct json *table_json = NULL;
         struct ovsdb_condition *old_condition, *new_condition;
 
-        if (!ovsdb_monitor_get_table_conditions(mt,
-                                                condition,
-                                                &old_condition,
-                                                &new_condition) ||
-            !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
-            /* Nothing to update on this table */
-            continue;
-        }
+        if (can_parallelize_hashes(false) &&
+            (hmap_count(&mt->table->rows) > PARALLEL_CUT_OFF_B)) {
+
+            struct monitor_cond_change_info *mi;
+            struct ovs_list combined_result =
+                OVS_LIST_INITIALIZER(&combined_result);
+            struct ovs_list *results = NULL;
+            int index, count;
+
+            init_cond_pool();
+            mi = xcalloc(monitor_cond_pool->pool->size, sizeof(*mi));
+            results = xcalloc(monitor_cond_pool->pool->size,
+                              sizeof(*results));
 
-        /* Iterate over all rows in table */
-        HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-            struct json *row_json;
-
-            row_json = ovsdb_monitor_compose_row_update2(mt, condition,
-                                                         OVSDB_ROW, row,
-                                                         false, changed,
-                                                         mt->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           ovsdb_row_get_uuid(row));
+            count = monitor_cond_pool->pool->size;
+
+            for (index = 0; index < count; index++) {
+                mi[index].changed = xmalloc(bitmap_n_bytes(max_columns));
+                mi[index].condition = condition;
+                mi[index].mt = mt;
+                ovs_list_init(&results[index]);
+                mi[index].results = &results[index];
+                monitor_cond_pool->pool->controls[index].data = &mi[index];
+            }
+
+            if (!ovsdb_monitor_get_table_conditions(mt,
+                                                    condition,
+                                                    &old_condition,
+                                                    &new_condition) ||
+                !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
+                /* Nothing to update on this table. */
+                goto cleanup_scratchpad;
             }
+
+            run_pool_list(monitor_cond_pool->pool,
+                          &combined_result, results);
+            ovsdb_monitor_compose_cond_change_update_generate_json(
+                &json, &table_json, &combined_result);
+cleanup_scratchpad:
+            for (index = 0; index < count; index++) {
+                free(mi[index].changed);
+            }
+            free(results);
+            free(mi);
+        } else {
+            /* Iterate over all rows in the table - single threaded. */
+            struct ovsdb_row *row;
+            struct monitor_cond_change_info mi;
+            struct ovs_list results = OVS_LIST_INITIALIZER(&results);
+
+            init_cond_pool();
+
+            mi.changed = xmalloc(bitmap_n_bytes(max_columns));
+            mi.condition = condition;
+            mi.mt = mt;
+            mi.results = &results;
+
+            HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+                ovsdb_monitor_cond_change_process_one_row(row, &mi);
+            }
+            if (!ovs_list_is_empty(mi.results)) {
+                ovsdb_monitor_compose_cond_change_update_generate_json(
+                    &json, &table_json, mi.results);
+            }
+            free(mi.changed);
         }
         ovsdb_monitor_table_condition_updated(mt, condition);
     }
-    free(changed);
+
     return json;
 }
diff --git a/ovsdb/ovsdb-idlc.in b/ovsdb/ovsdb-idlc.in
index 61cded16d..c6f9e9ffa 100755
--- a/ovsdb/ovsdb-idlc.in
+++ b/ovsdb/ovsdb-idlc.in
@@ -253,6 +253,8 @@ const struct %(s)s *%(s)s_get_for_uuid(const struct ovsdb_idl *, const struct uu
 const struct %(s)s *%(s)s_table_get_for_uuid(const struct %(s)s_table *, const struct uuid *);
 const struct %(s)s *%(s)s_first(const struct ovsdb_idl *);
 const struct %(s)s *%(s)s_next(const struct %(s)s *);
+const struct %(s)s *%(s)s_parallel_first(const struct ovsdb_idl *, ssize_t job_id, ssize_t pool);
+const struct %(s)s *%(s)s_parallel_next(const struct %(s)s *, ssize_t pool);
 #define %(S)s_FOR_EACH(ROW, IDL) \\
     for ((ROW) = %(s)s_first(IDL); \\
          (ROW); \\
@@ -261,6 +263,10 @@ const struct %(s)s *%(s)s_next(const struct %(s)s *);
     for ((ROW) = %(s)s_first(IDL); \\
          (ROW) ? ((NEXT) = %(s)s_next(ROW), 1) : 0; \\
          (ROW) = (NEXT))
+#define %(S)s_PARALLEL_FOR_EACH(ROW, IDL, JOBID, POOL) \\
+    for ((ROW) = %(s)s_parallel_first(IDL, JOBID, POOL); \\
+         (ROW); \\
+         (ROW) = %(s)s_parallel_next(ROW, POOL))
 
 unsigned int %(s)s_get_seqno(const struct ovsdb_idl *);
 unsigned int %(s)s_row_get_seqno(const struct %(s)s *row, enum ovsdb_idl_change change);
@@ -707,6 +713,18 @@ const struct %(s)s *
     return %(s)s_cast(ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s));
 }
 
+/* Returns a row in table "%(t)s" in 'idl', or a null pointer if that
+ * table is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at all. */
+const struct %(s)s *
+%(s)s_parallel_first(const struct ovsdb_idl *idl, ssize_t job_id, ssize_t pool)
+{
+    return %(s)s_cast(parallel_ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s, job_id, pool));
+}
+
 /* Returns a row following 'row' within its table, or a null pointer if 'row'
  * is the last row in its table. */
 const struct %(s)s *
@@ -715,6 +733,14 @@ const struct %(s)s *
     return %(s)s_cast(ovsdb_idl_next_row(&row->header_));
 }
 
+/* Returns a row following 'row' within its table slice, or a null pointer
+ * if 'row' is the last row in its table slice. */
+const struct %(s)s *
+%(s)s_parallel_next(const struct %(s)s *row, ssize_t pool)
+{
+    return %(s)s_cast(parallel_ovsdb_idl_next_row(&row->header_, pool));
+}
+
 unsigned int
 %(s)s_get_seqno(const struct ovsdb_idl *idl)
 {
     return ovsdb_idl_table_get_seqno(idl, &%(p)stable_%(tl)s);
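As a reviewer's aside: the bucket-striding partition used by both worker-thread loops in the patch (worker `id` out of `pool->size` visits buckets `id`, `id + pool->size`, `id + 2 * pool->size`, ...) can be sketched in isolation. This is a minimal illustration, not code from the patch; `count_rows_for_worker` and the array-of-bucket-counts model are hypothetical stand-ins for the hmap walk.

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative sketch of the partition scheme: worker 'job_id' out of
 * 'pool_size' workers visits buckets job_id, job_id + pool_size,
 * job_id + 2 * pool_size, and so on.  The hash map is modelled here as a
 * plain array of per-bucket row counts. */
static size_t
count_rows_for_worker(const size_t *bucket_sizes, size_t n_buckets,
                      size_t job_id, size_t pool_size)
{
    size_t total = 0;

    for (size_t b = job_id; b < n_buckets; b += pool_size) {
        total += bucket_sizes[b];
    }
    return total;
}
```

Because every bucket index falls into exactly one residue class modulo `pool_size`, each row is visited by exactly one worker, which is why the per-worker result lists can simply be concatenated afterwards without deduplication.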