diff mbox series

[ovs-dev,v5] python: Add monitor_cond_since support

Message ID 20211201175120.3229612-1-twilson@redhat.com
State Accepted
Headers show
Series [ovs-dev,v5] python: Add monitor_cond_since support | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Terry Wilson Dec. 1, 2021, 5:51 p.m. UTC
Add support for monitor_cond_since / update3 to python-ovs to
allow more efficient reconnections when connecting to clustered
OVSDB servers.

Signed-off-by: Terry Wilson <twilson@redhat.com>
---
 python/ovs/db/idl.py | 245 ++++++++++++++++++++++++++++++++++++-------
 tests/ovsdb-idl.at   |   2 +-
 2 files changed, 211 insertions(+), 36 deletions(-)

Comments

Dumitru Ceara Dec. 3, 2021, 12:06 p.m. UTC | #1
On 12/1/21 18:51, Terry Wilson wrote:
> Add support for monitor_cond_since / update3 to python-ovs to
> allow more efficient reconnections when connecting to clustered
> OVSDB servers.
> 
> Signed-off-by: Terry Wilson <twilson@redhat.com>
> ---
>  python/ovs/db/idl.py | 245 ++++++++++++++++++++++++++++++++++++-------
>  tests/ovsdb-idl.at   |   2 +-
>  2 files changed, 211 insertions(+), 36 deletions(-)
> 
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 60e58b03e..0d5e00208 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -13,6 +13,7 @@
>  # limitations under the License.
>  
>  import collections
> +import enum
>  import functools
>  import uuid
>  
> @@ -36,6 +37,7 @@ ROW_DELETE = "delete"
>  
>  OVSDB_UPDATE = 0
>  OVSDB_UPDATE2 = 1
> +OVSDB_UPDATE3 = 2
>  
>  CLUSTERED = "clustered"
>  RELAY = "relay"
> @@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
>          return item in self.keys()
>  
>  
> +class Monitor(enum.IntEnum):
> +    monitor = OVSDB_UPDATE
> +    monitor_cond = OVSDB_UPDATE2
> +    monitor_cond_since = OVSDB_UPDATE3
> +
> +
> +class ConditionState(object):
> +    def __init__(self):
> +        self._ack_cond = None
> +        self._req_cond = None
> +        self._new_cond = [True]
> +
> +    def __iter__(self):
> +        return iter([self._new_cond, self._req_cond, self._ack_cond])
> +
> +    @property
> +    def new(self):
> +        """The latest freshly initialized condition change"""
> +        return self._new_cond
> +
> +    @property
> +    def acked(self):
> +        """The last condition change that has been accepted by the server"""
> +        return self._ack_cond
> +
> +    @property
> +    def requested(self):
> +        """A condition that's been requested, but not acked by the server"""
> +        return self._req_cond
> +
> +    @property
> +    def latest(self):
> +        """The most recent condition change"""
> +        return next(cond for cond in self if cond is not None)
> +
> +    @staticmethod
> +    def is_true(condition):
> +        return condition == [True]
> +
> +    def init(self, cond):
> +        """Signal that a condition change is being initiated"""
> +        self._new_cond = cond
> +
> +    def ack(self):
> +        """Signal that a condition change has been acked"""
> +        if self._req_cond is not None:
> +            self._ack_cond, self._req_cond = (self._req_cond, None)
> +
> +    def request(self):
> +        """Signal that a condition change has been requested"""
> +        if self._new_cond is not None:
> +            self._req_cond, self._new_cond = (self._new_cond, None)
> +
> +    def reset(self):
> +        """Reset a requested condition change back to new"""
> +        if self._req_cond is not None and self._new_cond is None:
> +            self._new_cond, self._req_cond = (self._req_cond, None)
> +
> +
>  class Idl(object):
>      """Open vSwitch Database Interface Definition Language (OVSDB IDL).
>  
> @@ -132,7 +193,13 @@ class Idl(object):
>      IDL_S_SERVER_MONITOR_REQUESTED = 2
>      IDL_S_DATA_MONITOR_REQUESTED = 3
>      IDL_S_DATA_MONITOR_COND_REQUESTED = 4
> -    IDL_S_MONITORING = 5
> +    IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
> +    IDL_S_MONITORING = 6
> +
> +    monitor_map = {
> +        Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
> +        Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
> +        Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
>  
>      def __init__(self, remote, schema_helper, probe_interval=None,
>                   leader_only=True):
> @@ -176,10 +243,12 @@ class Idl(object):
>          remotes = self._parse_remotes(remote)
>          self._session = ovs.jsonrpc.Session.open_multiple(remotes,
>              probe_interval=probe_interval)
> +        self._request_id = None
>          self._monitor_request_id = None
>          self._last_seqno = None
>          self.change_seqno = 0
>          self.uuid = uuid.uuid1()
> +        self.last_id = str(uuid.UUID(int=0))
>  
>          # Server monitor.
>          self._server_schema_request_id = None
> @@ -206,6 +275,9 @@ class Idl(object):
>          self.txn = None
>          self._outstanding_txns = {}
>  
> +        self.cond_changed = False
> +        self.cond_seqno = 0
> +
>          for table in schema.tables.values():
>              for column in table.columns.values():
>                  if not hasattr(column, 'alert'):
> @@ -213,8 +285,7 @@ class Idl(object):
>              table.need_table = False
>              table.rows = custom_index.IndexedRows(table)
>              table.idl = self
> -            table.condition = [True]
> -            table.cond_changed = False
> +            table.condition = ConditionState()
>  
>      def _parse_remotes(self, remote):
>          # If remote is -
> @@ -252,6 +323,38 @@ class Idl(object):
>          update."""
>          self._session.close()
>  
> +    def ack_conditions(self):
> +        """Mark all requested table conditions as acked"""
> +        for table in self.tables.values():
> +            table.condition.ack()
> +
> +    def sync_conditions(self):
> +        """Synchronize condition state when the FSM is restarted
> +
> +        If a non-zero last_id is available for the DB, then upon reconnect
> +        the IDL should first request acked conditions to avoid missing updates
> +        about records that were added before the transaction with
> +        txn-id == last_id. If there were requested condition changes in flight
> +        and the IDL client didn't set new conditions, then reset the requested
> +        conditions to new to trigger a follow-up monitor_cond_change request
> +        """
> +        ack_all = self.last_id == str(uuid.UUID(int=0))
> +        for table in self.tables.values():
> +            if ack_all:
> +                table.condition.request()
> +                table.condition.ack()
> +            else:
> +                table.condition.reset()
> +                self.cond_changed = True
> +
> +    def restart_fsm(self):
> +        # Resync data DB table conditions to avoid missing updates due to
> +        # conditions that were in flight or changed locally while the
> +        # connection was down.
> +        self.sync_conditions()
> +        self.__send_server_schema_request()
> +        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
> +
>      def run(self):
>          """Processes a batch of messages from the database server.  Returns
>          True if the database as seen through the IDL changed, False if it did
> @@ -286,7 +389,7 @@ class Idl(object):
>              if seqno != self._last_seqno:
>                  self._last_seqno = seqno
>                  self.__txn_abort_all()
> -                self.__send_server_schema_request()
> +                self.restart_fsm()
>                  if self.lock_name:
>                      self.__send_lock_request()
>                  break
> @@ -294,8 +397,20 @@ class Idl(object):
>              msg = self._session.recv()
>              if msg is None:
>                  break
> +            is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
> +                                       ovs.jsonrpc.Message.T_ERROR)
> +
> +            if is_response and self._request_id and self._request_id == msg.id:
> +                self._request_id = None
> +                # process_response follows
>  
>              if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> +                    and msg.method == "update3"
> +                    and len(msg.params) == 3):
> +                # Database contents changed.
> +                self.__parse_update(msg.params[2], OVSDB_UPDATE3)
> +                self.last_id = msg.params[1]
> +            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
>                      and msg.method == "update2"
>                      and len(msg.params) == 2):
>                  # Database contents changed.
> @@ -320,11 +435,18 @@ class Idl(object):
>                  try:
>                      self.change_seqno += 1
>                      self._monitor_request_id = None
> -                    self.__clear()
> -                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
> +                    if (self.state ==
> +                            self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
> +                        # If 'found' is false, clear table rows for new dump
> +                        if not msg.result[0]:
> +                            self.__clear()
> +                        self.__parse_update(msg.result[2], OVSDB_UPDATE3)
> +                    elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
> +                        self.__clear()
>                          self.__parse_update(msg.result, OVSDB_UPDATE2)
>                      else:
>                          assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
> +                        self.__clear()
>                          self.__parse_update(msg.result, OVSDB_UPDATE)
>                      self.state = self.IDL_S_MONITORING
>  
> @@ -398,11 +520,17 @@ class Idl(object):
>              elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
>                  # Reply to our echo request.  Ignore it.
>                  pass
> +            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> +                  self.state == (
> +                      self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
> +                      self._monitor_request_id == msg.id):
> +                if msg.error == "unknown method":
> +                    self.__send_monitor_request(Monitor.monitor_cond)
>              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
>                    self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
>                    self._monitor_request_id == msg.id):
>                  if msg.error == "unknown method":
> -                    self.__send_monitor_request()
> +                    self.__send_monitor_request(Monitor.monitor)
>              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
>                    self._server_schema_request_id is not None and
>                    self._server_schema_request_id == msg.id):
> @@ -418,6 +546,13 @@ class Idl(object):
>                    and self.__txn_process_reply(msg)):
>                  # __txn_process_reply() did everything needed.
>                  pass
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
> +                  self.state == self.IDL_S_MONITORING):
> +                # Mark the last requested conditions as acked and if further
> +                # condition changes were pending, send them now.
> +                self.ack_conditions()
> +                self.send_cond_change()
> +                self.cond_seqno += 1
>              else:
>                  # This can happen if a transaction is destroyed before we
>                  # receive the reply, so keep the log level low.
> @@ -427,14 +562,36 @@ class Idl(object):
>  
>          return initial_change_seqno != self.change_seqno
>  
> -    def send_cond_change(self):
> -        if not self._session.is_connected():
> +    def compose_cond_change(self):
> +        if not self.cond_changed:
>              return
>  
> +        change_requests = {}
>          for table in self.tables.values():
> -            if table.cond_changed:
> -                self.__send_cond_change(table, table.condition)
> -                table.cond_changed = False
> +            # Always use the most recent conditions set by the IDL client when
> +            # requesting monitor_cond_change
> +            if table.condition.new is not None:
> +                change_requests[table.name] = [
> +                    {"where": table.condition.new}]
> +                table.condition.request()
> +
> +        if not change_requests:
> +            return
> +
> +        self.cond_changed = False
> +        old_uuid = str(self.uuid)
> +        self.uuid = uuid.uuid1()
> +        params = [old_uuid, str(self.uuid), change_requests]
> +        return ovs.jsonrpc.Message.create_request(
> +            "monitor_cond_change", params)
> +
> +    def send_cond_change(self):
> +        if not self._session.is_connected() or self._request_id is not None:
> +            return
> +
> +        msg = self.compose_cond_change()
> +        if msg:
> +            self.send_request(msg)
>  
>      def cond_change(self, table_name, cond):
>          """Sets the condition for 'table_name' to 'cond', which should be a
> @@ -450,13 +607,28 @@ class Idl(object):
>  
>          if cond == []:
>              cond = [False]
> -        if table.condition != cond:
> -            table.condition = cond
> -            table.cond_changed = True
> +
> +        # Compare the new condition to the last known condition
> +        if table.condition.latest != cond:
> +            table.condition.init(cond)
> +            self.cond_changed = True
> +
> +        # New condition will be sent out after all already requested ones
> +        # are acked.
> +        if table.condition.new:
> +            any_reqs = any(t.condition.request for t in self.tables.values())
> +            return self.cond_seqno + int(any_reqs) + 1
> +
> +        # Already requested conditions should be up to date at
> +        # self.cond_seqno + 1 while acked conditions are already up to date
> +        return self.cond_seqno + int(bool(table.condition.requested))
>  
>      def wait(self, poller):
>          """Arranges for poller.block() to wake up when self.run() has something
>          to do or when activity occurs on a transaction on 'self'."""
> +        if self.cond_changed:
> +            poller.immediate_wake()
> +            return

Nit: In theory, self._session.wait()/recv_wait() should never do more
than register to be awakened when the appropriate events occur, so
skipping those calls via the early "return" should be fine (and seems
to be with the current code).  But what if that changes in the future?
Ilya, what do you think?

In any case, this works for me today, with or without that "return" there:

Acked-by: Dumitru Ceara <dceara@redhat.com>

Thanks!
Dumitru
Terry Wilson Dec. 3, 2021, 3 p.m. UTC | #2
On Fri, Dec 3, 2021 at 6:06 AM Dumitru Ceara <dceara@redhat.com> wrote:
>
> On 12/1/21 18:51, Terry Wilson wrote:
> > Add support for monitor_cond_since / update3 to python-ovs to
> > allow more efficient reconnections when connecting to clustered
> > OVSDB servers.
> >
> > Signed-off-by: Terry Wilson <twilson@redhat.com>
> > ---
> >  python/ovs/db/idl.py | 245 ++++++++++++++++++++++++++++++++++++-------
> >  tests/ovsdb-idl.at   |   2 +-
> >  2 files changed, 211 insertions(+), 36 deletions(-)
> >
> > diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> > index 60e58b03e..0d5e00208 100644
> > --- a/python/ovs/db/idl.py
> > +++ b/python/ovs/db/idl.py
> > @@ -13,6 +13,7 @@
> >  # limitations under the License.
> >
> >  import collections
> > +import enum
> >  import functools
> >  import uuid
> >
> > @@ -36,6 +37,7 @@ ROW_DELETE = "delete"
> >
> >  OVSDB_UPDATE = 0
> >  OVSDB_UPDATE2 = 1
> > +OVSDB_UPDATE3 = 2
> >
> >  CLUSTERED = "clustered"
> >  RELAY = "relay"
> > @@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
> >          return item in self.keys()
> >
> >
> > +class Monitor(enum.IntEnum):
> > +    monitor = OVSDB_UPDATE
> > +    monitor_cond = OVSDB_UPDATE2
> > +    monitor_cond_since = OVSDB_UPDATE3
> > +
> > +
> > +class ConditionState(object):
> > +    def __init__(self):
> > +        self._ack_cond = None
> > +        self._req_cond = None
> > +        self._new_cond = [True]
> > +
> > +    def __iter__(self):
> > +        return iter([self._new_cond, self._req_cond, self._ack_cond])
> > +
> > +    @property
> > +    def new(self):
> > +        """The latest freshly initialized condition change"""
> > +        return self._new_cond
> > +
> > +    @property
> > +    def acked(self):
> > +        """The last condition change that has been accepted by the server"""
> > +        return self._ack_cond
> > +
> > +    @property
> > +    def requested(self):
> > +        """A condition that's been requested, but not acked by the server"""
> > +        return self._req_cond
> > +
> > +    @property
> > +    def latest(self):
> > +        """The most recent condition change"""
> > +        return next(cond for cond in self if cond is not None)
> > +
> > +    @staticmethod
> > +    def is_true(condition):
> > +        return condition == [True]
> > +
> > +    def init(self, cond):
> > +        """Signal that a condition change is being initiated"""
> > +        self._new_cond = cond
> > +
> > +    def ack(self):
> > +        """Signal that a condition change has been acked"""
> > +        if self._req_cond is not None:
> > +            self._ack_cond, self._req_cond = (self._req_cond, None)
> > +
> > +    def request(self):
> > +        """Signal that a condition change has been requested"""
> > +        if self._new_cond is not None:
> > +            self._req_cond, self._new_cond = (self._new_cond, None)
> > +
> > +    def reset(self):
> > +        """Reset a requested condition change back to new"""
> > +        if self._req_cond is not None and self._new_cond is None:
> > +            self._new_cond, self._req_cond = (self._req_cond, None)
> > +
> > +
> >  class Idl(object):
> >      """Open vSwitch Database Interface Definition Language (OVSDB IDL).
> >
> > @@ -132,7 +193,13 @@ class Idl(object):
> >      IDL_S_SERVER_MONITOR_REQUESTED = 2
> >      IDL_S_DATA_MONITOR_REQUESTED = 3
> >      IDL_S_DATA_MONITOR_COND_REQUESTED = 4
> > -    IDL_S_MONITORING = 5
> > +    IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
> > +    IDL_S_MONITORING = 6
> > +
> > +    monitor_map = {
> > +        Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
> > +        Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
> > +        Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
> >
> >      def __init__(self, remote, schema_helper, probe_interval=None,
> >                   leader_only=True):
> > @@ -176,10 +243,12 @@ class Idl(object):
> >          remotes = self._parse_remotes(remote)
> >          self._session = ovs.jsonrpc.Session.open_multiple(remotes,
> >              probe_interval=probe_interval)
> > +        self._request_id = None
> >          self._monitor_request_id = None
> >          self._last_seqno = None
> >          self.change_seqno = 0
> >          self.uuid = uuid.uuid1()
> > +        self.last_id = str(uuid.UUID(int=0))
> >
> >          # Server monitor.
> >          self._server_schema_request_id = None
> > @@ -206,6 +275,9 @@ class Idl(object):
> >          self.txn = None
> >          self._outstanding_txns = {}
> >
> > +        self.cond_changed = False
> > +        self.cond_seqno = 0
> > +
> >          for table in schema.tables.values():
> >              for column in table.columns.values():
> >                  if not hasattr(column, 'alert'):
> > @@ -213,8 +285,7 @@ class Idl(object):
> >              table.need_table = False
> >              table.rows = custom_index.IndexedRows(table)
> >              table.idl = self
> > -            table.condition = [True]
> > -            table.cond_changed = False
> > +            table.condition = ConditionState()
> >
> >      def _parse_remotes(self, remote):
> >          # If remote is -
> > @@ -252,6 +323,38 @@ class Idl(object):
> >          update."""
> >          self._session.close()
> >
> > +    def ack_conditions(self):
> > +        """Mark all requested table conditions as acked"""
> > +        for table in self.tables.values():
> > +            table.condition.ack()
> > +
> > +    def sync_conditions(self):
> > +        """Synchronize condition state when the FSM is restarted
> > +
> > +        If a non-zero last_id is available for the DB, then upon reconnect
> > +        the IDL should first request acked conditions to avoid missing updates
> > +        about records that were added before the transaction with
> > +        txn-id == last_id. If there were requested condition changes in flight
> > +        and the IDL client didn't set new conditions, then reset the requested
> > +        conditions to new to trigger a follow-up monitor_cond_change request
> > +        """
> > +        ack_all = self.last_id == str(uuid.UUID(int=0))
> > +        for table in self.tables.values():
> > +            if ack_all:
> > +                table.condition.request()
> > +                table.condition.ack()
> > +            else:
> > +                table.condition.reset()
> > +                self.cond_changed = True
> > +
> > +    def restart_fsm(self):
> > +        # Resync data DB table conditions to avoid missing updates due to
> > +        # conditions that were in flight or changed locally while the
> > +        # connection was down.
> > +        self.sync_conditions()
> > +        self.__send_server_schema_request()
> > +        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
> > +
> >      def run(self):
> >          """Processes a batch of messages from the database server.  Returns
> >          True if the database as seen through the IDL changed, False if it did
> > @@ -286,7 +389,7 @@ class Idl(object):
> >              if seqno != self._last_seqno:
> >                  self._last_seqno = seqno
> >                  self.__txn_abort_all()
> > -                self.__send_server_schema_request()
> > +                self.restart_fsm()
> >                  if self.lock_name:
> >                      self.__send_lock_request()
> >                  break
> > @@ -294,8 +397,20 @@ class Idl(object):
> >              msg = self._session.recv()
> >              if msg is None:
> >                  break
> > +            is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
> > +                                       ovs.jsonrpc.Message.T_ERROR)
> > +
> > +            if is_response and self._request_id and self._request_id == msg.id:
> > +                self._request_id = None
> > +                # process_response follows
> >
> >              if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> > +                    and msg.method == "update3"
> > +                    and len(msg.params) == 3):
> > +                # Database contents changed.
> > +                self.__parse_update(msg.params[2], OVSDB_UPDATE3)
> > +                self.last_id = msg.params[1]
> > +            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> >                      and msg.method == "update2"
> >                      and len(msg.params) == 2):
> >                  # Database contents changed.
> > @@ -320,11 +435,18 @@ class Idl(object):
> >                  try:
> >                      self.change_seqno += 1
> >                      self._monitor_request_id = None
> > -                    self.__clear()
> > -                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
> > +                    if (self.state ==
> > +                            self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
> > +                        # If 'found' is false, clear table rows for new dump
> > +                        if not msg.result[0]:
> > +                            self.__clear()
> > +                        self.__parse_update(msg.result[2], OVSDB_UPDATE3)
> > +                    elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
> > +                        self.__clear()
> >                          self.__parse_update(msg.result, OVSDB_UPDATE2)
> >                      else:
> >                          assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
> > +                        self.__clear()
> >                          self.__parse_update(msg.result, OVSDB_UPDATE)
> >                      self.state = self.IDL_S_MONITORING
> >
> > @@ -398,11 +520,17 @@ class Idl(object):
> >              elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
> >                  # Reply to our echo request.  Ignore it.
> >                  pass
> > +            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> > +                  self.state == (
> > +                      self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
> > +                      self._monitor_request_id == msg.id):
> > +                if msg.error == "unknown method":
> > +                    self.__send_monitor_request(Monitor.monitor_cond)
> >              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> >                    self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
> >                    self._monitor_request_id == msg.id):
> >                  if msg.error == "unknown method":
> > -                    self.__send_monitor_request()
> > +                    self.__send_monitor_request(Monitor.monitor)
> >              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> >                    self._server_schema_request_id is not None and
> >                    self._server_schema_request_id == msg.id):
> > @@ -418,6 +546,13 @@ class Idl(object):
> >                    and self.__txn_process_reply(msg)):
> >                  # __txn_process_reply() did everything needed.
> >                  pass
> > +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
> > +                  self.state == self.IDL_S_MONITORING):
> > +                # Mark the last requested conditions as acked and if further
> > +                # condition changes were pending, send them now.
> > +                self.ack_conditions()
> > +                self.send_cond_change()
> > +                self.cond_seqno += 1
> >              else:
> >                  # This can happen if a transaction is destroyed before we
> >                  # receive the reply, so keep the log level low.
> > @@ -427,14 +562,36 @@ class Idl(object):
> >
> >          return initial_change_seqno != self.change_seqno
> >
> > -    def send_cond_change(self):
> > -        if not self._session.is_connected():
> > +    def compose_cond_change(self):
> > +        if not self.cond_changed:
> >              return
> >
> > +        change_requests = {}
> >          for table in self.tables.values():
> > -            if table.cond_changed:
> > -                self.__send_cond_change(table, table.condition)
> > -                table.cond_changed = False
> > +            # Always use the most recent conditions set by the IDL client when
> > +            # requesting monitor_cond_change
> > +            if table.condition.new is not None:
> > +                change_requests[table.name] = [
> > +                    {"where": table.condition.new}]
> > +                table.condition.request()
> > +
> > +        if not change_requests:
> > +            return
> > +
> > +        self.cond_changed = False
> > +        old_uuid = str(self.uuid)
> > +        self.uuid = uuid.uuid1()
> > +        params = [old_uuid, str(self.uuid), change_requests]
> > +        return ovs.jsonrpc.Message.create_request(
> > +            "monitor_cond_change", params)
> > +
> > +    def send_cond_change(self):
> > +        if not self._session.is_connected() or self._request_id is not None:
> > +            return
> > +
> > +        msg = self.compose_cond_change()
> > +        if msg:
> > +            self.send_request(msg)
> >
> >      def cond_change(self, table_name, cond):
> >          """Sets the condition for 'table_name' to 'cond', which should be a
> > @@ -450,13 +607,28 @@ class Idl(object):
> >
> >          if cond == []:
> >              cond = [False]
> > -        if table.condition != cond:
> > -            table.condition = cond
> > -            table.cond_changed = True
> > +
> > +        # Compare the new condition to the last known condition
> > +        if table.condition.latest != cond:
> > +            table.condition.init(cond)
> > +            self.cond_changed = True
> > +
> > +        # New condition will be sent out after all already requested ones
> > +        # are acked.
> > +        if table.condition.new:
> > +            any_reqs = any(t.condition.request for t in self.tables.values())
> > +            return self.cond_seqno + int(any_reqs) + 1
> > +
> > +        # Already requested conditions should be up to date at
> > +        # self.cond_seqno + 1 while acked conditions are already up to date
> > +        return self.cond_seqno + int(bool(table.condition.requested))
> >
> >      def wait(self, poller):
> >          """Arranges for poller.block() to wake up when self.run() has something
> >          to do or when activity occurs on a transaction on 'self'."""
> > +        if self.cond_changed:
> > +            poller.immediate_wake()
> > +            return
>
> Nit: In theory, self._session.wait()/recv_wait() should never do more
> than register to be awakened when the appropriate events occur, so
> skipping that should be fine (and seems to be with the current code).
> But what if that changes in the future?  Ilya, what do you think?

The C code does a poll_immediate_wake() if cond_changed in
ovsdb_cs_db_set_condition(), which corresponds to cond_change() here.
From what I could tell, the C poll code has a poller per thread, whereas
the pattern in the Python IDL is that we create a Poller() object and
pass it around. Since the Transaction code is the code creating the
Poller object and calling idl.wait(), this seemed to be the only way to
wake up. I put the return in because it seemed that if you are going to
wake immediately upon the next block() call, the other wakeups would be
redundant.

> In any case, this works for me today, with or without that "return" there:
>
> Acked-by: Dumitru Ceara <dceara@redhat.com>
>
> Thanks!
> Dumitru
>
Dumitru Ceara Dec. 3, 2021, 4:06 p.m. UTC | #3
On 12/3/21 16:00, Terry Wilson wrote:
> On Fri, Dec 3, 2021 at 6:06 AM Dumitru Ceara <dceara@redhat.com> wrote:
>>
>> On 12/1/21 18:51, Terry Wilson wrote:
>>> Add support for monitor_cond_since / update3 to python-ovs to
>>> allow more efficient reconnections when connecting to clustered
>>> OVSDB servers.
>>>
>>> Signed-off-by: Terry Wilson <twilson@redhat.com>
>>> ---
>>>  python/ovs/db/idl.py | 245 ++++++++++++++++++++++++++++++++++++-------
>>>  tests/ovsdb-idl.at   |   2 +-
>>>  2 files changed, 211 insertions(+), 36 deletions(-)
>>>
>>> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
>>> index 60e58b03e..0d5e00208 100644
>>> --- a/python/ovs/db/idl.py
>>> +++ b/python/ovs/db/idl.py
>>> @@ -13,6 +13,7 @@
>>>  # limitations under the License.
>>>
>>>  import collections
>>> +import enum
>>>  import functools
>>>  import uuid
>>>
>>> @@ -36,6 +37,7 @@ ROW_DELETE = "delete"
>>>
>>>  OVSDB_UPDATE = 0
>>>  OVSDB_UPDATE2 = 1
>>> +OVSDB_UPDATE3 = 2
>>>
>>>  CLUSTERED = "clustered"
>>>  RELAY = "relay"
>>> @@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
>>>          return item in self.keys()
>>>
>>>
>>> +class Monitor(enum.IntEnum):
>>> +    monitor = OVSDB_UPDATE
>>> +    monitor_cond = OVSDB_UPDATE2
>>> +    monitor_cond_since = OVSDB_UPDATE3
>>> +
>>> +
>>> +class ConditionState(object):
>>> +    def __init__(self):
>>> +        self._ack_cond = None
>>> +        self._req_cond = None
>>> +        self._new_cond = [True]
>>> +
>>> +    def __iter__(self):
>>> +        return iter([self._new_cond, self._req_cond, self._ack_cond])
>>> +
>>> +    @property
>>> +    def new(self):
>>> +        """The latest freshly initialized condition change"""
>>> +        return self._new_cond
>>> +
>>> +    @property
>>> +    def acked(self):
>>> +        """The last condition change that has been accepted by the server"""
>>> +        return self._ack_cond
>>> +
>>> +    @property
>>> +    def requested(self):
>>> +        """A condition that's been requested, but not acked by the server"""
>>> +        return self._req_cond
>>> +
>>> +    @property
>>> +    def latest(self):
>>> +        """The most recent condition change"""
>>> +        return next(cond for cond in self if cond is not None)
>>> +
>>> +    @staticmethod
>>> +    def is_true(condition):
>>> +        return condition == [True]
>>> +
>>> +    def init(self, cond):
>>> +        """Signal that a condition change is being initiated"""
>>> +        self._new_cond = cond
>>> +
>>> +    def ack(self):
>>> +        """Signal that a condition change has been acked"""
>>> +        if self._req_cond is not None:
>>> +            self._ack_cond, self._req_cond = (self._req_cond, None)
>>> +
>>> +    def request(self):
>>> +        """Signal that a condition change has been requested"""
>>> +        if self._new_cond is not None:
>>> +            self._req_cond, self._new_cond = (self._new_cond, None)
>>> +
>>> +    def reset(self):
>>> +        """Reset a requested condition change back to new"""
>>> +        if self._req_cond is not None and self._new_cond is None:
>>> +            self._new_cond, self._req_cond = (self._req_cond, None)
>>> +
>>> +
>>>  class Idl(object):
>>>      """Open vSwitch Database Interface Definition Language (OVSDB IDL).
>>>
>>> @@ -132,7 +193,13 @@ class Idl(object):
>>>      IDL_S_SERVER_MONITOR_REQUESTED = 2
>>>      IDL_S_DATA_MONITOR_REQUESTED = 3
>>>      IDL_S_DATA_MONITOR_COND_REQUESTED = 4
>>> -    IDL_S_MONITORING = 5
>>> +    IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
>>> +    IDL_S_MONITORING = 6
>>> +
>>> +    monitor_map = {
>>> +        Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
>>> +        Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
>>> +        Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
>>>
>>>      def __init__(self, remote, schema_helper, probe_interval=None,
>>>                   leader_only=True):
>>> @@ -176,10 +243,12 @@ class Idl(object):
>>>          remotes = self._parse_remotes(remote)
>>>          self._session = ovs.jsonrpc.Session.open_multiple(remotes,
>>>              probe_interval=probe_interval)
>>> +        self._request_id = None
>>>          self._monitor_request_id = None
>>>          self._last_seqno = None
>>>          self.change_seqno = 0
>>>          self.uuid = uuid.uuid1()
>>> +        self.last_id = str(uuid.UUID(int=0))
>>>
>>>          # Server monitor.
>>>          self._server_schema_request_id = None
>>> @@ -206,6 +275,9 @@ class Idl(object):
>>>          self.txn = None
>>>          self._outstanding_txns = {}
>>>
>>> +        self.cond_changed = False
>>> +        self.cond_seqno = 0
>>> +
>>>          for table in schema.tables.values():
>>>              for column in table.columns.values():
>>>                  if not hasattr(column, 'alert'):
>>> @@ -213,8 +285,7 @@ class Idl(object):
>>>              table.need_table = False
>>>              table.rows = custom_index.IndexedRows(table)
>>>              table.idl = self
>>> -            table.condition = [True]
>>> -            table.cond_changed = False
>>> +            table.condition = ConditionState()
>>>
>>>      def _parse_remotes(self, remote):
>>>          # If remote is -
>>> @@ -252,6 +323,38 @@ class Idl(object):
>>>          update."""
>>>          self._session.close()
>>>
>>> +    def ack_conditions(self):
>>> +        """Mark all requested table conditions as acked"""
>>> +        for table in self.tables.values():
>>> +            table.condition.ack()
>>> +
>>> +    def sync_conditions(self):
>>> +        """Synchronize condition state when the FSM is restarted
>>> +
>>> +        If a non-zero last_id is available for the DB, then upon reconnect
>>> +        the IDL should first request acked conditions to avoid missing updates
>>> +        about records that were added before the transaction with
>>> +        txn-id == last_id. If there were requested condition changes in flight
>>> +        and the IDL client didn't set new conditions, then reset the requested
>>> +        conditions to new to trigger a follow-up monitor_cond_change request
>>> +        """
>>> +        ack_all = self.last_id == str(uuid.UUID(int=0))
>>> +        for table in self.tables.values():
>>> +            if ack_all:
>>> +                table.condition.request()
>>> +                table.condition.ack()
>>> +            else:
>>> +                table.condition.reset()
>>> +                self.cond_changed = True
>>> +
>>> +    def restart_fsm(self):
>>> +        # Resync data DB table conditions to avoid missing updated due to
>>> +        # conditions that were in flight or changed locally while the
>>> +        # connection was down.
>>> +        self.sync_conditions()
>>> +        self.__send_server_schema_request()
>>> +        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
>>> +
>>>      def run(self):
>>>          """Processes a batch of messages from the database server.  Returns
>>>          True if the database as seen through the IDL changed, False if it did
>>> @@ -286,7 +389,7 @@ class Idl(object):
>>>              if seqno != self._last_seqno:
>>>                  self._last_seqno = seqno
>>>                  self.__txn_abort_all()
>>> -                self.__send_server_schema_request()
>>> +                self.restart_fsm()
>>>                  if self.lock_name:
>>>                      self.__send_lock_request()
>>>                  break
>>> @@ -294,8 +397,20 @@ class Idl(object):
>>>              msg = self._session.recv()
>>>              if msg is None:
>>>                  break
>>> +            is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
>>> +                                       ovs.jsonrpc.Message.T_ERROR)
>>> +
>>> +            if is_response and self._request_id and self._request_id == msg.id:
>>> +                self._request_id = None
>>> +                # process_response follows
>>>
>>>              if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
>>> +                    and msg.method == "update3"
>>> +                    and len(msg.params) == 3):
>>> +                # Database contents changed.
>>> +                self.__parse_update(msg.params[2], OVSDB_UPDATE3)
>>> +                self.last_id = msg.params[1]
>>> +            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
>>>                      and msg.method == "update2"
>>>                      and len(msg.params) == 2):
>>>                  # Database contents changed.
>>> @@ -320,11 +435,18 @@ class Idl(object):
>>>                  try:
>>>                      self.change_seqno += 1
>>>                      self._monitor_request_id = None
>>> -                    self.__clear()
>>> -                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
>>> +                    if (self.state ==
>>> +                            self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
>>> +                        # If 'found' is false, clear table rows for new dump
>>> +                        if not msg.result[0]:
>>> +                            self.__clear()
>>> +                        self.__parse_update(msg.result[2], OVSDB_UPDATE3)
>>> +                    elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
>>> +                        self.__clear()
>>>                          self.__parse_update(msg.result, OVSDB_UPDATE2)
>>>                      else:
>>>                          assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
>>> +                        self.__clear()
>>>                          self.__parse_update(msg.result, OVSDB_UPDATE)
>>>                      self.state = self.IDL_S_MONITORING
>>>
>>> @@ -398,11 +520,17 @@ class Idl(object):
>>>              elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
>>>                  # Reply to our echo request.  Ignore it.
>>>                  pass
>>> +            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
>>> +                  self.state == (
>>> +                      self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
>>> +                      self._monitor_request_id == msg.id):
>>> +                if msg.error == "unknown method":
>>> +                    self.__send_monitor_request(Monitor.monitor_cond)
>>>              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
>>>                    self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
>>>                    self._monitor_request_id == msg.id):
>>>                  if msg.error == "unknown method":
>>> -                    self.__send_monitor_request()
>>> +                    self.__send_monitor_request(Monitor.monitor)
>>>              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
>>>                    self._server_schema_request_id is not None and
>>>                    self._server_schema_request_id == msg.id):
>>> @@ -418,6 +546,13 @@ class Idl(object):
>>>                    and self.__txn_process_reply(msg)):
>>>                  # __txn_process_reply() did everything needed.
>>>                  pass
>>> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
>>> +                  self.state == self.IDL_S_MONITORING):
>>> +                # Mark the last requested conditions as acked and if further
>>> +                # condition changes were pending, send them now.
>>> +                self.ack_conditions()
>>> +                self.send_cond_change()
>>> +                self.cond_seqno += 1
>>>              else:
>>>                  # This can happen if a transaction is destroyed before we
>>>                  # receive the reply, so keep the log level low.
>>> @@ -427,14 +562,36 @@ class Idl(object):
>>>
>>>          return initial_change_seqno != self.change_seqno
>>>
>>> -    def send_cond_change(self):
>>> -        if not self._session.is_connected():
>>> +    def compose_cond_change(self):
>>> +        if not self.cond_changed:
>>>              return
>>>
>>> +        change_requests = {}
>>>          for table in self.tables.values():
>>> -            if table.cond_changed:
>>> -                self.__send_cond_change(table, table.condition)
>>> -                table.cond_changed = False
>>> +            # Always use the most recent conditions set by the IDL client when
>>> +            # requesting monitor_cond_change
>>> +            if table.condition.new is not None:
>>> +                change_requests[table.name] = [
>>> +                    {"where": table.condition.new}]
>>> +                table.condition.request()
>>> +
>>> +        if not change_requests:
>>> +            return
>>> +
>>> +        self.cond_changed = False
>>> +        old_uuid = str(self.uuid)
>>> +        self.uuid = uuid.uuid1()
>>> +        params = [old_uuid, str(self.uuid), change_requests]
>>> +        return ovs.jsonrpc.Message.create_request(
>>> +            "monitor_cond_change", params)
>>> +
>>> +    def send_cond_change(self):
>>> +        if not self._session.is_connected() or self._request_id is not None:
>>> +            return
>>> +
>>> +        msg = self.compose_cond_change()
>>> +        if msg:
>>> +            self.send_request(msg)
>>>
>>>      def cond_change(self, table_name, cond):
>>>          """Sets the condition for 'table_name' to 'cond', which should be a
>>> @@ -450,13 +607,28 @@ class Idl(object):
>>>
>>>          if cond == []:
>>>              cond = [False]
>>> -        if table.condition != cond:
>>> -            table.condition = cond
>>> -            table.cond_changed = True
>>> +
>>> +        # Compare the new condition to the last known condition
>>> +        if table.condition.latest != cond:
>>> +            table.condition.init(cond)
>>> +            self.cond_changed = True
>>> +
>>> +        # New condition will be sent out after all already requested ones
>>> +        # are acked.
>>> +        if table.condition.new:
>>> +            any_reqs = any(t.condition.request for t in self.tables.values())
>>> +            return self.cond_seqno + int(any_reqs) + 1
>>> +
>>> +        # Already requested conditions should be up to date at
>>> +        # self.cond_seqno + 1 while acked conditions are already up to date
>>> +        return self.cond_seqno + int(bool(table.condition.requested))
>>>
>>>      def wait(self, poller):
>>>          """Arranges for poller.block() to wake up when self.run() has something
>>>          to do or when activity occurs on a transaction on 'self'."""
>>> +        if self.cond_changed:
>>> +            poller.immediate_wake()
>>> +            return
>>
>> Nit: In theory, self._session.wait()/recv_wait() should never do more
>> than register to be awakened when the appropriate events occur, so
>> skipping that should be fine (and seems to be with the current code).
>> But what if that changes in the future?  Ilya, what do you think?
> 
> The C code does a poll_immediate_wake() if cond_changed in
> ovsdb_cs_db_set_condition() which is cond_change() here. From what I
> could tell, the C poll code has a poller per thread, whereas the
> pattern in the Python IDL is that we create a Poller() object and pass
> it around. Since the Transaction code is the code creating the Poller
> object, and calling idl.wait(), this seemed to be the only way to wake
> up. I put the return in because it seemed that if you are immediately
> waking upon the next block() call, that the other wakeups would be
> redundant.
> 

Right, the other wakeups are currently redundant and have no side
effects.  I was wondering if that would change in the future, but that's
probably unlikely.

>> In any case, this works for me today, with or without that "return" there:
>>
>> Acked-by: Dumitru Ceara <dceara@redhat.com>
>>
>> Thanks!
>> Dumitru
>>
>
Ilya Maximets Jan. 6, 2022, 4 p.m. UTC | #4
On 12/3/21 17:06, Dumitru Ceara wrote:
> On 12/3/21 16:00, Terry Wilson wrote:
>> On Fri, Dec 3, 2021 at 6:06 AM Dumitru Ceara <dceara@redhat.com> wrote:
>>>
>>> On 12/1/21 18:51, Terry Wilson wrote:
>>>> Add support for monitor_cond_since / update3 to python-ovs to
>>>> allow more efficient reconnections when connecting to clustered
>>>> OVSDB servers.
>>>>
>>>> Signed-off-by: Terry Wilson <twilson@redhat.com>
>>>> ---
>>>>  python/ovs/db/idl.py | 245 ++++++++++++++++++++++++++++++++++++-------
>>>>  tests/ovsdb-idl.at   |   2 +-
>>>>  2 files changed, 211 insertions(+), 36 deletions(-)
>>>>
>>>
>>> Acked-by: Dumitru Ceara <dceara@redhat.com>

Thanks, Terry and Dumitru!

I added a NEWS entry for that change and applied.

I suppose the next step here should be to use 'monitor_cond' for the monitor
request to the _Server database.  Currently it uses a simple 'monitor', which
results in relatively bulky updates every time the database "index" changes,
and that happens on every update of the main database.  The C version uses
'monitor_cond' with a trivial condition in order to receive small 'update2'
notifications.

One problem I spotted that existed before this patch (so not a blocker)
is that python-idl doesn't re-connect if it receives an inconsistent update
or a broken database schema.  __parse_update() seems to trap all the
parsing exceptions and only logs them.  In case of schema or initial
monitor reply parsing failures there is an extra check to force
re-connection, but it's in the caller of __parse_update(), hence it
can't actually get the exception.  This probably needs to be fixed.
The C version of the code uses ovsdb_cs_retry()/ovsdb_cs_flag_inconsistency()
functions in similar cases.

Best regards, Ilya Maximets.
diff mbox series

Patch

diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 60e58b03e..0d5e00208 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -13,6 +13,7 @@ 
 # limitations under the License.
 
 import collections
+import enum
 import functools
 import uuid
 
@@ -36,6 +37,7 @@  ROW_DELETE = "delete"
 
 OVSDB_UPDATE = 0
 OVSDB_UPDATE2 = 1
+OVSDB_UPDATE3 = 2
 
 CLUSTERED = "clustered"
 RELAY = "relay"
@@ -75,6 +77,65 @@  class ColumnDefaultDict(dict):
         return item in self.keys()
 
 
+class Monitor(enum.IntEnum):
+    monitor = OVSDB_UPDATE
+    monitor_cond = OVSDB_UPDATE2
+    monitor_cond_since = OVSDB_UPDATE3
+
+
+class ConditionState(object):
+    def __init__(self):
+        self._ack_cond = None
+        self._req_cond = None
+        self._new_cond = [True]
+
+    def __iter__(self):
+        return iter([self._new_cond, self._req_cond, self._ack_cond])
+
+    @property
+    def new(self):
+        """The latest freshly initialized condition change"""
+        return self._new_cond
+
+    @property
+    def acked(self):
+        """The last condition change that has been accepted by the server"""
+        return self._ack_cond
+
+    @property
+    def requested(self):
+        """A condition that's been requested, but not acked by the server"""
+        return self._req_cond
+
+    @property
+    def latest(self):
+        """The most recent condition change"""
+        return next(cond for cond in self if cond is not None)
+
+    @staticmethod
+    def is_true(condition):
+        return condition == [True]
+
+    def init(self, cond):
+        """Signal that a condition change is being initiated"""
+        self._new_cond = cond
+
+    def ack(self):
+        """Signal that a condition change has been acked"""
+        if self._req_cond is not None:
+            self._ack_cond, self._req_cond = (self._req_cond, None)
+
+    def request(self):
+        """Signal that a condition change has been requested"""
+        if self._new_cond is not None:
+            self._req_cond, self._new_cond = (self._new_cond, None)
+
+    def reset(self):
+        """Reset a requested condition change back to new"""
+        if self._req_cond is not None and self._new_cond is None:
+            self._new_cond, self._req_cond = (self._req_cond, None)
+
+
 class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
 
@@ -132,7 +193,13 @@  class Idl(object):
     IDL_S_SERVER_MONITOR_REQUESTED = 2
     IDL_S_DATA_MONITOR_REQUESTED = 3
     IDL_S_DATA_MONITOR_COND_REQUESTED = 4
-    IDL_S_MONITORING = 5
+    IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
+    IDL_S_MONITORING = 6
+
+    monitor_map = {
+        Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
+        Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
+        Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
 
     def __init__(self, remote, schema_helper, probe_interval=None,
                  leader_only=True):
@@ -176,10 +243,12 @@  class Idl(object):
         remotes = self._parse_remotes(remote)
         self._session = ovs.jsonrpc.Session.open_multiple(remotes,
             probe_interval=probe_interval)
+        self._request_id = None
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
         self.uuid = uuid.uuid1()
+        self.last_id = str(uuid.UUID(int=0))
 
         # Server monitor.
         self._server_schema_request_id = None
@@ -206,6 +275,9 @@  class Idl(object):
         self.txn = None
         self._outstanding_txns = {}
 
+        self.cond_changed = False
+        self.cond_seqno = 0
+
         for table in schema.tables.values():
             for column in table.columns.values():
                 if not hasattr(column, 'alert'):
@@ -213,8 +285,7 @@  class Idl(object):
             table.need_table = False
             table.rows = custom_index.IndexedRows(table)
             table.idl = self
-            table.condition = [True]
-            table.cond_changed = False
+            table.condition = ConditionState()
 
     def _parse_remotes(self, remote):
         # If remote is -
@@ -252,6 +323,38 @@  class Idl(object):
         update."""
         self._session.close()
 
+    def ack_conditions(self):
+        """Mark all requested table conditions as acked"""
+        for table in self.tables.values():
+            table.condition.ack()
+
+    def sync_conditions(self):
+        """Synchronize condition state when the FSM is restarted
+
+        If a non-zero last_id is available for the DB, then upon reconnect
+        the IDL should first request acked conditions to avoid missing updates
+        about records that were added before the transaction with
+        txn-id == last_id. If there were requested condition changes in flight
+        and the IDL client didn't set new conditions, then reset the requested
+        conditions to new to trigger a follow-up monitor_cond_change request
+        """
+        ack_all = self.last_id == str(uuid.UUID(int=0))
+        for table in self.tables.values():
+            if ack_all:
+                table.condition.request()
+                table.condition.ack()
+            else:
+                table.condition.reset()
+                self.cond_changed = True
+
+    def restart_fsm(self):
+        # Resync data DB table conditions to avoid missing updated due to
+        # conditions that were in flight or changed locally while the
+        # connection was down.
+        self.sync_conditions()
+        self.__send_server_schema_request()
+        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+
     def run(self):
         """Processes a batch of messages from the database server.  Returns
         True if the database as seen through the IDL changed, False if it did
@@ -286,7 +389,7 @@  class Idl(object):
             if seqno != self._last_seqno:
                 self._last_seqno = seqno
                 self.__txn_abort_all()
-                self.__send_server_schema_request()
+                self.restart_fsm()
                 if self.lock_name:
                     self.__send_lock_request()
                 break
@@ -294,8 +397,20 @@  class Idl(object):
             msg = self._session.recv()
             if msg is None:
                 break
+            is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
+                                       ovs.jsonrpc.Message.T_ERROR)
+
+            if is_response and self._request_id and self._request_id == msg.id:
+                self._request_id = None
+                # process_response follows
 
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                    and msg.method == "update3"
+                    and len(msg.params) == 3):
+                # Database contents changed.
+                self.__parse_update(msg.params[2], OVSDB_UPDATE3)
+                self.last_id = msg.params[1]
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                     and msg.method == "update2"
                     and len(msg.params) == 2):
                 # Database contents changed.
@@ -320,11 +435,18 @@  class Idl(object):
                 try:
                     self.change_seqno += 1
                     self._monitor_request_id = None
-                    self.__clear()
-                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+                    if (self.state ==
+                            self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
+                        # If 'found' is false, clear table rows for new dump
+                        if not msg.result[0]:
+                            self.__clear()
+                        self.__parse_update(msg.result[2], OVSDB_UPDATE3)
+                    elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+                        self.__clear()
                         self.__parse_update(msg.result, OVSDB_UPDATE2)
                     else:
                         assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
+                        self.__clear()
                         self.__parse_update(msg.result, OVSDB_UPDATE)
                     self.state = self.IDL_S_MONITORING
 
@@ -398,11 +520,17 @@  class Idl(object):
             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                 # Reply to our echo request.  Ignore it.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self.state == (
+                      self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
+                      self._monitor_request_id == msg.id):
+                if msg.error == "unknown method":
+                    self.__send_monitor_request(Monitor.monitor_cond)
             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                   self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                   self._monitor_request_id == msg.id):
                 if msg.error == "unknown method":
-                    self.__send_monitor_request()
+                    self.__send_monitor_request(Monitor.monitor)
             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                   self._server_schema_request_id is not None and
                   self._server_schema_request_id == msg.id):
@@ -418,6 +546,13 @@  class Idl(object):
                   and self.__txn_process_reply(msg)):
                 # __txn_process_reply() did everything needed.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
+                  self.state == self.IDL_S_MONITORING):
+                # Mark the last requested conditions as acked and if further
+                # condition changes were pending, send them now.
+                self.ack_conditions()
+                self.send_cond_change()
+                self.cond_seqno += 1
             else:
                 # This can happen if a transaction is destroyed before we
                 # receive the reply, so keep the log level low.
@@ -427,14 +562,36 @@  class Idl(object):
 
         return initial_change_seqno != self.change_seqno
 
-    def send_cond_change(self):
-        if not self._session.is_connected():
+    def compose_cond_change(self):
+        if not self.cond_changed:
             return
 
+        change_requests = {}
         for table in self.tables.values():
-            if table.cond_changed:
-                self.__send_cond_change(table, table.condition)
-                table.cond_changed = False
+            # Always use the most recent conditions set by the IDL client when
+            # requesting monitor_cond_change
+            if table.condition.new is not None:
+                change_requests[table.name] = [
+                    {"where": table.condition.new}]
+                table.condition.request()
+
+        if not change_requests:
+            return
+
+        self.cond_changed = False
+        old_uuid = str(self.uuid)
+        self.uuid = uuid.uuid1()
+        params = [old_uuid, str(self.uuid), change_requests]
+        return ovs.jsonrpc.Message.create_request(
+            "monitor_cond_change", params)
+
+    def send_cond_change(self):
+        if not self._session.is_connected() or self._request_id is not None:
+            return
+
+        msg = self.compose_cond_change()
+        if msg:
+            self.send_request(msg)
 
     def cond_change(self, table_name, cond):
         """Sets the condition for 'table_name' to 'cond', which should be a
@@ -450,13 +607,28 @@  class Idl(object):
 
         if cond == []:
             cond = [False]
-        if table.condition != cond:
-            table.condition = cond
-            table.cond_changed = True
+
+        # Compare the new condition to the last known condition
+        if table.condition.latest != cond:
+            table.condition.init(cond)
+            self.cond_changed = True
+
+        # New condition will be sent out after all already requested ones
+        # are acked.
+        if table.condition.new:
+            any_reqs = any(t.condition.request for t in self.tables.values())
+            return self.cond_seqno + int(any_reqs) + 1
+
+        # Already requested conditions should be up to date at
+        # self.cond_seqno + 1 while acked conditions are already up to date
+        return self.cond_seqno + int(bool(table.condition.requested))
 
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
+        if self.cond_changed:
+            poller.immediate_wake()
+            return
         self._session.wait(poller)
         self._session.recv_wait(poller)
 
@@ -531,14 +703,6 @@  class Idl(object):
         to doing nothing to avoid overhead where it is not needed.
         """
 
-    def __send_cond_change(self, table, cond):
-        monitor_cond_change = {table.name: [{"where": cond}]}
-        old_uuid = str(self.uuid)
-        self.uuid = uuid.uuid1()
-        params = [old_uuid, str(self.uuid), monitor_cond_change]
-        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
-        self._session.send(msg)
-
     def __clear(self):
         changed = False
 
@@ -547,6 +711,8 @@  class Idl(object):
                 changed = True
                 table.rows = custom_index.IndexedRows(table)
 
+        self.cond_seqno = 0
+
         if changed:
             self.change_seqno += 1
 
@@ -601,11 +767,18 @@  class Idl(object):
         self._db_change_aware_request_id = msg.id
         self._session.send(msg)
 
-    def __send_monitor_request(self):
-        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
-                           self.IDL_S_INITIAL]):
+    def send_request(self, request):
+        self._request_id = request.id
+        if self._session.is_connected():
+            return self._session.send(request)
+
+    def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
+        if self.state == self.IDL_S_INITIAL:
             self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
             method = "monitor_cond"
+        elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
+            self.state = self.monitor_map[Monitor(max_version)]
+            method = Monitor(max_version).name
         else:
             self.state = self.IDL_S_DATA_MONITOR_REQUESTED
             method = "monitor"
@@ -619,22 +792,24 @@  class Idl(object):
                         (column not in self.readonly[table.name])):
                     columns.append(column)
             monitor_request = {"columns": columns}
-            if method == "monitor_cond" and table.condition != [True]:
-                monitor_request["where"] = table.condition
-                table.cond_change = False
+            if method in ("monitor_cond", "monitor_cond_since") and (
+                    not ConditionState.is_true(table.condition.acked)):
+                monitor_request["where"] = table.condition.acked
             monitor_requests[table.name] = [monitor_request]
 
-        msg = ovs.jsonrpc.Message.create_request(
-            method, [self._db.name, str(self.uuid), monitor_requests])
+        args = [self._db.name, str(self.uuid), monitor_requests]
+        if method == "monitor_cond_since":
+            args.append(str(self.last_id))
+        msg = ovs.jsonrpc.Message.create_request(method, args)
         self._monitor_request_id = msg.id
-        self._session.send(msg)
+        self.send_request(msg)
 
     def __send_server_schema_request(self):
         self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
         msg = ovs.jsonrpc.Message.create_request(
             "get_schema", [self._server_db_name, str(self.uuid)])
         self._server_schema_request_id = msg.id
-        self._session.send(msg)
+        self.send_request(msg)
 
     def __send_server_monitor_request(self):
         self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
@@ -654,7 +829,7 @@  class Idl(object):
                              str(self.server_monitor_uuid),
                              monitor_requests])
         self._server_monitor_request_id = msg.id
-        self._session.send(msg)
+        self.send_request(msg)
 
     def __parse_update(self, update, version, tables=None):
         try:
@@ -698,7 +873,7 @@  class Idl(object):
 
                 self.cooperative_yield()
 
-                if version == OVSDB_UPDATE2:
+                if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
                     changes = self.__process_update2(table, uuid, row_update)
                     if changes:
                         notices.append(changes)
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index 3adb9d638..86a75f920 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -2319,7 +2319,7 @@  m4_define([OVSDB_CHECK_CLUSTER_IDL],
 
 # Checks that monitor_cond_since works fine when disconnects happen
 # with cond_change requests in flight (i.e., IDL is properly updated).
-OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect],
+OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
   3,
   [['["idltest",
        {"op": "insert",