commit:     4df7a0a0c16c5ded65ad601d39840797b7704770
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Feb 23 21:44:58 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Feb 24 02:35:15 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=4df7a0a0
ForkExecutor: use async_start method Also, fix AsynchronousTask.async_start to handle cancellation of the _async_start coroutine, ensuring that start and exit listeners are notified in this case (otherwise RetryForkExecutorTestCase will hang). Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/_emerge/AsynchronousTask.py | 15 +++++++++++++-- lib/portage/util/futures/executor/fork.py | 21 ++++++++++++++++++--- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/lib/_emerge/AsynchronousTask.py b/lib/_emerge/AsynchronousTask.py index d1e23cdf1..1e9e177cb 100644 --- a/lib/_emerge/AsynchronousTask.py +++ b/lib/_emerge/AsynchronousTask.py @@ -25,8 +25,19 @@ class AsynchronousTask(SlotObject): @coroutine def async_start(self): - yield self._async_start() - self._start_hook() + try: + if self._was_cancelled(): + raise asyncio.CancelledError + yield self._async_start() + if self._was_cancelled(): + raise asyncio.CancelledError + except asyncio.CancelledError: + self.cancel() + self._was_cancelled() + self._async_wait() + raise + finally: + self._start_hook() @coroutine def _async_start(self): diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py index add7b3c9e..3549fdb31 100644 --- a/lib/portage/util/futures/executor/fork.py +++ b/lib/portage/util/futures/executor/fork.py @@ -13,6 +13,7 @@ import traceback from portage.util._async.AsyncFunction import AsyncFunction from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine from portage.util.cpuinfo import get_cpu_count @@ -51,11 +52,25 @@ class ForkExecutor(object): while (not self._shutdown and self._submit_queue and len(self._running_tasks) < self._max_workers): future, proc = self._submit_queue.popleft() - future.add_done_callback(functools.partial(self._cancel_cb, proc)) - proc.addExitListener(functools.partial(self._proc_exit, future)) proc.scheduler = self._loop - proc.start() self._running_tasks[id(proc)] = proc + 
future.add_done_callback(functools.partial(self._cancel_cb, proc)) + proc_future = asyncio.ensure_future(self._proc_coroutine(proc), loop=self._loop) + proc_future.add_done_callback(functools.partial(self._proc_coroutine_done, future, proc)) + + @coroutine + def _proc_coroutine(self, proc): + yield proc.async_start() + yield proc.async_wait() + + def _proc_coroutine_done(self, future, proc, proc_future): + try: + proc_future.result() + except asyncio.CancelledError: + future.done() or future.cancel() + if proc.poll() is None: + proc.cancel() + self._proc_exit(future, proc) def _cancel_cb(self, proc, future): if future.cancelled():