package python3-gbulb
tag 895726 + pending
tag 904388 + pending
thanks
Hi,
I have just uploaded 0.6.1-0.1 to DELAYED/0-days per [1]. The NMU diff
is attached. This is a new upstream version because it fixes a bunch of
bugs as well as fixing compatibility with newer Python versions.
Kind regards
Philipp Kern
[1]
https://www.debian.org/doc/manuals/developers-reference/ch05.en.html#nmu-guidelines
diff -Nru python-gbulb-0.5.3/CHANGELOG.md python-gbulb-0.6.1/CHANGELOG.md
--- python-gbulb-0.5.3/CHANGELOG.md 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/CHANGELOG.md 2018-08-10 05:44:52.000000000 +0200
@@ -1,4 +1,21 @@
# Change Log
+## [0.6.1] - 2018-08-09
+
+### Fixed
+ - Support for 3.7, for real this time. Thank you Philippe Normand!
+
+## [0.6.0] - 2018-08-06
+
+### Fixed
+ - Support for 3.7.
+
+### Added
+ - Preliminary Windows support. Please note that using subprocesses is known
+ not to work. Patches welcome.
+
+### Changed
+ - Support for 3.4 and below has been dropped.
+
## [0.5.3] - 2017-01-27
### Fixed
diff -Nru python-gbulb-0.5.3/debian/changelog python-gbulb-0.6.1/debian/changelog
--- python-gbulb-0.5.3/debian/changelog 2017-02-23 10:19:29.000000000 +0100
+++ python-gbulb-0.6.1/debian/changelog 2018-09-18 14:27:08.000000000 +0200
@@ -1,3 +1,11 @@
+python-gbulb (0.6.1-0.1) unstable; urgency=medium
+
+ * Non-maintainer upload
+ * New upstream release (Closes: #904388)
+ * Add dependency on python3-gi (Closes: #895726)
+
+ -- Philipp Kern <pk...@debian.org> Tue, 18 Sep 2018 14:27:08 +0200
+
python-gbulb (0.5.3-2) unstable; urgency=medium
* Fix suggested doc package name
diff -Nru python-gbulb-0.5.3/debian/control python-gbulb-0.6.1/debian/control
--- python-gbulb-0.5.3/debian/control 2017-02-23 10:19:29.000000000 +0100
+++ python-gbulb-0.6.1/debian/control 2018-09-18 14:27:08.000000000 +0200
@@ -10,7 +10,7 @@
Package: python3-gbulb
Architecture: all
-Depends: ${python3:Depends}, ${misc:Depends}
+Depends: python3-gi, ${python3:Depends}, ${misc:Depends}
Suggests: python-gbulb-doc
Description: PEP 3156 event loop based on GLib (Python 3)
Gbulb is a Python library that implements a PEP 3156 interface for the GLib
diff -Nru python-gbulb-0.5.3/.drone.yml python-gbulb-0.6.1/.drone.yml
--- python-gbulb-0.5.3/.drone.yml 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/.drone.yml 2018-08-10 05:44:52.000000000 +0200
@@ -1,9 +1,8 @@
matrix:
PYTHON_VERSION:
- - 3.4.3
- - 3.4.6
- - 3.5.3
- - 3.6.0
+ - 3.5.5
+ - 3.6.6
+ - 3.7.0
pipeline:
build:
diff -Nru python-gbulb-0.5.3/gbulb/glib_events.py python-gbulb-0.6.1/gbulb/glib_events.py
--- python-gbulb-0.5.3/gbulb/glib_events.py 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/gbulb/glib_events.py 2018-08-10 05:44:52.000000000 +0200
@@ -1,18 +1,64 @@
"""PEP 3156 event loop based on GLib"""
+import asyncio
import os
import signal
+import socket
+import sys
import threading
-from asyncio import events, tasks, unix_events
+import weakref
+from asyncio import constants, events, futures, sslproto, tasks
from gi.repository import GLib, Gio
+from . import transports
+
+
+if hasattr(os, 'set_blocking'):
+ def _set_nonblocking(fd):
+ os.set_blocking(fd, False)
+elif sys.platform == 'win32':
+ def _set_nonblocking(fd):
+ pass
+else:
+ import fcntl
+
+ def _set_nonblocking(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
__all__ = ['GLibEventLoop', 'GLibEventLoopPolicy']
-class GLibChildWatcher(unix_events.AbstractChildWatcher):
+# The Windows `asyncio` implementation doesn't actually use this, but
+# `glib` abstracts so nicely over this that we can use it on any platform
+if sys.platform == "win32":
+ class AbstractChildWatcher:
+ pass
+else:
+ from asyncio.unix_events import AbstractChildWatcher
+
+
+class GLibChildWatcher(AbstractChildWatcher):
def __init__(self):
self._sources = {}
+ self._handles = {}
+
+ # On windows on has to open a process handle for the given PID number
+ # before it's possible to use GLib's `child_watch_add` on it
+ if sys.platform == "win32":
+
+ def _create_handle_for_pid(self, pid):
+ import _winapi
+ return _winapi.OpenProcess(0x00100400, 0, pid)
+
+ def _close_process_handle(self, handle):
+ import _winapi
+ _winapi.CloseHandle(handle)
+ else:
+ _create_handle_for_pid = lambda self, pid: pid
+ _close_process_handle = lambda self, pid: None
def attach_loop(self, loop):
# just ignored
@@ -21,21 +67,28 @@
def add_child_handler(self, pid, callback, *args):
self.remove_child_handler(pid)
- source = GLib.child_watch_add(0, pid, self.__callback__)
- self._sources[pid] = source, callback, args
+ handle = self._create_handle_for_pid(pid)
+ source = GLib.child_watch_add(0, handle, self.__callback__)
+ self._sources[pid] = source, callback, args, handle
+ self._handles[handle] = pid
def remove_child_handler(self, pid):
try:
- source = self._sources.pop(pid)[0]
+ source, callback, args, handle = self._sources.pop(pid)
+ assert self._handles.pop(handle) == pid
except KeyError:
return False
+ self._close_process_handle(handle)
GLib.source_remove(source)
return True
def close(self):
- for source, callback, args in self._sources.values():
+ for source, callback, args, handle in self._sources.values():
+ self._close_process_handle(handle)
GLib.source_remove(source)
+ self._sources = {}
+ self._handles = {}
def __enter__(self):
return self
@@ -43,21 +96,22 @@
def __exit__(self, a, b, c):
pass
- def __callback__(self, pid, status):
-
+ def __callback__(self, handle, status):
try:
- source, callback, args = self._sources.pop(pid)
+ pid = self._handles.pop(handle)
+ source, callback, args, handle = self._sources.pop(pid)
except KeyError:
return
+ self._close_process_handle(handle)
GLib.source_remove(source)
- if os.WIFSIGNALED(status):
+ if hasattr(os, "WIFSIGNALED") and os.WIFSIGNALED(status):
returncode = -os.WTERMSIG(status)
- elif os.WIFEXITED(status):
+ elif hasattr(os, "WIFEXITED") and os.WIFEXITED(status):
returncode = os.WEXITSTATUS(status)
- #FIXME: Hack for adjusting invalid status returned by GLIB
+ # FIXME: Hack for adjusting invalid status returned by GLIB
# Looks like there is a bug in glib or in pygobject
if returncode > 128:
returncode = 128 - returncode
@@ -68,11 +122,15 @@
class GLibHandle(events.Handle):
- __slots__ = ('_source', '_repeat')
+ __slots__ = ('_source', '_repeat', '_context')
- def __init__(self, *, loop, source, repeat, callback, args):
+ def __init__(self, *, loop, source, repeat, callback, args, context=None):
super().__init__(callback, args, loop)
+ if sys.version_info[:2] >= (3, 7) and context is None:
+ import contextvars
+ context = contextvars.copy_context()
+ self._context = context
self._source = source
self._repeat = repeat
loop._handlers.add(self)
@@ -97,254 +155,644 @@
return self._repeat
-class BaseGLibEventLoop(unix_events.SelectorEventLoop):
- """GLib base event loop
-
- This class handles only the operations related to Glib.MainContext objects.
+if sys.platform == "win32":
+ class GLibBaseEventLoopPlatformExt:
+ def __init__(self):
+ pass
+
+ def close(self):
+ pass
+else:
+ from asyncio import unix_events
- Glib.MainLoop operations are implemented in the derived classes.
- """
-
- def __init__(self):
- self._readers = {}
- self._writers = {}
- self._sighandlers = {}
- self._chldhandlers = {}
- self._handlers = set()
-
- super().__init__()
-
- def run_until_complete(self, future, **kw):
- """Run the event loop until a Future is done.
+ class GLibBaseEventLoopPlatformExt(unix_events.SelectorEventLoop):
+ """
+ Semi-hack that allows us to leverage the existing implementation of Unix domain sockets
+ without having to actually implement a selector based event loop.
- Return the Future's result, or raise its exception.
+ Note that both `__init__` and `close` DO NOT and SHOULD NOT ever call their parent
+ implementation!
"""
+ def __init__(self):
+ self._sighandlers = {}
- def stop(f):
- self.stop()
+ def close(self):
+ for sig in list(self._sighandlers):
+ self.remove_signal_handler(sig)
- future = tasks.async(future, loop=self)
- future.add_done_callback(stop)
- try:
- self.run_forever(**kw)
- finally:
- future.remove_done_callback(stop)
+ def add_signal_handler(self, sig, callback, *args):
+ self.remove_signal_handler(sig)
- if not future.done():
- raise RuntimeError('Event loop stopped before Future completed.')
+ s = GLib.unix_signal_source_new(sig)
+ if s is None:
+ # Show custom error messages for signal that are uncatchable
+ if sig == signal.SIGKILL:
+ raise RuntimeError("cannot catch SIGKILL")
+ elif sig == signal.SIGSTOP:
+ raise RuntimeError("cannot catch SIGSTOP")
+ else:
+ raise ValueError("signal not supported")
+
+ assert sig not in self._sighandlers
+
+ self._sighandlers[sig] = GLibHandle(
+ loop=self,
+ source=s,
+ repeat=True,
+ callback=callback,
+ args=args)
+
+ def remove_signal_handler(self, sig):
+ try:
+ self._sighandlers.pop(sig).cancel()
+ return True
+ except KeyError:
+ return False
- return future.result()
- def run_forever(self):
- """Run the event loop until stop() is called."""
- if self.is_running():
- raise RuntimeError(
- "Recursively calling run_forever is forbidden. "
- "To recursively run the event loop, call run().")
+class _BaseEventLoop(asyncio.BaseEventLoop):
+ """
+ Extra inheritance step that needs to be inserted so that we only ever indirectly inherit from
+ `asyncio.BaseEventLoop`. This is necessary as the Unix implementation will also indirectly
+ inherit from that class (thereby creating diamond inheritance).
+ Python permits and fully supports diamond inheritance so this is not a problem. However it
+ is, on the other hand, not permitted to inherit from a class both directly *and* indirectly –
+ hence we add this intermediate class to make sure that can never happen (see
+ https://stackoverflow.com/q/29214888 for a minimal example a forbidden inheritance tree) and
+ https://www.python.org/download/releases/2.3/mro/ for some extensive documentation of the
+ allowed inheritance structures in python.
+ """
- try:
- self.run()
- finally:
- self.stop()
- def is_running(self):
- """Return whether the event loop is currently running."""
- return self._running
+class GLibBaseEventLoop(_BaseEventLoop, GLibBaseEventLoopPlatformExt):
+ def __init__(self, context=None):
+ self._handlers = set()
- def stop(self):
- """Stop the event loop as soon as reasonable.
+ self._accept_futures = {}
+ self._context = context or GLib.MainContext()
+ self._selector = self
+ self._transports = weakref.WeakValueDictionary()
+ self._readers = {}
+ self._writers = {}
- Exactly how soon that is may depend on the implementation, but
- no more I/O callbacks should be scheduled.
- """
- raise NotImplementedError() # pragma: no cover
+ self._channels = weakref.WeakValueDictionary()
+
+ _BaseEventLoop.__init__(self)
+ GLibBaseEventLoopPlatformExt.__init__(self)
def close(self):
- for fd in list(self._readers):
- self._remove_reader(fd)
+ for future in self._accept_futures.values():
+ future.cancel()
+ self._accept_futures.clear()
- for fd in list(self._writers):
- self.remove_writer(fd)
+ for s in list(self._handlers):
+ s.cancel()
+ self._handlers.clear()
- for sig in list(self._sighandlers):
- self.remove_signal_handler(sig)
+ GLibBaseEventLoopPlatformExt.close(self)
+ _BaseEventLoop.close(self)
- for pid in list(self._chldhandlers):
- self._remove_child_handler(pid)
+ def select(self, timeout=None):
+ self._context.acquire()
+ try:
+ if timeout is None:
+ self._context.iteration(True)
+ elif timeout <= 0:
+ self._context.iteration(False)
+ else:
+ # Schedule fake callback that will trigger an event and cause the loop to terminate
+ # after the given number of seconds
+ handle = GLibHandle(
+ loop=self,
+ source=GLib.Timeout(timeout*1000),
+ repeat=False,
+ callback=lambda: None,
+ args=())
+ try:
+ self._context.iteration(True)
+ finally:
+ handle.cancel()
+ return () # Available events are dispatched immediately and not returned
+ finally:
+ self._context.release()
- for s in list(self._handlers):
- s.cancel()
+ def _make_socket_transport(self, sock, protocol, waiter=None, *,
+ extra=None, server=None):
+ """Create socket transport."""
+ return transports.SocketTransport(self, sock, protocol, waiter, extra, server)
+
+ def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
+ *, server_side=False, server_hostname=None,
+ extra=None, server=None, ssl_handshake_timeout=None):
+ """Create SSL transport."""
+ # sslproto._is_sslproto_available was removed from asyncio, starting from Python 3.7.
+ if hasattr(sslproto, '_is_sslproto_available') and not sslproto._is_sslproto_available():
+ raise NotImplementedError("Proactor event loop requires Python 3.5"
+ " or newer (ssl.MemoryBIO) to support "
+ "SSL")
+ # Support for the ssl_handshake_timeout keyword argument was added in Python 3.7.
+ extra_protocol_kwargs = {}
+ if sys.version_info[:2] >= (3, 7):
+ extra_protocol_kwargs['ssl_handshake_timeout'] = ssl_handshake_timeout
+
+ ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
+ server_side, server_hostname, **extra_protocol_kwargs)
+ transports.SocketTransport(self, rawsock, ssl_protocol, extra=extra, server=server)
+ return ssl_protocol._app_transport
+
+ def _make_datagram_transport(self, sock, protocol,
+ address=None, waiter=None, extra=None):
+ """Create datagram transport."""
+ return transports.DatagramTransport(self, sock, protocol, address, waiter, extra)
+
+ def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ """Create read pipe transport."""
+ channel = self._channel_from_fileobj(pipe)
+ return transports.PipeReadTransport(self, channel, protocol, waiter, extra)
+
+ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ """Create write pipe transport."""
+ channel = self._channel_from_fileobj(pipe)
+ return transports.PipeWriteTransport(self, channel, protocol, waiter, extra)
+
+ @asyncio.coroutine
+ def _make_subprocess_transport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs):
+ """Create subprocess transport."""
+ with events.get_child_watcher() as watcher:
+ waiter = asyncio.Future(loop=self)
+ transport = transports.SubprocessTransport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ waiter=waiter, extra=extra, **kwargs)
+
+ watcher.add_child_handler(transport.get_pid(), self._child_watcher_callback, transport)
+ try:
+ yield from waiter
+ except Exception as exc:
+ err = exc
+ else:
+ err = None
+ if err is not None:
+ transport.close()
+ yield from transport._wait()
+ raise err
+
+ return transport
+
+ def _child_watcher_callback(self, pid, returncode, transport):
+ self.call_soon_threadsafe(transport._process_exited, returncode)
+
+ def _write_to_self(self):
+ self._context.wakeup()
+
+ def _process_events(self, event_list):
+ """Process selector events."""
+ pass # This is already done in `.select()`
+
+ def _start_serving(self, protocol_factory, sock,
+ sslcontext=None, server=None, backlog=100,
+ ssl_handshake_timeout=getattr(constants, 'SSL_HANDSHAKE_TIMEOUT', 60.0)):
+ self._transports[sock.fileno()] = server
+
+ def server_loop(f=None):
+ try:
+ if f is not None:
+ (conn, addr) = f.result()
+ protocol = protocol_factory()
+ if sslcontext is not None:
+ # FIXME: add ssl_handshake_timeout to this call once 3.7 support is merged in.
+ self._make_ssl_transport(
+ conn, protocol, sslcontext, server_side=True,
+ extra={'peername': addr}, server=server)
+ else:
+ self._make_socket_transport(
+ conn, protocol,
+ extra={'peername': addr}, server=server)
+ if self.is_closed():
+ return
+ f = self.sock_accept(sock)
+ except OSError as exc:
+ if sock.fileno() != -1:
+ self.call_exception_handler({
+ 'message': 'Accept failed on a socket',
+ 'exception': exc,
+ 'socket': sock,
+ })
+ sock.close()
+ except futures.CancelledError:
+ sock.close()
+ else:
+ self._accept_futures[sock.fileno()] = f
+ f.add_done_callback(server_loop)
+
+ self.call_soon(server_loop)
- super().close()
+ def _stop_serving(self, sock):
+ if sock.fileno() in self._accept_futures:
+ self._accept_futures[sock.fileno()].cancel()
+ sock.close()
def _check_not_coroutine(self, callback, name):
+ """Check whether the given callback is a coroutine or not."""
from asyncio import coroutines
if (coroutines.iscoroutine(callback) or
coroutines.iscoroutinefunction(callback)):
raise TypeError("coroutines cannot be used with {}()".format(name))
- # Methods scheduling callbacks. All these return Handles.
- def call_soon(self, callback, *args):
- self._check_not_coroutine(callback, 'call_soon')
- source = GLib.Idle()
+ def _ensure_fd_no_transport(self, fd):
+ """Ensure that the given file descriptor is NOT used by any transport.
- # XXX: we set the source's priority to high for the following scenario:
- #
- # - loop.sock_connect() begins asynchronous connection
- # - this adds a write callback to detect when the connection has
- # completed
- # - this write callback sets the result of a future
- # - future.Future schedules callbacks with call_later.
- # - the callback for this future removes the write callback
- # - GLib.Idle() has a much lower priority than that of the GSource for
- # the writer, so it never gets scheduled.
- source.set_priority(GLib.PRIORITY_HIGH)
+ Adding another reader to a fd that is already being waited for causes a hang on Windows."""
+ try:
+ transport = self._transports[fd]
+ except KeyError:
+ pass
+ else:
+ if not hasattr(transport, "is_closing") or not transport.is_closing():
+ raise RuntimeError('File descriptor {!r} is used by transport {!r}'
+ .format(fd, transport))
+
+ def _channel_from_socket(self, sock):
+ """Create GLib IOChannel for the given file object.
+
+ This function will cache weak references to `GLib.Channel` objects
+ it previously has created to prevent weird issues that can occur
+ when two GLib channels point to the same underlying socket resource.
- return GLibHandle(
- loop=self,
- source=source,
- repeat=False,
- callback=callback,
- args=args)
+ On windows this will only work for network sockets.
+ """
+ fd = self._fileobj_to_fd(sock)
- call_soon_threadsafe = call_soon
+ sock_id = id(sock)
+ try:
+ channel = self._channels[sock_id]
+ except KeyError:
+ if sys.platform == "win32":
+ channel = GLib.IOChannel.win32_new_socket(fd)
+ else:
+ channel = GLib.IOChannel.unix_new(fd)
- def call_later(self, delay, callback, *args):
- self._check_not_coroutine(callback, 'call_later')
+ # disabling buffering requires setting the encoding to None
+ channel.set_encoding(None)
+ channel.set_buffered(False)
- return GLibHandle(
- loop=self,
- source=GLib.Timeout(delay*1000) if delay > 0 else GLib.Idle(),
- repeat=False,
- callback=callback,
- args=args)
+ self._channels[sock_id] = channel
+ return channel
- def call_at(self, when, callback, *args):
- self._check_not_coroutine(callback, 'call_at')
+ def _channel_from_fileobj(self, fileobj):
+ """Create GLib IOChannel for the given file object.
- return self.call_later(when - self.time(), callback, *args)
+ On windows this will only work for files and pipes returned GLib's C library.
+ """
+ fd = self._fileobj_to_fd(fileobj)
- def time(self):
- return GLib.get_monotonic_time() / 1000000
+ # pipes have been shown to be blocking here, so we'll do someone
+ # else's job for them.
+ _set_nonblocking(fd)
- # FIXME: these functions are not available on windows
- def _add_reader(self, fd, callback, *args):
- if not isinstance(fd, int):
- fd = fd.fileno()
+ if sys.platform == "win32":
+ channel = GLib.IOChannel.win32_new_fd(fd)
+ else:
+ channel = GLib.IOChannel.unix_new(fd)
+
+ # disabling buffering requires setting the encoding to None
+ channel.set_encoding(None)
+ channel.set_buffered(False)
+ return channel
+
+ def _fileobj_to_fd(self, fileobj):
+ """Obtain the raw file descriptor number for the given file object."""
+ if isinstance(fileobj, int):
+ fd = fileobj
+ else:
+ try:
+ fd = int(fileobj.fileno())
+ except (AttributeError, TypeError, ValueError):
+ raise ValueError("Invalid file object: {!r}".format(fileobj))
+ if fd < 0:
+ raise ValueError("Invalid file descriptor: {}".format(fd))
+ return fd
+
+ def _delayed(self, source, callback=None, *args):
+ """Create a future that will complete after the given GLib Source object has become ready
+ and the data it tracks has been processed."""
+ future = None
+
+ def handle_ready(*args):
+ try:
+ if callback:
+ (done, result) = callback(*args)
+ else:
+ (done, result) = (True, None)
+
+ if done:
+ future.set_result(result)
+ future.handle.cancel()
+ except Exception as error:
+ if not future.cancelled():
+ future.set_exception(error)
+ future.handle.cancel()
+
+ # Create future and properly wire up it's cancellation with the
+ # handle's cancellation machinery
+ future = asyncio.Future(loop=self)
+ future.handle = GLibHandle(
+ loop=self,
+ source=source,
+ repeat=True,
+ callback=handle_ready,
+ args=args
+ )
+ return future
+
+ def _socket_handle_errors(self, sock):
+ """Raise exceptions for error states (SOL_ERROR) on the given socket object."""
+ errno = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if errno != 0:
+ if sys.platform == "win32":
+ msg = socket.errorTab.get(errno, "Error {0}".format(errno))
+ raise OSError(errno, "[WinError {0}] {1}".format(errno, msg), None, errno)
+ else:
+ raise OSError(errno, os.strerror(errno))
- self._remove_reader(fd)
+ ###############################
+ # Low-level socket operations #
+ ###############################
+ def sock_connect(self, sock, address):
+ # Request connection on socket (it is expected that `sock` is already non-blocking)
+ try:
+ sock.connect(address)
+ except BlockingIOError:
+ pass
+
+ # Create glib IOChannel for socket and wait for it to become writable
+ channel = self._channel_from_socket(sock)
+ source = GLib.io_create_watch(channel, GLib.IO_OUT)
+
+ def sock_finish_connect(sock):
+ self._socket_handle_errors(sock)
+ return (True, sock)
+ return self._delayed(source, sock_finish_connect, sock)
+
+ def sock_accept(self, sock):
+ channel = self._channel_from_socket(sock)
+ source = GLib.io_create_watch(channel, GLib.IO_IN)
+
+ def sock_connection_received(sock):
+ return (True, sock.accept())
+
+ @asyncio.coroutine
+ def accept_coro(future, conn):
+ # Coroutine closing the accept socket if the future is cancelled
+ try:
+ return (yield from future)
+ except futures.CancelledError:
+ sock.close()
+ raise
+
+ future = self._delayed(source, sock_connection_received, sock)
+ return self.create_task(accept_coro(future, sock))
+
+ def sock_recv(self, sock, nbytes, flags=0):
+ channel = self._channel_from_socket(sock)
+ read_func = lambda channel, nbytes: sock.recv(nbytes, flags)
+ return self._channel_read(channel, nbytes, read_func)
+
+ def sock_recvfrom(self, sock, nbytes, flags=0):
+ channel = self._channel_from_socket(sock)
+ read_func = lambda channel, nbytes: sock.recvfrom(nbytes, flags)
+ return self._channel_read(channel, nbytes, read_func)
+
+ def sock_sendall(self, sock, buf, flags=0):
+ channel = self._channel_from_socket(sock)
+ write_func = lambda channel, buf: sock.send(buf, flags)
+ return self._channel_write(channel, buf, write_func)
+
+ def sock_sendallto(self, sock, buf, addr, flags=0):
+ channel = self._channel_from_socket(sock)
+ write_func = lambda channel, buf: sock.sendto(buf, flags, addr)
+ return self._channel_write(channel, buf, write_func)
+
+ #####################################
+ # Low-level GLib.Channel operations #
+ #####################################
+ def _channel_read(self, channel, nbytes, read_func=None):
+ if read_func is None:
+ read_func = lambda channel, nbytes: channel.read(nbytes)
+
+ source = GLib.io_create_watch(channel, GLib.IO_IN | GLib.IO_HUP)
+
+ def channel_readable(read_func, channel, nbytes):
+ return (True, read_func(channel, nbytes))
+ return self._delayed(source, channel_readable, read_func, channel, nbytes)
+
+ def _channel_write(self, channel, buf, write_func=None):
+ if write_func is None:
+ # note: channel.write doesn't raise BlockingIOError, instead it
+ # returns 0
+ # gi.overrides.GLib.write has an isinstance(buf, bytes) check, so
+ # we can't give it a bytearray or a memoryview.
+ write_func = lambda channel, buf: channel.write(bytes(buf))
+ buflen = len(buf)
- s = GLib.unix_fd_source_new(fd, GLib.IO_IN)
+ # Fast-path: If there is enough room in the OS buffer all data can be written synchronously
+ try:
+ nbytes = write_func(channel, buf)
+ except BlockingIOError:
+ nbytes = 0
+ else:
+ if nbytes >= len(buf):
+ # All data was written synchronously in one go
+ result = asyncio.Future(loop=self)
+ result.set_result(nbytes)
+ return result
+
+ # Chop off the initially transmitted data and store result
+ # as a bytearray for easier future modification
+ buf = bytearray(buf[nbytes:])
+
+ # Send the remaining data asynchronously as the socket becomes writable
+ source = GLib.io_create_watch(channel, GLib.IO_OUT)
+
+ def channel_writable(buflen, write_func, channel, buf):
+ nbytes = write_func(channel, buf)
+ if nbytes >= len(buf):
+ return (True, buflen)
+ else:
+ del buf[0:nbytes]
+ return (False, buflen)
+ return self._delayed(source, channel_writable, buflen, write_func, channel, buf)
+
+ def add_reader(self, fileobj, callback, *args):
+ fd = self._fileobj_to_fd(fileobj)
+ self._ensure_fd_no_transport(fd)
+
+ self.remove_reader(fd)
+ channel = self._channel_from_socket(fd)
+ source = GLib.io_create_watch(channel, GLib.IO_IN | GLib.IO_HUP | GLib.IO_ERR | GLib.IO_NVAL)
assert fd not in self._readers
self._readers[fd] = GLibHandle(
loop=self,
- source=s,
+ source=source,
repeat=True,
callback=callback,
args=args)
- def _remove_reader(self, fd):
- if not isinstance(fd, int):
- fd = fd.fileno()
+ def remove_reader(self, fileobj):
+ fd = self._fileobj_to_fd(fileobj)
+ self._ensure_fd_no_transport(fd)
try:
self._readers.pop(fd).cancel()
return True
-
except KeyError:
return False
- def _add_writer(self, fd, callback, *args):
- if not isinstance(fd, int):
- fd = fd.fileno()
-
- self._remove_writer(fd)
-
- s = GLib.unix_fd_source_new(fd, GLib.IO_OUT)
+ def add_writer(self, fileobj, callback, *args):
+ fd = self._fileobj_to_fd(fileobj)
+ self._ensure_fd_no_transport(fd)
+
+ self.remove_writer(fd)
+ channel = self._channel_from_socket(fd)
+ source = GLib.io_create_watch(channel, GLib.IO_OUT | GLib.IO_ERR | GLib.IO_NVAL)
assert fd not in self._writers
-
self._writers[fd] = GLibHandle(
loop=self,
- source=s,
+ source=source,
repeat=True,
callback=callback,
args=args)
- def _remove_writer(self, fd):
- if not isinstance(fd, int):
- fd = fd.fileno()
+ def remove_writer(self, fileobj):
+ fd = self._fileobj_to_fd(fileobj)
+ self._ensure_fd_no_transport(fd)
try:
self._writers.pop(fd).cancel()
return True
-
- except KeyError:
- return False
-
- # Disgusting backwards compatibility hack to ensure gbulb keeps working
- # with Python versions that don't have http://bugs.python.org/issue28369
- if not hasattr(unix_events.SelectorEventLoop, '_add_reader'):
- add_reader = _add_reader
- add_writer = _add_writer
- remove_writer = _remove_writer
- remove_reader = _remove_reader
-
- # Signal handling.
-
- def add_signal_handler(self, sig, callback, *args):
- self._check_signal(sig)
- self.remove_signal_handler(sig)
-
- s = GLib.unix_signal_source_new(sig)
- if s is None:
- if sig == signal.SIGKILL:
- raise RuntimeError("cannot catch SIGKILL")
- else:
- raise ValueError("signal not supported")
-
- assert sig not in self._sighandlers
-
- self._sighandlers[sig] = GLibHandle(
- loop=self,
- source=s,
- repeat=True,
- callback=callback,
- args=args)
-
- def remove_signal_handler(self, sig):
- self._check_signal(sig)
- try:
- self._sighandlers.pop(sig).cancel()
- return True
-
except KeyError:
return False
-class GLibEventLoop(BaseGLibEventLoop):
+class GLibEventLoop(GLibBaseEventLoop):
def __init__(self, *, context=None, application=None):
- self._context = context or GLib.MainContext()
self._application = application
self._running = False
+ self._argv = None
+ super().__init__(context)
if application is None:
self._mainloop = GLib.MainLoop(self._context)
- super().__init__()
+
+ def is_running(self):
+ return self._running
def run(self):
recursive = self.is_running()
+ if not recursive and hasattr(events, "_get_running_loop") and events._get_running_loop():
+ raise RuntimeError(
+ 'Cannot run the event loop while another loop is running')
+
+ if not recursive:
+ self._running = True
+ if hasattr(events, "_set_running_loop"):
+ events._set_running_loop(self)
- self._running = True
try:
if self._application is not None:
- self._application.run(None)
+ self._application.run(self._argv)
else:
self._mainloop.run()
finally:
if not recursive:
self._running = False
+ if hasattr(events, "_set_running_loop"):
+ events._set_running_loop(None)
+
+ def run_until_complete(self, future, **kw):
+ """Run the event loop until a Future is done.
+
+ Return the Future's result, or raise its exception.
+ """
+
+ def stop(f):
+ self.stop()
+
+ future = tasks.ensure_future(future, loop=self)
+ future.add_done_callback(stop)
+ try:
+ self.run_forever(**kw)
+ finally:
+ future.remove_done_callback(stop)
+
+ if not future.done():
+ raise RuntimeError('Event loop stopped before Future completed.')
+
+ return future.result()
+
+ def run_forever(self, application=None, argv=None):
+ """Run the event loop until stop() is called."""
+ if application is not None:
+ self.set_application(application)
+ if argv is not None:
+ self.set_argv(argv)
+
+ if self.is_running():
+ raise RuntimeError(
+ "Recursively calling run_forever is forbidden. "
+ "To recursively run the event loop, call run().")
+
+ if hasattr(self, '_mainloop') and hasattr(self._mainloop, "_quit_by_sigint"):
+ del self._mainloop._quit_by_sigint
+
+ try:
+ self.run()
+ finally:
+ self.stop()
+
+ # Methods scheduling callbacks. All these return Handles.
+ def call_soon(self, callback, *args, context=None):
+ self._check_not_coroutine(callback, 'call_soon')
+ source = GLib.Idle()
+
+ source.set_priority(GLib.PRIORITY_DEFAULT)
+
+ return GLibHandle(
+ loop=self,
+ source=source,
+ repeat=False,
+ callback=callback,
+ args=args,
+ context=context,
+ )
+
+ call_soon_threadsafe = call_soon
+
+ def call_later(self, delay, callback, *args, context=None):
+ self._check_not_coroutine(callback, 'call_later')
+
+ return GLibHandle(
+ loop=self,
+ source=GLib.Timeout(delay*1000) if delay > 0 else GLib.Idle(),
+ repeat=False,
+ callback=callback,
+ args=args,
+ context=context,
+ )
+
+ def call_at(self, when, callback, *args, context=None):
+ self._check_not_coroutine(callback, 'call_at')
+
+ return self.call_later(
+ when - self.time(), callback, *args, context=context)
+
+ def time(self):
+ return GLib.get_monotonic_time() / 1000000
def stop(self):
"""Stop the inner-most invocation of the event loop.
@@ -360,13 +808,6 @@
else:
self._mainloop.quit()
- def run_forever(self, application=None):
- """Run the event loop until stop() is called."""
-
- if application is not None:
- self.set_application(application)
- super().run_forever()
-
def set_application(self, application):
if not isinstance(application, Gio.Application):
raise TypeError("application must be a Gio.Application object")
@@ -378,6 +819,10 @@
self._policy._application = application
del self._mainloop
+ def set_argv(self, argv):
+ """Sets argv to be passed to Gio.Application.run()"""
+ self._argv = argv
+
class GLibEventLoopPolicy(events.AbstractEventLoopPolicy):
"""Default GLib event loop policy
@@ -387,7 +832,7 @@
threads by default have no event loop.
"""
- #TODO add a parameter to synchronise with GLib's thread default contexts
+ # TODO add a parameter to synchronise with GLib's thread default contexts
# (g_main_context_push_thread_default())
def __init__(self, application=None):
self._default_loop = None
@@ -395,7 +840,7 @@
self._watcher_lock = threading.Lock()
self._watcher = None
- self._policy = unix_events.DefaultEventLoopPolicy()
+ self._policy = asyncio.DefaultEventLoopPolicy()
self._policy.new_event_loop = self.new_event_loop
self.get_event_loop = self._policy.get_event_loop
self.set_event_loop = self._policy.set_event_loop
@@ -437,7 +882,6 @@
return self._default_loop
def _new_default_loop(self):
- l = GLibEventLoop(
- context=GLib.main_context_default(), application=self._application)
+ l = GLibEventLoop(context=GLib.main_context_default(), application=self._application)
l._policy = self
return l
diff -Nru python-gbulb-0.5.3/gbulb/transports.py python-gbulb-0.6.1/gbulb/transports.py
--- python-gbulb-0.5.3/gbulb/transports.py 1970-01-01 01:00:00.000000000 +0100
+++ python-gbulb-0.6.1/gbulb/transports.py 2018-08-10 05:44:52.000000000 +0200
@@ -0,0 +1,407 @@
+import collections
+import socket
+import subprocess
+from asyncio import base_subprocess, futures, transports
+
+
+class BaseTransport(transports.BaseTransport):
+ def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None):
+ if hasattr(self, '_sock'):
+ return # The joys of multiple inheritance
+
+ transports.BaseTransport.__init__(self, extra)
+
+ self._loop = loop
+ self._sock = sock
+ self._protocol = protocol
+ self._server = server
+ self._closing = False
+ self._closing_delayed = False
+ self._closed = False
+ self._cancelable = set()
+
+ if sock is not None:
+ self._loop._transports[sock.fileno()] = self
+
+ if self._server is not None:
+ self._server._attach()
+
+ def transport_async_init():
+ self._protocol.connection_made(self)
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_result(None)
+ self._loop.call_soon(transport_async_init)
+
+ def close(self):
+ self._closing = True
+ if not self._closing_delayed:
+ self._force_close(None)
+
+ def is_closing(self):
+ return self._closing
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+ self._loop.call_exception_handler({
+ 'message': message,
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+ self._force_close(exc)
+
+ def _force_close(self, exc):
+ if self._closed:
+ return
+ self._closed = True
+
+ # Stop all running tasks
+ for cancelable in self._cancelable:
+ cancelable.cancel()
+ self._cancelable.clear()
+
+ self._loop.call_soon(self._force_close_async, exc)
+
+ def _force_close_async(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ if self._sock is not None:
+ self._sock.close()
+ self._sock = None
+ if self._server is not None:
+ self._server._detach()
+ self._server = None
+
+
+class ReadTransport(BaseTransport, transports.ReadTransport):
+ max_size = 256 * 1024
+
+ def __init__(self, *args, **kwargs):
+ BaseTransport.__init__(self, *args, **kwargs)
+
+ self._paused = False
+ self._read_fut = None
+ self._loop.call_soon(self._loop_reading)
+
+ def pause_reading(self):
+ if self._closing:
+ raise RuntimeError('Cannot pause_reading() when closing')
+ if self._paused:
+ raise RuntimeError('Already paused')
+ self._paused = True
+
+ def resume_reading(self):
+ if not self._paused:
+ raise RuntimeError('Not paused')
+ self._paused = False
+ if self._closing:
+ return
+ self._loop.call_soon(self._loop_reading, self._read_fut)
+
+ def _close_read(self):
+ # Separate method to allow `Transport.close()` to call us without
+ # us delegating to `BaseTransport.close()`
+ if self._read_fut is not None:
+ self._read_fut.cancel()
+ self._read_fut = None
+
+ def close(self):
+ self._close_read()
+
+ super().close()
+
+ def _create_read_future(self, size):
+ return self._loop.sock_recv(self._sock, size)
+
+ def _submit_read_data(self, data):
+ if data:
+ self._protocol.data_received(data)
+ else:
+ keep_open = self._protocol.eof_received()
+ if not keep_open:
+ self.close()
+
+ def _loop_reading(self, fut=None):
+ if self._paused:
+ return
+ data = None
+
+ try:
+ if fut is not None:
+ assert self._read_fut is fut or (self._read_fut is None and self._closing)
+ if self._read_fut in self._cancelable:
+ self._cancelable.remove(self._read_fut)
+ self._read_fut = None
+ data = fut.result() # Deliver data later in "finally" clause
+
+ if self._closing:
+ # Since `.close()` has been called we ignore any read data
+ data = None
+ return
+
+ if data == b'':
+ # No need to reschedule on end-of-file
+ return
+
+ # Reschedule a new read
+ self._read_fut = self._create_read_future(self.max_size)
+ self._cancelable.add(self._read_fut)
+ except ConnectionAbortedError as exc:
+ if not self._closing:
+ self._fatal_error(exc, 'Fatal read error on pipe transport')
+ except ConnectionResetError as exc:
+ self._force_close(exc)
+ except OSError as exc:
+ self._fatal_error(exc, 'Fatal read error on pipe transport')
+ except futures.CancelledError:
+ if not self._closing:
+ raise
+ except futures.InvalidStateError:
+ self._read_fut = fut
+ self._cancelable.add(self._read_fut)
+ else:
+ self._read_fut.add_done_callback(self._loop_reading)
+ finally:
+ if data is not None:
+ self._submit_read_data(data)
+
+
+class WriteTransport(BaseTransport, transports._FlowControlMixin):
+ _buffer_factory = bytearray
+
+ def __init__(self, loop, *args, **kwargs):
+ transports._FlowControlMixin.__init__(self, None, loop)
+ BaseTransport.__init__(self, loop, *args, **kwargs)
+
+ self._buffer = self._buffer_factory()
+ self._buffer_empty_callbacks = set()
+ self._write_fut = None
+ self._eof_written = False
+
+ def abort(self):
+ self._force_close(None)
+
+ def can_write_eof(self):
+ return True
+
+ def get_write_buffer_size(self):
+ return len(self._buffer)
+
+ def _close_write(self):
+ if self._write_fut is not None:
+ self._closing_delayed = True
+
+ def transport_write_done_callback():
+ self._closing_delayed = False
+ self.close()
+ self._buffer_empty_callbacks.add(transport_write_done_callback)
+
+ def close(self):
+ self._close_write()
+
+ super().close()
+
+ def write(self, data):
+ if self._eof_written:
+ raise RuntimeError('write_eof() already called')
+
+ # Ignore empty data sets or requests to write to a dying connection
+ if not data or self._closing:
+ return
+
+ if self._write_fut is None: # No data is currently buffered or being sent
+ self._loop_writing(data=data)
+ else:
+ self._buffer_add_data(data)
+ self._maybe_pause_protocol() # From _FlowControlMixin
+
+ def _create_write_future(self, data):
+ return self._loop.sock_sendall(self._sock, data)
+
+ def _buffer_add_data(self, data):
+ self._buffer.extend(data)
+
+ def _buffer_pop_data(self):
+ if len(self._buffer) > 0:
+ data = self._buffer
+ self._buffer = bytearray()
+ return data
+ else:
+ return None
+
+ def _loop_writing(self, fut=None, data=None):
+ try:
+ assert fut is self._write_fut
+ if self._write_fut in self._cancelable:
+ self._cancelable.remove(self._write_fut)
+ self._write_fut = None
+
+ # Raise possible exception stored in `fut`
+ if fut:
+ fut.result()
+
+ # Use buffer as next data object if invoked from done callback
+ if data is None:
+ data = self._buffer_pop_data()
+
+ if not data:
+ if len(self._buffer_empty_callbacks) > 0:
+ for callback in self._buffer_empty_callbacks:
+ callback()
+ self._buffer_empty_callbacks.clear()
+
+ self._maybe_resume_protocol()
+ else:
+ self._write_fut = self._create_write_future(data)
+ self._cancelable.add(self._write_fut)
+ if not self._write_fut.done():
+ self._write_fut.add_done_callback(self._loop_writing)
+ self._maybe_pause_protocol()
+ else:
+ self._write_fut.add_done_callback(self._loop_writing)
+ except ConnectionResetError as exc:
+ self._force_close(exc)
+ except OSError as exc:
+ self._fatal_error(exc, 'Fatal write error on pipe transport')
+
+ def write_eof(self):
+ self.close()
+
+
+class Transport(ReadTransport, WriteTransport):
+ def __init__(self, *args, **kwargs):
+ ReadTransport.__init__(self, *args, **kwargs)
+ WriteTransport.__init__(self, *args, **kwargs)
+
+ # Set expected extra attributes (available through `.get_extra_info()`)
+ self._extra['socket'] = self._sock
+ try:
+ self._extra['sockname'] = self._sock.getsockname()
+ except (OSError, AttributeError):
+ pass
+ if 'peername' not in self._extra:
+ try:
+ self._extra['peername'] = self._sock.getpeername()
+ except (OSError, AttributeError):
+ pass
+
+ def close(self):
+ # Need to invoke both the read's and the write's part of the transport `close` function
+ self._close_read()
+ self._close_write()
+
+ BaseTransport.close(self)
+
+
+class SocketTransport(Transport):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ def write_eof(self):
+ if self._closing or self._eof_written:
+ return
+ self._eof_written = True
+
+ if self._write_fut is None:
+ self._sock.shutdown(socket.SHUT_WR)
+ else:
+ def transport_write_eof_callback():
+ if not self._closing:
+ self._sock.shutdown(socket.SHUT_WR)
+ self._buffer_empty_callbacks.add(transport_write_eof_callback)
+
+
+class DatagramTransport(Transport, transports.DatagramTransport):
+ _buffer_factory = collections.deque
+
+ def __init__(self, loop, sock, protocol, address=None, *args, **kwargs):
+ self._address = address
+ super().__init__(loop, sock, protocol, *args, **kwargs)
+
+
+ def _create_read_future(self, size):
+ return self._loop.sock_recvfrom(self._sock, size)
+
+ def _submit_read_data(self, args):
+ (data, addr) = args
+
+ self._protocol.datagram_received(data, addr)
+
+ def _create_write_future(self, args):
+ (data, addr) = args
+
+ if self._address:
+ return self._loop.sock_sendall(self._sock, data)
+ else:
+ return self._loop.sock_sendallto(self._sock, data, addr)
+
+ def _buffer_add_data(self, args):
+ (data, addr) = args
+
+ self._buffer.append((bytes(data), addr))
+
+ def _buffer_pop_data(self):
+ if len(self._buffer) > 0:
+ return self._buffer.popleft()
+ else:
+ return None
+
+ def write(self, data, addr=None):
+ if not isinstance(data, (bytes, bytearray, memoryview)):
+ raise TypeError("data argument must be a bytes-like object, "
+ "not {!r}".format(type(data).__name__))
+
+ if not data or self.is_closing():
+ return
+
+ if self._address and addr not in (None, self._address):
+ raise ValueError("Invalid address: must be None or {0}".format(self._address))
+
+ # Do not copy the data yet, as we might be able to send it synchronously
+ super().write((data, addr))
+ sendto = write
+
+
+class PipeReadTransport(ReadTransport):
+ def __init__(self, loop, channel, protocol, waiter, extra):
+ self._channel = channel
+ self._channel.set_close_on_unref(True)
+ super().__init__(loop, None, protocol, waiter, extra)
+
+ def _create_read_future(self, size):
+ return self._loop._channel_read(self._channel, size)
+
+ def _force_close_async(self, exc):
+ try:
+ super()._force_close_async(exc)
+ finally:
+ self._channel.shutdown(True)
+
+
+class PipeWriteTransport(WriteTransport):
+ def __init__(self, loop, channel, protocol, waiter, extra):
+ self._channel = channel
+ self._channel.set_close_on_unref(True)
+ super().__init__(loop, None, protocol, waiter, extra)
+
+ def _create_write_future(self, data):
+ return self._loop._channel_write(self._channel, data)
+
+ def _force_close_async(self, exc):
+ try:
+ super()._force_close_async(exc)
+ finally:
+ self._channel.shutdown(True)
+
+
+class SubprocessTransport(base_subprocess.BaseSubprocessTransport):
+ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+ self._proc = subprocess.Popen(
+ args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+ bufsize=bufsize, **kwargs)
diff -Nru python-gbulb-0.5.3/LICENSE python-gbulb-0.6.1/LICENSE
--- python-gbulb-0.5.3/LICENSE 1970-01-01 01:00:00.000000000 +0100
+++ python-gbulb-0.6.1/LICENSE 2018-08-10 05:44:52.000000000 +0200
@@ -0,0 +1,13 @@
+Copyright 2015 Nathan Hoad
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff -Nru python-gbulb-0.5.3/MANIFEST.in python-gbulb-0.6.1/MANIFEST.in
--- python-gbulb-0.5.3/MANIFEST.in 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/MANIFEST.in 2018-08-10 05:44:52.000000000 +0200
@@ -1,5 +1,6 @@
include AUTHORS.rst
include CHANGELOG.md
+include LICENSE
include README.md
recursive-include examples *
recursive-exclude examples *.pyc *.pyo
diff -Nru python-gbulb-0.5.3/README.md python-gbulb-0.6.1/README.md
--- python-gbulb-0.5.3/README.md 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/README.md 2018-08-10 05:44:52.000000000 +0200
@@ -19,7 +19,7 @@
[https://github.com/nathan-hoad/gbulb](https://github.com/nathan-hoad/gbulb)
## Requirements
-- python3.4+ or python3.3 with [asyncio][asyncio]
+- python3.5+
- pygobject
- glib
- gtk+3 (optional)
@@ -46,6 +46,11 @@
loop = asyncio.get_event_loop()
loop.run_forever(application=my_gapplication_object)
+
+### Waiting on a signal asynchronously
+
+See examples/wait_signal.py
+
## Known issues
- Windows is not supported, sorry. If you are interested in this, please help
@@ -94,5 +99,4 @@
[PEP3156]: http://www.python.org/dev/peps/pep-3156/
-[asyncio]: https://pypi.python.org/pypi/asyncio
[glibloop]: https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html
diff -Nru python-gbulb-0.5.3/setup.py python-gbulb-0.6.1/setup.py
--- python-gbulb-0.5.3/setup.py 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/setup.py 2018-08-10 05:44:52.000000000 +0200
@@ -6,26 +6,26 @@
from distutils.core import setup
-setup(name='gbulb',
- version='0.5.3',
- description='GLib event loop for tulip (PEP 3156)',
- author='Nathan Hoad',
- author_email='nat...@getoffmalawn.com',
- license='Apache 2.0',
- url='http://github.com/nathan-hoad/gbulb',
- packages=['gbulb'],
- long_description="""Gbulb is a python library that implements a PEP 3156 interface for the GLib main event loop. It is designed to be used together with the tulip reference implementation.""",
- classifiers=[
+setup(
+ name='gbulb',
+ version='0.6.1',
+ description='GLib event loop for tulip (PEP 3156)',
+ author='Nathan Hoad',
+ author_email='nat...@getoffmalawn.com',
+ license='Apache 2.0',
+ url='http://github.com/nathan-hoad/gbulb',
+ packages=['gbulb'],
+ long_description="""Gbulb is a python library that implements a PEP 3156 interface for the GLib main event loop. It is designed to be used together with the tulip reference implementation.""",
+ classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Operating System :: POSIX",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.3",
- "Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
"Topic :: Software Development :: Libraries :: Python Modules",
- ]
+ ],
+ python_requires='>=3.5'
)
-
diff -Nru python-gbulb-0.5.3/tests/test_glib_events.py python-gbulb-0.6.1/tests/test_glib_events.py
--- python-gbulb-0.5.3/tests/test_glib_events.py 2017-02-22 12:07:07.000000000 +0100
+++ python-gbulb-0.6.1/tests/test_glib_events.py 2018-08-10 05:44:52.000000000 +0200
@@ -1,12 +1,15 @@
import asyncio
+import sys
import pytest
-from unittest import mock
-from gi.repository import Gio
+from unittest import mock, skipIf
+from gi.repository import Gio, GLib
from utils import glib_loop, glib_policy
+is_windows = (sys.platform == "win32")
+
class TestGLibEventLoopPolicy:
def test_set_child_watcher(self, glib_policy):
@@ -71,6 +74,7 @@
class TestBaseGLibEventLoop:
+ @skipIf(is_windows, "Unix signal handlers are not supported on Windows")
def test_add_signal_handler(self, glib_loop):
import os
import signal
@@ -90,6 +94,7 @@
assert called, 'signal handler didnt fire'
+ @skipIf(is_windows, "Unix signal handlers are not supported on Windows")
def test_remove_signal_handler(self, glib_loop):
import signal
@@ -101,15 +106,18 @@
# FIXME: it'd be great if we could actually try signalling the process
+ @skipIf(is_windows, "Unix signal handlers are not supported on Windows")
def test_remove_signal_handler_unhandled(self, glib_loop):
import signal
assert not glib_loop.remove_signal_handler(signal.SIGHUP)
+ @skipIf(is_windows, "Unix signal handlers are not supported on Windows")
def test_remove_signal_handler_sigkill(self, glib_loop):
import signal
with pytest.raises(RuntimeError):
glib_loop.add_signal_handler(signal.SIGKILL, None)
+ @skipIf(is_windows, "Unix signal handlers are not supported on Windows")
def test_remove_signal_handler_sigill(self, glib_loop):
import signal
with pytest.raises(ValueError):
@@ -126,6 +134,7 @@
with pytest.raises(RuntimeError):
glib_loop.run_until_complete(coro())
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_add_writer(self, glib_loop):
import os
rfd, wfd = os.pipe()
@@ -137,14 +146,14 @@
called = True
glib_loop.stop()
- os.close(rfd)
- os.close(wfd)
-
glib_loop.add_writer(wfd, callback)
glib_loop.run_forever()
+ os.close(rfd)
+ os.close(wfd)
assert called, 'callback handler didnt fire'
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_add_reader(self, glib_loop):
import os
rfd, wfd = os.pipe()
@@ -156,61 +165,68 @@
called = True
glib_loop.stop()
- os.close(rfd)
- os.close(wfd)
-
glib_loop.add_reader(rfd, callback)
+
+ os.write(wfd, b'hey')
+
glib_loop.run_forever()
+ os.close(rfd)
+ os.close(wfd)
+
assert called, 'callback handler didnt fire'
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_add_reader_file(self, glib_loop):
import os
rfd, wfd = os.pipe()
f = os.fdopen(rfd, 'r')
+ glib_loop.add_reader(f, None)
+
os.close(rfd)
os.close(wfd)
- glib_loop.add_reader(f, None)
-
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_add_writer_file(self, glib_loop):
import os
rfd, wfd = os.pipe()
f = os.fdopen(wfd, 'r')
+ glib_loop.add_writer(f, None)
+
os.close(rfd)
os.close(wfd)
- glib_loop.add_writer(f, None)
-
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_remove_reader(self, glib_loop):
import os
rfd, wfd = os.pipe()
f = os.fdopen(wfd, 'r')
+ glib_loop.add_reader(f, None)
+
os.close(rfd)
os.close(wfd)
- glib_loop.add_reader(f, None)
-
assert glib_loop.remove_reader(f)
assert not glib_loop.remove_reader(f.fileno())
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_remove_writer(self, glib_loop):
import os
rfd, wfd = os.pipe()
f = os.fdopen(wfd, 'r')
+ glib_loop.add_writer(f, None)
+
os.close(rfd)
os.close(wfd)
- glib_loop.add_writer(f, None)
-
assert glib_loop.remove_writer(f)
assert not glib_loop.remove_writer(f.fileno())
@@ -235,7 +251,8 @@
glib_loop.stop()
print(now, s)
- assert now - s <= 0.15
+
+ assert now - s <= 0.2
s = glib_loop.time()
@@ -272,46 +289,11 @@
assert items == sorted(items)
def test_call_soon_priority(self, glib_loop):
- def remover(fd):
- nonlocal removed
- assert glib_loop.remove_writer(fd)
-
- removed = True
- glib_loop.stop()
-
- def callback(fut):
- fut.set_result(None)
-
- def timeout():
- nonlocal timeout_occurred
- timeout_occurred = True
- glib_loop.stop()
-
- def run_test(fd):
- import asyncio
- from gi.repository import GLib
-
- fut = asyncio.Future(loop=glib_loop)
- fut.add_done_callback(lambda r: remover(fd))
- glib_loop.add_writer(fd, callback, fut)
- glib_loop.call_later(0.1, timeout)
- glib_loop.run_forever()
-
- assert not timeout_occurred
- assert removed
-
- import os
- rfd, wfd = os.pipe()
-
- removed = False
- timeout_occurred = False
-
- try:
- run_test(wfd)
- finally:
- os.close(rfd)
- os.close(wfd)
+ h = glib_loop.call_soon(lambda: None)
+ assert h._source.get_priority() == GLib.PRIORITY_DEFAULT
+ h.cancel()
+ @skipIf(is_windows, "Waiting on raw file descriptors only works for sockets on Windows")
def test_add_writer_multiple_calls(self, glib_loop):
import os
rfd, wfd = os.pipe()
@@ -447,6 +429,20 @@
glib_loop.set_application(app)
+@skipIf(is_windows, "Unix signal handlers are not supported on Windows")
+def test_signal_handling_with_multiple_invocations(glib_loop):
+ import os
+ import signal
+
+ glib_loop.call_later(0.01, os.kill, os.getpid(), signal.SIGINT)
+
+ with pytest.raises(KeyboardInterrupt):
+ glib_loop.run_forever()
+
+ glib_loop.run_until_complete(asyncio.sleep(0))
+
+
+@skipIf(is_windows, "Unix signal handlers are not supported on Windows")
def test_default_signal_handling(glib_loop):
import os
import signal
@@ -457,7 +453,7 @@
glib_loop.run_forever()
-def test_subprocesses(glib_loop):
+def test_subprocesses_read_after_closure(glib_loop):
import asyncio
import subprocess
@@ -479,4 +475,75 @@
out = yield from proc.stdout.read()
assert out == b'hey\n'
+ yield from proc.wait()
+
glib_loop.run_until_complete(coro())
+
+
+def test_subprocesses_readline_without_closure(glib_loop):
+ # needed to ensure events.get_child_watcher() returns the right object
+ import gbulb
+ gbulb.install()
+
+ @asyncio.coroutine
+ def run():
+ proc = yield from asyncio.create_subprocess_exec(
+ 'cat', stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE, loop=glib_loop)
+
+ try:
+ proc.stdin.write(b'test line\n')
+ yield from proc.stdin.drain()
+
+ line = yield from asyncio.wait_for(
+ proc.stdout.readline(), timeout=5, loop=glib_loop)
+ assert line == b'test line\n'
+
+ proc.stdin.close()
+
+ line = yield from asyncio.wait_for(
+ proc.stdout.readline(), timeout=5, loop=glib_loop)
+ assert line == b''
+ finally:
+ yield from proc.wait()
+
+ glib_loop.run_until_complete(run())
+
+
+def test_sockets(glib_loop):
+ server_done = asyncio.Event(loop=glib_loop)
+ server_success = False
+
+ @asyncio.coroutine
+ def cb(reader, writer):
+ nonlocal server_success
+
+ writer.write(b'cool data\n')
+ yield from writer.drain()
+
+ print('reading')
+ d = yield from reader.readline()
+ print('hrm', d)
+ server_success = d == b'thank you\n'
+
+ writer.close()
+ server_done.set()
+
+ @asyncio.coroutine
+ def run():
+ s = yield from asyncio.start_server(cb, '127.0.0.1', 0, loop=glib_loop)
+ reader, writer = yield from asyncio.open_connection('127.0.0.1', s.sockets[0].getsockname()[-1], loop=glib_loop)
+
+ d = yield from reader.readline()
+ assert d == b'cool data\n'
+
+ writer.write(b'thank you\n')
+ yield from writer.drain()
+
+ writer.close()
+
+ yield from server_done.wait()
+
+ assert server_success
+
+ glib_loop.run_until_complete(run())