@@ -219,6 +219,7 @@ main_loop(struct server_config *config,
struct shash_node *next;
SHASH_FOR_EACH_SAFE (node, next, all_dbs) {
struct db *db = node->data;
+ ovsdb_txn_history_run(db->db);
if (ovsdb_trigger_run(db->db, time_msec())) {
/* The message below is currently the only reason to disconnect
* all clients. */
@@ -568,6 +569,7 @@ parse_txn(struct server_config *config, struct db *db,
error = ovsdb_file_txn_from_json(db->db, txn_json, false, &txn);
if (!error) {
+ ovsdb_txn_set_txnid(txnid, txn);
log_and_free_error(ovsdb_txn_replay_commit(txn));
}
if (!error && !uuid_is_zero(txnid)) {
@@ -658,6 +660,11 @@ open_db(struct server_config *config, const char *filename)
db->db = ovsdb_create(schema, storage);
ovsdb_jsonrpc_server_add_db(config->jsonrpc, db->db);
+ /* Enable txn history for clustered mode. It is not enabled for other mode
+ * for now, since txn id is available for clustered mode only. */
+ if (ovsdb_storage_is_clustered(storage)) {
+ ovsdb_txn_history_init(db->db);
+ }
read_db(config, db);
error = (db->db->name[0] == '_'
@@ -695,6 +702,8 @@ add_server_db(struct server_config *config)
json_destroy(schema_json);
struct db *db = xzalloc(sizeof *db);
+ /* We don't need txn_history for server_db. */
+
db->filename = xstrdup("<internal>");
db->db = ovsdb_create(schema, ovsdb_storage_create_unbacked());
bool ok OVS_UNUSED = ovsdb_jsonrpc_server_add_db(config->jsonrpc, db->db);
@@ -455,6 +455,9 @@ ovsdb_destroy(struct ovsdb *db)
/* Remove all the monitors. */
ovsdb_monitors_remove(db);
+ /* Destroy txn history. */
+ ovsdb_txn_history_destroy(db);
+
/* The caller must ensure that no triggers remain. */
ovs_assert(ovs_list_is_empty(&db->triggers));
@@ -535,6 +538,9 @@ ovsdb_replace(struct ovsdb *dst, struct ovsdb *src)
ovsdb_trigger_prereplace_db(trigger);
}
+ /* Destroy txn history. */
+ ovsdb_txn_history_destroy(dst);
+
struct ovsdb_schema *tmp_schema = dst->schema;
dst->schema = src->schema;
src->schema = tmp_schema;
@@ -67,6 +67,11 @@ bool ovsdb_parse_version(const char *, struct ovsdb_version *);
bool ovsdb_is_valid_version(const char *);
/* Database. */
+struct ovsdb_txn_history_node {
+ struct ovs_list node; /* Element in struct ovsdb's txn_history list */
+ struct ovsdb_txn *txn;
+};
+
struct ovsdb {
char *name;
struct ovsdb_schema *schema;
@@ -80,6 +85,11 @@ struct ovsdb {
bool run_triggers;
struct ovsdb_table *rbac_role;
+
+ /* History trasanctions for incremental monitor transfer. */
+ bool need_txn_history; /* Need to maintain history of transactions. */
+ unsigned int n_txn_history; /* Current number of history transactions. */
+ struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. */
};
struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
@@ -40,6 +40,7 @@ struct ovsdb_txn {
struct ovsdb *db;
struct ovs_list txn_tables; /* Contains "struct ovsdb_txn_table"s. */
struct ds comment;
+ struct uuid txnid; /* For clustered mode only. It is the eid. */
};
/* A table modified by a transaction. */
@@ -106,13 +107,19 @@ static unsigned int serial;
struct ovsdb_txn *
ovsdb_txn_create(struct ovsdb *db)
{
- struct ovsdb_txn *txn = xmalloc(sizeof *txn);
+ struct ovsdb_txn *txn = xzalloc(sizeof *txn);
txn->db = db;
ovs_list_init(&txn->txn_tables);
ds_init(&txn->comment);
return txn;
}
+void
+ovsdb_txn_set_txnid(const struct uuid *txnid, struct ovsdb_txn *txn)
+{
+ txn->txnid = *txnid;
+}
+
static void
ovsdb_txn_free(struct ovsdb_txn *txn)
{
@@ -881,11 +888,79 @@ ovsdb_txn_precommit(struct ovsdb_txn *txn)
return error;
}
+static struct ovsdb_txn*
+ovsdb_txn_clone(const struct ovsdb_txn *txn)
+{
+ struct ovsdb_txn *txn_cloned = xzalloc(sizeof *txn_cloned);
+ ovs_list_init(&txn_cloned->txn_tables);
+ txn_cloned->txnid = txn->txnid;
+
+ struct ovsdb_txn_table *t;
+ LIST_FOR_EACH (t, node, &txn->txn_tables) {
+ struct ovsdb_txn_table *t_cloned = xmalloc(sizeof *t_cloned);
+ ovs_list_push_back(&txn_cloned->txn_tables, &t_cloned->node);
+ hmap_init(&t_cloned->txn_rows);
+
+ struct ovsdb_txn_row *r;
+ HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
+ size_t n_columns = shash_count(&t->table->schema->columns);
+ struct ovsdb_txn_row *r_cloned =
+ xzalloc(offsetof(struct ovsdb_txn_row, changed)
+ + bitmap_n_bytes(n_columns));
+
+ r_cloned->uuid = r->uuid;
+ r_cloned->table = r->table;
+ r_cloned->old = r->old ? ovsdb_row_clone(r->old) : NULL;
+ r_cloned->new = r->new ? ovsdb_row_clone(r->new) : NULL;
+ memcpy(&r_cloned->changed, &r->changed, bitmap_n_bytes(n_columns));
+ hmap_insert(&t_cloned->txn_rows, &r_cloned->hmap_node,
+ uuid_hash(&r_cloned->uuid));
+ }
+ }
+ return txn_cloned;
+}
+
+static void
+ovsdb_txn_destroy_cloned(struct ovsdb_txn *txn)
+{
+ ovs_assert(!txn->db);
+ struct ovsdb_txn_table *t, *next_txn_table;
+ LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
+ struct ovsdb_txn_row *r, *next_txn_row;
+ HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
+ if (r->old) {
+ ovsdb_row_destroy(r->old);
+ }
+ if (r->new) {
+ ovsdb_row_destroy(r->new);
+ }
+ hmap_remove(&t->txn_rows, &r->hmap_node);
+ free(r);
+ }
+ hmap_destroy(&t->txn_rows);
+ ovs_list_remove(&t->node);
+ free(t);
+ }
+ free(txn);
+}
+
+static void
+ovsdb_txn_add_to_history(struct ovsdb_txn *txn)
+{
+ if (txn->db->need_txn_history) {
+ struct ovsdb_txn_history_node *node = xzalloc(sizeof *node);
+ node->txn = ovsdb_txn_clone(txn);
+ ovs_list_push_back(&txn->db->txn_history, &node->node);
+ txn->db->n_txn_history++;
+ }
+}
+
/* Finalize commit. */
void
ovsdb_txn_complete(struct ovsdb_txn *txn)
{
if (!ovsdb_txn_is_empty(txn)) {
+
txn->db->run_triggers = true;
ovsdb_monitors_commit(txn->db, txn);
ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_update_weak_refs));
@@ -906,6 +981,7 @@ ovsdb_txn_replay_commit(struct ovsdb_txn *txn)
if (error) {
ovsdb_txn_abort(txn);
} else {
+ ovsdb_txn_add_to_history(txn);
ovsdb_txn_complete(txn);
}
return error;
@@ -1304,3 +1380,46 @@ for_each_txn_row(struct ovsdb_txn *txn,
return NULL;
}
+
+void
+ovsdb_txn_history_run(struct ovsdb *db)
+{
+ if (!db->need_txn_history) {
+ return;
+ }
+ /* Remove old histories to limit the size of the history */
+ while (db->n_txn_history > 100) {
+ struct ovsdb_txn_history_node *txn_h_node = CONTAINER_OF(
+ ovs_list_pop_front(&db->txn_history),
+ struct ovsdb_txn_history_node, node);
+
+ ovsdb_txn_destroy_cloned(txn_h_node->txn);
+ free(txn_h_node);
+ db->n_txn_history--;
+ }
+}
+
+void
+ovsdb_txn_history_init(struct ovsdb *db)
+{
+ db->need_txn_history = true;
+ db->n_txn_history = 0;
+ ovs_list_init(&db->txn_history);
+}
+
+void
+ovsdb_txn_history_destroy(struct ovsdb *db)
+{
+
+ if (!db->need_txn_history) {
+ return;
+ }
+
+ struct ovsdb_txn_history_node *txn_h_node, *next;
+ LIST_FOR_EACH_SAFE (txn_h_node, next, node, &db->txn_history) {
+ ovs_list_remove(&txn_h_node->node);
+ ovsdb_txn_destroy_cloned(txn_h_node->txn);
+ free(txn_h_node);
+ }
+ db->n_txn_history = 0;
+}
@@ -25,6 +25,7 @@ struct ovsdb_table;
struct uuid;
struct ovsdb_txn *ovsdb_txn_create(struct ovsdb *);
+void ovsdb_txn_set_txnid(const struct uuid *, struct ovsdb_txn *);
void ovsdb_txn_abort(struct ovsdb_txn *);
struct ovsdb_error *ovsdb_txn_replay_commit(struct ovsdb_txn *)
@@ -59,5 +60,8 @@ void ovsdb_txn_for_each_change(const struct ovsdb_txn *,
void ovsdb_txn_add_comment(struct ovsdb_txn *, const char *);
const char *ovsdb_txn_get_comment(const struct ovsdb_txn *);
+void ovsdb_txn_history_run(struct ovsdb *);
+void ovsdb_txn_history_init(struct ovsdb *);
+void ovsdb_txn_history_destroy(struct ovsdb *);
#endif /* ovsdb/transaction.h */