New submission from Edward Catmur:

There is a race condition in multiprocessing.Pool._terminate_pool that can 
result in workers being restarted during shutdown (process shutdown or 
pool.terminate()).

        worker_handler._state = TERMINATE    # <~~~~ race from here
        task_handler._state = TERMINATE

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        debug('joining worker handler')
        worker_handler.join()                # <~~~~~ race to here

At any point between setting worker_handler._state = TERMINATE and joining the 
worker handler, if the intervening code causes a worker to exit, the worker 
handler can fail to notice that it has been shut down and so attempt to restart 
the worker:

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            # <~~~~~~ race here
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        util.debug('worker handler exiting')
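
The check-then-act window above can be modelled in isolation. The following 
is only a toy sketch, not the real Pool code: ToyHandler, maintain, loop_once, 
demo and the two state constants are hypothetical names introduced for 
illustration. Two events force the interleaving, so the outcome is 
deterministic rather than timing-dependent:

```python
import threading

# Hypothetical state constants for this toy model only.
RUN, TERMINATE = "RUN", "TERMINATE"


class ToyHandler:
    """Toy stand-in for the worker handler; not the real Pool code."""

    def __init__(self):
        self.state = RUN
        self.checked = threading.Event()  # set once the state check has passed
        self.resume = threading.Event()   # set by the terminating thread
        self.maintain_calls = []          # state observed at each maintain()

    def maintain(self):
        # Stands in for pool._maintain_pool(); records the state it ran under.
        self.maintain_calls.append(self.state)

    def loop_once(self):
        if self.state == RUN:       # check: handler still believes it is running
            self.checked.set()
            self.resume.wait()      # pause inside the race window
            self.maintain()         # act: the state may have changed by now


def demo():
    handler = ToyHandler()
    thread = threading.Thread(target=handler.loop_once)
    thread.start()
    handler.checked.wait()          # handler has passed its state check
    handler.state = TERMINATE       # "terminate" lands inside the window
    handler.resume.set()
    thread.join()
    return handler.maintain_calls
```

Here demo() returns ['TERMINATE']: the maintenance step ran even though the 
state had already been changed after the check.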

We noticed this initially because, without the fix to #14881, a ThreadPool 
trying to restart a worker fails and hangs the process.  With the fix to #14881 
there is no immediate issue, but restarting a worker process/thread during pool 
shutdown is clearly unwanted and could cause problems, e.g. at process shutdown.

To trigger the race with ThreadPool, it is enough just to pause the 
_handle_workers thread after checking its state and before calling 
_maintain_pool:

import multiprocessing.pool
import time
class ThreadPool(multiprocessing.pool.ThreadPool):
    def _maintain_pool(self):
        time.sleep(1)
        super(ThreadPool, self)._maintain_pool()
    def _repopulate_pool(self):
        assert self._state == multiprocessing.pool.RUN
        super(ThreadPool, self)._repopulate_pool()
pool = ThreadPool(4)
pool.map(lambda x: x, range(5))
pool.terminate()
pool.join()

Exception in thread Thread-5:
Traceback (most recent call last):
  File ".../cpython/Lib/threading.py", line 657, in _bootstrap_inner
    self.run()
  File ".../cpython/Lib/threading.py", line 605, in run
    self._target(*self._args, **self._kwargs)
  File ".../cpython/Lib/multiprocessing/pool.py", line 358, in _handle_workers
    pool._maintain_pool()
  File ".../bug.py", line 6, in _maintain_pool
    super(ThreadPool, self)._maintain_pool()
  File ".../cpython/Lib/multiprocessing/pool.py", line 232, in _maintain_pool
    self._repopulate_pool()
  File ".../bug.py", line 8, in _repopulate_pool
    assert self._state == multiprocessing.pool.RUN
AssertionError

In this case, the race occurs when ThreadPool._help_stuff_finish puts sentinels 
on inqueue to make the workers finish.

It is also possible to trigger the bug with multiprocessing.pool.Pool:

import multiprocessing.pool
import time
class Pool(multiprocessing.pool.Pool):
    def _maintain_pool(self):
        time.sleep(2)
        super(Pool, self)._maintain_pool()
    def _repopulate_pool(self):
        assert self._state == multiprocessing.pool.RUN
        super(Pool, self)._repopulate_pool()
    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        time.sleep(1)
        _real_handle_tasks(taskqueue, put, outqueue, pool)
_real_handle_tasks = multiprocessing.pool.Pool._handle_tasks
multiprocessing.pool.Pool._handle_tasks = Pool._handle_tasks
pool = Pool(4)
pool.map(str, range(10))
pool.map_async(str, range(10))
pool.terminate()
pool.join()

In this case, the race occurs when _handle_tasks checks thread._state, breaks 
out of its first loop, and sends sentinels to the workers.

The terminate/join can be omitted, in which case the bug will occur at gc or 
process shutdown when the pool's atexit handler runs.  The bug is avoided if 
terminate is replaced with close, and we are using this workaround.
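
A minimal sketch of that workaround, assuming a ThreadPool and a hypothetical 
helper name (run_and_shut_down); it shuts the pool down with close() followed 
by join() instead of terminate(). Note that close() lets already-submitted 
work finish before the workers exit, so it is not a drop-in replacement where 
pending tasks must be abandoned:

```python
from multiprocessing.pool import ThreadPool


def run_and_shut_down(n_workers=4):
    """Hypothetical helper: run some work, then shut down without terminate()."""
    pool = ThreadPool(n_workers)
    try:
        results = pool.map(str, range(5))
    finally:
        pool.close()  # stop accepting new tasks; workers exit once idle
        pool.join()   # wait for the workers and handler threads to finish
    return results
```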

----------
components: Library (Lib)
messages: 198432
nosy: ecatmur
priority: normal
severity: normal
status: open
title: multiprocessing.Pool._terminate_pool restarts workers during shutdown
type: behavior
versions: Python 2.7, Python 3.5

_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue19096>
_______________________________________