array.array()'s memory shared with multiprocessing.Process()

2017-09-09 Thread iurly
Hi,

I'm writing a multiprocessing program whose behavior I don't understand.
Essentially, the main process collects data and then passes it to a consumer 
process.
For performance reasons I'm using a "static" circular buffer created through 
array.array(), and then passing it "as-is" by pushing it onto a queue.

According to:
https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues

I would expect the array to be pickled by the sending process and then 
unpickled at the other end (i.e. no memory would be shared between the two 
processes).
Thus, in my understanding, overwriting data in the buffer after put() should 
be safe.
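For what it's worth, an explicit pickle round-trip does behave that way and 
yields an independent copy, which is what I assumed queue.put()/queue.get() 
amounts to:

---
import array
import pickle

a = array.array('f', [1.0, 2.0])
b = pickle.loads(pickle.dumps(a))  # what I assumed the queue does internally
print(b == a, b is a)              # True False -> b is an independent copy
---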

What happens instead is that the consumer process may indeed receive a corrupted 
array, in that some elements may already have been overwritten by the producer.
I managed to work around this by passing a copy.copy() of the buffer instead 
(see the sketch below), but I really don't understand why that should be 
necessary at all.

Could someone please shed some light on this?
Thank you!

Here's the example code:

---
import multiprocessing as mp
import array
import time

def consumer_process(queue):
    while True:
        ts_buffer = queue.get()
        #time.sleep(.1)
        for idx in range(1, len(ts_buffer)):
            diff = ts_buffer[idx] - ts_buffer[idx-1]
            if diff < 0:
                print("error: idx = ", idx, " diff =", diff, ":",
                      ts_buffer[idx-1], " -> ", ts_buffer[idx])

queue = mp.Queue(100)
p = mp.Process(name="consumer", target=consumer_process, args=(queue,))
p.daemon = True  # can't use daemon as kwargs when using multiprocessing.dummy
p.start()
samples_dump = 20000

ts_buffer = array.array('f', bytearray(4 * samples_dump))

i = 0
while True:
    for idx in range(len(ts_buffer)):
        ts_buffer[idx] = i
        i += 1
    queue.put(ts_buffer)
    # enable this to make the error go away
    #time.sleep(.1)
---
error: idx =  18372  diff = -1.0 : 38371.0  ->  18372.0
error: idx =  17011  diff = -1.0 : 97010.0  ->  77011.0
error: idx =  15670  diff = -1.0 : 135669.0  ->  115670.0
error: idx =  14914  diff = -1.0 : 154913.0  ->  134914.0
error: idx =  19405  diff = -1.0 : 179404.0  ->  159405.0
error: idx =  17160  diff = -1.0 : 197159.0  ->  177160.0
error: idx =  19130  diff = -1.0 : 219129.0  ->  199130.0
error: idx =  14298  diff = -1.0 : 254297.0  ->  234298.0
error: idx =  9307  diff = -1.0 : 289306.0  ->  269307.0
error: idx =  15815  diff = -1.0 : 315814.0  ->  295815.0
error: idx =  11587  diff = -1.0 : 331586.0  ->  311587.0
---
$ python3 --version
Python 3.5.2


Re: array.array()'s memory shared with multiprocessing.Process()

2017-09-10 Thread iurly
On Sunday, September 10, 2017 at 18:53:33 UTC+2, MRAB wrote:
> On 2017-09-10 12:40, gerlando.fala...@gmail.com wrote:
> >> 
> >> I suspect it's down to timing.
> >> 
> >> What you're putting into the queue is a reference to the array, and it's 
> >> only some time later that the array itself is pickled and then sent (the 
> >> work being done in the 'background').
> >> 
> >> Modifying the array before (or while) it's actually being sent would 
> >> explain the problem you're seeing.
> > 
> > That would also have been my guess. However, according to the documentation:
> > 
> >> When an object is put on a queue, the object is pickled and a background 
> >> thread later flushes the pickled data to an underlying pipe.
> > 
> > In my understanding this means the object is pickled *before* the 
> > background thread takes care of flushing the data to the pipe. Is that a 
> > mistake in the documentation then?
> > 
> > Any suggestion for a way to work around this limitation?
> > Or perhaps a different approach altogether I could use to reduce CPU load?
> > What the main thread actually does is dequeue data from a high-speed 
> > USB-to-serial link (2,000,000 bps); that's why I came up with the 
> > array.array() solution to store the collected data, hoping for the 
> > smallest possible overhead.
> > Thanks!
> > Thanks!
> > 
> I've had a quick look at the source code.
> 
> When an object is put into the queue, it's actually put into an internal 
> buffer (a deque), and then the method returns.
> 
> An internal thread works through the buffer and pickles the objects.
> 
> Therefore, just because the method has returned, it doesn't mean that 
> it's now safe to modify the object.

I see, that explains everything then. However, I wonder whether that's the 
intended behavior, and whether it should be documented somewhere. If I 
understand you correctly, the put() path looks roughly like the sketch below.
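
A heavily simplified paraphrase of what I gather happens inside 
Lib/multiprocessing/queues.py (names and details approximate, not the literal 
source):

---
import collections
import pickle
import threading

class SimplifiedQueue:
    """Rough paraphrase of multiprocessing.Queue's put() path;
    illustrative only, not the real implementation."""

    def __init__(self, writer):
        self._writer = writer                  # e.g. one end of an mp.Pipe()
        self._buffer = collections.deque()     # internal staging buffer
        self._notempty = threading.Condition()
        threading.Thread(target=self._feed, daemon=True).start()

    def put(self, obj):
        # Only a *reference* to obj is stored; nothing is pickled here,
        # and put() returns immediately.
        with self._notempty:
            self._buffer.append(obj)
            self._notempty.notify()

    def _feed(self):
        # Background "feeder" thread: the object is pickled *here*, some
        # time after put() has already returned -- mutating it in the
        # meantime races with this serialization.
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                obj = self._buffer.popleft()
            data = pickle.dumps(obj)           # serialized only now
            self._writer.send_bytes(data)      # then flushed to the pipe
---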
As far as I'm concerned, I'm probably better off using double buffering to 
avoid this kind of issue, along the lines sketched below.
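
Untested sketch of what I have in mind (queue and the buffer size are the ones 
from my example above):

---
import array
import itertools

SAMPLES_DUMP = 20000  # same buffer size as in the example above

# Two fixed buffers; the producer alternates between them, so the buffer
# that may still be sitting in the queue's internal deque is not the one
# currently being overwritten.
buffers = [array.array('f', bytearray(4 * SAMPLES_DUMP)) for _ in range(2)]

i = 0
for ts_buffer in itertools.cycle(buffers):
    for idx in range(len(ts_buffer)):
        ts_buffer[idx] = i
        i += 1
    queue.put(ts_buffer)  # "queue" is the mp.Queue(100) from the example
    # Caveat: this only narrows the race window -- if the feeder thread
    # still hasn't pickled a buffer by the time we cycle back to it, the
    # same corruption can occur. Copying (e.g. queue.put(bytes(ts_buffer)))
    # remains the safe option.
---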
Thanks a lot for your help!