[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616262#comment-17616262 ]
Yun Gao commented on FLINK-18647:
---------------------------------

Hi [~pnowojski], sorry for the long delay; I have just come back from the holiday.

{quote}
- each operator might have different default behaviour
- some operators might override/ignore/reject such changes, for all/some timers - like maybe hypothetical WindowOperatorWithTTLTimers registering two different types of timers could honour the setting for firing results, but would always drop the TTL timers
{quote}

I also agree with these two levels, namely the default behavior of each operator and the per-timer behavior. As I understand it, both levels are decided by the operator logic.

{quote}Furthermore, actually WindowOperator users might be interested in either of the three settings for the processing time windows - depending on the business logic it might be the most appropriate to either: fire immediately, drop the timers, or wait for the timers to fire naturally.{quote}

I understand the thinking behind the design, but I'm not sure whether users have this requirement in real-life jobs, so I'm neutral on the API {{org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour}}. Do you have more input on this point? Or would it also be acceptable to postpone adding this API to a second step, until we have more input from users?

Also, since this issue seems to require changes to the user-facing API, perhaps we should turn it into a public discussion on the mailing list, based on the current conclusions?

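To make the two levels concrete, here is a minimal sketch of how an operator could combine its own default with a user-provided setting and still override it for some timers. Everything in it is hypothetical and does not exist in Flink: the {{EndOfInputTimerPolicy}} interface, the {{TimerKind}} enum and the {{WindowWithTtlPolicy}} class are invented for illustration, and the choice of {{TRIGGER_ON_END_OF_INPUT}} as the operator default is only an assumption. The {{TimerAction}} values are the ones proposed in the issue description quoted below.

```java
import java.util.Optional;

/** Hypothetical sketch only; none of these types are part of Flink. */
public class EndOfInputTimerSketch {

    /** Values taken from the proposal in the issue description. */
    enum TimerAction {
        CANCEL_ON_END_OF_INPUT,
        TRIGGER_ON_END_OF_INPUT,
        WAIT_ON_END_OF_INPUT
    }

    /** Hypothetical timer kinds of the hypothetical WindowOperatorWithTTLTimers. */
    enum TimerKind {
        FIRE_RESULT,
        TTL_CLEANUP
    }

    /** Hypothetical operator-side policy covering both levels. */
    interface EndOfInputTimerPolicy {
        /** Level 1: the operator's default behaviour. */
        TimerAction defaultAction();

        /** Level 2: the per-timer decision, which may ignore the user setting. */
        TimerAction resolve(TimerKind kind, Optional<TimerAction> userSetting);
    }

    static final class WindowWithTtlPolicy implements EndOfInputTimerPolicy {
        @Override
        public TimerAction defaultAction() {
            // Assumed default, chosen so that the trailing window is not lost.
            return TimerAction.TRIGGER_ON_END_OF_INPUT;
        }

        @Override
        public TimerAction resolve(TimerKind kind, Optional<TimerAction> userSetting) {
            if (kind == TimerKind.TTL_CLEANUP) {
                // TTL timers are always dropped, whatever the user asked for.
                return TimerAction.CANCEL_ON_END_OF_INPUT;
            }
            // Result-firing timers honour the user setting, else the operator default.
            return userSetting.orElse(defaultAction());
        }
    }

    public static void main(String[] args) {
        EndOfInputTimerPolicy policy = new WindowWithTtlPolicy();
        Optional<TimerAction> wait = Optional.of(TimerAction.WAIT_ON_END_OF_INPUT);

        System.out.println(policy.resolve(TimerKind.FIRE_RESULT, Optional.empty()));
        System.out.println(policy.resolve(TimerKind.FIRE_RESULT, wait));
        System.out.println(policy.resolve(TimerKind.TTL_CLEANUP, wait));
    }
}
```

With such a split, a user-facing setter would only feed {{userSetting}}, and each operator would stay free to ignore it for some of its timers.
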
> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
>                 Key: FLINK-18647
>                 URL: https://issues.apache.org/jira/browse/FLINK-18647
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, stale-minor
>
> (most of this description comes from an offline discussion between me,
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending
> (untriggered) processing time timers are ignored/dropped. In some cases this
> is desirable, but for example for {{WindowOperator}} it means that the last
> trailing window will not be triggered, causing an apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel,
> wait, or trigger immediately. For example by overloading the existing methods
> {{ProcessingTimeService#registerTimer}} and
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(
>         long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean that we store additional state with each timer, that
> we need to modify the serialisation format (providing some kind of state
> migration path), and that we potentially increase the size footprint of the
> timers. The extra overhead could be avoided via some kind of {{Map<Timer,
> TimerAction>}}, with a missing entry meaning some default value.
> 2.
> Another way to solve this problem might be to let the operator code decide
> what to do with a given timer: either ask the operator what should happen
> with the given timer (a), or let the operator iterate over and cancel the
> timers on endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of (a), (b), or (c) would require breaking API changes, state
> changes, or additional overhead. Only the logic deciding what to do with the
> timer would have to be “hardcoded” in the operator’s code (which, by the way,
> might even have the additional benefit of being easier to change in case of
> bugs, such as a timer registered with a wrong {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options (a), (b)
> or (c) should be exposed to UDFs.
> 3.
> Maybe we need a combination of both? Pre-existing operators could implement
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be
> handled by 1.?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)