I would like to get feedback on an idea I had for simplifying the use of queues with daemon consumer threads.
Sometimes, I launch one or more consumer threads that wait for a task to enter a queue and then work on the task. A recurring problem is that I sometimes need to know when all of the tasks have been completed so I can exit or do something with the results. If each thread only does a single task, I can use t.join() to wait until the task is done. However, if the thread stays alive and waits for more Queue entries, then there doesn't seem to be a good way to tell when all the processing is done.

So, the idea is to create a subclass of Queue that increments a counter when objects are enqueued, provides a method for worker threads to decrement the counter when the work is done, and offers a blocking join() method that waits until the counter is zero. There's a working implementation at:

    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/475160

(a minimal sketch of such a subclass appears at the end of this message). Here is an example:

    from threading import Thread
    # 'Queue' here is the counter-aware subclass described above,
    # not the plain stdlib Queue

    def worker():
        while 1:
            task = q.get()
            do_work(task)
            q.task_done()           # signal that this task is complete

    # set up the queue and launch daemon worker threads
    q = Queue()
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.setDaemon(True)           # workers loop forever, so daemonize
        t.start()                   # them to let the program exit

    # load the tasks
    for elem in tasklist:
        q.put(elem)

    # block until they are done
    q.join()

    # proceed with other stuff . . .

There are some competing approaches. One is to attach sentinel objects to the end of the queue as a way for consumer threads to know that they should shut down. Then a regular t.join() can be used to block until the consumer threads have shut down. This approach is straightforward, but it depends on 1) complicating the consumer logic to include sentinel detection and thread shut-down, 2) complicating the producer logic to append one sentinel for each consumer when the data stream is done, and 3) actually knowing when the data stream is done. The latter is a problem in one of my programs because the consumer sometimes uses a divide-and-conquer approach, resulting in two new subtasks being loaded into the queue. IOW, the only way to know when all the inputs have been handled is to have the queue empty and all consumers inactive, i.e. processing complete (a worker sketch for this divide-and-conquer case also appears at the end of this message).

    from Queue import Queue
    from threading import Thread

    def worker():
        while 1:
            task = q.get()
            if task is None:        # check for sentinel
                return
            do_work(task)

    # set up the queue and launch worker threads
    q = Queue()
    threads = []
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        threads.append(t)
        t.start()

    # load the tasks
    for elem in tasklist:
        q.put(elem)
    for i in range(num_worker_threads):
        q.put(None)                 # load sentinels

    # block until they are done
    for t in threads:
        t.join()

    # proceed with other stuff . . .

Another approach is to set up a second queue for results. Each consumer loads a result when it is done processing an input. The producer or main thread then becomes responsible for matching all inputs to the corresponding results. This isn't complicated in practice, but it isn't pretty either:

    from Queue import Queue
    from threading import Thread

    def worker():
        while 1:
            task = tasks_in.get()
            do_work(task)
            tasks_out.put(None)     # enqueue a None result when done with a task

    # set up the queues and launch daemon worker threads
    tasks_in = Queue()
    tasks_out = Queue()
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.setDaemon(True)           # workers loop forever, so daemonize them
        t.start()

    # load the tasks
    n = len(tasklist)
    for elem in tasklist:
        tasks_in.put(elem)

    # block until they are done
    for i in range(n):
        tasks_out.get()

    # proceed with other stuff . . .

This approach becomes messier if the task loading occurs at multiple points inside a more complex producer function. Also, it doesn't work well with the worker-thread divide-and-conquer situation discussed above.
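For reference, here is a minimal sketch of the kind of counter-based subclass described above. It illustrates the idea rather than reproducing the recipe code exactly; the TaskQueue name and the attribute names are placeholders of my own, and error checking (e.g. guarding against extra task_done() calls) is omitted:

    from Queue import Queue
    import threading

    class TaskQueue(Queue):
        """Queue subclass that counts unfinished tasks and supports join()."""

        def __init__(self, maxsize=0):
            Queue.__init__(self, maxsize)
            # share the Queue's own mutex so the counter stays in
            # sync with the queue contents without a second lock
            self.all_done = threading.Condition(self.mutex)
            self.unfinished = 0

        def _put(self, item):
            # called by put() with the mutex already held
            Queue._put(self, item)
            self.unfinished += 1

        def task_done(self):
            # called by consumers once per completed task
            self.all_done.acquire()
            try:
                self.unfinished -= 1
                if self.unfinished == 0:
                    self.all_done.notifyAll()
            finally:
                self.all_done.release()

        def join(self):
            # block until every enqueued task has been marked done
            self.all_done.acquire()
            try:
                while self.unfinished:
                    self.all_done.wait()
            finally:
                self.all_done.release()

With this in place, the first example above would just use q = TaskQueue() in place of q = Queue().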
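And here is how the divide-and-conquer worker mentioned above might look with the counter-based queue (a drop-in replacement for the worker in the first example; split() is a hypothetical placeholder that either divides a task into subtasks or returns an empty list for a leaf). The point is that each put() of a subtask increments the counter before task_done() is called for the parent, so join() cannot return until the entire tree of tasks has been processed:

    def worker():
        while 1:
            task = q.get()
            subtasks = split(task)      # hypothetical divide step
            if subtasks:
                for sub in subtasks:
                    q.put(sub)          # each put() bumps the counter,
            else:                       # so join() keeps blocking until
                do_work(task)           # every subtask is accounted for
            q.task_done()               # the parent task itself is done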