Hey all,

Sorry for being rather late to the party. I'd like to chime in with a few thoughts.

I agree there are at least two aspects to the problem:

1) lower level handling (StreamOperator)

2) higher level APIs (WindowOperator, CEP, ProcessFunction, ...)

The first is how we want to deal with it on the lower level. By lower level I mean AbstractStreamOperator and below. To be honest, I am against adding a property to every registered timer, as in the current proposal. It introduces many problems for what is, in my opinion, a really rare case. In the end it applies only to jobs that finish and have registered processing timers.

Some problems that I see:

 * Changes the checkpoint format and stores information that is
   unnecessary most of the time.
 * We'd need to somehow inform the ProcessingTimeCallback (and the like)
   that a timer is triggered at the end of the job. This is necessary at
   least so that implementors know they should not register any new
   timers.
 * Introduces a method overload. I guess we would want to deprecate the
   method without TimerEndAction at some point.

I'd rather go with a variation of the other proposal from FLINK-18647 and add a hook for what should happen at the end of input. I look at the problem from the perspective of endOfInput rather than of individual timers. This could be something like:

public interface ProcessingTimeService {
  void registerOnEndOfInputHandler(Consumer<Long> onEndOfInput);

  // we could even think of having a single callback for all timers at once
  void registerOnEndOfInputHandler(Consumer<List<Long>> onEndOfInput);
}
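To make the hook idea a bit more concrete, here is a rough, self-contained sketch in plain Java. It is not Flink code: `SketchTimerService`, `registerTimer` and `endOfInput` are hypothetical names used only for illustration, and the real ProcessingTimeService looks different. It only shows the "single callback for all timers at once" variant, where the still-pending timestamps are handed to the handler and no new timers are accepted afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical stand-in for a processing-time service; NOT the Flink class.
class SketchTimerService {
    // Pending timer timestamps, kept sorted so they are flushed in time order.
    private final TreeSet<Long> pendingTimers = new TreeSet<>();
    // Default handler drops pending timers, mirroring the current behaviour.
    private Consumer<List<Long>> onEndOfInput = timers -> {};
    private boolean inputEnded = false;

    void registerTimer(long timestamp) {
        if (inputEnded) {
            throw new IllegalStateException("No new timers after end of input");
        }
        pendingTimers.add(timestamp);
    }

    void registerOnEndOfInputHandler(Consumer<List<Long>> handler) {
        this.onEndOfInput = handler;
    }

    // Assumed to be called once by the operator when its input is exhausted.
    void endOfInput() {
        inputEnded = true;
        List<Long> untriggered = new ArrayList<>(pendingTimers);
        pendingTimers.clear();
        onEndOfInput.accept(untriggered);
    }
}

class SketchTimerServiceDemo {
    public static void main(String[] args) {
        SketchTimerService service = new SketchTimerService();
        List<Long> flushed = new ArrayList<>();
        service.registerOnEndOfInputHandler(flushed::addAll);
        service.registerTimer(2000L);
        service.registerTimer(1000L);
        service.endOfInput();
        System.out.println(flushed); // prints [1000, 2000]
    }
}
```

Nothing per-timer has to be stored in this variant, so the checkpoint format would not need to change.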


As for the higher level API, I'm not convinced about having a single

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
  …
  void setTimerTerminationAction(TimerTerminationAction action);
  …
}

All of the discussed APIs have, or are, specific DSLs, and introducing a disjoint approach does not seem natural to me. Users do not, and should not, think about timers when they define a Window or CEP pattern. They deal with triggers/windows or timeout conditions. Moreover, I strongly believe that in each of these cases there is only a single valid behaviour. I can't really imagine why anyone would want to discard windows at the end. There is the argument of "waiting the exact time", but this is of questionable use to me and I'd prefer to see a real request for that. So far, the way I understood it, it's more of a "we think somebody may come up with such a request".

My suggestion would be to simply change the behaviour of WindowOperator and CEPOperator without adding any additional API. I understand it changes the semantics a bit, but a) I see it as defining so far undefined behaviour, and b) again, I can't imagine someone depends on the broken behaviour where the contents of the last windows are dropped. We could of course add a way to change the behaviour back if it annoys someone. On this topic I'm really open for discussion.

This would leave only the ProcessFunction to consider, as there one can implement any custom handling. In my opinion this can be approached similarly to the lower-level APIs, by introducing a hook for timers at the endOfInput, e.g.

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
  public void onEndOf(Timers)/(Input)(...) {}

  // or maybe
  public void onUntriggeredTimer(long timer) {}
}

In either of the above methods there would be no Context, so no way to register new timers.
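As a rough illustration of the second variant, here is a self-contained sketch in plain Java. It is not Flink's KeyedProcessFunction: `SketchProcessFunction`, `TimerContext` and `PeriodicReporter` are hypothetical names, and the `out` list is an assumed stand-in for a collector so the hook has somewhere to emit. The point is only that the regular callback receives a context and can re-register, while the end-of-input hook receives none.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified base class; NOT Flink's KeyedProcessFunction.
abstract class SketchProcessFunction<O> {
    // Minimal stand-in for a timer context.
    interface TimerContext {
        void registerProcessingTimeTimer(long timestamp);
    }

    // Regular timer callback: gets a context, so it may register follow-up timers.
    abstract void onTimer(long timestamp, TimerContext ctx, List<O> out);

    // End-of-input hook: no context is passed, so new timers cannot be registered.
    // The default does nothing, i.e. untriggered timers are dropped.
    void onUntriggeredTimer(long timestamp, List<O> out) {}
}

// Emits a report on every timer and one final message for a timer that never fired.
class PeriodicReporter extends SketchProcessFunction<String> {
    @Override
    void onTimer(long timestamp, TimerContext ctx, List<String> out) {
        out.add("report@" + timestamp);
        ctx.registerProcessingTimeTimer(timestamp + 1000L);
    }

    @Override
    void onUntriggeredTimer(long timestamp, List<String> out) {
        out.add("final@" + timestamp);
    }
}

class PeriodicReporterDemo {
    public static void main(String[] args) {
        PeriodicReporter fn = new PeriodicReporter();
        List<String> out = new ArrayList<>();
        List<Long> registered = new ArrayList<>();
        fn.onTimer(1000L, ts -> registered.add(ts), out);
        // Input ends before the timer at 2000 fires.
        fn.onUntriggeredTimer(registered.get(0), out);
        System.out.println(out); // prints [report@1000, final@2000]
    }
}
```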
Let me know what you think.
Best,
Dawid
On 15/11/2022 09:17, Yun Gao wrote:
Hi Dong,
Many thanks for the discussion!
It appears that the issues mentioned in the motivation section
are all related to using Windows on the DataStream API, where
the user's code typically does not have anything to do with Timer.
IMO it might not be bound only to the case of window operators. For
example, users might implement some complex aggregation logic
with ProcessFunction directly. In this case, users might also want to
control how these timers are dealt with at the end of the stream.
IMO, the main benefit of this alternative solution is that it is more
consistent with the existing Windows API. Users who are concerned
with firing windows on end-of-stream won't need to additionally
understand/handle timer.
First, to summarize the problem: currently it can be divided into two
layers:
1. In the lower layer we need to support different actions to deal with
the timers at the end of the stream (though in fact we need to deduce
whether we need this ability from the API; for simplicity I'll describe
this layer first, since the divergence happens at the higher level).
2. How do we let users specify the actions for the timers at the end of the
stream? Currently we have different options on this layer.
  - The first option is to have a unified
  SingleOutputStreamOperator#setTimerTerminationAction.
  - The second option is to have a specialized trigger for the window.
With whichever interface, the window operator needs to set proper
termination actions according to the specified semantics when registering
timers.
On the other side, specifically for the WindowOperator, the interface might not
only be related to timers; there are also window types, e.g. CountWindow,
that also need to specify the behavior at the end of the stream.
Therefore, for window operators it looks to me that it would indeed be friendlier
to users to have a uniform API. Since different operators might have different
situations, I wonder whether it would be better if we first:
1. Let operator authors still set the default actions when registering timers.
2. Let each operator consider its API separately.
  - The window operator provides a uniform API.
  - Apart from windows, it currently looks to me that users can register customized
  timers only with the family of ProcessFunctions. Users could still set actions for
  each timer, and we may first only provide a method for ProcessOperator to change
  the per-timer actions uniformly when building the DAG?
we need the task thread to be blocked until the timer gets triggered on the
registered time point.
Currently I do not have real-life scenarios, but some plausible cases are:
- Users want the job stopped at the boundary of windows when stopping the job
with savepoint --drain.
- Users have timers to emit messages to external systems periodically, and
want to have one final message at the end of the stream.
But I also think we could add more actions step-by-step.
I might have missed use-cases for this FLIP which do not involve windows.
If so, could you help explain the use-case in this FLIP?
I'll complete the scenarios in the FLIP.
Best,
Yun Gao
------------------------------------------------------------------
From:Dong Lin<lindon...@gmail.com>
Send Time:2022 Nov. 10 (Thu.) 11:43
To:dev<dev@flink.apache.org>
Cc:Maximilian Michels<m...@apache.org>
Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
Hi Piotr,
I also think the scenario mentioned in this FLIP is useful to address. I am
happy to discuss this together and figure out the more usable APIs.
I guess the choice of API pretty much depends on when users need to use it.
I am assuming it is needed when using dataStream.window(...). Is there any
other case that needs this feature?
It is mentioned in FLINK-18647
<https://issues.apache.org/jira/browse/FLINK-18647> that we need the task
thread to be blocked until the timer gets triggered on the registered time
point. The JIRA and the FLIP do not seem to provide the use-case for this
feature. Could you explain more about the use-cases that might need this
feature?
What do you think of the alternative API vs. the approach proposed in the
FLIP? Maybe we can continue the discussion by detailing the pros/cons.
Best,
Dong
On Wed, Nov 9, 2022 at 4:35 PM Piotr Nowojski<pnowoj...@apache.org>  wrote:
Hi all,

Big thanks to Yun Gao for driving this!

I wonder whether we need to add a new option when registering timers.
Won't it be sufficient to flush all pending timers on termination but not allow
new ones to be registered?
Maximilian, I'm sure that a single semantics is not enough. All three that
are proposed here (cancel, wait, trigger immediately) were requested by
users.

Dong, as I initially wrote in the above-mentioned ticket [1] I'm personally
open to discussions about the final shape of the API.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18647

On Tue, 8 Nov 2022 at 03:42, Yun Gao<yungao...@aliyun.com.invalid>  wrote:

Hi Maximilian,

Thanks for the discussion! It seems there are still other kinds of
scenarios that should not be flushed, such as "emit record X if record Y
hasn't arrived within 30 seconds after record Z" or "fail the job if the
external system does not respond within 30 seconds". It seems these timers
should be dropped instead of triggered. Thus we think it would be necessary
to provide per-timer configuration.
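
A minimal sketch of what per-timer configuration could look like, in self-contained plain Java. The names `SketchTerminationAction`, `SketchTimer` and `SketchTermination` are hypothetical and are not the definitions from the FLIP; the three values only mirror the cancel / wait / trigger semantics mentioned in this thread. The decision is returned as text rather than executed, to keep the example small.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical action attached to each timer at registration time.
enum SketchTerminationAction { CANCEL, WAIT, TRIGGER }

// Hypothetical timer record carrying its own termination action.
class SketchTimer {
    final long timestamp;
    final SketchTerminationAction action;

    SketchTimer(long timestamp, SketchTerminationAction action) {
        this.timestamp = timestamp;
        this.action = action;
    }
}

class SketchTermination {
    // Describes what would happen to each pending timer on job termination.
    static List<String> onJobTermination(List<SketchTimer> pending) {
        List<String> decisions = new ArrayList<>();
        for (SketchTimer timer : pending) {
            switch (timer.action) {
                case CANCEL:
                    decisions.add("drop " + timer.timestamp);
                    break;
                case WAIT:
                    decisions.add("block until " + timer.timestamp);
                    break;
                case TRIGGER:
                    decisions.add("fire now " + timer.timestamp);
                    break;
            }
        }
        return decisions;
    }

    public static void main(String[] args) {
        List<SketchTimer> pending = Arrays.asList(
                new SketchTimer(30_000L, SketchTerminationAction.CANCEL),   // timeout-style timer
                new SketchTimer(60_000L, SketchTerminationAction.TRIGGER)); // window-style timer
        System.out.println(onJobTermination(pending)); // prints [drop 30000, fire now 60000]
    }
}
```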

Best,
Yun Gao




------------------Original Mail ------------------
Sender:Maximilian Michels<m...@apache.org>
Send Date:Fri Nov 4 21:35:58 2022
Recipients:Flink Dev<dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com>
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
Hey Yun,

I wonder whether we need to add a new option when registering timers.
Won't it be sufficient to flush all pending timers on termination but not allow
new ones to be registered?

-Max

On Wed, Nov 2, 2022 at 11:20 AM Yun Gao<yungao...@aliyun.com.invalid>
wrote:

Hi everyone,
I would like to open a discussion[1] on how to
properly handle the processing timers on job
termination.
Currently all the processing timers are
ignored on job termination. This behavior is
not suitable for some cases like WindowOperator.
Thus we'd like to provide more options for how
to deal with the pending timers on job termination,
and provide correct semantics on bounded streams
for these scenarios. The FLIP is based on the previous
discussion with Piotr and Divye in [2].
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-269%3A+Properly+Handling+the+Processing+Timers+on+Job+Termination
[2] https://issues.apache.org/jira/browse/FLINK-18647
