Charles-François Natali added the comment:

> 1. Extends an abstract interface to support of a priority

I'm not sure I see the use case for priority support, do you have a
sample use case?

Furthermore, the executor is backed by a thread pool, so tasks can be
run concurrently.

Finally, the ordering is based on real-clock time - not user-provided
timefunc and delayfunc: so the probability of having identical
scheduled time (priority is only used for ties) is virtually 0.

>  and absolute time.

If you want and absolute time, you can simply do:

p.schedule(abstime - time(), fn, args)

I don't see the need to complicate the API.

> 2. Subclass sched.scheduler from this interface and implement missing methods.

There again, I don't see the need.
The goal of ScheduledExecutor is to be consistent with the Executor
interface and futures, not being backward-compatible with the sched
module.

Also, the sched module simply can't support some operations: for
example, it's impossible to have the schedule.run() method wake up
when a new event with a deadline easiest than the current one is
inserted.

Really, there is now reason to make it compatible or similar to the
sched module: this will - it it gets accepted - effectively deprecate
threading.Timer and the sched module (except that the later supports
user-provided time and delay functions, I don't know how often those
are used).

----------
Added file: http://bugs.python.org/file30233/scheduled-3.diff

_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue17956>
_______________________________________
diff -r 4b3238923b01 Lib/concurrent/futures/__init__.py
--- a/Lib/concurrent/futures/__init__.py        Fri May 10 19:57:44 2013 -0700
+++ b/Lib/concurrent/futures/__init__.py        Sun May 12 18:49:56 2013 +0200
@@ -15,4 +15,5 @@
                                       wait,
                                       as_completed)
 from concurrent.futures.process import ProcessPoolExecutor
-from concurrent.futures.thread import ThreadPoolExecutor
+from concurrent.futures.thread import (ScheduledThreadPoolExecutor,
+                                       ThreadPoolExecutor)
diff -r 4b3238923b01 Lib/concurrent/futures/_base.py
--- a/Lib/concurrent/futures/_base.py   Fri May 10 19:57:44 2013 -0700
+++ b/Lib/concurrent/futures/_base.py   Sun May 12 18:49:56 2013 +0200
@@ -4,6 +4,7 @@
 __author__ = 'Brian Quinlan (br...@sweetapp.com)'
 
 import collections
+import functools
 import logging
 import threading
 import time
@@ -499,6 +500,87 @@
             self._condition.notify_all()
         self._invoke_callbacks()
 
+
+@functools.total_ordering
+class ScheduledFuture(Future):
+    """A future whose execution can be delayed, one-shot or periodic.
+
+    ScheduledFutures support mutual ordering based on their scheduled execution
+    time, which makes them easy to use in e.g. priority queues.
+    """
+
+    def __init__(self, init_time, period=0, delay=0):
+        """Initializes the future. Should not be called by clients.
+
+        Args:
+            init_time: The initial absolute execution time, in seconds since
+                       epoch.
+            period:    The execution period, in seconds.
+            delay:     The execution delay, in seconds.
+
+        If a period is given, then the task will be scheduled every `period`
+        seconds: each execution will start `period` seconds after the starting
+        time of the previous execution: there will be no drift incurred by the
+        amount of time the task takes to execute.
+        Conversely, if a delay is given, then there will always be `delay`
+        seconds between sequential executions: each execution will start
+        `delay` seconds after the ending time of the previous execution: there
+        will be a drift incurred by the amount of time the task takes to
+        execute.
+        If neither a period nor a delay are specified, then the execution will
+        occur exactly once (one-shot).
+
+        The future's result can only be retrieved for one-shot execution,
+        since there's no meaningful result for periodic execution (for those,
+        calling result()/exception() will block until the future is cancelled).
+        """
+        super().__init__()
+        self._sched_time = init_time
+        if period > 0:
+            # step > 0 => fixed rate
+            self._step = period
+        elif delay > 0:
+            # step < 0 => fixed delay
+            self._step = -delay
+        else:
+            # step == 0 => one-shot
+            self._step = 0
+
+    def is_periodic(self):
+        """Return True if the future execution is periodic."""
+        return self._step != 0
+
+    def get_delay(self):
+        """Return the delay until the next scheduled execution, in seconds."""
+        with self._condition:
+            return self._sched_time - time.time()
+
+    def _get_sched_time(self):
+        with self._condition:
+            return self._sched_time
+
+    def rearm(self):
+        """Re-arm a periodic future, preparing it for its next execution.
+
+        Should only be used by Executor implementations and unit tests.
+        """
+        with self._condition:
+            self._state = PENDING
+            if self._step > 0:
+                self._sched_time += self._step
+            else:
+                self._sched_time = time.time() - self._step
+
+    def __eq__(self, other):
+        return self is other
+
+    def __lt__(self, other):
+        return self._get_sched_time() < other._get_sched_time()
+
+    def __hash__(self):
+        return id(self)
+
+
 class Executor(object):
     """This is an abstract base class for concrete asynchronous executors."""
 
@@ -569,3 +651,48 @@
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.shutdown(wait=True)
         return False
+
+
+class ScheduledExecutor(Executor):
+    """This is an abstract base class for concrete asynchronous scheduled
+    executors."""
+
+    def schedule(self, delay, fn, *args, **kwargs):
+        """Submits a callable to be executed with the given arguments after the
+        given delay.
+
+        Schedules the callable to be executed as fn(*args, **kwargs) and 
returns
+        a ScheduledFuture instance representing the execution of the callable.
+
+        Returns:
+            A ScheduledFuture representing the given call.
+        """
+        raise NotImplementedError()
+
+    def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+        """Submits a callable to be executed with the given arguments after the
+        given delay, and then repeatedly at the given period. In other words,
+        each execution will start `period` seconds after the starting time of
+        the previous execution.
+
+        Schedules the callable to be executed as fn(*args, **kwargs) and 
returns
+        a ScheduledFuture instance representing the execution of the callable.
+
+        Returns:
+            A ScheduledFuture representing the given call.
+        """
+        raise NotImplementedError()
+
+    def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+        """Submits a callable to be executed with the given arguments after the
+        given delay, and then repeatedly with the given delay between two
+        consecutive executions. In other words, each execution will start
+        `delay` seconds after the ending time of the previous execution.
+
+        Schedules the callable to be executed as fn(*args, **kwargs) and 
returns
+        a ScheduledFuture instance representing the execution of the callable.
+
+        Returns:
+            A ScheduledFuture representing the given call.
+        """
+        raise NotImplementedError()
diff -r 4b3238923b01 Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py  Fri May 10 19:57:44 2013 -0700
+++ b/Lib/concurrent/futures/thread.py  Sun May 12 18:49:56 2013 +0200
@@ -7,8 +7,12 @@
 
 import atexit
 from concurrent.futures import _base
+import functools
+import heapq
 import queue
+import sys
 import threading
+import time
 import weakref
 
 # Workers are created as daemon threads. This is done to allow the interpreter
@@ -33,35 +37,120 @@
     _shutdown = True
     items = list(_threads_queues.items())
     for t, q in items:
-        q.put(None)
+        q.put(q.sentinel)
     for t, q in items:
         t.join()
 
 atexit.register(_python_exit)
 
-class _WorkItem(object):
-    def __init__(self, future, fn, args, kwargs):
-        self.future = future
-        self.fn = fn
-        self.args = args
-        self.kwargs = kwargs
+class _WorkItem(_base.Future):
+
+    def __init__(self, fn, args, kwargs):
+        super().__init__()
+        self._fn = fn
+        self._args = args
+        self._kwargs = kwargs
 
     def run(self):
-        if not self.future.set_running_or_notify_cancel():
+        if not self.set_running_or_notify_cancel():
             return
 
         try:
-            result = self.fn(*self.args, **self.kwargs)
+            result = self._fn(*self._args, **self._kwargs)
         except BaseException as e:
-            self.future.set_exception(e)
+            self.set_exception(e)
         else:
-            self.future.set_result(result)
+            self.set_result(result)
+
+
+class _WorkItemQueue(queue.Queue):
+
+    sentinel = None
+
+
+class _ScheduledWorkItem(_base.ScheduledFuture):
+
+    def __init__(self, pool, fn, args, kwargs, init_time, period=0, delay=0):
+        super().__init__(init_time, period, delay)
+        self._fn = fn
+        self._args = args
+        self._kwargs = kwargs
+        self._pool = pool
+
+    def run(self):
+        if not self.set_running_or_notify_cancel():
+            return
+
+        try:
+            result = self._fn(*self._args, **self._kwargs)
+        except BaseException as e:
+            self.set_exception(e)
+        else:
+            if self.is_periodic():
+                # rearm and reschedule ourselves
+                self.rearm()
+                try:
+                    self._pool._schedule(self)
+                except RuntimeError:
+                    # pool shut down
+                    pass
+            else:
+                # we only set the result in case of one-shot
+                self.set_result(result)
+
+
+class _ScheduledWorkItemQueue:
+    """A wrapper around a priority queue, holding _ScheduledWorkItem ordered by
+    their scheduled execution time.
+    """
+
+    # the sentinel deadline must appear later than any other work deadline to
+    # let pending work finish (hence the sys.maxsize execution time), but it
+    # must be returned immediately when it's the head of the queue, see the
+    # get() method below
+    sentinel = _ScheduledWorkItem(None, None, None, None, sys.maxsize)
+
+    def __init__(self):
+        self._queue = []
+        self._available = threading.Condition()
+
+    def put(self, work):
+        with self._available:
+            if self._queue:
+                first = self._queue[0]
+            else:
+                first = None
+
+            heapq.heappush(self._queue, work)
+
+            if first is None or work < first or first.cancelled():
+                self._available.notify_all()
+
+    def get(self, block=True):
+        with self._available:
+            while True:
+                if not self._queue:
+                    self._available.wait()
+                else:
+                    first = self._queue[0]
+
+                    delay = first.get_delay()
+
+                    if (delay <= 0 or first is self.sentinel or
+                        first.cancelled()):
+                        heapq.heappop(self._queue)
+                        if self._queue:
+                            self._available.notify_all()
+                        return first
+                    else:
+                        self._available.wait(delay)
+
 
 def _worker(executor_reference, work_queue):
     try:
         while True:
             work_item = work_queue.get(block=True)
-            if work_item is not None:
+            if work_item is not work_queue.sentinel:
                 work_item.run()
                 # Delete references to object. See issue16284
                 del work_item
@@ -73,7 +162,7 @@
             #   - The executor that owns the worker has been shutdown.
             if _shutdown or executor is None or executor._shutdown:
                 # Notice other workers
-                work_queue.put(None)
+                work_queue.put(work_queue.sentinel)
                 return
             del executor
     except BaseException:
@@ -88,7 +177,7 @@
                 execute the given calls.
         """
         self._max_workers = max_workers
-        self._work_queue = queue.Queue()
+        self._work_queue = _WorkItemQueue()
         self._threads = set()
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
@@ -98,19 +187,18 @@
             if self._shutdown:
                 raise RuntimeError('cannot schedule new futures after 
shutdown')
 
-            f = _base.Future()
-            w = _WorkItem(f, fn, args, kwargs)
+            w = _WorkItem(fn, args, kwargs)
 
             self._work_queue.put(w)
             self._adjust_thread_count()
-            return f
+            return w
     submit.__doc__ = _base.Executor.submit.__doc__
 
     def _adjust_thread_count(self):
         # When the executor gets lost, the weakref callback will wake up
         # the worker threads.
         def weakref_cb(_, q=self._work_queue):
-            q.put(None)
+            q.put(q.sentinel)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
         if len(self._threads) < self._max_workers:
@@ -125,8 +213,49 @@
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
-            self._work_queue.put(None)
+            self._work_queue.put(self._work_queue.sentinel)
         if wait:
             for t in self._threads:
                 t.join()
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+
+class ScheduledThreadPoolExecutor(ThreadPoolExecutor):
+
+    def __init__(self, max_workers):
+        """Initializes a new ScheduledThreadPoolExecutor instance.
+
+        Args:
+            max_workers: The maximum number of threads that can be used to
+                         execute the given calls.
+        """
+        super().__init__(max_workers)
+        self._work_queue = _ScheduledWorkItemQueue()
+
+    def submit(self, fn, *args, **kwargs):
+        return self.schedule(0, fn, *args, **kwargs)
+
+    def schedule(self, init_delay, fn, *args, **kwargs):
+        w = _ScheduledWorkItem(self, fn, args, kwargs, time.time() + 
init_delay)
+        self._schedule(w)
+        return w
+
+    def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+        w = _ScheduledWorkItem(self, fn, args, kwargs, time.time() + 
init_delay,
+                               period=period)
+        self._schedule(w)
+        return w
+
+    def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+        w = _ScheduledWorkItem(self, fn, args, kwargs, time.time() + 
init_delay,
+                               delay=delay)
+        self._schedule(w)
+        return w
+
+    def _schedule(self, w):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after 
shutdown')
+
+            self._work_queue.put(w)
+            self._adjust_thread_count()
diff -r 4b3238923b01 Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py       Fri May 10 19:57:44 2013 -0700
+++ b/Lib/test/test_concurrent_futures.py       Sun May 12 18:49:56 2013 +0200
@@ -11,6 +11,7 @@
 
 from test.script_helper import assert_python_ok
 
+from collections import defaultdict
 import sys
 import threading
 import time
@@ -58,6 +59,34 @@
         pass
 
 
+class RefAccumulator:
+
+    """An helper class to record events."""
+
+    def __init__(self):
+        self._result = defaultdict(list)
+        self._lock = threading.Lock()
+
+    def add(self, key, delay):
+        with self._lock:
+            self._result[key].append(delay)
+
+    def result(self):
+        with self._lock:
+            return self._result
+
+
+class TimedAccumulator(RefAccumulator):
+
+    def __init__(self):
+        super().__init__()
+        self._init_time = time.time()
+
+    def add(self, key):
+        with self._lock:
+            self._result[key].append(time.time() - self._init_time)
+
+
 class ExecutorMixin:
     worker_count = 5
 
@@ -85,11 +114,26 @@
         for f in futures:
             f.result()
 
+    def assertDelay(self, actual, expected):
+        # The waiting and/or time.time() can be imprecise, which
+        # is why comparing to the expected value would sometimes fail
+        # (especially under Windows).
+        # account for 0 timeout
+        expected = max(expected, 0.01)
+        actual = max(actual, 0.01)
+        self.assertGreaterEqual(actual, expected * 0.6)
+        # Test nothing insane happened
+        self.assertLess(actual, expected * 10.0)
+
 
 class ThreadPoolMixin(ExecutorMixin):
     executor_type = futures.ThreadPoolExecutor
 
 
+class ScheduledThreadPoolMixin(ExecutorMixin):
+    executor_type = futures.ScheduledThreadPoolExecutor
+
+
 class ProcessPoolMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
 
@@ -122,10 +166,7 @@
             f.result()
 
 
-class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
-    def _prime_executor(self):
-        pass
-
+class BaseThreadPoolShutdownTest(ExecutorShutdownTest):
     def test_threads_terminate(self):
         self.executor.submit(mul, 21, 2)
         self.executor.submit(mul, 6, 7)
@@ -153,6 +194,17 @@
         for t in threads:
             t.join()
 
+class ThreadPoolShutdownTest(ThreadPoolMixin, BaseThreadPoolShutdownTest):
+    def _prime_executor(self):
+        pass
+
+
+class ScheduledThreadPoolShutdownTest(ScheduledThreadPoolMixin,
+                                      BaseThreadPoolShutdownTest):
+    def _prime_executor(self):
+        pass
+
+
 
 class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
     def _prime_executor(self):
@@ -291,7 +343,7 @@
         self.assertEqual(set([future2]), pending)
 
 
-class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+class BaseThreadPoolWaitTests(WaitTests):
 
     def test_pending_calls_race(self):
         # Issue #14406: multi-threaded race condition when waiting on all
@@ -309,6 +361,12 @@
             sys.setswitchinterval(oldswitchinterval)
 
 
+class ThreadPoolWaitTests(ThreadPoolMixin, BaseThreadPoolWaitTests):
+    pass
+
+class ScheduledThreadPoolWaitTests(ScheduledThreadPoolMixin, 
BaseThreadPoolWaitTests):
+    pass
+
 class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
     pass
 
@@ -355,6 +413,10 @@
     pass
 
 
+class ScheduledThreadPoolAsCompletedTests(ScheduledThreadPoolMixin,
+                                          AsCompletedTests):
+    pass
+
 class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
     pass
 
@@ -445,6 +507,143 @@
         self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
 
 
+class ScheduledThreadPoolExecutorTest(ScheduledThreadPoolMixin,
+                                      unittest.TestCase):
+
+    # some of the tests below create many events, bump the number of workers to
+    # keep up
+    worker_count = 10
+
+    def compare_accumulators(self, actual, expected, strict=True):
+        expected = expected.result()
+        actual = actual.result()
+
+        self.assertEqual(actual.keys(), expected.keys())
+        for key in actual:
+            expected_delays = sorted(expected[key])
+            actual_delays = sorted(actual[key])
+            if strict:
+                # in strict mode, the number of events must match exactly for
+                # each key
+                tolerance = 0
+            else:
+                # otherwise, we tolerate an offset, due to timing and rounding
+                # errors: this is useful for timing-dependent tests
+                tolerance = 1
+
+            self.assertLessEqual(abs(len(actual_delays) - 
len(expected_delays)),
+                                 tolerance)
+            for actual_delay, expected_delay in zip(actual_delays,
+                                                    expected_delays):
+                self.assertDelay(actual_delay, expected_delay)
+
+    def test_schedule(self):
+        acc = TimedAccumulator()
+        expected = RefAccumulator()
+        NB = 100
+
+        for i in range(NB):
+            self.executor.schedule(i/NB, acc.add, i)
+            expected.add(i, i/NB)
+
+        self.executor.shutdown(wait=True)
+        self.compare_accumulators(acc, expected)
+
+    def test_schedule_result(self):
+        expected = {}
+        NB = 100
+
+        for i in range(NB):
+            f = self.executor.schedule(i/NB, lambda i: i, i)
+            expected[f] = i
+
+        self.executor.shutdown(wait=True)
+        self.assertEqual(expected, dict((f, f.result()) for f in expected))
+
+    def test_schedule_fixed_rate(self):
+        acc = TimedAccumulator()
+        expected = RefAccumulator()
+        NB = 10
+        RUN_TIME = 2
+
+        for i in range(1, NB+1):
+            self.executor.schedule_fixed_rate(0, RUN_TIME * i/NB, acc.add, i)
+            # let's compute future dealines during the RUN_TIME period
+            for n in range(int(NB/i) + 2):
+                expected.add(i, n*RUN_TIME*i/NB)
+
+        time.sleep(RUN_TIME)
+
+        self.executor.shutdown(wait=True)
+        self.compare_accumulators(acc, expected, False)
+
+    def test_schedule_fixed_delay(self):
+        acc = TimedAccumulator()
+        expected = RefAccumulator()
+        NB = 10
+        RUN_TIME = 2
+
+        for i in range(1, NB+1):
+            self.executor.schedule_fixed_delay(0, RUN_TIME*i/NB, acc.add, i)
+            # let's compute future dealines during the RUN_TIME period
+            for n in range(int(NB/i)+2):
+                expected.add(i, n*RUN_TIME*i/NB)
+
+        time.sleep(RUN_TIME)
+
+        self.executor.shutdown(wait=True)
+        self.compare_accumulators(acc, expected, False)
+
+    def test_schedule_fixed_delay_drift(self):
+        acc = TimedAccumulator()
+        expected = RefAccumulator()
+        NB = 10
+        RUN_TIME = 2
+        DRIFT = 0.1
+
+        def delayed_add(i):
+            time.sleep(DRIFT)
+            acc.add(i)
+
+        for i in range(1, NB+1):
+            self.executor.schedule_fixed_delay(0, RUN_TIME*i/NB, delayed_add, 
i)
+            # let's compute future dealines during the RUN_TIME period, taking
+            # DRIFT drit into account
+            for nb in range(int(RUN_TIME/(RUN_TIME*i/NB + DRIFT))+2):
+                expected.add(i, nb*(RUN_TIME*i/NB+DRIFT)+DRIFT)
+
+        time.sleep(RUN_TIME)
+
+        self.executor.shutdown(wait=True)
+        self.compare_accumulators(acc, expected, False)
+
+    def test_cancel(self):
+        acc = TimedAccumulator()
+        expected = RefAccumulator()
+        NB = 100
+        futures = []
+        RUN_TIME = 5
+
+        for _ in range(0, NB, 3):
+            f = self.executor.schedule(0.1, acc.add, 1)
+            futures.append(f)
+            expected.add(1, 0.1)
+            f = self.executor.schedule_fixed_rate(0.2, 1000, acc.add, 2)
+            futures.append(f)
+            expected.add(2, 0.2)
+            f = self.executor.schedule_fixed_delay(0.3, 1000, acc.add, 3)
+            futures.append(f)
+            expected.add(3, 0.3)
+ 
+        time.sleep(RUN_TIME)
+
+        for f in futures:
+            f.cancel()
+
+        self.executor.shutdown(wait=True)
+        self.compare_accumulators(acc, expected, False)
+
+
 class FutureTests(unittest.TestCase):
     def test_done_callback_with_result(self):
         callback_result = None
@@ -674,11 +873,15 @@
                                   ThreadPoolExecutorTest,
                                   ProcessPoolWaitTests,
                                   ThreadPoolWaitTests,
+                                  ScheduledThreadPoolWaitTests,
                                   ProcessPoolAsCompletedTests,
                                   ThreadPoolAsCompletedTests,
+                                  ScheduledThreadPoolAsCompletedTests,
                                   FutureTests,
                                   ProcessPoolShutdownTest,
                                   ThreadPoolShutdownTest,
+                                  ScheduledThreadPoolShutdownTest,
+                                  ScheduledThreadPoolExecutorTest,
                                   )
     finally:
         test.support.reap_children()
_______________________________________________
Python-bugs-list mailing list
Unsubscribe: 
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to