[issue29701] Add close method to queue.Queue
New submission from Mathias Fröjdman: queue.Queue should have a close() method. The result of calling the method would be to raise a new exception - queue.Closed, for any subsequent calls to Queue.put, and after the queue is empty, also for Queue.get. Why: To allow producers (callers of Queue.put) to signal there will be no more items, and consumers may stop asking for more by calling Queue.get. Currently the opposite (ie. waiting until all produced items/"tasks" have been consumed and handled) is possible with Queue.task_done() and Queue.join(). This functionality is useful in both application and library code. For example in AMQP, a server may push new messages over a TCP connection to a consumer, which translates into the library calling Queue.put for received messages, and the application using the library calling Queue.get to receive any new messages. The consumer may however be cancelled at any time, or the TCP connection closed and the Queue.get caller signaled that there will be no more messages. With Queue.close() that is easy - without it one needs to wrap the Queue.get calls). In an application context where a KeyboardInterrupt should lead to closing the application cleanly, being able to call Queue.close(), catching the Closed exception in any consumers (some of which may be in other threads) and exiting cleanly makes the job that much easier. A common pattern in working around this issue is to call Queue.put(None), and treat a None from Queue.get() as a signal to clean up. This works well when one knows there is at most one consumer. In the case of many consumers, one needs to wrap the Queue and for example add another None to the queue in consumers to not leave any remaining get() call waiting indefinitely. This pattern occurs even in the standard library: https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Lib/concurrent/futures/thread.py#L141 If accepting this proposal, a corresponding change should be made to asyncio.Queue. I have a tentative implementation (no tests or doc outside the module) in https://github.com/mwfrojdman/cpython/blob/closeable_queue/Lib/queue.py The Queue.close() method has an optional argument clear (default False), which clears the queue of items if set to true. This is useful for example when exiting an application, and one doesn't want consumers to get any more items before being raised a Closed exception. The changes are backwards compatible for users of the class, ie. if Queue.close() is not called, the behavior stays intact. Because of the clear argument, there is a new private method Queue._clear(), which does the actual clearing of the queue representation. Subclasses for which self.queue.clear() doesn't cut it, need to override it before .close(True) works. Background: https://github.com/python/asyncio/pull/415#issuecomment-263658986 -- components: Library (Lib) messages: 288828 nosy: mwf priority: normal severity: normal status: open title: Add close method to queue.Queue type: enhancement versions: Python 3.7 ___ Python tracker <http://bugs.python.org/issue29701> ___ ___ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com
[issue24795] Make event loops with statement context managers
New submission from Mathias Fröjdman: Since asyncio event loops have to be closed nowadays, it would be pretty convenient and pythonic to make BaseEventLoop a context manager that calls self.close() in __exit__ the same way as contextlib.closing() does it. Example: import asyncio with asyncio.get_event_loop() as loop: loop.run_until_complete(func()) instead of import asyncio from contextlib import closing with closing(asyncio.get_event_loop()) as loop: loop.run_until_complete(func()) or event the bulkier import asyncio loop = asyncio.get_event_loop() try: loop.run_until_complete(func()) finally: loop.close() The attached patch applies to Python 3.5b4's asyncio/base_events.py -- components: asyncio files: patch messages: 248032 nosy: Mathias Fröjdman, gvanrossum, haypo, yselivanov priority: normal severity: normal status: open title: Make event loops with statement context managers type: enhancement versions: Python 3.5 Added file: http://bugs.python.org/file40129/patch ___ Python tracker <http://bugs.python.org/issue24795> ___ ___ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com
[issue24795] Make event loops with statement context managers
Mathias Fröjdman added the comment: (Just noticed http://bugs.python.org/issue19860, which I originally failed to notice when just searching for "asyncio loop" and not context manager) Anyway, in recent Python/asyncio versions, failing to close the event loop before exiting whole the process can cause problems, so I think the case is valid now. -- ___ Python tracker <http://bugs.python.org/issue24795> ___ ___ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com
[issue23837] read pipe transport tries to resume reading after loop is gone
New submission from Mathias Fröjdman: Script attached which reproduces the issue. Steps to reproduce: 0) Use python 3.4.3 on Linux. Does not repro with 3.4.0 or 3.4.2. 1) Create a child process with asyncio.create_child_exec and stdout=PIPE 2) loop yield from child.read(n) (i used n=2048, any other positive value < StreamRead._limit ought to work too) 3) Write >= StreamRead._limit * 2 + 1 (by default 2**17+1) bytes from child process and exit File referenced below: asyncio/streams.py feed_data is called when data arrives from the child process. Having more than 2 * self._limit bytes in self._buffer will lead to StreamReader pausing reading on line 372. feed_eof sets self._eof to True, but that value is not checked in relevant sections later. Child process exits, which will lead to self._loop = None being set apparently on line 405 of _UnixReadPipeTransport._call_connection_lost in asyncio/unix_events.py (could not find any other location where it would be set to None). After a number of read()s, self._maybe_resume_transport() is called when len(self._buffer) <= self._limit (ie. <= 64k left in buffer). self._paused will evaluate true too, so self._transport.resume_reading() is called on line 349. That will call self._loop.add_reader(self._fileno, self._read_ready) on line 364 of asyncio/unix_events.py. self._loop is None, so and AttributeError is raised when None has no add_reader attribute: Traceback (most recent call last): File "child_reader.py", line 29, in print('read {} bytes from child'.format(loop.run_until_complete(fail( File "/home/frojma/python-3.4.3/lib/python3.4/asyncio/base_events.py", line 316, in run_until_complete return future.result() File "/home/frojma/python-3.4.3/lib/python3.4/asyncio/futures.py", line 275, in result raise self._exception File "/home/frojma/python-3.4.3/lib/python3.4/asyncio/tasks.py", line 238, in _step result = next(coro) File "child_reader.py", line 15, in fail chunk = yield from child.stdout.read(2048) File "/home/frojma/python-3.4.3/lib/python3.4/asyncio/streams.py", line 478, in read self._maybe_resume_transport() File "/home/frojma/python-3.4.3/lib/python3.4/asyncio/streams.py", line 354, in _maybe_resume_transport self._transport.resume_reading() File "/home/frojma/python-3.4.3/lib/python3.4/asyncio/unix_events.py", line 364, in resume_reading self._loop.add_reader(self._fileno, self._read_ready) -- components: asyncio files: child_reader.py messages: 239779 nosy: gvanrossum, haypo, mwf, yselivanov priority: normal severity: normal status: open title: read pipe transport tries to resume reading after loop is gone type: crash versions: Python 3.4 Added file: http://bugs.python.org/file38778/child_reader.py ___ Python tracker <http://bugs.python.org/issue23837> ___ ___ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com