Kevin Amado <kam...@fluidattacks.com> added the comment:

Yeah, it definitely must be workers.

I've experimented a lot with it and finally found something with an interface
similar to `asyncio.as_completed`:

- You control concurrency with the `workers` parameter
- You upper-bound memory usage with the `worker_greediness` parameter
- Results are yielded back in the same order as the input
- Results are yielded lazily (it is a generator), so working over an iterable
  of unknown size, like `map(func, things_to_do)` or a generator, is no problem
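
For comparison, here is a much simpler sketch of the same interface. It is NOT the worker/queue design discussed here: the hypothetical `resolve_window` keeps a sliding window of at most `limit` scheduled tasks (playing the role of `workers`), pulls new awaitables lazily from the input, and awaits the oldest task first so results come back in input order.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Awaitable, Deque, Iterable, TypeVar

T = TypeVar("T")


async def resolve_window(
    awaitables: Iterable[Awaitable[T]], *, limit: int = 1024
) -> AsyncIterator[T]:
    """Hypothetical sliding-window alternative: at most `limit` tasks in flight."""
    if limit < 1:
        raise ValueError("limit must be >= 1")
    pending: Deque[asyncio.Future] = deque()
    # The input is consumed one item at a time, so generators of unknown
    # length are fine; it is never materialized as a list
    for awaitable in awaitables:
        pending.append(asyncio.ensure_future(awaitable))
        if len(pending) >= limit:
            # Window is full: wait for the OLDEST task to keep input order
            yield await pending.popleft()
    # Input exhausted: drain whatever is still in the window, in order
    while pending:
        yield await pending.popleft()


async def demo() -> None:
    async def square(x: int) -> int:
        await asyncio.sleep(0.01 * (x % 3))
        return x * x

    # Results are printed in input order even though delays differ
    async for value in resolve_window((square(x) for x in range(6)), limit=3):
        print(value)


if __name__ == "__main__":
    asyncio.run(demo())
```

The trade-off is head-of-line blocking: because the window always waits on the oldest task, one long-running item at the front leaves the other slots idle until it finishes, and an exception from any item stops the whole iteration. The worker design avoids both, per the tests linked at the end: workers keep pulling new items while the caller waits, and errors surface per item on retrieval.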

The implementation may not be the cleanest, since it uses some Events and N
Queues (one per worker), but the tests (linked at the end) show that the
overhead is negligible:

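    import asyncio
    from itertools import tee
    from typing import Awaitable, Dict, Iterable, TypeVar

    # Helpers from aioextensions (not shown here): `schedule` runs an
    # awaitable as a task and resolves to that task once it is done;
    # `force_loop_cycle` yields control to the event loop for one cycle
    from aioextensions import force_loop_cycle, schedule

    T = TypeVar('T')


    def resolve(
        awaitables: Iterable[Awaitable[T]],
        *,
        workers: int = 1024,
        worker_greediness: int = 0,
    ) -> Iterable[Awaitable[T]]:
        """Resolve awaitables with bounded concurrency, in input order."""
        if workers < 1:
            raise ValueError('workers must be >= 1')
        if worker_greediness < 0:
            raise ValueError('worker_greediness must be >= 0')

        if hasattr(awaitables, '__len__'):
            # A small optimization can be done if we know the length:
            # never start more workers than there are awaitables
            workers = min(workers, len(awaitables))

        loop = asyncio.get_event_loop()
        # Maps each input index to the queue of the worker that took it
        store: Dict[int, asyncio.Queue] = {}
        # `stream` is consumed by the workers, `stream_copy` by the caller
        stream, stream_copy = tee(enumerate(awaitables))
        stream_finished = asyncio.Event()
        workers_up = asyncio.Event()
        workers_tasks: Dict[int, asyncio.Task] = {}

        async def worker() -> None:
            # maxsize=0 (the default greediness) means an unbounded queue
            done: asyncio.Queue = asyncio.Queue(worker_greediness)
            for index, awaitable in stream:
                # Register where the result for `index` will be delivered
                store[index] = done
                future = loop.create_future()
                # Store the finished task itself, so any exception is only
                # re-raised when the caller calls `.result()` on it
                future.set_result(await schedule(awaitable, loop=loop))
                await done.put(future)
                workers_up.set()
            workers_up.set()
            stream_finished.set()

        async def start_workers() -> None:
            for index in range(workers):
                if stream_finished.is_set():
                    break
                workers_tasks[index] = asyncio.create_task(worker())
                # Let the new worker pull its first item before adding more
                await force_loop_cycle()
            await workers_up.wait()

        async def get_one(index: int) -> T:
            # Workers are started lazily, on the first retrieval
            if not workers_tasks:
                await start_workers()

            # Each worker's queue is FIFO, so this is the result for `index`
            awaitable = await store.pop(index).get()
            result: T = (await awaitable).result()
            return result

        for index, _ in stream_copy:
            yield get_one(index)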
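
To make the `worker_greediness` knob concrete: it is passed as `maxsize` to each worker's `asyncio.Queue`, so a worker whose queue already holds that many unretrieved results is suspended in `put` until the caller retrieves one, while `maxsize=0` (the default) never counts as full. The standalone demo below exercises only that standard `asyncio.Queue` behavior; `greediness_demo` is a hypothetical name and nothing here is part of aioextensions.

```python
import asyncio
from typing import Tuple


async def greediness_demo() -> Tuple[bool, bool, str, int]:
    # maxsize=0, like the default worker_greediness=0: never reports full
    unbounded: asyncio.Queue = asyncio.Queue(0)
    for i in range(10_000):
        unbounded.put_nowait(i)

    # maxsize=1, like worker_greediness=1
    bounded: asyncio.Queue = asyncio.Queue(1)
    await bounded.put("result 0")  # fits, so the "worker" keeps going
    pending_put = asyncio.create_task(bounded.put("result 1"))
    await asyncio.sleep(0)  # let the second put run; it suspends on a full queue
    was_suspended = not pending_put.done()

    first = await bounded.get()  # the caller retrieves, freeing one slot
    await pending_put  # the suspended put can now complete
    return unbounded.full(), was_suspended, first, bounded.qsize()


if __name__ == "__main__":
    print(asyncio.run(greediness_demo()))
```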


Notes on usage and outputs are part of this library's docs:

https://kamadorueda.github.io/aioextensions/#aioextensions.resolve

Here are tests backing these claims about the implementation:
- There is bounded concurrency:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L138
- Workers are always busy even if one of them is processing a long-running job:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L131
- Many workers do not add overhead:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L156
- Errors can be caught on retrieval:
  https://github.com/kamadorueda/aioextensions/blob/4a38cb343ceb0f931b655634195f311745e2db32/test/test___init__.py#L128
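
The "errors can be caught on retrieval" property rests on standard asyncio behavior: a task that raises stores its exception, and it is only re-raised when `.result()` is called. `get_one` relies on this via `(await awaitable).result()`, assuming `schedule` resolves to the finished task as its use there suggests. A minimal standalone illustration (the `job` and `collect_outcomes` functions are hypothetical):

```python
import asyncio
from typing import List, Union


async def job(i: int) -> int:
    if i == 1:
        raise ValueError(f"job {i} failed")
    return i * 10


async def collect_outcomes() -> List[Union[int, str]]:
    tasks = [asyncio.create_task(job(i)) for i in range(3)]
    # Waits for every task to finish; a failing task does NOT raise here
    await asyncio.wait(tasks)
    outcomes: List[Union[int, str]] = []
    for task in tasks:
        try:
            # The stored exception (if any) is re-raised only at this point
            outcomes.append(task.result())
        except ValueError as exc:
            outcomes.append(str(exc))
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(collect_outcomes()))
```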

----------
nosy: +kamado2

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue41505>
_______________________________________