Message ID | 1546568184-202648-1-git-send-email-ted.elhourani@nutanix.com |
---|---|
State | Changes Requested |
Series | [ovs-dev,v2] Monitor Database table to manage lifecycle of IDL client. |
Hi Ted,

This patch is failing the test cases below for me. All are Python 3 related.
Can you please check whether that's the case for you as well?

******
2139: simple idl, writing via IDL with unicode - Python3 FAILED (ovsdb-idl.at:456)
2141: simple idl, writing via IDL with unicode - Python3 - register_columns FAILED (ovsdb-idl.at:456)
2143: simple idl, writing via IDL with unicode - Python3 - tcp FAILED (ovsdb-idl.at:456)
2145: simple idl, writing via IDL with unicode - Python3 (multiple remotes) - tcp FAILED (ovsdb-idl.at:456)
2147: simple idl, writing via IDL with unicode - Python3 - tcp6 FAILED (ovsdb-idl.at:456)
2149: simple idl, writing via IDL with unicode - Python3 (multiple remotes) - tcp6 FAILED (ovsdb-idl.at:456)
2151: simple idl, writing via IDL with unicode - Python3 - SSL FAILED (ovsdb-idl.at:456)
2153: simple idl, writing large data via IDL with unicode - Python3 FAILED (ovsdb-idl.at:490)
**********

Please see some comments below.

Thanks
Numan

On Fri, Jan 4, 2019 at 7:47 AM Ted Elhourani <ted.elhourani@nutanix.com> wrote:

> The Python IDL implementation supports ovsdb cluster connections.
> This patch is a follow up to commit 31e434fc98, it adds the option of
> connecting to the leader (the default) in the Raft-based cluster. It mimics
> the existing C IDL support for clusters introduced in commit 1b1d2e6daa.
>
> The _Server database schema is first requested, then a monitor of the
> Database table in the _Server Database. Method __check_server_db verifies
> the eligibility of the server. If the attempt to obtain a monitor of the
> _Server database fails and a cluster id was not provided this
> implementation
> proceeds to request the data monitor. If a cluster id was provided via the
> set_cluster_id method then the connection is aborted and a connection to a
> different node is instead attempted, until a valid cluster node is found.
> Thus, when supplied, cluster id is interpreted as the intention to only
> allow connections to a clustered database. If not supplied, connections to
> standalone nodes, or nodes that do not have the _Server database are
> allowed. change_seqno is not incremented in the case of Database table
> updates.
> Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
> ---
>  python/ovs/db/idl.py    | 217
> ++++++++++++++++++++++++++++++++++++++++++++----
>  python/ovs/reconnect.py |   3 +
>  tests/ovsdb-idl.at      |  62 +++++++-------
>  3 files changed, 237 insertions(+), 45 deletions(-)
>
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 250e897..f989548 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -38,6 +38,7 @@ ROW_DELETE = "delete"
>  OVSDB_UPDATE = 0
>  OVSDB_UPDATE2 = 1
>
> +CLUSTERED = "clustered"
>
>  class Idl(object):
>      """Open vSwitch Database Interface Definition Language (OVSDB IDL).
> @@ -92,10 +93,12 @@ class Idl(object):
>      """
>
>      IDL_S_INITIAL = 0
> -    IDL_S_MONITOR_REQUESTED = 1
> -    IDL_S_MONITOR_COND_REQUESTED = 2
> +    IDL_S_SERVER_SCHEMA_REQUESTED = 1
> +    IDL_S_SERVER_MONITOR_REQUESTED = 2
> +    IDL_S_DATA_MONITOR_REQUESTED = 3
> +    IDL_S_DATA_MONITOR_COND_REQUESTED = 4
>
> -    def __init__(self, remote, schema_helper, probe_interval=None):
> +    def __init__(self, remote, schema_helper, leader_only=True,
> probe_interval=None):
>

Can you please add the "leader_only" arg at the end. Older clients of
python idl would break if they were creating Idl object as

    my_idl = Idl(remote, helper, 1000)

>          """Creates and returns a connection to the database named
> 'db_name' on
>          'remote', which should be in a form acceptable to
>          ovs.jsonrpc.session.open(). The connection will maintain an
> in-memory
> @@ -119,6 +122,9 @@ class Idl(object):
>
>          The IDL uses and modifies 'schema' directly.
>
> +        If 'leader_only' is set to True (default value) the IDL will only
> monitor
> +        and transact with the leader of the cluster.
> +
>          If "probe_interval" is zero it disables the connection keepalive
>          feature. If non-zero the value will be forced to at least 1000
>          milliseconds. If None it will just use the default value in OVS.
> @@ -137,6 +143,20 @@ class Idl(object):
>          self._last_seqno = None
>          self.change_seqno = 0
>          self.uuid = uuid.uuid1()
> +
> +        # Server monitor.
> +        self._server_schema_request_id = None
> +        self._server_monitor_request_id = None
> +        self._db_change_aware_request_id = None
> +        self._server_db_name = '_Server'
> +        self._server_db_table = 'Database'
> +        self.server_tables = None
> +        self._server_db = None
> +        self.server_monitor_uuid = uuid.uuid1()
> +        self.leader_only = leader_only
> +        self.cluster_id = None
> +        self._min_index = 0
> +
>          self.state = self.IDL_S_INITIAL
>
>          # Database locking.
> @@ -172,6 +192,15 @@ class Idl(object):
>                  remotes.append(r)
>          return remotes
>
> +    def set_cluster_id(self, cluster_id):
> +        """Set the id of the cluster that this idl must connect to."""
> +        if cluster_id:
> +            self.cluster_id = str(cluster_id)
> +        else:
> +            self.cluster_id = None
>

Instead of the "if - else", how about just

    self.cluster_id = cluster_id

Can cluster_id be passed as an integer by the caller ?
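
On the question of whether cluster_id may be an integer, here is a minimal sketch. It assumes the match in __check_server_db stays as quoted further down in this patch (membership in the 4-character string prefixes of row.cid). The helpers normalize_cluster_id and cid_matches are hypothetical stand-ins, not part of ovs.db.idl, and the UUID is made up.

```python
import uuid


def normalize_cluster_id(cluster_id):
    # Same logic as the patch's set_cluster_id(): falsy -> None, else str().
    return str(cluster_id) if cluster_id else None


def cid_matches(cluster_id, row_cids):
    # Same membership test as the patch's __check_server_db():
    # compare against the first 4 characters of each cid's string form.
    return cluster_id in [str(cid)[:4] for cid in row_cids]


# Made-up cluster id, standing in for the 'cid' column of a Database row.
row_cids = [uuid.UUID("12345678-1234-5678-1234-567812345678")]

print(cid_matches("1234", row_cids))                      # string prefix
print(cid_matches(1234, row_cids))                        # raw integer
print(cid_matches(normalize_cluster_id(1234), row_cids))  # integer via str()
```

Under these assumptions a raw integer never compares equal to a string prefix, so a plain assignment would only be equivalent to the posted code if callers always pass a string or None.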
> +        if self.state != self.IDL_S_INITIAL:
> +            self.force_reconnect()
> +
>      def index_create(self, table, name):
>          """Create a named multi-column index on a table"""
>          return self.tables[table].rows.index_create(name)
> @@ -222,7 +251,7 @@ class Idl(object):
>              if seqno != self._last_seqno:
>                  self._last_seqno = seqno
>                  self.__txn_abort_all()
> -                self.__send_monitor_request()
> +                self.__send_server_schema_request()
>                  if self.lock_name:
>                      self.__send_lock_request()
>                  break
> @@ -230,6 +259,7 @@ class Idl(object):
>              msg = self._session.recv()
>              if msg is None:
>                  break
> +
>              if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
>                      and msg.method == "update2"
>                      and len(msg.params) == 2):
> @@ -239,7 +269,15 @@ class Idl(object):
>                      and msg.method == "update"
>                      and len(msg.params) == 2):
>                  # Database contents changed.
> -                self.__parse_update(msg.params[1], OVSDB_UPDATE)
> +                if msg.params[0] == str(self.server_monitor_uuid):
> +                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
> +                                        tables=self.server_tables)
> +                    self.change_seqno = initial_change_seqno
> +                    if not self.__check_server_db():
> +                        self.force_reconnect()
> +                        break
> +                else:
> +                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
>              elif (msg.type == ovs.jsonrpc.Message.T_REPLY
>                    and self._monitor_request_id is not None
>                    and self._monitor_request_id == msg.id):
> @@ -248,17 +286,66 @@ class Idl(object):
>                      self.change_seqno += 1
>                      self._monitor_request_id = None
>                      self.__clear()
> -                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
> +                    if self.state ==
> self.IDL_S_DATA_MONITOR_COND_REQUESTED:
>                          self.__parse_update(msg.result, OVSDB_UPDATE2)
>                      else:
> -                        assert self.state == self.IDL_S_MONITOR_REQUESTED
> +                        assert self.state ==
> self.IDL_S_DATA_MONITOR_REQUESTED
>                          self.__parse_update(msg.result, OVSDB_UPDATE)
> -
>                  except error.Error as e:
>                      vlog.err("%s: parse error in received schema: %s"
>                               % (self._session.get_name(), e))
>                      self.__error()
>              elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> +                  and self._server_schema_request_id is not None
> +                  and self._server_schema_request_id == msg.id):
> +                # Reply to our "get_schema" of _Server request.
> +                try:
> +                    self._server_schema_request_id = None
> +                    sh = SchemaHelper(None, msg.result)
> +                    sh.register_table(self._server_db_table)
> +                    schema = sh.get_idl_schema()
> +                    self._server_db = schema
> +                    self.server_tables = schema.tables
> +                    self.__send_server_monitor_request()
> +                except error.Error as e:
> +                    vlog.err("%s: error receiving server schema: %s"
> +                             % (self._session.get_name(), e))
> +                    if self.cluster_id:
> +                        self.__error()
> +                        break
> +                    else:
> +                        self.change_seqno = initial_change_seqno
> +                        self.__send_monitor_request()
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> +                  and self._server_monitor_request_id is not None
> +                  and self._server_monitor_request_id == msg.id):
> +                # Reply to our "monitor" of _Server request.
> +                try:
> +                    self._server_monitor_request_id = None
> +                    self.__parse_update(msg.result, OVSDB_UPDATE,
> +                                        tables=self.server_tables)
> +                    self.change_seqno = initial_change_seqno
> +                    if self.__check_server_db():
> +                        self.__send_monitor_request()
> +                        self.__send_db_change_aware()
> +                    else:
> +                        self.force_reconnect()
> +                        break
> +                except error.Error as e:
> +                    vlog.err("%s: parse error in received schema: %s"
> +                             % (self._session.get_name(), e))
> +                    if self.cluster_id:
> +                        self.__error()
> +                        break
> +                    else:
> +                        self.change_seqno = initial_change_seqno
> +                        self.__send_monitor_request()
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> +                  and self._db_change_aware_request_id is not None
> +                  and self._db_change_aware_request_id == msg.id):
> +                # Reply to us notifying the server of our change awarness.
> +                self._db_change_aware_request_id = None
> +            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
>                    and self._lock_request_id is not None
>                    and self._lock_request_id == msg.id):
>                  # Reply to our "lock" request.
> @@ -275,10 +362,20 @@ class Idl(object):
>                  # Reply to our echo request. Ignore it.
>                  pass
>              elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> -                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
> +                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
>                    self._monitor_request_id == msg.id):
>                  if msg.error == "unknown method":
>                      self.__send_monitor_request()
> +            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> +                  self._server_schema_request_id is not None and
> +                  self._server_schema_request_id == msg.id):
> +                self._server_schema_request_id = None
> +                if self.cluster_id:
> +                    self.force_reconnect()
> +                    break
> +                else:
> +                    self.change_seqno = initial_change_seqno
> +                    self.__send_monitor_request()
>              elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
>                                 ovs.jsonrpc.Message.T_REPLY)
>                    and self.__txn_process_reply(msg)):
> @@ -440,12 +537,19 @@ class Idl(object):
>              if not new_has_lock:
>                  self.is_lock_contended = True
>
> +    def __send_db_change_aware(self):
> +        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
> +                                                 [True])
> +        self._db_change_aware_request_id = msg.id
> +        self._session.send(msg)
> +
>      def __send_monitor_request(self):
> -        if self.state == self.IDL_S_INITIAL:
> -            self.state = self.IDL_S_MONITOR_COND_REQUESTED
> +        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
> +                           self.IDL_S_INITIAL]):
> +            self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
>              method = "monitor_cond"
>          else:
> -            self.state = self.IDL_S_MONITOR_REQUESTED
> +            self.state = self.IDL_S_DATA_MONITOR_REQUESTED
>              method = "monitor"
>
>          monitor_requests = {}
> @@ -467,20 +571,50 @@ class Idl(object):
>          self._monitor_request_id = msg.id
>          self._session.send(msg)
>
> -    def __parse_update(self, update, version):
> +    def __send_server_schema_request(self):
> +        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
> +        msg = ovs.jsonrpc.Message.create_request(
> +            "get_schema", [self._server_db_name, str(self.uuid)])
> +        self._server_schema_request_id = msg.id
> +        res = self._session.send(msg)
> +
> +    def __send_server_monitor_request(self):
> +        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
> +        monitor_requests = {}
> +        table = self.server_tables[self._server_db_table]
> +        columns = [column for column in six.iterkeys(table.columns)]
> +        for column in six.itervalues(table.columns):
> +            if not hasattr(column, 'alert'):
> +                column.alert = True
> +        table.rows = custom_index.IndexedRows(table)
> +        table.need_table = False
> +        table.idl = self
> +        monitor_request = {"columns": columns}
> +        monitor_requests[table.name] = [monitor_request]
> +        msg = ovs.jsonrpc.Message.create_request(
> +            'monitor', [self._server_db.name,
> +                        str(self.server_monitor_uuid),
> +                        monitor_requests])
> +        self._server_monitor_request_id = msg.id
> +        self._session.send(msg)
> +
> +    def __parse_update(self, update, version, tables=None):
>          try:
> -            self.__do_parse_update(update, version)
> +            if not tables:
> +                self.__do_parse_update(update, version, self.tables)
> +            else:
> +                self.__do_parse_update(update, version, tables)
>          except error.Error as e:
>              vlog.err("%s: error parsing update: %s"
>                       % (self._session.get_name(), e))
>
> -    def __do_parse_update(self, table_updates, version):
> +    def __do_parse_update(self, table_updates, version, tables):
>          if not isinstance(table_updates, dict):
>              raise error.Error("<table-updates> is not an object",
>                                table_updates)
>
>          for table_name, table_update in six.iteritems(table_updates):
> -            table = self.tables.get(table_name)
> +            table = tables.get(table_name)
>              if not table:
>                  raise error.Error('<table-updates> includes unknown '
>                                    'table "%s"' % table_name)
> @@ -605,6 +739,57 @@ class Idl(object):
>                  self.notify(op, row, Row.from_json(self, table, uuid,
> old))
>          return changed
>
> +    def __check_server_db(self):
> +        """Returns True if this is a valid ovsdb server, False
> otherwise."""
>

The function name seems a bit confusing. This function returns True if the
server has the "_Server" table, and if it is a clustered db, additional
checks are done. Maybe this function can be broken into 2 separate
functions? One to check if the server supports the '_Server' db and the
other to check the status of the cluster.

> +        session_name = self._session.get_name()
> +
> +        if self._server_db_table not in self.server_tables:
> +            vlog.info("%s: server does not have %s table in its %s
> database"
> +                      % (session_name, self._server_db_table,
> self._server_db_name))
> +            return False
> +
> +        rows = self.server_tables[self._server_db_table].rows
> +
> +        database = None
> +        for row in six.itervalues(rows):
> +            if self.cluster_id:
> +                if self.cluster_id in \
> +                   map(lambda x: str(x)[:4], row.cid):
> +                    database = row
> +                    break
> +            elif row.name == self._db.name:
> +                database = row
> +                break
> +
> +        if not database:
> +            vlog.info("%s: server does not have %s database"
> +                      % (session_name, self._db.name))
> +            return False
> +
> +        if (database.model == CLUSTERED and
> +                self._session.get_num_of_remotes() > 1):
> +            if not database.schema:
> +                vlog.info('%s: clustered database server has not yet
> joined '
> +                          'cluster; trying another server' % session_name)
> +                return False
> +            if not database.connected:
> +                vlog.info('%s: clustered database server is disconnected
> '
> +                          'from cluster; trying another server' %
> session_name)
> +                return False
> +            if (self.leader_only and
> +                    not database.leader):
> +                vlog.info('%s: clustered database server is not cluster '
> +                          'leader; trying another server' % session_name)
> +                return False
> +            if database.index:
> +                if database.index[0] < self._min_index:
> +                    vlog.warn('%s: clustered database server has stale
> data; '
> +                              'trying another server' % session_name)
> +                    return False
> +                self._min_index = database.index[0]
> +
> +        return True
> +
>      def __column_name(self, column):
>          if column.type.key.type == ovs.db.types.UuidType:
>              return ovs.ovsuuid.to_json(column.type.key.type.default)
> diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
> index 34cc769..afbe445 100644
> --- a/python/ovs/reconnect.py
> +++ b/python/ovs/reconnect.py
> @@ -344,6 +344,9 @@ class Reconnect(object):
>                  else:
>                      self.info_level("%s: error listening for connections"
>                                      % self.name)
> +            elif self.state == Reconnect.Reconnect:
> +                self.info_level("%s: connection closed by client"
> +                                % self.name)
>              elif self.backoff < self.max_backoff:
>                  if self.passive:
>                      type_ = "listen"
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index 8981b5e..46b047f 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -1466,40 +1466,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
>         "where": [["i", "==", 0]]}]' \
>      'reconnect']],
>    [[000: empty
> -001:
> {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
> -002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[]
> ua=[] uuid=<1>}, updates=None
> -002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
> -002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +000: event:create, row={uuid=<0>}, updates=None
> +000: event:create, row={uuid=<1>}, updates=None
> +001:
> {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
> +002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[]
> ua=[] uuid=<3>}, updates=None
> +002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
> +002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
>  003: {"error":null,"result":[{"count":2}]}
> -004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true
> uuid=<0>}
> -004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true
> uuid=<2>}
> +004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
>  005: {"error":null,"result":[{"count":2}]}
> -006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[]
> sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
> -006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2
> uuid=<0>}
> -006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> -007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
> -008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5]
> ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
> -008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[]
> uuid=<6>
> -008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[]
> sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
> +006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2
> uuid=<2>}
> +006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> +007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
> +008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5]
> ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
> +008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[]
> uuid=<8>
> +008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
>  009: {"error":null,"result":[{"count":2}]}
> -010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
> -010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
> -010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> -010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[]
> uuid=<1>
> -010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
> +010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
> +010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<8>
> +010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[]
> uuid=<3>
> +010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
>  011: {"error":null,"result":[{"count":1}]}
> -012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<1>}, updates=None
> -012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> -012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<3>}, updates=None
> +012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<8>
> +012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
>  013: reconnect
> -014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
> -014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
> -014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> -014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
> +014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
> +014: event:create, row={uuid=<0>}, updates=None
> +014: event:create, row={uuid=<1>}, updates=None
> +014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<8>
> +014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
>  015: done
>  ]])
>

I think it would be good to have a few test cases that start a cluster of
ovsdb-servers and then
- verify that the client has connected to the cluster leader
- bring down the leader and check that the client connects to the new leader.

Thanks
Numan
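
To illustrate the review comment on the __init__ signature, here is a minimal sketch of why the position of "leader_only" matters for existing callers. The two functions are hypothetical stand-ins for the constructor, not the real ovs.db.idl.Idl class, and the remote string is a placeholder.

```python
def init_leader_only_first(remote, schema_helper, leader_only=True,
                           probe_interval=None):
    # Parameter order as posted in v2 of the patch.
    return {"leader_only": leader_only, "probe_interval": probe_interval}


def init_leader_only_last(remote, schema_helper, probe_interval=None,
                          leader_only=True):
    # Parameter order suggested in the review.
    return {"leader_only": leader_only, "probe_interval": probe_interval}


# An existing caller that passes probe_interval positionally,
# as in Idl(remote, helper, 1000).
args = ("unix:/tmp/db.sock", object(), 1000)

print(init_leader_only_first(*args))  # 1000 is bound to leader_only
print(init_leader_only_last(*args))   # 1000 is still bound to probe_interval
```

With the v2 ordering the caller's 1000 is silently taken as a truthy leader_only and the probe interval falls back to its default; appending the new argument keeps positional callers working.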
On Fri, Jan 4, 2019 at 5:26 PM Numan Siddique <nusiddiq@redhat.com> wrote:

> Hi Ted,
>
> This patch is failing the below test cases for me. All are python3
> related. Can you please
> take a look if that's the case with you as well ?
> ******
> 2139: simple idl, writing via IDL with unicode - Python3 FAILED (
> ovsdb-idl.at:456)
> 2141: simple idl, writing via IDL with unicode - Python3 -
> register_columns FAILED (ovsdb-idl.at:456)
> 2143: simple idl, writing via IDL with unicode - Python3 - tcp FAILED (
> ovsdb-idl.at:456)
> 2145: simple idl, writing via IDL with unicode - Python3 (multiple
> remotes) - tcp FAILED (ovsdb-idl.at:456)
> 2147: simple idl, writing via IDL with unicode - Python3 - tcp6 FAILED (
> ovsdb-idl.at:456)
> 2149: simple idl, writing via IDL with unicode - Python3 (multiple
> remotes) - tcp6 FAILED (ovsdb-idl.at:456)
> 2151: simple idl, writing via IDL with unicode - Python3 - SSL FAILED (
> ovsdb-idl.at:456)
> 2153: simple idl, writing large data via IDL with unicode - Python3 FAILED
> (ovsdb-idl.at:490)
> **********
>

These failures are not related to this patch. Please ignore.
There seems to be some issues with python3.7 with unicode in Fedora 29 and I
think Timothy is planning to submit a patch to fix these failures.

Thanks
Numan
confusing. > This function returns True if the server has "_Server" table and if it > is a clustered db, additional checks are done. > May be this function can be broken in 2 separate functions ? > One to check if the server supports '_Server' db and other to check > the status of cluster. > > > + session_name = self._session.get_name() >> + >> + if self._server_db_table not in self.server_tables: >> + vlog.info("%s: server does not have %s table in its %s >> database" >> + % (session_name, self._server_db_table, >> self._server_db_name)) >> + return False >> + >> + rows = self.server_tables[self._server_db_table].rows >> + >> + database = None >> + for row in six.itervalues(rows): >> + if self.cluster_id: >> + if self.cluster_id in \ >> + map(lambda x: str(x)[:4], row.cid): >> + database = row >> + break >> + elif row.name == self._db.name: >> + database = row >> + break >> + >> + if not database: >> + vlog.info("%s: server does not have %s database" >> + % (session_name, self._db.name)) >> + return False >> + >> + if (database.model == CLUSTERED and >> + self._session.get_num_of_remotes() > 1): >> + if not database.schema: >> + vlog.info('%s: clustered database server has not yet >> joined ' >> + 'cluster; trying another server' % >> session_name) >> + return False >> + if not database.connected: >> + vlog.info('%s: clustered database server is >> disconnected ' >> + 'from cluster; trying another server' % >> session_name) >> + return False >> + if (self.leader_only and >> + not database.leader): >> + vlog.info('%s: clustered database server is not cluster >> ' >> + 'leader; trying another server' % session_name) >> + return False >> + if database.index: >> + if database.index[0] < self._min_index: >> + vlog.warn('%s: clustered database server has stale >> data; ' >> + 'trying another server' % session_name) >> + return False >> + self._min_index = database.index[0] >> + >> + return True >> + >> def __column_name(self, column): >> if column.type.key.type == 
ovs.db.types.UuidType: >> return ovs.ovsuuid.to_json(column.type.key.type.default) >> diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py >> index 34cc769..afbe445 100644 >> --- a/python/ovs/reconnect.py >> +++ b/python/ovs/reconnect.py >> @@ -344,6 +344,9 @@ class Reconnect(object): >> else: >> self.info_level("%s: error listening for connections" >> % self.name) >> + elif self.state == Reconnect.Reconnect: >> + self.info_level("%s: connection closed by client" >> + % self.name) >> elif self.backoff < self.max_backoff: >> if self.passive: >> type_ = "listen" >> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at >> index 8981b5e..46b047f 100644 >> --- a/tests/ovsdb-idl.at >> +++ b/tests/ovsdb-idl.at >> @@ -1466,40 +1466,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify], >> "where": [["i", "==", 0]]}]' \ >> 'reconnect']], >> [[000: empty >> -001: >> {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]} >> -002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] >> ua=[] uuid=<1>}, updates=None >> -002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None >> -002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> >> -002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> +000: event:create, row={uuid=<0>}, updates=None >> +000: event:create, row={uuid=<1>}, updates=None >> +001: >> {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]} >> +002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] >> ua=[] uuid=<3>}, updates=None >> +002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None >> +002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> >> +002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] 
ua=[<6> <7>] uuid=<2> >> 003: {"error":null,"result":[{"count":2}]} >> -004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true >> uuid=<0>} >> -004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> >> -004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> +004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true >> uuid=<2>} >> +004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> >> +004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<6> <7>] uuid=<2> >> 005: {"error":null,"result":[{"count":2}]} >> -006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] >> sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>} >> -006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 >> uuid=<0>} >> -006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> >> -006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> -007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]} >> -008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] >> ba=[false] sa=[] ua=[] uuid=<6>}, updates=None >> -008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] >> uuid=<6> >> -008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> >> -008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> +006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] >> sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>} >> +006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] 
uuid=<2>}, updates={r=2 >> uuid=<2>} >> +006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> >> +006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<6> <7>] uuid=<2> >> +007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]} >> +008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] >> ba=[false] sa=[] ua=[] uuid=<8>}, updates=None >> +008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] >> uuid=<8> >> +008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> >> +008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<6> <7>] uuid=<2> >> 009: {"error":null,"result":[{"count":2}]} >> -010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] >> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>} >> -010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] >> ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>} >> -010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] >> sa=[] ua=[] uuid=<6> >> -010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] >> uuid=<1> >> -010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> +010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] >> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>} >> +010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] >> ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>} >> +010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] >> sa=[] ua=[] uuid=<8> >> +010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] >> uuid=<3> >> +010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<6> <7>] uuid=<2> >> 011: {"error":null,"result":[{"count":1}]} >> -012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] 
>> ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None >> -012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] >> sa=[] ua=[] uuid=<6> >> -012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> +012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] >> ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None >> +012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] >> sa=[] ua=[] uuid=<8> >> +012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<6> <7>] uuid=<2> >> 013: reconnect >> -014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] >> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None >> -014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None >> -014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] >> sa=[] ua=[] uuid=<6> >> -014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<4> <5>] uuid=<0> >> +014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] >> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None >> +014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] >> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None >> +014: event:create, row={uuid=<0>}, updates=None >> +014: event:create, row={uuid=<1>}, updates=None >> +014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] >> sa=[] ua=[] uuid=<8> >> +014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] >> sa=[abc def] ua=[<6> <7>] uuid=<2> >> 015: done >> ]]) >> >> > I think it would be good if we have few test cases which starts a cluster > of ovsdb-server's and then > -verify if the client has connected to the cluster leader > - bring down the leader and check if the client connects to the new > leader. 
> > > Thanks > Numan > > -- >> 1.8.3.1 >> >> _______________________________________________ >> dev mailing list >> dev@openvswitch.org >> https://mail.openvswitch.org/mailman/listinfo/ovs-dev >> >
Hi Numan,
Thanks for the review, please find responses inline.
Ted
On Jan 11, 2019, at 2:23 AM, Numan Siddique <nusiddiq@redhat.com> wrote:
On Fri, Jan 4, 2019 at 5:26 PM Numan Siddique <nusiddiq@redhat.com> wrote:
Hi Ted,
This patch is failing the test cases below for me. All are Python 3 related. Can you please
check whether you see the same failures?
******
2139: simple idl, writing via IDL with unicode - Python3 FAILED (ovsdb-idl.at:456)
2141: simple idl, writing via IDL with unicode - Python3 - register_columns FAILED (ovsdb-idl.at:456)
2143: simple idl, writing via IDL with unicode - Python3 - tcp FAILED (ovsdb-idl.at:456)
2145: simple idl, writing via IDL with unicode - Python3 (multiple remotes) - tcp FAILED (ovsdb-idl.at:456)
2147: simple idl, writing via IDL with unicode - Python3 - tcp6 FAILED (ovsdb-idl.at:456)
2149: simple idl, writing via IDL with unicode - Python3 (multiple remotes) - tcp6 FAILED (ovsdb-idl.at:456)
2151: simple idl, writing via IDL with unicode - Python3 - SSL FAILED (ovsdb-idl.at:456)
2153: simple idl, writing large data via IDL with unicode - Python3 FAILED (ovsdb-idl.at:490)
**********
These failures are not related to this patch. Please ignore them. There seem to be some issues with unicode under Python 3.7
on Fedora 29, and I think Timothy is planning to submit a patch to fix these failures.
Thanks
Numan
All tests pass with Python 3.4.8 and Python 2.7.5 on my machine, with no skips except for daemon.at (Windows skip).
Please see some comments below.
Thanks
Numan
On Fri, Jan 4, 2019 at 7:47 AM Ted Elhourani <ted.elhourani@nutanix.com> wrote:
The Python IDL implementation supports ovsdb cluster connections.
This patch is a follow-up to commit 31e434fc98; it adds the option of
connecting to the leader (the default) in the Raft-based cluster. It mimics
the existing C IDL support for clusters introduced in commit 1b1d2e6daa.
The _Server database schema is first requested, then a monitor of the
Database table in the _Server Database. Method __check_server_db verifies
the eligibility of the server. If the attempt to obtain a monitor of the
_Server database fails and a cluster id was not provided this implementation
proceeds to request the data monitor. If a cluster id was provided via the
set_cluster_id method then the connection is aborted and a connection to a
different node is instead attempted, until a valid cluster node is found.
Thus, when supplied, cluster id is interpreted as the intention to only
allow connections to a clustered database. If not supplied, connections to
standalone nodes, or nodes that do not have the _Server database are
allowed. change_seqno is not incremented in the case of Database table
updates.
Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
---
python/ovs/db/idl.py | 217 ++++++++++++++++++++++++++++++++++++++++++++----
python/ovs/reconnect.py | 3 +
tests/ovsdb-idl.at | 62 +++++++-------
3 files changed, 237 insertions(+), 45 deletions(-)
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 250e897..f989548 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -38,6 +38,7 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
+CLUSTERED = "clustered"
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +93,12 @@ class Idl(object):
"""
IDL_S_INITIAL = 0
- IDL_S_MONITOR_REQUESTED = 1
- IDL_S_MONITOR_COND_REQUESTED = 2
+ IDL_S_SERVER_SCHEMA_REQUESTED = 1
+ IDL_S_SERVER_MONITOR_REQUESTED = 2
+ IDL_S_DATA_MONITOR_REQUESTED = 3
+ IDL_S_DATA_MONITOR_COND_REQUESTED = 4
- def __init__(self, remote, schema_helper, probe_interval=None):
+ def __init__(self, remote, schema_helper, leader_only=True, probe_interval=None):
Can you please add the "leader_only" arg at the end? Existing clients of the Python IDL would
break if they create the Idl object as
my_idl = Idl(remote, helper, 1000)
Will fix in next revision.
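To illustrate the compatibility concern, here is a minimal sketch. The two functions are hypothetical stand-ins that only mimic the two argument orders; they are not part of ovs.db.idl, and the remote string is made up. The sketch shows how a probe interval passed positionally binds under each signature.

```python
# Hypothetical stand-ins for the two Idl.__init__ argument orders under
# discussion. They only record how the arguments bind.
def init_v2(remote, schema_helper, leader_only=True, probe_interval=None):
    # Argument order as posted in this revision of the patch.
    return {"leader_only": leader_only, "probe_interval": probe_interval}


def init_suggested(remote, schema_helper, probe_interval=None,
                   leader_only=True):
    # Argument order suggested in the review: new argument last.
    return {"leader_only": leader_only, "probe_interval": probe_interval}


# An existing caller that passes the probe interval positionally.
# "unix:db.sock" and object() are placeholder values for illustration.
old_call_args = ("unix:db.sock", object(), 1000)

bound_v2 = init_v2(*old_call_args)
bound_suggested = init_suggested(*old_call_args)
```

With the order from this revision, 1000 ends up in leader_only and the probe interval silently falls back to its default; with the new argument placed last, the old call keeps its meaning.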
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
@@ -119,6 +122,9 @@ class Idl(object):
The IDL uses and modifies 'schema' directly.
+ If 'leader_only' is set to True (default value) the IDL will only monitor
+ and transact with the leader of the cluster.
+
If "probe_interval" is zero it disables the connection keepalive
feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +143,20 @@ class Idl(object):
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
+
+ # Server monitor.
+ self._server_schema_request_id = None
+ self._server_monitor_request_id = None
+ self._db_change_aware_request_id = None
+ self._server_db_name = '_Server'
+ self._server_db_table = 'Database'
+ self.server_tables = None
+ self._server_db = None
+ self.server_monitor_uuid = uuid.uuid1()
+ self.leader_only = leader_only
+ self.cluster_id = None
+ self._min_index = 0
+
self.state = self.IDL_S_INITIAL
# Database locking.
@@ -172,6 +192,15 @@ class Idl(object):
remotes.append(r)
return remotes
+ def set_cluster_id(self, cluster_id):
+ """Set the id of the cluster that this idl must connect to."""
+ if cluster_id:
+ self.cluster_id = str(cluster_id)
+ else:
+ self.cluster_id = None
Instead of the "if - else", how about just
self.cluster_id = cluster_id
Can cluster_id be passed as an integer by the caller?
Since it is a substring of the UUID, we can expect any client code to pass it as a string. I'll fix this in the next revision.
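A minimal sketch of the simplified setter, assuming callers always pass a string (a UUID prefix) or None. ClusterIdHolder is a hypothetical stand-in that carries only the attribute discussed here; it is not the real Idl class and omits the reconnect logic that follows in the patch.

```python
class ClusterIdHolder(object):
    """Hypothetical stand-in holding only the cluster_id attribute."""

    def __init__(self):
        self.cluster_id = None

    def set_cluster_id(self, cluster_id):
        # Assumption: cluster_id is already a string or None, so no
        # str() conversion and no if/else are needed.
        self.cluster_id = cluster_id


holder = ClusterIdHolder()
holder.set_cluster_id("abcd")
```

One difference from the posted version is that an empty string is stored as-is rather than converted to None. The patch's later checks of the form "if self.cluster_id:" treat both values the same way.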
+ if self.state != self.IDL_S_INITIAL:
+ self.force_reconnect()
+
def index_create(self, table, name):
"""Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name)
@@ -222,7 +251,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
- self.__send_monitor_request()
+ self.__send_server_schema_request()
if self.lock_name:
self.__send_lock_request()
break
@@ -230,6 +259,7 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
+
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
@@ -239,7 +269,15 @@ class Idl(object):
and msg.method == "update"
and len(msg.params) == 2):
# Database contents changed.
- self.__parse_update(msg.params[1], OVSDB_UPDATE)
+ if msg.params[0] == str(self.server_monitor_uuid):
+ self.__parse_update(msg.params[1], OVSDB_UPDATE,
+ tables=self.server_tables)
+ self.change_seqno = initial_change_seqno
+ if not self.__check_server_db():
+ self.force_reconnect()
+ break
+ else:
+ self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
@@ -248,17 +286,66 @@ class Idl(object):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
- if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+ if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
- assert self.state == self.IDL_S_MONITOR_REQUESTED
+ assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__parse_update(msg.result, OVSDB_UPDATE)
-
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._server_schema_request_id is not None
+ and self._server_schema_request_id == msg.id):
+ # Reply to our "get_schema" of _Server request.
+ try:
+ self._server_schema_request_id = None
+ sh = SchemaHelper(None, msg.result)
+ sh.register_table(self._server_db_table)
+ schema = sh.get_idl_schema()
+ self._server_db = schema
+ self.server_tables = schema.tables
+ self.__send_server_monitor_request()
+ except error.Error as e:
+ vlog.err("%s: error receiving server schema: %s"
+ % (self._session.get_name(), e))
+ if self.cluster_id:
+ self.__error()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._server_monitor_request_id is not None
+ and self._server_monitor_request_id == msg.id):
+ # Reply to our "monitor" of _Server request.
+ try:
+ self._server_monitor_request_id = None
+ self.__parse_update(msg.result, OVSDB_UPDATE,
+ tables=self.server_tables)
+ self.change_seqno = initial_change_seqno
+ if self.__check_server_db():
+ self.__send_monitor_request()
+ self.__send_db_change_aware()
+ else:
+ self.force_reconnect()
+ break
+ except error.Error as e:
+ vlog.err("%s: parse error in received schema: %s"
+ % (self._session.get_name(), e))
+ if self.cluster_id:
+ self.__error()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._db_change_aware_request_id is not None
+ and self._db_change_aware_request_id == msg.id):
+ # Reply to us notifying the server of our change awareness.
+ self._db_change_aware_request_id = None
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
and self._lock_request_id == msg.id):
# Reply to our "lock" request.
@@ -275,10 +362,20 @@ class Idl(object):
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
- self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+ self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self._server_schema_request_id is not None and
+ self._server_schema_request_id == msg.id):
+ self._server_schema_request_id = None
+ if self.cluster_id:
+ self.force_reconnect()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
@@ -440,12 +537,19 @@ class Idl(object):
if not new_has_lock:
self.is_lock_contended = True
+ def __send_db_change_aware(self):
+ msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
+ [True])
+ self._db_change_aware_request_id = msg.id
+ self._session.send(msg)
+
def __send_monitor_request(self):
- if self.state == self.IDL_S_INITIAL:
- self.state = self.IDL_S_MONITOR_COND_REQUESTED
+ if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
+ self.IDL_S_INITIAL]):
+ self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
else:
- self.state = self.IDL_S_MONITOR_REQUESTED
+ self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
monitor_requests = {}
@@ -467,20 +571,50 @@ class Idl(object):
self._monitor_request_id = msg.id
self._session.send(msg)
- def __parse_update(self, update, version):
+ def __send_server_schema_request(self):
+ self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+ msg = ovs.jsonrpc.Message.create_request(
+ "get_schema", [self._server_db_name, str(self.uuid)])
+ self._server_schema_request_id = msg.id
+ res = self._session.send(msg)
+
+ def __send_server_monitor_request(self):
+ self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
+ monitor_requests = {}
+ table = self.server_tables[self._server_db_table]
+ columns = [column for column in six.iterkeys(table.columns)]
+ for column in six.itervalues(table.columns):
+ if not hasattr(column, 'alert'):
+ column.alert = True
+ table.rows = custom_index.IndexedRows(table)
+ table.need_table = False
+ table.idl = self
+ monitor_request = {"columns": columns}
+ monitor_requests[table.name] = [monitor_request]
+ msg = ovs.jsonrpc.Message.create_request(
+ 'monitor', [self._server_db.name,
+ str(self.server_monitor_uuid),
+ monitor_requests])
+ self._server_monitor_request_id = msg.id
+ self._session.send(msg)
+
+ def __parse_update(self, update, version, tables=None):
try:
- self.__do_parse_update(update, version)
+ if not tables:
+ self.__do_parse_update(update, version, self.tables)
+ else:
+ self.__do_parse_update(update, version, tables)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))
- def __do_parse_update(self, table_updates, version):
+ def __do_parse_update(self, table_updates, version, tables):
if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
for table_name, table_update in six.iteritems(table_updates):
- table = self.tables.get(table_name)
+ table = tables.get(table_name)
if not table:
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
@@ -605,6 +739,57 @@ class Idl(object):
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
+ def __check_server_db(self):
+ """Returns True if this is a valid ovsdb server, False otherwise."""
The function name seems a bit confusing.
This function returns True if the server has "_Server" table and, if it
is a clustered db, additional checks are done.
Maybe this function can be broken into 2 separate functions?
One to check if the server supports the '_Server' db and the other to check
the status of the cluster.
We are already checking for _Server support in the state machine, with the get_schema request (send_server_schema_request). This function is supposed to be identical to
https://github.com/openvswitch/ovs/blob/master/lib/ovsdb-idl.c#L1815
If the server supports _Server, the next step is to check whether the Database table contains a row with a matching cid, or a matching db name if the client code did not set the cid, and then to check the other columns for validity (e.g. leader). The table check "if self._server_db_table not in self.server_tables" adds to the confusion, and might be unnecessary. This function is checking the content of the Database table to determine the admissibility of the server. I'll be happy to re-arrange if you still see a reason to split the check in two.
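To make the order of the checks described above easier to follow, here is a minimal standalone sketch. It is not the patch's code: `DbRow`, `find_database` and `server_is_admissible` are hypothetical names, the row is a plain data class (Python 3.7+) rather than an IDL row, and `index` is a bare integer instead of an optional column. The cid comparison follows the patch in matching against the first four characters of each cid.

```python
from dataclasses import dataclass, field
from typing import List, Optional

CLUSTERED = "clustered"


@dataclass
class DbRow:
    """Hypothetical stand-in for one row of the _Server Database table."""
    name: str
    model: str = "standalone"
    cid: List[str] = field(default_factory=list)
    schema: Optional[str] = None
    connected: bool = False
    leader: bool = False
    index: Optional[int] = None


def find_database(rows, db_name, cluster_id=None):
    """Match on cluster id if one was set, otherwise on database name."""
    for row in rows:
        if cluster_id:
            if any(str(c)[:4] == cluster_id for c in row.cid):
                return row
        elif row.name == db_name:
            return row
    return None


def server_is_admissible(rows, db_name, cluster_id=None, leader_only=True,
                         num_remotes=1, min_index=0):
    """Return (ok, new_min_index) for the server that reported 'rows'."""
    database = find_database(rows, db_name, cluster_id)
    if database is None:
        return False, min_index          # No matching row at all.
    if database.model == CLUSTERED and num_remotes > 1:
        if not database.schema:
            return False, min_index      # Has not yet joined the cluster.
        if not database.connected:
            return False, min_index      # Disconnected from the cluster.
        if leader_only and not database.leader:
            return False, min_index      # Follower, but leader required.
        if database.index is not None:
            if database.index < min_index:
                return False, min_index  # Stale data.
            min_index = database.index
    return True, min_index
```

Splitting the row lookup from the validity checks, as in this sketch, is one way the two-function arrangement suggested above could look; whether that is worth doing in the real IDL is a separate question.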
+ session_name = self._session.get_name()
+
+ if self._server_db_table not in self.server_tables:
+ vlog.info("%s: server does not have %s table in its %s database"
+ % (session_name, self._server_db_table, self._server_db_name))
+ return False
+
+ rows = self.server_tables[self._server_db_table].rows
+
+ database = None
+ for row in six.itervalues(rows):
+ if self.cluster_id:
+ if self.cluster_id in \
+ map(lambda x: str(x)[:4], row.cid):
+ database = row
+ break
+ elif row.name == self._db.name:
+ database = row
+ break
+
+ if not database:
+ vlog.info("%s: server does not have %s database"
+ % (session_name, self._db.name))
+ return False
+
+ if (database.model == CLUSTERED and
+ self._session.get_num_of_remotes() > 1):
+ if not database.schema:
+ vlog.info('%s: clustered database server has not yet joined '
+ 'cluster; trying another server' % session_name)
+ return False
+ if not database.connected:
+ vlog.info('%s: clustered database server is disconnected '
+ 'from cluster; trying another server' % session_name)
+ return False
+ if (self.leader_only and
+ not database.leader):
+ vlog.info('%s: clustered database server is not cluster '
+ 'leader; trying another server' % session_name)
+ return False
+ if database.index:
+ if database.index[0] < self._min_index:
+ vlog.warn('%s: clustered database server has stale data; '
+ 'trying another server' % session_name)
+ return False
+ self._min_index = database.index[0]
+
+ return True
+
def __column_name(self, column):
if column.type.key.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(column.type.key.type.default)
diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
index 34cc769..afbe445 100644
--- a/python/ovs/reconnect.py
+++ b/python/ovs/reconnect.py
@@ -344,6 +344,9 @@ class Reconnect(object):
else:
self.info_level("%s: error listening for connections"
% self.name)
+ elif self.state == Reconnect.Reconnect:
+ self.info_level("%s: connection closed by client"
+ % self.name)
elif self.backoff < self.max_backoff:
if self.passive:
type_ = "listen"
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index 8981b5e..46b047f 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -1466,40 +1466,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
"where": [["i", "==", 0]]}]' \
'reconnect']],
[[000: empty
-001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
-002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+000: event:create, row={uuid=<0>}, updates=None
+000: event:create, row={uuid=<1>}, updates=None
+001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
+002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
003: {"error":null,"result":[{"count":2}]}
-004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>}
-004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
+004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
005: {"error":null,"result":[{"count":2}]}
-006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
-006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>}
-006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
-007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
-008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
+006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
+006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
+007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
+008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
009: {"error":null,"result":[{"count":2}]}
-010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
-010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
-010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
+010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
+010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
011: {"error":null,"result":[{"count":1}]}
-012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
013: reconnect
-014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+014: event:create, row={uuid=<0>}, updates=None
+014: event:create, row={uuid=<1>}, updates=None
+014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
015: done
]])
I think it would be good to have a few test cases that start a cluster of ovsdb-servers and then
- verify that the client has connected to the cluster leader
- bring down the leader and check that the client connects to the new leader.
I wrote 2 tests; I'll include them in the next revision.
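The behaviour those two cases would need to assert can be sketched without a real cluster. The following is a toy simulation, not an autotest: `FakeServer` and `pick_server` are hypothetical names, and each server's role is a plain flag rather than something read from the _Server database.

```python
class FakeServer(object):
    """Hypothetical stand-in for one ovsdb-server in a Raft cluster."""

    def __init__(self, name, leader=False, up=True):
        self.name = name
        self.leader = leader
        self.up = up


def pick_server(servers, leader_only=True):
    """Try each remote in order; return the first one the client accepts."""
    for server in servers:
        if not server.up:
            continue  # Connection attempt fails; move on to the next remote.
        if leader_only and not server.leader:
            continue  # A reachable follower is rejected when leader_only is set.
        return server
    return None


cluster = [FakeServer("s1"), FakeServer("s2", leader=True), FakeServer("s3")]

# Case 1: the client should settle on the current leader.
first = pick_server(cluster)

# Case 2: bring the leader down, let another server take over, and
# check that the client moves to the new leader.
cluster[1].up = False
cluster[1].leader = False
cluster[2].leader = True
second = pick_server(cluster)
```

A real test would start several ovsdb-server processes and read the leader from the cluster itself; the sketch only pins down what "connected to the leader" and "connects to the new leader" mean as assertions.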
Thanks
Numan
--
1.8.3.1
On Fri, Jan 04, 2019 at 02:16:36AM +0000, Ted Elhourani wrote:
> The Python IDL implementation supports ovsdb cluster connections.
> This patch is a follow up to commit 31e434fc98, it adds the option of
> connecting to the leader (the default) in the Raft-based cluster. It mimics
> the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.

I'm getting the following reports from flake8 when I apply this patch.
Can you fix those up for v3?

../python/ovs/db/idl.py:43:1: E302 expected 2 blank lines, found 1
../python/ovs/db/idl.py:101:80: E501 line too long (85 > 79 characters)
../python/ovs/db/idl.py:125:80: E501 line too long (81 > 79 characters)
../python/ovs/db/idl.py:579:9: F841 local variable 'res' is assigned to but never used
../python/ovs/db/idl.py:748:80: E501 line too long (84 > 79 characters)

Thanks,

Ben.
Sure, I fixed and verified with "make flake8-check".

Ted

> On Jan 16, 2019, at 12:29 PM, Ben Pfaff <blp@ovn.org> wrote:
>
> On Fri, Jan 04, 2019 at 02:16:36AM +0000, Ted Elhourani wrote:
>> The Python IDL implementation supports ovsdb cluster connections.
>> This patch is a follow up to commit 31e434fc98, it adds the option of
>> connecting to the leader (the default) in the Raft-based cluster. It mimics
>> the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.
>
> I'm getting the following reports from flake8 when I apply this patch.
> Can you fix those up for v3?
>
> ../python/ovs/db/idl.py:43:1: E302 expected 2 blank lines, found 1
> ../python/ovs/db/idl.py:101:80: E501 line too long (85 > 79 characters)
> ../python/ovs/db/idl.py:125:80: E501 line too long (81 > 79 characters)
> ../python/ovs/db/idl.py:579:9: F841 local variable 'res' is assigned to but never used
> ../python/ovs/db/idl.py:748:80: E501 line too long (84 > 79 characters)
>
> Thanks,
>
> Ben.
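The v3 changes themselves are not shown in this thread. Purely as an illustration, the three kinds of warning flagged above (E302, E501, F841) could be addressed along the following lines. `IdlSketch` and `FakeSession` are hypothetical stand-ins, not the real ovs.db.idl classes, and the actual v3 may differ.

```python
CLUSTERED = "clustered"


# E302: two blank lines before a top-level class definition.
class FakeSession(object):
    """Hypothetical session that just records what was sent."""

    def __init__(self):
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)
        return 0


class IdlSketch(object):
    # E501: wrap the long signature so it stays within 79 columns.
    def __init__(self, remote, schema_helper, leader_only=True,
                 probe_interval=None):
        self.remote = remote
        self.schema_helper = schema_helper
        self.leader_only = leader_only
        self.probe_interval = probe_interval
        self._session = FakeSession()

    def send_server_schema_request(self, msg):
        # F841: call send() without binding the unused result to 'res'.
        self._session.send(msg)
```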
The Python IDL implementation supports ovsdb cluster connections. This patch is a follow-up to commit 31e434fc98; it adds the option of connecting to the leader (the default) in the Raft-based cluster. It mimics the existing C IDL support for clusters introduced in commit 1b1d2e6daa.

The _Server database schema is requested first, followed by a monitor of the Database table in the _Server database. Method __check_server_db verifies the eligibility of the server. If the attempt to obtain a monitor of the _Server database fails and a cluster id was not provided, this implementation proceeds to request the data monitor. If a cluster id was provided via the set_cluster_id method, the connection is aborted and a connection to a different node is attempted instead, until a valid cluster node is found. Thus, when supplied, the cluster id is interpreted as the intention to allow connections only to a clustered database. If it is not supplied, connections to standalone nodes, or to nodes that do not have the _Server database, are allowed. change_seqno is not incremented in the case of Database table updates.

Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
---
 python/ovs/db/idl.py    | 217 ++++++++++++++++++++++++++++++++++++++++++++----
 python/ovs/reconnect.py |   3 +
 tests/ovsdb-idl.at      |  62 +++++++-------
 3 files changed, 237 insertions(+), 45 deletions(-)
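
For readers skimming the description, the fallback policy in the commit message can be summarized as a small decision function. This is only an illustrative sketch, not the code in python/ovs/db/idl.py: the names `Action` and `next_action` and the boolean parameters are hypothetical, and the choice to retry another remote when the eligibility check fails is an assumption drawn from the "until a valid cluster node is found" wording.

```python
from enum import Enum


class Action(Enum):
    """Hypothetical outcomes of the _Server monitor handshake."""
    REQUEST_DATA_MONITOR = "request the data monitor"
    RETRY_OTHER_REMOTE = "abort and try a different node"


def next_action(server_monitor_ok, server_is_eligible, cluster_id=None):
    """Decide what to do after trying to monitor _Server.Database.

    server_monitor_ok:  whether the monitor of the Database table in the
                        _Server database was obtained.
    server_is_eligible: result of an eligibility check (the role the
                        commit message assigns to __check_server_db).
    cluster_id:         None unless one was supplied, e.g. through
                        set_cluster_id.
    """
    if server_monitor_ok:
        # Assumption: an ineligible server is skipped in favour of
        # another node, matching "until a valid cluster node is found".
        if server_is_eligible:
            return Action.REQUEST_DATA_MONITOR
        return Action.RETRY_OTHER_REMOTE

    # The _Server monitor failed: a standalone node, or one without the
    # _Server database.
    if cluster_id is None:
        # No cluster id: standalone servers are acceptable.
        return Action.REQUEST_DATA_MONITOR
    # A cluster id expresses the intent to accept only clustered databases.
    return Action.RETRY_OTHER_REMOTE
```

With this sketch, `next_action(False, False)` selects the data monitor, while `next_action(False, False, cluster_id="abcd")` selects a retry against a different node.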