I’ve managed to partially confirm this theory: after switching the HTTPServer from the ForkingMixIn to the ThreadingMixIn, the queue behaves correctly.
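A minimal sketch of that switch follows. It assumes a handler that reads the raw PUT body and puts it on a module-level multiprocessing.Queue. `PutHandler`, `ThreadedAPIServer`, `GLOBAL_QUEUE`, `put_observable` and the `/observables` path are hypothetical names, not the cifpy3 code. Each request now runs in a thread of the long-lived server process, so that process's queue feeder thread is still alive after the request finishes and can flush its buffer.

```python
import http.client
import http.server
import multiprocessing
import socketserver
import threading

# Hypothetical stand-in for the global queue that the worker processes read.
GLOBAL_QUEUE = multiprocessing.Queue()


class PutHandler(http.server.BaseHTTPRequestHandler):
    """Hypothetical handler: put the raw PUT body on the global queue."""

    def do_PUT(self):
        length = int(self.headers.get("Content-Length", 0))
        GLOBAL_QUEUE.put(self.rfile.read(length).decode("utf-8"))
        self.send_response(201)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence the default per-request logging to stderr


class ThreadedAPIServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    # One thread per request inside this long-lived process, instead of
    # one short-lived forked child per request as with ForkingMixIn.
    daemon_threads = True


def start_server():
    """Serve on an ephemeral localhost port from a background thread."""
    server = ThreadedAPIServer(("127.0.0.1", 0), PutHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def put_observable(server, body):
    """Send one PUT request and return the HTTP status code."""
    host, port = server.server_address
    conn = http.client.HTTPConnection(host, port, timeout=5)
    try:
        conn.request("PUT", "/observables", body=body)
        response = conn.getresponse()
        response.read()
        return response.status
    finally:
        conn.close()


if __name__ == "__main__":
    server = start_server()
    try:
        print(put_observable(server, "example.com"))  # 201
        print(GLOBAL_QUEUE.get(timeout=5))            # example.com
    finally:
        server.shutdown()
        server.server_close()
```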
The queue is created before anything is forked, and then all of the processes that support the HTTPServer are forked off. The problem appears to be specific to the child processes that the HTTPServer forks through the ForkingMixIn. Now I need to dig deeper and find out what is keeping the buffer from being flushed to the connection pipe before the thread/process terminates.

> On Oct 17, 2015, at 3:19 PM, James DeVincentis <ad...@hexhost.net> wrote:
>
> So, whatever is causing this is a bit deeper in the multiprocessing.Queue
> class. I tried using a non-blocking multiprocessing.Queue.get() by setting
> the first parameter to False and then catching the queue.Empty exception.
> For some reason, even though there are objects in the queue (as evidenced
> by multiprocessing.Queue.qsize()), the queue.Empty exception is being
> thrown.
>
> I dug into the multiprocessing.Queue class, and it looks like when
> multiprocessing.Queue.get() is called, multiprocessing.Queue._poll() comes
> up False, causing it to throw the Empty exception. However, according to
> multiprocessing.Queue._sem there are still locks acquired on it, which
> gives me a multiprocessing.Queue.qsize() > 0.
>
> It looks like it is possible for a process to die (along with that
> process’s local multiprocessing.Queue._feed() thread) before
> multiprocessing.Queue._buffer is fed out by multiprocessing.Queue._feed().
>
> Thoughts from anyone?
>
>
>> On Oct 15, 2015, at 7:37 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>
>> Looking into it, I seem to have found a race condition where a
>> multiprocessing.Queue.get() can hang waiting for an object even though
>> there are objects in the Queue, when those objects were placed into the
>> queue by a forked process that then ended quickly.
>>
>> I don’t know how to track this down any further or fix it.
>>
>> 1. Server gets booted.
>>    This spawns the following:
>>    - API Server - HTTPServer(socketserver.ForkingMixIn, http.server.HTTPServer)
>>      https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/server.py
>>    - Workers - Process(multiprocessing.Process)
>>      https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L93
>>
>> 2. An HTTP request comes in and is handled by
>>    Handler(http.server.BaseHTTPRequestHandler), in this case using do_PUT:
>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L226
>> 3. An object is created and placed in the global queue:
>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>> 4. The HTTP request process dies since the request has been handled.
>> 5. The worker process has a task manager thread that takes the object from
>>    the global queue and places it into a local queue:
>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L64
>> 6. One of many (30+) threads in the worker process picks up the object from
>>    the local queue and processes it:
>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L12
>>
>> It seems the task management threads are getting hung up waiting for an
>> object from the global queue, even though the global queue has objects in
>> it…
>>
>> Ignore the ERRORS; they aren’t actually errors. I logged at that level for
>> temporary visibility.
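Step 4 is where the theory in this thread bites. A minimal sketch under these assumptions: a Unix platform with os.fork(), and a child that leaves via os._exit(), the way socketserver.ForkingMixIn ends its request-handling children. multiprocessing.Queue.put() only appends the object to a per-process buffer; a background feeder thread pickles it and writes it to the pipe later, and os._exit() does not wait for that thread. Calling Queue.close() and then Queue.join_thread() in the child blocks until the buffer has been flushed. The helpers `put_from_forked_child` and `collect` are hypothetical names for illustration.

```python
import multiprocessing
import os
import queue


def put_from_forked_child(q, items, flush=True):
    """Fork a child that puts `items` on `q` and then leaves via os._exit().

    Hypothetical helper. With flush=True the child calls close() and then
    join_thread(), which blocks until the feeder thread has written every
    buffered item to the pipe. With flush=False nothing waits for the
    feeder thread, so items may be counted by qsize() but never delivered.
    """
    pid = os.fork()
    if pid == 0:  # child process
        status = 1
        try:
            for item in items:
                q.put(item)      # only appends to a buffer in this process
            if flush:
                q.close()        # no more puts from this process
                q.join_thread()  # wait until the buffer is in the pipe
            status = 0
        finally:
            # Like socketserver.ForkingMixIn: exit without normal cleanup.
            os._exit(status)
    return pid


def collect(q, count, timeout=5.0):
    """Read up to `count` items, giving up after `timeout` seconds of silence."""
    received = []
    for _ in range(count):
        try:
            received.append(q.get(timeout=timeout))
        except queue.Empty:
            break
    return received


if __name__ == "__main__":
    q = multiprocessing.Queue()
    items = ["fqdn-%d" % i for i in range(5)]
    pid = put_from_forked_child(q, items, flush=True)
    received = collect(q, len(items))  # read before reaping the child
    os.waitpid(pid, 0)
    print(received)  # ['fqdn-0', 'fqdn-1', 'fqdn-2', 'fqdn-3', 'fqdn-4']
```

The parent drains the queue before calling os.waitpid(): the multiprocessing documentation warns that a process which has put items on a queue will not terminate until its buffer is flushed, so waiting on it before consuming the items can deadlock when the data is large.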
>>
>> 2015-10-16 00:20:07 Manager #3 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f4384d53278> into local queue
>> 2015-10-16 00:20:07 Manager #4 ERROR Got <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> from global queue
>> 2015-10-16 00:20:07 Manager #4 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> into local queue
>> 2015-10-16 00:23:05 Manager #1 ERROR Waiting for item from global queue
>> 2015-10-16 00:23:05 Manager #2 ERROR Waiting for item from global queue
>> 2015-10-16 00:23:05 Manager #3 ERROR Waiting for item from global queue
>> 2015-10-16 00:23:05 Manager #4 ERROR Waiting for item from global queue
>> 2015-10-16 00:23:08 Manager #1 ERROR Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> from global queue
>> 2015-10-16 00:23:08 Manager #1 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> into local queue
>> 2015-10-16 00:23:08 Manager #1 ERROR Waiting for item from global queue
>> 2015-10-16 00:23:08 Manager #2 ERROR Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> from global queue
>> 2015-10-16 00:23:08 Manager #2 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> into local queue
>> 2015-10-16 00:23:08 Manager #2 ERROR Waiting for item from global queue
>>
>> Nothing else gets logged by the manager threads at all, even if I continue
>> adding objects to the global queue.
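The manager behaviour in these logs (wait on the global queue, get an item, put it on the local queue, wait again) can be sketched roughly as below. This is a simplified, hypothetical stand-in for the linked worker.py, not its actual code: the `STOP` sentinel, the uppercase "processing" step and the thread counts are assumptions, and everything runs in one process for brevity, whereas in the described design the manager and its worker threads live inside a multiprocessing.Process. A manager blocked in global_q.get() is waiting for bytes to arrive on the pipe, not for the count that qsize() reports.

```python
import logging
import multiprocessing
import queue
import threading

log = logging.getLogger("worker")

STOP = None  # hypothetical shutdown sentinel, not part of the original design


def task_manager(name, global_q, local_q, num_workers):
    """Move items from the shared multiprocessing queue into a local queue."""
    while True:
        log.debug("%s Waiting for item from global queue", name)
        item = global_q.get()  # blocks until a complete item arrives on the pipe
        if item is STOP:
            for _ in range(num_workers):
                local_q.put(STOP)  # wake every worker thread so it can exit
            return
        log.debug("%s Got %r from global queue", name, item)
        local_q.put(item)
        log.debug("%s Put %r into local queue", name, item)


def worker_thread(local_q, results, lock):
    """Take items from the local queue and 'process' them."""
    while True:
        item = local_q.get()
        if item is STOP:
            return
        with lock:
            results.append(item.upper())  # stand-in for real processing


def run_worker(global_q, num_workers=4):
    """Run one manager thread plus `num_workers` threads until STOP arrives."""
    local_q = queue.Queue()
    results, lock = [], threading.Lock()
    workers = [threading.Thread(target=worker_thread, args=(local_q, results, lock))
               for _ in range(num_workers)]
    manager = threading.Thread(target=task_manager,
                               args=("Manager #1", global_q, local_q, num_workers))
    for t in workers + [manager]:
        t.start()
    for t in [manager] + workers:
        t.join()
    return results


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    global_q = multiprocessing.Queue()
    for fqdn in ["a.example", "b.example", "c.example"]:
        global_q.put(fqdn)
    global_q.put(STOP)
    print(sorted(run_worker(global_q)))  # ['A.EXAMPLE', 'B.EXAMPLE', 'C.EXAMPLE']
```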
>>
>>
>> 2015-10-16 00:25:00 worker #1 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:00 worker #3 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:00 worker #4 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:00 worker #2 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:00 MAIN DEBUG Global Queue Size: 572
>>
>> 2015-10-16 00:25:05 worker #1 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:05 worker #3 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:05 worker #4 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:05 worker #2 DEBUG Local Queue Size: 0
>> 2015-10-16 00:25:05 MAIN DEBUG Global Queue Size: 572
>>
>>
>>> On Oct 15, 2015, at 5:02 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>
>>> Anyone have any ideas? I feel like this could be a bug with the garbage
>>> collector across multiprocessing.
>>>
>>> From: James DeVincentis [mailto:ad...@hexhost.net]
>>> Sent: Wednesday, October 14, 2015 12:41 PM
>>> To: 'python-list@python.org'
>>> Subject: Problem with copy.deepcopy and multiprocessing.Queue
>>>
>>> I’ve got a bit of a problem with copy.deepcopy and using
>>> multiprocessing.Queue.
>>>
>>> I have an HTTP API that is exposed for adding objects to a
>>> multiprocessing.Queue. Source code here:
>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>>>
>>> The trouble is, even using deepcopy, my debugging shows that it keeps
>>> reusing the same address space on every iteration of the loop (and/or
>>> fork, since it uses the forking TCP server mixin).
>>>
>>> This is reflected when the reference to that address space gets removed
>>> from the Queue. Source code:
>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py
>>>
>>> So I submit a few objects very rapidly to the HTTP server, and it places
>>> them in the queue. For some reason, on the second iteration copy.deepcopy
>>> stops using new address spaces for deepcopied objects.
>>> As a result, the queue processes that object only once (even though
>>> Queue.put() is called repeatedly for the same address space).
>>>
>>> I’ve tried using ‘del’ on the object before reusing it, but it still
>>> reallocates to the same address space. I’m at a complete loss.
>>>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb2e8>
>>> 2015-10-14 17:03:30 THREAD #1-1 WARNING Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdb978>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 THREAD #2-1 WARNING Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdab70>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>
>>> --
>>> https://mail.python.org/mailman/listinfo/python-list
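The `0x7f...` values in the log lines above come from the default repr, which in CPython shows the object's id(), i.e. its memory address in the process that printed it. id() is only guaranteed to be unique among objects that exist at the same time, so a deepcopy made after the previous copy was freed may land at the same address. Separately, multiprocessing.Queue pickles each object on put(), so the consumer receives a new object for every put(), whatever address the producer's copy had. A minimal sketch, with a plain dict standing in for the Fqdn observable (an assumption for illustration) and a hypothetical `round_trip` helper:

```python
import copy
import multiprocessing


def round_trip(obj, times=3):
    """Put the same object on a fresh multiprocessing.Queue `times` times
    and read every item back (hypothetical helper for illustration)."""
    q = multiprocessing.Queue()
    for _ in range(times):
        q.put(obj)  # the feeder thread pickles obj once per put()
    received = [q.get(timeout=5) for _ in range(times)]
    q.close()
    q.join_thread()
    return received


if __name__ == "__main__":
    # A plain dict stands in for the Fqdn observable (assumption).
    original = {"fqdn": "example.com"}

    # Copies that are all alive at once always have distinct ids.
    copies = [copy.deepcopy(original) for _ in range(3)]
    print(len({id(c) for c in copies}))  # 3

    # Three puts of one object yield three separate, equal objects,
    # none of which is the original.
    received = round_trip(original)
    print(len(received),
          all(r == original for r in received),
          any(r is original for r in received))  # 3 True False
```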