@@ -13,6 +13,7 @@
# limitations under the License.
import collections
+import enum
import functools
import uuid
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
+OVSDB_UPDATE3 = 2
CLUSTERED = "clustered"
RELAY = "relay"
@@ -45,6 +47,60 @@ Notice = collections.namedtuple('Notice', ('event', 'row', 'updates'))
Notice.__new__.__defaults__ = (None,) # default updates=None
+class Monitor(enum.IntEnum):
+ monitor = OVSDB_UPDATE
+ monitor_cond = OVSDB_UPDATE2
+ monitor_cond_since = OVSDB_UPDATE3
+
+
+class ConditionState(object):
+ def __init__(self):
+ self._ack_cond = None
+ self._req_cond = None
+ self._new_cond = [True]
+
+ def __iter__(self):
+ return iter([self._new_cond, self._req_cond, self._ack_cond])
+
+ @property
+ def new(self):
+ """The latest freshly initialized condition change"""
+ return self._new_cond
+
+ @property
+ def acked(self):
+ """The last condition change that has been accepted by the server"""
+ return self._ack_cond
+
+ @property
+ def latest(self):
+ """The most recent condition change"""
+ return next(cond for cond in self if cond is not None)
+
+ @staticmethod
+ def is_true(condition):
+ return condition == [True]
+
+ def init(self, cond):
+ """Signal that a a condition change is being initiated"""
+ self._new_cond = cond
+
+ def ack(self):
+ """Signal that a condition change has been acked"""
+ if self._req_cond is not None:
+ self._ack_cond, self._req_cond = (self._req_cond, None)
+
+ def request(self):
+ """Signal that a condition change has been requested"""
+ if self._new_cond is not None:
+ self._req_cond, self._new_cond = (self._new_cond, None)
+
+ def reset(self):
+ """Reset a requested condition change back to new"""
+ if self._req_cond is not None and self._new_cond is None:
+ self._new_cond, self._req_cond = (self._req_cond, None)
+
+
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -102,7 +158,13 @@ class Idl(object):
IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
- IDL_S_MONITORING = 5
+ IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
+ IDL_S_MONITORING = 6
+
+ monitor_map = {
+ Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
+ Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
+ Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
def __init__(self, remote, schema_helper, probe_interval=None,
leader_only=True):
@@ -146,10 +208,12 @@ class Idl(object):
remotes = self._parse_remotes(remote)
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
probe_interval=probe_interval)
+ self._request_id = None
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
+ self.last_id = str(uuid.UUID(int=0))
# Server monitor.
self._server_schema_request_id = None
@@ -176,6 +240,9 @@ class Idl(object):
self.txn = None
self._outstanding_txns = {}
+ self.cond_changed = False
+ self.cond_seqno = 0
+
for table in schema.tables.values():
for column in table.columns.values():
if not hasattr(column, 'alert'):
@@ -183,8 +250,7 @@ class Idl(object):
table.need_table = False
table.rows = custom_index.IndexedRows(table)
table.idl = self
- table.condition = [True]
- table.cond_changed = False
+ table.condition = ConditionState()
def _parse_remotes(self, remote):
# If remote is -
@@ -222,6 +288,38 @@ class Idl(object):
update."""
self._session.close()
+ def ack_conditions(self):
+ """Mark all requested table conditions as acked"""
+ for table in self.tables.values():
+ table.condition.ack()
+
+ def sync_conditions(self):
+ """Synchronize condition state when the FSM is restarted
+
+ If a non-zero last_id is available for the DB, then upon reconnect
+ the IDL should first request acked conditions to avoid missing updates
+ about records that were added before the transaction with
+ txn-id == last_id. If there were requested condition changes in flight
+ and the IDL client didn't set new conditions, then reset the requested
+ conditions to new to trigger a follow-up monitor_cond_change request
+ """
+ ack_all = self.last_id == str(uuid.UUID(int=0))
+ for table in self.tables.values():
+ if ack_all:
+ table.condition.request()
+ table.condition.ack()
+ else:
+ table.condition.reset()
+ self.cond_changed = True
+
+ def restart_fsm(self):
+ # Resync data DB table conditions to avoid missing updated due to
+ # conditions that were in flight or changed locally while the
+ # connection was down.
+ self.sync_conditions()
+ self.__send_server_schema_request()
+ self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+
def run(self):
"""Processes a batch of messages from the database server. Returns
True if the database as seen through the IDL changed, False if it did
@@ -256,7 +354,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
- self.__send_server_schema_request()
+ self.restart_fsm()
if self.lock_name:
self.__send_lock_request()
break
@@ -264,8 +362,19 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
+ is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
+ ovs.jsonrpc.Message.T_ERROR)
+
+ if is_response and self._request_id and self._request_id == msg.id:
+ self._request_id = None
+ # process_response follows
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+ and msg.method == "update3"
+ and len(msg.params) == 3):
+ self.__parse_update(msg.params[2], OVSDB_UPDATE3)
+ self.last_id = msg.params[1]
+ elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
# Database contents changed.
@@ -290,11 +399,18 @@ class Idl(object):
try:
self.change_seqno += 1
self._monitor_request_id = None
- self.__clear()
- if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+ if (self.state ==
+ self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
+ # If 'found' is false, clear table rows for new dump
+ if not msg.result[0]:
+ self.__clear()
+ self.__parse_update(msg.result[2], OVSDB_UPDATE3)
+ elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+ self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
+ self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE)
self.state = self.IDL_S_MONITORING
@@ -368,11 +484,17 @@ class Idl(object):
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
# Reply to our echo request. Ignore it.
pass
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self.state == (
+ self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
+ self._monitor_request_id == msg.id):
+ if msg.error == "unknown method":
+ self.__send_monitor_request(Monitor.monitor_cond)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
- self.__send_monitor_request()
+ self.__send_monitor_request(Monitor.monitor)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id):
@@ -388,6 +510,13 @@ class Idl(object):
and self.__txn_process_reply(msg)):
# __txn_process_reply() did everything needed.
pass
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
+ self.state == self.IDL_S_MONITORING):
+ # Mark the last requested conditions as acked and if further
+ # condition changes were pending, send them now.
+ self.ack_conditions()
+ self.send_cond_change()
+ self.cond_seqno += 1
else:
# This can happen if a transaction is destroyed before we
# receive the reply, so keep the log level low.
@@ -397,14 +526,36 @@ class Idl(object):
return initial_change_seqno != self.change_seqno
- def send_cond_change(self):
- if not self._session.is_connected():
+ def compose_cond_change(self):
+ if not self.cond_changed:
return
+ change_requests = {}
for table in self.tables.values():
- if table.cond_changed:
- self.__send_cond_change(table, table.condition)
- table.cond_changed = False
+ # Always use the most recent conditions set by the IDL client when
+ # requesting monitor_cond_change
+ if table.condition.new is not None:
+ change_requests[table.name] = [
+ {"where": table.condition.new}]
+ table.condition.request()
+
+ if not change_requests:
+ return
+
+ self.cond_changed = False
+ old_uuid = str(self.uuid)
+ self.uuid = uuid.uuid1()
+ params = [old_uuid, str(self.uuid), change_requests]
+ return ovs.jsonrpc.Message.create_request(
+ "monitor_cond_change", params)
+
+ def send_cond_change(self):
+ if not self._session.is_connected() or self._request_id is not None:
+ return
+
+ msg = self.compose_cond_change()
+ if msg:
+ self.send_request(msg)
def cond_change(self, table_name, cond):
"""Sets the condition for 'table_name' to 'cond', which should be a
@@ -420,9 +571,16 @@ class Idl(object):
if cond == []:
cond = [False]
- if table.condition != cond:
- table.condition = cond
- table.cond_changed = True
+
+ # Compare the new condition to the last known condition
+ if table.condition.latest != cond:
+ table.condition.init(cond)
+ self.cond_changed = True
+ p = ovs.poller.Poller()
+ p.immediate_wake()
+ return self.cond_seqno + 1
+
+ return self.cond_seqno
def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
@@ -501,14 +659,6 @@ class Idl(object):
to doing nothing to avoid overhead where it is not needed.
"""
- def __send_cond_change(self, table, cond):
- monitor_cond_change = {table.name: [{"where": cond}]}
- old_uuid = str(self.uuid)
- self.uuid = uuid.uuid1()
- params = [old_uuid, str(self.uuid), monitor_cond_change]
- msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
- self._session.send(msg)
-
def __clear(self):
changed = False
@@ -517,6 +667,8 @@ class Idl(object):
changed = True
table.rows = custom_index.IndexedRows(table)
+ self.cond_seqno = 0
+
if changed:
self.change_seqno += 1
@@ -571,11 +723,18 @@ class Idl(object):
self._db_change_aware_request_id = msg.id
self._session.send(msg)
- def __send_monitor_request(self):
- if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
- self.IDL_S_INITIAL]):
+ def send_request(self, request):
+ self._request_id = request.id
+ if self._session.is_connected():
+ return self._session.send(request)
+
+ def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
+ if self.state == self.IDL_S_INITIAL:
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
+ elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
+ self.state = self.monitor_map[Monitor(max_version)]
+ method = Monitor(max_version).name
else:
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
@@ -589,22 +748,24 @@ class Idl(object):
(column not in self.readonly[table.name])):
columns.append(column)
monitor_request = {"columns": columns}
- if method == "monitor_cond" and table.condition != [True]:
- monitor_request["where"] = table.condition
- table.cond_change = False
+ if method in ("monitor_cond", "monitor_cond_since") and (
+ not ConditionState.is_true(table.condition.acked)):
+ monitor_request["where"] = table.condition.acked
monitor_requests[table.name] = [monitor_request]
- msg = ovs.jsonrpc.Message.create_request(
- method, [self._db.name, str(self.uuid), monitor_requests])
+ args = [self._db.name, str(self.uuid), monitor_requests]
+ if method == "monitor_cond_since":
+ args.append(str(self.last_id))
+ msg = ovs.jsonrpc.Message.create_request(method, args)
self._monitor_request_id = msg.id
- self._session.send(msg)
+ self.send_request(msg)
def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id
- self._session.send(msg)
+ self.send_request(msg)
def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
@@ -624,7 +785,7 @@ class Idl(object):
str(self.server_monitor_uuid),
monitor_requests])
self._server_monitor_request_id = msg.id
- self._session.send(msg)
+ self.send_request(msg)
def __parse_update(self, update, version, tables=None):
try:
@@ -668,7 +829,7 @@ class Idl(object):
self.cooperative_yield()
- if version == OVSDB_UPDATE2:
+ if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
changes = self.__process_update2(table, uuid, row_update)
if changes:
notices.append(changes)
@@ -2293,7 +2293,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL],
# Checks that monitor_cond_since works fine when disconnects happen
# with cond_change requests in flight (i.e., IDL is properly updated).
-OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect],
+OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
3,
[['["idltest",
{"op": "insert",
Add support for monitor_cond_since / update3 to python-ovs to allow more efficient reconnections when connecting to clustered OVSDB servers. Signed-off-by: Terry Wilson <twilson@redhat.com> --- python/ovs/db/idl.py | 231 ++++++++++++++++++++++++++++++++++++------- tests/ovsdb-idl.at | 2 +- 2 files changed, 197 insertions(+), 36 deletions(-)