diff mbox series

[ovs-dev,09/11] ovn-northd-ddlog: Apply multiple database updates in single ddlog txn.

Message ID 20210304041012.4128938-10-blp@ovn.org
State Deferred
Headers show
Series ovn-northd-ddlog improvements | expand

Commit Message

Ben Pfaff March 4, 2021, 4:10 a.m. UTC
DDlog can apply a larger batch of updates more efficiently than a series
of smaller ones.  Until now, ovn-northd-ddlog has always applied updates
one-by-one.  This commit changes it so that if multiple updates are
received in a batch, it applies all of them within a single ddlog
transaction.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 northd/ovn-northd-ddlog.c | 115 ++++++++++++++++++++++++--------------
 1 file changed, 72 insertions(+), 43 deletions(-)
diff mbox series

Patch

diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
index a510e64fd275..aa0ea73e401d 100644
--- a/northd/ovn-northd-ddlog.c
+++ b/northd/ovn-northd-ddlog.c
@@ -358,10 +358,11 @@  json_object_get(const struct json *json, const char *member_name)
             : NULL);
 }
 
-/* Returns the new value of NB_Global::nb_cfg, if any, from the updates in
- * <table-updates> provided by the caller, or INT64_MIN if none is present. */
-static int64_t
-get_nb_cfg(const struct json *table_updates)
+/* Stores into '*nb_cfgp' the new value of NB_Global::nb_cfg in the updates in
+ * <table-updates> provided by the caller.  Leaves '*nb_cfgp' alone if the
+ * updates don't set NB_Global::nb_cfg. */
+static void
+get_nb_cfg(const struct json *table_updates, int64_t *nb_cfgp)
 {
     const struct json *nb_global = json_object_get(table_updates, "NB_Global");
     if (nb_global) {
@@ -371,59 +372,72 @@  get_nb_cfg(const struct json *table_updates)
             const struct json *new = json_object_get(value, "new");
             const struct json *nb_cfg = json_object_get(new, "nb_cfg");
             if (nb_cfg && nb_cfg->type == JSON_INTEGER) {
-                return json_integer(nb_cfg);
+                *nb_cfgp = json_integer(nb_cfg);
+                return;
             }
         }
     }
-    return INT64_MIN;
 }
 
 static void
-northd_parse_update(struct northd_ctx *ctx,
-                    const struct ovsdb_cs_update_event *update)
+northd_parse_updates(struct northd_ctx *ctx, struct ovs_list *updates)
 {
+    if (ovs_list_is_empty(updates)) {
+        return;
+    }
+
     if (ddlog_transaction_start(ctx->ddlog)) {
         VLOG_WARN("DDlog failed to start transaction");
         return;
     }
 
-    if (update->clear && ddlog_clear(ctx)) {
-        goto error;
-    }
-    char *updates_s = json_to_string(update->table_updates, 0);
-    if (ddlog_apply_ovsdb_updates(ctx->ddlog, ctx->prefix, updates_s)) {
-        VLOG_WARN("DDlog failed to apply updates %s", updates_s);
-        free(updates_s);
-        goto error;
-    }
-    free(updates_s);
 
-    /* Whenever a new 'nb_cfg' value comes in, take the current time and push
-     * it into the NbCfgTimestamp relation for the DDlog program to put into
-     * nb::NB_Global.nb_cfg_timestamp. */
+    /* Whenever a new 'nb_cfg' value comes in, we take the current time and
+     * push it into the NbCfgTimestamp relation for the DDlog program to put
+     * into nb::NB_Global.nb_cfg_timestamp.
+     *
+     * The 'old_nb_cfg' variables track the state we've pushed into DDlog.
+     * The 'new_nb_cfg' variables track what 'updates' sets (by default,
+     * no change, so we initialize from the old variables). */
     static int64_t old_nb_cfg = INT64_MIN;
     static int64_t old_nb_cfg_timestamp = INT64_MIN;
-    int64_t new_nb_cfg = old_nb_cfg;
+    int64_t new_nb_cfg = old_nb_cfg == INT64_MIN ? 0 : old_nb_cfg;
     int64_t new_nb_cfg_timestamp = old_nb_cfg_timestamp;
-    if (ctx->has_timestamp_columns) {
-        new_nb_cfg = get_nb_cfg(update->table_updates);
-        if (new_nb_cfg == INT64_MIN) {
-            new_nb_cfg = old_nb_cfg == INT64_MIN ? 0 : old_nb_cfg;
+
+    struct ovsdb_cs_event *event;
+    LIST_FOR_EACH (event, list_node, updates) {
+        ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE);
+        struct ovsdb_cs_update_event *update = &event->update;
+        if (update->clear && ddlog_clear(ctx)) {
+            goto error;
         }
-        if (new_nb_cfg != old_nb_cfg) {
-            new_nb_cfg_timestamp = time_wall_msec();
-
-            ddlog_cmd *updates[2];
-            int n_updates = 0;
-            if (old_nb_cfg_timestamp != INT64_MIN) {
-                updates[n_updates++] = ddlog_delete_val_cmd(
-                    NB_CFG_TIMESTAMP_ID, ddlog_i64(old_nb_cfg_timestamp));
-            }
-            updates[n_updates++] = ddlog_insert_cmd(
-                NB_CFG_TIMESTAMP_ID, ddlog_i64(new_nb_cfg_timestamp));
-            if (ddlog_apply_updates(ctx->ddlog, updates, n_updates) < 0) {
-                goto error;
-            }
+
+        char *updates_s = json_to_string(update->table_updates, 0);
+        if (ddlog_apply_ovsdb_updates(ctx->ddlog, ctx->prefix, updates_s)) {
+            VLOG_WARN("DDlog failed to apply updates %s", updates_s);
+            free(updates_s);
+            goto error;
+        }
+        free(updates_s);
+
+        if (ctx->has_timestamp_columns) {
+            get_nb_cfg(update->table_updates, &new_nb_cfg);
+        }
+    }
+
+    if (ctx->has_timestamp_columns && new_nb_cfg != old_nb_cfg) {
+        new_nb_cfg_timestamp = time_wall_msec();
+
+        ddlog_cmd *cmds[2];
+        int n_cmds = 0;
+        if (old_nb_cfg_timestamp != INT64_MIN) {
+            cmds[n_cmds++] = ddlog_delete_val_cmd(
+                NB_CFG_TIMESTAMP_ID, ddlog_i64(old_nb_cfg_timestamp));
+        }
+        cmds[n_cmds++] = ddlog_insert_cmd(
+            NB_CFG_TIMESTAMP_ID, ddlog_i64(new_nb_cfg_timestamp));
+        if (ddlog_apply_updates(ctx->ddlog, cmds, n_cmds) < 0) {
+            goto error;
         }
     }
 
@@ -486,6 +500,15 @@  northd_process_txn_reply(struct northd_ctx *ctx,
     }
 }
 
+static void
+destroy_event_list(struct ovs_list *events)
+{
+    struct ovsdb_cs_event *event;
+    LIST_FOR_EACH_POP (event, list_node, events) {
+        ovsdb_cs_event_destroy(event);
+    }
+}
+
 /* Processes a batch of messages from the database server on 'ctx'. */
 static void
 northd_run(struct northd_ctx *ctx)
@@ -493,6 +516,7 @@  northd_run(struct northd_ctx *ctx)
     struct ovs_list events;
     ovsdb_cs_run(ctx->cs, &events);
 
+    struct ovs_list updates = OVS_LIST_INITIALIZER(&updates);
     struct ovsdb_cs_event *event;
     LIST_FOR_EACH_POP (event, list_node, &events) {
         switch (event->type) {
@@ -505,8 +529,11 @@  northd_run(struct northd_ctx *ctx)
             break;
 
         case OVSDB_CS_EVENT_TYPE_UPDATE:
-            northd_parse_update(ctx, &event->update);
-            break;
+            if (event->update.clear) {
+                destroy_event_list(&updates);
+            }
+            ovs_list_push_back(&updates, &event->list_node);
+            continue;
 
         case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
             northd_process_txn_reply(ctx, event->txn_reply);
@@ -515,6 +542,9 @@  northd_run(struct northd_ctx *ctx)
         ovsdb_cs_event_destroy(event);
     }
 
+    northd_parse_updates(ctx, &updates);
+    destroy_event_list(&updates);
+
     if (ctx->state == S_INITIAL && ovsdb_cs_may_send_transaction(ctx->cs)) {
         northd_send_output_only_data_request(ctx);
     }
@@ -1128,7 +1158,6 @@  main(int argc, char *argv[])
     unixctl_command_register("exit", "", 0, 0, ovn_northd_exit, &exiting);
     unixctl_command_register("status", "", 0, 0, ovn_northd_status, &status);
 
-
     ddlog_prog ddlog;
     ddlog = ddlog_run(1, false, ddlog_print_error, &delta);
     if (!ddlog) {