[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616262#comment-17616262
 ] 

Yun Gao commented on FLINK-18647:
---------------------------------

Hi [~pnowojski]  sorry for the long delay due to just back from the holiday. 
{quote} - each operator might have different default behaviour
 - some operators might override/ignore/reject such changes, for all/some 
timers - like maybe hypothetical WindowOperatorWithTTLTimers registering two 
different types of timers could honour the setting for firing results, but 
would always drop the TTL timers{quote}
I also agree with these two levels, namely the default behaviors of operators 
and per-timer behavior. And with my understandings the two levels of behaviors 
are decided by the operator logics.
{quote}Furthermore, actually WindowOperator users might be interested in either 
of the three settings for the processing time windows - depending on the 
business logic it might be the most appropriate to either: fire immediately, 
drop the timers, or wait for the timers to fire naturally.
{quote}
I got the thoughts behinds the design, but I'm not quite sure if users have 
requirements in real-life jobs, thus I think I'm neutral with the API of 
{{org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour.}}
 Do you have more inputs for this point? Or do you think it might be also 
acceptable if we postpone adding this API as a second step till we have more 
inputs from users?

Also since this issue seems to require changes in the users' API, perhaps we 
turn it to be a public discussion in the mail list based on the current 
conclusions~?

> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
>                 Key: FLINK-18647
>                 URL: https://issues.apache.org/jira/browse/FLINK-18647
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that last 
> trailing window will not be triggered, causing an apparent data loss.
> There are a couple of ideas what should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback 
> target, TimerAction timerAction);
> enum TimerAction { 
>     CANCEL_ON_END_OF_INPUT, 
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT}
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size foot print of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer, 
> TimerAction>}}, with lack of entry meaning some default value.
> 2. 
> Also another way to solve this problem might be let the operator code decide 
> what to do with the given timer. Either ask an operator what should happen 
> with given timer (a), or let the operator iterate and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of the (a), (b), and (c) would require braking API changes, no 
> state changes and no additional overheads. Just the logic what to do with the 
> timer would have to be “hardcoded” in the operator’s code. (which btw might 
> even has an additional benefit of being easier to change in case of some 
> bugs, like a timer was registered with wrong/incorrect {{TimerAction}}).
> This is complicated a bit by a question, how (if at all?) options a), b) or 
> c) should be exposed to UDFs? 
> 3. 
> Maybe we need a combination of both? Pre existing operators could implement 
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be 
> handled by 1.? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to