Ad. 1

I'd start with ProcessingTimerService as that's the only public interface. It is exposed in the Sink V2 interface. In this scenario it would be the Sink interface that need to extend from a EOFTimersHandler. I believe it would be hard to pass it from there to the ProcessingTimeService as it is passed from the outside e.g. in the ProcessingTimeServiceAware. For that reason I'd go with a registration method in that interface.

In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can extend from EOFTimersHandler. I'd do that because ProcessFunction does not have an init/open method where we could register the handler.

On operator level I'd have a registration method in InternalTimerService. I believe that's the only way to handle the above ProcessFunction aproach. E.g. in KeyedProcessOperator you need to check if the UDF extend from the interface not the operator itself.

Ad. 2

I'd go with

*(Keyed)ProcessFunction:*

interface EOFTimersHandler {

 void handleProcessingTimer(long timestamp, Context);

}

interface Context {
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        public abstract K getCurrentKey();

// we can extend it for waitFor later

}

*ProcessingTimeService: *

interface EOFTimersHandler {

 void handleProcessingTimer(long timestamp, Context);

}

interface Context {

// we can extend it for waitFor later

}

*InternalTimeService:*

interface EOFTimersHandler {

 void handleProcessingTimer(InternalTimer<K,N> timer Context);

}

interface Context {

// we can extend it for waitFor later

}

Personally I'd not try to unify those places too much. They have also different visibilities (public/internal), have access to different set of metadata (key/namespace).


Ad 3.

I don't like the having the trigger/cancel methods, because:

1. I don't like the back and forth between system and UDF

2. Yes, the biggest issue I have is with the possibility with registering new timers. I am trying to be on the safe side here. I don't like the idea of dropping them, because it is again making assumptions what users do with those timers. What if they e.g. emit counter if it reached certain threshold? We'd need an additional flag in the method that is the final timer. My sentiment is that we're making it questionably easier to trigger a timer for the cost of openning up for unforeseen problems with follow up registration.

Best,

Dawid

On 30/11/2022 12:13, Yun Gao wrote:
Hi Dawid, Piotr
Very thanks for the discussion!
As a whole I think we are already consistent with the callback option, and I 
don't
think I opposed that we could modify the current internal implementation. But 
from
my side it is still not clear what the actual interfaces are proposing. Let me 
first try
to summarize that a bit:
1) Which object does the handlers register on?
It seems there are two options, one is to timer services (InternalTimerService
/ ProcessingTimerService or some equivalent things after refactoring), the other
one is as a lifecycle of the operator. I'm now tending to the latter one, how do
you think on this part?
2) What is the interface of the handler?
Option 1 is that
interface SomeHandlerName {
  void processingTimer(Timer timer);
}
class Timer {
  long getTimestamp();
  void trigger();
  void cancel();
  // Other actions if required.
}
But it seems there is controversy on whether to add actions to the timer class.
If without that, with my understanding the interfaces of the Option 2 are
interface SomeHandlerName {
  void processTimer(Timer timer);
}
interface KeyedSomeHandlerName<KEY, NAMESPACE> {
  void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx);
}
class Timer {
  long getTimestamp();
}
class KeyedTimer<KEY, NAMESPACE> extends Timer {
  KEY getKey();
  NAMESPACE getNamespace();
}
void Context {
void executeAtScheduledTime(Consumer<timer> handler);
}
As Piotr has pointed out, if we could eliminate the logic of namespace, we 
could then
remove the namespace related type parameter and method from the interfaces.
Do I understand right?
Besides, I'm still fully got the reason that why we should not add the actions 
to the
timer class, in consideration that it seems in most cases users could implement 
their
logical with simply calling timer.trigger() (I think the repeat registration is 
indeed a
problem, but I think we could ignore the timers registered during termination).
Could you further enlighten me a bit on this part?
Best,
Yun Gao
------------------------------------------------------------------
From:Piotr Nowojski<pnowoj...@apache.org>
Send Time:2022 Nov. 30 (Wed.) 17:10
To:dev<dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job 
Termination
Hi,
I have a couple of remarks.
First a general one. For me the important part in the design of this API is
how to expose this to Flink users in public interfaces. Namely
ProcessFunction and StreamOperator. InternalTimerService is an internal
class, so we can change it and break it as needed in the future.
For registering a handler like proposed by Dawid:
interface SomeHandlerName {
  void onTimer(/* whatever type it is */ timer, Context ctx ) {
  }
}
makes sense to me. For the InternalTimerService I think it doesn't matter
too much what we do. We could provide a similar interface as for the
ProcessFunction/StreamOperator, it doesn't have to be the same one. On the
contrary, I think it shouldn't be the same, as part of this effort we
shouldn't be exposing the concept of `Namespaces` to the public facing API.
Re the "waitFor". Theoretically I see arguments why users might want to use
this, but I'm also not convinced whether that's necessary in practice. I
would be +1 either way. First version can be without this functionality and
we can add it later (given that we designed a good place to add it in the
future, like the `Context` proposed by Dawid). But I'm also fine adding it
now if others are insisting.
Best,
Piotrek
śr., 30 lis 2022 o 09:18 Dawid Wysakowicz<dwysakow...@apache.org>
napisał(a):
WindowOperator is not implemented by users. I can see that for
InternalTimerService we'll need

interface PendingTimerProcessor<KEY, NAMESPACE> {
void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
doHandleTimer(timer);
}

I don't see a problem with that.

As you said ProcessingTimeService is a user facing interface and
completely unrelated to the InternalTimerService. I don't see a reason
why we'd need to unify those.

As for the waitFor behaviour. Personally, I have not been convinced it
is necessary. Maybe it's just my lack of vision, but I can't think of a
scenario where I'd use it. Still if we need it, I'd go for something like:

void onTimer(/* whatever type it is */ timer, Context ctx ) {

}

interface Context {
void executeAtScheduledTime(Consumer<timer> handler);
}


That way you have independent simple interfaces that need to work only
in a single well defined scenario and you don't need to match an
interface to multiple different cases.

Best,
Dawid

On 30/11/2022 07:27, Yun Gao wrote:
Hi Dawid,
Thanks for the comments!
As a whole I'm also open to the API and I also prefer to use simple
but flexible interfaces, but it still looks there are some problem to
just let users to implement the termination actions.
Let's take the WindowOperator as an example. As seen in [1],
in the timer processing logic it needs to acquire the key / namespace
information bound to the timer (which is only supported by the
InternalTimerService).
Thus if we want users to implement the same logic on termination, we
either let users
to trigger the timer handler directly or we also allows users to access
these piece of
information. If we go with the later direction, we might need to provide
interfaces like
interface PendingTimerProcessor<KEY, NAMESPACE> {
void onTimer(Timer<KEY, NAMESPACE> timer) {
doHandleTimer(timer);
}
}
class Timer<KEY, NAMESPACE> {
long getTimestamp();
KEY getKey();
NAMESPACE getNamespace();
}
Then we'll have the issue that since we need the interface to handle
both of cases of
InternalTimerSerivce and raw ProcessTimeService, the later do not have
key and
namespace information attached, and its also be a bit inconsistency for
users to have to set
the KEY and NAMESPACE types.
Besides, it looks to me that if we want to implement behaviors like
waiting for, it might
be not simply reuse the time handler time, then it requires every
operator authors to
re-implement such waiting logics.
Moreover it still have the downside that if you call back to the
`onTimer` method after
`trigger` you have access to the Context which lets you register new
timers.
I think we could simply drop the timers registered during we start
processing the pending timers
on termination. Logically there should be no new data after termination.
I think I am not convinced to these arguments. First of all I'm afraid
there is no clear distinction
in that area what is runtime and what is not. I always found
`AbstracStreamOperator(*)` actually part
of runtime or Flink's internals and thus I don't find
`InternalTimerService` a utility, but a vital part
of the system. Let's be honest it is impossible to implement an
operator without extending from
`AbstractStreamOperator*`.What would be the problem with having a
proper implementation in
`InternalTimerService`? Can't we do it like this?:
I think the original paragraph is only explanation to that the interface
is harder to support if we
allows the users to implement the arbitrary logic. But since now we are
at the page with the callback
option, users could always be allowed to implement arbitrary logic no
matter we support timer.trigger()
or not, thus I think now there is no divergence on this point. I also
believe in we'll finally have some logic
similar to the proposed one that drain all the times and process it.
Best,
Yun Gao
[1]
https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 >
<
https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 >
------------------------------------------------------------------
From:Dawid Wysakowicz<dwysakow...@apache.org>
Send Time:2022 Nov. 28 (Mon.) 23:33
To:dev<dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
on Job Termination
Do we really need to have separate methods for
triggering/waiting/cancelling. To me it sounds rather counterintuitive. Why
can't users just execute whatever they want in the handler itself instead
of additional back and forth with the system? Moreover it still have the
downside that if you call back to the `onTimer` method after `trigger` you
have access to the Context which lets you register new timers.
I find following approach much simpler:
void onTimer(...) {
doHandleTimer(timestamp);
}
void processPendingTimer(...) {
// trigger
doHandleTimer(timestamp);
// for cancel, simply do nothing...
}
Sorry I might not make it very clear here. I think the difficulty with
supported setting the currentKey is a special issue for the callback
options (no matter what the interface is) since it allows users to execute
logic other than the one registered with the timers. The complexity comes
from that currently we have two level of TimerServices: The
ProcessingTimerService (there is no key) and InternalTimerService (with
key). Currently only ProcessingTimerService is exposed to the runtime and
InternalTimerService is much more a utility to implement the operator. Then
with the current code, the runtime could only access to
ProcessingTimerService on termination.
I think I am not convinced to these arguments. First of all I'm afraid
there is no clear distinction in that area what is runtime and what is not.
I always found `AbstracStreamOperator(*)` actually part of runtime or
Flink's internals and thus I don't find `InternalTimerService` a utility,
but a vital part of the system. Let's be honest it is impossible to
implement an operator without extending from `AbstractStreamOperator*`.
What would be the problem with having a proper implementation in
`InternalTimerService`? Can't we do it like this?:
AbstractStreamOperator#finish() {
internalTimerService.finish();
}
InternalTimerService#finish() {
while ((timer = processingTimeTimersQueue.peek()) != null) {
keyContext.setCurrentKey(timer.getKey());
processingTimeTimersQueue.poll();
onEndOfInputHandler.processPendingTimer(timer);
}
}
If we only executes some predefined actions, we do not need to worry
about the implementation of InternalTimerService and just execute the
registered timers. But if we allow users to execute arbitrary logic, we
need to be also aware of the InternalTimerServices and parse the key from
the timers stored in it. I think we should always have method to overcome
this issue, but to support the callback options would be more complex.
I am not sure, having "predefined actions" would be good enough that we
do not need to set a key. As a user I'd anyhow expect the proper key to be
set in processPendingTimer.
Best,
Dawid
On 24/11/2022 08:51, Yun Gao wrote:
Hi Piotr / Divye, Very thanks for the discussion! First IMO it seems we
have reached the consensus on the high-level API: Most operators should
usually have only one reasonable action to the pending timers on
termination, thus we could let the operators to implement its own actions
with the low-level interface provided. The only exception is the
ProcessFunction, with which users might register customized timers, thus
users might also defines the actions on termination (If I have
misunderstandings here, please correct me). For the low-level API, I could
get the benefits with the callback options: since in most cases an operator
has only one action to all the timers, its a waste for us to store the same
flag for all the timers, also with a lot of code / state format changes.
But since it is enough for most users to simply trigger / cacnel the
timers, it would be redundant for users to implement the logic twice. Thus
perhaps we might combine the benefits of the two options: We might have a
separate interface public interface TimerHandlersOnTermination { void
processPendingTimer(Timer timer, long currentTime); } public class Timer {
long getRegisteredTimestamp(); void trigger(); void waitFor(); void
cancel(); } Then if an operator have implemented the
TimerHandlersOnTermination interface, on termination we could call
processPendingTimer(xx) for every pending timers. Users might simply
trigger / waitFor / cancel it, or execute some other logics if needed. Then
for the ProcessFunction we might have a similar interface to
processPendingTimer, except we might need to provide Context / Collector to
the ProcessFunction. Do you think this would be a good direction? Also
@Piotr I don't see a problem here. Interface doesn't have to reflect that,
only the runtime must set the correct key context before executing the
handler dealing with the processing time timers at the end of input/time.
Sorry I might not make it very clear here. I think the difficulty with
supported setting the currentKey is a special issue for the callback
options (no matter what the interface is) since it allows users to execute
logic other than the one registered with the timers. The complexity comes
from that currently we have two level of TimerServices: The
ProcessingTimerService (there is no key) and InternalTimerService (with
key). Currently only ProcessingTimerService is exposed to the runtime and
InternalTimerService is much more a utility to implement the operator. Then
with the current code, the runtime could only access to
ProcessingTimerService on termination. If we only executes some predefined
actions, we do not need to worry about the implementation of
InternalTimerService and just execute the registered timers. But if we
allow users to execute arbitrary logic, we need to be also aware of the
InternalTimerServices and parse the key from the timers stored in it. I
think we should always have method to overcome this issue, but to support
the callback options would be more complex. Best, Yun Gao
------------------------------------------------------------------
From:Divye Kapoor<dkap...@pinterest.com.INVALID>  <mailto: 
dkap...@pinterest.com.INVALID >  Send Time:2022 Nov. 24 (Thu.) 08:50
To:dev<dev@flink.apache.org>  <mailto:dev@flink.apache.org >  Cc:Xenon
Development Team<xenon-...@pinterest.com>  <mailto:xenon-...@pinterest.com
Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
Timers on Job Termination Sounds good. Looks like we're on the same page.
Thanks! Divye On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <
pnowoj...@apache.org> <mailto:pnowoj...@apache.org >  wrote: Hi Divye I
think we are mostly on the same page. Just to clarify/rephrase: One thing
to think about - on EOF “trigger immediately” will mean that the
asynchronous wait timeout timers will also fire - which is undesirable I
didn't mean to fire all timers immediately in all of the built-in
operators. Just that each built-in operator can have a hard coded way
(without a way for users to change it) to handle those timers. Windowed
operators would trigger the lingering timers (flush outputs),
AsyncWaitOperator could just ignore them. The same way users could register
EOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we
(as flink developers) could use the same mechanism to implement any
behaviour we want for the built-in operators. There should be no need to
add any separate mechanism. Best, Piotrek śr., 23 lis 2022 o 08:21 Divye
Kapoor<dkap...@pinterest.com.invalid>  <mailto: dkap...@pinterest.com.invalid > 
 napisał(a): Thanks Yun/Piotrek, Some
brief comments inline below. On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski
<pnowoj...@apache.org>  <mailto:pnowoj...@apache.org >  wrote: Hi, All in
all I would agree with Dawid's proposal. +1 We can add the flexibility of
how to deal with the timers in the low level API via adding a handler - if
someone needs to customize it, he will always have a workaround. Note after
giving it more thought, I agree that registering some handlers is better
than overloading the register timer method and modifying the timer's state.
+1. At the same time, we can force the most sensible semantic that we think
for the couple of built-in operators, which should be pretty
straightforward (either ignore the timers, or fire them at once). I agree
there might be some edge cases, that theoretically user might want to wait
for the timer to fire naturally, but: 1. I'm not sure how common in
practice this will be. If not at all, then why should we be complicating
the API/system? That’s fair. However, the specifics are very important
here. One thing to think about - on EOF “trigger immediately” will mean
that the asynchronous wait timeout timers will also fire - which is
undesirable (because they are racing with the last async call). However,
the issue is cleanly resolved by waiting for the timer to be canceled when
the last event is processed. (“Wait for” case). Ignoring the timer has the
least justification. Registering the handler as per Dawid’s proposal and
having that handler unregister the timers on EOF makes best sense. This
solution also unifies the trigger immediately case as that handler can
reregister the timers for early termination. The proposal: 1. Operator
receives EOF 2. EOF timer handler triggers 3. EOF handler adjusts the
registered timers for early trigger or ignore. If wait-for behavior is
desired, timers are not changed. This is controlled in client code. 4.
Operator waits for all timers to drain/trigger. (“Always”). There is no
special handling for ignore/early trigger. 5. Operator allows job to
proceed with shutdown. The only api change needed is an EOF handler. The
other agreement we need is that “Wait for” is the desired behavior in
processing time and that processing time is fundamentally different from
event time in this respect. (I have changed my thinking since the last
mail). 2. We can always expand the API in the future, and let the user
override the default built-in behaviour of the operators via some setter on
the stream transformation (`SingleOutputStreamOperator`), or via some
custom API DSL style in each of the operators separately. This is not
required. See above. Re forcing the same semantics for processing time
timers as for event time ones - this is tempting, but indeed I see a
possibility that users need to adhere to some external constraints when
using processing time. +1. As above, we should consider the 2 cases
fundamentally different in this area. Re: Yun - b) Another issue is that
what if users use timers with different termination actions in the same
operator / UDF? For example, users use some kind of timeout (like throws
exception if some thing not happen after some other thing), and also some
kind of window aggregation logic. In this case, without additional tags,
users might not be able to distinguish which timer should be canceled and
which time should be triggered ? as above. The EOF handler makes the
choice. 4. How could these scenarios adjust their APIs ? From the current
listed scenarios, I'm more tend to that as @Dawid pointed out, there might
be only one expected behavior for each scenario, thus it does not seems to
need to allow users to adjust the behavior. Thus @Divye may I have a double
confirmation currently do we have explicit scenarios that is expected to
change the different behaviors for the same scenario? Wait-for behavior is
probably the only expected behavior and any alterations should be from the
EOF handler managing the registered timers. Besides @Divye from the listed
scenarios, I have another concern for global configuration is that for one
job, different operators seems to still might have different expected
behaviors. For example, A job using both Window operator and
AsyncWaitOperator might have different requirements for timers on
termination? Thank you for raising this case. This changed my thinking.
Based on your point, we should try and align on the “Wait-for” with EOF
handler proposal. I’m withdrawing the “single-runtime-config” proposal.
Best, Divye

Attachment: OpenPGP_0x31D2DD10BFC15A2D.asc
Description: OpenPGP public key

Attachment: OpenPGP_signature
Description: OpenPGP digital signature

Reply via email to