You are currently considering your task from the viewpoint “let's make a 
callback chain for the perfect workflow and alter this chain in case of 
anything going wrong.” I think the flaw with this approach is that you are 
trying to make your “ideal” flow work in situations where it would, in 
fact, fail most of the time.

I am not sure I understand you correctly.
In my case, when acquiring an ASR or TTS resource fails, I must transfer the call to an operator rather than fail.

Instead of this, you could try to reconsider the task from the viewpoint of 
“Let's not add any further callbacks until this is absolutely necessary”. For 
me, this approach works rather well. Moreover, you have the means to do it: a 
callback can return a Deferred, and so on.

Good. But I need to start acquiring the ASR and TTS resources at the same time (in parallel), because these are very long operations and they can readily run in parallel: physically they are separate machines. Why can't we do it that way? Why should the user have to wait twice as long?

But that doesn't matter, because there is one more thing. OK, let's do it sequentially. But what if the user hangs up? I want to stop all deferred processes immediately, because I no longer need their results. Every yield inside inlineCallbacks must raise some exception.
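
To make that last requirement concrete: an inlineCallbacks function is a generator, and a failure reaches it as an exception raised at the yield where it is waiting. The plain-Python sketch below uses no Twisted at all, and `call_flow` and `destroy` are hypothetical names; it only shows that throwing an exception in at the current yield runs the `finally` cleanup.

```python
class DestroyingError(Exception):
    """Raised at a pending yield when the call is torn down."""


def call_flow(log):
    # Hypothetical stand-in for an inlineCallbacks-style generator.
    log.append('acquired')
    try:
        yield 'speak'      # each yield is a point where the flow waits
        yield 'recognize'
    finally:
        # Runs whether the flow finishes normally or an exception is thrown in.
        log.append('released')


def destroy(gen):
    """Throw DestroyingError into the generator at its current yield.

    Returns True if the error propagated out of the flow, False if the
    flow caught it and either finished or yielded again.
    """
    try:
        gen.throw(DestroyingError('destroying'))
    except DestroyingError:
        return True
    except StopIteration:
        return False
    return False
```

Advancing the flow once with `next(flow)` and then calling `destroy(flow)` leaves `log` with both the 'acquired' and the 'released' entries.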

The code below shows how we can solve this with the features described:

# relies on the features proposed earlier in this thread
from twisted.internet import defer


class DestroyingError(Exception):
    pass

class DestroySupport(object):
    __destroying_deferreds = None

    def __get_destroying_deferred(self):
        if self.__destroying_deferreds is None:
            self.__destroying_deferreds = {}
        deferred = defer.Deferred()
        self.__destroying_deferreds[deferred] = 1
        # delete "destroying" `deferred` when it finished
        deferred.addFinalizer(self.__destroying_deferreds.pop, deferred)
        return deferred

    def wait_destroying(self):
        return self.__get_destroying_deferred()

    def destroy(self):
        if self.__destroying_deferreds is not None:
            # errback all "destroying" deferreds
            # iterate over a copy: the finalizers pop entries as we errback
            for deferred in list(self.__destroying_deferreds):
                try:
                    raise DestroyingError('destroying')
                except DestroyingError:
                    deferred.errback()
            self.__destroying_deferreds = None

# See InlineCallbacksManager in my first message in current discussion thread
class InlineCallbacksWithDestroySupportManager(defer.InlineCallbacksManager):
    def __init__(self, instance, *args, **kw):
        defer.InlineCallbacksManager.__init__(self, instance, *args, **kw)
        # chain the "destroying" deferred to self.deferred, so that
        # destroy() errbacks self.deferred with DestroyingError
        destroying_deferred = instance.wait_destroying()
        destroying_deferred.chainDeferred(self.deferred)
        # unchain once self.deferred has finished; the "destroying" deferred
        # is then no longer used, so it is cancelled too and removed from
        # DestroySupport.__destroying_deferreds by the finalizer added in
        # DestroySupport.__get_destroying_deferred
        self.deferred.addFinalizer(destroying_deferred.unchainDeferredSafe, self.deferred)

inlineCallbacksWithDestroySupport = defer.create_inline_callbacks_decorator(
    InlineCallbacksWithDestroySupportManager
)

class CallService(DestroySupport):
    def on_connection_lost(self):
        # launch cascade destroying when connection lost!!!
        # all yields will raise DestroyingError
        # therefore all resources will be released
        self.destroy()

    @inlineCallbacksWithDestroySupport
    def incoming_call(self, call):
        call.start_play('music')
        try:
            # tuple yield feature!
            tts, asr = yield self.acquire_tts_for(call), self.acquire_asr_for(call)
        except TimeoutError:
            # this occurs if TimeoutError is raised in `self.acquire_tts_for`
            # or `self.acquire_asr_for`,
            # i.e. DeferredList(..., fireOnOneErrback=1)
            call.transfer_to('operator')
            # NOTE: at this point I want to automatically and _recursively_
            # (in depth) stop all deferred processes started inside
            # `self.acquire_tts_for` and `self.acquire_asr_for`,
            # because I no longer need their results (or the expensive resources!)
        except DestroyingError:
            log('Destroying detected')
            raise
        else:
            try:
                try:
                    yield tts.speak('what you want? say me after signal')
                    yield asr.start_recognition()
                    call.start_play('signal')
                    result = yield asr.wait_recognition_result()
                    # ... do something with result ...
                except DestroyingError:
                    log('Destroying detected')
                    raise
            finally:
                tts.release()
                asr.release()

    @inlineCallbacksWithDestroySupport
    def acquire_tts_for(self, call):
        # may raise TimeoutError inside
        tts_connection = yield self.tts.acquire_connection(timeout=10)
        try:
            # may raise TimeoutError inside
            yield call.make_audio_link_with(tts_connection, 'in', timeout=10)
        except Exception:
            # we MAY receive this Exception when cascading cancellation occurs
            tts_connection.release()
            raise
        else:
            return tts_connection

    @inlineCallbacksWithDestroySupport
    def acquire_asr_for(self, call):
        # may raise TimeoutError inside
        asr_connection = yield self.asr.acquire_connection(timeout=10)
        try:
            # may raise TimeoutError inside
            yield call.make_audio_link_with(asr_connection, 'out', timeout=10)
        except Exception:
            # we MAY receive this Exception when cascading cancellation occurs
            asr_connection.release()
            raise
        else:
            return asr_connection
