Charles-François Natali added the comment:
> By the way, I forgot to mention it previously, but
> multiprocessing.connection uses a custom pickler (ForkingPickler).
Thanks, I didn't know.
Here's a patch using ForkingPickler.
I did a bit of refactoring to move the pickling code from
connection.py to forking.py.
----------
Added file: http://bugs.python.org/file29529/queues_contention-3.diff
_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue17025>
_______________________________________
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@
import io
import os
import sys
-import pickle
import select
import socket
import struct
@@ -202,9 +201,7 @@
"""Send a (picklable) object"""
self._check_closed()
self._check_writable()
- buf = io.BytesIO()
- ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
- self._send_bytes(buf.getbuffer())
+ self._send_bytes(ForkingPickler.dumps(obj))
def recv_bytes(self, maxlength=None):
"""
@@ -249,7 +246,7 @@
self._check_closed()
self._check_readable()
buf = self._recv_bytes()
- return pickle.loads(buf.getbuffer())
+ return ForkingPickler.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -7,7 +7,9 @@
# Licensed to PSF under a Contributor Agreement.
#
+import io
import os
+import pickle
import sys
import signal
@@ -43,6 +45,15 @@
def register(cls, type, reduce):
cls._extra_reducers[type] = reduce
+ @staticmethod
+ def dumps(obj):
+ buf = io.BytesIO()
+ ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+ return buf.getbuffer()
+
+ loads = pickle.loads
+
+
def _reduce_method(m):
if m.__self__ is None:
return getattr, (m.__class__, m.__func__.__name__)
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,7 +22,7 @@
from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
#
# Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@
self._joincancelled = False
self._closed = False
self._close = None
- self._send = self._writer.send
- self._recv = self._reader.recv
+ self._send_bytes = self._writer.send_bytes
+ self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@
def get(self, block=True, timeout=None):
if block and timeout is None:
- self._rlock.acquire()
- try:
- res = self._recv()
- self._sem.release()
- return res
- finally:
- self._rlock.release()
-
+ with self._rlock:
+ res = self._recv_bytes()
+ self._sem.release()
else:
if block:
deadline = time.time() + timeout
@@ -109,11 +104,12 @@
raise Empty
elif not self._poll():
raise Empty
- res = self._recv()
+ res = self._recv_bytes()
self._sem.release()
- return res
finally:
self._rlock.release()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -158,7 +154,7 @@
self._buffer.clear()
self._thread = threading.Thread(
target=Queue._feed,
- args=(self._buffer, self._notempty, self._send,
+ args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
@@ -210,7 +206,7 @@
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -241,14 +237,16 @@
close()
return
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
if wacquire is None:
- send(obj)
+ send_bytes(obj)
# Delete references to object. See issue16284
del obj
else:
wacquire()
try:
- send(obj)
+ send_bytes(obj)
# Delete references to object. See issue16284
del obj
finally:
@@ -344,7 +342,6 @@
self._wlock = None
else:
self._wlock = Lock()
- self._make_methods()
def empty(self):
return not self._poll()
@@ -355,29 +352,19 @@
def __setstate__(self, state):
(self._reader, self._writer, self._rlock, self._wlock) = state
- self._make_methods()
- def _make_methods(self):
- recv = self._reader.recv
- racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
- racquire()
- try:
- return recv()
- finally:
- rrelease()
- self.get = get
+ def get(self):
+ with self._rlock:
+ res = self._reader.recv_bytes()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
+ def put(self, obj):
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
- self.put = self._writer.send
+ self._writer.send_bytes(obj)
else:
- send = self._writer.send
- wacquire, wrelease = self._wlock.acquire, self._wlock.release
- def put(obj):
- wacquire()
- try:
- return send(obj)
- finally:
- wrelease()
- self.put = put
+ with self._wlock:
+ self._writer.send_bytes(obj)
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com