I found that the formatting of the mail was broken. Please look at the document instead: https://docs.google.com/document/d/1jCKan5LGlkBZr2seUdwTsy-biu8Sz0xyQwqizCUZ2uk/edit?usp=sharing
Best,
Haibo

At 2019-12-16 21:29:15, "Haibo Sun" <sunhaib...@163.com> wrote:
>Hi, all
>
>I want to bring up a discussion about how to better deal with pending
>timers (registered but not yet scheduled for execution) before closing an
>operator.
>
>Introduction:
>From my understanding, there are two types of timers in Flink:
>1) One-shot or periodic timers registered with ProcessingTimeService. In
>   terms of the underlying implementation, each timer corresponds to a
>   Runnable and is executed through a thread pool.
>2) Event-time or processing-time timers registered with
>   InternalTimerService, which can be stateful. Event-time timers are
>   triggered by watermarks, and processing-time timers registered with the
>   same InternalTimerService instance are triggered by a real timer
>   registered with ProcessingTimeService.
>
>For convenience, we define the first type as "physical timers", and the
>second type (including timers registered with the interfaces built on
>InternalTimerService, such as api.windowing.triggers.TriggerContext) as
>"logical timers".
>
>Why and how to deal with the physical timers better before closing
>operators?
>Currently, after an operator is closed, the physical timers it registered
>are still allowed to fire, which may output data again. This makes the
>close semantics of operators on the operator chain not strict (under
>strict semantics, an operator should no longer output data after closing).
>So we need to explicitly let all physical timers finish before we close
>the operator. Because physical timers are registered by the operator, the
>runtime cannot, at its own discretion, cancel or trigger them before
>closing the operator. For example, if the runtime cancels one of the
>physical timers that is registered but not yet scheduled for execution
>before closing the operator, a deadlock may occur when the operator waits
>in the "close()" method for that physical timer to fire.
>Overall, we need to expose some capabilities to the operators so that they
>can decide whether to cancel or trigger the physical timers that are
>registered but not yet scheduled for execution before closing.
>
>About how to expose such capabilities, we may have the following options.
>A periodic physical timer should only need the "cancel" action, and the
>operator can cancel it by calling the "ScheduledFuture#cancel()" method,
>so we do not need to consider it here.
>
>Option 1: The physical timer is of the ScheduledFuture type, and
>ScheduledFuture already has the "#cancel()" method. We just need to add
>the "#triggerIfNotExecuted()" method. The changes mainly include:
>  1) Add the following TimerScheduledFuture interface, which extends
>     ScheduledFuture, and change the return type of
>     "ProcessingTimeService#registerTimer" to TimerScheduledFuture. If the
>     operator wants to trigger a pending physical timer before closing,
>     "TimerScheduledFuture#triggerIfNotExecuted()" needs to be called
>     explicitly in its "endInput" method.
>
>public interface TimerScheduledFuture<V> extends ScheduledFuture<V> {
>    void triggerIfNotExecuted(long timestamp);
>}
>
>  2) For the pending physical timers that are not cancelled or triggered
>     by the operator before closing, the runtime will cancel them but wait
>     for those that are executing to finish, to maintain backward
>     compatibility.
>
>Option 2: When the operator registers a physical timer, it passes a
>callback parameter of Runnable type (like ProcessingTimeCallback), which
>the runtime calls before closing the operator to let the operator deal
>with those pending physical timers. The changes mainly include:
>  1) Apply all changes from option 1.
>  2) Add the following overloaded method to the ProcessingTimeService
>     interface.
>ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target,
>    Runnable actionBeforeClosingOperator);
>
>How to deal with the logical timers better before closing operators?
>For logical timers, these capabilities should also be exposed to
>operators. According to my understanding, a logical timer can be
>registered through three interfaces: api.TimerService,
>api.windowing.triggers.TriggerContext, and
>api.operators.InternalTimerService. The first two are implemented using
>api.operators.InternalTimerService. As the declarations of the
>registration methods show, they do not return the logical timer object,
>which is managed uniformly by the implementation of InternalTimerService.
>
>1. For the api.operators.InternalTimerService interface, we may make the
>following changes:
>  1) Add the following overloaded method to the InternalTimerService
>     interface.
>
>void registerProcessingTimeTimer(N namespace, long time,
>    TimerActionOnOperatorClose timerActionOnOperatorClose);
>
>/**
> * Defines how to handle the timers not scheduled for execution before the
> * operator is closed.
> */
>enum TimerActionOnOperatorClose {
>    CANCEL,
>    TRIGGER_WITH_ORIGINAL_TIMESTAMP,
>    TRIGGER_WITH_CURRENT_TIMESTAMP,
>    TRIGGER_WITH_MAX_TIMESTAMP
>}
>
>  2) Change the serialization of stateful timers to add state processing
>     for the TimerActionOnOperatorClose parameter, which will slightly
>     increase the size of the state. The existing versioned serialization
>     mechanism of stateful timers can ensure backward compatibility.
>  3) Considering that stateful logical timers should be triggered in most
>     cases before closing the operator, the default action for the old
>     "#registerProcessingTimeTimer()" method without the
>     "timerActionOnOperatorClose" parameter is TRIGGER_WITH_MAX_TIMESTAMP.
>
>2. The change to the api.TimerService interface is similar to that of
>InternalTimerService: overloading a method with the
>"timerActionOnOperatorClose" parameter.
>
>void registerProcessingTimeTimer(long time, TimerActionOnOperatorClose timerAction);
>
>3. Because the api.windowing.triggers.TriggerContext interface is only
>used for windows, and the logical timers of windows should be triggered
>before closing the operator, they can use the default action and the
>TriggerContext interface does not need to be changed.
>
>Please feel free to share your thoughts. Thanks.
>Especially, thanks to Piotr, Aljoscha, Arvid, Roman, who participated in
>the discussion earlier.
>
>Best,
>Haibo
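
As a rough illustration of the TimerActionOnOperatorClose semantics proposed in the quoted mail, here is a minimal, hypothetical Java sketch. It is not Flink code: the TimerCloseSketch and PendingTimer classes and the timestampsToFireOnClose helper are made up for illustration, and mapping TRIGGER_WITH_MAX_TIMESTAMP to Long.MAX_VALUE is an assumption, since the mail does not define that value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the proposed close-time handling; not part of Flink. */
final class TimerCloseSketch {

    /** Mirrors the enum proposed in the mail. */
    enum TimerActionOnOperatorClose {
        CANCEL,
        TRIGGER_WITH_ORIGINAL_TIMESTAMP,
        TRIGGER_WITH_CURRENT_TIMESTAMP,
        TRIGGER_WITH_MAX_TIMESTAMP
    }

    /** A pending logical timer: its registered time plus the action chosen at registration. */
    static final class PendingTimer {
        final long time;
        final TimerActionOnOperatorClose action;

        PendingTimer(long time, TimerActionOnOperatorClose action) {
            this.time = time;
            this.action = action;
        }
    }

    /**
     * Returns the timestamps with which pending timers would fire when the operator
     * closes. Timers marked CANCEL are dropped and contribute nothing.
     */
    static List<Long> timestampsToFireOnClose(List<PendingTimer> pending, long now) {
        List<Long> result = new ArrayList<>();
        for (PendingTimer timer : pending) {
            switch (timer.action) {
                case CANCEL:
                    break; // dropped without firing
                case TRIGGER_WITH_ORIGINAL_TIMESTAMP:
                    result.add(timer.time);
                    break;
                case TRIGGER_WITH_CURRENT_TIMESTAMP:
                    result.add(now);
                    break;
                case TRIGGER_WITH_MAX_TIMESTAMP:
                    result.add(Long.MAX_VALUE); // assumption: "max" means Long.MAX_VALUE
                    break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PendingTimer> pending = Arrays.asList(
                new PendingTimer(100L, TimerActionOnOperatorClose.CANCEL),
                new PendingTimer(200L, TimerActionOnOperatorClose.TRIGGER_WITH_ORIGINAL_TIMESTAMP),
                new PendingTimer(300L, TimerActionOnOperatorClose.TRIGGER_WITH_CURRENT_TIMESTAMP),
                new PendingTimer(400L, TimerActionOnOperatorClose.TRIGGER_WITH_MAX_TIMESTAMP));
        // Pretend the operator is closed at processing time 500.
        System.out.println(timestampsToFireOnClose(pending, 500L));
    }
}
```

In this sketch the action travels with each timer, which matches point 2) of the proposal (the action would have to be serialized together with a stateful timer). Under the proposed default, a timer registered through the old method would behave like the TRIGGER_WITH_MAX_TIMESTAMP entry.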