Hi, all

I want to bring up a discussion about how to better deal with pending timers
(registered but not yet scheduled for execution) before closing an operator.


Introduction: From my understanding, there are two types of timers in Flink:
1) One-shot or periodic timers registered with ProcessingTimeService. In terms
of the underlying implementation, each timer corresponds to a Runnable and is
executed through a thread pool.
2) Event-time or processing-time timers registered with InternalTimerService,
which can be stateful. Event-time timers are triggered by watermarks, and
processing-time timers registered with the same InternalTimerService instance
are triggered by a real timer registered with ProcessingTimeService.
For convenience, I will call the first type "physical timers" and the second
type (including timers registered through the interfaces built on
InternalTimerService, such as api.windowing.triggers.TriggerContext) "logical
timers".


Why and how to deal with the physical timers better before closing operators?
Currently, after an operator is closed, the physical timers it registered are
still allowed to fire. They may output data again, which makes the close
semantics of operators on the operator chain non-strict (under strict
semantics, an operator no longer outputs data after it is closed). So we need
to explicitly make sure that all physical timers are done before we close the
operator. Because physical timers are registered by the operator, the runtime
cannot cancel or trigger them at its own discretion before closing the
operator. For example, if the runtime cancels a physical timer that is
registered but not yet scheduled for execution, a deadlock may occur when the
operator waits in its "close()" method for that timer to fire. Overall, we need
to expose some capabilities to operators so that they can decide whether to
cancel or trigger such pending physical timers before closing.


As for how to expose such capabilities, we may have the following options. A
periodic physical timer should only need the "cancel" action, and the operator
can already cancel it by calling the "ScheduledFuture#cancel()" method, so we
do not need to consider it here.


Option 1:  The physical timer is of the ScheduledFuture type, which already
has the "#cancel()" method. We just need to add a "#triggerIfNotExecuted()"
method. The changes mainly include:
    1) Add the following TimerScheduledFuture interface, which extends
       ScheduledFuture, and change the return type of
       "ProcessingTimeService#registerTimer" to TimerScheduledFuture. If the
       operator wants to trigger a pending physical timer before closing, it
       needs to call "TimerScheduledFuture#triggerIfNotExecuted()" explicitly
       in its "endInput" method.
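import java.util.concurrent.ScheduledFuture;

public interface TimerScheduledFuture<V> extends ScheduledFuture<V> {
    // Triggers the timer with the given timestamp if it has not been executed yet.
    void triggerIfNotExecuted(long timestamp);
}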
    2) For pending physical timers that are neither cancelled nor triggered by
       the operator before closing, the runtime will cancel them, but will wait
       for those already executing to finish, in order to maintain backward
       compatibility.
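
To make option 1 more concrete, here is a minimal sketch built only on
java.util.concurrent, not on Flink's actual ProcessingTimeService. The names
TimerFutureSketch and OneShotTimer, the constructor parameters, and the use of
a LongConsumer as the callback are assumptions made for illustration. The
sketch only shows one way to guarantee that the callback runs at most once,
either on schedule or through "triggerIfNotExecuted()".

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

class TimerFutureSketch {

    /** The interface proposed in option 1. */
    interface TimerScheduledFuture<V> extends ScheduledFuture<V> {
        void triggerIfNotExecuted(long timestamp);
    }

    /** Hypothetical one-shot timer; the callback runs at most once. */
    static final class OneShotTimer implements TimerScheduledFuture<Object> {
        private final AtomicBoolean claimed = new AtomicBoolean(false);
        private final LongConsumer callback;
        private final ScheduledFuture<?> delegate;

        OneShotTimer(ScheduledExecutorService pool, long timestamp, long delayMs,
                     LongConsumer callback) {
            this.callback = callback;
            this.delegate = pool.schedule(() -> runOnce(timestamp), delayMs, TimeUnit.MILLISECONDS);
        }

        private void runOnce(long timestamp) {
            // Whoever wins the compare-and-set runs the callback; later callers do nothing.
            if (claimed.compareAndSet(false, true)) {
                callback.accept(timestamp);
            }
        }

        @Override
        public void triggerIfNotExecuted(long timestamp) {
            delegate.cancel(false); // stop the scheduled run if it has not started yet
            runOnce(timestamp);     // fire now, unless already fired or cancelled
            // Simplification: this does not wait for a scheduled run already in progress.
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            // Claiming the flag first turns a later triggerIfNotExecuted() into a no-op.
            return claimed.compareAndSet(false, true) && delegate.cancel(mayInterruptIfRunning);
        }

        // The remaining methods simply mirror the underlying future.
        @Override public boolean isCancelled() { return delegate.isCancelled(); }
        @Override public boolean isDone() { return delegate.isDone(); }
        @Override public long getDelay(TimeUnit unit) { return delegate.getDelay(unit); }
        @Override public int compareTo(Delayed other) { return delegate.compareTo(other); }
        @Override public Object get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }
        @Override public Object get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        OneShotTimer timer = new OneShotTimer(pool, 1_000L, 60_000L,
                ts -> System.out.println("fired with timestamp " + ts));
        timer.triggerIfNotExecuted(Long.MAX_VALUE); // e.g. called from endInput()
        timer.triggerIfNotExecuted(Long.MAX_VALUE); // no-op: the timer has already fired
        pool.shutdownNow();
    }
}
```

In this sketch a timer that was cancelled by the operator stays cancelled, so
calling "triggerIfNotExecuted()" afterwards has no effect. Whether the real
interface should behave that way is open for discussion.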


Option 2:  When the operator registers a physical timer, it also passes a
callback of Runnable type (similar to ProcessingTimeCallback). The runtime
invokes this callback before closing the operator so that the operator can deal
with its pending physical timers. The changes mainly include:
    1) Apply all changes from option 1.
    2) Add the following overloaded method to the ProcessingTimeService
       interface.
ScheduledFuture<?> registerTimer(
    long timestamp, ProcessingTimeCallback target, Runnable actionBeforeClosingOperator);
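
As a rough illustration of option 2, the following sketch shows how a runtime
could invoke the per-timer action before closing the operator. It is written
against java.util.concurrent only. The class CloseAwareTimerSketch, the Pending
bookkeeping class, and the "beforeOperatorClose()" hook are hypothetical names,
and ProcessingTimeCallback here is a local stand-in for Flink's interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseAwareTimerSketch {

    /** Local stand-in for Flink's ProcessingTimeCallback. */
    interface ProcessingTimeCallback {
        void onProcessingTime(long timestamp) throws Exception;
    }

    /** Hypothetical bookkeeping for one registered timer. */
    private static final class Pending {
        final ScheduledFuture<?> future;
        final AtomicBoolean started;
        final Runnable actionBeforeClosingOperator;

        Pending(ScheduledFuture<?> future, AtomicBoolean started, Runnable action) {
            this.future = future;
            this.started = started;
            this.actionBeforeClosingOperator = action;
        }
    }

    private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    private final List<Pending> timers = new ArrayList<>();

    ScheduledFuture<?> registerTimer(
            long timestamp, ProcessingTimeCallback target, Runnable actionBeforeClosingOperator) {
        long delayMs = Math.max(0L, timestamp - System.currentTimeMillis());
        AtomicBoolean started = new AtomicBoolean(false);
        // Callable form, so the callback may throw checked exceptions.
        ScheduledFuture<?> future = pool.schedule(() -> {
            started.set(true);
            target.onProcessingTime(timestamp);
            return null;
        }, delayMs, TimeUnit.MILLISECONDS);
        timers.add(new Pending(future, started, actionBeforeClosingOperator));
        return future;
    }

    /** Hypothetical hook that the runtime would call right before closing the operator. */
    void beforeOperatorClose() {
        for (Pending timer : timers) {
            if (!timer.started.get() && !timer.future.isCancelled()) {
                timer.actionBeforeClosingOperator.run(); // the operator decides what to do
            }
            timer.future.cancel(false); // drop it if still pending; never interrupts a running timer
        }
        pool.shutdown(); // a timer that is already executing is allowed to finish
    }

    public static void main(String[] args) {
        CloseAwareTimerSketch service = new CloseAwareTimerSketch();
        List<String> out = new ArrayList<>();
        service.registerTimer(System.currentTimeMillis() + 60_000L,
                ts -> out.add("timer fired"),
                () -> out.add("flushed before close"));
        service.beforeOperatorClose();
        System.out.println(out); // prints [flushed before close]
    }
}
```

The sketch only invokes the action for timers that have neither started nor
been cancelled, which matches the definition of "pending" used in this mail.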


How to deal with the logical timers better before closing operators?
For logical timers, the same capabilities should also be exposed to operators.
According to my understanding, a logical timer can be registered through three
interfaces: api.TimerService, api.windowing.triggers.TriggerContext, and
api.operators.InternalTimerService. The first two are implemented on top of
api.operators.InternalTimerService. As the declarations of the registration
methods show, they do not return a logical timer object; the timers are managed
uniformly by the InternalTimerService implementation.


1. For the api.operators.InternalTimerService interface, we may make the
following changes:
    1) Add the following overloaded method to the InternalTimerService
       interface.
void registerProcessingTimeTimer(
    N namespace, long time, TimerActionOnOperatorClose timerActionOnOperatorClose);

/**
 * Defines how to handle the timers not scheduled for execution before the
 * operator is closed.
 */
enum TimerActionOnOperatorClose {
    CANCEL,
    TRIGGER_WITH_ORIGINAL_TIMESTAMP,
    TRIGGER_WITH_CURRENT_TIMESTAMP,
    TRIGGER_WITH_MAX_TIMESTAMP
}
    2) Change the serialization of stateful timers to also persist the
       TimerActionOnOperatorClose parameter, which will slightly increase the
       size of the state. The existing versioned serialization mechanism of
       stateful timers can ensure backward compatibility.
    3) Considering that stateful logical timers should in most cases be
       triggered before closing the operator, the default action for the old
       "#registerProcessingTimeTimer()" method without the
       "timerActionOnOperatorClose" parameter is TRIGGER_WITH_MAX_TIMESTAMP.


2. The change to the api.TimerService interface is similar to that of
InternalTimerService: add an overloaded method with the
"timerActionOnOperatorClose" parameter.
void registerProcessingTimeTimer(
    long time, TimerActionOnOperatorClose timerActionOnOperatorClose);


3. Because the api.windowing.triggers.TriggerContext interface is only used for
windows, and the logical timers of windows should be triggered before closing
the operator, they can use the default action, and the TriggerContext interface
does not need to be changed.
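
The sketch below shows how the runtime might resolve a pending logical timer
at close time according to TimerActionOnOperatorClose. The meaning given to
each constant is inferred from its name and is an assumption, as are the helper
name "firingTimestampOnClose" and the use of Long.MAX_VALUE as the "max"
timestamp.

```java
import java.util.OptionalLong;

class TimerCloseActionSketch {

    enum TimerActionOnOperatorClose {
        CANCEL,
        TRIGGER_WITH_ORIGINAL_TIMESTAMP,
        TRIGGER_WITH_CURRENT_TIMESTAMP,
        TRIGGER_WITH_MAX_TIMESTAMP
    }

    /**
     * Returns the timestamp a pending timer would be fired with when the
     * operator closes, or empty if the timer is dropped (assumed semantics).
     */
    static OptionalLong firingTimestampOnClose(
            TimerActionOnOperatorClose action, long registeredTime, long currentTime) {
        switch (action) {
            case CANCEL:
                return OptionalLong.empty();
            case TRIGGER_WITH_ORIGINAL_TIMESTAMP:
                return OptionalLong.of(registeredTime);
            case TRIGGER_WITH_CURRENT_TIMESTAMP:
                return OptionalLong.of(currentTime);
            case TRIGGER_WITH_MAX_TIMESTAMP:
                return OptionalLong.of(Long.MAX_VALUE); // assumption: "max" means Long.MAX_VALUE
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        long registeredTime = 5_000L; // the time the timer was registered for
        long currentTime = 1_000L;    // processing time when the operator closes
        for (TimerActionOnOperatorClose action : TimerActionOnOperatorClose.values()) {
            System.out.println(action + " -> "
                    + firingTimestampOnClose(action, registeredTime, currentTime));
        }
    }
}
```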




Please feel free to share your thoughts. Thanks.
Special thanks to Piotr, Aljoscha, Arvid, Roman, who participated in the
discussion earlier.


Best,
Haibo
