commit: 767a746035960a211229fac33aabc042d30234b7 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Tue Oct 17 01:54:10 2023 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Tue Oct 17 16:33:57 2023 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=767a7460
ForkProcess: Support logging via multiprocessing.Pipe Detect the multiprocessing spawn start method, and log via multiprocessing.Pipe because file descriptor inheritance via fd_pipes does not work with the spawn start method. The included test cases set the multiprocessing start method to spawn in a subprocess, in order to prevent interference with the main process. Bug: https://bugs.gentoo.org/915123 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/_emerge/SpawnProcess.py | 9 +++- lib/portage/tests/process/meson.build | 1 + lib/portage/tests/process/test_AsyncFunction.py | 23 +++++++++- lib/portage/tests/process/test_ForkProcess.py | 39 +++++++++++++++++ lib/portage/util/_async/ForkProcess.py | 58 ++++++++++++++++++++++--- 5 files changed, 120 insertions(+), 10 deletions(-) diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py index 0e27f29c43..b4eabd07ad 100644 --- a/lib/_emerge/SpawnProcess.py +++ b/lib/_emerge/SpawnProcess.py @@ -151,6 +151,12 @@ class SpawnProcess(SubProcess): if can_log and not self.background: stdout_fd = os.dup(fd_pipes_orig[1]) + self._start_main_task( + master_fd, log_file_path=log_file_path, stdout_fd=stdout_fd + ) + self._registered = True + + def _start_main_task(self, pr, log_file_path=None, stdout_fd=None): build_logger = BuildLogger( env=self.env, log_path=log_file_path, @@ -162,14 +168,13 @@ class SpawnProcess(SubProcess): pipe_logger = PipeLogger( background=self.background, scheduler=self.scheduler, - input_fd=master_fd, + input_fd=pr, log_file_path=build_logger.stdin, stdout_fd=stdout_fd, ) pipe_logger.start() - self._registered = True self._main_task_cancel = functools.partial( self._main_cancel, build_logger, pipe_logger ) diff --git a/lib/portage/tests/process/meson.build b/lib/portage/tests/process/meson.build index 1cef1cc4a2..b86fa10fb1 100644 --- a/lib/portage/tests/process/meson.build +++ b/lib/portage/tests/process/meson.build @@ -1,6 +1,7 @@ py.install_sources( [ 'test_AsyncFunction.py', + 'test_ForkProcess.py', 'test_PipeLogger.py', 'test_PopenProcessBlockingIO.py', 'test_PopenProcess.py', diff --git a/lib/portage/tests/process/test_AsyncFunction.py b/lib/portage/tests/process/test_AsyncFunction.py index 975b590e53..e30ff770b3 100644 --- a/lib/portage/tests/process/test_AsyncFunction.py +++ b/lib/portage/tests/process/test_AsyncFunction.py @@ -1,6 +1,7 @@ # Copyright 2020-2023 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import functools import multiprocessing import sys @@ -46,10 +47,12 @@ class AsyncFunctionTestCase(TestCase): loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop)) @staticmethod - def _test_getpid_fork(): + def _test_getpid_fork(preexec_fn=None): """ Verify that portage.getpid() cache is updated in a forked child process. """ + if preexec_fn is not None: + preexec_fn() loop = asyncio._wrap_loop() proc = AsyncFunction(scheduler=loop, target=portage.getpid) proc.start() @@ -59,6 +62,24 @@ class AsyncFunctionTestCase(TestCase): def test_getpid_fork(self): self.assertTrue(self._test_getpid_fork()) + def test_spawn_getpid(self): + """ + Test portage.getpid() with multiprocessing spawn start method. + """ + loop = asyncio._wrap_loop() + proc = AsyncFunction( + scheduler=loop, + target=self._test_getpid_fork, + kwargs=dict( + preexec_fn=functools.partial( + multiprocessing.set_start_method, "spawn", force=True + ) + ), + ) + proc.start() + self.assertEqual(proc.wait(), 0) + self.assertTrue(proc.result) + def test_getpid_double_fork(self): """ Verify that portage.getpid() cache is updated correctly after diff --git a/lib/portage/tests/process/test_ForkProcess.py b/lib/portage/tests/process/test_ForkProcess.py new file mode 100644 index 0000000000..c07c60e9c6 --- /dev/null +++ b/lib/portage/tests/process/test_ForkProcess.py @@ -0,0 +1,39 @@ +# Copyright 2023 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +import functools +import multiprocessing +import tempfile + +from portage import os +from portage.tests import TestCase +from portage.util._async.ForkProcess import ForkProcess +from portage.util.futures import asyncio + + +class ForkProcessTestCase(TestCase): + @staticmethod + def _test_spawn_logfile(logfile, target): + multiprocessing.set_start_method("spawn", force=True) + loop = asyncio._wrap_loop() + proc = ForkProcess(scheduler=loop, target=target, logfile=logfile) + proc.start() + return proc.wait() + + def test_spawn_logfile(self): + """ + Test logfile with multiprocessing spawn start method. + """ + test_string = "hello world" + with tempfile.NamedTemporaryFile() as logfile: + loop = asyncio._wrap_loop() + proc = ForkProcess( + scheduler=loop, + target=self._test_spawn_logfile, + args=(logfile.name, functools.partial(print, test_string, end="")), + ) + proc.start() + self.assertEqual(proc.wait(), os.EX_OK) + + with open(logfile.name, "rb") as output: + self.assertEqual(output.read(), test_string.encode("utf-8")) diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index 1d2d220ed4..09e40a2d3e 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -21,6 +21,7 @@ class ForkProcess(SpawnProcess): __slots__ = ( "kwargs", "target", + "_child_connection", "_proc", "_proc_join_task", ) @@ -29,6 +30,41 @@ class ForkProcess(SpawnProcess): # (after the sentinel has become ready). _proc_join_interval = 0.1 + def _start(self): + if self.fd_pipes or self.logfile: + if self.fd_pipes: + if multiprocessing.get_start_method() != "fork": + raise NotImplementedError( + 'fd_pipes only supported with multiprocessing start method "fork"' + ) + super()._start() + return + + if self.logfile: + if multiprocessing.get_start_method() == "fork": + # Use superclass pty support. + super()._start() + return + + # Log via multiprocessing.Pipe if necessary. + pr, pw = multiprocessing.Pipe(duplex=False) + self._child_connection = pw + + retval = self._spawn(self.args, fd_pipes=self.fd_pipes) + + self.pid = retval[0] + self._registered = True + + if self._child_connection is None: + self._async_waitpid() + else: + self._child_connection.close() + stdout_fd = None + if not self.background: + stdout_fd = os.dup(sys.__stdout__.fileno()) + + self._start_main_task(pr, log_file_path=self.logfile, stdout_fd=stdout_fd) + def _spawn(self, args, fd_pipes=None, **kwargs): """ Override SpawnProcess._spawn to fork a subprocess that calls @@ -59,7 +95,7 @@ class ForkProcess(SpawnProcess): # things like PROPERTIES=interactive support. stdin_dup = None try: - stdin_fd = fd_pipes.get(0) + stdin_fd = fd_pipes.get(0) if fd_pipes else None if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno(): stdin_dup = os.dup(stdin_fd) fcntl.fcntl( @@ -68,7 +104,7 @@ class ForkProcess(SpawnProcess): fd_pipes[0] = stdin_dup self._proc = multiprocessing.Process( target=self._bootstrap, - args=(fd_pipes, target, args, kwargs), + args=(self._child_connection, fd_pipes, target, args, kwargs), ) self._proc.start() finally: @@ -150,7 +186,7 @@ class ForkProcess(SpawnProcess): self._proc_join_task = None @staticmethod - def _bootstrap(fd_pipes, target, args, kwargs): + def _bootstrap(child_connection, fd_pipes, target, args, kwargs): # Use default signal handlers in order to avoid problems # killing subprocesses as reported in bug #353239. signal.signal(signal.SIGINT, signal.SIG_DFL) @@ -167,14 +203,22 @@ class ForkProcess(SpawnProcess): pass portage.locks._close_fds() - # We don't exec, so use close_fds=False - # (see _setup_pipes docstring). - portage.process._setup_pipes(fd_pipes, close_fds=False) + + if child_connection is not None: + fd_pipes = fd_pipes or {} + fd_pipes[sys.stdout.fileno()] = child_connection.fileno() + fd_pipes[sys.stderr.fileno()] = child_connection.fileno() + fd_pipes[child_connection.fileno()] = child_connection.fileno() + + if fd_pipes: + # We don't exec, so use close_fds=False + # (see _setup_pipes docstring). + portage.process._setup_pipes(fd_pipes, close_fds=False) # Since multiprocessing.Process closes sys.__stdin__ and # makes sys.stdin refer to os.devnull, restore it when # appropriate. - if 0 in fd_pipes: + if fd_pipes and 0 in fd_pipes: # It's possible that sys.stdin.fileno() is already 0, # and in that case the above _setup_pipes call will # have already updated its identity via dup2. Otherwise,
