diff mbox

[RFC] tests: add tests for HS 2.0 frame filtering

Message ID 1428652411-1162-1-git-send-email-johannes@sipsolutions.net
State RFC
Headers show

Commit Message

Johannes Berg April 10, 2015, 7:53 a.m. UTC
From: Johannes Berg <johannes.berg@intel.com>

Signed-off-by: Johannes Berg <johannes.berg@intel.com>
---
 tests/hwsim/test_ap_hs20.py     |   7 +-
 tests/hwsim/test_hs20_filter.py | 218 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 221 insertions(+), 4 deletions(-)
 create mode 100644 tests/hwsim/test_hs20_filter.py

Comments

Johannes Berg April 10, 2015, 7:58 a.m. UTC | #1
A few notes...

First of all, of course this is conditional on the kernel patches being
accepted.

> +    procfile = '/proc/sys/net/ipv4/conf/%s/drop_unicast_in_l2_multicast' % dev.ifname

Secondly, with all this proc-file access, this is clearly a very
low-level test.

We'll need some logic in wpa_supplicant to set these knobs
appropriately, and once we have that we should have another set of tests
that checks that when we connect to a suitable HS2 AP they get set
correctly (and the filtering works as expected). Some of the test code
from here could be lifted for that, but the changes to the sysctl knobs
clearly should come from wpa_supplicant instead of the test code then.

johannes
diff mbox

Patch

diff --git a/tests/hwsim/test_ap_hs20.py b/tests/hwsim/test_ap_hs20.py
index 90f630cd7597..196ec78df8f9 100644
--- a/tests/hwsim/test_ap_hs20.py
+++ b/tests/hwsim/test_ap_hs20.py
@@ -2819,7 +2819,7 @@  def send_ns(dev, src_ll=None, target=None, ip_src=None, ip_dst=None, opt=None,
     if "OK" not in dev.request(cmd + binascii.hexlify(pkt)):
         raise Exception("DATA_TEST_FRAME failed")
 
-def build_na(src_ll, ip_src, ip_dst, target, opt=None):
+def build_na(src_ll, ip_src, ip_dst, target, opt=None, flags=0):
     link_mc = binascii.unhexlify("3333ff000002")
     _src_ll = binascii.unhexlify(src_ll.replace(':',''))
     proto = '\x86\xdd'
@@ -2827,12 +2827,11 @@  def build_na(src_ll, ip_src, ip_dst, target, opt=None):
     _ip_src = socket.inet_pton(socket.AF_INET6, ip_src)
     _ip_dst = socket.inet_pton(socket.AF_INET6, ip_dst)
 
-    reserved = '\x00\x00\x00\x00'
     _target = socket.inet_pton(socket.AF_INET6, target)
     if opt:
-        payload = reserved + _target + opt
+        payload = struct.pack('>Bxxx', flags) + _target + opt
     else:
-        payload = reserved + _target
+        payload = struct.pack('>Bxxx', flags) + _target
     icmp = build_icmpv6(_ip_src + _ip_dst, 136, 0, payload)
 
     ipv6 = struct.pack('>BBBBHBB', 0x60, 0, 0, 0, len(icmp), 58, 255)
diff --git a/tests/hwsim/test_hs20_filter.py b/tests/hwsim/test_hs20_filter.py
new file mode 100644
index 000000000000..4bc31c3048e0
--- /dev/null
+++ b/tests/hwsim/test_hs20_filter.py
@@ -0,0 +1,218 @@ 
+# Hotspot 2.0 filtering tests
+# Copyright (c) 2015, Intel Mobile Communications GmbH
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import hostapd
+import hwsim_utils
+import socket
+import subprocess
+import binascii
+from utils import HwsimSkip, require_under_vm
+import os
+import time
+from test_ap_hs20 import build_arp, build_na
+import struct
+
+class IPAssign(object):
+    def __init__(self, iface, addr, ipv6=False):
+        self._iface = iface
+        self._addr = addr
+        self._cmd = ['ip']
+        if ipv6:
+            self._cmd.append('-6')
+        self._cmd.append('addr')
+        self._ipv6 = ipv6
+    def __enter__(self):
+        subprocess.call(self._cmd + ['add', self._addr, 'dev', self._iface])
+        if self._ipv6:
+            # wait for DAD to finish
+            while True:
+                o = subprocess.check_output(self._cmd + ['show', 'tentative', 'dev', self._iface])
+                if not self._addr in o:
+                    break
+                time.sleep(0.1)
+    def __exit__(self, type, value, traceback):
+        subprocess.call(self._cmd + ['del', self._addr, 'dev', self._iface])
+
+def _test_ip4_gtk_drop(devs, apdevs, params, dst):
+    require_under_vm()
+    dev = devs[0]
+    procfile = '/proc/sys/net/ipv4/conf/%s/drop_unicast_in_l2_multicast' % dev.ifname
+    if not os.path.exists(procfile):
+        raise HwsimSkip("kernel doesn't have capability")
+
+    ap_params = { 'ssid': 'open', 'channel': '5' }
+    hapd = hostapd.add_ap(apdevs[0]['ifname'], ap_params)
+    dev.connect('open', key_mgmt="NONE", scan_freq="2432")
+
+    with IPAssign(dev.ifname, '10.0.0.1/24'):
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        s.bind(("10.0.0.1", 12345))
+        s.settimeout(0.1)
+
+        pkt = dst
+        pkt += hapd.own_addr().replace(':', '')
+        pkt += '0800'
+        pkt += '45000020786840004011ae600a0000040a000001'
+        pkt += '30393039000c0000'
+        pkt += '61736466' # "asdf"
+        if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+            raise Exception("DATA_TEST_FRAME failed")
+
+        data, addr = s.recvfrom(1024)
+        if data != 'asdf':
+            raise Exception("invalid data received")
+
+        open(procfile, 'w').write('1')
+        try:
+            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+                raise Exception("DATA_TEST_FRAME failed")
+
+            try:
+                print s.recvfrom(1024)
+                raise Exception("erroneously received frame!")
+            except socket.timeout:
+                # this is the expected behaviour
+                pass
+        finally:
+            open(procfile, 'w').write('0')
+
+def test_ip4_gtk_drop_bcast(devs, apdevs, params):
+    _test_ip4_gtk_drop(devs, apdevs, params, dst='ffffffffffff')
+
+def test_ip4_gtk_drop_mcast(devs, apdevs, params):
+    _test_ip4_gtk_drop(devs, apdevs, params, dst='ff0000000000')
+
+def _test_ip6_gtk_drop(devs, apdevs, params, dst):
+    require_under_vm()
+    dev = devs[0]
+    procfile = '/proc/sys/net/ipv6/conf/%s/drop_unicast_in_l2_multicast' % dev.ifname
+    if not os.path.exists(procfile):
+        raise HwsimSkip("kernel doesn't have capability")
+
+    ap_params = { 'ssid': 'open', 'channel': '5' }
+    hapd = hostapd.add_ap(apdevs[0]['ifname'], ap_params)
+    dev.connect('open', key_mgmt="NONE", scan_freq="2432")
+
+    with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True):
+        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
+        s.bind(("fdaa::1", 12345))
+        s.settimeout(0.1)
+
+        pkt = dst
+        pkt += hapd.own_addr().replace(':', '')
+        pkt += '86dd'
+        pkt += '60000000000c1140fdaa0000000000000000000000000002fdaa0000000000000000000000000001'
+        pkt += '30393039000cde31'
+        pkt += '61736466' # "asdf"
+        if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+            raise Exception("DATA_TEST_FRAME failed")
+
+        data, addr = s.recvfrom(1024)
+        if data != 'asdf':
+            raise Exception("invalid data received")
+
+        open(procfile, 'w').write('1')
+        try:
+            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+                raise Exception("DATA_TEST_FRAME failed")
+
+            try:
+                print s.recvfrom(1024)
+                raise Exception("erroneously received frame!")
+            except socket.timeout:
+                # this is the expected behaviour
+                pass
+        finally:
+            open(procfile, 'w').write('0')
+
+def test_ip6_gtk_drop_bcast(devs, apdevs, params):
+    _test_ip6_gtk_drop(devs, apdevs, params, dst='ffffffffffff')
+
+def test_ip6_gtk_drop_mcast(devs, apdevs, params):
+    _test_ip6_gtk_drop(devs, apdevs, params, dst='ff0000000000')
+
+def test_ip4_drop_gratuitous_arp(devs, apdevs, params):
+    require_under_vm()
+    dev = devs[0]
+    procfile = '/proc/sys/net/ipv4/conf/%s/drop_gratuitous_arp' % dev.ifname
+    if not os.path.exists(procfile):
+        raise HwsimSkip("kernel doesn't have capability")
+
+    ap_params = { 'ssid': 'open', 'channel': '5' }
+    hapd = hostapd.add_ap(apdevs[0]['ifname'], ap_params)
+    dev.connect('open', key_mgmt="NONE", scan_freq="2432")
+
+    with IPAssign(dev.ifname, '10.0.0.2/24'):
+        # add an entry that can be updated by gratuitous ARP
+        subprocess.call(['ip', 'neigh', 'add', '10.0.0.1', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
+        # wait for lock-time
+        time.sleep(1)
+        try:
+            ap_addr = hapd.own_addr()
+            cl_addr = dev.own_addr()
+            pkt = build_arp(cl_addr, ap_addr, 2, ap_addr, '10.0.0.1', ap_addr, '10.0.0.1')
+            pkt = binascii.hexlify(pkt)
+
+            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+                raise Exception("DATA_TEST_FRAME failed")
+
+            if not hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']):
+                raise Exception("gratuitous ARP frame failed to update")
+
+            subprocess.call(['ip', 'neigh', 'replace', '10.0.0.1', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
+            # wait for lock-time
+            time.sleep(1)
+
+            open(procfile, 'w').write('1')
+
+            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+                raise Exception("DATA_TEST_FRAME failed")
+
+            if hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']):
+                raise Exception("gratuitous ARP frame updated erroneously")
+        finally:
+            subprocess.call(['ip', 'neigh', 'del', '10.0.0.1', 'dev', dev.ifname])
+            open(procfile, 'w').write('0')
+
+def test_ip6_drop_unsolicited_na(devs, apdevs, params):
+    require_under_vm()
+    dev = devs[0]
+    procfile = '/proc/sys/net/ipv6/conf/%s/drop_unsolicited_na' % dev.ifname
+    if not os.path.exists(procfile):
+        raise HwsimSkip("kernel doesn't have capability")
+
+    ap_params = { 'ssid': 'open', 'channel': '5' }
+    hapd = hostapd.add_ap(apdevs[0]['ifname'], ap_params)
+    dev.connect('open', key_mgmt="NONE", scan_freq="2432")
+
+    with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True):
+        # add an entry that can be updated by unsolicited NA
+        subprocess.call(['ip', '-6', 'neigh', 'add', 'fdaa::2', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
+        try:
+            ap_addr = hapd.own_addr()
+            cl_addr = dev.own_addr()
+            pkt = build_na(ap_addr, 'fdaa::2', 'ff02::1', 'fdaa::2', flags=0x20,
+                           opt=binascii.unhexlify('0201' + ap_addr.replace(':', '')))
+            pkt = binascii.hexlify(pkt)
+
+            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+                raise Exception("DATA_TEST_FRAME failed")
+
+            if not hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']):
+                raise Exception("unsolicited NA frame failed to update")
+
+            subprocess.call(['ip', '-6', 'neigh', 'replace', 'fdaa::2', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
+
+            open(procfile, 'w').write('1')
+
+            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
+                raise Exception("DATA_TEST_FRAME failed")
+
+            if hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']):
+                raise Exception("unsolicited NA frame updated erroneously")
+        finally:
+            subprocess.call(['ip', '-6', 'neigh', 'del', 'fdaa::2', 'dev', dev.ifname])
+            open(procfile, 'w').write('0')