I see. It looks like I’ll have to call Queue.close().

I didn’t think it would be necessary, since I assumed the queue would be 
garbage collected. Sigh. Bug fixed.
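
For the archives, here is a minimal sketch of the fix in isolation. It is not the cifpy3 code: the function names are made up for illustration, and it assumes a POSIX system where os.fork() is available, to mirror what ForkingMixIn does. Besides Queue.close(), it also calls Queue.join_thread(), which waits for the queue's background feeder thread to finish, so the buffered item reaches the pipe before os._exit() runs:

```python
import multiprocessing
import os


def child_put_and_exit(queue, item):
    """Runs in the forked child: enqueue, flush, then hard-exit."""
    status = 1
    try:
        queue.put(item)      # only appends to a buffer inside this process
        queue.close()        # this process will not put anything else
        queue.join_thread()  # wait until the feeder thread has flushed the buffer
        status = 0
    finally:
        # os._exit() skips finalizers, so the flush has to happen before it.
        os._exit(status)


def demo():
    """Fork a child that puts one item, then read it back in the parent."""
    queue = multiprocessing.Queue()
    pid = os.fork()          # POSIX only
    if pid == 0:
        child_put_and_exit(queue, "hello")
    item = queue.get(timeout=5)  # raises queue.Empty if the item never arrives
    os.waitpid(pid, 0)           # reap the child
    return item


if __name__ == "__main__":
    print(demo())
```

In a ForkingMixIn server the same two calls would presumably go at the end of the request handling code, after the last put() and before the child reaches os._exit().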

Thanks everyone!

> On Oct 18, 2015, at 3:41 AM, James DeVincentis <ad...@hexhost.net> wrote:
>
> I get why it needs to be called, but this looks like a serious annoyance.
>
> Now I need help figuring this out.
>
> socketserver.ForkingMixIn needs to use os._exit() so that the child process 
> never makes it past handling the request. However, if there is a thread 
> running inside that process that manages a multiprocessing.Queue that I’m 
> attached to, how do I ensure its buffer gets flushed before the process dies? 
> The only way I can think of is to somehow force the flush while I’m still 
> inside the HTTP request.
>
> This is kind of a stupid hack to have to put in place, but since Python 
> doesn’t seem to provide a way to exit without raising SystemExit AND still 
> flush the buffers, I’m stuck in a stupid edge case.
>
>> On Oct 18, 2015, at 3:33 AM, James DeVincentis <ad...@hexhost.net> wrote:
>>
>> Seems I found the cause. os._exit() is used in ForkingMixIn for socketserver 
>> and its child classes.
>>
>> Since os._exit() doesn’t flush buffers or clean anything up (and hence 
>> doesn’t run the Finalize hooks that multiprocessing.Queue uses to make sure 
>> data gets flushed), this breaks multiprocessing.Queue.
>>
>> https://hg.python.org/cpython/file/3.5/Lib/socketserver.py#l605
>> https://hg.python.org/cpython/file/3.5/Lib/socketserver.py#l611
>>
>> Sigh. Now the question is: why is it necessary to call os._exit()?
>>
>>> On Oct 17, 2015, at 3:19 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>
>>> So, whatever is causing this is a bit deeper in the multiprocessing.Queue 
>>> class. I tried using a non-blocking multiprocessing.Queue.get() by setting 
>>> the first parameter to False and then catching the queue.Empty exception. 
>>> For some reason, even though there are objects in the queue (as evidenced by 
>>> multiprocessing.Queue.qsize()), the queue.Empty exception is being thrown.
>>>
>>> I dug into the multiprocessing.Queue class. It looks like when 
>>> multiprocessing.Queue.get() is called, multiprocessing.Queue._poll() comes 
>>> up False, causing it to throw the Empty exception. However, according to 
>>> multiprocessing.Queue._sem, there are still locks acquired on it, thus 
>>> giving me a multiprocessing.Queue.qsize() > 0.
>>>
>>> It looks like it is possible for a process (including that process’s local 
>>> multiprocessing.Queue._feed() thread) to die off before 
>>> multiprocessing.Queue._buffer has been fed out by 
>>> multiprocessing.Queue._feed().
>>>
>>> Thoughts from anyone?
>>>
>>>
>>>> On Oct 15, 2015, at 7:37 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>>
>>>> Looking into it, I seem to have found a race condition where 
>>>> multiprocessing.Queue.get() can hang waiting for an object even though 
>>>> there are objects in the Queue. It happens when an object is placed into 
>>>> the queue by a forked process and that process then ends quickly.
>>>>
>>>> I don’t know how to track this down any further or fix it.
>>>>
>>>> 1. Server gets booted. This spawns the following
>>>> - API Server - HTTPServer(socketserver.ForkingMixIn, 
>>>> http.server.HTTPServer) - 
>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/server.py
>>>> - Workers - Process(multiprocessing.Process) - 
>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L93
>>>>
>>>> 2. HTTP Request comes in and is handled by 
>>>> Handler(http.server.BaseHTTPRequestHandler) in this case using do_PUT 
>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L226
>>>> 3. Object is created and placed in the global queue: 
>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>>>> 4. HTTP Request Process dies since request has been handled.
>>>> 5. Worker Process has task manager thread that takes the object from a 
>>>> global queue and places it into a local queue 
>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L64
>>>> 6. one of many (30+) threads on the worker process picks up the object 
>>>> from the local queue and processes it 
>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py#L12
>>>>
>>>> It seems the task management threads are getting hung up waiting for an 
>>>> object from the global queue, even though the global queue has objects in 
>>>> it…
>>>>
>>>> Ignore the ERROR level; these aren’t actually errors. I logged them that 
>>>> way for temporary visibility.
>>>>
>>>> 2015-10-16 00:20:07 Manager #3   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f4384d53278> into local queue
>>>> 2015-10-16 00:20:07 Manager #4   ERROR    Got <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> from global queue
>>>> 2015-10-16 00:20:07 Manager #4   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f43844e40f0> into local queue
>>>> 2015-10-16 00:23:05 Manager #1   ERROR    Waiting for item from global queue
>>>> 2015-10-16 00:23:05 Manager #2   ERROR    Waiting for item from global queue
>>>> 2015-10-16 00:23:05 Manager #3   ERROR    Waiting for item from global queue
>>>> 2015-10-16 00:23:05 Manager #4   ERROR    Waiting for item from global queue
>>>> 2015-10-16 00:23:08 Manager #1   ERROR    Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> from global queue
>>>> 2015-10-16 00:23:08 Manager #1   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fcf080> into local queue
>>>> 2015-10-16 00:23:08 Manager #1   ERROR    Waiting for item from global queue
>>>> 2015-10-16 00:23:08 Manager #2   ERROR    Got <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> from global queue
>>>> 2015-10-16 00:23:08 Manager #2   ERROR    Put <cif.types.observables.fqdn.Fqdn object at 0x7f8892fce4a8> into local queue
>>>> 2015-10-16 00:23:08 Manager #2   ERROR    Waiting for item from global queue
>>>> Nothing else gets logged by the manager threads after this, even if I 
>>>> continue adding objects to the global queue.
>>>>
>>>>
>>>> 2015-10-16 00:25:00 worker #1    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:00 worker #3    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:00 worker #4    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:00 worker #2    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:00 MAIN         DEBUG    Global Queue Size: 572
>>>>
>>>> 2015-10-16 00:25:05 worker #1    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:05 worker #3    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:05 worker #4    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:05 worker #2    DEBUG    Local Queue Size: 0
>>>> 2015-10-16 00:25:05 MAIN         DEBUG    Global Queue Size: 572
>>>>
>>>>> On Oct 15, 2015, at 5:02 PM, James DeVincentis <ad...@hexhost.net> wrote:
>>>>>
>>>>> Anyone have any ideas? I feel like this could be a bug with the garbage 
>>>>> collector across multiprocessing.
>>>>>
>>>>> From: James DeVincentis [mailto:ad...@hexhost.net]
>>>>> Sent: Wednesday, October 14, 2015 12:41 PM
>>>>> To: 'python-list@python.org'
>>>>> Subject: Problem with copy.deepcopy and multiprocessing.Queue
>>>>>
>>>>> I’ve got a bit of a problem with copy.deepcopy and using 
>>>>> multiprocessing.Queue.
>>>>>
>>>>> I have an HTTP API that gets exposed to add objects to a 
>>>>> multiprocessing.Queue. Source code here: 
>>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/api/handler.py#L283
>>>>>
>>>>> The trouble is, even using deepcopy, my debugging shows that it keeps 
>>>>> re-using the same memory address on every iteration of the loop (and/or 
>>>>> fork, since it uses the forking TCP server mixin).
>>>>>
>>>>> This is reflected when the reference to that address gets removed 
>>>>> from the Queue. Source code: 
>>>>> https://github.com/jmdevince/cifpy3/blob/master/lib/cif/worker/worker.py
>>>>>
>>>>> So I submit a few objects very rapidly to the HTTP server. It places them 
>>>>> in the queue. For some reason, on the second iteration copy.deepcopy stops 
>>>>> using new addresses for deepcopied objects. As such, the queue 
>>>>> processes that object only once (even though Queue.put() is called 
>>>>> repeatedly for the same address).
>>>>>
>>>>> I’ve tried using ‘del’ on the object before reusing it, but it still gets 
>>>>> reallocated at the same address. I’m at a complete loss.
>>>>>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb2e8>
>>>>> 2015-10-14 17:03:30 THREAD #1-1  WARNING  Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdb978>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 THREAD #2-1  WARNING  Got Observable with Object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbdab70>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>> 2015-10-14 17:03:30 APIHTTP      WARNING  Storing object: <cif.types.observables.fqdn.Fqdn object at 0x7fb26dbcb0f0>
>>>>>
>>>>>
>>>>> --
>>>>> https://mail.python.org/mailman/listinfo/python-list


