commit: 6b4252d3a0f12808a5bcce888b7f68e1f84b5301 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Sat Jul 28 21:22:42 2018 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Mon Aug 6 04:38:41 2018 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=6b4252d3
Add asyncio.create_subprocess_exec support for python2 (bug 662388) The asyncio.create_subprocess_exec function is essential for using subprocesses in coroutines, so add support to do this for python2. This paves the way for extensive use of coroutines in portage, since coroutines are well-suited for many portage tasks that involve subprocesses. Bug: https://bugs.gentoo.org/662388 .../util/futures/asyncio/test_subprocess_exec.py | 184 ++++++--------------- lib/portage/util/futures/_asyncio/__init__.py | 53 ++++++ lib/portage/util/futures/_asyncio/process.py | 107 ++++++++++++ lib/portage/util/futures/_asyncio/streams.py | 96 +++++++++++ lib/portage/util/futures/compat_coroutine.py | 6 +- 5 files changed, 315 insertions(+), 131 deletions(-) diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py index 5a812ba6a..61646cb92 100644 --- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py +++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -3,61 +3,16 @@ import os import subprocess - -try: - from asyncio import create_subprocess_exec -except ImportError: - create_subprocess_exec = None +import sys from portage.process import find_binary from portage.tests import TestCase from portage.util._eventloop.global_event_loop import global_event_loop from portage.util.futures import asyncio -from portage.util.futures.executor.fork import ForkExecutor +from portage.util.futures._asyncio import create_subprocess_exec +from portage.util.futures._asyncio.streams import _reader as reader +from portage.util.futures.compat_coroutine import coroutine, coroutine_return from portage.util.futures.unix_events import DefaultEventLoopPolicy -from _emerge.PipeReader import PipeReader - - -def reader(input_file, loop=None): - """ - Asynchronously read a binary input file. 
- - @param input_file: binary input file - @type input_file: file - @param loop: event loop - @type loop: EventLoop - @return: bytes - @rtype: asyncio.Future (or compatible) - """ - loop = asyncio._wrap_loop(loop) - future = loop.create_future() - _Reader(future, input_file, loop) - return future - - -class _Reader(object): - def __init__(self, future, input_file, loop): - self._future = future - self._pipe_reader = PipeReader( - input_files={'input_file':input_file}, scheduler=loop) - - self._future.add_done_callback(self._cancel_callback) - self._pipe_reader.addExitListener(self._eof) - self._pipe_reader.start() - - def _cancel_callback(self, future): - if future.cancelled(): - self._cancel() - - def _eof(self, pipe_reader): - self._pipe_reader = None - self._future.set_result(pipe_reader.getvalue()) - - def _cancel(self): - if self._pipe_reader is not None and self._pipe_reader.poll() is None: - self._pipe_reader.removeExitListener(self._eof) - self._pipe_reader.cancel() - self._pipe_reader = None class SubprocessExecTestCase(TestCase): @@ -76,99 +31,66 @@ class SubprocessExecTestCase(TestCase): self.assertFalse(global_event_loop().is_closed()) def testEcho(self): - if create_subprocess_exec is None: - self.skipTest('create_subprocess_exec not implemented for python2') - args_tuple = (b'hello', b'world') echo_binary = find_binary("echo") self.assertNotEqual(echo_binary, None) echo_binary = echo_binary.encode() - # Use os.pipe(), since this loop does not implement the - # ReadTransport necessary for subprocess.PIPE support. - stdout_pr, stdout_pw = os.pipe() - stdout_pr = os.fdopen(stdout_pr, 'rb', 0) - stdout_pw = os.fdopen(stdout_pw, 'wb', 0) - files = [stdout_pr, stdout_pw] - def test(loop): - output = None - try: - with open(os.devnull, 'rb', 0) as devnull: - proc = loop.run_until_complete( - create_subprocess_exec( - echo_binary, *args_tuple, - stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) - - # This belongs exclusively to the subprocess now. 
- stdout_pw.close() - - output = asyncio.ensure_future( - reader(stdout_pr, loop=loop), loop=loop) - - self.assertEqual( - loop.run_until_complete(proc.wait()), os.EX_OK) - self.assertEqual( - tuple(loop.run_until_complete(output).split()), args_tuple) - finally: - if output is not None and not output.done(): - output.cancel() - for f in files: - f.close() + @coroutine + def test_coroutine(loop=None): - self._run_test(test) + proc = (yield create_subprocess_exec(echo_binary, *args_tuple, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + loop=loop)) - def testCat(self): - if create_subprocess_exec is None: - self.skipTest('create_subprocess_exec not implemented for python2') + out, err = (yield proc.communicate()) + self.assertEqual(tuple(out.split()), args_tuple) + self.assertEqual(proc.returncode, os.EX_OK) - stdin_data = b'hello world' - cat_binary = find_binary("cat") - self.assertNotEqual(cat_binary, None) - cat_binary = cat_binary.encode() + proc = (yield create_subprocess_exec( + 'bash', '-c', 'echo foo; echo bar 1>&2;', + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + loop=loop)) - # Use os.pipe(), since this loop does not implement the - # ReadTransport necessary for subprocess.PIPE support. 
- stdout_pr, stdout_pw = os.pipe() - stdout_pr = os.fdopen(stdout_pr, 'rb', 0) - stdout_pw = os.fdopen(stdout_pw, 'wb', 0) + out, err = (yield proc.communicate()) + self.assertEqual(out, b'foo\n') + self.assertEqual(err, b'bar\n') + self.assertEqual(proc.returncode, os.EX_OK) - stdin_pr, stdin_pw = os.pipe() - stdin_pr = os.fdopen(stdin_pr, 'rb', 0) - stdin_pw = os.fdopen(stdin_pw, 'wb', 0) + proc = (yield create_subprocess_exec( + 'bash', '-c', 'echo foo; echo bar 1>&2;', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + loop=loop)) - files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw] + out, err = (yield proc.communicate()) + self.assertEqual(out, b'foo\nbar\n') + self.assertEqual(err, None) + self.assertEqual(proc.returncode, os.EX_OK) - def test(loop): - output = None - try: - proc = loop.run_until_complete( - create_subprocess_exec( - cat_binary, - stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw)) + coroutine_return('success') - # These belong exclusively to the subprocess now. - stdout_pw.close() - stdin_pr.close() + self.assertEqual('success', + loop.run_until_complete(test_coroutine(loop=loop))) - output = asyncio.ensure_future( - reader(stdout_pr, loop=loop), loop=loop) + self._run_test(test) - with ForkExecutor(loop=loop) as executor: - writer = asyncio.ensure_future(loop.run_in_executor( - executor, stdin_pw.write, stdin_data), loop=loop) + def testCat(self): + stdin_data = b'hello world' + cat_binary = find_binary("cat") + self.assertNotEqual(cat_binary, None) + cat_binary = cat_binary.encode() - # This belongs exclusively to the writer now. 
- stdin_pw.close() - loop.run_until_complete(writer) + def test(loop): + proc = loop.run_until_complete( + create_subprocess_exec(cat_binary, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + loop=loop)) + + out, err = loop.run_until_complete(proc.communicate(input=stdin_data)) - self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) - self.assertEqual(loop.run_until_complete(output), stdin_data) - finally: - if output is not None and not output.done(): - output.cancel() - for f in files: - f.close() + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + self.assertEqual(out, stdin_data) self._run_test(test) @@ -178,8 +100,8 @@ class SubprocessExecTestCase(TestCase): requires an AbstractEventLoop.connect_read_pipe implementation (and a ReadTransport implementation for it to return). """ - if create_subprocess_exec is None: - self.skipTest('create_subprocess_exec not implemented for python2') + if sys.version_info.major < 3: + self.skipTest('ReadTransport not implemented for python2') args_tuple = (b'hello', b'world') echo_binary = find_binary("echo") @@ -192,7 +114,8 @@ class SubprocessExecTestCase(TestCase): create_subprocess_exec( echo_binary, *args_tuple, stdin=devnull, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + loop=loop)) self.assertEqual( tuple(loop.run_until_complete(proc.stdout.read()).split()), @@ -207,8 +130,8 @@ class SubprocessExecTestCase(TestCase): requires an AbstractEventLoop.connect_write_pipe implementation (and a WriteTransport implementation for it to return). 
""" - if create_subprocess_exec is None: - self.skipTest('create_subprocess_exec not implemented for python2') + if sys.version_info.major < 3: + self.skipTest('WriteTransport not implemented for python2') stdin_data = b'hello world' cat_binary = find_binary("cat") @@ -220,7 +143,8 @@ class SubprocessExecTestCase(TestCase): create_subprocess_exec( cat_binary, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + loop=loop)) # This buffers data when necessary to avoid blocking. proc.stdin.write(stdin_data) diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index acfd59396..faab98e47 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -20,6 +20,9 @@ __all__ = ( 'wait', ) +import subprocess +import sys + try: import asyncio as _real_asyncio except ImportError: @@ -45,6 +48,7 @@ from portage.util.futures.futures import ( InvalidStateError, TimeoutError, ) +from portage.util.futures._asyncio.process import _Process from portage.util.futures._asyncio.tasks import ( ALL_COMPLETED, FIRST_COMPLETED, @@ -105,6 +109,49 @@ def set_child_watcher(watcher): return get_event_loop_policy().set_child_watcher(watcher) +# Python 3.4 and later implement PEP 446, which makes newly +# created file descriptors non-inheritable by default. +_close_fds_default = sys.version_info < (3, 4) + + +def create_subprocess_exec(*args, **kwargs): + """ + Create a subprocess. 
+ + @param args: program and arguments + @type args: str + @param stdin: stdin file descriptor + @type stdin: file or int + @param stdout: stdout file descriptor + @type stdout: file or int + @param stderr: stderr file descriptor + @type stderr: file or int + @param close_fds: close file descriptors + @type close_fds: bool + @param loop: asyncio.AbstractEventLoop (or compatible) + @type loop: event loop + @type kwargs: varies + @param kwargs: subprocess.Popen parameters + @rtype: asyncio.Future (or compatible) + @return: subset of asyncio.subprocess.Process interface + """ + loop = _wrap_loop(kwargs.pop('loop', None)) + kwargs.setdefault('close_fds', _close_fds_default) + if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop): + # Use the real asyncio loop and create_subprocess_exec. + return _real_asyncio.create_subprocess_exec(*args, loop=loop._loop, **kwargs) + + result = loop.create_future() + + result.set_result(_Process(subprocess.Popen( + args, + stdin=kwargs.pop('stdin', None), + stdout=kwargs.pop('stdout', None), + stderr=kwargs.pop('stderr', None), **kwargs), loop)) + + return result + + class Task(Future): """ Schedule the execution of a coroutine: wrap it in a future. A task @@ -127,6 +174,12 @@ def ensure_future(coro_or_future, loop=None): @rtype: asyncio.Future (or compatible) @return: an instance of Future """ + loop = _wrap_loop(loop) + if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop): + # Use the real asyncio loop and ensure_future. 
+ return _real_asyncio.ensure_future( + coro_or_future, loop=loop._loop) + if isinstance(coro_or_future, Future): return coro_or_future raise NotImplementedError diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py new file mode 100644 index 000000000..020164c9b --- /dev/null +++ b/lib/portage/util/futures/_asyncio/process.py @@ -0,0 +1,107 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import portage +portage.proxy.lazyimport.lazyimport(globals(), + 'portage.util.futures:asyncio', +) +from portage.util.futures._asyncio.streams import _reader, _writer +from portage.util.futures.compat_coroutine import coroutine, coroutine_return + + +class _Process(object): + """ + Emulate a subset of the asyncio.subprocess.Process interface, + for python2. + """ + def __init__(self, proc, loop): + """ + @param proc: process instance + @type proc: subprocess.Popen + @param loop: asyncio.AbstractEventLoop (or compatible) + @type loop: event loop + """ + self._proc = proc + self._loop = loop + self.terminate = proc.terminate + self.kill = proc.kill + self.send_signal = proc.send_signal + self.pid = proc.pid + self._waiters = [] + loop._asyncio_child_watcher.\ + add_child_handler(self.pid, self._proc_exit) + + @property + def returncode(self): + return self._proc.returncode + + @coroutine + def communicate(self, input=None): + """ + Read data from stdout and stderr, until end-of-file is reached. + Wait for process to terminate. 
+ + @param input: stdin content to write + @type input: bytes + @return: tuple (stdout_data, stderr_data) + @rtype: asyncio.Future (or compatible) + """ + futures = [] + for input_file in (self._proc.stdout, self._proc.stderr): + if input_file is None: + future = self._loop.create_future() + future.set_result(None) + else: + future = _reader(input_file, loop=self._loop) + futures.append(future) + + writer = None + if input is not None: + if self._proc.stdin is None: + raise TypeError('communicate: expected file or int, got {}'.format(type(self._proc.stdin))) + writer = asyncio.ensure_future(_writer(self._proc.stdin, input), loop=self._loop) + + try: + yield asyncio.wait(futures + [self.wait()], loop=self._loop) + finally: + if writer is not None: + if writer.done(): + # Consume expected exceptions. + try: + writer.result() + except EnvironmentError: + # This is normal if the other end of the pipe was closed. + pass + else: + writer.cancel() + + coroutine_return(tuple(future.result() for future in futures)) + + def wait(self): + """ + Wait for child process to terminate. Set and return returncode attribute. 
+ + @return: returncode + @rtype: asyncio.Future (or compatible) + """ + waiter = self._loop.create_future() + if self.returncode is None: + self._waiters.append(waiter) + waiter.add_done_callback(self._waiter_cancel) + else: + waiter.set_result(self.returncode) + return waiter + + def _waiter_cancel(self, waiter): + if waiter.cancelled(): + try: + self._waiters.remove(waiter) + except ValueError: + pass + + def _proc_exit(self, pid, returncode): + self._proc.returncode = returncode + waiters = self._waiters + self._waiters = [] + for waiter in waiters: + waiter.set_result(returncode) diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py new file mode 100644 index 000000000..650a16491 --- /dev/null +++ b/lib/portage/util/futures/_asyncio/streams.py @@ -0,0 +1,96 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import errno +import os + +import portage +portage.proxy.lazyimport.lazyimport(globals(), + '_emerge.PipeReader:PipeReader', + 'portage.util.futures:asyncio', + 'portage.util.futures.unix_events:_set_nonblocking', +) +from portage.util.futures.compat_coroutine import coroutine + + +def _reader(input_file, loop=None): + """ + Asynchronously read a binary input file, and close it when + it reaches EOF. 
+ + @param input_file: binary input file descriptor + @type input_file: file or int + @param loop: asyncio.AbstractEventLoop (or compatible) + @type loop: event loop + @return: bytes + @rtype: asyncio.Future (or compatible) + """ + loop = asyncio._wrap_loop(loop) + future = loop.create_future() + _Reader(future, input_file, loop) + return future + + +class _Reader(object): + def __init__(self, future, input_file, loop): + self._future = future + self._pipe_reader = PipeReader( + input_files={'input_file':input_file}, scheduler=loop) + + self._future.add_done_callback(self._cancel_callback) + self._pipe_reader.addExitListener(self._eof) + self._pipe_reader.start() + + def _cancel_callback(self, future): + if future.cancelled(): + self._cancel() + + def _eof(self, pipe_reader): + self._pipe_reader = None + self._future.set_result(pipe_reader.getvalue()) + + def _cancel(self): + if self._pipe_reader is not None and self._pipe_reader.poll() is None: + self._pipe_reader.removeExitListener(self._eof) + self._pipe_reader.cancel() + self._pipe_reader = None + + +@coroutine +def _writer(output_file, content, loop=None): + """ + Asynchronously write bytes to output file, and close it when + done. If an EnvironmentError other than EAGAIN is encountered, + which typically indicates that the other end of the pipe has + closed, the error is raised. This function is a coroutine. 
+ + @param output_file: output file descriptor + @type output_file: file or int + @param content: content to write + @type content: bytes + @param loop: asyncio.AbstractEventLoop (or compatible) + @type loop: event loop + """ + fd = output_file if isinstance(output_file, int) else output_file.fileno() + _set_nonblocking(fd) + loop = asyncio._wrap_loop(loop) + try: + while content: + waiter = loop.create_future() + loop.add_writer(fd, lambda: waiter.set_result(None)) + try: + yield waiter + while content: + try: + content = content[os.write(fd, content):] + except EnvironmentError as e: + if e.errno == errno.EAGAIN: + break + else: + raise + finally: + loop.remove_writer(fd) + except GeneratorExit: + raise + finally: + os.close(output_file) if isinstance(output_file, int) else output_file.close() diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py index 17400b74d..59fdc31b6 100644 --- a/lib/portage/util/futures/compat_coroutine.py +++ b/lib/portage/util/futures/compat_coroutine.py @@ -1,9 +1,13 @@ # Copyright 2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 -from portage.util.futures import asyncio import functools +import portage +portage.proxy.lazyimport.lazyimport(globals(), + 'portage.util.futures:asyncio', +) + def coroutine(generator_func): """