Andrey Paramonov <para...@acdlabs.ru> added the comment:
Hello! Below is updated implementation containing more consistent error handling. The main rough edges encountered: 1. asyncio.Queue alone proved insufficient for precise control of limit, as asyncio.create_task() schedules created Task() immediately and it may start executing before being added to queue (thus leading to limit+1 tasks running). Additional semaphore is used to tackle that. 2. When exception, other running tasks have to be cancel()ed and then await'ed to ensure all tasks are successfully finished by the time igather exits. Just cancel()ing proved not sufficient. 3. When exception, unscheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine? In my client code I didn't so far encounter "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement separately and quite straightforwardly, with the help of asyncio.Queue. async def igather(coros, limit=None): coros = iter(coros) buf = asyncio.Queue() sem = asyncio.Semaphore(limit or math.inf) async def submit(coros, buf): while True: await sem.acquire() try: # TODO: additionally support async iterators coro = next(coros) except StopIteration: break task = asyncio.create_task(coro) buf.put_nowait(task) await buf.put(None) async def consume(buf): while True: task = await buf.get() if task: v = await asyncio.wait_for(task, None) sem.release() yield v else: break submit_task = asyncio.create_task(submit(coros, buf)) try: async for result in consume(buf): yield result except: submit_task.cancel() # cancel scheduled while not buf.empty(): task = buf.get_nowait() if task: task.cancel() try: await task except: pass # cancel pending for coro in coros: asyncio.create_task(coro).cancel() raise Shall I go ahead and prepare a PR with docs and tests? ---------- _______________________________________ Python tracker <rep...@bugs.python.org> <https://bugs.python.org/issue30782> _______________________________________ _______________________________________________ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com