I see, looks like I’ll have to use Queue.close(). I didn’t think it would be necessary, since I was assuming the queue would be garbage collected. Sigh. Bug fixed.
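For anyone who hits the same thing, here is a minimal sketch of the fix. It assumes a Unix system where os.fork() is available, and it imitates what ForkingMixIn does (fork, do the work, os._exit()) rather than using socketserver itself. The names put_and_exit and demo are made up for illustration and are not from cifpy3.

```python
import multiprocessing
import os


def put_and_exit(q, items):
    """Fork a child that enqueues items, flushes the queue, then hard-exits."""
    pid = os.fork()
    if pid == 0:
        # Child: stands in for the forked request-handling process.
        status = 1
        try:
            for item in items:
                q.put(item)
            # close() says this process will put no more data on the queue;
            # join_thread() blocks until the background feeder thread has
            # written everything in the buffer to the underlying pipe.
            q.close()
            q.join_thread()
            status = 0
        finally:
            # os._exit() skips all cleanup, so the flush must happen above.
            os._exit(status)
    # Parent: wait for the child to finish before reading.
    os.waitpid(pid, 0)


def demo():
    q = multiprocessing.Queue()
    put_and_exit(q, ["a", "b", "c"])
    return [q.get(timeout=5) for _ in range(3)]


if __name__ == "__main__":
    print(demo())
```

Without the close() and join_thread() calls, the child can reach os._exit() before the feeder thread has written anything to the pipe, which matches the stuck get() described further down the thread.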
Thanks everyone!

> On Oct 18, 2015, at 3:41 AM, James DeVincentis <ad...@hexhost.net> wrote:
>
> I get why it needs to be called, but this looks like a serious annoyance.
>
> Now I need help figuring this out.
>
> socketserver.ForkingMixIn needs to use os._exit() so that the process never
> makes it past handling the request. However, if there is a thread running
> inside that process that manages a multiprocessing.Queue that I’m attached to,
> how do I ensure its buffer gets flushed before it dies? The only way I can
> think of is to somehow force the flush while I’m inside the HTTP request.
>
> This is kind of a stupid hack to have to put in place, but since Python
> doesn’t implement a way to exit nicely (without raising SystemExit) AND
> flush the buffers, I’m stuck in a stupid edge case.
>
>> On Oct 18, 2015, at 3:33 AM, James DeVincentis <ad...@hexhost.net> wrote:
>>
>> Seems I found the cause. os._exit() is used in ForkingMixIn for socketserver
>> and its child classes.
>>
>> Since os._exit() doesn’t flush buffers or clean anything up (hence not
>> running the Finalize hooks that multiprocessing.Queue uses to make sure data
>> gets flushed), this breaks multiprocessing.Queue.
>>
>> https://hg.python.org/cpython/file/3.5/Lib/socketserver.py#l605
>> https://hg.python.org/cpython/file/3.5/Lib/socketserver.py#l611
>>
>> Sigh. Now the question is why is it necessary to call os._exit()?
>>
>>> On Oct 17, 2015, at 3:19 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>
>>> So, whatever is causing this is a bit deeper in the multiprocessing.Queue
>>> class. I tried using a non-blocking multiprocessing.Queue.get() by setting
>>> the first parameter to False and then catching the queue.Empty exception.
>>> For some reason, even though there are objects in the queue (as evidenced by
>>> multiprocessing.Queue.qsize()), the queue.Empty exception is being thrown.
>>>
>>> I dug into the multiprocessing.Queue class, and it looks like when calling
>>> multiprocessing.Queue.get(), multiprocessing.Queue._poll() is coming up
>>> false, causing it to throw the Empty exception. However, according to
>>> multiprocessing.Queue._sem there are still locks acquired on it, thus giving
>>> me a multiprocessing.Queue.qsize() > 0.
>>>
>>> It looks like it is possible for a process (including that process’s local
>>> multiprocessing.Queue._feed() thread) to die off before
>>> multiprocessing.Queue._buffer is fed out using
>>> multiprocessing.Queue._feed().
>>>
>>> Thoughts from anyone?
>>>
>>>> On Oct 15, 2015, at 7:37 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>>
>>>> Looking into it, I seem to have found a race condition where a
>>>> multiprocessing.Queue.get() can hang waiting for an object even though
>>>> there are objects in the Queue, when they are placed into the queue by a
>>>> forked process that then ends quickly.
>>>>
>>>> I don’t know how to track this down any further or fix it.
>>>>
>>>> 1. Server gets booted. This spawns the following:
>>>>    - API Server - HTTPServer(socketserver.ForkingMixIn,
>>>>      http.server.HTTPServer) -
>>>>      https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/server.py
>>>>    - Workers - Process(multiprocessing.Process) -
>>>>      https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L93
>>>> 2. HTTP request comes in and is handled by
>>>>    Handler(http.server.BaseHTTPRequestHandler), in this case using do_PUT:
>>>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L226
>>>> 3. Object is created and placed in the global queue:
>>>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>>>> 4. HTTP request process dies since the request has been handled.
>>>> 5. Worker process has a task manager thread that takes the object from the
>>>>    global queue and places it into a local queue:
>>>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L64
>>>> 6. One of many (30+) threads on the worker process picks up the object
>>>>    from the local queue and processes it:
>>>>    https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L12
>>>>
>>>> It seems the task management threads are getting hung up waiting for an
>>>> object from the global queue, even though the global queue has objects in
>>>> it…
>>>>
>>>> Ignore the ERRORs; they aren’t actually errors. I logged at that level for
>>>> temporary visibility.
>>>>
>>>> 2015-10-16 00:20:07 Manager #3 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f4384d53278> into local queue
>>>> 2015-10-16 00:20:07 Manager #4 ERROR Got <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> from global queue
>>>> 2015-10-16 00:20:07 Manager #4 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> into local queue
>>>> 2015-10-16 00:23:05 Manager #1 ERROR Waiting for item from global queue
>>>> 2015-10-16 00:23:05 Manager #2 ERROR Waiting for item from global queue
>>>> 2015-10-16 00:23:05 Manager #3 ERROR Waiting for item from global queue
>>>> 2015-10-16 00:23:05 Manager #4 ERROR Waiting for item from global queue
>>>> 2015-10-16 00:23:08 Manager #1 ERROR Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> from global queue
>>>> 2015-10-16 00:23:08 Manager #1 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> into local queue
>>>> 2015-10-16 00:23:08 Manager #1 ERROR Waiting for item from global queue
>>>> 2015-10-16 00:23:08 Manager #2 ERROR Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> from global queue
>>>> 2015-10-16 00:23:08 Manager #2 ERROR Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> into local queue
>>>> 2015-10-16 00:23:08 Manager #2 ERROR Waiting for item from global queue
>>>>
>>>> Nothing else gets logged by the manager threads at all, even if I
>>>> continue adding objects to the global queue.
>>>>
>>>> 2015-10-16 00:25:00 worker #1 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:00 worker #3 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:00 worker #4 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:00 worker #2 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:00 MAIN DEBUG Global Queue Size: 572
>>>>
>>>> 2015-10-16 00:25:05 worker #1 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:05 worker #3 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:05 worker #4 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:05 worker #2 DEBUG Local Queue Size: 0
>>>> 2015-10-16 00:25:05 MAIN DEBUG Global Queue Size: 572
>>>>
>>>>> On Oct 15, 2015, at 5:02 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>>>
>>>>> Anyone have any ideas? I feel like this could be a bug with the garbage
>>>>> collector across multiprocessing.
>>>>>
>>>>> From: James DeVincentis [mailto:ad...@hexhost.net]
>>>>> Sent: Wednesday, October 14, 2015 12:41 PM
>>>>> To: 'python-list@python.org'
>>>>> Subject: Problem with copy.deepcopy and multiprocessing.Queue
>>>>>
>>>>> I’ve got a bit of a problem with copy.deepcopy and using
>>>>> multiprocessing.Queue.
>>>>>
>>>>> I have an HTTP API that gets exposed to add objects to a
>>>>> multiprocessing.Queue. Source code here:
>>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>>>>>
>>>>> The trouble is, even using deepcopy, my debugging shows that it keeps
>>>>> re-using the same address space on every iteration of the loop (and/or
>>>>> fork, since it uses the Forking TCP server mixin).
>>>>>
>>>>> This is reflected when the reference to that address space gets removed
>>>>> from the Queue. Source code:
>>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py
>>>>>
>>>>> So I submit a few objects very rapidly to the HTTP server. It places them
>>>>> in the queue. For some reason, on the second iteration copy.deepcopy stops
>>>>> using new address spaces for deepcopied objects. As such, the queue
>>>>> processes that object only once (even though Queue.put() is called
>>>>> repeatedly for the same address space).
>>>>>
>>>>> I’ve tried to ‘del’ the object before reusing it, but it still reallocates
>>>>> to the same address space. I’m at a complete loss.
>>>>>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb2e8>
>>>>> 2015-10-14 17:03:30 THREAD #1-1 WARNING Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdb978>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 THREAD #2-1 WARNING Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdab70>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP WARNING Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>>
>>>>> --
>>>>> https://mail.python.org/mailman/listinfo/python-list