diff mbox series

[ovs-dev,RFC,3/7] ovsdb-monitor: Refactor ovsdb monitor implementation.

Message ID 1548371481-109219-4-git-send-email-hzhou8@ebay.com
State Superseded
Headers show
Series Fast OVSDB resync after restart or failover. | expand

Commit Message

Han Zhou Jan. 24, 2019, 11:11 p.m. UTC
From: Han Zhou <hzhou8@ebay.com>

The current ovsdb monitor maintains pending changes through an incremental
integer to figure out if the set of changes should be flushed. And it
uses number 0 to represent that the change set contains all data for
initial client population.  It is a smart way but it prevents further
extension of the monitoring mechanism to support future use cases of
monitoring starting from an arbitrary history point. This patch
refactors the structures so that change sets are tracked directly,
instead of through calculated version numbers based on implicit
rules.

Signed-off-by: Han Zhou <hzhou8@ebay.com>
---
 ovsdb/jsonrpc-server.c |  18 ++-
 ovsdb/monitor.c        | 365 +++++++++++++++++++++++++------------------------
 ovsdb/monitor.h        |  10 +-
 3 files changed, 204 insertions(+), 189 deletions(-)
diff mbox series

Patch

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 77f15d9..f9b7c27 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -1216,8 +1216,7 @@  struct ovsdb_jsonrpc_monitor {
     struct ovsdb *db;
     struct json *monitor_id;
     struct ovsdb_monitor *dbmon;
-    uint64_t unflushed;         /* The first transaction that has not been
-                                       flushed to the jsonrpc remote client. */
+    struct ovsdb_monitor_change_set *change_set;
     enum ovsdb_monitor_version version;
     struct ovsdb_monitor_session_condition *condition;/* Session's condition */
 };
@@ -1389,7 +1388,6 @@  ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     if (version == OVSDB_MONITOR_V2) {
         m->condition = ovsdb_monitor_session_condition_create();
     }
-    m->unflushed = 0;
     m->version = version;
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
     m->monitor_id = json_clone(monitor_id);
@@ -1436,7 +1434,7 @@  ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     dbmon = ovsdb_monitor_add(m->dbmon);
     if (dbmon != m->dbmon) {
         /* Found an exisiting dbmon, reuse the current one. */
-        ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+        ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, NULL);
         ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
         m->dbmon = dbmon;
     }
@@ -1446,7 +1444,7 @@  ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         ovsdb_monitor_condition_bind(m->dbmon, m->condition);
     }
 
-    ovsdb_monitor_get_initial(m->dbmon);
+    ovsdb_monitor_get_initial(m->dbmon, &m->change_set);
     json = ovsdb_jsonrpc_monitor_compose_update(m, true);
     json = json ? json : json_object_create();
     return jsonrpc_create_reply(json, request_id);
@@ -1578,7 +1576,7 @@  ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
     struct json *update_json;
 
     update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
-                                    m->condition, m->version, &m->unflushed);
+                                    m->condition, m->version, &m->change_set);
     if (update_json) {
         struct jsonrpc_msg *msg;
         struct json *p;
@@ -1648,12 +1646,12 @@  ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
                                      bool initial)
 {
 
-    if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+    if (!ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) {
         return NULL;
     }
 
     return ovsdb_monitor_get_update(m->dbmon, initial, false,
-                                    m->condition, m->version, &m->unflushed);
+                                    m->condition, m->version, &m->change_set);
 }
 
 static bool
@@ -1662,7 +1660,7 @@  ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
     struct ovsdb_jsonrpc_monitor *m;
 
     HMAP_FOR_EACH (m, node, &s->monitors) {
-        if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+        if (ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) {
             return true;
         }
     }
@@ -1686,7 +1684,7 @@  ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m,
 
     json_destroy(m->monitor_id);
     hmap_remove(&m->session->monitors, &m->node);
-    ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+    ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->change_set);
     ovsdb_monitor_session_condition_destroy(m->condition);
     free(m);
 }
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index bf130ad..d42dddf 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -69,17 +69,31 @@  struct ovsdb_monitor {
     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
     struct ovsdb *db;
-    uint64_t n_transactions;      /* Count number of committed transactions. */
-    struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
-    struct hmap json_cache;       /* Contains "ovsdb_monitor_json_cache_node"s.*/
+
+    /* Contains "ovsdb_monitor_change_set". Each change set contains changes
+     * from some start point up to the latest committed transaction. There can
+     * be different change sets for the same struct ovsdb_monitor because there
+     * are different clients pending on changes starting from different points.
+     * The different change sets are maintained as a list. */
+    struct ovs_list change_sets;
+
+    /* The new change set that is to be populated for future transactions. */
+    struct ovsdb_monitor_change_set *new_change_set;
+
+    /* The change set that starts from the first transaction of the DB, which
+     * is used for populating the initial data for new clients. */
+    struct ovsdb_monitor_change_set *init_change_set;
+
+    struct hmap_node hmap_node; /* Elements within ovsdb_monitors.  */
+    struct hmap json_cache;     /* Contains "ovsdb_monitor_json_cache_node"s.*/
 };
 
-/* A json object of updates between 'from_txn' and 'dbmon->n_transactions'
- * inclusive.  */
+/* A json object of updates for the ovsdb_monitor_change_set and the given
+ * monitor version. */
 struct ovsdb_monitor_json_cache_node {
     struct hmap_node hmap_node;   /* Elements in json cache. */
     enum ovsdb_monitor_version version;
-    uint64_t from_txn;
+    struct ovsdb_monitor_change_set *change_set;
     struct json *json;            /* Null, or a cloned of json */
 };
 
@@ -97,29 +111,45 @@  struct ovsdb_monitor_column {
 
 /* A row that has changed in a monitored table. */
 struct ovsdb_monitor_row {
-    struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
+    struct hmap_node hmap_node; /* In ovsdb_monitor_change_set_for_table. */
     struct uuid uuid;           /* UUID of row that changed. */
     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
 };
 
-/* Contains 'struct ovsdb_monitor_row's for rows that have been
- * updated but not yet flushed to all the jsonrpc connection.
+/* Contains a set of changes that are not yet flushed to all the jsonrpc
+ * connections.
  *
- * 'n_refs' represent the number of jsonrpc connections that have
- * not received updates. Generate the update for the last jsonprc
- * connection will also destroy the whole "struct ovsdb_monitor_changes"
- * object.
- *
- * 'transaction' stores the first update's transaction id.
- * */
-struct ovsdb_monitor_changes {
-    struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
-                                    hmap.  */
+ * 'n_refs' represents the number of jsonrpc connections that depend on this
+ * change set (have not received updates). Generating the update for the last
+ * jsonrpc connection will also destroy the whole "struct
+ * ovsdb_monitor_change_set" object.
+ */
+struct ovsdb_monitor_change_set {
+    /* Element in change_sets of ovsdb_monitor. */
+    struct ovs_list list_node;
+
+    /* Contains struct ovsdb_monitor_change_set_for_table. */
+    struct ovs_list change_set_for_tables;
+
+    int n_refs;
+};
+
+/* Contains 'struct ovsdb_monitor_row's for rows in a specific table
+ * of struct ovsdb_monitor_change_set. It can also be searched from
+ * member 'change_sets' of struct ovsdb_monitor_table. */
+struct ovsdb_monitor_change_set_for_table {
+    /* Element in ovsdb_monitor_tables' change_sets list. */
+    struct ovs_list list_in_mt;
+
+    /* Element in ovsdb_monitor_change_sets' change_set_for_tables list. */
+    struct ovs_list list_in_change_set;
+
     struct ovsdb_monitor_table *mt;
+    struct ovsdb_monitor_change_set *mcs;
+
+    /* Contains struct ovsdb_monitor_row. */
     struct hmap rows;
-    int n_refs;
-    uint64_t transaction;
 };
 
 /* A particular table being monitored. */
@@ -141,8 +171,8 @@  struct ovsdb_monitor_table {
      * ovsdb_monitor_row. It is used for condition evaluation. */
     unsigned int *columns_index_map;
 
-    /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
-    struct hmap changes;
+    /* Contains 'ovsdb_monitor_change_set_for_table'. */
+    struct ovs_list change_sets;
 };
 
 enum ovsdb_monitor_row_type {
@@ -159,22 +189,20 @@  typedef struct json *
      bool initial, unsigned long int *changed);
 
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *);
-static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
-    struct ovsdb_monitor_table *, uint64_t next_txn);
-static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
-    struct ovsdb_monitor_table *, uint64_t unflushed);
-static void ovsdb_monitor_changes_destroy(
-                                  struct ovsdb_monitor_changes *);
-static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *,
-                                  uint64_t unflushed);
+static struct ovsdb_monitor_change_set * ovsdb_monitor_add_change_set(
+        struct ovsdb_monitor *, bool init_only);
+static void ovsdb_monitor_change_set_destroy(
+        struct ovsdb_monitor_change_set *);
+static void ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *);
 
 static uint32_t
-json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
+json_cache_hash(enum ovsdb_monitor_version version,
+                struct ovsdb_monitor_change_set *change_set)
 {
     uint32_t hash;
 
     hash = hash_uint64(version);
-    hash = hash_uint64_basis(from_txn, hash);
+    hash = hash_pointer(change_set, hash);
 
     return hash;
 }
@@ -182,13 +210,13 @@  json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn)
 static struct ovsdb_monitor_json_cache_node *
 ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
                                 enum ovsdb_monitor_version version,
-                                uint64_t from_txn)
+                                struct ovsdb_monitor_change_set *change_set)
 {
     struct ovsdb_monitor_json_cache_node *node;
-    uint32_t hash = json_cache_hash(version, from_txn);
+    uint32_t hash = json_cache_hash(version, change_set);
 
     HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) {
-        if (node->from_txn == from_txn && node->version == version) {
+        if (node->change_set == change_set && node->version == version) {
             return node;
         }
     }
@@ -199,15 +227,16 @@  ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon,
 static void
 ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon,
                                 enum ovsdb_monitor_version version,
-                                uint64_t from_txn, struct json *json)
+                                struct ovsdb_monitor_change_set *change_set,
+                                struct json *json)
 {
     struct ovsdb_monitor_json_cache_node *node;
-    uint32_t hash = json_cache_hash(version, from_txn);
+    uint32_t hash = json_cache_hash(version, change_set);
 
     node = xmalloc(sizeof *node);
 
     node->version = version;
-    node->from_txn = from_txn;
+    node->change_set = change_set;
     node->json = json ? json_clone(json) : NULL;
 
     hmap_insert(&dbmon->json_cache, &node->hmap_node, hash);
@@ -241,8 +270,9 @@  compare_ovsdb_monitor_column(const void *a_, const void *b_)
 /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
  * given 'uuid', or NULL if there is no such row. */
 static struct ovsdb_monitor_row *
-ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
-                               const struct uuid *uuid)
+ovsdb_monitor_changes_row_find(
+        const struct ovsdb_monitor_change_set_for_table *changes,
+        const struct uuid *uuid)
 {
     struct ovsdb_monitor_row *row;
 
@@ -375,7 +405,7 @@  ovsdb_monitor_create(struct ovsdb *db,
     ovs_list_push_back(&db->monitors, &dbmon->list_node);
     ovs_list_init(&dbmon->jsonrpc_monitors);
     dbmon->db = db;
-    dbmon->n_transactions = 0;
+    ovs_list_init(&dbmon->change_sets);
     shash_init(&dbmon->tables);
     hmap_node_nullify(&dbmon->hmap_node);
     hmap_init(&dbmon->json_cache);
@@ -395,7 +425,7 @@  ovsdb_monitor_add_table(struct ovsdb_monitor *m,
     mt = xzalloc(sizeof *mt);
     mt->table = table;
     shash_add(&m->tables, table->schema->name, mt);
-    hmap_init(&mt->changes);
+    ovs_list_init(&mt->change_sets);
     mt->columns_index_map =
         xmalloc(sizeof *mt->columns_index_map * n_columns);
     for (i = 0; i < n_columns; i++) {
@@ -481,81 +511,86 @@  ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
     return shash_find_data(&m->tables, table->schema->name);
 }
 
-static struct ovsdb_monitor_changes *
-ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
-                                uint64_t next_txn)
-{
-    struct ovsdb_monitor_changes *changes;
-
-    changes = xzalloc(sizeof *changes);
-
-    changes->transaction = next_txn;
-    changes->mt = mt;
-    changes->n_refs = 1;
-    hmap_init(&changes->rows);
-    hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
-
-    return changes;
-};
-
-static struct ovsdb_monitor_changes *
-ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
-                                 uint64_t transaction)
+static struct ovsdb_monitor_change_set *
+ovsdb_monitor_add_change_set(struct ovsdb_monitor *dbmon,
+                             bool init_only)
 {
-    struct ovsdb_monitor_changes *changes;
-    size_t hash = hash_uint64(transaction);
+    struct ovsdb_monitor_change_set *change_set = xzalloc(sizeof *change_set);
+    ovs_list_push_back(&(dbmon->change_sets), &change_set->list_node);
+    ovs_list_init(&change_set->change_set_for_tables);
+    change_set->n_refs = 1;
 
-    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
-        if (changes->transaction == transaction) {
-            return changes;
+    struct shash_node *node;
+    SHASH_FOR_EACH (node, &dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
+        if (!init_only || (mt->select & OJMS_INITIAL)) {
+            struct ovsdb_monitor_change_set_for_table *mcst =
+                xzalloc(sizeof *mcst);
+            mcst->mt = mt;
+            mcst->mcs = change_set;
+            hmap_init(&mcst->rows);
+            ovs_list_push_back(&mt->change_sets, &mcst->list_in_mt);
+            ovs_list_push_back(&change_set->change_set_for_tables,
+                               &mcst->list_in_change_set);
         }
     }
 
-    return NULL;
-}
+    return change_set;
+};
 
 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
 static void
-ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
-                                    uint64_t transaction)
+ovsdb_monitor_untrack_change_set(struct ovsdb_monitor *dbmon,
+                                 struct ovsdb_monitor_change_set *mcs)
 {
-    struct ovsdb_monitor_changes *changes =
-                ovsdb_monitor_table_find_changes(mt, transaction);
-    if (changes) {
-        if (--changes->n_refs == 0) {
-            hmap_remove(&mt->changes, &changes->hmap_node);
-            ovsdb_monitor_changes_destroy(changes);
+    ovs_assert(mcs);
+    if (--mcs->n_refs == 0) {
+        /* ovsdb_monitor_change_set_destroy() unlinks 'mcs' from the list. */
+        if (mcs == dbmon->init_change_set) {
+            dbmon->init_change_set = NULL;
+        } else if (mcs == dbmon->new_change_set) {
+            dbmon->new_change_set = NULL;
         }
+        ovsdb_monitor_change_set_destroy(mcs);
     }
 }
 
 /* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
  */
 static void
-ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
-                                  uint64_t transaction)
+ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *dbmon)
 {
-    struct ovsdb_monitor_changes *changes;
+    struct ovsdb_monitor_change_set *change_set = dbmon->new_change_set;
 
-    changes = ovsdb_monitor_table_find_changes(mt, transaction);
-    if (changes) {
-        changes->n_refs++;
+    if (change_set) {
+        change_set->n_refs++;
     } else {
-        ovsdb_monitor_table_add_changes(mt, transaction);
+        change_set = ovsdb_monitor_add_change_set(dbmon, false);
+        dbmon->new_change_set = change_set;
     }
 }
 
 static void
-ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
+ovsdb_monitor_change_set_destroy(struct ovsdb_monitor_change_set *mcs)
 {
-    struct ovsdb_monitor_row *row, *next;
+    ovs_list_remove(&mcs->list_node);
+
+    struct ovsdb_monitor_change_set_for_table *mcst, *next_mcst;
+    LIST_FOR_EACH_SAFE (mcst, next_mcst, list_in_change_set,
+                        &mcs->change_set_for_tables) {
+        ovs_list_remove(&mcst->list_in_change_set);
+        ovs_list_remove(&mcst->list_in_mt);
 
-    HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
-        hmap_remove(&changes->rows, &row->hmap_node);
-        ovsdb_monitor_row_destroy(changes->mt, row);
+        struct ovsdb_monitor_row *row, *next;
+        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+            hmap_remove(&mcst->rows, &row->hmap_node);
+            ovsdb_monitor_row_destroy(mcst->mt, row);
+        }
+        hmap_destroy(&mcst->rows);
+
+        free(mcst);
     }
-    hmap_destroy(&changes->rows);
-    free(changes);
+    free(mcs);
 }
 
 static enum ovsdb_monitor_selection
@@ -1008,28 +1043,22 @@  ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
 static struct json*
 ovsdb_monitor_compose_update(
                       struct ovsdb_monitor *dbmon,
-                      bool initial, uint64_t transaction,
+                      bool initial, struct ovsdb_monitor_change_set *mcs,
                       const struct ovsdb_monitor_session_condition *condition,
                       compose_row_update_cb_func row_update)
 {
-    struct shash_node *node;
     struct json *json;
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
     json = NULL;
-    SHASH_FOR_EACH (node, &dbmon->tables) {
-        struct ovsdb_monitor_table *mt = node->data;
+    struct ovsdb_monitor_change_set_for_table *mcst;
+    LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
         struct ovsdb_monitor_row *row, *next;
-        struct ovsdb_monitor_changes *changes;
         struct json *table_json = NULL;
+        struct ovsdb_monitor_table *mt = mcst->mt;
 
-        changes = ovsdb_monitor_table_find_changes(mt, transaction);
-        if (!changes) {
-            continue;
-        }
-
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
             struct json *row_json;
             row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
                                      initial, changed);
@@ -1105,39 +1134,37 @@  ovsdb_monitor_get_update(
              bool initial, bool cond_updated,
              struct ovsdb_monitor_session_condition *condition,
              enum ovsdb_monitor_version version,
-             uint64_t *unflushed_)
+             struct ovsdb_monitor_change_set **p_mcs)
 {
     struct ovsdb_monitor_json_cache_node *cache_node = NULL;
-    struct shash_node *node;
     struct json *json;
-    const uint64_t unflushed = *unflushed_;
-    const uint64_t next_unflushed = dbmon->n_transactions + 1;
+    struct ovsdb_monitor_change_set *mcs = *p_mcs;
 
-    ovs_assert(cond_updated ? unflushed == next_unflushed : true);
+    ovs_assert(cond_updated ? mcs == dbmon->new_change_set : true);
 
     /* Return a clone of cached json if one exists. Otherwise,
      * generate a new one and add it to the cache.  */
     if (!condition || (!condition->conditional && !cond_updated)) {
         cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
-                                                     unflushed);
+                                                     mcs);
     }
     if (cache_node) {
         json = cache_node->json ? json_clone(cache_node->json) : NULL;
     } else {
         if (version == OVSDB_MONITOR_V1) {
             json =
-               ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+               ovsdb_monitor_compose_update(dbmon, initial, mcs,
                                             condition,
                                             ovsdb_monitor_compose_row_update);
         } else {
             ovs_assert(version == OVSDB_MONITOR_V2);
             if (!cond_updated) {
-                json = ovsdb_monitor_compose_update(dbmon, initial, unflushed,
+                json = ovsdb_monitor_compose_update(dbmon, initial, mcs,
                                             condition,
                                             ovsdb_monitor_compose_row_update2);
 
                 if (!condition || !condition->conditional) {
-                    ovsdb_monitor_json_cache_insert(dbmon, version, unflushed,
+                    ovsdb_monitor_json_cache_insert(dbmon, version, mcs,
                                                     json);
                 }
             } else {
@@ -1149,24 +1176,20 @@  ovsdb_monitor_get_update(
         }
     }
 
-    /* Maintain transaction id of 'changes'. */
-    SHASH_FOR_EACH (node, &dbmon->tables) {
-        struct ovsdb_monitor_table *mt = node->data;
-
-        ovsdb_monitor_table_untrack_changes(mt, unflushed);
-        ovsdb_monitor_table_track_changes(mt, next_unflushed);
-    }
-    *unflushed_ = next_unflushed;
+    /* Maintain tracking change set. */
+    ovsdb_monitor_untrack_change_set(dbmon, mcs);
+    ovsdb_monitor_track_new_change_set(dbmon);
+    *p_mcs = dbmon->new_change_set;
 
     return json;
 }
 
 bool
 ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
-                          uint64_t next_transaction)
+                          struct ovsdb_monitor_change_set *change_set)
 {
-    ovs_assert(next_transaction <= dbmon->n_transactions + 1);
-    return (next_transaction <= dbmon->n_transactions);
+    ovs_assert(change_set);
+    return (change_set != dbmon->new_change_set);
 }
 
 void
@@ -1225,15 +1248,15 @@  static void
 ovsdb_monitor_changes_update(const struct ovsdb_row *old,
                              const struct ovsdb_row *new,
                              const struct ovsdb_monitor_table *mt,
-                             struct ovsdb_monitor_changes *changes)
+                             struct ovsdb_monitor_change_set_for_table *mcst)
 {
     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
     struct ovsdb_monitor_row *change;
 
-    change = ovsdb_monitor_changes_row_find(changes, uuid);
+    change = ovsdb_monitor_changes_row_find(mcst, uuid);
     if (!change) {
         change = xzalloc(sizeof *change);
-        hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
+        hmap_insert(&mcst->rows, &change->hmap_node, uuid_hash(uuid));
         change->uuid = *uuid;
         change->old = clone_monitor_row_data(mt, old);
         change->new = clone_monitor_row_data(mt, new);
@@ -1285,7 +1308,7 @@  ovsdb_monitor_changes_update(const struct ovsdb_row *old,
 
             if (!change->old) {
                 /* This row was added then deleted.  Forget about it. */
-                hmap_remove(&changes->rows, &change->hmap_node);
+                hmap_remove(&mcst->rows, &change->hmap_node);
                 free(change);
             }
         }
@@ -1343,7 +1366,7 @@  ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     const struct ovsdb_monitor *m = aux->monitor;
     struct ovsdb_table *table = new ? new->table : old->table;
     struct ovsdb_monitor_table *mt;
-    struct ovsdb_monitor_changes *changes;
+    struct ovsdb_monitor_change_set_for_table *mcst;
 
     if (!aux->mt || table != aux->mt->table) {
         aux->mt = shash_find_data(&m->tables, table->schema->name);
@@ -1360,9 +1383,9 @@  ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     enum ovsdb_monitor_changes_efficacy efficacy =
         ovsdb_monitor_changes_classify(type, mt, changed);
 
-    HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
+    LIST_FOR_EACH (mcst, list_in_mt, &mt->change_sets) {
         if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
-            ovsdb_monitor_changes_update(old, new, mt, changes);
+            ovsdb_monitor_changes_update(old, new, mt, mcst);
         }
     }
     if (aux->efficacy < efficacy) {
@@ -1373,34 +1396,34 @@  ovsdb_monitor_change_cb(const struct ovsdb_row *old,
 }
 
 void
-ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
+ovsdb_monitor_get_initial(struct ovsdb_monitor *dbmon,
+                          struct ovsdb_monitor_change_set **p_mcs)
 {
-    struct shash_node *node;
-
-    SHASH_FOR_EACH (node, &dbmon->tables) {
-        struct ovsdb_monitor_table *mt = node->data;
-
-        if (mt->select & OJMS_INITIAL) {
-            struct ovsdb_row *row;
-            struct ovsdb_monitor_changes *changes;
-
-            changes = ovsdb_monitor_table_find_changes(mt, 0);
-            if (!changes) {
-                changes = ovsdb_monitor_table_add_changes(mt, 0);
-                HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-                    ovsdb_monitor_changes_update(NULL, row, mt, changes);
+    if (!dbmon->init_change_set) {
+        struct ovsdb_monitor_change_set *change_set =
+            ovsdb_monitor_add_change_set(dbmon, true);
+        dbmon->init_change_set = change_set;
+
+        struct ovsdb_monitor_change_set_for_table *mcst;
+        LIST_FOR_EACH (mcst, list_in_change_set,
+                       &change_set->change_set_for_tables) {
+            if (mcst->mt->select & OJMS_INITIAL) {
+                struct ovsdb_row *row;
+                HMAP_FOR_EACH (row, hmap_node, &mcst->mt->table->rows) {
+                    ovsdb_monitor_changes_update(NULL, row, mcst->mt, mcst);
                 }
-            } else {
-                changes->n_refs++;
             }
         }
+    } else {
+        dbmon->init_change_set->n_refs++;
     }
+    *p_mcs = dbmon->init_change_set;
 }
 
 void
 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
-                   uint64_t unflushed)
+                   struct ovsdb_monitor_change_set *change_set)
 {
     struct jsonrpc_monitor_node *jm;
 
@@ -1413,10 +1436,8 @@  ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
             /* Release the tracked changes. */
-            struct shash_node *node;
-            SHASH_FOR_EACH (node, &dbmon->tables) {
-                struct ovsdb_monitor_table *mt = node->data;
-                ovsdb_monitor_table_untrack_changes(mt, unflushed);
+            if (change_set) {
+                ovsdb_monitor_untrack_change_set(dbmon, change_set);
             }
             ovs_list_remove(&jm->node);
             free(jm);
@@ -1547,15 +1568,16 @@  ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
     ovsdb_monitor_json_cache_flush(dbmon);
     hmap_destroy(&dbmon->json_cache);
 
+    /* ovsdb_monitor_change_set_destroy() both unlinks and frees 'cs';
+     * repeating either here would be a double unlink / double free. */
+    struct ovsdb_monitor_change_set *cs, *cs_next;
+    LIST_FOR_EACH_SAFE (cs, cs_next, list_node, &dbmon->change_sets) {
+        ovsdb_monitor_change_set_destroy(cs);
+    }
+
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
-        struct ovsdb_monitor_changes *changes, *next;
-
-        HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
-            hmap_remove(&mt->changes, &changes->hmap_node);
-            ovsdb_monitor_changes_destroy(changes);
-        }
-        hmap_destroy(&mt->changes);
+        ovs_assert(ovs_list_is_empty(&mt->change_sets));
         free(mt->columns);
         free(mt->columns_index_map);
         free(mt);
@@ -1570,24 +1592,17 @@  ovsdb_monitor_commit(struct ovsdb_monitor *m, const struct ovsdb_txn *txn)
     struct ovsdb_monitor_aux aux;
 
     ovsdb_monitor_init_aux(&aux, m);
-    /* Update ovsdb_monitor's transaction number for
-     * each transaction, before calling ovsdb_monitor_change_cb().  */
-    m->n_transactions++;
     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
 
-    switch(aux.efficacy) {
-    case OVSDB_CHANGES_NO_EFFECT:
-        /* The transaction is ignored by the monitor.
-         * Roll back the 'n_transactions' as if the transaction
-         * has never happened. */
-        m->n_transactions--;
-        break;
-    case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE:
-        /* Nothing.  */
-        break;
-    case  OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE:
-        ovsdb_monitor_json_cache_flush(m);
-        break;
+    if (aux.efficacy > OVSDB_CHANGES_NO_EFFECT) {
+        /* The transaction has an impact on the monitor.
+         * Reset new_change_set, so that a new change set will be
+         * created for future tracking. */
+        m->new_change_set = NULL;
+
+        if (aux.efficacy == OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE) {
+            ovsdb_monitor_json_cache_flush(m);
+        }
     }
 }
 
diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
index 9bc0613..112c672 100644
--- a/ovsdb/monitor.h
+++ b/ovsdb/monitor.h
@@ -56,9 +56,10 @@  struct ovsdb_monitor *ovsdb_monitor_add(struct ovsdb_monitor *);
 void ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *,
                        struct ovsdb_jsonrpc_monitor *);
 
+struct ovsdb_monitor_change_set;
 void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *,
                                struct ovsdb_jsonrpc_monitor *,
-                               uint64_t unflushed);
+                               struct ovsdb_monitor_change_set *);
 
 void ovsdb_monitor_add_table(struct ovsdb_monitor *,
                              const struct ovsdb_table *);
@@ -77,16 +78,17 @@  struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *,
                                       bool cond_updated,
                                       struct ovsdb_monitor_session_condition *,
                                       enum ovsdb_monitor_version,
-                                      uint64_t *unflushed_transaction);
+                                      struct ovsdb_monitor_change_set **p_mcs);
 
 void ovsdb_monitor_table_add_select(struct ovsdb_monitor *,
                                     const struct ovsdb_table *,
                                     enum ovsdb_monitor_selection);
 
 bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *,
-                               uint64_t next_transaction);
+                               struct ovsdb_monitor_change_set *);
 
-void ovsdb_monitor_get_initial(const struct ovsdb_monitor *);
+void ovsdb_monitor_get_initial(struct ovsdb_monitor *,
+                               struct ovsdb_monitor_change_set **);
 
 void ovsdb_monitor_get_memory_usage(struct simap *);