On Mon, 2023-12-11 at 15:57 +0000, Alex Bennée wrote:
> If qemurunner doesn't continuously drain stdout we will eventually
> cause QEMU to block while trying to write to the pipe. This can
> manifest itself if the guest has for example configured its serial
> ports to output via stdio even if the test itself is using a TCP
> console or SSH to run things.
> 
> To do this:
> 
>   - always create a logging thread regardless of serial_ports
>   - use a semaphore between main and logging threads
>   - move the login detection into the logging thread
>   - wait until the second acquire before continuing
> 
> This doesn't address a potential overflow of stderr although generally
> stderr from QEMU will be a lot less likely to block due to the volume
> of data.
> 
> Signed-off-by: Alex Bennée <alex.ben...@linaro.org>
> Cc: Mikko Rapeli <mikko.rap...@linaro.org>
> ---
>  meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
>  1 file changed, 78 insertions(+), 50 deletions(-)
> 
> diff --git a/meta/lib/oeqa/utils/qemurunner.py 
> b/meta/lib/oeqa/utils/qemurunner.py
> index 29fe271976..b768c08f04 100644
> --- a/meta/lib/oeqa/utils/qemurunner.py
> +++ b/meta/lib/oeqa/utils/qemurunner.py
> @@ -207,8 +207,7 @@ class QemuRunner:
>          self.logger.info("QMP Available for connection at %s" % (qmp_port2))
>  
>          try:
> -            if self.serial_ports >= 2:
> -                self.threadsock, threadport = self.create_socket()
> +            self.threadsock, threadport = self.create_socket()
>              self.server_socket, self.serverport = self.create_socket()
>          except socket.error as msg:
>              self.logger.error("Failed to create listening socket: %s" % 
> msg[1])
> @@ -243,6 +242,7 @@ class QemuRunner:
>          # to be a proper fix but this will suffice for now.
>          self.runqemu = subprocess.Popen(launch_cmd, shell=True, 
> stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, 
> preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
>          output = self.runqemu.stdout
> +        output_drain = output
>          launch_time = time.time()
>  
>          #
> @@ -431,21 +431,30 @@ class QemuRunner:
>          self.logger.debug("Target IP: %s" % self.ip)
>          self.logger.debug("Server IP: %s" % self.server_ip)
>  
> -        if self.serial_ports >= 2:
> -            self.thread = LoggingThread(self.log, self.threadsock, 
> self.logger)
> -            self.thread.start()
> -            if not self.thread.connection_established.wait(self.boottime):
> -                self.logger.error("Didn't receive a console connection from 
> qemu. "
> -                             "Here is the qemu command line used:\n%s\nand "
> -                             "output from runqemu:\n%s" % (cmdline, out))
> -                self.stop_thread()
> -                return False
> +        # Create and hold onto the login semaphore, this will block
> +        # the LoggingThread until we are ready
> +        login_semaphore = threading.Semaphore()
> +        login_semaphore.acquire()
> +
> +        self.thread = LoggingThread(self.log, self.threadsock,
> +                                    self.runqemu.stdout, 
> self.boot_patterns['search_reached_prompt'],
> +                                    self.logger, login_semaphore)
> +
> +        self.thread.start()
> +        login_semaphore.release()
> +
> +        # if not self.thread.connection_established.wait(self.boottime):
> +        #     self.logger.error("Didn't receive a console connection from 
> qemu. "
> +        #                       "Here is the qemu command line 
> used:\n%s\nand "
> +        #                       "output from runqemu:\n%s" % (cmdline, out))
> +        #     self.stop_thread()
> +        #     return False
>  
>          self.logger.debug("Output from runqemu:\n%s", out)
>          self.logger.debug("Waiting at most %d seconds for login banner (%s)" 
> %
>                            (self.boottime, time.strftime("%D %H:%M:%S")))
>          endtime = time.time() + self.boottime
> -        filelist = [self.server_socket, self.runqemu.stdout]
> +        filelist = [self.server_socket]
>          reachedlogin = False
>          stopread = False
>          qemusock = None
> @@ -464,46 +473,19 @@ class QemuRunner:
>                      filelist.remove(self.server_socket)
>                      self.logger.debug("Connection from %s:%s" % addr)
>                  else:
> -                    # try to avoid reading only a single character at a time
> -                    time.sleep(0.1)
> -                    if hasattr(file, 'read'):
> -                        read = file.read(1024)
> -                    elif hasattr(file, 'recv'):
> -                        read = file.recv(1024)
> -                    else:
> -                        self.logger.error('Invalid file type: %s\n%s' % 
> (file))
> -                        read = b''
> -
> -                    self.logger.debug2('Partial boot log:\n%s' % 
> (read.decode('utf-8', errors='backslashreplace')))
> -                    data = data + read
> -                    if data:
> -                        bootlog += data
> -                        self.log(data, extension = ".2")
> -                        data = b''
> -
> -                        if 
> bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
> -                            self.server_socket.close()
> -                            self.server_socket = qemusock
> -                            stopread = True
> -                            reachedlogin = True
> -                            self.logger.debug("Reached login banner in %.2f 
> seconds (%s)" %
> -                                              (time.time() - (endtime - 
> self.boottime),
> -                                              time.strftime("%D %H:%M:%S")))
> -                    else:
> -                        # no need to check if reachedlogin unless we support 
> multiple connections
> -                        self.logger.debug("QEMU socket disconnected before 
> login banner reached. (%s)" %
> -                                          time.strftime("%D %H:%M:%S"))
> -                        filelist.remove(file)
> -                        file.close()
> +                    if login_semaphore.acquire(blocking=False):
> +                        self.server_socket.close()
> +                        self.server_socket = qemusock
>                          stopread = True
> +                        reachedlogin = True
> +
> +        self.logger.debug("continuing on....")
>  
>          if not reachedlogin:
>              if time.time() >= endtime:
>                  self.logger.warning("Target didn't reach login banner in %d 
> seconds (%s)" %
>                                    (self.boottime, time.strftime("%D 
> %H:%M:%S")))
>              tail = lambda l: "\n".join(l.splitlines()[-25:])
> -            bootlog = self.decode_qemulog(bootlog)
> -            self.logger.warning("Last 25 lines of login console (%d):\n%s" % 
> (len(bootlog), tail(bootlog)))
>              self.logger.warning("Last 25 lines of all logging (%d):\n%s" % 
> (len(self.msg), tail(self.msg)))
>              self.logger.warning("Check full boot log: %s" % self.logfile)
>              self.stop()
> @@ -539,6 +521,7 @@ class QemuRunner:
>                  self.logger.warning("The output:\n%s" % output)
>          except:
>              self.logger.warning("Serial console failed while trying to 
> login")
> +
>          return True
>  
>      def stop(self):
> @@ -696,14 +679,16 @@ class QemuRunner:
>                      status = 1
>          return (status, str(data))
>  
> -# This class is for reading data from a socket and passing it to logfunc
> -# to be processed. It's completely event driven and has a straightforward
> -# event loop. The mechanism for stopping the thread is a simple pipe which
> -# will wake up the poll and allow for tearing everything down.
> +# This class is for reading data from sockets and QEMU's stdio output
> +# and passing it to logfunc to be processed. It's completely event
> +# driven and has a straightforward event loop. The mechanism for
> +# stopping the thread is a simple pipe which will wake up the poll and
> +# allow for tearing everything down.
>  class LoggingThread(threading.Thread):
> -    def __init__(self, logfunc, sock, logger):
> +    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
>          self.connection_established = threading.Event()
>          self.serversock = sock
> +        self.qemu_stdio = qemu_stdio
>          self.logfunc = logfunc
>          self.logger = logger
>          self.readsock = None
> @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
>          self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
>          self.readevents = select.POLLIN | select.POLLPRI
>  
> +        # tracking until we see prompt
> +        self.prompt = prompt
> +        self.prompt_seen = False
> +        self.boot_log = b''
> +        self.semaphore = semaphore
> +        self.boottime = time.time()
> +
>          threading.Thread.__init__(self, target=self.threadtarget)
>  
>      def threadtarget(self):
> +        # we acquire until we see the boot prompt
> +        self.semaphore.acquire()
> +
>          try:
>              self.eventloop()
>          finally:
> @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
>          event_read_mask = self.errorevents | self.readevents
>          poll.register(self.serversock.fileno())
>          poll.register(self.readpipe, event_read_mask)
> +        poll.register(self.qemu_stdio, event_read_mask)
>  
>          breakout = False
>          self.running = True
> @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
>          while not breakout:
>              events = poll.poll()
>              for event in events:
> +
> +                self.logger.debug(f"event is {event}")
> +
>                  # An error occurred, bail out
>                  if event[1] & self.errorevents:
> +                    self.logger.debug("error event")
>                      raise Exception(self.stringify_event(event[1]))
>  
>                  # Event to stop the thread
> @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
>                      breakout = True
>                      break
>  
> +                # stdio data to be logged
> +                elif self.qemu_stdio.fileno() == event[0]:
> +                    self.consume_qemu_stdio()
> +
>                  # A connection request was received
>                  elif self.serversock.fileno() == event[0]:
>                      self.logger.debug("Connection request received")
> @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
>                      data = self.recv(1024)
>                      self.logfunc(data)
>  
> +
> +    # Consume QEMU's stdio output, checking for login strings
> +    def consume_qemu_stdio(self):
> +        # try to avoid reading only a single character at a time
> +        time.sleep(0.1)
> +        read = self.qemu_stdio.read(1024)
> +
> +        # log what we have seen
> +        decoded_data = read.decode('utf-8', errors='backslashreplace')
> +        if self.prompt_seen:
> +            self.logger.debug2('Post login log:\n%s' % decoded_data)
> +        else:
> +            self.logger.debug2('Pre login log:\n%s' % decoded_data)
> +
> +        if not self.prompt_seen and read:
> +            self.boot_log += read
> +
> +            if bytes(self.prompt, 'utf-8') in self.boot_log:
> +                time_to_login = time.time() - self.boottime
> +                self.logger.debug("Reached login banner in %.2f seconds 
> (%s)" %
> +                                  (time_to_login,  time.strftime("%D 
> %H:%M:%S")))
> +                self.semaphore.release()
> +                self.prompt_seen = True
> +
>      # Since the socket is non-blocking make sure to honor EAGAIN
>      # and EWOULDBLOCK.
>      def recv(self, count):


FWIW I tried testing this locally and "bitbake core-image-full-cmdline
-c testimage" just hangs, it can't work out the login prompt.

This code has been the source of a lot of races issues and intermittent
failures which we only "just" got to the bottom of and introducing a
load more isn't something I'm feeling great about. I'll try and find
some time to see if I can work out any better approach.

Cheers,

Richard



-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#192632): 
https://lists.openembedded.org/g/openembedded-core/message/192632
Mute This Topic: https://lists.openembedded.org/mt/103111135/21656
Group Owner: openembedded-core+ow...@lists.openembedded.org
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[arch...@mail-archive.com]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to