I need a scheduler which can delay execution of a function for certain period of time. My attempt was something like this:
def delay(self, func, arg, delay_sec=0): fire_at = wallclock() + delay_sec self.queue.put((fire_at, func, arg)) def runner(self): while self.alive: fire_at, func, arg = self.queue.get(block=True) try: now = wallclock() if now < fire_at: time.sleep(fire_at - now) func(arg) except Exception, e: log('DelayedTaskManager %s: %s\n' % (self.name, e)) finally: self.queue.task_done() But then I came up with the following case: 1. I call delay with delay_sec = 10 2. The scheduler goes to sleep for 10 seconds 3. In the meantime (lets say 1 second later) I delay another func but this time with delay_sec=0.5 4. The scheduler is sleeping and won't know call my second function for another 9 seconds insted of 0.5 I started googling for scheduler and found one in standard library but ih has the same code as mine (it calls the functions in the right order and my doesn't, but it still waits too long). The other schedulers from web are dealing with repeating tasks and such. So, I wrote this: # modification of http://code.activestate.com/recipes/87369/ class PriorityMinQueue(Queue): def top(self): try: return self.queue[0] except IndexError: return None def _init(self, maxsize): self.maxsize = maxsize self.queue = [] def _put(self, item): return heappush(self.queue, item) def _get(self): return heappop(self.queue) class DelayedTaskManager: def __init__(self, name): self.name = name self.queue = PriorityMinQueue() # can't use queue.not_empty condition because it isn't # signaled with notifyAll so I have to use my own self.sleeper = threading.Condition() def start(self): log('start delayed task manager %s with %d elements\n' % (self.name, self.queue.qsize())) self.alive = True self.thread = threading.Thread(target=self.runner) self.thread.setDaemon(True) self.thread.start() def stop(self): log('stop delayed task manager %s with %d elements\n' % (self.name, self.queue.qsize())) self.alive = False self._wake() self.thread.join() def delay(self, delay_sec, func, *arg, **kw): # even if delay is 0 or less, put to queue # so the function gets executed concurrently fire_at = wallclock() + delay_sec self.queue.put((fire_at, func, arg, kw)) self._wake() def _wake(self): with self.sleeper: self.sleeper.notify() def _wait(self, timeout): with self.sleeper: self.sleeper.wait(timeout) def runner(self): while self.alive: fire_at, func, arg, kw = self.queue.get(block=True) try: now = wallclock() while now < fire_at: self._wait(fire_at - now) if not self.alive: # canceled log('delayed task manager %s was stoped\n', self.name) return self.queue.put((fire_at, func, arg, kw)) top = self.queue.top() if top is not None and top[0] < fire_at: # temporally closer item, put back the old one self.queue.put((fire_at, func, arg, kw)) self.queue.task_done() fire_at, func, arg, kw = self.queue.get() now = wallclock() func(*arg, **kw) except Exception, e: log('delayed task manager %s: %s\n', self.name, e) finally: self.queue.task_done() Is there a better way or some library that does that? My observations: 1. Threading module uses time.sleep instead of time.clock which results in less precise results (on windows platform) if sys.platform=="win32": #take care of differences in clock accuracy wallclock = time.clock else: wallclock = time.time 2. while analyzing threading module i noticed that wait() is implemented via loop and tiny sleep periods. I was expecting the usage of underlaying OS primitives and functions but then I remembered about GIL and quasi-multithreaded nature of Python. But still, isn't there a more precise method that interpreter itself could implement? Thanks, Tvrtko P.S. This was Python 2.5 -- http://mail.python.org/mailman/listinfo/python-list