First of all, sorry for my bad English. I will try to explain an idea about Deferreds and inlineCallbacks.
# Wanted features:
# 1. Ability to delete callbacks (delCallbacks)
# 2. Automatic cancelling of Deferreds when they are not needed any more
#    (no callbacks registered any more, all were deleted)
# 3. Ability to add/del hooks on a deferred's finishing with errback or
#    callback (addFinalizer/delFinalizer)
# 4. Automatic calling of the registered finalizers when a deferred finishes
#    with errback or callback (and on cancel, because cancel calls errback)
#
# With these features we can make cascading cancelling of inlineCallbacks and
# cancel the full stack tree of inlineCallbacks when it is not needed any more
# (some shutdown occurs, for example, at top level).
#
# See full code for details (this is a runnable script):

from twisted.internet import defer
from twisted.python.failure import Failure
from sys import exc_info
import warnings


class InlineCallbacksManager(object):
    """
    Drive one generator the way C{inlineCallbacks} does, with overridable hooks.

    Every step (sending a value, throwing a failure, finishing) is a separate
    method so that subclasses can customise it.  C{self.deferred} is the
    Deferred handed back to the caller of the decorated function.
    """

    def __init__(self, *args, **kw):
        # Receives the decorated function's own arguments; unused here.
        self.deferred = defer.Deferred()

    def send_result(self, g, result):
        """Resume generator C{g} with C{result}; return what it yields next."""
        return g.send(result)

    def throw_exception(self, g, result):
        """Throw the Failure C{result} into C{g}; return what it yields next."""
        return result.throwExceptionIntoGenerator(g)

    def stop_iteration(self):
        """The generator ended without returnValue(): fire with C{None}."""
        self.deferred.callback(None)
        return self.deferred

    def return_value(self, value):
        """returnValue(value) was called: fire the deferred with C{value}."""
        self.deferred.callback(value)
        return self.deferred

    def exception(self):
        """The generator raised: errback with the exception being handled."""
        self.deferred.errback()
        return self.deferred

    def _inlineCallbacks(self, result, g):
        """
        See L{inlineCallbacks}.

        @param result: value (or L{Failure}) to deliver to the generator first.
        @param g: the generator being driven.
        @return: C{self.deferred}.
        """
        # This function is complicated by the need to prevent unbounded recursion
        # arising from repeatedly yielding immediately ready deferreds.  This while
        # loop and the waiting variable solve that by manually unfolding the
        # recursion.

        waiting = [True,  # waiting for result?
                   None]  # result

        while 1:
            try:
                # Send the last result back as the result of the yield expression.
                isFailure = isinstance(result, Failure)
                if isFailure:
                    result = self.throw_exception(g, result)
                else:
                    result = self.send_result(g, result)
            except StopIteration:
                # fell off the end, or "return" statement
                return self.stop_iteration()
            except defer._DefGen_Return as e:
                # ("as" form: valid on Python 2.6+ and Python 3.)
                # returnValue() was called; time to give a result to the original
                # Deferred.  First though, let's try to identify the potentially
                # confusing situation which results when returnValue() is
                # accidentally invoked from a different function, one that wasn't
                # decorated with @inlineCallbacks.

                # The traceback starts in this frame (the one for
                # _inlineCallbacks); the next one down should be the application
                # code.
                appCodeTrace = exc_info()[2].tb_next.tb_next
                if isFailure:
                    # If we invoked this generator frame by throwing an exception
                    # into it, then throwExceptionIntoGenerator will consume an
                    # additional stack frame itself, so we need to skip that too.
                    appCodeTrace = appCodeTrace.tb_next
                # Now that we've identified the frame being exited by the
                # exception, let's figure out if returnValue was called from it
                # directly.  returnValue itself consumes a stack frame, so the
                # application code will have a tb_next, but it will *not* have a
                # second tb_next.
                if appCodeTrace.tb_next.tb_next:
                    # If returnValue was invoked non-local to the frame which it is
                    # exiting, identify the frame that ultimately invoked
                    # returnValue so that we can warn the user, as this behavior is
                    # confusing.
                    ultimateTrace = appCodeTrace
                    while ultimateTrace.tb_next.tb_next:
                        ultimateTrace = ultimateTrace.tb_next
                    filename = ultimateTrace.tb_frame.f_code.co_filename
                    lineno = ultimateTrace.tb_lineno
                    warnings.warn_explicit(
                        "returnValue() in %r causing %r to exit: "
                        "returnValue should only be invoked by functions decorated "
                        "with inlineCallbacks" % (
                            ultimateTrace.tb_frame.f_code.co_name,
                            appCodeTrace.tb_frame.f_code.co_name),
                        DeprecationWarning, filename, lineno)
                return self.return_value(e.value)
            except:
                # Deliberately bare: whatever the generator raised becomes the
                # failure of self.deferred.
                return self.exception()

            if isinstance(result, tuple):
                # yield tuple support!!!
                # Wrap plain values so the tuple can be awaited as a whole; a
                # tuple that contains no Deferred at all is left as it is.
                non_deferreds_cnt = 0
                list_of_deferreds = []
                for r in result:
                    if not isinstance(r, defer.Deferred):
                        r = defer.succeed(r)
                        non_deferreds_cnt += 1
                    list_of_deferreds.append(r)
                if non_deferreds_cnt != len(result):
                    result = defer.DeferredList(list_of_deferreds, fireOnOneErrback=1)

            if isinstance(result, defer.Deferred):
                # a deferred was yielded, get the result.
                if isinstance(result, defer.DeferredList):
                    # yield tuple support!!!
                    def gotResult(r):
                        if isinstance(r, Failure):
                            # Unwrap to the failure of the sub-deferred.
                            r = r.value.subFailure
                        else:
                            # Drop the success flags, keep only the values.
                            r = tuple(_r for _s, _r in r)
                        if waiting[0]:
                            waiting[0] = False
                            waiting[1] = r
                        else:
                            # cascading cancelling support!!!
                            self.deferred.delFinalizer(result.delBoth, gotResult)
                            self._inlineCallbacks(r, g)
                else:
                    def gotResult(r):
                        if waiting[0]:
                            waiting[0] = False
                            waiting[1] = r
                        else:
                            # cascading cancelling support!!!
                            self.deferred.delFinalizer(result.delBoth, gotResult)
                            self._inlineCallbacks(r, g)

                result.addBoth(gotResult)
                if waiting[0]:
                    # Haven't called back yet, set flag so that we get reinvoked
                    # and return from the loop
                    waiting[0] = False
                    # cascading cancelling support!!!  If self.deferred finishes
                    # first, detach gotResult from the yielded deferred.
                    self.deferred.addFinalizer(result.delBoth, gotResult)
                    return self.deferred

                result = waiting[1]
                # Reset waiting to initial values for next loop.  gotResult uses
                # waiting, but this isn't a problem because gotResult is only
                # executed once, and if it hasn't been executed yet, the return
                # branch above would have been taken.
                waiting[0] = True
                waiting[1] = None


from twisted.python.util import mergeFunctionMetadata


def create_inline_callbacks_decorator(manager_factory):
    """
    Build an inlineCallbacks-style decorator.

    @param manager_factory: callable invoked with the decorated function's own
        arguments on every call; it must return an object that provides
        C{_inlineCallbacks(result, generator)}.
    @return: a decorator for generator functions.
    """
    def inline_callbacks(f):
        def unwind_generator(*args, **kwargs):
            manager = manager_factory(*args, **kwargs)
            return manager._inlineCallbacks(None, f(*args, **kwargs))
        return mergeFunctionMetadata(f, unwind_generator)
    return inline_callbacks


inlineCallbacks = create_inline_callbacks_decorator(InlineCallbacksManager)
defer.inlineCallbacks = inlineCallbacks


#
# Deferred
#

# Fixes:
# 1. raise CancelledError with current traceback (original twisted code raises
#    with empty traceback)
# 2. ability to cancel with given traceback (parameter `failure`, if callable
#    must return failure)
def deferred_cancel(self, failure=None):
    """
    Replacement for C{Deferred.cancel}.

    @param failure: optional failure to errback with instead of a
        C{CancelledError}; when it is callable it is called with no arguments
        and its return value is used.
    """
    if not self.called:
        canceller = self._canceller
        if canceller:
            canceller(self)
        else:
            # Arrange to eat the callback that will eventually be fired
            # since there was no real canceller.
            self._suppressAlreadyCalled = 1
        if not self.called:
            # There was no canceller, or the canceller didn't call
            # callback or errback.
            if failure is not None:
                if callable(failure):
                    failure = failure()
                self.errback(failure)
            else:
                # Raise and catch so that Failure() captures a traceback.
                try:
                    raise defer.CancelledError()
                except defer.CancelledError:
                    self.errback(Failure())
    elif isinstance(self.result, defer.Deferred):
        # Waiting for another deferred -- cancel it instead.
        self.result.cancel()
defer.Deferred.cancel = deferred_cancel


# Fixes: add `finalizers` member
original_deferred___init__ = defer.Deferred.__init__
def deferred___init__(self, *args, **kw):
    """Run the original initialiser, then start an empty finalizer list."""
    original_deferred___init__(self, *args, **kw)
    self.finalizers = []
defer.Deferred.__init__ = deferred___init__


def deferred_addFinalizer(self, callback, *args, **kw):
    """
    Register C{callback(*args, **kw)} to be run once when this deferred finishes.

    @return: C{self}.
    """
    assert callable(callback)
    self.finalizers.append((callback, args, kw))
    if self.called:
        # NOTE(review): _runFinalizers() does nothing once it has already run,
        # so a finalizer added after that point is never invoked -- confirm
        # whether that is intended.
        self._runFinalizers()
    return self
defer.Deferred.addFinalizer = deferred_addFinalizer


def deferred_delFinalizer(self, callback, *args, **kw):
    """
    Remove a finalizer registered with the same callback and arguments.

    @raise defer.AlreadyCalledError: if this deferred has already been called.
    @raise ValueError: if no matching finalizer is registered.
    @return: C{self}.
    """
    if self.called:
        # The original named the exception without raising it, so this guard
        # never took effect.
        raise defer.AlreadyCalledError
    assert callable(callback)
    self.finalizers.remove((callback, args, kw))
    return self
defer.Deferred.delFinalizer = deferred_delFinalizer


def deferred__runFinalizers(self):
    """Run every registered finalizer; later invocations do nothing."""
    if not self._finalized:
        self._finalized = True
        for callback, args, kw in self.finalizers:
            callback(*args, **kw)
defer.Deferred._runFinalizers = deferred__runFinalizers
defer.Deferred._finalized = False


# Fixes: run `finalizers` when done
original_deferred_callback = defer.Deferred.callback
def deferred_callback(self, result):
    """Fire the deferred as usual, then run its finalizers."""
    original_deferred_callback(self, result)
    self._runFinalizers()
defer.Deferred.callback = deferred_callback


# Fixes: run `finalizers` when done
original_deferred_errback = defer.Deferred.errback
def deferred_errback(self, fail=None):
    """Errback the deferred as usual, then run its finalizers."""
    original_deferred_errback(self, fail=fail)
    self._runFinalizers()
defer.Deferred.errback = deferred_errback


def _skip_result(result):
    """Callback/errback that discards whatever result it is given."""
    pass


def deferred_delCallbacks(self, callback, errback=None,
                          callbackArgs=None, callbackKeywords=None,
                          errbackArgs=None, errbackKeywords=None):
    """
    Remove a callback/errback pair; the arguments mirror C{addCallbacks}.

    When no callbacks remain afterwards the deferred is cancelled, because
    nobody is interested in its result any more.

    @raise defer.AlreadyCalledError: if this deferred has already been called.
    @raise ValueError: if the pair is not registered.
    @return: C{self}.
    """
    if self.called:
        # The original named the exception without raising it.
        raise defer.AlreadyCalledError
    assert callable(callback)
    assert errback is None or callable(errback)
    cbs = ((callback, callbackArgs, callbackKeywords),
           (errback or (defer.passthru), errbackArgs, errbackKeywords))
    self.callbacks.remove(cbs)
    if not self.callbacks:
        # Added before cancelling so that the failure produced by cancel() is
        # consumed (presumably to avoid an unhandled-error report).
        self.addBoth(_skip_result)
        self.cancel()
    return self
defer.Deferred.delCallbacks = deferred_delCallbacks


def deferred_delCallback(self, callback, *args, **kw):
    """Counterpart of C{addCallback}."""
    return self.delCallbacks(callback, callbackArgs=args, callbackKeywords=kw)
defer.Deferred.delCallback = deferred_delCallback


def deferred_delErrback(self, errback, *args, **kw):
    """Counterpart of C{addErrback}."""
    return self.delCallbacks(defer.passthru, errback, errbackArgs=args, errbackKeywords=kw)
defer.Deferred.delErrback = deferred_delErrback


def deferred_delBoth(self, callback, *args, **kw):
    """Counterpart of C{addBoth}."""
    return self.delCallbacks(callback, callback,
                             callbackArgs=args, errbackArgs=args,
                             callbackKeywords=kw, errbackKeywords=kw)
defer.Deferred.delBoth = deferred_delBoth


def deferred_unchainDeferred(self, d):
    """Counterpart of C{chainDeferred}."""
    return self.delCallbacks(d.callback, d.errback)
defer.Deferred.unchainDeferred = deferred_unchainDeferred


# The *Safe variants do nothing (and return self) when the deferred has
# already been called.

def deferred_delCallbacksSafe(self, *args, **kw):
    """C{delCallbacks}, skipped when the deferred has already been called."""
    if not self.called:
        return self.delCallbacks(*args, **kw)
    return self
defer.Deferred.delCallbacksSafe = deferred_delCallbacksSafe


def deferred_delCallbackSafe(self, *args, **kw):
    """C{delCallback}, skipped when the deferred has already been called."""
    if not self.called:
        return self.delCallback(*args, **kw)
    return self
defer.Deferred.delCallbackSafe = deferred_delCallbackSafe


def deferred_delErrbackSafe(self, *args, **kw):
    """C{delErrback}, skipped when the deferred has already been called."""
    if not self.called:
        return self.delErrback(*args, **kw)
    return self
defer.Deferred.delErrbackSafe = deferred_delErrbackSafe


def deferred_delBothSafe(self, *args, **kw):
    """C{delBoth}, skipped when the deferred has already been called."""
    if not self.called:
        return self.delBoth(*args, **kw)
    return self
defer.Deferred.delBothSafe = deferred_delBothSafe


def deferred_unchainDeferredSafe(self, *args, **kw):
    """C{unchainDeferred}, skipped when the deferred has already been called."""
    if not self.called:
        return self.unchainDeferred(*args, **kw)
    return self
defer.Deferred.unchainDeferredSafe = deferred_unchainDeferredSafe


#
# DeferredList
#

# fixes: delCallbacks when finished
original_deferred_list___init__ = defer.DeferredList.__init__
def deferred_list___init__(self, deferredList, *args, **kw):
    """
    Initialise the list as usual, then register one finalizer per member that
    removes this list's C{_cbDeferred} callbacks from that member once the
    list has finished.
    """
    original_deferred_list___init__(self, deferredList, *args, **kw)
    for index, deferred in enumerate(deferredList):
        self.addFinalizer(
            deferred.delCallbacksSafe,
            self._cbDeferred,
            self._cbDeferred,
            callbackArgs=(index, defer.SUCCESS),
            errbackArgs=(index, defer.FAILURE),
        )
defer.DeferredList.__init__ = deferred_list___init__


#
# Test
#

if __name__ == '__main__':

    from twisted.internet import reactor
    import threading
    import time

    def log(s):
        """Print C{s} prefixed with the current wall-clock time."""
        # Single formatted argument: same output under Python 2 and Python 3.
        print('%s %s' % (time.strftime('%H:%M:%S'), s))

    # some long deferred work
    def deferred_work():
        """Start a 7-second job in a thread; return a cancellable Deferred."""
        cancel_flag = [False]

        def my_canceller(d):
            cancel_flag[0] = True

        res = defer.Deferred(canceller=my_canceller)

        def work():
            log('start work')
            cnt = 7
            while cnt:
                cnt -= 1
                time.sleep(1)
                if cancel_flag[0]:
                    log('work cancelled')
                    return
            log('finish work, work not cancelled!!!')
            # Deferreds are not thread-safe: fire it from the reactor thread.
            reactor.callFromThread(res.callback, 7)

        threading.Thread(target=work).start()
        return res

    #
    # Shuttingdown test
    # notes: register all inlineCallbacks deferreds and cancel them all when
    # shuttingdown occurs
    #

    class ShuttingdownError(BaseException):
        pass

    # base class for shuttingdown support
    class ShuttingdownSupport(object):

        shuttingdown = 0

        def __init__(self):
            # Used as a set of the deferreds that have not finished yet.
            self._deferreds = {}

        def shutdown(self):
            self.shuttingdown = 1
            self.do_shutdown()

        def do_shutdown(self):
            """Errback every still-registered deferred with ShuttingdownError."""
            log('SHUTDOWN')
            # Iterate over a snapshot: finalizers unregister deferreds while
            # the loop is running.
            for deferred in list(self._deferreds):
                if deferred not in self._deferreds:
                    # Already finished as a side effect of an earlier errback
                    # in this loop; calling errback() again could raise
                    # AlreadyCalledError.
                    continue
                try:
                    raise ShuttingdownError
                except ShuttingdownError:
                    deferred.errback()

        def _reg_deferred(self, deferred):
            self._deferreds[deferred] = 1

        def _unreg_deferred(self, deferred):
            del self._deferreds[deferred]

    # special inlineCallbacks manager for shuttingdown support
    class InlineCallbacksManagerWithShuttingdownSupport(InlineCallbacksManager):

        def __init__(self, instance, *args, **kw):
            # `instance` is the first argument of the decorated method (self).
            self.deferred = defer.Deferred()
            self.instance = instance
            instance._reg_deferred(self.deferred)
            self.deferred.addFinalizer(instance._unreg_deferred, self.deferred)

        def send_result(self, g, result):
            # NOTE(review): this override puts one more stack frame between
            # _inlineCallbacks and the generator, which may make the
            # tb_next-based returnValue() check there misfire -- confirm.
            if self.instance.shuttingdown:
                # if shuttingdown occurs while waiting result then raise
                # ShuttingdownError into generator
                try:
                    raise ShuttingdownError
                except ShuttingdownError:
                    failure = Failure()
                # Return what the generator yields next (the original dropped
                # it, so the caller saw None instead).
                return self.throw_exception(g, failure)
            else:
                return InlineCallbacksManager.send_result(self, g, result)

    # decorator for methods of ShuttingdownSupport objects
    inlineCallbacksShuttingdown = create_inline_callbacks_decorator(
        InlineCallbacksManagerWithShuttingdownSupport)

    class ShuttingdownTest(ShuttingdownSupport):

        @inlineCallbacksShuttingdown
        def test1(self):
            log('start ShuttingdownTest.test1')
            try:
                res = yield deferred_work()
            except defer.CancelledError:
                log('ShuttingdownTest.test1 cancelled')
                raise
            except ShuttingdownError:
                log('ShuttingdownTest.test1 shuttingdown detected')
                raise
            log('finish ShuttingdownTest.test1')
            defer.returnValue(res)

        @inlineCallbacksShuttingdown
        def test2(self):
            log('start ShuttingdownTest.test2')
            try:
                res = yield self.test1(), self.test1()  # test yield tuple!!!
            except defer.CancelledError:
                log('ShuttingdownTest.test2 cancelled')
                raise
            except ShuttingdownError:
                log('ShuttingdownTest.test2 shuttingdown detected')
                raise
            log('finish ShuttingdownTest.test2')
            defer.returnValue(res)

    # @defer.inlineCallbacks
    # def test1():
    #     log('start test1')
    #     try:
    #         res = yield deferred_work()
    #     except defer.CancelledError:
    #         log('test1 cancelled')
    #         raise
    #     log('finish test1, test1 not cancelled!!!')
    #     defer.returnValue(res)

    # @defer.inlineCallbacks
    # def test2():
    #     log('start test2')
    #     try:
    #         res = yield test1()
    #     except defer.CancelledError:
    #         log('test2 cancelled')
    #         raise
    #     log('finish test2, test2 not cancelled!!!')
    #     defer.returnValue(res)

    def ok(result):
        log('ok ' + repr(result))

    def error(result):
        log('error ' + repr(result))

    # def run_test1():
    #     deferred = test2()
    #     deferred.addCallbacks(ok, error)
    #     reactor.callLater(1, deferred.cancel)
    # reactor.callLater(1, run_test1)
    # run_test1()

    def run_test2():
        obj = ShuttingdownTest()
        deferred = obj.test2()
        deferred.addCallbacks(ok, error)
        reactor.callLater(1, obj.shutdown)
    # reactor.callLater(3, run_test2)
    run_test2()

    reactor.callLater(10, reactor.stop)
    reactor.run()

# _______________________________________________
# Twisted-Python mailing list
# Twisted-Python@twistedmatrix.com
# http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python