The formatting of my previous mail was garbled. Please refer to this document
instead:
https://docs.google.com/document/d/1jCKan5LGlkBZr2seUdwTsy-biu8Sz0xyQwqizCUZ2uk/edit?usp=sharing


Best,
Haibo
At 2019-12-16 21:29:15, "Haibo Sun" <sunhaib...@163.com> wrote:
>Hi, all
>
>
>I want to bring up a discussion about how to better deal with pending timers
>(registered but not yet scheduled for execution) before closing an operator.
>
>
>Introduction: From my understanding, there are two types of timers in Flink:
>1) One-shot or periodic timers registered with ProcessingTimeService. In terms
>   of the underlying implementation, each timer corresponds to a Runnable and
>   is executed through a thread pool.
>2) Event-time or processing-time timers registered with InternalTimerService,
>   which can be stateful. Event-time timers are triggered by watermarks, and
>   processing-time timers registered with the same InternalTimerService
>   instance are triggered by a real timer registered with
>   ProcessingTimeService.
>For convenience in the rest of this mail, we call the first type "physical
>timers" and the second type (including timers registered through interfaces
>built on InternalTimerService, such as api.windowing.triggers.TriggerContext)
>"logical timers".
>
>
>Why and how to deal with the physical timers better before closing operators?
>Currently, after an operator is closed, the physical timers it registered are
>still allowed to fire. They may then output data again, so the close semantics
>of operators on the operator chain are not strict (under strict semantics, an
>operator no longer outputs data after closing). We therefore need to make sure
>explicitly that all physical timers are done before we close the operator.
>Because physical timers are registered by the operator, the runtime cannot
>cancel or trigger them at its own discretion before closing the operator. For
>example, if the runtime cancels a physical timer that is registered but not
>yet scheduled for execution, a deadlock may occur when the operator waits in
>the "close()" method for that timer to fire. Overall, we need to expose some
>capabilities to the operators so that they can decide whether to cancel or
>trigger such pending physical timers before closing.
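To make the problem concrete, here is a minimal sketch in plain Java (no Flink classes) of a thread-pool timer that still emits output after a toy operator has been marked closed. The class name LateTimerDemo, the "closed" flag, and the latch that forces the interleaving are all hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo, not Flink code: a pending timer fires after "close". */
final class LateTimerDemo {

    /** Returns the records that the timer emitted after the toy operator was closed. */
    static List<String> emittedAfterClose() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean closed = new AtomicBoolean(false);
        CountDownLatch closeDone = new CountDownLatch(1);
        List<String> lateOutput = new CopyOnWriteArrayList<>();
        try {
            ScheduledFuture<?> timer = pool.schedule(() -> {
                try {
                    // Force the problematic interleaving: run only after close.
                    closeDone.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (closed.get()) {
                    // Output produced although the operator is already closed.
                    lateOutput.add("record-from-timer");
                }
            }, 10, TimeUnit.MILLISECONDS);

            // The toy "close()" returns without cancelling or triggering the timer.
            closed.set(true);
            closeDone.countDown();

            timer.get(5, TimeUnit.SECONDS); // wait until the timer has run
            return lateOutput;
        } catch (Exception e) {
            throw new IllegalStateException("demo timer did not complete", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling LateTimerDemo.emittedAfterClose() returns a list with one late record, which is the kind of output that strict close semantics would forbid.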
>
>
>As for how to expose such capabilities, we may have the following options. A
>periodic physical timer should only need the "cancel" action, and the operator
>can already cancel it by calling the "ScheduledFuture#cancel()" method, so we
>do not need to consider it here.
>
>
>Option 1: The physical timer is of the ScheduledFuture type, and
>ScheduledFuture already has the "#cancel()" method. We just need to add the
>"#triggerIfNotExecuted()" method. The changes mainly include:
>    1) Add the following TimerScheduledFuture interface, which extends
>       ScheduledFuture, and change the return type of
>       "ProcessingTimeService#registerTimer" to TimerScheduledFuture. If the
>       operator wants to trigger a pending physical timer before closing, it
>       needs to call "TimerScheduledFuture#triggerIfNotExecuted()" explicitly
>       in its "endInput" method.
>public interface TimerScheduledFuture<V> extends ScheduledFuture<V> {
>    void triggerIfNotExecuted(long timestamp);
>}
>    2) For the pending physical timers that are neither cancelled nor
>       triggered by the operator before closing, the runtime will cancel them
>       but wait for those that are already executing to finish, in order to
>       maintain backward compatibility.
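One way the "trigger if not executed" behaviour of Option 1 could work is sketched below in plain Java. This is only an illustration under assumptions: TriggerableTimer is a hypothetical helper rather than the proposed TimerScheduledFuture, it wraps a ScheduledFuture instead of extending it, and unlike the proposed void method it returns a boolean so that the caller can see whether the callback ran.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

/** Hypothetical helper, not Flink code: a one-shot timer that can be fired early, at most once. */
final class TriggerableTimer {
    // Set by whoever gets to decide the timer's fate first (scheduler, trigger, or cancel).
    private final AtomicBoolean claimed = new AtomicBoolean(false);
    private final LongConsumer callback;     // receives the firing timestamp
    private final ScheduledFuture<?> future; // the underlying thread-pool timer

    TriggerableTimer(ScheduledExecutorService pool, long delayMs,
                     long timestamp, LongConsumer callback) {
        this.callback = callback;
        this.future = pool.schedule(() -> { fire(timestamp); }, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Runs the callback only if nobody has fired or cancelled the timer yet. */
    private boolean fire(long timestamp) {
        if (!claimed.compareAndSet(false, true)) {
            return false;
        }
        callback.accept(timestamp);
        return true;
    }

    /** Fires the timer in the caller's thread if it is still pending. */
    boolean triggerIfNotExecuted(long timestamp) {
        boolean fired = fire(timestamp);
        future.cancel(false); // the scheduled run is no longer needed
        return fired;
    }

    /** Cancels the timer if it is still pending; returns true if this call cancelled it. */
    boolean cancel() {
        boolean cancelled = claimed.compareAndSet(false, true);
        future.cancel(false);
        return cancelled;
    }
}
```

The compare-and-set flag guarantees that the callback runs at most once, whether the thread pool, an explicit trigger in "endInput", or a cancel reaches the timer first.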
>
>
>Option 2: When the operator registers a physical timer, it passes a callback
>parameter of Runnable type (similar to ProcessingTimeCallback). The runtime
>invokes this callback before closing the operator so that the operator can
>deal with its pending physical timers. The changes mainly include:
>    1) Apply all changes from Option 1.
>    2) Add the following overloaded method to the ProcessingTimeService
>       interface.
>ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target,
>    Runnable actionBeforeClosingOperator);
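The runtime side of Option 2 might look roughly like the following plain-Java sketch. ClosingAwareTimerRegistry and runActionsBeforeClosing are hypothetical names, the timer target is simplified to a Runnable instead of a ProcessingTimeCallback, and the registration takes a delay rather than an absolute timestamp.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the runtime side of Option 2; not Flink code. */
final class ClosingAwareTimerRegistry {
    private final ScheduledExecutorService pool;
    private final AtomicLong nextId = new AtomicLong();
    // Closing actions of timers whose target has not started running yet.
    private final Map<Long, Runnable> pendingClosingActions = new ConcurrentHashMap<>();

    ClosingAwareTimerRegistry(ScheduledExecutorService pool) {
        this.pool = pool;
    }

    ScheduledFuture<?> registerTimer(long delayMs, Runnable target,
                                     Runnable actionBeforeClosingOperator) {
        long id = nextId.getAndIncrement();
        pendingClosingActions.put(id, actionBeforeClosingOperator);
        return pool.schedule(() -> {
            // Run the target only if the closing action has not claimed this timer.
            if (pendingClosingActions.remove(id) != null) {
                target.run();
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Invoked before the operator is closed: runs the closing action of every
     * timer that is still pending and returns how many actions were run.
     * The scheduled tasks stay in the pool but become no-ops.
     */
    int runActionsBeforeClosing() {
        int executed = 0;
        for (Long id : pendingClosingActions.keySet()) {
            Runnable action = pendingClosingActions.remove(id);
            if (action != null) {
                action.run();
                executed++;
            }
        }
        return executed;
    }
}
```

Inside the closing action, the operator would decide for itself whether to cancel the timer or to trigger it early.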
>
>
>How to deal with the logical timers better before closing operators?
>For logical timers, the same capabilities should also be exposed to operators.
>As I understand it, a logical timer can be registered through three
>interfaces: api.TimerService, api.windowing.triggers.TriggerContext, and
>api.operators.InternalTimerService. The first two are implemented on top of
>api.operators.InternalTimerService. As the declarations of the registration
>methods show, they do not return the logical timer object, which is managed
>uniformly by the implementation of InternalTimerService.
>
>
>1. For the api.operators.InternalTimerService interface, we may make the
>following changes:
>    1) Add the following overloaded method to the InternalTimerService
>       interface.
>void registerProcessingTimeTimer(N namespace, long time,
>    TimerActionOnOperatorClose timerActionOnOperatorClose);
>/**
> * Defines how to handle the timers that are not yet scheduled for execution
> * when the operator is closed.
> */
>enum TimerActionOnOperatorClose { 
>    CANCEL, 
>    TRIGGER_WITH_ORIGINAL_TIMESTAMP,
>    TRIGGER_WITH_CURRENT_TIMESTAMP,
>    TRIGGER_WITH_MAX_TIMESTAMP
>}
>    2) Change the serialization of stateful timers to also store the
>       TimerActionOnOperatorClose parameter, which will slightly increase the
>       size of the state. The existing versioned serialization mechanism of
>       stateful timers can ensure backward compatibility.
>    3) Considering that stateful logical timers should in most cases be
>       triggered before closing the operator, the default action for the old
>       "#registerProcessingTimeTimer()" method without the
>       "timerActionOnOperatorClose" parameter is TRIGGER_WITH_MAX_TIMESTAMP.
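As a rough illustration of how the four actions could map to a firing decision when the operator closes, consider the sketch below. The enum constants are copied from the proposal; CloseActionResolver and its method are hypothetical, and reading "MAX_TIMESTAMP" as Long.MAX_VALUE is an assumption, since the proposal does not define the value.

```java
import java.util.OptionalLong;

/** Constants copied from the proposal. */
enum TimerActionOnOperatorClose {
    CANCEL,
    TRIGGER_WITH_ORIGINAL_TIMESTAMP,
    TRIGGER_WITH_CURRENT_TIMESTAMP,
    TRIGGER_WITH_MAX_TIMESTAMP
}

/** Hypothetical helper, not Flink code. */
final class CloseActionResolver {

    /**
     * Returns the timestamp with which a pending timer should fire when the
     * operator closes, or an empty value if the timer should be dropped.
     */
    static OptionalLong firingTimestamp(TimerActionOnOperatorClose action,
                                        long registeredTimestamp,
                                        long currentTimestamp) {
        switch (action) {
            case CANCEL:
                return OptionalLong.empty();
            case TRIGGER_WITH_ORIGINAL_TIMESTAMP:
                return OptionalLong.of(registeredTimestamp);
            case TRIGGER_WITH_CURRENT_TIMESTAMP:
                return OptionalLong.of(currentTimestamp);
            case TRIGGER_WITH_MAX_TIMESTAMP:
                // Assumption: "max timestamp" means Long.MAX_VALUE.
                return OptionalLong.of(Long.MAX_VALUE);
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }
}
```

With the proposed default, a timer registered through the old method would resolve to the TRIGGER_WITH_MAX_TIMESTAMP branch.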
>
>
>2. The change to the api.TimerService interface is similar to that of
>InternalTimerService: add an overloaded method with the
>"timerActionOnOperatorClose" parameter.
>void registerProcessingTimeTimer(long time,
>    TimerActionOnOperatorClose timerAction);
>
>
>3. Because the api.windowing.triggers.TriggerContext interface is only used
>for windows, and the logical timers of windows should be triggered before
>closing the operator, they can use the default action, and the TriggerContext
>interface does not need to be changed.
>
>
>
>
>Please feel free to share your thoughts. Thanks.
>Special thanks to Piotr, Aljoscha, Arvid, and Roman, who participated in the
>earlier discussion.
>
>
>Best,
>Haibo
>
