Charles-François Natali added the comment:
Patches for the 2.7 and default branches.
----------
keywords: +patch
Added file: http://bugs.python.org/file39170/mp_map_fail_fast_27.diff
Added file: http://bugs.python.org/file39171/mp_map_fail_fast_default.diff
_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue23992>
_______________________________________
diff -r 5576c8240963 Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py Wed Apr 15 19:30:38 2015 +0100
+++ b/Lib/multiprocessing/pool.py Wed Apr 22 19:34:48 2015 +0100
@@ -599,10 +599,10 @@
self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result):
+ self._number_left -= 1
success, result = success_result
- if success:
+ if success and self._success:
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
- self._number_left -= 1
if self._number_left == 0:
if self._callback:
self._callback(self._value)
@@ -615,15 +615,17 @@
self._cond.release()
else:
- self._success = False
- self._value = result
- del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ if self._success:
+ self._success = False
+ self._value = result
+ if self._number_left == 0:
+ del self._cache[self._job]
+ self._cond.acquire()
+ try:
+ self._ready = True
+ self._cond.notify()
+ finally:
+ self._cond.release()
#
# Class whose instances are returned by `Pool.imap()`
diff -r 5576c8240963 Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py Wed Apr 15 19:30:38 2015 +0100
+++ b/Lib/test/test_multiprocessing.py Wed Apr 22 19:34:48 2015 +0100
@@ -1123,6 +1123,12 @@
time.sleep(wait)
return x*x
+
+def raise_large_valuerror(wait):
+ time.sleep(wait)
+ raise ValueError("x" * 1024**2)
+
+
class SayWhenError(ValueError): pass
def exception_throwing_generator(total, when):
@@ -1262,6 +1268,27 @@
p.close()
p.join()
+ def test_map_no_failfast(self):
+ # Issue #23992: the fail-fast behaviour when an exception is raised
+ # during map() would make Pool.join() deadlock, because a worker
+ # process would fill the result queue after the result handler thread
+ # had terminated and was therefore no longer draining it.
+
+ t_start = time.time()
+
+ with self.assertRaises(ValueError):
+ p = self.Pool(2)
+ try:
+ p.map(raise_large_valuerror, [0, 1])
+ finally:
+ time.sleep(0.5)
+ p.close()
+ p.join()
+
+ # check that we indeed waited for all jobs
+ self.assertGreater(time.time() - t_start, 0.9)
+
+
def unpickleable_result():
return lambda: 42
diff -r 21ae8abc1af3 Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py Mon Apr 13 12:30:53 2015 -0500
+++ b/Lib/multiprocessing/pool.py Wed Apr 22 19:35:31 2015 +0100
@@ -638,22 +638,24 @@
self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result):
+ self._number_left -= 1
success, result = success_result
if success:
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
- self._number_left -= 1
if self._number_left == 0:
if self._callback:
self._callback(self._value)
del self._cache[self._job]
self._event.set()
else:
- self._success = False
- self._value = result
- if self._error_callback:
- self._error_callback(self._value)
- del self._cache[self._job]
- self._event.set()
+ if self._success:
+ self._success = False
+ self._value = result
+ if self._number_left == 0:
+ if self._error_callback:
+ self._error_callback(self._value)
+ del self._cache[self._job]
+ self._event.set()
#
# Class whose instances are returned by `Pool.imap()`
diff -r 21ae8abc1af3 Lib/test/_test_multiprocessing.py
--- a/Lib/test/_test_multiprocessing.py Mon Apr 13 12:30:53 2015 -0500
+++ b/Lib/test/_test_multiprocessing.py Wed Apr 22 19:35:31 2015 +0100
@@ -1660,6 +1660,10 @@
def mul(x, y):
return x*y
+def raise_large_valuerror(wait):
+ time.sleep(wait)
+ raise ValueError("x" * 1024**2)
+
class SayWhenError(ValueError): pass
def exception_throwing_generator(total, when):
@@ -1889,6 +1893,26 @@
with self.assertRaises(RuntimeError):
p.apply(self._test_wrapped_exception)
+ def test_map_no_failfast(self):
+ # Issue #23992: the fail-fast behaviour when an exception is raised
+ # during map() would make Pool.join() deadlock, because a worker
+ # process would fill the result queue after the result handler thread
+ # had terminated and was therefore no longer draining it.
+
+ t_start = time.time()
+
+ with self.assertRaises(ValueError):
+ with self.Pool(2) as p:
+ try:
+ p.map(raise_large_valuerror, [0, 1])
+ finally:
+ time.sleep(0.5)
+ p.close()
+ p.join()
+
+ # check that we indeed waited for all jobs
+ self.assertGreater(time.time() - t_start, 0.9)
+
def raising():
raise KeyError("key")
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com