On Fri, May 8, 2015 at 8:08 PM, Michael Welle <mwe012...@gmx.net> wrote:
> Hello,
>
> what's wrong with [0]? As num_tasks gets higher proc.join() seems to
> block forever. First I thought the magical frontier is around 32k tasks,
> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
> tasks. I think I do something fundamentally wrong, but I can't find it.
>
> Regards
> hmw
>
> [0] http://pastebin.com/adfBYgY9
Your code's small enough to include inline, so I'm doing that:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue


class Foo(Process):
    def __init__(self, task_queue, result_queue):
        Process.__init__(self)
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        while True:
            n = self.task_queue.get()
            if n is None:
                break
            self.result_queue.put(1)
        return


def main():
    results = Queue()
    tasks = Queue()
    procs = []
    num_procs = 8
    num_tasks = 8000

    for i in range(num_procs):
        proc = Foo(tasks, results)
        procs.append(proc)

    for proc in procs:
        proc.start()

    for i in range(num_tasks):
        tasks.put(i)

    for i in range(num_procs):
        tasks.put(None)

    for proc in procs:
        print("join")
        proc.join()

    while not results.empty():
        result = results.get()
        print('Result: {}'.format(result))

if __name__ == '__main__':
    main()
# -- end of code --

First thing I'd look at is the default queue size. If your result queue
fills up, all processes will block until something starts retrieving
results. If you really want to have all your results stay in the queue
like that, you may need to specify a huge queue size, which may cost you
a lot of memory; much better would be to have each worker process post
something on the result queue when it's done, and then wait till they're
all done:

from multiprocessing import Process, Queue

def foo(task_queue, result_queue):
    while True:
        n = task_queue.get()
        if n is None:
            break
        result_queue.put(1)
    # Make sure None is not a possible actual result.
    # Otherwise, create an object() to use as a flag.
    result_queue.put(None)

def feed_tasks(num_tasks, num_procs, tasks):
    for i in range(num_tasks):
        tasks.put(i)
    for i in range(num_procs):
        tasks.put(None)

def main():
    results = Queue()
    tasks = Queue()
    num_procs = 8
    num_tasks = 8000

    procs = [Process(target=foo, args=(tasks, results))
             for i in range(num_procs)]

    for proc in procs:
        proc.start()

    Process(target=feed_tasks, args=(num_tasks, num_procs, tasks)).start()

    while num_procs:
        result = results.get()
        if result is None:
            num_procs -= 1
        else:
            print('Result: {}'.format(result))

    for proc in procs:
        print("join")
        proc.join()

if __name__ == '__main__':
    main()

I've also made a few other changes (for instance, there's no need to
subclass Process just to pass args), but the most important parts are a
result_queue.put() just before each process ends, and switching the
order of the result-queue-pump and process-join loops.

That still might block, though, at the point where the tasks are being
put onto the queue, so I've spun that off into its own process. (It
might not be necessary, depending on how your tasks work.) But I tested
this on 200,000 tasks (with the printing of results replaced with a
simple counter), and it worked fine, churning through the work in about
ten seconds.

As a general rule, queues need to have both ends operating
simultaneously, otherwise you're likely to have them blocking. In
theory, your code should all work with ridiculously low queue sizes;
the only cost will be concurrency (since you'd forever be waiting for
the queue, your tasks will all be taking turns). I tested this by
changing the Queue() calls to Queue(1), and the code took about twice
as long to complete. :)

Hope that helps!

ChrisA
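[Editor's note: below is a minimal sketch of the Queue(1) experiment
described above, combining the rewritten code with a simple counter in
place of the result printing, as in the 200,000-task test. The concrete
counts and names here are illustrative assumptions, not taken from the
original post.]

from multiprocessing import Process, Queue

def foo(task_queue, result_queue):
    while True:
        n = task_queue.get()
        if n is None:
            break
        result_queue.put(1)
    result_queue.put(None)          # per-worker "I'm done" sentinel

def feed_tasks(num_tasks, num_procs, tasks):
    for i in range(num_tasks):
        tasks.put(i)                # blocks whenever the task queue is full
    for i in range(num_procs):
        tasks.put(None)

def main():
    results = Queue(1)              # deliberately tiny queues: correctness
    tasks = Queue(1)                # is unchanged, only concurrency suffers
    num_procs = 8
    num_tasks = 200000              # illustrative, per the test above

    procs = [Process(target=foo, args=(tasks, results))
             for i in range(num_procs)]
    for proc in procs:
        proc.start()
    Process(target=feed_tasks, args=(num_tasks, num_procs, tasks)).start()

    done = 0
    completed = 0
    while done < num_procs:
        result = results.get()
        if result is None:
            done += 1
        else:
            completed += 1          # count results instead of printing them

    for proc in procs:
        proc.join()
    print('Completed {} tasks'.format(completed))

if __name__ == '__main__':
    main()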