[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17610456#comment-17610456
 ] 

Yun Gao commented on FLINK-18647:
---------------------------------

Very thanks [~pnowojski] and [~dkapoor1] for the discussion and inputs! It 
looks now to me that the timers of different operators indeed have different 
requirements, thus as a whole:
 # For the final solution, I agree with the one raising in the content of this 
issue which allows users to have a complete control on the action of each 
timer. Currently some scenarios are using the low-level ProcessingTimerService 
directly and some scenarios are using the high-level InternalTimerService, we 
need handling both levels carefully.
 # For some kind of shortcuts to configuration actions, operator seems indeed 
to be a better scope than global settings. However, I think it might be not 
easy to use to let users to specify the actions when building graphs. For one 
thing, operators like window operator seems always need to fire the timers, 
thus it might be directly specified by the operator itself. Besides, it seems 
users could only register processing timers with ProcessFunction or Sink, thus 
it might not be required to allow users to specify the behavior for any 
operators. 

Since now both the final solution and shortcuts solution seems to require 
changes to the users' API, and the changes seems could not be reused, for now I 
tend to we directly head to the final solution without the shortcuts one. How 
do you think about that ?

> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
>                 Key: FLINK-18647
>                 URL: https://issues.apache.org/jira/browse/FLINK-18647
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that last 
> trailing window will not be triggered, causing an apparent data loss.
> There are a couple of ideas what should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback 
> target, TimerAction timerAction);
> enum TimerAction { 
>     CANCEL_ON_END_OF_INPUT, 
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT}
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size foot print of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer, 
> TimerAction>}}, with lack of entry meaning some default value.
> 2. 
> Also another way to solve this problem might be let the operator code decide 
> what to do with the given timer. Either ask an operator what should happen 
> with given timer (a), or let the operator iterate and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of the (a), (b), and (c) would require braking API changes, no 
> state changes and no additional overheads. Just the logic what to do with the 
> timer would have to be “hardcoded” in the operator’s code. (which btw might 
> even has an additional benefit of being easier to change in case of some 
> bugs, like a timer was registered with wrong/incorrect {{TimerAction}}).
> This is complicated a bit by a question, how (if at all?) options a), b) or 
> c) should be exposed to UDFs? 
> 3. 
> Maybe we need a combination of both? Pre existing operators could implement 
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be 
> handled by 1.? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to