Charles-François Natali added the comment:

I'm splitting the patches:
- one which adds loads and dumps to ForkingPicler
- the contention reduction patch

I'd like to commit them soon.

----------
Added file: http://bugs.python.org/file29559/queues_contention.diff
Added file: http://bugs.python.org/file29560/forkingpickler.diff

_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue17025>
_______________________________________
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,7 +22,7 @@
 from multiprocessing.connection import Pipe
 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, 
Condition
 from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@
         self._joincancelled = False
         self._closed = False
         self._close = None
-        self._send = self._writer.send
-        self._recv = self._reader.recv
+        self._send_bytes = self._writer.send_bytes
+        self._recv_bytes = self._reader.recv_bytes
         self._poll = self._reader.poll
 
     def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@
 
     def get(self, block=True, timeout=None):
         if block and timeout is None:
-            self._rlock.acquire()
-            try:
-                res = self._recv()
-                self._sem.release()
-                return res
-            finally:
-                self._rlock.release()
-
+            with self._rlock:
+                res = self._recv_bytes()
+            self._sem.release()
         else:
             if block:
                 deadline = time.time() + timeout
@@ -109,11 +104,12 @@
                         raise Empty
                 elif not self._poll():
                     raise Empty
-                res = self._recv()
+                res = self._recv_bytes()
                 self._sem.release()
-                return res
             finally:
                 self._rlock.release()
+        # unserialize the data after having released the lock
+        return ForkingPickler.loads(res)
 
     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken 
sem_getvalue()
@@ -158,7 +154,7 @@
         self._buffer.clear()
         self._thread = threading.Thread(
             target=Queue._feed,
-            args=(self._buffer, self._notempty, self._send,
+            args=(self._buffer, self._notempty, self._send_bytes,
                   self._wlock, self._writer.close, self._ignore_epipe),
             name='QueueFeederThread'
             )
@@ -210,7 +206,7 @@
             notempty.release()
 
     @staticmethod
-    def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
         from .util import is_exiting
 
@@ -241,16 +237,14 @@
                             close()
                             return
 
+                        # serialize the data before acquiring the lock
+                        obj = ForkingPickler.dumps(obj)
                         if wacquire is None:
-                            send(obj)
-                            # Delete references to object. See issue16284
-                            del obj
+                            send_bytes(obj)
                         else:
                             wacquire()
                             try:
-                                send(obj)
-                                # Delete references to object. See issue16284
-                                del obj
+                                send_bytes(obj)
                             finally:
                                 wrelease()
                 except IndexError:
@@ -344,7 +338,6 @@
             self._wlock = None
         else:
             self._wlock = Lock()
-        self._make_methods()
 
     def empty(self):
         return not self._poll()
@@ -355,29 +348,19 @@
 
     def __setstate__(self, state):
         (self._reader, self._writer, self._rlock, self._wlock) = state
-        self._make_methods()
 
-    def _make_methods(self):
-        recv = self._reader.recv
-        racquire, rrelease = self._rlock.acquire, self._rlock.release
-        def get():
-            racquire()
-            try:
-                return recv()
-            finally:
-                rrelease()
-        self.get = get
+    def get(self):
+        with self._rlock:
+            res = self._reader.recv_bytes()
+        # unserialize the data after having released the lock
+        return ForkingPickler.loads(res)
 
+    def put(self, obj):
+        # serialize the data before acquiring the lock
+        obj = ForkingPickler.dumps(obj)
         if self._wlock is None:
             # writes to a message oriented win32 pipe are atomic
-            self.put = self._writer.send
+            self._writer.send_bytes(obj)
         else:
-            send = self._writer.send
-            wacquire, wrelease = self._wlock.acquire, self._wlock.release
-            def put(obj):
-                wacquire()
-                try:
-                    return send(obj)
-                finally:
-                    wrelease()
-            self.put = put
+            with self._wlock:
+                self._writer.send_bytes(obj)
diff --git a/Lib/multiprocessing/connection.py 
b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@
 import io
 import os
 import sys
-import pickle
 import select
 import socket
 import struct
@@ -202,9 +201,7 @@
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        buf = io.BytesIO()
-        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
-        self._send_bytes(buf.getbuffer())
+        self._send_bytes(ForkingPickler.dumps(obj))
 
     def recv_bytes(self, maxlength=None):
         """
@@ -249,7 +246,7 @@
         self._check_closed()
         self._check_readable()
         buf = self._recv_bytes()
-        return pickle.loads(buf.getbuffer())
+        return ForkingPickler.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -7,7 +7,9 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
+import io
 import os
+import pickle
 import sys
 import signal
 import errno
@@ -44,6 +46,15 @@
     def register(cls, type, reduce):
         cls._extra_reducers[type] = reduce
 
+    @staticmethod
+    def dumps(obj):
+        buf = io.BytesIO()
+        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+        return buf.getbuffer()
+
+    loads = pickle.loads
+
+
 def _reduce_method(m):
     if m.__self__ is None:
         return getattr, (m.__class__, m.__func__.__name__)
_______________________________________________
Python-bugs-list mailing list
Unsubscribe: 
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to