[ovs-dev,RFC,v2,5/7] ovsdb-monitor: Support monitor_cond_since.

Message ID 1548792109-111156-6-git-send-email-hzhou8@ebay.com
State New
Headers show
Series
  • Fast OVSDB resync after restart or failover.
Related show

Commit Message

Han Zhou Jan. 29, 2019, 8:01 p.m.
From: Han Zhou <hzhou8@ebay.com>

Support the new monitor method monitor_cond_since so that a client
can request monitoring start from a specific point instead of always
from beginning. This will reduce the cost at scenarios when server
is restarted/failed-over but client still has all existing data. In
these scenarios only new changes (and in most cases no change) needed
to be transfered to client.

This change includes both server side and ovsdb-client side changes
with the new protocol. IDLs using this capability will be added in
future patches.

Note: the feature takes effect only for cluster mode of ovsdb-server,
because cluster mode is the only mode that supports unique transcation
uuid today. For other modes, the monitor_cond_since always fall back
to transfer all data with found = false.

For more details of the protocol change, see
Documentation/ref/ovsdb-server.7.rst.

Signed-off-by: Han Zhou <hzhou8@ebay.com>
---
 Documentation/ref/ovsdb-server.7.rst |  78 +++++++++-
 ovsdb/jsonrpc-server.c               |  85 ++++++++--
 ovsdb/monitor.c                      | 109 ++++++++++++-
 ovsdb/monitor.h                      |   6 +
 ovsdb/ovsdb-client.c                 | 104 ++++++++++++-
 ovsdb/transaction.c                  |   6 +
 ovsdb/transaction.h                  |   1 +
 tests/ovsdb-monitor.at               | 294 +++++++++++++++++++++++++++++++++++
 8 files changed, 655 insertions(+), 28 deletions(-)

Patch

diff --git a/Documentation/ref/ovsdb-server.7.rst b/Documentation/ref/ovsdb-server.7.rst
index 14c7da8..4bbe232 100644
--- a/Documentation/ref/ovsdb-server.7.rst
+++ b/Documentation/ref/ovsdb-server.7.rst
@@ -364,7 +364,79 @@  Initial views of rows are not presented in update2 notifications, but in the
 response object to the ``monitor_cond`` request.  The formatting of the
 <table-updates2> object, however, is the same in either case.
 
-4.1.15 Get Server ID
+4.1.15 Monitor_cond_since
+-------------------------
+
+A new monitor method added in Open vSwitch version 2.12.  The
+``monitor_cond_since`` request enables a client to request changes that
+happened after a specific transaction id. A client can use this feature to
+request only latest changes after a server connection reset instead of
+re-transfer all data from the server again.
+
+The ``monitor_cond`` method described in Section 4.1.12 also applies to
+``monitor_cond_since``, with the following exceptions:
+
+* RPC request method becomes ``monitor_cond_since``.
+
+* Reply result includes extra parameters.
+
+* Subsequent changes are sent to the client using the ``update3`` monitor
+  notification, described in Section 4.1.16
+
+The request object has the following members::
+
+    "method": "monitor_cond_since"
+    "params": [<db-name>, <json-value>, <monitor-cond-requests>, <last-txn-id>]
+    "id": <nonnull-json-value>
+
+The <last-txn-id> parameter is the transaction id that identifies the latest
+data the client already has, and it requests server to send changes AFTER this
+transaction (exclusive).
+
+All other parameters are the same as ``monitor_cond`` method.
+
+The response object has the following members::
+
+    "result": [<found>, <last-txn-id>, <table-updates2>]
+    "error": null
+    "id": same "id" as request
+
+The <found> is a boolean value that tells if the <last-txn-id> requested by
+client is found in server's history or not. If true, the changes after that
+version up to current is sent. Otherwise, all data is sent.
+
+The <last-txn-id> is the transaction id that identifies the latest transaction
+included in the changes in <table-updates2> of this response, so that client
+can keep tracking.  If there is no change involved in this response, it is the
+same as the <last-txn-id> in the request if <found> is true, or zero uuid if
+<found> is false.  If the server does not support transaction uuid, it will
+be zero uuid as well.
+
+All other parameters are the same as in response object of ``monitor_cond``
+method.
+
+Like in ``monitor_cond``, subsequent changes that match conditions in
+<monitor-cond-request> are automatically sent to the client, but using
+``update3`` monitor notification (see Section 4.1.16), instead of ``update2``.
+
+4.1.16 Update3 notification
+---------------------------
+
+The ``update3`` notification is sent by the server to the client to report
+changes in tables that are being monitored following a ``monitor_cond_since``
+request as described above. The notification has the following members::
+
+    "method": "update3"
+    "params": [<json-value>, <last-txn-id>, <table-updates2>]
+    "id": null
+
+The <last-txn-id> is the same as described in the response object of
+``monitor_cond_since``.
+
+All other parameters are the same as in ``update2`` monitor notification (see
+Section 4.1.14).
+
+4.1.17 Get Server ID
 --------------------
 
 A new RPC method added in Open vSwitch version 2.7.  The request contains the
@@ -384,7 +456,7 @@  The response object contains the following members::
 running OVSDB server process.  A fresh UUID is generated when the process
 restarts.
 
-4.1.16 Database Change Awareness
+4.1.18 Database Change Awareness
 --------------------------------
 
 RFC 7047 does not provide a way for a client to find out about some kinds of
@@ -414,7 +486,7 @@  The reply is always the same::
     "error": null
     "id": same "id" as request
 
-4.1.17 Schema Conversion
+4.1.19 Schema Conversion
 ------------------------
 
 Open vSwitch 2.9 adds a new JSON-RPC request to convert an online database from
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index f9b7c27..0f8024c 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -986,13 +986,19 @@  ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
             ovsdb_jsonrpc_trigger_create(s, db, request);
         }
     } else if (!strcmp(request->method, "monitor") ||
-               (monitor_cond_enable__ && !strcmp(request->method,
-                                                 "monitor_cond"))) {
+               (monitor_cond_enable__ &&
+                (!strcmp(request->method, "monitor_cond") ||
+                 !strcmp(request->method, "monitor_cond_since")))) {
         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
         if (!reply) {
-            int l = strlen(request->method) - strlen("monitor");
-            enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
-                                                   : OVSDB_MONITOR_V1;
+            enum ovsdb_monitor_version version;
+            if (!strcmp(request->method, "monitor")) {
+                version = OVSDB_MONITOR_V1;
+            } else if (!strcmp(request->method, "monitor_cond")) {
+                version = OVSDB_MONITOR_V2;
+            } else {
+                version = OVSDB_MONITOR_V3;
+            }
             reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
                                                  version, request->id);
         }
@@ -1364,7 +1370,8 @@  ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     struct shash_node *node;
     struct json *json;
 
-    if (json_array(params)->n != 3) {
+    if ((version == OVSDB_MONITOR_V2 && json_array(params)->n != 3) ||
+        (version == OVSDB_MONITOR_V3 && json_array(params)->n != 4)) {
         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
         goto error;
     }
@@ -1385,7 +1392,7 @@  ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     m->session = s;
     m->db = db;
     m->dbmon = ovsdb_monitor_create(db, m);
-    if (version == OVSDB_MONITOR_V2) {
+    if (version == OVSDB_MONITOR_V2 || version == OVSDB_MONITOR_V3) {
         m->condition = ovsdb_monitor_session_condition_create();
     }
     m->version = version;
@@ -1444,9 +1451,42 @@  ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         ovsdb_monitor_condition_bind(m->dbmon, m->condition);
     }
 
-    ovsdb_monitor_get_initial(m->dbmon, &m->change_set);
-    json = ovsdb_jsonrpc_monitor_compose_update(m, true);
+    bool initial = false;
+    if (version == OVSDB_MONITOR_V3) {
+        struct json *last_id = params->array.elems[3];
+        if (last_id->type != JSON_STRING) {
+            error = ovsdb_syntax_error(last_id, NULL,
+                                       "last_version_id must be string");
+            goto error;
+        }
+        struct uuid txn_uuid;
+        if (!uuid_from_string(&txn_uuid, last_id->string)) {
+            error = ovsdb_syntax_error(last_id, NULL,
+                                       "last_version_id must be UUID format.");
+            goto error;
+        }
+        if (!uuid_is_zero(&txn_uuid)) {
+            ovsdb_monitor_get_changes_after(&txn_uuid, m->dbmon,
+                                            &m->change_set);
+        }
+    }
+    if (!m->change_set) {
+        ovsdb_monitor_get_initial(m->dbmon, &m->change_set);
+        initial = true;
+    }
+    json = ovsdb_jsonrpc_monitor_compose_update(m, initial);
     json = json ? json : json_object_create();
+
+    if (m->version == OVSDB_MONITOR_V3) {
+        struct json *json_last_id = json_string_create_nocopy(
+                xasprintf(UUID_FMT,
+                          UUID_ARGS(ovsdb_monitor_get_last_txnid(
+                                  m->dbmon))));
+
+        struct json *json_found = json_boolean_create(!initial);
+        json = json_array_create_3(json_found, json_last_id, json);
+    }
+
     return jsonrpc_create_reply(json, request_id);
 
 error:
@@ -1580,8 +1620,17 @@  ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
     if (update_json) {
         struct jsonrpc_msg *msg;
         struct json *p;
-
-        p = json_array_create_2(json_clone(m->monitor_id), update_json);
+        if (m->version == OVSDB_MONITOR_V3) {
+            struct json *json_last_id = json_string_create_nocopy(
+                    xasprintf(UUID_FMT,
+                              UUID_ARGS(ovsdb_monitor_get_last_txnid(
+                                      m->dbmon))));
+
+            p = json_array_create_3(json_clone(m->monitor_id), json_last_id,
+                                    update_json);
+        } else {
+            p = json_array_create_2(json_clone(m->monitor_id), update_json);
+        }
         msg = ovsdb_jsonrpc_create_notify(m, p);
         jsonrpc_session_send(s->js, msg);
     }
@@ -1702,6 +1751,9 @@  ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
     case OVSDB_MONITOR_V2:
         method = "update2";
         break;
+    case OVSDB_MONITOR_V3:
+        method = "update3";
+        break;
     case OVSDB_MONITOR_VERSION_MAX:
     default:
         OVS_NOT_REACHED();
@@ -1728,8 +1780,17 @@  ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
         if (json) {
             struct jsonrpc_msg *msg;
             struct json *params;
+            if (m->version == OVSDB_MONITOR_V3) {
+                struct json *json_last_id = json_string_create_nocopy(
+                        xasprintf(UUID_FMT,
+                                  UUID_ARGS(ovsdb_monitor_get_last_txnid(
+                                          m->dbmon))));
+                params = json_array_create_3(json_clone(m->monitor_id),
+                                             json_last_id, json);
+            } else {
+                params = json_array_create_2(json_clone(m->monitor_id), json);
+            }
 
-            params = json_array_create_2(json_clone(m->monitor_id), json);
             msg = ovsdb_jsonrpc_create_notify(m, params);
             jsonrpc_session_send(s->js, msg);
         }
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index b41054d..6936dc1 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -136,6 +136,9 @@  struct ovsdb_monitor_change_set {
     struct ovs_list change_set_for_tables;
 
     int n_refs;
+
+    /* The previous txn id before this change set's start point. */
+    struct uuid prev_txn;
 };
 
 /* Contains 'struct ovsdb_monitor_row's for rows in a specific table
@@ -193,7 +196,9 @@  typedef struct json *
 
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *);
 static struct ovsdb_monitor_change_set * ovsdb_monitor_add_change_set(
-        struct ovsdb_monitor *, bool init_only);
+        struct ovsdb_monitor *, bool init_only, const struct uuid *prev_txn);
+static struct ovsdb_monitor_change_set * ovsdb_monitor_find_change_set(
+        const struct ovsdb_monitor *, const struct uuid *prev_txn);
 static void ovsdb_monitor_change_set_destroy(
         struct ovsdb_monitor_change_set *);
 static void ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *);
@@ -505,13 +510,14 @@  ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
 
 static struct ovsdb_monitor_change_set *
 ovsdb_monitor_add_change_set(struct ovsdb_monitor *dbmon,
-                             bool init_only)
+                             bool init_only, const struct uuid *prev_txn)
 {
     struct ovsdb_monitor_change_set *change_set = xzalloc(sizeof *change_set);
     change_set->uuid = uuid_random();
     ovs_list_push_back(&(dbmon->change_sets), &change_set->list_node);
     ovs_list_init(&change_set->change_set_for_tables);
     change_set->n_refs = 1;
+    change_set->prev_txn = prev_txn ? *prev_txn : UUID_ZERO;
 
     struct shash_node *node;
     SHASH_FOR_EACH (node, &dbmon->tables) {
@@ -531,6 +537,19 @@  ovsdb_monitor_add_change_set(struct ovsdb_monitor *dbmon,
     return change_set;
 };
 
+static struct ovsdb_monitor_change_set *
+ovsdb_monitor_find_change_set(const struct ovsdb_monitor *dbmon,
+                              const struct uuid *prev_txn)
+{
+    struct ovsdb_monitor_change_set *cs;
+    LIST_FOR_EACH (cs, list_node, &dbmon->change_sets) {
+        if (uuid_equals(&cs->prev_txn, prev_txn)) {
+            return cs;
+        }
+    }
+    return NULL;
+}
+
 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
 static void
 ovsdb_monitor_untrack_change_set(struct ovsdb_monitor *dbmon,
@@ -558,7 +577,8 @@  ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *dbmon)
     if (change_set) {
         change_set->n_refs++;
     } else {
-        change_set = ovsdb_monitor_add_change_set(dbmon, false);
+        change_set = ovsdb_monitor_add_change_set(dbmon, false,
+                                 ovsdb_monitor_get_last_txnid(dbmon));
         dbmon->new_change_set = change_set;
     }
 }
@@ -1150,12 +1170,13 @@  ovsdb_monitor_get_update(
                                             condition,
                                             ovsdb_monitor_compose_row_update);
         } else {
-            ovs_assert(version == OVSDB_MONITOR_V2);
+            ovs_assert(version == OVSDB_MONITOR_V2 ||
+                       version == OVSDB_MONITOR_V3);
+
             if (!cond_updated) {
                 json = ovsdb_monitor_compose_update(dbmon, initial, mcs,
                                             condition,
                                             ovsdb_monitor_compose_row_update2);
-
                 if (!condition || !condition->conditional) {
                     ovsdb_monitor_json_cache_insert(dbmon, version, mcs,
                                                     json);
@@ -1394,7 +1415,7 @@  ovsdb_monitor_get_initial(struct ovsdb_monitor *dbmon,
 {
     if (!dbmon->init_change_set) {
         struct ovsdb_monitor_change_set *change_set =
-            ovsdb_monitor_add_change_set(dbmon, true);
+            ovsdb_monitor_add_change_set(dbmon, true, NULL);
         dbmon->init_change_set = change_set;
 
         struct ovsdb_monitor_change_set_for_table *mcst;
@@ -1413,6 +1434,70 @@  ovsdb_monitor_get_initial(struct ovsdb_monitor *dbmon,
     *p_mcs = dbmon->init_change_set;
 }
 
+static bool
+ovsdb_monitor_history_change_cb(const struct ovsdb_row *old,
+                        const struct ovsdb_row *new,
+                        const unsigned long int *changed,
+                        void *aux)
+{
+    struct ovsdb_monitor_change_set *change_set = aux;
+    struct ovsdb_table *table = new ? new->table : old->table;
+    struct ovsdb_monitor_change_set_for_table *mcst;
+
+    enum ovsdb_monitor_selection type =
+        ovsdb_monitor_row_update_type(false, old, new);
+    LIST_FOR_EACH (mcst, list_in_change_set,
+                   &change_set->change_set_for_tables) {
+        if (mcst->mt->table == table) {
+            enum ovsdb_monitor_changes_efficacy efficacy =
+                ovsdb_monitor_changes_classify(type, mcst->mt, changed);
+            if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
+                ovsdb_monitor_changes_update(old, new, mcst->mt, mcst);
+            }
+            return true;
+        }
+    }
+    return false;
+}
+
+void
+ovsdb_monitor_get_changes_after(const struct uuid *txn_uuid,
+                                struct ovsdb_monitor *dbmon,
+                                struct ovsdb_monitor_change_set **p_mcs)
+{
+    ovs_assert(*p_mcs == NULL);
+    ovs_assert(!uuid_is_zero(txn_uuid));
+    struct ovsdb_monitor_change_set *change_set =
+        ovsdb_monitor_find_change_set(dbmon, txn_uuid);
+    if (change_set) {
+        change_set->n_refs++;
+        *p_mcs = change_set;
+        return;
+    }
+
+    struct ovsdb_txn_history_node *h_node;
+    bool found = false;
+    LIST_FOR_EACH (h_node, node, &dbmon->db->txn_history) {
+        struct ovsdb_txn *txn = h_node->txn;
+        if (!found) {
+            /* find the txn with last_id in history */
+            if (uuid_equals(ovsdb_txn_get_txnid(txn), txn_uuid)) {
+                found = true;
+                if (!change_set) {
+                    change_set = ovsdb_monitor_add_change_set(dbmon, false,
+                                                              txn_uuid);
+                    *p_mcs = change_set;
+                }
+            }
+        } else {
+            /* Already found. Add changes in each follow up transaction to
+             * the new change_set. */
+            ovsdb_txn_for_each_change(txn, ovsdb_monitor_history_change_cb,
+                                      change_set);
+        }
+    }
+}
+
 void
 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
@@ -1653,3 +1738,15 @@  ovsdb_monitor_prereplace_db(struct ovsdb *db)
         }
     }
 }
+
+const struct uuid *
+ovsdb_monitor_get_last_txnid(struct ovsdb_monitor *dbmon) {
+    static struct uuid dummy = { .parts = { 0, 0, 0, 0 } };
+    if (dbmon->db->n_txn_history) {
+        struct ovsdb_txn_history_node *thn = CONTAINER_OF(
+                ovs_list_back(&dbmon->db->txn_history),
+                struct ovsdb_txn_history_node, node);
+        return ovsdb_txn_get_txnid(thn->txn);
+    }
+    return &dummy;
+}
diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
index 112c672..1ac9aaf 100644
--- a/ovsdb/monitor.h
+++ b/ovsdb/monitor.h
@@ -39,6 +39,8 @@  enum ovsdb_monitor_version {
       OVSDB_MONITOR_V1,         /* RFC 7047 "monitor" method. */
       OVSDB_MONITOR_V2,         /* Extension to RFC 7047, see ovsdb-server
                                    man page for details. */
+      OVSDB_MONITOR_V3,         /* Extension to V2, see ovsdb-server man
+                                   page for details. */
 
       /* Last entry.  */
       OVSDB_MONITOR_VERSION_MAX
@@ -80,6 +82,8 @@  struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *,
                                       enum ovsdb_monitor_version,
                                       struct ovsdb_monitor_change_set **p_mcs);
 
+const struct uuid *ovsdb_monitor_get_last_txnid(struct ovsdb_monitor *);
+
 void ovsdb_monitor_table_add_select(struct ovsdb_monitor *,
                                     const struct ovsdb_table *,
                                     enum ovsdb_monitor_selection);
@@ -89,6 +93,8 @@  bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *,
 
 void ovsdb_monitor_get_initial(struct ovsdb_monitor *,
                                struct ovsdb_monitor_change_set **);
+void ovsdb_monitor_get_changes_after(const struct uuid *txn_uuid,
+        struct ovsdb_monitor *, struct ovsdb_monitor_change_set **);
 
 void ovsdb_monitor_get_memory_usage(struct simap *);
 
diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
index 0215357..9ae15e5 100644
--- a/ovsdb/ovsdb-client.c
+++ b/ovsdb/ovsdb-client.c
@@ -430,6 +430,13 @@  usage(void)
            "    DATABASE on SERVER.\n"
            "    COLUMNs may include !initial, !insert, !delete, !modify\n"
            "    to avoid seeing the specified kinds of changes.\n"
+           "\n  monitor-cond-since [SERVER] [DATABASE] [LASTID] CONDITION TABLE [COLUMN,...]...\n"
+           "    monitor contents that match CONDITION of COLUMNs in TABLE in\n"
+           "    DATABASE on SERVER, since change after LASTID.\n"
+           "    LASTID specifies transaction ID after which the monitoring\n"
+           "    starts, which works only for cluster mode. If ignored, it\n"
+           "    defaults to an all-zero uuid.\n"
+           "    Other arguments are the same as in monitor-cond.\n"
            "\n  convert [SERVER] SCHEMA\n"
            "    convert database on SERVER named in SCHEMA to SCHEMA.\n"
            "\n  needs-conversion [SERVER] SCHEMA\n"
@@ -1127,6 +1134,35 @@  monitor2_print(struct json *table_updates2,
 }
 
 static void
+monitor3_print(struct json *result,
+               const struct monitored_table *mts, size_t n_mts)
+{
+    if (result->type != JSON_ARRAY) {
+        ovs_error(0, "<result> is not array");
+    }
+
+    if (result->array.n != 3) {
+        ovs_error(0, "<result> should have 3 elements, but has %"PRIuSIZE".",
+                  result->array.n);
+    }
+
+    bool found = json_boolean(result->array.elems[0]);
+    const char *last_id = json_string(result->array.elems[1]);
+    printf("found: %s, last_id: %s\n", found ? "true" : "false", last_id);
+
+    struct json *table_updates2 = result->array.elems[2];
+    monitor2_print(table_updates2, mts, n_mts);
+}
+
+static void
+monitor3_notify_print(const char *last_id, struct json *table_updates2,
+                      const struct monitored_table *mts, size_t n_mts)
+{
+    printf("\nlast_id: %s", last_id);
+    monitor2_print(table_updates2, mts, n_mts);
+}
+
+static void
 add_column(const char *server, const struct ovsdb_column *column,
            struct ovsdb_column_set *columns, struct json *columns_json)
 {
@@ -1333,7 +1369,8 @@  destroy_monitored_table(struct monitored_table *mts, size_t n)
 static void
 do_monitor__(struct jsonrpc *rpc, const char *database,
              enum ovsdb_monitor_version version,
-             int argc, char *argv[], struct json *condition)
+             int argc, char *argv[], struct json *condition,
+             const struct uuid *last_id)
 {
     const char *server = jsonrpc_get_name(rpc);
     const char *table_name = argv[0];
@@ -1411,8 +1448,24 @@  do_monitor__(struct jsonrpc *rpc, const char *database,
 
     monitor = json_array_create_3(json_string_create(database),
                                   json_null_create(), monitor_requests);
-    const char *method = version == OVSDB_MONITOR_V2 ? "monitor_cond"
-                                                     : "monitor";
+    const char *method;
+    switch (version) {
+    case OVSDB_MONITOR_V1:
+        method = "monitor";
+        break;
+    case OVSDB_MONITOR_V2:
+        method = "monitor_cond";
+        break;
+    case OVSDB_MONITOR_V3:
+        method = "monitor_cond_since";
+        struct json *json_last_id = json_string_create_nocopy(
+                xasprintf(UUID_FMT, UUID_ARGS(last_id)));
+        json_array_add(monitor, json_last_id);
+        break;
+    case OVSDB_MONITOR_VERSION_MAX:
+    default:
+        OVS_NOT_REACHED();
+    }
 
     struct jsonrpc_msg *request;
     request = jsonrpc_create_request(method, monitor, NULL);
@@ -1444,6 +1497,9 @@  do_monitor__(struct jsonrpc *rpc, const char *database,
                 case OVSDB_MONITOR_V2:
                     monitor2_print(msg->result, mts, n_mts);
                     break;
+                case OVSDB_MONITOR_V3:
+                    monitor3_print(msg->result, mts, n_mts);
+                    break;
                 case OVSDB_MONITOR_VERSION_MAX:
                 default:
                     OVS_NOT_REACHED();
@@ -1470,6 +1526,17 @@  do_monitor__(struct jsonrpc *rpc, const char *database,
                     fflush(stdout);
                 }
             } else if (msg->type == JSONRPC_NOTIFY
+                       && version == OVSDB_MONITOR_V3
+                       && !strcmp(msg->method, "update3")) {
+                struct json *params = msg->params;
+                if (params->type == JSON_ARRAY
+                    && params->array.n == 3
+                    && params->array.elems[0]->type == JSON_NULL) {
+                    monitor3_notify_print(json_string(params->array.elems[1]),
+                                          params->array.elems[2], mts, n_mts);
+                    fflush(stdout);
+                }
+            } else if (msg->type == JSONRPC_NOTIFY
                        && !strcmp(msg->method, "monitor_canceled")) {
                 ovs_fatal(0, "%s: %s database was removed",
                           server, database);
@@ -1500,12 +1567,13 @@  static void
 do_monitor(struct jsonrpc *rpc, const char *database,
            int argc, char *argv[])
 {
-    do_monitor__(rpc, database, OVSDB_MONITOR_V1, argc, argv, NULL);
+    do_monitor__(rpc, database, OVSDB_MONITOR_V1, argc, argv, NULL, NULL);
 }
 
 static void
-do_monitor_cond(struct jsonrpc *rpc, const char *database,
-           int argc, char *argv[])
+do_monitor_cond__(struct jsonrpc *rpc, const char *database,
+                  enum ovsdb_monitor_version version,
+                  struct uuid *last_id, int argc, char *argv[])
 {
     struct ovsdb_condition cnd;
     struct json *condition = NULL;
@@ -1524,10 +1592,31 @@  do_monitor_cond(struct jsonrpc *rpc, const char *database,
     check_ovsdb_error(ovsdb_condition_from_json(table, condition,
                                                     NULL, &cnd));
     ovsdb_condition_destroy(&cnd);
-    do_monitor__(rpc, database, OVSDB_MONITOR_V2, --argc, ++argv, condition);
+    do_monitor__(rpc, database, version, --argc, ++argv, condition,
+                 last_id);
     ovsdb_schema_destroy(schema);
 }
 
+static void
+do_monitor_cond(struct jsonrpc *rpc, const char *database,
+                int argc, char *argv[])
+{
+    do_monitor_cond__(rpc, database, OVSDB_MONITOR_V2, NULL, argc, argv);
+}
+
+static void
+do_monitor_cond_since(struct jsonrpc *rpc, const char *database,
+           int argc, char *argv[])
+{
+    ovs_assert(argc > 1);
+    struct uuid last_id;
+    if (uuid_from_string(&last_id, argv[0])) {
+        argc--;
+        argv++;
+    }
+    do_monitor_cond__(rpc, database, OVSDB_MONITOR_V3, &last_id, argc, argv);
+}
+
 static bool
 is_database_clustered(struct jsonrpc *rpc, const char *database)
 {
@@ -2409,6 +2498,7 @@  static const struct ovsdb_client_command all_commands[] = {
     { "query",              NEED_NONE,     1, 2,       do_query },
     { "monitor",            NEED_DATABASE, 1, INT_MAX, do_monitor },
     { "monitor-cond",       NEED_DATABASE, 2, 3,       do_monitor_cond },
+    { "monitor-cond-since", NEED_DATABASE, 2, 4,       do_monitor_cond_since },
     { "wait",               NEED_NONE,     2, 3,       do_wait },
     { "convert",            NEED_NONE,     1, 2,       do_convert },
     { "needs-conversion",   NEED_NONE,     1, 2,       do_needs_conversion },
diff --git a/ovsdb/transaction.c b/ovsdb/transaction.c
index 3485383..0081840 100644
--- a/ovsdb/transaction.c
+++ b/ovsdb/transaction.c
@@ -120,6 +120,12 @@  ovsdb_txn_set_txnid(const struct uuid *txnid, struct ovsdb_txn *txn)
     txn->txnid = *txnid;
 }
 
+const struct uuid *
+ovsdb_txn_get_txnid(const struct ovsdb_txn *txn)
+{
+    return &txn->txnid;
+}
+
 static void
 ovsdb_txn_free(struct ovsdb_txn *txn)
 {
diff --git a/ovsdb/transaction.h b/ovsdb/transaction.h
index c601d47..929520e 100644
--- a/ovsdb/transaction.h
+++ b/ovsdb/transaction.h
@@ -26,6 +26,7 @@  struct uuid;
 
 struct ovsdb_txn *ovsdb_txn_create(struct ovsdb *);
 void ovsdb_txn_set_txnid(const struct uuid *, struct ovsdb_txn *);
+const struct uuid *ovsdb_txn_get_txnid(const struct ovsdb_txn *);
 void ovsdb_txn_abort(struct ovsdb_txn *);
 
 struct ovsdb_error *ovsdb_txn_replay_commit(struct ovsdb_txn *)
diff --git a/tests/ovsdb-monitor.at b/tests/ovsdb-monitor.at
index dca7cad..657858b 100644
--- a/tests/ovsdb-monitor.at
+++ b/tests/ovsdb-monitor.at
@@ -83,6 +83,7 @@  m4_define([OVSDB_CHECK_MONITOR],
 m4_define([OVSDB_CHECK_MONITOR_COND],
   [AT_SETUP([$1])
    AT_KEYWORDS([ovsdb server monitor monitor-cond positive $10])
+   echo $10
    $2 > schema
    AT_CHECK([ovsdb-tool create db schema], [0], [stdout], [ignore])
    for txn in m4_foreach([txn], [$3], ['txn' ]); do
@@ -589,3 +590,296 @@  row,action,name,number,_version
    [[[["name","==","one"]]]],
     [[[false]]],
     [[[true]]]])
+
+# Test monitor-cond-since with zero uuid, which shouldn't
+# be found in server and server should send all rows
+# as initial.
+AT_SETUP([monitor-cond-since not found])
+AT_KEYWORDS([ovsdb server monitor monitor-cond-since positive])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create-cluster db schema unix:db.raft], [0], [stdout], [ignore])
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0], [ignore], [ignore])
+done
+
+# Omitting the last_id parameter in ovsdb-client monitor-cond-since command
+# will by default using all zero uuid, which doesn't exist in any history txn.
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+on_exit 'kill `cat ovsdb-client.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 10, "name": "ten"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 11, "name": "eleven"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0],
+           [ignore], [ignore], [kill `cat server-pid client-pid`])
+done
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+         [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0],
+         [[found: false, last_id: <0>
+row,action,name,number,_version
+<1>,initial,"""one""",1,"[""uuid"",""<2>""]"
+
+last_id: <3>
+row,action,name,number,_version
+<4>,insert,"""ten""",10,"[""uuid"",""<5>""]"
+]], [ignore])
+AT_CLEANUP
+
+# Test monitor-cond-since in ovsdb server restart scenario.
+# ovsdb-client should receive only new changes after the
+# specific transaction id.
+AT_SETUP([monitor-cond-since db restart])
+AT_KEYWORDS([ovsdb server monitor monitor-cond-since positive])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create-cluster db schema unix:db.raft], [0], [stdout], [ignore])
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0], [ignore], [ignore])
+done
+
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+on_exit 'kill `cat ovsdb-client.pid`'
+OVS_WAIT_UNTIL([grep last_id output])
+
+kill `cat ovsdb-client.pid`
+kill `cat ovsdb-server.pid`
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+
+# Remember the last_id, which will be used for monitor-cond-since later.
+last_id=`grep last_id output | awk '{print $4}'`
+
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+
+# Some new changes made to db after restarting the server.
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 10, "name": "ten"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 11, "name": "eleven"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0],
+           [ignore], [ignore], [kill `cat server-pid client-pid`])
+done
+
+# Use last_id to monitor and get only the new changes.
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals $last_id '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+         [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0],
+         [[found: true, last_id: <0>
+row,action,name,number,_version
+<1>,insert,"""ten""",10,"[""uuid"",""<2>""]"
+]], [ignore])
+AT_CLEANUP
+
+# Test monitor-cond-since with last_id found in server
+# but there is no new change after that transaction.
+AT_SETUP([monitor-cond-since found but no new rows])
+AT_KEYWORDS([ovsdb server monitor monitor-cond-since positive])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create-cluster db schema unix:db.raft], [0], [stdout], [ignore])
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0], [ignore], [ignore])
+done
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+on_exit 'kill `cat ovsdb-client.pid`'
+OVS_WAIT_UNTIL([grep last_id output])
+
+kill `cat ovsdb-client.pid`
+OVS_WAIT_UNTIL([test ! -e ovsdb-client.pid])
+last_id=`grep last_id output | awk '{print $4}'`
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals $last_id '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+         [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0],
+         [[found: true, last_id: <0>
+]], [ignore])
+AT_CLEANUP
+
+# Test monitor-cond-since against empty DB
+AT_SETUP([monitor-cond-since empty db])
+AT_KEYWORDS([ovsdb server monitor monitor-cond-since positive])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create-cluster db schema unix:db.raft], [0], [stdout], [ignore])
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+on_exit 'kill `cat ovsdb-client.pid`'
+OVS_WAIT_UNTIL([grep last_id output])
+
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+         [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0],
+         [[found: false, last_id: <0>
+]], [ignore])
+AT_CLEANUP
+
+# Test monitor-cond-since with cond-change followed.
+AT_SETUP([monitor-cond-since condition change])
+AT_KEYWORDS([ovsdb server monitor monitor-cond-since positive])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create-cluster db schema unix:db.raft], [0], [stdout], [ignore])
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0], [ignore], [ignore])
+done
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals '[[]]' ordinals > output], [0], [ignore], [ignore])
+on_exit 'kill `cat ovsdb-client.pid`'
+for cond in m4_foreach([cond],
+                       [[[[["name","==","one"],["name","==","two"]]]],
+                        [[[["name","==","one"]]]],
+                        [[[false]]],
+                        [[[true]]]], ['cond' ]); do
+  AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/cond_change ordinals "$cond"], [0], [ignore], [ignore])
+done
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+         [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0],
+         [[found: false, last_id: <0>
+row,action,name,number,_version
+<1>,initial,"""one""",1,"[""uuid"",""<2>""]"
+<3>,initial,"""two""",2,"[""uuid"",""<4>""]"
+<5>,initial,"""zero""",,"[""uuid"",""<6>""]"
+
+last_id: <0>
+row,action,name,number,_version
+<5>,delete,,,
+
+last_id: <0>
+row,action,name,number,_version
+<3>,delete,,,
+
+last_id: <0>
+row,action,name,number,_version
+<1>,delete,,,
+
+last_id: <0>
+row,action,name,number,_version
+<1>,insert,"""one""",1,"[""uuid"",""<2>""]"
+<3>,insert,"""two""",2,"[""uuid"",""<4>""]"
+<5>,insert,"""zero""",,"[""uuid"",""<6>""]"
+]], [ignore])
+AT_CLEANUP
+
+# Test monitor-cond-since with non-cluster mode server
+AT_SETUP([monitor-cond-since non-cluster])
+AT_KEYWORDS([ovsdb server monitor monitor-cond-since positive])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create db schema], [0], [stdout], [ignore])
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0], [ignore], [ignore])
+done
+
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond-since --format=csv unix:socket ordinals '[[["name","==","one"],["name","==","ten"]]]' ordinals > output],
+       [0], [ignore], [ignore])
+on_exit 'kill `cat ovsdb-client.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 10, "name": "ten"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 11, "name": "eleven"}}]]]], ['txn' ]); do
+  AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0],
+           [ignore], [ignore], [kill `cat server-pid client-pid`])
+done
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+         [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+
+# Transaction shouldn't be found, and last_id returned should always
+# be the same (all zero uuid)
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0],
+         [[found: false, last_id: <0>
+row,action,name,number,_version
+<1>,initial,"""one""",1,"[""uuid"",""<2>""]"
+
+last_id: <0>
+row,action,name,number,_version
+<3>,insert,"""ten""",10,"[""uuid"",""<4>""]"
+]], [ignore])
+AT_CLEANUP
+