@@ -1216,8 +1216,7 @@ struct ovsdb_jsonrpc_monitor {
struct ovsdb *db;
struct json *monitor_id;
struct ovsdb_monitor *dbmon;
- uint64_t unflushed; /* The first transaction that has not been
- flushed to the jsonrpc remote client. */
+ struct ovsdb_monitor_change_set *change_set;
enum ovsdb_monitor_version version;
struct ovsdb_monitor_session_condition *condition;/* Session's condition */
};
@@ -1389,7 +1388,6 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
if (version == OVSDB_MONITOR_V2) {
m->condition = ovsdb_monitor_session_condition_create();
}
- m->unflushed = 0;
m->version = version;
hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
m->monitor_id = json_clone(monitor_id);
@@ -1436,7 +1434,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
dbmon = ovsdb_monitor_add(m->dbmon);
if (dbmon != m->dbmon) {
/* Found an exisiting dbmon, reuse the current one. */
- ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+ ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, NULL);
ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
m->dbmon = dbmon;
}
@@ -1446,7 +1444,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
ovsdb_monitor_condition_bind(m->dbmon, m->condition);
}
- ovsdb_monitor_get_initial(m->dbmon);
+ ovsdb_monitor_get_initial(m->dbmon, &m->change_set);
json = ovsdb_jsonrpc_monitor_compose_update(m, true);
json = json ? json : json_object_create();
return jsonrpc_create_reply(json, request_id);
@@ -1578,7 +1576,7 @@ ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
struct json *update_json;
update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
- m->condition, m->version, &m->unflushed);
+ m->condition, m->version, &m->change_set);
if (update_json) {
struct jsonrpc_msg *msg;
struct json *p;
@@ -1648,12 +1646,12 @@ ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
bool initial)
{
- if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+ if (!ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) {
return NULL;
}
return ovsdb_monitor_get_update(m->dbmon, initial, false,
- m->condition, m->version, &m->unflushed);
+ m->condition, m->version, &m->change_set);
}
static bool
@@ -1662,7 +1660,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
struct ovsdb_jsonrpc_monitor *m;
HMAP_FOR_EACH (m, node, &s->monitors) {
- if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+ if (ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) {
return true;
}
}
@@ -1686,7 +1684,7 @@ ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m,
json_destroy(m->monitor_id);
hmap_remove(&m->session->monitors, &m->node);
- ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+ ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->change_set);
ovsdb_monitor_session_condition_destroy(m->condition);
free(m);
}
@@ -69,17 +69,31 @@ struct ovsdb_monitor {
struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */
struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */
struct ovsdb *db;
- uint64_t n_transactions; /* Count number of committed transactions. */
- struct hmap_node hmap_node; /* Elements within ovsdb_monitors. */
- struct hmap json_cache; /* Contains "ovsdb_monitor_json_cache_node"s.*/
+
+ /* Contains "ovsdb_monitor_change_set". Each change set contains changes
+ * from some start point up to the latest committed transaction. There can
+ * be different change sets for the same struct ovsdb_monitor because there
+ * are different clients pending on changes starting from different points.
+ * The different change sets are maintained as a list. */
+ struct ovs_list change_sets;
+
+ /* The new change set that is to be populated for future transactions. */
+ struct ovsdb_monitor_change_set *new_change_set;
+
+ /* The change set that starts from the first transaction of the DB, which
+ * is used for populating the initial data for new clients. */
+ struct ovsdb_monitor_change_set *init_change_set;
+
+ struct hmap_node hmap_node; /* Elements within ovsdb_monitors. */
+ struct hmap json_cache; /* Contains "ovsdb_monitor_json_cache_node"s.*/
};
-/* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
- * inclusive. */
+/* A json object of updates for the ovsdb_monitor_change_set and the given
+ * monitor version. */
struct ovsdb_monitor_json_cache_node {
struct hmap_node hmap_node; /* Elements in json cache. */
enum ovsdb_monitor_version version;
- uint64_t from_txn;
+ struct uuid change_set_uuid;
struct json *json; /* Null, or a cloned of json */
};
@@ -97,29 +111,48 @@ struct ovsdb_monitor_column {
/* A row that has changed in a monitored table. */
struct ovsdb_monitor_row {
- struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
+ struct hmap_node hmap_node; /* In ovsdb_monitor_change_set_for_table. */
struct uuid uuid; /* UUID of row that changed. */
struct ovsdb_datum *old; /* Old data, NULL for an inserted row. */
struct ovsdb_datum *new; /* New data, NULL for a deleted row. */
};
-/* Contains 'struct ovsdb_monitor_row's for rows that have been
- * updated but not yet flushed to all the jsonrpc connection.
+/* Contains a set of changes that are not yet flushed to all the jsonrpc
+ * connections.
*
- * 'n_refs' represent the number of jsonrpc connections that have
- * not received updates. Generate the update for the last jsonprc
- * connection will also destroy the whole "struct ovsdb_monitor_changes"
- * object.
- *
- * 'transaction' stores the first update's transaction id.
- * */
-struct ovsdb_monitor_changes {
- struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes
- hmap. */
+ * 'n_refs' represent the number of jsonrpc connections that depend on this
+ * change set (have not received updates). Generate the update for the last
+ * jsonprc connection will also destroy the whole "struct
+ * ovsdb_monitor_change_set" object.
+ */
+struct ovsdb_monitor_change_set {
+ /* Element in change_sets of ovsdb_monitor. */
+ struct ovs_list list_node;
+
+ /* Internally generated uuid that identifies this data structure. */
+ struct uuid uuid;
+
+ /* Contains struct ovsdb_monitor_change_set_for_table. */
+ struct ovs_list change_set_for_tables;
+
+ int n_refs;
+};
+
+/* Contains 'struct ovsdb_monitor_row's for rows in a specific table
+ * of struct ovsdb_monitor_change_set. It can also be searched from
+ * member 'change_sets' of struct ovsdb_monitor_table. */
+struct ovsdb_monitor_change_set_for_table {
+ /* Element in ovsdb_monitor_tables' change_sets list. */
+ struct ovs_list list_in_mt;
+
+ /* Element in ovsdb_monitor_change_sets' change_set_for_tables list. */
+ struct ovs_list list_in_change_set;
+
struct ovsdb_monitor_table *mt;
+ struct ovsdb_monitor_change_set *mcs;
+
+ /* Contains struct ovsdb_monitor_row. */
struct hmap rows;
- int n_refs;
- uint64_t transaction;
};
/* A particular table being monitored. */
@@ -141,8 +174,8 @@ struct ovsdb_monitor_table {
* ovsdb_monitor_row. It is used for condition evaluation. */
unsigned int *columns_index_map;
- /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
- struct hmap changes;
+ /* Contains 'ovsdb_monitor_change_set_for_table'. */
+ struct ovs_list change_sets;
};
enum ovsdb_monitor_row_type {
@@ -159,36 +192,23 @@ typedef struct json *
bool initial, unsigned long int *changed);
static void ovsdb_monitor_destroy(struct ovsdb_monitor *);
-static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
- struct ovsdb_monitor_table *, uint64_t next_txn);
-static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
- struct ovsdb_monitor_table *, uint64_t unflushed);
-static void ovsdb_monitor_changes_destroy(
- struct ovsdb_monitor_changes *);
-static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *,
- uint64_t unflushed);
-
-static uint32_t
-json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
-{
- uint32_t hash;
-
- hash = hash_uint64(version);
- hash = hash_uint64_basis(from_txn, hash);
-
- return hash;
-}
+static struct ovsdb_monitor_change_set * ovsdb_monitor_add_change_set(
+ struct ovsdb_monitor *, bool init_only);
+static void ovsdb_monitor_change_set_destroy(
+ struct ovsdb_monitor_change_set *);
+static void ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *);
static struct ovsdb_monitor_json_cache_node *
ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
enum ovsdb_monitor_version version,
- uint64_t from_txn)
+ struct ovsdb_monitor_change_set *change_set)
{
struct ovsdb_monitor_json_cache_node *node;
- uint32_t hash = json_cache_hash(version, from_txn);
+ uint32_t hash = uuid_hash(&change_set->uuid);
HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
- if (node->from_txn == from_txn && node->version == version) {
+ if (uuid_equals(&node->change_set_uuid, &change_set->uuid) &&
+ node->version == version) {
return node;
}
}
@@ -199,15 +219,16 @@ ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
static void
ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
enum ovsdb_monitor_version version,
- uint64_t from_txn, struct json *json)
+ struct ovsdb_monitor_change_set *change_set,
+ struct json *json)
{
struct ovsdb_monitor_json_cache_node *node;
- uint32_t hash = json_cache_hash(version, from_txn);
+ uint32_t hash = uuid_hash(&change_set->uuid);
node = xmalloc(sizeof *node);
node->version = version;
- node->from_txn = from_txn;
+ node->change_set_uuid = change_set->uuid;
node->json = json ? json_clone(json) : NULL;
hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
@@ -241,8 +262,9 @@ compare_ovsdb_monitor_column(const void *a_, const void *b_)
/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
* given 'uuid', or NULL if there is no such row. */
static struct ovsdb_monitor_row *
-ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
- const struct uuid *uuid)
+ovsdb_monitor_changes_row_find(
+ const struct ovsdb_monitor_change_set_for_table *changes,
+ const struct uuid *uuid)
{
struct ovsdb_monitor_row *row;
@@ -375,7 +397,7 @@ ovsdb_monitor_create(struct ovsdb *db,
ovs_list_push_back(&db->monitors, &dbmon->list_node);
ovs_list_init(&dbmon->jsonrpc_monitors);
dbmon->db = db;
- dbmon->n_transactions = 0;
+ ovs_list_init(&dbmon->change_sets);
shash_init(&dbmon->tables);
hmap_node_nullify(&dbmon->hmap_node);
hmap_init(&dbmon->json_cache);
@@ -395,7 +417,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
mt = xzalloc(sizeof *mt);
mt->table = table;
shash_add(&m->tables, table->schema->name, mt);
- hmap_init(&mt->changes);
+ ovs_list_init(&mt->change_sets);
mt->columns_index_map =
xmalloc(sizeof *mt->columns_index_map * n_columns);
for (i = 0; i < n_columns; i++) {
@@ -481,81 +503,87 @@ ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
return shash_find_data(&m->tables, table->schema->name);
}
-static struct ovsdb_monitor_changes *
-ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
- uint64_t next_txn)
+static struct ovsdb_monitor_change_set *
+ovsdb_monitor_add_change_set(struct ovsdb_monitor *dbmon,
+ bool init_only)
{
- struct ovsdb_monitor_changes *changes;
-
- changes = xzalloc(sizeof *changes);
-
- changes->transaction = next_txn;
- changes->mt = mt;
- changes->n_refs = 1;
- hmap_init(&changes->rows);
- hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
-
- return changes;
-};
+ struct ovsdb_monitor_change_set *change_set = xzalloc(sizeof *change_set);
+ change_set->uuid = uuid_random();
+ ovs_list_push_back(&(dbmon->change_sets), &change_set->list_node);
+ ovs_list_init(&change_set->change_set_for_tables);
+ change_set->n_refs = 1;
-static struct ovsdb_monitor_changes *
-ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
- uint64_t transaction)
-{
- struct ovsdb_monitor_changes *changes;
- size_t hash = hash_uint64(transaction);
-
- HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
- if (changes->transaction == transaction) {
- return changes;
+ struct shash_node *node;
+ SHASH_FOR_EACH (node, &dbmon->tables) {
+ struct ovsdb_monitor_table *mt = node->data;
+ if (!init_only || (mt->select & OJMS_INITIAL)) {
+ struct ovsdb_monitor_change_set_for_table *mcst =
+ xzalloc(sizeof *mcst);
+ mcst->mt = mt;
+ mcst->mcs = change_set;
+ hmap_init(&mcst->rows);
+ ovs_list_push_back(&mt->change_sets, &mcst->list_in_mt);
+ ovs_list_push_back(&change_set->change_set_for_tables,
+ &mcst->list_in_change_set);
}
}
- return NULL;
-}
+ return change_set;
+};
/* Stop currently tracking changes to table 'mt' since 'transaction'. */
static void
-ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
- uint64_t transaction)
+ovsdb_monitor_untrack_change_set(struct ovsdb_monitor *dbmon,
+ struct ovsdb_monitor_change_set *mcs)
{
- struct ovsdb_monitor_changes *changes =
- ovsdb_monitor_table_find_changes(mt, transaction);
- if (changes) {
- if (--changes->n_refs == 0) {
- hmap_remove(&mt->changes, &changes->hmap_node);
- ovsdb_monitor_changes_destroy(changes);
+ ovs_assert(mcs);
+ if (--mcs->n_refs == 0) {
+ ovs_list_remove(&mcs->list_node);
+ if (mcs == dbmon->init_change_set) {
+ dbmon->init_change_set = NULL;
+ } else if (mcs == dbmon->new_change_set) {
+ dbmon->new_change_set = NULL;
}
+ ovsdb_monitor_change_set_destroy(mcs);
}
}
/* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
*/
static void
-ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
- uint64_t transaction)
+ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *dbmon)
{
- struct ovsdb_monitor_changes *changes;
+ struct ovsdb_monitor_change_set *change_set = dbmon->new_change_set;
- changes = ovsdb_monitor_table_find_changes(mt, transaction);
- if (changes) {
- changes->n_refs++;
+ if (change_set) {
+ change_set->n_refs++;
} else {
- ovsdb_monitor_table_add_changes(mt, transaction);
+ change_set = ovsdb_monitor_add_change_set(dbmon, false);
+ dbmon->new_change_set = change_set;
}
}
static void
-ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
+ovsdb_monitor_change_set_destroy(struct ovsdb_monitor_change_set *mcs)
{
- struct ovsdb_monitor_row *row, *next;
+ ovs_list_remove(&mcs->list_node);
+
+ struct ovsdb_monitor_change_set_for_table *mcst, *next_mcst;
+ LIST_FOR_EACH_SAFE (mcst, next_mcst, list_in_change_set,
+ &mcs->change_set_for_tables) {
+ ovs_list_remove(&mcst->list_in_change_set);
+ ovs_list_remove(&mcst->list_in_mt);
+
+ struct ovsdb_monitor_row *row, *next;
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+ hmap_remove(&mcst->rows, &row->hmap_node);
+ ovsdb_monitor_row_destroy(mcst->mt, row);
+ }
+ hmap_destroy(&mcst->rows);
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
- hmap_remove(&changes->rows, &row->hmap_node);
- ovsdb_monitor_row_destroy(changes->mt, row);
+ free(mcst);
}
- hmap_destroy(&changes->rows);
- free(changes);
+ free(mcs);
}
static enum ovsdb_monitor_selection
@@ -1008,28 +1036,22 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
static struct json*
ovsdb_monitor_compose_update(
struct ovsdb_monitor *dbmon,
- bool initial, uint64_t transaction,
+ bool initial, struct ovsdb_monitor_change_set *mcs,
const struct ovsdb_monitor_session_condition *condition,
compose_row_update_cb_func row_update)
{
- struct shash_node *node;
struct json *json;
size_t max_columns = ovsdb_monitor_max_columns(dbmon);
unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
json = NULL;
- SHASH_FOR_EACH (node, &dbmon->tables) {
- struct ovsdb_monitor_table *mt = node->data;
+ struct ovsdb_monitor_change_set_for_table *mcst;
+ LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
struct ovsdb_monitor_row *row, *next;
- struct ovsdb_monitor_changes *changes;
struct json *table_json = NULL;
+ struct ovsdb_monitor_table *mt = mcst->mt;
- changes = ovsdb_monitor_table_find_changes(mt, transaction);
- if (!changes) {
- continue;
- }
-
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
struct json *row_json;
row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
initial, changed);
@@ -1105,39 +1127,37 @@ ovsdb_monitor_get_update(
bool initial, bool cond_updated,
struct ovsdb_monitor_session_condition *condition,
enum ovsdb_monitor_version version,
- uint64_t *unflushed_)
+ struct ovsdb_monitor_change_set **p_mcs)
{
struct ovsdb_monitor_json_cache_node *cache_node = NULL;
- struct shash_node *node;
struct json *json;
- const uint64_t unflushed = *unflushed_;
- const uint64_t next_unflushed = dbmon->n_transactions + 1;
+ struct ovsdb_monitor_change_set *mcs = *p_mcs;
- ovs_assert(cond_updated ? unflushed == next_unflushed : true);
+ ovs_assert(cond_updated ? mcs == dbmon->new_change_set : true);
/* Return a clone of cached json if one exists. Otherwise,
* generate a new one and add it to the cache. */
if (!condition || (!condition->conditional && !cond_updated)) {
cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
- unflushed);
+ mcs);
}
if (cache_node) {
json = cache_node->json ? json_clone(cache_node->json) : NULL;
} else {
if (version == OVSDB_MONITOR_V1) {
json =
- ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+ ovsdb_monitor_compose_update(dbmon, initial, mcs,
condition,
ovsdb_monitor_compose_row_update);
} else {
ovs_assert(version == OVSDB_MONITOR_V2);
if (!cond_updated) {
- json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+ json = ovsdb_monitor_compose_update(dbmon, initial, mcs,
condition,
ovsdb_monitor_compose_row_update2);
if (!condition || !condition->conditional) {
- ovsdb_monitor_json_cache_insert(dbmon, version, unflushed,
+ ovsdb_monitor_json_cache_insert(dbmon, version, mcs,
json);
}
} else {
@@ -1149,24 +1169,20 @@ ovsdb_monitor_get_update(
}
}
- /* Maintain transaction id of 'changes'. */
- SHASH_FOR_EACH (node, &dbmon->tables) {
- struct ovsdb_monitor_table *mt = node->data;
-
- ovsdb_monitor_table_untrack_changes(mt, unflushed);
- ovsdb_monitor_table_track_changes(mt, next_unflushed);
- }
- *unflushed_ = next_unflushed;
+ /* Maintain tracking change set. */
+ ovsdb_monitor_untrack_change_set(dbmon, mcs);
+ ovsdb_monitor_track_new_change_set(dbmon);
+ *p_mcs = dbmon->new_change_set;
return json;
}
bool
ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
- uint64_t next_transaction)
+ struct ovsdb_monitor_change_set *change_set)
{
- ovs_assert(next_transaction <= dbmon->n_transactions + 1);
- return (next_transaction <= dbmon->n_transactions);
+ ovs_assert(change_set);
+ return (change_set != dbmon->new_change_set);
}
void
@@ -1225,15 +1241,15 @@ static void
ovsdb_monitor_changes_update(const struct ovsdb_row *old,
const struct ovsdb_row *new,
const struct ovsdb_monitor_table *mt,
- struct ovsdb_monitor_changes *changes)
+ struct ovsdb_monitor_change_set_for_table *mcst)
{
const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
struct ovsdb_monitor_row *change;
- change = ovsdb_monitor_changes_row_find(changes, uuid);
+ change = ovsdb_monitor_changes_row_find(mcst, uuid);
if (!change) {
change = xzalloc(sizeof *change);
- hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
+ hmap_insert(&mcst->rows, &change->hmap_node, uuid_hash(uuid));
change->uuid = *uuid;
change->old = clone_monitor_row_data(mt, old);
change->new = clone_monitor_row_data(mt, new);
@@ -1285,7 +1301,7 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
if (!change->old) {
/* This row was added then deleted. Forget about it. */
- hmap_remove(&changes->rows, &change->hmap_node);
+ hmap_remove(&mcst->rows, &change->hmap_node);
free(change);
}
}
@@ -1343,7 +1359,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
const struct ovsdb_monitor *m = aux->monitor;
struct ovsdb_table *table = new ? new->table : old->table;
struct ovsdb_monitor_table *mt;
- struct ovsdb_monitor_changes *changes;
+ struct ovsdb_monitor_change_set_for_table *mcst;
if (!aux->mt || table != aux->mt->table) {
aux->mt = shash_find_data(&m->tables, table->schema->name);
@@ -1360,9 +1376,9 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
enum ovsdb_monitor_changes_efficacy efficacy =
ovsdb_monitor_changes_classify(type, mt, changed);
- HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
- if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
- ovsdb_monitor_changes_update(old, new, mt, changes);
+ if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
+ LIST_FOR_EACH (mcst, list_in_mt, &mt->change_sets) {
+ ovsdb_monitor_changes_update(old, new, mt, mcst);
}
}
if (aux->efficacy < efficacy) {
@@ -1373,34 +1389,34 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
}
void
-ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
+ovsdb_monitor_get_initial(struct ovsdb_monitor *dbmon,
+ struct ovsdb_monitor_change_set **p_mcs)
{
- struct shash_node *node;
-
- SHASH_FOR_EACH (node, &dbmon->tables) {
- struct ovsdb_monitor_table *mt = node->data;
-
- if (mt->select & OJMS_INITIAL) {
- struct ovsdb_row *row;
- struct ovsdb_monitor_changes *changes;
-
- changes = ovsdb_monitor_table_find_changes(mt, 0);
- if (!changes) {
- changes = ovsdb_monitor_table_add_changes(mt, 0);
- HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
- ovsdb_monitor_changes_update(NULL, row, mt, changes);
+ if (!dbmon->init_change_set) {
+ struct ovsdb_monitor_change_set *change_set =
+ ovsdb_monitor_add_change_set(dbmon, true);
+ dbmon->init_change_set = change_set;
+
+ struct ovsdb_monitor_change_set_for_table *mcst;
+ LIST_FOR_EACH (mcst, list_in_change_set,
+ &change_set->change_set_for_tables) {
+ if (mcst->mt->select & OJMS_INITIAL) {
+ struct ovsdb_row *row;
+ HMAP_FOR_EACH (row, hmap_node, &mcst->mt->table->rows) {
+ ovsdb_monitor_changes_update(NULL, row, mcst->mt, mcst);
}
- } else {
- changes->n_refs++;
}
}
+ } else {
+ dbmon->init_change_set->n_refs++;
}
+ *p_mcs = dbmon->init_change_set;
}
void
ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
- uint64_t unflushed)
+ struct ovsdb_monitor_change_set *change_set)
{
struct jsonrpc_monitor_node *jm;
@@ -1413,10 +1429,8 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
if (jm->jsonrpc_monitor == jsonrpc_monitor) {
/* Release the tracked changes. */
- struct shash_node *node;
- SHASH_FOR_EACH (node, &dbmon->tables) {
- struct ovsdb_monitor_table *mt = node->data;
- ovsdb_monitor_table_untrack_changes(mt, unflushed);
+ if (change_set) {
+ ovsdb_monitor_untrack_change_set(dbmon, change_set);
}
ovs_list_remove(&jm->node);
free(jm);
@@ -1547,15 +1561,16 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
ovsdb_monitor_json_cache_flush(dbmon);
hmap_destroy(&dbmon->json_cache);
+ struct ovsdb_monitor_change_set *cs, *cs_next;
+ LIST_FOR_EACH_SAFE (cs, cs_next, list_node, &dbmon->change_sets) {
+ ovs_list_remove(&cs->list_node);
+ ovsdb_monitor_change_set_destroy(cs);
+ free(cs);
+ }
+
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
- struct ovsdb_monitor_changes *changes, *next;
-
- HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
- hmap_remove(&mt->changes, &changes->hmap_node);
- ovsdb_monitor_changes_destroy(changes);
- }
- hmap_destroy(&mt->changes);
+ ovs_assert(ovs_list_is_empty(&mt->change_sets));
free(mt->columns);
free(mt->columns_index_map);
free(mt);
@@ -1570,24 +1585,17 @@ ovsdb_monitor_commit(struct ovsdb_monitor *m, const struct ovsdb_txn *txn)
struct ovsdb_monitor_aux aux;
ovsdb_monitor_init_aux(&aux, m);
- /* Update ovsdb_monitor's transaction number for
- * each transaction, before calling ovsdb_monitor_change_cb(). */
- m->n_transactions++;
ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
- switch(aux.efficacy) {
- case OVSDB_CHANGES_NO_EFFECT:
- /* The transaction is ignored by the monitor.
- * Roll back the 'n_transactions' as if the transaction
- * has never happened. */
- m->n_transactions--;
- break;
- case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
- /* Nothing. */
- break;
- case OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
- ovsdb_monitor_json_cache_flush(m);
- break;
+ if (aux.efficacy > OVSDB_CHANGES_NO_EFFECT) {
+ /* The transaction is has impact to the monitor.
+ * Reset new_change_set, so that a new change set will be
+ * created for future trackings. */
+ m->new_change_set = NULL;
+
+ if (aux.efficacy == OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE) {
+ ovsdb_monitor_json_cache_flush(m);
+ }
}
}
@@ -56,9 +56,10 @@ struct ovsdb_monitor *ovsdb_monitor_add(struct ovsdb_monitor *);
void ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *,
struct ovsdb_jsonrpc_monitor *);
+struct ovsdb_monitor_change_set;
void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *,
struct ovsdb_jsonrpc_monitor *,
- uint64_t unflushed);
+ struct ovsdb_monitor_change_set *);
void ovsdb_monitor_add_table(struct ovsdb_monitor *,
const struct ovsdb_table *);
@@ -77,16 +78,17 @@ struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *,
bool cond_updated,
struct ovsdb_monitor_session_condition *,
enum ovsdb_monitor_version,
- uint64_t *unflushed_transaction);
+ struct ovsdb_monitor_change_set **p_mcs);
void ovsdb_monitor_table_add_select(struct ovsdb_monitor *,
const struct ovsdb_table *,
enum ovsdb_monitor_selection);
bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *,
- uint64_t next_transaction);
+ struct ovsdb_monitor_change_set *);
-void ovsdb_monitor_get_initial(const struct ovsdb_monitor *);
+void ovsdb_monitor_get_initial(struct ovsdb_monitor *,
+ struct ovsdb_monitor_change_set **);
void ovsdb_monitor_get_memory_usage(struct simap *);