diff mbox series

[ovs-dev,10/22] ovsdb: replication: Isolate databases from each other.

Message ID 20231214010431.1664005-11-i.maximets@ovn.org
State Superseded
Delegated to: Ilya Maximets
Headers show
Series [ovs-dev,01/22] ovsdb-server.at: Enbale debug logs in active-backup tests. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/intel-ovs-compilation success test: success

Commit Message

Ilya Maximets Dec. 14, 2023, 1:04 a.m. UTC
Refactor the replication code so that each database is handled
separately from the others.  This is supposed to work the same way as
before, with the only difference being that each backup database will
have its own connection to the source and its own state machine.

From the user's perspective, the only visible difference is that
ovsdb-server/sync-status appctl now shows the status of each
database separately.

If one of the connections is permanently broken, all the databases
will be switched to active.  This is done in order to preserve the
old behavior where we had only one connection.

Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
---
 ovsdb/ovsdb-server.c |  74 +++--
 ovsdb/replication.c  | 676 ++++++++++++++++++++-----------------------
 ovsdb/replication.h  |  36 +--
 3 files changed, 384 insertions(+), 402 deletions(-)
diff mbox series

Patch

diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 7e95b3813..9a3b0add1 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -166,12 +166,12 @@  ovsdb_replication_init(const char *sync_from, const char *exclude,
                        struct shash *all_dbs, const struct uuid *server_uuid,
                        int probe_interval)
 {
-    replication_init(sync_from, exclude, server_uuid, probe_interval);
     struct shash_node *node;
     SHASH_FOR_EACH (node, all_dbs) {
         struct db *db = node->data;
         if (node->name[0] != '_' && db->db) {
-            replication_add_local_db(node->name, db->db);
+            replication_set_db(db->db, sync_from, exclude,
+                               server_uuid, probe_interval);
         }
     }
 }
@@ -228,11 +228,20 @@  main_loop(struct server_config *config,
         report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
         ovsdb_jsonrpc_server_run(jsonrpc);
 
+        replication_run();
         if (*is_backup) {
-            replication_run();
-            if (!replication_is_alive()) {
-                disconnect_active_server();
-                *is_backup = false;
+            SHASH_FOR_EACH (node, all_dbs) {
+                struct db *db = node->data;
+                if (db->db->name[0] != '_' && !replication_is_alive(db->db)) {
+                    *is_backup = false;
+                    break;
+                }
+            }
+            if (!*is_backup) {
+                SHASH_FOR_EACH (node, all_dbs) {
+                    struct db *db = node->data;
+                    replication_remove_db(db->db);
+                }
             }
         }
 
@@ -283,10 +292,8 @@  main_loop(struct server_config *config,
         update_server_status(all_dbs);
 
         memory_wait();
-        if (*is_backup) {
-            replication_wait();
-        }
 
+        replication_wait();
         ovsdb_relay_wait();
 
         ovsdb_jsonrpc_server_wait(jsonrpc);
@@ -518,7 +525,7 @@  main(int argc, char *argv[])
                              &server_config);
     unixctl_command_register("ovsdb-server/get-sync-exclude-tables", "",
                              0, 0, ovsdb_server_get_sync_exclude_tables,
-                             NULL);
+                             &server_config);
     unixctl_command_register("ovsdb-server/sync-status", "",
                              0, 0, ovsdb_server_get_sync_status,
                              &server_config);
@@ -606,6 +613,9 @@  close_db(struct server_config *config, struct db *db, char *comment)
         if (db->db->is_relay) {
             ovsdb_relay_del_db(db->db);
         }
+        if (*config->is_backup) {
+            replication_remove_db(db->db);
+        }
         ovsdb_destroy(db->db);
         free(db->filename);
         free(db);
@@ -1500,8 +1510,12 @@  ovsdb_server_disconnect_active_ovsdb_server(struct unixctl_conn *conn,
                                             void *config_)
 {
     struct server_config *config = config_;
+    struct shash_node *node;
 
-    disconnect_active_server();
+    SHASH_FOR_EACH (node, config->all_dbs) {
+        struct db *db = node->data;
+        replication_remove_db(db->db);
+    }
     *config->is_backup = false;
     save_config(config);
     unixctl_command_reply(conn, NULL);
@@ -1520,7 +1534,11 @@  ovsdb_server_set_active_ovsdb_server_probe_interval(struct unixctl_conn *conn,
         *config->replication_probe_interval = probe_interval;
         save_config(config);
         if (*config->is_backup) {
-            replication_set_probe_interval(probe_interval);
+            const struct uuid *server_uuid;
+            server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc);
+            ovsdb_replication_init(*config->sync_from, *config->sync_exclude,
+                                   config->all_dbs, server_uuid,
+                                   *config->replication_probe_interval);
         }
         unixctl_command_reply(conn, NULL);
     } else {
@@ -1557,7 +1575,7 @@  ovsdb_server_set_sync_exclude_tables(struct unixctl_conn *conn,
 {
     struct server_config *config = config_;
 
-    char *err = set_excluded_tables(argv[1], true);
+    char *err = parse_excluded_tables(argv[1]);
     if (!err) {
         free(*config->sync_exclude);
         *config->sync_exclude = xstrdup(argv[1]);
@@ -1569,7 +1587,6 @@  ovsdb_server_set_sync_exclude_tables(struct unixctl_conn *conn,
                                    config->all_dbs, server_uuid,
                                    *config->replication_probe_interval);
         }
-        err = set_excluded_tables(argv[1], false);
     }
     unixctl_command_reply(conn, err);
     free(err);
@@ -1579,11 +1596,11 @@  static void
 ovsdb_server_get_sync_exclude_tables(struct unixctl_conn *conn,
                                      int argc OVS_UNUSED,
                                      const char *argv[] OVS_UNUSED,
-                                     void *arg_ OVS_UNUSED)
+                                     void *config_)
 {
-    char *reply = get_excluded_tables();
-    unixctl_command_reply(conn, reply);
-    free(reply);
+    struct server_config *config = config_;
+
+    unixctl_command_reply(conn, *config->sync_exclude);
 }
 
 static void
@@ -1842,13 +1859,6 @@  remove_db(struct server_config *config, struct shash_node *node, char *comment)
     shash_delete(config->all_dbs, node);
 
     save_config(config);
-    if (*config->is_backup) {
-        const struct uuid *server_uuid;
-        server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc);
-        ovsdb_replication_init(*config->sync_from, *config->sync_exclude,
-                               config->all_dbs, server_uuid,
-                               *config->replication_probe_interval);
-    }
 }
 
 static void
@@ -1990,7 +2000,17 @@  ovsdb_server_get_sync_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
     ds_put_format(&ds, "state: %s\n", is_backup ? "backup" : "active");
 
     if (is_backup) {
-        ds_put_and_free_cstr(&ds, replication_status());
+        const struct shash_node **db_nodes = shash_sort(config->all_dbs);
+
+        for (size_t i = 0; i < shash_count(config->all_dbs); i++) {
+            const struct db *db = db_nodes[i]->data;
+
+            if (db->db && db->db->name[0] != '_') {
+                ds_put_and_free_cstr(&ds, replication_status(db->db));
+                ds_put_char(&ds, '\n');
+            }
+        }
+        free(db_nodes);
     }
 
     unixctl_command_reply(conn, ds_cstr(&ds));
@@ -2154,7 +2174,7 @@  parse_options(int argc, char *argv[],
             break;
 
         case OPT_SYNC_EXCLUDE: {
-            char *err = set_excluded_tables(optarg, false);
+            char *err = parse_excluded_tables(optarg);
             if (err) {
                 ovs_fatal(0, "%s", err);
             }
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 477c69d70..d0d48aad5 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -38,16 +38,7 @@ 
 
 VLOG_DEFINE_THIS_MODULE(replication);
 
-static char *sync_from;
 static struct uuid server_uuid;
-static struct jsonrpc_session *session;
-static unsigned int session_seqno = UINT_MAX;
-
-static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
-static void add_monitored_table(struct ovsdb_table_schema *table,
-                                struct json *monitor_requests);
-
-static struct ovsdb_error *reset_database(struct ovsdb *db);
 
 static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
 static struct ovsdb_error *process_table_update(struct json *table_update,
@@ -55,27 +46,6 @@  static struct ovsdb_error *process_table_update(struct json *table_update,
                                                 struct ovsdb *database,
                                                 struct ovsdb_txn *txn);
 
-/* Maps from db name to sset of table names. */
-static struct shash excluded_tables = SHASH_INITIALIZER(&excluded_tables);
-
-static void excluded_tables_clear(void);
-static void excluded_tables_add(const char *database, const char *table);
-static bool excluded_tables_find(const char *database, const char *table);
-
-
-/* Keep track of request IDs of all outstanding OVSDB requests. */
-static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
-
-struct request_ids_hmap_node {
-    struct hmap_node hmap;
-    struct json *request_id;
-    struct ovsdb *db;          /* associated database */
-};
-void request_ids_add(const struct json *id, struct ovsdb *db);
-bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
-static void request_ids_destroy(void);
-void request_ids_clear(void);
-
 enum ovsdb_replication_state {
     RPL_S_INIT,
     RPL_S_SERVER_ID_REQUESTED,
@@ -85,168 +55,231 @@  enum ovsdb_replication_state {
     RPL_S_REPLICATING,
     RPL_S_ERR /* Error, no longer replicating. */
 };
-static enum ovsdb_replication_state state;
 
-
 struct replication_db {
     struct ovsdb *db;
+
     bool schema_version_higher;
      /* Points to the schema received from the active server if
       * the local db schema version is higher. NULL otherwise. */
     struct ovsdb_schema *active_db_schema;
+
+    char *sync_from;
+    char *excluded_tables_str;
+    struct sset excluded_tables;
+
+    struct json *request_id;  /* Id of the outstanding OVSDB request. */
+
+    struct jsonrpc_session *session;
+    unsigned int session_seqno;
+
+    enum ovsdb_replication_state state;
 };
 
 static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
                                     struct ovsdb_schema *active_db_schema);
 
+static struct jsonrpc_msg *create_monitor_request(struct replication_db *,
+                                                  struct ovsdb_schema *);
+static void add_monitored_table(struct ovsdb_table_schema *table,
+                                struct json *monitor_requests);
+
+
 /* All DBs known to ovsdb-server.  The actual replication dbs are stored
  * in 'replication dbs', which is a subset of all dbs and remote dbs whose
  * schema matches.  */
-static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
-static struct shash *replication_dbs;
+static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
+
+static void replication_db_destroy(struct replication_db *);
+static struct ovsdb_error *reset_database(struct replication_db *);
 
-static struct shash *replication_dbs_create(void);
-static void replication_dbs_destroy(void);
 /* Find 'struct ovsdb' by name within 'replication_dbs' */
 static struct replication_db *find_db(const char *db_name);
+
+static char *set_excluded_tables(struct replication_db *, const char *excluded)
+    OVS_WARN_UNUSED_RESULT;
+
+static void request_id_set(struct replication_db *, const struct json *id);
+static void request_id_clear(struct replication_db *);
+static bool request_id_compare_and_free(struct replication_db *,
+                                        const struct json *id);
 
 
 void
-replication_init(const char *sync_from_, const char *exclude_tables,
-                 const struct uuid *server, int probe_interval)
+replication_set_db(struct ovsdb *db, const char *sync_from,
+                   const char *exclude_tables, const struct uuid *server,
+                   int probe_interval)
 {
-    free(sync_from);
-    sync_from = xstrdup(sync_from_);
-    /* Caller should have verified that the 'exclude_tables' is
-     * parseable. An error here is unexpected. */
-    ovs_assert(!set_excluded_tables(exclude_tables, false));
+    struct replication_db *rdb = find_db(db->name);
 
-    replication_dbs_destroy();
+    if (uuid_is_zero(&server_uuid)) {
+        /* Keep a copy of local server uuid.  */
+        server_uuid = *server;
+    } else {
+        ovs_assert(uuid_equals(&server_uuid, server));
+    }
+
+    ovs_assert(sync_from);
+
+    if (rdb
+        && nullable_string_is_equal(rdb->excluded_tables_str, exclude_tables)
+        && nullable_string_is_equal(rdb->sync_from, sync_from)) {
+        jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
+        return;
+    }
 
-    shash_clear(&local_dbs);
-    if (session) {
-        jsonrpc_session_close(session);
+    if (!rdb) {
+        rdb = xzalloc(sizeof *rdb);
+        rdb->db = db;
+        sset_init(&rdb->excluded_tables);
+        rdb->schema_version_higher = false;
+        shash_add(&replication_dbs, db->name, rdb);
+    } else {
+        replication_db_destroy(rdb);
     }
 
-    session = jsonrpc_session_open(sync_from, true);
-    session_seqno = UINT_MAX;
+    rdb->sync_from = xstrdup(sync_from);
+    rdb->excluded_tables_str = nullable_xstrdup(exclude_tables);
+    /* Caller should have verified that the 'exclude_tables' is
+     * parseable. An error here is unexpected. */
+    ovs_assert(!set_excluded_tables(rdb, exclude_tables));
 
-    jsonrpc_session_set_probe_interval(session, probe_interval);
+    rdb->session = jsonrpc_session_open(rdb->sync_from, true);
+    rdb->session_seqno = UINT_MAX;
 
-    /* Keep a copy of local server uuid.  */
-    server_uuid = *server;
+    jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
 
-    state = RPL_S_INIT;
+    rdb->state = RPL_S_INIT;
 }
 
 void
-replication_add_local_db(const char *database, struct ovsdb *db)
+replication_remove_db(const struct ovsdb *db)
 {
-    shash_add_assert(&local_dbs, database, db);
+    struct replication_db *rdb;
+
+    rdb = shash_find_and_delete(&replication_dbs, db->name);
+    if (rdb) {
+        replication_db_destroy(rdb);
+        free(rdb);
+    }
 }
 
-static void
-send_schema_requests(const struct json *result)
+static bool
+json_array_contains_string(const struct json *js, const char *str)
 {
-    for (size_t i = 0; i < result->array.n; i++) {
-        const struct json *name = result->array.elems[i];
-        if (name->type == JSON_STRING) {
-            /* Send one schema request for each remote DB. */
-            const char *db_name = json_string(name);
-            struct replication_db *rdb = find_db(db_name);
-            if (rdb) {
-                struct jsonrpc_msg *request =
-                    jsonrpc_create_request(
-                        "get_schema",
-                        json_array_create_1(
-                            json_string_create(db_name)),
-                        NULL);
-
-                request_ids_add(request->id, rdb->db);
-                jsonrpc_session_send(session, request);
-            }
+    bool found = false;
+
+    for (size_t i = 0; i < js->array.n; i++) {
+        const struct json *elem = js->array.elems[i];
+
+        if (elem->type == JSON_STRING && !strcmp(json_string(elem), str)) {
+            found = true;
+            break;
         }
     }
+    return found;
 }
 
-void
-replication_run(void)
+static void
+send_schema_request(struct replication_db *rdb)
+{
+    struct jsonrpc_msg *request =
+        jsonrpc_create_request(
+                "get_schema",
+                json_array_create_1(json_string_create(rdb->db->name)),
+                NULL);
+
+    request_id_set(rdb, request->id);
+    jsonrpc_session_send(rdb->session, request);
+}
+
+static void
+replication_run_db(struct replication_db *rdb)
 {
-    if (!session) {
+    if (!rdb->session) {
         return;
     }
 
-    jsonrpc_session_run(session);
+    jsonrpc_session_run(rdb->session);
 
-    for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
+    for (int i = 0; i < 50; i++) {
         struct jsonrpc_msg *msg;
         unsigned int seqno;
 
-        seqno = jsonrpc_session_get_seqno(session);
-        if (seqno != session_seqno || state == RPL_S_INIT) {
-            session_seqno = seqno;
-            request_ids_clear();
+        if (!jsonrpc_session_is_connected(rdb->session)) {
+            break;
+        }
+
+        seqno = jsonrpc_session_get_seqno(rdb->session);
+        if (seqno != rdb->session_seqno || rdb->state == RPL_S_INIT) {
+            rdb->session_seqno = seqno;
+            request_id_clear(rdb);
+
             struct jsonrpc_msg *request;
             request = jsonrpc_create_request("get_server_id",
                                              json_array_create_empty(), NULL);
-            request_ids_add(request->id, NULL);
-            jsonrpc_session_send(session, request);
+            request_id_set(rdb, request->id);
+            jsonrpc_session_send(rdb->session, request);
 
-            state = RPL_S_SERVER_ID_REQUESTED;
-            VLOG_DBG("send server ID request.");
+            rdb->state = RPL_S_SERVER_ID_REQUESTED;
+            VLOG_DBG("%s: send server ID request.", rdb->db->name);
         }
 
-        msg = jsonrpc_session_recv(session);
+        msg = jsonrpc_session_recv(rdb->session);
         if (!msg) {
             continue;
         }
 
-        if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
+        if (msg->type == JSONRPC_NOTIFY && rdb->state != RPL_S_ERR
             && !strcmp(msg->method, "update")) {
             if (msg->params->type == JSON_ARRAY
                 && msg->params->array.n == 2
                 && msg->params->array.elems[0]->type == JSON_STRING) {
                 char *db_name = msg->params->array.elems[0]->string;
-                struct replication_db *rdb = find_db(db_name);
-                if (rdb) {
+
+                if (!strcmp(db_name, rdb->db->name)) {
                     struct ovsdb_error *error;
                     error = process_notification(msg->params->array.elems[1],
                                                  rdb->db);
                     if (error) {
                         ovsdb_error_assert(error);
-                        state = RPL_S_ERR;
+                        rdb->state = RPL_S_ERR;
                     }
+                } else {
+                    VLOG_WARN("%s: received update for unexpected database %s",
+                              rdb->db->name, db_name);
+                    rdb->state = RPL_S_ERR;
                 }
             }
         } else if (msg->type == JSONRPC_REPLY) {
-            struct replication_db *rdb;
-            struct ovsdb *db;
-            if (!request_ids_lookup_and_free(msg->id, &db)) {
-                VLOG_WARN("received unexpected reply");
+            if (!request_id_compare_and_free(rdb, msg->id)) {
+                VLOG_WARN("%s: received unexpected reply.", rdb->db->name);
                 goto next;
             }
 
-            switch (state) {
+            switch (rdb->state) {
             case RPL_S_SERVER_ID_REQUESTED: {
                 struct uuid uuid;
                 if (msg->result->type != JSON_STRING ||
                     !uuid_from_string(&uuid, json_string(msg->result))) {
                     struct ovsdb_error *error;
                     error = ovsdb_error("get_server_id failed",
-                                        "Server ID is not valid UUID");
+                                        "%s: Server ID is not valid UUID",
+                                        rdb->db->name);
 
                     ovsdb_error_assert(error);
-                    state = RPL_S_ERR;
+                    rdb->state = RPL_S_ERR;
                     break;
                 }
 
                 if (uuid_equals(&uuid, &server_uuid)) {
                     struct ovsdb_error *error;
                     error = ovsdb_error("Server ID check failed",
-                                        "Self replicating is not allowed");
+                                        "%s: Self replicating is not allowed",
+                                        rdb->db->name);
 
                     ovsdb_error_assert(error);
-                    state = RPL_S_ERR;
+                    rdb->state = RPL_S_ERR;
                     break;
                 }
 
@@ -254,25 +287,32 @@  replication_run(void)
                 request = jsonrpc_create_request("list_dbs",
                                                  json_array_create_empty(),
                                                  NULL);
-                request_ids_add(request->id, NULL);
-                jsonrpc_session_send(session, request);
+                request_id_set(rdb, request->id);
+                jsonrpc_session_send(rdb->session, request);
 
-                replication_dbs_destroy();
-                replication_dbs = replication_dbs_create();
-                state = RPL_S_DB_REQUESTED;
+                rdb->state = RPL_S_DB_REQUESTED;
                 break;
             }
             case RPL_S_DB_REQUESTED:
                 if (msg->result->type != JSON_ARRAY) {
                     struct ovsdb_error *error;
                     error = ovsdb_error("list_dbs failed",
-                                        "list_dbs response is not array");
+                                        "%s: list_dbs response is not array",
+                                        rdb->db->name);
+                    ovsdb_error_assert(error);
+                    rdb->state = RPL_S_ERR;
+                } else if (!json_array_contains_string(msg->result,
+                            rdb->db->name)) {
+                    struct ovsdb_error *error;
+                    error = ovsdb_error("list_dbs failed",
+                                        "%s: database name is not in the list",
+                                        rdb->db->name);
                     ovsdb_error_assert(error);
-                    state = RPL_S_ERR;
+                    rdb->state = RPL_S_ERR;
                 } else {
-                    send_schema_requests(msg->result);
-                    VLOG_DBG("Send schema requests");
-                    state = RPL_S_SCHEMA_REQUESTED;
+                    send_schema_request(rdb);
+                    VLOG_DBG("%s: send schema request.", rdb->db->name);
+                    rdb->state = RPL_S_SCHEMA_REQUESTED;
                 }
                 break;
 
@@ -283,19 +323,22 @@  replication_run(void)
                 error = ovsdb_schema_from_json(msg->result, &schema);
                 if (error) {
                     ovsdb_error_assert(error);
-                    state = RPL_S_ERR;
+                    rdb->state = RPL_S_ERR;
+                    break;
                 }
 
-                rdb = find_db(schema->name);
-                if (!rdb) {
+                if (strcmp(rdb->db->name, schema->name)) {
                     /* Unexpected schema. */
-                    VLOG_WARN("unexpected schema %s", schema->name);
-                    state = RPL_S_ERR;
+                    VLOG_WARN("%s: unexpected schema %s.",
+                              rdb->db->name, schema->name);
+                    rdb->state = RPL_S_ERR;
+                    ovsdb_schema_destroy(schema);
+                    break;
                 } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
                     /* Schmea version mismatch. */
-                    VLOG_INFO("Schema version mismatch, checking if %s can "
-                              "still be replicated or not.",
-                              schema->name);
+                    VLOG_INFO("%s: Schema version mismatch, checking if %s can"
+                              " still be replicated or not.",
+                              rdb->db->name, schema->name);
                     if (is_replication_possible(rdb->db->schema, schema)) {
                         VLOG_INFO("%s can be replicated.", schema->name);
                         rdb->schema_version_higher = true;
@@ -305,68 +348,48 @@  replication_run(void)
                         rdb->active_db_schema = schema;
                     } else {
                         VLOG_INFO("%s cannot be replicated.", schema->name);
-                        struct replication_db *r =
-                            shash_find_and_delete(replication_dbs,
-                                                  schema->name);
-                        if (r->active_db_schema) {
-                            ovsdb_schema_destroy(r->active_db_schema);
-                        }
-                        free(r);
+                        rdb->state = RPL_S_ERR;
                         ovsdb_schema_destroy(schema);
+                        break;
                     }
                 } else {
                     ovsdb_schema_destroy(schema);
                 }
 
-                /* After receiving schemas, reset the local databases that
-                 * will be monitored and send out monitor requests for them. */
-                if (hmap_is_empty(&request_ids)) {
-                    struct shash_node *node;
-
-                    if (shash_is_empty(replication_dbs)) {
-                        VLOG_WARN("Nothing to replicate.");
-                        state = RPL_S_ERR;
-                    } else {
-                        SHASH_FOR_EACH (node, replication_dbs) {
-                            rdb = node->data;
-                            struct jsonrpc_msg *request =
-                                create_monitor_request(
-                                    rdb->schema_version_higher ?
-                                    rdb->active_db_schema : rdb->db->schema);
-
-                            request_ids_add(request->id, rdb->db);
-                            jsonrpc_session_send(session, request);
-                            VLOG_DBG("Send monitor requests");
-                            state = RPL_S_MONITOR_REQUESTED;
-                        }
-                    }
-                }
+                /* Send out a monitor request. */
+                struct jsonrpc_msg *request =
+                    create_monitor_request(rdb, rdb->schema_version_higher
+                                                ? rdb->active_db_schema
+                                                : rdb->db->schema);
+
+                request_id_set(rdb, request->id);
+                jsonrpc_session_send(rdb->session, request);
+                VLOG_DBG("%s: send monitor request.", rdb->db->name);
+                rdb->state = RPL_S_MONITOR_REQUESTED;
                 break;
             }
 
             case RPL_S_MONITOR_REQUESTED: {
                 /* Reply to monitor requests. */
                 struct ovsdb_error *error;
-                VLOG_INFO("Monitor request received. Resetting the database");
+                VLOG_INFO("%s: Monitor reply received. "
+                          "Resetting the database.", rdb->db->name);
                 /* Resetting the database here has few risks. If the
                  * process_notification() fails, the database is completely
                  * lost locally. In case that node becomes active, then
                  * there is a chance of complete data loss in the active/standy
                  * cluster. */
-                error = reset_database(db);
+                error = reset_database(rdb);
                 if (!error) {
-                    error = process_notification(msg->result, db);
+                    error = process_notification(msg->result, rdb->db);
                 }
                 if (error) {
                     ovsdb_error_assert(error);
-                    state = RPL_S_ERR;
+                    rdb->state = RPL_S_ERR;
                 } else {
-                    /* Transition to replicating state after receiving
-                     * all replies of "monitor" requests. */
-                    if (hmap_is_empty(&request_ids)) {
-                        VLOG_DBG("Listening to monitor updates");
-                        state = RPL_S_REPLICATING;
-                    }
+                    VLOG_DBG("%s: Listening to monitor updates.",
+                             rdb->db->name);
+                    rdb->state = RPL_S_REPLICATING;
                 }
                 break;
             }
@@ -378,7 +401,7 @@  replication_run(void)
             case RPL_S_INIT:
             case RPL_S_REPLICATING:
             default:
-                OVS_NOT_REACHED();
+                VLOG_WARN("%s: received unexpected reply.", rdb->db->name);
             }
         }
     next:
@@ -386,24 +409,40 @@  replication_run(void)
     }
 }
 
+void
+replication_run(void)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &replication_dbs) {
+        replication_run_db(node->data);
+    }
+}
+
 void
 replication_wait(void)
 {
-    if (session) {
-        jsonrpc_session_wait(session);
-        jsonrpc_session_recv_wait(session);
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &replication_dbs) {
+        struct replication_db *rdb = node->data;
+
+        if (rdb->session) {
+            jsonrpc_session_wait(rdb->session);
+            jsonrpc_session_recv_wait(rdb->session);
+        }
     }
 }
 
-/* Parse 'excluded' to rebuild 'excluded_tables'.  If 'dryrun' is false, the
- * current set of excluded tables will be wiped out, regardless of whether
- * 'excluded' can be parsed.  If 'dryrun' is true, only parses 'excluded' and
+/* Parse 'excluded' to rebuild 'rdb->excluded_tables'.  If 'rdb' is not NULL,
+ * the current set of excluded tables will be wiped out, regardless of whether
+ * 'excluded' can be parsed.  If 'rdb' is NULL, only parses 'excluded' and
  * reports any errors, without modifying the list of exclusions.
  *
- * On error, returns the error string, which the caller is
- * responsible for freeing. Returns NULL otherwise. */
-char * OVS_WARN_UNUSED_RESULT
-set_excluded_tables(const char *excluded, bool dryrun)
+ * On error, returns the error string, which the caller is responsible for
+ * freeing.  Returns NULL otherwise. */
+static char * OVS_WARN_UNUSED_RESULT
+set_excluded_tables__(struct replication_db *rdb, const char *excluded)
 {
     struct sset set = SSET_INITIALIZER(&set);
     char *err = NULL;
@@ -411,17 +450,22 @@  set_excluded_tables(const char *excluded, bool dryrun)
     if (excluded) {
         const char *longname;
 
-        if (!dryrun) {
-            /* Can only add to an empty shash. */
-            excluded_tables_clear();
+        if (rdb) {
+            /* Can only add to an empty set. */
+            sset_clear(&rdb->excluded_tables);
         }
 
         sset_from_delimited_string(&set, excluded, " ,");
         SSET_FOR_EACH (longname, &set) {
+            if (rdb && !strchr(longname, ':')) {
+                sset_add(&rdb->excluded_tables, longname);
+                continue;
+            }
+
             char *database = xstrdup(longname), *table = NULL;
             strtok_r(database, ":", &table);
-            if (table && !dryrun) {
-                excluded_tables_add(database, table);
+            if (table && rdb && !strcmp(rdb->db->name, database)) {
+                sset_add(&rdb->excluded_tables, table);
             }
 
             free(database);
@@ -434,120 +478,74 @@  set_excluded_tables(const char *excluded, bool dryrun)
 
 done:
     sset_destroy(&set);
-    if (err && !dryrun) {
+    if (err && rdb) {
         /* On error, destroy the partially built 'excluded_tables'. */
-        excluded_tables_clear();
+        sset_clear(&rdb->excluded_tables);
     }
     return err;
 }
 
 char * OVS_WARN_UNUSED_RESULT
-get_excluded_tables(void)
+parse_excluded_tables(const char *excluded)
 {
-    struct shash_node *node;
-    struct sset set = SSET_INITIALIZER(&set);
-
-    SHASH_FOR_EACH (node, &excluded_tables) {
-        const char *database = node->name;
-        const char *table;
-        struct sset *tables = node->data;
-
-        SSET_FOR_EACH (table, tables) {
-            sset_add_and_free(&set, xasprintf("%s:%s", database, table));
-        }
-    }
-
-    /* Output the table list in an sorted order, so that
-     * the output string will not depend on the hash function
-     * that used to implement the hmap data structure. This is
-     * only useful for writting unit tests.  */
-    const char **sorted = sset_sort(&set);
-    struct ds ds = DS_EMPTY_INITIALIZER;
-    size_t i;
-    for (i = 0; i < sset_count(&set); i++) {
-        ds_put_format(&ds, "%s,", sorted[i]);
-    }
-
-    ds_chomp(&ds, ',');
-
-    free(sorted);
-    sset_destroy(&set);
-
-    return ds_steal_cstr(&ds);
+    return set_excluded_tables__(NULL, excluded);
 }
 
-static void
-excluded_tables_clear(void)
+static char * OVS_WARN_UNUSED_RESULT
+set_excluded_tables(struct replication_db *rdb, const char *excluded)
 {
-    struct shash_node *node;
-    SHASH_FOR_EACH (node, &excluded_tables) {
-        struct sset *tables = node->data;
-        sset_destroy(tables);
-    }
-
-    shash_clear_free_data(&excluded_tables);
+    return set_excluded_tables__(rdb, excluded);
 }
 
-static void
-excluded_tables_add(const char *database, const char *table)
+char * OVS_WARN_UNUSED_RESULT
+get_excluded_tables(const struct ovsdb *db)
 {
-    struct sset *tables = shash_find_data(&excluded_tables, database);
+    const struct replication_db *rdb = find_db(db->name);
 
-    if (!tables) {
-        tables = xmalloc(sizeof *tables);
-        sset_init(tables);
-        shash_add(&excluded_tables, database, tables);
+    if (!rdb) {
+        return xstrdup("");
     }
 
-    sset_add(tables, table);
-}
+    struct sset set = SSET_INITIALIZER(&set);
+    const char *table;
+    char *result;
 
-static bool
-excluded_tables_find(const char *database, const char *table)
-{
-    struct sset *tables = shash_find_data(&excluded_tables, database);
-    return tables && sset_contains(tables, table);
-}
+    SSET_FOR_EACH (table, &rdb->excluded_tables) {
+        sset_add_and_free(&set, xasprintf("%s:%s", rdb->db->name, table));
+    }
 
-void
-disconnect_active_server(void)
-{
-    jsonrpc_session_close(session);
-    session = NULL;
+    result = sset_join(&set, ",", "");
+    sset_destroy(&set);
+
+    return result;
 }
 
 void
 replication_destroy(void)
 {
-    excluded_tables_clear();
-    shash_destroy(&excluded_tables);
+    struct shash_node *node;
 
-    if (sync_from) {
-        free(sync_from);
-        sync_from = NULL;
+    SHASH_FOR_EACH (node, &replication_dbs) {
+        replication_db_destroy(node->data);
     }
-
-    request_ids_destroy();
-    replication_dbs_destroy();
-
-    shash_destroy(&local_dbs);
+    shash_destroy_free_data(&replication_dbs);
 }
 
 static struct replication_db *
 find_db(const char *db_name)
 {
-    return shash_find_data(replication_dbs, db_name);
+    return shash_find_data(&replication_dbs, db_name);
 }
 
 static struct ovsdb_error *
-reset_database(struct ovsdb *db)
+reset_database(struct replication_db *rdb)
 {
-    struct ovsdb_txn *txn = ovsdb_txn_create(db);
+    struct ovsdb_txn *txn = ovsdb_txn_create(rdb->db);
     struct shash_node *table_node;
 
-    SHASH_FOR_EACH (table_node, &db->tables) {
+    SHASH_FOR_EACH (table_node, &rdb->db->tables) {
         /* Delete all rows if the table is not excluded. */
-        if (!excluded_tables_find(db->schema->name, table_node->name)) {
+        if (!sset_contains(&rdb->excluded_tables, table_node->name)) {
             struct ovsdb_table *table = table_node->data;
             struct ovsdb_row *row;
             HMAP_FOR_EACH_SAFE (row, hmap_node, &table->rows) {
@@ -565,7 +563,7 @@  reset_database(struct ovsdb *db)
  * Caller is responsible for disposing 'request'.
  */
 static struct jsonrpc_msg *
-create_monitor_request(struct ovsdb_schema *schema)
+create_monitor_request(struct replication_db *rdb, struct ovsdb_schema *schema)
 {
     struct jsonrpc_msg *request;
     struct json *monitor;
@@ -579,7 +577,7 @@  create_monitor_request(struct ovsdb_schema *schema)
         struct ovsdb_table_schema *table = nodes[j]->data;
 
         /* Monitor all tables not excluded. */
-        if (!excluded_tables_find(db_name, table->name)) {
+        if (!sset_contains(&rdb->excluded_tables, table->name)) {
             add_monitored_table(table, monitor_request);
         }
     }
@@ -689,114 +687,76 @@  process_table_update(struct json *table_update, const char *table_name,
     return NULL;
 }
 
-void
-request_ids_add(const struct json *id, struct ovsdb *db)
+static void
+request_id_set(struct replication_db *rdb, const struct json *id)
 {
-    struct request_ids_hmap_node *node = xmalloc(sizeof *node);
+    ovs_assert(!rdb->request_id);
+    rdb->request_id = json_clone(id);
+}
 
-    node->request_id = json_clone(id);
-    node->db = db;
-    hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
+static void
+request_id_clear(struct replication_db *rdb)
+{
+    json_destroy(rdb->request_id);
+    rdb->request_id = NULL;
 }
 
-/* Look up 'id' from 'request_ids', if found, remove the found id from
- * 'request_ids' and free its memory. If not found, 'request_ids' does
- * not change.  Sets '*db' to the database for the request (NULL if not
- * found).
+/* Compare 'id' with sent 'request_id'.  If it matches, clear the current
+ * 'request_id'.  If it doesn't match, 'request_id' does not change.
  *
- * Return true if 'id' is found, false otherwise.
+ * Return true if 'id' matches, false otherwise.
  */
-bool
-request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
+static bool
+request_id_compare_and_free(struct replication_db *rdb, const struct json *id)
 {
-    struct request_ids_hmap_node *node;
-
-    HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
-        if (json_equal(id, node->request_id)) {
-            hmap_remove(&request_ids, &node->hmap);
-            *db = node->db;
-            json_destroy(node->request_id);
-            free(node);
-            return true;
-        }
+    if (rdb->request_id && json_equal(id, rdb->request_id)) {
+        request_id_clear(rdb);
+        return true;
     }
-
-    *db = NULL;
     return false;
 }
 
 static void
-request_ids_destroy(void)
+replication_db_destroy(struct replication_db *rdb)
 {
-    struct request_ids_hmap_node *node;
-
-    HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
-        json_destroy(node->request_id);
-        free(node);
+    if (!rdb) {
+        return;
     }
-    hmap_destroy(&request_ids);
-}
 
-void
-request_ids_clear(void)
-{
-    request_ids_destroy();
-    hmap_init(&request_ids);
-}
+    free(rdb->sync_from);
+    rdb->sync_from = NULL;
 
-static struct shash *
-replication_dbs_create(void)
-{
-    struct shash *new = xmalloc(sizeof *new);
-    shash_init(new);
+    free(rdb->excluded_tables_str);
+    rdb->excluded_tables_str = NULL;
+    sset_destroy(&rdb->excluded_tables);
 
-    struct shash_node *node;
-    SHASH_FOR_EACH (node, &local_dbs) {
-        struct replication_db *repl_db = xmalloc(sizeof *repl_db);
-        repl_db->db = node->data;
-        repl_db->schema_version_higher = false;
-        repl_db->active_db_schema = NULL;
-        shash_add(new, node->name, repl_db);
-    }
+    request_id_clear(rdb);
 
-    return new;
-}
-
-static void
-replication_dbs_destroy(void)
-{
-    if (!replication_dbs) {
-        return;
+    if (rdb->session) {
+        jsonrpc_session_close(rdb->session);
+        rdb->session = NULL;
     }
 
-    struct shash_node *node;
-
-    SHASH_FOR_EACH_SAFE (node, replication_dbs) {
-        hmap_remove(&replication_dbs->map, &node->node);
-        struct replication_db *rdb = node->data;
-        if (rdb->active_db_schema) {
-            ovsdb_schema_destroy(rdb->active_db_schema);
-        }
-        free(rdb);
-        free(node->name);
-        free(node);
+    if (rdb->active_db_schema) {
+        ovsdb_schema_destroy(rdb->active_db_schema);
+        rdb->active_db_schema = NULL;
     }
 
-    hmap_destroy(&replication_dbs->map);
-    free(replication_dbs);
-    replication_dbs = NULL;
+    rdb->schema_version_higher = false;
 }
 
 /* Return true if replication just started or is ongoing.
  * Return false if the connection failed, or the replication
  * was not able to start. */
 bool
-replication_is_alive(void)
+replication_is_alive(const struct ovsdb *db)
 {
-    if (session) {
-        return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
+    const struct replication_db *rdb = find_db(db->name);
+
+    if (!rdb || !rdb->session) {
+        return false;
     }
-    return false;
+    return jsonrpc_session_is_alive(rdb->session) && rdb->state != RPL_S_ERR;
 }
 
 /* Return the last error reported on a connection by 'session'. The
@@ -806,60 +766,60 @@  replication_is_alive(void)
  * Return a negative value if replication session has error, or the
  * replication was not able to start.  */
 int
-replication_get_last_error(void)
+replication_get_last_error(const struct ovsdb *db)
 {
+    const struct replication_db *rdb = find_db(db->name);
     int err = 0;
 
-    if (session) {
-        err = jsonrpc_session_get_last_error(session);
+    if (rdb && rdb->session) {
+        err = jsonrpc_session_get_last_error(rdb->session);
         if (!err) {
-            err = (state == RPL_S_ERR) ? ENOENT : 0;
+            err = (rdb->state == RPL_S_ERR) ? ENOENT : 0;
         }
     }
 
     return err;
 }
 
-char *
-replication_status(void)
+char * OVS_WARN_UNUSED_RESULT
+replication_status(const struct ovsdb *db)
 {
-    bool alive = session && jsonrpc_session_is_alive(session);
+    const struct replication_db *rdb = find_db(db->name);
+
+    if (!rdb) {
+        return xasprintf("%s is not configured for replication", db->name);
+    }
+
+    bool alive = rdb->session && jsonrpc_session_is_alive(rdb->session);
     struct ds ds = DS_EMPTY_INITIALIZER;
 
+    ds_put_format(&ds, "database: %s\n", db->name);
     if (alive) {
-        switch(state) {
+        switch (rdb->state) {
         case RPL_S_INIT:
         case RPL_S_SERVER_ID_REQUESTED:
         case RPL_S_DB_REQUESTED:
         case RPL_S_SCHEMA_REQUESTED:
         case RPL_S_MONITOR_REQUESTED:
-            ds_put_format(&ds, "connecting: %s", sync_from);
+            ds_put_format(&ds, "connecting: %s", rdb->sync_from);
             break;
         case RPL_S_REPLICATING: {
-            struct shash_node *node;
-
-            ds_put_format(&ds, "replicating: %s\n", sync_from);
-            ds_put_cstr(&ds, "database:");
-            SHASH_FOR_EACH (node, replication_dbs) {
-                ds_put_format(&ds, " %s,", node->name);
-            }
-            ds_chomp(&ds, ',');
+            ds_put_format(&ds, "replicating: %s\n", rdb->sync_from);
 
-            if (!shash_is_empty(&excluded_tables)) {
-                ds_put_char(&ds, '\n');
+            if (!sset_is_empty(&rdb->excluded_tables)) {
                 ds_put_cstr(&ds, "exclude: ");
-                ds_put_and_free_cstr(&ds, get_excluded_tables());
+                ds_put_and_free_cstr(&ds, get_excluded_tables(db));
             }
             break;
         }
         case RPL_S_ERR:
-            ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
+            ds_put_format(&ds, "Replication to (%s) failed", rdb->sync_from);
             break;
         default:
             OVS_NOT_REACHED();
         }
     } else {
-        ds_put_format(&ds, "not connected to %s", sync_from);
+        ds_put_format(&ds, "not connected to %s", rdb->sync_from);
     }
     return ds_steal_cstr(&ds);
 }
@@ -913,10 +873,12 @@  is_replication_possible(struct ovsdb_schema *local_db_schema,
 }
 
 void
-replication_set_probe_interval(int probe_interval)
+replication_set_probe_interval(const struct ovsdb *db, int probe_interval)
 {
-    if (session) {
-        jsonrpc_session_set_probe_interval(session, probe_interval);
+    const struct replication_db *rdb = find_db(db->name);
+
+    if (rdb && rdb->session) {
+        jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
     }
 }
 
diff --git a/ovsdb/replication.h b/ovsdb/replication.h
index 6d1be820f..f5e226753 100644
--- a/ovsdb/replication.h
+++ b/ovsdb/replication.h
@@ -26,41 +26,41 @@  struct ovsdb;
  * API Usage
  *===========
  *
- * - replication_init() needs to be called whenever OVSDB server switches into
+ * - replication_set_db() needs to be called whenever a database switches into
  *   the backup mode.
  *
- * - replication_add_local_db() should be called immediately after to add all
- *   known database that OVSDB server owns, one at a time.
+ * - replication_remove_db() needs to be called whenever a backup database
+ *   switches into the active mode.
  *
  * - replication_destroy() should be called when OVSDB server shutdown to
  *   reclaim resources.
  *
  * - replication_run(), replication_wait(), replication_is_alive() and
  *   replication_get_last_error() should be call within the main loop
- *   whenever OVSDB server runs in the backup mode.
+ *   whenever the OVSDB server has backup databases.
  *
- * - set_excluded_tables(), get_excluded_tables(), disconnect_active_server()
- *   and replication_usage() are support functions used mainly by unixctl
- *   commands.
+ * - parse_excluded_tables(), get_excluded_tables() and replication_usage()
+ *   are support functions used mainly by unixctl commands.
  */
 
 #define REPLICATION_DEFAULT_PROBE_INTERVAL 60000
 
-void replication_init(const char *sync_from, const char *exclude_tables,
-                      const struct uuid *server, int probe_interval);
+void replication_set_db(struct ovsdb *, const char *sync_from,
+                        const char *exclude_tables, const struct uuid *server,
+                        int probe_interval);
+void replication_remove_db(const struct ovsdb *);
+
 void replication_run(void);
 void replication_wait(void);
 void replication_destroy(void);
 void replication_usage(void);
-void replication_add_local_db(const char *databse, struct ovsdb *db);
-bool replication_is_alive(void);
-int replication_get_last_error(void);
-char *replication_status(void);
-void replication_set_probe_interval(int);
+bool replication_is_alive(const struct ovsdb *);
+int replication_get_last_error(const struct ovsdb *);
+char *replication_status(const struct ovsdb *);
+void replication_set_probe_interval(const struct ovsdb *, int probe_interval);
 
-char *set_excluded_tables(const char *excluded, bool dryrun)
-    OVS_WARN_UNUSED_RESULT;
-char *get_excluded_tables(void) OVS_WARN_UNUSED_RESULT;
-void disconnect_active_server(void);
+char *parse_excluded_tables(const char *excluded) OVS_WARN_UNUSED_RESULT;
+char *get_excluded_tables(const struct ovsdb *) OVS_WARN_UNUSED_RESULT;
+void disconnect_active_server(const struct ovsdb *);
 
 #endif /* ovsdb/replication.h */