[ovs-dev,07/15] ovsdb-idl: Break out database-specific stuff into new data structure.

Message ID 20180101051640.13043-7-blp@ovn.org
State New
Headers show
Series
  • [ovs-dev,01/15] log: Add async commit support.
Related show

Commit Message

Ben Pfaff Jan. 1, 2018, 5:16 a.m.
Until now, a given ovsdb-idl instances has only monitored a single
database.  In an upcoming commit, it will grow to also monitor a second
database that represents the state of the database server itself.  Much of
the work is the same for both databases, so this commit breaks the common
code and data out into new data structures and functions.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 lib/ovsdb-idl-provider.h |    2 +-
 lib/ovsdb-idl.c          | 1094 ++++++++++++++++++++++++++--------------------
 lib/ovsdb-idl.h          |    2 +-
 3 files changed, 620 insertions(+), 478 deletions(-)

Patch

diff --git a/lib/ovsdb-idl-provider.h b/lib/ovsdb-idl-provider.h
index b0ebed44f83a..0337303511f0 100644
--- a/lib/ovsdb-idl-provider.h
+++ b/lib/ovsdb-idl-provider.h
@@ -115,7 +115,7 @@  struct ovsdb_idl_table {
                               * for replication. */
     struct shash columns;    /* Contains "const struct ovsdb_idl_column *"s. */
     struct hmap rows;        /* Contains "struct ovsdb_idl_row"s. */
-    struct ovsdb_idl *idl;   /* Containing idl. */
+    struct ovsdb_idl_db *db; /* Containing db. */
     unsigned int change_seqno[OVSDB_IDL_CHANGE_MAX];
     struct shash indexes;    /* Contains "struct ovsdb_idl_index"s */
     struct ovs_list track_list; /* Tracked rows (ovsdb_idl_row.track_node). */
diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index 29f893116aee..24ba5b50fddc 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -117,12 +117,55 @@  enum ovsdb_idl_state {
     IDL_S_NO_SCHEMA
 };
 
-struct ovsdb_idl {
+struct ovsdb_idl_db {
+    struct ovsdb_idl *idl;
+
     /* Data. */
     const struct ovsdb_idl_class *class_;
     struct shash table_by_name; /* Contains "struct ovsdb_idl_table *"s.*/
     struct ovsdb_idl_table *tables; /* Array of ->class_->n_tables elements. */
+    struct json *monitor_id;
     unsigned int change_seqno;
+    struct ovsdb_idl_txn *txn;
+    struct hmap outstanding_txns;
+    bool verify_write_only;
+    struct json *schema;
+
+    /* True if any of the tables' monitoring conditions has changed. */
+    bool cond_changed;
+
+    unsigned int cond_seqno;   /* Keep track of condition clauses changes
+                                  over a single conditional monitoring session.
+                                  Reverts to zero when idl session
+                                  reconnects.  */
+
+    /* Database locking. */
+    char *lock_name;            /* Name of lock we need, NULL if none. */
+    bool has_lock;              /* Has db server told us we have the lock? */
+    bool is_lock_contended;     /* Has db server told us we can't get lock? */
+    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
+};
+
+static void ovsdb_idl_db_track_clear(struct ovsdb_idl_db *);
+static void ovsdb_idl_db_add_column(struct ovsdb_idl_db *,
+                                    const struct ovsdb_idl_column *);
+static void ovsdb_idl_db_omit(struct ovsdb_idl_db *,
+                              const struct ovsdb_idl_column *);
+static void ovsdb_idl_db_omit_alert(struct ovsdb_idl_db *,
+                                    const struct ovsdb_idl_column *);
+static unsigned int ovsdb_idl_db_set_condition(
+    struct ovsdb_idl_db *, const struct ovsdb_idl_table_class *,
+    const struct ovsdb_idl_condition *);
+
+static void ovsdb_idl_send_schema_request(struct ovsdb_idl *,
+                                          struct ovsdb_idl_db *);
+static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *,
+                                           struct ovsdb_idl_db *,
+                                           bool use_monitor_cond);
+
+struct ovsdb_idl {
+    struct ovsdb_idl_db server;
+    struct ovsdb_idl_db db;     /* XXX rename 'data'? */
 
     /* Session state.
      *
@@ -134,31 +177,14 @@  struct ovsdb_idl {
     enum ovsdb_idl_state state;      /* Current session state. */
     unsigned int state_seqno;        /* See above. */
     struct json *request_id;         /* JSON ID for request awaiting reply. */
-    struct json *schema;             /* Temporary copy of database schema. */
 
-    /* Database locking. */
-    char *lock_name;            /* Name of lock we need, NULL if none. */
-    bool has_lock;              /* Has db server told us we have the lock? */
-    bool is_lock_contended;     /* Has db server told us we can't get lock? */
-    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
-
-    /* Transaction support. */
-    struct ovsdb_idl_txn *txn;
-    struct hmap outstanding_txns;
-    bool verify_write_only;
-
-    /* Conditional monitoring. */
-    bool cond_changed;
-    unsigned int cond_seqno;   /* Keep track of condition clauses changes
-                                  over a single conditional monitoring session.
-                                  Reverts to zero when idl session
-                                  reconnects.  */
+    bool use_monitor_cond;
 };
 
 struct ovsdb_idl_txn {
     struct hmap_node hmap_node;
     struct json *request_id;
-    struct ovsdb_idl *idl;
+    struct ovsdb_idl_db *db;
     struct hmap txn_rows;
     enum ovsdb_idl_txn_status status;
     char *error;
@@ -184,30 +210,19 @@  struct ovsdb_idl_txn_insert {
     struct uuid real;           /* Real UUID used by database server. */
 };
 
-enum ovsdb_update_version {
-    OVSDB_UPDATE,               /* RFC 7047 "update" method. */
-    OVSDB_UPDATE2               /* "update2" Extension to RFC 7047.
-                                   See ovsdb-server(1) for more information. */
-};
-
-/* Name arrays indexed by 'enum ovsdb_update_version'. */
-static const char *table_updates_names[] = {"table_updates", "table_updates2"};
-static const char *table_update_names[] = {"table_update", "table_update2"};
-static const char *row_update_names[] = {"row_update", "row_update2"};
-
 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct vlog_rate_limit other_rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
 static void ovsdb_idl_clear(struct ovsdb_idl *);
-static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
-static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
-static void ovsdb_idl_send_monitor_cond_request(struct ovsdb_idl *);
-static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *,
-                                   enum ovsdb_update_version);
-static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
-                                                    const struct json *,
-                                                    enum ovsdb_update_version);
+static void ovsdb_idl_db_parse_monitor_reply(struct ovsdb_idl_db *,
+                                             const struct json *result,
+                                             bool is_monitor_cond);
+static bool ovsdb_idl_db_parse_update_rpc(struct ovsdb_idl_db *,
+                                          const struct jsonrpc_msg *);
+static void ovsdb_idl_db_parse_update(struct ovsdb_idl_db *,
+                                      const struct json *table_updates,
+                                      bool is_monitor_cond);
 static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
                                      const struct uuid *,
                                      const struct json *old,
@@ -228,7 +243,7 @@  static struct ovsdb_idl_row *ovsdb_idl_row_create__(
 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
                                                   const struct uuid *);
 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
-static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *);
+static void ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl_db *);
 static void ovsdb_idl_destroy_all_map_op_lists(struct ovsdb_idl_row *);
 static void ovsdb_idl_destroy_all_set_op_lists(struct ovsdb_idl_row *);
 
@@ -239,8 +254,8 @@  static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
 static void ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *, bool destroy_dsts);
 
 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
-static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
-                                        const struct jsonrpc_msg *msg);
+static bool ovsdb_idl_db_txn_process_reply(struct ovsdb_idl_db *,
+                                           const struct jsonrpc_msg *msg);
 static bool ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *,
                                             struct json *);
 static void ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *,
@@ -252,13 +267,20 @@  static void ovsdb_idl_txn_add_set_op(struct ovsdb_idl_row *,
                                      struct ovsdb_datum *,
                                      enum set_op_type);
 
-static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
-static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
-static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
-                                       const struct json *);
-static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
-                                        const struct json *params,
-                                        bool new_has_lock);
+static bool ovsdb_idl_db_process_lock_replies(struct ovsdb_idl_db *,
+                                              const struct jsonrpc_msg *);
+static struct jsonrpc_msg *ovsdb_idl_db_compose_lock_request(
+    struct ovsdb_idl_db *);
+static struct jsonrpc_msg *ovsdb_idl_db_compose_unlock_request(
+    struct ovsdb_idl_db *);
+static void ovsdb_idl_db_parse_lock_reply(struct ovsdb_idl_db *,
+                                          const struct json *);
+static bool ovsdb_idl_db_parse_lock_notify(struct ovsdb_idl_db *,
+                                           const struct json *params,
+                                           bool new_has_lock);
+static struct ovsdb_idl_table *
+ovsdb_idl_db_table_from_class(const struct ovsdb_idl_db *,
+                              const struct ovsdb_idl_table_class *);
 static struct ovsdb_idl_table *
 ovsdb_idl_table_from_class(const struct ovsdb_idl *,
                            const struct ovsdb_idl_table_class *);
@@ -273,6 +295,51 @@  static void
 static void ovsdb_idl_add_to_indexes(const struct ovsdb_idl_row *);
 static void ovsdb_idl_remove_from_indexes(const struct ovsdb_idl_row *);
 
+static void
+ovsdb_idl_db_init(struct ovsdb_idl_db *db, const struct ovsdb_idl_class *class,
+                  struct ovsdb_idl *parent, bool monitor_everything_by_default)
+{
+    memset(db, 0, sizeof *db);
+
+    uint8_t default_mode = (monitor_everything_by_default
+                            ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
+                            : 0);
+
+    db->idl = parent;
+    db->class_ = class;
+    shash_init(&db->table_by_name);
+    db->tables = xmalloc(class->n_tables * sizeof *db->tables);
+    for (size_t i = 0; i < class->n_tables; i++) {
+        const struct ovsdb_idl_table_class *tc = &class->tables[i];
+        struct ovsdb_idl_table *table = &db->tables[i];
+
+        shash_add_assert(&db->table_by_name, tc->name, table);
+        table->class_ = tc;
+        table->modes = xmalloc(tc->n_columns);
+        memset(table->modes, default_mode, tc->n_columns);
+        table->need_table = false;
+        shash_init(&table->columns);
+        shash_init(&table->indexes);
+        for (size_t j = 0; j < tc->n_columns; j++) {
+            const struct ovsdb_idl_column *column = &tc->columns[j];
+
+            shash_add_assert(&table->columns, column->name, column);
+        }
+        hmap_init(&table->rows);
+        ovs_list_init(&table->track_list);
+        table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
+            = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
+            = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
+        table->db = db;
+        ovsdb_idl_condition_init(&table->condition);
+        ovsdb_idl_condition_add_clause_true(&table->condition);
+        table->cond_changed = false;
+    }
+    db->monitor_id = json_array_create_2(json_string_create("monid"),
+                                         json_string_create(class->database));
+    hmap_init(&db->outstanding_txns);
+}
+
 /* Creates and returns a connection to database 'remote', which should be in a
  * form acceptable to jsonrpc_session_open().  The connection will maintain an
  * in-memory replica of the remote database whose schema is described by
@@ -296,53 +363,12 @@  ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
                  bool monitor_everything_by_default, bool retry)
 {
     struct ovsdb_idl *idl;
-    uint8_t default_mode;
-    size_t i;
-
-    default_mode = (monitor_everything_by_default
-                    ? OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT
-                    : 0);
 
     idl = xzalloc(sizeof *idl);
-    idl->class_ = class;
+    ovsdb_idl_db_init(&idl->db, class, idl, monitor_everything_by_default);
     idl->session = jsonrpc_session_open(remote, retry);
-    shash_init(&idl->table_by_name);
-    idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
-    for (i = 0; i < class->n_tables; i++) {
-        const struct ovsdb_idl_table_class *tc = &class->tables[i];
-        struct ovsdb_idl_table *table = &idl->tables[i];
-        size_t j;
-
-        shash_add_assert(&idl->table_by_name, tc->name, table);
-        table->class_ = tc;
-        table->modes = xmalloc(tc->n_columns);
-        memset(table->modes, default_mode, tc->n_columns);
-        table->need_table = false;
-        shash_init(&table->columns);
-        shash_init(&table->indexes);
-        for (j = 0; j < tc->n_columns; j++) {
-            const struct ovsdb_idl_column *column = &tc->columns[j];
-
-            shash_add_assert(&table->columns, column->name, column);
-        }
-        hmap_init(&table->rows);
-        ovs_list_init(&table->track_list);
-        table->change_seqno[OVSDB_IDL_CHANGE_INSERT]
-            = table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]
-            = table->change_seqno[OVSDB_IDL_CHANGE_DELETE] = 0;
-        table->idl = idl;
-        ovsdb_idl_condition_init(&table->condition);
-        ovsdb_idl_condition_add_clause_true(&table->condition);
-        table->cond_changed = false;
-    }
-
-    idl->cond_changed = false;
-    idl->cond_seqno = 0;
     idl->state_seqno = UINT_MAX;
     idl->request_id = NULL;
-    idl->schema = NULL;
-
-    hmap_init(&idl->outstanding_txns);
 
     return idl;
 }
@@ -353,51 +379,54 @@  ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote,
                      bool retry)
 {
     if (idl) {
-        ovs_assert(!idl->txn);
-        jsonrpc_session_close(idl->session);
         idl->session = jsonrpc_session_open(remote, retry);
+        /* XXX update condition */
         idl->state_seqno = UINT_MAX;
     }
 }
 
+static void
+ovsdb_idl_db_destroy(struct ovsdb_idl_db *db)
+{
+    ovs_assert(!db->txn);
+    for (size_t i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
+        ovsdb_idl_condition_destroy(&table->condition);
+        ovsdb_idl_destroy_indexes(table);
+        shash_destroy(&table->columns);
+        hmap_destroy(&table->rows);
+        free(table->modes);
+    }
+    shash_destroy(&db->table_by_name);
+    free(db->tables);
+    json_destroy(db->schema);
+    hmap_destroy(&db->outstanding_txns);
+    free(db->lock_name);
+    json_destroy(db->lock_request_id);
+}
+
 /* Destroys 'idl' and all of the data structures that it manages. */
 void
 ovsdb_idl_destroy(struct ovsdb_idl *idl)
 {
     if (idl) {
-        size_t i;
-
-        ovs_assert(!idl->txn);
         ovsdb_idl_clear(idl);
         jsonrpc_session_close(idl->session);
 
-        for (i = 0; i < idl->class_->n_tables; i++) {
-            struct ovsdb_idl_table *table = &idl->tables[i];
-            ovsdb_idl_condition_destroy(&table->condition);
-            ovsdb_idl_destroy_indexes(table);
-            shash_destroy(&table->columns);
-            hmap_destroy(&table->rows);
-            free(table->modes);
-        }
-        shash_destroy(&idl->table_by_name);
-        free(idl->tables);
+        ovsdb_idl_db_destroy(&idl->db);
         json_destroy(idl->request_id);
-        free(idl->lock_name);
-        json_destroy(idl->lock_request_id);
-        json_destroy(idl->schema);
-        hmap_destroy(&idl->outstanding_txns);
         free(idl);
     }
 }
 
 static void
-ovsdb_idl_clear(struct ovsdb_idl *idl)
+ovsdb_idl_db_clear(struct ovsdb_idl_db *db)
 {
     bool changed = false;
     size_t i;
 
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
         struct ovsdb_idl_row *row, *next_row;
 
         table->cond_changed = false;
@@ -427,15 +456,29 @@  ovsdb_idl_clear(struct ovsdb_idl *idl)
         }
     }
 
-    idl->cond_changed = false;
-    idl->cond_seqno = 0;
-    ovsdb_idl_track_clear(idl);
+    db->cond_changed = false;
+    db->cond_seqno = 0;
+    ovsdb_idl_db_track_clear(db);
 
     if (changed) {
-        idl->change_seqno++;
+        db->change_seqno++;
     }
 }
 
+static void
+ovsdb_idl_clear(struct ovsdb_idl *idl)
+{
+    ovsdb_idl_db_clear(&idl->db);
+}
+
+static void
+ovsdb_idl_send_request(struct ovsdb_idl *idl, struct jsonrpc_msg *request)
+{
+    json_destroy(idl->request_id);
+    idl->request_id = json_clone(request->id);
+    jsonrpc_session_send(idl->session, request);
+}
+
 /* Processes a batch of messages from the database server on 'idl'.  This may
  * cause the IDL's contents to change.  The client may check for that with
  * ovsdb_idl_get_seqno(). */
@@ -444,7 +487,7 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
 {
     int i;
 
-    ovs_assert(!idl->txn);
+    ovs_assert(!idl->db.txn);
 
     ovsdb_idl_send_cond_change(idl);
 
@@ -460,10 +503,11 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
             idl->request_id = NULL;
             ovsdb_idl_txn_abort_all(idl);
 
-            ovsdb_idl_send_schema_request(idl);
+            ovsdb_idl_send_schema_request(idl, &idl->db);
             idl->state = IDL_S_SCHEMA_REQUESTED;
-            if (idl->lock_name) {
-                ovsdb_idl_send_lock_request(idl);
+            if (idl->db.lock_name) {
+                jsonrpc_session_send(
+                    idl->session, ovsdb_idl_db_compose_lock_request(&idl->db));
             }
         }
 
@@ -472,14 +516,8 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
             break;
         }
 
-        if (msg->type == JSONRPC_NOTIFY
-            && !strcmp(msg->method, "update2")
-            && msg->params->type == JSON_ARRAY
-            && msg->params->u.array.n == 2
-            && msg->params->u.array.elems[0]->type == JSON_STRING) {
-            /* Database contents changed. */
-            ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
-                                   OVSDB_UPDATE2);
+        if (ovsdb_idl_db_parse_update_rpc(&idl->db, msg)) {
+            /* ovsdb_idl_db_parse_update_rpc() did all the processing. */
         } else if (msg->type == JSONRPC_REPLY
                    && idl->request_id
                    && json_equal(idl->request_id, msg->id)) {
@@ -489,35 +527,35 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
             switch (idl->state) {
             case IDL_S_SCHEMA_REQUESTED:
                 /* Reply to our "get_schema" request. */
-                idl->schema = json_clone(msg->result);
-                ovsdb_idl_send_monitor_cond_request(idl);
+                idl->db.schema = json_clone(msg->result);
+                ovsdb_idl_send_monitor_request(idl, &idl->db, true);
                 idl->state = IDL_S_MONITOR_COND_REQUESTED;
                 break;
 
             case IDL_S_MONITOR_REQUESTED:
             case IDL_S_MONITOR_COND_REQUESTED:
                 /* Reply to our "monitor" or "monitor_cond" request. */
-                idl->change_seqno++;
-                ovsdb_idl_clear(idl);
                 if (idl->state == IDL_S_MONITOR_REQUESTED) {
                     idl->state = IDL_S_MONITORING;
-                    ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE);
+                    ovsdb_idl_db_parse_monitor_reply(&idl->db, msg->result,
+                                                     false);
                 } else { /* IDL_S_MONITOR_COND_REQUESTED. */
                     idl->state = IDL_S_MONITORING_COND;
-                    ovsdb_idl_parse_update(idl, msg->result, OVSDB_UPDATE2);
+                    ovsdb_idl_db_parse_monitor_reply(&idl->db, msg->result,
+                                                     true);
                 }
 
                 /* Schema is not useful after monitor request is accepted
                  * by the server.  */
-                json_destroy(idl->schema);
-                idl->schema = NULL;
+                json_destroy(idl->db.schema);
+                idl->db.schema = NULL;
                 break;
 
             case IDL_S_MONITORING_COND:
                 /* Conditional monitor clauses were updated. Send out
                  * the next condition changes, in any, immediately. */
                 ovsdb_idl_send_cond_change(idl);
-                idl->cond_seqno++;
+                idl->db.cond_seqno++;
                 break;
 
             case IDL_S_MONITORING:
@@ -525,27 +563,8 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
             default:
                 OVS_NOT_REACHED();
             }
-        } else if (msg->type == JSONRPC_NOTIFY
-                   && !strcmp(msg->method, "update")
-                   && msg->params->type == JSON_ARRAY
-                   && msg->params->u.array.n == 2
-                   && msg->params->u.array.elems[0]->type == JSON_STRING) {
-            /* Database contents changed. */
-            ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1],
-                                   OVSDB_UPDATE);
-        } else if (msg->type == JSONRPC_REPLY
-                   && idl->lock_request_id
-                   && json_equal(idl->lock_request_id, msg->id)) {
-            /* Reply to our "lock" request. */
-            ovsdb_idl_parse_lock_reply(idl, msg->result);
-        } else if (msg->type == JSONRPC_NOTIFY
-                   && !strcmp(msg->method, "locked")) {
-            /* We got our lock. */
-            ovsdb_idl_parse_lock_notify(idl, msg->params, true);
-        } else if (msg->type == JSONRPC_NOTIFY
-                   && !strcmp(msg->method, "stolen")) {
-            /* Someone else stole our lock. */
-            ovsdb_idl_parse_lock_notify(idl, msg->params, false);
+        } else if (ovsdb_idl_db_process_lock_replies(&idl->db, msg)) {
+            /* ovsdb_idl_db_process_lock_replies() did all the processing. */
         } else if (msg->type == JSONRPC_ERROR
                    && idl->state == IDL_S_MONITOR_COND_REQUESTED
                    && idl->request_id
@@ -555,7 +574,7 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
                 /* Fall back to using "monitor" method.  */
                 json_destroy(idl->request_id);
                 idl->request_id = NULL;
-                ovsdb_idl_send_monitor_request(idl);
+                ovsdb_idl_send_monitor_request(idl, &idl->db, false);
                 idl->state = IDL_S_MONITOR_REQUESTED;
             }
         } else if (msg->type == JSONRPC_ERROR
@@ -578,7 +597,7 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
             idl->state = IDL_S_NO_SCHEMA;
         } else if ((msg->type == JSONRPC_ERROR
                     || msg->type == JSONRPC_REPLY)
-                   && ovsdb_idl_txn_process_reply(idl, msg)) {
+                   && ovsdb_idl_db_txn_process_reply(&idl->db, msg)) {
             /* ovsdb_idl_txn_process_reply() did everything needful. */
         } else {
             /* This can happen if ovsdb_idl_txn_destroy() is called to destroy
@@ -590,7 +609,7 @@  ovsdb_idl_run(struct ovsdb_idl *idl)
         }
         jsonrpc_msg_destroy(msg);
     }
-    ovsdb_idl_row_destroy_postprocess(idl);
+    ovsdb_idl_row_destroy_postprocess(&idl->db);
 }
 
 /* Arranges for poll_block() to wake up when ovsdb_idl_run() has something to
@@ -622,7 +641,7 @@  ovsdb_idl_wait(struct ovsdb_idl *idl)
 unsigned int
 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
 {
-    return idl->change_seqno;
+    return idl->db.change_seqno;
 }
 
 /* Returns a "sequence number" that represents the number of conditional
@@ -642,7 +661,7 @@  ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
 unsigned int
 ovsdb_idl_get_condition_seqno(const struct ovsdb_idl *idl)
 {
-    return idl->cond_seqno;
+    return idl->db.cond_seqno;
 }
 
 /* Returns true if 'idl' successfully connected to the remote database and
@@ -682,7 +701,7 @@  ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
 void
 ovsdb_idl_verify_write_only(struct ovsdb_idl *idl)
 {
-    idl->verify_write_only = true;
+    idl->db.verify_write_only = true;
 }
 
 /* Returns true if 'idl' is currently connected or trying to connect
@@ -792,7 +811,7 @@  void
 ovsdb_idl_check_consistency(const struct ovsdb_idl *idl)
 {
     /* Consistency is broken while a transaction is in progress. */
-    if (!idl->txn) {
+    if (!idl->db.txn) {
         return;
     }
 
@@ -801,8 +820,8 @@  ovsdb_idl_check_consistency(const struct ovsdb_idl *idl)
     struct uuid *dsts = NULL;
     size_t allocated_dsts = 0;
 
-    for (size_t i = 0; i < idl->class_->n_tables; i++) {
-        const struct ovsdb_idl_table *table = &idl->tables[i];
+    for (size_t i = 0; i < idl->db.class_->n_tables; i++) {
+        const struct ovsdb_idl_table *table = &idl->db.tables[i];
         const struct ovsdb_idl_table_class *class = table->class_;
 
         const struct ovsdb_idl_row *row;
@@ -849,7 +868,7 @@  ovsdb_idl_check_consistency(const struct ovsdb_idl *idl)
 const struct ovsdb_idl_class *
 ovsdb_idl_get_class(const struct ovsdb_idl *idl)
 {
-    return idl->class_;
+    return idl->db.class_;
 }
 
 /* Given 'column' in some table in 'class', returns the table's class. */
@@ -867,44 +886,52 @@  ovsdb_idl_table_class_from_column(const struct ovsdb_idl_class *class,
     OVS_NOT_REACHED();
 }
 
-/* Given 'column' in some table in 'idl', returns the table. */
+/* Given 'column' in some table in 'db', returns the table. */
 static struct ovsdb_idl_table *
-ovsdb_idl_table_from_column(struct ovsdb_idl *idl,
+ovsdb_idl_table_from_column(struct ovsdb_idl_db *db,
                             const struct ovsdb_idl_column *column)
 {
     const struct ovsdb_idl_table_class *tc =
-        ovsdb_idl_table_class_from_column(idl->class_, column);
-    return &idl->tables[tc - idl->class_->tables];
+        ovsdb_idl_table_class_from_column(db->class_, column);
+    return &db->tables[tc - db->class_->tables];
 }
 
 static unsigned char *
-ovsdb_idl_get_mode(struct ovsdb_idl *idl,
-                   const struct ovsdb_idl_column *column)
+ovsdb_idl_db_get_mode(struct ovsdb_idl_db *db,
+                      const struct ovsdb_idl_column *column)
 {
-    ovs_assert(!idl->change_seqno);
+    ovs_assert(!db->change_seqno);
 
-    const struct ovsdb_idl_table *table = ovsdb_idl_table_from_column(idl,
+    const struct ovsdb_idl_table *table = ovsdb_idl_table_from_column(db,
                                                                       column);
     return &table->modes[column - table->class_->columns];
 }
 
 static void
-add_ref_table(struct ovsdb_idl *idl, const struct ovsdb_base_type *base)
+add_ref_table(struct ovsdb_idl_db *db, const struct ovsdb_base_type *base)
 {
     if (base->type == OVSDB_TYPE_UUID && base->u.uuid.refTableName) {
         struct ovsdb_idl_table *table;
 
-        table = shash_find_data(&idl->table_by_name,
-                                base->u.uuid.refTableName);
+        table = shash_find_data(&db->table_by_name, base->u.uuid.refTableName);
         if (table) {
             table->need_table = true;
         } else {
             VLOG_WARN("%s IDL class missing referenced table %s",
-                      idl->class_->database, base->u.uuid.refTableName);
+                      db->class_->database, base->u.uuid.refTableName);
         }
     }
 }
 
+static void
+ovsdb_idl_db_add_column(struct ovsdb_idl_db *db,
+                        const struct ovsdb_idl_column *column)
+{
+    *ovsdb_idl_db_get_mode(db, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
+    add_ref_table(db, &column->type.key);
+    add_ref_table(db, &column->type.value);
+}
+
 /* Turns on OVSDB_IDL_MONITOR and OVSDB_IDL_ALERT for 'column' in 'idl'.  Also
  * ensures that any tables referenced by 'column' will be replicated, even if
  * no columns in that table are selected for replication (see
@@ -918,9 +945,25 @@  void
 ovsdb_idl_add_column(struct ovsdb_idl *idl,
                      const struct ovsdb_idl_column *column)
 {
-    *ovsdb_idl_get_mode(idl, column) = OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT;
-    add_ref_table(idl, &column->type.key);
-    add_ref_table(idl, &column->type.value);
+    ovsdb_idl_db_add_column(&idl->db, column);
+}
+
+static void
+ovsdb_idl_db_add_table(struct ovsdb_idl_db *db,
+                       const struct ovsdb_idl_table_class *tc)
+{
+    size_t i;
+
+    for (i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
+
+        if (table->class_ == tc) {
+            table->need_table = true;
+            return;
+        }
+    }
+
+    OVS_NOT_REACHED();
 }
 
 /* Ensures that the table with class 'tc' will be replicated on 'idl' even if
@@ -940,18 +983,7 @@  void
 ovsdb_idl_add_table(struct ovsdb_idl *idl,
                     const struct ovsdb_idl_table_class *tc)
 {
-    size_t i;
-
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
-
-        if (table->class_ == tc) {
-            table->need_table = true;
-            return;
-        }
-    }
-
-    OVS_NOT_REACHED();
+    ovsdb_idl_db_add_table(&idl->db, tc);
 }
 
 /* A single clause within an ovsdb_idl_condition. */
@@ -1147,6 +1179,24 @@  ovsdb_idl_condition_clone(struct ovsdb_idl_condition *dst,
     }
 }
 
+static unsigned int
+ovsdb_idl_db_set_condition(struct ovsdb_idl_db *db,
+                           const struct ovsdb_idl_table_class *tc,
+                           const struct ovsdb_idl_condition *condition)
+{
+    struct ovsdb_idl_table *table = ovsdb_idl_db_table_from_class(db, tc);
+    unsigned int seqno = db->cond_seqno;
+    if (!ovsdb_idl_condition_equals(condition, &table->condition)) {
+        ovsdb_idl_condition_destroy(&table->condition);
+        ovsdb_idl_condition_clone(&table->condition, condition);
+        db->cond_changed = table->cond_changed = true;
+        poll_immediate_wake();
+        return seqno + 1;
+    }
+
+    return seqno;
+}
+
 /* Sets the replication condition for 'tc' in 'idl' to 'condition' and
  * arranges to send the new condition to the database server.
  *
@@ -1158,17 +1208,7 @@  ovsdb_idl_set_condition(struct ovsdb_idl *idl,
                         const struct ovsdb_idl_table_class *tc,
                         const struct ovsdb_idl_condition *condition)
 {
-    struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl, tc);
-    unsigned int seqno = idl->cond_seqno;
-    if (!ovsdb_idl_condition_equals(condition, &table->condition)) {
-        ovsdb_idl_condition_destroy(&table->condition);
-        ovsdb_idl_condition_clone(&table->condition, condition);
-        idl->cond_changed = table->cond_changed = true;
-        poll_immediate_wake();
-        return seqno + 1;
-    }
-
-    return seqno;
+    return ovsdb_idl_db_set_condition(&idl->db, tc, condition);
 }
 
 static struct json *
@@ -1205,25 +1245,16 @@  ovsdb_idl_create_cond_change_req(struct ovsdb_idl_table *table)
     return monitor_cond_change_request;
 }
 
-static void
-ovsdb_idl_send_cond_change(struct ovsdb_idl *idl)
+static struct jsonrpc_msg *
+ovsdb_idl_db_compose_cond_change(struct ovsdb_idl_db *db)
 {
-    int i;
-    struct json *params;
-    struct jsonrpc_msg *request;
-
-    /* When 'idl-request_id' is not NULL, there is an outstanding
-     * conditional monitoring update request that we have not heard
-     * from the server yet. Don't generate another request in this case.  */
-    if (!idl->cond_changed || !jsonrpc_session_is_connected(idl->session) ||
-        idl->state != IDL_S_MONITORING_COND || idl->request_id) {
-        return;
+    if (!db->cond_changed) {
+        return NULL;
     }
 
     struct json *monitor_cond_change_requests = NULL;
-
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (size_t i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
 
         if (table->cond_changed) {
             struct json *req = ovsdb_idl_create_cond_change_req(table);
@@ -1239,17 +1270,48 @@  ovsdb_idl_send_cond_change(struct ovsdb_idl *idl)
         }
     }
 
-    /* Send request if not empty. */
-    if (monitor_cond_change_requests) {
-        params = json_array_create_3(json_string_create("monid"),
-                                     json_string_create("monid"),
-                                     monitor_cond_change_requests);
+    if (!monitor_cond_change_requests) {
+        return NULL;
+    }
+
+    db->cond_changed = false;
+    struct json *params = json_array_create_3(json_clone(db->monitor_id),
+                                              json_clone(db->monitor_id),
+                                              monitor_cond_change_requests);
+    return jsonrpc_create_request("monitor_cond_change", params, NULL);
+}
+
+static void
+ovsdb_idl_send_cond_change(struct ovsdb_idl *idl)
+{
+    /* When 'idl->request_id' is not NULL, there is an outstanding
+     * conditional monitoring update request that we have not heard
+     * from the server yet. Don't generate another request in this case.
+     *
+     * XXX per-db request_id */
+    if (!jsonrpc_session_is_connected(idl->session)
+        || idl->state != IDL_S_MONITORING_COND
+        || idl->request_id) {
+        return;
+    }
 
-        request = jsonrpc_create_request("monitor_cond_change", params,
-                                         &idl->request_id);
-        jsonrpc_session_send(idl->session, request);
+    struct jsonrpc_msg *msg = ovsdb_idl_db_compose_cond_change(&idl->db);
+    if (msg) {
+        idl->request_id = json_clone(msg->id);
+        jsonrpc_session_send(idl->session, msg);
     }
-    idl->cond_changed = false;
+}
+
+/* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
+ *
+ * This function should be called between ovsdb_idl_create() and the first call
+ * to ovsdb_idl_run().
+ */
+static void
+ovsdb_idl_db_omit_alert(struct ovsdb_idl_db *db,
+                        const struct ovsdb_idl_column *column)
+{
+    *ovsdb_idl_db_get_mode(db, column) &= ~OVSDB_IDL_ALERT;
 }
 
 /* Turns off OVSDB_IDL_ALERT for 'column' in 'idl'.
@@ -1261,7 +1323,14 @@  void
 ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
                      const struct ovsdb_idl_column *column)
 {
-    *ovsdb_idl_get_mode(idl, column) &= ~OVSDB_IDL_ALERT;
+    ovsdb_idl_db_omit_alert(&idl->db, column);
+}
+
+static void
+ovsdb_idl_db_omit(struct ovsdb_idl_db *db,
+                  const struct ovsdb_idl_column *column)
+{
+    *ovsdb_idl_db_get_mode(db, column) = 0;
 }
 
 /* Sets the mode for 'column' in 'idl' to 0.  See the big comment above
@@ -1273,7 +1342,7 @@  ovsdb_idl_omit_alert(struct ovsdb_idl *idl,
 void
 ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
 {
-    *ovsdb_idl_get_mode(idl, column) = 0;
+    ovsdb_idl_db_omit(&idl->db, column);
 }
 
 /* Returns the most recent IDL change sequence number that caused a
@@ -1284,7 +1353,7 @@  ovsdb_idl_table_get_seqno(const struct ovsdb_idl *idl,
                           const struct ovsdb_idl_table_class *table_class)
 {
     struct ovsdb_idl_table *table
-        = ovsdb_idl_table_from_class(idl, table_class);
+        = ovsdb_idl_db_table_from_class(&idl->db, table_class);
     unsigned int max_seqno = table->change_seqno[OVSDB_IDL_CHANGE_INSERT];
 
     if (max_seqno < table->change_seqno[OVSDB_IDL_CHANGE_MODIFY]) {
@@ -1321,10 +1390,10 @@  void
 ovsdb_idl_track_add_column(struct ovsdb_idl *idl,
                            const struct ovsdb_idl_column *column)
 {
-    if (!(*ovsdb_idl_get_mode(idl, column) & OVSDB_IDL_ALERT)) {
+    if (!(*ovsdb_idl_db_get_mode(&idl->db, column) & OVSDB_IDL_ALERT)) {
         ovsdb_idl_add_column(idl, column);
     }
-    *ovsdb_idl_get_mode(idl, column) |= OVSDB_IDL_TRACK;
+    *ovsdb_idl_db_get_mode(&idl->db, column) |= OVSDB_IDL_TRACK;
 }
 
 void
@@ -1332,8 +1401,8 @@  ovsdb_idl_track_add_all(struct ovsdb_idl *idl)
 {
     size_t i, j;
 
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        const struct ovsdb_idl_table_class *tc = &idl->class_->tables[i];
+    for (i = 0; i < idl->db.class_->n_tables; i++) {
+        const struct ovsdb_idl_table_class *tc = &idl->db.class_->tables[i];
 
         for (j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
@@ -1363,7 +1432,7 @@  ovsdb_idl_track_get_first(const struct ovsdb_idl *idl,
                           const struct ovsdb_idl_table_class *table_class)
 {
     struct ovsdb_idl_table *table
-        = ovsdb_idl_table_from_class(idl, table_class);
+        = ovsdb_idl_db_table_from_class(&idl->db, table_class);
 
     if (!ovs_list_is_empty(&table->track_list)) {
         return CONTAINER_OF(ovs_list_front(&table->track_list), struct ovsdb_idl_row, track_node);
@@ -1411,13 +1480,13 @@  ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
  * functions. This is usually done at the end of the client's processing
  * loop when it is ready to do ovsdb_idl_run() again.
  */
-void
-ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
+static void
+ovsdb_idl_db_track_clear(struct ovsdb_idl_db *db)
 {
     size_t i;
 
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
 
         if (!ovs_list_is_empty(&table->track_list)) {
             struct ovsdb_idl_row *row, *next;
@@ -1438,20 +1507,27 @@  ovsdb_idl_track_clear(const struct ovsdb_idl *idl)
     }
 }
 
+/* Flushes the tracked rows. Client calls this function after calling
+ * ovsdb_idl_run() and read all tracked rows with the ovsdb_idl_track_get_*()
+ * functions. This is usually done at the end of the client's processing
+ * loop when it is ready to do ovsdb_idl_run() again.
+ */
+void
+ovsdb_idl_track_clear(struct ovsdb_idl *idl)
+{
+    ovsdb_idl_db_track_clear(&idl->db);
+}
 
 static void
-ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
+ovsdb_idl_send_schema_request(struct ovsdb_idl *idl,
+                              struct ovsdb_idl_db *db)
 {
-    struct jsonrpc_msg *msg;
-
-    json_destroy(idl->request_id);
-    msg = jsonrpc_create_request(
-        "get_schema",
-        json_array_create_1(json_string_create(idl->class_->database)),
-        &idl->request_id);
-    jsonrpc_session_send(idl->session, msg);
+    ovsdb_idl_send_request(idl, jsonrpc_create_request(
+                               "get_schema",
+                               json_array_create_1(json_string_create(
+                                                       db->class_->database)),
+                               NULL));
 }
-
 static void
 log_error(struct ovsdb_error *error)
 {
@@ -1536,36 +1612,29 @@  parse_schema(const struct json *schema_json)
 }
 
 static void
-ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
-                                 const char *method)
+ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl, struct ovsdb_idl_db *db,
+                               bool use_monitor_cond)
 {
-    struct shash *schema;
-    struct json *monitor_requests;
-    struct jsonrpc_msg *msg;
-    size_t i;
+    struct shash *schema = parse_schema(db->schema);
+    struct json *monitor_requests = json_object_create();
 
-    schema = parse_schema(idl->schema);
-    monitor_requests = json_object_create();
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (size_t i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
         const struct ovsdb_idl_table_class *tc = table->class_;
-        struct json *monitor_request, *columns, *where;
-        const struct sset *table_schema;
-        size_t j;
-
-        table_schema = (schema
-                        ? shash_find_data(schema, table->class_->name)
-                        : NULL);
+        struct json *monitor_request;
+        const struct sset *table_schema
+            = schema ? shash_find_data(schema, table->class_->name) : NULL;
 
-        columns = table->need_table ? json_array_create_empty() : NULL;
-        for (j = 0; j < tc->n_columns; j++) {
+        struct json *columns
+            = table->need_table ? json_array_create_empty() : NULL;
+        for (size_t j = 0; j < tc->n_columns; j++) {
             const struct ovsdb_idl_column *column = &tc->columns[j];
             if (table->modes[j] & OVSDB_IDL_MONITOR) {
                 if (table_schema
                     && !sset_contains(table_schema, column->name)) {
                     VLOG_WARN("%s table in %s database lacks %s column "
                               "(database needs upgrade?)",
-                              table->class_->name, idl->class_->database,
+                              table->class_->name, db->class_->database,
                               column->name);
                     continue;
                 }
@@ -1580,17 +1649,18 @@  ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
             if (schema && !table_schema) {
                 VLOG_WARN("%s database lacks %s table "
                           "(database needs upgrade?)",
-                          idl->class_->database, table->class_->name);
+                          db->class_->database, table->class_->name);
                 json_destroy(columns);
                 continue;
             }
 
             monitor_request = json_object_create();
             json_object_put(monitor_request, "columns", columns);
-            if (!strcmp(method, "monitor_cond")
-                && !ovsdb_idl_condition_is_true(&table->condition)) {
-                where = ovsdb_idl_condition_to_json(&table->condition);
-                json_object_put(monitor_request, "where", where);
+
+            const struct ovsdb_idl_condition *cond = &table->condition;
+            if (use_monitor_cond && !ovsdb_idl_condition_is_true(cond)) {
+                json_object_put(monitor_request, "where",
+                                ovsdb_idl_condition_to_json(cond));
                 table->cond_changed = false;
             }
             json_object_put(monitor_requests, tc->name, monitor_request);
@@ -1598,21 +1668,15 @@  ovsdb_idl_send_monitor_request__(struct ovsdb_idl *idl,
     }
     free_schema(schema);
 
-    json_destroy(idl->request_id);
+    db->cond_changed = false;
 
-    msg = jsonrpc_create_request(
-        method,
-        json_array_create_3(json_string_create(idl->class_->database),
-                            json_string_create("monid"), monitor_requests),
-        &idl->request_id);
-    jsonrpc_session_send(idl->session, msg);
-    idl->cond_changed = false;
-}
-
-static void
-ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
-{
-    ovsdb_idl_send_monitor_request__(idl, "monitor");
+    ovsdb_idl_send_request(
+        idl,
+        jsonrpc_create_request(
+            use_monitor_cond ? "monitor_cond" : "monitor",
+            json_array_create_3(json_string_create(db->class_->database),
+                                json_clone(db->monitor_id), monitor_requests),
+            NULL));
 }
 
 static void
@@ -1627,36 +1691,46 @@  log_parse_update_error(struct ovsdb_error *error)
 }
 
 static void
-ovsdb_idl_send_monitor_cond_request(struct ovsdb_idl *idl)
+ovsdb_idl_db_parse_monitor_reply(struct ovsdb_idl_db *db,
+                                 const struct json *result,
+                                 bool is_monitor_cond)
 {
-    ovsdb_idl_send_monitor_request__(idl, "monitor_cond");
+    db->change_seqno++;
+    ovsdb_idl_db_clear(db);
+    ovsdb_idl_db_parse_update(db, result, is_monitor_cond);
 }
 
-static void
-ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates,
-                       enum ovsdb_update_version version)
+static bool
+ovsdb_idl_db_parse_update_rpc(struct ovsdb_idl_db *db,
+                              const struct jsonrpc_msg *msg)
 {
-    struct ovsdb_error *error = ovsdb_idl_parse_update__(idl, table_updates,
-                                                         version);
-    if (error) {
-        log_parse_update_error(error);
+    if (msg->type == JSONRPC_NOTIFY) {
+        bool is_update = !strcmp(msg->method, "update");
+        bool is_update2 = !strcmp(msg->method, "update2");
+        if ((is_update || is_update2)
+            && msg->params->type == JSON_ARRAY
+            && msg->params->u.array.n == 2
+            && json_equal(msg->params->u.array.elems[0], db->monitor_id)) {
+            ovsdb_idl_db_parse_update(db, msg->params->u.array.elems[1],
+                                      is_update2);
+            return true;
+        }
     }
+    return false;
 }
 
 static struct ovsdb_error *
-ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
-                         const struct json *table_updates,
-                         enum ovsdb_update_version version)
+ovsdb_idl_db_parse_update__(struct ovsdb_idl_db *db,
+                            const struct json *table_updates,
+                            bool is_monitor_cond)
 {
     const struct shash_node *tables_node;
-    const char *table_updates_name = table_updates_names[version];
-    const char *table_update_name = table_update_names[version];
-    const char *row_update_name = row_update_names[version];
+    const char *version_suffix = is_monitor_cond ? "2" : "";
 
     if (table_updates->type != JSON_OBJECT) {
         return ovsdb_syntax_error(table_updates, NULL,
-                                  "<%s> is not an object",
-                                  table_updates_name);
+                                  "<table_updates%s> is not an object",
+                                  version_suffix);
     }
 
     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
@@ -1664,75 +1738,43 @@  ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
         const struct shash_node *table_node;
         struct ovsdb_idl_table *table;
 
-        table = shash_find_data(&idl->table_by_name, tables_node->name);
+        table = shash_find_data(&db->table_by_name, tables_node->name);
         if (!table) {
             return ovsdb_syntax_error(
                 table_updates, NULL,
-                "<%s> includes unknown table \"%s\"",
-                table_updates_name,
-                tables_node->name);
+                "<table_updates%s> includes unknown table \"%s\"",
+                version_suffix, tables_node->name);
         }
 
         if (table_update->type != JSON_OBJECT) {
             return ovsdb_syntax_error(table_update, NULL,
-                                      "<%s> for table \"%s\" is "
+                                      "<table_update%s> for table \"%s\" is "
                                       "not an object",
-                                      table_update_name,
-                                      table->class_->name);
+                                      version_suffix, table->class_->name);
         }
         SHASH_FOR_EACH (table_node, json_object(table_update)) {
             const struct json *row_update = table_node->data;
-            const struct json *old_json, *new_json;
             struct uuid uuid;
 
             if (!uuid_from_string(&uuid, table_node->name)) {
                 return ovsdb_syntax_error(table_update, NULL,
-                                          "<%s> for table \"%s\" "
+                                          "<table_update%s> for table \"%s\" "
                                           "contains bad UUID "
                                           "\"%s\" as member name",
-                                          table_update_name,
+                                          version_suffix,
                                           table->class_->name,
                                           table_node->name);
             }
             if (row_update->type != JSON_OBJECT) {
                 return ovsdb_syntax_error(row_update, NULL,
-                                          "<%s> for table \"%s\" "
-                                          "contains <%s> for %s that "
-                                          "is not an object",
-                                          table_update_name,
-                                          table->class_->name,
-                                          row_update_name,
-                                          table_node->name);
+                                          "<table_update%s> for table \"%s\" "
+                                          "contains <row_update%s> for %s "
+                                          "that is not an object",
+                                          version_suffix, table->class_->name,
+                                          version_suffix, table_node->name);
             }
 
-            switch(version) {
-            case OVSDB_UPDATE:
-                old_json = shash_find_data(json_object(row_update), "old");
-                new_json = shash_find_data(json_object(row_update), "new");
-                if (old_json && old_json->type != JSON_OBJECT) {
-                    return ovsdb_syntax_error(old_json, NULL,
-                                              "\"old\" <row> is not object");
-                } else if (new_json && new_json->type != JSON_OBJECT) {
-                    return ovsdb_syntax_error(new_json, NULL,
-                                              "\"new\" <row> is not object");
-                } else if ((old_json != NULL) + (new_json != NULL)
-                           != shash_count(json_object(row_update))) {
-                    return ovsdb_syntax_error(row_update, NULL,
-                                              "<row-update> contains "
-                                              "unexpected member");
-                } else if (!old_json && !new_json) {
-                    return ovsdb_syntax_error(row_update, NULL,
-                                              "<row-update> missing \"old\" "
-                                              "and \"new\" members");
-                }
-
-                if (ovsdb_idl_process_update(table, &uuid, old_json,
-                                             new_json)) {
-                    idl->change_seqno++;
-                }
-                break;
-
-            case OVSDB_UPDATE2: {
+            if (is_monitor_cond) {
                 const char *ops[] = {"modify", "insert", "delete", "initial"};
                 const char *operation;
                 const struct json *row;
@@ -1745,7 +1787,7 @@  ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
                     if (row)  {
                         if (ovsdb_idl_process_update2(table, &uuid, operation,
                                                       row)) {
-                            idl->change_seqno++;
+                            db->change_seqno++;
                         }
                         break;
                     }
@@ -1757,11 +1799,32 @@  ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
                                               "<row_update2> includes unknown "
                                               "object");
                 }
-                break;
-            }
+            } else {
+                const struct json *old_json, *new_json;
 
-            default:
-                OVS_NOT_REACHED();
+                old_json = shash_find_data(json_object(row_update), "old");
+                new_json = shash_find_data(json_object(row_update), "new");
+                if (old_json && old_json->type != JSON_OBJECT) {
+                    return ovsdb_syntax_error(old_json, NULL,
+                                              "\"old\" <row> is not object");
+                } else if (new_json && new_json->type != JSON_OBJECT) {
+                    return ovsdb_syntax_error(new_json, NULL,
+                                              "\"new\" <row> is not object");
+                } else if ((old_json != NULL) + (new_json != NULL)
+                           != shash_count(json_object(row_update))) {
+                    return ovsdb_syntax_error(row_update, NULL,
+                                              "<row-update> contains "
+                                              "unexpected member");
+                } else if (!old_json && !new_json) {
+                    return ovsdb_syntax_error(row_update, NULL,
+                                              "<row-update> missing \"old\" "
+                                              "and \"new\" members");
+                }
+
+                if (ovsdb_idl_process_update(table, &uuid, old_json,
+                                             new_json)) {
+                    db->change_seqno++;
+                }
             }
         }
     }
@@ -1769,6 +1832,18 @@  ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
     return NULL;
 }
 
+static void
+ovsdb_idl_db_parse_update(struct ovsdb_idl_db *db,
+                          const struct json *table_updates,
+                          bool is_monitor_cond)
+{
+    struct ovsdb_error *error = ovsdb_idl_db_parse_update__(db, table_updates,
+                                                            is_monitor_cond);
+    if (error) {
+        log_parse_update_error(error);
+    }
+}
+
 static struct ovsdb_idl_row *
 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
 {
@@ -1955,7 +2030,7 @@  ovsdb_idl_row_change__(struct ovsdb_idl_row *row, const struct json *row_json,
                     changed = true;
                     row->change_seqno[change]
                         = row->table->change_seqno[change]
-                        = row->table->idl->change_seqno + 1;
+                        = row->table->db->change_seqno + 1;
                     if (table->modes[column_idx] & OVSDB_IDL_TRACK) {
                         if (!ovs_list_is_empty(&row->track_node)) {
                             ovs_list_remove(&row->track_node);
@@ -2067,20 +2142,16 @@  ovsdb_idl_row_unparse(struct ovsdb_idl_row *row)
  * iterate over a subset of rows in a defined order.
  */
 
-/* Creates a new index with the provided name, attached to the given idl and
- * table. Note that all indexes must be created and indexing columns added
- * before the first call to ovsdb_idl_run() is made.
- */
-struct ovsdb_idl_index *
-ovsdb_idl_create_index(struct ovsdb_idl *idl,
-                       const struct ovsdb_idl_table_class *tc,
-                       const char *index_name)
+static struct ovsdb_idl_index *
+ovsdb_idl_db_create_index(struct ovsdb_idl_db *db,
+                          const struct ovsdb_idl_table_class *tc,
+                          const char *index_name)
 {
     struct ovsdb_idl_index *index;
     size_t i;
 
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
 
         if (table->class_ == tc) {
             index = ovsdb_idl_create_index_(table, 1);
@@ -2097,6 +2168,18 @@  ovsdb_idl_create_index(struct ovsdb_idl *idl,
     return NULL;
 }
 
+/* Creates a new index with the provided name, attached to the given idl and
+ * table. Note that all indexes must be created and indexing columns added
+ * before the first call to ovsdb_idl_run() is made.
+ */
+struct ovsdb_idl_index *
+ovsdb_idl_create_index(struct ovsdb_idl *idl,
+                       const struct ovsdb_idl_table_class *tc,
+                       const char *index_name)
+{
+    return ovsdb_idl_db_create_index(&idl->db, tc, index_name);
+}
+
 /* Generic comparator that can compare each index, using the custom
  * configuration (an struct ovsdb_idl_index) passed to it.
  * Not intended for direct usage.
@@ -2233,7 +2316,7 @@  ovsdb_idl_index_add_column(struct ovsdb_idl_index *index,
     /* Check that the column or table is tracked */
     if (!index->table->need_table &&
         !((OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT) &
-          *ovsdb_idl_get_mode(index->table->idl, column))) {
+          *ovsdb_idl_db_get_mode(index->table->db, column))) {
         VLOG_ERR("Can't add unmonitored column '%s' at index '%s' in "
                  "table '%s'.",
                  column->name, index->index_name, index->table->class_->name);
@@ -2263,16 +2346,16 @@  ovsdb_idl_index_add_column(struct ovsdb_idl_index *index,
     index->n_columns++;
 }
 
-bool
-ovsdb_idl_initialize_cursor(struct ovsdb_idl *idl,
-                            const struct ovsdb_idl_table_class *tc,
-                            const char *index_name,
-                            struct ovsdb_idl_index_cursor *cursor)
+static bool
+ovsdb_idl_db_initialize_cursor(struct ovsdb_idl_db *db,
+                               const struct ovsdb_idl_table_class *tc,
+                               const char *index_name,
+                               struct ovsdb_idl_index_cursor *cursor)
 {
     size_t i;
 
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
 
         if (table->class_ == tc) {
             struct shash_node *node = shash_find(&table->indexes, index_name);
@@ -2295,6 +2378,15 @@  ovsdb_idl_initialize_cursor(struct ovsdb_idl *idl,
     return false;
 }
 
+bool
+ovsdb_idl_initialize_cursor(struct ovsdb_idl *idl,
+                            const struct ovsdb_idl_table_class *tc,
+                            const char *index_name,
+                            struct ovsdb_idl_index_cursor *cursor)
+{
+    return ovsdb_idl_db_initialize_cursor(&idl->db, tc, index_name, cursor);
+}
+
 /* ovsdb_idl_index_write_ writes a datum in an ovsdb_idl_row,
  * and updates the corresponding field in the table record.
  * Not intended for direct usage.
@@ -2587,7 +2679,7 @@  ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
         if (ovsdb_idl_track_is_set(row->table)) {
             row->change_seqno[OVSDB_IDL_CHANGE_DELETE]
                 = row->table->change_seqno[OVSDB_IDL_CHANGE_DELETE]
-                = row->table->idl->change_seqno + 1;
+                = row->table->db->change_seqno + 1;
         }
         if (!ovs_list_is_empty(&row->track_node)) {
             ovs_list_remove(&row->track_node);
@@ -2639,12 +2731,12 @@  ovsdb_idl_destroy_all_set_op_lists(struct ovsdb_idl_row *row)
 }
 
 static void
-ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl *idl)
+ovsdb_idl_row_destroy_postprocess(struct ovsdb_idl_db *db)
 {
     size_t i;
 
-    for (i = 0; i < idl->class_->n_tables; i++) {
-        struct ovsdb_idl_table *table = &idl->tables[i];
+    for (i = 0; i < db->class_->n_tables; i++) {
+        struct ovsdb_idl_table *table = &db->tables[i];
 
         if (!ovs_list_is_empty(&table->track_list)) {
             struct ovsdb_idl_row *row, *next;
@@ -2748,10 +2840,18 @@  may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
 }
 
 static struct ovsdb_idl_table *
+ovsdb_idl_db_table_from_class(const struct ovsdb_idl_db *db,
+                              const struct ovsdb_idl_table_class *table_class)
+{
+    ptrdiff_t idx = table_class - db->class_->tables;
+    return idx >= 0 && idx < db->class_->n_tables ? &db->tables[idx] : NULL;
+}
+
+static struct ovsdb_idl_table *
 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
                            const struct ovsdb_idl_table_class *table_class)
 {
-    return &idl->tables[table_class - idl->class_->tables];
+    return ovsdb_idl_db_table_from_class(&idl->db, table_class);
 }
 
 /* Called by ovsdb-idlc generated code. */
@@ -2760,14 +2860,14 @@  ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
                       const struct ovsdb_idl_table_class *dst_table_class,
                       const struct uuid *dst_uuid)
 {
-    struct ovsdb_idl *idl = src->table->idl;
+    struct ovsdb_idl_db *db = src->table->db;
     struct ovsdb_idl_table *dst_table;
     struct ovsdb_idl_arc *arc;
     struct ovsdb_idl_row *dst;
 
-    dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
+    dst_table = ovsdb_idl_db_table_from_class(db, dst_table_class);
     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
-    if (idl->txn || is_index_row(src)) {
+    if (db->txn || is_index_row(src)) {
         /* There are two cases we should not update any arcs:
          *
          * 1. We're being called from ovsdb_idl_txn_write(). We must not update
@@ -2837,8 +2937,8 @@  const struct ovsdb_idl_row *
 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
                     const struct ovsdb_idl_table_class *table_class)
 {
-    struct ovsdb_idl_table *table
-        = ovsdb_idl_table_from_class(idl, table_class);
+    struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl,
+                                                               table_class);
     return next_real_row(table, hmap_first(&table->rows));
 }
 
@@ -2972,10 +3072,10 @@  ovsdb_idl_txn_create(struct ovsdb_idl *idl)
 {
     struct ovsdb_idl_txn *txn;
 
-    ovs_assert(!idl->txn);
-    idl->txn = txn = xmalloc(sizeof *txn);
+    ovs_assert(!idl->db.txn);
+    idl->db.txn = txn = xmalloc(sizeof *txn);
     txn->request_id = NULL;
-    txn->idl = idl;
+    txn->db = &idl->db;
     hmap_init(&txn->txn_rows);
     txn->status = TXN_UNCOMMITTED;
     txn->error = NULL;
@@ -3067,7 +3167,7 @@  ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
 
     json_destroy(txn->request_id);
     if (txn->status == TXN_INCOMPLETE) {
-        hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
+        hmap_remove(&txn->db->outstanding_txns, &txn->hmap_node);
     }
     ovsdb_idl_txn_abort(txn);
     ds_destroy(&txn->comment);
@@ -3163,7 +3263,7 @@  ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
      * ovsdb_idl_column's 'parse' function, which will call
      * ovsdb_idl_get_row_arc(), which will seen that the IDL is in a
      * transaction and fail to update the graph.  */
-    txn->idl->txn = NULL;
+    txn->db->txn = NULL;
 
     HMAP_FOR_EACH_SAFE (row, next, txn_node, &txn->txn_rows) {
         ovsdb_idl_destroy_all_map_op_lists(row);
@@ -3449,25 +3549,25 @@  ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
     struct json *operations;
     bool any_updates;
 
-    if (txn != txn->idl->txn) {
+    if (txn != txn->db->txn) {
         goto coverage_out;
     }
 
     /* If we need a lock but don't have it, give up quickly. */
-    if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
+    if (txn->db->lock_name && !txn->db->has_lock) {
         txn->status = TXN_NOT_LOCKED;
         goto disassemble_out;
     }
 
     operations = json_array_create_1(
-        json_string_create(txn->idl->class_->database));
+        json_string_create(txn->db->class_->database));
 
     /* Assert that we have the required lock (avoiding a race). */
-    if (txn->idl->lock_name) {
+    if (txn->db->lock_name) {
         struct json *op = json_object_create();
         json_array_add(operations, op);
         json_object_put_string(op, "op", "assert");
-        json_object_put_string(op, "lock", txn->idl->lock_name);
+        json_object_put_string(op, "lock", txn->db->lock_name);
     }
 
     /* Add prerequisites and declarations of new rows. */
@@ -3670,10 +3770,10 @@  ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
         txn->status = TXN_UNCHANGED;
         json_destroy(operations);
     } else if (!jsonrpc_session_send(
-                   txn->idl->session,
+                   txn->db->idl->session,
                    jsonrpc_create_request(
                        "transact", operations, &txn->request_id))) {
-        hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
+        hmap_insert(&txn->db->outstanding_txns, &txn->hmap_node,
                     json_hash(txn->request_id, 0));
         txn->status = TXN_INCOMPLETE;
     } else {
@@ -3710,8 +3810,8 @@  ovsdb_idl_txn_commit_block(struct ovsdb_idl_txn *txn)
 
     fatal_signal_run();
     while ((status = ovsdb_idl_txn_commit(txn)) == TXN_INCOMPLETE) {
-        ovsdb_idl_run(txn->idl);
-        ovsdb_idl_wait(txn->idl);
+        ovsdb_idl_run(txn->db->idl);
+        ovsdb_idl_wait(txn->db->idl);
         ovsdb_idl_txn_wait(txn);
         poll_block();
     }
@@ -3802,7 +3902,7 @@  ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
                        enum ovsdb_idl_txn_status status)
 {
     txn->status = status;
-    hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
+    hmap_remove(&txn->db->outstanding_txns, &txn->hmap_node);
 }
 
 static void
@@ -3828,7 +3928,7 @@  ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
     ovs_assert(row->old_datum == NULL ||
                row->table->modes[column_idx] & OVSDB_IDL_MONITOR);
 
-    if (row->table->idl->verify_write_only && !write_only) {
+    if (row->table->db->verify_write_only && !write_only) {
         VLOG_ERR("Bug: Attempt to write to a read/write column (%s:%s) when"
                  " explicitly configured not to.", class->name, column->name);
         goto discard_datum;
@@ -3851,7 +3951,7 @@  ovsdb_idl_txn_write__(const struct ovsdb_idl_row *row_,
     }
 
     if (hmap_node_is_null(&row->txn_node)) {
-        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+        hmap_insert(&row->table->db->txn->txn_rows, &row->txn_node,
                     uuid_hash(&row->uuid));
     }
     if (row->old_datum == row->new_datum) {
@@ -3973,7 +4073,7 @@  ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
     }
 
     if (hmap_node_is_null(&row->txn_node)) {
-        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+        hmap_insert(&row->table->db->txn->txn_rows, &row->txn_node,
                     uuid_hash(&row->uuid));
     }
     if (!row->prereqs) {
@@ -4004,12 +4104,12 @@  ovsdb_idl_txn_delete(const struct ovsdb_idl_row *row_)
         ovsdb_idl_row_clear_new(row);
         ovs_assert(!row->prereqs);
         hmap_remove(&row->table->rows, &row->hmap_node);
-        hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
+        hmap_remove(&row->table->db->txn->txn_rows, &row->txn_node);
         free(row);
         return;
     }
     if (hmap_node_is_null(&row->txn_node)) {
-        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+        hmap_insert(&row->table->db->txn->txn_rows, &row->txn_node,
                     uuid_hash(&row->uuid));
     }
     ovsdb_idl_row_clear_new(row);
@@ -4042,7 +4142,7 @@  ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
         uuid_generate(&row->uuid);
     }
 
-    row->table = ovsdb_idl_table_from_class(txn->idl, class);
+    row->table = ovsdb_idl_db_table_from_class(txn->db, class);
     row->new_datum = xmalloc(class->n_columns * sizeof *row->new_datum);
     hmap_insert(&row->table->rows, &row->hmap_node, uuid_hash(&row->uuid));
     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
@@ -4054,18 +4154,18 @@  ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
 {
     struct ovsdb_idl_txn *txn;
 
-    HMAP_FOR_EACH (txn, hmap_node, &idl->outstanding_txns) {
+    HMAP_FOR_EACH (txn, hmap_node, &idl->db.outstanding_txns) {
         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
     }
 }
 
 static struct ovsdb_idl_txn *
-ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
+ovsdb_idl_db_txn_find(struct ovsdb_idl_db *db, const struct json *id)
 {
     struct ovsdb_idl_txn *txn;
 
     HMAP_FOR_EACH_WITH_HASH (txn, hmap_node,
-                             json_hash(id, 0), &idl->outstanding_txns) {
+                             json_hash(id, 0), &db->outstanding_txns) {
         if (json_equal(id, txn->request_id)) {
             return txn;
         }
@@ -4104,7 +4204,7 @@  ovsdb_idl_txn_process_inc_reply(struct ovsdb_idl_txn *txn,
     }
 
     /* We know that this is a JSON object because the loop in
-     * ovsdb_idl_txn_process_reply() checked. */
+     * ovsdb_idl_db_txn_process_reply() checked. */
     mutate = json_object(results->elems[txn->inc_index]);
     count = shash_find_data(mutate, "count");
     if (!check_json_type(count, JSON_INTEGER, "\"mutate\" reply \"count\"")) {
@@ -4181,13 +4281,13 @@  ovsdb_idl_txn_process_insert_reply(struct ovsdb_idl_txn_insert *insert,
 }
 
 static bool
-ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
-                            const struct jsonrpc_msg *msg)
+ovsdb_idl_db_txn_process_reply(struct ovsdb_idl_db *db,
+                               const struct jsonrpc_msg *msg)
 {
     struct ovsdb_idl_txn *txn;
     enum ovsdb_idl_txn_status status;
 
-    txn = ovsdb_idl_txn_find(idl, msg->id);
+    txn = ovsdb_idl_db_txn_find(db, msg->id);
     if (!txn) {
         return false;
     }
@@ -4274,7 +4374,7 @@  ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
 struct ovsdb_idl_txn *
 ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
 {
-    struct ovsdb_idl_txn *txn = row->table->idl->txn;
+    struct ovsdb_idl_txn *txn = row->table->db->txn;
     ovs_assert(txn != NULL);
     return txn;
 }
@@ -4283,7 +4383,7 @@  ovsdb_idl_txn_get(const struct ovsdb_idl_row *row)
 struct ovsdb_idl *
 ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
 {
-    return txn->idl;
+    return txn->db->idl;
 }
 
 /* Blocks until 'idl' successfully connects to the remote database and
@@ -4301,6 +4401,31 @@  ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
     }
 }
 
+static struct jsonrpc_msg *
+ovsdb_idl_db_set_lock(struct ovsdb_idl_db *db, const char *lock_name)
+{
+    ovs_assert(!db->txn);
+    ovs_assert(hmap_is_empty(&db->outstanding_txns));
+
+    if (db->lock_name
+        && (!lock_name || strcmp(lock_name, db->lock_name))) {
+        /* Release previous lock. */
+        struct jsonrpc_msg *msg = ovsdb_idl_db_compose_unlock_request(db);
+        free(db->lock_name);
+        db->lock_name = NULL;
+        db->is_lock_contended = false;
+        return msg;
+    }
+
+    if (lock_name && !db->lock_name) {
+        /* Acquire new lock. */
+        db->lock_name = xstrdup(lock_name);
+        return ovsdb_idl_db_compose_lock_request(db);
+    }
+
+    return NULL;
+}
+
 /* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
  * the database server and to avoid modifying the database when the lock cannot
  * be acquired (that is, when another client has the same lock).
@@ -4310,21 +4435,12 @@  ovsdb_idl_get_initial_snapshot(struct ovsdb_idl *idl)
 void
 ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
 {
-    ovs_assert(!idl->txn);
-    ovs_assert(hmap_is_empty(&idl->outstanding_txns));
-
-    if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
-        /* Release previous lock. */
-        ovsdb_idl_send_unlock_request(idl);
-        free(idl->lock_name);
-        idl->lock_name = NULL;
-        idl->is_lock_contended = false;
-    }
-
-    if (lock_name && !idl->lock_name) {
-        /* Acquire new lock. */
-        idl->lock_name = xstrdup(lock_name);
-        ovsdb_idl_send_lock_request(idl);
+    for (;;) {
+        struct jsonrpc_msg *msg = ovsdb_idl_db_set_lock(&idl->db, lock_name);
+        if (!msg) {
+            break;
+        }
+        jsonrpc_session_send(idl->session, msg);
     }
 }
 
@@ -4337,7 +4453,7 @@  ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
 bool
 ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
 {
-    return idl->has_lock;
+    return idl->db.has_lock;
 }
 
 /* Returns true if 'idl' is configured to obtain a lock but the database server
@@ -4345,63 +4461,87 @@  ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
 bool
 ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
 {
-    return idl->is_lock_contended;
+    return idl->db.is_lock_contended;
 }
 
 static void
-ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
+ovsdb_idl_db_update_has_lock(struct ovsdb_idl_db *db, bool new_has_lock)
 {
-    if (new_has_lock && !idl->has_lock) {
-        if (idl->state == IDL_S_MONITORING ||
-            idl->state == IDL_S_MONITORING_COND) {
-            idl->change_seqno++;
+    if (new_has_lock && !db->has_lock) {
+        if (db->idl->state == IDL_S_MONITORING ||
+            db->idl->state == IDL_S_MONITORING_COND) {
+            db->change_seqno++;
         } else {
             /* We're setting up a session, so don't signal that the database
              * changed.  Finalizing the session will increment change_seqno
              * anyhow. */
         }
-        idl->is_lock_contended = false;
+        db->is_lock_contended = false;
     }
-    idl->has_lock = new_has_lock;
+    db->has_lock = new_has_lock;
 }
 
-static void
-ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
-                              struct json **idp)
-{
-    ovsdb_idl_update_has_lock(idl, false);
+static bool
+ovsdb_idl_db_process_lock_replies(struct ovsdb_idl_db *db,
+                                  const struct jsonrpc_msg *msg)
+{
+    if (msg->type == JSONRPC_REPLY
+        && db->lock_request_id
+        && json_equal(db->lock_request_id, msg->id)) {
+        /* Reply to our "lock" request. */
+        ovsdb_idl_db_parse_lock_reply(db, msg->result);
+        return true;
+    }
 
-    json_destroy(idl->lock_request_id);
-    idl->lock_request_id = NULL;
+    if (msg->type == JSONRPC_NOTIFY) {
+        if (!strcmp(msg->method, "locked")) {
+            /* We got our lock. */
+            return ovsdb_idl_db_parse_lock_notify(db, msg->params, true);
+        } else if (!strcmp(msg->method, "stolen")) {
+            /* Someone else stole our lock. */
+            return ovsdb_idl_db_parse_lock_notify(db, msg->params, false);
+        }
+    }
 
-    if (jsonrpc_session_is_connected(idl->session)) {
-        struct json *params;
+    return false;
+}
 
-        params = json_array_create_1(json_string_create(idl->lock_name));
-        jsonrpc_session_send(idl->session,
-                             jsonrpc_create_request(method, params, idp));
-    }
+static struct jsonrpc_msg *
+ovsdb_idl_db_compose_lock_request__(struct ovsdb_idl_db *db,
+                                    const char *method)
+{
+    ovsdb_idl_db_update_has_lock(db, false);
+
+    json_destroy(db->lock_request_id);
+    db->lock_request_id = NULL;
+
+    struct json *params = json_array_create_1(json_string_create(
+                                                  db->lock_name));
+    return jsonrpc_create_request(method, params, NULL);
 }
 
-static void
-ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
+static struct jsonrpc_msg *
+ovsdb_idl_db_compose_lock_request(struct ovsdb_idl_db *db)
 {
-    ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
+    struct jsonrpc_msg *msg = ovsdb_idl_db_compose_lock_request__(db, "lock");
+    db->lock_request_id = json_clone(msg->id);
+    return msg;
 }
 
-static void
-ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
+static struct jsonrpc_msg *
+ovsdb_idl_db_compose_unlock_request(struct ovsdb_idl_db *db)
 {
-    ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
+    return ovsdb_idl_db_compose_lock_request__(db, "unlock");
 }
 
 static void
-ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
+ovsdb_idl_db_parse_lock_reply(struct ovsdb_idl_db *db,
+                              const struct json *result)
 {
     bool got_lock;
 
-    json_destroy(idl->lock_request_id);
-    idl->lock_request_id = NULL;
+    json_destroy(db->lock_request_id);
+    db->lock_request_id = NULL;
 
     if (result->type == JSON_OBJECT) {
         const struct json *locked;
@@ -4412,30 +4552,32 @@  ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
         got_lock = false;
     }
 
-    ovsdb_idl_update_has_lock(idl, got_lock);
+    ovsdb_idl_db_update_has_lock(db, got_lock);
     if (!got_lock) {
-        idl->is_lock_contended = true;
+        db->is_lock_contended = true;
     }
 }
 
-static void
-ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
-                            const struct json *params,
-                            bool new_has_lock)
+static bool
+ovsdb_idl_db_parse_lock_notify(struct ovsdb_idl_db *db,
+                               const struct json *params,
+                               bool new_has_lock)
 {
-    if (idl->lock_name
+    if (db->lock_name
         && params->type == JSON_ARRAY
         && json_array(params)->n > 0
         && json_array(params)->elems[0]->type == JSON_STRING) {
         const char *lock_name = json_string(json_array(params)->elems[0]);
 
-        if (!strcmp(idl->lock_name, lock_name)) {
-            ovsdb_idl_update_has_lock(idl, new_has_lock);
+        if (!strcmp(db->lock_name, lock_name)) {
+            ovsdb_idl_db_update_has_lock(db, new_has_lock);
             if (!new_has_lock) {
-                idl->is_lock_contended = true;
+                db->is_lock_contended = true;
             }
+            return true;
         }
     }
+    return false;
 }
 
 /* Inserts a new Map Operation into current transaction. */
@@ -4469,7 +4611,7 @@  ovsdb_idl_txn_add_map_op(struct ovsdb_idl_row *row,
 
     /* Add this row to transaction's list of rows. */
     if (hmap_node_is_null(&row->txn_node)) {
-        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+        hmap_insert(&row->table->db->txn->txn_rows, &row->txn_node,
                     uuid_hash(&row->uuid));
     }
 }
@@ -4505,7 +4647,7 @@  ovsdb_idl_txn_add_set_op(struct ovsdb_idl_row *row,
 
     /* Add this row to the transactions's list of rows. */
     if (hmap_node_is_null(&row->txn_node)) {
-        hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
+        hmap_insert(&row->table->db->txn->txn_rows, &row->txn_node,
                     uuid_hash(&row->uuid));
     }
 }
diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h
index 67d48cf0c16b..975f9402b3b4 100644
--- a/lib/ovsdb-idl.h
+++ b/lib/ovsdb-idl.h
@@ -166,7 +166,7 @@  const struct ovsdb_idl_row *ovsdb_idl_track_get_first(
 const struct ovsdb_idl_row *ovsdb_idl_track_get_next(const struct ovsdb_idl_row *);
 bool ovsdb_idl_track_is_updated(const struct ovsdb_idl_row *row,
                                 const struct ovsdb_idl_column *column);
-void ovsdb_idl_track_clear(const struct ovsdb_idl *);
+void ovsdb_idl_track_clear(struct ovsdb_idl *);
 
 
 /* Reading the database replica. */