I see the problem differently. I really don't think this should be looked at from the perspective of timers, but rather from the semantics of endOfInput.

   If a proc time timer is set for 10s, the 10s are "real-time" and the delay
   is important.
   One particularly important example is "waiting" for other parts of the
   system to settle down.
   For example: Async operators where an async call writes data, waits for
   10 seconds, and then reads it back from a different service. If we don't
   wait for the timers to fire 10s later, it breaks the implicit guarantee
   of the operator.

Honestly, I don't get this point. Do you really run a production pipeline on bounded data where you have "early" (or, in general, processing-time-driven) firings? Using processing-time timers on bounded data sounds really wrong to me, as there are no guarantees on the speed of data processing or on what gets processed in a given period of time.

Personally, I really don't like the "WAIT_FOR_TIMERS" mode and delaying job shutdown from within an operator, as in my opinion it a) is unnecessary and b) complicates the shutdown procedure.

The main benefit for me of `onEndOfInput` over having a flag in `OnTimerContext` is that you cannot end up in an infinite loop there, as you don't have a way to register new timers. It also does not mix a real corner case (handling end of data) with regular logic (onTimer). Here I am happy to be convinced otherwise, as long as we don't add any flags to `registerProcessingTimeTimer`, which would mean we need to modify the snapshot format.
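
To make the point concrete, here is a minimal sketch in plain Java (no Flink dependency). The `EndOfInputAware` interface, the `BufferingFunction` class, and their method signatures are hypothetical and only illustrate the shape of such a hook, not an agreed API. The hook receives an output collector and nothing else, so it has no way to register a follow-on timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EndOfInputDemo {
    // Hypothetical hook: called once when the bounded input is exhausted.
    // It only receives an output collector, so no timers can be registered.
    interface EndOfInputAware<OUT> {
        void onEndOfInput(Consumer<OUT> out);
    }

    // Buffers elements and normally flushes them from a processing-time timer.
    static final class BufferingFunction implements EndOfInputAware<String> {
        private final List<String> buffer = new ArrayList<>();

        void processElement(String value) {
            buffer.add(value);
        }

        // Regular path: invoked when a processing-time timer fires.
        void onTimer(Consumer<String> out) {
            flush(out);
        }

        // Corner case: end of data, kept separate from onTimer.
        @Override
        public void onEndOfInput(Consumer<String> out) {
            flush(out);
        }

        private void flush(Consumer<String> out) {
            if (!buffer.isEmpty()) {
                out.accept(String.join(",", buffer));
                buffer.clear();
            }
        }
    }

    static List<String> run() {
        BufferingFunction fn = new BufferingFunction();
        List<String> emitted = new ArrayList<>();
        fn.processElement("a");
        fn.processElement("b");
        // The input ends before any timer fires; the hook emits the remainder.
        fn.onEndOfInput(emitted::add);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the hook runs exactly once and cannot schedule more work, shutdown is bounded by construction.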

Best,

Dawid



On 19/11/2022 02:19, Divye Kapoor wrote:
Hi Folks,
I raised the initial issue with Yun and others (apologies for the long
email below, I promise it's worth the read).
Speaking as someone who has hit this issue in a practical way, Dawid's
first solution is the simplest and easiest way to fix the problem without
introducing additional complexity.

If we are to summarize the problem --
1. EventTime jobs always have their windows fired on EOF. The window
termination behavior is well defined wrt Timers and **consistent w/ Event
time guarantees**.
2. ProcessingTime jobs have undefined behavior for Timers on EOF (they may
execute, they may not execute or they may partly execute). There are no
guarantees.

As a consequence of (2), the last windows of a proc time job are thrown
away.
The minimal solution is to "fix" (2) by defining the behavior similar to
(1) with no API changes.
As Dawid said, this is a semantic change and I consider it similar to a
"bug-fix".
(From an implementation perspective, it would be waiting on a condition
variable before termination, i.e. waiting for as long as timers.size() > 0).
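
A minimal sketch of that wait, in plain Java rather than inside Flink's task shutdown path. `PendingTimerGate` and its methods are hypothetical names; the sketch only assumes that pending timers are counted and that termination blocks until the count reaches zero.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class PendingTimerGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition drained = lock.newCondition();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private int pending = 0; // guarded by lock

    // Schedule a callback and count it as pending until it has run.
    void register(long delayMillis, Runnable callback) {
        lock.lock();
        try {
            pending++;
        } finally {
            lock.unlock();
        }
        scheduler.schedule(() -> {
            try {
                callback.run();
            } finally {
                lock.lock();
                try {
                    if (--pending == 0) {
                        drained.signalAll();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Called at end of input: block until every registered timer has fired.
    void awaitDrained() {
        lock.lock();
        try {
            while (pending > 0) {
                drained.awaitUninterruptibly();
            }
        } finally {
            lock.unlock();
        }
        scheduler.shutdown();
    }

    static int demo() {
        PendingTimerGate gate = new PendingTimerGate();
        AtomicInteger fired = new AtomicInteger();
        gate.register(50, fired::incrementAndGet);
        gate.register(100, fired::incrementAndGet);
        gate.awaitDrained(); // returns only after both timers have run
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that a callback which registers a follow-on timer increments the count before its own decrement, so the gate stays closed; a timer that always re-registers would keep the job from terminating.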

The main benefit/usecase is that Minicluster integration tests for CEP
operators will then work as intended.
Today they are not possible / broken.
(in a sense, it's a strict improvement because there is already an
immediate pain that's being solved).

As Lincoln said:
I'd like to start with a question: Why are people not complaining in
eventtime mode?
+1 - because the behavior of timers is consistent with event time
expectations.
Timers will "fast-fire" because event time has moved to LONG_MAX.
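
A minimal sketch of that fast-fire behavior, in plain Java rather than against Flink's actual timer service. `EventTimeTimers` and its methods are made-up names for illustration; the only assumption is that an event-time timer fires once the watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void register(long timestamp) {
        timers.add(timestamp);
    }

    // Fire every timer whose timestamp is <= the new watermark, in order.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    static List<Long> demo() {
        EventTimeTimers service = new EventTimeTimers();
        service.register(30_000L);
        service.register(10_000L);
        service.register(20_000L);
        // End of a bounded source: the watermark jumps to the maximum value,
        // so every pending event-time timer becomes due at once.
        return service.advanceWatermark(Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing waits on the wall clock here, which is why the behavior is consistent with event time expectations.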

So what I'm trying to say is that if it's acceptable for everyone to
trigger all untriggered eventtime timers directly when endOfInput in
eventtime mode, why don't we keep the same behavior in `proctime` by
default?

because the behavior of timers in proc time needs to be consistent with
proc time expectations.
If a proc time timer is set for 10s, the 10s are "real-time" and the delay
is important.
One particularly important example is "waiting" for other parts of the
system to settle down.
For example: Async operators where an async call writes data, waits for
10 seconds, and then reads it back from a different service. If we don't
wait for the timers to fire 10s later, it breaks the implicit guarantee of
the operator.

We can work around this behavior but that's undesirable. When people are
configuring timers, they
should get exactly what they have asked for.

Re: Dong's view:
Since this FLIP can add quite some complexity to Flink, it will be really
useful to understand the concrete use-case for the proposed changes so
that we can identify the approach with minimum extra complexity.
+1.
The concrete usecase hit multiple times by teams at Pinterest is that
stateful (CEP) operators cannot be integ tested on the Minicluster
(especially **Session windows in processing time**) because the job shuts
down as soon as the source shuts down without generating the last session.
This cannot be worked around even by sending dummy messages - a fully
custom source would be required
to artificially delay the EOF till the timers fire. This is generally too
much effort and not scalable. From our perspective,
it's a bug that a timer was registered but was not fired before job
termination.

Looking at the options proposed -
I would suggest that we try and agree on the first part of Dawid's proposal
-
The first and most critical part is that we define the expected EOF
behavior for proc time timers.
For the reasons posted above, immediate triggering breaks proc time
guarantees. (it is a desirable feature but it should not be the default).
The default should be triggering Timers as per their schedule and terminate
when there are no more timers to run. I hope that this portion of the
discussion will end up being less controversial.

The 2nd portion of the discussion is to consider - how can we enable the
early triggering case without adding complexity.
The simplest option is a config that toggles between the three termination
behaviors (trigger on-time/immediately/cancel).
I believe that we don't really have a viable 4th scenario.
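
As a rough sketch of what such a toggle could look like (plain Java, no Flink dependency): the enum constants and the string values such as "trigger-on-time" are hypothetical, not an existing Flink option. The helper returns the wall-clock times at which the pending timers would fire after end of input under each mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class TimerTerminationDemo {
    // Hypothetical names for the three behaviors discussed in this thread.
    enum Mode { TRIGGER_ON_TIME, TRIGGER_IMMEDIATELY, CANCEL }

    // Parse a job-level config value such as "trigger-on-time".
    static Mode parse(String value) {
        return Mode.valueOf(
                value.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
    }

    // Given the time of end of input and the pending timer timestamps,
    // return when each timer fires (an empty list means none fire).
    static List<Long> fireTimes(Mode mode, long now, List<Long> pending) {
        List<Long> result = new ArrayList<>();
        for (long timestamp : pending) {
            switch (mode) {
                case TRIGGER_ON_TIME:
                    // Keep the schedule; the job waits for the longest timer.
                    result.add(Math.max(timestamp, now));
                    break;
                case TRIGGER_IMMEDIATELY:
                    // Fast-fire, similar to event time at end of input.
                    result.add(now);
                    break;
                case CANCEL:
                    // Drop the timer.
                    break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> pending = List.of(1_500L, 3_000L);
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + fireTimes(mode, 1_000L, pending));
        }
    }
}
```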

Re: Dawid's onEOF API - the main usecase of such an API is to stop timers
from scheduling follow on timers by setting a flag / triggering a
condition-variable.
Extending the OnTimerContext
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/ProcessFunction.OnTimerContext.html>
with a new flag - isEOF serves most of that purpose without a new API and
vastly reduces the race conditions / thread safety problems (only 2 fns
instead of 3).
For this reason, I would suggest avoiding the onEOF part of Dawid's
proposal.
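
A minimal sketch of the flag-based variant, in plain Java. `TimerContext` here is a hypothetical stand-in for an `OnTimerContext` extended with `isEOF`; it is not the existing Flink interface. The periodic timer re-registers itself unless the flag is set, so draining the pending timers at end of input terminates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class EofFlagDemo {
    // Hypothetical stand-in for OnTimerContext with an added isEOF flag.
    interface TimerContext {
        boolean isEOF();
        void registerTimer(long timestamp);
    }

    // A periodic timer: re-registers itself unless the input has ended.
    static final class PeriodicEmitter {
        int firings = 0;

        void onTimer(long timestamp, TimerContext ctx) {
            firings++;
            if (!ctx.isEOF()) {
                ctx.registerTimer(timestamp + 10_000L);
            }
        }
    }

    // Drain pending timers after end of input; returns the number of firings.
    static int drainAtEof(long firstTimer) {
        Deque<Long> pending = new ArrayDeque<>();
        pending.add(firstTimer);
        PeriodicEmitter fn = new PeriodicEmitter();
        TimerContext eofContext = new TimerContext() {
            @Override
            public boolean isEOF() {
                return true;
            }

            @Override
            public void registerTimer(long timestamp) {
                pending.add(timestamp);
            }
        };
        while (!pending.isEmpty()) {
            fn.onTimer(pending.poll(), eofContext);
        }
        return fn.firings;
    }

    public static void main(String[] args) {
        System.out.println(drainAtEof(5_000L));
    }
}
```

The drain loop ends only because the user function checks the flag; a function that ignores `isEOF` and always re-registers would loop forever, which is the infinite-loop concern raised against this variant.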

The last portion of the discussion is whether we should have the
TimerTerminationAction be part of the API or just a config.
I would propose that this be a config driven feature rather than an API
driven feature, either jobs are going to wait for the longest timer to fire
or they do not want to do that.
There is no real middle ground. Having a special case where some timers
fire early and others don't isn't really helping (because if we're
composing jobs with operators written in different teams,
the runtime behavior is being conflated with the business logic).
For this reason - Yun - would you be amenable to modifying the FLIP so that
it's runtime config driven only and has no API changes to the TimerService?

Best,
Divye


On Tue, Nov 15, 2022 at 7:06 AM Lincoln Lee<lincoln.8...@gmail.com>  wrote:

Hi all,

Sorry for the late jump in this thread.

I agree with Dawid that we should discuss api changes from the perspective
of proper endOfInput semantics, and I understand the motivation for this
api change and that it should make sense for potential user scenarios.

As a table/sql layer developer, I would also like to share some thoughts
and inputs(please correct me if my understanding is wrong):

I'd like to start with a question: Why are people not complaining in
eventtime mode?  Everyone seems to accept the behavior that in eventtime
mode, when a bounded source ends, the system issues a Long.MAX_VALUE
watermark from the source, and then all eventtime timers in downstream
operators are in fact triggered once. Take a window operator: even though
the window does not seem to be finished (events do not fill the expected
window size), it is treated as finished (this includes the sql layer
operators, which also force the window to close and output an 'incomplete'
result).

Secondly, what exactly does endofinput mean for a bounded source?  I think
it represents the permanent end of the source, without any chance of
continuation in the future.

If we can agree on these clear semantics, then let's see if there is a
fundamental difference between `proctime` and `eventtime`.
I think proctime should be simpler (from the sql perspective, `proctime`
has no stricter semantics than `eventtime`).

So what I'm trying to say is that if it's acceptable for everyone to
trigger all untriggered eventtime timers directly when endOfInput in
eventtime mode, why don't we keep the same behavior in `proctime` by
default?

Finally, we can discuss in which user scenarios this default system
behavior may not be sufficient and needs to be extended, which I think may
help this discussion move forward more smoothly.

Some additional input, current dependencies of sql layer operators on
proctime timer:
The three types of operators I have seen so far are essentially of the
window type, including interval join, over window, and group window. They
do nothing in the close phase in the current implementation (and do not
implement the finish method), and the computation relies only on the
watermark trigger. If the underlying processing of watermarks is unified
for `eventtime` and `proctime`, then the sql layer operators will also
benefit and maintain consistent behaviors.

Best,
Lincoln Lee


On Tue, Nov 15, 2022 at 17:48, Dong Lin <lindon...@gmail.com> wrote:

Thank you Yun for the detailed explanation!

Since this FLIP can add quite some complexity to Flink, it will be really
useful to understand the concrete use-case for the proposed changes so
that we can identify the approach with minimum extra complexity. We can
probably continue the discussion after the FLIP is updated with
the use-cases.

Please see my comments inline.

On Tue, Nov 15, 2022 at 4:18 PM Yun Gao<yungao...@aliyun.com.invalid>
wrote:

Hi Dong,
Many thanks for the discussion!
It appears that the issues mentioned in the motivation section
are all related to using Windows on the DataStream API, where
the user's code typically does not have anything to do with Timer.
IMO it might not be limited to the case of window operators. For
example, users might implement some complex aggregation logic
with ProcessFunction directly. In this case, users might also want to
control how these timers are dealt with at the end of the stream.



IMO, the main benefit of this alternative solution is that it is more
consistent with the existing Windows API. Users who are concerned
with firing windows on end-of-stream won't need to additionally
understand/handle timer.
First, to summarize the problem: currently it could be divided into two
layers:
1. In the lower layer we need to support different actions to deal with
the timers at the end of the stream (though in fact we need to deduce from
the api whether we need this ability, but for simplicity I'll first
describe this layer, since the divergence happens in the higher layer).
2. How do we let users specify the actions for the timers at the end of
the stream? Currently we have different options on this layer.
  - The first option is to have a unified
  SingleOperatorStream#setTimerTerminationAction.
  - The second option is to have a specialized trigger for the window.
With whichever interface, the window operator needs to set proper
termination actions according to the specified semantics when registering
timers.
On the other side, specifically for the WindowOperator, the interface
might not only be related to the timers; there are also window types, e.g.
CountWindow, that also need to specify the behavior at the end of stream.
Therefore, for window operators it looks to me that it would indeed be more
friendly to users to have a uniform API. Since different operators might
have different situations, I wonder whether it would be better if we first:
1. Let the operator authors still set the default actions when
registering timers.
2. Let each operator consider its API distinctly.
  - The window operator provides a uniform API.
  - Apart from windows, it currently looks to me that users could register
  customized timers only with the family of ProcessFunctions. Users could
  still set actions for each timer, and we may first only provide a method
  for ProcessOperator to change the per-timer actions uniformly when
  building the DAG?
we need the task thread to be blocked until the timer gets triggered on
the registered time point.
Currently I do not have real-life scenarios, but some authenticated cases
are
- Users want the job stopped at the boundary of windows when stopping the
job with savepoint --drain.

Hmm... I guess you mean the processing time window in this scenario. It is
not clear to me why users would want to block waiting for wallclock time to
pass instead of stopping the job immediately.

- Users have timers to emit message to external systems periodically, and
users want to have one finalize
message at the end of stream.

IMO, an alternative solution for this use-case is to allow users to specify
what to do at the end of the input, rather than specifying what to do with
timers at the end of time.


But I also think we could add more actions step-by-step.
I might have missed use-cases for this FLIP which do not involve windows.
If so, could you help explain the use-case in this FLIP?
I'll complete the scenarios in the FLIP.

Great! I am looking forward to understanding more about the use-cases.


Best,
Yun Gao
------------------------------------------------------------------
From:Dong Lin<lindon...@gmail.com>
Send Time:2022 Nov. 10 (Thu.) 11:43
To:dev<dev@flink.apache.org>
Cc:Maximilian Michels<m...@apache.org>
Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
Timers on Job Termination
Hi Piotr,
I also think the scenario mentioned in this FLIP is useful to address. I am
happy to discuss this together and figure out the more usable APIs.
I guess the choice of API pretty much depends on when users need to use it.
I am assuming it is needed when using dataStream.window(...). Is there any
other case that needs this feature?
It is mentioned in FLINK-18647
<https://issues.apache.org/jira/browse/FLINK-18647> that we need the task
thread to be blocked until the timer gets triggered on the registered time
point. The JIRA and the FLIP do not seem to provide the use-case for this
feature. Could you explain more about the use-cases that might need this
feature?
What do you think of the alternative API vs. the approach proposed in the
FLIP? Maybe we can continue the discussion by detailing the pros/cons.
Best,
Dong
On Wed, Nov 9, 2022 at 4:35 PM Piotr Nowojski<pnowoj...@apache.org>
wrote:
Hi all,

Big thanks to Yun Gao for driving this!

I wonder whether we need to add a new option when registering timers. Won't
it be sufficient to flush all pending timers on termination but not allow
new ones to be registered?
Maximilian, I'm sure that a single semantics is not enough. All three that
are proposed here (cancel, wait, trigger immediately) were requested by
users.

Dong, as I initially wrote in the above-mentioned ticket [1], I'm personally
open to discussions about the final shape of the API.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18647

On Tue, 8 Nov 2022 at 03:42, Yun Gao <yungao...@aliyun.com.invalid> wrote:
Hi Maximilian,

Thanks for the discussion! It seems there are still other kinds of
scenarios where timers should not be flushed, such as "emit record X if
record Y hasn't arrived within 30 seconds after record Z" or "fail the job
if the external system does not respond in 30 seconds". These timers should
be dropped instead of triggered. Thus we think it would be necessary to
provide per-timer configuration.
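
A minimal sketch of what per-timer configuration could mean, in plain Java. The `Action` enum, the `Timer` class, and the timer names are hypothetical and only mirror the scenarios above; they are not the API proposed in the FLIP. Each timer carries its own termination action, so a timeout-style timer is dropped while a flush-style timer still fires.

```java
import java.util.ArrayList;
import java.util.List;

class PerTimerActionDemo {
    // Hypothetical per-timer action applied when the job terminates.
    enum Action { TRIGGER, CANCEL }

    static final class Timer {
        final String name;
        final Action onTermination;

        Timer(String name, Action onTermination) {
            this.name = name;
            this.onTermination = onTermination;
        }
    }

    // Returns the names of the timers that fire on termination.
    static List<String> onTermination(List<Timer> pending) {
        List<String> fired = new ArrayList<>();
        for (Timer timer : pending) {
            if (timer.onTermination == Action.TRIGGER) {
                fired.add(timer.name);
            }
        }
        return fired;
    }

    static List<String> demo() {
        return onTermination(List.of(
                new Timer("flush-window", Action.TRIGGER),
                // Timeout-style timers: firing them at EOF would be wrong.
                new Timer("emit-X-if-Y-missing-for-30s", Action.CANCEL),
                new Timer("fail-if-no-response-in-30s", Action.CANCEL)));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```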

Best,
Yun Gao




------------------Original Mail ------------------
Sender:Maximilian Michels<m...@apache.org>
Send Date:Fri Nov 4 21:35:58 2022
Recipients:Flink Dev<dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com>
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on
Job Termination
Hey Yun,

I wonder whether we need to add a new option when registering timers. Won't
it be sufficient to flush all pending timers on termination but not allow
new ones to be registered?

-Max

On Wed, Nov 2, 2022 at 11:20 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:

Hi everyone,
I would like to open a discussion[1] on how to
properly handle the processing timers on job
termination.
Currently all the processing timers would be
ignored on job termination. This behavior is
not suitable for some cases like WindowOperator.
Thus we'd like to provide more options for how
to deal with the pending timers on job termination,
and provide correct semantics on bounded streams
for these scenarios. The FLIP is based on the previous
discussion with Piotr and Divye in [2].
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-269%3A+Properly+Handling+the+Processing+Timers+on+Job+Termination
[2] https://issues.apache.org/jira/browse/FLINK-18647
