@@ -8,6 +8,7 @@ import logging
logger = logging.getLogger()
import time
import random
+import re
import hwsim_utils
from wpasupplicant import WpaSupplicant
@@ -122,6 +123,99 @@ def p2ps_nonexact_seek(i_dev, r_dev, svc_name, srv_info=None, adv_num=None):
ev_list.append(''.join([adv_id, ' ', rcvd_svc_name]))
return ev_list
+def p2ps_parse_event(ev, *args):
+ ret = ()
+ for arg in args:
+ m = re.search("\s+" + arg + r"=(\S+)", ev)
+ ret += (m.group(1) if m is not None else None,)
+ return ret
+
+def p2ps_provision(seeker, advertiser, adv_id, auto_accept=True, method="1000"):
+ addr0 = seeker.p2p_dev_addr()
+ addr1 = advertiser.p2p_dev_addr()
+
+ seeker.asp_provision(addr1, adv_id=str(adv_id), adv_mac=addr1, session_id=1, session_mac=addr0, method=method)
+
+ if not auto_accept or method == "100":
+ pin = None
+ ev_pd_start = advertiser.wait_global_event(["P2PS-PROV-START"], timeout=10)
+ if ev_pd_start is None:
+ raise Exception("P2PS-PROV-START timeout on Advertiser side")
+ peer = ev_pd_start.split()[1]
+ advert_id, advert_mac, session, session_mac =\
+ p2ps_parse_event(ev_pd_start, "adv_id", "adv_mac", "session", "mac")
+
+ ev = seeker.wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-FAILURE timeout on seeker side")
+
+ if method == "100":
+ ev = advertiser.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on advertiser side")
+ if addr0 not in ev:
+ raise Exception("Unknown peer " + addr0)
+ ev = seeker.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on seeker side")
+ if addr1 not in ev:
+ raise Exception("Unknown peer " + addr1)
+ pin = ev.split()[2]
+ elif method == "8":
+ ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side")
+ if addr0 not in ev:
+ raise Exception("Unknown peer " + addr0)
+ pin = ev.split()[2]
+
+ advertiser.asp_provision(peer, adv_id=advert_id, adv_mac=advert_mac, session_id=int(session, 0),
+ session_mac=session_mac, status=12)
+
+ ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
+ if ev1 is None:
+ raise Exception("P2PS-PROV-DONE timeout on seeker side")
+
+ ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
+ if ev2 is None:
+ raise Exception("P2PS-PROV-DONE timeout on advertiser side")
+
+ if method == "8":
+ ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-ENTER-PIN failed on seeker side")
+ if addr1 not in ev:
+ raise Exception("Unknown peer " + addr1)
+
+ if pin is not None:
+ return ev1, ev2, pin
+ return ev1, ev2
+
+ # Auto-accept is true and the method is either P2PS or advertiser is DISPLAY
+ ev1 = seeker.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
+ if ev1 is None:
+ raise Exception("P2PS-PROV-DONE timeout on seeker side")
+
+ ev2 = advertiser.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
+ if ev2 is None:
+ raise Exception("P2PS-PROV-DONE timeout on advertiser side")
+
+ if method == "8":
+ ev = seeker.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on seeker side")
+ if addr1 not in ev:
+ raise Exception("Unknown peer " + addr1)
+ ev = advertiser.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=10)
+ if ev is None:
+ raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on advertiser side")
+ if addr0 not in ev:
+ raise Exception("Unknown peer " + addr0)
+ pin = ev.split()[2]
+ return ev1, ev2, pin
+
+ return ev1, ev2
+
def p2p_connect_p2ps_method(i_dev, r_dev, autoaccept):
"""P2PS connect function with p2ps method"""
if autoaccept == False: