Patch Detail
get:
Show a patch.
patch:
Update a patch.
put:
Update a patch.
GET /api/patches/2197053/?format=api
{ "id": 2197053, "url": "http://patchwork.ozlabs.org/api/patches/2197053/?format=api", "web_url": "http://patchwork.ozlabs.org/project/qemu-devel/patch/20260216212952.420120-18-jsnow@redhat.com/", "project": { "id": 14, "url": "http://patchwork.ozlabs.org/api/projects/14/?format=api", "name": "QEMU Development", "link_name": "qemu-devel", "list_id": "qemu-devel.nongnu.org", "list_email": "qemu-devel@nongnu.org", "web_url": "", "scm_url": "", "webscm_url": "", "list_archive_url": "", "list_archive_url_format": "", "commit_url_format": "" }, "msgid": "<20260216212952.420120-18-jsnow@redhat.com>", "list_archive_url": null, "date": "2026-02-16T21:29:49", "name": "[v7,17/19] python: delete qemu.qmp", "commit_ref": null, "pull_url": null, "state": "new", "archived": false, "hash": "468c95dacc88c36d865933291461640fe4cf505a", "submitter": { "id": 64343, "url": "http://patchwork.ozlabs.org/api/people/64343/?format=api", "name": "John Snow", "email": "jsnow@redhat.com" }, "delegate": null, "mbox": "http://patchwork.ozlabs.org/project/qemu-devel/patch/20260216212952.420120-18-jsnow@redhat.com/mbox/", "series": [ { "id": 492353, "url": "http://patchwork.ozlabs.org/api/series/492353/?format=api", "web_url": "http://patchwork.ozlabs.org/project/qemu-devel/list/?series=492353", "date": "2026-02-16T21:29:34", "name": "python: drop qemu.qmp from qemu.git tree", "version": 7, "mbox": "http://patchwork.ozlabs.org/series/492353/mbox/" } ], "comments": "http://patchwork.ozlabs.org/api/patches/2197053/comments/", "check": "pending", "checks": "http://patchwork.ozlabs.org/api/patches/2197053/checks/", "tags": {}, "related": [], "headers": { "Return-Path": "<qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org>", "X-Original-To": "incoming@patchwork.ozlabs.org", "Delivered-To": "patchwork-incoming@legolas.ozlabs.org", "Authentication-Results": [ "legolas.ozlabs.org;\n\tdkim=pass (1024-bit key;\n unprotected) header.d=redhat.com header.i=@redhat.com header.a=rsa-sha256\n header.s=mimecast20190719 header.b=SSukQuTw;\n\tdkim-atps=neutral", "legolas.ozlabs.org;\n spf=pass (sender SPF authorized) smtp.mailfrom=nongnu.org\n (client-ip=209.51.188.17; helo=lists.gnu.org;\n envelope-from=qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org;\n receiver=patchwork.ozlabs.org)" ], "Received": [ "from lists.gnu.org (lists.gnu.org [209.51.188.17])\n\t(using TLSv1.2 with cipher ECDHE-ECDSA-AES256-GCM-SHA384 (256/256 bits))\n\t(No client certificate requested)\n\tby legolas.ozlabs.org (Postfix) with ESMTPS id 4fFGHJ49hWz1xwC\n\tfor <incoming@patchwork.ozlabs.org>; Tue, 17 Feb 2026 08:33:48 +1100 (AEDT)", "from localhost ([::1] helo=lists1p.gnu.org)\n\tby lists.gnu.org with esmtp (Exim 4.90_1)\n\t(envelope-from <qemu-devel-bounces@nongnu.org>)\n\tid 1vs6Cq-0001rL-Gf; Mon, 16 Feb 2026 16:32:36 -0500", "from eggs.gnu.org ([2001:470:142:3::10])\n by lists.gnu.org with esmtps (TLS1.2:ECDHE_RSA_AES_256_GCM_SHA384:256)\n (Exim 4.90_1) (envelope-from <jsnow@redhat.com>) id 1vs6CH-0000om-Sa\n for qemu-devel@nongnu.org; Mon, 16 Feb 2026 16:32:03 -0500", "from us-smtp-delivery-124.mimecast.com ([170.10.133.124])\n by eggs.gnu.org with esmtps (TLS1.2:ECDHE_RSA_AES_256_GCM_SHA384:256)\n (Exim 4.90_1) (envelope-from <jsnow@redhat.com>) id 1vs6CA-00041d-Ha\n for qemu-devel@nongnu.org; Mon, 16 Feb 2026 16:31:59 -0500", "from mx-prod-mc-08.mail-002.prod.us-west-2.aws.redhat.com\n (ec2-35-165-154-97.us-west-2.compute.amazonaws.com [35.165.154.97]) by\n relay.mimecast.com with ESMTP with STARTTLS (version=TLSv1.3,\n cipher=TLS_AES_256_GCM_SHA384) id us-mta-38-4XVrLCGzO5uMJjuTzdbtwA-1; Mon,\n 16 Feb 2026 16:31:49 -0500", "from mx-prod-int-03.mail-002.prod.us-west-2.aws.redhat.com\n (mx-prod-int-03.mail-002.prod.us-west-2.aws.redhat.com [10.30.177.12])\n (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)\n key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest\n SHA256)\n (No client certificate requested)\n by mx-prod-mc-08.mail-002.prod.us-west-2.aws.redhat.com (Postfix) with ESMTPS\n id 8EAC918003F6; Mon, 16 Feb 2026 21:31:47 +0000 (UTC)", "from jsnow-thinkpadp16vgen1.westford.csb (unknown [10.22.88.120])\n by mx-prod-int-03.mail-002.prod.us-west-2.aws.redhat.com (Postfix) with ESMTP\n id 193BD19560A2; Mon, 16 Feb 2026 21:31:40 +0000 (UTC)" ], "DKIM-Signature": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=redhat.com;\n s=mimecast20190719; t=1771277513;\n h=from:from:reply-to:subject:subject:date:date:message-id:message-id:\n to:to:cc:cc:mime-version:mime-version:content-type:content-type:\n content-transfer-encoding:content-transfer-encoding:\n in-reply-to:in-reply-to:references:references;\n bh=nNjrdKOLnsCFK+jADNYlEKEg9Tv8edRE/xUqEAXFnnU=;\n b=SSukQuTwBdoaShf9iFywsAhV9YmzmQAt7Ne9z+wrOZl6DXzmN3lndIKNt1Z3GPnlTUlyPx\n ouR4+x6aKvXo8VLzkAOU05fQPWthrTt/9xOrCBUY5NtPY3guMeouIFH82jSgR961/gCggs\n 9i7bJGAHlP7rPHxcurSYxWxrKcdqpFk=", "X-MC-Unique": "4XVrLCGzO5uMJjuTzdbtwA-1", "X-Mimecast-MFC-AGG-ID": "4XVrLCGzO5uMJjuTzdbtwA_1771277507", "From": "John Snow <jsnow@redhat.com>", "To": "qemu-devel@nongnu.org", "Cc": "qemu-block@nongnu.org, Paolo Bonzini <pbonzini@redhat.com>,\n Pierrick Bouvier <pierrick.bouvier@linaro.org>,\n Cleber Rosa <crosa@redhat.com>, Mahmoud Mandour <ma.mandourr@gmail.com>,\n Markus Armbruster <armbru@redhat.com>,\n =?utf-8?q?Marc-Andr=C3=A9_Lureau?= <marcandre.lureau@redhat.com>,\n Thomas Huth <thuth@redhat.com>,\n Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>,\n Kyle Evans <kevans@freebsd.org>, Peter Xu <peterx@redhat.com>, =?utf-8?q?Al?=\n\t=?utf-8?q?ex_Benn=C3=A9e?= <alex.bennee@linaro.org>, =?utf-8?q?Daniel_P=2E_?=\n\t=?utf-8?q?Berrang=C3=A9?= <berrange@redhat.com>,\n Mauro Carvalho Chehab <mchehab+huawei@kernel.org>,\n Yonggang Luo <luoyonggang@gmail.com>, John Snow <jsnow@redhat.com>,\n Warner Losh <imp@bsdimp.com>, Maksim Davydov <davydov-max@yandex-team.ru>,\n Alexandre Iooss <erdnaxe@crans.org>,\n =?utf-8?q?Philippe_Mathieu-Daud=C3=A9?= <philmd@linaro.org>,\n Ed Maste <emaste@freebsd.org>, Kevin Wolf <kwolf@redhat.com>,\n Fabiano Rosas <farosas@suse.de>, Michael Roth <michael.roth@amd.com>,\n Li-Wen Hsu <lwhsu@freebsd.org>, Hanna Reitz <hreitz@redhat.com>", "Subject": "[PATCH v7 17/19] python: delete qemu.qmp", "Date": "Mon, 16 Feb 2026 16:29:49 -0500", "Message-ID": "<20260216212952.420120-18-jsnow@redhat.com>", "In-Reply-To": "<20260216212952.420120-1-jsnow@redhat.com>", "References": "<20260216212952.420120-1-jsnow@redhat.com>", "MIME-Version": "1.0", "Content-Type": "text/plain; charset=UTF-8", "Content-Transfer-Encoding": "8bit", "X-Scanned-By": "MIMEDefang 3.0 on 10.30.177.12", "Received-SPF": "pass client-ip=170.10.133.124; envelope-from=jsnow@redhat.com;\n helo=us-smtp-delivery-124.mimecast.com", "X-Spam_score_int": "-20", "X-Spam_score": "-2.1", "X-Spam_bar": "--", "X-Spam_report": "(-2.1 / 5.0 requ) BAYES_00=-1.9, DKIMWL_WL_HIGH=-0.001,\n DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1,\n RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H5=0.001, RCVD_IN_MSPIKE_WL=0.001,\n RCVD_IN_VALIDITY_RPBL_BLOCKED=0.001, RCVD_IN_VALIDITY_SAFE_BLOCKED=0.001,\n SPF_HELO_PASS=-0.001,\n SPF_PASS=-0.001 autolearn=unavailable autolearn_force=no", "X-Spam_action": "no action", "X-BeenThere": "qemu-devel@nongnu.org", "X-Mailman-Version": "2.1.29", "Precedence": "list", "List-Id": "qemu development <qemu-devel.nongnu.org>", "List-Unsubscribe": "<https://lists.nongnu.org/mailman/options/qemu-devel>,\n <mailto:qemu-devel-request@nongnu.org?subject=unsubscribe>", "List-Archive": "<https://lists.nongnu.org/archive/html/qemu-devel>", "List-Post": "<mailto:qemu-devel@nongnu.org>", "List-Help": "<mailto:qemu-devel-request@nongnu.org?subject=help>", "List-Subscribe": "<https://lists.nongnu.org/mailman/listinfo/qemu-devel>,\n <mailto:qemu-devel-request@nongnu.org?subject=subscribe>", "Errors-To": "qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org", "Sender": "qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org" }, "content": "Start relying on the external python-qemu-qmp dependency instead, to\nprevent desync between the internal and external libraries.\n\nThis library is now entirely independent; to contribute changes, see\nhttps://gitlab.com/qemu-project/python-qemu-qmp/\n\nSigned-off-by: John Snow <jsnow@redhat.com>\nReviewed-by: Thomas Huth <thuth@redhat.com>\n---\n python/qemu/qmp/__init__.py | 60 --\n python/qemu/qmp/error.py | 53 --\n python/qemu/qmp/events.py | 751 ----------------------\n python/qemu/qmp/legacy.py | 339 ----------\n python/qemu/qmp/message.py | 217 -------\n python/qemu/qmp/models.py | 146 -----\n python/qemu/qmp/protocol.py | 1101 ---------------------------------\n python/qemu/qmp/py.typed | 0\n python/qemu/qmp/qmp_client.py | 732 ----------------------\n python/qemu/qmp/qmp_shell.py | 689 ---------------------\n python/qemu/qmp/qmp_tui.py | 665 --------------------\n python/qemu/qmp/util.py | 150 -----\n python/qemu/utils/qom_fuse.py | 1 -\n python/setup.cfg | 31 +-\n python/tests/minreqs.txt | 8 +-\n python/tests/protocol.py | 596 ------------------\n 16 files changed, 6 insertions(+), 5533 deletions(-)\n delete mode 100644 python/qemu/qmp/__init__.py\n delete mode 100644 python/qemu/qmp/error.py\n delete mode 100644 python/qemu/qmp/events.py\n delete mode 100644 python/qemu/qmp/legacy.py\n delete mode 100644 python/qemu/qmp/message.py\n delete mode 100644 python/qemu/qmp/models.py\n delete mode 100644 python/qemu/qmp/protocol.py\n delete mode 100644 python/qemu/qmp/py.typed\n delete mode 100644 python/qemu/qmp/qmp_client.py\n delete mode 100644 python/qemu/qmp/qmp_shell.py\n delete mode 100644 python/qemu/qmp/qmp_tui.py\n delete mode 100644 python/qemu/qmp/util.py\n delete mode 100644 python/tests/protocol.py", "diff": "diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py\ndeleted file mode 100644\nindex 058139dc3ca..00000000000\n--- a/python/qemu/qmp/__init__.py\n+++ /dev/null\n@@ -1,60 +0,0 @@\n-\"\"\"\n-QEMU Monitor Protocol (QMP) development library & tooling.\n-\n-This package provides a fairly low-level class for communicating\n-asynchronously with QMP protocol servers, as implemented by QEMU, the\n-QEMU Guest Agent, and the QEMU Storage Daemon.\n-\n-`QMPClient` provides the main functionality of this package. All errors\n-raised by this library derive from `QMPError`, see `qmp.error` for\n-additional detail. See `qmp.events` for an in-depth tutorial on\n-managing QMP events.\n-\"\"\"\n-\n-# Copyright (C) 2020-2022 John Snow for Red Hat, Inc.\n-#\n-# Authors:\n-# John Snow <jsnow@redhat.com>\n-#\n-# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.\n-#\n-# This work is licensed under the terms of the GNU LGPL, version 2 or\n-# later. See the COPYING file in the top-level directory.\n-\n-import logging\n-\n-from .error import QMPError\n-from .events import EventListener\n-from .message import Message\n-from .protocol import (\n- ConnectError,\n- Runstate,\n- SocketAddrT,\n- StateError,\n-)\n-from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient\n-\n-\n-# Suppress logging unless an application engages it.\n-logging.getLogger('qemu.qmp').addHandler(logging.NullHandler())\n-\n-\n-# IMPORTANT: When modifying this list, update the Sphinx overview docs.\n-# Anything visible in the qemu.qmp namespace should be on the overview page.\n-__all__ = (\n- # Classes, most to least important\n- 'QMPClient',\n- 'Message',\n- 'EventListener',\n- 'Runstate',\n-\n- # Exceptions, most generic to most explicit\n- 'QMPError',\n- 'StateError',\n- 'ConnectError',\n- 'ExecuteError',\n- 'ExecInterruptedError',\n-\n- # Type aliases\n- 'SocketAddrT',\n-)\ndiff --git a/python/qemu/qmp/error.py b/python/qemu/qmp/error.py\ndeleted file mode 100644\nindex c87b078f620..00000000000\n--- a/python/qemu/qmp/error.py\n+++ /dev/null\n@@ -1,53 +0,0 @@\n-\"\"\"\n-QMP Error Classes\n-\n-This package seeks to provide semantic error classes that are intended\n-to be used directly by clients when they would like to handle particular\n-semantic failures (e.g. \"failed to connect\") without needing to know the\n-enumeration of possible reasons for that failure.\n-\n-QMPError serves as the ancestor for all exceptions raised by this\n-package, and is suitable for use in handling semantic errors from this\n-library. In most cases, individual public methods will attempt to catch\n-and re-encapsulate various exceptions to provide a semantic\n-error-handling interface.\n-\n-.. admonition:: QMP Exception Hierarchy Reference\n-\n- | `Exception`\n- | +-- `QMPError`\n- | +-- `ConnectError`\n- | +-- `StateError`\n- | +-- `ExecInterruptedError`\n- | +-- `ExecuteError`\n- | +-- `ListenerError`\n- | +-- `ProtocolError`\n- | +-- `DeserializationError`\n- | +-- `UnexpectedTypeError`\n- | +-- `ServerParseError`\n- | +-- `BadReplyError`\n- | +-- `GreetingError`\n- | +-- `NegotiationError`\n-\"\"\"\n-\n-\n-class QMPError(Exception):\n- \"\"\"Abstract error class for all errors originating from this package.\"\"\"\n-\n-\n-class ProtocolError(QMPError):\n- \"\"\"\n- Abstract error class for protocol failures.\n-\n- Semantically, these errors are generally the fault of either the\n- protocol server or as a result of a bug in this library.\n-\n- :param error_message: Human-readable string describing the error.\n- \"\"\"\n- def __init__(self, error_message: str, *args: object):\n- super().__init__(error_message, *args)\n- #: Human-readable error message, without any prefix.\n- self.error_message: str = error_message\n-\n- def __str__(self) -> str:\n- return self.error_message\ndiff --git a/python/qemu/qmp/events.py b/python/qemu/qmp/events.py\ndeleted file mode 100644\nindex cfb5f0ac621..00000000000\n--- a/python/qemu/qmp/events.py\n+++ /dev/null\n@@ -1,751 +0,0 @@\n-\"\"\"\n-QMP Events and EventListeners\n-\n-Asynchronous QMP uses `EventListener` objects to listen for events. An\n-`EventListener` is a FIFO event queue that can be pre-filtered to listen\n-for only specific events. Each `EventListener` instance receives its own\n-copy of events that it hears, so events may be consumed without fear or\n-worry for depriving other listeners of events they need to hear.\n-\n-\n-EventListener Tutorial\n-----------------------\n-\n-In all of the following examples, we assume that we have a `QMPClient`\n-instantiated named ``qmp`` that is already connected. For example:\n-\n-.. code:: python\n-\n- from qemu.qmp import QMPClient\n-\n- qmp = QMPClient('example-vm')\n- await qmp.connect('127.0.0.1', 1234)\n-\n-\n-`listener()` context blocks with one name\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-The most basic usage is by using the `listener()` context manager to\n-construct them:\n-\n-.. code:: python\n-\n- with qmp.listener('STOP') as listener:\n- await qmp.execute('stop')\n- await listener.get()\n-\n-The listener is active only for the duration of the ‘with’ block. This\n-instance listens only for ‘STOP’ events.\n-\n-\n-`listener()` context blocks with two or more names\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-Multiple events can be selected for by providing any ``Iterable[str]``:\n-\n-.. code:: python\n-\n- with qmp.listener(('STOP', 'RESUME')) as listener:\n- await qmp.execute('stop')\n- event = await listener.get()\n- assert event['event'] == 'STOP'\n-\n- await qmp.execute('cont')\n- event = await listener.get()\n- assert event['event'] == 'RESUME'\n-\n-\n-`listener()` context blocks with no names\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-By omitting names entirely, you can listen to ALL events.\n-\n-.. code:: python\n-\n- with qmp.listener() as listener:\n- await qmp.execute('stop')\n- event = await listener.get()\n- assert event['event'] == 'STOP'\n-\n-This isn’t a very good use case for this feature: In a non-trivial\n-running system, we may not know what event will arrive next. Grabbing\n-the top of a FIFO queue returning multiple kinds of events may be prone\n-to error.\n-\n-\n-Using async iterators to retrieve events\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-If you’d like to simply watch what events happen to arrive, you can use\n-the listener as an async iterator:\n-\n-.. code:: python\n-\n- with qmp.listener() as listener:\n- async for event in listener:\n- print(f\"Event arrived: {event['event']}\")\n-\n-This is analogous to the following code:\n-\n-.. code:: python\n-\n- with qmp.listener() as listener:\n- while True:\n- event = listener.get()\n- print(f\"Event arrived: {event['event']}\")\n-\n-This event stream will never end, so these blocks will never\n-terminate. Even if the QMP connection errors out prematurely, this\n-listener will go silent without raising an error.\n-\n-\n-Using asyncio.Task to concurrently retrieve events\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-Since a listener’s event stream will never terminate, it is not likely\n-useful to use that form in a script. For longer-running clients, we can\n-create event handlers by using `asyncio.Task` to create concurrent\n-coroutines:\n-\n-.. code:: python\n-\n- async def print_events(listener):\n- try:\n- async for event in listener:\n- print(f\"Event arrived: {event['event']}\")\n- except asyncio.CancelledError:\n- return\n-\n- with qmp.listener() as listener:\n- task = asyncio.Task(print_events(listener))\n- await qmp.execute('stop')\n- await qmp.execute('cont')\n- task.cancel()\n- await task\n-\n-However, there is no guarantee that these events will be received by the\n-time we leave this context block. Once the context block is exited, the\n-listener will cease to hear any new events, and becomes inert.\n-\n-Be mindful of the timing: the above example will *probably*– but does\n-not *guarantee*– that both STOP/RESUMED events will be printed. The\n-example below outlines how to use listeners outside of a context block.\n-\n-\n-Using `register_listener()` and `remove_listener()`\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-To create a listener with a longer lifetime, beyond the scope of a\n-single block, create a listener and then call `register_listener()`:\n-\n-.. code:: python\n-\n- class MyClient:\n- def __init__(self, qmp):\n- self.qmp = qmp\n- self.listener = EventListener()\n-\n- async def print_events(self):\n- try:\n- async for event in self.listener:\n- print(f\"Event arrived: {event['event']}\")\n- except asyncio.CancelledError:\n- return\n-\n- async def run(self):\n- self.task = asyncio.Task(self.print_events)\n- self.qmp.register_listener(self.listener)\n- await qmp.execute('stop')\n- await qmp.execute('cont')\n-\n- async def stop(self):\n- self.task.cancel()\n- await self.task\n- self.qmp.remove_listener(self.listener)\n-\n-The listener can be deactivated by using `remove_listener()`. When it is\n-removed, any possible pending events are cleared and it can be\n-re-registered at a later time.\n-\n-\n-Using the built-in all events listener\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-The `QMPClient` object creates its own default listener named\n-:py:obj:`~Events.events` that can be used for the same purpose without\n-having to create your own:\n-\n-.. code:: python\n-\n- async def print_events(listener):\n- try:\n- async for event in listener:\n- print(f\"Event arrived: {event['event']}\")\n- except asyncio.CancelledError:\n- return\n-\n- task = asyncio.Task(print_events(qmp.events))\n-\n- await qmp.execute('stop')\n- await qmp.execute('cont')\n-\n- task.cancel()\n- await task\n-\n-\n-Using both .get() and async iterators\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-The async iterator and `get()` methods pull events from the same FIFO\n-queue. If you mix the usage of both, be aware: Events are emitted\n-precisely once per listener.\n-\n-If multiple contexts try to pull events from the same listener instance,\n-events are still emitted only precisely once.\n-\n-This restriction can be lifted by creating additional listeners.\n-\n-\n-Creating multiple listeners\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-Additional `EventListener` objects can be created at-will. Each one\n-receives its own copy of events, with separate FIFO event queues.\n-\n-.. code:: python\n-\n- my_listener = EventListener()\n- qmp.register_listener(my_listener)\n-\n- await qmp.execute('stop')\n- copy1 = await my_listener.get()\n- copy2 = await qmp.events.get()\n-\n- assert copy1 == copy2\n-\n-In this example, we await an event from both a user-created\n-`EventListener` and the built-in events listener. Both receive the same\n-event.\n-\n-\n-Clearing listeners\n-~~~~~~~~~~~~~~~~~~\n-\n-`EventListener` objects can be cleared, clearing all events seen thus far:\n-\n-.. code:: python\n-\n- await qmp.execute('stop')\n- discarded = qmp.events.clear()\n- await qmp.execute('cont')\n- event = await qmp.events.get()\n- assert event['event'] == 'RESUME'\n- assert discarded[0]['event'] == 'STOP'\n-\n-`EventListener` objects are FIFO queues. If events are not consumed,\n-they will remain in the queue until they are witnessed or discarded via\n-`clear()`. FIFO queues will be drained automatically upon leaving a\n-context block, or when calling `remove_listener()`.\n-\n-Any events removed from the queue in this fashion will be returned by\n-the clear call.\n-\n-\n-Accessing listener history\n-~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-`EventListener` objects record their history. Even after being cleared,\n-you can obtain a record of all events seen so far:\n-\n-.. code:: python\n-\n- await qmp.execute('stop')\n- await qmp.execute('cont')\n- qmp.events.clear()\n-\n- assert len(qmp.events.history) == 2\n- assert qmp.events.history[0]['event'] == 'STOP'\n- assert qmp.events.history[1]['event'] == 'RESUME'\n-\n-The history is updated immediately and does not require the event to be\n-witnessed first.\n-\n-\n-Using event filters\n-~~~~~~~~~~~~~~~~~~~\n-\n-`EventListener` objects can be given complex filtering criteria if names\n-are not sufficient:\n-\n-.. code:: python\n-\n- def job1_filter(event) -> bool:\n- event_data = event.get('data', {})\n- event_job_id = event_data.get('id')\n- return event_job_id == \"job1\"\n-\n- with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:\n- await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})\n- async for event in listener:\n- if event['data']['status'] == 'concluded':\n- break\n-\n-These filters might be most useful when parameterized. `EventListener`\n-objects expect a function that takes only a single argument (the raw\n-event, as a `Message`) and returns a bool; True if the event should be\n-accepted into the stream. You can create a function that adapts this\n-signature to accept configuration parameters:\n-\n-.. code:: python\n-\n- def job_filter(job_id: str) -> EventFilter:\n- def filter(event: Message) -> bool:\n- return event['data']['id'] == job_id\n- return filter\n-\n- with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:\n- await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})\n- async for event in listener:\n- if event['data']['status'] == 'concluded':\n- break\n-\n-\n-Activating an existing listener with `listen()`\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-Listeners with complex, long configurations can also be created manually\n-and activated temporarily by using `listen()` instead of `listener()`:\n-\n-.. code:: python\n-\n- listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',\n- 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',\n- 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))\n-\n- with qmp.listen(listener):\n- await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})\n- async for event in listener:\n- print(event)\n- if event['event'] == 'BLOCK_JOB_COMPLETED':\n- break\n-\n-Any events that are not witnessed by the time the block is left will be\n-cleared from the queue; entering the block is an implicit\n-`register_listener()` and leaving the block is an implicit\n-`remove_listener()`.\n-\n-\n-Activating multiple existing listeners with `listen()`\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-While `listener()` is only capable of creating a single listener,\n-`listen()` is capable of activating multiple listeners simultaneously:\n-\n-.. code:: python\n-\n- def job_filter(job_id: str) -> EventFilter:\n- def filter(event: Message) -> bool:\n- return event['data']['id'] == job_id\n- return filter\n-\n- jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))\n- jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))\n-\n- with qmp.listen(jobA, jobB):\n- qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})\n- qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})\n-\n- async for event in jobA.get():\n- if event['data']['status'] == 'concluded':\n- break\n- async for event in jobB.get():\n- if event['data']['status'] == 'concluded':\n- break\n-\n-\n-Note that in the above example, we explicitly wait on jobA to conclude\n-first, and then wait for jobB to do the same. All we have guaranteed is\n-that the code that waits for jobA will not accidentally consume the\n-event intended for the jobB waiter.\n-\n-\n-Extending the `EventListener` class\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-In the case that a more specialized `EventListener` is desired to\n-provide either more functionality or more compact syntax for specialized\n-cases, it can be extended.\n-\n-One of the key methods to extend or override is\n-:py:meth:`~EventListener.accept()`. The default implementation checks an\n-incoming message for:\n-\n-1. A qualifying name, if any :py:obj:`~EventListener.names` were\n- specified at initialization time\n-2. That :py:obj:`~EventListener.event_filter()` returns True.\n-\n-This can be modified however you see fit to change the criteria for\n-inclusion in the stream.\n-\n-For convenience, a ``JobListener`` class could be created that simply\n-bakes in configuration so it does not need to be repeated:\n-\n-.. code:: python\n-\n- class JobListener(EventListener):\n- def __init__(self, job_id: str):\n- super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',\n- 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',\n- 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))\n- self.job_id = job_id\n-\n- def accept(self, event) -> bool:\n- if not super().accept(event):\n- return False\n- if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):\n- return event['data']['id'] == job_id\n- return event['data']['device'] == job_id\n-\n-From here on out, you can conjure up a custom-purpose listener that\n-listens only for job-related events for a specific job-id easily:\n-\n-.. code:: python\n-\n- listener = JobListener('job4')\n- with qmp.listener(listener):\n- await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})\n- async for event in listener:\n- print(event)\n- if event['event'] == 'BLOCK_JOB_COMPLETED':\n- break\n-\n-\n-Experimental Interfaces & Design Issues\n----------------------------------------\n-\n-These interfaces are not ones I am sure I will keep or otherwise modify\n-heavily.\n-\n-qmp.listen()’s type signature\n-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n-\n-`listen()` does not return anything, because it was assumed the caller\n-already had a handle to the listener. However, for\n-``qmp.listen(EventListener())`` forms, the caller will not have saved a\n-handle to the listener.\n-\n-Because this function can accept *many* listeners, I found it hard to\n-accurately type in a way where it could be used in both “one” or “many”\n-forms conveniently and in a statically type-safe manner.\n-\n-Ultimately, I removed the return altogether, but perhaps with more time\n-I can work out a way to re-add it.\n-\n-\n-API Reference\n--------------\n-\n-\"\"\"\n-\n-import asyncio\n-from contextlib import contextmanager\n-import logging\n-from typing import (\n- AsyncIterator,\n- Callable,\n- Iterable,\n- Iterator,\n- List,\n- Optional,\n- Set,\n- Tuple,\n- Union,\n-)\n-\n-from .error import QMPError\n-from .message import Message\n-\n-\n-EventNames = Union[str, Iterable[str], None]\n-EventFilter = Callable[[Message], bool]\n-\n-\n-class ListenerError(QMPError):\n- \"\"\"\n- Generic error class for `EventListener`-related problems.\n- \"\"\"\n-\n-\n-class EventListener:\n- \"\"\"\n- Selectively listens for events with runtime configurable filtering.\n-\n- This class is designed to be directly usable for the most common cases,\n- but it can be extended to provide more rigorous control.\n-\n- :param names:\n- One or more names of events to listen for.\n- When not provided, listen for ALL events.\n- :param event_filter:\n- An optional event filtering function.\n- When names are also provided, this acts as a secondary filter.\n-\n- When ``names`` and ``event_filter`` are both provided, the names\n- will be filtered first, and then the filter function will be called\n- second. The event filter function can assume that the format of the\n- event is a known format.\n- \"\"\"\n- def __init__(\n- self,\n- names: EventNames = None,\n- event_filter: Optional[EventFilter] = None,\n- ):\n- # Queue of 'heard' events yet to be witnessed by a caller.\n- self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()\n-\n- # Intended as a historical record, NOT a processing queue or backlog.\n- self._history: List[Message] = []\n-\n- #: Primary event filter, based on one or more event names.\n- self.names: Set[str] = set()\n- if isinstance(names, str):\n- self.names.add(names)\n- elif names is not None:\n- self.names.update(names)\n-\n- #: Optional, secondary event filter.\n- self.event_filter: Optional[EventFilter] = event_filter\n-\n- def __repr__(self) -> str:\n- args: List[str] = []\n- if self.names:\n- args.append(f\"names={self.names!r}\")\n- if self.event_filter:\n- args.append(f\"event_filter={self.event_filter!r}\")\n-\n- if self._queue.qsize():\n- state = f\"<pending={self._queue.qsize()}>\"\n- else:\n- state = ''\n-\n- argstr = \", \".join(args)\n- return f\"{type(self).__name__}{state}({argstr})\"\n-\n- @property\n- def history(self) -> Tuple[Message, ...]:\n- \"\"\"\n- A read-only history of all events seen so far.\n-\n- This represents *every* event, including those not yet witnessed\n- via `get()` or ``async for``. It persists between `clear()`\n- calls and is immutable.\n- \"\"\"\n- return tuple(self._history)\n-\n- def accept(self, event: Message) -> bool:\n- \"\"\"\n- Determine if this listener accepts this event.\n-\n- This method determines which events will appear in the stream.\n- The default implementation simply checks the event against the\n- list of names and the event_filter to decide if this\n- `EventListener` accepts a given event. It can be\n- overridden/extended to provide custom listener behavior.\n-\n- User code is not expected to need to invoke this method.\n-\n- :param event: The event under consideration.\n- :return: `True`, if this listener accepts this event.\n- \"\"\"\n- name_ok = (not self.names) or (event['event'] in self.names)\n- return name_ok and (\n- (not self.event_filter) or self.event_filter(event)\n- )\n-\n- async def put(self, event: Message) -> None:\n- \"\"\"\n- Conditionally put a new event into the FIFO queue.\n-\n- This method is not designed to be invoked from user code, and it\n- should not need to be overridden. It is a public interface so\n- that `QMPClient` has an interface by which it can inform\n- registered listeners of new events.\n-\n- The event will be put into the queue if\n- :py:meth:`~EventListener.accept()` returns `True`.\n-\n- :param event: The new event to put into the FIFO queue.\n- \"\"\"\n- if not self.accept(event):\n- return\n-\n- self._history.append(event)\n- await self._queue.put(event)\n-\n- async def get(self) -> Message:\n- \"\"\"\n- Wait for the very next event in this stream.\n-\n- If one is already available, return that one.\n- \"\"\"\n- return await self._queue.get()\n-\n- def empty(self) -> bool:\n- \"\"\"\n- Return `True` if there are no pending events.\n- \"\"\"\n- return self._queue.empty()\n-\n- def clear(self) -> List[Message]:\n- \"\"\"\n- Clear this listener of all pending events.\n-\n- Called when an `EventListener` is being unregistered, this clears the\n- pending FIFO queue synchronously. It can be also be used to\n- manually clear any pending events, if desired.\n-\n- :return: The cleared events, if any.\n-\n- .. warning::\n- Take care when discarding events. Cleared events will be\n- silently tossed on the floor. All events that were ever\n- accepted by this listener are visible in `history()`.\n- \"\"\"\n- events = []\n- while True:\n- try:\n- events.append(self._queue.get_nowait())\n- except asyncio.QueueEmpty:\n- break\n-\n- return events\n-\n- def __aiter__(self) -> AsyncIterator[Message]:\n- return self\n-\n- async def __anext__(self) -> Message:\n- \"\"\"\n- Enables the `EventListener` to function as an async iterator.\n-\n- It may be used like this:\n-\n- .. code:: python\n-\n- async for event in listener:\n- print(event)\n-\n- These iterators will never terminate of their own accord; you\n- must provide break conditions or otherwise prepare to run them\n- in an `asyncio.Task` that can be cancelled.\n- \"\"\"\n- return await self.get()\n-\n-\n-class Events:\n- \"\"\"\n- Events is a mix-in class that adds event functionality to the QMP class.\n-\n- It's designed specifically as a mix-in for `QMPClient`, and it\n- relies upon the class it is being mixed into having a 'logger'\n- property.\n- \"\"\"\n- def __init__(self) -> None:\n- self._listeners: List[EventListener] = []\n-\n- #: Default, all-events `EventListener`. See `qmp.events` for more info.\n- self.events: EventListener = EventListener()\n- self.register_listener(self.events)\n-\n- # Parent class needs to have a logger\n- self.logger: logging.Logger\n-\n- async def _event_dispatch(self, msg: Message) -> None:\n- \"\"\"\n- Given a new event, propagate it to all of the active listeners.\n-\n- :param msg: The event to propagate.\n- \"\"\"\n- for listener in self._listeners:\n- await listener.put(msg)\n-\n- def register_listener(self, listener: EventListener) -> None:\n- \"\"\"\n- Register and activate an `EventListener`.\n-\n- :param listener: The listener to activate.\n- :raise ListenerError: If the given listener is already registered.\n- \"\"\"\n- if listener in self._listeners:\n- raise ListenerError(\"Attempted to re-register existing listener\")\n- self.logger.debug(\"Registering %s.\", str(listener))\n- self._listeners.append(listener)\n-\n- def remove_listener(self, listener: EventListener) -> None:\n- \"\"\"\n- Unregister and deactivate an `EventListener`.\n-\n- The removed listener will have its pending events cleared via\n- `clear()`. The listener can be re-registered later when\n- desired.\n-\n- :param listener: The listener to deactivate.\n- :raise ListenerError: If the given listener is not registered.\n- \"\"\"\n- if listener == self.events:\n- raise ListenerError(\"Cannot remove the default listener.\")\n- self.logger.debug(\"Removing %s.\", str(listener))\n- listener.clear()\n- self._listeners.remove(listener)\n-\n- @contextmanager\n- def listen(self, *listeners: EventListener) -> Iterator[None]:\n- r\"\"\"\n- Context manager: Temporarily listen with an `EventListener`.\n-\n- Accepts one or more `EventListener` objects and registers them,\n- activating them for the duration of the context block.\n-\n- `EventListener` objects will have any pending events in their\n- FIFO queue cleared upon exiting the context block, when they are\n- deactivated.\n-\n- :param \\*listeners: One or more EventListeners to activate.\n- :raise ListenerError: If the given listener(s) are already active.\n- \"\"\"\n- _added = []\n-\n- try:\n- for listener in listeners:\n- self.register_listener(listener)\n- _added.append(listener)\n-\n- yield\n-\n- finally:\n- for listener in _added:\n- self.remove_listener(listener)\n-\n- @contextmanager\n- def listener(\n- self,\n- names: EventNames = (),\n- event_filter: Optional[EventFilter] = None\n- ) -> Iterator[EventListener]:\n- \"\"\"\n- Context manager: Temporarily listen with a new `EventListener`.\n-\n- Creates an `EventListener` object and registers it, activating\n- it for the duration of the context block.\n-\n- :param names:\n- One or more names of events to listen for.\n- When not provided, listen for ALL events.\n- :param event_filter:\n- An optional event filtering function.\n- When names are also provided, this acts as a secondary filter.\n-\n- :return: The newly created and active `EventListener`.\n- \"\"\"\n- listener = EventListener(names, event_filter)\n- with self.listen(listener):\n- yield listener\ndiff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py\ndeleted file mode 100644\nindex 060ed0eb9d4..00000000000\n--- a/python/qemu/qmp/legacy.py\n+++ /dev/null\n@@ -1,339 +0,0 @@\n-\"\"\"\n-(Legacy) Sync QMP Wrapper\n-\n-This module provides the `QEMUMonitorProtocol` class, which is a\n-synchronous wrapper around `QMPClient`.\n-\n-Its design closely resembles that of the original QEMUMonitorProtocol\n-class, originally written by Luiz Capitulino. It is provided here for\n-compatibility with scripts inside the QEMU source tree that expect the\n-old interface.\n-\"\"\"\n-\n-#\n-# Copyright (C) 2009-2022 Red Hat Inc.\n-#\n-# Authors:\n-# Luiz Capitulino <lcapitulino@redhat.com>\n-# John Snow <jsnow@redhat.com>\n-#\n-# This work is licensed under the terms of the GNU GPL, version 2. See\n-# the COPYING file in the top-level directory.\n-#\n-\n-import asyncio\n-import socket\n-from types import TracebackType\n-from typing import (\n- Any,\n- Awaitable,\n- Dict,\n- List,\n- Optional,\n- Type,\n- TypeVar,\n- Union,\n-)\n-\n-from .error import QMPError\n-from .protocol import Runstate, SocketAddrT\n-from .qmp_client import QMPClient\n-from .util import get_or_create_event_loop\n-\n-\n-#: QMPMessage is an entire QMP message of any kind.\n-QMPMessage = Dict[str, Any]\n-\n-#: QMPReturnValue is the 'return' value of a command.\n-QMPReturnValue = object\n-\n-#: QMPObject is any object in a QMP message.\n-QMPObject = Dict[str, object]\n-\n-# QMPMessage can be outgoing commands or incoming events/returns.\n-# QMPReturnValue is usually a dict/json object, but due to QAPI's\n-# 'command-returns-exceptions', it can actually be anything.\n-#\n-# {'return': {}} is a QMPMessage,\n-# {} is the QMPReturnValue.\n-\n-\n-class QMPBadPortError(QMPError):\n- \"\"\"\n- Unable to parse socket address: Port was non-numerical.\n- \"\"\"\n-\n-\n-class QEMUMonitorProtocol:\n- \"\"\"\n- Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)\n- and then allow to handle commands and events.\n-\n- :param address: QEMU address, can be a unix socket path (string), a tuple\n- in the form ( address, port ) for a TCP connection, or an\n- existing `socket.socket` object.\n- :param server: Act as the socket server. (See 'accept')\n- Not applicable when passing a socket directly.\n- :param nickname: Optional nickname used for logging.\n- \"\"\"\n-\n- def __init__(self,\n- address: Union[SocketAddrT, socket.socket],\n- server: bool = False,\n- nickname: Optional[str] = None):\n-\n- if server and isinstance(address, socket.socket):\n- raise ValueError(\n- \"server argument should be False when passing a socket\")\n-\n- self._qmp = QMPClient(nickname)\n- self._address = address\n- self._timeout: Optional[float] = None\n-\n- # This is a sync shim intended for use in fully synchronous\n- # programs. Create and set an event loop if necessary.\n- self._aloop = get_or_create_event_loop()\n-\n- if server:\n- assert not isinstance(self._address, socket.socket)\n- self._sync(self._qmp.start_server(self._address))\n-\n- _T = TypeVar('_T')\n-\n- def _sync(\n- self, future: Awaitable[_T], timeout: Optional[float] = None\n- ) -> _T:\n- return self._aloop.run_until_complete(\n- asyncio.wait_for(future, timeout=timeout)\n- )\n-\n- def _get_greeting(self) -> Optional[QMPMessage]:\n- if self._qmp.greeting is not None:\n- # pylint: disable=protected-access\n- return self._qmp.greeting._asdict()\n- return None\n-\n- def __enter__(self: _T) -> _T:\n- # Implement context manager enter function.\n- return self\n-\n- def __exit__(self,\n- exc_type: Optional[Type[BaseException]],\n- exc_val: Optional[BaseException],\n- exc_tb: Optional[TracebackType]) -> None:\n- # Implement context manager exit function.\n- self.close()\n-\n- @classmethod\n- def parse_address(cls, address: str) -> SocketAddrT:\n- \"\"\"\n- Parse a string into a QMP address.\n-\n- Figure out if the argument is in the port:host form.\n- If it's not, it's probably a file path.\n- \"\"\"\n- components = address.split(':')\n- if len(components) == 2:\n- try:\n- port = int(components[1])\n- except ValueError:\n- msg = f\"Bad port: '{components[1]}' in '{address}'.\"\n- raise QMPBadPortError(msg) from None\n- return (components[0], port)\n-\n- # Treat as filepath.\n- return address\n-\n- def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:\n- \"\"\"\n- Connect to the QMP Monitor and perform capabilities negotiation.\n-\n- :return: QMP greeting dict, or None if negotiate is false\n- :raise ConnectError: on connection errors\n- \"\"\"\n- self._qmp.await_greeting = negotiate\n- self._qmp.negotiate = negotiate\n-\n- self._sync(\n- self._qmp.connect(self._address)\n- )\n- return self._get_greeting()\n-\n- def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:\n- \"\"\"\n- Await connection from QMP Monitor and perform capabilities negotiation.\n-\n- :param timeout:\n- timeout in seconds (nonnegative float number, or None).\n- If None, there is no timeout, and this may block forever.\n-\n- :return: QMP greeting dict\n- :raise ConnectError: on connection errors\n- \"\"\"\n- self._qmp.await_greeting = True\n- self._qmp.negotiate = True\n-\n- self._sync(self._qmp.accept(), timeout)\n-\n- ret = self._get_greeting()\n- assert ret is not None\n- return ret\n-\n- def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:\n- \"\"\"\n- Send a QMP command to the QMP Monitor.\n-\n- :param qmp_cmd: QMP command to be sent as a Python dict\n- :return: QMP response as a Python dict\n- \"\"\"\n- return dict(\n- self._sync(\n- # pylint: disable=protected-access\n-\n- # _raw() isn't a public API, because turning off\n- # automatic ID assignment is discouraged. For\n- # compatibility with iotests *only*, do it anyway.\n- self._qmp._raw(qmp_cmd, assign_id=False),\n- self._timeout\n- )\n- )\n-\n- def cmd_raw(self, name: str,\n- args: Optional[Dict[str, object]] = None) -> QMPMessage:\n- \"\"\"\n- Build a QMP command and send it to the QMP Monitor.\n-\n- :param name: command name (string)\n- :param args: command arguments (dict)\n- \"\"\"\n- qmp_cmd: QMPMessage = {'execute': name}\n- if args:\n- qmp_cmd['arguments'] = args\n- return self.cmd_obj(qmp_cmd)\n-\n- def cmd(self, cmd: str, **kwds: object) -> QMPReturnValue:\n- \"\"\"\n- Build and send a QMP command to the monitor, report errors if any\n- \"\"\"\n- return self._sync(\n- self._qmp.execute(cmd, kwds),\n- self._timeout\n- )\n-\n- def pull_event(self,\n- wait: Union[bool, float] = False) -> Optional[QMPMessage]:\n- \"\"\"\n- Pulls a single event.\n-\n- :param wait:\n- If False or 0, do not wait. Return None if no events ready.\n- If True, wait forever until the next event.\n- Otherwise, wait for the specified number of seconds.\n-\n- :raise asyncio.TimeoutError:\n- When a timeout is requested and the timeout period elapses.\n-\n- :return: The first available QMP event, or None.\n- \"\"\"\n- # Kick the event loop to allow events to accumulate\n- self._sync(asyncio.sleep(0))\n-\n- if not wait:\n- # wait is False/0: \"do not wait, do not except.\"\n- if self._qmp.events.empty():\n- return None\n-\n- # If wait is 'True', wait forever. If wait is False/0, the events\n- # queue must not be empty; but it still needs some real amount\n- # of time to complete.\n- timeout = None\n- if wait and isinstance(wait, float):\n- timeout = wait\n-\n- return dict(\n- self._sync(\n- self._qmp.events.get(),\n- timeout\n- )\n- )\n-\n- def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:\n- \"\"\"\n- Get a list of QMP events and clear all pending events.\n-\n- :param wait:\n- If False or 0, do not wait. Return None if no events ready.\n- If True, wait until we have at least one event.\n- Otherwise, wait for up to the specified number of seconds for at\n- least one event.\n-\n- :raise asyncio.TimeoutError:\n- When a timeout is requested and the timeout period elapses.\n-\n- :return: A list of QMP events.\n- \"\"\"\n- events = [dict(x) for x in self._qmp.events.clear()]\n- if events:\n- return events\n-\n- event = self.pull_event(wait)\n- return [event] if event is not None else []\n-\n- def clear_events(self) -> None:\n- \"\"\"Clear current list of pending events.\"\"\"\n- self._qmp.events.clear()\n-\n- def close(self) -> None:\n- \"\"\"Close the connection.\"\"\"\n- self._sync(\n- self._qmp.disconnect()\n- )\n-\n- def settimeout(self, timeout: Optional[float]) -> None:\n- \"\"\"\n- Set the timeout for QMP RPC execution.\n-\n- This timeout affects the `cmd`, `cmd_obj`, and `cmd_raw` methods.\n- The `accept`, `pull_event` and `get_events` methods have their\n- own configurable timeouts.\n-\n- :param timeout:\n- timeout in seconds, or None.\n- None will wait indefinitely.\n- \"\"\"\n- self._timeout = timeout\n-\n- def send_fd_scm(self, fd: int) -> None:\n- \"\"\"\n- Send a file descriptor to the remote via SCM_RIGHTS.\n- \"\"\"\n- self._qmp.send_fd_scm(fd)\n-\n- def __del__(self) -> None:\n- if self._qmp.runstate != Runstate.IDLE:\n- self._qmp.logger.warning(\n- \"QEMUMonitorProtocol object garbage collected without a prior \"\n- \"call to close()\"\n- )\n-\n- if not self._aloop.is_running():\n- if self._qmp.runstate != Runstate.IDLE:\n- # If the user neglected to close the QMP session and we\n- # are not currently running in an asyncio context, we\n- # have the opportunity to close the QMP session. If we\n- # do not do this, the error messages presented over\n- # dangling async resources may not make any sense to the\n- # user.\n- self.close()\n-\n- if self._qmp.runstate != Runstate.IDLE:\n- # If QMP is still not quiesced, it means that the garbage\n- # collector ran from a context within the event loop and we\n- # are simply too late to take any corrective action. Raise\n- # our own error to give meaningful feedback to the user in\n- # order to prevent pages of asyncio stacktrace jargon.\n- raise QMPError(\n- \"QEMUMonitorProtocol.close() was not called before object was \"\n- \"garbage collected, and could not be closed due to GC running \"\n- \"in the event loop\"\n- )\ndiff --git a/python/qemu/qmp/message.py b/python/qemu/qmp/message.py\ndeleted file mode 100644\nindex dabb8ec360e..00000000000\n--- a/python/qemu/qmp/message.py\n+++ /dev/null\n@@ -1,217 +0,0 @@\n-\"\"\"\n-QMP Message Format\n-\n-This module provides the `Message` class, which represents a single QMP\n-message sent to or from the server.\n-\"\"\"\n-\n-import json\n-from json import JSONDecodeError\n-from typing import (\n- Dict,\n- Iterator,\n- Mapping,\n- MutableMapping,\n- Optional,\n- Union,\n-)\n-\n-from .error import ProtocolError\n-\n-\n-class Message(MutableMapping[str, object]):\n- \"\"\"\n- Represents a single QMP protocol message.\n-\n- QMP uses JSON objects as its basic communicative unit; so this\n- Python object is a :py:obj:`~collections.abc.MutableMapping`. It may\n- be instantiated from either another mapping (like a `dict`), or from\n- raw `bytes` that still need to be deserialized.\n-\n- Once instantiated, it may be treated like any other\n- :py:obj:`~collections.abc.MutableMapping`::\n-\n- >>> msg = Message(b'{\"hello\": \"world\"}')\n- >>> assert msg['hello'] == 'world'\n- >>> msg['id'] = 'foobar'\n- >>> print(msg)\n- {\n- \"hello\": \"world\",\n- \"id\": \"foobar\"\n- }\n-\n- It can be converted to `bytes`::\n-\n- >>> msg = Message({\"hello\": \"world\"})\n- >>> print(bytes(msg))\n- b'{\"hello\":\"world\",\"id\":\"foobar\"}'\n-\n- Or back into a garden-variety `dict`::\n-\n- >>> dict(msg)\n- {'hello': 'world'}\n-\n- Or pretty-printed::\n-\n- >>> print(str(msg))\n- {\n- \"hello\": \"world\"\n- }\n-\n- :param value: Initial value, if any.\n- :param eager:\n- When `True`, attempt to serialize or deserialize the initial value\n- immediately, so that conversion exceptions are raised during\n- the call to ``__init__()``.\n-\n- \"\"\"\n- # pylint: disable=too-many-ancestors\n-\n- def __init__(self,\n- value: Union[bytes, Mapping[str, object]] = b'{}', *,\n- eager: bool = True):\n- self._data: Optional[bytes] = None\n- self._obj: Optional[Dict[str, object]] = None\n-\n- if isinstance(value, bytes):\n- self._data = value\n- if eager:\n- self._obj = self._deserialize(self._data)\n- else:\n- self._obj = dict(value)\n- if eager:\n- self._data = self._serialize(self._obj)\n-\n- # Methods necessary to implement the MutableMapping interface, see:\n- # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping\n-\n- # We get pop, popitem, clear, update, setdefault, __contains__,\n- # keys, items, values, get, __eq__ and __ne__ for free.\n-\n- def __getitem__(self, key: str) -> object:\n- return self._object[key]\n-\n- def __setitem__(self, key: str, value: object) -> None:\n- self._object[key] = value\n- self._data = None\n-\n- def __delitem__(self, key: str) -> None:\n- del self._object[key]\n- self._data = None\n-\n- def __iter__(self) -> Iterator[str]:\n- return iter(self._object)\n-\n- def __len__(self) -> int:\n- return len(self._object)\n-\n- # Dunder methods not related to MutableMapping:\n-\n- def __repr__(self) -> str:\n- if self._obj is not None:\n- return f\"Message({self._object!r})\"\n- return f\"Message({bytes(self)!r})\"\n-\n- def __str__(self) -> str:\n- \"\"\"Pretty-printed representation of this QMP message.\"\"\"\n- return json.dumps(self._object, indent=2)\n-\n- def __bytes__(self) -> bytes:\n- \"\"\"bytes representing this QMP message.\"\"\"\n- if self._data is None:\n- self._data = self._serialize(self._obj or {})\n- return self._data\n-\n- # Conversion Methods\n-\n- @property\n- def _object(self) -> Dict[str, object]:\n- \"\"\"\n- A `dict` representing this QMP message.\n-\n- Generated on-demand, if required. This property is private\n- because it returns an object that could be used to invalidate\n- the internal state of the `Message` object.\n- \"\"\"\n- if self._obj is None:\n- self._obj = self._deserialize(self._data or b'{}')\n- return self._obj\n-\n- @classmethod\n- def _serialize(cls, value: object) -> bytes:\n- \"\"\"\n- Serialize a JSON object as `bytes`.\n-\n- :raise ValueError: When the object cannot be serialized.\n- :raise TypeError: When the object cannot be serialized.\n-\n- :return: `bytes` ready to be sent over the wire.\n- \"\"\"\n- return json.dumps(value, separators=(',', ':')).encode('utf-8')\n-\n- @classmethod\n- def _deserialize(cls, data: bytes) -> Dict[str, object]:\n- \"\"\"\n- Deserialize JSON `bytes` into a native Python `dict`.\n-\n- :raise DeserializationError:\n- If JSON deserialization fails for any reason.\n- :raise UnexpectedTypeError:\n- If the data does not represent a JSON object.\n-\n- :return: A `dict` representing this QMP message.\n- \"\"\"\n- try:\n- obj = json.loads(data)\n- except JSONDecodeError as err:\n- emsg = \"Failed to deserialize QMP message.\"\n- raise DeserializationError(emsg, data) from err\n- if not isinstance(obj, dict):\n- raise UnexpectedTypeError(\n- \"QMP message is not a JSON object.\",\n- obj\n- )\n- return obj\n-\n-\n-class DeserializationError(ProtocolError):\n- \"\"\"\n- A QMP message was not understood as JSON.\n-\n- When this Exception is raised, ``__cause__`` will be set to the\n- `json.JSONDecodeError` Exception, which can be interrogated for\n- further details.\n-\n- :param error_message: Human-readable string describing the error.\n- :param raw: The raw `bytes` that prompted the failure.\n- \"\"\"\n- def __init__(self, error_message: str, raw: bytes):\n- super().__init__(error_message, raw)\n- #: The raw `bytes` that were not understood as JSON.\n- self.raw: bytes = raw\n-\n- def __str__(self) -> str:\n- return \"\\n\".join((\n- super().__str__(),\n- f\" raw bytes were: {str(self.raw)}\",\n- ))\n-\n-\n-class UnexpectedTypeError(ProtocolError):\n- \"\"\"\n- A QMP message was JSON, but not a JSON object.\n-\n- :param error_message: Human-readable string describing the error.\n- :param value: The deserialized JSON value that wasn't an object.\n- \"\"\"\n- def __init__(self, error_message: str, value: object):\n- super().__init__(error_message, value)\n- #: The JSON value that was expected to be an object.\n- self.value: object = value\n-\n- def __str__(self) -> str:\n- strval = json.dumps(self.value, indent=2)\n- return \"\\n\".join((\n- super().__str__(),\n- f\" json value was: {strval}\",\n- ))\ndiff --git a/python/qemu/qmp/models.py b/python/qemu/qmp/models.py\ndeleted file mode 100644\nindex 7e0d0baf038..00000000000\n--- a/python/qemu/qmp/models.py\n+++ /dev/null\n@@ -1,146 +0,0 @@\n-\"\"\"\n-QMP Data Models\n-\n-This module provides simplistic data classes that represent the few\n-structures that the QMP spec mandates; they are used to verify incoming\n-data to make sure it conforms to spec.\n-\"\"\"\n-# pylint: disable=too-few-public-methods\n-\n-from collections import abc\n-import copy\n-from typing import (\n- Any,\n- Dict,\n- Mapping,\n- Optional,\n- Sequence,\n-)\n-\n-\n-class Model:\n- \"\"\"\n- Abstract data model, representing some QMP object of some kind.\n-\n- :param raw: The raw object to be validated.\n- :raise KeyError: If any required fields are absent.\n- :raise TypeError: If any required fields have the wrong type.\n- \"\"\"\n- def __init__(self, raw: Mapping[str, Any]):\n- self._raw = raw\n-\n- def _check_key(self, key: str) -> None:\n- if key not in self._raw:\n- raise KeyError(f\"'{self._name}' object requires '{key}' member\")\n-\n- def _check_value(self, key: str, type_: type, typestr: str) -> None:\n- assert key in self._raw\n- if not isinstance(self._raw[key], type_):\n- raise TypeError(\n- f\"'{self._name}' member '{key}' must be a {typestr}\"\n- )\n-\n- def _check_member(self, key: str, type_: type, typestr: str) -> None:\n- self._check_key(key)\n- self._check_value(key, type_, typestr)\n-\n- @property\n- def _name(self) -> str:\n- return type(self).__name__\n-\n- def __repr__(self) -> str:\n- return f\"{self._name}({self._raw!r})\"\n-\n-\n-class Greeting(Model):\n- \"\"\"\n- Defined in `interop/qmp-spec`, \"Server Greeting\" section.\n-\n- :param raw: The raw Greeting object.\n- :raise KeyError: If any required fields are absent.\n- :raise TypeError: If any required fields have the wrong type.\n- \"\"\"\n- def __init__(self, raw: Mapping[str, Any]):\n- super().__init__(raw)\n- #: 'QMP' member\n- self.QMP: QMPGreeting # pylint: disable=invalid-name\n-\n- self._check_member('QMP', abc.Mapping, \"JSON object\")\n- self.QMP = QMPGreeting(self._raw['QMP'])\n-\n- def _asdict(self) -> Dict[str, object]:\n- \"\"\"\n- For compatibility with the iotests sync QMP wrapper.\n-\n- The legacy QMP interface needs Greetings as a garden-variety Dict.\n-\n- This interface is private in the hopes that it will be able to\n- be dropped again in the near-future. Caller beware!\n- \"\"\"\n- return dict(copy.deepcopy(self._raw))\n-\n-\n-class QMPGreeting(Model):\n- \"\"\"\n- Defined in `interop/qmp-spec`, \"Server Greeting\" section.\n-\n- :param raw: The raw QMPGreeting object.\n- :raise KeyError: If any required fields are absent.\n- :raise TypeError: If any required fields have the wrong type.\n- \"\"\"\n- def __init__(self, raw: Mapping[str, Any]):\n- super().__init__(raw)\n- #: 'version' member\n- self.version: Mapping[str, object]\n- #: 'capabilities' member\n- self.capabilities: Sequence[object]\n-\n- self._check_member('version', abc.Mapping, \"JSON object\")\n- self.version = self._raw['version']\n-\n- self._check_member('capabilities', abc.Sequence, \"JSON array\")\n- self.capabilities = self._raw['capabilities']\n-\n-\n-class ErrorResponse(Model):\n- \"\"\"\n- Defined in `interop/qmp-spec`, \"Error\" section.\n-\n- :param raw: The raw ErrorResponse object.\n- :raise KeyError: If any required fields are absent.\n- :raise TypeError: If any required fields have the wrong type.\n- \"\"\"\n- def __init__(self, raw: Mapping[str, Any]):\n- super().__init__(raw)\n- #: 'error' member\n- self.error: ErrorInfo\n- #: 'id' member\n- self.id: Optional[object] = None # pylint: disable=invalid-name\n-\n- self._check_member('error', abc.Mapping, \"JSON object\")\n- self.error = ErrorInfo(self._raw['error'])\n-\n- if 'id' in raw:\n- self.id = raw['id']\n-\n-\n-class ErrorInfo(Model):\n- \"\"\"\n- Defined in `interop/qmp-spec`, \"Error\" section.\n-\n- :param raw: The raw ErrorInfo object.\n- :raise KeyError: If any required fields are absent.\n- :raise TypeError: If any required fields have the wrong type.\n- \"\"\"\n- def __init__(self, raw: Mapping[str, Any]):\n- super().__init__(raw)\n- #: 'class' member, with an underscore to avoid conflicts in Python.\n- self.class_: str\n- #: 'desc' member\n- self.desc: str\n-\n- self._check_member('class', str, \"string\")\n- self.class_ = self._raw['class']\n-\n- self._check_member('desc', str, \"string\")\n- self.desc = self._raw['desc']\ndiff --git a/python/qemu/qmp/protocol.py b/python/qemu/qmp/protocol.py\ndeleted file mode 100644\nindex 219d092a792..00000000000\n--- a/python/qemu/qmp/protocol.py\n+++ /dev/null\n@@ -1,1101 +0,0 @@\n-\"\"\"\n-Generic Asynchronous Message-based Protocol Support\n-\n-This module provides a generic framework for sending and receiving\n-messages over an asyncio stream. `AsyncProtocol` is an abstract class\n-that implements the core mechanisms of a simple send/receive protocol,\n-and is designed to be extended.\n-\n-In this package, it is used as the implementation for the `QMPClient`\n-class.\n-\"\"\"\n-\n-# It's all the docstrings ... ! It's long for a good reason ^_^;\n-# pylint: disable=too-many-lines\n-\n-import asyncio\n-from asyncio import StreamReader, StreamWriter\n-from contextlib import asynccontextmanager\n-from enum import Enum\n-from functools import wraps\n-from inspect import iscoroutinefunction\n-import logging\n-import socket\n-from ssl import SSLContext\n-from typing import (\n- Any,\n- AsyncGenerator,\n- Awaitable,\n- Callable,\n- Generic,\n- List,\n- Optional,\n- Tuple,\n- TypeVar,\n- Union,\n- cast,\n-)\n-\n-from .error import QMPError\n-from .util import (\n- bottom_half,\n- exception_summary,\n- flush,\n- pretty_traceback,\n- upper_half,\n-)\n-\n-\n-T = TypeVar('T')\n-_U = TypeVar('_U')\n-_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``\n-\n-InternetAddrT = Tuple[str, int]\n-UnixAddrT = str\n-SocketAddrT = Union[UnixAddrT, InternetAddrT]\n-\n-# Maximum allowable size of read buffer, default\n-_DEFAULT_READBUFLEN = 64 * 1024\n-\n-\n-class Runstate(Enum):\n- \"\"\"Protocol session runstate.\"\"\"\n-\n- #: Fully quiesced and disconnected.\n- IDLE = 0\n- #: In the process of connecting or establishing a session.\n- CONNECTING = 1\n- #: Fully connected and active session.\n- RUNNING = 2\n- #: In the process of disconnecting.\n- #: Runstate may be returned to `IDLE` by calling `disconnect()`.\n- DISCONNECTING = 3\n-\n-\n-class ConnectError(QMPError):\n- \"\"\"\n- Raised when the initial connection process has failed.\n-\n- This Exception always wraps a \"root cause\" exception that can be\n- interrogated for additional information.\n-\n- For example, when connecting to a non-existent socket::\n-\n- await qmp.connect('not_found.sock')\n- # ConnectError: Failed to establish connection:\n- # [Errno 2] No such file or directory\n-\n- :param error_message: Human-readable string describing the error.\n- :param exc: The root-cause exception.\n- \"\"\"\n- def __init__(self, error_message: str, exc: Exception):\n- super().__init__(error_message, exc)\n- #: Human-readable error string\n- self.error_message: str = error_message\n- #: Wrapped root cause exception\n- self.exc: Exception = exc\n-\n- def __str__(self) -> str:\n- cause = str(self.exc)\n- if not cause:\n- # If there's no error string, use the exception name.\n- cause = exception_summary(self.exc)\n- return f\"{self.error_message}: {cause}\"\n-\n-\n-class StateError(QMPError):\n- \"\"\"\n- An API command (connect, execute, etc) was issued at an inappropriate time.\n-\n- This error is raised when a command like\n- :py:meth:`~AsyncProtocol.connect()` is called when the client is\n- already connected.\n-\n- :param error_message: Human-readable string describing the state violation.\n- :param state: The actual `Runstate` seen at the time of the violation.\n- :param required: The `Runstate` required to process this command.\n- \"\"\"\n- def __init__(self, error_message: str,\n- state: Runstate, required: Runstate):\n- super().__init__(error_message, state, required)\n- self.error_message = error_message\n- self.state = state\n- self.required = required\n-\n- def __str__(self) -> str:\n- return self.error_message\n-\n-\n-F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name\n-\n-\n-# Don't Panic.\n-def require(required_state: Runstate) -> Callable[[F], F]:\n- \"\"\"\n- Decorator: protect a method so it can only be run in a certain `Runstate`.\n-\n- :param required_state: The `Runstate` required to invoke this method.\n- :raise StateError: When the required `Runstate` is not met.\n- \"\"\"\n- def _check(proto: 'AsyncProtocol[Any]') -> None:\n- name = type(proto).__name__\n- if proto.runstate == required_state:\n- return\n-\n- if proto.runstate == Runstate.CONNECTING:\n- emsg = f\"{name} is currently connecting.\"\n- elif proto.runstate == Runstate.DISCONNECTING:\n- emsg = (f\"{name} is disconnecting.\"\n- \" Call disconnect() to return to IDLE state.\")\n- elif proto.runstate == Runstate.RUNNING:\n- emsg = f\"{name} is already connected and running.\"\n- elif proto.runstate == Runstate.IDLE:\n- emsg = f\"{name} is disconnected and idle.\"\n- else:\n- assert False\n-\n- raise StateError(emsg, proto.runstate, required_state)\n-\n- def _decorator(func: F) -> F:\n- # _decorator is the decorator that is built by calling the\n- # require() decorator factory; e.g.:\n- #\n- # @require(Runstate.IDLE) def foo(): ...\n- # will replace 'foo' with the result of '_decorator(foo)'.\n-\n- @wraps(func)\n- def _wrapper(proto: 'AsyncProtocol[Any]',\n- *args: Any, **kwargs: Any) -> Any:\n- _check(proto)\n- return func(proto, *args, **kwargs)\n-\n- @wraps(func)\n- async def _async_wrapper(proto: 'AsyncProtocol[Any]',\n- *args: Any, **kwargs: Any) -> Any:\n- _check(proto)\n- return await func(proto, *args, **kwargs)\n-\n- # Return the decorated method; F => Decorated[F]\n- # Use an async version when applicable, which\n- # preserves async signature generation in sphinx.\n- if iscoroutinefunction(func):\n- return cast(F, _async_wrapper)\n- return cast(F, _wrapper)\n-\n- # Return the decorator instance from the decorator factory. Phew!\n- return _decorator\n-\n-\n-class AsyncProtocol(Generic[T]):\n- \"\"\"\n- AsyncProtocol implements a generic async message-based protocol.\n-\n- This protocol assumes the basic unit of information transfer between\n- client and server is a \"message\", the details of which are left up\n- to the implementation. It assumes the sending and receiving of these\n- messages is full-duplex and not necessarily correlated; i.e. it\n- supports asynchronous inbound messages.\n-\n- It is designed to be extended by a specific protocol which provides\n- the implementations for how to read and send messages. These must be\n- defined in `_do_recv()` and `_do_send()`, respectively.\n-\n- Other callbacks have a default implementation, but are intended to be\n- either extended or overridden:\n-\n- - `_establish_session`:\n- The base implementation starts the reader/writer tasks.\n- A protocol implementation can override this call, inserting\n- actions to be taken prior to starting the reader/writer tasks\n- before the super() call; actions needing to occur afterwards\n- can be written after the super() call.\n- - `_on_message`:\n- Actions to be performed when a message is received.\n- - `_cb_outbound`:\n- Logging/Filtering hook for all outbound messages.\n- - `_cb_inbound`:\n- Logging/Filtering hook for all inbound messages.\n- This hook runs *before* `_on_message()`.\n-\n- :param name:\n- Name used for logging messages, if any. By default, messages\n- will log to 'qemu.qmp.protocol', but each individual connection\n- can be given its own logger by giving it a name; messages will\n- then log to 'qemu.qmp.protocol.${name}'.\n- :param readbuflen:\n- The maximum read buffer length of the underlying StreamReader\n- instance.\n- \"\"\"\n- # pylint: disable=too-many-instance-attributes\n-\n- #: Logger object for debugging messages from this connection.\n- logger = logging.getLogger(__name__)\n-\n- # -------------------------\n- # Section: Public interface\n- # -------------------------\n-\n- def __init__(\n- self, name: Optional[str] = None,\n- readbuflen: int = _DEFAULT_READBUFLEN\n- ) -> None:\n- self._name: Optional[str]\n- self.name = name\n- self.readbuflen = readbuflen\n-\n- # stream I/O\n- self._reader: Optional[StreamReader] = None\n- self._writer: Optional[StreamWriter] = None\n-\n- # Outbound Message queue\n- self._outgoing: asyncio.Queue[T]\n-\n- # Special, long-running tasks:\n- self._reader_task: Optional[asyncio.Future[None]] = None\n- self._writer_task: Optional[asyncio.Future[None]] = None\n-\n- # Aggregate of the above two tasks, used for Exception management.\n- self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None\n-\n- #: Disconnect task. The disconnect implementation runs in a task\n- #: so that asynchronous disconnects (initiated by the\n- #: reader/writer) are allowed to wait for the reader/writers to\n- #: exit.\n- self._dc_task: Optional[asyncio.Future[None]] = None\n-\n- self._runstate = Runstate.IDLE\n- self._runstate_changed: Optional[asyncio.Event] = None\n-\n- # Server state for start_server() and _incoming()\n- self._server: Optional[asyncio.AbstractServer] = None\n- self._accepted: Optional[asyncio.Event] = None\n-\n- def __repr__(self) -> str:\n- cls_name = type(self).__name__\n- tokens = []\n- if self.name is not None:\n- tokens.append(f\"name={self.name!r}\")\n- tokens.append(f\"runstate={self.runstate.name}\")\n- return f\"<{cls_name} {' '.join(tokens)}>\"\n-\n- @property\n- def name(self) -> Optional[str]:\n- \"\"\"\n- The nickname for this connection, if any.\n-\n- This name is used for differentiating instances in debug output.\n- \"\"\"\n- return self._name\n-\n- @name.setter\n- def name(self, name: Optional[str]) -> None:\n- logger = logging.getLogger(__name__)\n- if name:\n- self.logger = logger.getChild(name)\n- else:\n- self.logger = logger\n- self._name = name\n-\n- @property # @upper_half\n- def runstate(self) -> Runstate:\n- \"\"\"The current `Runstate` of the connection.\"\"\"\n- return self._runstate\n-\n- @upper_half\n- async def runstate_changed(self) -> Runstate:\n- \"\"\"\n- Wait for the `runstate` to change, then return that `Runstate`.\n- \"\"\"\n- await self._runstate_event.wait()\n- return self.runstate\n-\n- @upper_half\n- @require(Runstate.IDLE)\n- async def start_server_and_accept(\n- self, address: SocketAddrT,\n- ssl: Optional[SSLContext] = None\n- ) -> None:\n- \"\"\"\n- Accept a connection and begin processing message queues.\n-\n- If this call fails, `runstate` is guaranteed to be set back to\n- `IDLE`. This method is precisely equivalent to calling\n- `start_server()` followed by :py:meth:`~AsyncProtocol.accept()`.\n-\n- :param address:\n- Address to listen on; UNIX socket path or TCP address/port.\n- :param ssl: SSL context to use, if any.\n-\n- :raise StateError: When the `Runstate` is not `IDLE`.\n- :raise ConnectError:\n- When a connection or session cannot be established.\n-\n- This exception will wrap a more concrete one. In most cases,\n- the wrapped exception will be `OSError` or `EOFError`. If a\n- protocol-level failure occurs while establishing a new\n- session, the wrapped error may also be a `QMPError`.\n-\n- \"\"\"\n- await self.start_server(address, ssl)\n- await self.accept()\n- assert self.runstate == Runstate.RUNNING\n-\n- @upper_half\n- @require(Runstate.IDLE)\n- async def start_server(self, address: SocketAddrT,\n- ssl: Optional[SSLContext] = None) -> None:\n- \"\"\"\n- Start listening for an incoming connection, but do not wait for a peer.\n-\n- This method starts listening for an incoming connection, but\n- does not block waiting for a peer. This call will return\n- immediately after binding and listening on a socket. A later\n- call to :py:meth:`~AsyncProtocol.accept()` must be made in order\n- to finalize the incoming connection.\n-\n- :param address:\n- Address to listen on; UNIX socket path or TCP address/port.\n- :param ssl: SSL context to use, if any.\n-\n- :raise StateError: When the `Runstate` is not `IDLE`.\n- :raise ConnectError:\n- When the server could not start listening on this address.\n-\n- This exception will wrap a more concrete one. In most cases,\n- the wrapped exception will be `OSError`.\n- \"\"\"\n- async with self._session_guard('Failed to establish connection'):\n- await self._do_start_server(address, ssl)\n- assert self.runstate == Runstate.CONNECTING\n-\n- @upper_half\n- @require(Runstate.CONNECTING)\n- async def accept(self) -> None:\n- \"\"\"\n- Accept an incoming connection and begin processing message queues.\n-\n- Used after a previous call to `start_server()` to accept an\n- incoming connection. If this call fails, `runstate` is\n- guaranteed to be set back to `IDLE`.\n-\n- :raise StateError: When the `Runstate` is not `CONNECTING`.\n- :raise QMPError: When `start_server()` was not called first.\n- :raise ConnectError:\n- When a connection or session cannot be established.\n-\n- This exception will wrap a more concrete one. In most cases,\n- the wrapped exception will be `OSError` or `EOFError`. If a\n- protocol-level failure occurs while establishing a new\n- session, the wrapped error may also be an `QMPError`.\n- \"\"\"\n- if self._accepted is None:\n- raise QMPError(\"Cannot call accept() before start_server().\")\n- async with self._session_guard('Failed to establish connection'):\n- await self._do_accept()\n- async with self._session_guard('Failed to establish session'):\n- await self._establish_session()\n- assert self.runstate == Runstate.RUNNING\n-\n- @upper_half\n- @require(Runstate.IDLE)\n- async def connect(self, address: Union[SocketAddrT, socket.socket],\n- ssl: Optional[SSLContext] = None) -> None:\n- \"\"\"\n- Connect to the server and begin processing message queues.\n-\n- If this call fails, `runstate` is guaranteed to be set back to `IDLE`.\n-\n- :param address:\n- Address to connect to; UNIX socket path or TCP address/port.\n- :param ssl: SSL context to use, if any.\n-\n- :raise StateError: When the `Runstate` is not `IDLE`.\n- :raise ConnectError:\n- When a connection or session cannot be established.\n-\n- This exception will wrap a more concrete one. In most cases,\n- the wrapped exception will be `OSError` or `EOFError`. If a\n- protocol-level failure occurs while establishing a new\n- session, the wrapped error may also be an `QMPError`.\n- \"\"\"\n- async with self._session_guard('Failed to establish connection'):\n- await self._do_connect(address, ssl)\n- async with self._session_guard('Failed to establish session'):\n- await self._establish_session()\n- assert self.runstate == Runstate.RUNNING\n-\n- @upper_half\n- async def disconnect(self) -> None:\n- \"\"\"\n- Disconnect and wait for all tasks to fully stop.\n-\n- If there was an exception that caused the reader/writers to\n- terminate prematurely, it will be raised here.\n-\n- :raise Exception:\n- When the reader or writer terminate unexpectedly. You can\n- expect to see `EOFError` if the server hangs up, or\n- `OSError` for connection-related issues. If there was a QMP\n- protocol-level problem, `ProtocolError` will be seen.\n- \"\"\"\n- self.logger.debug(\"disconnect() called.\")\n- self._schedule_disconnect()\n- await self._wait_disconnect()\n-\n- # --------------------------\n- # Section: Session machinery\n- # --------------------------\n-\n- @asynccontextmanager\n- async def _session_guard(self, emsg: str) -> AsyncGenerator[None, None]:\n- \"\"\"\n- Async guard function used to roll back to `IDLE` on any error.\n-\n- On any Exception, the state machine will be reset back to\n- `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but\n- `BaseException` events will be left alone (This includes\n- asyncio.CancelledError, even prior to Python 3.8).\n-\n- :param error_message:\n- Human-readable string describing what connection phase failed.\n-\n- :raise BaseException:\n- When `BaseException` occurs in the guarded block.\n- :raise ConnectError:\n- When any other error is encountered in the guarded block.\n- \"\"\"\n- try:\n- # Caller's code runs here.\n- yield\n- except BaseException as err:\n- self.logger.error(\"%s: %s\", emsg, exception_summary(err))\n- self.logger.debug(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- try:\n- # Reset the runstate back to IDLE.\n- await self.disconnect()\n- except:\n- # We don't expect any Exceptions from the disconnect function\n- # here, because we failed to connect in the first place.\n- # The disconnect() function is intended to perform\n- # only cannot-fail cleanup here, but you never know.\n- emsg = (\n- \"Unexpected bottom half exception. \"\n- \"This is a bug in the QMP library. \"\n- \"Please report it to <qemu-devel@nongnu.org> and \"\n- \"CC: John Snow <jsnow@redhat.com>.\"\n- )\n- self.logger.critical(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- raise\n-\n- # CancelledError is an Exception with special semantic meaning;\n- # We do NOT want to wrap it up under ConnectError.\n- # NB: CancelledError is not a BaseException before Python 3.8\n- if isinstance(err, asyncio.CancelledError):\n- raise\n-\n- # Any other kind of error can be treated as some kind of connection\n- # failure broadly. Inspect the 'exc' field to explore the root\n- # cause in greater detail.\n- if isinstance(err, Exception):\n- raise ConnectError(emsg, err) from err\n-\n- # Raise BaseExceptions un-wrapped, they're more important.\n- raise\n-\n- @property\n- def _runstate_event(self) -> asyncio.Event:\n- # asyncio.Event() objects should not be created prior to entrance into\n- # an event loop, so we can ensure we create it in the correct context.\n- # Create it on-demand *only* at the behest of an 'async def' method.\n- if not self._runstate_changed:\n- self._runstate_changed = asyncio.Event()\n- return self._runstate_changed\n-\n- @upper_half\n- @bottom_half\n- def _set_state(self, state: Runstate) -> None:\n- \"\"\"\n- Change the `Runstate` of the protocol connection.\n-\n- Signals the `runstate_changed` event.\n- \"\"\"\n- if state == self._runstate:\n- return\n-\n- self.logger.debug(\"Transitioning from '%s' to '%s'.\",\n- str(self._runstate), str(state))\n- self._runstate = state\n- self._runstate_event.set()\n- self._runstate_event.clear()\n-\n- @bottom_half\n- async def _stop_server(self) -> None:\n- \"\"\"\n- Stop listening for / accepting new incoming connections.\n- \"\"\"\n- if self._server is None:\n- return\n-\n- try:\n- self.logger.debug(\"Stopping server.\")\n- self._server.close()\n- self.logger.debug(\"Server stopped.\")\n- finally:\n- self._server = None\n-\n- @bottom_half # However, it does not run from the R/W tasks.\n- async def _incoming(self,\n- reader: asyncio.StreamReader,\n- writer: asyncio.StreamWriter) -> None:\n- \"\"\"\n- Accept an incoming connection and signal the upper_half.\n-\n- This method does the minimum necessary to accept a single\n- incoming connection. It signals back to the upper_half ASAP so\n- that any errors during session initialization can occur\n- naturally in the caller's stack.\n-\n- :param reader: Incoming `asyncio.StreamReader`\n- :param writer: Incoming `asyncio.StreamWriter`\n- \"\"\"\n- peer = writer.get_extra_info('peername', 'Unknown peer')\n- self.logger.debug(\"Incoming connection from %s\", peer)\n-\n- if self._reader or self._writer:\n- # Sadly, we can have more than one pending connection\n- # because of https://bugs.python.org/issue46715\n- # Close any extra connections we don't actually want.\n- self.logger.warning(\"Extraneous connection inadvertently accepted\")\n- writer.close()\n- return\n-\n- # A connection has been accepted; stop listening for new ones.\n- assert self._accepted is not None\n- await self._stop_server()\n- self._reader, self._writer = (reader, writer)\n- self._accepted.set()\n-\n- @upper_half\n- async def _do_start_server(self, address: SocketAddrT,\n- ssl: Optional[SSLContext] = None) -> None:\n- \"\"\"\n- Start listening for an incoming connection, but do not wait for a peer.\n-\n- This method starts listening for an incoming connection, but does not\n- block waiting for a peer. This call will return immediately after\n- binding and listening to a socket. A later call to accept() must be\n- made in order to finalize the incoming connection.\n-\n- :param address:\n- Address to listen on; UNIX socket path or TCP address/port.\n- :param ssl: SSL context to use, if any.\n-\n- :raise OSError: For stream-related errors.\n- \"\"\"\n- assert self.runstate == Runstate.IDLE\n- self._set_state(Runstate.CONNECTING)\n-\n- self.logger.debug(\"Awaiting connection on %s ...\", address)\n- self._accepted = asyncio.Event()\n-\n- if isinstance(address, tuple):\n- coro = asyncio.start_server(\n- self._incoming,\n- host=address[0],\n- port=address[1],\n- ssl=ssl,\n- backlog=1,\n- limit=self.readbuflen,\n- )\n- else:\n- coro = asyncio.start_unix_server(\n- self._incoming,\n- path=address,\n- ssl=ssl,\n- backlog=1,\n- limit=self.readbuflen,\n- )\n-\n- # Allow runstate watchers to witness 'CONNECTING' state; some\n- # failures in the streaming layer are synchronous and will not\n- # otherwise yield.\n- await asyncio.sleep(0)\n-\n- # This will start the server (bind(2), listen(2)). It will also\n- # call accept(2) if we yield, but we don't block on that here.\n- self._server = await coro\n- self.logger.debug(\"Server listening on %s\", address)\n-\n- @upper_half\n- async def _do_accept(self) -> None:\n- \"\"\"\n- Wait for and accept an incoming connection.\n-\n- Requires that we have not yet accepted an incoming connection\n- from the upper_half, but it's OK if the server is no longer\n- running because the bottom_half has already accepted the\n- connection.\n- \"\"\"\n- assert self._accepted is not None\n- await self._accepted.wait()\n- assert self._server is None\n- self._accepted = None\n-\n- self.logger.debug(\"Connection accepted.\")\n-\n- @upper_half\n- async def _do_connect(self, address: Union[SocketAddrT, socket.socket],\n- ssl: Optional[SSLContext] = None) -> None:\n- \"\"\"\n- Acting as the transport client, initiate a connection to a server.\n-\n- :param address:\n- Address to connect to; UNIX socket path or TCP address/port.\n- :param ssl: SSL context to use, if any.\n-\n- :raise OSError: For stream-related errors.\n- \"\"\"\n- assert self.runstate == Runstate.IDLE\n- self._set_state(Runstate.CONNECTING)\n-\n- # Allow runstate watchers to witness 'CONNECTING' state; some\n- # failures in the streaming layer are synchronous and will not\n- # otherwise yield.\n- await asyncio.sleep(0)\n-\n- if isinstance(address, socket.socket):\n- self.logger.debug(\"Connecting with existing socket: \"\n- \"fd=%d, family=%r, type=%r\",\n- address.fileno(), address.family, address.type)\n- connect = asyncio.open_connection(\n- limit=self.readbuflen,\n- ssl=ssl,\n- sock=address,\n- )\n- elif isinstance(address, tuple):\n- self.logger.debug(\"Connecting to %s ...\", address)\n- connect = asyncio.open_connection(\n- address[0],\n- address[1],\n- ssl=ssl,\n- limit=self.readbuflen,\n- )\n- else:\n- self.logger.debug(\"Connecting to file://%s ...\", address)\n- connect = asyncio.open_unix_connection(\n- path=address,\n- ssl=ssl,\n- limit=self.readbuflen,\n- )\n-\n- self._reader, self._writer = await connect\n- self.logger.debug(\"Connected.\")\n-\n- @upper_half\n- async def _establish_session(self) -> None:\n- \"\"\"\n- Establish a new session.\n-\n- Starts the readers/writer tasks; subclasses may perform their\n- own negotiations here. The Runstate will be RUNNING upon\n- successful conclusion.\n- \"\"\"\n- assert self.runstate == Runstate.CONNECTING\n-\n- self._outgoing = asyncio.Queue()\n-\n- reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')\n- writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')\n-\n- self._reader_task = asyncio.create_task(reader_coro)\n- self._writer_task = asyncio.create_task(writer_coro)\n-\n- self._bh_tasks = asyncio.gather(\n- self._reader_task,\n- self._writer_task,\n- )\n-\n- self._set_state(Runstate.RUNNING)\n- await asyncio.sleep(0) # Allow runstate_event to process\n-\n- @upper_half\n- @bottom_half\n- def _schedule_disconnect(self) -> None:\n- \"\"\"\n- Initiate a disconnect; idempotent.\n-\n- This method is used both in the upper-half as a direct\n- consequence of `disconnect()`, and in the bottom-half in the\n- case of unhandled exceptions in the reader/writer tasks.\n-\n- It can be invoked no matter what the `runstate` is.\n- \"\"\"\n- if not self._dc_task:\n- self._set_state(Runstate.DISCONNECTING)\n- self.logger.debug(\"Scheduling disconnect.\")\n- self._dc_task = asyncio.create_task(self._bh_disconnect())\n-\n- @upper_half\n- async def _wait_disconnect(self) -> None:\n- \"\"\"\n- Waits for a previously scheduled disconnect to finish.\n-\n- This method will gather any bottom half exceptions and re-raise\n- the one that occurred first; presuming it to be the root cause\n- of any subsequent Exceptions. It is intended to be used in the\n- upper half of the call chain.\n-\n- :raise Exception:\n- Arbitrary exception re-raised on behalf of the reader/writer.\n- \"\"\"\n- assert self.runstate == Runstate.DISCONNECTING\n- assert self._dc_task\n-\n- aws: List[Awaitable[object]] = [self._dc_task]\n- if self._bh_tasks:\n- aws.insert(0, self._bh_tasks)\n- all_defined_tasks = asyncio.gather(*aws)\n-\n- # Ensure disconnect is done; Exception (if any) is not raised here:\n- await asyncio.wait((self._dc_task,))\n-\n- try:\n- await all_defined_tasks # Raise Exceptions from the bottom half.\n- finally:\n- self._cleanup()\n- self._set_state(Runstate.IDLE)\n-\n- @upper_half\n- def _cleanup(self) -> None:\n- \"\"\"\n- Fully reset this object to a clean state and return to `IDLE`.\n- \"\"\"\n- def _paranoid_task_erase(task: Optional['asyncio.Future[_U]']\n- ) -> Optional['asyncio.Future[_U]']:\n- # Help to erase a task, ENSURING it is fully quiesced first.\n- assert (task is None) or task.done()\n- return None if (task and task.done()) else task\n-\n- assert self.runstate == Runstate.DISCONNECTING\n- self._dc_task = _paranoid_task_erase(self._dc_task)\n- self._reader_task = _paranoid_task_erase(self._reader_task)\n- self._writer_task = _paranoid_task_erase(self._writer_task)\n- self._bh_tasks = _paranoid_task_erase(self._bh_tasks)\n-\n- self._reader = None\n- self._writer = None\n- self._accepted = None\n-\n- # NB: _runstate_changed cannot be cleared because we still need it to\n- # send the final runstate changed event ...!\n-\n- # ----------------------------\n- # Section: Bottom Half methods\n- # ----------------------------\n-\n- @bottom_half\n- async def _bh_disconnect(self) -> None:\n- \"\"\"\n- Disconnect and cancel all outstanding tasks.\n-\n- It is designed to be called from its task context,\n- :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,\n- it is free to wait on any pending actions that may still need to\n- occur in either the reader or writer tasks.\n- \"\"\"\n- assert self.runstate == Runstate.DISCONNECTING\n-\n- def _done(task: Optional['asyncio.Future[Any]']) -> bool:\n- return task is not None and task.done()\n-\n- # If the server is running, stop it.\n- await self._stop_server()\n-\n- # Are we already in an error pathway? If either of the tasks are\n- # already done, or if we have no tasks but a reader/writer; we\n- # must be.\n- #\n- # NB: We can't use _bh_tasks to check for premature task\n- # completion, because it may not yet have had a chance to run\n- # and gather itself.\n- tasks = tuple(filter(None, (self._writer_task, self._reader_task)))\n- error_pathway = _done(self._reader_task) or _done(self._writer_task)\n- if not tasks:\n- error_pathway |= bool(self._reader) or bool(self._writer)\n-\n- try:\n- # Try to flush the writer, if possible.\n- # This *may* cause an error and force us over into the error path.\n- if not error_pathway:\n- await self._bh_flush_writer()\n- except BaseException as err:\n- error_pathway = True\n- emsg = \"Failed to flush the writer\"\n- self.logger.error(\"%s: %s\", emsg, exception_summary(err))\n- self.logger.debug(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- raise\n- finally:\n- # Cancel any still-running tasks (Won't raise):\n- if self._writer_task is not None and not self._writer_task.done():\n- self.logger.debug(\"Cancelling writer task.\")\n- self._writer_task.cancel()\n- if self._reader_task is not None and not self._reader_task.done():\n- self.logger.debug(\"Cancelling reader task.\")\n- self._reader_task.cancel()\n-\n- # Close out the tasks entirely (Won't raise):\n- if tasks:\n- self.logger.debug(\"Waiting for tasks to complete ...\")\n- await asyncio.wait(tasks)\n-\n- # Lastly, close the stream itself. (*May raise*!):\n- await self._bh_close_stream(error_pathway)\n- self.logger.debug(\"Disconnected.\")\n-\n- @bottom_half\n- async def _bh_flush_writer(self) -> None:\n- if not self._writer_task:\n- return\n-\n- self.logger.debug(\"Draining the outbound queue ...\")\n- await self._outgoing.join()\n- if self._writer is not None:\n- self.logger.debug(\"Flushing the StreamWriter ...\")\n- await flush(self._writer)\n-\n- @bottom_half\n- async def _bh_close_stream(self, error_pathway: bool = False) -> None:\n- # NB: Closing the writer also implicitly closes the reader.\n- if not self._writer:\n- return\n-\n- if not self._writer.is_closing():\n- self.logger.debug(\"Closing StreamWriter.\")\n- self._writer.close()\n-\n- self.logger.debug(\"Waiting for StreamWriter to close ...\")\n- try:\n- await self._writer.wait_closed()\n- except Exception: # pylint: disable=broad-except\n- # It's hard to tell if the Stream is already closed or\n- # not. Even if one of the tasks has failed, it may have\n- # failed for a higher-layered protocol reason. The\n- # stream could still be open and perfectly fine.\n- # I don't know how to discern its health here.\n-\n- if error_pathway:\n- # We already know that *something* went wrong. Let's\n- # just trust that the Exception we already have is the\n- # better one to present to the user, even if we don't\n- # genuinely *know* the relationship between the two.\n- self.logger.debug(\n- \"Discarding Exception from wait_closed:\\n%s\\n\",\n- pretty_traceback(),\n- )\n- else:\n- # Oops, this is a brand-new error!\n- raise\n- finally:\n- self.logger.debug(\"StreamWriter closed.\")\n-\n- @bottom_half\n- async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:\n- \"\"\"\n- Run one of the bottom-half methods in a loop forever.\n-\n- If the bottom half ever raises any exception, schedule a\n- disconnect that will terminate the entire loop.\n-\n- :param async_fn: The bottom-half method to run in a loop.\n- :param name: The name of this task, used for logging.\n- \"\"\"\n- try:\n- while True:\n- await async_fn()\n- except asyncio.CancelledError:\n- # We have been cancelled by _bh_disconnect, exit gracefully.\n- self.logger.debug(\"Task.%s: cancelled.\", name)\n- return\n- except BaseException as err:\n- self.logger.log(\n- logging.INFO if isinstance(err, EOFError) else logging.ERROR,\n- \"Task.%s: %s\",\n- name, exception_summary(err)\n- )\n- self.logger.debug(\"Task.%s: failure:\\n%s\\n\",\n- name, pretty_traceback())\n- self._schedule_disconnect()\n- raise\n- finally:\n- self.logger.debug(\"Task.%s: exiting.\", name)\n-\n- @bottom_half\n- async def _bh_send_message(self) -> None:\n- \"\"\"\n- Wait for an outgoing message, then send it.\n-\n- Designed to be run in `_bh_loop_forever()`.\n- \"\"\"\n- msg = await self._outgoing.get()\n- try:\n- await self._send(msg)\n- finally:\n- self._outgoing.task_done()\n-\n- @bottom_half\n- async def _bh_recv_message(self) -> None:\n- \"\"\"\n- Wait for an incoming message and call `_on_message` to route it.\n-\n- Designed to be run in `_bh_loop_forever()`.\n- \"\"\"\n- msg = await self._recv()\n- await self._on_message(msg)\n-\n- # --------------------\n- # Section: Message I/O\n- # --------------------\n-\n- @upper_half\n- @bottom_half\n- def _cb_outbound(self, msg: T) -> T:\n- \"\"\"\n- Callback: outbound message hook.\n-\n- This is intended for subclasses to be able to add arbitrary\n- hooks to filter or manipulate outgoing messages. The base\n- implementation does nothing but log the message without any\n- manipulation of the message.\n-\n- :param msg: raw outbound message\n- :return: final outbound message\n- \"\"\"\n- self.logger.debug(\"--> %s\", str(msg))\n- return msg\n-\n- @upper_half\n- @bottom_half\n- def _cb_inbound(self, msg: T) -> T:\n- \"\"\"\n- Callback: inbound message hook.\n-\n- This is intended for subclasses to be able to add arbitrary\n- hooks to filter or manipulate incoming messages. The base\n- implementation does nothing but log the message without any\n- manipulation of the message.\n-\n- This method does not \"handle\" incoming messages; it is a filter.\n- The actual \"endpoint\" for incoming messages is `_on_message()`.\n-\n- :param msg: raw inbound message\n- :return: processed inbound message\n- \"\"\"\n- self.logger.debug(\"<-- %s\", str(msg))\n- return msg\n-\n- @upper_half\n- @bottom_half\n- async def _readline(self) -> bytes:\n- \"\"\"\n- Wait for a newline from the incoming reader.\n-\n- This method is provided as a convenience for upper-layer\n- protocols, as many are line-based.\n-\n- This method *may* return a sequence of bytes without a trailing\n- newline if EOF occurs, but *some* bytes were received. In this\n- case, the next call will raise `EOFError`. It is assumed that\n- the layer 5 protocol will decide if there is anything meaningful\n- to be done with a partial message.\n-\n- :raise OSError: For stream-related errors.\n- :raise EOFError:\n- If the reader stream is at EOF and there are no bytes to return.\n- :return: bytes, including the newline.\n- \"\"\"\n- assert self._reader is not None\n- msg_bytes = await self._reader.readline()\n-\n- if not msg_bytes:\n- if self._reader.at_eof():\n- raise EOFError\n-\n- return msg_bytes\n-\n- @upper_half\n- @bottom_half\n- async def _do_recv(self) -> T:\n- \"\"\"\n- Abstract: Read from the stream and return a message.\n-\n- Very low-level; intended to only be called by `_recv()`.\n- \"\"\"\n- raise NotImplementedError\n-\n- @upper_half\n- @bottom_half\n- async def _recv(self) -> T:\n- \"\"\"\n- Read an arbitrary protocol message.\n-\n- .. warning::\n- This method is intended primarily for `_bh_recv_message()`\n- to use in an asynchronous task loop. Using it outside of\n- this loop will \"steal\" messages from the normal routing\n- mechanism. It is safe to use prior to `_establish_session()`,\n- but should not be used otherwise.\n-\n- This method uses `_do_recv()` to retrieve the raw message, and\n- then transforms it using `_cb_inbound()`.\n-\n- :return: A single (filtered, processed) protocol message.\n- \"\"\"\n- message = await self._do_recv()\n- return self._cb_inbound(message)\n-\n- @upper_half\n- @bottom_half\n- def _do_send(self, msg: T) -> None:\n- \"\"\"\n- Abstract: Write a message to the stream.\n-\n- Very low-level; intended to only be called by `_send()`.\n- \"\"\"\n- raise NotImplementedError\n-\n- @upper_half\n- @bottom_half\n- async def _send(self, msg: T) -> None:\n- \"\"\"\n- Send an arbitrary protocol message.\n-\n- This method will transform any outgoing messages according to\n- `_cb_outbound()`.\n-\n- .. warning::\n- Like `_recv()`, this method is intended to be called by\n- the writer task loop that processes outgoing\n- messages. Calling it directly may circumvent logic\n- implemented by the caller meant to correlate outgoing and\n- incoming messages.\n-\n- :raise OSError: For problems with the underlying stream.\n- \"\"\"\n- msg = self._cb_outbound(msg)\n- self._do_send(msg)\n-\n- @bottom_half\n- async def _on_message(self, msg: T) -> None:\n- \"\"\"\n- Called to handle the receipt of a new message.\n-\n- .. caution::\n- This is executed from within the reader loop, so be advised\n- that waiting on either the reader or writer task will lead\n- to deadlock. Additionally, any unhandled exceptions will\n- directly cause the loop to halt, so logic may be best-kept\n- to a minimum if at all possible.\n-\n- :param msg: The incoming message, already logged/filtered.\n- \"\"\"\n- # Nothing to do in the abstract case.\ndiff --git a/python/qemu/qmp/py.typed b/python/qemu/qmp/py.typed\ndeleted file mode 100644\nindex e69de29bb2d..00000000000\ndiff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py\ndeleted file mode 100644\nindex 8beccfe29d3..00000000000\n--- a/python/qemu/qmp/qmp_client.py\n+++ /dev/null\n@@ -1,732 +0,0 @@\n-\"\"\"\n-QMP Protocol Implementation\n-\n-This module provides the `QMPClient` class, which can be used to connect\n-and send commands to a QMP server such as QEMU. The QMP class can be\n-used to either connect to a listening server, or used to listen and\n-accept an incoming connection from that server.\n-\"\"\"\n-\n-import asyncio\n-import logging\n-import socket\n-import struct\n-from typing import (\n- Dict,\n- List,\n- Mapping,\n- Optional,\n- Union,\n- cast,\n-)\n-\n-from .error import ProtocolError, QMPError\n-from .events import Events\n-from .message import Message\n-from .models import ErrorResponse, Greeting\n-from .protocol import AsyncProtocol, Runstate, require\n-from .util import (\n- bottom_half,\n- exception_summary,\n- pretty_traceback,\n- upper_half,\n-)\n-\n-\n-class _WrappedProtocolError(ProtocolError):\n- \"\"\"\n- Abstract exception class for Protocol errors that wrap an Exception.\n-\n- :param error_message: Human-readable string describing the error.\n- :param exc: The root-cause exception.\n- \"\"\"\n- def __init__(self, error_message: str, exc: Exception):\n- super().__init__(error_message, exc)\n- self.exc = exc\n-\n- def __str__(self) -> str:\n- return f\"{self.error_message}: {self.exc!s}\"\n-\n-\n-class GreetingError(_WrappedProtocolError):\n- \"\"\"\n- An exception occurred during the Greeting phase.\n-\n- :param error_message: Human-readable string describing the error.\n- :param exc: The root-cause exception.\n- \"\"\"\n-\n-\n-class NegotiationError(_WrappedProtocolError):\n- \"\"\"\n- An exception occurred during the Negotiation phase.\n-\n- :param error_message: Human-readable string describing the error.\n- :param exc: The root-cause exception.\n- \"\"\"\n-\n-\n-class ExecuteError(QMPError):\n- \"\"\"\n- Exception raised by `QMPClient.execute()` on RPC failure.\n-\n- This exception is raised when the server received, interpreted, and\n- replied to a command successfully; but the command itself returned a\n- failure status.\n-\n- For example::\n-\n- await qmp.execute('block-dirty-bitmap-add',\n- {'node': 'foo', 'name': 'my_bitmap'})\n- # qemu.qmp.qmp_client.ExecuteError:\n- # Cannot find device='foo' nor node-name='foo'\n-\n- :param error_response: The RPC error response object.\n- :param sent: The sent RPC message that caused the failure.\n- :param received: The raw RPC error reply received.\n- \"\"\"\n- def __init__(self, error_response: ErrorResponse,\n- sent: Message, received: Message):\n- super().__init__(error_response, sent, received)\n- #: The sent `Message` that caused the failure\n- self.sent: Message = sent\n- #: The received `Message` that indicated failure\n- self.received: Message = received\n- #: The parsed error response\n- self.error: ErrorResponse = error_response\n-\n- @property\n- def error_class(self) -> str:\n- \"\"\"The QMP error class\"\"\"\n- return self.error.error.class_\n-\n- def __str__(self) -> str:\n- return self.error.error.desc\n-\n-\n-class ExecInterruptedError(QMPError):\n- \"\"\"\n- Exception raised by `execute()` (et al) when an RPC is interrupted.\n-\n- This error is raised when an `execute()` statement could not be\n- completed. This can occur because the connection itself was\n- terminated before a reply was received. The true cause of the\n- interruption will be available via `disconnect()`.\n-\n- The QMP protocol does not make it possible to know if a command\n- succeeded or failed after such an event; the client will need to\n- query the server to determine the state of the server on a\n- case-by-case basis.\n-\n- For example, ECONNRESET might look like this::\n-\n- try:\n- await qmp.execute('query-block')\n- # ExecInterruptedError: Disconnected\n- except ExecInterruptedError:\n- await qmp.disconnect()\n- # ConnectionResetError: [Errno 104] Connection reset by peer\n- \"\"\"\n-\n-\n-class _MsgProtocolError(ProtocolError):\n- \"\"\"\n- Abstract error class for protocol errors that have a `Message` object.\n-\n- This Exception class is used for protocol errors where the `Message`\n- was mechanically understood, but was found to be inappropriate or\n- malformed.\n-\n- :param error_message: Human-readable string describing the error.\n- :param msg: The QMP `Message` that caused the error.\n- \"\"\"\n- def __init__(self, error_message: str, msg: Message, *args: object):\n- super().__init__(error_message, msg, *args)\n- #: The received `Message` that caused the error.\n- self.msg: Message = msg\n-\n- def __str__(self) -> str:\n- return \"\\n\".join([\n- super().__str__(),\n- f\" Message was: {str(self.msg)}\\n\",\n- ])\n-\n-\n-class ServerParseError(_MsgProtocolError):\n- \"\"\"\n- The Server sent a `Message` indicating parsing failure.\n-\n- i.e. A reply has arrived from the server, but it is missing the \"ID\"\n- field, indicating a parsing error.\n-\n- :param error_message: Human-readable string describing the error.\n- :param msg: The QMP `Message` that caused the error.\n- \"\"\"\n-\n-\n-class BadReplyError(_MsgProtocolError):\n- \"\"\"\n- An execution reply was successfully routed, but not understood.\n-\n- If a QMP message is received with an 'id' field to allow it to be\n- routed, but is otherwise malformed, this exception will be raised.\n-\n- A reply message is malformed if it is missing either the 'return' or\n- 'error' keys, or if the 'error' value has missing keys or members of\n- the wrong type.\n-\n- :param error_message: Human-readable string describing the error.\n- :param msg: The malformed reply that was received.\n- :param sent: The message that was sent that prompted the error.\n- \"\"\"\n- def __init__(self, error_message: str, msg: Message, sent: Message):\n- super().__init__(error_message, msg, sent)\n- #: The sent `Message` that caused the failure\n- self.sent = sent\n-\n-\n-class QMPClient(AsyncProtocol[Message], Events):\n- \"\"\"Implements a QMP client connection.\n-\n- `QMPClient` can be used to either connect or listen to a QMP server,\n- but always acts as the QMP client.\n-\n- :param name:\n- Optional nickname for the connection, used to differentiate\n- instances when logging.\n-\n- :param readbuflen:\n- The maximum buffer length for reads and writes to and from the QMP\n- server, in bytes. Default is 10MB. If `QMPClient` is used to\n- connect to a guest agent to transfer files via ``guest-file-read``/\n- ``guest-file-write``, increasing this value may be required.\n-\n- Basic script-style usage looks like this::\n-\n- import asyncio\n- from qemu.qmp import QMPClient\n-\n- async def main():\n- qmp = QMPClient('my_virtual_machine_name')\n- await qmp.connect(('127.0.0.1', 1234))\n- ...\n- res = await qmp.execute('query-block')\n- ...\n- await qmp.disconnect()\n-\n- asyncio.run(main())\n-\n- A more advanced example that starts to take advantage of asyncio\n- might look like this::\n-\n- class Client:\n- def __init__(self, name: str):\n- self.qmp = QMPClient(name)\n-\n- async def watch_events(self):\n- try:\n- async for event in self.qmp.events:\n- print(f\"Event: {event['event']}\")\n- except asyncio.CancelledError:\n- return\n-\n- async def run(self, address='/tmp/qemu.socket'):\n- await self.qmp.connect(address)\n- asyncio.create_task(self.watch_events())\n- await self.qmp.runstate_changed.wait()\n- await self.disconnect()\n-\n- See `qmp.events` for more detail on event handling patterns.\n-\n- \"\"\"\n- #: Logger object used for debugging messages.\n- logger = logging.getLogger(__name__)\n-\n- # Read buffer default limit; 10MB like libvirt default\n- _readbuflen = 10 * 1024 * 1024\n-\n- # Type alias for pending execute() result items\n- _PendingT = Union[Message, ExecInterruptedError]\n-\n- def __init__(\n- self,\n- name: Optional[str] = None,\n- readbuflen: int = _readbuflen\n- ) -> None:\n- super().__init__(name, readbuflen)\n- Events.__init__(self)\n-\n- #: Whether or not to await a greeting after establishing a connection.\n- #: Defaults to True; QGA servers expect this to be False.\n- self.await_greeting: bool = True\n-\n- #: Whether or not to perform capabilities negotiation upon\n- #: connection. Implies `await_greeting`. Defaults to True; QGA\n- #: servers expect this to be False.\n- self.negotiate: bool = True\n-\n- # Cached Greeting, if one was awaited.\n- self._greeting: Optional[Greeting] = None\n-\n- # Command ID counter\n- self._execute_id = 0\n-\n- # Incoming RPC reply messages.\n- self._pending: Dict[\n- Union[str, None],\n- 'asyncio.Queue[QMPClient._PendingT]'\n- ] = {}\n-\n- @property\n- def greeting(self) -> Optional[Greeting]:\n- \"\"\"\n- The `Greeting` from the QMP server, if any.\n-\n- Defaults to ``None``, and will be set after a greeting is\n- received during the connection process. It is reset at the start\n- of each connection attempt.\n- \"\"\"\n- return self._greeting\n-\n- @upper_half\n- async def _establish_session(self) -> None:\n- \"\"\"\n- Initiate the QMP session.\n-\n- Wait for the QMP greeting and perform capabilities negotiation.\n-\n- :raise GreetingError: When the greeting is not understood.\n- :raise NegotiationError: If the negotiation fails.\n- :raise EOFError: When the server unexpectedly hangs up.\n- :raise OSError: For underlying stream errors.\n- \"\"\"\n- self._greeting = None\n- self._pending = {}\n-\n- if self.await_greeting or self.negotiate:\n- self._greeting = await self._get_greeting()\n-\n- if self.negotiate:\n- await self._negotiate()\n-\n- # This will start the reader/writers:\n- await super()._establish_session()\n-\n- @upper_half\n- async def _get_greeting(self) -> Greeting:\n- \"\"\"\n- :raise GreetingError: When the greeting is not understood.\n- :raise EOFError: When the server unexpectedly hangs up.\n- :raise OSError: For underlying stream errors.\n-\n- :return: the Greeting object given by the server.\n- \"\"\"\n- self.logger.debug(\"Awaiting greeting ...\")\n-\n- try:\n- msg = await self._recv()\n- return Greeting(msg)\n- except (ProtocolError, KeyError, TypeError) as err:\n- emsg = \"Did not understand Greeting\"\n- self.logger.error(\"%s: %s\", emsg, exception_summary(err))\n- self.logger.debug(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- raise GreetingError(emsg, err) from err\n- except BaseException as err:\n- # EOFError, OSError, or something unexpected.\n- emsg = \"Failed to receive Greeting\"\n- self.logger.error(\"%s: %s\", emsg, exception_summary(err))\n- self.logger.debug(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- raise\n-\n- @upper_half\n- async def _negotiate(self) -> None:\n- \"\"\"\n- Perform QMP capabilities negotiation.\n-\n- :raise NegotiationError: When negotiation fails.\n- :raise EOFError: When the server unexpectedly hangs up.\n- :raise OSError: For underlying stream errors.\n- \"\"\"\n- self.logger.debug(\"Negotiating capabilities ...\")\n-\n- arguments: Dict[str, List[str]] = {}\n- if self._greeting and 'oob' in self._greeting.QMP.capabilities:\n- arguments.setdefault('enable', []).append('oob')\n- msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)\n-\n- # It's not safe to use execute() here, because the reader/writers\n- # aren't running. AsyncProtocol *requires* that a new session\n- # does not fail after the reader/writers are running!\n- try:\n- await self._send(msg)\n- reply = await self._recv()\n- assert 'return' in reply\n- assert 'error' not in reply\n- except (ProtocolError, AssertionError) as err:\n- emsg = \"Negotiation failed\"\n- self.logger.error(\"%s: %s\", emsg, exception_summary(err))\n- self.logger.debug(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- raise NegotiationError(emsg, err) from err\n- except BaseException as err:\n- # EOFError, OSError, or something unexpected.\n- emsg = \"Negotiation failed\"\n- self.logger.error(\"%s: %s\", emsg, exception_summary(err))\n- self.logger.debug(\"%s:\\n%s\\n\", emsg, pretty_traceback())\n- raise\n-\n- @bottom_half\n- async def _bh_disconnect(self) -> None:\n- try:\n- await super()._bh_disconnect()\n- finally:\n- if self._pending:\n- self.logger.debug(\"Cancelling pending executions\")\n- keys = self._pending.keys()\n- for key in keys:\n- self.logger.debug(\"Cancelling execution '%s'\", key)\n- self._pending[key].put_nowait(\n- ExecInterruptedError(\"Disconnected\")\n- )\n-\n- self.logger.debug(\"QMP Disconnected.\")\n-\n- @upper_half\n- def _cleanup(self) -> None:\n- super()._cleanup()\n- assert not self._pending\n-\n- @bottom_half\n- async def _on_message(self, msg: Message) -> None:\n- \"\"\"\n- Add an incoming message to the appropriate queue/handler.\n-\n- :raise ServerParseError: When Message indicates server parse failure.\n- \"\"\"\n- # Incoming messages are not fully parsed/validated here;\n- # do only light peeking to know how to route the messages.\n-\n- if 'event' in msg:\n- await self._event_dispatch(msg)\n- return\n-\n- # Below, we assume everything left is an execute/exec-oob response.\n-\n- exec_id = cast(Optional[str], msg.get('id'))\n-\n- if exec_id in self._pending:\n- await self._pending[exec_id].put(msg)\n- return\n-\n- # We have a message we can't route back to a caller.\n-\n- is_error = 'error' in msg\n- has_id = 'id' in msg\n-\n- if is_error and not has_id:\n- # This is very likely a server parsing error.\n- # It doesn't inherently belong to any pending execution.\n- # Instead of performing clever recovery, just terminate.\n- # See \"NOTE\" in interop/qmp-spec, \"Error\" section.\n- raise ServerParseError(\n- (\"Server sent an error response without an ID, \"\n- \"but there are no ID-less executions pending. \"\n- \"Assuming this is a server parser failure.\"),\n- msg\n- )\n-\n- # qmp-spec.rst, \"Commands Responses\" section:\n- # 'Clients should drop all the responses\n- # that have an unknown \"id\" field.'\n- self.logger.log(\n- logging.ERROR if is_error else logging.WARNING,\n- \"Unknown ID '%s', message dropped.\",\n- exec_id,\n- )\n- self.logger.debug(\"Unroutable message: %s\", str(msg))\n-\n- @upper_half\n- @bottom_half\n- async def _do_recv(self) -> Message:\n- \"\"\"\n- :raise OSError: When a stream error is encountered.\n- :raise EOFError: When the stream is at EOF.\n- :raise ProtocolError:\n- When the Message is not understood.\n- See also `Message._deserialize`.\n-\n- :return: A single QMP `Message`.\n- \"\"\"\n- msg_bytes = await self._readline()\n- msg = Message(msg_bytes, eager=True)\n- return msg\n-\n- @upper_half\n- @bottom_half\n- def _do_send(self, msg: Message) -> None:\n- \"\"\"\n- :raise ValueError: JSON serialization failure\n- :raise TypeError: JSON serialization failure\n- :raise OSError: When a stream error is encountered.\n- \"\"\"\n- assert self._writer is not None\n- self._writer.write(bytes(msg))\n-\n- @upper_half\n- def _get_exec_id(self) -> str:\n- exec_id = f\"__qmp#{self._execute_id:05d}\"\n- self._execute_id += 1\n- return exec_id\n-\n- @upper_half\n- async def _issue(self, msg: Message) -> Union[None, str]:\n- \"\"\"\n- Issue a QMP `Message` and do not wait for a reply.\n-\n- :param msg: The QMP `Message` to send to the server.\n-\n- :return: The ID of the `Message` sent.\n- \"\"\"\n- msg_id: Optional[str] = None\n- if 'id' in msg:\n- assert isinstance(msg['id'], str)\n- msg_id = msg['id']\n-\n- self._pending[msg_id] = asyncio.Queue(maxsize=1)\n- try:\n- await self._outgoing.put(msg)\n- except:\n- del self._pending[msg_id]\n- raise\n-\n- return msg_id\n-\n- @upper_half\n- async def _reply(self, msg_id: Union[str, None]) -> Message:\n- \"\"\"\n- Await a reply to a previously issued QMP message.\n-\n- :param msg_id: The ID of the previously issued message.\n-\n- :return: The reply from the server.\n- :raise ExecInterruptedError:\n- When the reply could not be retrieved because the connection\n- was lost, or some other problem.\n- \"\"\"\n- queue = self._pending[msg_id]\n-\n- try:\n- result = await queue.get()\n- if isinstance(result, ExecInterruptedError):\n- raise result\n- return result\n- finally:\n- del self._pending[msg_id]\n-\n- @upper_half\n- async def _execute(self, msg: Message, assign_id: bool = True) -> Message:\n- \"\"\"\n- Send a QMP `Message` to the server and await a reply.\n-\n- This method *assumes* you are sending some kind of an execute\n- statement that *will* receive a reply.\n-\n- An execution ID will be assigned if assign_id is `True`. It can be\n- disabled, but this requires that an ID is manually assigned\n- instead. For manually assigned IDs, you must not use the string\n- '__qmp#' anywhere in the ID.\n-\n- :param msg: The QMP `Message` to execute.\n- :param assign_id: If True, assign a new execution ID.\n-\n- :return: Execution reply from the server.\n- :raise ExecInterruptedError:\n- When the reply could not be retrieved because the connection\n- was lost, or some other problem.\n- \"\"\"\n- if assign_id:\n- msg['id'] = self._get_exec_id()\n- elif 'id' in msg:\n- assert isinstance(msg['id'], str)\n- assert '__qmp#' not in msg['id']\n-\n- exec_id = await self._issue(msg)\n- return await self._reply(exec_id)\n-\n- @upper_half\n- @require(Runstate.RUNNING)\n- async def _raw(\n- self,\n- msg: Union[Message, Mapping[str, object], bytes],\n- assign_id: bool = True,\n- ) -> Message:\n- \"\"\"\n- Issue a raw `Message` to the QMP server and await a reply.\n-\n- :param msg:\n- A Message to send to the server. It may be a `Message`, any\n- Mapping (including Dict), or raw bytes.\n- :param assign_id:\n- Assign an arbitrary execution ID to this message. If\n- `False`, the existing id must either be absent (and no other\n- such pending execution may omit an ID) or a string. If it is\n- a string, it must not start with '__qmp#' and no other such\n- pending execution may currently be using that ID.\n-\n- :return: Execution reply from the server.\n-\n- :raise ExecInterruptedError:\n- When the reply could not be retrieved because the connection\n- was lost, or some other problem.\n- :raise TypeError:\n- When assign_id is `False`, an ID is given, and it is not a string.\n- :raise ValueError:\n- When assign_id is `False`, but the ID is not usable;\n- Either because it starts with '__qmp#' or it is already in-use.\n- \"\"\"\n- # 1. convert generic Mapping or bytes to a QMP Message\n- # 2. copy Message objects so that we assign an ID only to the copy.\n- msg = Message(msg)\n-\n- exec_id = msg.get('id')\n- if not assign_id and 'id' in msg:\n- if not isinstance(exec_id, str):\n- raise TypeError(f\"ID ('{exec_id}') must be a string.\")\n- if exec_id.startswith('__qmp#'):\n- raise ValueError(\n- f\"ID ('{exec_id}') must not start with '__qmp#'.\"\n- )\n-\n- if not assign_id and exec_id in self._pending:\n- raise ValueError(\n- f\"ID '{exec_id}' is in-use and cannot be used.\"\n- )\n-\n- return await self._execute(msg, assign_id=assign_id)\n-\n- @upper_half\n- @require(Runstate.RUNNING)\n- async def execute_msg(self, msg: Message) -> object:\n- \"\"\"\n- Execute a QMP command on the server and return its value.\n-\n- :param msg: The QMP `Message` to execute.\n-\n- :return:\n- The command execution return value from the server. The type of\n- object returned depends on the command that was issued,\n- though most in QEMU return a `dict`.\n- :raise ValueError:\n- If the QMP `Message` does not have either the 'execute' or\n- 'exec-oob' fields set.\n- :raise ExecuteError: When the server returns an error response.\n- :raise ExecInterruptedError:\n- If the connection was disrupted before\n- receiving a reply from the server.\n- \"\"\"\n- if not ('execute' in msg or 'exec-oob' in msg):\n- raise ValueError(\"Requires 'execute' or 'exec-oob' message\")\n-\n- # Copy the Message so that the ID assigned by _execute() is\n- # local to this method; allowing the ID to be seen in raised\n- # Exceptions but without modifying the caller's held copy.\n- msg = Message(msg)\n- reply = await self._execute(msg)\n-\n- if 'error' in reply:\n- try:\n- error_response = ErrorResponse(reply)\n- except (KeyError, TypeError) as err:\n- # Error response was malformed.\n- raise BadReplyError(\n- \"QMP error reply is malformed\", reply, msg,\n- ) from err\n-\n- raise ExecuteError(error_response, msg, reply)\n-\n- if 'return' not in reply:\n- raise BadReplyError(\n- \"QMP reply is missing a 'error' or 'return' member\",\n- reply, msg,\n- )\n-\n- return reply['return']\n-\n- @classmethod\n- def make_execute_msg(cls, cmd: str,\n- arguments: Optional[Mapping[str, object]] = None,\n- oob: bool = False) -> Message:\n- \"\"\"\n- Create an executable message to be sent by `execute_msg` later.\n-\n- :param cmd: QMP command name.\n- :param arguments: Arguments (if any). Must be JSON-serializable.\n- :param oob:\n- If `True`, execute \"out of band\". See `interop/qmp-spec`\n- section \"Out-of-band execution\".\n-\n- :return: A QMP `Message` that can be executed with `execute_msg()`.\n- \"\"\"\n- msg = Message({'exec-oob' if oob else 'execute': cmd})\n- if arguments is not None:\n- msg['arguments'] = arguments\n- return msg\n-\n- @upper_half\n- async def execute(self, cmd: str,\n- arguments: Optional[Mapping[str, object]] = None,\n- oob: bool = False) -> object:\n- \"\"\"\n- Execute a QMP command on the server and return its value.\n-\n- :param cmd: QMP command name.\n- :param arguments: Arguments (if any). Must be JSON-serializable.\n- :param oob:\n- If `True`, execute \"out of band\". See `interop/qmp-spec`\n- section \"Out-of-band execution\".\n-\n- :return:\n- The command execution return value from the server. The type of\n- object returned depends on the command that was issued,\n- though most in QEMU return a `dict`.\n- :raise ExecuteError: When the server returns an error response.\n- :raise ExecInterruptedError:\n- If the connection was disrupted before\n- receiving a reply from the server.\n- \"\"\"\n- msg = self.make_execute_msg(cmd, arguments, oob=oob)\n- return await self.execute_msg(msg)\n-\n- @upper_half\n- @require(Runstate.RUNNING)\n- def send_fd_scm(self, fd: int) -> None:\n- \"\"\"Send a file descriptor to the remote via SCM_RIGHTS.\n-\n- This method does not close the file descriptor.\n-\n- :param fd: The file descriptor to send to QEMU.\n-\n- This is an advanced feature of QEMU where file descriptors can\n- be passed from client to server. This is usually used as a\n- security measure to isolate the QEMU process from being able to\n- open its own files. See the QMP commands ``getfd`` and\n- ``add-fd`` for more information.\n-\n- See `socket.socket.sendmsg` for more information on the Python\n- implementation for sending file descriptors over a UNIX socket.\n- \"\"\"\n- assert self._writer is not None\n- sock = self._writer.transport.get_extra_info('socket')\n-\n- if sock.family != socket.AF_UNIX:\n- raise QMPError(\"Sending file descriptors requires a UNIX socket.\")\n-\n- if not hasattr(sock, 'sendmsg'):\n- # We need to void the warranty sticker.\n- # Access to sendmsg is scheduled for removal in Python 3.11.\n- # Find the real backing socket to use it anyway.\n- sock = sock._sock # pylint: disable=protected-access\n-\n- sock.sendmsg(\n- [b' '],\n- [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]\n- )\ndiff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py\ndeleted file mode 100644\nindex f8188005685..00000000000\n--- a/python/qemu/qmp/qmp_shell.py\n+++ /dev/null\n@@ -1,689 +0,0 @@\n-#\n-# Copyright (C) 2009-2022 Red Hat Inc.\n-#\n-# Authors:\n-# Luiz Capitulino <lcapitulino@redhat.com>\n-# John Snow <jsnow@redhat.com>\n-#\n-# This work is licensed under the terms of the GNU LGPL, version 2 or\n-# later. See the COPYING file in the top-level directory.\n-#\n-\n-\"\"\"\n-qmp-shell - An interactive QEMU shell powered by QMP\n-\n-qmp-shell offers a simple shell with a convenient shorthand syntax as an\n-alternative to typing JSON by hand. This syntax is not standardized and\n-is not meant to be used as a scriptable interface. This shorthand *may*\n-change incompatibly in the future, and it is strongly encouraged to use\n-the QMP library to provide API-stable scripting when needed.\n-\n-usage: qmp-shell [-h] [-H] [-v] [-p] [-l LOGFILE] [-N] qmp_server\n-\n-positional arguments:\n- qmp_server < UNIX socket path | TCP address:port >\n-\n-optional arguments:\n- -h, --help show this help message and exit\n- -H, --hmp Use HMP interface\n- -v, --verbose Verbose (echo commands sent and received)\n- -p, --pretty Pretty-print JSON\n- -l LOGFILE, --logfile LOGFILE\n- Save log of all QMP messages to PATH\n- -N, --skip-negotiation\n- Skip negotiate (for qemu-ga)\n-\n-Usage\n------\n-\n-First, start QEMU with::\n-\n- > qemu [...] -qmp unix:./qmp-sock,server=on[,wait=off]\n-\n-Then run the shell, passing the address of the socket::\n-\n- > qmp-shell ./qmp-sock\n-\n-Syntax\n-------\n-\n-Commands have the following format::\n-\n- < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]\n-\n-For example, to add a network device::\n-\n- (QEMU) device_add driver=e1000 id=net1\n- {'return': {}}\n- (QEMU)\n-\n-key=value pairs support either Python or JSON object literal notations,\n-**without spaces**. Dictionaries/objects ``{}`` are supported, as are\n-arrays ``[]``::\n-\n- example-command arg-name1={'key':'value','obj'={'prop':\"value\"}}\n-\n-Either JSON or Python formatting for compound values works, including\n-both styles of string literal quotes (either single or double\n-quotes). Both paradigms of literal values are accepted, including\n-``null/true/false`` for JSON and ``None/True/False`` for Python.\n-\n-Transactions\n-------------\n-\n-Transactions have the following multi-line format::\n-\n- transaction(\n- action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]\n- ...\n- action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]\n- )\n-\n-One line transactions are also supported::\n-\n- transaction( action-name1 ... )\n-\n-For example::\n-\n- (QEMU) transaction(\n- TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1\n- TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0\n- TRANS> )\n- {\"return\": {}}\n- (QEMU)\n-\n-Commands\n---------\n-\n-Autocomplete of command names using <tab> is supported. Pressing <tab>\n-at a blank CLI prompt will show you a list of all available commands\n-that the connected QEMU instance supports.\n-\n-For documentation on QMP commands and their arguments, please see\n-`qmp ref`.\n-\n-Events\n-------\n-\n-qmp-shell will display events received from the server, but this version\n-does not do so asynchronously. To check for new events from the server,\n-press <enter> on a blank line::\n-\n- (QEMU) ⏎\n- {'timestamp': {'seconds': 1660071944, 'microseconds': 184667},\n- 'event': 'STOP'}\n-\n-Display options\n----------------\n-\n-Use the -v and -p options to activate the verbose and pretty-print\n-options, which will echo back the properly formatted JSON-compliant QMP\n-that is being sent to QEMU. This is useful for debugging to see the\n-wire-level QMP data being exchanged, and generating output for use in\n-writing documentation for QEMU.\n-\"\"\"\n-\n-import argparse\n-import ast\n-import json\n-import logging\n-import os\n-import re\n-import readline\n-from subprocess import Popen\n-import sys\n-from typing import (\n- IO,\n- Dict,\n- Iterator,\n- List,\n- NoReturn,\n- Optional,\n- Sequence,\n- cast,\n-)\n-\n-from qemu.qmp import (\n- ConnectError,\n- ExecuteError,\n- QMPError,\n- SocketAddrT,\n-)\n-from qemu.qmp.legacy import (\n- QEMUMonitorProtocol,\n- QMPBadPortError,\n- QMPMessage,\n- QMPObject,\n-)\n-\n-\n-LOG = logging.getLogger(__name__)\n-\n-\n-class QMPCompleter:\n- \"\"\"\n- QMPCompleter provides a readline library tab-complete behavior.\n- \"\"\"\n- # NB: Python 3.9+ will probably allow us to subclass list[str] directly,\n- # but pylint as of today does not know that List[str] is simply 'list'.\n- def __init__(self) -> None:\n- self._matches: List[str] = []\n-\n- def append(self, value: str) -> None:\n- \"\"\"Append a new valid completion to the list of possibilities.\"\"\"\n- return self._matches.append(value)\n-\n- def complete(self, text: str, state: int) -> Optional[str]:\n- \"\"\"readline.set_completer() callback implementation.\"\"\"\n- for cmd in self._matches:\n- if cmd.startswith(text):\n- if state == 0:\n- return cmd\n- state -= 1\n- return None\n-\n-\n-class QMPShellError(QMPError):\n- \"\"\"\n- QMP Shell Base error class.\n- \"\"\"\n-\n-\n-class FuzzyJSON(ast.NodeTransformer):\n- \"\"\"\n- This extension of ast.NodeTransformer filters literal \"true/false/null\"\n- values in a Python AST and replaces them by proper \"True/False/None\" values\n- that Python can properly evaluate.\n- \"\"\"\n-\n- @classmethod\n- def visit_Name(cls, # pylint: disable=invalid-name\n- node: ast.Name) -> ast.AST:\n- \"\"\"\n- Transform Name nodes with certain values into Constant (keyword) nodes.\n- \"\"\"\n- if node.id == 'true':\n- return ast.Constant(value=True)\n- if node.id == 'false':\n- return ast.Constant(value=False)\n- if node.id == 'null':\n- return ast.Constant(value=None)\n- return node\n-\n-\n-class QMPShell(QEMUMonitorProtocol):\n- \"\"\"\n- QMPShell provides a basic readline-based QMP shell.\n-\n- :param address: Address of the QMP server.\n- :param pretty: Pretty-print QMP messages.\n- :param verbose: Echo outgoing QMP messages to console.\n- \"\"\"\n- def __init__(self, address: SocketAddrT,\n- pretty: bool = False,\n- verbose: bool = False,\n- server: bool = False,\n- logfile: Optional[str] = None):\n- super().__init__(address, server=server)\n- self._greeting: Optional[QMPMessage] = None\n- self._completer = QMPCompleter()\n- self._transmode = False\n- self._actions: List[QMPMessage] = []\n- self._histfile = os.path.join(os.path.expanduser('~'),\n- '.qmp-shell_history')\n- self.pretty = pretty\n- self.verbose = verbose\n- self.logfile = None\n-\n- if logfile is not None:\n- self.logfile = open(logfile, \"w\", encoding='utf-8')\n-\n- def close(self) -> None:\n- # Hook into context manager of parent to save shell history.\n- self._save_history()\n- super().close()\n-\n- def _fill_completion(self) -> None:\n- try:\n- cmds = cast(List[Dict[str, str]], self.cmd('query-commands'))\n- for cmd in cmds:\n- self._completer.append(cmd['name'])\n- except ExecuteError:\n- pass\n-\n- def _completer_setup(self) -> None:\n- self._completer = QMPCompleter()\n- self._fill_completion()\n- readline.set_history_length(1024)\n- readline.set_completer(self._completer.complete)\n- readline.parse_and_bind(\"tab: complete\")\n- # NB: default delimiters conflict with some command names\n- # (eg. query-), clearing everything as it doesn't seem to matter\n- readline.set_completer_delims('')\n- try:\n- readline.read_history_file(self._histfile)\n- except FileNotFoundError:\n- pass\n- except IOError as err:\n- msg = f\"Failed to read history '{self._histfile}': {err!s}\"\n- LOG.warning(msg)\n-\n- def _save_history(self) -> None:\n- try:\n- readline.write_history_file(self._histfile)\n- except IOError as err:\n- msg = f\"Failed to save history file '{self._histfile}': {err!s}\"\n- LOG.warning(msg)\n-\n- @classmethod\n- def _parse_value(cls, val: str) -> object:\n- try:\n- return int(val)\n- except ValueError:\n- pass\n-\n- if val.lower() == 'true':\n- return True\n- if val.lower() == 'false':\n- return False\n- if val.startswith(('{', '[')):\n- # Try first as pure JSON:\n- try:\n- return json.loads(val)\n- except ValueError:\n- pass\n- # Try once again as FuzzyJSON:\n- try:\n- tree = ast.parse(val, mode='eval')\n- transformed = FuzzyJSON().visit(tree)\n- return ast.literal_eval(transformed)\n- except (SyntaxError, ValueError):\n- pass\n- return val\n-\n- def _cli_expr(self,\n- tokens: Sequence[str],\n- parent: QMPObject) -> None:\n- for arg in tokens:\n- (key, sep, val) = arg.partition('=')\n- if sep != '=':\n- raise QMPShellError(\n- f\"Expected a key=value pair, got '{arg!s}'\"\n- )\n-\n- value = self._parse_value(val)\n- optpath = key.split('.')\n- curpath = []\n- for path in optpath[:-1]:\n- curpath.append(path)\n- obj = parent.get(path, {})\n- if not isinstance(obj, dict):\n- msg = 'Cannot use \"{:s}\" as both leaf and non-leaf key'\n- raise QMPShellError(msg.format('.'.join(curpath)))\n- parent[path] = obj\n- parent = obj\n- if optpath[-1] in parent:\n- if isinstance(parent[optpath[-1]], dict):\n- msg = 'Cannot use \"{:s}\" as both leaf and non-leaf key'\n- raise QMPShellError(msg.format('.'.join(curpath)))\n- raise QMPShellError(f'Cannot set \"{key}\" multiple times')\n- parent[optpath[-1]] = value\n-\n- def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:\n- \"\"\"\n- Build a QMP input object from a user provided command-line in the\n- following format:\n-\n- < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]\n- \"\"\"\n- argument_regex = r'''(?:[^\\s\"']|\"(?:\\\\.|[^\"])*\"|'(?:\\\\.|[^'])*')+'''\n- cmdargs = re.findall(argument_regex, cmdline)\n- qmpcmd: QMPMessage\n-\n- # Transactional CLI entry:\n- if cmdargs and cmdargs[0] == 'transaction(':\n- self._transmode = True\n- self._actions = []\n- cmdargs.pop(0)\n-\n- # Transactional CLI exit:\n- if cmdargs and cmdargs[0] == ')' and self._transmode:\n- self._transmode = False\n- if len(cmdargs) > 1:\n- msg = 'Unexpected input after close of Transaction sub-shell'\n- raise QMPShellError(msg)\n- qmpcmd = {\n- 'execute': 'transaction',\n- 'arguments': {'actions': self._actions}\n- }\n- return qmpcmd\n-\n- # No args, or no args remaining\n- if not cmdargs:\n- return None\n-\n- if self._transmode:\n- # Parse and cache this Transactional Action\n- finalize = False\n- action = {'type': cmdargs[0], 'data': {}}\n- if cmdargs[-1] == ')':\n- cmdargs.pop(-1)\n- finalize = True\n- self._cli_expr(cmdargs[1:], action['data'])\n- self._actions.append(action)\n- return self._build_cmd(')') if finalize else None\n-\n- # Standard command: parse and return it to be executed.\n- qmpcmd = {'execute': cmdargs[0], 'arguments': {}}\n- self._cli_expr(cmdargs[1:], qmpcmd['arguments'])\n- return qmpcmd\n-\n- def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None:\n- jsobj = json.dumps(qmp_message,\n- indent=4 if self.pretty else None,\n- sort_keys=self.pretty)\n- print(str(jsobj), file=fh)\n-\n- def _execute_cmd(self, cmdline: str) -> bool:\n- try:\n- qmpcmd = self._build_cmd(cmdline)\n- except QMPShellError as err:\n- print(\n- f\"Error while parsing command line: {err!s}\\n\"\n- \"command format: <command-name> \"\n- \"[arg-name1=arg1] ... [arg-nameN=argN\",\n- file=sys.stderr\n- )\n- return True\n- # For transaction mode, we may have just cached the action:\n- if qmpcmd is None:\n- return True\n- if self.verbose:\n- self._print(qmpcmd)\n- resp = self.cmd_obj(qmpcmd)\n- if resp is None:\n- print('Disconnected')\n- return False\n- self._print(resp)\n- if self.logfile is not None:\n- cmd = {**qmpcmd, **resp}\n- self._print(cmd, fh=self.logfile)\n- return True\n-\n- def connect(self, negotiate: bool = True) -> None:\n- self._greeting = super().connect(negotiate)\n- self._completer_setup()\n-\n- def show_banner(self,\n- msg: str = 'Welcome to the QMP low-level shell!') -> None:\n- \"\"\"\n- Print to stdio a greeting, and the QEMU version if available.\n- \"\"\"\n- print(msg)\n- if not self._greeting:\n- print('Connected')\n- return\n- version = self._greeting['QMP']['version']['qemu']\n- print(\"Connected to QEMU {major}.{minor}.{micro}\\n\".format(**version))\n-\n- @property\n- def prompt(self) -> str:\n- \"\"\"\n- Return the current shell prompt, including a trailing space.\n- \"\"\"\n- if self._transmode:\n- return 'TRANS> '\n- return '(QEMU) '\n-\n- def read_exec_command(self) -> bool:\n- \"\"\"\n- Read and execute a command.\n-\n- @return True if execution was ok, return False if disconnected.\n- \"\"\"\n- try:\n- cmdline = input(self.prompt)\n- except EOFError:\n- print()\n- return False\n-\n- if cmdline == '':\n- for event in self.get_events():\n- print(event)\n- return True\n-\n- return self._execute_cmd(cmdline)\n-\n- def repl(self) -> Iterator[None]:\n- \"\"\"\n- Return an iterator that implements the REPL.\n- \"\"\"\n- self.show_banner()\n- while self.read_exec_command():\n- yield\n- self.close()\n-\n-\n-class HMPShell(QMPShell):\n- \"\"\"\n- HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.\n-\n- :param address: Address of the QMP server.\n- :param pretty: Pretty-print QMP messages.\n- :param verbose: Echo outgoing QMP messages to console.\n- \"\"\"\n- def __init__(self, address: SocketAddrT,\n- pretty: bool = False,\n- verbose: bool = False,\n- server: bool = False,\n- logfile: Optional[str] = None):\n- super().__init__(address, pretty, verbose, server, logfile)\n- self._cpu_index = 0\n-\n- def _cmd_completion(self) -> None:\n- for cmd in self._cmd_passthrough('help')['return'].split('\\r\\n'):\n- if cmd and cmd[0] != '[' and cmd[0] != '\\t':\n- name = cmd.split()[0] # drop help text\n- if name == 'info':\n- continue\n- if name.find('|') != -1:\n- # Command in the form 'foobar|f' or 'f|foobar', take the\n- # full name\n- opt = name.split('|')\n- if len(opt[0]) == 1:\n- name = opt[1]\n- else:\n- name = opt[0]\n- self._completer.append(name)\n- self._completer.append('help ' + name) # help completion\n-\n- def _info_completion(self) -> None:\n- for cmd in self._cmd_passthrough('info')['return'].split('\\r\\n'):\n- if cmd:\n- self._completer.append('info ' + cmd.split()[1])\n-\n- def _other_completion(self) -> None:\n- # special cases\n- self._completer.append('help info')\n-\n- def _fill_completion(self) -> None:\n- self._cmd_completion()\n- self._info_completion()\n- self._other_completion()\n-\n- def _cmd_passthrough(self, cmdline: str,\n- cpu_index: int = 0) -> QMPMessage:\n- return self.cmd_obj({\n- 'execute': 'human-monitor-command',\n- 'arguments': {\n- 'command-line': cmdline,\n- 'cpu-index': cpu_index\n- }\n- })\n-\n- def _execute_cmd(self, cmdline: str) -> bool:\n- if cmdline.split()[0] == \"cpu\":\n- # trap the cpu command, it requires special setting\n- try:\n- idx = int(cmdline.split()[1])\n- if 'return' not in self._cmd_passthrough('info version', idx):\n- print('bad CPU index')\n- return True\n- self._cpu_index = idx\n- except ValueError:\n- print('cpu command takes an integer argument')\n- return True\n- resp = self._cmd_passthrough(cmdline, self._cpu_index)\n- if resp is None:\n- print('Disconnected')\n- return False\n- assert 'return' in resp or 'error' in resp\n- if 'return' in resp:\n- # Success\n- if len(resp['return']) > 0:\n- print(resp['return'], end=' ')\n- else:\n- # Error\n- print('%s: %s' % (resp['error']['class'], resp['error']['desc']))\n- return True\n-\n- def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:\n- QMPShell.show_banner(self, msg)\n-\n-\n-def die(msg: str) -> NoReturn:\n- \"\"\"Write an error to stderr, then exit with a return code of 1.\"\"\"\n- sys.stderr.write('ERROR: %s\\n' % msg)\n- sys.exit(1)\n-\n-\n-def common_parser() -> argparse.ArgumentParser:\n- \"\"\"Build common parsing options used by qmp-shell and qmp-shell-wrap.\"\"\"\n- parser = argparse.ArgumentParser()\n- parser.add_argument('-H', '--hmp', action='store_true',\n- help='Use HMP interface')\n- parser.add_argument('-v', '--verbose', action='store_true',\n- help='Verbose (echo commands sent and received)')\n- parser.add_argument('-p', '--pretty', action='store_true',\n- help='Pretty-print JSON')\n- parser.add_argument('-l', '--logfile',\n- help='Save log of all QMP messages to PATH')\n- # NOTE: When changing arguments, update both this module docstring\n- # and the manpage synopsis in docs/man/qmp_shell.rst.\n- return parser\n-\n-\n-def main() -> None:\n- \"\"\"\n- qmp-shell entry point: parse command line arguments and start the REPL.\n- \"\"\"\n- parser = common_parser()\n- parser.add_argument('-N', '--skip-negotiation', action='store_true',\n- help='Skip negotiate (for qemu-ga)')\n-\n- default_server = os.environ.get('QMP_SOCKET')\n- parser.add_argument('qmp_server', action='store',\n- default=default_server,\n- help='< UNIX socket path | TCP address:port >')\n-\n- args = parser.parse_args()\n- if args.qmp_server is None:\n- parser.error(\"QMP socket or TCP address must be specified\")\n-\n- shell_class = HMPShell if args.hmp else QMPShell\n-\n- try:\n- address = shell_class.parse_address(args.qmp_server)\n- except QMPBadPortError:\n- parser.error(f\"Bad port number: {args.qmp_server}\")\n- return # pycharm doesn't know error() is noreturn\n-\n- with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu:\n- try:\n- qemu.connect(negotiate=not args.skip_negotiation)\n- except ConnectError as err:\n- if isinstance(err.exc, OSError):\n- die(f\"Couldn't connect to {args.qmp_server}: {err!s}\")\n- die(str(err))\n-\n- for _ in qemu.repl():\n- pass\n-\n-\n-def main_wrap() -> None:\n- \"\"\"\n- qmp-shell-wrap - QEMU + qmp-shell launcher utility\n-\n- Launch QEMU and connect to it with `qmp-shell` in a single command.\n- CLI arguments will be forwarded to qemu, with additional arguments\n- added to allow `qmp-shell` to then connect to the recently launched\n- QEMU instance.\n-\n- usage: qmp-shell-wrap [-h] [-H] [-v] [-p] [-l LOGFILE] ...\n-\n- positional arguments:\n- command QEMU command line to invoke\n-\n- optional arguments:\n- -h, --help show this help message and exit\n- -H, --hmp Use HMP interface\n- -v, --verbose Verbose (echo commands sent and received)\n- -p, --pretty Pretty-print JSON\n- -l LOGFILE, --logfile LOGFILE\n- Save log of all QMP messages to PATH\n-\n- Usage\n- -----\n-\n- Prepend \"qmp-shell-wrap\" to your usual QEMU command line::\n-\n- > qmp-shell-wrap qemu-system-x86_64 -M q35 -m 4096 -display none\n- Welcome to the QMP low-level shell!\n- Connected\n- (QEMU)\n- \"\"\"\n- parser = common_parser()\n- parser.add_argument('command', nargs=argparse.REMAINDER,\n- help='QEMU command line to invoke')\n-\n- args = parser.parse_args()\n-\n- cmd = args.command\n- if len(cmd) != 0 and cmd[0] == '--':\n- cmd = cmd[1:]\n- if len(cmd) == 0:\n- cmd = [\"qemu-system-x86_64\"]\n-\n- sockpath = \"qmp-shell-wrap-%d\" % os.getpid()\n- cmd += [\"-qmp\", \"unix:%s\" % sockpath]\n-\n- shell_class = HMPShell if args.hmp else QMPShell\n-\n- try:\n- address = shell_class.parse_address(sockpath)\n- except QMPBadPortError:\n- parser.error(f\"Bad port number: {sockpath}\")\n- return # pycharm doesn't know error() is noreturn\n-\n- try:\n- with shell_class(address, args.pretty, args.verbose,\n- True, args.logfile) as qemu:\n- with Popen(cmd):\n-\n- try:\n- qemu.accept()\n- except ConnectError as err:\n- if isinstance(err.exc, OSError):\n- die(f\"Couldn't connect to {args.qmp_server}: {err!s}\")\n- die(str(err))\n-\n- for _ in qemu.repl():\n- pass\n- except FileNotFoundError:\n- sys.stderr.write(f\"ERROR: QEMU executable '{cmd[0]}' not found.\\n\")\n- finally:\n- os.unlink(sockpath)\n-\n-\n-if __name__ == '__main__':\n- main()\ndiff --git a/python/qemu/qmp/qmp_tui.py b/python/qemu/qmp/qmp_tui.py\ndeleted file mode 100644\nindex d946c205131..00000000000\n--- a/python/qemu/qmp/qmp_tui.py\n+++ /dev/null\n@@ -1,665 +0,0 @@\n-# Copyright (c) 2021\n-#\n-# Authors:\n-# Niteesh Babu G S <niteesh.gs@gmail.com>\n-#\n-# This work is licensed under the terms of the GNU LGPL, version 2 or\n-# later. See the COPYING file in the top-level directory.\n-\"\"\"\n-QMP TUI\n-\n-QMP TUI is an asynchronous interface built on top the of the QMP library.\n-It is the successor of QMP-shell and is bought-in as a replacement for it.\n-\n-Example Usage: qmp-tui <SOCKET | TCP IP:PORT>\n-Full Usage: qmp-tui --help\n-\"\"\"\n-\n-import argparse\n-import asyncio\n-import json\n-import logging\n-from logging import Handler, LogRecord\n-import signal\n-import sys\n-from typing import (\n- List,\n- Optional,\n- Tuple,\n- Type,\n- Union,\n- cast,\n-)\n-\n-\n-try:\n- from pygments import lexers\n- from pygments import token as Token\n- import urwid\n- import urwid_readline\n-except ModuleNotFoundError as exc:\n- print(\n- f\"Module '{exc.name}' not found.\",\n- \"You need the optional 'tui' group: pip install qemu.qmp[tui]\",\n- sep='\\n',\n- file=sys.stderr,\n- )\n- sys.exit(1)\n-\n-from .error import ProtocolError\n-from .legacy import QEMUMonitorProtocol, QMPBadPortError\n-from .message import DeserializationError, Message, UnexpectedTypeError\n-from .protocol import ConnectError, Runstate\n-from .qmp_client import ExecInterruptedError, QMPClient\n-from .util import get_or_create_event_loop, pretty_traceback\n-\n-\n-# The name of the signal that is used to update the history list\n-UPDATE_MSG: str = 'UPDATE_MSG'\n-\n-\n-palette = [\n- (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),\n- (Token.Text, '', '', '', '', 'g7'),\n- (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),\n- (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),\n- (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),\n- (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),\n- ('DEBUG', '', '', '', '#ddf', 'g7'),\n- ('INFO', '', '', '', 'g100', 'g7'),\n- ('WARNING', '', '', '', '#ff6', 'g7'),\n- ('ERROR', '', '', '', '#a00', 'g7'),\n- ('CRITICAL', '', '', '', '#a00', 'g7'),\n- ('background', '', 'black', '', '', 'g7'),\n-]\n-\n-\n-def format_json(msg: str) -> str:\n- \"\"\"\n- Formats valid/invalid multi-line JSON message into a single-line message.\n-\n- Formatting is first tried using the standard json module. If that fails\n- due to an decoding error then a simple string manipulation is done to\n- achieve a single line JSON string.\n-\n- Converting into single line is more aesthetically pleasing when looking\n- along with error messages.\n-\n- Eg:\n- Input:\n- [ 1,\n- true,\n- 3 ]\n- The above input is not a valid QMP message and produces the following error\n- \"QMP message is not a JSON object.\"\n- When displaying this in TUI in multiline mode we get\n-\n- [ 1,\n- true,\n- 3 ]: QMP message is not a JSON object.\n-\n- whereas in singleline mode we get the following\n-\n- [1, true, 3]: QMP message is not a JSON object.\n-\n- The single line mode is more aesthetically pleasing.\n-\n- :param msg:\n- The message to formatted into single line.\n-\n- :return: Formatted singleline message.\n- \"\"\"\n- try:\n- msg = json.loads(msg)\n- return str(json.dumps(msg))\n- except json.decoder.JSONDecodeError:\n- msg = msg.replace('\\n', '')\n- words = msg.split(' ')\n- words = list(filter(None, words))\n- return ' '.join(words)\n-\n-\n-def has_handler_type(logger: logging.Logger,\n- handler_type: Type[Handler]) -> bool:\n- \"\"\"\n- The Logger class has no interface to check if a certain type of handler is\n- installed or not. So we provide an interface to do so.\n-\n- :param logger:\n- Logger object\n- :param handler_type:\n- The type of the handler to be checked.\n-\n- :return: returns True if handler of type `handler_type`.\n- \"\"\"\n- for handler in logger.handlers:\n- if isinstance(handler, handler_type):\n- return True\n- return False\n-\n-\n-class App(QMPClient):\n- \"\"\"\n- Implements the QMP TUI.\n-\n- Initializes the widgets and starts the urwid event loop.\n-\n- :param address:\n- Address of the server to connect to.\n- :param num_retries:\n- The number of times to retry before stopping to reconnect.\n- :param retry_delay:\n- The delay(sec) before each retry\n- \"\"\"\n- def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,\n- retry_delay: Optional[int]) -> None:\n- urwid.register_signal(type(self), UPDATE_MSG)\n- self.window = Window(self)\n- self.address = address\n- self.aloop: Optional[asyncio.AbstractEventLoop] = None\n- self.num_retries = num_retries\n- self.retry_delay = retry_delay if retry_delay else 2\n- self.retry: bool = False\n- self.exiting: bool = False\n- super().__init__()\n-\n- def add_to_history(self, msg: str, level: Optional[str] = None) -> None:\n- \"\"\"\n- Appends the msg to the history list.\n-\n- :param msg:\n- The raw message to be appended in string type.\n- \"\"\"\n- urwid.emit_signal(self, UPDATE_MSG, msg, level)\n-\n- def _cb_outbound(self, msg: Message) -> Message:\n- \"\"\"\n- Callback: outbound message hook.\n-\n- Appends the outgoing messages to the history box.\n-\n- :param msg: raw outbound message.\n- :return: final outbound message.\n- \"\"\"\n- str_msg = str(msg)\n-\n- if not has_handler_type(logging.getLogger(), TUILogHandler):\n- logging.debug('Request: %s', str_msg)\n- self.add_to_history('<-- ' + str_msg)\n- return msg\n-\n- def _cb_inbound(self, msg: Message) -> Message:\n- \"\"\"\n- Callback: outbound message hook.\n-\n- Appends the incoming messages to the history box.\n-\n- :param msg: raw inbound message.\n- :return: final inbound message.\n- \"\"\"\n- str_msg = str(msg)\n-\n- if not has_handler_type(logging.getLogger(), TUILogHandler):\n- logging.debug('Request: %s', str_msg)\n- self.add_to_history('--> ' + str_msg)\n- return msg\n-\n- async def _send_to_server(self, msg: Message) -> None:\n- \"\"\"\n- This coroutine sends the message to the server.\n- The message has to be pre-validated.\n-\n- :param msg:\n- Pre-validated message to be to sent to the server.\n-\n- :raise Exception: When an unhandled exception is caught.\n- \"\"\"\n- try:\n- await self._raw(msg, assign_id='id' not in msg)\n- except ExecInterruptedError as err:\n- logging.info('Error server disconnected before reply %s', str(err))\n- self.add_to_history('Server disconnected before reply', 'ERROR')\n- except Exception as err:\n- logging.error('Exception from _send_to_server: %s', str(err))\n- raise err\n-\n- def cb_send_to_server(self, raw_msg: str) -> None:\n- \"\"\"\n- Validates and sends the message to the server.\n- The raw string message is first converted into a Message object\n- and is then sent to the server.\n-\n- :param raw_msg:\n- The raw string message to be sent to the server.\n-\n- :raise Exception: When an unhandled exception is caught.\n- \"\"\"\n- try:\n- msg = Message(bytes(raw_msg, encoding='utf-8'))\n- asyncio.create_task(self._send_to_server(msg))\n- except (DeserializationError, UnexpectedTypeError) as err:\n- raw_msg = format_json(raw_msg)\n- logging.info('Invalid message: %s', err.error_message)\n- self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')\n-\n- def unhandled_input(self, key: str) -> None:\n- \"\"\"\n- Handle's keys which haven't been handled by the child widgets.\n-\n- :param key:\n- Unhandled key\n- \"\"\"\n- if key == 'esc':\n- self.kill_app()\n-\n- def kill_app(self) -> None:\n- \"\"\"\n- Initiates killing of app. A bridge between asynchronous and synchronous\n- code.\n- \"\"\"\n- asyncio.create_task(self._kill_app())\n-\n- async def _kill_app(self) -> None:\n- \"\"\"\n- This coroutine initiates the actual disconnect process and calls\n- urwid.ExitMainLoop() to kill the TUI.\n-\n- :raise Exception: When an unhandled exception is caught.\n- \"\"\"\n- self.exiting = True\n- await self.disconnect()\n- logging.debug('Disconnect finished. Exiting app')\n- raise urwid.ExitMainLoop()\n-\n- async def disconnect(self) -> None:\n- \"\"\"\n- Overrides the disconnect method to handle the errors locally.\n- \"\"\"\n- try:\n- await super().disconnect()\n- except (OSError, EOFError) as err:\n- logging.info('disconnect: %s', str(err))\n- self.retry = True\n- except ProtocolError as err:\n- logging.info('disconnect: %s', str(err))\n- except Exception as err:\n- logging.error('disconnect: Unhandled exception %s', str(err))\n- raise err\n-\n- def _set_status(self, msg: str) -> None:\n- \"\"\"\n- Sets the message as the status.\n-\n- :param msg:\n- The message to be displayed in the status bar.\n- \"\"\"\n- self.window.footer.set_text(msg)\n-\n- def _get_formatted_address(self) -> str:\n- \"\"\"\n- Returns a formatted version of the server's address.\n-\n- :return: formatted address\n- \"\"\"\n- if isinstance(self.address, tuple):\n- host, port = self.address\n- addr = f'{host}:{port}'\n- else:\n- addr = f'{self.address}'\n- return addr\n-\n- async def _initiate_connection(self) -> Optional[ConnectError]:\n- \"\"\"\n- Tries connecting to a server a number of times with a delay between\n- each try. If all retries failed then return the error faced during\n- the last retry.\n-\n- :return: Error faced during last retry.\n- \"\"\"\n- current_retries = 0\n- err = None\n-\n- # initial try\n- await self.connect_server()\n- while self.retry and current_retries < self.num_retries:\n- logging.info('Connection Failed, retrying in %d', self.retry_delay)\n- status = f'[Retry #{current_retries} ({self.retry_delay}s)]'\n- self._set_status(status)\n-\n- await asyncio.sleep(self.retry_delay)\n-\n- err = await self.connect_server()\n- current_retries += 1\n- # If all retries failed report the last error\n- if err:\n- logging.info('All retries failed: %s', err)\n- return err\n- return None\n-\n- async def manage_connection(self) -> None:\n- \"\"\"\n- Manage the connection based on the current run state.\n-\n- A reconnect is issued when the current state is IDLE and the number\n- of retries is not exhausted.\n- A disconnect is issued when the current state is DISCONNECTING.\n- \"\"\"\n- while not self.exiting:\n- if self.runstate == Runstate.IDLE:\n- err = await self._initiate_connection()\n- # If retry is still true then, we have exhausted all our tries.\n- if err:\n- self._set_status(f'[Error: {err.error_message}]')\n- else:\n- addr = self._get_formatted_address()\n- self._set_status(f'[Connected {addr}]')\n- elif self.runstate == Runstate.DISCONNECTING:\n- self._set_status('[Disconnected]')\n- await self.disconnect()\n- # check if a retry is needed\n- # mypy 1.4.0 doesn't believe runstate can change after\n- # disconnect(), hence the cast.\n- state = cast(Runstate, self.runstate)\n- if state == Runstate.IDLE:\n- continue\n- await self.runstate_changed()\n-\n- async def connect_server(self) -> Optional[ConnectError]:\n- \"\"\"\n- Initiates a connection to the server at address `self.address`\n- and in case of a failure, sets the status to the respective error.\n- \"\"\"\n- try:\n- await self.connect(self.address)\n- self.retry = False\n- except ConnectError as err:\n- logging.info('connect_server: ConnectError %s', str(err))\n- self.retry = True\n- return err\n- return None\n-\n- def run(self, debug: bool = False) -> None:\n- \"\"\"\n- Starts the long running co-routines and the urwid event loop.\n-\n- :param debug:\n- Enables/Disables asyncio event loop debugging\n- \"\"\"\n- screen = urwid.raw_display.Screen()\n- screen.set_terminal_properties(256)\n- self.aloop = get_or_create_event_loop()\n- self.aloop.set_debug(debug)\n-\n- # Gracefully handle SIGTERM and SIGINT signals\n- cancel_signals = [signal.SIGTERM, signal.SIGINT]\n- for sig in cancel_signals:\n- self.aloop.add_signal_handler(sig, self.kill_app)\n-\n- event_loop = urwid.AsyncioEventLoop(loop=self.aloop)\n- main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),\n- unhandled_input=self.unhandled_input,\n- screen=screen,\n- palette=palette,\n- handle_mouse=True,\n- event_loop=event_loop)\n-\n- self.aloop.create_task(self.manage_connection())\n- try:\n- main_loop.run()\n- except Exception as err:\n- logging.error('%s\\n%s\\n', str(err), pretty_traceback())\n- raise err\n-\n-\n-class StatusBar(urwid.Text):\n- \"\"\"\n- A simple statusbar modelled using the Text widget. The status can be\n- set using the set_text function. All text set is aligned to right.\n-\n- :param text: Initial text to be displayed. Default is empty str.\n- \"\"\"\n- def __init__(self, text: str = ''):\n- super().__init__(text, align='right')\n-\n-\n-class Editor(urwid_readline.ReadlineEdit):\n- \"\"\"\n- A simple editor modelled using the urwid_readline.ReadlineEdit widget.\n- Mimcs GNU readline shortcuts and provides history support.\n-\n- The readline shortcuts can be found below:\n- https://github.com/rr-/urwid_readline#features\n-\n- Along with the readline features, this editor also has support for\n- history. Pressing the 'up'/'down' switches between the prev/next messages\n- available in the history.\n-\n- Currently there is no support to save the history to a file. The history of\n- previous commands is lost on exit.\n-\n- :param parent: Reference to the TUI object.\n- \"\"\"\n- def __init__(self, parent: App) -> None:\n- super().__init__(caption='> ', multiline=True)\n- self.parent = parent\n- self.history: List[str] = []\n- self.last_index: int = -1\n- self.show_history: bool = False\n-\n- def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:\n- \"\"\"\n- Handles the keypress on this widget.\n-\n- :param size:\n- The current size of the widget.\n- :param key:\n- The key to be handled.\n-\n- :return: Unhandled key if any.\n- \"\"\"\n- msg = self.get_edit_text()\n- if key == 'up' and not msg:\n- # Show the history when 'up arrow' is pressed with no input text.\n- # NOTE: The show_history logic is necessary because in 'multiline'\n- # mode (which we use) 'up arrow' is used to move between lines.\n- if not self.history:\n- return None\n- self.show_history = True\n- last_msg = self.history[self.last_index]\n- self.set_edit_text(last_msg)\n- self.edit_pos = len(last_msg)\n- elif key == 'up' and self.show_history:\n- self.last_index = max(self.last_index - 1, -len(self.history))\n- self.set_edit_text(self.history[self.last_index])\n- self.edit_pos = len(self.history[self.last_index])\n- elif key == 'down' and self.show_history:\n- if self.last_index == -1:\n- self.set_edit_text('')\n- self.show_history = False\n- else:\n- self.last_index += 1\n- self.set_edit_text(self.history[self.last_index])\n- self.edit_pos = len(self.history[self.last_index])\n- elif key == 'meta enter':\n- # When using multiline, enter inserts a new line into the editor\n- # send the input to the server on alt + enter\n- self.parent.cb_send_to_server(msg)\n- self.history.append(msg)\n- self.set_edit_text('')\n- self.last_index = -1\n- self.show_history = False\n- else:\n- self.show_history = False\n- self.last_index = -1\n- return cast(Optional[str], super().keypress(size, key))\n- return None\n-\n-\n-class EditorWidget(urwid.Filler):\n- \"\"\"\n- Wrapper around the editor widget.\n-\n- The Editor is a flow widget and has to wrapped inside a box widget.\n- This class wraps the Editor inside filler widget.\n-\n- :param parent: Reference to the TUI object.\n- \"\"\"\n- def __init__(self, parent: App) -> None:\n- super().__init__(Editor(parent), valign='top')\n-\n-\n-class HistoryBox(urwid.ListBox):\n- \"\"\"\n- This widget is modelled using the ListBox widget, contains the list of\n- all messages both QMP messages and log messages to be shown in the TUI.\n-\n- The messages are urwid.Text widgets. On every append of a message, the\n- focus is shifted to the last appended message.\n-\n- :param parent: Reference to the TUI object.\n- \"\"\"\n- def __init__(self, parent: App) -> None:\n- self.parent = parent\n- self.history = urwid.SimpleFocusListWalker([])\n- super().__init__(self.history)\n-\n- def add_to_history(self,\n- history: Union[str, List[Tuple[str, str]]]) -> None:\n- \"\"\"\n- Appends a message to the list and set the focus to the last appended\n- message.\n-\n- :param history:\n- The history item(message/event) to be appended to the list.\n- \"\"\"\n- self.history.append(urwid.Text(history))\n- self.history.set_focus(len(self.history) - 1)\n-\n- def mouse_event(self, size: Tuple[int, int], _event: str, button: float,\n- _x: int, _y: int, focus: bool) -> None:\n- # Unfortunately there are no urwid constants that represent the mouse\n- # events.\n- if button == 4: # Scroll up event\n- super().keypress(size, 'up')\n- elif button == 5: # Scroll down event\n- super().keypress(size, 'down')\n-\n-\n-class HistoryWindow(urwid.Frame):\n- \"\"\"\n- This window composes the HistoryBox and EditorWidget in a horizontal split.\n- By default the first focus is given to the history box.\n-\n- :param parent: Reference to the TUI object.\n- \"\"\"\n- def __init__(self, parent: App) -> None:\n- self.parent = parent\n- self.editor_widget = EditorWidget(parent)\n- self.editor = urwid.LineBox(self.editor_widget)\n- self.history = HistoryBox(parent)\n- self.body = urwid.Pile([('weight', 80, self.history),\n- ('weight', 20, self.editor)])\n- super().__init__(self.body)\n- urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)\n-\n- def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:\n- \"\"\"\n- Appends a message to the history box\n-\n- :param msg:\n- The message to be appended to the history box.\n- :param level:\n- The log level of the message, if it is a log message.\n- \"\"\"\n- formatted = []\n- if level:\n- msg = f'[{level}]: {msg}'\n- formatted.append((level, msg))\n- else:\n- lexer = lexers.JsonLexer() # pylint: disable=no-member\n- for token in lexer.get_tokens(msg):\n- formatted.append(token)\n- self.history.add_to_history(formatted)\n-\n-\n-class Window(urwid.Frame):\n- \"\"\"\n- This window is the top most widget of the TUI and will contain other\n- windows. Each child of this widget is responsible for displaying a specific\n- functionality.\n-\n- :param parent: Reference to the TUI object.\n- \"\"\"\n- def __init__(self, parent: App) -> None:\n- self.parent = parent\n- footer = StatusBar()\n- body = HistoryWindow(parent)\n- super().__init__(body, footer=footer)\n-\n-\n-class TUILogHandler(Handler):\n- \"\"\"\n- This handler routes all the log messages to the TUI screen.\n- It is installed to the root logger to so that the log message from all\n- libraries begin used is routed to the screen.\n-\n- :param tui: Reference to the TUI object.\n- \"\"\"\n- def __init__(self, tui: App) -> None:\n- super().__init__()\n- self.tui = tui\n-\n- def emit(self, record: LogRecord) -> None:\n- \"\"\"\n- Emits a record to the TUI screen.\n-\n- Appends the log message to the TUI screen\n- \"\"\"\n- level = record.levelname\n- msg = record.getMessage()\n- self.tui.add_to_history(msg, level)\n-\n-\n-def main() -> None:\n- \"\"\"\n- Driver of the whole script, parses arguments, initialize the TUI and\n- the logger.\n- \"\"\"\n- parser = argparse.ArgumentParser(description='QMP TUI')\n- parser.add_argument('qmp_server', help='Address of the QMP server. '\n- 'Format <UNIX socket path | TCP addr:port>')\n- parser.add_argument('--num-retries', type=int, default=10,\n- help='Number of times to reconnect before giving up.')\n- parser.add_argument('--retry-delay', type=int,\n- help='Time(s) to wait before next retry. '\n- 'Default action is to wait 2s between each retry.')\n- parser.add_argument('--log-file', help='The Log file name')\n- parser.add_argument('--log-level', default='WARNING',\n- help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')\n- parser.add_argument('--asyncio-debug', action='store_true',\n- help='Enable debug mode for asyncio loop. '\n- 'Generates lot of output, makes TUI unusable when '\n- 'logs are logged in the TUI. '\n- 'Use only when logging to a file.')\n- args = parser.parse_args()\n-\n- try:\n- address = QEMUMonitorProtocol.parse_address(args.qmp_server)\n- except QMPBadPortError as err:\n- parser.error(str(err))\n-\n- app = App(address, args.num_retries, args.retry_delay)\n-\n- root_logger = logging.getLogger()\n- root_logger.setLevel(logging.getLevelName(args.log_level))\n-\n- if args.log_file:\n- root_logger.addHandler(logging.FileHandler(args.log_file))\n- else:\n- root_logger.addHandler(TUILogHandler(app))\n-\n- app.run(args.asyncio_debug)\n-\n-\n-if __name__ == '__main__':\n- main()\ndiff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py\ndeleted file mode 100644\nindex a8229e55245..00000000000\n--- a/python/qemu/qmp/util.py\n+++ /dev/null\n@@ -1,150 +0,0 @@\n-\"\"\"\n-Miscellaneous Utilities\n-\n-This module provides asyncio and various logging and debugging\n-utilities, such as `exception_summary()` and `pretty_traceback()`, used\n-primarily for adding information into the logging stream.\n-\"\"\"\n-\n-import asyncio\n-import sys\n-import traceback\n-from typing import TypeVar, cast\n-import warnings\n-\n-\n-T = TypeVar('T')\n-\n-\n-# --------------------------\n-# Section: Utility Functions\n-# --------------------------\n-\n-\n-def get_or_create_event_loop() -> asyncio.AbstractEventLoop:\n- \"\"\"\n- Return this thread's current event loop, or create a new one.\n-\n- This function behaves similarly to asyncio.get_event_loop() in\n- Python<=3.13, where if there is no event loop currently associated\n- with the current context, it will create and register one. It should\n- generally not be used in any asyncio-native applications.\n- \"\"\"\n- try:\n- with warnings.catch_warnings():\n- # Python <= 3.13 will trigger deprecation warnings if no\n- # event loop is set, but will create and set a new loop.\n- warnings.simplefilter(\"ignore\")\n- loop = asyncio.get_event_loop()\n- except RuntimeError:\n- # Python 3.14+: No event loop set for this thread,\n- # create and set one.\n- loop = asyncio.new_event_loop()\n- # Set this loop as the current thread's loop, to be returned\n- # by calls to get_event_loop() in the future.\n- asyncio.set_event_loop(loop)\n-\n- return loop\n-\n-\n-async def flush(writer: asyncio.StreamWriter) -> None:\n- \"\"\"\n- Utility function to ensure an `asyncio.StreamWriter` is *fully* drained.\n-\n- `asyncio.StreamWriter.drain` only promises we will return to below\n- the \"high-water mark\". This function ensures we flush the entire\n- buffer -- by setting the high water mark to 0 and then calling\n- drain. The flow control limits are restored after the call is\n- completed.\n- \"\"\"\n- transport = cast( # type: ignore[redundant-cast]\n- asyncio.WriteTransport, writer.transport\n- )\n-\n- # https://github.com/python/typeshed/issues/5779\n- low, high = transport.get_write_buffer_limits() # type: ignore\n- transport.set_write_buffer_limits(0, 0)\n- try:\n- await writer.drain()\n- finally:\n- transport.set_write_buffer_limits(high, low)\n-\n-\n-def upper_half(func: T) -> T:\n- \"\"\"\n- Do-nothing decorator that annotates a method as an \"upper-half\" method.\n-\n- These methods must not call bottom-half functions directly, but can\n- schedule them to run.\n- \"\"\"\n- return func\n-\n-\n-def bottom_half(func: T) -> T:\n- \"\"\"\n- Do-nothing decorator that annotates a method as a \"bottom-half\" method.\n-\n- These methods must take great care to handle their own exceptions whenever\n- possible. If they go unhandled, they will cause termination of the loop.\n-\n- These methods do not, in general, have the ability to directly\n- report information to a caller’s context and will usually be\n- collected as an `asyncio.Task` result instead.\n-\n- They must not call upper-half functions directly.\n- \"\"\"\n- return func\n-\n-\n-# ----------------------------\n-# Section: Logging & Debugging\n-# ----------------------------\n-\n-\n-def exception_summary(exc: BaseException) -> str:\n- \"\"\"\n- Return a summary string of an arbitrary exception.\n-\n- It will be of the form \"ExceptionType: Error Message\" if the error\n- string is non-empty, and just \"ExceptionType\" otherwise.\n-\n- This code is based on CPython's implementation of\n- `traceback.TracebackException.format_exception_only`.\n- \"\"\"\n- name = type(exc).__qualname__\n- smod = type(exc).__module__\n- if smod not in (\"__main__\", \"builtins\"):\n- name = smod + '.' + name\n-\n- error = str(exc)\n- if error:\n- return f\"{name}: {error}\"\n- return name\n-\n-\n-def pretty_traceback(prefix: str = \" | \") -> str:\n- \"\"\"\n- Formats the current traceback, indented to provide visual distinction.\n-\n- This is useful for printing a traceback within a traceback for\n- debugging purposes when encapsulating errors to deliver them up the\n- stack; when those errors are printed, this helps provide a nice\n- visual grouping to quickly identify the parts of the error that\n- belong to the inner exception.\n-\n- :param prefix: The prefix to append to each line of the traceback.\n- :return: A string, formatted something like the following::\n-\n- | Traceback (most recent call last):\n- | File \"foobar.py\", line 42, in arbitrary_example\n- | foo.baz()\n- | ArbitraryError: [Errno 42] Something bad happened!\n- \"\"\"\n- output = \"\".join(traceback.format_exception(*sys.exc_info()))\n-\n- exc_lines = []\n- for line in output.split('\\n'):\n- exc_lines.append(prefix + line)\n-\n- # The last line is always empty, omit it\n- return \"\\n\".join(exc_lines[:-1])\ndiff --git a/python/qemu/utils/qom_fuse.py b/python/qemu/utils/qom_fuse.py\nindex cf7e344bd53..e377ef6942f 100644\n--- a/python/qemu/utils/qom_fuse.py\n+++ b/python/qemu/utils/qom_fuse.py\n@@ -47,7 +47,6 @@\n \n import fuse\n from fuse import FUSE, FuseOSError, Operations\n-\n from qemu.qmp import ExecuteError\n \n from .qom_common import QOMCommand\ndiff --git a/python/setup.cfg b/python/setup.cfg\nindex f40f11396c9..c46a95f8d41 100644\n--- a/python/setup.cfg\n+++ b/python/setup.cfg\n@@ -24,9 +24,10 @@ classifiers =\n [options]\n python_requires = >= 3.9\n packages =\n- qemu.qmp\n qemu.machine\n qemu.utils\n+install_requires =\n+ qemu.qmp\n \n [options.package_data]\n * = py.typed\n@@ -38,26 +39,17 @@ devel =\n distlib >= 0.3.6\n flake8 >= 5.0.4\n fusepy >= 2.0.4\n- isort >= 5.1.2\n+ isort >= 5.6.0\n mypy >= 1.4.0\n pylint >= 2.17.3\n pylint != 3.2.4; python_version<\"3.9\"\n tox >= 3.18.0\n- urwid >= 2.1.2\n- urwid-readline >= 0.13\n- Pygments >= 2.9.0\n sphinx >= 3.4.3\n \n # Provides qom-fuse functionality\n fuse =\n fusepy >= 2.0.4\n \n-# QMP TUI dependencies\n-tui =\n- urwid >= 2.1.2\n- urwid-readline >= 0.13\n- Pygments >= 2.9.0\n-\n [options.entry_points]\n console_scripts =\n qom = qemu.utils.qom:main\n@@ -67,9 +59,6 @@ console_scripts =\n qom-tree = qemu.utils.qom:QOMTree.entry_point\n qom-fuse = qemu.utils.qom_fuse:QOMFuse.entry_point [fuse]\n qemu-ga-client = qemu.utils.qemu_ga_client:main\n- qmp-shell = qemu.qmp.qmp_shell:main\n- qmp-shell-wrap = qemu.qmp.qmp_shell:main_wrap\n- qmp-tui = qemu.qmp.qmp_tui:main [tui]\n \n [flake8]\n # Prefer pylint's bare-except checks to flake8's\n@@ -86,10 +75,6 @@ warn_unused_ignores = False\n # fusepy has no type stubs:\n allow_subclassing_any = True\n \n-[mypy-qemu.qmp.qmp_tui]\n-# urwid and urwid_readline have no type stubs:\n-allow_subclassing_any = True\n-\n # The following missing import directives are because these libraries do not\n # provide type stubs. Allow them on an as-needed basis for mypy.\n [mypy-fuse]\n@@ -101,15 +86,6 @@ ignore_missing_imports = True\n [mypy-tomllib]\n ignore_missing_imports = True\n \n-[mypy-urwid]\n-ignore_missing_imports = True\n-\n-[mypy-urwid_readline]\n-ignore_missing_imports = True\n-\n-[mypy-pygments]\n-ignore_missing_imports = True\n-\n [mypy-distlib]\n ignore_missing_imports = True\n \n@@ -194,7 +170,6 @@ allowlist_externals = make\n deps =\n .[devel]\n .[fuse] # Workaround to trigger tox venv rebuild\n- .[tui] # Workaround to trigger tox venv rebuild\n commands =\n make check\n \ndiff --git a/python/tests/minreqs.txt b/python/tests/minreqs.txt\nindex cd2e2a81c3d..855b5129c94 100644\n--- a/python/tests/minreqs.txt\n+++ b/python/tests/minreqs.txt\n@@ -20,10 +20,8 @@ setuptools<=70\n # Dependencies for qapidoc/qapi_domain et al\n sphinx==3.4.3\n \n-# Dependencies for the TUI addon (Required for successful linting)\n-urwid==2.1.2\n-urwid-readline==0.13\n-Pygments==2.9.0\n+# Dependencies for qemu.machine\n+qemu.qmp==0.0.5\n \n # Dependencies for mkvenv\n distlib==0.3.6\n@@ -36,7 +34,7 @@ avocado-framework==90.0\n \n # Linters\n flake8==5.0.4\n-isort==5.1.2\n+isort==5.6.0\n mypy==1.4.0\n pylint==2.17.3\n \ndiff --git a/python/tests/protocol.py b/python/tests/protocol.py\ndeleted file mode 100644\nindex e565802516d..00000000000\n--- a/python/tests/protocol.py\n+++ /dev/null\n@@ -1,596 +0,0 @@\n-import asyncio\n-from contextlib import contextmanager\n-import os\n-import socket\n-from tempfile import TemporaryDirectory\n-\n-import avocado\n-\n-from qemu.qmp import ConnectError, Runstate\n-from qemu.qmp.protocol import AsyncProtocol, StateError\n-\n-\n-class NullProtocol(AsyncProtocol[None]):\n- \"\"\"\n- NullProtocol is a test mockup of an AsyncProtocol implementation.\n-\n- It adds a fake_session instance variable that enables a code path\n- that bypasses the actual connection logic, but still allows the\n- reader/writers to start.\n-\n- Because the message type is defined as None, an asyncio.Event named\n- 'trigger_input' is created that prohibits the reader from\n- incessantly being able to yield None; this event can be poked to\n- simulate an incoming message.\n-\n- For testing symmetry with do_recv, an interface is added to \"send\" a\n- Null message.\n-\n- For testing purposes, a \"simulate_disconnection\" method is also\n- added which allows us to trigger a bottom half disconnect without\n- injecting any real errors into the reader/writer loops; in essence\n- it performs exactly half of what disconnect() normally does.\n- \"\"\"\n- def __init__(self, name=None):\n- self.fake_session = False\n- self.trigger_input: asyncio.Event\n- super().__init__(name)\n-\n- async def _establish_session(self):\n- self.trigger_input = asyncio.Event()\n- await super()._establish_session()\n-\n- async def _do_start_server(self, address, ssl=None):\n- if self.fake_session:\n- self._accepted = asyncio.Event()\n- self._set_state(Runstate.CONNECTING)\n- await asyncio.sleep(0)\n- else:\n- await super()._do_start_server(address, ssl)\n-\n- async def _do_accept(self):\n- if self.fake_session:\n- self._accepted = None\n- else:\n- await super()._do_accept()\n-\n- async def _do_connect(self, address, ssl=None):\n- if self.fake_session:\n- self._set_state(Runstate.CONNECTING)\n- await asyncio.sleep(0)\n- else:\n- await super()._do_connect(address, ssl)\n-\n- async def _do_recv(self) -> None:\n- await self.trigger_input.wait()\n- self.trigger_input.clear()\n-\n- def _do_send(self, msg: None) -> None:\n- pass\n-\n- async def send_msg(self) -> None:\n- await self._outgoing.put(None)\n-\n- async def simulate_disconnect(self) -> None:\n- \"\"\"\n- Simulates a bottom-half disconnect.\n-\n- This method schedules a disconnection but does not wait for it\n- to complete. This is used to put the loop into the DISCONNECTING\n- state without fully quiescing it back to IDLE. This is normally\n- something you cannot coax AsyncProtocol to do on purpose, but it\n- will be similar to what happens with an unhandled Exception in\n- the reader/writer.\n-\n- Under normal circumstances, the library design requires you to\n- await on disconnect(), which awaits the disconnect task and\n- returns bottom half errors as a pre-condition to allowing the\n- loop to return back to IDLE.\n- \"\"\"\n- self._schedule_disconnect()\n-\n-\n-class LineProtocol(AsyncProtocol[str]):\n- def __init__(self, name=None):\n- super().__init__(name)\n- self.rx_history = []\n-\n- async def _do_recv(self) -> str:\n- raw = await self._readline()\n- msg = raw.decode()\n- self.rx_history.append(msg)\n- return msg\n-\n- def _do_send(self, msg: str) -> None:\n- assert self._writer is not None\n- self._writer.write(msg.encode() + b'\\n')\n-\n- async def send_msg(self, msg: str) -> None:\n- await self._outgoing.put(msg)\n-\n-\n-def run_as_task(coro, allow_cancellation=False):\n- \"\"\"\n- Run a given coroutine as a task.\n-\n- Optionally, wrap it in a try..except block that allows this\n- coroutine to be canceled gracefully.\n- \"\"\"\n- async def _runner():\n- try:\n- await coro\n- except asyncio.CancelledError:\n- if allow_cancellation:\n- return\n- raise\n- return asyncio.create_task(_runner())\n-\n-\n-@contextmanager\n-def jammed_socket():\n- \"\"\"\n- Opens up a random unused TCP port on localhost, then jams it.\n- \"\"\"\n- socks = []\n-\n- try:\n- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\n- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\n- sock.bind(('127.0.0.1', 0))\n- sock.listen(1)\n- address = sock.getsockname()\n-\n- socks.append(sock)\n-\n- # I don't *fully* understand why, but it takes *two* un-accepted\n- # connections to start jamming the socket.\n- for _ in range(2):\n- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\n- sock.connect(address)\n- socks.append(sock)\n-\n- yield address\n-\n- finally:\n- for sock in socks:\n- sock.close()\n-\n-\n-class Smoke(avocado.Test):\n-\n- def setUp(self):\n- self.proto = NullProtocol()\n-\n- def test__repr__(self):\n- self.assertEqual(\n- repr(self.proto),\n- \"<NullProtocol runstate=IDLE>\"\n- )\n-\n- def testRunstate(self):\n- self.assertEqual(\n- self.proto.runstate,\n- Runstate.IDLE\n- )\n-\n- def testDefaultName(self):\n- self.assertEqual(\n- self.proto.name,\n- None\n- )\n-\n- def testLogger(self):\n- self.assertEqual(\n- self.proto.logger.name,\n- 'qemu.qmp.protocol'\n- )\n-\n- def testName(self):\n- self.proto = NullProtocol('Steve')\n-\n- self.assertEqual(\n- self.proto.name,\n- 'Steve'\n- )\n-\n- self.assertEqual(\n- self.proto.logger.name,\n- 'qemu.qmp.protocol.Steve'\n- )\n-\n- self.assertEqual(\n- repr(self.proto),\n- \"<NullProtocol name='Steve' runstate=IDLE>\"\n- )\n-\n-\n-class TestBase(avocado.Test):\n-\n- def setUp(self):\n- self.proto = NullProtocol(type(self).__name__)\n- self.assertEqual(self.proto.runstate, Runstate.IDLE)\n- self.runstate_watcher = None\n-\n- def tearDown(self):\n- self.assertEqual(self.proto.runstate, Runstate.IDLE)\n-\n- async def _asyncSetUp(self):\n- pass\n-\n- async def _asyncTearDown(self):\n- if self.runstate_watcher:\n- await self.runstate_watcher\n-\n- @staticmethod\n- def async_test(async_test_method):\n- \"\"\"\n- Decorator; adds SetUp and TearDown to async tests.\n- \"\"\"\n- async def _wrapper(self, *args, **kwargs):\n- loop = asyncio.get_running_loop()\n- loop.set_debug(True)\n-\n- await self._asyncSetUp()\n- await async_test_method(self, *args, **kwargs)\n- await self._asyncTearDown()\n-\n- return _wrapper\n-\n- # Definitions\n-\n- # The states we expect a \"bad\" connect/accept attempt to transition through\n- BAD_CONNECTION_STATES = (\n- Runstate.CONNECTING,\n- Runstate.DISCONNECTING,\n- Runstate.IDLE,\n- )\n-\n- # The states we expect a \"good\" session to transition through\n- GOOD_CONNECTION_STATES = (\n- Runstate.CONNECTING,\n- Runstate.RUNNING,\n- Runstate.DISCONNECTING,\n- Runstate.IDLE,\n- )\n-\n- # Helpers\n-\n- async def _watch_runstates(self, *states):\n- \"\"\"\n- This launches a task alongside (most) tests below to confirm that\n- the sequence of runstate changes that occur is exactly as\n- anticipated.\n- \"\"\"\n- async def _watcher():\n- for state in states:\n- new_state = await self.proto.runstate_changed()\n- self.assertEqual(\n- new_state,\n- state,\n- msg=f\"Expected state '{state.name}'\",\n- )\n-\n- self.runstate_watcher = asyncio.create_task(_watcher())\n- # Kick the loop and force the task to block on the event.\n- await asyncio.sleep(0)\n-\n-\n-class State(TestBase):\n-\n- @TestBase.async_test\n- async def testSuperfluousDisconnect(self):\n- \"\"\"\n- Test calling disconnect() while already disconnected.\n- \"\"\"\n- await self._watch_runstates(\n- Runstate.DISCONNECTING,\n- Runstate.IDLE,\n- )\n- await self.proto.disconnect()\n-\n-\n-class Connect(TestBase):\n- \"\"\"\n- Tests primarily related to calling Connect().\n- \"\"\"\n- async def _bad_connection(self, family: str):\n- assert family in ('INET', 'UNIX')\n-\n- if family == 'INET':\n- await self.proto.connect(('127.0.0.1', 0))\n- elif family == 'UNIX':\n- await self.proto.connect('/dev/null')\n-\n- async def _hanging_connection(self):\n- with jammed_socket() as addr:\n- await self.proto.connect(addr)\n-\n- async def _bad_connection_test(self, family: str):\n- await self._watch_runstates(*self.BAD_CONNECTION_STATES)\n-\n- with self.assertRaises(ConnectError) as context:\n- await self._bad_connection(family)\n-\n- self.assertIsInstance(context.exception.exc, OSError)\n- self.assertEqual(\n- context.exception.error_message,\n- \"Failed to establish connection\"\n- )\n-\n- @TestBase.async_test\n- async def testBadINET(self):\n- \"\"\"\n- Test an immediately rejected call to an IP target.\n- \"\"\"\n- await self._bad_connection_test('INET')\n-\n- @TestBase.async_test\n- async def testBadUNIX(self):\n- \"\"\"\n- Test an immediately rejected call to a UNIX socket target.\n- \"\"\"\n- await self._bad_connection_test('UNIX')\n-\n- @TestBase.async_test\n- async def testCancellation(self):\n- \"\"\"\n- Test what happens when a connection attempt is aborted.\n- \"\"\"\n- # Note that accept() cannot be cancelled outright, as it isn't a task.\n- # However, we can wrap it in a task and cancel *that*.\n- await self._watch_runstates(*self.BAD_CONNECTION_STATES)\n- task = run_as_task(self._hanging_connection(), allow_cancellation=True)\n-\n- state = await self.proto.runstate_changed()\n- self.assertEqual(state, Runstate.CONNECTING)\n-\n- # This is insider baseball, but the connection attempt has\n- # yielded *just* before the actual connection attempt, so kick\n- # the loop to make sure it's truly wedged.\n- await asyncio.sleep(0)\n-\n- task.cancel()\n- await task\n-\n- @TestBase.async_test\n- async def testTimeout(self):\n- \"\"\"\n- Test what happens when a connection attempt times out.\n- \"\"\"\n- await self._watch_runstates(*self.BAD_CONNECTION_STATES)\n- task = run_as_task(self._hanging_connection())\n-\n- # More insider baseball: to improve the speed of this test while\n- # guaranteeing that the connection even gets a chance to start,\n- # verify that the connection hangs *first*, then await the\n- # result of the task with a nearly-zero timeout.\n-\n- state = await self.proto.runstate_changed()\n- self.assertEqual(state, Runstate.CONNECTING)\n- await asyncio.sleep(0)\n-\n- with self.assertRaises(asyncio.TimeoutError):\n- await asyncio.wait_for(task, timeout=0)\n-\n- @TestBase.async_test\n- async def testRequire(self):\n- \"\"\"\n- Test what happens when a connection attempt is made while CONNECTING.\n- \"\"\"\n- await self._watch_runstates(*self.BAD_CONNECTION_STATES)\n- task = run_as_task(self._hanging_connection(), allow_cancellation=True)\n-\n- state = await self.proto.runstate_changed()\n- self.assertEqual(state, Runstate.CONNECTING)\n-\n- with self.assertRaises(StateError) as context:\n- await self._bad_connection('UNIX')\n-\n- self.assertEqual(\n- context.exception.error_message,\n- \"NullProtocol is currently connecting.\"\n- )\n- self.assertEqual(context.exception.state, Runstate.CONNECTING)\n- self.assertEqual(context.exception.required, Runstate.IDLE)\n-\n- task.cancel()\n- await task\n-\n- @TestBase.async_test\n- async def testImplicitRunstateInit(self):\n- \"\"\"\n- Test what happens if we do not wait on the runstate event until\n- AFTER a connection is made, i.e., connect()/accept() themselves\n- initialize the runstate event. All of the above tests force the\n- initialization by waiting on the runstate *first*.\n- \"\"\"\n- task = run_as_task(self._hanging_connection(), allow_cancellation=True)\n-\n- # Kick the loop to coerce the state change\n- await asyncio.sleep(0)\n- assert self.proto.runstate == Runstate.CONNECTING\n-\n- # We already missed the transition to CONNECTING\n- await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)\n-\n- task.cancel()\n- await task\n-\n-\n-class Accept(Connect):\n- \"\"\"\n- All of the same tests as Connect, but using the accept() interface.\n- \"\"\"\n- async def _bad_connection(self, family: str):\n- assert family in ('INET', 'UNIX')\n-\n- if family == 'INET':\n- await self.proto.start_server_and_accept(('example.com', 1))\n- elif family == 'UNIX':\n- await self.proto.start_server_and_accept('/dev/null')\n-\n- async def _hanging_connection(self):\n- with TemporaryDirectory(suffix='.qmp') as tmpdir:\n- sock = os.path.join(tmpdir, type(self.proto).__name__ + \".sock\")\n- await self.proto.start_server_and_accept(sock)\n-\n-\n-class FakeSession(TestBase):\n-\n- def setUp(self):\n- super().setUp()\n- self.proto.fake_session = True\n-\n- async def _asyncSetUp(self):\n- await super()._asyncSetUp()\n- await self._watch_runstates(*self.GOOD_CONNECTION_STATES)\n-\n- async def _asyncTearDown(self):\n- await self.proto.disconnect()\n- await super()._asyncTearDown()\n-\n- ####\n-\n- @TestBase.async_test\n- async def testFakeConnect(self):\n-\n- \"\"\"Test the full state lifecycle (via connect) with a no-op session.\"\"\"\n- await self.proto.connect('/not/a/real/path')\n- self.assertEqual(self.proto.runstate, Runstate.RUNNING)\n-\n- @TestBase.async_test\n- async def testFakeAccept(self):\n- \"\"\"Test the full state lifecycle (via accept) with a no-op session.\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n- self.assertEqual(self.proto.runstate, Runstate.RUNNING)\n-\n- @TestBase.async_test\n- async def testFakeRecv(self):\n- \"\"\"Test receiving a fake/null message.\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n-\n- logname = self.proto.logger.name\n- with self.assertLogs(logname, level='DEBUG') as context:\n- self.proto.trigger_input.set()\n- self.proto.trigger_input.clear()\n- await asyncio.sleep(0) # Kick reader.\n-\n- self.assertEqual(\n- context.output,\n- [f\"DEBUG:{logname}:<-- None\"],\n- )\n-\n- @TestBase.async_test\n- async def testFakeSend(self):\n- \"\"\"Test sending a fake/null message.\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n-\n- logname = self.proto.logger.name\n- with self.assertLogs(logname, level='DEBUG') as context:\n- # Cheat: Send a Null message to nobody.\n- await self.proto.send_msg()\n- # Kick writer; awaiting on a queue.put isn't sufficient to yield.\n- await asyncio.sleep(0)\n-\n- self.assertEqual(\n- context.output,\n- [f\"DEBUG:{logname}:--> None\"],\n- )\n-\n- async def _prod_session_api(\n- self,\n- current_state: Runstate,\n- error_message: str,\n- accept: bool = True\n- ):\n- with self.assertRaises(StateError) as context:\n- if accept:\n- await self.proto.start_server_and_accept('/not/a/real/path')\n- else:\n- await self.proto.connect('/not/a/real/path')\n-\n- self.assertEqual(context.exception.error_message, error_message)\n- self.assertEqual(context.exception.state, current_state)\n- self.assertEqual(context.exception.required, Runstate.IDLE)\n-\n- @TestBase.async_test\n- async def testAcceptRequireRunning(self):\n- \"\"\"Test that accept() cannot be called when Runstate=RUNNING\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n-\n- await self._prod_session_api(\n- Runstate.RUNNING,\n- \"NullProtocol is already connected and running.\",\n- accept=True,\n- )\n-\n- @TestBase.async_test\n- async def testConnectRequireRunning(self):\n- \"\"\"Test that connect() cannot be called when Runstate=RUNNING\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n-\n- await self._prod_session_api(\n- Runstate.RUNNING,\n- \"NullProtocol is already connected and running.\",\n- accept=False,\n- )\n-\n- @TestBase.async_test\n- async def testAcceptRequireDisconnecting(self):\n- \"\"\"Test that accept() cannot be called when Runstate=DISCONNECTING\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n-\n- # Cheat: force a disconnect.\n- await self.proto.simulate_disconnect()\n-\n- await self._prod_session_api(\n- Runstate.DISCONNECTING,\n- (\"NullProtocol is disconnecting.\"\n- \" Call disconnect() to return to IDLE state.\"),\n- accept=True,\n- )\n-\n- @TestBase.async_test\n- async def testConnectRequireDisconnecting(self):\n- \"\"\"Test that connect() cannot be called when Runstate=DISCONNECTING\"\"\"\n- await self.proto.start_server_and_accept('/not/a/real/path')\n-\n- # Cheat: force a disconnect.\n- await self.proto.simulate_disconnect()\n-\n- await self._prod_session_api(\n- Runstate.DISCONNECTING,\n- (\"NullProtocol is disconnecting.\"\n- \" Call disconnect() to return to IDLE state.\"),\n- accept=False,\n- )\n-\n-\n-class SimpleSession(TestBase):\n-\n- def setUp(self):\n- super().setUp()\n- self.server = LineProtocol(type(self).__name__ + '-server')\n-\n- async def _asyncSetUp(self):\n- await super()._asyncSetUp()\n- await self._watch_runstates(*self.GOOD_CONNECTION_STATES)\n-\n- async def _asyncTearDown(self):\n- await self.proto.disconnect()\n- try:\n- await self.server.disconnect()\n- except EOFError:\n- pass\n- await super()._asyncTearDown()\n-\n- @TestBase.async_test\n- async def testSmoke(self):\n- with TemporaryDirectory(suffix='.qmp') as tmpdir:\n- sock = os.path.join(tmpdir, type(self.proto).__name__ + \".sock\")\n- server_task = asyncio.create_task(\n- self.server.start_server_and_accept(sock))\n-\n- # give the server a chance to start listening [...]\n- await asyncio.sleep(0)\n- await self.proto.connect(sock)\n", "prefixes": [ "v7", "17/19" ] }