From patchwork Fri Mar 11 16:24:38 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Vincent Whitchurch X-Patchwork-Id: 1604443 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: bilbo.ozlabs.org; dkim=pass (2048-bit key; secure) header.d=lists.infradead.org header.i=@lists.infradead.org header.a=rsa-sha256 header.s=bombadil.20210309 header.b=hX+68dRS; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=axis.com header.i=@axis.com header.a=rsa-sha256 header.s=axis-central1 header.b=fuSqmpYa; dkim-atps=neutral Authentication-Results: ozlabs.org; spf=none (no SPF record) smtp.mailfrom=lists.infradead.org (client-ip=2607:7c80:54:e::133; helo=bombadil.infradead.org; envelope-from=linux-um-bounces+incoming=patchwork.ozlabs.org@lists.infradead.org; receiver=) Received: from bombadil.infradead.org (bombadil.infradead.org [IPv6:2607:7c80:54:e::133]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest SHA256) (No client certificate requested) by bilbo.ozlabs.org (Postfix) with ESMTPS id 4KFWW614xCz9sGG for ; Sat, 12 Mar 2022 03:25:22 +1100 (AEDT) DKIM-Signature: v=1; a=rsa-sha256; q=dns/txt; c=relaxed/relaxed; d=lists.infradead.org; s=bombadil.20210309; h=Sender: Content-Transfer-Encoding:Content-Type:List-Subscribe:List-Help:List-Post: List-Archive:List-Unsubscribe:List-Id:MIME-Version:References:In-Reply-To: Message-ID:Date:Subject:CC:To:From:Reply-To:Content-ID:Content-Description: Resent-Date:Resent-From:Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID: List-Owner; bh=RhNC6yDOLU6R9p7joIWDdYVsYGJF4cs3r94CjQBDPtE=; b=hX+68dRSYcFyw/ xBagJpvfJJpEgZI4TGu9tarO1juMDYSQ5JrNyZz+ArOsYbgw979s4VJ3+zGAwmnEZycQovfeo3gVC b5Bnt01q3QWplg5wNOlU9XSN73zRguL2aBWFvJg12sNXm+Un06faYHJqQGFPArGvJ9NdFcRLwu51m GjYMEtdmofaV6QYLSHOYJ6qHM709ZKxrvN00pdOmNU3Kcs5FdWvz6AKFhJXRu7qz9vulN2/6siQg0 8G72jGCRrS+AiHhVckxbsMrKTpslO+Uhk6fJocDhsAN/D3+w2Zi2gxu4Eo6MgB9WtJ+E/KE3t4kTM +QfmpmJhxd/tA6vKyFYw==; Received: from localhost ([::1] helo=bombadil.infradead.org) by bombadil.infradead.org with esmtp (Exim 4.94.2 #2 (Red Hat Linux)) id 1nSi4l-00HKpy-16; Fri, 11 Mar 2022 16:25:11 +0000 Received: from smtp2.axis.com ([195.60.68.18]) by bombadil.infradead.org with esmtps (Exim 4.94.2 #2 (Red Hat Linux)) id 1nSi4Z-00HKlM-Jt for linux-um@lists.infradead.org; Fri, 11 Mar 2022 16:25:09 +0000 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=axis.com; q=dns/txt; s=axis-central1; t=1647015899; x=1678551899; h=from:to:cc:subject:date:message-id:in-reply-to: references:mime-version:content-transfer-encoding; bh=WEyVVHDFtozxXKM8qFJKp+Cn8U7LcuaL3JvjOEc4jok=; b=fuSqmpYaroINUg/xN8Z6GZCdOh9uimTCEg7dgKiQ5g6xznVo3i4e/CwG GZ+RBKgEopbVMwgNO4blB82j7KTJ8x9WgcYiSouGrVMvFSBnxgapr0zVW 0oV36RsYGvrySwOuHOGcgWWkwKC33XqXBlT7FI4EwS90KNE2X35kPRRTx +oTQWh5FynE9+ANxN82EhqO42O9zhZW/6Yg0u8TiPDnsI934CE/oST+Yp yHT0xIg4FlRKbXMVFvnfDl9aZE27uZH3q+QvJ3QOvqueWc2FpUzsKNRve eu/5Kbpiwbzj/M7LUay7wy18l+UQCqCQb7W8pEgOJl0MRmgErOmotYi4D A==; From: Vincent Whitchurch To: CC: , Vincent Whitchurch , , , , , , , , , , , , , , Subject: [RFC v1 03/10] roadtest: add framework Date: Fri, 11 Mar 2022 17:24:38 +0100 Message-ID: <20220311162445.346685-4-vincent.whitchurch@axis.com> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220311162445.346685-1-vincent.whitchurch@axis.com> References: <20220311162445.346685-1-vincent.whitchurch@axis.com> MIME-Version: 1.0 X-CRM114-Version: 20100106-BlameMichelson ( TRE 0.8.0 (BSD) ) MR-646709E3 X-CRM114-CacheID: sfid-20220311_082500_427808_A3FBAE8A X-CRM114-Status: GOOD ( 17.71 ) X-Spam-Score: -2.5 (--) X-Spam-Report: Spam detection software, running on the system "bombadil.infradead.org", has NOT identified this incoming email as spam. The original message has been attached to this so you can view it or label similar future email. If you have any questions, see the administrator of that system for details. Content preview: Add the bulk of the roadtest framework. Apart from one init shell script, this is written in Python and includes three closely-related parts: - The test runner which is invoked from the command line by the user and which starts the backend and sends the test jobs and results to/from UML. Content analysis details: (-2.5 points, 5.0 required) pts rule name description ---- ---------------------- -------------------------------------------------- -2.3 RCVD_IN_DNSWL_MED RBL: Sender listed at https://www.dnswl.org/, medium trust [195.60.68.18 listed in list.dnswl.org] -0.0 SPF_PASS SPF: sender matches SPF record -0.0 SPF_HELO_PASS SPF: HELO matches SPF record -0.1 DKIM_VALID_AU Message has a valid DKIM or DK signature from author's domain -0.1 DKIM_VALID Message has at least one valid DKIM or DK signature 0.1 DKIM_SIGNED Message has a DKIM or DK signature, not necessarily valid -0.1 DKIM_VALID_EF Message has a valid DKIM or DK signature from envelope-from domain X-BeenThere: linux-um@lists.infradead.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: "linux-um" Errors-To: linux-um-bounces+incoming=patchwork.ozlabs.org@lists.infradead.org Add the bulk of the roadtest framework. Apart from one init shell script, this is written in Python and includes three closely-related parts: - The test runner which is invoked from the command line by the user and which starts the backend and sends the test jobs and results to/from UML. - Test support code which is used by the actual driver tests run inside UML and which interact with the backend via a file-based asynchronous communication method. - The backend which is run by the Python interpreter embedded in the C backend. This part runs the hardware models and is controlled by the tests and the driver (via virtio in the C backend). Some unit tests for the framework itself are included and these will be automatically run whenever the driver tests are run. Signed-off-by: Vincent Whitchurch --- tools/testing/roadtest/init.sh | 19 ++ tools/testing/roadtest/roadtest/__init__.py | 2 + .../roadtest/roadtest/backend/__init__.py | 0 .../roadtest/roadtest/backend/backend.py | 32 ++ .../testing/roadtest/roadtest/backend/gpio.py | 111 +++++++ .../testing/roadtest/roadtest/backend/i2c.py | 123 ++++++++ .../testing/roadtest/roadtest/backend/main.py | 13 + .../testing/roadtest/roadtest/backend/mock.py | 20 ++ .../roadtest/roadtest/backend/test_gpio.py | 98 ++++++ .../roadtest/roadtest/backend/test_i2c.py | 84 +++++ .../testing/roadtest/roadtest/cmd/__init__.py | 0 tools/testing/roadtest/roadtest/cmd/main.py | 146 +++++++++ tools/testing/roadtest/roadtest/cmd/remote.py | 48 +++ .../roadtest/roadtest/core/__init__.py | 0 .../testing/roadtest/roadtest/core/control.py | 52 ++++ .../roadtest/roadtest/core/devicetree.py | 155 ++++++++++ .../roadtest/roadtest/core/hardware.py | 94 ++++++ tools/testing/roadtest/roadtest/core/log.py | 42 +++ .../testing/roadtest/roadtest/core/modules.py | 38 +++ .../testing/roadtest/roadtest/core/opslog.py | 35 +++ tools/testing/roadtest/roadtest/core/proxy.py | 48 +++ tools/testing/roadtest/roadtest/core/suite.py | 286 ++++++++++++++++++ tools/testing/roadtest/roadtest/core/sysfs.py | 77 +++++ .../roadtest/roadtest/core/test_control.py | 35 +++ .../roadtest/roadtest/core/test_devicetree.py | 31 ++ .../roadtest/roadtest/core/test_hardware.py | 41 +++ .../roadtest/roadtest/core/test_log.py | 54 ++++ .../roadtest/roadtest/core/test_opslog.py | 27 ++ .../roadtest/roadtest/tests/__init__.py | 0 29 files changed, 1711 insertions(+) create mode 100755 tools/testing/roadtest/init.sh create mode 100644 tools/testing/roadtest/roadtest/__init__.py create mode 100644 tools/testing/roadtest/roadtest/backend/__init__.py create mode 100644 tools/testing/roadtest/roadtest/backend/backend.py create mode 100644 tools/testing/roadtest/roadtest/backend/gpio.py create mode 100644 tools/testing/roadtest/roadtest/backend/i2c.py create mode 100644 tools/testing/roadtest/roadtest/backend/main.py create mode 100644 tools/testing/roadtest/roadtest/backend/mock.py create mode 100644 tools/testing/roadtest/roadtest/backend/test_gpio.py create mode 100644 tools/testing/roadtest/roadtest/backend/test_i2c.py create mode 100644 tools/testing/roadtest/roadtest/cmd/__init__.py create mode 100644 tools/testing/roadtest/roadtest/cmd/main.py create mode 100644 tools/testing/roadtest/roadtest/cmd/remote.py create mode 100644 tools/testing/roadtest/roadtest/core/__init__.py create mode 100644 tools/testing/roadtest/roadtest/core/control.py create mode 100644 tools/testing/roadtest/roadtest/core/devicetree.py create mode 100644 tools/testing/roadtest/roadtest/core/hardware.py create mode 100644 tools/testing/roadtest/roadtest/core/log.py create mode 100644 tools/testing/roadtest/roadtest/core/modules.py create mode 100644 tools/testing/roadtest/roadtest/core/opslog.py create mode 100644 tools/testing/roadtest/roadtest/core/proxy.py create mode 100644 tools/testing/roadtest/roadtest/core/suite.py create mode 100644 tools/testing/roadtest/roadtest/core/sysfs.py create mode 100644 tools/testing/roadtest/roadtest/core/test_control.py create mode 100644 tools/testing/roadtest/roadtest/core/test_devicetree.py create mode 100644 tools/testing/roadtest/roadtest/core/test_hardware.py create mode 100644 tools/testing/roadtest/roadtest/core/test_log.py create mode 100644 tools/testing/roadtest/roadtest/core/test_opslog.py create mode 100644 tools/testing/roadtest/roadtest/tests/__init__.py diff --git a/tools/testing/roadtest/init.sh b/tools/testing/roadtest/init.sh new file mode 100755 index 000000000000..c5fb28478aa3 --- /dev/null +++ b/tools/testing/roadtest/init.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# SPDX-License-Identifier: GPL-2.0-only + +mount -t proc proc /proc +echo 8 > /proc/sys/kernel/printk +mount -t sysfs nodev /sys +mount -t debugfs nodev /sys/kernel/debug + +echo 0 > /sys/bus/i2c/drivers_autoprobe +echo 0 > /sys/bus/platform/drivers_autoprobe + +python3 -m roadtest.cmd.remote +status=$? +[ "${ROADTEST_SHELL}" = "1" ] || { + # rsync doesn't handle these zero-sized files correctly. + cp -ra --no-preserve=ownership /sys/kernel/debug/gcov ${ROADTEST_WORK_DIR}/gcov + echo o > /proc/sysrq-trigger +} +exec setsid sh -c 'exec bash /dev/tty0 2>&1' diff --git a/tools/testing/roadtest/roadtest/__init__.py b/tools/testing/roadtest/roadtest/__init__.py new file mode 100644 index 000000000000..dac3ce6976e5 --- /dev/null +++ b/tools/testing/roadtest/roadtest/__init__.py @@ -0,0 +1,2 @@ +ENV_WORK_DIR = "ROADTEST_WORK_DIR" +ENV_BUILD_DIR = "ROADTEST_BUILD_DIR" diff --git a/tools/testing/roadtest/roadtest/backend/__init__.py b/tools/testing/roadtest/roadtest/backend/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tools/testing/roadtest/roadtest/backend/backend.py b/tools/testing/roadtest/roadtest/backend/backend.py new file mode 100644 index 000000000000..bfd19fc363c2 --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/backend.py @@ -0,0 +1,32 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import logging +import os +from pathlib import Path + +from roadtest import ENV_WORK_DIR +from roadtest.core.control import ControlReader + +from . import gpio, i2c, mock + +logger = logging.getLogger(__name__) + +try: + import cbackend # type: ignore[import] +except ModuleNotFoundError: + # In unit tests + cbackend = None + + +class Backend: + def __init__(self) -> None: + work = Path(os.environ[ENV_WORK_DIR]) + self.control = ControlReader(work_dir=work) + self.c = cbackend + self.i2c = i2c.I2CBackend(self) + self.gpio = gpio.GpioBackend(self) + self.mock = mock.MockBackend(work) + + def process_control(self) -> None: + self.control.process({"backend": self}) diff --git a/tools/testing/roadtest/roadtest/backend/gpio.py b/tools/testing/roadtest/roadtest/backend/gpio.py new file mode 100644 index 000000000000..2eaf52b31c72 --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/gpio.py @@ -0,0 +1,111 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import logging +import typing +from typing import Optional + +if typing.TYPE_CHECKING: + # Avoid circular imports + from .backend import Backend + +logger = logging.getLogger(__name__) + + +class Gpio: + IRQ_TYPE_NONE = 0x00 + IRQ_TYPE_EDGE_RISING = 0x01 + IRQ_TYPE_EDGE_FALLING = 0x02 + IRQ_TYPE_EDGE_BOTH = 0x03 + IRQ_TYPE_LEVEL_HIGH = 0x04 + IRQ_TYPE_LEVEL_LOW = 0x08 + + def __init__(self, backend: "Backend", pin: int): + self.backend = backend + self.pin = pin + self.state = False + self.irq_type = Gpio.IRQ_TYPE_NONE + self.masked = True + self.edge_irq_latched = False + + def _level_irq_active(self) -> bool: + if self.irq_type == Gpio.IRQ_TYPE_LEVEL_HIGH: + return self.state + elif self.irq_type == Gpio.IRQ_TYPE_LEVEL_LOW: + return not self.state + + return False + + def _latch_edge_irq(self, old: bool, new: bool) -> bool: + if old != new: + logger.debug(f"{self}: latch_edge_irq {self.irq_type} {old} -> {new}") + + if self.irq_type == Gpio.IRQ_TYPE_EDGE_RISING: + return not old and new + elif self.irq_type == Gpio.IRQ_TYPE_EDGE_FALLING: + return old and not new + elif self.irq_type == Gpio.IRQ_TYPE_EDGE_BOTH: + return old != new + + return False + + def _check_irq(self) -> None: + if self.irq_type == Gpio.IRQ_TYPE_NONE or self.masked: + return + if not self.edge_irq_latched and not self._level_irq_active(): + return + + self.masked = True + self.edge_irq_latched = False + + logger.debug(f"{self}: trigger irq") + self.backend.c.trigger_gpio_irq(self.pin) + + def set_irq_type(self, irq_type: int) -> None: + logger.debug(f"{self}: set_irq_type {irq_type}") + if irq_type == Gpio.IRQ_TYPE_NONE: + self.masked = True + + self.irq_type = irq_type + self.edge_irq_latched = False + self._check_irq() + + def unmask(self) -> None: + logger.debug(f"{self}: unmask") + self.masked = False + self._check_irq() + + def set(self, val: int) -> None: + old = self.state + new = bool(val) + + if old != new: + logger.debug(f"{self}: gpio set {old} -> {new}") + + self.state = new + if self._latch_edge_irq(old, new): + logger.debug(f"{self}: latching edge") + self.edge_irq_latched = True + + self._check_irq() + + def __str__(self) -> str: + return f"Gpio({self.pin})" + + +class GpioBackend: + def __init__(self, backend: "Backend") -> None: + self.backend = backend + self.gpios = [Gpio(backend, pin) for pin in range(64)] + + def set(self, pin: Optional[int], val: bool) -> None: + if pin is None: + return + + self.gpios[pin].set(val) + + def set_irq_type(self, pin: int, irq_type: int) -> None: + self.gpios[pin].set_irq_type(irq_type) + + def unmask(self, pin: int) -> None: + self.gpios[pin].unmask() diff --git a/tools/testing/roadtest/roadtest/backend/i2c.py b/tools/testing/roadtest/roadtest/backend/i2c.py new file mode 100644 index 000000000000..b877c2b76851 --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/i2c.py @@ -0,0 +1,123 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import abc +import importlib +import logging +import typing +from typing import Any, Literal, Optional + +if typing.TYPE_CHECKING: + # Avoid circular imports + from .backend import Backend + +logger = logging.getLogger(__name__) + + +class I2CBackend: + def __init__(self, backend: "Backend") -> None: + self.model: Optional[I2CModel] = None + self.backend = backend + + def load_model(self, modname: str, clsname: str, *args: Any, **kwargs: Any) -> None: + mod = importlib.import_module(modname) + cls = getattr(mod, clsname) + self.model = cls(*args, **kwargs, backend=self.backend) + + def unload_model(self) -> None: + self.model = None + + def read(self, length: int) -> bytes: + if not self.model: + raise Exception("No I2C model loaded") + + return self.model.read(length) + + def write(self, data: bytes) -> None: + if not self.model: + raise Exception("No I2C model loaded") + + self.model.write(data) + + def __getattr__(self, name: str) -> Any: + return getattr(self.model, name) + + +class I2CModel(abc.ABC): + def __init__(self, backend: "Backend") -> None: + self.backend = backend + + @abc.abstractmethod + def read(self, length: int) -> bytes: + return bytes(length) + + @abc.abstractmethod + def write(self, data: bytes) -> None: + pass + + +class SMBusModel(I2CModel): + def __init__( + self, + regbytes: int, + byteorder: Literal["little", "big"] = "little", + *args: Any, + **kwargs: Any, + ) -> None: + super().__init__(*args, **kwargs) + self.reg_addr = 0x0 + self.regbytes = regbytes + self.byteorder = byteorder + + @abc.abstractmethod + def reg_read(self, addr: int) -> int: + return 0 + + @abc.abstractmethod + def reg_write(self, addr: int, val: int) -> None: + pass + + def val_to_bytes(self, val: int) -> bytes: + return val.to_bytes(self.regbytes, self.byteorder) + + def bytes_to_val(self, data: bytes) -> int: + return int.from_bytes(data, self.byteorder) + + def read(self, length: int) -> bytes: + data = bytearray() + for idx in range(0, length, self.regbytes): + addr = self.reg_addr + idx + val = self.reg_read(addr) + logger.debug(f"SMBus read {addr=:#02x} {val=:#02x}") + data += self.val_to_bytes(val) + return bytes(data) + + def write(self, data: bytes) -> None: + self.reg_addr = data[0] + + if len(data) > 1: + length = len(data) - 1 + data = data[1:] + assert length % self.regbytes == 0 + for idx in range(0, length, self.regbytes): + val = self.bytes_to_val(data[idx : (idx + self.regbytes)]) + addr = self.reg_addr + idx + self.backend.mock.reg_write(addr, val) + self.reg_write(addr, val) + logger.debug(f"SMBus write {addr=:#02x} {val=:#02x}") + elif len(data) == 1: + pass + + +class SimpleSMBusModel(SMBusModel): + def __init__(self, regs: dict[int, int], **kwargs: Any) -> None: + super().__init__(**kwargs) + self.regs = regs + + def reg_read(self, addr: int) -> int: + val = self.regs[addr] + return val + + def reg_write(self, addr: int, val: int) -> None: + assert addr in self.regs + self.regs[addr] = val diff --git a/tools/testing/roadtest/roadtest/backend/main.py b/tools/testing/roadtest/roadtest/backend/main.py new file mode 100644 index 000000000000..25be86ded9ea --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/main.py @@ -0,0 +1,13 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import logging + +import roadtest.backend.backend + +logging.basicConfig( + format="%(asctime)s - %(levelname)s - %(name)s: %(message)s", level=logging.DEBUG +) + +backend = roadtest.backend.backend.Backend() +backend.process_control() diff --git a/tools/testing/roadtest/roadtest/backend/mock.py b/tools/testing/roadtest/roadtest/backend/mock.py new file mode 100644 index 000000000000..8ce33a6bc0f1 --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/mock.py @@ -0,0 +1,20 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import functools +from pathlib import Path +from typing import Any, Callable + +from roadtest.core.opslog import OpsLogWriter + + +class MockBackend: + def __init__(self, work: Path) -> None: + self.opslog = OpsLogWriter(work) + + @functools.cache + def __getattr__(self, name: str) -> Callable: + def func(*args: Any, **kwargs: Any) -> None: + self.opslog.write(f"mock.{name}(*{str(args)}, **{str(kwargs)})") + + return func diff --git a/tools/testing/roadtest/roadtest/backend/test_gpio.py b/tools/testing/roadtest/roadtest/backend/test_gpio.py new file mode 100644 index 000000000000..feffe4fb9625 --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/test_gpio.py @@ -0,0 +1,98 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import unittest +from unittest.mock import MagicMock + +from .gpio import Gpio + + +class TestGpio(unittest.TestCase): + def test_irq_low(self) -> None: + m = MagicMock() + gpio = Gpio(backend=m, pin=1) + + gpio.set_irq_type(Gpio.IRQ_TYPE_LEVEL_LOW) + m.c.trigger_gpio_irq.assert_not_called() + + gpio.unmask() + m.c.trigger_gpio_irq.assert_called_once_with(1) + m.c.trigger_gpio_irq.reset_mock() + + gpio.set(True) + gpio.unmask() + m.c.trigger_gpio_irq.assert_not_called() + + def test_irq_high(self) -> None: + m = MagicMock() + gpio = Gpio(backend=m, pin=2) + + gpio.set_irq_type(Gpio.IRQ_TYPE_LEVEL_HIGH) + gpio.unmask() + + m.c.trigger_gpio_irq.assert_not_called() + + gpio.set(True) + m.c.trigger_gpio_irq.assert_called_once_with(2) + m.c.trigger_gpio_irq.reset_mock() + + gpio.set(False) + gpio.unmask() + m.c.trigger_gpio_irq.assert_not_called() + + def test_irq_rising(self) -> None: + m = MagicMock() + gpio = Gpio(backend=m, pin=63) + + gpio.set_irq_type(Gpio.IRQ_TYPE_EDGE_RISING) + gpio.set(False) + gpio.set(True) + + m.c.trigger_gpio_irq.assert_not_called() + gpio.unmask() + m.c.trigger_gpio_irq.assert_called_once_with(63) + m.c.trigger_gpio_irq.reset_mock() + + gpio.set(False) + gpio.set(True) + + gpio.unmask() + m.c.trigger_gpio_irq.assert_called_once() + + def test_irq_falling(self) -> None: + m = MagicMock() + gpio = Gpio(backend=m, pin=0) + + gpio.set_irq_type(Gpio.IRQ_TYPE_EDGE_FALLING) + gpio.unmask() + gpio.set(False) + gpio.set(True) + m.c.trigger_gpio_irq.assert_not_called() + + gpio.set(False) + m.c.trigger_gpio_irq.assert_called_once_with(0) + m.c.trigger_gpio_irq.reset_mock() + + gpio.set(True) + gpio.set(False) + gpio.set(True) + gpio.unmask() + m.c.trigger_gpio_irq.assert_called_once() + + def test_irq_both(self) -> None: + m = MagicMock() + gpio = Gpio(backend=m, pin=32) + + gpio.set_irq_type(Gpio.IRQ_TYPE_EDGE_BOTH) + gpio.unmask() + gpio.set(False) + gpio.set(True) + m.c.trigger_gpio_irq.assert_called_once_with(32) + + gpio.set(False) + m.c.trigger_gpio_irq.assert_called_once_with(32) + m.c.trigger_gpio_irq.reset_mock() + + gpio.set(True) + gpio.unmask() + m.c.trigger_gpio_irq.assert_called_once_with(32) diff --git a/tools/testing/roadtest/roadtest/backend/test_i2c.py b/tools/testing/roadtest/roadtest/backend/test_i2c.py new file mode 100644 index 000000000000..eda4e1a4b80f --- /dev/null +++ b/tools/testing/roadtest/roadtest/backend/test_i2c.py @@ -0,0 +1,84 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import unittest +from typing import Any +from unittest.mock import MagicMock + +from .i2c import SimpleSMBusModel, SMBusModel + + +class DummyModel(SMBusModel): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.regs: dict[int, int] = {} + + def reg_read(self, addr: int) -> int: + return self.regs[addr] + + def reg_write(self, addr: int, val: int) -> None: + self.regs[addr] = val + + +class TestSMBusModel(unittest.TestCase): + def test_1(self) -> None: + m = DummyModel(regbytes=1, backend=MagicMock()) + + m.write(bytes([0x12, 0x34])) + m.write(bytes([0x13, 0xAB, 0xCD])) + + self.assertEqual(m.regs[0x12], 0x34) + self.assertEqual(m.regs[0x13], 0xAB) + self.assertEqual(m.regs[0x14], 0xCD) + + m.write(bytes([0x12])) + self.assertEqual(m.read(1), bytes([0x34])) + + m.write(bytes([0x12])) + self.assertEqual(m.read(3), bytes([0x34, 0xAB, 0xCD])) + + def test_2big(self) -> None: + m = DummyModel(regbytes=2, byteorder="big", backend=MagicMock()) + + m.write(bytes([0x12, 0x34, 0x56, 0xAB, 0xCD])) + self.assertEqual(m.regs[0x12], 0x3456) + self.assertEqual(m.regs[0x14], 0xABCD) + + m.write(bytes([0x12])) + self.assertEqual(m.read(2), bytes([0x34, 0x56])) + + m.write(bytes([0x14])) + self.assertEqual(m.read(2), bytes([0xAB, 0xCD])) + + m.write(bytes([0x12])) + self.assertEqual(m.read(4), bytes([0x34, 0x56, 0xAB, 0xCD])) + + def test_2little(self) -> None: + m = DummyModel(regbytes=2, byteorder="little", backend=MagicMock()) + + m.write(bytes([0x12, 0x34, 0x56, 0xAB, 0xCD])) + self.assertEqual(m.regs[0x12], 0x5634) + self.assertEqual(m.regs[0x14], 0xCDAB) + + m.write(bytes([0x12])) + self.assertEqual(m.read(2), bytes([0x34, 0x56])) + + +class TestSimpleSMBusModel(unittest.TestCase): + def test_simple(self) -> None: + m = SimpleSMBusModel( + regs={0x01: 0x12, 0x02: 0x34}, + regbytes=1, + backend=MagicMock(), + ) + self.assertEqual(m.reg_read(0x01), 0x12) + self.assertEqual(m.reg_read(0x02), 0x34) + + m.reg_write(0x01, 0x56) + self.assertEqual(m.reg_read(0x01), 0x56) + self.assertEqual(m.reg_read(0x02), 0x34) + + with self.assertRaises(Exception): + m.reg_write(0x03, 0x00) + with self.assertRaises(Exception): + m.reg_read(0x03) diff --git a/tools/testing/roadtest/roadtest/cmd/__init__.py b/tools/testing/roadtest/roadtest/cmd/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tools/testing/roadtest/roadtest/cmd/main.py b/tools/testing/roadtest/roadtest/cmd/main.py new file mode 100644 index 000000000000..634c27fe795c --- /dev/null +++ b/tools/testing/roadtest/roadtest/cmd/main.py @@ -0,0 +1,146 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import argparse +import fnmatch +import sys +import unittest +from typing import Optional +from unittest.suite import TestSuite + +assert sys.version_info >= (3, 9), "Python version is too old" + +from roadtest.core.suite import UMLSuite, UMLTestCase + + +def make_umlsuite(args: argparse.Namespace) -> UMLSuite: + return UMLSuite( + timeout=args.timeout, + workdir=args.work_dir, + builddir=args.build_dir, + ksrcdir=args.ksrc_dir, + uml_args_pre=args.uml_prepend, + uml_args_post=args.uml_append, + shell=args.shell, + ) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + "--timeout", + type=int, + default=60, + help="Timeout (in seconds) for each UML run, 0 to disable", + ) + parser.add_argument("--work-dir", type=str, help="Work directory for UML runs") + parser.add_argument("--build-dir", type=str, required=True) + parser.add_argument("--ksrc-dir", type=str, required=True) + parser.add_argument( + "--uml-prepend", + nargs="*", + default=[], + help="Extra arguments to prepend to the UML command (example: gdbserver :1234)", + ) + parser.add_argument( + "--uml-append", + nargs="*", + default=[], + help="Extra arguments to append to the UML command (example: trace_event=i2c:* tp_printk)", + ) + parser.add_argument( + "--filter", + nargs="+", + default=[], + ) + parser.add_argument("--shell", action="store_true") + parser.add_argument("test", nargs="?", default="roadtest") + args = parser.parse_args() + + if args.shell: + args.timeout = 0 + + if not any(p.startswith("con=") for p in args.uml_append): + print( + "Error: --shell used but no con= UML argument specified", + file=sys.stderr, + ) + sys.exit(1) + + test = args.test + test = test.replace("/", ".") + test = test.removesuffix(".py") + test = test.removesuffix(".") + + loader = unittest.defaultTestLoader + suitegroups = loader.discover(test) + + args.filter = [f"*{f}*" for f in args.filter] + + # Backend tests and the like don't need to be run inside UML. + localsuite = None + + # For simplicity, we currently run all target tests in one UML instance + # since python in UML is slow to start up. This can be revisited if we + # want to run several UML instances in parallel. + deftargetsuite = None + targetsuites = [] + + for suites in suitegroups: + # unittest can in arbitrarily nest and mix TestCases + # and TestSuites, but we expect a fixed hierarchy. + assert isinstance(suites, unittest.TestSuite) + + for suite in suites: + # assert not isinstance(suite, unittest.TestCase) + + # If the import of a test fails, then suite is a + # unittest.loader._FailedTest instead of a suite + if not isinstance(suite, unittest.TestSuite): + suite = [suite] # type: ignore[assignment] + + # Suite at this level contains one TestCase for each + # test method in a particular test class. + # + # All the test functions for one particular test class + # can only be run either in UML or locally, not mixed. + destsuite: Optional[TestSuite] = None + + for t in suite: # type: ignore[union-attr] + # We don't support suites nested at this level. + assert isinstance(t, unittest.TestCase) + + id = t.id() + if args.filter and not any(fnmatch.fnmatch(id, f) for f in args.filter): + continue + + if isinstance(t, UMLTestCase): + if t.run_separately: + if not destsuite: + destsuite = make_umlsuite(args) + targetsuites.append(destsuite) + else: + if not deftargetsuite: + deftargetsuite = make_umlsuite(args) + targetsuites.append(deftargetsuite) + + destsuite = deftargetsuite + else: + if not localsuite: + localsuite = TestSuite() + destsuite = localsuite + + if destsuite: + destsuite.addTest(t) + + tests = unittest.TestSuite() + if localsuite: + tests.addTest(localsuite) + tests.addTests(targetsuites) + + result = unittest.TextTestRunner(verbosity=2).run(tests) + sys.exit(not result.wasSuccessful()) + + +if __name__ == "__main__": + main() diff --git a/tools/testing/roadtest/roadtest/cmd/remote.py b/tools/testing/roadtest/roadtest/cmd/remote.py new file mode 100644 index 000000000000..29c3c6d35c65 --- /dev/null +++ b/tools/testing/roadtest/roadtest/cmd/remote.py @@ -0,0 +1,48 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import importlib +import json +import os +from pathlib import Path +from typing import cast +from unittest import TestSuite, TextTestRunner + +from roadtest import ENV_WORK_DIR +from roadtest.core import proxy + + +def main() -> None: + workdir = Path(os.environ[ENV_WORK_DIR]) + with open(workdir / "tests.json") as f: + testinfos = json.load(f) + + suite = TestSuite() + for info in testinfos: + id = info["id"] + *modparts, clsname, method = id.split(".") + + fullname = ".".join(modparts) + mod = importlib.import_module(fullname) + + cls = getattr(mod, clsname) + test = cls(methodName=method) + + values = info["values"] + if values: + test.dts.values = values + + suite.addTest(test) + + runner = TextTestRunner( + verbosity=0, buffer=False, resultclass=proxy.ProxyTextTestResult + ) + result = cast(proxy.ProxyTextTestResult, runner.run(suite)) + + proxyresult = result.to_proxy() + with open(workdir / "results.json", "w") as f: + json.dump(proxyresult, f) + + +if __name__ == "__main__": + main() diff --git a/tools/testing/roadtest/roadtest/core/__init__.py b/tools/testing/roadtest/roadtest/core/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tools/testing/roadtest/roadtest/core/control.py b/tools/testing/roadtest/roadtest/core/control.py new file mode 100644 index 000000000000..cd74861099b9 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/control.py @@ -0,0 +1,52 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import logging +import os +from pathlib import Path +from typing import Optional + +from roadtest import ENV_WORK_DIR + +CONTROL_FILE = "control.txt" + +logger = logging.getLogger(__name__) + + +class ControlReader: + def __init__(self, work_dir: Optional[Path] = None) -> None: + if not work_dir: + work_dir = Path(os.environ[ENV_WORK_DIR]) + + path = work_dir / CONTROL_FILE + path.unlink(missing_ok=True) + path.write_text("") + + self.file = path.open("r") + + def process(self, vars: dict) -> None: + for line in self.file.readlines(): + cmd = line.rstrip() + + if cmd.startswith("# "): + logger.info(line[2:].rstrip()) + continue + + logger.debug(cmd) + eval(cmd, vars) + + +class ControlWriter: + def __init__(self, work_dir: Optional[Path] = None) -> None: + if not work_dir: + work_dir = Path(os.environ[ENV_WORK_DIR]) + self.file = (work_dir / CONTROL_FILE).open("a", buffering=1) + + def write_cmd(self, line: str) -> None: + self.file.write(line + "\n") + + def write_log(self, line: str) -> None: + self.file.write(f"# {line}\n") + + def close(self) -> None: + self.file.close() diff --git a/tools/testing/roadtest/roadtest/core/devicetree.py b/tools/testing/roadtest/roadtest/core/devicetree.py new file mode 100644 index 000000000000..40876738fb39 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/devicetree.py @@ -0,0 +1,155 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import enum +import subprocess +from pathlib import Path +from typing import Any, Optional + +HEADER = """ +/dts-v1/; + +/ { + #address-cells = <2>; + #size-cells = <2>; + + virtio@0 { + compatible = "virtio,uml"; + socket-path = "WORK/gpio.sock"; + virtio-device-id = <0x29>; + + gpio: gpio { + compatible = "virtio,device29"; + + gpio-controller; + #gpio-cells = <2>; + + interrupt-controller; + #interrupt-cells = <2>; + }; + }; + + virtio@1 { + compatible = "virtio,uml"; + socket-path = "WORK/i2c.sock"; + virtio-device-id = <0x22>; + + i2c: i2c { + compatible = "virtio,device22"; + + #address-cells = <1>; + #size-cells = <0>; + }; + }; + + // See Hardware.kick() + leds { + compatible = "gpio-leds"; + led0 { + gpios = <&gpio 0 0>; + }; + }; +}; +""" + + +class DtVar(enum.Enum): + I2C_ADDR = 0 + GPIO_PIN = 1 + + +class DtFragment: + def __init__(self, src: str, variables: Optional[dict[str, DtVar]] = None) -> None: + self.src = src + if not variables: + variables = {} + self.variables = variables + self.values: dict[str, int] = {} + + def apply(self, values: dict[str, Any]) -> str: + src = self.src + + for var in self.variables.keys(): + typ = self.variables[var] + val = values[var] + + if typ == DtVar.I2C_ADDR: + str = f"{val:02x}" + elif typ == DtVar.GPIO_PIN: + str = f"{val:d}" + + src = src.replace(f"${var}$", str) + + self.values = values + return src + + def __getitem__(self, key: str) -> Any: + return self.values[key] + + +class Devicetree: + def __init__(self, workdir: Path, ksrcdir: Path) -> None: + self.workdir: Path = workdir + self.ksrcdir: Path = ksrcdir + self.next_i2c_addr: int = 0x1 + # 0 is used for gpio-leds for Hardware.kick() + self.next_gpio_pin: int = 1 + self.src: str = "" + + def assemble(self, fragments: list[DtFragment]) -> None: + parts = [] + for fragment in fragments: + if fragment.values: + # Multiple test functions from the same class will use + # the same class instance + continue + + values = {} + + for var, type in fragment.variables.items(): + if type == DtVar.I2C_ADDR: + values[var] = self.next_i2c_addr + self.next_i2c_addr += 1 + elif type == DtVar.GPIO_PIN: + values[var] = self.next_gpio_pin + self.next_gpio_pin += 1 + + parts.append(fragment.apply(values)) + + self.src = "\n".join(parts) + + def compile(self, dtb: str) -> None: + dts = self.workdir / "test.dts" + + try: + subprocess.run( + [ + "gcc", + "-E", + "-nostdinc", + f"-I{self.ksrcdir}/scripts/dtc/include-prefixes", + "-undef", + "-D__DTS__", + "-x", + "assembler-with-cpp", + "-o", + dts, + "-", + ], + input=self.src, + text=True, + check=True, + capture_output=True, + ) + + full = HEADER.replace("WORK", str(self.workdir)) + dts.read_text() + dts.write_text(full) + + subprocess.run( + ["dtc", "-I", "dts", "-O", "dtb", dts, "-o", self.workdir / dtb], + check=True, + capture_output=True, + text=True, + ) + except subprocess.CalledProcessError as e: + raise Exception(f"{e.stderr}") diff --git a/tools/testing/roadtest/roadtest/core/hardware.py b/tools/testing/roadtest/roadtest/core/hardware.py new file mode 100644 index 000000000000..ae81a531d2a2 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/hardware.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import contextlib +import functools +import os +from pathlib import Path +from typing import Any, Callable, Optional, Type, cast +from unittest import TestCase +from unittest.mock import MagicMock, call + +from roadtest import ENV_WORK_DIR + +from .control import ControlWriter +from .opslog import OpsLogReader +from .sysfs import write_int + + +class HwMock(MagicMock): + def assert_reg_write_once(self, test: TestCase, reg: int, value: int) -> None: + test.assertEqual( + [c for c in self.mock_calls if c.args[0] == reg], + [call.reg_write(reg, value)], + ) + + def assert_last_reg_write(self, test: TestCase, reg: int, value: int) -> None: + test.assertEqual( + [c for c in self.mock_calls if c.args[0] == reg][-1:], + [call.reg_write(reg, value)], + ) + + def get_last_reg_write(self, reg: int) -> int: + return cast(int, [c for c in self.mock_calls if c.args[0] == reg][-1].args[1]) + + +class Hardware(contextlib.AbstractContextManager): + def __init__(self, bus: str, work: Optional[Path] = None) -> None: + if not work: + work = Path(os.environ[ENV_WORK_DIR]) + + self.bus = bus + self.mock = HwMock() + self.control = ControlWriter(work) + self.opslog = OpsLogReader(work) + self.loaded_model = False + + # Ignore old entries + self.opslog.read_next() + + def _call(self, method: str, *args: Any, **kwargs: Any) -> None: + self.control.write_cmd( + f"backend.{self.bus}.{method}(*{str(args)}, **{str(kwargs)})" + ) + + def kick(self) -> None: + # Control writes are only applied when the backend gets something + # to process, usually because the driver tried to access the device. + # But in some cases, such as when the driver is waiting for a + # sequence of interrupts, the test code needs the control write to take + # effect immediately. For this, we just need to kick the backend + # into processing its control queue. + # + # We (ab)use gpio-leds for this. devicetree.py sets up the device. + write_int(Path("/sys/class/leds/led0/brightness"), 0) + + def load_model(self, cls: Type[Any], *args: Any, **kwargs: Any) -> "Hardware": + self._call("load_model", cls.__module__, cls.__name__, *args, **kwargs) + self.loaded_model = True + return self + + def __enter__(self) -> "Hardware": + return self + + def __exit__(self, *_: Any) -> None: + self.close() + + @functools.cache + def __getattr__(self, name: str) -> Callable: + def func(*args: Any, **kwargs: Any) -> None: + self._call(name, *args, **kwargs) + + return func + + def close(self) -> None: + if self.loaded_model: + self._call("unload_model") + self.control.close() + + def update_mock(self) -> HwMock: + opslog = self.opslog.read_next() + for line in opslog: + eval(line, {"mock": self.mock}) + + return self.mock diff --git a/tools/testing/roadtest/roadtest/core/log.py b/tools/testing/roadtest/roadtest/core/log.py new file mode 100644 index 000000000000..7d73e40eb2d8 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/log.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +from pathlib import Path + + +class LogParser: + DNF_MESSAGE = "" + + def __init__(self, file: Path): + try: + raw = file.read_text() + lines = raw.splitlines() + except FileNotFoundError: + lines = [] + raw = "" + + self.raw = raw + self.lines = lines + + def has_any(self) -> bool: + return "START<" in self.raw + + def get_testcase_log(self, id: str) -> list[str]: + startmarker = f"START<{id}>" + stopmarker = f"STOP<{id}>" + + try: + startpos = next( + i for i, line in enumerate(self.lines) if startmarker in line + ) + except StopIteration: + return [] + + try: + stoppos = next( + i for i, line in enumerate(self.lines[startpos:]) if stopmarker in line + ) + except StopIteration: + return self.lines[startpos + 1 :] + [LogParser.DNF_MESSAGE] + + return self.lines[startpos + 1 : startpos + stoppos] diff --git a/tools/testing/roadtest/roadtest/core/modules.py b/tools/testing/roadtest/roadtest/core/modules.py new file mode 100644 index 000000000000..5bd2d92a322b --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/modules.py @@ -0,0 +1,38 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import os +import subprocess +from pathlib import Path +from typing import Any + +from roadtest import ENV_BUILD_DIR + + +def modprobe(modname: str, remove: bool = False) -> None: + moddir = Path(os.environ[ENV_BUILD_DIR]) / "modules" + args = [] + if remove: + args.append("--remove") + args += [f"--dirname={moddir}", modname] + subprocess.check_output(["/sbin/modprobe"] + args) + + +def insmod(modname: str) -> None: + modprobe(modname) + + +def rmmod(modname: str) -> None: + subprocess.check_output(["/sbin/rmmod", modname]) + + +class Module: + def __init__(self, name: str) -> None: + self.name = name + + def __enter__(self) -> "Module": + modprobe(self.name) + return self + + def __exit__(self, *_: Any) -> None: + rmmod(self.name) diff --git a/tools/testing/roadtest/roadtest/core/opslog.py b/tools/testing/roadtest/roadtest/core/opslog.py new file mode 100644 index 000000000000..83bb4f525d03 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/opslog.py @@ -0,0 +1,35 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import os +from pathlib import Path + +OPSLOG_FILE = "opslog.txt" + + +class OpsLogWriter: + def __init__(self, work: Path) -> None: + path = work / OPSLOG_FILE + path.unlink(missing_ok=True) + self.file = open(path, "a", buffering=1) + + def write(self, line: str) -> None: + self.file.write(line + "\n") + + +class OpsLogReader: + def __init__(self, work: Path) -> None: + self.path = work / OPSLOG_FILE + self.opslogpos = 0 + + def read_next(self) -> list[str]: + # There is a problem in hostfs (see Hostfs Caveats) which means + # that reads from UML on a file which is extended on the host don't see + # the new data unless we open and close the file, so we can't open once + # and use readlines(). + with open(self.path, "r") as f: + os.lseek(f.fileno(), self.opslogpos, os.SEEK_SET) + opslog = [line.rstrip() for line in f.readlines()] + self.opslogpos = os.lseek(f.fileno(), 0, os.SEEK_CUR) + + return opslog diff --git a/tools/testing/roadtest/roadtest/core/proxy.py b/tools/testing/roadtest/roadtest/core/proxy.py new file mode 100644 index 000000000000..36089e21d7d5 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/proxy.py @@ -0,0 +1,48 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +from typing import Any +from unittest import TestCase, TextTestResult + +from . import control + + +class ProxyTextTestResult(TextTestResult): + def __init__(self, stream: Any, descriptions: Any, verbosity: Any) -> None: + super().__init__(stream, descriptions, verbosity) + self.successes: list[tuple[TestCase, str]] = [] + + # Print via kmsg to avoid getting cut off by other kernel prints. + self.kmsg = open("/dev/kmsg", "w", buffering=1) + self.control = control.ControlWriter() + + def addSuccess(self, test: TestCase) -> None: + super().addSuccess(test) + self.successes.append((test, "")) + + def _log(self, test: TestCase, action: str) -> None: + line = f"{action}<{test.id()}>" + self.kmsg.write(line + "\n") + self.control.write_log(line) + + def startTest(self, test: TestCase) -> None: + self._log(test, "START") + super().startTest(test) + + def stopTest(self, test: TestCase) -> None: + super().stopTest(test) + self._log(test, "STOP") + + def _replace_id(self, reslist: list[tuple[TestCase, str]]) -> list[tuple[str, str]]: + return [(case.id(), tb) for case, tb in reslist] + + def to_proxy(self) -> dict[str, Any]: + return { + "testsRun": self.testsRun, + "wasSuccessful": self.wasSuccessful(), + "successes": self._replace_id(self.successes), + "errors": self._replace_id(self.errors), + "failures": self._replace_id(self.failures), + "skipped": self._replace_id(self.skipped), + "unexpectedSuccesses": [t.id() for t in self.unexpectedSuccesses], + } diff --git a/tools/testing/roadtest/roadtest/core/suite.py b/tools/testing/roadtest/roadtest/core/suite.py new file mode 100644 index 000000000000..e99a60b4faba --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/suite.py @@ -0,0 +1,286 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import json +import os +import shlex +import signal +import subprocess +import textwrap +import unittest +from pathlib import Path +from typing import Any, ClassVar, Optional, Tuple, cast +from unittest import TestResult + +from roadtest import ENV_BUILD_DIR, ENV_WORK_DIR + +from . import devicetree +from .log import LogParser + + +class UMLTestCase(unittest.TestCase): + run_separately: ClassVar[bool] = False + dts: ClassVar[Optional[devicetree.DtFragment]] = None + + +class UMLSuite(unittest.TestSuite): + def __init__( + self, + timeout: int, + workdir: str, + builddir: str, + ksrcdir: str, + uml_args_pre: list[str], + uml_args_post: list[str], + shell: bool, + *args: Any, + **kwargs: Any, + ) -> None: + super().__init__(*args, **kwargs) + + self.timeout = timeout + self.workdir = Path(workdir).resolve() + self.builddir = Path(builddir) + self.ksrcdir = Path(ksrcdir) + self.uml_args_pre = uml_args_pre + self.uml_args_post = uml_args_post + self.shell = shell + + self.backendlog = self.workdir / "backend.txt" + self.umllog = self.workdir / "uml.txt" + + # Used from the roadtest.cmd.remote running inside UML + self.testfile = self.workdir / "tests.json" + self.resultfile = self.workdir / "results.json" + + def run( + self, result: unittest.TestResult, debug: bool = False + ) -> unittest.TestResult: + pwd = os.getcwd() + + os.makedirs(self.workdir, exist_ok=True) + workdir = self.workdir + + tests = cast(list[UMLTestCase], list(self)) + + os.environ[ENV_WORK_DIR] = str(workdir) + os.environ[ENV_BUILD_DIR] = str(self.builddir) + + dt = devicetree.Devicetree(workdir=workdir, ksrcdir=self.ksrcdir) + dt.assemble([test.dts for test in tests if test.dts]) + dt.compile("test.dtb") + + testinfos = [] + ids = [] + for t in tests: + id = t.id() + # This fixup is needed when discover is done starting from "roadtest" + if not id.startswith("roadtest."): + id = f"roadtest.{id}" + ids.append(id) + + testinfos.append({"id": id, "values": t.dts.values if t.dts else {}}) + + with self.testfile.open("w") as f: + json.dump(testinfos, f) + + uml_args = [ + str(self.builddir / "vmlinux"), + f"PYTHONPATH={pwd}", + f"{ENV_WORK_DIR}={workdir}", + f"{ENV_BUILD_DIR}={self.builddir}", + # Should be enough for anybody? + "mem=64M", + "dtb=test.dtb", + "rootfstype=hostfs", + "rw", + f"init={pwd}/init.sh", + f"uml_dir={workdir}", + "umid=uml", + # ProxyTextTestResult writes to /dev/kmsg + "printk.devkmsg=on", + "slub_debug", + # For ease of debugging + "no_hash_pointers", + ] + + if self.shell: + # See init.sh + uml_args += ["ROADTEST_SHELL=1"] + else: + # Set by slub_debug + TAINT_BAD_PAGE = 1 << 5 + uml_args += [ + # init.sh increases the loglevel after bootup. + "quiet", + "panic_on_warn=1", + f"panic_on_taint={TAINT_BAD_PAGE}", + "oops=panic", + # Speeds up delays, but as a consequence also causes + # 100% CPU consumption at an idle shell prompt. + "time-travel", + ] + + main_script = (Path(__file__).parent / "../backend/main.py").resolve() + + args = ( + [ + str(self.builddir / "roadtest-backend"), + # The socket locations are also present in the devicetree. + f"--gpio-socket={workdir}/gpio.sock", + f"--i2c-socket={workdir}/i2c.sock", + f"--main-script={main_script}", + "--", + ] + + self.uml_args_pre + + uml_args + + self.uml_args_post + ) + + print( + "Running backend/UML with: {}".format( + " ".join([shlex.quote(a) for a in args]) + ) + ) + + # Truncate instead of deleting so that tail -f can be used to monitor + # the log across runs. + self.backendlog.write_text("") + self.umllog.write_text("") + self.resultfile.unlink(missing_ok=True) + + umlpidfile = workdir / "uml/pid" + umlpidfile.unlink(missing_ok=True) + + newenv = dict(os.environ, PYTHONPATH=pwd) + + try: + process = None + with self.backendlog.open("w") as f: + process = subprocess.Popen( + args, + env=newenv, + stdin=subprocess.PIPE, + stdout=f, + stderr=subprocess.STDOUT, + text=True, + preexec_fn=os.setsid, + ) + process.wait(self.timeout if self.timeout else None) + except subprocess.TimeoutExpired: + pass + finally: + try: + if process: + os.killpg(process.pid, signal.SIGKILL) + except ProcessLookupError: + pass + try: + pid = int(umlpidfile.read_text()) + os.killpg(pid, signal.SIGKILL) + except (FileNotFoundError, ProcessLookupError): + pass + + if process and process.returncode is not None and process.returncode != 0: + with self.backendlog.open("a") as f: + f.write(f"\n") + + try: + with self.resultfile.open("r") as f: + proxy = json.load(f) + except FileNotFoundError: + # UML crashed, timed out, etc + proxy = None + + return self._convert_results(proxy, tests, result) + + def _parse_status(self, id: str, proxy: dict) -> Tuple[str, str]: + if not proxy: + return "ERROR", "No result. UML or backend crashed?\n" + + try: + _, tb = next(e for e in proxy["successes"] if e[0] == id) + return "ok", "" + except StopIteration: + pass + + try: + _, tb = next(e for e in proxy["errors"] if e[0] == id) + return "ERROR", tb + except StopIteration: + pass + + try: + _, tb = next(e for e in proxy["failures"] if e[0] == id) + return "FAIL", tb + except StopIteration: + pass + + # setupClass, etc + if proxy["errors"]: + _, tb = proxy["errors"][0] + return "ERROR", tb + + raise Exception("Unable to parse status") + + def _get_log( + self, name: str, parser: LogParser, id: str, full_if_none: bool + ) -> Optional[str]: + testloglines = parser.get_testcase_log(id) + tb = None + if testloglines: + tb = "\n".join([f"{name} log:"] + [" " + line for line in testloglines]) + elif full_if_none and not parser.has_any(): + if parser.raw: + tb = "\n".join( + [f"Full {name} log:", textwrap.indent(parser.raw, " ").rstrip()] + ) + else: + tb = f"\nNo {name} log found." + + return tb + + def _convert_results( + self, + proxy: dict, + tests: list[UMLTestCase], + result: TestResult, + ) -> TestResult: + umllog = LogParser(self.umllog) + backendlog = LogParser(self.backendlog) + + first_fail = True + for test in tests: + assert isinstance(test, unittest.TestCase) + + id = test.id() + if not id.startswith("roadtest."): + id = f"roadtest.{id}" + + status, tb = self._parse_status(id, proxy) + if status != "ok": + parts = [] + + backendtb = self._get_log("Backend", backendlog, id, first_fail) + if backendtb: + parts.append(backendtb) + + umltb = self._get_log("UML", umllog, id, first_fail) + if umltb: + parts.append(umltb) + + # In the case of no START/STOP markers at all in the logs, we include + # the full logs, but only do it in the first failing test case to + # reduce noise. + first_fail = False + tb = "\n\n".join(parts + [tb]) + + if status == "ERROR": + result.errors.append((test, tb)) + elif status == "FAIL": + result.failures.append((test, tb)) + + print(f"{test} ... {status}") + result.testsRun += 1 + + return result diff --git a/tools/testing/roadtest/roadtest/core/sysfs.py b/tools/testing/roadtest/roadtest/core/sysfs.py new file mode 100644 index 000000000000..64228978718e --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/sysfs.py @@ -0,0 +1,77 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import contextlib +from pathlib import Path +from typing import Iterator + + +# Path.write_text() is inappropriate since Python calls write(2) +# a second time if the first one returns an error, if the file +# was opened as text. +def write_str(path: Path, val: str) -> None: + path.write_bytes(val.encode()) + + +def write_int(path: Path, val: int) -> None: + write_str(path, str(val)) + + +def write_float(path: Path, val: float) -> None: + write_str(path, str(val)) + + +def read_str(path: Path) -> str: + return path.read_text().rstrip() + + +def read_int(path: Path) -> int: + return int(read_str(path)) + + +def read_float(path: Path) -> float: + return float(read_str(path)) + + +class I2CDevice: + def __init__(self, addr: int, bus: int = 0) -> None: + self.id = f"{bus}-{addr:04x}" + self.path = Path(f"/sys/bus/i2c/devices/{self.id}") + + +class PlatformDevice: + def __init__(self, name: str) -> None: + self.id = name + self.path = Path(f"/sys/bus/platform/devices/{self.id}") + + +class I2CDriver: + def __init__(self, driver: str) -> None: + self.driver = driver + self.path = Path(f"/sys/bus/i2c/drivers/{driver}") + + @contextlib.contextmanager + def bind(self, addr: int, bus: int = 0) -> Iterator[I2CDevice]: + dev = I2CDevice(addr, bus) + write_str(self.path / "bind", dev.id) + + try: + yield dev + finally: + write_str(self.path / "unbind", dev.id) + + +class PlatformDriver: + def __init__(self, driver: str) -> None: + self.driver = driver + self.path = Path(f"/sys/bus/platform/drivers/{driver}") + + @contextlib.contextmanager + def bind(self, addr: str) -> Iterator[PlatformDevice]: + dev = PlatformDevice(addr) + write_str(self.path / "bind", dev.id) + + try: + yield dev + finally: + write_str(self.path / "unbind", dev.id) diff --git a/tools/testing/roadtest/roadtest/core/test_control.py b/tools/testing/roadtest/roadtest/core/test_control.py new file mode 100644 index 000000000000..a8cf9105eb52 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/test_control.py @@ -0,0 +1,35 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest import TestCase + +from .control import ControlReader, ControlWriter + + +class TestControl(TestCase): + def test_control(self) -> None: + with TemporaryDirectory() as tmpdir: + work = Path(tmpdir) + reader = ControlReader(work) + writer = ControlWriter(work) + + values = [] + + def append(new: int) -> None: + nonlocal values + values.append(new) + + vars = {"append": append} + writer.write_cmd("append(1)") + + reader.process(vars) + self.assertEqual(values, [1]) + + writer.write_cmd("append(2)") + writer.write_log("append(4)") + writer.write_cmd("append(3)") + + reader.process(vars) + self.assertEqual(values, [1, 2, 3]) diff --git a/tools/testing/roadtest/roadtest/core/test_devicetree.py b/tools/testing/roadtest/roadtest/core/test_devicetree.py new file mode 100644 index 000000000000..db61fd24b39a --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/test_devicetree.py @@ -0,0 +1,31 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +import tempfile +import unittest +from pathlib import Path + +from . import devicetree + + +class TestDevicetree(unittest.TestCase): + def test_compile(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmpdir = Path(tmp) + # We don't have the ksrcdir so we can't test if includes work. + dt = devicetree.Devicetree(tmpdir, tmpdir) + + dt.assemble( + [ + devicetree.DtFragment( + src=""" +&i2c { + foo = <1>; +}; + """ + ) + ] + ) + dt.compile("test.dtb") + dtb = tmpdir / "test.dtb" + self.assertTrue((dtb).exists()) diff --git a/tools/testing/roadtest/roadtest/core/test_hardware.py b/tools/testing/roadtest/roadtest/core/test_hardware.py new file mode 100644 index 000000000000..eb09b317e258 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/test_hardware.py @@ -0,0 +1,41 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest import TestCase + +from roadtest.backend.mock import MockBackend + +from .hardware import Hardware + + +class TestHardware(TestCase): + def test_mock(self) -> None: + with TemporaryDirectory() as tmpdir: + work = Path(tmpdir) + + backend = MockBackend(work) + hw = Hardware(bus="dummy", work=work) + + backend.reg_write(0x1, 0xDEAD) + backend.reg_write(0x2, 0xBEEF) + mock = hw.update_mock() + mock.assert_reg_write_once(self, 0x1, 0xDEAD) + + backend.reg_write(0x1, 0xCAFE) + mock = hw.update_mock() + with self.assertRaises(AssertionError): + mock.assert_reg_write_once(self, 0x1, 0xDEAD) + + mock.assert_last_reg_write(self, 0x1, 0xCAFE) + + self.assertEqual(mock.get_last_reg_write(0x1), 0xCAFE) + self.assertEqual(mock.get_last_reg_write(0x2), 0xBEEF) + + with self.assertRaises(IndexError): + self.assertEqual(mock.get_last_reg_write(0x3), 0x0) + + mock.reset_mock() + with self.assertRaises(AssertionError): + mock.assert_last_reg_write(self, 0x2, 0xBEEF) diff --git a/tools/testing/roadtest/roadtest/core/test_log.py b/tools/testing/roadtest/roadtest/core/test_log.py new file mode 100644 index 000000000000..6988ff4419db --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/test_log.py @@ -0,0 +1,54 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +from pathlib import Path +from tempfile import NamedTemporaryFile +from unittest import TestCase + +from .log import LogParser + + +class TestLog(TestCase): + def test_parser(self) -> None: + with NamedTemporaryFile() as tmpfile: + path = Path(tmpfile.name) + + path.write_text( + """ +xyz START +finished1 +finished2 +STOP +START +STOP +START monkey STOP +START +unfinished1 +unfinished2""" + ) + + parser = LogParser(path) + self.assertEqual( + parser.get_testcase_log("finished"), ["finished1", "finished2"] + ) + + self.assertEqual( + parser.get_testcase_log("unfinished"), + ["unfinished1", "unfinished2", LogParser.DNF_MESSAGE], + ) + + self.assertEqual( + parser.get_testcase_log("notpresent"), + [], + ) + + self.assertEqual( + parser.get_testcase_log("enpty"), + [], + ) + + # Shouldn't happen since we print from the kernel? + self.assertEqual( + parser.get_testcase_log("foo"), + [], + ) diff --git a/tools/testing/roadtest/roadtest/core/test_opslog.py b/tools/testing/roadtest/roadtest/core/test_opslog.py new file mode 100644 index 000000000000..bd594c587032 --- /dev/null +++ b/tools/testing/roadtest/roadtest/core/test_opslog.py @@ -0,0 +1,27 @@ +# SPDX-License-Identifier: GPL-2.0-only +# Copyright Axis Communications AB + +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest import TestCase + +from .opslog import OpsLogReader, OpsLogWriter + + +class TestOpsLOg(TestCase): + def test_opslog(self) -> None: + with TemporaryDirectory() as tmpdir: + work = Path(tmpdir) + writer = OpsLogWriter(work) + reader = OpsLogReader(work) + + self.assertEqual(reader.read_next(), []) + + writer.write("1") + writer.write("2") + + self.assertEqual(reader.read_next(), ["1", "2"]) + self.assertEqual(reader.read_next(), []) + + writer.write("3") + self.assertEqual(reader.read_next(), ["3"]) diff --git a/tools/testing/roadtest/roadtest/tests/__init__.py b/tools/testing/roadtest/roadtest/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1