On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu <niteesh...@gmail.com> wrote:

> Instead of manually connecting to and disconnecting from the
> server, we now rely on the runstate to manage the QMP
> connection.
>
> Along with this, the ability to reconnect on certain exceptions
> has also been added.
>
> Signed-off-by: G S Niteesh Babu <niteesh...@gmail.com>
> ---
>  python/qemu/aqmp/aqmp_tui.py | 109 ++++++++++++++++++++++++++++++-----
>  1 file changed, 94 insertions(+), 15 deletions(-)
>
> diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
> index 0d5ec62cb7..ef91883fa5 100644
> --- a/python/qemu/aqmp/aqmp_tui.py
> +++ b/python/qemu/aqmp/aqmp_tui.py
> @@ -25,8 +25,9 @@
>  import urwid_readline
>
>  from ..qmp import QEMUMonitorProtocol, QMPBadPortError
> +from .error import ProtocolError
>  from .message import DeserializationError, Message, UnexpectedTypeError
> -from .protocol import ConnectError
> +from .protocol import ConnectError, Runstate
>  from .qmp_client import ExecInterruptedError, QMPClient
>  from .util import create_task, pretty_traceback
>
> @@ -67,12 +68,24 @@ def format_json(msg: str) -> str:
>      return ' '.join(words)
>
>
> +def type_name(mtype: Any) -> str:
> +    """
> +    Returns the type name
> +    """
> +    return type(mtype).__name__
>

This is a lot of lines for a helper that only returns `type(x).__name__`
-- do we really need it?


> +
> +
>  class App(QMPClient):
> -    def __init__(self, address: Union[str, Tuple[str, int]]) -> None:
> +    def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
> +                 retry_delay: Optional[int]) -> None:
>          urwid.register_signal(type(self), UPDATE_MSG)
>          self.window = Window(self)
>          self.address = address
>          self.aloop: Optional[Any] = None  # FIXME: Use more concrete type.
> +        self.num_retries = num_retries
> +        self.retry_delay = retry_delay
> +        self.retry: bool = False
> +        self.disconnecting: bool = False
>

Why is this one (`self.disconnecting`) needed again? ...


>          super().__init__()
>
>      def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
> @@ -119,7 +132,7 @@ def _cb_inbound(self, msg: Message) -> Message:
>              LOGGER.info('Error server disconnected before reply')
>              urwid.emit_signal(self, UPDATE_MSG,
>                                '{"error": "Server disconnected before reply"}')
> -            self._set_status("Server disconnected")
> +            await self.disconnect()
>          except Exception as err:
>              LOGGER.error('Exception from _send_to_server: %s', str(err))
>              raise err
> @@ -136,15 +149,29 @@ def kill_app(self) -> None:
>          create_task(self._kill_app())
>
>      async def _kill_app(self) -> None:
> -        # It is ok to call disconnect even in disconnect state
> +        await self.disconnect()
> +        LOGGER.debug('Disconnect finished. Exiting app')
> +        raise urwid.ExitMainLoop()
> +
> +    async def disconnect(self) -> None:
> +        if self.disconnecting:
> +            return
>          try:
> -            await self.disconnect()
> -            LOGGER.debug('Disconnect finished. Exiting app')
> +            self.disconnecting = True
> +            await super().disconnect()
> +            self.retry = True
> +        except EOFError as err:
> +            LOGGER.info('disconnect: %s', type_name(err))
> +            self.retry = True
> +        except ProtocolError as err:
> +            LOGGER.info('disconnect: %s', type_name(err))
> +            self.retry = False
>          except Exception as err:
> -            LOGGER.info('_kill_app: %s', str(err))
> -            # Let the app crash after providing a proper stack trace
> +            LOGGER.error('disconnect: Unhandled exception %s', str(err))
> +            self.retry = False
>              raise err
> -        raise urwid.ExitMainLoop()
> +        finally:
> +            self.disconnecting = False
>
>      def handle_event(self, event: Message) -> None:
>          # FIXME: Consider all states present in qapi/run-state.json
> @@ -161,14 +188,61 @@ def _get_formatted_address(self) -> str:
>              addr = f'{host}:{port}'
>          return addr
>
> -    async def connect_server(self) -> None:
> +    async def _retry_connection(self) -> Optional[str]:
> +        current_retries = 0
> +        err = None
> +        # Increase in power sequence of 2 if no delay is provided
> +        cur_delay = 1
> +        inc_delay = 2
> +        if self.retry_delay:
> +            inc_delay = 1
> +            cur_delay = self.retry_delay
> +        # initial try
> +        await self.connect_server()
> +        while self.retry and current_retries < self.num_retries:
> +            LOGGER.info('Connection Failed, retrying in %d', cur_delay)
> +            status = f'[Retry #{current_retries} ({cur_delay}s)]'
> +            self._set_status(status)
> +
> +            await asyncio.sleep(cur_delay)
> +
> +            err = await self.connect_server()
> +            cur_delay *= inc_delay
> +            # Cap delay to 5mins
> +            cur_delay = min(cur_delay, 5 * 60)
> +            current_retries += 1
> +        # If all retries failed report the last error
> +        LOGGER.info('All retries failed: %s', str(err))
> +        return type_name(err)
>

I had suggested something like an exponential backoff, but maybe a constant
delay would be a little cleaner to implement for now, without getting
too fancy about it. If you go with a simpler retry algorithm, do you think
you could clean up the logic in the retry loop here a bit more?

Something like:

for _ in range(num_retries):
    try:
        whatever_you_have_to_do_to_connect()
        return
    except ConnectError as err:
        LOGGER.info(...etc)
    await asyncio.sleep(whatever_the_delay_is)
# ran out of retries here; presumably the connection manager will just go
# idle until the user intervenes some other way.

In particular, I think passing around the name of the exception is a little
dubious -- we should be logging the actual exception object we've received.


> +
> +    async def manage_connection(self) -> None:
> +        while True:
> +            if self.runstate == Runstate.IDLE:
> +                LOGGER.info('Trying to reconnect')
>

But won't the runstate also be IDLE on the very first boot? If so, "Trying
to reconnect" might not be the right message there.


> +                err = await self._retry_connection()
>

This seems oddly named too, since it might be the initial attempt and not
necessarily a reconnection or a retry.


> +                # If retry is still true then, we have exhausted all our tries.
> +                if self.retry:
> +                    self._set_status(f'Error: {err}')
>
> +                else:
> +                    addr = self._get_formatted_address()
> +                    self._set_status(f'[Connected {addr}]')
> +            elif self.runstate == Runstate.DISCONNECTING:
> +                self._set_status('[Disconnected]')
> +                await self.disconnect()
> +                # check if a retry is needed
>

Is this required? I would have hoped that after calling disconnect the
state would have changed back to IDLE, so you wouldn't need this clause
here.


> +                if self.runstate == Runstate.IDLE:
> +                    continue
> +            await self.runstate_changed()
> +
> +    async def connect_server(self) -> Optional[str]:
>          try:
>              await self.connect(self.address)
> -            addr = self._get_formatted_address()
> -            self._set_status(f'Connected to {addr}')
> +            self.retry = False
>          except ConnectError as err:
>              LOGGER.info('connect_server: ConnectError %s', str(err))
> -            self._set_status('Server shutdown')
> +            self.retry = True
> +            return type_name(err)
> +        return None
>
>      def run(self, debug: bool = False) -> None:
>          screen = urwid.raw_display.Screen()
> @@ -191,7 +265,7 @@ def run(self, debug: bool = False) -> None:
>                                     event_loop=event_loop)
>
>          create_task(self.wait_for_events(), self.aloop)
> -        create_task(self.connect_server(), self.aloop)
> +        create_task(self.manage_connection(), self.aloop)
>          try:
>              main_loop.run()
>          except Exception as err:
> @@ -333,6 +407,11 @@ def main() -> None:
>      parser = argparse.ArgumentParser(description='AQMP TUI')
>      parser.add_argument('qmp_server', help='Address of the QMP server'
>                          '< UNIX socket path | TCP addr:port >')
> +    parser.add_argument('--num-retries', type=int, default=10,
> +                        help='Number of times to reconnect before giving up')
> +    parser.add_argument('--retry-delay', type=int,
> +                        help='Time(s) to wait before next retry.'
> +                        'Default action is to increase delay in powers of 2')
>      parser.add_argument('--log-file', help='The Log file name')
>      parser.add_argument('--log-level', default='WARNING',
>                          help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
> @@ -348,7 +427,7 @@ def main() -> None:
>      except QMPBadPortError as err:
>          parser.error(str(err))
>
> -    app = App(address)
> +    app = App(address, args.num_retries, args.retry_delay)
>
>      if args.log_file:
>          LOGGER.addHandler(logging.FileHandler(args.log_file))
> --
> 2.17.1
>
>
Right idea overall -- it possibly needs some polish, plus integration with
an earlier patch to avoid the intermediate FIXMEs.

Thanks,
--js

Reply via email to