@@ -36,6 +36,8 @@
#include "jsonrpc-server.h"
#include "monitor.h"
#include "util.h"
+#include "parallel-hmap.h"
+#include "parallel-json.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
@@ -1101,6 +1103,102 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
json_object_put(*table_json, uuid, row_json);
}
+/* Parallel processing thread */
+
+struct compose_update_info {
+ compose_row_update_cb_func row_update;
+ struct json *json;
+ struct ovsdb_monitor *dbmon;
+ struct ovsdb_monitor_change_set *mcs;
+ const struct ovsdb_monitor_session_condition *condition;
+ bool initial;
+};
+
+static void *
+compose_update_thread(void *arg)
+{
+ struct worker_control *control = (struct worker_control *) arg;
+ struct worker_pool *workload;
+ struct compose_update_info *cui;
+ int bnum;
+
+ while (!stop_parallel_processing()) {
+ wait_for_work(control);
+ workload = (struct worker_pool *) control->workload;
+ cui = (struct compose_update_info *) control->data;
+ if (stop_parallel_processing()) {
+ return NULL;
+ }
+ if (cui && workload) {
+ size_t max_columns = ovsdb_monitor_max_columns(cui->dbmon);
+ unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+ struct ovsdb_monitor_change_set_for_table *mcst;
+
+ LIST_FOR_EACH (mcst,
+ list_in_change_set, &cui->mcs->change_set_for_tables) {
+ struct ovsdb_monitor_row *row;
+ struct json *table_json = NULL;
+ struct ovsdb_monitor_table *mt = mcst->mt;
+
+ for (bnum = control->id;
+ bnum <= mcst->rows.mask; bnum += workload->size)
+ {
+ HMAP_FOR_EACH_IN_PARALLEL (row,
+ hmap_node, bnum, &mcst->rows) {
+ struct json *row_json;
+ row_json = cui->row_update(mt, cui->condition,
+ OVSDB_MONITOR_ROW, row,
+ cui->initial, changed,
+ mcst->n_columns);
+ if (row_json) {
+ ovsdb_monitor_add_json_row(&cui->json,
+ mt->table->schema->name,
+ &table_json, row_json,
+ &row->uuid);
+ }
+ }
+ }
+ }
+ free(changed);
+ }
+ post_completed_work(control);
+ }
+ return NULL;
+}
+
+static struct worker_pool *monitor_pool_mcu = NULL;
+static bool pool_init_a_done = false;
+
+static void
+init_monitor_pool_mcu(void)
+{
+ if (!pool_init_a_done) {
+ if (can_parallelize_hashes(false)) {
+ monitor_pool_mcu = add_worker_pool(compose_update_thread);
+ if (monitor_pool_mcu) {
+ int i;
+ for (i = 0; i < monitor_pool_mcu->size; i++) {
+ monitor_pool_mcu->controls[i].workload = monitor_pool_mcu;
+ }
+ }
+ }
+ }
+ pool_init_a_done = true;
+}
+
+static void
+pool_mcu_merge_cb(struct worker_pool *pool OVS_UNUSED,
+ void *fin_result,
+ void *result_frags,
+ int index)
+{
+ struct json **result = fin_result;
+ struct compose_update_info *cui = result_frags;
+
+ parallel_json_merge_tables(result, cui[index].json);
+}
+
+
/* Constructs and returns JSON for a <table-updates> object (as described in
* RFC 7047) for all the outstanding changes within 'monitor', starting from
* 'transaction'. */
@@ -1111,30 +1209,48 @@ ovsdb_monitor_compose_update(
const struct ovsdb_monitor_session_condition *condition,
compose_row_update_cb_func row_update)
{
- struct json *json;
- size_t max_columns = ovsdb_monitor_max_columns(dbmon);
- unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+ struct json *json = NULL;
- json = NULL;
- struct ovsdb_monitor_change_set_for_table *mcst;
- LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
- struct ovsdb_monitor_row *row, *next;
- struct json *table_json = NULL;
- struct ovsdb_monitor_table *mt = mcst->mt;
+ init_monitor_pool_mcu();
+
+ if (monitor_pool_mcu) {
+ struct compose_update_info *cui =
+ xcalloc(sizeof(struct compose_update_info), monitor_pool_mcu->size);
+ int i;
+ for (i = 0; i < monitor_pool_mcu->size; i++) {
+ cui[i].row_update = row_update;
+ cui[i].json = NULL;
+ cui[i].dbmon = dbmon;
+ cui[i].mcs = mcs;
+ cui[i].condition = condition;
+ cui[i].initial = initial;
+ monitor_pool_mcu->controls[i].data = &cui[i];
+ }
+ run_pool_callback(monitor_pool_mcu, &json, cui, pool_mcu_merge_cb);
+ free(cui);
+ } else {
+ size_t max_columns = ovsdb_monitor_max_columns(dbmon);
+ unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+ struct ovsdb_monitor_change_set_for_table *mcst;
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
- struct json *row_json;
- row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
- initial, changed, mcst->n_columns);
- if (row_json) {
- ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
- &table_json, row_json,
- &row->uuid);
+ LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
+ struct ovsdb_monitor_row *row, *next;
+ struct json *table_json = NULL;
+ struct ovsdb_monitor_table *mt = mcst->mt;
+
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+ struct json *row_json;
+ row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
+ initial, changed, mcst->n_columns);
+ if (row_json) {
+ ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+ &table_json, row_json,
+ &row->uuid);
+ }
}
}
+ free(changed);
}
- free(changed);
-
return json;
}