On 08/05/2014 21:44, Ian Kelly wrote:
> On May 8, 2014 12:57 PM, "Andrew McLean" <li...@andros.org.uk> wrote:
> > So far so good. However, I thought this would be an opportunity to
> > explore concurrent.futures and to see whether it offered any benefits
> > over the more explicit approach discussed above. The problem I am having
> > is that all the discussions I can find of the use of concurrent.futures
> > show use with toy problems involving just a few tasks. The URL
> > downloader in the documentation is typical; it proceeds as follows:
> >
> > 1. Get an instance of concurrent.futures.ThreadPoolExecutor
> > 2. Submit a few tasks to the executor
> > 3. Iterate over the results using concurrent.futures.as_completed
> >
> > That's fine, but I suspect that isn't a helpful pattern if I have a very
> > large number of tasks. In my case I could run out of memory if I tried
> > submitting all of the tasks to the executor before processing any of the
> > results.
>
> I thought that ThreadPoolExecutor.map would handle this transparently
> if you passed it a lazy iterable such as a generator. From my testing,
> though, that seems not to be the case; with a generator of 100000
> items and a pool of 2 workers, the entire generator was consumed
> before any results were returned.
>
> > I'm guessing what I want to do is submit tasks in batches of perhaps a
> > few hundred, iterate over the results until most are complete, then
> > submit some more tasks and so on. I'm struggling to see how to do this
> > elegantly without a lot of messy code just there to do "bookkeeping".
> > This can't be an uncommon scenario. Am I missing something, or is this
> > just not a job suitable for futures?
>
> I don't think it needs to be "messy".
> Something like this should do the trick, I think:
>
> from concurrent.futures import wait, FIRST_COMPLETED
> from itertools import islice
>
> def batched_pool_runner(f, iterable, pool, batch_size):
>     it = iter(iterable)
>     # Submit the first batch of tasks.
>     futures = set(pool.submit(f, x) for x in islice(it, batch_size))
>     while futures:
>         # wait() returns (done, not_done); keep waiting on the pending set.
>         done, futures = wait(futures, return_when=FIRST_COMPLETED)
>         # Replenish submitted tasks up to the number that completed.
>         futures.update(pool.submit(f, x) for x in islice(it, len(done)))
>         yield from done
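A self-contained sketch of the recipe above with a small usage example may help. square() and counting() are hypothetical stand-ins (for real work such as a download, and for a lazy task source that records how many items have been pulled); the pool size and batch sizes are arbitrary. It checks two things: every result comes back (in completion order, hence the sort), and with a 100000-item generator only about one batch is pulled before the first result is yielded, unlike the eager consumption Ian reported for ThreadPoolExecutor.map.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def batched_pool_runner(f, iterable, pool, batch_size):
    """Yield completed futures, keeping at most batch_size tasks in flight."""
    it = iter(iterable)
    # Submit the first batch of tasks.
    futures = {pool.submit(f, x) for x in islice(it, batch_size)}
    while futures:
        # wait() returns (done, not_done); keep only the pending set.
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        # Replenish submitted tasks up to the number that completed.
        futures.update(pool.submit(f, x) for x in islice(it, len(done)))
        yield from done


def square(x):
    # Hypothetical stand-in for real work such as downloading a URL.
    time.sleep(0.001)
    return x * x


def counting(iterable, counter):
    """Hypothetical wrapper: records in counter[0] how many items were pulled."""
    for item in iterable:
        counter[0] += 1
        yield item


# All results come back, in completion order rather than input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = sorted(fut.result()
                     for fut in batched_pool_runner(square, range(50), pool, 5))

# With a huge lazy input, at most two batches' worth of items are pulled
# before the first completed future is yielded.
pulled = [0]
with ThreadPoolExecutor(max_workers=2) as pool:
    runner = batched_pool_runner(square, counting(range(100000), pulled), pool, 10)
    first = next(runner)
    runner.close()  # stop early; shutdown waits only for the few in-flight tasks

print(results[:5], pulled[0])
```

The design choice is that the generator only refills as many slots as just completed, so memory use is bounded by batch_size pending futures regardless of how long the input is.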

That worked very nicely, thank you. I think that would make a good recipe, whether for the documentation or elsewhere. I suspect I'm not the only person who would benefit from something to bridge the gap between a toy example and something practical.

Andrew
-- https://mail.python.org/mailman/listinfo/python-list