diff mbox series

[ovs-dev,v3] Monitor Database table to manage lifecycle of IDL client.

Message ID 1547688619-150042-1-git-send-email-ted.elhourani@nutanix.com
State Superseded
Headers show
Series [ovs-dev,v3] Monitor Database table to manage lifecycle of IDL client. | expand

Commit Message

Ted Elhourani Jan. 17, 2019, 1:30 a.m. UTC
The Python IDL implementation supports ovsdb cluster connections.
This patch is a follow up to commit 31e434fc98, it adds the option of
connecting to the leader (the default) in the Raft-based cluster. It mimics
the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.

The _Server database schema is first requested, then a monitor of the
Database table in the _Server Database. Method __check_server_db verifies
the eligibility of the server. If the attempt to obtain a monitor of the
_Server database fails and a cluster id was not provided this implementation
proceeds to request the data monitor. If a cluster id was provided via the
set_cluster_id method then the connection is aborted and a connection to a
different node is instead attempted, until a valid cluster node is found.
Thus, when supplied, cluster id is interpreted as the intention to only
allow connections to a clustered database. If not supplied, connections to
standalone nodes, or nodes that do not have the _Server database are
allowed. change_seqno is not incremented in the case of Database table
updates.

Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
---

v2 -> v3
--------
* Add 2 tests, treat cluster_id as a string, mv arg till end, pep8 fixes.

v1 -> v2
--------
* Modify for backward compatibility with _Server-less ovsdb servers.

 python/ovs/db/idl.py    | 226 ++++++++++++++++++++++++++++++++++++++++++++----
 python/ovs/reconnect.py |   3 +
 tests/ovsdb-idl.at      | 138 ++++++++++++++++++++++-------
 tests/test-ovsdb.py     |  15 +++-
 4 files changed, 335 insertions(+), 47 deletions(-)

Comments

Ilya Maximets Jan. 17, 2019, 11:10 a.m. UTC | #1
Hi.
Not a full review. Just few comments inline.

Best regards, Ilya Maximets.

On 17.01.2019 4:30, Ted Elhourani wrote:
> The Python IDL implementation supports ovsdb cluster connections.
> This patch is a follow up to commit 31e434fc98, it adds the option of
> connecting to the leader (the default) in the Raft-based cluster. It mimics
> the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.
> 
> The _Server database schema is first requested, then a monitor of the
> Database table in the _Server Database. Method __check_server_db verifies
> the eligibility of the server. If the attempt to obtain a monitor of the
> _Server database fails and a cluster id was not provided this implementation
> proceeds to request the data monitor. If a cluster id was provided via the
> set_cluster_id method then the connection is aborted and a connection to a
> different node is instead attempted, until a valid cluster node is found.
> Thus, when supplied, cluster id is interpreted as the intention to only
> allow connections to a clustered database. If not supplied, connections to
> standalone nodes, or nodes that do not have the _Server database are
> allowed. change_seqno is not incremented in the case of Database table
> updates.
> 
> Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
> ---
> 
> v2 -> v3
> --------
> * Add 2 tests, treat cluster_id as a string, mv arg till end, pep8 fixes.
> 
> v1 -> v2
> --------
> * Modify for backward compatibility with _Server-less ovsdb servers.
> 
>  python/ovs/db/idl.py    | 226 ++++++++++++++++++++++++++++++++++++++++++++----
>  python/ovs/reconnect.py |   3 +
>  tests/ovsdb-idl.at      | 138 ++++++++++++++++++++++-------
>  tests/test-ovsdb.py     |  15 +++-
>  4 files changed, 335 insertions(+), 47 deletions(-)
> 
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 250e897..2b2d7c5 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -38,6 +38,8 @@ ROW_DELETE = "delete"
>  OVSDB_UPDATE = 0
>  OVSDB_UPDATE2 = 1
>  
> +CLUSTERED = "clustered"
> +
>  
>  class Idl(object):
>      """Open vSwitch Database Interface Definition Language (OVSDB IDL).
> @@ -92,10 +94,13 @@ class Idl(object):
>  """
>  
>      IDL_S_INITIAL = 0
> -    IDL_S_MONITOR_REQUESTED = 1
> -    IDL_S_MONITOR_COND_REQUESTED = 2
> +    IDL_S_SERVER_SCHEMA_REQUESTED = 1
> +    IDL_S_SERVER_MONITOR_REQUESTED = 2
> +    IDL_S_DATA_MONITOR_REQUESTED = 3
> +    IDL_S_DATA_MONITOR_COND_REQUESTED = 4
>  
> -    def __init__(self, remote, schema_helper, probe_interval=None):
> +    def __init__(self, remote, schema_helper, probe_interval=None,
> +                 leader_only=True):
>          """Creates and returns a connection to the database named 'db_name' on
>          'remote', which should be in a form acceptable to
>          ovs.jsonrpc.session.open().  The connection will maintain an in-memory
> @@ -119,6 +124,9 @@ class Idl(object):
>  
>          The IDL uses and modifies 'schema' directly.
>  
> +        If 'leader_only' is set to True (default value) the IDL will only
> +        monitor and transact with the leader of the cluster.
> +
>          If "probe_interval" is zero it disables the connection keepalive
>          feature. If non-zero the value will be forced to at least 1000
>          milliseconds. If None it will just use the default value in OVS.
> @@ -137,6 +145,20 @@ class Idl(object):
>          self._last_seqno = None
>          self.change_seqno = 0
>          self.uuid = uuid.uuid1()
> +
> +        # Server monitor.
> +        self._server_schema_request_id = None
> +        self._server_monitor_request_id = None
> +        self._db_change_aware_request_id = None
> +        self._server_db_name = '_Server'
> +        self._server_db_table = 'Database'
> +        self.server_tables = None
> +        self._server_db = None
> +        self.server_monitor_uuid = uuid.uuid1()
> +        self.leader_only = leader_only
> +        self.cluster_id = None
> +        self._min_index = 0
> +
>          self.state = self.IDL_S_INITIAL
>  
>          # Database locking.
> @@ -172,6 +194,12 @@ class Idl(object):
>                  remotes.append(r)
>          return remotes
>  
> +    def set_cluster_id(self, cluster_id):
> +        """Set the id of the cluster that this idl must connect to."""
> +        self.cluster_id = cluster_id
> +        if self.state != self.IDL_S_INITIAL:
> +            self.force_reconnect()
> +
>      def index_create(self, table, name):
>          """Create a named multi-column index on a table"""
>          return self.tables[table].rows.index_create(name)
> @@ -222,7 +250,7 @@ class Idl(object):
>              if seqno != self._last_seqno:
>                  self._last_seqno = seqno
>                  self.__txn_abort_all()
> -                self.__send_monitor_request()
> +                self.__send_server_schema_request()
>                  if self.lock_name:
>                      self.__send_lock_request()
>                  break
> @@ -230,6 +258,7 @@ class Idl(object):
>              msg = self._session.recv()
>              if msg is None:
>                  break
> +
>              if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
>                      and msg.method == "update2"
>                      and len(msg.params) == 2):
> @@ -239,7 +268,15 @@ class Idl(object):
>                      and msg.method == "update"
>                      and len(msg.params) == 2):
>                  # Database contents changed.
> -                self.__parse_update(msg.params[1], OVSDB_UPDATE)
> +                if msg.params[0] == str(self.server_monitor_uuid):
> +                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
> +                                        tables=self.server_tables)
> +                    self.change_seqno = initial_change_seqno
> +                    if not self.__check_server_db():
> +                        self.force_reconnect()
> +                        break
> +                else:
> +                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
>              elif (msg.type == ovs.jsonrpc.Message.T_REPLY
>                    and self._monitor_request_id is not None
>                    and self._monitor_request_id == msg.id):
> @@ -248,17 +285,66 @@ class Idl(object):
>                      self.change_seqno += 1
>                      self._monitor_request_id = None
>                      self.__clear()
> -                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
> +                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
>                          self.__parse_update(msg.result, OVSDB_UPDATE2)
>                      else:
> -                        assert self.state == self.IDL_S_MONITOR_REQUESTED
> +                        assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
>                          self.__parse_update(msg.result, OVSDB_UPDATE)
> -
>                  except error.Error as e:
>                      vlog.err("%s: parse error in received schema: %s"
>                               % (self._session.get_name(), e))
>                      self.__error()
>              elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> +                  and self._server_schema_request_id is not None
> +                  and self._server_schema_request_id == msg.id):
> +                # Reply to our "get_schema" of _Server request.
> +                try:
> +                    self._server_schema_request_id = None
> +                    sh = SchemaHelper(None, msg.result)
> +                    sh.register_table(self._server_db_table)
> +                    schema = sh.get_idl_schema()
> +                    self._server_db = schema
> +                    self.server_tables = schema.tables
> +                    self.__send_server_monitor_request()
> +                except error.Error as e:
> +                    vlog.err("%s: error receiving server schema: %s"
> +                             % (self._session.get_name(), e))
> +                    if self.cluster_id:
> +                        self.__error()
> +                        break
> +                    else:
> +                        self.change_seqno = initial_change_seqno
> +                        self.__send_monitor_request()
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> +                  and self._server_monitor_request_id is not None
> +                  and self._server_monitor_request_id == msg.id):
> +                # Reply to our "monitor" of _Server request.
> +                try:
> +                    self._server_monitor_request_id = None
> +                    self.__parse_update(msg.result, OVSDB_UPDATE,
> +                                        tables=self.server_tables)
> +                    self.change_seqno = initial_change_seqno
> +                    if self.__check_server_db():
> +                        self.__send_monitor_request()
> +                        self.__send_db_change_aware()
> +                    else:
> +                        self.force_reconnect()
> +                        break
> +                except error.Error as e:
> +                    vlog.err("%s: parse error in received schema: %s"
> +                             % (self._session.get_name(), e))
> +                    if self.cluster_id:
> +                        self.__error()
> +                        break
> +                    else:
> +                        self.change_seqno = initial_change_seqno
> +                        self.__send_monitor_request()
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> +                  and self._db_change_aware_request_id is not None
> +                  and self._db_change_aware_request_id == msg.id):
> +                # Reply to us notifying the server of our change awarness.
> +                self._db_change_aware_request_id = None
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
>                    and self._lock_request_id is not None
>                    and self._lock_request_id == msg.id):
>                  # Reply to our "lock" request.
> @@ -275,10 +361,20 @@ class Idl(object):
>                  # Reply to our echo request.  Ignore it.
>                  pass
>              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> -                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
> +                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
>                    self._monitor_request_id == msg.id):
>                  if msg.error == "unknown method":
>                      self.__send_monitor_request()
> +            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> +                  self._server_schema_request_id is not None and
> +                  self._server_schema_request_id == msg.id):
> +                self._server_schema_request_id = None
> +                if self.cluster_id:
> +                    self.force_reconnect()
> +                    break
> +                else:
> +                    self.change_seqno = initial_change_seqno
> +                    self.__send_monitor_request()
>              elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
>                                 ovs.jsonrpc.Message.T_REPLY)
>                    and self.__txn_process_reply(msg)):
> @@ -342,6 +438,15 @@ class Idl(object):
>          In the meantime, the contents of the IDL will not change."""
>          self._session.force_reconnect()
>  
> +    def session_name(self):
> +        return self._session.get_name()
> +
> +    def remotes(self):
> +        return self._session.remotes
> +
> +    def db_name(self):
> +        return self._db.name
> +
>      def set_lock(self, lock_name):
>          """If 'lock_name' is not None, configures the IDL to obtain the named
>          lock from the database server and to avoid modifying the database when
> @@ -440,12 +545,19 @@ class Idl(object):
>              if not new_has_lock:
>                  self.is_lock_contended = True
>  
> +    def __send_db_change_aware(self):
> +        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
> +                                                 [True])
> +        self._db_change_aware_request_id = msg.id
> +        self._session.send(msg)
> +
>      def __send_monitor_request(self):
> -        if self.state == self.IDL_S_INITIAL:
> -            self.state = self.IDL_S_MONITOR_COND_REQUESTED
> +        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
> +                           self.IDL_S_INITIAL]):
> +            self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
>              method = "monitor_cond"
>          else:
> -            self.state = self.IDL_S_MONITOR_REQUESTED
> +            self.state = self.IDL_S_DATA_MONITOR_REQUESTED
>              method = "monitor"
>  
>          monitor_requests = {}
> @@ -467,20 +579,50 @@ class Idl(object):
>          self._monitor_request_id = msg.id
>          self._session.send(msg)
>  
> -    def __parse_update(self, update, version):
> +    def __send_server_schema_request(self):
> +        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
> +        msg = ovs.jsonrpc.Message.create_request(
> +            "get_schema", [self._server_db_name, str(self.uuid)])
> +        self._server_schema_request_id = msg.id
> +        self._session.send(msg)
> +
> +    def __send_server_monitor_request(self):
> +        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
> +        monitor_requests = {}
> +        table = self.server_tables[self._server_db_table]
> +        columns = [column for column in six.iterkeys(table.columns)]
> +        for column in six.itervalues(table.columns):
> +            if not hasattr(column, 'alert'):
> +                column.alert = True
> +        table.rows = custom_index.IndexedRows(table)
> +        table.need_table = False
> +        table.idl = self
> +        monitor_request = {"columns": columns}
> +        monitor_requests[table.name] = [monitor_request]
> +        msg = ovs.jsonrpc.Message.create_request(
> +            'monitor', [self._server_db.name,
> +                             str(self.server_monitor_uuid),
> +                             monitor_requests])
> +        self._server_monitor_request_id = msg.id
> +        self._session.send(msg)
> +
> +    def __parse_update(self, update, version, tables=None):
>          try:
> -            self.__do_parse_update(update, version)
> +            if not tables:
> +                self.__do_parse_update(update, version, self.tables)
> +            else:
> +                self.__do_parse_update(update, version, tables)
>          except error.Error as e:
>              vlog.err("%s: error parsing update: %s"
>                       % (self._session.get_name(), e))
>  
> -    def __do_parse_update(self, table_updates, version):
> +    def __do_parse_update(self, table_updates, version, tables):
>          if not isinstance(table_updates, dict):
>              raise error.Error("<table-updates> is not an object",
>                                table_updates)
>  
>          for table_name, table_update in six.iteritems(table_updates):
> -            table = self.tables.get(table_name)
> +            table = tables.get(table_name)
>              if not table:
>                  raise error.Error('<table-updates> includes unknown '
>                                    'table "%s"' % table_name)
> @@ -605,6 +747,58 @@ class Idl(object):
>                  self.notify(op, row, Row.from_json(self, table, uuid, old))
>          return changed
>  
> +    def __check_server_db(self):
> +        """Returns True if this is a valid server database, False otherwise."""
> +        session_name = self.session_name()
> +
> +        if self._server_db_table not in self.server_tables:
> +            vlog.info("%s: server does not have %s table in its %s database"
> +                      % (session_name, self._server_db_table,
> +                         self._server_db_name))
> +            return False
> +
> +        rows = self.server_tables[self._server_db_table].rows
> +
> +        database = None
> +        for row in six.itervalues(rows):
> +            if self.cluster_id:
> +                if self.cluster_id in \
> +                   map(lambda x: str(x)[:4], row.cid):
> +                    database = row
> +                    break
> +            elif row.name == self._db.name:
> +                database = row
> +                break
> +
> +        if not database:
> +            vlog.info("%s: server does not have %s database"
> +                      % (session_name, self._db.name))
> +            return False
> +
> +        if (database.model == CLUSTERED and
> +            self._session.get_num_of_remotes() > 1):
> +            if not database.schema:
> +                vlog.info('%s: clustered database server has not yet joined '
> +                          'cluster; trying another server' % session_name)
> +                return False
> +            if not database.connected:
> +                vlog.info('%s: clustered database server is disconnected '
> +                          'from cluster; trying another server' % session_name)
> +                return False
> +            if (self.leader_only and
> +                not database.leader):
> +                vlog.info('%s: clustered database server is not cluster '
> +                          'leader; trying another server' % session_name)
> +                return False
> +            if database.index:
> +                if database.index[0] < self._min_index:
> +                    vlog.warn('%s: clustered database server has stale data; '
> +                              'trying another server' % session_name)
> +                    return False
> +                self._min_index = database.index[0]
> +
> +        return True
> +
>      def __column_name(self, column):
>          if column.type.key.type == ovs.db.types.UuidType:
>              return ovs.ovsuuid.to_json(column.type.key.type.default)
> diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
> index 34cc769..afbe445 100644
> --- a/python/ovs/reconnect.py
> +++ b/python/ovs/reconnect.py
> @@ -344,6 +344,9 @@ class Reconnect(object):
>                  else:
>                      self.info_level("%s: error listening for connections"
>                                      % self.name)
> +            elif self.state == Reconnect.Reconnect:
> +                self.info_level("%s: connection closed by client"
> +                                % self.name)
>              elif self.backoff < self.max_backoff:
>                  if self.passive:
>                      type_ = "listen"
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index 8981b5e..95f28a9 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -11,7 +11,53 @@ ovsdb_start_idltest () {
>      ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
>      on_exit 'kill `cat ovsdb-server.pid`'
>  }
> -])
> +
> +# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
> +#
> +# Creates a database using SCHEMA (default: idltest.ovsschema) and
> +# starts a database cluster listening on punix:socket and REMOTE (if
> +# specified).
> +ovsdb_cluster_start_idltest () {
> +   local n=$1
> +   ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $?
> +   cid=`ovsdb-tool db-cid s1.db`
> +   schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
> +   for i in `seq 2 $n`; do
> +     ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $?
> +   done
> +   for i in `seq $n`; do
> +     ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $?
> +   done
> +   on_exit 'kill `cat s*.pid`'
> +}
> +
> +# ovsdb_cluster_leader [REMOTES] [DATABASE]
> +#
> +# Returns the leader of the DATABASE cluster.
> +ovsdb_cluster_leader () {
> +   remotes=$(echo $1 | tr "," "\n")
> +   for remote in $remotes; do
> +      ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true
> +      if [[ $? == 0 ]]; then
> +        port=$(echo $remote | cut -d':' -f 3)
> +        log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./)
> +        pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
> +        echo "${remote}|${pid}"
> +        return
> +      fi
> +   done
> +}
> +
> +# ovsdb_cluster_kill_leader [REMOTES] [DATABASE]
> +#
> +# Kills the leader of the DATABASE cluster.
> +ovsdb_cluster_kill_leader () {
> +   leader=`ovsdb_cluster_leader $1 $2`
> +   pid=$(echo $leader | cut -d'|' -f 2)
> +   kill `cat $pid`
> +}
> +export -f ovsdb_cluster_leader
> +export -f ovsdb_cluster_kill_leader])

This breaks the testsuite on FreeBSD:

  export: Illegal option -f

You can check ovsrobot builds here:
    https://cirrus-ci.com/github/ovsrobot/ovs

>  
>  # OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
>  #                   [FILTER])
> @@ -1466,40 +1512,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
>         "where": [["i", "==", 0]]}]' \
>      'reconnect']],
>    [[000: empty
> -001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
> -002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
> -002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
> -002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> +000: event:create, row={uuid=<0>}, updates=None
> +000: event:create, row={uuid=<1>}, updates=None
> +001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
> +002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
> +002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
> +002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
>  003: {"error":null,"result":[{"count":2}]}
> -004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>}
> -004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> +004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
> +004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
>  005: {"error":null,"result":[{"count":2}]}
> -006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
> -006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>}
> -006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> -007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
> -008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
> -008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
> -008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> +006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
> +006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
> +006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
> +007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
> +008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
> +008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
> +008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
>  009: {"error":null,"result":[{"count":2}]}
> -010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
> -010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
> -010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
> -010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> +010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
> +010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
> +010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
> +010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
>  011: {"error":null,"result":[{"count":1}]}
> -012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
> -012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
> -012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> +012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
> +012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
> +012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
>  013: reconnect
> -014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
> -014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
> -014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
> -014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
> +014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
> +014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
> +014: event:create, row={uuid=<0>}, updates=None
> +014: event:create, row={uuid=<1>}, updates=None
> +014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
> +014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
>  015: done
>  ]])
>  
> @@ -1853,3 +1903,31 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY],
>  
>  CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
>  CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
> +
> +# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
> +# with multiple remotes to assert the idl connects to the leader of the Raft cluster
> +m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
> +  [AT_SETUP([$1])
> +   AT_SKIP_IF([test $7 = no])
> +   AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
> +   m4_define([LPBK],[127.0.0.1])
> +   AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
> +   PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
> +   PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
> +   PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
> +   remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
> +   AT_CHECK([$8 $srcdir/test-ovsdb.py  -t40 idl $srcdir/idltest.ovsschema $remotes $3],
> +        [0], [stdout], [ignore])
> +   remote=$(ovsdb_cluster_leader $remotes "idltest")
> +   leader=$(echo $remote | cut -d'|' -f 1)
> +   AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
> +   AT_CLEANUP])
> +
> +m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
> +   [OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6],
> +                 [$HAVE_PYTHON], [$PYTHON])
> +    OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6],
> +                 [$HAVE_PYTHON3], [$PYTHON3])])
> +
> +OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote' '+reconnect'])
> +OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['leaderkill' 'remote' '+reconnect'])
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index 1d7c023..c36b073 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -430,6 +430,20 @@ def idl_set(idl, commands, step):
>              sys.stdout.flush()
>              txn.abort()
>              return
> +        elif name == "remote":
> +            print("%03d: %s" % (step, idl.session_name()))
> +            sys.stdout.flush()
> +            txn.abort()
> +            return
> +        elif name == "leaderkill":
> +            remotes = ','.join(idl.remotes())
> +            command = 'ovsdb_cluster_kill_leader %s %s' \
> +                % (remotes, idl.db_name())
> +            os.system(command)
> +            print("%03d: kill %s" % (step, idl.session_name()))
> +            sys.stdout.flush()
> +            txn.abort()
> +            return
>          elif name == "linktest":
>              l1_0 = txn.insert(idl.tables["link1"])
>              l1_0.i = 1
> @@ -651,7 +665,6 @@ def do_idl(schema_file, remote, *commands):
>              # Wait for update.
>              while idl.change_seqno == seqno and not idl.run():
>                  rpc.run()
> -

It's not necessary to remove this line.

>                  poller = ovs.poller.Poller()
>                  idl.wait(poller)
>                  rpc.wait(poller)
>
Ted Elhourani Jan. 17, 2019, 7:15 p.m. UTC | #2
Ilya,

Thanks, I’ll fix the two issues for v4.

Ted

On Jan 17, 2019, at 3:10 AM, Ilya Maximets <i.maximets@samsung.com<mailto:i.maximets@samsung.com>> wrote:

Hi.
Not a full review. Just few comments inline.

Best regards, Ilya Maximets.

On 17.01.2019 4:30, Ted Elhourani wrote:
The Python IDL implementation supports ovsdb cluster connections.
This patch is a follow up to commit 31e434fc98, it adds the option of
connecting to the leader (the default) in the Raft-based cluster. It mimics
the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.

The _Server database schema is first requested, then a monitor of the
Database table in the _Server Database. Method __check_server_db verifies
the eligibility of the server. If the attempt to obtain a monitor of the
_Server database fails and a cluster id was not provided this implementation
proceeds to request the data monitor. If a cluster id was provided via the
set_cluster_id method then the connection is aborted and a connection to a
different node is instead attempted, until a valid cluster node is found.
Thus, when supplied, cluster id is interpreted as the intention to only
allow connections to a clustered database. If not supplied, connections to
standalone nodes, or nodes that do not have the _Server database are
allowed. change_seqno is not incremented in the case of Database table
updates.

Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com<mailto:ted.elhourani@nutanix.com>>
---

v2 -> v3
--------
* Add 2 tests, treat cluster_id as a string, mv arg till end, pep8 fixes.

v1 -> v2
--------
* Modify for backward compatibility with _Server-less ovsdb servers.

python/ovs/db/idl.py    | 226 ++++++++++++++++++++++++++++++++++++++++++++----
python/ovs/reconnect.py |   3 +
tests/ovsdb-idl.at<http://ovsdb-idl.at>      | 138 ++++++++++++++++++++++-------
tests/test-ovsdb.py     |  15 +++-
4 files changed, 335 insertions(+), 47 deletions(-)

diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 250e897..2b2d7c5 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -38,6 +38,8 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1

+CLUSTERED = "clustered"
+

class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +94,13 @@ class Idl(object):
"""

    IDL_S_INITIAL = 0
-    IDL_S_MONITOR_REQUESTED = 1
-    IDL_S_MONITOR_COND_REQUESTED = 2
+    IDL_S_SERVER_SCHEMA_REQUESTED = 1
+    IDL_S_SERVER_MONITOR_REQUESTED = 2
+    IDL_S_DATA_MONITOR_REQUESTED = 3
+    IDL_S_DATA_MONITOR_COND_REQUESTED = 4

-    def __init__(self, remote, schema_helper, probe_interval=None):
+    def __init__(self, remote, schema_helper, probe_interval=None,
+                 leader_only=True):
        """Creates and returns a connection to the database named 'db_name' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open().  The connection will maintain an in-memory
@@ -119,6 +124,9 @@ class Idl(object):

        The IDL uses and modifies 'schema' directly.

+        If 'leader_only' is set to True (default value) the IDL will only
+        monitor and transact with the leader of the cluster.
+
        If "probe_interval" is zero it disables the connection keepalive
        feature. If non-zero the value will be forced to at least 1000
        milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +145,20 @@ class Idl(object):
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()
+
+        # Server monitor.
+        self._server_schema_request_id = None
+        self._server_monitor_request_id = None
+        self._db_change_aware_request_id = None
+        self._server_db_name = '_Server'
+        self._server_db_table = 'Database'
+        self.server_tables = None
+        self._server_db = None
+        self.server_monitor_uuid = uuid.uuid1()
+        self.leader_only = leader_only
+        self.cluster_id = None
+        self._min_index = 0
+
        self.state = self.IDL_S_INITIAL

        # Database locking.
@@ -172,6 +194,12 @@ class Idl(object):
                remotes.append(r)
        return remotes

+    def set_cluster_id(self, cluster_id):
+        """Set the id of the cluster that this idl must connect to."""
+        self.cluster_id = cluster_id
+        if self.state != self.IDL_S_INITIAL:
+            self.force_reconnect()
+
    def index_create(self, table, name):
        """Create a named multi-column index on a table"""
        return self.tables[table].rows.index_create(name)
@@ -222,7 +250,7 @@ class Idl(object):
            if seqno != self._last_seqno:
                self._last_seqno = seqno
                self.__txn_abort_all()
-                self.__send_monitor_request()
+                self.__send_server_schema_request()
                if self.lock_name:
                    self.__send_lock_request()
                break
@@ -230,6 +258,7 @@ class Idl(object):
            msg = self._session.recv()
            if msg is None:
                break
+
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
@@ -239,7 +268,15 @@ class Idl(object):
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
-                self.__parse_update(msg.params[1], OVSDB_UPDATE)
+                if msg.params[0] == str(self.server_monitor_uuid):
+                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
+                                        tables=self.server_tables)
+                    self.change_seqno = initial_change_seqno
+                    if not self.__check_server_db():
+                        self.force_reconnect()
+                        break
+                else:
+                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._monitor_request_id is not None
                  and self._monitor_request_id == msg.id<http://msg.id>):
@@ -248,17 +285,66 @@ class Idl(object):
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
-                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
-                        assert self.state == self.IDL_S_MONITOR_REQUESTED
+                        assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)
-
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._server_schema_request_id is not None
+                  and self._server_schema_request_id == msg.id<http://msg.id>):
+                # Reply to our "get_schema" of _Server request.
+                try:
+                    self._server_schema_request_id = None
+                    sh = SchemaHelper(None, msg.result)
+                    sh.register_table(self._server_db_table)
+                    schema = sh.get_idl_schema()
+                    self._server_db = schema
+                    self.server_tables = schema.tables
+                    self.__send_server_monitor_request()
+                except error.Error as e:
+                    vlog.err("%s: error receiving server schema: %s"
+                             % (self._session.get_name(), e))
+                    if self.cluster_id:
+                        self.__error()
+                        break
+                    else:
+                        self.change_seqno = initial_change_seqno
+                        self.__send_monitor_request()
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._server_monitor_request_id is not None
+                  and self._server_monitor_request_id == msg.id<http://msg.id>):
+                # Reply to our "monitor" of _Server request.
+                try:
+                    self._server_monitor_request_id = None
+                    self.__parse_update(msg.result, OVSDB_UPDATE,
+                                        tables=self.server_tables)
+                    self.change_seqno = initial_change_seqno
+                    if self.__check_server_db():
+                        self.__send_monitor_request()
+                        self.__send_db_change_aware()
+                    else:
+                        self.force_reconnect()
+                        break
+                except error.Error as e:
+                    vlog.err("%s: parse error in received schema: %s"
+                             % (self._session.get_name(), e))
+                    if self.cluster_id:
+                        self.__error()
+                        break
+                    else:
+                        self.change_seqno = initial_change_seqno
+                        self.__send_monitor_request()
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._db_change_aware_request_id is not None
+                  and self._db_change_aware_request_id == msg.id<http://msg.id>):
+                # Reply to us notifying the server of our change awarness.
+                self._db_change_aware_request_id = None
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._lock_request_id is not None
                  and self._lock_request_id == msg.id<http://msg.id>):
                # Reply to our "lock" request.
@@ -275,10 +361,20 @@ class Idl(object):
                # Reply to our echo request.  Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
-                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id<http://msg.id>):
                if msg.error == "unknown method":
                    self.__send_monitor_request()
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self._server_schema_request_id is not None and
+                  self._server_schema_request_id == msg.id<http://msg.id>):
+                self._server_schema_request_id = None
+                if self.cluster_id:
+                    self.force_reconnect()
+                    break
+                else:
+                    self.change_seqno = initial_change_seqno
+                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                  and self.__txn_process_reply(msg)):
@@ -342,6 +438,15 @@ class Idl(object):
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

+    def session_name(self):
+        return self._session.get_name()
+
+    def remotes(self):
+        return self._session.remotes
+
+    def db_name(self):
+        return self._db.name
+
    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
@@ -440,12 +545,19 @@ class Idl(object):
            if not new_has_lock:
                self.is_lock_contended = True

+    def __send_db_change_aware(self):
+        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
+                                                 [True])
+        self._db_change_aware_request_id = msg.id<http://msg.id>
+        self._session.send(msg)
+
    def __send_monitor_request(self):
-        if self.state == self.IDL_S_INITIAL:
-            self.state = self.IDL_S_MONITOR_COND_REQUESTED
+        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
+                           self.IDL_S_INITIAL]):
+            self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
            method = "monitor_cond"
        else:
-            self.state = self.IDL_S_MONITOR_REQUESTED
+            self.state = self.IDL_S_DATA_MONITOR_REQUESTED
            method = "monitor"

        monitor_requests = {}
@@ -467,20 +579,50 @@ class Idl(object):
        self._monitor_request_id = msg.id<http://msg.id>
        self._session.send(msg)

-    def __parse_update(self, update, version):
+    def __send_server_schema_request(self):
+        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+        msg = ovs.jsonrpc.Message.create_request(
+            "get_schema", [self._server_db_name, str(self.uuid)])
+        self._server_schema_request_id = msg.id<http://msg.id>
+        self._session.send(msg)
+
+    def __send_server_monitor_request(self):
+        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
+        monitor_requests = {}
+        table = self.server_tables[self._server_db_table]
+        columns = [column for column in six.iterkeys(table.columns)]
+        for column in six.itervalues(table.columns):
+            if not hasattr(column, 'alert'):
+                column.alert = True
+        table.rows = custom_index.IndexedRows(table)
+        table.need_table = False
+        table.idl = self
+        monitor_request = {"columns": columns}
+        monitor_requests[table.name] = [monitor_request]
+        msg = ovs.jsonrpc.Message.create_request(
+            'monitor', [self._server_db.name,
+                             str(self.server_monitor_uuid),
+                             monitor_requests])
+        self._server_monitor_request_id = msg.id<http://msg.id>
+        self._session.send(msg)
+
+    def __parse_update(self, update, version, tables=None):
        try:
-            self.__do_parse_update(update, version)
+            if not tables:
+                self.__do_parse_update(update, version, self.tables)
+            else:
+                self.__do_parse_update(update, version, tables)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

-    def __do_parse_update(self, table_updates, version):
+    def __do_parse_update(self, table_updates, version, tables):
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in six.iteritems(table_updates):
-            table = self.tables.get(table_name)
+            table = tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)
@@ -605,6 +747,58 @@ class Idl(object):
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed

+    def __check_server_db(self):
+        """Returns True if this is a valid server database, False otherwise."""
+        session_name = self.session_name()
+
+        if self._server_db_table not in self.server_tables:
+            vlog.info<http://vlog.info>("%s: server does not have %s table in its %s database"
+                      % (session_name, self._server_db_table,
+                         self._server_db_name))
+            return False
+
+        rows = self.server_tables[self._server_db_table].rows
+
+        database = None
+        for row in six.itervalues(rows):
+            if self.cluster_id:
+                if self.cluster_id in \
+                   map(lambda x: str(x)[:4], row.cid):
+                    database = row
+                    break
+            elif row.name == self._db.name:
+                database = row
+                break
+
+        if not database:
+            vlog.info<http://vlog.info>("%s: server does not have %s database"
+                      % (session_name, self._db.name))
+            return False
+
+        if (database.model == CLUSTERED and
+            self._session.get_num_of_remotes() > 1):
+            if not database.schema:
+                vlog.info<http://vlog.info>('%s: clustered database server has not yet joined '
+                          'cluster; trying another server' % session_name)
+                return False
+            if not database.connected:
+                vlog.info<http://vlog.info>('%s: clustered database server is disconnected '
+                          'from cluster; trying another server' % session_name)
+                return False
+            if (self.leader_only and
+                not database.leader):
+                vlog.info<http://vlog.info>('%s: clustered database server is not cluster '
+                          'leader; trying another server' % session_name)
+                return False
+            if database.index:
+                if database.index[0] < self._min_index:
+                    vlog.warn('%s: clustered database server has stale data; '
+                              'trying another server' % session_name)
+                    return False
+                self._min_index = database.index[0]
+
+        return True
+
    def __column_name(self, column):
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
index 34cc769..afbe445 100644
--- a/python/ovs/reconnect.py
+++ b/python/ovs/reconnect.py
@@ -344,6 +344,9 @@ class Reconnect(object):
                else:
                    self.info_level("%s: error listening for connections"
                                    % self.name)
+            elif self.state == Reconnect.Reconnect:
+                self.info_level("%s: connection closed by client"
+                                % self.name)
            elif self.backoff < self.max_backoff:
                if self.passive:
                    type_ = "listen"
diff --git a/tests/ovsdb-idl.at<http://ovsdb-idl.at> b/tests/ovsdb-idl.at<http://ovsdb-idl.at>
index 8981b5e..95f28a9 100644
--- a/tests/ovsdb-idl.at<http://ovsdb-idl.at>
+++ b/tests/ovsdb-idl.at<http://ovsdb-idl.at>
@@ -11,7 +11,53 @@ ovsdb_start_idltest () {
    ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
    on_exit 'kill `cat ovsdb-server.pid`'
}
-])
+
+# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
+#
+# Creates a database using SCHEMA (default: idltest.ovsschema) and
+# starts a database cluster listening on punix:socket and REMOTE (if
+# specified).
+ovsdb_cluster_start_idltest () {
+   local n=$1
+   ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $?
+   cid=`ovsdb-tool db-cid s1.db`
+   schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
+   for i in `seq 2 $n`; do
+     ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $?
+   done
+   for i in `seq $n`; do
+     ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $?
+   done
+   on_exit 'kill `cat s*.pid`'
+}
+
+# ovsdb_cluster_leader [REMOTES] [DATABASE]
+#
+# Returns the leader of the DATABASE cluster.
+ovsdb_cluster_leader () {
+   remotes=$(echo $1 | tr "," "\n")
+   for remote in $remotes; do
+      ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true
+      if [[ $? == 0 ]]; then
+        port=$(echo $remote | cut -d':' -f 3)
+        log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./)
+        pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
+        echo "${remote}|${pid}"
+        return
+      fi
+   done
+}
+
+# ovsdb_cluster_kill_leader [REMOTES] [DATABASE]
+#
+# Kills the leader of the DATABASE cluster.
+ovsdb_cluster_kill_leader () {
+   leader=`ovsdb_cluster_leader $1 $2`
+   pid=$(echo $leader | cut -d'|' -f 2)
+   kill `cat $pid`
+}
+export -f ovsdb_cluster_leader
+export -f ovsdb_cluster_kill_leader])

This breaks the testsuite on FreeBSD:

 export: Illegal option -f

You can check ovsrobot builds here:
   https://urldefense.proofpoint.com/v2/url?u=https-3A__cirrus-2Dci.com_github_ovsrobot_ovs&d=DwICaQ&c=s883GpUCOChKOHiocYtGcg&r=mB3ItZGjEYI9nn4mb7sBHd5RJ9XbfVZnYVRmdST65Cg&m=BvlXcYCI55cHpeLNRQrAxcJi53TIcVN509ayWDY-6GI&s=jUxhbhLShiQ2iEDi5rjYIs_cXjfdZMtkHsv-UxhoVXw&e=


# OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
#                   [FILTER])
@@ -1466,40 +1512,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
       "where": [["i", "==", 0]]}]' \
    'reconnect']],
  [[000: empty
-001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
-002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+000: event:create, row={uuid=<0>}, updates=None
+000: event:create, row={uuid=<1>}, updates=None
+001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
+002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
003: {"error":null,"result":[{"count":2}]}
-004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>}
-004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
+004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
005: {"error":null,"result":[{"count":2}]}
-006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
-006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>}
-006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
-007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
-008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
+006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
+006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
+007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
+008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
009: {"error":null,"result":[{"count":2}]}
-010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
-010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
-010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
+010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
+010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
011: {"error":null,"result":[{"count":1}]}
-012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
013: reconnect
-014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+014: event:create, row={uuid=<0>}, updates=None
+014: event:create, row={uuid=<1>}, updates=None
+014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
015: done
]])

@@ -1853,3 +1903,31 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY],

CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
+
+# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
+# with multiple remotes to assert the idl connects to the leader of the Raft cluster
+m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
+  [AT_SETUP([$1])
+   AT_SKIP_IF([test $7 = no])
+   AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
+   m4_define([LPBK],[127.0.0.1])
+   AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
+   PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
+   PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
+   PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
+   remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
+   AT_CHECK([$8 $srcdir/test-ovsdb.py  -t40 idl $srcdir/idltest.ovsschema $remotes $3],
+        [0], [stdout], [ignore])
+   remote=$(ovsdb_cluster_leader $remotes "idltest")
+   leader=$(echo $remote | cut -d'|' -f 1)
+   AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
+   AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
+   [OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6],
+                 [$HAVE_PYTHON], [$PYTHON])
+    OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6],
+                 [$HAVE_PYTHON3], [$PYTHON3])])
+
+OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote' '+reconnect'])
+OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['leaderkill' 'remote' '+reconnect'])
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index 1d7c023..c36b073 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -430,6 +430,20 @@ def idl_set(idl, commands, step):
            sys.stdout.flush()
            txn.abort()
            return
+        elif name == "remote":
+            print("%03d: %s" % (step, idl.session_name()))
+            sys.stdout.flush()
+            txn.abort()
+            return
+        elif name == "leaderkill":
+            remotes = ','.join(idl.remotes())
+            command = 'ovsdb_cluster_kill_leader %s %s' \
+                % (remotes, idl.db_name())
+            os.system(command)
+            print("%03d: kill %s" % (step, idl.session_name()))
+            sys.stdout.flush()
+            txn.abort()
+            return
        elif name == "linktest":
            l1_0 = txn.insert(idl.tables["link1"])
            l1_0.i = 1
@@ -651,7 +665,6 @@ def do_idl(schema_file, remote, *commands):
            # Wait for update.
            while idl.change_seqno == seqno and not idl.run():
                rpc.run()
-

It's not necessary to remove this line.

                poller = ovs.poller.Poller()
                idl.wait(poller)
                rpc.wait(poller)
diff mbox series

Patch

diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 250e897..2b2d7c5 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -38,6 +38,8 @@  ROW_DELETE = "delete"
 OVSDB_UPDATE = 0
 OVSDB_UPDATE2 = 1
 
+CLUSTERED = "clustered"
+
 
 class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +94,13 @@  class Idl(object):
 """
 
     IDL_S_INITIAL = 0
-    IDL_S_MONITOR_REQUESTED = 1
-    IDL_S_MONITOR_COND_REQUESTED = 2
+    IDL_S_SERVER_SCHEMA_REQUESTED = 1
+    IDL_S_SERVER_MONITOR_REQUESTED = 2
+    IDL_S_DATA_MONITOR_REQUESTED = 3
+    IDL_S_DATA_MONITOR_COND_REQUESTED = 4
 
-    def __init__(self, remote, schema_helper, probe_interval=None):
+    def __init__(self, remote, schema_helper, probe_interval=None,
+                 leader_only=True):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
@@ -119,6 +124,9 @@  class Idl(object):
 
         The IDL uses and modifies 'schema' directly.
 
+        If 'leader_only' is set to True (default value) the IDL will only
+        monitor and transact with the leader of the cluster.
+
         If "probe_interval" is zero it disables the connection keepalive
         feature. If non-zero the value will be forced to at least 1000
         milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +145,20 @@  class Idl(object):
         self._last_seqno = None
         self.change_seqno = 0
         self.uuid = uuid.uuid1()
+
+        # Server monitor.
+        self._server_schema_request_id = None
+        self._server_monitor_request_id = None
+        self._db_change_aware_request_id = None
+        self._server_db_name = '_Server'
+        self._server_db_table = 'Database'
+        self.server_tables = None
+        self._server_db = None
+        self.server_monitor_uuid = uuid.uuid1()
+        self.leader_only = leader_only
+        self.cluster_id = None
+        self._min_index = 0
+
         self.state = self.IDL_S_INITIAL
 
         # Database locking.
@@ -172,6 +194,12 @@  class Idl(object):
                 remotes.append(r)
         return remotes
 
+    def set_cluster_id(self, cluster_id):
+        """Set the id of the cluster that this idl must connect to."""
+        self.cluster_id = cluster_id
+        if self.state != self.IDL_S_INITIAL:
+            self.force_reconnect()
+
     def index_create(self, table, name):
         """Create a named multi-column index on a table"""
         return self.tables[table].rows.index_create(name)
@@ -222,7 +250,7 @@  class Idl(object):
             if seqno != self._last_seqno:
                 self._last_seqno = seqno
                 self.__txn_abort_all()
-                self.__send_monitor_request()
+                self.__send_server_schema_request()
                 if self.lock_name:
                     self.__send_lock_request()
                 break
@@ -230,6 +258,7 @@  class Idl(object):
             msg = self._session.recv()
             if msg is None:
                 break
+
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                     and msg.method == "update2"
                     and len(msg.params) == 2):
@@ -239,7 +268,15 @@  class Idl(object):
                     and msg.method == "update"
                     and len(msg.params) == 2):
                 # Database contents changed.
-                self.__parse_update(msg.params[1], OVSDB_UPDATE)
+                if msg.params[0] == str(self.server_monitor_uuid):
+                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
+                                        tables=self.server_tables)
+                    self.change_seqno = initial_change_seqno
+                    if not self.__check_server_db():
+                        self.force_reconnect()
+                        break
+                else:
+                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._monitor_request_id is not None
                   and self._monitor_request_id == msg.id):
@@ -248,17 +285,66 @@  class Idl(object):
                     self.change_seqno += 1
                     self._monitor_request_id = None
                     self.__clear()
-                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
                         self.__parse_update(msg.result, OVSDB_UPDATE2)
                     else:
-                        assert self.state == self.IDL_S_MONITOR_REQUESTED
+                        assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                         self.__parse_update(msg.result, OVSDB_UPDATE)
-
                 except error.Error as e:
                     vlog.err("%s: parse error in received schema: %s"
                              % (self._session.get_name(), e))
                     self.__error()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._server_schema_request_id is not None
+                  and self._server_schema_request_id == msg.id):
+                # Reply to our "get_schema" of _Server request.
+                try:
+                    self._server_schema_request_id = None
+                    sh = SchemaHelper(None, msg.result)
+                    sh.register_table(self._server_db_table)
+                    schema = sh.get_idl_schema()
+                    self._server_db = schema
+                    self.server_tables = schema.tables
+                    self.__send_server_monitor_request()
+                except error.Error as e:
+                    vlog.err("%s: error receiving server schema: %s"
+                             % (self._session.get_name(), e))
+                    if self.cluster_id:
+                        self.__error()
+                        break
+                    else:
+                        self.change_seqno = initial_change_seqno
+                        self.__send_monitor_request()
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._server_monitor_request_id is not None
+                  and self._server_monitor_request_id == msg.id):
+                # Reply to our "monitor" of _Server request.
+                try:
+                    self._server_monitor_request_id = None
+                    self.__parse_update(msg.result, OVSDB_UPDATE,
+                                        tables=self.server_tables)
+                    self.change_seqno = initial_change_seqno
+                    if self.__check_server_db():
+                        self.__send_monitor_request()
+                        self.__send_db_change_aware()
+                    else:
+                        self.force_reconnect()
+                        break
+                except error.Error as e:
+                    vlog.err("%s: parse error in received schema: %s"
+                             % (self._session.get_name(), e))
+                    if self.cluster_id:
+                        self.__error()
+                        break
+                    else:
+                        self.change_seqno = initial_change_seqno
+                        self.__send_monitor_request()
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+                  and self._db_change_aware_request_id is not None
+                  and self._db_change_aware_request_id == msg.id):
+                # Reply to us notifying the server of our change awarness.
+                self._db_change_aware_request_id = None
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._lock_request_id is not None
                   and self._lock_request_id == msg.id):
                 # Reply to our "lock" request.
@@ -275,10 +361,20 @@  class Idl(object):
                 # Reply to our echo request.  Ignore it.
                 pass
             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
-                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                   self._monitor_request_id == msg.id):
                 if msg.error == "unknown method":
                     self.__send_monitor_request()
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self._server_schema_request_id is not None and
+                  self._server_schema_request_id == msg.id):
+                self._server_schema_request_id = None
+                if self.cluster_id:
+                    self.force_reconnect()
+                    break
+                else:
+                    self.change_seqno = initial_change_seqno
+                    self.__send_monitor_request()
             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                                ovs.jsonrpc.Message.T_REPLY)
                   and self.__txn_process_reply(msg)):
@@ -342,6 +438,15 @@  class Idl(object):
         In the meantime, the contents of the IDL will not change."""
         self._session.force_reconnect()
 
+    def session_name(self):
+        return self._session.get_name()
+
+    def remotes(self):
+        return self._session.remotes
+
+    def db_name(self):
+        return self._db.name
+
     def set_lock(self, lock_name):
         """If 'lock_name' is not None, configures the IDL to obtain the named
         lock from the database server and to avoid modifying the database when
@@ -440,12 +545,19 @@  class Idl(object):
             if not new_has_lock:
                 self.is_lock_contended = True
 
+    def __send_db_change_aware(self):
+        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
+                                                 [True])
+        self._db_change_aware_request_id = msg.id
+        self._session.send(msg)
+
     def __send_monitor_request(self):
-        if self.state == self.IDL_S_INITIAL:
-            self.state = self.IDL_S_MONITOR_COND_REQUESTED
+        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
+                           self.IDL_S_INITIAL]):
+            self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
             method = "monitor_cond"
         else:
-            self.state = self.IDL_S_MONITOR_REQUESTED
+            self.state = self.IDL_S_DATA_MONITOR_REQUESTED
             method = "monitor"
 
         monitor_requests = {}
@@ -467,20 +579,50 @@  class Idl(object):
         self._monitor_request_id = msg.id
         self._session.send(msg)
 
-    def __parse_update(self, update, version):
+    def __send_server_schema_request(self):
+        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+        msg = ovs.jsonrpc.Message.create_request(
+            "get_schema", [self._server_db_name, str(self.uuid)])
+        self._server_schema_request_id = msg.id
+        self._session.send(msg)
+
+    def __send_server_monitor_request(self):
+        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
+        monitor_requests = {}
+        table = self.server_tables[self._server_db_table]
+        columns = [column for column in six.iterkeys(table.columns)]
+        for column in six.itervalues(table.columns):
+            if not hasattr(column, 'alert'):
+                column.alert = True
+        table.rows = custom_index.IndexedRows(table)
+        table.need_table = False
+        table.idl = self
+        monitor_request = {"columns": columns}
+        monitor_requests[table.name] = [monitor_request]
+        msg = ovs.jsonrpc.Message.create_request(
+            'monitor', [self._server_db.name,
+                             str(self.server_monitor_uuid),
+                             monitor_requests])
+        self._server_monitor_request_id = msg.id
+        self._session.send(msg)
+
+    def __parse_update(self, update, version, tables=None):
         try:
-            self.__do_parse_update(update, version)
+            if not tables:
+                self.__do_parse_update(update, version, self.tables)
+            else:
+                self.__do_parse_update(update, version, tables)
         except error.Error as e:
             vlog.err("%s: error parsing update: %s"
                      % (self._session.get_name(), e))
 
-    def __do_parse_update(self, table_updates, version):
+    def __do_parse_update(self, table_updates, version, tables):
         if not isinstance(table_updates, dict):
             raise error.Error("<table-updates> is not an object",
                               table_updates)
 
         for table_name, table_update in six.iteritems(table_updates):
-            table = self.tables.get(table_name)
+            table = tables.get(table_name)
             if not table:
                 raise error.Error('<table-updates> includes unknown '
                                   'table "%s"' % table_name)
@@ -605,6 +747,58 @@  class Idl(object):
                 self.notify(op, row, Row.from_json(self, table, uuid, old))
         return changed
 
+    def __check_server_db(self):
+        """Returns True if this is a valid server database, False otherwise."""
+        session_name = self.session_name()
+
+        if self._server_db_table not in self.server_tables:
+            vlog.info("%s: server does not have %s table in its %s database"
+                      % (session_name, self._server_db_table,
+                         self._server_db_name))
+            return False
+
+        rows = self.server_tables[self._server_db_table].rows
+
+        database = None
+        for row in six.itervalues(rows):
+            if self.cluster_id:
+                if self.cluster_id in \
+                   map(lambda x: str(x)[:4], row.cid):
+                    database = row
+                    break
+            elif row.name == self._db.name:
+                database = row
+                break
+
+        if not database:
+            vlog.info("%s: server does not have %s database"
+                      % (session_name, self._db.name))
+            return False
+
+        if (database.model == CLUSTERED and
+            self._session.get_num_of_remotes() > 1):
+            if not database.schema:
+                vlog.info('%s: clustered database server has not yet joined '
+                          'cluster; trying another server' % session_name)
+                return False
+            if not database.connected:
+                vlog.info('%s: clustered database server is disconnected '
+                          'from cluster; trying another server' % session_name)
+                return False
+            if (self.leader_only and
+                not database.leader):
+                vlog.info('%s: clustered database server is not cluster '
+                          'leader; trying another server' % session_name)
+                return False
+            if database.index:
+                if database.index[0] < self._min_index:
+                    vlog.warn('%s: clustered database server has stale data; '
+                              'trying another server' % session_name)
+                    return False
+                self._min_index = database.index[0]
+
+        return True
+
     def __column_name(self, column):
         if column.type.key.type == ovs.db.types.UuidType:
             return ovs.ovsuuid.to_json(column.type.key.type.default)
diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
index 34cc769..afbe445 100644
--- a/python/ovs/reconnect.py
+++ b/python/ovs/reconnect.py
@@ -344,6 +344,9 @@  class Reconnect(object):
                 else:
                     self.info_level("%s: error listening for connections"
                                     % self.name)
+            elif self.state == Reconnect.Reconnect:
+                self.info_level("%s: connection closed by client"
+                                % self.name)
             elif self.backoff < self.max_backoff:
                 if self.passive:
                     type_ = "listen"
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index 8981b5e..95f28a9 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -11,7 +11,53 @@  ovsdb_start_idltest () {
     ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
     on_exit 'kill `cat ovsdb-server.pid`'
 }
-])
+
+# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
+#
+# Creates a database using SCHEMA (default: idltest.ovsschema) and
+# starts a database cluster listening on punix:socket and REMOTE (if
+# specified).
+ovsdb_cluster_start_idltest () {
+   local n=$1
+   ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $?
+   cid=`ovsdb-tool db-cid s1.db`
+   schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
+   for i in `seq 2 $n`; do
+     ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $?
+   done
+   for i in `seq $n`; do
+     ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $?
+   done
+   on_exit 'kill `cat s*.pid`'
+}
+
+# ovsdb_cluster_leader [REMOTES] [DATABASE]
+#
+# Returns the leader of the DATABASE cluster.
+ovsdb_cluster_leader () {
+   remotes=$(echo $1 | tr "," "\n")
+   for remote in $remotes; do
+      ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true
+      if [[ $? == 0 ]]; then
+        port=$(echo $remote | cut -d':' -f 3)
+        log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./)
+        pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
+        echo "${remote}|${pid}"
+        return
+      fi
+   done
+}
+
+# ovsdb_cluster_kill_leader [REMOTES] [DATABASE]
+#
+# Kills the leader of the DATABASE cluster.
+ovsdb_cluster_kill_leader () {
+   leader=`ovsdb_cluster_leader $1 $2`
+   pid=$(echo $leader | cut -d'|' -f 2)
+   kill `cat $pid`
+}
+export -f ovsdb_cluster_leader
+export -f ovsdb_cluster_kill_leader])
 
 # OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
 #                   [FILTER])
@@ -1466,40 +1512,44 @@  OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
        "where": [["i", "==", 0]]}]' \
     'reconnect']],
   [[000: empty
-001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
-002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+000: event:create, row={uuid=<0>}, updates=None
+000: event:create, row={uuid=<1>}, updates=None
+001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
+002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
 003: {"error":null,"result":[{"count":2}]}
-004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>}
-004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
+004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
 005: {"error":null,"result":[{"count":2}]}
-006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
-006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>}
-006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
-007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
-008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
+006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
+006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
+007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
+008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
 009: {"error":null,"result":[{"count":2}]}
-010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
-010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
-010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
+010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
+010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
 011: {"error":null,"result":[{"count":1}]}
-012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
 013: reconnect
-014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+014: event:create, row={uuid=<0>}, updates=None
+014: event:create, row={uuid=<1>}, updates=None
+014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
 015: done
 ]])
 
@@ -1853,3 +1903,31 @@  m4_define([CHECK_STREAM_OPEN_BLOCK_PY],
 
 CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
 CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
+
+# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
+# with multiple remotes to assert the idl connects to the leader of the Raft cluster
+m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
+  [AT_SETUP([$1])
+   AT_SKIP_IF([test $7 = no])
+   AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
+   m4_define([LPBK],[127.0.0.1])
+   AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
+   PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
+   PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
+   PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
+   remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
+   AT_CHECK([$8 $srcdir/test-ovsdb.py  -t40 idl $srcdir/idltest.ovsschema $remotes $3],
+        [0], [stdout], [ignore])
+   remote=$(ovsdb_cluster_leader $remotes "idltest")
+   leader=$(echo $remote | cut -d'|' -f 1)
+   AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
+   AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
+   [OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6],
+                 [$HAVE_PYTHON], [$PYTHON])
+    OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6],
+                 [$HAVE_PYTHON3], [$PYTHON3])])
+
+OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote' '+reconnect'])
+OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['leaderkill' 'remote' '+reconnect'])
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index 1d7c023..c36b073 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -430,6 +430,20 @@  def idl_set(idl, commands, step):
             sys.stdout.flush()
             txn.abort()
             return
+        elif name == "remote":
+            print("%03d: %s" % (step, idl.session_name()))
+            sys.stdout.flush()
+            txn.abort()
+            return
+        elif name == "leaderkill":
+            remotes = ','.join(idl.remotes())
+            command = 'ovsdb_cluster_kill_leader %s %s' \
+                % (remotes, idl.db_name())
+            os.system(command)
+            print("%03d: kill %s" % (step, idl.session_name()))
+            sys.stdout.flush()
+            txn.abort()
+            return
         elif name == "linktest":
             l1_0 = txn.insert(idl.tables["link1"])
             l1_0.i = 1
@@ -651,7 +665,6 @@  def do_idl(schema_file, remote, *commands):
             # Wait for update.
             while idl.change_seqno == seqno and not idl.run():
                 rpc.run()
-
                 poller = ovs.poller.Poller()
                 idl.wait(poller)
                 rpc.wait(poller)