Hi all,

Sorry for the late jump in this thread.

I agree with dawid that we should discuss api changes from the perspective
of proper endOfInput semantics, and I understand the motivation for this
api change and that it should make sense for potential user scenarios.

As a table/sql layer developer, I would also like to share some thoughts
and inputs(please correct me if my understanding is wrong):

I'd like to start with a question: Why are people not complaining in
eventtime mode?  Everyone seems to accept the behavior that in eventtime
mode, when bounded source ends, the system issues a LONG.MAX watermark from
source, and then all eventtime timers in downstream operators are in fact
triggered once, like a window operator, even though it seems window is not
finished (events does not fill the expected window size), but it is
finished (including the sql layer operators, which also force the window to
close and outputs an 'incomplete' result)

Secondly, what exactly does endofinput mean for a bounded source?  I think
it represents the permanent end of the source, without any chance of
continuation in the future.

If we can agree on this clear semantics, then let's see if there is a
fundamental difference between `proctime` vs `eventime`?
I think proctime should be simpler (from the sql perspective, `proctime`
has no stricter semantics than `eventime`)

So what I'm trying to say is that if it's acceptable for everyone to
trigger all untriggered eventtime timers directly when endOfInput in
eventtime mode, why don't we keep the same behavior in `proctime` by
default?

Finally, we can discuss in which user scenarios this default system
behavior may not be satisfied and needs to be extended, which I think may
push this discussion more smoothly.

Some additional input, current dependencies of sql layer operators on
proctime timer:
The three types of operators I have seen so far are essentially the window
type, include interval join, over window, and group window, which do
nothing in the close phase for current implementation(and do not implement
the finish method), and the computation only relies on the watermark
trigger. If the underlying processing of watermark is unified on
`eventtime` and `proctime`, then the sql layer operators will also benefit
and maintain consistent behaviors.

Best,
Lincoln Lee


Dong Lin <lindon...@gmail.com> 于2022年11月15日周二 17:48写道:

> Thank you Yun for the detailed explanation!
>
> Since this FLIP can add quite some complexity to Flink, it will be really
> useful to understand the concrete case-case for the proposed changes so
> that we can identify the approach with minimum extra complexity. We can
> probably continue the discussion after the FLIP is updated with
> the use-cases.
>
> Please see my comments inline.
>
> On Tue, Nov 15, 2022 at 4:18 PM Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
>
> > Hi Dong,
> > Very thanks for the discussion!
> > > It appears that the issues mentioned in the motivation section
> > > are all related to using Windows on the DataStream API, where
> > > the user's code typically does not have anything to do with Timer.
> > IMO it might not only bounded to the case of window operators. For
> > examples, users might implements some complex aggregation logic
> > with ProcessFunction directly. In this case, users might also want to
> > control how these times are dealt at the end of the stream.
>
>
>
>
> > > IMO, the main benefit of this alternative solution is that it is more
> > > consistent with the existing Windows API. Users who are concerned
> > > with firing windows on end-of-stream won't need to additionally
> > > understand/handle timer.
> > First to summary the problem, currently it could be divided into two
> > layers:
> > 1. In the lower layer we need to support different actions to deal with
> > the timers at the end of the stream (though in fact we need to deduct
> > whether we need this ability from the api, but for simplicity I'll first
> > describe
> > this layer since the divergency happen in the higher level).
> > 2. How we let users to specify the actions at the end of the timer?
> > Currently
> > we have different options on this layer.
> >  - The first option is to have a unified SingleOperatorStream#
> >  setTimerTerminationAction.
> >  - The second option is to have a specialized trigger for the window.
> > With whichever interface, in the window operator need to set proper
> > termination actions according to the specified semantics when registering
> > timers.
> > On the other side, specially to the WindowOperator, the interface might
> > not
> > only related to the timers, there are also window types, e.g.
> CountWindow,
> >  that also need to specify the behavior at the end of stream.
> > Therefore, for window operators it looks to me it would be indeed more
> > friendly
> > to users to have a uniform API. Since different operators might have
> > different
> > situations, I wonder it would be better if we first:
> > 1. The operator authors could still set the default actions when
> > registering timers.
> > 2. Each operator consider its API distinctly.
> >  - Window operator provides a uniform API.
> >  - Except for window, Currently it looks to me that users could register
> > customized
> >  timers only with the family of ProcessFunctions. Users could still set
> > actions for
> >  each timer, and we may first only provide a method for ProcessOperator
> to
> > change
> >  the per-timer actions uniformly when building the DAG?
> > > we need the task thread to be blocked until the timer gets triggered on
> > the registered time
> > > point.
> > Currently I do not have real-life scenarios, but some authenticated cases
> > are
> > - Users want the job stopped at the boundary of windows when stopping the
> > job with savepoint --drain.
> >
>
> Hmm... I guess you mean the processing time window in this scenario. It is
> not clear to me why users would want to block waiting for wallclock time to
> pass instead of stopping the job immediately..
>
> - Users have timers to emit message to external systems periodically, and
> > users want to have one finalize
> > message at the end of stream.
>
>
> IMO, an alternative solution for this use-case is to allow users to specify
> what to do at the end of the input, rather than specifying what to do with
> timers at the end of time.
>
>
> > But I also think we could add more actions step-by-step.
> > > I might have missed use-cases for this FLIP which do not involve
> > windows.
> > > If so, could you help explain the use-case in this FLIP?
> > I'll complete the scenarios in the FLIP.
> >
>
> Great! I am looking forward to understanding more about the use-cases.
>
>
> > Best,
> > Yun Gao
> > ------------------------------------------------------------------
> > From:Dong Lin <lindon...@gmail.com>
> > Send Time:2022 Nov. 10 (Thu.) 11:43
> > To:dev <dev@flink.apache.org>
> > Cc:Maximilian Michels <m...@apache.org>
> > Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
> > Timers on Job Termination
> > Hi Piotr,
> > I also think the scenario mentioned in this FLIP is useful to address. I
> am
> > happy to discuss this together and figure out the more usable APIs.
> > I guess the choice of API pretty much depends on when users need to use
> it.
> > I am assuming it is needed when using dataStream.window(...). Is there
> any
> > other case that needs this feature?
> > It is mentioned in FLINK-18647
> > <https://issues.apache.org/jira/browse/FLINK-18647> <
> > https://issues.apache.org/jira/browse/FLINK-18647> > that we need the
> task
> > thread to be blocked until the timer gets triggered on the registered
> time
> > point. The JIRA and the FLIP do not seem to provide the use-case for this
> > feature. Could you explain more about the use-cases that might need this
> > feature?
> > What do you think of the alternative API vs. the approach proposed in the
> > FLIP? Maybe we can continue the discussion by detailing the pros/cons.
> > Best,
> > Dong
> > On Wed, Nov 9, 2022 at 4:35 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> > > Hi all,
> > >
> > > Big thanks to Yun Gao for driving this!
> > >
> > > > I wonder whether we need to add a new option when registering timers.
> > > Won't
> > > > it be sufficient to flush all pending timers on termination but not
> > allow
> > > > new ones to be registered?
> > >
> > > Maximilian, I'm sure that single semantics is not enough. All three
> that
> > > are proposed here (cancel, wait, trigger immediately) were requested by
> > > users.
> > >
> > > Dong, as I initially wrote in the above-mentioned ticket [1] I'm
> > personally
> > > open to discussions about the final shape of the API.
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-18647 <
> > https://issues.apache.org/jira/browse/FLINK-18647 >
> > >
> > > wt., 8 lis 2022 o 03:42 Yun Gao <yungao...@aliyun.com.invalid>
> > napisał(a):
> > >
> > > > Hi Maximilian,
> > > >
> > > > Thanks for the discussion! It seems there are still other kinds of
> > > > scenarios
> > > > that could not be flushed, like scenarios like "emit record X if
> > record Y
> > > > hasn't
> > > > arrived within 30 seconds after record Z" or "fails the job if the
> > > > external system
> > > > does not response in 30 seconds", these timers seems should be
> dropped
> > > > instead of
> > > > triggering. Thus we think it would be necessary to provide per-timer
> > > > configuration.
> > > >
> > > > Best,
> > > > Yun Gao
> > > >
> > > >
> > > >
> > > >
> > > > ------------------Original Mail ------------------
> > > > Sender:Maximilian Michels <m...@apache.org>
> > > > Send Date:Fri Nov 4 21:35:58 2022
> > > > Recipients:Flink Dev <dev@flink.apache.org>, Yun Gao <
> > > yungao...@aliyun.com
> > > > >
> > > > Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing
> Timers
> > > on
> > > > Job Termination
> > > > Hey Yun,
> > > >
> > > > I wonder whether we need to add a new option when registering timers.
> > > Won't
> > > > it be sufficient to flush all pending timers on termination but not
> > allow
> > > > new ones to be registered?
> > > >
> > > > -Max
> > > >
> > > > On Wed, Nov 2, 2022 at 11:20 AM Yun Gao <yungao...@aliyun.com.invalid
> >
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > > I would like to open a discussion[1] on how to
> > > > > properly handle the processing timers on job
> > > > > termination.
> > > > > Currently all the processing timers would be
> > > > > ignored on job termination. This behavior is
> > > > > not suitable for some cases like WindowOperator.
> > > > > Thus we'd like to provide more options for how
> > > > > to deal with the pending times on job termination,
> > > > > and provide correct semantics on bounded stream
> > > > > for these scenarios. The FLIP is based on the previous
> > > > > discussion with Piotr and Divye in [2].
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-269%3A+Properly+Handling+the+Processing+Timers+on+Job+Termination
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-269%3A+Properly+Handling+the+Processing+Timers+on+Job+Termination
> > >
> > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-269%3A+Properly+Handling+the+Processing+Timers+on+Job+Termination
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-269%3A+Properly+Handling+the+Processing+Timers+on+Job+Termination
> > >
> > > > > >
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-18647 <
> > https://issues.apache.org/jira/browse/FLINK-18647 > <
> > > > > https://issues.apache.org/jira/browse/FLINK-18647 <
> > https://issues.apache.org/jira/browse/FLINK-18647 > >
> > > > >
> > > >
> > >
> >
>

Reply via email to