@@ -30,6 +30,7 @@
#include "replication.h"
#include "row.h"
#include "sset.h"
+#include "storage.h"
#include "stream.h"
#include "svec.h"
#include "table.h"
@@ -105,11 +106,18 @@ static enum ovsdb_replication_state state;
* schema matches. */
static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
static struct shash *replication_dbs;
+static struct shash unbacked_dbs = SHASH_INITIALIZER(&unbacked_dbs);
static struct shash *replication_db_clone(struct shash *dbs);
static void replication_dbs_destroy(void);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct ovsdb* find_db(const char *db_name);
+
+static void create_unbacked_db(struct ovsdb *db);
+static void unbacked_dbs_destroy(void);
+static struct ovsdb *find_unbacked_db(const char *db_name);
+static struct ovsdb_error *clone_from_unbacked_database(
+ struct ovsdb *db, struct ovsdb *unbacked_db);
void
@@ -123,7 +131,7 @@ replication_init(const char *sync_from_, const char *exclude_tables,
ovs_assert(!set_blacklist_tables(exclude_tables, false));
replication_dbs_destroy();
-
+ unbacked_dbs_destroy();
shash_clear(&local_dbs);
if (session) {
jsonrpc_session_close(session);
@@ -299,20 +307,7 @@ replication_run(void)
/* After receiving schemas, reset the local databases that
* will be monitored and send out monitor requests for them. */
if (hmap_is_empty(&request_ids)) {
- struct shash_node *node, *next;
-
- SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
- db = node->data;
- error = reset_database(db);
- if (error) {
- const char *db_name = db->schema->name;
- shash_find_and_delete(replication_dbs, db_name);
- ovsdb_error_assert(error);
- VLOG_WARN("Failed to reset database, "
- "%s not replicated.", db_name);
- }
- }
-
+ struct shash_node *node;
if (shash_is_empty(replication_dbs)) {
VLOG_WARN("Nothing to replicate.");
state = RPL_S_ERR;
@@ -321,7 +316,7 @@ replication_run(void)
db = node->data;
struct jsonrpc_msg *request =
create_monitor_request(db);
-
+ create_unbacked_db(db);
request_ids_add(request->id, db);
jsonrpc_session_send(session, request);
VLOG_DBG("Send monitor requests");
@@ -335,18 +330,51 @@ replication_run(void)
case RPL_S_MONITOR_REQUESTED: {
/* Reply to monitor requests. */
struct ovsdb_error *error;
- error = process_notification(msg->result, db);
+ struct ovsdb *unbacked_db = find_unbacked_db(db->name);
+ error = process_notification(msg->result, unbacked_db);
if (error) {
ovsdb_error_assert(error);
state = RPL_S_ERR;
- } else {
- /* Transition to replicating state after receiving
- * all replies of "monitor" requests. */
- if (hmap_is_empty(&request_ids)) {
+ break;
+ }
+ if (!hmap_is_empty(&request_ids)) {
+ break;
+ }
+ if (shash_is_empty(replication_dbs)) {
+ VLOG_WARN("Nothing to replicate.");
+ state = RPL_S_ERR;
+ break;
+ }
+
+ struct shash_node *node;
+ SHASH_FOR_EACH (node, replication_dbs) {
+ db = node->data;
+ /* Reset the local databases and then clone from the
+ * unbacked db.
+ */
+ error = reset_database(db);
+ if (error) {
+ const char *db_name = db->schema->name;
+ shash_find_and_delete(replication_dbs, db_name);
+ ovsdb_error_assert(error);
+ VLOG_WARN("Failed to reset database, "
+ "%s not replicated.", db_name);
+ state = RPL_S_ERR;
+ break;
+ }
+ unbacked_db = find_unbacked_db(db->name);
+ error = clone_from_unbacked_database(db, unbacked_db);
+ if (error) {
+ ovsdb_error_assert(error);
+ VLOG_WARN("Failed to set database, "
+ "%s not replicated.", db->name);
+ state = RPL_S_ERR;
+ } else {
VLOG_DBG("Listening to monitor updates");
state = RPL_S_REPLICATING;
}
}
+ unbacked_dbs_destroy();
break;
}
@@ -508,7 +536,7 @@ replication_destroy(void)
request_ids_destroy();
replication_dbs_destroy();
-
+ unbacked_dbs_destroy();
shash_destroy(&local_dbs);
}
@@ -517,6 +545,37 @@ find_db(const char *db_name)
{
return shash_find_data(replication_dbs, db_name);
}
+
+static void
+create_unbacked_db(struct ovsdb *db)
+{
+ struct ovsdb *unbacked_db = ovsdb_create(ovsdb_schema_clone(db->schema),
+ ovsdb_storage_create_unbacked());
+ struct shash_node *node = shash_find(&unbacked_dbs, db->name);
+ if (node) {
+ shash_delete(&unbacked_dbs, node);
+ ovsdb_destroy(node->data);
+ }
+ shash_add_assert(&unbacked_dbs, db->name, unbacked_db);
+}
+
+static void
+unbacked_dbs_destroy(void)
+{
+ struct shash_node *node, *next;
+
+ SHASH_FOR_EACH_SAFE (node, next, &unbacked_dbs) {
+ ovsdb_destroy(node->data);
+ shash_delete(&unbacked_dbs, node);
+ }
+ shash_destroy(&unbacked_dbs);
+}
+
+static struct ovsdb *
+find_unbacked_db(const char *db_name)
+{
+ return shash_find_data(&unbacked_dbs, db_name);
+}
static struct ovsdb_error *
reset_database(struct ovsdb *db)
@@ -538,6 +597,40 @@ reset_database(struct ovsdb *db)
return ovsdb_txn_propose_commit_block(txn, false);
}
+static struct ovsdb_error *
+clone_from_unbacked_database(struct ovsdb *db, struct ovsdb *unbacked_db)
+{
+ struct ovsdb_txn *txn = ovsdb_txn_create(db);
+ struct shash_node *table_node;
+
+ SHASH_FOR_EACH (table_node, &unbacked_db->tables) {
+ struct ovsdb_table *unbacked_db_table = table_node->data;
+ struct ovsdb_table *table = ovsdb_get_table(db, table_node->name);
+ struct ovsdb_row *unbacked_db_row, *next;
+ struct ovsdb_column_set all_columns;
+ ovsdb_column_set_init(&all_columns);
+ ovsdb_column_set_add_all(&all_columns, unbacked_db_table);
+
+ HMAP_FOR_EACH_SAFE (unbacked_db_row, next, hmap_node,
+ &unbacked_db_table->rows) {
+ const struct uuid *row_uuid = ovsdb_row_get_uuid(unbacked_db_row);
+ struct ovsdb_row *row = ovsdb_row_create(table);
+ struct json *unbacked_db_json_row =
+ ovsdb_row_to_json(unbacked_db_row, &all_columns);
+ struct ovsdb_error *error = ovsdb_row_from_json(
+ row, unbacked_db_json_row, NULL, NULL);
+ if (error) {
+ return error;
+ }
+ *ovsdb_row_get_uuid_rw(row) = *row_uuid;
+ ovsdb_txn_row_insert(txn, row);
+ }
+ ovsdb_column_set_destroy(&all_columns);
+ }
+
+ return ovsdb_txn_propose_commit_block(txn, false);
+}
+
/* Create a monitor request for 'db'. The monitor request will include
* any tables from 'blacklisted_tables'
*