commit:     6abc969109754ab086db2bac5be1029de1a015c3
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct 20 04:11:48 2023 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct 22 04:17:48 2023 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=6abc9691

ForkProcess: Implement fd_pipes via send_handle

This new fd_pipes implementation is only enabled
when the multiprocessing start method is not fork,
ensuring backward compatibility with existing
ForkProcess callers that rely on the fork start
method.

Note that the new fd_pipes implementation uses a
thread via run_in_executor, and threads are not
recommended for mixing with the fork start method
due to cpython issue 84559.

Bug: https://bugs.gentoo.org/915896
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_ForkProcess.py |   7 ++
 lib/portage/util/_async/ForkProcess.py        | 142 +++++++++++++++++++++-----
 2 files changed, 124 insertions(+), 25 deletions(-)

diff --git a/lib/portage/tests/process/test_ForkProcess.py b/lib/portage/tests/process/test_ForkProcess.py
index c07c60e9c6..bc0b836f11 100644
--- a/lib/portage/tests/process/test_ForkProcess.py
+++ b/lib/portage/tests/process/test_ForkProcess.py
@@ -4,6 +4,7 @@
 import functools
 import multiprocessing
 import tempfile
+from unittest.mock import patch
 
 from portage import os
 from portage.tests import TestCase
@@ -37,3 +38,9 @@ class ForkProcessTestCase(TestCase):
 
             with open(logfile.name, "rb") as output:
                 self.assertEqual(output.read(), test_string.encode("utf-8"))
+
+    def test_spawn_logfile_no_send_handle(self):
+        with patch(
+            "portage.util._async.ForkProcess.ForkProcess._HAVE_SEND_HANDLE", new=False
+        ):
+            self.test_spawn_logfile()

diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 09e40a2d3e..6d216a5c43 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -10,6 +10,7 @@ import sys
 
 import portage
 from portage import os
+from portage.cache.mappings import slot_dict_class
 from portage.util.futures import asyncio
 from _emerge.SpawnProcess import SpawnProcess
 
@@ -26,29 +27,36 @@ class ForkProcess(SpawnProcess):
         "_proc_join_task",
     )
 
+    _file_names = ("connection", "slave_fd")
+    _files_dict = slot_dict_class(_file_names, prefix="")
+
     # Number of seconds between poll attempts for process exit status
     # (after the sentinel has become ready).
     _proc_join_interval = 0.1
 
-    def _start(self):
-        if self.fd_pipes or self.logfile:
-            if self.fd_pipes:
-                if multiprocessing.get_start_method() != "fork":
-                    raise NotImplementedError(
-                        'fd_pipes only supported with multiprocessing start method "fork"'
-                    )
-                super()._start()
-                return
+    _HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", False)
 
-            if self.logfile:
-                if multiprocessing.get_start_method() == "fork":
-                    # Use superclass pty support.
-                    super()._start()
-                    return
+    def _start(self):
+        if multiprocessing.get_start_method() == "fork":
+            # Backward compatibility mode.
+            super()._start()
+            return
+
+        # This mode supports multiprocessing start methods
+        # other than fork. Note that the fd_pipes implementation
+        # uses a thread via run_in_executor, and threads are not
+        # recommended for mixing with the fork start method due
+        # to cpython issue 84559.
+        if self.fd_pipes and not self._HAVE_SEND_HANDLE:
+            raise NotImplementedError(
+                'fd_pipes only supported with HAVE_SEND_HANDLE or multiprocessing start method "fork"'
+            )
 
-                # Log via multiprocessing.Pipe if necessary.
-                pr, pw = multiprocessing.Pipe(duplex=False)
-                self._child_connection = pw
+        if self.fd_pipes or self.logfile:
+            # Log via multiprocessing.Pipe if necessary.
+            connection, self._child_connection = multiprocessing.Pipe(
+                duplex=self._HAVE_SEND_HANDLE
+            )
 
         retval = self._spawn(self.args, fd_pipes=self.fd_pipes)
 
@@ -59,11 +67,71 @@ class ForkProcess(SpawnProcess):
             self._async_waitpid()
         else:
             self._child_connection.close()
+            self.fd_pipes = self.fd_pipes or {}
             stdout_fd = None
             if not self.background:
-                stdout_fd = os.dup(sys.__stdout__.fileno())
+                self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
+                self.fd_pipes.setdefault(1, sys.__stdout__.fileno())
+                self.fd_pipes.setdefault(2, sys.__stderr__.fileno())
+                stdout_fd = os.dup(self.fd_pipes[1])
+
+            if self._HAVE_SEND_HANDLE:
+                master_fd, slave_fd = self._pipe(self.fd_pipes)
+                self.fd_pipes[1] = slave_fd
+                self.fd_pipes[2] = slave_fd
+            self._files = self._files_dict(connection=connection, slave_fd=slave_fd)
+            else:
+                master_fd = connection
+
+            self._start_main_task(
+                master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
+            )
 
-            self._start_main_task(pr, log_file_path=self.logfile, stdout_fd=stdout_fd)
+    @property
+    def _fd_pipes_send_handle(self):
+        """Returns True if we have a connection to implement fd_pipes via send_handle."""
+        return bool(
+            self._HAVE_SEND_HANDLE
+            and self._files
+            and getattr(self._files, "connection", False)
+        )
+
+    def _send_fd_pipes(self):
+        """
+        Communicate with _bootstrap to send fd_pipes via send_handle.
+        This performs blocking IO, intended for invocation via run_in_executor.
+        """
+        fd_list = list(set(self.fd_pipes.values()))
+        self._files.connection.send(
+            (self.fd_pipes, fd_list),
+        )
+        for fd in fd_list:
+            multiprocessing.reduction.send_handle(
+                self._files.connection,
+                fd,
+                self.pid,
+            )
+
+    async def _main(self, build_logger, pipe_logger, loop=None):
+        try:
+            if self._fd_pipes_send_handle:
+                await self.scheduler.run_in_executor(
+                    None,
+                    self._send_fd_pipes,
+                )
+        except asyncio.CancelledError:
+            self._main_cancel(build_logger, pipe_logger)
+            raise
+        finally:
+            if self._files:
+                if hasattr(self._files, "connection"):
+                    self._files.connection.close()
+                    del self._files.connection
+                if hasattr(self._files, "slave_fd"):
+                    os.close(self._files.slave_fd)
+                    del self._files.slave_fd
+
+        await super()._main(build_logger, pipe_logger, loop=loop)
 
     def _spawn(self, args, fd_pipes=None, **kwargs):
         """
@@ -102,9 +170,21 @@ class ForkProcess(SpawnProcess):
                    stdin_dup, fcntl.F_SETFD, fcntl.fcntl(stdin_fd, fcntl.F_GETFD)
                 )
                 fd_pipes[0] = stdin_dup
+
+            if self._fd_pipes_send_handle:
+                # Handle fd_pipes in _main instead.
+                fd_pipes = None
+
             self._proc = multiprocessing.Process(
                 target=self._bootstrap,
-                args=(self._child_connection, fd_pipes, target, args, kwargs),
+                args=(
+                    self._child_connection,
+                    self._HAVE_SEND_HANDLE,
+                    fd_pipes,
+                    target,
+                    args,
+                    kwargs,
+                ),
             )
             self._proc.start()
         finally:
@@ -186,7 +266,7 @@ class ForkProcess(SpawnProcess):
             self._proc_join_task = None
 
     @staticmethod
-    def _bootstrap(child_connection, fd_pipes, target, args, kwargs):
+    def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, kwargs):
         # Use default signal handlers in order to avoid problems
         # killing subprocesses as reported in bug #353239.
         signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -205,10 +285,22 @@ class ForkProcess(SpawnProcess):
         portage.locks._close_fds()
 
         if child_connection is not None:
-            fd_pipes = fd_pipes or {}
-            fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
-            fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
-            fd_pipes[child_connection.fileno()] = child_connection.fileno()
+            if have_send_handle:
+                fd_pipes, fd_list = child_connection.recv()
+                fd_pipes_map = {}
+                for fd in fd_list:
+                    fd_pipes_map[fd] = multiprocessing.reduction.recv_handle(
+                        child_connection
+                    )
+                child_connection.close()
+                for k, v in list(fd_pipes.items()):
+                    fd_pipes[k] = fd_pipes_map[v]
+
+            else:
+                fd_pipes = fd_pipes or {}
+                fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
+                fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
+                fd_pipes[child_connection.fileno()] = child_connection.fileno()
 
         if fd_pipes:
             # We don't exec, so use close_fds=False

Reply via email to