commit:     ed685eb659fccf6e4031d12fa8a59c3829ef1155
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar  7 06:56:36 2021 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Mar  7 07:01:44 2021 +0000

Removed unused portage.util.futures.transports

Signed-off-by: Zac Medico <zmedico <AT> gentoo <DOT> org>

 lib/portage/util/futures/transports.py  |  87 ------
 lib/portage/util/futures/unix_events.py | 492 +-------------------------------
 2 files changed, 2 insertions(+), 577 deletions(-)

diff --git a/lib/portage/util/futures/transports.py b/lib/portage/util/futures/transports.py
deleted file mode 100644
index 016ecbef8..000000000
--- a/lib/portage/util/futures/transports.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Copyright 2018 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-from asyncio.transports import Transport as _Transport
-class _FlowControlMixin(_Transport):
-       """
-       This is identical to the standard library's private
-       asyncio.transports._FlowControlMixin class.
-       All the logic for (write) flow control in a mix-in base class.
-       The subclass must implement get_write_buffer_size().  It must call
-       _maybe_pause_protocol() whenever the write buffer size increases,
-       and _maybe_resume_protocol() whenever it decreases.  It may also
-       override set_write_buffer_limits() (e.g. to specify different
-       defaults).
-       The subclass constructor must call super().__init__(extra).  This
-       will call set_write_buffer_limits().
-       The user may call set_write_buffer_limits() and
-       get_write_buffer_size(), and their protocol's pause_writing() and
-       resume_writing() may be called.
-       """
-       def __init__(self, extra=None, loop=None):
-               super().__init__(extra)
-               assert loop is not None
-               self._loop = loop
-               self._protocol_paused = False
-               self._set_write_buffer_limits()
-       def _maybe_pause_protocol(self):
-               size = self.get_write_buffer_size()
-               if size <= self._high_water:
-                       return
-               if not self._protocol_paused:
-                       self._protocol_paused = True
-                       try:
-                               self._protocol.pause_writing()
-                       except Exception as exc:
-                               self._loop.call_exception_handler({
-                                       'message': 'protocol.pause_writing() failed',
-                                       'exception': exc,
-                                       'transport': self,
-                                       'protocol': self._protocol,
-                               })
-       def _maybe_resume_protocol(self):
-               if (self._protocol_paused and
-                       self.get_write_buffer_size() <= self._low_water):
-                       self._protocol_paused = False
-                       try:
-                               self._protocol.resume_writing()
-                       except Exception as exc:
-                               self._loop.call_exception_handler({
-                                       'message': 'protocol.resume_writing() failed',
-                                       'exception': exc,
-                                       'transport': self,
-                                       'protocol': self._protocol,
-                               })
-       def get_write_buffer_limits(self):
-               return (self._low_water, self._high_water)
-       def _set_write_buffer_limits(self, high=None, low=None):
-               if high is None:
-                       if low is None:
-                               high = 64*1024
-                       else:
-                               high = 4*low
-               if low is None:
-                       low = high // 4
-               if not high >= low >= 0:
-                       raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
-                                                        (high, low))
-               self._high_water = high
-               self._low_water = low
-       def set_write_buffer_limits(self, high=None, low=None):
-               self._set_write_buffer_limits(high=high, low=low)
-               self._maybe_pause_protocol()
-       def get_write_buffer_size(self):
-               raise NotImplementedError

diff --git a/lib/portage/util/futures/unix_events.py b/lib/portage/util/futures/unix_events.py
index 16a9e12b7..9d5445943 100644
--- a/lib/portage/util/futures/unix_events.py
+++ b/lib/portage/util/futures/unix_events.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Gentoo Foundation
+# Copyright 2018-2021 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 __all__ = (
@@ -7,32 +7,15 @@ __all__ = (
 import asyncio as _real_asyncio
-from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
 from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
-from asyncio.transports import (
-       ReadTransport as _ReadTransport,
-       WriteTransport as _WriteTransport,
-import errno
 import fcntl
-import functools
-import logging
 import os
-import socket
-import stat
-import subprocess
-import sys
 from portage.util._eventloop.global_event_loop import (
        global_event_loop as _global_event_loop,
-from portage.util.futures import (
-       asyncio,
-       events,
-from portage.util.futures.transports import _FlowControlMixin
+from portage.util.futures import events
 class _PortageEventLoop(events.AbstractEventLoop):
@@ -89,158 +72,6 @@ class _PortageEventLoop(events.AbstractEventLoop):
                return self
-       def create_task(self, coro):
-               """
-               Schedule a coroutine object.
-               @type coro: coroutine
-               @param coro: a coroutine to schedule
-               @rtype: asyncio.Task
-               @return: a task object
-               """
-               return asyncio.Task(coro, loop=self)
-       def connect_read_pipe(self, protocol_factory, pipe):
-               """
-               Register read pipe in event loop. Set the pipe to non-blocking 
-               @type protocol_factory: callable
-               @param protocol_factory: must instantiate object with Protocol 
-               @type pipe: file
-               @param pipe: a pipe to read from
-               @rtype: asyncio.Future
-               @return: Return pair (transport, protocol), where transport supports the
-                       ReadTransport interface.
-               """
-               protocol = protocol_factory()
-               result = self.create_future()
-               waiter = self.create_future()
-               transport = self._make_read_pipe_transport(pipe, protocol, 
-               def waiter_callback(waiter):
-                       try:
-                               waiter.result()
-                       except Exception as e:
-                               transport.close()
-                               result.set_exception(e)
-                       else:
-                               result.set_result((transport, protocol))
-               waiter.add_done_callback(waiter_callback)
-               return result
-       def connect_write_pipe(self, protocol_factory, pipe):
-               """
-               Register write pipe in event loop. Set the pipe to non-blocking 
-               @type protocol_factory: callable
-               @param protocol_factory: must instantiate object with Protocol 
-               @type pipe: file
-               @param pipe: a pipe to write to
-               @rtype: asyncio.Future
-               @return: Return pair (transport, protocol), where transport supports the
-                       WriteTransport interface.
-               """
-               protocol = protocol_factory()
-               result = self.create_future()
-               waiter = self.create_future()
-               transport = self._make_write_pipe_transport(pipe, protocol, 
-               def waiter_callback(waiter):
-                       try:
-                               waiter.result()
-                       except Exception as e:
-                               transport.close()
-                               result.set_exception(e)
-                       else:
-                               result.set_result((transport, protocol))
-               waiter.add_done_callback(waiter_callback)
-               return result
-       def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
-               """
-               Run subprocesses asynchronously using the subprocess module.
-               @type protocol_factory: callable
-               @param protocol_factory: must instantiate a subclass of the
-                       asyncio.SubprocessProtocol class
-               @type program: str or bytes
-               @param program: the program to execute
-               @type args: str or bytes
-               @param args: program's arguments
-               @type kwargs: varies
-               @param kwargs: subprocess.Popen parameters
-               @rtype: asyncio.Future
-               @return: Returns a pair of (transport, protocol), where 
-                       is an instance of BaseSubprocessTransport
-               """
-               # python2.7 does not allow arguments with defaults after *args
-               stdin = kwargs.pop('stdin', subprocess.PIPE)
-               stdout = kwargs.pop('stdout', subprocess.PIPE)
-               stderr = kwargs.pop('stderr', subprocess.PIPE)
-               universal_newlines = kwargs.pop('universal_newlines', False)
-               shell = kwargs.pop('shell', False)
-               bufsize = kwargs.pop('bufsize', 0)
-               if universal_newlines:
-                       raise ValueError("universal_newlines must be False")
-               if shell:
-                       raise ValueError("shell must be False")
-               if bufsize != 0:
-                       raise ValueError("bufsize must be 0")
-               popen_args = (program,) + args
-               for arg in popen_args:
-                       if not isinstance(arg, (str, bytes)):
-                               raise TypeError("program arguments must be "
-                                                               "a bytes or text string, not %s"
-                                                               % 
-               result = self.create_future()
-               self._make_subprocess_transport(
-                       result, protocol_factory(), popen_args, False, stdin, stdout, stderr,
-                       bufsize, **kwargs)
-               return result
-       def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
-                                                                 extra=None):
-               return _UnixReadPipeTransport(self, pipe, protocol, waiter, 
-       def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
-                                                                  extra=None):
-               return _UnixWritePipeTransport(self, pipe, protocol, waiter, 
-       def _make_subprocess_transport(self, result, protocol, args, shell,
-               stdin, stdout, stderr, bufsize, extra=None, **kwargs):
-               waiter = self.create_future()
-               transp = _UnixSubprocessTransport(self,
-                       protocol, args, shell, stdin, stdout, stderr, bufsize,
-                       waiter=waiter, extra=extra,
-                       **kwargs)
-               self._loop._asyncio_child_watcher.add_child_handler(
-                       transp.get_pid(), self._child_watcher_callback, transp)
-               waiter.add_done_callback(functools.partial(
-                       self._subprocess_transport_callback, transp, protocol, 
-       def _subprocess_transport_callback(self, transp, protocol, result, 
-               if waiter.exception() is None:
-                       result.set_result((transp, protocol))
-               else:
-                       transp.close()
-                       wait_transp = asyncio.ensure_future(transp._wait(), 
-                       wait_transp.add_done_callback(
-                               result, waiter.exception()))
-       def _child_watcher_callback(self, pid, returncode, transp):
-               self.call_soon_threadsafe(transp._process_exited, returncode)
-       def _subprocess_transport_failure(self, result, exception, wait_transp):
-               result.set_exception(wait_transp.exception() or exception)
 if hasattr(os, 'set_blocking'):
        def _set_nonblocking(fd):
@@ -252,325 +83,6 @@ else:
                fcntl.fcntl(fd, fcntl.F_SETFL, flags)
-class _UnixReadPipeTransport(_ReadTransport):
-       """
-       This is identical to the standard library's private
-       asyncio.unix_events._UnixReadPipeTransport class, except that it
-       only calls public AbstractEventLoop methods.
-       """
-       max_size = 256 * 1024  # max bytes we read in one event loop iteration
-       def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
-               super().__init__(extra)
-               self._extra['pipe'] = pipe
-               self._loop = loop
-               self._pipe = pipe
-               self._fileno = pipe.fileno()
-               self._protocol = protocol
-               self._closing = False
-               mode = os.fstat(self._fileno).st_mode
-               if not (stat.S_ISFIFO(mode) or
-                               stat.S_ISSOCK(mode) or
-                               stat.S_ISCHR(mode)):
-                       self._pipe = None
-                       self._fileno = None
-                       self._protocol = None
-                       raise ValueError("Pipe transport is for pipes/sockets 
-               _set_nonblocking(self._fileno)
-               self._loop.call_soon(self._protocol.connection_made, self)
-               # only start reading when connection_made() has been called
-               self._loop.call_soon(self._loop.add_reader,
-                                                        self._fileno, 
-               if waiter is not None:
-                       # only wake up the waiter when connection_made() has been called
-                       self._loop.call_soon(
-                               lambda: None if waiter.cancelled() else 
-       def _read_ready(self):
-               try:
-                       data = os.read(self._fileno, self.max_size)
-               except (BlockingIOError, InterruptedError):
-                       pass
-               except OSError as exc:
-                       self._fatal_error(exc, 'Fatal read error on pipe transport')
-               else:
-                       if data:
-                               self._protocol.data_received(data)
-                       else:
-                               self._closing = True
-                               self._loop.remove_reader(self._fileno)
-                               self._loop.call_soon(self._call_connection_lost, None)
-       def pause_reading(self):
-               self._loop.remove_reader(self._fileno)
-       def resume_reading(self):
-               self._loop.add_reader(self._fileno, self._read_ready)
-       def set_protocol(self, protocol):
-               self._protocol = protocol
-       def get_protocol(self):
-               return self._protocol
-       def is_closing(self):
-               return self._closing
-       def close(self):
-               if not self._closing:
-                       self._close(None)
-       def _fatal_error(self, exc, message='Fatal error on pipe transport'):
-               # should be called by exception handler only
-               if (isinstance(exc, OSError) and exc.errno == errno.EIO):
-                       if self._loop.get_debug():
-                               logging.debug("%r: %s", self, message, 
-               else:
-                       self._loop.call_exception_handler({
-                               'message': message,
-                               'exception': exc,
-                               'transport': self,
-                               'protocol': self._protocol,
-                       })
-               self._close(exc)
-       def _close(self, exc):
-               self._closing = True
-               self._loop.remove_reader(self._fileno)
-               self._loop.call_soon(self._call_connection_lost, exc)
-       def _call_connection_lost(self, exc):
-               try:
-                       self._protocol.connection_lost(exc)
-               finally:
-                       self._pipe.close()
-                       self._pipe = None
-                       self._protocol = None
-                       self._loop = None
-class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport):
-       """
-       This is identical to the standard library's private
-       asyncio.unix_events._UnixWritePipeTransport class, except that it
-       only calls public AbstractEventLoop methods.
-       """
-       def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
-               super().__init__(extra, loop)
-               self._extra['pipe'] = pipe
-               self._pipe = pipe
-               self._fileno = pipe.fileno()
-               self._protocol = protocol
-               self._buffer = bytearray()
-               self._conn_lost = 0
-               self._closing = False  # Set when close() or write_eof() called.
-               mode = os.fstat(self._fileno).st_mode
-               is_char = stat.S_ISCHR(mode)
-               is_fifo = stat.S_ISFIFO(mode)
-               is_socket = stat.S_ISSOCK(mode)
-               if not (is_char or is_fifo or is_socket):
-                       self._pipe = None
-                       self._fileno = None
-                       self._protocol = None
-                       raise ValueError("Pipe transport is only for "
-                                                        "pipes, sockets and character devices")
-               _set_nonblocking(self._fileno)
-               self._loop.call_soon(self._protocol.connection_made, self)
-               # On AIX, the reader trick (to be notified when the read end of 
-               # socket is closed) only works for sockets. On other platforms 
-               # works for pipes and sockets. (Exception: OS X 10.4?  Issue 
-               if is_socket or (is_fifo and not 
-                       # only start reading when connection_made() has been 
-                       self._loop.call_soon(self._loop.add_reader,
-                                                                self._fileno, 
-               if waiter is not None:
-                       # only wake up the waiter when connection_made() has been called
-                       self._loop.call_soon(
-                               lambda: None if waiter.cancelled() else 
-       def get_write_buffer_size(self):
-               return len(self._buffer)
-       def _read_ready(self):
-               # Pipe was closed by peer.
-               if self._loop.get_debug():
-                       logging.info("%r was closed by peer", self)
-               if self._buffer:
-                       self._close(BrokenPipeError())
-               else:
-                       self._close()
-       def write(self, data):
-               assert isinstance(data, (bytes, bytearray, memoryview)), 
-               if isinstance(data, bytearray):
-                       data = memoryview(data)
-               if not data:
-                       return
-               if self._conn_lost or self._closing:
-                       self._conn_lost += 1
-                       return
-               if not self._buffer:
-                       # Attempt to send it right away first.
-                       try:
-                               n = os.write(self._fileno, data)
-                       except (BlockingIOError, InterruptedError):
-                               n = 0
-                       except Exception as exc:
-                               self._conn_lost += 1
-                               self._fatal_error(exc, 'Fatal write error on pipe transport')
-                               return
-                       if n == len(data):
-                               return
-                       if n > 0:
-                               data = memoryview(data)[n:]
-                       self._loop.add_writer(self._fileno, self._write_ready)
-               self._buffer += data
-               self._maybe_pause_protocol()
-       def _write_ready(self):
-               assert self._buffer, 'Data should not be empty'
-               try:
-                       n = os.write(self._fileno, self._buffer)
-               except (BlockingIOError, InterruptedError):
-                       pass
-               except Exception as exc:
-                       self._buffer.clear()
-                       self._conn_lost += 1
-                       # Remove writer here, _fatal_error() doesn't it
-                       # because _buffer is empty.
-                       self._loop.remove_writer(self._fileno)
-                       self._fatal_error(exc, 'Fatal write error on pipe transport')
-               else:
-                       if n == len(self._buffer):
-                               self._buffer.clear()
-                               self._loop.remove_writer(self._fileno)
-                               self._maybe_resume_protocol()  # May append to 
-                               if self._closing:
-                                       self._loop.remove_reader(self._fileno)
-                                       self._call_connection_lost(None)
-                               return
-                       if n > 0:
-                               del self._buffer[:n]
-       def can_write_eof(self):
-               return True
-       def write_eof(self):
-               if self._closing:
-                       return
-               assert self._pipe
-               self._closing = True
-               if not self._buffer:
-                       self._loop.remove_reader(self._fileno)
-                       self._loop.call_soon(self._call_connection_lost, None)
-       def set_protocol(self, protocol):
-               self._protocol = protocol
-       def get_protocol(self):
-               return self._protocol
-       def is_closing(self):
-               return self._closing
-       def close(self):
-               if self._pipe is not None and not self._closing:
-                       # write_eof is all what we needed to close the write 
-                       self.write_eof()
-       def abort(self):
-               self._close(None)
-       def _fatal_error(self, exc, message='Fatal error on pipe transport'):
-               # should be called by exception handler only
-               if isinstance(exc,
-                       (BrokenPipeError, ConnectionResetError, 
-                       if self._loop.get_debug():
-                               logging.debug("%r: %s", self, message, 
-               else:
-                       self._loop.call_exception_handler({
-                               'message': message,
-                               'exception': exc,
-                               'transport': self,
-                               'protocol': self._protocol,
-                       })
-               self._close(exc)
-       def _close(self, exc=None):
-               self._closing = True
-               if self._buffer:
-                       self._loop.remove_writer(self._fileno)
-               self._buffer.clear()
-               self._loop.remove_reader(self._fileno)
-               self._loop.call_soon(self._call_connection_lost, exc)
-       def _call_connection_lost(self, exc):
-               try:
-                       self._protocol.connection_lost(exc)
-               finally:
-                       self._pipe.close()
-                       self._pipe = None
-                       self._protocol = None
-                       self._loop = None
-if hasattr(os, 'set_inheritable'):
-       # Python 3.4 and newer
-       _set_inheritable = os.set_inheritable
-       def _set_inheritable(fd, inheritable):
-               cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
-               old = fcntl.fcntl(fd, fcntl.F_GETFD)
-               if not inheritable:
-                       fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
-               else:
-                       fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
-class _UnixSubprocessTransport(_BaseSubprocessTransport):
-       """
-       This is identical to the standard library's private
-       asyncio.unix_events._UnixSubprocessTransport class, except that it
-       only calls public AbstractEventLoop methods.
-       """
-       def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
-               stdin_w = None
-               if stdin == subprocess.PIPE:
-                       # Use a socket pair for stdin, since not all platforms
-                       # support selecting read events on the write end of a
-                       # socket (which we use in order to detect closing of the
-                       # other end).  Notably this is needed on AIX, and works
-                       # just fine on other platforms.
-                       stdin, stdin_w = socket.socketpair()
-                       # Mark the write end of the stdin pipe as 
-                       # needed by close_fds=False on Python 3.3 and older
-                       # (Python 3.4 implements the PEP 446, socketpair returns
-                       # non-inheritable sockets)
-                       _set_inheritable(stdin_w.fileno(), False)
-               self._proc = subprocess.Popen(
-                       args, shell=shell, stdin=stdin, stdout=stdout, 
-                       universal_newlines=False, bufsize=bufsize, **kwargs)
-               if stdin_w is not None:
-                       stdin.close()
-                       self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', 
 class AbstractChildWatcher(_AbstractChildWatcher):
        def add_child_handler(self, pid, callback, *args):
                raise NotImplementedError()

Reply via email to