On Jul 12, 12:16 pm, Josiah Carlson <[EMAIL PROTECTED]> wrote: > On Jul 9, 4:13 am, "Giampaolo Rodola'" <[EMAIL PROTECTED]> wrote: > > > > > Hi, > > I'm trying to implement an asynchronous scheduler forasyncoreto call > > functions at a later time without blocking the main loop. > > The logic behind it consists in: > > > - adding the scheduled functions into a heapified list > > - calling a "scheduler" function at every loop which checks the > > scheduled functions due to expire soonest > > > Note that, by using a heap, the first element of the list is always > > supposed to be the one with the lower timeout. > > Here's the code I wrote: > > > <--- snippet ---> > > import heapq > > import time > > import sys > > > delayed_map = [] > > > class delayed_call: > > """Calls a function at a later time. > > > The instance returned is an object that can be used to cancel the > > scheduled call, by calling its cancel() method. > > It also may be rescheduled by calling delay() or reset()} methods. > > """ > > > def __init__(self, delay, target, *args, **kwargs): > > """ > > - delay: the number of seconds to wait > > - target: the callable object to call later > > - args: the arguments to call it with > > - kwargs: the keyword arguments to call it with > > """ > > assert callable(target), "%s is not callable" %target > > assert sys.maxint >= delay >= 0, "%s is not greater than or > > equal " \ > > "to 0 seconds" % (delay) > > self.__delay = delay > > self.__target = target > > self.__args = args > > self.__kwargs = kwargs > > # seconds from the epoch at which to call the function > > self.timeout = time.time() + self.__delay > > self.cancelled = False > > heapq.heappush(delayed_map, self) > > > def __le__(self, other): > > return self.timeout <= other.timeout > > > def active(self): > > """Return True if this scheduler has not been cancelled.""" > > return not self.cancelled > > > def call(self): > > """Call this scheduled function.""" > > self.__target(*self.__args, **self.__kwargs) > > > def reset(self): > > """Reschedule this call resetting the current countdown.""" > > assert not self.cancelled, "Already cancelled" > > self.timeout = time.time() + self.__delay > > if delayed_map[0] is self: > > heapq.heapify(delayed_map) > > > def delay(self, seconds): > > """Reschedule this call for a later time.""" > > assert not self.cancelled, "Already cancelled." > > assert sys.maxint >= seconds >= 0, "%s is not greater than or > > equal " \ > > "to 0 seconds" %(seconds) > > self.__delay = seconds > > self.reset() > > > def cancel(self): > > """Unschedule this call.""" > > assert not self.cancelled, "Already cancelled" > > del self.__target, self.__args, self.__kwargs > > if self in delayed_map: > > if delayed_map[0] is self: > > delayed_map.remove(self) > > heapq.heapify(delayed_map) > > else: > > delayed_map.remove(self) > > self.cancelled = True > > > def fun(arg): > > print arg > > > a = delayed_call(0.6, fun, '0.6') > > b = delayed_call(0.5, fun, '0.5') > > c = delayed_call(0.4, fun, '0.4') > > d = delayed_call(0.3, fun, '0.3') > > e = delayed_call(0.2, fun, '0.2') > > f = delayed_call(0.1, fun, '0.1') > > > while delayed_map: > > now = time.time() > > while delayed_map and now >= delayed_map[0].timeout: > > delayed = heapq.heappop(delayed_map) > > try: > > delayed.call() > > finally: > > if not delayed.cancelled: > > delayed.cancel() > > time.sleep(0.01) > > </--- snippet ---> > > > Here comes the questions. > > Since that the timeouts of the scheduled functions contained in the > > list can change when I reset() or cancel() them I don't know exactly > > *when* the list needs to be heapified(). > > By doing some tests I came to the conclusion that I need the heapify() > > the list only when the function I reset() or cancel() is the *first of > > the list* but I'm not absolutely sure about it. > > When do you think it would be necessary calling heapify()? > > I wrote a short test suite which tests the code above and I didn't > > notice strange behaviors but since that I don't know much about the > > logic behind heaps I'd need some help. > > Thanks a lot in advance. > > > --- Giampaolohttp://code.google.com/p/pyftpdlib/ > > I dug through my old pair heap implementation, did a little hacking on > heapq, and wrote a task scheduler system that plugs in toasyncore. > > To schedule a task, you use: > task =asyncore.schedule_task(schedule, delay, callable, *args, > **kwargs) > > Once you have that task object, you can then > use:asyncore.reschedule_task(schedule, delay, > task)asyncore.abs_reschedule_task(schedule, time, task) > ... to reschedule the task into the future (or past). > > You can also:asyncore.delete_task(schedule, task) > ... to completely remove the task from the scheduler. > > Each one of these operations are O(logn), where n is the number of > tasks currently known to the scheduler. > > To accommodate the new scheduler,asyncore.loop() now has the > following call signature. > def loop(timeout=30.0, use_poll=False, map=None, count=None, > schedule=None, use_schedule=False): > > To try to help prevent poll_fcn() starvation (in the case of long- > running scheduled tasks), the task window execution is set just prior > to the poll_fun() call to be now + .01 seconds. That is, window is > set, poll_fun() is called (which should handle current I/O > operations), and all tasks that are to be completed prior to the end > of the task window (now+.01 seconds, set prior to the poll_fun() > call). > > Asyncoreobjects will not gain a set_scheduler() method, nor will they > gain a schedule keyword argument on instantiation. Why? Because the > scheduler is not required for socket I/O to work properly. If you > want to use the scheduler from your own > subclasses,asyncore.<schedule_fcn>(asyncore.scheduled_tasks, ...) should be > sufficient. > > This scheduler can be easily plugged into other systems, and it's > likely that I'll add it as an interactive scheduler to sched.py, put > the pair heap implementation into collections, and call it good. > > - Josiah
After some personal thought a couple weeks back, I instead added bits of functionality to sched.py to make all of these features possible, then using it from asyncore. I discussed why a couple weeks ago on my blog: http://chouyu-31.livejournal.com/316112.html - Josiah -- http://mail.python.org/mailman/listinfo/python-list