commit:     767a746035960a211229fac33aabc042d30234b7
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Oct 17 01:54:10 2023 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Oct 17 16:33:57 2023 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=767a7460

ForkProcess: Support logging via multiprocessing.Pipe

Detect the multiprocessing spawn start method, and log via
multiprocessing.Pipe because file descriptor inheritance
via fd_pipes does not work with the spawn start method.

The included test cases set the multiprocessing start
method to spawn in a subprocess, in order to prevent
interference with the main process.

Bug: https://bugs.gentoo.org/915123
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/_emerge/SpawnProcess.py                     |  9 +++-
 lib/portage/tests/process/meson.build           |  1 +
 lib/portage/tests/process/test_AsyncFunction.py | 23 +++++++++-
 lib/portage/tests/process/test_ForkProcess.py   | 39 +++++++++++++++++
 lib/portage/util/_async/ForkProcess.py          | 58 ++++++++++++++++++++++---
 5 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
index 0e27f29c43..b4eabd07ad 100644
--- a/lib/_emerge/SpawnProcess.py
+++ b/lib/_emerge/SpawnProcess.py
@@ -151,6 +151,12 @@ class SpawnProcess(SubProcess):
         if can_log and not self.background:
             stdout_fd = os.dup(fd_pipes_orig[1])
 
+        self._start_main_task(
+            master_fd, log_file_path=log_file_path, stdout_fd=stdout_fd
+        )
+        self._registered = True
+
+    def _start_main_task(self, pr, log_file_path=None, stdout_fd=None):
         build_logger = BuildLogger(
             env=self.env,
             log_path=log_file_path,
@@ -162,14 +168,13 @@ class SpawnProcess(SubProcess):
         pipe_logger = PipeLogger(
             background=self.background,
             scheduler=self.scheduler,
-            input_fd=master_fd,
+            input_fd=pr,
             log_file_path=build_logger.stdin,
             stdout_fd=stdout_fd,
         )
 
         pipe_logger.start()
 
-        self._registered = True
         self._main_task_cancel = functools.partial(
             self._main_cancel, build_logger, pipe_logger
         )

diff --git a/lib/portage/tests/process/meson.build b/lib/portage/tests/process/meson.build
index 1cef1cc4a2..b86fa10fb1 100644
--- a/lib/portage/tests/process/meson.build
+++ b/lib/portage/tests/process/meson.build
@@ -1,6 +1,7 @@
 py.install_sources(
     [
         'test_AsyncFunction.py',
+        'test_ForkProcess.py',
         'test_PipeLogger.py',
         'test_PopenProcessBlockingIO.py',
         'test_PopenProcess.py',

diff --git a/lib/portage/tests/process/test_AsyncFunction.py b/lib/portage/tests/process/test_AsyncFunction.py
index 975b590e53..e30ff770b3 100644
--- a/lib/portage/tests/process/test_AsyncFunction.py
+++ b/lib/portage/tests/process/test_AsyncFunction.py
@@ -1,6 +1,7 @@
 # Copyright 2020-2023 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
+import functools
 import multiprocessing
 import sys
 
@@ -46,10 +47,12 @@ class AsyncFunctionTestCase(TestCase):
         loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop))
 
     @staticmethod
-    def _test_getpid_fork():
+    def _test_getpid_fork(preexec_fn=None):
         """
         Verify that portage.getpid() cache is updated in a forked child process.
         """
+        if preexec_fn is not None:
+            preexec_fn()
         loop = asyncio._wrap_loop()
         proc = AsyncFunction(scheduler=loop, target=portage.getpid)
         proc.start()
@@ -59,6 +62,24 @@ class AsyncFunctionTestCase(TestCase):
     def test_getpid_fork(self):
         self.assertTrue(self._test_getpid_fork())
 
+    def test_spawn_getpid(self):
+        """
+        Test portage.getpid() with multiprocessing spawn start method.
+        """
+        loop = asyncio._wrap_loop()
+        proc = AsyncFunction(
+            scheduler=loop,
+            target=self._test_getpid_fork,
+            kwargs=dict(
+                preexec_fn=functools.partial(
+                    multiprocessing.set_start_method, "spawn", force=True
+                )
+            ),
+        )
+        proc.start()
+        self.assertEqual(proc.wait(), 0)
+        self.assertTrue(proc.result)
+
     def test_getpid_double_fork(self):
         """
         Verify that portage.getpid() cache is updated correctly after

diff --git a/lib/portage/tests/process/test_ForkProcess.py b/lib/portage/tests/process/test_ForkProcess.py
new file mode 100644
index 0000000000..c07c60e9c6
--- /dev/null
+++ b/lib/portage/tests/process/test_ForkProcess.py
@@ -0,0 +1,39 @@
+# Copyright 2023 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+import functools
+import multiprocessing
+import tempfile
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.ForkProcess import ForkProcess
+from portage.util.futures import asyncio
+
+
+class ForkProcessTestCase(TestCase):
+    @staticmethod
+    def _test_spawn_logfile(logfile, target):
+        multiprocessing.set_start_method("spawn", force=True)
+        loop = asyncio._wrap_loop()
+        proc = ForkProcess(scheduler=loop, target=target, logfile=logfile)
+        proc.start()
+        return proc.wait()
+
+    def test_spawn_logfile(self):
+        """
+        Test logfile with multiprocessing spawn start method.
+        """
+        test_string = "hello world"
+        with tempfile.NamedTemporaryFile() as logfile:
+            loop = asyncio._wrap_loop()
+            proc = ForkProcess(
+                scheduler=loop,
+                target=self._test_spawn_logfile,
+                args=(logfile.name, functools.partial(print, test_string, end="")),
+            )
+            proc.start()
+            self.assertEqual(proc.wait(), os.EX_OK)
+
+            with open(logfile.name, "rb") as output:
+                self.assertEqual(output.read(), test_string.encode("utf-8"))

diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 1d2d220ed4..09e40a2d3e 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -21,6 +21,7 @@ class ForkProcess(SpawnProcess):
     __slots__ = (
         "kwargs",
         "target",
+        "_child_connection",
         "_proc",
         "_proc_join_task",
     )
@@ -29,6 +30,41 @@ class ForkProcess(SpawnProcess):
     # (after the sentinel has become ready).
     _proc_join_interval = 0.1
 
+    def _start(self):
+        if self.fd_pipes or self.logfile:
+            if self.fd_pipes:
+                if multiprocessing.get_start_method() != "fork":
+                    raise NotImplementedError(
+                        'fd_pipes only supported with multiprocessing start method "fork"'
+                    )
+                super()._start()
+                return
+
+            if self.logfile:
+                if multiprocessing.get_start_method() == "fork":
+                    # Use superclass pty support.
+                    super()._start()
+                    return
+
+                # Log via multiprocessing.Pipe if necessary.
+                pr, pw = multiprocessing.Pipe(duplex=False)
+                self._child_connection = pw
+
+        retval = self._spawn(self.args, fd_pipes=self.fd_pipes)
+
+        self.pid = retval[0]
+        self._registered = True
+
+        if self._child_connection is None:
+            self._async_waitpid()
+        else:
+            self._child_connection.close()
+            stdout_fd = None
+            if not self.background:
+                stdout_fd = os.dup(sys.__stdout__.fileno())
+
+            self._start_main_task(pr, log_file_path=self.logfile, stdout_fd=stdout_fd)
+
     def _spawn(self, args, fd_pipes=None, **kwargs):
         """
         Override SpawnProcess._spawn to fork a subprocess that calls
@@ -59,7 +95,7 @@ class ForkProcess(SpawnProcess):
         # things like PROPERTIES=interactive support.
         stdin_dup = None
         try:
-            stdin_fd = fd_pipes.get(0)
+            stdin_fd = fd_pipes.get(0) if fd_pipes else None
             if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
                 stdin_dup = os.dup(stdin_fd)
                 fcntl.fcntl(
@@ -68,7 +104,7 @@ class ForkProcess(SpawnProcess):
                 fd_pipes[0] = stdin_dup
             self._proc = multiprocessing.Process(
                 target=self._bootstrap,
-                args=(fd_pipes, target, args, kwargs),
+                args=(self._child_connection, fd_pipes, target, args, kwargs),
             )
             self._proc.start()
         finally:
@@ -150,7 +186,7 @@ class ForkProcess(SpawnProcess):
             self._proc_join_task = None
 
     @staticmethod
-    def _bootstrap(fd_pipes, target, args, kwargs):
+    def _bootstrap(child_connection, fd_pipes, target, args, kwargs):
         # Use default signal handlers in order to avoid problems
         # killing subprocesses as reported in bug #353239.
         signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -167,14 +203,22 @@ class ForkProcess(SpawnProcess):
             pass
 
         portage.locks._close_fds()
-        # We don't exec, so use close_fds=False
-        # (see _setup_pipes docstring).
-        portage.process._setup_pipes(fd_pipes, close_fds=False)
+
+        if child_connection is not None:
+            fd_pipes = fd_pipes or {}
+            fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
+            fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
+            fd_pipes[child_connection.fileno()] = child_connection.fileno()
+
+        if fd_pipes:
+            # We don't exec, so use close_fds=False
+            # (see _setup_pipes docstring).
+            portage.process._setup_pipes(fd_pipes, close_fds=False)
 
         # Since multiprocessing.Process closes sys.__stdin__ and
         # makes sys.stdin refer to os.devnull, restore it when
         # appropriate.
-        if 0 in fd_pipes:
+        if fd_pipes and 0 in fd_pipes:
             # It's possible that sys.stdin.fileno() is already 0,
             # and in that case the above _setup_pipes call will
             # have already updated its identity via dup2. Otherwise,

Reply via email to