From patchwork Thu Jan 31 08:15:50 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Masashi Honma X-Patchwork-Id: 1034087 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=lists.infradead.org (client-ip=2607:7c80:54:e::133; helo=bombadil.infradead.org; envelope-from=hostap-bounces+incoming=patchwork.ozlabs.org@lists.infradead.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=pass (2048-bit key; unprotected) header.d=lists.infradead.org header.i=@lists.infradead.org header.b="Xv7tSekS"; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="CLTWF8yF"; dkim-atps=neutral Received: from bombadil.infradead.org (bombadil.infradead.org [IPv6:2607:7c80:54:e::133]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 43qwG0564vz9sDB for ; Thu, 31 Jan 2019 20:40:40 +1100 (AEDT) DKIM-Signature: v=1; a=rsa-sha256; q=dns/txt; c=relaxed/relaxed; d=lists.infradead.org; s=bombadil.20170209; h=Sender: Content-Transfer-Encoding:Content-Type:MIME-Version:Cc:List-Subscribe: List-Help:List-Post:List-Archive:List-Unsubscribe:List-Id:References: In-Reply-To:Message-Id:Date:Subject:To:From:Reply-To:Content-ID: Content-Description:Resent-Date:Resent-From:Resent-Sender:Resent-To:Resent-Cc :Resent-Message-ID:List-Owner; bh=Iy71cFkABKpvi6mk1uW67H5N6eWJGO8lgInmVBT+FH0=; b=Xv7tSekSVh3He3FIoZFnx4icqc AAKM1fGqDk5SoNo0aMkhu2X5pVqOIMoXm0Xb1/ajc2WGa40oAG1WeYxHCycyCnBy9wiN9HY+sNHJ0 zpgWQiMIn9Y7uxgY9D/7HvygcBiYtjm5IqZTyVLSif0oms205Rql9zYWecfUE9auCio6Wvl7Lvq8t MKqIba5V0ZrcKw5NNuC0vfFU5l3L+vgbWJpWZxvQmljpl2/ucoiUtlEn+w8oY6SDDgNWc05pJ27TN fbcG2HoPWcArk0qEma1XjYbfrG4ckjZLzc3nUupa1zFxRn0XE5ouERxe0E+Y3ZNavQV/8gBKXOPae UObyLUhw==; Received: from localhost ([127.0.0.1] helo=bombadil.infradead.org) by bombadil.infradead.org with esmtp (Exim 4.90_1 #2 (Red Hat Linux)) id 1gp8pf-0006g5-Vm; Thu, 31 Jan 2019 09:40:28 +0000 Received: from mail-pl1-x62e.google.com ([2607:f8b0:4864:20::62e]) by bombadil.infradead.org with esmtps (Exim 4.90_1 #2 (Red Hat Linux)) id 1gp7Wt-0004NJ-RI for hostap@lists.infradead.org; Thu, 31 Jan 2019 08:18:20 +0000 Received: by mail-pl1-x62e.google.com with SMTP id z23so1160370plo.0 for ; Thu, 31 Jan 2019 00:16:59 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=aBBNL3vAwrBnbhcUs+uKBllsbcAjObMscKNF8UDMiTM=; b=CLTWF8yFZV5hONFtkz2+YFQg6MoYTjYgxmZDIidVJf7R1AuAqEcvwrQU+FhY0D043g EcWa1UuI+Mur+pyhY1RkUkGCcyKZOR8Ahu4bJtRyz1lXnWyvnd3wezi/JOU3PpOidWCI scoNQONXAzasMCQbOIC+tXjCbJD0wvbV2SsHsqUZS2rmqd/PiMlbmOKexhdPc8vHHR3S uiV5DqcbSe3ULVjoqtqi45Y0pFl7eD69GuzTqWakkbXJyH1cagGRMEniC0KavE/aedkN ISkjpMKl1qFpEGvC/sAoEUL+o15kZHmaL958ThE2/SP7R1c/ckf7OBnEvVXoBJMroIzL 48UQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=aBBNL3vAwrBnbhcUs+uKBllsbcAjObMscKNF8UDMiTM=; b=snjR1FLCTbbDU67qo/LjzBRtdynnPD0Zv+R2hopSGeOJwipOv/Pd6NSx1XrecakqSh 2e5F4TWnAQJ1ZgHuEYexHaBgvugwPsfH1FV0ME+qTX6mEarFhjmH6A2QFNJCAdLTWqTM BX37h5zoC3NJ9A5pa6fOO5YELEj/4EG7pDsmlbn2i0SiwgOzF6NBXVH50iTkc8BZrrVO wALHOGoqZQxyN5aWsgNF+V2L5uXdJX44Tv/DUO7Wtba1kC8ElpLcmFzkz5dCVmgNqT1A rv/JLJlhTRKqKmIhVXijtLpKsK5NR70E4JG3d8kU7mImNI13tpzbu0HJ3/tQ9ONGa/rZ aWkA== X-Gm-Message-State: AJcUukdAx82NC7ozCh9I4MRHJAZsB0m3eoGQJd8HjCxC84xSygxKjEKN YQZVN0Z/BuLkD5KkLYDzWzd4EiT7 X-Google-Smtp-Source: ALg8bN4gLuJuihKm4PG4MkSR5xxZBbQrM8t9FaWSN21TnBrG/+2otUMX2czUhq9u1HwRXFfxeTNHhw== X-Received: by 2002:a17:902:70c6:: with SMTP id l6mr28072187plt.30.1548922618075; Thu, 31 Jan 2019 00:16:58 -0800 (PST) Received: from gtx740.flets-east.jp ([2409:11:321:2100:78:f8b9:a649:4ae4]) by smtp.gmail.com with ESMTPSA id u137sm5495185pfc.140.2019.01.31.00.16.56 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Thu, 31 Jan 2019 00:16:57 -0800 (PST) From: Masashi Honma To: hostap@lists.infradead.org Subject: [PATCH v2 11/38] tests: Modify dbus test sets for python3 Date: Thu, 31 Jan 2019 17:15:50 +0900 Message-Id: <20190131081617.5005-12-masashi.honma@gmail.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20190131081617.5005-1-masashi.honma@gmail.com> References: <20190131081617.5005-1-masashi.honma@gmail.com> X-CRM114-Version: 20100106-BlameMichelson ( TRE 0.8.0 (BSD) ) MR-646709E3 X-CRM114-CacheID: sfid-20190131_001700_809277_6AD9217F X-CRM114-Status: GOOD ( 10.24 ) X-Spam-Score: -0.2 (/) X-Spam-Report: SpamAssassin version 3.4.2 on bombadil.infradead.org summary: Content analysis details: (-0.2 points) pts rule name description ---- ---------------------- -------------------------------------------------- -0.0 RCVD_IN_DNSWL_NONE RBL: Sender listed at http://www.dnswl.org/, no trust [2607:f8b0:4864:20:0:0:0:62e listed in] [list.dnswl.org] 0.0 FREEMAIL_FROM Sender email is commonly abused enduser mail provider (masashi.honma[at]gmail.com) -0.0 SPF_PASS SPF: sender matches SPF record 0.1 DKIM_SIGNED Message has a DKIM or DK signature, not necessarily valid -0.1 DKIM_VALID_EF Message has a valid DKIM or DK signature from envelope-from domain -0.1 DKIM_VALID_AU Message has a valid DKIM or DK signature from author's domain -0.1 DKIM_VALID Message has at least one valid DKIM or DK signature X-Mailman-Approved-At: Thu, 31 Jan 2019 01:40:22 -0800 X-BeenThere: hostap@lists.infradead.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: Masashi Honma MIME-Version: 1.0 Sender: "Hostap" Errors-To: hostap-bounces+incoming=patchwork.ozlabs.org@lists.infradead.org Signed-off-by: Masashi Honma --- tests/hwsim/test_dbus.py | 377 +++++++++++++++++++++------------------ 1 file changed, 203 insertions(+), 174 deletions(-) diff --git a/tests/hwsim/test_dbus.py b/tests/hwsim/test_dbus.py index e0703fae0..a1be67514 100644 --- a/tests/hwsim/test_dbus.py +++ b/tests/hwsim/test_dbus.py @@ -9,9 +9,14 @@ import logging logger = logging.getLogger() import subprocess import time +import sys + +if sys.version_info[0] > 2: + from gi.repository import GObject +else: + import gobject as GObject try: - import gobject import dbus dbus_imported = True except ImportError: @@ -56,7 +61,7 @@ def prepare_dbus(dev): class TestDbus(object): def __init__(self, bus): - self.loop = gobject.MainLoop() + self.loop = GObject.MainLoop() self.signals = [] self.bus = bus @@ -259,14 +264,14 @@ def test_dbus_properties(dev, apdev): if val != res: raise Exception("WFDIEs value changed") try: - dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00')) + dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')) raise Exception("Invalid WFDIEs value accepted") except dbus.exceptions.DBusException as e: if "InvalidArgs" not in str(e): raise Exception("Unexpected error message: " + str(e)) - dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('')) + dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(bytes())) dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) - dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('')) + dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(bytes())) res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) if len(res) != 0: raise Exception("WFDIEs not cleared properly") @@ -314,7 +319,7 @@ def test_dbus_properties(dev, apdev): try: wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs", - dbus.ByteArray('', variant_level=2), + dbus.ByteArray(bytes(), variant_level=2), dbus_interface=dbus.PROPERTIES_IFACE) raise Exception("Invalid Set accepted") except dbus.exceptions.DBusException as e: @@ -443,7 +448,7 @@ def _test_dbus_get_set_wps(dev, apdev): if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: raise Exception("DeviceType mismatch after Set") - val2 = '\x01\x02\x03\x04\x05\x06\x07\x08' + val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08' if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2), dbus_interface=dbus.PROPERTIES_IFACE) val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", @@ -460,8 +465,8 @@ def _test_dbus_get_set_wps(dev, apdev): self.sets_done = False def __enter__(self): - gobject.timeout_add(1, self.run_sets) - gobject.timeout_add(1000, self.timeout) + GObject.timeout_add(1, self.run_sets) + GObject.timeout_add(1000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS, "PropertiesChanged") self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE, @@ -471,7 +476,7 @@ def _test_dbus_get_set_wps(dev, apdev): def propertiesChanged(self, properties): logger.debug("PropertiesChanged: " + str(properties)) - if properties.has_key("ProcessCredentials"): + if "ProcessCredentials" in properties: self.signal_received_deprecated = True if self.sets_done and self.signal_received: self.loop.quit() @@ -481,7 +486,7 @@ def _test_dbus_get_set_wps(dev, apdev): logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) if interface_name != WPAS_DBUS_IFACE_WPS: return - if changed_properties.has_key("ProcessCredentials"): + if "ProcessCredentials" in changed_properties: self.signal_received = True if self.sets_done and self.signal_received_deprecated: self.loop.quit() @@ -527,11 +532,11 @@ def test_dbus_wps_invalid(dev, apdev): 'Bssid': '02:33:44:55:66:77'}, {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123}, {'Role': 'enrollee', 'Type': 'pbc', - 'Bssid': dbus.ByteArray('12345')}, + 'Bssid': dbus.ByteArray(b'12345')}, {'Role': 'enrollee', 'Type': 'pbc', 'P2PDeviceAddress': 12345}, {'Role': 'enrollee', 'Type': 'pbc', - 'P2PDeviceAddress': dbus.ByteArray('12345')}, + 'P2PDeviceAddress': dbus.ByteArray(b'12345')}, {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ] for args in failures: try: @@ -640,8 +645,8 @@ def _test_dbus_wps_pbc(dev, apdev): self.wps = wps def __enter__(self): - gobject.timeout_add(1, self.start_pbc) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.start_pbc) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, "Credentials") @@ -700,8 +705,8 @@ def test_dbus_wps_pbc_overlap(dev, apdev): self.wps = wps def __enter__(self): - gobject.timeout_add(1, self.start_pbc) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.start_pbc) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") self.loop.run() return self @@ -753,8 +758,8 @@ def _test_dbus_wps_pin(dev, apdev): self.credentials_received = False def __enter__(self): - gobject.timeout_add(1, self.start_pin) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.start_pin) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, "Credentials") @@ -776,7 +781,7 @@ def _test_dbus_wps_pin(dev, apdev): def start_pin(self, *args): logger.debug("start_pin") - bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex')) + bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode())) wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 'Bssid': bssid_ay}) return False @@ -813,8 +818,8 @@ def _test_dbus_wps_pin2(dev, apdev): self.failed = False def __enter__(self): - gobject.timeout_add(1, self.start_pin) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.start_pin) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, "Credentials") @@ -836,7 +841,7 @@ def _test_dbus_wps_pin2(dev, apdev): def start_pin(self, *args): logger.debug("start_pin") - bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex')) + bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode())) res = wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Bssid': bssid_ay}) pin = res['Pin'] @@ -876,8 +881,8 @@ def _test_dbus_wps_pin_m2d(dev, apdev): self.credentials_received = False def __enter__(self): - gobject.timeout_add(1, self.start_pin) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.start_pin) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, "Credentials") @@ -902,7 +907,7 @@ def _test_dbus_wps_pin_m2d(dev, apdev): def start_pin(self, *args): logger.debug("start_pin") - bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex')) + bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode())) wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 'Bssid': bssid_ay}) return False @@ -939,8 +944,8 @@ def _test_dbus_wps_reg(dev, apdev): self.credentials_received = False def __enter__(self): - gobject.timeout_add(100, self.start_reg) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(100, self.start_reg) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, "Credentials") @@ -957,7 +962,7 @@ def _test_dbus_wps_reg(dev, apdev): def start_reg(self, *args): logger.debug("start_reg") - bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex')) + bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode())) wps.Start({'Role': 'registrar', 'Type': 'pin', 'Pin': '12345670', 'Bssid': bssid_ay}) return False @@ -981,7 +986,7 @@ def test_dbus_wps_cancel(dev, apdev): wps.Cancel() dev[0].scan_for_bss(bssid, freq="2412") - bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex')) + bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode())) wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 'Bssid': bssid_ay}) wps.Cancel() @@ -999,18 +1004,18 @@ def test_dbus_scan_invalid(dev, apdev): ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"), ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"), ({'Type': 'active', - 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"), - dbus.ByteArray("3"), dbus.ByteArray("4"), - dbus.ByteArray("5"), dbus.ByteArray("6"), - dbus.ByteArray("7"), dbus.ByteArray("8"), - dbus.ByteArray("9"), dbus.ByteArray("10"), - dbus.ByteArray("11"), dbus.ByteArray("12"), - dbus.ByteArray("13"), dbus.ByteArray("14"), - dbus.ByteArray("15"), dbus.ByteArray("16"), - dbus.ByteArray("17") ]}, + 'SSIDs': [ dbus.ByteArray(b"1"), dbus.ByteArray(b"2"), + dbus.ByteArray(b"3"), dbus.ByteArray(b"4"), + dbus.ByteArray(b"5"), dbus.ByteArray(b"6"), + dbus.ByteArray(b"7"), dbus.ByteArray(b"8"), + dbus.ByteArray(b"9"), dbus.ByteArray(b"10"), + dbus.ByteArray(b"11"), dbus.ByteArray(b"12"), + dbus.ByteArray(b"13"), dbus.ByteArray(b"14"), + dbus.ByteArray(b"15"), dbus.ByteArray(b"16"), + dbus.ByteArray(b"17") ]}, "InvalidArgs"), ({'Type': 'active', - 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]}, + 'SSIDs': [ dbus.ByteArray(b"1234567890abcdef1234567890abcdef1") ]}, "InvalidArgs"), ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"), ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"), @@ -1023,9 +1028,9 @@ def test_dbus_scan_invalid(dev, apdev): 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] }, "InvalidArgs"), ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"), - ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]}, + ({'Type': 'passive', 'IEs': [ dbus.ByteArray(b"\xdd\x00") ]}, "InvalidArgs"), - ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]}, + ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray(b"foo") ]}, "InvalidArgs")] for (t,err) in tests: try: @@ -1056,14 +1061,14 @@ def test_dbus_scan_oom(dev, apdev): "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan", "Scan"): iface.Scan({ 'Type': 'active', - 'IEs': [ dbus.ByteArray("\xdd\x00") ], + 'IEs': [ dbus.ByteArray(b"\xdd\x00") ], 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] }) with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan", "Scan"): iface.Scan({ 'Type': 'active', - 'SSIDs': [ dbus.ByteArray("open"), + 'SSIDs': [ dbus.ByteArray(b"open"), dbus.ByteArray() ], 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] }) @@ -1082,8 +1087,8 @@ def test_dbus_scan(dev, apdev): self.fail_reason = None def __enter__(self): - gobject.timeout_add(1, self.run_scan) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_scan) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded") self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved") @@ -1120,9 +1125,9 @@ def test_dbus_scan(dev, apdev): def run_scan(self, *args): logger.debug("run_scan") iface.Scan({'Type': 'active', - 'SSIDs': [ dbus.ByteArray("open"), + 'SSIDs': [ dbus.ByteArray(b"open"), dbus.ByteArray() ], - 'IEs': [ dbus.ByteArray("\xdd\x00"), + 'IEs': [ dbus.ByteArray(b"\xdd\x00"), dbus.ByteArray() ], 'AllowRoam': False, 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) @@ -1211,8 +1216,8 @@ def test_dbus_connect(dev, apdev): self.state = 0 def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, "NetworkRemoved") @@ -1311,8 +1316,8 @@ def test_dbus_connect_psk_mem(dev, apdev): self.connected = False def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, "PropertiesChanged") self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, @@ -1371,8 +1376,8 @@ def test_dbus_connect_oom(dev, apdev): self.state = 0 def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(1500, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(1500, self.timeout) self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, "NetworkRemoved") @@ -1530,8 +1535,8 @@ def test_dbus_connect_eap(dev, apdev): self.state = 0 def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, "PropertiesChanged") self.add_signal(self.certification, WPAS_DBUS_IFACE, @@ -1793,7 +1798,7 @@ def test_dbus_network_oom(dev, apdev): tests = [ (1, 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network', - dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') }, + dbus.Dictionary({ 'ssid': dbus.ByteArray(b' ') }, signature='sv')), (1, '=set_network_properties;wpas_dbus_handler_add_network', dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')), @@ -1806,7 +1811,7 @@ def test_dbus_network_oom(dev, apdev): dbus.Dictionary({ 'priority': dbus.Int32(1) }, signature='sv')), (1, '=set_network_properties;wpas_dbus_handler_add_network', - dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') }, + dbus.Dictionary({ 'ssid': dbus.ByteArray(b' ') }, signature='sv')) ] for (count,funcs,args) in tests: with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"): @@ -1934,10 +1939,10 @@ def test_dbus_blob(dev, apdev): (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0]) iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) - blob = dbus.ByteArray("\x01\x02\x03") + blob = dbus.ByteArray(b"\x01\x02\x03") iface.AddBlob('blob1', blob) try: - iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04")) + iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04")) raise Exception("Invalid AddBlob() accepted") except dbus.exceptions.DBusException as e: if "BlobExists" not in str(e): @@ -1973,8 +1978,8 @@ def test_dbus_blob(dev, apdev): self.blob_removed = False def __enter__(self): - gobject.timeout_add(1, self.run_blob) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_blob) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded") self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved") self.loop.run() @@ -1993,7 +1998,7 @@ def test_dbus_blob(dev, apdev): def run_blob(self, *args): logger.debug("run_blob") - iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04")) + iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04")) iface.RemoveBlob('blob2') return False @@ -2012,7 +2017,7 @@ def test_dbus_blob_oom(dev, apdev): for i in range(1, 4): with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob", "AddBlob"): - iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04")) + iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04")) def test_dbus_autoscan(dev, apdev): """D-Bus Autoscan()""" @@ -2121,8 +2126,8 @@ def test_dbus_tdls(dev, apdev): self.tdls_teardown = False def __enter__(self): - gobject.timeout_add(1, self.run_tdls) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_tdls) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, "PropertiesChanged") self.loop.run() @@ -2134,13 +2139,13 @@ def test_dbus_tdls(dev, apdev): def run_tdls(self, *args): logger.debug("run_tdls") iface.TDLSDiscover(addr1) - gobject.timeout_add(100, self.run_tdls2) + GObject.timeout_add(100, self.run_tdls2) return False def run_tdls2(self, *args): logger.debug("run_tdls2") iface.TDLSSetup(addr1) - gobject.timeout_add(500, self.run_tdls3) + GObject.timeout_add(500, self.run_tdls3) return False def run_tdls3(self, *args): @@ -2151,7 +2156,7 @@ def test_dbus_tdls(dev, apdev): else: logger.info("Unexpected TDLSStatus: " + res) iface.TDLSTeardown(addr1) - gobject.timeout_add(200, self.run_tdls4) + GObject.timeout_add(200, self.run_tdls4) return False def run_tdls4(self, *args): @@ -2192,8 +2197,8 @@ def test_dbus_tdls_channel_switch(dev, apdev): self.tdls_done = False def __enter__(self): - gobject.timeout_add(1, self.run_tdls) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_tdls) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, "PropertiesChanged") self.loop.run() @@ -2205,13 +2210,13 @@ def test_dbus_tdls_channel_switch(dev, apdev): def run_tdls(self, *args): logger.debug("run_tdls") iface.TDLSDiscover(addr1) - gobject.timeout_add(100, self.run_tdls2) + GObject.timeout_add(100, self.run_tdls2) return False def run_tdls2(self, *args): logger.debug("run_tdls2") iface.TDLSSetup(addr1) - gobject.timeout_add(500, self.run_tdls3) + GObject.timeout_add(500, self.run_tdls3) return False def run_tdls3(self, *args): @@ -2271,7 +2276,7 @@ def test_dbus_tdls_channel_switch(dev, apdev): signature='sv') iface.TDLSChannelSwitch(args) - gobject.timeout_add(200, self.run_tdls4) + GObject.timeout_add(200, self.run_tdls4) return False def run_tdls4(self, *args): @@ -2587,8 +2592,8 @@ def test_dbus_probe_req_reporting(dev, apdev): self.reported = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted") self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest", @@ -2704,8 +2709,8 @@ def test_dbus_p2p_invalid(dev, apdev): {'RequestedDeviceTypes': dbus.Array([], signature="s")}, {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")}, {'RequestedDeviceTypes': dbus.Array([], signature="i")}, - {'RequestedDeviceTypes': [dbus.ByteArray('12345678'), - dbus.ByteArray('1234567')]}, + {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'), + dbus.ByteArray(b'1234567')]}, {'Foo': dbus.Int16(1)}, {'Foo': dbus.UInt16(1)}, {'Foo': dbus.Int64(1)}, @@ -2904,29 +2909,29 @@ def test_dbus_p2p_oom(dev, apdev): with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray", "Find", "InvalidArgs"): - p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] })) + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123') ] })) with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray", "Find", "InvalidArgs"): - p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] })) + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123') ] })) with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray", "Find", "InvalidArgs"): - p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123'), - dbus.ByteArray('123') ] })) + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123'), + dbus.ByteArray(b'123') ] })) with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray", "Find", "InvalidArgs"): - p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] })) + p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123') ] })) with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find", "Find", "InvalidArgs"): @@ -2935,7 +2940,7 @@ def test_dbus_p2p_oom(dev, apdev): with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array", "AddService", "InvalidArgs"): args = { 'service_type': 'bonjour', - 'response': dbus.ByteArray(500*'b') } + 'response': dbus.ByteArray(500 * b'b') } p2p.AddService(args) with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array", @@ -2978,7 +2983,7 @@ def run_dbus_p2p_discovery(dev, apdev): raise Exception("Unexpected peer(s) in the list") args = {'DiscoveryType': 'social', - 'RequestedDeviceTypes': [dbus.ByteArray('12345678')], + 'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')], 'Timeout': dbus.Int32(1) } p2p.Find(dbus.Dictionary(args)) p2p.StopFind() @@ -2993,8 +2998,8 @@ def run_dbus_p2p_discovery(dev, apdev): self.find_stopped = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.deviceFoundProperties, @@ -3029,7 +3034,7 @@ def run_dbus_p2p_discovery(dev, apdev): sec = res['SecondaryDeviceTypes'] if len(sec) < 1: raise Exception("Secondary device type missing") - if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec: + if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec: raise Exception("Secondary device type mismatch") if 'VendorExtension' not in res: @@ -3037,7 +3042,7 @@ def run_dbus_p2p_discovery(dev, apdev): vendor = res['VendorExtension'] if len(vendor) < 1: raise Exception("Vendor extension missing") - if "\x11\x22\x33\x44" not in vendor: + if b"\x11\x22\x33\x44" not in vendor: raise Exception("Secondary device type mismatch") if 'VSIE' not in res: @@ -3045,7 +3050,7 @@ def run_dbus_p2p_discovery(dev, apdev): vendor = res['VSIE'] if len(vendor) < 1: raise Exception("VSIE missing") - if vendor != "\xdd\x06\x00\x11\x22\x33\x55\x66": + if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66": raise Exception("VSIE mismatch") self.found = True @@ -3143,8 +3148,8 @@ def test_dbus_p2p_discovery_freq(dev, apdev): self.found = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(5000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(5000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.loop.run() @@ -3250,7 +3255,7 @@ def test_dbus_p2p_service_discovery(dev, apdev): { 'service_type': 'bonjour', 'response': 'foo' }, { 'service_type': 'bonjour', 'query': bonjour_query }, { 'service_type': 'bonjour', 'response': bonjour_response }, - { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') }, + { 'service_type': 'bonjour', 'query': dbus.ByteArray(500 * b'a') }, { 'service_type': 'bonjour', 'foo': 'bar' } ] for args in tests: try: @@ -3260,7 +3265,7 @@ def test_dbus_p2p_service_discovery(dev, apdev): if "InvalidArgs" not in str(e): raise Exception("Unexpected error message for invalid AddService(): " + str(e)) - args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") } + args = { 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01") } ref = p2p.ServiceDiscoveryRequest(args) p2p.ServiceDiscoveryCancelRequest(ref) try: @@ -3335,8 +3340,8 @@ def test_dbus_p2p_service_discovery_query(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.serviceDiscoveryResponse, @@ -3348,7 +3353,7 @@ def test_dbus_p2p_service_discovery_query(dev, apdev): def deviceFound(self, path): logger.debug("deviceFound: path=%s" % path) args = { 'peer_object': path, - 'tlv': dbus.ByteArray("\x02\x00\x00\x01") } + 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01") } p2p.ServiceDiscoveryRequest(args) def serviceDiscoveryResponse(self, sd_request): @@ -3396,8 +3401,8 @@ def _test_dbus_p2p_service_discovery_external(dev, apdev): self.sd = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.serviceDiscoveryRequest, @@ -3462,8 +3467,8 @@ def test_dbus_p2p_autogo(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, @@ -3553,7 +3558,10 @@ def test_dbus_p2p_autogo(dev, apdev): for p in peer: if len(addr) > 0: addr += ':' - addr += '%02x' % ord(p) + if type(p) == int: + addr += '%02x' % p + else: + addr += '%02x' % ord(p) params = { 'Role': 'registrar', 'P2PDeviceAddress': self.peer['DeviceAddress'], @@ -3598,7 +3606,7 @@ def test_dbus_p2p_autogo(dev, apdev): self.exceptions = True raise Exception("Unexpected number of group members") - ext = dbus.ByteArray("\x11\x22\x33\x44") + ext = dbus.ByteArray(b"\x11\x22\x33\x44") # Earlier implementation of this interface was a bit strange. The # property is defined to have aay signature and that is what the # getter returned. However, the setter expected there to be a @@ -3621,7 +3629,7 @@ def test_dbus_p2p_autogo(dev, apdev): # And now verify that the more appropriate encoding is accepted as # well. - res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff')) + res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff')) g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, dbus_interface=dbus.PROPERTIES_IFACE) res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', @@ -3635,7 +3643,7 @@ def test_dbus_p2p_autogo(dev, apdev): raise Exception("Vendor extension value changed") for i in range(10): - res.append(dbus.ByteArray('\xaa\xbb')) + res.append(dbus.ByteArray(b'\xaa\xbb')) try: g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, dbus_interface=dbus.PROPERTIES_IFACE) @@ -3722,8 +3730,8 @@ def test_dbus_p2p_autogo_pbc(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, @@ -3767,7 +3775,10 @@ def test_dbus_p2p_autogo_pbc(dev, apdev): for p in peer: if len(addr) > 0: addr += ':' - addr += '%02x' % ord(p) + if type(p) == int: + addr += '%02x' % p + else: + addr += '%02x' % ord(p) params = { 'Role': 'registrar', 'P2PDeviceAddress': self.peer['DeviceAddress'], 'Type': 'pbc' } @@ -3810,8 +3821,8 @@ def test_dbus_p2p_autogo_legacy(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted") self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, @@ -3828,7 +3839,12 @@ def test_dbus_p2p_autogo_legacy(dev, apdev): res = g_obj.GetAll(WPAS_DBUS_GROUP, dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True) - bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']]) + if sys.version_info[0] > 2: + bssid = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format( + res['BSSID'][0], res['BSSID'][1], res['BSSID'][2], + res['BSSID'][3], res['BSSID'][4], res['BSSID'][5]) + else: + bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']]) pin = '12345670' params = { 'Role': 'enrollee', @@ -3886,8 +3902,8 @@ def test_dbus_p2p_join(dev, apdev): self.go = None def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, @@ -3947,7 +3963,7 @@ def test_dbus_p2p_join(dev, apdev): byte_arrays=True) logger.debug("Group properties: " + str(res)) - ext = dbus.ByteArray("\x11\x22\x33\x44") + ext = dbus.ByteArray(b"\x11\x22\x33\x44") try: # Set(WPSVendorExtensions) not allowed for P2P Client g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, @@ -4012,8 +4028,8 @@ def test_dbus_p2p_invitation_received(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE, "InvitationReceived") self.loop.run() @@ -4068,8 +4084,8 @@ def _test_dbus_p2p_config(dev, apdev): raise Exception("Parameter %s value changes" % k) changes = { 'SsidPostfix': 'foo', - 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ], - 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]} + 'VendorExtension': [ dbus.ByteArray(b'\x11\x22\x33\x44') ], + 'SecondaryDeviceTypes': [ dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88') ]} if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", dbus.Dictionary(changes, signature='sv'), dbus_interface=dbus.PROPERTIES_IFACE) @@ -4147,8 +4163,8 @@ def test_dbus_p2p_persistent(dev, apdev): TestDbus.__init__(self, bus) def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted") self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, @@ -4246,8 +4262,8 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev): self.invited = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, @@ -4275,7 +4291,12 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev): res = g_obj.GetAll(WPAS_DBUS_GROUP, dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True) - bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']]) + if sys.version_info[0] > 2: + bssid = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format( + res['BSSID'][0], res['BSSID'][1], res['BSSID'][2], + res['BSSID'][3], res['BSSID'][4], res['BSSID'][5]) + else: + bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']]) dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') dev1.scan_for_bss(bssid, freq=2412) dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join") @@ -4328,7 +4349,10 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev): for p in peer: if len(addr) > 0: addr += ':' - addr += '%02x' % ord(p) + if type(p) == int: + addr += '%02x' % p + else: + addr += '%02x' % ord(p) params = { 'Role': 'registrar', 'P2PDeviceAddress': self.peer['DeviceAddress'], 'Bssid': self.peer['DeviceAddress'], @@ -4381,8 +4405,8 @@ def test_dbus_p2p_go_neg_rx(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.goNegotiationRequest, @@ -4466,8 +4490,8 @@ def test_dbus_p2p_go_neg_auth(dev, apdev): self.peer_disconnected = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.goNegotiationSuccess, @@ -4567,8 +4591,8 @@ def test_dbus_p2p_go_neg_init(dev, apdev): self.peer_group_removed = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.goNegotiationSuccess, @@ -4662,8 +4686,8 @@ def test_dbus_p2p_group_termination_by_go(dev, apdev): self.peer_group_removed = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.goNegotiationSuccess, @@ -4758,8 +4782,8 @@ def _test_dbus_p2p_group_idle_timeout(dev, apdev): self.peer_group_removed = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.add_signal(self.goNegotiationSuccess, @@ -4852,8 +4876,8 @@ def test_dbus_p2p_wps_failure(dev, apdev): self.formation_failure = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.goNegotiationRequest, WPAS_DBUS_IFACE_P2PDEVICE, "GONegotiationRequest", @@ -4938,8 +4962,8 @@ def test_dbus_p2p_two_groups(dev, apdev): self.groups_removed = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, "PropertiesChanged", byte_arrays=True) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, @@ -5017,7 +5041,12 @@ def test_dbus_p2p_two_groups(dev, apdev): g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS) g_wps.Start(params) - bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid]) + if sys.version_info[0] > 2: + bssid = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format( + self.g2_bssid[0], self.g2_bssid[1], self.g2_bssid[2], + self.g2_bssid[3], self.g2_bssid[4], self.g2_bssid[5]) + else: + bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid]) dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2') dev2.scan_for_bss(bssid, freq=2412) dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412") @@ -5125,8 +5154,8 @@ def test_dbus_p2p_cancel(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound") self.loop.run() @@ -5195,8 +5224,8 @@ def test_dbus_p2p_ip_addr(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted") self.loop.run() @@ -5280,8 +5309,8 @@ def run_busctl(service, obj): cmd.wait() logger.info("busctl stdout:\n%s" % out[0].strip()) if len(out[1]) > 0: - logger.info("busctl stderr: %s" % out[1].strip()) - if "Duplicate property" in out[1]: + logger.info("busctl stderr: %s" % out[1].decode().strip()) + if "Duplicate property" in out[1].decode(): raise Exception("Duplicate property") def test_dbus_introspect_busctl(dev, apdev): @@ -5321,8 +5350,8 @@ def test_dbus_ap(dev, apdev): self.stations = False def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, "NetworkSelected") @@ -5424,8 +5453,8 @@ def test_dbus_connect_wpa_eap(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, "PropertiesChanged") self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") @@ -5554,7 +5583,7 @@ def _test_dbus_vendor_elem(dev, apdev): dev[0].request("VENDOR_ELEM_REMOVE 1 *") try: - ie = dbus.ByteArray("\x00\x00") + ie = dbus.ByteArray(b"\x00\x00") iface.VendorElemAdd(-1, ie) raise Exception("Invalid VendorElemAdd() accepted") except dbus.exceptions.DBusException as e: @@ -5562,7 +5591,7 @@ def _test_dbus_vendor_elem(dev, apdev): raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e)) try: - ie = dbus.ByteArray("") + ie = dbus.ByteArray(bytes()) iface.VendorElemAdd(1, ie) raise Exception("Invalid VendorElemAdd() accepted") except dbus.exceptions.DBusException as e: @@ -5570,7 +5599,7 @@ def _test_dbus_vendor_elem(dev, apdev): raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e)) try: - ie = dbus.ByteArray("\x00\x01") + ie = dbus.ByteArray(b"\x00\x01") iface.VendorElemAdd(1, ie) raise Exception("Invalid VendorElemAdd() accepted") except dbus.exceptions.DBusException as e: @@ -5592,7 +5621,7 @@ def _test_dbus_vendor_elem(dev, apdev): raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e)) try: - ie = dbus.ByteArray("\x00\x00") + ie = dbus.ByteArray(b"\x00\x00") iface.VendorElemRem(-1, ie) raise Exception("Invalid VendorElemRemove() accepted") except dbus.exceptions.DBusException as e: @@ -5600,16 +5629,16 @@ def _test_dbus_vendor_elem(dev, apdev): raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) try: - ie = dbus.ByteArray("") + ie = dbus.ByteArray(bytes()) iface.VendorElemRem(1, ie) raise Exception("Invalid VendorElemRemove() accepted") except dbus.exceptions.DBusException as e: if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) - iface.VendorElemRem(1, "*") + iface.VendorElemRem(1, b"*") - ie = dbus.ByteArray("\x00\x01\x00") + ie = dbus.ByteArray(b"\x00\x01\x00") iface.VendorElemAdd(1, ie) val = iface.VendorElemGet(1) @@ -5619,7 +5648,7 @@ def _test_dbus_vendor_elem(dev, apdev): if val[i] != dbus.Byte(ie[i]): raise Exception("Unexpected VendorElemGet data") - ie2 = dbus.ByteArray("\xe0\x00") + ie2 = dbus.ByteArray(b"\xe0\x00") iface.VendorElemAdd(1, ie2) ies = ie + ie2 @@ -5631,7 +5660,7 @@ def _test_dbus_vendor_elem(dev, apdev): raise Exception("Unexpected VendorElemGet data[2]") try: - test_ie = dbus.ByteArray("\x01\x01") + test_ie = dbus.ByteArray(b"\x01\x01") iface.VendorElemRem(1, test_ie) raise Exception("Invalid VendorElemRemove() accepted") except dbus.exceptions.DBusException as e: @@ -5643,7 +5672,7 @@ def _test_dbus_vendor_elem(dev, apdev): if len(val) != len(ie2): raise Exception("Unexpected VendorElemGet length[3]") - iface.VendorElemRem(1, "*") + iface.VendorElemRem(1, b"*") try: iface.VendorElemGet(1) raise Exception("Invalid VendorElemGet() accepted after removal") @@ -5668,8 +5697,8 @@ def test_dbus_assoc_reject(dev, apdev): self.state = 0 def __enter__(self): - gobject.timeout_add(1, self.run_connect) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_connect) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, "PropertiesChanged") self.loop.run() @@ -5717,8 +5746,8 @@ def test_dbus_mesh(dev, apdev): self.done = False def __enter__(self): - gobject.timeout_add(1, self.run_test) - gobject.timeout_add(15000, self.timeout) + GObject.timeout_add(1, self.run_test) + GObject.timeout_add(15000, self.timeout) self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH, "MeshGroupStarted") self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH, @@ -5747,14 +5776,14 @@ def test_dbus_mesh(dev, apdev): logger.debug("MeshPeers: " + str(res)) if len(res) != 1: raise Exception("Unexpected number of MeshPeer values") - if binascii.hexlify(res[0]) != addr1.replace(':', ''): + if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''): raise Exception("Unexpected peer address") res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup', dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True) logger.debug("MeshGroup: " + str(res)) - if res != "wpas-mesh-open": + if res != b"wpas-mesh-open": raise Exception("Unexpected MeshGroup") dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') dev1.mesh_group_remove()