@@ -14,6 +14,7 @@
from asyncio import StreamReader, StreamWriter
from enum import Enum
from functools import wraps
+import logging
from ssl import SSLContext
from typing import (
Any,
@@ -34,6 +35,7 @@
create_task,
flush,
is_closing,
+ pretty_traceback,
upper_half,
wait_closed,
wait_task_done,
@@ -174,14 +176,28 @@ class AsyncProtocol(Generic[T]):
can be written after the super() call.
- `_on_message`:
Actions to be performed when a message is received.
+
+ :param name:
+ Name used for logging messages, if any. By default, messages
+ will log to 'qemu.aqmp.protocol', but each individual connection
+ can be given its own logger by giving it a name; messages will
+ then log to 'qemu.aqmp.protocol.${name}'.
"""
# pylint: disable=too-many-instance-attributes
+ #: Logger object for debugging messages from this connection.
+ logger = logging.getLogger(__name__)
+
# -------------------------
# Section: Public interface
# -------------------------
- def __init__(self) -> None:
+ def __init__(self, name: Optional[str] = None) -> None:
+ #: The nickname for this connection, if any.
+ self.name: Optional[str] = name
+ if self.name is not None:
+ self.logger = self.logger.getChild(self.name)
+
# stream I/O
self._reader: Optional[StreamReader] = None
self._writer: Optional[StreamWriter] = None
@@ -212,6 +228,15 @@ def __init__(self) -> None:
#: An `asyncio.Event` that signals when `runstate` is changed.
self.runstate_changed: asyncio.Event = asyncio.Event()
+ def __repr__(self) -> str:
+ argstr = ''
+ if self.name is not None:
+ argstr += f"name={self.name}"
+ return "{:s}({:s})".format(
+ type(self).__name__,
+ argstr,
+ )
+
@property
def runstate(self) -> Runstate:
"""The current `Runstate` of the connection."""
@@ -301,6 +326,8 @@ async def _new_session(self,
assert self.runstate == Runstate.IDLE
self._set_state(Runstate.CONNECTING)
+ if not self._outgoing.empty():
+ self.logger.warning("Outgoing message queue was not empty!")
self._outgoing = asyncio.Queue()
phase = "connection"
@@ -311,9 +338,15 @@ async def _new_session(self,
await self._begin_new_session()
except Exception as err:
- # Reset from CONNECTING back to IDLE.
- await self.disconnect()
emsg = f"Failed to establish {phase}"
+ self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+ try:
+ # Reset from CONNECTING back to IDLE.
+ await self.disconnect()
+ except:
+ emsg = "Unexpected bottom half exceptions"
+ self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+ raise
raise ConnectError(emsg, err) from err
assert self.runstate == Runstate.RUNNING
@@ -330,12 +363,16 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]],
:raise OSError: For stream-related errors.
"""
+ self.logger.debug("Connecting ...")
+
if isinstance(address, tuple):
connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
else:
connect = asyncio.open_unix_connection(path=address, ssl=ssl)
self._reader, self._writer = await connect
+ self.logger.debug("Connected.")
+
@upper_half
async def _begin_new_session(self) -> None:
"""
@@ -343,8 +380,8 @@ async def _begin_new_session(self) -> None:
"""
assert self.runstate == Runstate.CONNECTING
- reader_coro = self._bh_loop_forever(self._bh_recv_message)
- writer_coro = self._bh_loop_forever(self._bh_send_message)
+ reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+ writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
self._reader_task = create_task(reader_coro)
self._writer_task = create_task(writer_coro)
@@ -374,6 +411,7 @@ def _schedule_disconnect(self, force: bool = False) -> None:
terminating execution. When `True`, terminate immediately.
"""
if not self._dc_task:
+ self.logger.debug("scheduling disconnect.")
self._dc_task = create_task(self._bh_disconnect(force))
@upper_half
@@ -499,8 +537,13 @@ async def _bh_disconnect(self, force: bool = False) -> None:
# This implicitly closes the reader, too.
if self._writer:
if not is_closing(self._writer):
+ self.logger.debug("Closing StreamWriter.")
self._writer.close()
+ self.logger.debug("Waiting for writer to close.")
await wait_closed(self._writer)
+ self.logger.debug("Writer closed.")
+
+ self.logger.debug("Disconnected.")
@bottom_half
async def _bh_stop_writer(self, force: bool = False) -> None:
@@ -513,17 +556,19 @@ async def _bh_stop_writer(self, force: bool = False) -> None:
# Cancel the writer task.
if self._writer_task and not self._writer_task.done():
+ self.logger.debug("Cancelling writer task.")
self._writer_task.cancel()
await wait_task_done(self._writer_task)
@bottom_half
async def _bh_stop_reader(self) -> None:
if self._reader_task and not self._reader_task.done():
+ self.logger.debug("Cancelling reader task.")
self._reader_task.cancel()
await wait_task_done(self._reader_task)
@bottom_half
- async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
+ async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
"""
Run one of the bottom-half methods in a loop forever.
@@ -531,16 +576,23 @@ async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
disconnect that will terminate the entire loop.
:param async_fn: The bottom-half method to run in a loop.
+ :param name: The name of this task, used for logging.
"""
try:
while True:
await async_fn()
except asyncio.CancelledError:
# We have been cancelled by _bh_disconnect, exit gracefully.
+ self.logger.debug("Task.%s: cancelled.", name)
return
except BaseException:
+ self.logger.error(
+ "Task.%s: failure:\n%s\n", name, pretty_traceback()
+ )
self._schedule_disconnect(force=True)
raise
+ finally:
+ self.logger.debug("Task.%s: exiting.", name)
@bottom_half
async def _bh_send_message(self) -> None:
@@ -3,10 +3,14 @@
This module primarily provides compatibility wrappers for Python 3.6 to
provide some features that otherwise become available in Python 3.7+.
+
+It additionally provides `pretty_traceback()`, used for formatting
+tracebacks for inclusion in the logging stream.
"""
import asyncio
import sys
+import traceback
from typing import (
Any,
Coroutine,
@@ -105,6 +109,34 @@ async def wait_task_done(task: Optional['asyncio.Future[Any]']) -> None:
break
+def pretty_traceback(prefix: str = " | ") -> str:
+ """
+ Formats the current traceback, indented to provide visual distinction.
+
+ This is useful for printing a traceback within a traceback for
+ debugging purposes when encapsulating errors to deliver them up the
+ stack; when those errors are printed, this helps provide a nice
+ visual grouping to quickly identify the parts of the error that
+ belong to the inner exception.
+
+ :param prefix: The prefix to append to each line of the traceback.
+ :return: A string, formatted something like the following::
+
+ | Traceback (most recent call last):
+ | File "foobar.py", line 42, in arbitrary_example
+ | foo.baz()
+ | ArbitraryError: [Errno 42] Something bad happened!
+ """
+ output = "".join(traceback.format_exception(*sys.exc_info()))
+
+ exc_lines = []
+ for line in output.split('\n'):
+ exc_lines.append(prefix + line)
+
+ # The last line is always empty, omit it
+ return "\n".join(exc_lines[:-1])
+
+
def upper_half(func: T) -> T:
"""
Do-nothing decorator that annotates a method as an "upper-half" method.
Give the connection and the reader/writer tasks nicknames, and add logging statements throughout. Signed-off-by: John Snow <jsnow@redhat.com> --- python/qemu/aqmp/protocol.py | 64 ++++++++++++++++++++++++++++++++---- python/qemu/aqmp/util.py | 32 ++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-)