Raymond Hettinger wrote:
> [Filipe Fernandes]
>> The reasons for using JoinableQueue I think are obvious.  I want to
>> block the main processing using queue.join() until the tasks that have
>> been placed on the queue have been finished by the worker processes.
>>
>> I can't be the only one experiencing this (besides Brian)... are there
>> others who ran into this?  Are there workarounds (besides a
>> home-brewed one)?
>
> Before Queue.task_done() and Queue.join() were added, other
> idioms were used.
>
> One technique is to use a second queue to track incomplete tasks.
>
> # adding work
> unfinished_tasks.put(None)
> q.put(task)
>
>
> # doing work
> t = q.get()
> f(t)
> unfinished_tasks.get()
>
>
> # waiting for unfinished tasks to complete
> while unfinished_tasks.qsize():
>     sleep(0.1)
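For anyone who wants to try the idiom quoted above, here is a minimal self-contained sketch with multiprocessing, assuming Python 3. The helper names (add_work, worker, wait_for_tasks), the STOP sentinel and the squaring job are hypothetical. The polling loop relies on qsize(), which the multiprocessing docs describe as approximate and which raises NotImplementedError on platforms such as macOS.

```python
import multiprocessing as mp
import time

STOP = None  # hypothetical sentinel that tells a worker to exit


def add_work(q, unfinished, task):
    # Count the task as unfinished *before* publishing it, so the
    # counter cannot reach zero while the task is still pending.
    unfinished.put(None)
    q.put(task)


def worker(q, unfinished, results):
    # Hypothetical job: square each number and report the result.
    while True:
        t = q.get()
        if t is STOP:
            break
        results.put(t * t)
        unfinished.get()  # only now is the task counted as finished


def wait_for_tasks(unfinished, interval=0.1):
    # Poll until every task handed out has been marked finished.
    # qsize() is approximate and may raise NotImplementedError (e.g. macOS).
    while unfinished.qsize():
        time.sleep(interval)


if __name__ == "__main__":
    q, unfinished, results = mp.Queue(), mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(q, unfinished, results))
             for _ in range(2)]
    for p in procs:
        p.start()
    for n in range(10):
        add_work(q, unfinished, n)
    wait_for_tasks(unfinished)  # plays the role of JoinableQueue.join()
    print(sorted(results.get() for _ in range(10)))
    for _ in procs:
        q.put(STOP)
    for p in procs:
        p.join()
```

The results are drained before the workers are joined, since the multiprocessing docs warn that joining a process which still has buffered items on a queue can deadlock.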

Thanks, Raymond. Yours is by far the simplest approach, and it should have been the obvious solution. I didn't want to stray too far from what I already had, and this fits the bill.

In case others are curious...

I had looked at the example in
http://docs.python.org/library/multiprocessing.html#using-a-remote-manager

which uses a manager to share the traditional Queue from the Queue module, the one that provides the join() and task_done() methods. But running a separate server process (or thread) just for that is rather overkill.

I also looked at using a process Pool asynchronously and consuming the returned result iterators to determine when all the jobs had completed.
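A rough sketch of that Pool-based alternative, assuming Python 3: Pool.imap_unordered() returns an iterator that yields each result as its job finishes, so exhausting the iterator doubles as waiting for completion. The names square and run_and_wait are hypothetical.

```python
from multiprocessing import Pool


def square(n):
    # Hypothetical job; it must be a module-level function so it can be pickled.
    return n * n


def run_and_wait(pool, func, tasks):
    # imap_unordered() yields each result as its job finishes; exhausting the
    # iterator therefore returns only once every submitted job has completed.
    return sorted(pool.imap_unordered(func, tasks))


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(run_and_wait(pool, square, range(10)))
```

This swaps the long-lived, queue-fed workers for pool-managed ones, which is a larger change than simply adding join() to an existing queue-based design.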

Your solution is, embarrassingly, the simplest to implement, and it is easy enough to wrap the two queues in a composite class that mimics the real one (although I have not tried the following yet, since I'm not in the office right now).


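from multiprocessing import Queue
from time import sleep


class JoinableQueue(object):
    """Emulate task_done()/join() with a second queue of task tokens."""

    def __init__(self, *args, **kwargs):
        self.__tasks = Queue()                 # one token per unfinished task
        self.__queue = Queue(*args, **kwargs)  # the work items themselves

    def put(self, *args, **kwargs):
        self.__tasks.put(None)                 # count the task before publishing it
        self.__queue.put(*args, **kwargs)

    def get(self, *args, **kwargs):
        return self.__queue.get(*args, **kwargs)

    def task_done(self):
        # A non-blocking get() can raise Empty while the token is still in the
        # producer's feeder thread, so check the count, then block for the token.
        if not self.__tasks.qsize():
            raise ValueError('task_done() called too many times')
        self.__tasks.get()

    def join(self):
        # Poll qsize() rather than empty(): empty() may briefly return True
        # right after a put(). Note that qsize() raises NotImplementedError on macOS.
        while self.__tasks.qsize():
            sleep(0.1)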

[Add further methods to emulate the rest of the interface as required.]

filipe

PS: Thanks, Raymond, for the quick reply. I feel rather apologetic for having bothered the list with this :S

--
http://mail.python.org/mailman/listinfo/python-list
