@@ -9,9 +9,14 @@ import logging
logger = logging.getLogger()
import subprocess
import time
+import sys
+
+if sys.version_info[0] > 2:
+ from gi.repository import GObject
+else:
+ import gobject as GObject
try:
- import gobject
import dbus
dbus_imported = True
except ImportError:
@@ -56,7 +61,7 @@ def prepare_dbus(dev):
class TestDbus(object):
def __init__(self, bus):
- self.loop = gobject.MainLoop()
+ self.loop = GObject.MainLoop()
self.signals = []
self.bus = bus
@@ -259,14 +264,14 @@ def test_dbus_properties(dev, apdev):
if val != res:
raise Exception("WFDIEs value changed")
try:
- dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
+ dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00'))
raise Exception("Invalid WFDIEs value accepted")
except dbus.exceptions.DBusException as e:
if "InvalidArgs" not in str(e):
raise Exception("Unexpected error message: " + str(e))
- dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
+ dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(bytes()))
dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
- dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
+ dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(bytes()))
res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
if len(res) != 0:
raise Exception("WFDIEs not cleared properly")
@@ -314,7 +319,7 @@ def test_dbus_properties(dev, apdev):
try:
wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
- dbus.ByteArray('', variant_level=2),
+ dbus.ByteArray(bytes(), variant_level=2),
dbus_interface=dbus.PROPERTIES_IFACE)
raise Exception("Invalid Set accepted")
except dbus.exceptions.DBusException as e:
@@ -443,7 +448,7 @@ def _test_dbus_get_set_wps(dev, apdev):
if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
raise Exception("DeviceType mismatch after Set")
- val2 = '\x01\x02\x03\x04\x05\x06\x07\x08'
+ val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08'
if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
dbus_interface=dbus.PROPERTIES_IFACE)
val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
@@ -460,8 +465,8 @@ def _test_dbus_get_set_wps(dev, apdev):
self.sets_done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_sets)
- gobject.timeout_add(1000, self.timeout)
+ GObject.timeout_add(1, self.run_sets)
+ GObject.timeout_add(1000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
"PropertiesChanged")
self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
@@ -471,7 +476,7 @@ def _test_dbus_get_set_wps(dev, apdev):
def propertiesChanged(self, properties):
logger.debug("PropertiesChanged: " + str(properties))
- if properties.has_key("ProcessCredentials"):
+ if "ProcessCredentials" in properties:
self.signal_received_deprecated = True
if self.sets_done and self.signal_received:
self.loop.quit()
@@ -481,7 +486,7 @@ def _test_dbus_get_set_wps(dev, apdev):
logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
if interface_name != WPAS_DBUS_IFACE_WPS:
return
- if changed_properties.has_key("ProcessCredentials"):
+ if "ProcessCredentials" in changed_properties:
self.signal_received = True
if self.sets_done and self.signal_received_deprecated:
self.loop.quit()
@@ -527,11 +532,11 @@ def test_dbus_wps_invalid(dev, apdev):
'Bssid': '02:33:44:55:66:77'},
{'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
{'Role': 'enrollee', 'Type': 'pbc',
- 'Bssid': dbus.ByteArray('12345')},
+ 'Bssid': dbus.ByteArray(b'12345')},
{'Role': 'enrollee', 'Type': 'pbc',
'P2PDeviceAddress': 12345},
{'Role': 'enrollee', 'Type': 'pbc',
- 'P2PDeviceAddress': dbus.ByteArray('12345')},
+ 'P2PDeviceAddress': dbus.ByteArray(b'12345')},
{'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
for args in failures:
try:
@@ -640,8 +645,8 @@ def _test_dbus_wps_pbc(dev, apdev):
self.wps = wps
def __enter__(self):
- gobject.timeout_add(1, self.start_pbc)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.start_pbc)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
"Credentials")
@@ -700,8 +705,8 @@ def test_dbus_wps_pbc_overlap(dev, apdev):
self.wps = wps
def __enter__(self):
- gobject.timeout_add(1, self.start_pbc)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.start_pbc)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
self.loop.run()
return self
@@ -753,8 +758,8 @@ def _test_dbus_wps_pin(dev, apdev):
self.credentials_received = False
def __enter__(self):
- gobject.timeout_add(1, self.start_pin)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.start_pin)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
"Credentials")
@@ -776,7 +781,7 @@ def _test_dbus_wps_pin(dev, apdev):
def start_pin(self, *args):
logger.debug("start_pin")
- bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
+ bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode()))
wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
'Bssid': bssid_ay})
return False
@@ -813,8 +818,8 @@ def _test_dbus_wps_pin2(dev, apdev):
self.failed = False
def __enter__(self):
- gobject.timeout_add(1, self.start_pin)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.start_pin)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
"Credentials")
@@ -836,7 +841,7 @@ def _test_dbus_wps_pin2(dev, apdev):
def start_pin(self, *args):
logger.debug("start_pin")
- bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
+ bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode()))
res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
'Bssid': bssid_ay})
pin = res['Pin']
@@ -876,8 +881,8 @@ def _test_dbus_wps_pin_m2d(dev, apdev):
self.credentials_received = False
def __enter__(self):
- gobject.timeout_add(1, self.start_pin)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.start_pin)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
"Credentials")
@@ -902,7 +907,7 @@ def _test_dbus_wps_pin_m2d(dev, apdev):
def start_pin(self, *args):
logger.debug("start_pin")
- bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
+ bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode()))
wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
'Bssid': bssid_ay})
return False
@@ -939,8 +944,8 @@ def _test_dbus_wps_reg(dev, apdev):
self.credentials_received = False
def __enter__(self):
- gobject.timeout_add(100, self.start_reg)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(100, self.start_reg)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
"Credentials")
@@ -957,7 +962,7 @@ def _test_dbus_wps_reg(dev, apdev):
def start_reg(self, *args):
logger.debug("start_reg")
- bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
+ bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode()))
wps.Start({'Role': 'registrar', 'Type': 'pin',
'Pin': '12345670', 'Bssid': bssid_ay})
return False
@@ -981,7 +986,7 @@ def test_dbus_wps_cancel(dev, apdev):
wps.Cancel()
dev[0].scan_for_bss(bssid, freq="2412")
- bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
+ bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode()))
wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
'Bssid': bssid_ay})
wps.Cancel()
@@ -999,18 +1004,18 @@ def test_dbus_scan_invalid(dev, apdev):
({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
({'Type': 'active',
- 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
- dbus.ByteArray("3"), dbus.ByteArray("4"),
- dbus.ByteArray("5"), dbus.ByteArray("6"),
- dbus.ByteArray("7"), dbus.ByteArray("8"),
- dbus.ByteArray("9"), dbus.ByteArray("10"),
- dbus.ByteArray("11"), dbus.ByteArray("12"),
- dbus.ByteArray("13"), dbus.ByteArray("14"),
- dbus.ByteArray("15"), dbus.ByteArray("16"),
- dbus.ByteArray("17") ]},
+ 'SSIDs': [ dbus.ByteArray(b"1"), dbus.ByteArray(b"2"),
+ dbus.ByteArray(b"3"), dbus.ByteArray(b"4"),
+ dbus.ByteArray(b"5"), dbus.ByteArray(b"6"),
+ dbus.ByteArray(b"7"), dbus.ByteArray(b"8"),
+ dbus.ByteArray(b"9"), dbus.ByteArray(b"10"),
+ dbus.ByteArray(b"11"), dbus.ByteArray(b"12"),
+ dbus.ByteArray(b"13"), dbus.ByteArray(b"14"),
+ dbus.ByteArray(b"15"), dbus.ByteArray(b"16"),
+ dbus.ByteArray(b"17") ]},
"InvalidArgs"),
({'Type': 'active',
- 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
+ 'SSIDs': [ dbus.ByteArray(b"1234567890abcdef1234567890abcdef1") ]},
"InvalidArgs"),
({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
@@ -1023,9 +1028,9 @@ def test_dbus_scan_invalid(dev, apdev):
'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
"InvalidArgs"),
({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
- ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
+ ({'Type': 'passive', 'IEs': [ dbus.ByteArray(b"\xdd\x00") ]},
"InvalidArgs"),
- ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
+ ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray(b"foo") ]},
"InvalidArgs")]
for (t,err) in tests:
try:
@@ -1056,14 +1061,14 @@ def test_dbus_scan_oom(dev, apdev):
"=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
"Scan"):
iface.Scan({ 'Type': 'active',
- 'IEs': [ dbus.ByteArray("\xdd\x00") ],
+ 'IEs': [ dbus.ByteArray(b"\xdd\x00") ],
'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
with alloc_fail_dbus(dev[0], 1,
"=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
"Scan"):
iface.Scan({ 'Type': 'active',
- 'SSIDs': [ dbus.ByteArray("open"),
+ 'SSIDs': [ dbus.ByteArray(b"open"),
dbus.ByteArray() ],
'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
@@ -1082,8 +1087,8 @@ def test_dbus_scan(dev, apdev):
self.fail_reason = None
def __enter__(self):
- gobject.timeout_add(1, self.run_scan)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_scan)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
@@ -1120,9 +1125,9 @@ def test_dbus_scan(dev, apdev):
def run_scan(self, *args):
logger.debug("run_scan")
iface.Scan({'Type': 'active',
- 'SSIDs': [ dbus.ByteArray("open"),
+ 'SSIDs': [ dbus.ByteArray(b"open"),
dbus.ByteArray() ],
- 'IEs': [ dbus.ByteArray("\xdd\x00"),
+ 'IEs': [ dbus.ByteArray(b"\xdd\x00"),
dbus.ByteArray() ],
'AllowRoam': False,
'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
@@ -1211,8 +1216,8 @@ def test_dbus_connect(dev, apdev):
self.state = 0
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
"NetworkRemoved")
@@ -1311,8 +1316,8 @@ def test_dbus_connect_psk_mem(dev, apdev):
self.connected = False
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
"PropertiesChanged")
self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
@@ -1371,8 +1376,8 @@ def test_dbus_connect_oom(dev, apdev):
self.state = 0
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(1500, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(1500, self.timeout)
self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
"NetworkRemoved")
@@ -1530,8 +1535,8 @@ def test_dbus_connect_eap(dev, apdev):
self.state = 0
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
"PropertiesChanged")
self.add_signal(self.certification, WPAS_DBUS_IFACE,
@@ -1793,7 +1798,7 @@ def test_dbus_network_oom(dev, apdev):
tests = [ (1,
'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
- dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
+ dbus.Dictionary({ 'ssid': dbus.ByteArray(b' ') },
signature='sv')),
(1, '=set_network_properties;wpas_dbus_handler_add_network',
dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
@@ -1806,7 +1811,7 @@ def test_dbus_network_oom(dev, apdev):
dbus.Dictionary({ 'priority': dbus.Int32(1) },
signature='sv')),
(1, '=set_network_properties;wpas_dbus_handler_add_network',
- dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
+ dbus.Dictionary({ 'ssid': dbus.ByteArray(b' ') },
signature='sv')) ]
for (count,funcs,args) in tests:
with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
@@ -1934,10 +1939,10 @@ def test_dbus_blob(dev, apdev):
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
- blob = dbus.ByteArray("\x01\x02\x03")
+ blob = dbus.ByteArray(b"\x01\x02\x03")
iface.AddBlob('blob1', blob)
try:
- iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
+ iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04"))
raise Exception("Invalid AddBlob() accepted")
except dbus.exceptions.DBusException as e:
if "BlobExists" not in str(e):
@@ -1973,8 +1978,8 @@ def test_dbus_blob(dev, apdev):
self.blob_removed = False
def __enter__(self):
- gobject.timeout_add(1, self.run_blob)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_blob)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
self.loop.run()
@@ -1993,7 +1998,7 @@ def test_dbus_blob(dev, apdev):
def run_blob(self, *args):
logger.debug("run_blob")
- iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
+ iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04"))
iface.RemoveBlob('blob2')
return False
@@ -2012,7 +2017,7 @@ def test_dbus_blob_oom(dev, apdev):
for i in range(1, 4):
with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
"AddBlob"):
- iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
+ iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04"))
def test_dbus_autoscan(dev, apdev):
"""D-Bus Autoscan()"""
@@ -2121,8 +2126,8 @@ def test_dbus_tdls(dev, apdev):
self.tdls_teardown = False
def __enter__(self):
- gobject.timeout_add(1, self.run_tdls)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_tdls)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
"PropertiesChanged")
self.loop.run()
@@ -2134,13 +2139,13 @@ def test_dbus_tdls(dev, apdev):
def run_tdls(self, *args):
logger.debug("run_tdls")
iface.TDLSDiscover(addr1)
- gobject.timeout_add(100, self.run_tdls2)
+ GObject.timeout_add(100, self.run_tdls2)
return False
def run_tdls2(self, *args):
logger.debug("run_tdls2")
iface.TDLSSetup(addr1)
- gobject.timeout_add(500, self.run_tdls3)
+ GObject.timeout_add(500, self.run_tdls3)
return False
def run_tdls3(self, *args):
@@ -2151,7 +2156,7 @@ def test_dbus_tdls(dev, apdev):
else:
logger.info("Unexpected TDLSStatus: " + res)
iface.TDLSTeardown(addr1)
- gobject.timeout_add(200, self.run_tdls4)
+ GObject.timeout_add(200, self.run_tdls4)
return False
def run_tdls4(self, *args):
@@ -2192,8 +2197,8 @@ def test_dbus_tdls_channel_switch(dev, apdev):
self.tdls_done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_tdls)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_tdls)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
"PropertiesChanged")
self.loop.run()
@@ -2205,13 +2210,13 @@ def test_dbus_tdls_channel_switch(dev, apdev):
def run_tdls(self, *args):
logger.debug("run_tdls")
iface.TDLSDiscover(addr1)
- gobject.timeout_add(100, self.run_tdls2)
+ GObject.timeout_add(100, self.run_tdls2)
return False
def run_tdls2(self, *args):
logger.debug("run_tdls2")
iface.TDLSSetup(addr1)
- gobject.timeout_add(500, self.run_tdls3)
+ GObject.timeout_add(500, self.run_tdls3)
return False
def run_tdls3(self, *args):
@@ -2271,7 +2276,7 @@ def test_dbus_tdls_channel_switch(dev, apdev):
signature='sv')
iface.TDLSChannelSwitch(args)
- gobject.timeout_add(200, self.run_tdls4)
+ GObject.timeout_add(200, self.run_tdls4)
return False
def run_tdls4(self, *args):
@@ -2587,8 +2592,8 @@ def test_dbus_probe_req_reporting(dev, apdev):
self.reported = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
"GroupStarted")
self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
@@ -2704,8 +2709,8 @@ def test_dbus_p2p_invalid(dev, apdev):
{'RequestedDeviceTypes': dbus.Array([], signature="s")},
{'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
{'RequestedDeviceTypes': dbus.Array([], signature="i")},
- {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
- dbus.ByteArray('1234567')]},
+ {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'),
+ dbus.ByteArray(b'1234567')]},
{'Foo': dbus.Int16(1)},
{'Foo': dbus.UInt16(1)},
{'Foo': dbus.Int64(1)},
@@ -2904,29 +2909,29 @@ def test_dbus_p2p_oom(dev, apdev):
with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
"Find", "InvalidArgs"):
- p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
+ p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123') ] }))
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
"Find", "InvalidArgs"):
- p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
+ p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123') ] }))
with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
"Find", "InvalidArgs"):
- p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123'),
- dbus.ByteArray('123') ] }))
+ p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123'),
+ dbus.ByteArray(b'123') ] }))
with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
"Find", "InvalidArgs"):
- p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
+ p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray(b'123') ] }))
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
"Find", "InvalidArgs"):
@@ -2935,7 +2940,7 @@ def test_dbus_p2p_oom(dev, apdev):
with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
"AddService", "InvalidArgs"):
args = { 'service_type': 'bonjour',
- 'response': dbus.ByteArray(500*'b') }
+ 'response': dbus.ByteArray(500 * b'b') }
p2p.AddService(args)
with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
@@ -2978,7 +2983,7 @@ def run_dbus_p2p_discovery(dev, apdev):
raise Exception("Unexpected peer(s) in the list")
args = {'DiscoveryType': 'social',
- 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
+ 'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')],
'Timeout': dbus.Int32(1) }
p2p.Find(dbus.Dictionary(args))
p2p.StopFind()
@@ -2993,8 +2998,8 @@ def run_dbus_p2p_discovery(dev, apdev):
self.find_stopped = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.deviceFoundProperties,
@@ -3029,7 +3034,7 @@ def run_dbus_p2p_discovery(dev, apdev):
sec = res['SecondaryDeviceTypes']
if len(sec) < 1:
raise Exception("Secondary device type missing")
- if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
+ if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
raise Exception("Secondary device type mismatch")
if 'VendorExtension' not in res:
@@ -3037,7 +3042,7 @@ def run_dbus_p2p_discovery(dev, apdev):
vendor = res['VendorExtension']
if len(vendor) < 1:
raise Exception("Vendor extension missing")
- if "\x11\x22\x33\x44" not in vendor:
+ if b"\x11\x22\x33\x44" not in vendor:
raise Exception("Secondary device type mismatch")
if 'VSIE' not in res:
@@ -3045,7 +3050,7 @@ def run_dbus_p2p_discovery(dev, apdev):
vendor = res['VSIE']
if len(vendor) < 1:
raise Exception("VSIE missing")
- if vendor != "\xdd\x06\x00\x11\x22\x33\x55\x66":
+ if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66":
raise Exception("VSIE mismatch")
self.found = True
@@ -3143,8 +3148,8 @@ def test_dbus_p2p_discovery_freq(dev, apdev):
self.found = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(5000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(5000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.loop.run()
@@ -3250,7 +3255,7 @@ def test_dbus_p2p_service_discovery(dev, apdev):
{ 'service_type': 'bonjour', 'response': 'foo' },
{ 'service_type': 'bonjour', 'query': bonjour_query },
{ 'service_type': 'bonjour', 'response': bonjour_response },
- { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
+ { 'service_type': 'bonjour', 'query': dbus.ByteArray(500 * b'a') },
{ 'service_type': 'bonjour', 'foo': 'bar' } ]
for args in tests:
try:
@@ -3260,7 +3265,7 @@ def test_dbus_p2p_service_discovery(dev, apdev):
if "InvalidArgs" not in str(e):
raise Exception("Unexpected error message for invalid AddService(): " + str(e))
- args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
+ args = { 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01") }
ref = p2p.ServiceDiscoveryRequest(args)
p2p.ServiceDiscoveryCancelRequest(ref)
try:
@@ -3335,8 +3340,8 @@ def test_dbus_p2p_service_discovery_query(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.serviceDiscoveryResponse,
@@ -3348,7 +3353,7 @@ def test_dbus_p2p_service_discovery_query(dev, apdev):
def deviceFound(self, path):
logger.debug("deviceFound: path=%s" % path)
args = { 'peer_object': path,
- 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
+ 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01") }
p2p.ServiceDiscoveryRequest(args)
def serviceDiscoveryResponse(self, sd_request):
@@ -3396,8 +3401,8 @@ def _test_dbus_p2p_service_discovery_external(dev, apdev):
self.sd = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.serviceDiscoveryRequest,
@@ -3462,8 +3467,8 @@ def test_dbus_p2p_autogo(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -3553,7 +3558,10 @@ def test_dbus_p2p_autogo(dev, apdev):
for p in peer:
if len(addr) > 0:
addr += ':'
- addr += '%02x' % ord(p)
+ if type(p) == int:
+ addr += '%02x' % p
+ else:
+ addr += '%02x' % ord(p)
params = { 'Role': 'registrar',
'P2PDeviceAddress': self.peer['DeviceAddress'],
@@ -3598,7 +3606,7 @@ def test_dbus_p2p_autogo(dev, apdev):
self.exceptions = True
raise Exception("Unexpected number of group members")
- ext = dbus.ByteArray("\x11\x22\x33\x44")
+ ext = dbus.ByteArray(b"\x11\x22\x33\x44")
# Earlier implementation of this interface was a bit strange. The
# property is defined to have aay signature and that is what the
# getter returned. However, the setter expected there to be a
@@ -3621,7 +3629,7 @@ def test_dbus_p2p_autogo(dev, apdev):
# And now verify that the more appropriate encoding is accepted as
# well.
- res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
+ res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff'))
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
dbus_interface=dbus.PROPERTIES_IFACE)
res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
@@ -3635,7 +3643,7 @@ def test_dbus_p2p_autogo(dev, apdev):
raise Exception("Vendor extension value changed")
for i in range(10):
- res.append(dbus.ByteArray('\xaa\xbb'))
+ res.append(dbus.ByteArray(b'\xaa\xbb'))
try:
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
dbus_interface=dbus.PROPERTIES_IFACE)
@@ -3722,8 +3730,8 @@ def test_dbus_p2p_autogo_pbc(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -3767,7 +3775,10 @@ def test_dbus_p2p_autogo_pbc(dev, apdev):
for p in peer:
if len(addr) > 0:
addr += ':'
- addr += '%02x' % ord(p)
+ if type(p) == int:
+ addr += '%02x' % p
+ else:
+ addr += '%02x' % ord(p)
params = { 'Role': 'registrar',
'P2PDeviceAddress': self.peer['DeviceAddress'],
'Type': 'pbc' }
@@ -3810,8 +3821,8 @@ def test_dbus_p2p_autogo_legacy(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
"GroupStarted")
self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -3828,7 +3839,12 @@ def test_dbus_p2p_autogo_legacy(dev, apdev):
res = g_obj.GetAll(WPAS_DBUS_GROUP,
dbus_interface=dbus.PROPERTIES_IFACE,
byte_arrays=True)
- bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
+ if sys.version_info[0] > 2:
+ bssid = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format(
+ res['BSSID'][0], res['BSSID'][1], res['BSSID'][2],
+ res['BSSID'][3], res['BSSID'][4], res['BSSID'][5])
+ else:
+ bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
pin = '12345670'
params = { 'Role': 'enrollee',
@@ -3886,8 +3902,8 @@ def test_dbus_p2p_join(dev, apdev):
self.go = None
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -3947,7 +3963,7 @@ def test_dbus_p2p_join(dev, apdev):
byte_arrays=True)
logger.debug("Group properties: " + str(res))
- ext = dbus.ByteArray("\x11\x22\x33\x44")
+ ext = dbus.ByteArray(b"\x11\x22\x33\x44")
try:
# Set(WPSVendorExtensions) not allowed for P2P Client
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
@@ -4012,8 +4028,8 @@ def test_dbus_p2p_invitation_received(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
"InvitationReceived")
self.loop.run()
@@ -4068,8 +4084,8 @@ def _test_dbus_p2p_config(dev, apdev):
raise Exception("Parameter %s value changes" % k)
changes = { 'SsidPostfix': 'foo',
- 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
- 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
+ 'VendorExtension': [ dbus.ByteArray(b'\x11\x22\x33\x44') ],
+ 'SecondaryDeviceTypes': [ dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88') ]}
if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
dbus.Dictionary(changes, signature='sv'),
dbus_interface=dbus.PROPERTIES_IFACE)
@@ -4147,8 +4163,8 @@ def test_dbus_p2p_persistent(dev, apdev):
TestDbus.__init__(self, bus)
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
"GroupStarted")
self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -4246,8 +4262,8 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
self.invited = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -4275,7 +4291,12 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
res = g_obj.GetAll(WPAS_DBUS_GROUP,
dbus_interface=dbus.PROPERTIES_IFACE,
byte_arrays=True)
- bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
+ if sys.version_info[0] > 2:
+ bssid = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format(
+ res['BSSID'][0], res['BSSID'][1], res['BSSID'][2],
+ res['BSSID'][3], res['BSSID'][4], res['BSSID'][5])
+ else:
+ bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
dev1.scan_for_bss(bssid, freq=2412)
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
@@ -4328,7 +4349,10 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
for p in peer:
if len(addr) > 0:
addr += ':'
- addr += '%02x' % ord(p)
+ if type(p) == int:
+ addr += '%02x' % p
+ else:
+ addr += '%02x' % ord(p)
params = { 'Role': 'registrar',
'P2PDeviceAddress': self.peer['DeviceAddress'],
'Bssid': self.peer['DeviceAddress'],
@@ -4381,8 +4405,8 @@ def test_dbus_p2p_go_neg_rx(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.goNegotiationRequest,
@@ -4466,8 +4490,8 @@ def test_dbus_p2p_go_neg_auth(dev, apdev):
self.peer_disconnected = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.goNegotiationSuccess,
@@ -4567,8 +4591,8 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
self.peer_group_removed = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.goNegotiationSuccess,
@@ -4662,8 +4686,8 @@ def test_dbus_p2p_group_termination_by_go(dev, apdev):
self.peer_group_removed = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.goNegotiationSuccess,
@@ -4758,8 +4782,8 @@ def _test_dbus_p2p_group_idle_timeout(dev, apdev):
self.peer_group_removed = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.add_signal(self.goNegotiationSuccess,
@@ -4852,8 +4876,8 @@ def test_dbus_p2p_wps_failure(dev, apdev):
self.formation_failure = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.goNegotiationRequest,
WPAS_DBUS_IFACE_P2PDEVICE,
"GONegotiationRequest",
@@ -4938,8 +4962,8 @@ def test_dbus_p2p_two_groups(dev, apdev):
self.groups_removed = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
"PropertiesChanged", byte_arrays=True)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
@@ -5017,7 +5041,12 @@ def test_dbus_p2p_two_groups(dev, apdev):
g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
g_wps.Start(params)
- bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
+ if sys.version_info[0] > 2:
+ bssid = "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format(
+ self.g2_bssid[0], self.g2_bssid[1], self.g2_bssid[2],
+ self.g2_bssid[3], self.g2_bssid[4], self.g2_bssid[5])
+ else:
+ bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
dev2.scan_for_bss(bssid, freq=2412)
dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
@@ -5125,8 +5154,8 @@ def test_dbus_p2p_cancel(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
"DeviceFound")
self.loop.run()
@@ -5195,8 +5224,8 @@ def test_dbus_p2p_ip_addr(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
"GroupStarted")
self.loop.run()
@@ -5280,8 +5309,8 @@ def run_busctl(service, obj):
cmd.wait()
logger.info("busctl stdout:\n%s" % out[0].strip())
if len(out[1]) > 0:
- logger.info("busctl stderr: %s" % out[1].strip())
- if "Duplicate property" in out[1]:
+ logger.info("busctl stderr: %s" % out[1].decode().strip())
+ if "Duplicate property" in out[1].decode():
raise Exception("Duplicate property")
def test_dbus_introspect_busctl(dev, apdev):
@@ -5321,8 +5350,8 @@ def test_dbus_ap(dev, apdev):
self.stations = False
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
"NetworkSelected")
@@ -5424,8 +5453,8 @@ def test_dbus_connect_wpa_eap(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
"PropertiesChanged")
self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
@@ -5554,7 +5583,7 @@ def _test_dbus_vendor_elem(dev, apdev):
dev[0].request("VENDOR_ELEM_REMOVE 1 *")
try:
- ie = dbus.ByteArray("\x00\x00")
+ ie = dbus.ByteArray(b"\x00\x00")
iface.VendorElemAdd(-1, ie)
raise Exception("Invalid VendorElemAdd() accepted")
except dbus.exceptions.DBusException as e:
@@ -5562,7 +5591,7 @@ def _test_dbus_vendor_elem(dev, apdev):
raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
try:
- ie = dbus.ByteArray("")
+ ie = dbus.ByteArray(bytes())
iface.VendorElemAdd(1, ie)
raise Exception("Invalid VendorElemAdd() accepted")
except dbus.exceptions.DBusException as e:
@@ -5570,7 +5599,7 @@ def _test_dbus_vendor_elem(dev, apdev):
raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
try:
- ie = dbus.ByteArray("\x00\x01")
+ ie = dbus.ByteArray(b"\x00\x01")
iface.VendorElemAdd(1, ie)
raise Exception("Invalid VendorElemAdd() accepted")
except dbus.exceptions.DBusException as e:
@@ -5592,7 +5621,7 @@ def _test_dbus_vendor_elem(dev, apdev):
raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
try:
- ie = dbus.ByteArray("\x00\x00")
+ ie = dbus.ByteArray(b"\x00\x00")
iface.VendorElemRem(-1, ie)
raise Exception("Invalid VendorElemRemove() accepted")
except dbus.exceptions.DBusException as e:
@@ -5600,16 +5629,16 @@ def _test_dbus_vendor_elem(dev, apdev):
raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
try:
- ie = dbus.ByteArray("")
+ ie = dbus.ByteArray(bytes())
iface.VendorElemRem(1, ie)
raise Exception("Invalid VendorElemRemove() accepted")
except dbus.exceptions.DBusException as e:
if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
- iface.VendorElemRem(1, "*")
+ iface.VendorElemRem(1, b"*")
- ie = dbus.ByteArray("\x00\x01\x00")
+ ie = dbus.ByteArray(b"\x00\x01\x00")
iface.VendorElemAdd(1, ie)
val = iface.VendorElemGet(1)
@@ -5619,7 +5648,7 @@ def _test_dbus_vendor_elem(dev, apdev):
if val[i] != dbus.Byte(ie[i]):
raise Exception("Unexpected VendorElemGet data")
- ie2 = dbus.ByteArray("\xe0\x00")
+ ie2 = dbus.ByteArray(b"\xe0\x00")
iface.VendorElemAdd(1, ie2)
ies = ie + ie2
@@ -5631,7 +5660,7 @@ def _test_dbus_vendor_elem(dev, apdev):
raise Exception("Unexpected VendorElemGet data[2]")
try:
- test_ie = dbus.ByteArray("\x01\x01")
+ test_ie = dbus.ByteArray(b"\x01\x01")
iface.VendorElemRem(1, test_ie)
raise Exception("Invalid VendorElemRemove() accepted")
except dbus.exceptions.DBusException as e:
@@ -5643,7 +5672,7 @@ def _test_dbus_vendor_elem(dev, apdev):
if len(val) != len(ie2):
raise Exception("Unexpected VendorElemGet length[3]")
- iface.VendorElemRem(1, "*")
+ iface.VendorElemRem(1, b"*")
try:
iface.VendorElemGet(1)
raise Exception("Invalid VendorElemGet() accepted after removal")
@@ -5668,8 +5697,8 @@ def test_dbus_assoc_reject(dev, apdev):
self.state = 0
def __enter__(self):
- gobject.timeout_add(1, self.run_connect)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_connect)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
"PropertiesChanged")
self.loop.run()
@@ -5717,8 +5746,8 @@ def test_dbus_mesh(dev, apdev):
self.done = False
def __enter__(self):
- gobject.timeout_add(1, self.run_test)
- gobject.timeout_add(15000, self.timeout)
+ GObject.timeout_add(1, self.run_test)
+ GObject.timeout_add(15000, self.timeout)
self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH,
"MeshGroupStarted")
self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH,
@@ -5747,14 +5776,14 @@ def test_dbus_mesh(dev, apdev):
logger.debug("MeshPeers: " + str(res))
if len(res) != 1:
raise Exception("Unexpected number of MeshPeer values")
- if binascii.hexlify(res[0]) != addr1.replace(':', ''):
+ if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''):
raise Exception("Unexpected peer address")
res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
dbus_interface=dbus.PROPERTIES_IFACE,
byte_arrays=True)
logger.debug("MeshGroup: " + str(res))
- if res != "wpas-mesh-open":
+ if res != b"wpas-mesh-open":
raise Exception("Unexpected MeshGroup")
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
dev1.mesh_group_remove()
Signed-off-by: Masashi Honma <masashi.honma@gmail.com> --- tests/hwsim/test_dbus.py | 377 +++++++++++++++++++++------------------ 1 file changed, 203 insertions(+), 174 deletions(-)