Kevin Amado <kam...@fluidattacks.com> added the comment:
Yeah, it definitely must be workers. I've experimented a lot with it and finally found something with an interface similar to `asyncio.as_completed`:

- You control concurrency with the `workers` parameter.
- You upper-bound memory usage with the `worker_greediness` parameter.
- Results are yielded back in the same order as the input.
- Results are yielded! So working over an unknown-sized iterable of `awaitables`, like `map(func, things_to_do)` or a generator, is no problem.

The implementation may not be the cleanest, as it uses some Events and N Queues, but the tests (keep reading to the end) show that the overhead is negligible:

```python
import asyncio
from itertools import tee
from typing import Awaitable, Dict, Iterable, TypeVar, cast

# `schedule` and `force_loop_cycle` are aioextensions helpers,
# not included in this message.

T = TypeVar('T')


def resolve(
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
    worker_greediness: int = 0,
) -> Iterable[Awaitable[T]]:
    # (the docstring is not included in this message)
    if workers < 1:
        raise ValueError('workers must be >= 1')
    if worker_greediness < 0:
        raise ValueError('worker_greediness must be >= 0')

    if hasattr(awaitables, '__len__'):
        # A small optimization can be done if we know the length
        workers = min(workers, len(awaitables))

    loop = asyncio.get_event_loop()
    # Maps each input index to the queue of the worker that handles it
    store: Dict[int, asyncio.Queue] = {}
    # `stream` is consumed by the workers, `stream_copy` by the outer loop
    stream, stream_copy = tee(enumerate(awaitables))
    stream_finished = asyncio.Event()
    workers_up = asyncio.Event()
    workers_tasks: Dict[int, asyncio.Task] = {}

    async def worker() -> None:
        # maxsize=0 (the default greediness) means an unbounded queue
        done: asyncio.Queue = asyncio.Queue(worker_greediness)
        for index, awaitable in stream:
            store[index] = done
            future = loop.create_future()
            future.set_result(await schedule(awaitable, loop=loop))
            # Blocks once `worker_greediness` results are waiting
            await done.put(future)
            workers_up.set()
        workers_up.set()
        stream_finished.set()

    async def start_workers() -> None:
        for index in range(workers):
            if stream_finished.is_set():
                break
            workers_tasks[index] = asyncio.create_task(worker())
            await force_loop_cycle()
        await workers_up.wait()

    async def get_one(index: int) -> Awaitable[T]:
        if not workers_tasks:
            await start_workers()

        awaitable = await store.pop(index).get()
        # `.result()` returns the value or re-raises the job's exception
        result: Awaitable[T] = (await awaitable).result()
        return result

    for index, _ in stream_copy:
        yield cast(Awaitable[T], get_one(index))
```

Some notes on the usage and outputs are part of this library's docs: https://kamadorueda.github.io/aioextensions/#aioextensions.resolve

Here are some tests that back up the implementation:

- There is bounded concurrency: https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L138
- Workers are always busy, even if one of them is processing a long-running job: https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L131
- Many workers do not add overhead: https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L156
- Errors can be caught on retrieval: https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L128

----------
nosy: +kamado2

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue41505>
_______________________________________
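For comparison with the `resolve` approach discussed in this comment (Events plus one Queue per worker), here is a minimal sketch of a different technique for ordered results under bounded concurrency: a fixed pool of worker tasks pulls from one shared lazy iterator, each outcome lands in a per-index future, and an async generator awaits those futures strictly in input order. This is not the aioextensions implementation; `resolve_in_order`, `max_pending`, and `_END` are hypothetical names, and `max_pending` only plays a role roughly similar to `worker_greediness` (it caps items pulled from the input but not yet consumed).

```python
import asyncio
from typing import AsyncIterator, Awaitable, Dict, Iterable, TypeVar

T = TypeVar("T")
_END = object()  # hypothetical sentinel stored at the index just past the last input


async def resolve_in_order(
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 4,
    max_pending: int = 16,
) -> AsyncIterator[T]:
    """Yield results in input order, running at most `workers` awaitables at once."""
    if workers < 1 or max_pending < 1:
        raise ValueError("workers and max_pending must be >= 1")
    loop = asyncio.get_running_loop()
    stream = enumerate(awaitables)  # one lazy iterator shared by every worker
    slots: Dict[int, asyncio.Future] = {}  # index -> future holding that item's outcome
    # Caps items pulled from the input but not yet yielded, which bounds
    # how many finished results can pile up while the consumer is slow.
    budget = asyncio.Semaphore(max_pending)
    pulled = 0  # number of items taken from `stream` so far

    def slot(index: int) -> asyncio.Future:
        # Whoever reaches an index first (a worker or the consumer) creates its future
        if index not in slots:
            slots[index] = loop.create_future()
        return slots[index]

    async def worker() -> None:
        nonlocal pulled
        while True:
            await budget.acquire()
            try:
                index, awaitable = next(stream)
            except StopIteration:
                budget.release()
                end = slot(pulled)  # `pulled` now equals the number of inputs
                if not end.done():
                    end.set_result(_END)
                return
            pulled += 1
            future = slot(index)
            try:
                future.set_result(await awaitable)
            except Exception as error:  # kept and re-raised when the consumer reaches it
                future.set_exception(error)

    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    try:
        index = 0
        while True:
            result = await slot(index)  # wait for items strictly in input order
            del slots[index]
            if result is _END:
                return
            budget.release()  # one item consumed: a worker may pull another
            yield result
            index += 1
    finally:
        for task in tasks:
            task.cancel()
```

Unlike `resolve`, which yields one awaitable per input so each error can be caught on retrieval, this async generator re-raises a failing item's exception when iteration reaches it and then stops. A slow item holds back the yielding of later results, but the other workers keep running new items until `max_pending` unconsumed items are outstanding.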