If qemurunner doesn't continuously drain stdout we will eventually
cause QEMU to block while trying to write to the pipe. This can
manifest itself if the guest has for example configured its serial
ports to output via stdio even if the test itself is using a TCP
console or SSH to run things.

To do this:

  - always create a logging thread regardless of serial_ports
  - use a semaphore between main and logging threads
  - move the login detection into the logging thread
  - wait until the second acquire before continuing

This doesn't address a potential overflow of stderr although generally
stderr from QEMU will be a lot less likely to block due to the volume
of data.

Signed-off-by: Alex Bennée <alex.ben...@linaro.org>
Cc: Mikko Rapeli <mikko.rap...@linaro.org>
---
 meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------
 1 file changed, 78 insertions(+), 50 deletions(-)

diff --git a/meta/lib/oeqa/utils/qemurunner.py 
b/meta/lib/oeqa/utils/qemurunner.py
index 29fe271976..b768c08f04 100644
--- a/meta/lib/oeqa/utils/qemurunner.py
+++ b/meta/lib/oeqa/utils/qemurunner.py
@@ -207,8 +207,7 @@ class QemuRunner:
         self.logger.info("QMP Available for connection at %s" % (qmp_port2))
 
         try:
-            if self.serial_ports >= 2:
-                self.threadsock, threadport = self.create_socket()
+            self.threadsock, threadport = self.create_socket()
             self.server_socket, self.serverport = self.create_socket()
         except socket.error as msg:
             self.logger.error("Failed to create listening socket: %s" % msg[1])
@@ -243,6 +242,7 @@ class QemuRunner:
         # to be a proper fix but this will suffice for now.
         self.runqemu = subprocess.Popen(launch_cmd, shell=True, 
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, 
preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir)
         output = self.runqemu.stdout
+        output_drain = output
         launch_time = time.time()
 
         #
@@ -431,21 +431,30 @@ class QemuRunner:
         self.logger.debug("Target IP: %s" % self.ip)
         self.logger.debug("Server IP: %s" % self.server_ip)
 
-        if self.serial_ports >= 2:
-            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
-            self.thread.start()
-            if not self.thread.connection_established.wait(self.boottime):
-                self.logger.error("Didn't receive a console connection from 
qemu. "
-                             "Here is the qemu command line used:\n%s\nand "
-                             "output from runqemu:\n%s" % (cmdline, out))
-                self.stop_thread()
-                return False
+        # Create and hold onto the login semaphore, this will block
+        # the LoggingThread until we are ready
+        login_semaphore = threading.Semaphore()
+        login_semaphore.acquire()
+
+        self.thread = LoggingThread(self.log, self.threadsock,
+                                    self.runqemu.stdout, 
self.boot_patterns['search_reached_prompt'],
+                                    self.logger, login_semaphore)
+
+        self.thread.start()
+        login_semaphore.release()
+
+        # if not self.thread.connection_established.wait(self.boottime):
+        #     self.logger.error("Didn't receive a console connection from 
qemu. "
+        #                       "Here is the qemu command line used:\n%s\nand "
+        #                       "output from runqemu:\n%s" % (cmdline, out))
+        #     self.stop_thread()
+        #     return False
 
         self.logger.debug("Output from runqemu:\n%s", out)
         self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
                           (self.boottime, time.strftime("%D %H:%M:%S")))
         endtime = time.time() + self.boottime
-        filelist = [self.server_socket, self.runqemu.stdout]
+        filelist = [self.server_socket]
         reachedlogin = False
         stopread = False
         qemusock = None
@@ -464,46 +473,19 @@ class QemuRunner:
                     filelist.remove(self.server_socket)
                     self.logger.debug("Connection from %s:%s" % addr)
                 else:
-                    # try to avoid reading only a single character at a time
-                    time.sleep(0.1)
-                    if hasattr(file, 'read'):
-                        read = file.read(1024)
-                    elif hasattr(file, 'recv'):
-                        read = file.recv(1024)
-                    else:
-                        self.logger.error('Invalid file type: %s\n%s' % (file))
-                        read = b''
-
-                    self.logger.debug2('Partial boot log:\n%s' % 
(read.decode('utf-8', errors='backslashreplace')))
-                    data = data + read
-                    if data:
-                        bootlog += data
-                        self.log(data, extension = ".2")
-                        data = b''
-
-                        if bytes(self.boot_patterns['search_reached_prompt'], 
'utf-8') in bootlog:
-                            self.server_socket.close()
-                            self.server_socket = qemusock
-                            stopread = True
-                            reachedlogin = True
-                            self.logger.debug("Reached login banner in %.2f 
seconds (%s)" %
-                                              (time.time() - (endtime - 
self.boottime),
-                                              time.strftime("%D %H:%M:%S")))
-                    else:
-                        # no need to check if reachedlogin unless we support 
multiple connections
-                        self.logger.debug("QEMU socket disconnected before 
login banner reached. (%s)" %
-                                          time.strftime("%D %H:%M:%S"))
-                        filelist.remove(file)
-                        file.close()
+                    if login_semaphore.acquire(blocking=False):
+                        self.server_socket.close()
+                        self.server_socket = qemusock
                         stopread = True
+                        reachedlogin = True
+
+        self.logger.debug("continuing on....")
 
         if not reachedlogin:
             if time.time() >= endtime:
                 self.logger.warning("Target didn't reach login banner in %d 
seconds (%s)" %
                                   (self.boottime, time.strftime("%D 
%H:%M:%S")))
             tail = lambda l: "\n".join(l.splitlines()[-25:])
-            bootlog = self.decode_qemulog(bootlog)
-            self.logger.warning("Last 25 lines of login console (%d):\n%s" % 
(len(bootlog), tail(bootlog)))
             self.logger.warning("Last 25 lines of all logging (%d):\n%s" % 
(len(self.msg), tail(self.msg)))
             self.logger.warning("Check full boot log: %s" % self.logfile)
             self.stop()
@@ -539,6 +521,7 @@ class QemuRunner:
                 self.logger.warning("The output:\n%s" % output)
         except:
             self.logger.warning("Serial console failed while trying to login")
+
         return True
 
     def stop(self):
@@ -696,14 +679,16 @@ class QemuRunner:
                     status = 1
         return (status, str(data))
 
-# This class is for reading data from a socket and passing it to logfunc
-# to be processed. It's completely event driven and has a straightforward
-# event loop. The mechanism for stopping the thread is a simple pipe which
-# will wake up the poll and allow for tearing everything down.
+# This class is for reading data from sockets and QEMU's stdio output
+# and passing it to logfunc to be processed. It's completely event
+# driven and has a straightforward event loop. The mechanism for
+# stopping the thread is a simple pipe which will wake up the poll and
+# allow for tearing everything down.
 class LoggingThread(threading.Thread):
-    def __init__(self, logfunc, sock, logger):
+    def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore):
         self.connection_established = threading.Event()
         self.serversock = sock
+        self.qemu_stdio = qemu_stdio
         self.logfunc = logfunc
         self.logger = logger
         self.readsock = None
@@ -713,9 +698,19 @@ class LoggingThread(threading.Thread):
         self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
         self.readevents = select.POLLIN | select.POLLPRI
 
+        # tracking until we see prompt
+        self.prompt = prompt
+        self.prompt_seen = False
+        self.boot_log = b''
+        self.semaphore = semaphore
+        self.boottime = time.time()
+
         threading.Thread.__init__(self, target=self.threadtarget)
 
     def threadtarget(self):
+        # we acquire until we see the boot prompt
+        self.semaphore.acquire()
+
         try:
             self.eventloop()
         finally:
@@ -750,6 +745,7 @@ class LoggingThread(threading.Thread):
         event_read_mask = self.errorevents | self.readevents
         poll.register(self.serversock.fileno())
         poll.register(self.readpipe, event_read_mask)
+        poll.register(self.qemu_stdio, event_read_mask)
 
         breakout = False
         self.running = True
@@ -757,8 +753,12 @@ class LoggingThread(threading.Thread):
         while not breakout:
             events = poll.poll()
             for event in events:
+
+                self.logger.debug(f"event is {event}")
+
                 # An error occurred, bail out
                 if event[1] & self.errorevents:
+                    self.logger.debug("error event")
                     raise Exception(self.stringify_event(event[1]))
 
                 # Event to stop the thread
@@ -767,6 +767,10 @@ class LoggingThread(threading.Thread):
                     breakout = True
                     break
 
+                # stdio data to be logged
+                elif self.qemu_stdio.fileno() == event[0]:
+                    self.consume_qemu_stdio()
+
                 # A connection request was received
                 elif self.serversock.fileno() == event[0]:
                     self.logger.debug("Connection request received")
@@ -783,6 +787,30 @@ class LoggingThread(threading.Thread):
                     data = self.recv(1024)
                     self.logfunc(data)
 
+
+    # Consume QEMU's stdio output, checking for login strings
+    def consume_qemu_stdio(self):
+        # try to avoid reading only a single character at a time
+        time.sleep(0.1)
+        read = self.qemu_stdio.read(1024)
+
+        # log what we have seen
+        decoded_data = read.decode('utf-8', errors='backslashreplace')
+        if self.prompt_seen:
+            self.logger.debug2('Post login log:\n%s' % decoded_data)
+        else:
+            self.logger.debug2('Pre login log:\n%s' % decoded_data)
+
+        if not self.prompt_seen and read:
+            self.boot_log += read
+
+            if bytes(self.prompt, 'utf-8') in self.boot_log:
+                time_to_login = time.time() - self.boottime
+                self.logger.debug("Reached login banner in %.2f seconds (%s)" %
+                                  (time_to_login,  time.strftime("%D 
%H:%M:%S")))
+                self.semaphore.release()
+                self.prompt_seen = True
+
     # Since the socket is non-blocking make sure to honor EAGAIN
     # and EWOULDBLOCK.
     def recv(self, count):
-- 
2.39.2

-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#192177): 
https://lists.openembedded.org/g/openembedded-core/message/192177
Mute This Topic: https://lists.openembedded.org/mt/103111135/21656
Group Owner: openembedded-core+ow...@lists.openembedded.org
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[arch...@mail-archive.com]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to