@@ -106,10 +106,10 @@ struct ovsdb_idl_arc {
* connection. The next connection attempt has a chance at \
* picking a connected server. \
* \
- * * Otherwise, sends a "monitor_cond" request for the IDL \
+ * * Otherwise, sends a "monitor_cond_since" request for the IDL \
* database whose details are informed by the schema \
* (obtained from the row), and transitions to \
- * IDL_S_DATA_MONITOR_COND_REQUESTED. \
+ * IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED. \
* \
* - If the reply indicates success, but the Database table does \
* not have a row for the IDL database, transitions to \
@@ -125,6 +125,13 @@ struct ovsdb_idl_arc {
* transitions to IDL_S_DATA_MONITOR_COND_REQUESTED. */ \
OVSDB_IDL_STATE(DATA_SCHEMA_REQUESTED) \
\
+ /* Waits for "monitor_cond_since" reply. If successful, replaces \
+ * the IDL contents by the data carried in the reply and \
+ * transitions to IDL_S_MONITORING. On failure, sends a \
+ * "monitor_cond" request and transitions to \
+ * IDL_S_DATA_MONITOR_COND_REQUESTED. */ \
+ OVSDB_IDL_STATE(DATA_MONITOR_COND_SINCE_REQUESTED) \
+ \
/* Waits for "monitor_cond" reply. If successful, replaces the \
* IDL contents by the data carried in the reply and transitions \
* to IDL_S_MONITORING. On failure, sends a "monitor" request \
@@ -136,9 +143,9 @@ struct ovsdb_idl_arc {
* IDL_S_MONITORING. On failure, transitions to IDL_S_ERROR. */ \
OVSDB_IDL_STATE(DATA_MONITOR_REQUESTED) \
\
- /* State that processes "update" or "update2" notifications for \
- * the main database (and the Database table in _Server if \
- * available). \
+ /* State that processes "update", "update2" or "update3" \
+ * notifications for the main database (and the Database table \
+ * in _Server if available). \
* \
* If we're monitoring the Database table and we get notified \
* that the IDL database has been deleted, we close the \
@@ -167,10 +174,18 @@ enum ovsdb_idl_state {
static const char *ovsdb_idl_state_to_string(enum ovsdb_idl_state);
+enum ovsdb_idl_monitor_method {
+ OVSDB_IDL_MM_MONITOR,
+ OVSDB_IDL_MM_MONITOR_COND,
+ OVSDB_IDL_MM_MONITOR_COND_SINCE
+};
+
enum ovsdb_idl_monitoring {
OVSDB_IDL_NOT_MONITORING, /* Database is not being monitored. */
OVSDB_IDL_MONITORING, /* Database has "monitor" outstanding. */
OVSDB_IDL_MONITORING_COND, /* Database has "monitor_cond" outstanding. */
+ OVSDB_IDL_MONITORING_COND_SINCE, /* Database has "monitor_cond_since"
+ outstanding. */
};
struct ovsdb_idl_db {
@@ -220,7 +235,7 @@ static void ovsdb_idl_send_db_change_aware(struct ovsdb_idl *);
static bool ovsdb_idl_check_server_db(struct ovsdb_idl *);
static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *,
struct ovsdb_idl_db *,
- bool use_monitor_cond);
+ enum ovsdb_idl_monitor_method);
struct ovsdb_idl {
struct ovsdb_idl_db server;
@@ -287,8 +302,8 @@ static struct vlog_rate_limit other_rl = VLOG_RATE_LIMIT_INIT(1, 5);
static void ovsdb_idl_clear(struct ovsdb_idl *);
static void ovsdb_idl_db_parse_monitor_reply(struct ovsdb_idl_db *,
- const struct json *result,
- bool is_monitor_cond);
+ const struct json *result,
+ enum ovsdb_idl_monitor_method method);
static bool ovsdb_idl_db_parse_update_rpc(struct ovsdb_idl_db *,
const struct jsonrpc_msg *);
static bool ovsdb_idl_handle_monitor_canceled(struct ovsdb_idl *,
@@ -296,7 +311,7 @@ static bool ovsdb_idl_handle_monitor_canceled(struct ovsdb_idl *,
const struct jsonrpc_msg *);
static void ovsdb_idl_db_parse_update(struct ovsdb_idl_db *,
const struct json *table_updates,
- bool is_monitor_cond);
+ enum ovsdb_idl_monitor_method method);
static bool ovsdb_idl_process_update(struct ovsdb_idl_table *,
const struct uuid *,
const struct json *old,
@@ -674,7 +689,8 @@ ovsdb_idl_process_response(struct ovsdb_idl *idl, struct jsonrpc_msg *msg)
if (!ok
&& idl->state != IDL_S_SERVER_SCHEMA_REQUESTED
&& idl->state != IDL_S_SERVER_MONITOR_COND_REQUESTED
- && idl->state != IDL_S_DATA_MONITOR_COND_REQUESTED) {
+ && idl->state != IDL_S_DATA_MONITOR_COND_REQUESTED
+ && idl->state != IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
char *s = jsonrpc_msg_to_string(msg);
VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
@@ -692,7 +708,8 @@ ovsdb_idl_process_response(struct ovsdb_idl *idl, struct jsonrpc_msg *msg)
if (ok) {
json_destroy(idl->server.schema);
idl->server.schema = json_clone(msg->result);
- ovsdb_idl_send_monitor_request(idl, &idl->server, true);
+ ovsdb_idl_send_monitor_request(idl, &idl->server,
+ OVSDB_IDL_MM_MONITOR_COND);
ovsdb_idl_transition(idl, IDL_S_SERVER_MONITOR_COND_REQUESTED);
} else {
ovsdb_idl_send_schema_request(idl, &idl->data);
@@ -703,7 +720,8 @@ ovsdb_idl_process_response(struct ovsdb_idl *idl, struct jsonrpc_msg *msg)
case IDL_S_SERVER_MONITOR_COND_REQUESTED:
if (ok) {
idl->server.monitoring = OVSDB_IDL_MONITORING_COND;
- ovsdb_idl_db_parse_monitor_reply(&idl->server, msg->result, true);
+ ovsdb_idl_db_parse_monitor_reply(&idl->server, msg->result,
+ OVSDB_IDL_MM_MONITOR_COND);
if (ovsdb_idl_check_server_db(idl)) {
ovsdb_idl_send_db_change_aware(idl);
}
@@ -716,29 +734,48 @@ ovsdb_idl_process_response(struct ovsdb_idl *idl, struct jsonrpc_msg *msg)
case IDL_S_DATA_SCHEMA_REQUESTED:
json_destroy(idl->data.schema);
idl->data.schema = json_clone(msg->result);
- ovsdb_idl_send_monitor_request(idl, &idl->data, true);
+ ovsdb_idl_send_monitor_request(idl, &idl->data,
+ OVSDB_IDL_MM_MONITOR_COND);
ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_COND_REQUESTED);
break;
+ case IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED:
+ if (!ok) {
+ /* "monitor_cond_since" not supported. Try "monitor_cond". */
+ ovsdb_idl_send_monitor_request(idl, &idl->data,
+ OVSDB_IDL_MM_MONITOR_COND);
+ ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_COND_REQUESTED);
+ } else {
+ idl->data.monitoring = OVSDB_IDL_MONITORING_COND_SINCE;
+ ovsdb_idl_transition(idl, IDL_S_MONITORING);
+ ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result,
+ OVSDB_IDL_MM_MONITOR_COND_SINCE);
+ }
+ break;
+
case IDL_S_DATA_MONITOR_COND_REQUESTED:
if (!ok) {
/* "monitor_cond" not supported. Try "monitor". */
- ovsdb_idl_send_monitor_request(idl, &idl->data, false);
+ ovsdb_idl_send_monitor_request(idl, &idl->data,
+ OVSDB_IDL_MM_MONITOR);
ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_REQUESTED);
} else {
idl->data.monitoring = OVSDB_IDL_MONITORING_COND;
ovsdb_idl_transition(idl, IDL_S_MONITORING);
- ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result, true);
+ ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result,
+ OVSDB_IDL_MM_MONITOR_COND);
}
break;
case IDL_S_DATA_MONITOR_REQUESTED:
idl->data.monitoring = OVSDB_IDL_MONITORING;
ovsdb_idl_transition(idl, IDL_S_MONITORING);
- ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result, false);
+ ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result,
+ OVSDB_IDL_MM_MONITOR);
idl->data.change_seqno++;
ovsdb_idl_clear(idl);
- ovsdb_idl_db_parse_update(&idl->data, msg->result, false);
+ ovsdb_idl_db_parse_update(&idl->data, msg->result,
+ OVSDB_IDL_MM_MONITOR);
break;
case IDL_S_MONITORING:
@@ -1564,7 +1601,7 @@ ovsdb_idl_send_cond_change(struct ovsdb_idl *idl)
* conditional monitoring update request that we have not heard
* from the server yet. Don't generate another request in this case. */
if (!jsonrpc_session_is_connected(idl->session)
- || idl->data.monitoring != OVSDB_IDL_MONITORING_COND
+ || idl->data.monitoring == OVSDB_IDL_MONITORING
|| idl->request_id) {
return;
}
@@ -1860,8 +1897,9 @@ ovsdb_idl_check_server_db(struct ovsdb_idl *idl)
if (idl->state == IDL_S_SERVER_MONITOR_COND_REQUESTED) {
json_destroy(idl->data.schema);
idl->data.schema = json_from_string(database->schema);
- ovsdb_idl_send_monitor_request(idl, &idl->data, true);
- ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_COND_REQUESTED);
+ ovsdb_idl_send_monitor_request(idl, &idl->data,
+ OVSDB_IDL_MM_MONITOR_COND_SINCE);
+ ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED);
}
return true;
}
@@ -1951,7 +1989,7 @@ parse_schema(const struct json *schema_json)
static void
ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl, struct ovsdb_idl_db *db,
- bool use_monitor_cond)
+ enum ovsdb_idl_monitor_method monitor_method)
{
struct shash *schema = parse_schema(db->schema);
struct json *monitor_requests = json_object_create();
@@ -2003,7 +2041,9 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl, struct ovsdb_idl_db *db,
json_object_put(monitor_request, "columns", columns);
const struct ovsdb_idl_condition *cond = &table->condition;
- if (use_monitor_cond && !ovsdb_idl_condition_is_true(cond)) {
+ if ((monitor_method == OVSDB_IDL_MM_MONITOR_COND ||
+ monitor_method == OVSDB_IDL_MM_MONITOR_COND_SINCE) &&
+ !ovsdb_idl_condition_is_true(cond)) {
json_object_put(monitor_request, "where",
ovsdb_idl_condition_to_json(cond));
table->cond_changed = false;
@@ -2016,13 +2056,30 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl, struct ovsdb_idl_db *db,
db->cond_changed = false;
- ovsdb_idl_send_request(
- idl,
- jsonrpc_create_request(
- use_monitor_cond ? "monitor_cond" : "monitor",
- json_array_create_3(json_string_create(db->class_->database),
- json_clone(db->monitor_id), monitor_requests),
- NULL));
+ struct json *params = json_array_create_3(
+ json_string_create(db->class_->database),
+ json_clone(db->monitor_id),
+ monitor_requests);
+ const char *method;
+ switch (monitor_method) {
+ case OVSDB_IDL_MM_MONITOR:
+ method = "monitor";
+ break;
+ case OVSDB_IDL_MM_MONITOR_COND:
+ method = "monitor_cond";
+ break;
+ case OVSDB_IDL_MM_MONITOR_COND_SINCE:
+ method = "monitor_cond_since";
+ struct uuid last_id = UUID_ZERO;
+ struct json *json_last_id = json_string_create_nocopy(
+ xasprintf(UUID_FMT, UUID_ARGS(&last_id)));
+ json_array_add(params, json_last_id);
+ break;
+ default:
+ OVS_NOT_REACHED();
+ }
+
+ ovsdb_idl_send_request(idl, jsonrpc_create_request(method, params, NULL));
}
static void
@@ -2039,30 +2096,71 @@ log_parse_update_error(struct ovsdb_error *error)
static void
ovsdb_idl_db_parse_monitor_reply(struct ovsdb_idl_db *db,
const struct json *result,
- bool is_monitor_cond)
+ enum ovsdb_idl_monitor_method method)
{
db->change_seqno++;
+ const struct json *table_updates = result;
+ if (method == OVSDB_IDL_MM_MONITOR_COND_SINCE) {
+ if (result->type != JSON_ARRAY || result->array.n != 3) {
+ struct ovsdb_error *error = ovsdb_syntax_error(result, NULL,
+ "Response of monitor_cond_since must "
+ "be an array with 3 elements.");
+ log_parse_update_error(error);
+ return;
+ }
+
+ bool found = json_boolean(result->array.elems[0]);
+ const char *last_id = json_string(result->array.elems[1]);
+
+ table_updates = result->array.elems[2];
+
+ }
ovsdb_idl_db_clear(db);
- ovsdb_idl_db_parse_update(db, result, is_monitor_cond);
+ ovsdb_idl_db_parse_update(db, table_updates, method);
}
static bool
ovsdb_idl_db_parse_update_rpc(struct ovsdb_idl_db *db,
const struct jsonrpc_msg *msg)
{
- if (msg->type == JSONRPC_NOTIFY) {
- bool is_update = !strcmp(msg->method, "update");
- bool is_update2 = !strcmp(msg->method, "update2");
- if ((is_update || is_update2)
- && msg->params->type == JSON_ARRAY
- && msg->params->array.n == 2
- && json_equal(msg->params->array.elems[0], db->monitor_id)) {
- ovsdb_idl_db_parse_update(db, msg->params->array.elems[1],
- is_update2);
- return true;
- }
+ if (msg->type != JSONRPC_NOTIFY) {
+ return false;
}
- return false;
+
+ enum ovsdb_idl_monitor_method mm;
+ uint8_t n;
+ if (!strcmp(msg->method, "update")) {
+ mm = OVSDB_IDL_MM_MONITOR;
+ n = 2;
+ } else if (!strcmp(msg->method, "update2")) {
+ mm = OVSDB_IDL_MM_MONITOR_COND;
+ n = 2;
+ } else if (!strcmp(msg->method, "update3")) {
+ mm = OVSDB_IDL_MM_MONITOR_COND_SINCE;
+ n = 3;
+ } else {
+ return false;
+ }
+
+ struct json *params = msg->params;
+ if (params->type != JSON_ARRAY || params->array.n != n) {
+ struct ovsdb_error *error = ovsdb_syntax_error(params, NULL,
+ "%s must be an array with %u elements.",
+ msg->method, n);
+ log_parse_update_error(error);
+ return false;
+ }
+
+ if (!json_equal(params->array.elems[0], db->monitor_id)) {
+ return false;
+ }
+
+ struct json *table_updates = params->array.elems[1];
+ if (!strcmp(msg->method, "update3")) {
+ table_updates = params->array.elems[2];
+ }
+ ovsdb_idl_db_parse_update(db, table_updates, mm);
+ return true;
}
static bool
@@ -2103,10 +2201,21 @@ ovsdb_idl_handle_monitor_canceled(struct ovsdb_idl *idl,
static struct ovsdb_error *
ovsdb_idl_db_parse_update__(struct ovsdb_idl_db *db,
const struct json *table_updates,
- bool is_monitor_cond)
+ enum ovsdb_idl_monitor_method method)
{
const struct shash_node *tables_node;
- const char *version_suffix = is_monitor_cond ? "2" : "";
+ const char *version_suffix;
+ switch (method) {
+ case OVSDB_IDL_MM_MONITOR:
+ version_suffix = "";
+ break;
+ case OVSDB_IDL_MM_MONITOR_COND:
+ case OVSDB_IDL_MM_MONITOR_COND_SINCE:
+ version_suffix = "2";
+ break;
+ default:
+ OVS_NOT_REACHED();
+ }
if (table_updates->type != JSON_OBJECT) {
return ovsdb_syntax_error(table_updates, NULL,
@@ -2155,7 +2264,8 @@ ovsdb_idl_db_parse_update__(struct ovsdb_idl_db *db,
version_suffix, table_node->name);
}
- if (is_monitor_cond) {
+ if (method == OVSDB_IDL_MM_MONITOR_COND ||
+ method == OVSDB_IDL_MM_MONITOR_COND_SINCE) {
const char *ops[] = {"modify", "insert", "delete", "initial"};
const char *operation;
const struct json *row;
@@ -2216,10 +2326,10 @@ ovsdb_idl_db_parse_update__(struct ovsdb_idl_db *db,
static void
ovsdb_idl_db_parse_update(struct ovsdb_idl_db *db,
const struct json *table_updates,
- bool is_monitor_cond)
+ enum ovsdb_idl_monitor_method method)
{
struct ovsdb_error *error = ovsdb_idl_db_parse_update__(db, table_updates,
- is_monitor_cond);
+ method);
if (error) {
log_parse_update_error(error);
}