On Mon, 2023-12-11 at 15:57 +0000, Alex Bennée wrote: > If qemurunner doesn't continuously drain stdout we will eventually > cause QEMU to block while trying to write to the pipe. This can > manifest itself if the guest has for example configured its serial > ports to output via stdio even if the test itself is using a TCP > console or SSH to run things. > > To do this: > > - always create a logging thread regardless of serial_ports > - use a semaphore between main and logging threads > - move the login detection into the logging thread > - wait until the second acquire before continuing > > This doesn't address a potential overflow of stderr although generally > stderr from QEMU will be a lot less likely to block due to the volume > of data. > > Signed-off-by: Alex Bennée <alex.ben...@linaro.org> > Cc: Mikko Rapeli <mikko.rap...@linaro.org> > --- > meta/lib/oeqa/utils/qemurunner.py | 128 ++++++++++++++++++------------ > 1 file changed, 78 insertions(+), 50 deletions(-) > > diff --git a/meta/lib/oeqa/utils/qemurunner.py > b/meta/lib/oeqa/utils/qemurunner.py > index 29fe271976..b768c08f04 100644 > --- a/meta/lib/oeqa/utils/qemurunner.py > +++ b/meta/lib/oeqa/utils/qemurunner.py > @@ -207,8 +207,7 @@ class QemuRunner: > self.logger.info("QMP Available for connection at %s" % (qmp_port2)) > > try: > - if self.serial_ports >= 2: > - self.threadsock, threadport = self.create_socket() > + self.threadsock, threadport = self.create_socket() > self.server_socket, self.serverport = self.create_socket() > except socket.error as msg: > self.logger.error("Failed to create listening socket: %s" % > msg[1]) > @@ -243,6 +242,7 @@ class QemuRunner: > # to be a proper fix but this will suffice for now. 
> self.runqemu = subprocess.Popen(launch_cmd, shell=True, > stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, > preexec_fn=os.setpgrp, env=env, cwd=self.tmpdir) > output = self.runqemu.stdout > + output_drain = output > launch_time = time.time() > > # > @@ -431,21 +431,30 @@ class QemuRunner: > self.logger.debug("Target IP: %s" % self.ip) > self.logger.debug("Server IP: %s" % self.server_ip) > > - if self.serial_ports >= 2: > - self.thread = LoggingThread(self.log, self.threadsock, > self.logger) > - self.thread.start() > - if not self.thread.connection_established.wait(self.boottime): > - self.logger.error("Didn't receive a console connection from > qemu. " > - "Here is the qemu command line used:\n%s\nand " > - "output from runqemu:\n%s" % (cmdline, out)) > - self.stop_thread() > - return False > + # Create and hold onto the login semaphore, this will block > + # the LoggingThread until we are ready > + login_semaphore = threading.Semaphore() > + login_semaphore.acquire() > + > + self.thread = LoggingThread(self.log, self.threadsock, > + self.runqemu.stdout, > self.boot_patterns['search_reached_prompt'], > + self.logger, login_semaphore) > + > + self.thread.start() > + login_semaphore.release() > + > + # if not self.thread.connection_established.wait(self.boottime): > + # self.logger.error("Didn't receive a console connection from > qemu. 
" > + # "Here is the qemu command line > used:\n%s\nand " > + # "output from runqemu:\n%s" % (cmdline, out)) > + # self.stop_thread() > + # return False > > self.logger.debug("Output from runqemu:\n%s", out) > self.logger.debug("Waiting at most %d seconds for login banner (%s)" > % > (self.boottime, time.strftime("%D %H:%M:%S"))) > endtime = time.time() + self.boottime > - filelist = [self.server_socket, self.runqemu.stdout] > + filelist = [self.server_socket] > reachedlogin = False > stopread = False > qemusock = None > @@ -464,46 +473,19 @@ class QemuRunner: > filelist.remove(self.server_socket) > self.logger.debug("Connection from %s:%s" % addr) > else: > - # try to avoid reading only a single character at a time > - time.sleep(0.1) > - if hasattr(file, 'read'): > - read = file.read(1024) > - elif hasattr(file, 'recv'): > - read = file.recv(1024) > - else: > - self.logger.error('Invalid file type: %s\n%s' % > (file)) > - read = b'' > - > - self.logger.debug2('Partial boot log:\n%s' % > (read.decode('utf-8', errors='backslashreplace'))) > - data = data + read > - if data: > - bootlog += data > - self.log(data, extension = ".2") > - data = b'' > - > - if > bytes(self.boot_patterns['search_reached_prompt'], 'utf-8') in bootlog: > - self.server_socket.close() > - self.server_socket = qemusock > - stopread = True > - reachedlogin = True > - self.logger.debug("Reached login banner in %.2f > seconds (%s)" % > - (time.time() - (endtime - > self.boottime), > - time.strftime("%D %H:%M:%S"))) > - else: > - # no need to check if reachedlogin unless we support > multiple connections > - self.logger.debug("QEMU socket disconnected before > login banner reached. 
(%s)" % > - time.strftime("%D %H:%M:%S")) > - filelist.remove(file) > - file.close() > + if login_semaphore.acquire(blocking=False): > + self.server_socket.close() > + self.server_socket = qemusock > stopread = True > + reachedlogin = True > + > + self.logger.debug("continuing on....") > > if not reachedlogin: > if time.time() >= endtime: > self.logger.warning("Target didn't reach login banner in %d > seconds (%s)" % > (self.boottime, time.strftime("%D > %H:%M:%S"))) > tail = lambda l: "\n".join(l.splitlines()[-25:]) > - bootlog = self.decode_qemulog(bootlog) > - self.logger.warning("Last 25 lines of login console (%d):\n%s" % > (len(bootlog), tail(bootlog))) > self.logger.warning("Last 25 lines of all logging (%d):\n%s" % > (len(self.msg), tail(self.msg))) > self.logger.warning("Check full boot log: %s" % self.logfile) > self.stop() > @@ -539,6 +521,7 @@ class QemuRunner: > self.logger.warning("The output:\n%s" % output) > except: > self.logger.warning("Serial console failed while trying to > login") > + > return True > > def stop(self): > @@ -696,14 +679,16 @@ class QemuRunner: > status = 1 > return (status, str(data)) > > -# This class is for reading data from a socket and passing it to logfunc > -# to be processed. It's completely event driven and has a straightforward > -# event loop. The mechanism for stopping the thread is a simple pipe which > -# will wake up the poll and allow for tearing everything down. > +# This class is for reading data from sockets and QEMU's stdio output > +# and passing it to logfunc to be processed. It's completely event > +# driven and has a straightforward event loop. The mechanism for > +# stopping the thread is a simple pipe which will wake up the poll and > +# allow for tearing everything down. 
> class LoggingThread(threading.Thread): > - def __init__(self, logfunc, sock, logger): > + def __init__(self, logfunc, sock, qemu_stdio, prompt, logger, semaphore): > self.connection_established = threading.Event() > self.serversock = sock > + self.qemu_stdio = qemu_stdio > self.logfunc = logfunc > self.logger = logger > self.readsock = None > @@ -713,9 +698,19 @@ class LoggingThread(threading.Thread): > self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL > self.readevents = select.POLLIN | select.POLLPRI > > + # tracking until we see prompt > + self.prompt = prompt > + self.prompt_seen = False > + self.boot_log = b'' > + self.semaphore = semaphore > + self.boottime = time.time() > + > threading.Thread.__init__(self, target=self.threadtarget) > > def threadtarget(self): > + # we acquire until we see the boot prompt > + self.semaphore.acquire() > + > try: > self.eventloop() > finally: > @@ -750,6 +745,7 @@ class LoggingThread(threading.Thread): > event_read_mask = self.errorevents | self.readevents > poll.register(self.serversock.fileno()) > poll.register(self.readpipe, event_read_mask) > + poll.register(self.qemu_stdio, event_read_mask) > > breakout = False > self.running = True > @@ -757,8 +753,12 @@ class LoggingThread(threading.Thread): > while not breakout: > events = poll.poll() > for event in events: > + > + self.logger.debug(f"event is {event}") > + > # An error occurred, bail out > if event[1] & self.errorevents: > + self.logger.debug("error event") > raise Exception(self.stringify_event(event[1])) > > # Event to stop the thread > @@ -767,6 +767,10 @@ class LoggingThread(threading.Thread): > breakout = True > break > > + # stdio data to be logged > + elif self.qemu_stdio.fileno() == event[0]: > + self.consume_qemu_stdio() > + > # A connection request was received > elif self.serversock.fileno() == event[0]: > self.logger.debug("Connection request received") > @@ -783,6 +787,30 @@ class LoggingThread(threading.Thread): > data = 
self.recv(1024) > self.logfunc(data) > > + > + # Consume QEMU's stdio output, checking for login strings > + def consume_qemu_stdio(self): > + # try to avoid reading only a single character at a time > + time.sleep(0.1) > + read = self.qemu_stdio.read(1024) > + > + # log what we have seen > + decoded_data = read.decode('utf-8', errors='backslashreplace') > + if self.prompt_seen: > + self.logger.debug2('Post login log:\n%s' % decoded_data) > + else: > + self.logger.debug2('Pre login log:\n%s' % decoded_data) > + > + if not self.prompt_seen and read: > + self.boot_log += read > + > + if bytes(self.prompt, 'utf-8') in self.boot_log: > + time_to_login = time.time() - self.boottime > + self.logger.debug("Reached login banner in %.2f seconds > (%s)" % > + (time_to_login, time.strftime("%D > %H:%M:%S"))) > + self.semaphore.release() > + self.prompt_seen = True > + > # Since the socket is non-blocking make sure to honor EAGAIN > # and EWOULDBLOCK. > def recv(self, count):
FWIW I tried testing this locally and "bitbake core-image-full-cmdline -c testimage" just hangs; it can't work out the login prompt. This code has been the source of a lot of race issues and intermittent failures, which we only "just" got to the bottom of, and introducing a load more isn't something I'm feeling great about. I'll try and find some time to see if I can work out any better approach. Cheers, Richard
-=-=-=-=-=-=-=-=-=-=-=- Links: You receive all messages sent to this group. View/Reply Online (#192632): https://lists.openembedded.org/g/openembedded-core/message/192632 Mute This Topic: https://lists.openembedded.org/mt/103111135/21656 Group Owner: openembedded-core+ow...@lists.openembedded.org Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub [arch...@mail-archive.com] -=-=-=-=-=-=-=-=-=-=-=-