diff mbox series

[ovs-dev,2/2] ovsdb: Run monitor processing in parallel

Message ID 20210129133741.27211-2-anton.ivanov@cambridgegreys.com
State New
Headers show
Series [ovs-dev,1/2] Add support for parallel processing of hashes | expand

Commit Message

Anton Ivanov Jan. 29, 2021, 1:37 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Monitor processing performs a full walk of all rows in all
tables referenced in a monitor.

The rows are internally represented as a hash map.

This operation can be run in parallel where thread M out N is
walking hash buckets M, N+M, etc.

Running inter-thread IPC for only a handful of datum values
is not likely to be efficient, hence there is a cut-off. Tables
smaller than the cut-off value are run in the main thread.
Larger tables are run via multiple threads.

The cut-off value of 8 has been chosen based on experimentation
on machines with between 4 and 12 cores.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/ovsdb-idl.c     |  30 ++++
 lib/ovsdb-idl.h     |   6 +
 ovsdb/monitor.c     | 387 ++++++++++++++++++++++++++++++++++++++++----
 ovsdb/ovsdb-idlc.in |  26 +++
 4 files changed, 414 insertions(+), 35 deletions(-)

Comments

0-day Robot Jan. 29, 2021, 2:46 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line is 83 characters long (recommended limit is 79)
#174 FILE: ovsdb/monitor.c:1152:
    row_json = (*mci->row_update)(mci->mcst->mt, mci->condition, OVSDB_MONITOR_ROW,

WARNING: Line is 85 characters long (recommended limit is 79)
#290 FILE: ovsdb/monitor.c:1263:
            struct ovs_list combined_result = OVS_LIST_INITIALIZER(&combined_result);

WARNING: Line is 82 characters long (recommended limit is 79)
#296 FILE: ovsdb/monitor.c:1269:
            results = xcalloc(sizeof(*results), monitor_compose_pool->pool->size);

WARNING: Line is 81 characters long (recommended limit is 79)
#311 FILE: ovsdb/monitor.c:1284:
            run_pool_list(monitor_compose_pool->pool, &combined_result, results);

WARNING: Line is 80 characters long (recommended limit is 79)
#317 FILE: ovsdb/monitor.c:1290:
            for (index = 0; index < monitor_compose_pool->pool->size; index++) {

WARNING: Line has non-spaces leading whitespace
WARNING: Line has trailing whitespace
#452 FILE: ovsdb/monitor.c:1424:
    

WARNING: Line is 85 characters long (recommended limit is 79)
#471 FILE: ovsdb/monitor.c:1434:
            struct ovs_list combined_result = OVS_LIST_INITIALIZER(&combined_result);

Lines checked: 613, Warnings: 8, Errors: 0


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
diff mbox series

Patch

diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index 2c8a0c9cf..a9aaa7d66 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -46,6 +46,7 @@ 
 #include "svec.h"
 #include "util.h"
 #include "uuid.h"
+#include "parallel-hmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
@@ -2298,6 +2299,24 @@  ovsdb_idl_first_row(const struct ovsdb_idl *idl,
     return next_real_row(table, hmap_first(&table->rows));
 }
 
+/* Returns a row in 'table_class''s table slice in 'idl', or a null pointer
+ * if that table slice is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at apply. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_first_row(const struct ovsdb_idl *idl,
+                             const struct ovsdb_idl_table_class *table_class,
+                             ssize_t job_id,
+                             ssize_t pool_size)
+{
+    struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl,
+                                                               table_class);
+    return next_real_row(
+        table, parallel_hmap_first(&table->rows, job_id, pool_size));
+}
+
 /* Returns a row following 'row' within its table, or a null pointer if 'row'
  * is the last row in its table. */
 const struct ovsdb_idl_row *
@@ -2308,6 +2327,17 @@  ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
 }
 
+/* Returns a row following 'row' within its table slice, or a null pointer
+ *  if 'row' is the last row in its table slice. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_next_row(const struct ovsdb_idl_row *row, ssize_t pool_size)
+{
+    struct ovsdb_idl_table *table = row->table;
+
+    return next_real_row(table,
+        parallel_hmap_next(&table->rows, &row->hmap_node, pool_size));
+}
+
 /* Reads and returns the value of 'column' within 'row'.  If an ongoing
  * transaction has changed 'column''s value, the modified value is returned.
  *
diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h
index 05bb48d66..dcc1ad531 100644
--- a/lib/ovsdb-idl.h
+++ b/lib/ovsdb-idl.h
@@ -234,6 +234,12 @@  const struct ovsdb_idl_row *ovsdb_idl_first_row(
     const struct ovsdb_idl *, const struct ovsdb_idl_table_class *);
 const struct ovsdb_idl_row *ovsdb_idl_next_row(const struct ovsdb_idl_row *);
 
+const struct ovsdb_idl_row *parallel_ovsdb_idl_first_row(
+    const struct ovsdb_idl *, const struct ovsdb_idl_table_class *,
+    ssize_t job_id, ssize_t pool);
+const struct ovsdb_idl_row *parallel_ovsdb_idl_next_row(
+    const struct ovsdb_idl_row *, ssize_t pool);
+
 const struct ovsdb_datum *ovsdb_idl_read(const struct ovsdb_idl_row *,
                                          const struct ovsdb_idl_column *);
 const struct ovsdb_datum *ovsdb_idl_get(const struct ovsdb_idl_row *,
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 532dedcb6..2bde7772a 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -36,6 +36,7 @@ 
 #include "jsonrpc-server.h"
 #include "monitor.h"
 #include "util.h"
+#include "parallel-hmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
@@ -1084,7 +1085,6 @@  ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
                            const struct uuid *row_uuid)
 {
     char uuid[UUID_LEN + 1];
-
     /* Create JSON object for transaction overall. */
     if (!*json) {
         *json = json_object_create();
@@ -1101,6 +1101,142 @@  ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
     json_object_put(*table_json, uuid, row_json);
 }
 
+/* We cannot reuse the same multi-threaded conventions as for the cond
+ * update (further down) as the calling conventions and data supplied
+ * to update generation are quite different
+ *
+ * In addition to this, key parts of the code are dependent on container
+ * macros making it impossible to reuse processing code between the two.
+ *
+ * The only part which we can reuse is json result processing.
+ */
+
+/* The "cut-off" point determined by using debug timings seems to be
+ * somewhere ~ processing hashesh with 4-8 elements. That is surprisingly
+ * low. JSON serialization looks like the primary suspect. If that improves
+ * this should be increased accordingly.
+ */
+
+#define PARALLEL_CUT_OFF_A 8
+
+struct mon_json_result {
+    struct ovs_list list_node;
+    const char *table_name;
+    const struct uuid *row_uuid;
+    struct json *row_json;
+};
+
+struct monitor_compose_update_info {
+    struct ovsdb_monitor_change_set_for_table *mcst;
+    const struct ovsdb_monitor_session_condition *condition;
+    unsigned long int *changed;
+    struct ovs_list *results;
+    compose_row_update_cb_func row_update;
+    bool initial;
+};
+
+struct monitor_compose_update_pool {
+    void (*row_helper_func)(struct ovsdb_monitor_row *row,
+                            struct monitor_compose_update_info *mci);
+    struct worker_pool *pool;
+};
+
+
+static void ovsdb_monitor_compose_update_process_one_row (
+    struct ovsdb_monitor_row *row,
+    struct monitor_compose_update_info *mci)
+{
+    struct json *row_json;
+    struct mon_json_result *temp;
+
+    row_json = (*mci->row_update)(mci->mcst->mt, mci->condition, OVSDB_MONITOR_ROW,
+                                  row, mci->initial, mci->changed,
+                                  mci->mcst->n_columns);
+    if (row_json) {
+        temp = xmalloc(sizeof(struct mon_json_result));
+        temp->table_name = mci->mcst->mt->table->schema->name;
+        temp->row_uuid = &row->uuid;
+        temp->row_json = row_json;
+        ovs_list_push_back(mci->results, &temp->list_node);
+    }
+}
+
+
+static void *monitor_compose_update_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct monitor_compose_update_pool *workload;
+    struct monitor_compose_update_info *mci;
+    struct ovsdb_monitor_row *row;
+    struct hmap *table_rows;
+    int bnum;
+
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        workload = (struct monitor_compose_update_pool *) control->workload;
+        mci = (struct monitor_compose_update_info *) control->data;
+        if (mci && workload) {
+            table_rows = (struct hmap *) &mci->mcst->rows;
+            for (bnum = control->id;
+                    bnum <= table_rows->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum, table_rows) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    (workload->row_helper_func)(row, control->data);
+                }
+            }
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+
+static void ovsdb_monitor_compose_cond_change_update_generate_json(
+    struct json **json, struct json **table_json,
+    struct ovs_list *temp_result)
+{
+    struct mon_json_result * temp;
+
+    LIST_FOR_EACH_POP (temp, list_node, temp_result) {
+        ovsdb_monitor_add_json_row(json,
+                temp->table_name,
+                table_json,
+                temp->row_json,
+                temp->row_uuid);
+        free(temp);
+    }
+}
+
+
+static struct monitor_compose_update_pool *monitor_compose_pool = NULL;
+
+static void init_compose_pool(void) {
+
+    int index;
+
+    if (can_parallelize_hashes() && (!monitor_compose_pool)) {
+        monitor_compose_pool =
+            xmalloc(sizeof(*monitor_compose_pool));
+        monitor_compose_pool->pool =
+            add_worker_pool(monitor_compose_update_thread);
+        monitor_compose_pool->row_helper_func =
+            ovsdb_monitor_compose_update_process_one_row;
+
+        for (index = 0; index < monitor_compose_pool->pool->size; index++) {
+            monitor_compose_pool->pool->controls[index].workload =
+                monitor_compose_pool;
+        }
+    }
+}
+
 /* Constructs and returns JSON for a <table-updates> object (as described in
  * RFC 7047) for all the outstanding changes within 'monitor', starting from
  * 'transaction'.  */
@@ -1113,31 +1249,169 @@  ovsdb_monitor_compose_update(
 {
     struct json *json;
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
-    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
     json = NULL;
     struct ovsdb_monitor_change_set_for_table *mcst;
     LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
         struct ovsdb_monitor_row *row, *next;
         struct json *table_json = NULL;
-        struct ovsdb_monitor_table *mt = mcst->mt;
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
-            struct json *row_json;
-            row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
-                                     initial, changed, mcst->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           &row->uuid);
+        if (hmap_count(&mcst->rows) > PARALLEL_CUT_OFF_A &&
+                can_parallelize_hashes()) {
+
+            struct monitor_compose_update_info *mci;
+            struct ovs_list combined_result = OVS_LIST_INITIALIZER(&combined_result);
+            struct ovs_list *results = NULL;
+            int index;
+
+            init_compose_pool();
+            mci = xcalloc(sizeof(*mci), monitor_compose_pool->pool->size);
+            results = xcalloc(sizeof(*results), monitor_compose_pool->pool->size);
+
+            for (index = 0;
+                    index < monitor_compose_pool->pool->size; index++) {
+
+                mci[index].mcst = mcst;
+                mci[index].condition = condition;
+                mci[index].initial = initial;
+                mci[index].changed = xmalloc(bitmap_n_bytes(max_columns));
+                mci[index].row_update = row_update;
+                ovs_list_init(&results[index]);
+                mci[index].results = &results[index];
+                monitor_compose_pool->pool->controls[index].data = &mci[index];
+            }
+
+            run_pool_list(monitor_compose_pool->pool, &combined_result, results);
+
+            ovsdb_monitor_compose_cond_change_update_generate_json(
+                    &json, &table_json, &combined_result);
+
+            free(results);
+            for (index = 0; index < monitor_compose_pool->pool->size; index++) {
+                free(mci[index].changed);
             }
+            free(mci);
+        } else {
+            struct monitor_compose_update_info mci;
+            struct ovs_list results = OVS_LIST_INITIALIZER(&results);
+            mci.mcst = mcst;
+            mci.condition = condition;
+            mci.initial = initial;
+            mci.changed = xmalloc(bitmap_n_bytes(max_columns));
+            mci.row_update = row_update;
+            mci.results = &results;
+
+            HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+                ovsdb_monitor_compose_update_process_one_row(
+                                row, &mci);
+            }
+            if (!ovs_list_is_empty(mci.results)) {
+                    ovsdb_monitor_compose_cond_change_update_generate_json(
+                        &json, &table_json, mci.results);
+            }
+            free(mci.changed);
         }
     }
-    free(changed);
 
     return json;
 }
 
+#define PARALLEL_CUT_OFF_B 8
+
+struct monitor_cond_change_info {
+    struct ovsdb_monitor_table *mt;
+    struct ovsdb_monitor_session_condition *condition;
+    unsigned long int *changed;
+    struct ovs_list *results;
+};
+
+struct monitor_cond_change_pool {
+    void (*row_helper_func)(struct ovsdb_row *row,
+            struct monitor_cond_change_info *mi);
+    struct worker_pool *pool;
+};
+
+static void *monitor_cond_change_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct monitor_cond_change_pool *workload;
+    struct monitor_cond_change_info *mi;
+    struct ovsdb_row *row;
+    struct hmap *table_rows;
+    int bnum;
+
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        workload = (struct monitor_cond_change_pool *) control->workload;
+        mi = (struct monitor_cond_change_info *) control->data;
+        if (mi && workload) {
+            table_rows = (struct hmap *) &mi->mt->table->rows;
+            for (bnum = control->id;
+                    bnum <= mi->mt->table->rows.mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum, table_rows) {
+                    if (stop_parallel_processing()) {
+                        return NULL;
+                    }
+                    (workload->row_helper_func)(row, control->data);
+                }
+            }
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static void ovsdb_monitor_cond_change_process_one_row (
+        struct ovsdb_row *row,
+        struct monitor_cond_change_info *mi)
+{
+    struct json *row_json;
+    struct mon_json_result *temp;
+
+    row_json = ovsdb_monitor_compose_row_update2(
+            mi->mt,
+            mi->condition,
+            OVSDB_ROW,
+            row,
+            false,
+            mi->changed,
+            mi->mt->n_columns);
+    if (row_json) {
+        temp = xmalloc(sizeof(struct mon_json_result));
+        temp->table_name = mi->mt->table->schema->name;
+        temp->row_uuid = ovsdb_row_get_uuid(row);
+        temp->row_json = row_json;
+        ovs_list_push_back(mi->results, &temp->list_node);
+    }
+}
+
+static struct monitor_cond_change_pool *monitor_cond_pool = NULL;
+
+static void init_cond_pool(void) {
+
+    int index;
+
+    if (can_parallelize_hashes() && (!monitor_cond_pool)) {
+        monitor_cond_pool =
+            xmalloc(sizeof(*monitor_cond_pool));
+        monitor_cond_pool->pool =
+            add_worker_pool(monitor_cond_change_thread);
+        monitor_cond_pool->row_helper_func =
+            ovsdb_monitor_cond_change_process_one_row;
+
+        for (index = 0; index < monitor_cond_pool->pool->size; index++) {
+            monitor_cond_pool->pool->controls[index].workload =
+                monitor_cond_pool;
+        }
+    }
+}
+
+
 static struct json*
 ovsdb_monitor_compose_cond_change_update(
                     struct ovsdb_monitor *dbmon,
@@ -1146,40 +1420,83 @@  ovsdb_monitor_compose_cond_change_update(
     struct shash_node *node;
     struct json *json = NULL;
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
-    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
+    
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
-        struct ovsdb_row *row;
         struct json *table_json = NULL;
         struct ovsdb_condition *old_condition, *new_condition;
 
-        if (!ovsdb_monitor_get_table_conditions(mt,
-                                                condition,
-                                                &old_condition,
-                                                &new_condition) ||
-            !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
-            /* Nothing to update on this table */
-            continue;
-        }
+        if (can_parallelize_hashes() &&
+                (hmap_count(&mt->table->rows) > PARALLEL_CUT_OFF_B)) {
+
+            struct monitor_cond_change_info *mi;
+            struct ovs_list combined_result = OVS_LIST_INITIALIZER(&combined_result);
+            struct ovs_list *results = NULL;
+            int index, count;
+
+            init_cond_pool();
+            mi = xcalloc(sizeof(*mi), monitor_cond_pool->pool->size);
+            results = xcalloc(sizeof(*results), monitor_cond_pool->pool->size);
 
-        /* Iterate over all rows in table */
-        HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-            struct json *row_json;
-
-            row_json = ovsdb_monitor_compose_row_update2(mt, condition,
-                                                         OVSDB_ROW, row,
-                                                         false, changed,
-                                                         mt->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           ovsdb_row_get_uuid(row));
+            count = monitor_cond_pool->pool->size;
+
+            for (index = 0; index < count; index++) {
+                mi[index].changed = xmalloc(bitmap_n_bytes(max_columns));
+                mi[index].condition = condition;
+                mi[index].mt = mt;
+                ovs_list_init(&results[index]);
+                mi[index].results = &results[index];
+                monitor_cond_pool->pool->controls[index].data = &mi[index];
+            }
+
+            if (!ovsdb_monitor_get_table_conditions(mt,
+                                                    condition,
+                                                    &old_condition,
+                                                    &new_condition) ||
+                !ovsdb_condition_cmp_3way(old_condition, new_condition)) {
+                /* Nothing to update on this table */
+                goto cleanup_scratchpad;
             }
+
+            run_pool_list(
+                monitor_cond_pool->pool,
+                &combined_result,
+                results);
+            ovsdb_monitor_compose_cond_change_update_generate_json(
+                    &json, &table_json, &combined_result);
+cleanup_scratchpad:
+            for (index = 0; index < count; index++) {
+                free(mi[index].changed);
+            }
+            free(results);
+            free(mi);
+        } else {
+            /* Iterate over all rows in table - single threaded */
+
+            struct ovsdb_row *row;
+            struct monitor_cond_change_info mi;
+            struct ovs_list results = OVS_LIST_INITIALIZER(&results);
+
+            init_cond_pool();
+
+            mi.changed = xmalloc(bitmap_n_bytes(max_columns));
+            mi.condition = condition;
+            mi.mt = mt;
+            mi.results = &results;
+
+            HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+                ovsdb_monitor_cond_change_process_one_row(row, &mi);
+            }
+            if (!ovs_list_is_empty(mi.results)) {
+                ovsdb_monitor_compose_cond_change_update_generate_json(
+                    &json, &table_json, mi.results);
+            }
+            free(mi.changed);
         }
         ovsdb_monitor_table_condition_updated(mt, condition);
     }
-    free(changed);
+
 
     return json;
 }
diff --git a/ovsdb/ovsdb-idlc.in b/ovsdb/ovsdb-idlc.in
index 5914e0878..b3245b415 100755
--- a/ovsdb/ovsdb-idlc.in
+++ b/ovsdb/ovsdb-idlc.in
@@ -254,6 +254,8 @@  const struct %(s)s *%(s)s_get_for_uuid(const struct ovsdb_idl *, const struct uu
 const struct %(s)s *%(s)s_table_get_for_uuid(const struct %(s)s_table *, const struct uuid *);
 const struct %(s)s *%(s)s_first(const struct ovsdb_idl *);
 const struct %(s)s *%(s)s_next(const struct %(s)s *);
+const struct %(s)s *%(s)s_parallel_first(const struct ovsdb_idl *, ssize_t job_id, ssize_t pool);
+const struct %(s)s *%(s)s_parallel_next(const struct %(s)s *, ssize_t pool);
 #define %(S)s_FOR_EACH(ROW, IDL) \\
         for ((ROW) = %(s)s_first(IDL); \\
              (ROW); \\
@@ -262,6 +264,10 @@  const struct %(s)s *%(s)s_next(const struct %(s)s *);
         for ((ROW) = %(s)s_first(IDL); \\
              (ROW) ? ((NEXT) = %(s)s_next(ROW), 1) : 0; \\
              (ROW) = (NEXT))
+#define %(S)s_PARALLEL_FOR_EACH(ROW, IDL, JOBID, POOL) \\
+        for ((ROW) = %(s)s_parallel_first(IDL, JOBID, POOL); \\
+             (ROW); \\
+             (ROW) = %(s)s_parallel_next(ROW, POOL))
 
 unsigned int %(s)s_get_seqno(const struct ovsdb_idl *);
 unsigned int %(s)s_row_get_seqno(const struct %(s)s *row, enum ovsdb_idl_change change);
@@ -708,6 +714,18 @@  const struct %(s)s *
     return %(s)s_cast(ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s));
 }
 
+/* Returns a row in table "%(t)s" in 'idl', or a null pointer if that
+ * table is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at apply. */
+const struct %(s)s *
+%(s)s_parallel_first(const struct ovsdb_idl *idl, ssize_t job_id, ssize_t pool)
+{
+    return %(s)s_cast(parallel_ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s, job_id, pool));
+}
+
 /* Returns a row following 'row' within its table, or a null pointer if 'row'
  * is the last row in its table. */
 const struct %(s)s *
@@ -716,6 +734,14 @@  const struct %(s)s *
     return %(s)s_cast(ovsdb_idl_next_row(&row->header_));
 }
 
+/* Returns a row following 'row' within its table slice, or a null pointer if 'row'
+ * is the last row in its table slice. */
+const struct %(s)s *
+%(s)s_parallel_next(const struct %(s)s *row, ssize_t pool)
+{
+    return %(s)s_cast(parallel_ovsdb_idl_next_row(&row->header_, pool));
+}
+
 unsigned int %(s)s_get_seqno(const struct ovsdb_idl *idl)
 {
     return ovsdb_idl_table_get_seqno(idl, &%(p)stable_%(tl)s);