commit: 86400e9f864e86f8f677ccda9ce4103d6d02ef87
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Mar 21 06:56:55 2017 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Mar 24 20:32:25 2017 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=86400e9f
PollScheduler: terminate via call_soon for asyncio compat
Use call_soon to schedule the _termination_check callback when needed.
The previous idle_add usage was relatively inefficient, because it
scheduled the _termination_check callback to be called in every
iteration of the event loop.
Add a _cleanup method to handle cleanup of callbacks registered with
the global event loop. Since the terminate method is thread safe and it
interacts with self._term_check_handle, use this variable only while
holding a lock.
pym/_emerge/PollScheduler.py | 57 +++++++++++++++++++++++--------
pym/_emerge/Scheduler.py | 7 ++--
pym/portage/util/_async/AsyncScheduler.py | 16 ++++-----
3 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index b118ac157..569879b36 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -25,8 +25,10 @@ class PollScheduler(object):
a non-main thread)
@type main: bool
"""
+ self._term_rlock = threading.RLock()
self._terminated = threading.Event()
self._terminated_tasks = False
+ self._term_check_handle = None
self._max_jobs = 1
self._max_load = None
self._scheduling = False
@@ -44,6 +46,21 @@ class PollScheduler(object):
def _is_background(self):
return self._background
+ def _cleanup(self):
+ """
+ Cleanup any callbacks that have been registered with the global
+ event loop.
+ """
+ # The self._term_check_handle attribute requires locking
+ # since it's modified by the thread safe terminate method.
+ with self._term_rlock:
+ if self._term_check_handle not in (None, False):
+ self._term_check_handle.cancel()
+ # This prevents the terminate method from scheduling
+ # any more callbacks (since _cleanup must eliminate all
+ # callbacks in order to ensure complete cleanup).
+ self._term_check_handle = False
+
def terminate(self):
"""
Schedules asynchronous, graceful termination of the scheduler
@@ -51,26 +68,36 @@ class PollScheduler(object):
This method is thread-safe (and safe for signal handlers).
"""
- self._terminated.set()
+ with self._term_rlock:
+ if self._term_check_handle is None:
+ self._terminated.set()
+ self._term_check_handle = self._event_loop.call_soon_threadsafe(
+ self._termination_check, True)
- def _termination_check(self):
+ def _termination_check(self, retry=False):
"""
Calls _terminate_tasks() if appropriate. It's guaranteed not to
- call it while _schedule_tasks() is being called. The check should
- be executed for each iteration of the event loop, for response to
- termination signals at the earliest opportunity. It always returns
- True, for continuous scheduling via idle_add.
+ call it while _schedule_tasks() is being called. This method must
+ only be called via the event loop thread.
+
+ @param retry: If True then reschedule if scheduling state prevents
+ immediate termination.
+ @type retry: bool
"""
- if not self._scheduling and \
- self._terminated.is_set() and \
+ if self._terminated.is_set() and \
not self._terminated_tasks:
- self._scheduling = True
- try:
- self._terminated_tasks = True
- self._terminate_tasks()
- finally:
- self._scheduling = False
- return True
+ if not self._scheduling:
+ self._scheduling = True
+ try:
+ self._terminated_tasks = True
+ self._terminate_tasks()
+ finally:
+ self._scheduling = False
+
+ elif retry:
+ with self._term_rlock:
+ self._term_check_handle = self._event_loop.call_soon(
+ self._termination_check, True)
def _terminate_tasks(self):
"""
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 71fe75f62..58ff97139 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
else:
signal.signal(signal.SIGCONT,
signal.SIG_DFL)
+ self._termination_check()
if received_signal:
sys.exit(received_signal[0])
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
if isinstance(x, Package) and x.operation ==
"merge"])
self._status_display.maxval = self._pkg_count.maxval
+ # Cleanup any callbacks that have been registered with the global
+ # event loop by calls to the terminate method.
+ self._cleanup()
+
self._logger.log(" *** Finished. Cleaning up...")
if failed_pkgs:
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
blocker_db.discardBlocker(pkg)
def _main_loop(self):
- term_check_id = self._event_loop.idle_add(self._termination_check)
loadavg_check_id = None
if self._max_load is not None and \
self._loadavg_latency is not None and \
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
while self._is_work_scheduled():
self._event_loop.iteration()
finally:
- self._event_loop.source_remove(term_check_id)
if loadavg_check_id is not None:
self._event_loop.source_remove(loadavg_check_id)
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 9b96c6f36..3deb6cb04 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._error_count = 0
self._running_tasks = set()
self._remaining_tasks = True
- self._term_check_id = None
self._loadavg_check_id = None
def _poll(self):
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._schedule()
def _start(self):
- self._term_check_id = self._event_loop.idle_add(self._termination_check)
if self._max_load is not None and \
self._loadavg_latency is not None and \
(self._max_jobs is True or self._max_jobs > 1):
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._loadavg_latency, self._schedule)
self._schedule()
+ def _cleanup(self):
+ super(AsyncScheduler, self)._cleanup()
+ if self._loadavg_check_id is not None:
+ self._event_loop.source_remove(self._loadavg_check_id)
+ self._loadavg_check_id = None
+
def _wait(self):
# Loop while there are jobs to be scheduled.
while self._keep_scheduling():
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
while self._is_work_scheduled():
self._event_loop.iteration()
- if self._term_check_id is not None:
- self._event_loop.source_remove(self._term_check_id)
- self._term_check_id = None
-
- if self._loadavg_check_id is not None:
- self._event_loop.source_remove(self._loadavg_check_id)
- self._loadavg_check_id = None
+ self._cleanup()
if self._error_count > 0:
self.returncode = 1