Do we really need separate methods for triggering/waiting/cancelling? To me it sounds rather counterintuitive. Why can't users just execute whatever they want in the handler itself, instead of going back and forth with the system? Moreover, it still has the downside that if you call back into the `onTimer` method after `trigger`, you have access to the Context, which lets you register new timers.

I find the following approach much simpler:

void onTimer(...) {
 doHandleTimer(timestamp);
}

void processPendingTimer(...) {
 // trigger
 doHandleTimer(timestamp);

 // for cancel, simply do nothing...
}

   Sorry, I might not have made it very clear here. I think the difficulty with
   supporting setting the currentKey is a special issue for the callback options
   (no matter what the interface is) since it allows users to execute logic other
   than the one registered with the timers.
   The complexity comes from the fact that we currently have two levels of TimerServices:
   the ProcessingTimerService (there is no key) and the InternalTimerService (with key).
   Currently only ProcessingTimerService is exposed to the runtime and
   InternalTimerService is much more a utility to implement the operator.
   Then with the current code, the runtime can only access
   ProcessingTimerService on termination.


I am not convinced by these arguments. First of all, I'm afraid there is no clear distinction in that area between what is runtime and what is not. I have always considered `AbstractStreamOperator(*)` to be part of the runtime, or of Flink's internals, and thus I don't see `InternalTimerService` as a utility, but as a vital part of the system. Let's be honest: it is impossible to implement an operator without extending `AbstractStreamOperator*`.

What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this:

AbstractStreamOperator#finish() {
  internalTimerService.finish();
}

InternalTimerService#finish() {
  while ((timer = processingTimeTimersQueue.peek()) != null) {
    keyContext.setCurrentKey(timer.getKey());
    processingTimeTimersQueue.poll();
    onEndOfInputHandler.processPendingTimer(timer);
  }
}
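
To make the loop above concrete, here is a minimal, self-contained sketch of the same idea outside of Flink. Everything in it is an assumption made for illustration: `SketchTimer`, `SketchEndOfInputHandler` and `SketchTimerService` are hypothetical stand-ins, a plain `String` field replaces `keyContext.setCurrentKey(...)`, and a `java.util.PriorityQueue` replaces the real timer queue. It only shows that the key can be set right before each handler call while the queue is drained in timestamp order.

```java
import java.util.PriorityQueue;

// Hypothetical stand-in for a keyed processing-time timer; not a Flink class.
final class SketchTimer implements Comparable<SketchTimer> {
    final String key;
    final long timestamp;

    SketchTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(SketchTimer other) {
        // Earliest timestamp first, so the queue drains in firing order.
        return Long.compare(timestamp, other.timestamp);
    }
}

// Hypothetical end-of-input callback, modelled on processPendingTimer above.
interface SketchEndOfInputHandler {
    void processPendingTimer(SketchTimer timer);
}

// Hypothetical stand-in for a keyed timer service; not InternalTimerService.
final class SketchTimerService {
    private final PriorityQueue<SketchTimer> processingTimeTimersQueue = new PriorityQueue<>();
    private String currentKey; // plays the role of the operator's key context

    void registerProcessingTimeTimer(String key, long timestamp) {
        processingTimeTimersQueue.add(new SketchTimer(key, timestamp));
    }

    String getCurrentKey() {
        return currentKey;
    }

    // Drains all pending timers, setting the key before each handler call.
    void finish(SketchEndOfInputHandler handler) {
        SketchTimer timer;
        while ((timer = processingTimeTimersQueue.poll()) != null) {
            currentKey = timer.key;
            handler.processPendingTimer(timer);
        }
    }
}
```

Registering timers for a few keys and then calling `finish(...)` with a handler that reads `getCurrentKey()` shows each timer being handled under its own key. Whether the handler fires or drops the timer is then entirely up to the handler.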

   If we only execute some predefined actions, we do not need to worry about the
   implementation of InternalTimerService and can just execute the registered timers.
   But if we allow users to execute arbitrary logic, we also need to be aware of the
   InternalTimerServices and parse the key from the timers stored in them.
   I think we should always have a method to overcome this issue,
   but supporting the callback options would be more complex.

I am not sure that having "predefined actions" would be good enough to mean that we do not need to set a key. As a user, I would expect the proper key to be set in `processPendingTimer` anyhow.

Best,

Dawid

On 24/11/2022 08:51, Yun Gao wrote:
Hi Piotr / Divye,
Many thanks for the discussion!
First, IMO it seems we have reached consensus
on the high-level API: most operators should usually
have only one reasonable action for the pending timers
on termination, thus we could let the operators
implement their own actions with the low-level interface
provided. The only exception is the ProcessFunction, with
which users might register customized timers, thus users
might also define the actions on termination (if I have
misunderstood anything here, please correct me).
For the low-level API, I can see the benefits of the callback
options: since in most cases an operator has only one action
for all the timers, it would be a waste to store the same flag for
every timer, and it would also require a lot of code / state format changes.
But since it is enough for most users to simply trigger / cancel
the timers, it would be redundant for users to implement the logic
twice. Thus perhaps we might combine the benefits of the two options:
We might have a separate interface
public interface TimerHandlersOnTermination {
  void processPendingTimer(Timer timer, long currentTime);
}

public interface Timer {
  long getRegisteredTimestamp();
  void trigger();
  void waitFor();
  void cancel();
}
Then if an operator has implemented the TimerHandlersOnTermination
interface, on termination we could call processPendingTimer(xx) for every
pending timer. Users might simply trigger / waitFor / cancel it, or execute
some other logic if needed.
Then for the ProcessFunction we might have a similar interface to
processPendingTimer, except we might need to provide Context / Collector
to the ProcessFunction.
Do you think this would be a good direction?
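
As a rough illustration of how an operator might use the proposed interface, here is a minimal sketch. The `Timer` and `TimerHandlersOnTermination` shapes follow the proposal in this mail and are not a released Flink API. `GracePeriodHandler` (fire timers that are due soon, cancel the rest) and `RecordingTimer` (a timer that only records which action was chosen) are hypothetical names and a hypothetical policy, assumed purely for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Shapes taken from the proposal in this thread; not a released Flink API.
interface Timer {
    long getRegisteredTimestamp();
    void trigger();
    void waitFor();
    void cancel();
}

interface TimerHandlersOnTermination {
    void processPendingTimer(Timer timer, long currentTime);
}

// Hypothetical policy: fire timers due within a grace period, cancel the rest.
final class GracePeriodHandler implements TimerHandlersOnTermination {
    private final long graceMillis;

    GracePeriodHandler(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    @Override
    public void processPendingTimer(Timer timer, long currentTime) {
        if (timer.getRegisteredTimestamp() - currentTime <= graceMillis) {
            timer.trigger();
        } else {
            timer.cancel();
        }
    }
}

// Hypothetical Timer used for the demo; it only records the chosen action.
final class RecordingTimer implements Timer {
    private final long timestamp;
    final List<String> actions = new ArrayList<>();

    RecordingTimer(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public long getRegisteredTimestamp() {
        return timestamp;
    }

    @Override
    public void trigger() {
        actions.add("trigger");
    }

    @Override
    public void waitFor() {
        actions.add("waitFor");
    }

    @Override
    public void cancel() {
        actions.add("cancel");
    }
}
```

With a grace period of 1000 ms and a current time of 1000, a timer registered for 1500 would be triggered and one registered for 5000 would be cancelled. An operator that always wants the same action would simply call `trigger()`, `waitFor()` or `cancel()` unconditionally.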
Also @Piotr
I don't see a problem here. Interface doesn't have to reflect that, only
the runtime must set the correct key context before executing the handler
dealing with the processing time timers at the end of input/time.
Sorry, I might not have made it very clear here. I think the difficulty with
supporting setting the currentKey is a special issue for the callback options
(no matter what the interface is) since it allows users to execute logic other
than the one registered with the timers.
The complexity comes from the fact that we currently have two levels of TimerServices:
the ProcessingTimerService (there is no key) and the InternalTimerService (with key).
Currently only ProcessingTimerService is exposed to the runtime and
InternalTimerService is much more a utility to implement the operator.
Then with the current code, the runtime can only access
ProcessingTimerService on termination. If we only execute some predefined
actions, we do not need to worry about the implementation of
InternalTimerService and can just execute the registered timers. But if we allow
users to execute arbitrary logic, we also need to be aware of the
InternalTimerServices and parse the key from the timers stored in them.
I think we should always have a method to overcome this issue,
but supporting the callback options would be more complex.
Best,
Yun Gao
------------------------------------------------------------------
From:Divye Kapoor<dkap...@pinterest.com.INVALID>
Send Time:2022 Nov. 24 (Thu.) 08:50
To:dev<dev@flink.apache.org>
Cc:Xenon Development Team<xenon-...@pinterest.com>
Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
Sounds good. Looks like we're on the same page.
Thanks!
Divye
On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski<pnowoj...@apache.org>  wrote:
Hi Divye

I think we are mostly on the same page. Just to clarify/rephrase:

One thing to think about - on EOF “trigger immediately” will mean that the
asynchronous wait timeout timers will also fire - which is undesirable
I didn't mean to fire all timers immediately in all of the built-in
operators. Just that each built-in operator can have a hard coded way
(without a way for users to change it) to handle those timers. Windowed
operators would trigger the lingering timers (flush outputs),
AsyncWaitOperator could just ignore them. The same way users could register
EOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we
(as flink developers) could use the same mechanism to implement any
behaviour we want for the built-in operators. There should be no need to
add any separate mechanism.

Best,
Piotrek

On Wed, 23 Nov 2022 at 08:21, Divye Kapoor<dkap...@pinterest.com.invalid> wrote:

Thanks Yun/Piotrek,

Some brief comments inline below.

On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski<pnowoj...@apache.org> wrote:

Hi,

All in all I would agree with Dawid's proposal.
+1

We can add the flexibility of how to deal with the timers in the low level
API via adding a handler - if someone needs to customize it, he will always
have a workaround. Note after giving it more thought, I agree that
registering some handlers is better than overloading the register timer
method and modifying the timer's state.
+1.


At the same time, we can force the most sensible semantic that we think for
the couple of built-in operators, which should be pretty straightforward
(either ignore the timers, or fire them at once). I agree there might be
some edge cases, that theoretically user might want to wait for the timer
to fire naturally, but:
1. I'm not sure how common in practice this will be. If not at all, then
why should we be complicating the API/system?
That’s fair.
However, the specifics are very important here.

One thing to think about - on EOF “trigger immediately” will mean that the
asynchronous wait timeout timers will also fire - which is undesirable
(because they are racing with the last async call). However, the issue is
cleanly resolved by waiting for the timer to be canceled when the last
event is processed. (“Wait for” case).

Ignoring the timer has the least justification. Registering the handler as
per Dawid’s proposal and having that handler unregister the timers on EOF
makes best sense. This solution also unifies the trigger immediately case
as that handler can reregister the timers for early termination.

The proposal:
1. Operator receives EOF
2. EOF timer handler triggers
3. EOF handler adjusts the registered timers for early trigger or ignore.
If wait-for behavior is desired, timers are not changed. This is controlled
in client code.
4. Operator waits for all timers to drain/trigger. (“Always”). There is no
special handling for ignore/early trigger.
5. Operator allows job to proceed with shutdown.
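
The five steps above can be sketched roughly as follows. This is only an illustration under loud assumptions: `PendingTimer`, `EofTimerHandler` and `EofSketchOperator` are hypothetical names, and the "wait" in step 4 is simulated by advancing a local clock value instead of blocking on a real processing-time service.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical pending timer; the EOF handler may move or cancel it.
final class PendingTimer {
    final String name;
    long timestamp;
    boolean cancelled;

    PendingTimer(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// Hypothetical EOF handler (steps 2 and 3): adjusts timers or leaves them alone.
interface EofTimerHandler {
    void onEndOfInput(PendingTimer timer, long now);
}

final class EofSketchOperator {
    private final List<PendingTimer> timers = new ArrayList<>();
    final List<String> fired = new ArrayList<>();

    void registerTimer(String name, long timestamp) {
        timers.add(new PendingTimer(name, timestamp));
    }

    // Step 1: called on EOF. Returns the simulated time at which
    // shutdown may proceed (step 5).
    long endOfInput(long now, EofTimerHandler handler) {
        // Steps 2-3: the handler may reschedule a timer to fire early,
        // cancel it, or leave it untouched (wait-for behavior).
        for (PendingTimer timer : timers) {
            handler.onEndOfInput(timer, now);
        }
        timers.removeIf(timer -> timer.cancelled);
        timers.sort(Comparator.comparingLong(timer -> timer.timestamp));

        // Step 4: always wait for whatever is still registered. The wait is
        // simulated here by moving a clock value forward to each timestamp.
        long clock = now;
        for (PendingTimer timer : timers) {
            clock = Math.max(clock, timer.timestamp);
            fired.add(timer.name + "@" + clock);
        }
        timers.clear();
        return clock;
    }
}
```

In this sketch the operator itself has no notion of "ignore" or "early trigger". A handler that sets `timestamp = now` gets early firing, one that sets `cancelled = true` gets the ignore behavior, and one that does nothing gets wait-for.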

The only API change needed is an EOF handler.
The other agreement we need is that “Wait for” is the desired behavior in
processing time and that processing time is fundamentally different from
event time in this respect.
(I have changed my thinking since the last mail).

2. We can always expand the API in the future, and let the user override
the default built-in behaviour of the operators via some setter on the
stream transformation (`SingleOutputStreamOperator`), or via some custom
API DSL style in each of the operators separately.

This is not required. See above.


Re forcing the same semantics for processing time timers as for event time
ones - this is tempting, but indeed I see a possibility that users need to
adhere to some external constraints when using processing time.
+1. As above, we should consider the 2 cases fundamentally different in
this area.

Re: Yun -

b) Another issue: what if users use timers with different termination
actions in the same operator / UDF? For example, users might use some kind
of timeout (like throwing an exception if something does not happen after
something else), and also some kind of window aggregation logic. In this
case, without additional tags, users might not be able to distinguish which
timer should be canceled and which timer should be triggered.
as above. The EOF handler makes the choice.

4. How could these scenarios adjust their APIs?
From the currently listed scenarios, I tend to think that, as @Dawid
pointed out, there might be only one expected behavior for each scenario,
so it does not seem necessary to allow users to adjust the behavior.
@Divye, may I double check: do we currently have explicit scenarios
that are expected to need different behaviors for the same scenario?
Wait-for behavior is probably the only expected behavior and any
alterations should be from the EOF handler managing the registered timers.
Besides, @Divye, from the listed scenarios, another concern I have with a
global configuration is that, within one job, different operators might
still have different expected behaviors. For example, a job using both a
Window operator and an AsyncWaitOperator might have different requirements
for timers on termination?

Thank you for raising this case. This changed my thinking. Based on your
point, we should try and align on the “Wait-for” with EOF handler proposal.
I’m withdrawing the “single-runtime-config” proposal.

Best,
Divye
