Hi Dawid,
Thanks for the comments!
Overall I'm also open to the API, and I also prefer simple but flexible
interfaces, but it still looks like there are some problems with just letting
users implement the termination actions.
Let's take the WindowOperator as an example. As seen in [1], the timer
processing logic needs to acquire the key / namespace information bound to the
timer (which is only supported by the InternalTimerService). Thus, if we want
users to implement the same logic on termination, we either let users trigger
the timer handler directly, or we also allow users to access this information.
If we go in the latter direction, we might need to provide interfaces like:
    interface PendingTimerProcessor<KEY, NAMESPACE> {
        // An implementation would typically just call doHandleTimer(timer).
        void onTimer(Timer<KEY, NAMESPACE> timer);
    }
    interface Timer<KEY, NAMESPACE> {
        long getTimestamp();
        KEY getKey();
        NAMESPACE getNamespace();
    }
Then we'll have the issue that the interface needs to handle both the
InternalTimerService and the raw ProcessingTimeService cases: the latter does
not have key and namespace information attached, and it would also be a bit
inconsistent for users to have to set the KEY and NAMESPACE types.
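To make the inconsistency concrete, here is a minimal sketch, assuming
hypothetical types (PendingTimer, SimpleTimer, PendingTimerSketch) that only
mirror the interfaces above and are not Flink API. The unkeyed case still has
to pick type arguments and pass nulls:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types mirroring the interfaces sketched above; not Flink API.
interface PendingTimer<KEY, NAMESPACE> {
    long getTimestamp();
    KEY getKey();             // null when the timer carries no key
    NAMESPACE getNamespace(); // null when the timer carries no namespace
}

interface PendingTimerProcessor<KEY, NAMESPACE> {
    void onPendingTimer(PendingTimer<KEY, NAMESPACE> timer);
}

final class SimpleTimer<KEY, NAMESPACE> implements PendingTimer<KEY, NAMESPACE> {
    private final long timestamp;
    private final KEY key;
    private final NAMESPACE namespace;

    SimpleTimer(long timestamp, KEY key, NAMESPACE namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override public long getTimestamp() { return timestamp; }
    @Override public KEY getKey() { return key; }
    @Override public NAMESPACE getNamespace() { return namespace; }
}

final class PendingTimerSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Keyed case: key and namespace are meaningful.
        PendingTimerProcessor<String, Long> keyed =
                t -> log.add("keyed " + t.getKey() + "/" + t.getNamespace()
                        + "@" + t.getTimestamp());
        keyed.onPendingTimer(new SimpleTimer<String, Long>(100L, "user-1", 42L));

        // Raw case: there is no key or namespace, yet the user must still
        // choose type arguments (Void here) and the runtime must pass nulls.
        PendingTimerProcessor<Void, Void> raw =
                t -> log.add("raw @" + t.getTimestamp());
        raw.onPendingTimer(new SimpleTimer<Void, Void>(200L, null, null));

        return log;
    }
}
```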
Besides, it looks to me that if we want to implement behaviors like waiting
for the timers, we might not be able to simply reuse the timer handler, so
every operator author would have to re-implement such waiting logic.
> Moreover it still has the downside that if you call back to the `onTimer`
> method after `trigger` you have access to the Context which lets you
> register new timers.
I think we could simply drop the timers registered after we start processing
the pending timers on termination. Logically there should be no new data after
termination.
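As a rough illustration of dropping such registrations, here is a minimal
sketch, assuming a hypothetical DrainingTimerService that stands in for a
processing-time timer service (it is not Flink API). Once the drain starts,
new registrations are counted and ignored:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical stand-in for a processing-time timer service; not Flink API.
final class DrainingTimerService {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private boolean terminating = false;
    private int dropped = 0;

    /** Registers a timer, or drops it once termination has started. */
    void registerTimer(long timestamp) {
        if (terminating) {
            dropped++;
            return;
        }
        pending.add(timestamp);
    }

    /** Fires every pending timer in timestamp order. */
    void drainOnTermination(LongConsumer handler) {
        terminating = true;
        Long timestamp;
        while ((timestamp = pending.poll()) != null) {
            handler.accept(timestamp);
        }
    }

    int droppedCount() {
        return dropped;
    }
}

final class DropOnDrainSketch {
    static String run() {
        DrainingTimerService service = new DrainingTimerService();
        List<Long> fired = new ArrayList<>();
        service.registerTimer(30L);
        service.registerTimer(10L);
        service.registerTimer(20L);
        service.drainOnTermination(ts -> {
            fired.add(ts);
            // The handler tries to register a follow-up timer; it is dropped.
            service.registerTimer(ts + 100L);
        });
        return fired + " dropped=" + service.droppedCount();
    }
}
```

Whether dropped registrations should be silent or logged is left open here.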
> I think I am not convinced to these arguments. First of all I'm afraid there
> is no clear distinction in that area what is runtime and what is not. I
> always found `AbstractStreamOperator(*)` actually part of runtime or Flink's
> internals and thus I don't find `InternalTimerService` a utility, but a vital
> part of the system. Let's be honest it is impossible to implement an operator
> without extending from `AbstractStreamOperator*`. What would be the problem
> with having a proper implementation in `InternalTimerService`? Can't we do
> it like this?:
I think the original paragraph only explains that the interface is harder to
support if we allow users to implement arbitrary logic. But since we are now
on the same page about the callback option, users can always implement
arbitrary logic whether or not we support timer.trigger(), so I think there is
no divergence on this point now. I also believe we'll finally have some logic
similar to the proposed one that drains all the timers and processes them.
Best,
Yun Gao
[1] https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
------------------------------------------------------------------
From:Dawid Wysakowicz <dwysakow...@apache.org>
Send Time:2022 Nov. 28 (Mon.) 23:33
To:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job 
Termination
Do we really need to have separate methods for triggering/waiting/cancelling?
To me it sounds rather counterintuitive. Why can't users just execute whatever
they want in the handler itself instead of additional back and forth with the
system? Moreover it still has the downside that if you call back to the
`onTimer` method after `trigger` you have access to the Context which lets you
register new timers.
I find following approach much simpler:
    void onTimer(...) {
        doHandleTimer(timestamp);
    }
    void processPendingTimer(...) {
        // trigger
        doHandleTimer(timestamp);
        // for cancel, simply do nothing...
    }
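A slightly fuller sketch of this pattern, assuming a hypothetical
TimerCallbacks interface and two toy operators (none of these names are Flink
API), could look like the following. One operator triggers pending timers on
termination, the other cancels them by doing nothing:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback pair following the approach above; not Flink API.
interface TimerCallbacks {
    void onTimer(long timestamp);
    void processPendingTimer(long timestamp);
}

/** Flushes its buffer on a timer, and also for timers pending at termination. */
final class FlushingOperator implements TimerCallbacks {
    final List<String> output = new ArrayList<>();
    private final List<String> buffer = new ArrayList<>();

    void processElement(String value) {
        buffer.add(value);
    }

    @Override
    public void onTimer(long timestamp) {
        doHandleTimer(timestamp);
    }

    @Override
    public void processPendingTimer(long timestamp) {
        // "trigger" semantics: reuse the regular timer logic.
        doHandleTimer(timestamp);
    }

    private void doHandleTimer(long timestamp) {
        output.add(timestamp + ":" + String.join(",", buffer));
        buffer.clear();
    }
}

/** Treats timers as timeouts and ignores the ones pending at termination. */
final class TimeoutOperator implements TimerCallbacks {
    final List<String> output = new ArrayList<>();

    @Override
    public void onTimer(long timestamp) {
        output.add("timeout@" + timestamp);
    }

    @Override
    public void processPendingTimer(long timestamp) {
        // "cancel" semantics: simply do nothing.
    }
}
```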
> Sorry I might not make it very clear here. I think the difficulty with
> supported setting the currentKey is a special issue for the callback options
> (no matter what the interface is) since it allows users to execute logic
> other than the one registered with the timers. The complexity comes from
> that currently we have two level of TimerServices: The
> ProcessingTimerService (there is no key) and InternalTimerService (with
> key). Currently only ProcessingTimerService is exposed to the runtime and
> InternalTimerService is much more a utility to implement the operator. Then
> with the current code, the runtime could only access to
> ProcessingTimerService on termination.
I think I am not convinced by these arguments. First of all I'm afraid there
is no clear distinction in that area what is runtime and what is not. I always
found `AbstractStreamOperator(*)` actually part of runtime or Flink's internals
and thus I don't find `InternalTimerService` a utility, but a vital part of the
system. Let's be honest, it is impossible to implement an operator without
extending from `AbstractStreamOperator*`.
What would be the problem with having a proper implementation in
`InternalTimerService`? Can't we do it like this?:
    AbstractStreamOperator#finish() {
        internalTimerService.finish();
    }
    InternalTimerService#finish() {
        while ((timer = processingTimeTimersQueue.peek()) != null) {
            keyContext.setCurrentKey(timer.getKey());
            processingTimeTimersQueue.poll();
            onEndOfInputHandler.processPendingTimer(timer);
        }
    }
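The same idea as a self-contained sketch, assuming simplified hypothetical
types (KeyedTimer, PendingTimerHandler, KeyedTimerQueue) in place of Flink's
real InternalTimerService and key context, which are more involved. The point
is only that the key is set before each callback runs:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simplified types; Flink's real InternalTimerService differs.
final class KeyedTimer {
    final long timestamp;
    final String key;

    KeyedTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

interface PendingTimerHandler {
    void processPendingTimer(KeyedTimer timer);
}

final class KeyedTimerQueue {
    private final PriorityQueue<KeyedTimer> queue =
            new PriorityQueue<>(Comparator.comparingLong((KeyedTimer t) -> t.timestamp));
    private String currentKey; // stands in for the operator's key context

    void register(long timestamp, String key) {
        queue.add(new KeyedTimer(timestamp, key));
    }

    String getCurrentKey() {
        return currentKey;
    }

    /** Drains all pending timers, setting the key context before each callback. */
    void finish(PendingTimerHandler handler) {
        KeyedTimer timer;
        while ((timer = queue.poll()) != null) {
            currentKey = timer.key;
            handler.processPendingTimer(timer);
        }
        currentKey = null;
    }
}

final class KeyContextSketch {
    static List<String> run() {
        KeyedTimerQueue service = new KeyedTimerQueue();
        List<String> seen = new ArrayList<>();
        service.register(20L, "b");
        service.register(10L, "a");
        // The handler reads the key from the context, not from the timer.
        service.finish(timer -> seen.add(service.getCurrentKey() + "@" + timer.timestamp));
        return seen;
    }
}
```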
> If we only executes some predefined actions, we do not need to worry about
> the implementation of InternalTimerService and just execute the registered
> timers. But if we allow users to execute arbitrary logic, we need to be also
> aware of the InternalTimerServices and parse the key from the timers stored
> in it. I think we should always have method to overcome this issue, but to
> support the callback options would be more complex.
I am not sure that having "predefined actions" would be good enough that we do
not need to set a key. As a user I'd anyhow expect the proper key to be set in
processPendingTimer.
Best,
Dawid
On 24/11/2022 08:51, Yun Gao wrote:

Hi Piotr / Divye,

Many thanks for the discussion!

First, IMO it seems we have reached consensus on the high-level API: most
operators should usually have only one reasonable action for the pending
timers on termination, thus we could let the operators implement their own
actions with the low-level interface provided. The only exception is the
ProcessFunction, with which users might register customized timers, thus users
might also define the actions on termination (if I have misunderstandings
here, please correct me).

For the low-level API, I can see the benefits of the callback option: since in
most cases an operator has only one action for all the timers, it's a waste
for us to store the same flag for all the timers, also with a lot of code /
state format changes. But since it is enough for most users to simply trigger
/ cancel the timers, it would be redundant for users to implement the logic
twice. Thus perhaps we might combine the benefits of the two options. We might
have a separate interface:

    public interface TimerHandlersOnTermination {
        void processPendingTimer(Timer timer, long currentTime);
    }

    public class Timer {
        long getRegisteredTimestamp();
        void trigger();
        void waitFor();
        void cancel();
    }

Then if an operator has implemented the TimerHandlersOnTermination interface,
on termination we could call processPendingTimer(xx) for every pending timer.
Users might simply trigger / waitFor / cancel it, or execute some other logic
if needed. Then for the ProcessFunction we might have a similar interface to
processPendingTimer, except we might need to provide Context / Collector to
the ProcessFunction. Do you think this would be a good direction?

Also @Piotr

> I don't see a problem here. Interface doesn't have to reflect that, only the
> runtime must set the correct key context before executing the handler
> dealing with the processing time timers at the end of input/time.

Sorry, I might not have made it very clear here. I think the difficulty with
supporting setting the currentKey is a special issue for the callback option
(no matter what the interface is), since it allows users to execute logic
other than the one registered with the timers. The complexity comes from the
fact that currently we have two levels of TimerServices: the
ProcessingTimerService (there is no key) and the InternalTimerService (with
key). Currently only the ProcessingTimerService is exposed to the runtime, and
the InternalTimerService is much more a utility to implement the operator. So
with the current code, the runtime can only access the ProcessingTimerService
on termination.

If we only execute some predefined actions, we do not need to worry about the
implementation of the InternalTimerService and can just execute the registered
timers. But if we allow users to execute arbitrary logic, we also need to be
aware of the InternalTimerServices and parse the key from the timers stored in
them. I think we should always have a method to overcome this issue, but
supporting the callback option would be more complex.

Best,
Yun Gao
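
One way the combined option could look, as a minimal sketch: the names
TerminationAction, TerminationTimer and TerminationSketch are hypothetical, and
the default action of CANCEL is an assumption made only for this example. The
handler chooses an action per pending timer and the runtime reads the choice
back:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types following the interface proposed above; not Flink API.
enum TerminationAction { TRIGGER, WAIT_FOR, CANCEL }

final class TerminationTimer {
    private final long registeredTimestamp;
    // Assumption for this sketch: a timer the handler does not touch is cancelled.
    private TerminationAction action = TerminationAction.CANCEL;

    TerminationTimer(long registeredTimestamp) {
        this.registeredTimestamp = registeredTimestamp;
    }

    long getRegisteredTimestamp() { return registeredTimestamp; }
    void trigger() { action = TerminationAction.TRIGGER; }
    void waitFor() { action = TerminationAction.WAIT_FOR; }
    void cancel() { action = TerminationAction.CANCEL; }
    TerminationAction chosenAction() { return action; }
}

interface TimerHandlersOnTermination {
    void processPendingTimer(TerminationTimer timer, long currentTime);
}

final class TerminationSketch {
    /** Asks the handler about each pending timer and returns its decisions in order. */
    static List<TerminationAction> decide(
            TimerHandlersOnTermination handler, long currentTime, long... pendingTimestamps) {
        List<TerminationAction> decisions = new ArrayList<>();
        for (long timestamp : pendingTimestamps) {
            TerminationTimer timer = new TerminationTimer(timestamp);
            handler.processPendingTimer(timer, currentTime);
            decisions.add(timer.chosenAction());
        }
        return decisions;
    }

    static List<TerminationAction> run() {
        // Wait for timers due within 10 ms, trigger everything else at once.
        TimerHandlersOnTermination handler = (timer, now) -> {
            if (timer.getRegisteredTimestamp() - now <= 10L) {
                timer.waitFor();
            } else {
                timer.trigger();
            }
        };
        return decide(handler, 100L, 105L, 500L);
    }
}
```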
------------------------------------------------------------------
From:Divye Kapoor <dkap...@pinterest.com.INVALID>
Send Time:2022 Nov. 24 (Thu.) 08:50
To:dev <dev@flink.apache.org>
Cc:Xenon Development Team <xenon-...@pinterest.com>
Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on
Job Termination

Sounds good. Looks like we're on the same page. Thanks!

Divye

On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Divye

I think we are mostly on the same page. Just to clarify/rephrase:

> One thing to think about - on EOF “trigger immediately” will mean that the
> asynchronous wait timeout timers will also fire - which is undesirable

I didn't mean to fire all timers immediately in all of the built-in operators.
Just that each built-in operator can have a hard coded way (without a way for
users to change it) to handle those timers. Windowed operators would trigger
the lingering timers (flush outputs), AsyncWaitOperator could just ignore
them. The same way users could register EOF timer handlers in the
ProcessFunction as Dawid Wysakowicz proposed, we (as flink developers) could
use the same mechanism to implement any behaviour we want for the built-in
operators. There should be no need to add any separate mechanism.

Best,
Piotrek

On Wed, 23 Nov 2022 at 08:21, Divye Kapoor <dkap...@pinterest.com.invalid>
wrote:

Thanks Yun/Piotrek,

Some brief comments inline below.

On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

All in all I would agree with Dawid's proposal.

+1

We can add the flexibility of how to deal with the timers in the low level API
via adding a handler - if someone needs to customize it, he will always have a
workaround.

Note after giving it more thought, I agree that registering some handlers is
better than overloading the register timer method and modifying the timer's
state. +1.

At the same time, we can force the most sensible semantic that we think for
the couple of built-in operators, which should be pretty straightforward
(either ignore the timers, or fire them at once). I agree there might be some
edge cases, that theoretically user might want to wait for the timer to fire
naturally, but:

1. I'm not sure how common in practice this will be. If not at all, then why
should we be complicating the API/system?

That’s fair. However, the specifics are very important here. One thing to
think about - on EOF “trigger immediately” will mean that the asynchronous
wait timeout timers will also fire - which is undesirable (because they are
racing with the last async call). However, the issue is cleanly resolved by
waiting for the timer to be canceled when the last event is processed. (“Wait
for” case). Ignoring the timer has the least justification. Registering the
handler as per Dawid’s proposal and having that handler unregister the timers
on EOF makes best sense. This solution also unifies the trigger immediately
case as that handler can reregister the timers for early termination.

The proposal:
1. Operator receives EOF
2. EOF timer handler triggers
3. EOF handler adjusts the registered timers for early trigger or ignore. If
   wait-for behavior is desired, timers are not changed. This is controlled in
   client code.
4. Operator waits for all timers to drain/trigger. (“Always”). There is no
   special handling for ignore/early trigger.
5. Operator allows job to proceed with shutdown.

The only api change needed is an EOF handler. The other agreement we need is
that “Wait for” is the desired behavior in processing time and that processing
time is fundamentally different from event time in this respect. (I have
changed my thinking since the last mail).

2. We can always expand the API in the future, and let the user override the
default built-in behaviour of the operators via some setter on the stream
transformation (`SingleOutputStreamOperator`), or via some custom API DSL
style in each of the operators separately.

This is not required. See above.

Re forcing the same semantics for processing time timers as for event time
ones - this is tempting, but indeed I see a possibility that users need to
adhere to some external constraints when using processing time.

+1. As above, we should consider the 2 cases fundamentally different in this
area.

Re: Yun -

b) Another issue is that what if users use timers with different termination
actions in the same operator / UDF? For example, users use some kind of
timeout (like throws exception if some thing not happen after some other
thing), and also some kind of window aggregation logic. In this case, without
additional tags, users might not be able to distinguish which timer should be
canceled and which time should be triggered ?

as above. The EOF handler makes the choice.

4. How could these scenarios adjust their APIs ? From the current listed
scenarios, I'm more tend to that as @Dawid pointed out, there might be only
one expected behavior for each scenario, thus it does not seems to need to
allow users to adjust the behavior. Thus @Divye may I have a double
confirmation currently do we have explicit scenarios that is expected to
change the different behaviors for the same scenario?

Wait-for behavior is probably the only expected behavior and any alterations
should be from the EOF handler managing the registered timers.

Besides @Divye from the listed scenarios, I have another concern for global
configuration is that for one job, different operators seems to still might
have different expected behaviors. For example, A job using both Window
operator and AsyncWaitOperator might have different requirements for timers on
termination?

Thank you for raising this case. This changed my thinking. Based on your
point, we should try and align on the “Wait-for” with EOF handler proposal.
I’m withdrawing the “single-runtime-config” proposal.

Best,
Divye
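
The five-step proposal above can be modelled roughly as follows. This is a
sketch under stated assumptions: EofTimerModel and EofSketch are hypothetical
names, the clock is simulated rather than real, and each timestamp holds at
most one timer. It is not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical model of the five steps with a simulated clock; not Flink API.
final class EofTimerModel {
    // timestamp -> timer name, ordered by firing time (one timer per timestamp)
    private final TreeMap<Long, String> timers = new TreeMap<>();
    final List<String> fired = new ArrayList<>();
    long now = 0L;

    void register(long timestamp, String name) { timers.put(timestamp, name); }
    void unregister(long timestamp) { timers.remove(timestamp); }

    /** Step 1: called when the operator receives EOF. */
    void endOfInput(Consumer<EofTimerModel> eofHandler) {
        // Steps 2-3: the EOF handler adjusts the registered timers.
        eofHandler.accept(this);
        // Step 4: always wait for whatever is still registered.
        Map.Entry<Long, String> next;
        while ((next = timers.pollFirstEntry()) != null) {
            now = Math.max(now, next.getKey()); // simulated waiting
            fired.add(next.getValue() + "@" + now);
        }
        // Step 5: the job may now proceed with shutdown.
    }
}

final class EofSketch {
    static List<String> run() {
        EofTimerModel model = new EofTimerModel();
        model.now = 10L;
        model.register(1000L, "window");
        model.register(50L, "heartbeat");
        model.register(5000L, "timeout");
        model.endOfInput(m -> {
            m.unregister(5000L);        // ignore: drop the timeout timer
            m.unregister(1000L);        // early trigger: re-register at "now"
            m.register(m.now, "window");
            // wait-for: the heartbeat timer is left unchanged
        });
        return model.fired;
    }
}
```

In this model the three behaviors differ only in what the EOF handler does to
the registered timers; the draining loop itself has no special cases.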
