Charles-François Natali added the comment:
> 1. Extends an abstract interface to support of a priority
I'm not sure I see the use case for priority support, do you have a
sample use case?
Furthermore, the executor is backed by a thread pool, so tasks can be
run concurrently.
Finally, the ordering is based on real-clock time - not user-provided
timefunc and delayfunc: so the probability of having identical
scheduled time (priority is only used for ties) is virtually 0.
> and absolute time.
If you want an absolute time, you can simply do:
p.schedule(abstime - time(), fn, args)
I don't see the need to complicate the API.
> 2. Subclass sched.scheduler from this interface and implement missing methods.
There again, I don't see the need.
The goal of ScheduledExecutor is to be consistent with the Executor
interface and futures, not being backward-compatible with the sched
module.
Also, the sched module simply can't support some operations: for
example, it's impossible to have the scheduler.run() method wake up
when a new event with a deadline earlier than the current one is
inserted.
Really, there is no reason to make it compatible or similar to the
sched module: this will - if it gets accepted - effectively deprecate
threading.Timer and the sched module (except that the latter supports
user-provided time and delay functions, I don't know how often those
are used).
----------
Added file: http://bugs.python.org/file30233/scheduled-3.diff
_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue17956>
_______________________________________
diff -r 4b3238923b01 Lib/concurrent/futures/__init__.py
--- a/Lib/concurrent/futures/__init__.py Fri May 10 19:57:44 2013 -0700
+++ b/Lib/concurrent/futures/__init__.py Sun May 12 18:49:56 2013 +0200
@@ -15,4 +15,5 @@
wait,
as_completed)
from concurrent.futures.process import ProcessPoolExecutor
-from concurrent.futures.thread import ThreadPoolExecutor
+from concurrent.futures.thread import (ScheduledThreadPoolExecutor,
+ ThreadPoolExecutor)
diff -r 4b3238923b01 Lib/concurrent/futures/_base.py
--- a/Lib/concurrent/futures/_base.py Fri May 10 19:57:44 2013 -0700
+++ b/Lib/concurrent/futures/_base.py Sun May 12 18:49:56 2013 +0200
@@ -4,6 +4,7 @@
__author__ = 'Brian Quinlan (br...@sweetapp.com)'
import collections
+import functools
import logging
import threading
import time
@@ -499,6 +500,87 @@
self._condition.notify_all()
self._invoke_callbacks()
+
+@functools.total_ordering
+class ScheduledFuture(Future):
+ """A future whose execution can be delayed, one-shot or periodic.
+
+ ScheduledFutures support mutual ordering based on their scheduled execution
+ time, which makes them easy to use in e.g. priority queues.
+ """
+
+ def __init__(self, init_time, period=0, delay=0):
+ """Initializes the future. Should not be called by clients.
+
+ Args:
+ init_time: The initial absolute execution time, in seconds since
+ epoch.
+ period: The execution period, in seconds.
+ delay: The execution delay, in seconds.
+
+ If a period is given, then the task will be scheduled every `period`
+ seconds: each execution will start `period` seconds after the starting
+ time of the previous execution: there will be no drift incurred by the
+ amount of time the task takes to execute.
+ Conversely, if a delay is given, then there will always be `delay`
+ seconds between sequential executions: each execution will start
+ `delay` seconds after the ending time of the previous execution: there
+ will be a drift incurred by the amount of time the task takes to
+ execute.
+ If neither a period nor a delay are specified, then the execution will
+ occur exactly once (one-shot).
+
+ The future's result can only be retrieved for one-shot execution,
+ since there's no meaningful result for periodic execution (for those,
+ calling result()/exception() will block until the future is cancelled).
+ """
+ super().__init__()
+ self._sched_time = init_time
+ if period > 0:
+ # step > 0 => fixed rate
+ self._step = period
+ elif delay > 0:
+ # step < 0 => fixed delay
+ self._step = -delay
+ else:
+ # step == 0 => one-shot
+ self._step = 0
+
+ def is_periodic(self):
+ """Return True if the future execution is periodic."""
+ return self._step != 0
+
+ def get_delay(self):
+ """Return the delay until the next scheduled execution, in seconds."""
+ with self._condition:
+ return self._sched_time - time.time()
+
+ def _get_sched_time(self):
+ with self._condition:
+ return self._sched_time
+
+ def rearm(self):
+ """Re-arm a periodic future, preparing it for its next execution.
+
+ Should only be used by Executor implementations and unit tests.
+ """
+ with self._condition:
+ self._state = PENDING
+ if self._step > 0:
+ self._sched_time += self._step
+ else:
+ self._sched_time = time.time() - self._step
+
+ def __eq__(self, other):
+ return self is other
+
+ def __lt__(self, other):
+ return self._get_sched_time() < other._get_sched_time()
+
+ def __hash__(self):
+ return id(self)
+
+
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
@@ -569,3 +651,48 @@
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
+
+
+class ScheduledExecutor(Executor):
+ """This is an abstract base class for concrete asynchronous scheduled
+ executors."""
+
+ def schedule(self, delay, fn, *args, **kwargs):
+ """Submits a callable to be executed with the given arguments after the
+ given delay.
+
+ Schedules the callable to be executed as fn(*args, **kwargs) and returns
+ a ScheduledFuture instance representing the execution of the callable.
+
+ Returns:
+ A ScheduledFuture representing the given call.
+ """
+ raise NotImplementedError()
+
+ def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+ """Submits a callable to be executed with the given arguments after the
+ given delay, and then repeatedly at the given period. In other words,
+ each execution will start `period` seconds after the starting time of
+ the previous execution.
+
+ Schedules the callable to be executed as fn(*args, **kwargs) and returns
+ a ScheduledFuture instance representing the execution of the callable.
+
+ Returns:
+ A ScheduledFuture representing the given call.
+ """
+ raise NotImplementedError()
+
+ def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+ """Submits a callable to be executed with the given arguments after the
+ given delay, and then repeatedly with the given delay between two
+ consecutive executions. In other words, each execution will start
+ `delay` seconds after the ending time of the previous execution.
+
+ Schedules the callable to be executed as fn(*args, **kwargs) and returns
+ a ScheduledFuture instance representing the execution of the callable.
+
+ Returns:
+ A ScheduledFuture representing the given call.
+ """
+ raise NotImplementedError()
diff -r 4b3238923b01 Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py Fri May 10 19:57:44 2013 -0700
+++ b/Lib/concurrent/futures/thread.py Sun May 12 18:49:56 2013 +0200
@@ -7,8 +7,12 @@
import atexit
from concurrent.futures import _base
+import functools
+import heapq
import queue
+import sys
import threading
+import time
import weakref
# Workers are created as daemon threads. This is done to allow the interpreter
@@ -33,35 +37,120 @@
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
- q.put(None)
+ q.put(q.sentinel)
for t, q in items:
t.join()
atexit.register(_python_exit)
-class _WorkItem(object):
- def __init__(self, future, fn, args, kwargs):
- self.future = future
- self.fn = fn
- self.args = args
- self.kwargs = kwargs
+class _WorkItem(_base.Future):
+
+ def __init__(self, fn, args, kwargs):
+ super().__init__()
+ self._fn = fn
+ self._args = args
+ self._kwargs = kwargs
def run(self):
- if not self.future.set_running_or_notify_cancel():
+ if not self.set_running_or_notify_cancel():
return
try:
- result = self.fn(*self.args, **self.kwargs)
+ result = self._fn(*self._args, **self._kwargs)
except BaseException as e:
- self.future.set_exception(e)
+ self.set_exception(e)
else:
- self.future.set_result(result)
+ self.set_result(result)
+
+
+class _WorkItemQueue(queue.Queue):
+
+ sentinel = None
+
+
+class _ScheduledWorkItem(_base.ScheduledFuture):
+
+ def __init__(self, pool, fn, args, kwargs, init_time, period=0, delay=0):
+ super().__init__(init_time, period, delay)
+ self._fn = fn
+ self._args = args
+ self._kwargs = kwargs
+ self._pool = pool
+
+ def run(self):
+ if not self.set_running_or_notify_cancel():
+ return
+
+ try:
+ result = self._fn(*self._args, **self._kwargs)
+ except BaseException as e:
+ self.set_exception(e)
+ else:
+ if self.is_periodic():
+ # rearm and reschedule ourselves
+ self.rearm()
+ try:
+ self._pool._schedule(self)
+ except RuntimeError:
+ # pool shut down
+ pass
+ else:
+ # we only set the result in case of one-shot
+ self.set_result(result)
+
+
+class _ScheduledWorkItemQueue:
+ """A wrapper around a priority queue, holding _ScheduledWorkItem ordered by
+ their scheduled execution time.
+ """
+
+ # the sentinel deadline must appear later than any other work deadline to
+ # let pending work finish (hence the sys.maxsize execution time), but it
+ # must be returned immediately when it's the head of the queue, see the
+ # get() method below
+ sentinel = _ScheduledWorkItem(None, None, None, None, sys.maxsize)
+
+ def __init__(self):
+ self._queue = []
+ self._available = threading.Condition()
+
+ def put(self, work):
+ with self._available:
+ if self._queue:
+ first = self._queue[0]
+ else:
+ first = None
+
+ heapq.heappush(self._queue, work)
+
+ if first is None or work < first or first.cancelled():
+ self._available.notify_all()
+
+ def get(self, block=True):
+ with self._available:
+ while True:
+ if not self._queue:
+ self._available.wait()
+ else:
+ first = self._queue[0]
+
+ delay = first.get_delay()
+
+ if (delay <= 0 or first is self.sentinel or
+ first.cancelled()):
+ heapq.heappop(self._queue)
+ if self._queue:
+ self._available.notify_all()
+ return first
+ else:
+ self._available.wait(delay)
+
def _worker(executor_reference, work_queue):
try:
while True:
work_item = work_queue.get(block=True)
- if work_item is not None:
+ if work_item is not work_queue.sentinel:
work_item.run()
# Delete references to object. See issue16284
del work_item
@@ -73,7 +162,7 @@
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
- work_queue.put(None)
+ work_queue.put(work_queue.sentinel)
return
del executor
except BaseException:
@@ -88,7 +177,7 @@
execute the given calls.
"""
self._max_workers = max_workers
- self._work_queue = queue.Queue()
+ self._work_queue = _WorkItemQueue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
@@ -98,19 +187,18 @@
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
- f = _base.Future()
- w = _WorkItem(f, fn, args, kwargs)
+ w = _WorkItem(fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
- return f
+ return w
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
- q.put(None)
+ q.put(q.sentinel)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
@@ -125,8 +213,49 @@
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
- self._work_queue.put(None)
+ self._work_queue.put(self._work_queue.sentinel)
if wait:
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+
+class ScheduledThreadPoolExecutor(ThreadPoolExecutor):
+
+ def __init__(self, max_workers):
+ """Initializes a new ScheduledThreadPoolExecutor instance.
+
+ Args:
+ max_workers: The maximum number of threads that can be used to
+ execute the given calls.
+ """
+ super().__init__(max_workers)
+ self._work_queue = _ScheduledWorkItemQueue()
+
+ def submit(self, fn, *args, **kwargs):
+ return self.schedule(0, fn, *args, **kwargs)
+
+ def schedule(self, init_delay, fn, *args, **kwargs):
+ w = _ScheduledWorkItem(self, fn, args, kwargs, time.time() + init_delay)
+ self._schedule(w)
+ return w
+
+ def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+ w = _ScheduledWorkItem(self, fn, args, kwargs, time.time() + init_delay,
+ period=period)
+ self._schedule(w)
+ return w
+
+ def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+ w = _ScheduledWorkItem(self, fn, args, kwargs, time.time() + init_delay,
+ delay=delay)
+ self._schedule(w)
+ return w
+
+ def _schedule(self, w):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ self._work_queue.put(w)
+ self._adjust_thread_count()
diff -r 4b3238923b01 Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py Fri May 10 19:57:44 2013 -0700
+++ b/Lib/test/test_concurrent_futures.py Sun May 12 18:49:56 2013 +0200
@@ -11,6 +11,7 @@
from test.script_helper import assert_python_ok
+from collections import defaultdict
import sys
import threading
import time
@@ -58,6 +59,34 @@
pass
+class RefAccumulator:
+
+ """A helper class to record events."""
+
+ def __init__(self):
+ self._result = defaultdict(list)
+ self._lock = threading.Lock()
+
+ def add(self, key, delay):
+ with self._lock:
+ self._result[key].append(delay)
+
+ def result(self):
+ with self._lock:
+ return self._result
+
+
+class TimedAccumulator(RefAccumulator):
+
+ def __init__(self):
+ super().__init__()
+ self._init_time = time.time()
+
+ def add(self, key):
+ with self._lock:
+ self._result[key].append(time.time() - self._init_time)
+
+
class ExecutorMixin:
worker_count = 5
@@ -85,11 +114,26 @@
for f in futures:
f.result()
+ def assertDelay(self, actual, expected):
+ # The waiting and/or time.time() can be imprecise, which
+ # is why comparing to the expected value would sometimes fail
+ # (especially under Windows).
+ # account for 0 timeout
+ expected = max(expected, 0.01)
+ actual = max(actual, 0.01)
+ self.assertGreaterEqual(actual, expected * 0.6)
+ # Test nothing insane happened
+ self.assertLess(actual, expected * 10.0)
+
class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor
+class ScheduledThreadPoolMixin(ExecutorMixin):
+ executor_type = futures.ScheduledThreadPoolExecutor
+
+
class ProcessPoolMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
@@ -122,10 +166,7 @@
f.result()
-class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
- def _prime_executor(self):
- pass
-
+class BaseThreadPoolShutdownTest(ExecutorShutdownTest):
def test_threads_terminate(self):
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
@@ -153,6 +194,17 @@
for t in threads:
t.join()
+class ThreadPoolShutdownTest(ThreadPoolMixin, BaseThreadPoolShutdownTest):
+ def _prime_executor(self):
+ pass
+
+
+class ScheduledThreadPoolShutdownTest(ScheduledThreadPoolMixin,
+ BaseThreadPoolShutdownTest):
+ def _prime_executor(self):
+ pass
+
+
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
def _prime_executor(self):
@@ -291,7 +343,7 @@
self.assertEqual(set([future2]), pending)
-class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+class BaseThreadPoolWaitTests(WaitTests):
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
@@ -309,6 +361,12 @@
sys.setswitchinterval(oldswitchinterval)
+class ThreadPoolWaitTests(ThreadPoolMixin, BaseThreadPoolWaitTests):
+ pass
+
+class ScheduledThreadPoolWaitTests(ScheduledThreadPoolMixin, BaseThreadPoolWaitTests):
+ pass
+
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
pass
@@ -355,6 +413,10 @@
pass
+class ScheduledThreadPoolAsCompletedTests(ScheduledThreadPoolMixin,
+ AsCompletedTests):
+ pass
+
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
pass
@@ -445,6 +507,143 @@
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
+class ScheduledThreadPoolExecutorTest(ScheduledThreadPoolMixin,
+ unittest.TestCase):
+
+ # some of the tests below create many events, bump the number of workers to
+ # keep up
+ worker_count = 10
+
+ def compare_accumulators(self, actual, expected, strict=True):
+ expected = expected.result()
+ actual = actual.result()
+
+ self.assertEqual(actual.keys(), expected.keys())
+ for key in actual:
+ expected_delays = sorted(expected[key])
+ actual_delays = sorted(actual[key])
+ if strict:
+ # in strict mode, the number of events must match exactly for
+ # each key
+ tolerance = 0
+ else:
+ # otherwise, we tolerate an offset, due to timing and rounding
+ # errors: this is useful for timing-dependent tests
+ tolerance = 1
+
+ self.assertLessEqual(abs(len(actual_delays) - len(expected_delays)),
+ tolerance)
+ for actual_delay, expected_delay in zip(actual_delays,
+ expected_delays):
+ self.assertDelay(actual_delay, expected_delay)
+
+ def test_schedule(self):
+ acc = TimedAccumulator()
+ expected = RefAccumulator()
+ NB = 100
+
+ for i in range(NB):
+ self.executor.schedule(i/NB, acc.add, i)
+ expected.add(i, i/NB)
+
+ self.executor.shutdown(wait=True)
+ self.compare_accumulators(acc, expected)
+
+ def test_schedule_result(self):
+ expected = {}
+ NB = 100
+
+ for i in range(NB):
+ f = self.executor.schedule(i/NB, lambda i: i, i)
+ expected[f] = i
+
+ self.executor.shutdown(wait=True)
+ self.assertEqual(expected, dict((f, f.result()) for f in expected))
+
+ def test_schedule_fixed_rate(self):
+ acc = TimedAccumulator()
+ expected = RefAccumulator()
+ NB = 10
+ RUN_TIME = 2
+
+ for i in range(1, NB+1):
+ self.executor.schedule_fixed_rate(0, RUN_TIME * i/NB, acc.add, i)
+ # let's compute future deadlines during the RUN_TIME period
+ for n in range(int(NB/i) + 2):
+ expected.add(i, n*RUN_TIME*i/NB)
+
+ time.sleep(RUN_TIME)
+
+ self.executor.shutdown(wait=True)
+ self.compare_accumulators(acc, expected, False)
+
+ def test_schedule_fixed_delay(self):
+ acc = TimedAccumulator()
+ expected = RefAccumulator()
+ NB = 10
+ RUN_TIME = 2
+
+ for i in range(1, NB+1):
+ self.executor.schedule_fixed_delay(0, RUN_TIME*i/NB, acc.add, i)
+ # let's compute future deadlines during the RUN_TIME period
+ for n in range(int(NB/i)+2):
+ expected.add(i, n*RUN_TIME*i/NB)
+
+ time.sleep(RUN_TIME)
+
+ self.executor.shutdown(wait=True)
+ self.compare_accumulators(acc, expected, False)
+
+ def test_schedule_fixed_delay_drift(self):
+ acc = TimedAccumulator()
+ expected = RefAccumulator()
+ NB = 10
+ RUN_TIME = 2
+ DRIFT = 0.1
+
+ def delayed_add(i):
+ time.sleep(DRIFT)
+ acc.add(i)
+
+ for i in range(1, NB+1):
+ self.executor.schedule_fixed_delay(0, RUN_TIME*i/NB, delayed_add, i)
+ # let's compute future deadlines during the RUN_TIME period, taking
+ # DRIFT drift into account
+ for nb in range(int(RUN_TIME/(RUN_TIME*i/NB + DRIFT))+2):
+ expected.add(i, nb*(RUN_TIME*i/NB+DRIFT)+DRIFT)
+
+ time.sleep(RUN_TIME)
+
+ self.executor.shutdown(wait=True)
+ self.compare_accumulators(acc, expected, False)
+
+ def test_cancel(self):
+ acc = TimedAccumulator()
+ expected = RefAccumulator()
+ NB = 100
+ futures = []
+ RUN_TIME = 5
+
+ for _ in range(0, NB, 3):
+ f = self.executor.schedule(0.1, acc.add, 1)
+ futures.append(f)
+ expected.add(1, 0.1)
+ f = self.executor.schedule_fixed_rate(0.2, 1000, acc.add, 2)
+ futures.append(f)
+ expected.add(2, 0.2)
+ f = self.executor.schedule_fixed_delay(0.3, 1000, acc.add, 3)
+ futures.append(f)
+ expected.add(3, 0.3)
+
+ time.sleep(RUN_TIME)
+
+ for f in futures:
+ f.cancel()
+
+ self.executor.shutdown(wait=True)
+ self.compare_accumulators(acc, expected, False)
+
+
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
callback_result = None
@@ -674,11 +873,15 @@
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
+ ScheduledThreadPoolWaitTests,
ProcessPoolAsCompletedTests,
ThreadPoolAsCompletedTests,
+ ScheduledThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest,
+ ScheduledThreadPoolShutdownTest,
+ ScheduledThreadPoolExecutorTest,
)
finally:
test.support.reap_children()
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com