New submission from Weize Xu <vet....@gmail.com>:

I am trying to implement a producer-consumer design with the multiprocessing module in my project, but multiprocessing.Queue does not behave as I expected. It seems that the Queue.get method returns the end flag, which I put at the end of my queue, too early.
I am not experienced in multi-process programming, so I am not sure whether this is a bug or not. To reproduce it, I have simplified my code as follows:

```python
import time
import multiprocessing as mp

def worker(task_queue, output_queue):
    while 1:
        i = task_queue.get()
        if i is None:
            print("Process-%d done"%mp.current_process().pid)
            task_queue.task_done()
            break
        output_queue.put(i+1)
        task_queue.task_done()

def outputer(output_queue):
    c = 0 # counts how many objects have been received
    while 1:
        j = output_queue.get()
        if j is None:
            print("Process(output)-%d done"%mp.current_process().pid)
            c += 1
            print("outputer get %d objects from the output_queue"%c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001) # do output here
        c += 1

if __name__ == "__main__":
    task_queue = mp.JoinableQueue()
    #output_queue = mp.JoinableQueue()
    output_queue = mp.Queue()

    workers = [mp.Process(target=worker, args=(task_queue, output_queue)) for i in range(10)]
    outputer = mp.Process(target=outputer, args=(output_queue,))

    for w in workers:
        w.start()
    outputer.start()

    for i in range(10**6):
        task_queue.put(i)

    for w in workers: # put end flag to task queue
        task_queue.put(None)

    task_queue.join() # wait until all tasks are done
    print("all tasks done.")

    print("queue size before put end flag: %d"%output_queue.qsize())
    output_queue.put(None) # put end flag to output queue
    print("end")
```

I get the output:

Process-20923 done
Process-20931 done
Process-20925 done
Process-20930 done
Process-20927 done
Process-20929 done
Process-20928 done
Process-20926 done
Process-20924 done
Process-20932 done
all tasks done.
queue size before put end flag: 914789
end
Process(output)-20933 done
outputer get 90383 objects from the output_queue
Process Process-11:
Traceback (most recent call last):
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "joinablequeue.py", line 27, in outputer
    assert output_queue.empty(), "output queue should be empty here"
AssertionError: output queue should be empty here

I wait for all workers to put their output on the output queue by calling task_queue.join(), and only then put the end flag on the output queue. However, according to the outputer's printed information, it gets the `None` end flag before the other values in the queue. It seems the queue does not return values in FIFO order.

----------
components: Library (Lib)
messages: 308710
nosy: Weize Xu, davin
priority: normal
severity: normal
status: open
title: Python mulitiprocessing.Queue fail to get according to correct sequence
type: behavior
versions: Python 3.6

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue32382>
_______________________________________
_______________________________________________
Python-bugs-list mailing list
Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com
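
A possible reading of the behaviour reported above: the multiprocessing documentation notes that when several processes put objects on one queue, the objects may be received out of order, while objects put by the same process keep their relative order. If that applies here, an end flag put by the main process can overtake results that the workers have queued but not yet flushed. The sketch below is one way to avoid relying on cross-process ordering; it is not the reporter's code. Each worker forwards its own end flag, and the consumer stops after seeing one flag per worker. The names `run_pipeline` and `start_method` are hypothetical, and for brevity the main process plays the role of the outputer and plain queues replace the JoinableQueue.

```python
import multiprocessing as mp


def worker(task_queue, output_queue):
    """Process tasks until the None end flag, then forward an end flag."""
    while True:
        i = task_queue.get()
        if i is None:
            # Puts from one process keep their order, so this end flag is
            # queued behind every result this worker produced.
            output_queue.put(None)
            break
        output_queue.put(i + 1)


def run_pipeline(n_tasks, n_workers, start_method=None):
    """Return all results; stop after one end flag per worker (hypothetical helper)."""
    ctx = mp.get_context(start_method)  # None selects the platform default
    task_queue = ctx.Queue()
    output_queue = ctx.Queue()
    workers = [ctx.Process(target=worker, args=(task_queue, output_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for i in range(n_tasks):
        task_queue.put(i)
    for _ in workers:
        task_queue.put(None)  # one end flag per worker

    results = []
    flags_seen = 0
    while flags_seen < n_workers:
        j = output_queue.get()
        if j is None:
            flags_seen += 1
        else:
            results.append(j)
    # Join only after the output queue has been drained.
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    results = run_pipeline(10_000, 4)
    print("received %d results" % len(results))
```

Results from different workers can still interleave in any order, so the sketch only guarantees that nothing is missed, not that the output is sorted.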