commit:     6b4252d3a0f12808a5bcce888b7f68e1f84b5301
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Jul 28 21:22:42 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Aug  6 04:38:41 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=6b4252d3

Add asyncio.create_subprocess_exec support for python2 (bug 662388)

The asyncio.create_subprocess_exec function is essential for
using subprocesses in coroutines, so add support to do this
for python2. This paves the way for extensive use of coroutines
in portage, since coroutines are well-suited for many portage
tasks that involve subprocesses.

Bug: https://bugs.gentoo.org/662388

 .../util/futures/asyncio/test_subprocess_exec.py   | 184 ++++++---------------
 lib/portage/util/futures/_asyncio/__init__.py      |  53 ++++++
 lib/portage/util/futures/_asyncio/process.py       | 107 ++++++++++++
 lib/portage/util/futures/_asyncio/streams.py       |  96 +++++++++++
 lib/portage/util/futures/compat_coroutine.py       |   6 +-
 5 files changed, 315 insertions(+), 131 deletions(-)

diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 5a812ba6a..61646cb92 100644
--- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -3,61 +3,16 @@
 
 import os
 import subprocess
-
-try:
-       from asyncio import create_subprocess_exec
-except ImportError:
-       create_subprocess_exec = None
+import sys
 
 from portage.process import find_binary
 from portage.tests import TestCase
 from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.futures import asyncio
-from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures._asyncio import create_subprocess_exec
+from portage.util.futures._asyncio.streams import _reader as reader
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from portage.util.futures.unix_events import DefaultEventLoopPolicy
-from _emerge.PipeReader import PipeReader
-
-
-def reader(input_file, loop=None):
-       """
-       Asynchronously read a binary input file.
-
-       @param input_file: binary input file
-       @type input_file: file
-       @param loop: event loop
-       @type loop: EventLoop
-       @return: bytes
-       @rtype: asyncio.Future (or compatible)
-       """
-       loop = asyncio._wrap_loop(loop)
-       future = loop.create_future()
-       _Reader(future, input_file, loop)
-       return future
-
-
-class _Reader(object):
-       def __init__(self, future, input_file, loop):
-               self._future = future
-               self._pipe_reader = PipeReader(
-                       input_files={'input_file':input_file}, scheduler=loop)
-
-               self._future.add_done_callback(self._cancel_callback)
-               self._pipe_reader.addExitListener(self._eof)
-               self._pipe_reader.start()
-
-       def _cancel_callback(self, future):
-               if future.cancelled():
-                       self._cancel()
-
-       def _eof(self, pipe_reader):
-               self._pipe_reader = None
-               self._future.set_result(pipe_reader.getvalue())
-
-       def _cancel(self):
-               if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
-                       self._pipe_reader.removeExitListener(self._eof)
-                       self._pipe_reader.cancel()
-                       self._pipe_reader = None
 
 
 class SubprocessExecTestCase(TestCase):
@@ -76,99 +31,66 @@ class SubprocessExecTestCase(TestCase):
                                
self.assertFalse(global_event_loop().is_closed())
 
        def testEcho(self):
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
-
                args_tuple = (b'hello', b'world')
                echo_binary = find_binary("echo")
                self.assertNotEqual(echo_binary, None)
                echo_binary = echo_binary.encode()
 
-               # Use os.pipe(), since this loop does not implement the
-               # ReadTransport necessary for subprocess.PIPE support.
-               stdout_pr, stdout_pw = os.pipe()
-               stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
-               stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
-               files = [stdout_pr, stdout_pw]
-
                def test(loop):
-                       output = None
-                       try:
-                               with open(os.devnull, 'rb', 0) as devnull:
-                                       proc = loop.run_until_complete(
-                                               create_subprocess_exec(
-                                               echo_binary, *args_tuple,
-                                               stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw))
-
-                               # This belongs exclusively to the subprocess 
now.
-                               stdout_pw.close()
-
-                               output = asyncio.ensure_future(
-                                       reader(stdout_pr, loop=loop), loop=loop)
-
-                               self.assertEqual(
-                                       loop.run_until_complete(proc.wait()), 
os.EX_OK)
-                               self.assertEqual(
-                                       
tuple(loop.run_until_complete(output).split()), args_tuple)
-                       finally:
-                               if output is not None and not output.done():
-                                       output.cancel()
-                               for f in files:
-                                       f.close()
+                       @coroutine
+                       def test_coroutine(loop=None):
 
-               self._run_test(test)
+                               proc = (yield 
create_subprocess_exec(echo_binary, *args_tuple,
+                                               stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                               loop=loop))
 
-       def testCat(self):
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+                               out, err = (yield proc.communicate())
+                               self.assertEqual(tuple(out.split()), args_tuple)
+                               self.assertEqual(proc.returncode, os.EX_OK)
 
-               stdin_data = b'hello world'
-               cat_binary = find_binary("cat")
-               self.assertNotEqual(cat_binary, None)
-               cat_binary = cat_binary.encode()
+                               proc = (yield create_subprocess_exec(
+                                               'bash', '-c', 'echo foo; echo 
bar 1>&2;',
+                                               stdout=subprocess.PIPE, 
stderr=subprocess.PIPE,
+                                               loop=loop))
 
-               # Use os.pipe(), since this loop does not implement the
-               # ReadTransport necessary for subprocess.PIPE support.
-               stdout_pr, stdout_pw = os.pipe()
-               stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
-               stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+                               out, err = (yield proc.communicate())
+                               self.assertEqual(out, b'foo\n')
+                               self.assertEqual(err, b'bar\n')
+                               self.assertEqual(proc.returncode, os.EX_OK)
 
-               stdin_pr, stdin_pw = os.pipe()
-               stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
-               stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
+                               proc = (yield create_subprocess_exec(
+                                               'bash', '-c', 'echo foo; echo 
bar 1>&2;',
+                                               stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                               loop=loop))
 
-               files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+                               out, err = (yield proc.communicate())
+                               self.assertEqual(out, b'foo\nbar\n')
+                               self.assertEqual(err, None)
+                               self.assertEqual(proc.returncode, os.EX_OK)
 
-               def test(loop):
-                       output = None
-                       try:
-                               proc = loop.run_until_complete(
-                                       create_subprocess_exec(
-                                       cat_binary,
-                                       stdin=stdin_pr, stdout=stdout_pw, 
stderr=stdout_pw))
+                               coroutine_return('success')
 
-                               # These belong exclusively to the subprocess 
now.
-                               stdout_pw.close()
-                               stdin_pr.close()
+                       self.assertEqual('success',
+                               
loop.run_until_complete(test_coroutine(loop=loop)))
 
-                               output = asyncio.ensure_future(
-                                       reader(stdout_pr, loop=loop), loop=loop)
+               self._run_test(test)
 
-                               with ForkExecutor(loop=loop) as executor:
-                                       writer = 
asyncio.ensure_future(loop.run_in_executor(
-                                               executor, stdin_pw.write, 
stdin_data), loop=loop)
+       def testCat(self):
+               stdin_data = b'hello world'
+               cat_binary = find_binary("cat")
+               self.assertNotEqual(cat_binary, None)
+               cat_binary = cat_binary.encode()
 
-                                       # This belongs exclusively to the 
writer now.
-                                       stdin_pw.close()
-                                       loop.run_until_complete(writer)
+               def test(loop):
+                       proc = loop.run_until_complete(
+                               create_subprocess_exec(cat_binary,
+                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                               loop=loop))
+
+                       out, err = 
loop.run_until_complete(proc.communicate(input=stdin_data))
 
-                               
self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
-                               
self.assertEqual(loop.run_until_complete(output), stdin_data)
-                       finally:
-                               if output is not None and not output.done():
-                                       output.cancel()
-                               for f in files:
-                                       f.close()
+                       self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+                       self.assertEqual(out, stdin_data)
 
                self._run_test(test)
 
@@ -178,8 +100,8 @@ class SubprocessExecTestCase(TestCase):
                requires an AbstractEventLoop.connect_read_pipe implementation
                (and a ReadTransport implementation for it to return).
                """
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+               if sys.version_info.major < 3:
+                       self.skipTest('ReadTransport not implemented for 
python2')
 
                args_tuple = (b'hello', b'world')
                echo_binary = find_binary("echo")
@@ -192,7 +114,8 @@ class SubprocessExecTestCase(TestCase):
                                        create_subprocess_exec(
                                        echo_binary, *args_tuple,
                                        stdin=devnull,
-                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                       loop=loop))
 
                        self.assertEqual(
                                
tuple(loop.run_until_complete(proc.stdout.read()).split()),
@@ -207,8 +130,8 @@ class SubprocessExecTestCase(TestCase):
                requires an AbstractEventLoop.connect_write_pipe implementation
                (and a WriteTransport implementation for it to return).
                """
-               if create_subprocess_exec is None:
-                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+               if sys.version_info.major < 3:
+                       self.skipTest('WriteTransport not implemented for 
python2')
 
                stdin_data = b'hello world'
                cat_binary = find_binary("cat")
@@ -220,7 +143,8 @@ class SubprocessExecTestCase(TestCase):
                                create_subprocess_exec(
                                cat_binary,
                                stdin=subprocess.PIPE,
-                               stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+                               stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                               loop=loop))
 
                        # This buffers data when necessary to avoid blocking.
                        proc.stdin.write(stdin_data)

diff --git a/lib/portage/util/futures/_asyncio/__init__.py 
b/lib/portage/util/futures/_asyncio/__init__.py
index acfd59396..faab98e47 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -20,6 +20,9 @@ __all__ = (
        'wait',
 )
 
+import subprocess
+import sys
+
 try:
        import asyncio as _real_asyncio
 except ImportError:
@@ -45,6 +48,7 @@ from portage.util.futures.futures import (
        InvalidStateError,
        TimeoutError,
 )
+from portage.util.futures._asyncio.process import _Process
 from portage.util.futures._asyncio.tasks import (
        ALL_COMPLETED,
        FIRST_COMPLETED,
@@ -105,6 +109,49 @@ def set_child_watcher(watcher):
     return get_event_loop_policy().set_child_watcher(watcher)
 
 
+# Python 3.4 and later implement PEP 446, which makes newly
+# created file descriptors non-inheritable by default.
+_close_fds_default = sys.version_info < (3, 4)
+
+
+def create_subprocess_exec(*args, **kwargs):
+       """
+       Create a subprocess.
+
+       @param args: program and arguments
+       @type args: str
+       @param stdin: stdin file descriptor
+       @type stdin: file or int
+       @param stdout: stdout file descriptor
+       @type stdout: file or int
+       @param stderr: stderr file descriptor
+       @type stderr: file or int
+       @param close_fds: close file descriptors
+       @type close_fds: bool
+       @param loop: asyncio.AbstractEventLoop (or compatible)
+       @type loop: event loop
+       @type kwargs: varies
+       @param kwargs: subprocess.Popen parameters
+       @rtype: asyncio.Future (or compatible)
+       @return: subset of asyncio.subprocess.Process interface
+       """
+       loop = _wrap_loop(kwargs.pop('loop', None))
+       kwargs.setdefault('close_fds', _close_fds_default)
+       if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
+               # Use the real asyncio loop and create_subprocess_exec.
+               return _real_asyncio.create_subprocess_exec(*args, 
loop=loop._loop, **kwargs)
+
+       result = loop.create_future()
+
+       result.set_result(_Process(subprocess.Popen(
+               args,
+               stdin=kwargs.pop('stdin', None),
+               stdout=kwargs.pop('stdout', None),
+               stderr=kwargs.pop('stderr', None), **kwargs), loop))
+
+       return result
+
+
 class Task(Future):
        """
        Schedule the execution of a coroutine: wrap it in a future. A task
@@ -127,6 +174,12 @@ def ensure_future(coro_or_future, loop=None):
        @rtype: asyncio.Future (or compatible)
        @return: an instance of Future
        """
+       loop = _wrap_loop(loop)
+       if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
+               # Use the real asyncio loop and ensure_future.
+               return _real_asyncio.ensure_future(
+                       coro_or_future, loop=loop._loop)
+
        if isinstance(coro_or_future, Future):
                return coro_or_future
        raise NotImplementedError

diff --git a/lib/portage/util/futures/_asyncio/process.py 
b/lib/portage/util/futures/_asyncio/process.py
new file mode 100644
index 000000000..020164c9b
--- /dev/null
+++ b/lib/portage/util/futures/_asyncio/process.py
@@ -0,0 +1,107 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util.futures:asyncio',
+)
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+
+
+class _Process(object):
+       """
+       Emulate a subset of the asyncio.subprocess.Process interface,
+       for python2.
+       """
+       def __init__(self, proc, loop):
+               """
+               @param proc: process instance
+               @type proc: subprocess.Popen
+               @param loop: asyncio.AbstractEventLoop (or compatible)
+               @type loop: event loop
+               """
+               self._proc = proc
+               self._loop = loop
+               self.terminate = proc.terminate
+               self.kill = proc.kill
+               self.send_signal = proc.send_signal
+               self.pid = proc.pid
+               self._waiters = []
+               loop._asyncio_child_watcher.\
+                       add_child_handler(self.pid, self._proc_exit)
+
+       @property
+       def returncode(self):
+               return self._proc.returncode
+
+       @coroutine
+       def communicate(self, input=None):
+               """
+               Read data from stdout and stderr, until end-of-file is reached.
+               Wait for process to terminate.
+
+               @param input: stdin content to write
+               @type input: bytes
+               @return: tuple (stdout_data, stderr_data)
+               @rtype: asyncio.Future (or compatible)
+               """
+               futures = []
+               for input_file in (self._proc.stdout, self._proc.stderr):
+                       if input_file is None:
+                               future = self._loop.create_future()
+                               future.set_result(None)
+                       else:
+                               future = _reader(input_file, loop=self._loop)
+                       futures.append(future)
+
+               writer = None
+               if input is not None:
+                       if self._proc.stdin is None:
+                               raise TypeError('communicate: expected file or 
int, got {}'.format(type(self._proc.stdin)))
+                       writer = 
asyncio.ensure_future(_writer(self._proc.stdin, input), loop=self._loop)
+
+               try:
+                       yield asyncio.wait(futures + [self.wait()], 
loop=self._loop)
+               finally:
+                       if writer is not None:
+                               if writer.done():
+                                       # Consume expected exceptions.
+                                       try:
+                                               writer.result()
+                                       except EnvironmentError:
+                                               # This is normal if the other 
end of the pipe was closed.
+                                               pass
+                               else:
+                                       writer.cancel()
+
+               coroutine_return(tuple(future.result() for future in futures))
+
+       def wait(self):
+               """
+               Wait for child process to terminate. Set and return returncode 
attribute.
+
+               @return: returncode
+               @rtype: asyncio.Future (or compatible)
+               """
+               waiter = self._loop.create_future()
+               if self.returncode is None:
+                       self._waiters.append(waiter)
+                       waiter.add_done_callback(self._waiter_cancel)
+               else:
+                       waiter.set_result(self.returncode)
+               return waiter
+
+       def _waiter_cancel(self, waiter):
+               if waiter.cancelled():
+                       try:
+                               self._waiters.remove(waiter)
+                       except ValueError:
+                               pass
+
+       def _proc_exit(self, pid, returncode):
+               self._proc.returncode = returncode
+               waiters = self._waiters
+               self._waiters = []
+               for waiter in waiters:
+                       waiter.set_result(returncode)

diff --git a/lib/portage/util/futures/_asyncio/streams.py 
b/lib/portage/util/futures/_asyncio/streams.py
new file mode 100644
index 000000000..650a16491
--- /dev/null
+++ b/lib/portage/util/futures/_asyncio/streams.py
@@ -0,0 +1,96 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import os
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       '_emerge.PipeReader:PipeReader',
+       'portage.util.futures:asyncio',
+       'portage.util.futures.unix_events:_set_nonblocking',
+)
+from portage.util.futures.compat_coroutine import coroutine
+
+
+def _reader(input_file, loop=None):
+       """
+       Asynchronously read a binary input file, and close it when
+       it reaches EOF.
+
+       @param input_file: binary input file descriptor
+       @type input_file: file or int
+       @param loop: asyncio.AbstractEventLoop (or compatible)
+       @type loop: event loop
+       @return: bytes
+       @rtype: asyncio.Future (or compatible)
+       """
+       loop = asyncio._wrap_loop(loop)
+       future = loop.create_future()
+       _Reader(future, input_file, loop)
+       return future
+
+
+class _Reader(object):
+       def __init__(self, future, input_file, loop):
+               self._future = future
+               self._pipe_reader = PipeReader(
+                       input_files={'input_file':input_file}, scheduler=loop)
+
+               self._future.add_done_callback(self._cancel_callback)
+               self._pipe_reader.addExitListener(self._eof)
+               self._pipe_reader.start()
+
+       def _cancel_callback(self, future):
+               if future.cancelled():
+                       self._cancel()
+
+       def _eof(self, pipe_reader):
+               self._pipe_reader = None
+               self._future.set_result(pipe_reader.getvalue())
+
+       def _cancel(self):
+               if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
+                       self._pipe_reader.removeExitListener(self._eof)
+                       self._pipe_reader.cancel()
+                       self._pipe_reader = None
+
+
+@coroutine
+def _writer(output_file, content, loop=None):
+       """
+       Asynchronously write bytes to output file, and close it when
+       done. If an EnvironmentError other than EAGAIN is encountered,
+       which typically indicates that the other end of the pipe has
+       close, the error is raised. This function is a coroutine.
+
+       @param output_file: output file descriptor
+       @type output_file: file or int
+       @param content: content to write
+       @type content: bytes
+       @param loop: asyncio.AbstractEventLoop (or compatible)
+       @type loop: event loop
+       """
+       fd = output_file if isinstance(output_file, int) else 
output_file.fileno()
+       _set_nonblocking(fd)
+       loop = asyncio._wrap_loop(loop)
+       try:
+               while content:
+                       waiter = loop.create_future()
+                       loop.add_writer(fd, lambda: waiter.set_result(None))
+                       try:
+                               yield waiter
+                               while content:
+                                       try:
+                                               content = content[os.write(fd, 
content):]
+                                       except EnvironmentError as e:
+                                               if e.errno == errno.EAGAIN:
+                                                       break
+                                               else:
+                                                       raise
+                       finally:
+                               loop.remove_writer(fd)
+       except GeneratorExit:
+               raise
+       finally:
+               os.close(output_file) if isinstance(output_file, int) else 
output_file.close()

diff --git a/lib/portage/util/futures/compat_coroutine.py 
b/lib/portage/util/futures/compat_coroutine.py
index 17400b74d..59fdc31b6 100644
--- a/lib/portage/util/futures/compat_coroutine.py
+++ b/lib/portage/util/futures/compat_coroutine.py
@@ -1,9 +1,13 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
-from portage.util.futures import asyncio
 import functools
 
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util.futures:asyncio',
+)
+
 
 def coroutine(generator_func):
        """

Reply via email to