Caleb Hattingh <caleb.hatti...@gmail.com> added the comment:

The traditional way this is done is with a finite number of workers pulling 
work off a queue. This is straightforward to set up with builtins:


from uuid import uuid4
import asyncio, random


async def worker(q: asyncio.Queue):
    # Each worker pulls jobs off the shared queue until it is cancelled.
    # (The jobs here are uuid hex strings, so the walrus test is always
    # truthy; a falsy sentinel could instead be used for shutdown.)
    while job := await q.get():
        print(f"Working on job {job}")
        await asyncio.sleep(random.random() * 5)  # simulate work
        print(f"Completed job {job}")
        q.task_done()  # tell q.join() this job is finished


async def scheduler(q, max_concurrency=5):
    # Concurrency is capped by the number of workers sharing the queue.
    workers = [asyncio.create_task(worker(q)) for _ in range(max_concurrency)]
    try:
        await asyncio.gather(*workers)
    except asyncio.CancelledError:
        pass  # cancellation is the expected shutdown path


async def main():
    jobs = [uuid4().hex for _ in range(1_000)]
    q = asyncio.Queue()
    for job in jobs:
        await q.put(job)

    t = asyncio.create_task(scheduler(q))
    await q.join()  # block until every job has been marked done
    t.cancel()      # then shut down the scheduler and its workers
    await t


if __name__ == "__main__":
    asyncio.run(main())


A neater API would be something like the Executor API in concurrent.futures, 
but we don't yet have an equivalent for asyncio. I started playing with some 
ideas for this a while ago here: https://github.com/cjrh/coroexecutor
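
To make the idea concrete, here is a rough sketch of the shape such an API 
could take, built on the same queue-and-workers pattern as above. The 
AsyncExecutor name and its submit/context-manager semantics are purely 
illustrative; this is not the actual coroexecutor interface:


import asyncio, random


class AsyncExecutor:
    # Hypothetical Executor-style wrapper, for illustration only.

    def __init__(self, max_workers=5):
        self._max_workers = max_workers
        self._q = asyncio.Queue()
        self._workers = []

    async def _worker(self):
        # Same pattern as above: pull coroutine objects off a queue
        # and await them, marking each one done for q.join().
        while True:
            coro = await self._q.get()
            try:
                await coro
            except Exception:
                pass  # a real implementation would surface errors
            finally:
                self._q.task_done()

    async def __aenter__(self):
        self._workers = [
            asyncio.create_task(self._worker())
            for _ in range(self._max_workers)
        ]
        return self

    def submit(self, coro):
        # Enqueue a coroutine object; a worker awaits it when free.
        self._q.put_nowait(coro)

    async def __aexit__(self, *exc_info):
        await self._q.join()  # drain all submitted work
        for w in self._workers:
            w.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)


async def job(i):
    await asyncio.sleep(random.random())
    print(f"Completed job {i}")


async def main():
    async with AsyncExecutor(max_workers=5) as exe:
        for i in range(20):
            exe.submit(job(i))


if __name__ == "__main__":
    asyncio.run(main())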

Alas, I have not yet added a "max_workers" parameter, so that isn't available 
in my lib. I discuss options for implementing it in an issue: 
https://github.com/cjrh/coroexecutor/issues/2
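
For comparison, one common way to cap concurrency without a worker pool at all 
is a semaphore around each job. This is just a minimal sketch of that option, 
not what coroexecutor does today:


from uuid import uuid4
import asyncio, random


async def main(max_concurrency=5):
    # Every job gets its own task, but the semaphore allows only
    # max_concurrency of them to run at any one time. The trade-off
    # versus the worker pool is that all 1,000 tasks exist up front.
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(job):
        async with sem:
            await asyncio.sleep(random.random() * 5)
            print(f"Completed job {job}")

    jobs = [uuid4().hex for _ in range(1_000)]
    await asyncio.gather(*(run_one(job) for job in jobs))


if __name__ == "__main__":
    asyncio.run(main())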

I believe that the core devs are working on a feature that might also help 
with this, called "task groups", but I haven't been following closely, so I 
don't know where that stands currently.
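
For reference, task groups did eventually ship in Python 3.11 as 
asyncio.TaskGroup. A task group manages task lifetimes and error propagation 
but doesn't cap concurrency by itself, so a sketch would still pair it with 
something like the semaphore above:


from uuid import uuid4
import asyncio, random


async def main(max_concurrency=5):
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(job):
        async with sem:
            await asyncio.sleep(random.random() * 5)
            print(f"Completed job {job}")

    # Python 3.11+: the group waits for all child tasks on exit and
    # cancels the remaining ones if any task raises.
    async with asyncio.TaskGroup() as tg:
        for _ in range(1_000):
            tg.create_task(run_one(uuid4().hex))


if __name__ == "__main__":
    asyncio.run(main())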

----------
nosy: +cjrh

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue41505>
_______________________________________