Khem Raj <raj.k...@gmail.com> writes:

> this version hangs the run forever.

Do you have a reproducer? In my "make test" case it works fine.

Alternatively, you could attach gdb to the hanging Python process and use
py-bt to generate a Python backtrace.

>
> On Mon, Dec 11, 2023 at 7:57 AM Alex Bennée <alex.ben...@linaro.org> wrote:
>>
>> If qemurunner doesn't continuously drain stdout, we will eventually
>> cause QEMU to block while trying to write to the pipe. This can
>> manifest itself if the guest has, for example, configured its serial
>> ports to output via stdio, even if the test itself is using a TCP
>> console or SSH to run things.
>>
>> To do this:
>>
>>   - always create a logging thread regardless of serial_ports
>>   - use a semaphore between the main and logging threads
>>   - move the login detection into the logging thread
>>   - have the main thread wait until it can re-acquire the semaphore
>>     (released by the logging thread once the login prompt is seen)
>>     before continuing
>>
>> This doesn't address a potential overflow of stderr, although stderr
>> from QEMU is generally much less likely to block because it carries
>> a far lower volume of data.
>>
>> Signed-off-by: Alex Bennée <alex.ben...@linaro.org>
>> Cc: Mikko Rapeli <mikko.rap...@linaro.org>
>> ---
>>  meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
>>  1 file changed, 78 insertions(+), 50 deletions(-)
>>
>> diff --git a/meta/lib/oeqa/utils/qemurunner.py 
>> b/meta/lib/oeqa/utils/qemurunner.py
>> index 29fe271976..b768c08f04 100644
>> --- a/meta/lib/oeqa/utils/qemurunner.py
>> +++ b/meta/lib/oeqa/utils/qemurunner.py
>> @@ -207,8 +207,7 @@ class QemuRunner:
>>          self.logger.info("QMP Available for connection at %s" % (qmp_port2))
>>
>>          try:
>> -            if self.serial_ports >= 2:
>> -                self.threadsock, threadport = self.create_socket()
>> +            self.threadsock, threadport = self.create_socket()
>>              self.server_socket, self.serverport = self.create_socket()
>>          except socket.error as msg:
>>              self.logger.error("Failed to create listening socket: %s" % 
>> msg[1])
>> @@ -243,6 +242,7 @@ class QemuRunner:
>>          # to be a proper fix but this will suffice for now.
>>          self.runqemu = subprocess.Popen(launch_cmd, shell=True, 
>> stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, 
>> preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
>>          output = self.runqemu.stdout
>> +        output_drain = output
>>          launch_time = time.time()
>>
>>          #
>> @@ -431,21 +431,30 @@ class QemuRunner:
>>          self.logger.debug("Target IP: %s" % self.ip)
>>          self.logger.debug("Server IP: %s" % self.server_ip)
>>
>> -        if self.serial_ports >= 2:
>> -            self.thread = LoggingThread(self.log, self.threadsock, 
>> self.logger)
>> -            self.thread.start()
>> -            if not self.thread.connection_established.wait(self.boottime):
>> -                self.logger.error("Didn't receive a console connection from 
>> qemu. "
>> -                             "Here is the qemu command line used:\n%s\nand "
>> -                             "output from runqemu:\n%s" % (cmdline, out))
>> -                self.stop_thread()
>> -                return False
>> +        # Create and hold onto the login semaphore, this will block
>> +        # the LoggingThread until we are ready
>> +        login_semaphore = threading.Semaphore()
>> +        login_semaphore.acquire()
>> +
>> +        self.thread = LoggingThread(self.log, self.threadsock,
>> +                                    self.runqemu.stdout, 
>> self.boot_patterns['search_reached_prompt'],
>> +                                    self.logger, login_semaphore)
>> +
>> +        self.thread.start()
>> +        login_semaphore.release()
>> +
>> +        # if not self.thread.connection_established.wait(self.boottime):
>> +        #     self.logger.error("Didn't receive a console connection from 
>> qemu. "
>> +        #                       "Here is the qemu command line 
>> used:\n%s\nand "
>> +        #                       "output from runqemu:\n%s" % (cmdline, out))
>> +        #     self.stop_thread()
>> +        #     return False
>>
>>          self.logger.debug("Output from runqemu:\n%s", out)
>>          self.logger.debug("Waiting at most %d seconds for login banner 
>> (%s)" %
>>                            (self.boottime, time.strftime("%D %H:%M:%S")))
>>          endtime = time.time() + self.boottime
>> -        filelist = [self.server_socket, self.runqemu.stdout]
>> +        filelist = [self.server_socket]
>>          reachedlogin = False
>>          stopread = False
>>          qemusock = None
>> @@ -464,46 +473,19 @@ class QemuRunner:
>>                      filelist.remove(self.server_socket)
>>                      self.logger.debug("Connection from %s:%s" % addr)
>>                  else:
>> -                    # try to avoid reading only a single character at a time
>> -                    time.sleep(0.1)
>> -                    if hasattr(file, 'read'):
>> -                        read = file.read(1024)
>> -                    elif hasattr(file, 'recv'):
>> -                        read = file.recv(1024)
>> -                    else:
>> -                        self.logger.error('Invalid file type: %s\n%s' % 
>> (file))
>> -                        read = b''
>> -
>> -                    self.logger.debug2('Partial boot log:\n%s' % 
>> (read.decode('utf-8', errors='backslashreplace')))
>> -                    data = data + read
>> -                    if data:
>> -                        bootlog += data
>> -                        self.log(data, extension = ".2")
>> -                        data = b''
>> -
>> -                        if 
>> bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog:
>> -                            self.server_socket.close()
>> -                            self.server_socket = qemusock
>> -                            stopread = True
>> -                            reachedlogin = True
>> -                            self.logger.debug("Reached login banner in %.2f 
>> seconds (%s)" %
>> -                                              (time.time() - (endtime - 
>> self.boottime),
>> -                                              time.strftime("%D %H:%M:%S")))
>> -                    else:
>> -                        # no need to check if reachedlogin unless we 
>> support multiple connections
>> -                        self.logger.debug("QEMU socket disconnected before 
>> login banner reached. (%s)" %
>> -                                          time.strftime("%D %H:%M:%S"))
>> -                        filelist.remove(file)
>> -                        file.close()
>> +                    if login_semaphore.acquire(blocking=False):
>> +                        self.server_socket.close()
>> +                        self.server_socket = qemusock
>>                          stopread = True
>> +                        reachedlogin = True
>> +
>> +        self.logger.debug("continuing on....")
>>
>>          if not reachedlogin:
>>              if time.time() >= endtime:
>>                  self.logger.warning("Target didn't reach login banner in %d 
>> seconds (%s)" %
>>                                    (self.boottime, time.strftime("%D 
>> %H:%M:%S")))
>>              tail = lambda l: "\n".join(l.splitlines()[-25:])
>> -            bootlog = self.decode_qemulog(bootlog)
>> -            self.logger.warning("Last 25 lines of login console (%d):\n%s" 
>> % (len(bootlog), tail(bootlog)))
>>              self.logger.warning("Last 25 lines of all logging (%d):\n%s" % 
>> (len(self.msg), tail(self.msg)))
>>              self.logger.warning("Check full boot log: %s" % self.logfile)
>>              self.stop()
>> @@ -539,6 +521,7 @@ class QemuRunner:
>>                  self.logger.warning("The output:\n%s" % output)
>>          except:
>>              self.logger.warning("Serial console failed while trying to 
>> login")
>> +
>>          return True
>>
>>      def stop(self):
>> @@ -696,14 +679,16 @@ class QemuRunner:
>>                      status = 1
>>          return (status, str(data))
>>
>> -# This class is for reading data from a socket and passing it to logfunc
>> -# to be processed. It's completely event driven and has a straightforward
>> -# event loop. The mechanism for stopping the thread is a simple pipe which
>> -# will wake up the poll and allow for tearing everything down.
>> +# This class is for reading data from sockets and QEMU's stdio output
>> +# and passing it to logfunc to be processed. It's completely event
>> +# driven and has a straightforward event loop. The mechanism for
>> +# stopping the thread is a simple pipe which will wake up the poll and
>> +# allow for tearing everything down.
>>  class LoggingThread(threading.Thread):
>> -    def __init__(self, logfunc, sock, logger):
>> +    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, 
>> semaphore):
>>          self.connection_established = threading.Event()
>>          self.serversock = sock
>> +        self.qemu_stdio = qemu_stdio
>>          self.logfunc = logfunc
>>          self.logger = logger
>>          self.readsock = None
>> @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
>>          self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
>>          self.readevents = select.POLLIN | select.POLLPRI
>>
>> +        # tracking until we see prompt
>> +        self.prompt = prompt
>> +        self.prompt_seen = False
>> +        self.boot_log = b''
>> +        self.semaphore = semaphore
>> +        self.boottime = time.time()
>> +
>>          threading.Thread.__init__(self, target=self.threadtarget)
>>
>>      def threadtarget(self):
>> +        # we acquire until we see the boot prompt
>> +        self.semaphore.acquire()
>> +
>>          try:
>>              self.eventloop()
>>          finally:
>> @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
>>          event_read_mask = self.errorevents | self.readevents
>>          poll.register(self.serversock.fileno())
>>          poll.register(self.readpipe, event_read_mask)
>> +        poll.register(self.qemu_stdio, event_read_mask)
>>
>>          breakout = False
>>          self.running = True
>> @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
>>          while not breakout:
>>              events = poll.poll()
>>              for event in events:
>> +
>> +                self.logger.debug(f"event is {event}")
>> +
>>                  # An error occurred, bail out
>>                  if event[1] & self.errorevents:
>> +                    self.logger.debug("error event")
>>                      raise Exception(self.stringify_event(event[1]))
>>
>>                  # Event to stop the thread
>> @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
>>                      breakout = True
>>                      break
>>
>> +                # stdio data to be logged
>> +                elif self.qemu_stdio.fileno() == event[0]:
>> +                    self.consume_qemu_stdio()
>> +
>>                  # A connection request was received
>>                  elif self.serversock.fileno() == event[0]:
>>                      self.logger.debug("Connection request received")
>> @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
>>                      data = self.recv(1024)
>>                      self.logfunc(data)
>>
>> +
>> +    # Consume QEMU's stdio output, checking for login strings
>> +    def consume_qemu_stdio(self):
>> +        # try to avoid reading only a single character at a time
>> +        time.sleep(0.1)
>> +        read = self.qemu_stdio.read(1024)
>> +
>> +        # log what we have seen
>> +        decoded_data = read.decode('utf-8', errors='backslashreplace')
>> +        if self.prompt_seen:
>> +            self.logger.debug2('Post login log:\n%s' % decoded_data)
>> +        else:
>> +            self.logger.debug2('Pre login log:\n%s' % decoded_data)
>> +
>> +        if not self.prompt_seen and read:
>> +            self.boot_log += read
>> +
>> +            if bytes(self.prompt, 'utf-8') in self.boot_log:
>> +                time_to_login = time.time() - self.boottime
>> +                self.logger.debug("Reached login banner in %.2f seconds 
>> (%s)" %
>> +                                  (time_to_login,  time.strftime("%D 
>> %H:%M:%S")))
>> +                self.semaphore.release()
>> +                self.prompt_seen = True
>> +
>>      # Since the socket is non-blocking make sure to honor EAGAIN
>>      # and EWOULDBLOCK.
>>      def recv(self, count):
>> --
>> 2.39.2
>>
>>
>> 
>>

-- 
Alex Bennée
Virtualisation Tech Lead @ Linaro
-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#192243): 
https://lists.openembedded.org/g/openembedded-core/message/192243
Mute This Topic: https://lists.openembedded.org/mt/103111135/21656
Group Owner: openembedded-core+ow...@lists.openembedded.org
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[arch...@mail-archive.com]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to