commit:     4df7a0a0c16c5ded65ad601d39840797b7704770
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Feb 23 21:44:58 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Feb 24 02:35:15 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=4df7a0a0

ForkExecutor: use async_start method

Also, fix AsynchronousTask.async_start to handle cancellation of the
_async_start coroutine, ensuring that start and exit listeners are
notified in this case (otherwise RetryForkExecutorTestCase will hang).

Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/_emerge/AsynchronousTask.py           | 15 +++++++++++++--
 lib/portage/util/futures/executor/fork.py | 21 ++++++++++++++++++---
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/lib/_emerge/AsynchronousTask.py b/lib/_emerge/AsynchronousTask.py
index d1e23cdf1..1e9e177cb 100644
--- a/lib/_emerge/AsynchronousTask.py
+++ b/lib/_emerge/AsynchronousTask.py
@@ -25,8 +25,19 @@ class AsynchronousTask(SlotObject):
 
        @coroutine
        def async_start(self):
-               yield self._async_start()
-               self._start_hook()
+               try:
+                       if self._was_cancelled():
+                               raise asyncio.CancelledError
+                       yield self._async_start()
+                       if self._was_cancelled():
+                               raise asyncio.CancelledError
+               except asyncio.CancelledError:
+                       self.cancel()
+                       self._was_cancelled()
+                       self._async_wait()
+                       raise
+               finally:
+                       self._start_hook()
 
        @coroutine
        def _async_start(self):

diff --git a/lib/portage/util/futures/executor/fork.py 
b/lib/portage/util/futures/executor/fork.py
index add7b3c9e..3549fdb31 100644
--- a/lib/portage/util/futures/executor/fork.py
+++ b/lib/portage/util/futures/executor/fork.py
@@ -13,6 +13,7 @@ import traceback
 
 from portage.util._async.AsyncFunction import AsyncFunction
 from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
 from portage.util.cpuinfo import get_cpu_count
 
 
@@ -51,11 +52,25 @@ class ForkExecutor(object):
                while (not self._shutdown and self._submit_queue and
                        len(self._running_tasks) < self._max_workers):
                        future, proc = self._submit_queue.popleft()
-                       
future.add_done_callback(functools.partial(self._cancel_cb, proc))
-                       proc.addExitListener(functools.partial(self._proc_exit, 
future))
                        proc.scheduler = self._loop
-                       proc.start()
                        self._running_tasks[id(proc)] = proc
+                       
future.add_done_callback(functools.partial(self._cancel_cb, proc))
+                       proc_future = 
asyncio.ensure_future(self._proc_coroutine(proc), loop=self._loop)
+                       
proc_future.add_done_callback(functools.partial(self._proc_coroutine_done, 
future, proc))
+
+       @coroutine
+       def _proc_coroutine(self, proc):
+               yield proc.async_start()
+               yield proc.async_wait()
+
+       def _proc_coroutine_done(self, future, proc, proc_future):
+               try:
+                       proc_future.result()
+               except asyncio.CancelledError:
+                       future.done() or future.cancel()
+                       if proc.poll() is None:
+                               proc.cancel()
+               self._proc_exit(future, proc)
 
        def _cancel_cb(self, proc, future):
                if future.cancelled():

Reply via email to