Give the connection and the reader/writer tasks nicknames, and add logging statements throughout.
Signed-off-by: John Snow <js...@redhat.com> --- python/qemu/aqmp/protocol.py | 71 ++++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 247b60c31a6..f9295546cda 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -14,6 +14,7 @@ from asyncio import StreamReader, StreamWriter from enum import Enum from functools import wraps +import logging from ssl import SSLContext from typing import ( Any, @@ -31,8 +32,10 @@ from .util import ( bottom_half, create_task, + exception_summary, flush, is_closing, + pretty_traceback, upper_half, wait_closed, ) @@ -173,14 +176,28 @@ class AsyncProtocol(Generic[T]): can be written after the super() call. - `_on_message`: Actions to be performed when a message is received. + + :param name: + Name used for logging messages, if any. By default, messages + will log to 'qemu.aqmp.protocol', but each individual connection + can be given its own logger by giving it a name; messages will + then log to 'qemu.aqmp.protocol.${name}'. """ # pylint: disable=too-many-instance-attributes + #: Logger object for debugging messages from this connection. + logger = logging.getLogger(__name__) + # ------------------------- # Section: Public interface # ------------------------- - def __init__(self) -> None: + def __init__(self, name: Optional[str] = None) -> None: + #: The nickname for this connection, if any. + self.name: Optional[str] = name + if self.name is not None: + self.logger = self.logger.getChild(self.name) + # stream I/O self._reader: Optional[StreamReader] = None self._writer: Optional[StreamWriter] = None @@ -207,6 +224,14 @@ def __init__(self) -> None: self._runstate = Runstate.IDLE self._runstate_changed: Optional[asyncio.Event] = None + def __repr__(self) -> str: + cls_name = type(self).__name__ + tokens = [] + if self.name is not None: + tokens.append(f"name={self.name!r}") + tokens.append(f"runstate={self.runstate.name}") + return f"<{cls_name} {' '.join(tokens)}>" + @property # @upper_half def runstate(self) -> Runstate: """The current `Runstate` of the connection.""" @@ -275,6 +300,8 @@ def _set_state(self, state: Runstate) -> None: if state == self._runstate: return + self.logger.debug("Transitioning from '%s' to '%s'.", + str(self._runstate), str(state)) self._runstate = state self._runstate_event.set() self._runstate_event.clear() @@ -314,8 +341,15 @@ async def _new_session(self, except BaseException as err: emsg = f"Failed to establish {phase}" - # Reset from CONNECTING back to IDLE. - await self.disconnect() + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + try: + # Reset from CONNECTING back to IDLE. + await self.disconnect() + except: + emsg = "Unexpected bottom half exception" + self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) + raise # NB: CancelledError is not a BaseException before Python 3.8 if isinstance(err, asyncio.CancelledError): @@ -365,12 +399,16 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]], :raise OSError: For stream-related errors. """ + self.logger.debug("Connecting to %s ...", address) + if isinstance(address, tuple): connect = asyncio.open_connection(address[0], address[1], ssl=ssl) else: connect = asyncio.open_unix_connection(path=address, ssl=ssl) self._reader, self._writer = await connect + self.logger.debug("Connected.") + @upper_half async def _establish_session(self) -> None: """ @@ -384,8 +422,8 @@ async def _establish_session(self) -> None: self._outgoing = asyncio.Queue() - reader_coro = self._bh_loop_forever(self._bh_recv_message) - writer_coro = self._bh_loop_forever(self._bh_send_message) + reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') + writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') self._reader_task = create_task(reader_coro) self._writer_task = create_task(writer_coro) @@ -412,6 +450,7 @@ def _schedule_disconnect(self) -> None: """ if not self._dc_task: self._set_state(Runstate.DISCONNECTING) + self.logger.debug("Scheduling disconnect.") self._dc_task = create_task(self._bh_disconnect()) @upper_half @@ -497,8 +536,10 @@ def _exception( # This implicitly closes the reader, too. if self._writer: if not is_closing(self._writer): + self.logger.debug("Closing StreamWriter.") self._writer.close() + self.logger.debug("Waiting for StreamWriter to close ...") try: await wait_closed(self._writer) except Exception as err: # pylint: disable=broad-except @@ -509,6 +550,10 @@ def _exception( self._reader_task, self._writer_task))): raise + self.logger.debug("StreamWriter closed.") + + self.logger.debug("Disconnected.") + @bottom_half async def _bh_stop_writer(self, force: bool = False) -> None: if not self._writer_task or self._writer_task.done(): @@ -516,10 +561,13 @@ async def _bh_stop_writer(self, force: bool = False) -> None: # If we're not in a hurry, drain the outbound queue. if not force: + self.logger.debug("Draining the outbound queue ...") await self._outgoing.join() if self._writer is not None: + self.logger.debug("Flushing the StreamWriter ...") await flush(self._writer) + self.logger.debug("Cancelling writer task ...") self._writer_task.cancel() # Waits for the writer to finish but does NOT raise its exception. await asyncio.wait((self._writer_task,)) @@ -529,12 +577,13 @@ async def _bh_stop_reader(self) -> None: if not self._reader_task or self._reader_task.done(): return + self.logger.debug("Cancelling reader task ...") self._reader_task.cancel() # Waits for the reader to finish but does NOT raise its exception. await asyncio.wait((self._reader_task,)) @bottom_half - async def _bh_loop_forever(self, async_fn: _TaskFN) -> None: + async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: """ Run one of the bottom-half methods in a loop forever. @@ -542,16 +591,24 @@ async def _bh_loop_forever(self, async_fn: _TaskFN) -> None: disconnect that will terminate the entire loop. :param async_fn: The bottom-half method to run in a loop. + :param name: The name of this task, used for logging. """ try: while True: await async_fn() except asyncio.CancelledError: # We have been cancelled by _bh_disconnect, exit gracefully. + self.logger.debug("Task.%s: cancelled.", name) return - except BaseException: + except BaseException as err: + self.logger.error("Task.%s: %s", + name, exception_summary(err)) + self.logger.debug("Task.%s: failure:\n%s\n", + name, pretty_traceback()) self._schedule_disconnect() raise + finally: + self.logger.debug("Task.%s: exiting.", name) @bottom_half async def _bh_send_message(self) -> None: -- 2.31.1