diff mbox series

[ovs-dev,RFC] ovsdb-server: Fix the possible data loss issue in an active/standby setup - APPROACH 2 (using in memory db)

Message ID 20180903054920.6348-1-nusiddiq@redhat.com
State Not Applicable
Headers show
Series [ovs-dev,RFC] ovsdb-server: Fix the possible data loss issue in an active/standby setup - APPROACH 2 (using in memory db) | expand

Commit Message

Numan Siddique Sept. 3, 2018, 5:49 a.m. UTC
From: Numan Siddique <nusiddiq@redhat.com>

The present code resets the database while it is in the
'RPL_S_SCHEMA_REQUESTED' state and repopulates the database only when
it receives the monitor reply in the 'RPL_S_MONITOR_REQUESTED' state.
If, however, the server goes to active mode before it processes the
monitor reply, all of the data is lost.

This patch fixes the issue by
 - deleting the code to reset the database in the state -
   'RPL_S_SCHEMA_REQUESTED'
 - storing the result of the process_notification() (when it
   receives the monitor reply) in the memory (using unbacked
   'struct ovsdb *' object)
 - resetting the database
 - repopulating the db from the in memory db

This approach is somewhat memory-intensive and not very efficient, but
it guarantees that the data is safe.

An alternate approach to solve this issue is to reset the database
when it receives the monitor reply (before processing it), so that
reset and repopulation of the db happens in the same state. This
is simpler, but it has a small window for data loss
if the function process_notification() which processes the monitor
reply fails for some reason.

Reported-by: Han Zhou <zhouhan@gmail.com>
Reported-at: https://mail.openvswitch.org/pipermail/ovs-discuss/2018-August/047161.html
Signed-off-by: Numan Siddique <nusiddiq@redhat.com>
---
 ovsdb/replication.c | 137 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 115 insertions(+), 22 deletions(-)
diff mbox series

Patch

diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 2b9ae2f83..1a648b899 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -30,6 +30,7 @@ 
 #include "replication.h"
 #include "row.h"
 #include "sset.h"
+#include "storage.h"
 #include "stream.h"
 #include "svec.h"
 #include "table.h"
@@ -105,11 +106,18 @@  static enum ovsdb_replication_state state;
  * schema matches.  */
 static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
 static struct shash *replication_dbs;
+static struct shash unbacked_dbs = SHASH_INITIALIZER(&unbacked_dbs);
 
 static struct shash *replication_db_clone(struct shash *dbs);
 static void replication_dbs_destroy(void);
 /* Find 'struct ovsdb' by name within 'replication_dbs' */
 static struct ovsdb* find_db(const char *db_name);
+
+static void create_unbacked_db(struct ovsdb *db);
+static void unbacked_dbs_destroy(void);
+static struct ovsdb *find_unbacked_db(const char *db_name);
+static struct ovsdb_error *clone_from_unbacked_database(
+    struct ovsdb *db, struct ovsdb *unbacked_db);
 
 
 void
@@ -123,7 +131,7 @@  replication_init(const char *sync_from_, const char *exclude_tables,
     ovs_assert(!set_blacklist_tables(exclude_tables, false));
 
     replication_dbs_destroy();
-
+    unbacked_dbs_destroy();
     shash_clear(&local_dbs);
     if (session) {
         jsonrpc_session_close(session);
@@ -299,20 +307,7 @@  replication_run(void)
                 /* After receiving schemas, reset the local databases that
                  * will be monitored and send out monitor requests for them. */
                 if (hmap_is_empty(&request_ids)) {
-                    struct shash_node *node, *next;
-
-                    SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
-                        db = node->data;
-                        error = reset_database(db);
-                        if (error) {
-                            const char *db_name = db->schema->name;
-                            shash_find_and_delete(replication_dbs, db_name);
-                            ovsdb_error_assert(error);
-                            VLOG_WARN("Failed to reset database, "
-                                      "%s not replicated.", db_name);
-                        }
-                    }
-
+                    struct shash_node *node;
                     if (shash_is_empty(replication_dbs)) {
                         VLOG_WARN("Nothing to replicate.");
                         state = RPL_S_ERR;
@@ -321,7 +316,7 @@  replication_run(void)
                             db = node->data;
                             struct jsonrpc_msg *request =
                                 create_monitor_request(db);
-
+                            create_unbacked_db(db);
                             request_ids_add(request->id, db);
                             jsonrpc_session_send(session, request);
                             VLOG_DBG("Send monitor requests");
@@ -335,18 +330,51 @@  replication_run(void)
             case RPL_S_MONITOR_REQUESTED: {
                 /* Reply to monitor requests. */
                 struct ovsdb_error *error;
-                error = process_notification(msg->result, db);
+                struct ovsdb *unbacked_db = find_unbacked_db(db->name);
+                error = process_notification(msg->result, unbacked_db);
                 if (error) {
                     ovsdb_error_assert(error);
                     state = RPL_S_ERR;
-                } else {
-                    /* Transition to replicating state after receiving
-                     * all replies of "monitor" requests. */
-                    if (hmap_is_empty(&request_ids)) {
+                    break;
+                }
+                if (!hmap_is_empty(&request_ids)) {
+                    break;
+                }
+                if (shash_is_empty(replication_dbs)) {
+                    VLOG_WARN("Nothing to replicate.");
+                    state = RPL_S_ERR;
+                    break;
+                }
+
+                struct shash_node *node;
+                SHASH_FOR_EACH (node, replication_dbs) {
+                    db = node->data;
+                    /* Reset the local databases and then clone from the
+                     * unbacked db.
+                     */
+                    error = reset_database(db);
+                    if (error) {
+                        const char *db_name = db->schema->name;
+                        shash_find_and_delete(replication_dbs, db_name);
+                        ovsdb_error_assert(error);
+                        VLOG_WARN("Failed to reset database, "
+                                  "%s not replicated.", db_name);
+                        state = RPL_S_ERR;
+                        break;
+                    }
+                    unbacked_db = find_unbacked_db(db->name);
+                    error = clone_from_unbacked_database(db, unbacked_db);
+                    if (error) {
+                        ovsdb_error_assert(error);
+                        VLOG_WARN("Failed to set database, "
+                                  "%s not replicated.", db->name);
+                        state = RPL_S_ERR;
+                    } else {
                         VLOG_DBG("Listening to monitor updates");
                         state = RPL_S_REPLICATING;
                     }
                 }
+                unbacked_dbs_destroy();
                 break;
             }
 
@@ -508,7 +536,7 @@  replication_destroy(void)
 
     request_ids_destroy();
     replication_dbs_destroy();
-
+    unbacked_dbs_destroy();
     shash_destroy(&local_dbs);
 }
 
@@ -517,6 +545,37 @@  find_db(const char *db_name)
 {
     return shash_find_data(replication_dbs, db_name);
 }
+
+/* Creates a new in-memory (unbacked) database that uses a clone of 'db''s
+ * schema and registers it in 'unbacked_dbs' under 'db->name'.  Any database
+ * already registered under that name is destroyed and replaced. */
+static void
+create_unbacked_db(struct ovsdb *db)
+{
+    struct ovsdb *unbacked_db = ovsdb_create(ovsdb_schema_clone(db->schema),
+                                             ovsdb_storage_create_unbacked());
+    struct shash_node *node = shash_find(&unbacked_dbs, db->name);
+    if (node) {
+        /* Fetch the payload before deleting the node: shash_delete() frees
+         * 'node', so reading node->data afterwards would be a
+         * use-after-free. */
+        struct ovsdb *stale_db = node->data;
+
+        shash_delete(&unbacked_dbs, node);
+        ovsdb_destroy(stale_db);
+    }
+    shash_add_assert(&unbacked_dbs, db->name, unbacked_db);
+}
+
+/* Destroys every database held in 'unbacked_dbs' and releases the map's own
+ * storage, leaving 'unbacked_dbs' empty and ready to be used again. */
+static void
+unbacked_dbs_destroy(void)
+{
+    struct shash_node *node, *next;
+
+    SHASH_FOR_EACH_SAFE (node, next, &unbacked_dbs) {
+        ovsdb_destroy(node->data);
+        shash_delete(&unbacked_dbs, node);
+    }
+
+    /* 'unbacked_dbs' is a file-scope map that outlives this call:
+     * create_unbacked_db() adds to it again whenever monitor requests are
+     * sent.  A destroyed shash must not be used until it has been
+     * reinitialized, so reinitialize it right away. */
+    shash_destroy(&unbacked_dbs);
+    shash_init(&unbacked_dbs);
+}
+
+/* Looks up 'db_name' in 'unbacked_dbs' and returns whatever the shash lookup
+ * yields for that name. */
+static struct ovsdb *
+find_unbacked_db(const char *db_name)
+{
+    struct ovsdb *unbacked_db = shash_find_data(&unbacked_dbs, db_name);
+
+    return unbacked_db;
+}
 
 static struct ovsdb_error *
 reset_database(struct ovsdb *db)
@@ -538,6 +597,40 @@  reset_database(struct ovsdb *db)
     return ovsdb_txn_propose_commit_block(txn, false);
 }
 
+/* Copies every row of every table in 'unbacked_db' into the table of the
+ * same name in 'db', keeping each row's UUID, and proposes all of the
+ * insertions as a single transaction.
+ *
+ * If a row cannot be converted, aborts the transaction and returns the
+ * conversion error.  Otherwise returns the result of committing the
+ * transaction. */
+static struct ovsdb_error *
+clone_from_unbacked_database(struct ovsdb *db, struct ovsdb *unbacked_db)
+{
+    struct ovsdb_txn *txn = ovsdb_txn_create(db);
+    struct shash_node *table_node;
+
+    SHASH_FOR_EACH (table_node, &unbacked_db->tables) {
+        struct ovsdb_table *unbacked_db_table = table_node->data;
+        struct ovsdb_table *table = ovsdb_get_table(db, table_node->name);
+        struct ovsdb_row *unbacked_db_row;
+        struct ovsdb_column_set all_columns;
+
+        ovsdb_column_set_init(&all_columns);
+        ovsdb_column_set_add_all(&all_columns, unbacked_db_table);
+
+        /* Rows are only read here, never removed, so the plain iterator is
+         * sufficient. */
+        HMAP_FOR_EACH (unbacked_db_row, hmap_node, &unbacked_db_table->rows) {
+            const struct uuid *row_uuid = ovsdb_row_get_uuid(unbacked_db_row);
+            struct ovsdb_row *row = ovsdb_row_create(table);
+            struct json *unbacked_db_json_row =
+                ovsdb_row_to_json(unbacked_db_row, &all_columns);
+            struct ovsdb_error *error = ovsdb_row_from_json(
+                row, unbacked_db_json_row, NULL, NULL);
+
+            /* The JSON form is only a temporary intermediate; free it
+             * whether or not the conversion succeeded. */
+            json_destroy(unbacked_db_json_row);
+            if (error) {
+                /* 'row' has not been handed to 'txn' yet, so free it here
+                 * before abandoning the transaction. */
+                ovsdb_row_destroy(row);
+                ovsdb_column_set_destroy(&all_columns);
+                ovsdb_txn_abort(txn);
+                return error;
+            }
+            *ovsdb_row_get_uuid_rw(row) = *row_uuid;
+            ovsdb_txn_row_insert(txn, row);
+        }
+        ovsdb_column_set_destroy(&all_columns);
+    }
+
+    return ovsdb_txn_propose_commit_block(txn, false);
+}
+
 /* Create a monitor request for 'db'. The monitor request will include
  * any tables from 'blacklisted_tables'
  *