Serhiy Storchaka added the comment:
> I don't understand what you're gaining with this complicated class: your
> class guarantees that the sleepers will be woken up, but it doesn't
> guarantee that any user code will actually run.
It guarantees that advance() returns only when all sleepers are sleeping or
have died, and all user code before the specified time mark has executed. User
code running does not take time; only sleeps take time. Actually, this class is
a forward-running time machine with a pause. But it looks like overkill here.
> Perhaps the whole thing would be simpler if your tests used a Queue
> instead of a list?
Hmm, this idea had not occurred to me. Indeed, the patch is a little
complicated (honestly speaking, it is very complicated). Here is a simpler
patch that uses a queue for synchronization in one direction and a simple
custom timer for synchronization in the other direction. Thank you for the
suggestion.
----------
Added file: http://bugs.python.org/file28593/test_sched_queue.patch
_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue16843>
_______________________________________
diff -r c1fc6b6d1cfc Lib/test/test_sched.py
--- a/Lib/test/test_sched.py Sat Jan 05 06:26:39 2013 -0800
+++ b/Lib/test/test_sched.py Sun Jan 06 19:48:45 2013 +0200
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+import queue
import sched
import time
import unittest
@@ -9,6 +10,35 @@
except ImportError:
threading = None
+TIMEOUT = 10
+
+
+class Timer:
+ def __init__(self):
+ self._cond = threading.Condition()
+ self._time = 0
+ self._stop = 0
+
+ def time(self):
+ with self._cond:
+ return self._time
+
+ def sleep(self, t):
+ assert t >= 0
+ with self._cond:
+ t += self._time
+ while self._stop < t:
+ self._time = self._stop
+ self._cond.wait()
+ self._time = t
+
+ def advance(self, t):
+ assert t >= 0
+ with self._cond:
+ self._stop += t
+ self._cond.notify_all()
+
+
class TestCase(unittest.TestCase):
def test_enter(self):
@@ -31,17 +61,33 @@
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_enter_concurrent(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- scheduler.enter(0.03, 1, fun, (0.03,))
+ q = queue.Queue()
+ fun = q.put
+ timer = Timer()
+ scheduler = sched.scheduler(timer.time, timer.sleep)
+ scheduler.enter(1, 1, fun, (1,))
+ scheduler.enter(3, 1, fun, (3,))
t = threading.Thread(target=scheduler.run)
t.start()
- for x in [0.05, 0.04, 0.02, 0.01]:
- z = scheduler.enter(x, 1, fun, (x,))
- scheduler.run()
- t.join()
- self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 1)
+ self.assertTrue(q.empty())
+ for x in [4, 5, 2]:
+ z = scheduler.enter(x - 1, 1, fun, (x,))
+ timer.advance(2)
+ self.assertEqual(q.get(timeout=TIMEOUT), 2)
+ self.assertEqual(q.get(timeout=TIMEOUT), 3)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 4)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 5)
+ self.assertTrue(q.empty())
+ timer.advance(1000)
+ t.join(timeout=TIMEOUT)
+ self.assertFalse(t.is_alive())
+ self.assertTrue(q.empty())
def test_priority(self):
l = []
@@ -69,21 +115,35 @@
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_cancel_concurrent(self):
- l = []
- fun = lambda x: l.append(x)
- scheduler = sched.scheduler(time.time, time.sleep)
- now = time.time()
- event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
- event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
- event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
- event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
- event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
+ q = queue.Queue()
+ fun = q.put
+ timer = Timer()
+ scheduler = sched.scheduler(timer.time, timer.sleep)
+ now = timer.time()
+ event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
+ event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
+ event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
+ event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
+ event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
t = threading.Thread(target=scheduler.run)
t.start()
- scheduler.cancel(event1)
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 1)
+ self.assertTrue(q.empty())
+ scheduler.cancel(event2)
scheduler.cancel(event5)
- t.join()
- self.assertEqual(l, [0.02, 0.03, 0.04])
+ timer.advance(1)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 3)
+ self.assertTrue(q.empty())
+ timer.advance(1)
+ self.assertEqual(q.get(timeout=TIMEOUT), 4)
+ self.assertTrue(q.empty())
+ timer.advance(1000)
+ t.join(timeout=TIMEOUT)
+ self.assertFalse(t.is_alive())
+ self.assertTrue(q.empty())
def test_empty(self):
l = []
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com