[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed
Andrey Paramonov added the comment:

as_completed() might be considered a low-level API, but as of Python 3.7 there is seemingly no ready alternative to achieve the proposed behavior. asyncio.gather(), asyncio.wait(), and asyncio.as_completed() all expect an awaitables list of limited size; doing something like https://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp is not straightforward.

A function that takes an iterator/async iterator of tasks and is itself a generator/async generator is very much wanted, something in the spirit of (but more efficient than?):

    import asyncio
    from itertools import islice

    async def igather(tasks, limit=None):
        pending = set()
        while True:
            for task in islice(tasks, limit - len(pending) if limit else None):
                pending.add(task)
            if pending:
                done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    yield task
            else:
                break

It is an open question whether such a function should yield results in task submission order. Albeit useful, that is a bit harder to implement and (most importantly) has terrible worst-case memory behavior.

See also:
https://bugs.python.org/issue33533
https://github.com/h2non/paco/issues/38

--
nosy: +aparamon

___ Python tracker <https://bugs.python.org/issue30782> ___
___ Python-bugs-list mailing list ___
Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com
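For illustration, here is how the sketch above might be driven (the work() coroutine, the 0.01 s delay, and the limit of 3 are invented for this example). Creating the tasks lazily in a generator expression is what keeps at most `limit` of them alive at a time:

```python
import asyncio
from itertools import islice

async def igather(tasks, limit=None):
    # the sketch from the message: keep at most `limit` tasks in flight
    pending = set()
    while True:
        for task in islice(tasks, limit - len(pending) if limit else None):
            pending.add(task)
        if pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task
        else:
            break

async def work(i):
    await asyncio.sleep(0.01)
    return i

async def main():
    # a lazy generator: tasks are only created when igather pulls them,
    # so at most 3 exist at any moment
    tasks = (asyncio.create_task(work(i)) for i in range(10))
    return sorted([t.result() async for t in igather(tasks, limit=3)])

print(asyncio.run(main()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Note that completion order is not submission order; the sorted() call is only there to make the output deterministic.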
[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed
Andrey Paramonov added the comment:

> an implicit requirement that back pressure from the consumer should be
> handled (i.e. if whoever's iterating through "async for fut in
> as_completed(...)" is too slow, then the tasks should pause until it
> catches up)

No, I don't think it is required or desired to be handled. My initial sketch was imprecise: it is better to asynchronously yield task results, not "completed task" futures. This way no additional buffer is needed, all error handling can be consolidated inside igather(), and it is more compatible with asyncio.gather(). I.e., instead of

    yield next_fut

use

    yield await next_fut
[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed
Andrey Paramonov added the comment:

Hello! Below is an updated implementation containing more consistent error handling. The main rough edges encountered:

1. asyncio.Queue alone proved insufficient for precise control of the limit, because asyncio.create_task() schedules the created Task() immediately, and it may start executing before being added to the queue (thus leading to limit+1 tasks running). An additional semaphore is used to tackle that.

2. On exception, other running tasks have to be cancel()ed and then await'ed, to ensure all tasks have finished by the time igather exits. Just cancel()ing proved insufficient.

3. On exception, not-yet-scheduled coroutines have to be wrapped with asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was never awaited". But maybe there is a more elegant way to suppress this warning for a coroutine?

In my client code I have not so far encountered "an implicit requirement that back pressure from the consumer should be handled", but it should be possible to implement separately and quite straightforwardly, with the help of asyncio.Queue.

    import asyncio
    import math

    async def igather(coros, limit=None):
        coros = iter(coros)
        buf = asyncio.Queue()
        sem = asyncio.Semaphore(limit or math.inf)

        async def submit(coros, buf):
            while True:
                await sem.acquire()
                try:
                    # TODO: additionally support async iterators
                    coro = next(coros)
                except StopIteration:
                    break
                task = asyncio.create_task(coro)
                buf.put_nowait(task)
            await buf.put(None)

        async def consume(buf):
            while True:
                task = await buf.get()
                if task:
                    v = await asyncio.wait_for(task, None)
                    sem.release()
                    yield v
                else:
                    break

        submit_task = asyncio.create_task(submit(coros, buf))
        try:
            async for result in consume(buf):
                yield result
        except:
            submit_task.cancel()
            # cancel scheduled
            while not buf.empty():
                task = buf.get_nowait()
                if task:
                    task.cancel()
                    try:
                        await task
                    except:
                        pass
            # cancel pending
            for coro in coros:
                asyncio.create_task(coro).cancel()
            raise

Shall I go ahead and prepare a PR with docs and tests?
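The semaphore trick from point 1 can be demonstrated standalone (the work() coroutine, the concurrency counters, and the limit of 3 are invented for this sketch, not part of the patch): acquiring before create_task() is what caps concurrency, because create_task() schedules the task immediately.

```python
import asyncio

running = 0   # tasks currently executing
peak = 0      # maximum observed concurrency

async def work(i, sem):
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    sem.release()  # free a slot only when the task is done
    return i

async def main():
    # acquire BEFORE create_task(): since create_task() schedules the
    # task immediately, the semaphore is the only thing capping how many
    # tasks exist at once
    sem = asyncio.Semaphore(3)
    tasks = []
    for i in range(10):
        await sem.acquire()
        tasks.append(asyncio.create_task(work(i, sem)))
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results, 'peak concurrency:', peak)
```

Without the acquire() before create_task(), all 10 tasks would be created and start running at once.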
[issue34905] Cannot assign memoryview values from array.array
New submission from Andrey Paramonov:

Currently, memoryview values can be assigned from all bytes-like objects (https://docs.python.org/3/glossary.html#term-bytes-like-object) except a byte array.array:

    import array

    mview = memoryview(bytearray(b'hello'))
    mview[:] = bytes(b'hello')              # success
    mview[:] = bytearray(b'hello')          # success
    mview[:] = memoryview(b'hello')         # success
    mview[:] = array.array('b', b'hello')   # fail

The last assignment raises:

    ValueError: memoryview assignment: lvalue and rvalue have different structures

--
components: Library (Lib)
messages: 327133
nosy: aparamon
priority: normal
severity: normal
status: open
title: Cannot assign memoryview values from array.array
type: behavior
versions: Python 3.6

___ Python tracker <https://bugs.python.org/issue34905> ___
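The mismatch seems to come from the buffer formats: bytearray exports format 'B' (unsigned char) while array.array('b') exports 'b' (signed char), and memoryview slice assignment requires structurally identical buffers. If that reading is right, casting the source view is a possible workaround (a sketch, not an official fix):

```python
import array

mview = memoryview(bytearray(b'hello'))

# direct assignment fails: formats 'B' (lvalue) vs 'b' (rvalue) differ
try:
    mview[:] = array.array('b', b'world')
except ValueError as e:
    print(e)

# casting the source view to 'B' makes both sides structurally equal
mview[:] = memoryview(array.array('b', b'world')).cast('B')
print(bytes(mview))  # b'world'
```

An array.array('B', ...) source would likewise match the 'B' format directly, which suggests the failure is specific to the signed/unsigned format difference rather than to array.array as such.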