From patchwork Fri Jan 25 19:10:01 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Ted Elhourani X-Patchwork-Id: 1031257 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=openvswitch.org (client-ip=140.211.169.12; helo=mail.linuxfoundation.org; envelope-from=ovs-dev-bounces@openvswitch.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=nutanix.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=nutanix.com header.i=@nutanix.com header.b="jaxCimsX"; dkim-atps=neutral Received: from mail.linuxfoundation.org (mail.linuxfoundation.org [140.211.169.12]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 43mT9z6cm3z9s4s for ; Sat, 26 Jan 2019 06:10:15 +1100 (AEDT) Received: from mail.linux-foundation.org (localhost [127.0.0.1]) by mail.linuxfoundation.org (Postfix) with ESMTP id 529D31AFB; Fri, 25 Jan 2019 19:10:11 +0000 (UTC) X-Original-To: dev@openvswitch.org Delivered-To: ovs-dev@mail.linuxfoundation.org Received: from smtp1.linuxfoundation.org (smtp1.linux-foundation.org [172.17.192.35]) by mail.linuxfoundation.org (Postfix) with ESMTPS id D40E81AF8 for ; Fri, 25 Jan 2019 19:10:09 +0000 (UTC) X-Greylist: domain auto-whitelisted by SQLgrey-1.7.6 Received: from mx0a-002c1b01.pphosted.com (mx0a-002c1b01.pphosted.com [148.163.151.68]) by smtp1.linuxfoundation.org (Postfix) with ESMTPS id 29DDE857 for ; Fri, 25 Jan 2019 19:10:06 +0000 (UTC) Received: from pps.filterd (m0127839.ppops.net [127.0.0.1]) by mx0a-002c1b01.pphosted.com (8.16.0.27/8.16.0.27) with SMTP id x0PIuqXx026739 for ; Fri, 25 Jan 2019 11:10:04 -0800 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=nutanix.com; h=from : to : cc : subject : date : message-id : content-type : content-transfer-encoding : mime-version; s=proofpoint20171006; bh=LKrHnjNP25irHORywLMdELJ7tOVoydKkr+xj/vx0Ops=; b=jaxCimsXdPIuOnhMOkoPyVMiWMTp6xsxTm0YtU+iXY9g4ltP0/K3GUqEpol+xptlECSN 6weWGs4eV5iyrXmOCKibHgvbMuDXctHSzmpUfbwtzgT1hDOjP0J4HC5m0KBSMjH+ndlt vFwrAevOq6PfLmf5eZiilwvH27+byIONwgt4lYlgf5u3KCA1vtsknLv+r7/K6/37NH9G xW90/sud8ch2FCtAxcdyAcKkhTDPHv6pDzncEArlsk5Umrba12u0JtEB7ErLATSmARGA rf7/7DSta/6hL3BSv2kQ3UUXfAZ+vXwgTOEIXJKl38hzDDNmNI5DUzGvv9xyauYJJ0/H Ag== Received: from nam02-sn1-obe.outbound.protection.outlook.com (mail-sn1nam02lp2055.outbound.protection.outlook.com [104.47.36.55]) by mx0a-002c1b01.pphosted.com with ESMTP id 2q728e4yp3-1 (version=TLSv1.2 cipher=ECDHE-RSA-AES256-GCM-SHA384 bits=256 verify=NOT) for ; Fri, 25 Jan 2019 11:10:04 -0800 Received: from MWHPR02MB2782.namprd02.prod.outlook.com (10.175.49.148) by MWHPR02MB2605.namprd02.prod.outlook.com (10.168.205.147) with Microsoft SMTP Server (version=TLS1_2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.1558.20; Fri, 25 Jan 2019 19:10:01 +0000 Received: from MWHPR02MB2782.namprd02.prod.outlook.com ([fe80::183f:d287:8bae:7290]) by MWHPR02MB2782.namprd02.prod.outlook.com ([fe80::183f:d287:8bae:7290%12]) with mapi id 15.20.1558.021; Fri, 25 Jan 2019 19:10:01 +0000 From: Ted Elhourani To: "dev@openvswitch.org" Thread-Topic: [PATCH v5] Monitor Database table to manage lifecycle of IDL client. Thread-Index: AQHUtOGSA4WWvQaFCkmQ2M/J2myaTg== Date: Fri, 25 Jan 2019 19:10:01 +0000 Message-ID: <1548443386-146951-1-git-send-email-ted.elhourani@nutanix.com> Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: x-clientproxiedby: BYAPR07CA0038.namprd07.prod.outlook.com (2603:10b6:a03:60::15) To MWHPR02MB2782.namprd02.prod.outlook.com (2603:10b6:300:107::20) x-ms-exchange-messagesentrepresentingtype: 1 x-mailer: git-send-email 2.7.4 x-originating-ip: [192.146.154.1] x-ms-publictraffictype: Email x-microsoft-exchange-diagnostics: 1; MWHPR02MB2605; 6:IxA2LBGgBhQeVH/pITC/RlIEAYpowoVJ0b0UeEr7Ez8bzKlgVlfqw1Bd7kzviq6pVdpMGFw0QnE1NTf3oWD9pNy8uVk1dMfpAYF7/2ZqKdPrfqny2WrNJuASKWS5IRW3b0nI0Qn6aRUES1mFaEvvmmeY3xZ5VGe+cym6xZ39aOhpIVp2ohTzw3/ARIhaO//UNIes9SPjOcJlSANRhYhr21rhA/yPs358M+JS87Xag5IdI/9g1t3poEs5HzPEpS7y/UDoSnEvUzoRVuLO+Ww1WVuFOc4caLj7ciYwOikkGJt+eXY8sKR6g640D2IphcLFoPs5vlcUZhbyVVwSUWdF8IzbxaXj+VOTSsZRK0kWQC3qSjiTks7BtYbzTLlwqa/Tvdu+p3ciuxvQLlVfe7Spwia+rjoHtl90XA30J4j+8ThNi7iPM+T7hHHoVpOnCCy2leka4hL2b3wD7zDwxMA8Cw==; 5:6c3MvjNdVCdZGHbO/A/2wWN9I7Gi6b7mjPTa26TJTckhLno+/rRsE+U+ciinXaaTUQF8gZ1wc3eM53ekEPCjTsbo3Lxwg0G0mq5i1BCfuuMbZqWc+xQV+bhgQSf6FBbqOlwGcBWVMOq9e8KAzxGFOMLhMyFu/KAv9c23shLmU4Jxalzy0KVVzjMOhPq7G80zzF0lDVLiKf72UJKcb4hSlw==; 7:jnoO3wj4nPjnXpjAVegVameiFqgviX0+LSSZTGBy2HB/OJlF0u0FaSrbU9hUQRF9KVb0xPX114+ozIwVUHtBsJjkHDRJKJ2qq1gaPVTg1Cm7TQh5AUwPDydui4t4zYDnfz5q+U1R/5trWC17PWjNbA== x-ms-office365-filtering-correlation-id: 215176a9-2fed-47d5-dbe5-08d682f8b489 x-microsoft-antispam: BCL:0; PCL:0; RULEID:(2390118)(7020095)(4652040)(8989299)(4534185)(4627221)(201703031133081)(201702281549075)(8990200)(5600110)(711020)(4605077)(2017052603328)(7153060)(7193020); SRVR:MWHPR02MB2605; x-ms-traffictypediagnostic: MWHPR02MB2605: x-microsoft-antispam-prvs: x-forefront-prvs: 0928072091 x-forefront-antispam-report: SFV:NSPM; SFS:(10019020)(979002)(39860400002)(366004)(136003)(346002)(376002)(396003)(55674003)(199004)(189003)(7736002)(305945005)(106356001)(186003)(102836004)(105586002)(2351001)(386003)(26005)(6506007)(99286004)(53946003)(52116002)(8676002)(81156014)(1730700003)(81166006)(50226002)(8936002)(71200400001)(30864003)(256004)(44832011)(71190400001)(14444005)(486006)(2616005)(476003)(6512007)(2501003)(6116002)(3846002)(316002)(478600001)(5640700003)(66066001)(97736004)(14454004)(36756003)(6916009)(25786009)(6486002)(68736007)(53936002)(6436002)(2906002)(4326008)(107886003)(86362001)(64030200001)(959014)(579004)(569006); DIR:OUT; SFP:1102; SCL:1; SRVR:MWHPR02MB2605; H:MWHPR02MB2782.namprd02.prod.outlook.com; FPR:; SPF:None; LANG:en; PTR:InfoNoRecords; A:1; MX:1; received-spf: None (protection.outlook.com: nutanix.com does not designate permitted sender hosts) x-ms-exchange-senderadcheck: 1 x-microsoft-antispam-message-info: GqFBSKT3ReWnDHL1UGhlXRpSpPzC5VkewFV1y/olPk87T1dGUVJIpRhFv5o/QmlEAQNJOoomM6IzfrLKtixeqLuIVqGLCq9DxQFL+vC7Yq7mbNCPSUwdepYA2TqsWUjhqo5nlqdsXXAb/XIUl4WGUX5iiFgVGJM3U22ROUk0vYqr19FmsqFoFctKvfZOq3C6LJ0KhumA8sursyUX8LPC2HBm6zACAST7kPDwGHZVpFUsjwqEwq1CqBv0g5h617S7gDpm46mjLq7lLBK/p4vpriJ2jsvKUnl48trYV+EVYut90OFH7euR898lJQeBWQUojZt6QLH2G839dGP26rSSu1CvT6MGy7IaOPK8feXgtE+BABjuEK8r6bApsUs6U+cDQFMsRYWDcC06XHltYVCdguzBYXevUARgK80nKsJGlAc= MIME-Version: 1.0 X-OriginatorOrg: nutanix.com X-MS-Exchange-CrossTenant-Network-Message-Id: 215176a9-2fed-47d5-dbe5-08d682f8b489 X-MS-Exchange-CrossTenant-originalarrivaltime: 25 Jan 2019 19:10:00.9946 (UTC) X-MS-Exchange-CrossTenant-fromentityheader: Hosted X-MS-Exchange-CrossTenant-id: bb047546-786f-4de1-bd75-24e5b6f79043 X-MS-Exchange-Transport-CrossTenantHeadersStamped: MWHPR02MB2605 X-Proofpoint-Virus-Version: vendor=fsecure engine=2.50.10434:, , definitions=2019-01-25_12:, , signatures=0 X-Proofpoint-Spam-Details: rule=outbound_notspam policy=outbound score=0 priorityscore=1501 malwarescore=0 suspectscore=0 phishscore=0 bulkscore=0 spamscore=0 clxscore=1015 lowpriorityscore=0 mlxscore=0 impostorscore=0 mlxlogscore=999 adultscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.0.1-1810050000 definitions=main-1901250148 X-Spam-Status: No, score=-2.7 required=5.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID, DKIM_VALID_AU, RCVD_IN_DNSWL_LOW autolearn=ham version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on smtp1.linux-foundation.org Subject: [ovs-dev] [PATCH v5] Monitor Database table to manage lifecycle of IDL client. X-BeenThere: ovs-dev@openvswitch.org X-Mailman-Version: 2.1.12 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: ovs-dev-bounces@openvswitch.org Errors-To: ovs-dev-bounces@openvswitch.org The Python IDL implementation supports ovsdb cluster connections. This patch is a follow up to commit 31e434fc98, it adds the option of connecting to the leader (the default) in the Raft-based cluster. It mimics the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa. The _Server database schema is first requested, then a monitor of the Database table in the _Server Database. Method __check_server_db verifies the eligibility of the server. If the attempt to obtain a monitor of the _Server database fails and a cluster id was not provided this implementation proceeds to request the data monitor. If a cluster id was provided via the set_cluster_id method then the connection is aborted and a connection to a different node is instead attempted, until a valid cluster node is found. Thus, when supplied, cluster id is interpreted as the intention to only allow connections to a clustered database. If not supplied, connections to standalone nodes, or nodes that do not have the _Server database are allowed. change_seqno is not incremented in the case of Database table updates. Signed-off-by: Ted Elhourani Acked-by: Numan Siddique --- v4 -> v5 -------- * Increase test timeout. * Spell out list of files to cat for shell compatibility. v3 -> v4 -------- * export -f is not compatible with FreeBSD, modify tests to avoid shell function export. * Re-add a line that was removed by mistake. v2 -> v3 -------- * Add 2 tests, treat cluster_id as a string, mv arg till end, pep8 fixes. v1 -> v2 -------- * Modify for backward compatibility with _Server-less ovsdb servers. python/ovs/db/idl.py | 219 ++++++++++++++++++++++++++++++++++++++++++++---- python/ovs/reconnect.py | 3 + tests/ovsdb-idl.at | 129 +++++++++++++++++++++------- tests/test-ovsdb.py | 67 ++++++++++++++- 4 files changed, 372 insertions(+), 46 deletions(-) diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index 250e897..84af978 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -38,6 +38,8 @@ ROW_DELETE = "delete" OVSDB_UPDATE = 0 OVSDB_UPDATE2 = 1 +CLUSTERED = "clustered" + class Idl(object): """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -92,10 +94,13 @@ class Idl(object): """ IDL_S_INITIAL = 0 - IDL_S_MONITOR_REQUESTED = 1 - IDL_S_MONITOR_COND_REQUESTED = 2 + IDL_S_SERVER_SCHEMA_REQUESTED = 1 + IDL_S_SERVER_MONITOR_REQUESTED = 2 + IDL_S_DATA_MONITOR_REQUESTED = 3 + IDL_S_DATA_MONITOR_COND_REQUESTED = 4 - def __init__(self, remote, schema_helper, probe_interval=None): + def __init__(self, remote, schema_helper, probe_interval=None, + leader_only=True): """Creates and returns a connection to the database named 'db_name' on 'remote', which should be in a form acceptable to ovs.jsonrpc.session.open(). The connection will maintain an in-memory @@ -119,6 +124,9 @@ class Idl(object): The IDL uses and modifies 'schema' directly. + If 'leader_only' is set to True (default value) the IDL will only + monitor and transact with the leader of the cluster. + If "probe_interval" is zero it disables the connection keepalive feature. If non-zero the value will be forced to at least 1000 milliseconds. If None it will just use the default value in OVS. @@ -137,6 +145,20 @@ class Idl(object): self._last_seqno = None self.change_seqno = 0 self.uuid = uuid.uuid1() + + # Server monitor. + self._server_schema_request_id = None + self._server_monitor_request_id = None + self._db_change_aware_request_id = None + self._server_db_name = '_Server' + self._server_db_table = 'Database' + self.server_tables = None + self._server_db = None + self.server_monitor_uuid = uuid.uuid1() + self.leader_only = leader_only + self.cluster_id = None + self._min_index = 0 + self.state = self.IDL_S_INITIAL # Database locking. @@ -172,6 +194,12 @@ class Idl(object): remotes.append(r) return remotes + def set_cluster_id(self, cluster_id): + """Set the id of the cluster that this idl must connect to.""" + self.cluster_id = cluster_id + if self.state != self.IDL_S_INITIAL: + self.force_reconnect() + def index_create(self, table, name): """Create a named multi-column index on a table""" return self.tables[table].rows.index_create(name) @@ -222,7 +250,7 @@ class Idl(object): if seqno != self._last_seqno: self._last_seqno = seqno self.__txn_abort_all() - self.__send_monitor_request() + self.__send_server_schema_request() if self.lock_name: self.__send_lock_request() break @@ -230,6 +258,7 @@ class Idl(object): msg = self._session.recv() if msg is None: break + if (msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.method == "update2" and len(msg.params) == 2): @@ -239,7 +268,15 @@ class Idl(object): and msg.method == "update" and len(msg.params) == 2): # Database contents changed. - self.__parse_update(msg.params[1], OVSDB_UPDATE) + if msg.params[0] == str(self.server_monitor_uuid): + self.__parse_update(msg.params[1], OVSDB_UPDATE, + tables=self.server_tables) + self.change_seqno = initial_change_seqno + if not self.__check_server_db(): + self.force_reconnect() + break + else: + self.__parse_update(msg.params[1], OVSDB_UPDATE) elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._monitor_request_id is not None and self._monitor_request_id == msg.id): @@ -248,10 +285,10 @@ class Idl(object): self.change_seqno += 1 self._monitor_request_id = None self.__clear() - if self.state == self.IDL_S_MONITOR_COND_REQUESTED: + if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED: self.__parse_update(msg.result, OVSDB_UPDATE2) else: - assert self.state == self.IDL_S_MONITOR_REQUESTED + assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED self.__parse_update(msg.result, OVSDB_UPDATE) except error.Error as e: @@ -259,6 +296,56 @@ class Idl(object): % (self._session.get_name(), e)) self.__error() elif (msg.type == ovs.jsonrpc.Message.T_REPLY + and self._server_schema_request_id is not None + and self._server_schema_request_id == msg.id): + # Reply to our "get_schema" of _Server request. + try: + self._server_schema_request_id = None + sh = SchemaHelper(None, msg.result) + sh.register_table(self._server_db_table) + schema = sh.get_idl_schema() + self._server_db = schema + self.server_tables = schema.tables + self.__send_server_monitor_request() + except error.Error as e: + vlog.err("%s: error receiving server schema: %s" + % (self._session.get_name(), e)) + if self.cluster_id: + self.__error() + break + else: + self.change_seqno = initial_change_seqno + self.__send_monitor_request() + elif (msg.type == ovs.jsonrpc.Message.T_REPLY + and self._server_monitor_request_id is not None + and self._server_monitor_request_id == msg.id): + # Reply to our "monitor" of _Server request. + try: + self._server_monitor_request_id = None + self.__parse_update(msg.result, OVSDB_UPDATE, + tables=self.server_tables) + self.change_seqno = initial_change_seqno + if self.__check_server_db(): + self.__send_monitor_request() + self.__send_db_change_aware() + else: + self.force_reconnect() + break + except error.Error as e: + vlog.err("%s: parse error in received schema: %s" + % (self._session.get_name(), e)) + if self.cluster_id: + self.__error() + break + else: + self.change_seqno = initial_change_seqno + self.__send_monitor_request() + elif (msg.type == ovs.jsonrpc.Message.T_REPLY + and self._db_change_aware_request_id is not None + and self._db_change_aware_request_id == msg.id): + # Reply to us notifying the server of our change awarness. + self._db_change_aware_request_id = None + elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._lock_request_id is not None and self._lock_request_id == msg.id): # Reply to our "lock" request. @@ -275,10 +362,20 @@ class Idl(object): # Reply to our echo request. Ignore it. pass elif (msg.type == ovs.jsonrpc.Message.T_ERROR and - self.state == self.IDL_S_MONITOR_COND_REQUESTED and + self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and self._monitor_request_id == msg.id): if msg.error == "unknown method": self.__send_monitor_request() + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and + self._server_schema_request_id is not None and + self._server_schema_request_id == msg.id): + self._server_schema_request_id = None + if self.cluster_id: + self.force_reconnect() + break + else: + self.change_seqno = initial_change_seqno + self.__send_monitor_request() elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, ovs.jsonrpc.Message.T_REPLY) and self.__txn_process_reply(msg)): @@ -342,6 +439,9 @@ class Idl(object): In the meantime, the contents of the IDL will not change.""" self._session.force_reconnect() + def session_name(self): + return self._session.get_name() + def set_lock(self, lock_name): """If 'lock_name' is not None, configures the IDL to obtain the named lock from the database server and to avoid modifying the database when @@ -440,12 +540,19 @@ class Idl(object): if not new_has_lock: self.is_lock_contended = True + def __send_db_change_aware(self): + msg = ovs.jsonrpc.Message.create_request("set_db_change_aware", + [True]) + self._db_change_aware_request_id = msg.id + self._session.send(msg) + def __send_monitor_request(self): - if self.state == self.IDL_S_INITIAL: - self.state = self.IDL_S_MONITOR_COND_REQUESTED + if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED, + self.IDL_S_INITIAL]): + self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED method = "monitor_cond" else: - self.state = self.IDL_S_MONITOR_REQUESTED + self.state = self.IDL_S_DATA_MONITOR_REQUESTED method = "monitor" monitor_requests = {} @@ -467,20 +574,50 @@ class Idl(object): self._monitor_request_id = msg.id self._session.send(msg) - def __parse_update(self, update, version): + def __send_server_schema_request(self): + self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED + msg = ovs.jsonrpc.Message.create_request( + "get_schema", [self._server_db_name, str(self.uuid)]) + self._server_schema_request_id = msg.id + self._session.send(msg) + + def __send_server_monitor_request(self): + self.state = self.IDL_S_SERVER_MONITOR_REQUESTED + monitor_requests = {} + table = self.server_tables[self._server_db_table] + columns = [column for column in six.iterkeys(table.columns)] + for column in six.itervalues(table.columns): + if not hasattr(column, 'alert'): + column.alert = True + table.rows = custom_index.IndexedRows(table) + table.need_table = False + table.idl = self + monitor_request = {"columns": columns} + monitor_requests[table.name] = [monitor_request] + msg = ovs.jsonrpc.Message.create_request( + 'monitor', [self._server_db.name, + str(self.server_monitor_uuid), + monitor_requests]) + self._server_monitor_request_id = msg.id + self._session.send(msg) + + def __parse_update(self, update, version, tables=None): try: - self.__do_parse_update(update, version) + if not tables: + self.__do_parse_update(update, version, self.tables) + else: + self.__do_parse_update(update, version, tables) except error.Error as e: vlog.err("%s: error parsing update: %s" % (self._session.get_name(), e)) - def __do_parse_update(self, table_updates, version): + def __do_parse_update(self, table_updates, version, tables): if not isinstance(table_updates, dict): raise error.Error(" is not an object", table_updates) for table_name, table_update in six.iteritems(table_updates): - table = self.tables.get(table_name) + table = tables.get(table_name) if not table: raise error.Error(' includes unknown ' 'table "%s"' % table_name) @@ -605,6 +742,58 @@ class Idl(object): self.notify(op, row, Row.from_json(self, table, uuid, old)) return changed + def __check_server_db(self): + """Returns True if this is a valid server database, False otherwise.""" + session_name = self.session_name() + + if self._server_db_table not in self.server_tables: + vlog.info("%s: server does not have %s table in its %s database" + % (session_name, self._server_db_table, + self._server_db_name)) + return False + + rows = self.server_tables[self._server_db_table].rows + + database = None + for row in six.itervalues(rows): + if self.cluster_id: + if self.cluster_id in \ + map(lambda x: str(x)[:4], row.cid): + database = row + break + elif row.name == self._db.name: + database = row + break + + if not database: + vlog.info("%s: server does not have %s database" + % (session_name, self._db.name)) + return False + + if (database.model == CLUSTERED and + self._session.get_num_of_remotes() > 1): + if not database.schema: + vlog.info('%s: clustered database server has not yet joined ' + 'cluster; trying another server' % session_name) + return False + if not database.connected: + vlog.info('%s: clustered database server is disconnected ' + 'from cluster; trying another server' % session_name) + return False + if (self.leader_only and + not database.leader): + vlog.info('%s: clustered database server is not cluster ' + 'leader; trying another server' % session_name) + return False + if database.index: + if database.index[0] < self._min_index: + vlog.warn('%s: clustered database server has stale data; ' + 'trying another server' % session_name) + return False + self._min_index = database.index[0] + + return True + def __column_name(self, column): if column.type.key.type == ovs.db.types.UuidType: return ovs.ovsuuid.to_json(column.type.key.type.default) diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py index 34cc769..afbe445 100644 --- a/python/ovs/reconnect.py +++ b/python/ovs/reconnect.py @@ -344,6 +344,9 @@ class Reconnect(object): else: self.info_level("%s: error listening for connections" % self.name) + elif self.state == Reconnect.Reconnect: + self.info_level("%s: connection closed by client" + % self.name) elif self.backoff < self.max_backoff: if self.passive: type_ = "listen" diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at index 8981b5e..7c937f7 100644 --- a/tests/ovsdb-idl.at +++ b/tests/ovsdb-idl.at @@ -11,7 +11,42 @@ ovsdb_start_idltest () { ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $? on_exit 'kill `cat ovsdb-server.pid`' } -]) + +# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA] +# +# Creates a database using SCHEMA (default: idltest.ovsschema) and +# starts a database cluster listening on punix:socket and REMOTE (if +# specified). +ovsdb_cluster_start_idltest () { + local n=$1 + ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $? + cid=`ovsdb-tool db-cid s1.db` + schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema` + for i in `seq 2 $n`; do + ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $? + done + for i in `seq $n`; do + ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $? + done + on_exit 'kill `cat s*.pid`' +} + +# ovsdb_cluster_leader [REMOTES] [DATABASE] +# +# Returns the leader of the DATABASE cluster. +ovsdb_cluster_leader () { + remotes=$(echo $1 | tr "," "\n") + for remote in $remotes; do + ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true + if [[ $? == 0 ]]; then + port=$(echo $remote | cut -d':' -f 3) + log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./) + pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' ) + echo "${remote}|${pid}" + return + fi + done +}]) # OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS], # [FILTER]) @@ -1466,40 +1501,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify], "where": [["i", "==", 0]]}]' \ 'reconnect']], [[000: empty -001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]} -002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None -002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None -002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> -002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +000: event:create, row={uuid=<0>}, updates=None +000: event:create, row={uuid=<1>}, updates=None +001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]} +002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None +002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None +002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> +002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> 003: {"error":null,"result":[{"count":2}]} -004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>} -004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> -004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>} +004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> +004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> 005: {"error":null,"result":[{"count":2}]} -006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>} -006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>} -006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> -006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> -007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]} -008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None -008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> -008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> -008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>} +006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>} +006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> +006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> +007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]} +008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None +008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8> +008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> +008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> 009: {"error":null,"result":[{"count":2}]} -010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>} -010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>} -010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> -010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> -010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>} +010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>} +010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8> +010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> +010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> 011: {"error":null,"result":[{"count":1}]} -012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None -012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> -012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None +012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8> +012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> 013: reconnect -014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None -014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None -014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> -014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None +014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None +014: event:create, row={uuid=<0>}, updates=None +014: event:create, row={uuid=<1>}, updates=None +014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8> +014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2> 015: done ]]) @@ -1853,3 +1892,33 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY], CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2]) CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3]) + +# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp +# with multiple remotes to assert the idl connects to the leader of the Raft cluster +m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN], + [AT_SETUP([$1]) + AT_SKIP_IF([test $7 = no]) + AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket]) + m4_define([LPBK],[127.0.0.1]) + AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK]) + PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1]) + PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2]) + PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3]) + remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3 + pids=$(cat s2.pid s3.pid s1.pid | tr '\n' ',') + echo $pids + AT_CHECK([$8 $srcdir/test-ovsdb.py -t30 idl-cluster $srcdir/idltest.ovsschema $remotes $pids $3], + [0], [stdout], [ignore]) + remote=$(ovsdb_cluster_leader $remotes "idltest") + leader=$(echo $remote | cut -d'|' -f 1) + AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore]) + AT_CLEANUP]) + +m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY], + [OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6], + [$HAVE_PYTHON], [$PYTHON]) + OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6], + [$HAVE_PYTHON3], [$PYTHON3])]) + +OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote']) +OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['remote' '+remotestop' 'remote']) diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py index 1d7c023..422321a 100644 --- a/tests/test-ovsdb.py +++ b/tests/test-ovsdb.py @@ -758,6 +758,70 @@ def do_idl_passive(schema_file, remote, *commands): print("%03d: done" % step) +def do_idl_cluster(schema_file, remote, pid, *commands): + schema_helper = ovs.db.idl.SchemaHelper(schema_file) + + if remote.startswith("ssl:"): + if len(commands) < 3: + sys.stderr.write("SSL connection requires private key, " + "certificate for private key, and peer CA " + "certificate as arguments\n") + sys.exit(1) + ovs.stream.Stream.ssl_set_private_key_file(commands[0]) + ovs.stream.Stream.ssl_set_certificate_file(commands[1]) + ovs.stream.Stream.ssl_set_ca_cert_file(commands[2]) + commands = commands[3:] + + schema_helper.register_all() + idl = ovs.db.idl.Idl(remote, schema_helper) + + step = 0 + seqno = 0 + commands = list(commands) + for command in commands: + if command.startswith("+"): + # The previous transaction didn't change anything. + command = command[1:] + else: + # Wait for update. + while idl.change_seqno == seqno and not idl.run(): + poller = ovs.poller.Poller() + idl.wait(poller) + poller.block() + step += 1 + + seqno = idl.change_seqno + + if command == "reconnect": + print("%03d: reconnect" % step) + sys.stdout.flush() + step += 1 + idl.force_reconnect() + elif command == "remote": + print("%03d: %s" % (step, idl.session_name())) + sys.stdout.flush() + step += 1 + elif command == "remotestop": + r = idl.session_name() + remotes = remote.split(',') + i = remotes.index(r) + pids = pid.split(',') + command = None + try: + command = "kill %s" % pids[i] + except ValueError as error: + sys.stderr.write("Cannot find pid of remote: %s\n" + % os.strerror(error)) + sys.exit(1) + os.popen(command) + print("%03d: stop %s" % (step, pids[i])) + sys.stdout.flush() + step += 1 + + idl.close() + print("%03d: done" % step) + + def usage(): print("""\ %(program_name)s: test utility for Open vSwitch database Python bindings @@ -861,7 +925,8 @@ def main(argv): "parse-table": (do_parse_table, (2, 3)), "parse-schema": (do_parse_schema, 1), "idl": (do_idl, (2,)), - "idl_passive": (do_idl_passive, (2,))} + "idl_passive": (do_idl_passive, (2,)), + "idl-cluster": (do_idl_cluster, (3,))} command_name = args[0] args = args[1:]