diff mbox series

[ovs-dev,3/4] ovsdb: Parallel processing for monitor compose update

Message ID 20210803170218.7290-3-anton.ivanov@cambridgegreys.com
State Deferred
Headers show
Series [ovs-dev,1/4] Migrate parallelisation routines from the OVN tree | expand

Checks

Context Check Description
ovsrobot/apply-robot warning apply and check: warning
ovsrobot/github-robot success github build: passed

Commit Message

Anton Ivanov Aug. 3, 2021, 5:02 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Enable parallel processing of monitor compose update reusing
the json merge routines for parallel snapshotting.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 ovsdb/monitor.c | 154 ++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 135 insertions(+), 19 deletions(-)

Comments

0-day Robot Aug. 3, 2021, 5:22 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line is 80 characters long (recommended limit is 79)
#149 FILE: ovsdb/monitor.c:1218:
            xcalloc(sizeof(struct compose_update_info), monitor_pool_mcu->size);

Lines checked: 200, Warnings: 1, Errors: 0


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
diff mbox series

Patch

diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 532dedcb6..7c7a9faea 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -36,6 +36,8 @@ 
 #include "jsonrpc-server.h"
 #include "monitor.h"
 #include "util.h"
+#include "parallel-hmap.h"
+#include "parallel-json.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
@@ -1101,6 +1103,102 @@  ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
     json_object_put(*table_json, uuid, row_json);
 }
 
+/* Parallel processing thread */
+
+struct compose_update_info {
+    compose_row_update_cb_func row_update;
+    struct json *json;
+    struct ovsdb_monitor *dbmon;
+    struct ovsdb_monitor_change_set *mcs;
+    const struct ovsdb_monitor_session_condition *condition;
+    bool initial;
+};
+
+static void *
+compose_update_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct worker_pool *workload;
+    struct compose_update_info *cui;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        workload = (struct worker_pool *) control->workload;
+        cui = (struct compose_update_info *) control->data;
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        if (cui && workload) {
+            size_t max_columns = ovsdb_monitor_max_columns(cui->dbmon);
+            unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+            struct ovsdb_monitor_change_set_for_table *mcst;
+
+            LIST_FOR_EACH (mcst,
+                    list_in_change_set, &cui->mcs->change_set_for_tables) {
+                struct ovsdb_monitor_row *row;
+                struct json *table_json = NULL;
+                struct ovsdb_monitor_table *mt = mcst->mt;
+
+                for (bnum = control->id;
+                     bnum <= mcst->rows.mask; bnum += workload->size)
+                {
+                    HMAP_FOR_EACH_IN_PARALLEL (row,
+                            hmap_node, bnum, &mcst->rows) {
+                        struct json *row_json;
+                        row_json = cui->row_update(mt, cui->condition,
+                                                   OVSDB_MONITOR_ROW, row,
+                                                   cui->initial, changed,
+                                                   mcst->n_columns);
+                        if (row_json) {
+                            ovsdb_monitor_add_json_row(&cui->json,
+                                                       mt->table->schema->name,
+                                                       &table_json, row_json,
+                                                       &row->uuid);
+                        }
+                    }
+                }
+            }
+            free(changed);
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static struct worker_pool *monitor_pool_mcu = NULL;
+static bool pool_init_a_done = false;
+
+static void
+init_monitor_pool_mcu(void)
+{
+    if (!pool_init_a_done) {
+        if (can_parallelize_hashes(false)) {
+            monitor_pool_mcu = add_worker_pool(compose_update_thread);
+            if (monitor_pool_mcu) {
+                int i;
+                for (i = 0; i < monitor_pool_mcu->size; i++) {
+                   monitor_pool_mcu->controls[i].workload = monitor_pool_mcu;
+                }
+            }
+        }
+    }
+    pool_init_a_done = true;
+}
+
+static void
+pool_mcu_merge_cb(struct worker_pool *pool OVS_UNUSED,
+              void *fin_result,
+              void *result_frags,
+              int index)
+{
+    struct json **result = fin_result;
+    struct compose_update_info *cui = result_frags;
+
+    parallel_json_merge_tables(result, cui[index].json);
+}
+
+
 /* Constructs and returns JSON for a <table-updates> object (as described in
  * RFC 7047) for all the outstanding changes within 'monitor', starting from
  * 'transaction'.  */
@@ -1111,30 +1209,48 @@  ovsdb_monitor_compose_update(
                       const struct ovsdb_monitor_session_condition *condition,
                       compose_row_update_cb_func row_update)
 {
-    struct json *json;
-    size_t max_columns = ovsdb_monitor_max_columns(dbmon);
-    unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+    struct json *json = NULL;
 
-    json = NULL;
-    struct ovsdb_monitor_change_set_for_table *mcst;
-    LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
-        struct ovsdb_monitor_row *row, *next;
-        struct json *table_json = NULL;
-        struct ovsdb_monitor_table *mt = mcst->mt;
+    init_monitor_pool_mcu();
+
+    if (monitor_pool_mcu) {
+        struct compose_update_info *cui =
+            xcalloc(sizeof(struct compose_update_info), monitor_pool_mcu->size);
+        int i;
+        for (i = 0; i < monitor_pool_mcu->size; i++) {
+            cui[i].row_update = row_update;
+            cui[i].json = NULL;
+            cui[i].dbmon = dbmon;
+            cui[i].mcs = mcs;
+            cui[i].condition = condition;
+            cui[i].initial = initial;
+            monitor_pool_mcu->controls[i].data = &cui[i];
+        }
+        run_pool_callback(monitor_pool_mcu, &json, cui, pool_mcu_merge_cb);
+        free(cui);
+    } else {
+        size_t max_columns = ovsdb_monitor_max_columns(dbmon);
+        unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+        struct ovsdb_monitor_change_set_for_table *mcst;
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
-            struct json *row_json;
-            row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
-                                     initial, changed, mcst->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           &row->uuid);
+        LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
+            struct ovsdb_monitor_row *row, *next;
+            struct json *table_json = NULL;
+            struct ovsdb_monitor_table *mt = mcst->mt;
+
+            HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+                struct json *row_json;
+                row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
+                                         initial, changed, mcst->n_columns);
+                if (row_json) {
+                    ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+                                               &table_json, row_json,
+                                               &row->uuid);
+                }
             }
         }
+        free(changed);
     }
-    free(changed);
-
     return json;
 }