Sounds good to me as well!

Best,
Piotrek

czw., 8 gru 2022 o 09:53 Dawid Wysakowicz <dwysakow...@apache.org>
napisał(a):

> Sounds like a good plan to me.
> On 08/12/2022 08:58, Yun Gao wrote:
>
> Hi Dawid,
>
> Very thanks for the discussion and sorry for the delayed response
> since I was hesitated on some points.
>
> But as a whole, with some more thought, first I agree with that adding
> the trigger() / cancle() methods to some kind of timer object is not
> necessary
> for us to achieve the exactly-once for the operators. We could follow the
> direction of "modifying the implementation of the operators" to achieve the
> same target.
>
> But continue to think with this direction, it now looks to me it is also
> not
> needed to add the callback to the timer services:
> 1. For InternalTimerService, the operators could just call
> `InternalTimerService
> #forEachProcessingTimer()` on finish to handle the pending timers.
> 2. For the timers registered to the underlying ProcessingTimerService, at
> least in
> the currently listed scenarios, the operators itself knows what is the
> remaining work
> (e.g., the FileWriter knows if it has in-progress file to flush).
>
> Operators could handle the remaining timers in finish() method.
>
> Then the only interface we need to consider is that added to the
> ProcessFunction. The
> current interface also looks ok to me.
>
> If you think the above option works, I could first have a PoC that
> demonstrate it is sufficient
> to only modify the operator implementation to handling the remaining
> workers properly on
> finish(). If there are new issues I'll post here and we could have some
> more discussion.
>
> Best,
> Yun Gao
>
>
> ------------------Original Mail ------------------
> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
> <dwysakow...@apache.org>
> *Send Date:*Fri Dec 2 21:21:25 2022
> *Recipients:*Dev <dev@flink.apache.org> <dev@flink.apache.org>
> *Subject:*Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
> on Job Termination
>
>> Ad. 1
>>
>> I'd start with ProcessingTimerService as that's the only public
>> interface. It is exposed in the Sink V2 interface. In this scenario it
>> would be the Sink interface that need to extend from a EOFTimersHandler. I
>> believe it would be hard to pass it from there to the ProcessingTimeService
>> as it is passed from the outside e.g. in the ProcessingTimeServiceAware.
>> For that reason I'd go with a registration method in that interface.
>>
>> In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can
>> extend from EOFTimersHandler. I'd do that because ProcessFunction does not
>> have an init/open method where we could register the handler.
>>
>> On operator level I'd have a registration method in InternalTimerService.
>> I believe that's the only way to handle the above ProcessFunction aproach.
>> E.g. in KeyedProcessOperator you need to check if the UDF extend from the
>> interface not the operator itself.
>>
>> Ad. 2
>>
>> I'd go with
>>
>> *(Keyed)ProcessFunction:*
>>
>> interface EOFTimersHandler {
>>
>>  void handleProcessingTimer(long timestamp, Context);
>>
>> }
>>
>> interface Context {
>>         public abstract <X> void output(OutputTag<X> outputTag, X value);
>>
>>         public abstract K getCurrentKey();
>>
>> // we can extend it for waitFor later
>>
>> }
>>
>> *ProcessingTimeService: *
>>
>> interface EOFTimersHandler {
>>
>>  void handleProcessingTimer(long timestamp, Context);
>>
>> }
>>
>> interface Context {
>>
>> // we can extend it for waitFor later
>>
>> }
>>
>> *InternalTimeService:*
>>
>> interface EOFTimersHandler {
>>
>>  void handleProcessingTimer(InternalTimer<K,N> timer Context);
>>
>> }
>>
>> interface Context {
>>
>> // we can extend it for waitFor later
>>
>> }
>>
>> Personally I'd not try to unify those places too much. They have also
>> different visibilities (public/internal), have access to different set of
>> metadata (key/namespace).
>>
>>
>> Ad 3.
>>
>> I don't like the having the trigger/cancel methods, because:
>>
>> 1. I don't like the back and forth between system and UDF
>>
>> 2. Yes, the biggest issue I have is with the possibility with registering
>> new timers. I am trying to be on the safe side here. I don't like the idea
>> of dropping them, because it is again making assumptions what users do with
>> those timers. What if they e.g. emit counter if it reached certain
>> threshold? We'd need an additional flag in the method that is the final
>> timer. My sentiment is that we're making it questionably easier to trigger
>> a timer for the cost of openning up for unforeseen problems with follow up
>> registration.
>>
>> Best,
>>
>> Dawid
>> On 30/11/2022 12:13, Yun Gao wrote:
>>
>> Hi Dawid, PiotrVery thanks for the discussion!As a whole I think we are 
>> already consistent with the callback option, and I don't think I opposed 
>> that we could modify the current internal implementation. But from my side 
>> it is still not clear what the actual interfaces are proposing. Let me first 
>> try to summarize that a bit:1) Which object does the handlers register on?It 
>> seems there are two options, one is to timer services (InternalTimerService/ 
>> ProcessingTimerService or some equivalent things after refactoring), the 
>> otherone is as a lifecycle of the operator. I'm now tending to the latter 
>> one, how do you think on this part?2) What is the interface of the 
>> handler?Option 1 is that interface SomeHandlerName { void 
>> processingTimer(Timer timer);}class Timer { long getTimestamp(); void 
>> trigger(); void cancel(); // Other actions if required. }But it seems there 
>> is controversy on whether to add actions to the timer class. If without 
>> that, with my understanding the interfaces of the Option 2 areinterface 
>> SomeHandlerName { void processTimer(Timer timer); }interface 
>> KeyedSomeHandlerName<KEY, NAMESPACE> { void 
>> processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx); }class 
>> Timer { long getTimestamp();}class KeyedTimer<KEY, NAMESPACE> extends Timer 
>> { KEY getKey(); NAMESPACE getNamespace();}void Context {void 
>> executeAtScheduledTime(Consumer<timer> handler);}As Piotr has pointed out, 
>> if we could eliminate the logic of namespace, we could thenremove the 
>> namespace related type parameter and method from the interfaces.Do I 
>> understand right?Besides, I'm still fully got the reason that why we should 
>> not add the actions to the timer class, in consideration that it seems in 
>> most cases users could implement their logical with simply calling 
>> timer.trigger() (I think the repeat registration is indeed a problem, but I 
>> think we could ignore the timers registered during termination). Could you 
>> further enlighten me a bit on this part?Best,Yun 
>> Gao------------------------------------------------------------------From:Piotr
>>  Nowojski <pnowoj...@apache.org> <pnowoj...@apache.org>Send Time:2022 Nov. 
>> 30 (Wed.) 17:10To:dev <dev@flink.apache.org> 
>> <dev@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling the 
>> Processing Timers on Job TerminationHi,I have a couple of remarks.First a 
>> general one. For me the important part in the design of this API ishow to 
>> expose this to Flink users in public interfaces. NamelyProcessFunction and 
>> StreamOperator. InternalTimerService is an internalclass, so we can change 
>> it and break it as needed in the future.For registering a handler like 
>> proposed by Dawid:interface SomeHandlerName { void onTimer(/* whatever type 
>> it is */ timer, Context ctx ) { }}makes sense to me. For the 
>> InternalTimerService I think it doesn't mattertoo much what we do. We could 
>> provide a similar interface as for theProcessFunction/StreamOperator, it 
>> doesn't have to be the same one. On thecontrary, I think it shouldn't be the 
>> same, as part of this effort weshouldn't be exposing the concept of 
>> `Namespaces` to the public facing API.Re the "waitFor". Theoretically I see 
>> arguments why users might want to usethis, but I'm also not convinced 
>> whether that's necessary in practice. Iwould be +1 either way. First version 
>> can be without this functionality andwe can add it later (given that we 
>> designed a good place to add it in thefuture, like the `Context` proposed by 
>> Dawid). But I'm also fine adding itnow if others are 
>> insisting.Best,Piotrekśr., 30 lis 2022 o 09:18 Dawid Wysakowicz 
>> <dwysakow...@apache.org> <dwysakow...@apache.org>napisał(a):
>>
>> WindowOperator is not implemented by users. I can see that 
>> forInternalTimerService we'll needinterface PendingTimerProcessor<KEY, 
>> NAMESPACE> {void onTimer(InternalTimer<KEY, NAMESPACE> timer) 
>> {doHandleTimer(timer);}I don't see a problem with that.As you said 
>> ProcessingTimeService is a user facing interface andcompletely unrelated to 
>> the InternalTimerService. I don't see a reasonwhy we'd need to unify 
>> those.As for the waitFor behaviour. Personally, I have not been convinced 
>> itis necessary. Maybe it's just my lack of vision, but I can't think of 
>> ascenario where I'd use it. Still if we need it, I'd go for something 
>> like:void onTimer(/* whatever type it is */ timer, Context ctx ) {}interface 
>> Context {void executeAtScheduledTime(Consumer<timer> handler);}That way you 
>> have independent simple interfaces that need to work onlyin a single well 
>> defined scenario and you don't need to match aninterface to multiple 
>> different cases.Best,DawidOn 30/11/2022 07:27, Yun Gao wrote:
>>
>> Hi Dawid,Thanks for the comments!As a whole I'm also open to the API and I 
>> also prefer to use simplebut flexible interfaces, but it still looks there 
>> are some problem tojust let users to implement the termination actions.Let's 
>> take the WindowOperator as an example. As seen in [1],in the timer 
>> processing logic it needs to acquire the key / namespaceinformation bound to 
>> the timer (which is only supported by the
>>
>> InternalTimerService).
>>
>> Thus if we want users to implement the same logic on termination, we
>>
>> either let users
>>
>> to trigger the timer handler directly or we also allows users to access
>>
>> these piece of
>>
>> information. If we go with the later direction, we might need to provide
>>
>> interfaces like
>>
>> interface PendingTimerProcessor<KEY, NAMESPACE> {void onTimer(Timer<KEY, 
>> NAMESPACE> timer) {doHandleTimer(timer);}}class Timer<KEY, NAMESPACE> {long 
>> getTimestamp();KEY getKey();NAMESPACE getNamespace();}Then we'll have the 
>> issue that since we need the interface to handle
>>
>> both of cases of
>>
>> InternalTimerSerivce and raw ProcessTimeService, the later do not have
>>
>> key and
>>
>> namespace information attached, and its also be a bit inconsistency for
>>
>> users to have to set
>>
>> the KEY and NAMESPACE types.Besides, it looks to me that if we want to 
>> implement behaviors like
>>
>> waiting for, it might
>>
>> be not simply reuse the time handler time, then it requires every
>>
>> operator authors to
>>
>> re-implement such waiting logics.
>>
>> Moreover it still have the downside that if you call back to the
>>
>> `onTimer` method after
>>
>> `trigger` you have access to the Context which lets you register new
>>
>> timers.
>>
>> I think we could simply drop the timers registered during we start
>>
>> processing the pending timers
>>
>> on termination. Logically there should be no new data after termination.
>>
>> I think I am not convinced to these arguments. First of all I'm afraid
>>
>> there is no clear distinction
>>
>> in that area what is runtime and what is not. I always found
>>
>> `AbstracStreamOperator(*)` actually part
>>
>> of runtime or Flink's internals and thus I don't find
>>
>> `InternalTimerService` a utility, but a vital part
>>
>> of the system. Let's be honest it is impossible to implement an
>>
>> operator without extending from
>>
>> `AbstractStreamOperator*`.What would be the problem with having a
>>
>> proper implementation in
>>
>> `InternalTimerService`? Can't we do it like this?:
>>
>> I think the original paragraph is only explanation to that the interface
>>
>> is harder to support if we
>>
>> allows the users to implement the arbitrary logic. But since now we are
>>
>> at the page with the callback
>>
>> option, users could always be allowed to implement arbitrary logic no
>>
>> matter we support timer.trigger()
>>
>> or not, thus I think now there is no divergence on this point. I also
>>
>> believe in we'll finally have some logic
>>
>> similar to the proposed one that drain all the times and process it.Best,Yun 
>> Gao[1]
>>
>> https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>  
>> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>  > 
>> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488><https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>  
>> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>  > 
>> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488>
>>
>> ------------------------------------------------------------------From:Dawid 
>> Wysakowicz <dwysakow...@apache.org> <dwysakow...@apache.org>Send Time:2022 
>> Nov. 28 (Mon.) 23:33To:dev <dev@flink.apache.org> 
>> <dev@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling the 
>> Processing Timers
>>
>> on Job Termination
>>
>> Do we really need to have separate methods for
>>
>> triggering/waiting/cancelling. To me it sounds rather counterintuitive. 
>> Whycan't users just execute whatever they want in the handler itself 
>> insteadof additional back and forth with the system? Moreover it still have 
>> thedownside that if you call back to the `onTimer` method after `trigger` 
>> youhave access to the Context which lets you register new timers.
>>
>> I find following approach much simpler:void onTimer(...) 
>> {doHandleTimer(timestamp);}void processPendingTimer(...) {// 
>> triggerdoHandleTimer(timestamp);// for cancel, simply do nothing...}Sorry I 
>> might not make it very clear here. I think the difficulty with
>>
>> supported setting the currentKey is a special issue for the callbackoptions 
>> (no matter what the interface is) since it allows users to executelogic 
>> other than the one registered with the timers. The complexity comesfrom that 
>> currently we have two level of TimerServices: TheProcessingTimerService 
>> (there is no key) and InternalTimerService (withkey). Currently only 
>> ProcessingTimerService is exposed to the runtime andInternalTimerService is 
>> much more a utility to implement the operator. Thenwith the current code, 
>> the runtime could only access toProcessingTimerService on termination.
>>
>> I think I am not convinced to these arguments. First of all I'm afraid
>>
>> there is no clear distinction in that area what is runtime and what is not.I 
>> always found `AbstracStreamOperator(*)` actually part of runtime orFlink's 
>> internals and thus I don't find `InternalTimerService` a utility,but a vital 
>> part of the system. Let's be honest it is impossible toimplement an operator 
>> without extending from `AbstractStreamOperator*`.
>>
>> What would be the problem with having a proper implementation in
>>
>> `InternalTimerService`? Can't we do it like this?:
>>
>> AbstractStreamOperator#finish() 
>> {internalTimerService.finish();}InternalTimerService#finish() {while ((timer 
>> = processingTimeTimersQueue.peek()) != null) 
>> {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();onEndOfInputHandler.processPendingTimer(timer);}}If
>>  we only executes some predefined actions, we do not need to worry
>>
>> about the implementation of InternalTimerService and just execute 
>> theregistered timers. But if we allow users to execute arbitrary logic, 
>> weneed to be also aware of the InternalTimerServices and parse the key 
>> fromthe timers stored in it. I think we should always have method to 
>> overcomethis issue, but to support the callback options would be more 
>> complex.
>>
>> I am not sure, having "predefined actions" would be good enough that we
>>
>> do not need to set a key. As a user I'd anyhow expect the proper key to 
>> beset in processPendingTimer.
>>
>> Best,DawidOn 24/11/2022 08:51, Yun Gao wrote:Hi Piotr / Divye, Very thanks 
>> for the discussion! First IMO it seems we
>>
>> have reached the consensus on the high-level API: Most operators 
>> shouldusually have only one reasonable action to the pending timers 
>> ontermination, thus we could let the operators to implement its own 
>> actionswith the low-level interface provided. The only exception is 
>> theProcessFunction, with which users might register customized timers, 
>> thususers might also defines the actions on termination (If I 
>> havemisunderstandings here, please correct me). For the low-level API, I 
>> couldget the benefits with the callback options: since in most cases an 
>> operatorhas only one action to all the timers, its a waste for us to store 
>> the sameflag for all the timers, also with a lot of code / state format 
>> changes.But since it is enough for most users to simply trigger / cacnel 
>> thetimers, it would be redundant for users to implement the logic twice. 
>> Thusperhaps we might combine the benefits of the two options: We might have 
>> aseparate interface public interface TimerHandlersOnTermination { 
>> voidprocessPendingTimer(Timer timer, long currentTime); } public class Timer 
>> {long getRegisteredTimestamp(); void trigger(); void waitFor(); 
>> voidcancel(); } Then if an operator have implemented 
>> theTimerHandlersOnTermination interface, on termination we could 
>> callprocessPendingTimer(xx) for every pending timers. Users might 
>> simplytrigger / waitFor / cancel it, or execute some other logics if needed. 
>> Thenfor the ProcessFunction we might have a similar interface 
>> toprocessPendingTimer, except we might need to provide Context / Collector 
>> tothe ProcessFunction. Do you think this would be a good direction? 
>> Also@Piotr I don't see a problem here. Interface doesn't have to reflect 
>> that,only the runtime must set the correct key context before executing 
>> thehandler dealing with the processing time timers at the end of 
>> input/time.Sorry I might not make it very clear here. I think the difficulty 
>> withsupported setting the currentKey is a special issue for the 
>> callbackoptions (no matter what the interface is) since it allows users to 
>> executelogic other than the one registered with the timers. The complexity 
>> comesfrom that currently we have two level of TimerServices: 
>> TheProcessingTimerService (there is no key) and InternalTimerService 
>> (withkey). Currently only ProcessingTimerService is exposed to the runtime 
>> andInternalTimerService is much more a utility to implement the operator. 
>> Thenwith the current code, the runtime could only access 
>> toProcessingTimerService on termination. If we only executes some 
>> predefinedactions, we do not need to worry about the implementation 
>> ofInternalTimerService and just execute the registered timers. But if 
>> weallow users to execute arbitrary logic, we need to be also aware of 
>> theInternalTimerServices and parse the key from the timers stored in it. 
>> Ithink we should always have method to overcome this issue, but to 
>> supportthe callback options would be more complex. Best, Yun 
>> Gao------------------------------------------------------------------From:Divye
>>  Kapoor <dkap...@pinterest.com.INVALID> <dkap...@pinterest.com.INVALID> 
>> <mailto:dkap...@pinterest.com.INVALID > <dkap...@pinterest.com.INVALID> Send 
>> Time:2022 Nov. 24 (Thu.) 08:50To:dev <dev@flink.apache.org> 
>> <dev@flink.apache.org> <mailto:dev@flink.apache.org > <dev@flink.apache.org> 
>> Cc:XenonDevelopment Team <xenon-...@pinterest.com> <xenon-...@pinterest.com> 
>> <mailto:xenon-...@pinterest.com <xenon-...@pinterest.com>
>>
>> Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
>>
>> Timers on Job Termination Sounds good. Looks like we're on the same 
>> page.Thanks! Divye On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski 
>> <pnowoj...@apache.org> <mailto:pnowoj...@apache.org > <pnowoj...@apache.org> 
>> wrote: Hi Divye Ithink we are mostly on the same page. Just to 
>> clarify/rephrase: One thingto think about - on EOF “trigger immediately” 
>> will mean that theasynchronous wait timeout timers will also fire - which is 
>> undesirable Ididn't mean to fire all timers immediately in all of the 
>> built-inoperators. Just that each built-in operator can have a hard coded 
>> way(without a way for users to change it) to handle those timers. 
>> Windowedoperators would trigger the lingering timers (flush 
>> outputs),AsyncWaitOperator could just ignore them. The same way users could 
>> registerEOF timer handlers in the ProcessFunction as Dawid Wysakowicz 
>> proposed, we(as flink developers) could use the same mechanism to implement 
>> anybehaviour we want for the built-in operators. There should be no need 
>> toadd any separate mechanism. Best, Piotrek śr., 23 lis 2022 o 08:21 
>> DivyeKapoor <dkap...@pinterest.com.invalid> <dkap...@pinterest.com.invalid> 
>> <mailto:dkap...@pinterest.com.invalid > <dkap...@pinterest.com.invalid> 
>> napisał(a): Thanks Yun/Piotrek, Somebrief comments inline below. On Tue, Nov 
>> 22, 2022 at 1:37 AM Piotr Nowojski<pnowoj...@apache.org> 
>> <pnowoj...@apache.org> <mailto:pnowoj...@apache.org > <pnowoj...@apache.org> 
>> wrote: Hi, All inall I would agree with Dawid's proposal. +1 We can add the 
>> flexibility ofhow to deal with the timers in the low level API via adding a 
>> handler - ifsomeone needs to customize it, he will always have a workaround. 
>> Note aftergiving it more thought, I agree that registering some handlers is 
>> betterthan overloading the register timer method and modifying the timer's 
>> state.+1. At the same time, we can force the most sensible semantic that we 
>> thinkfor the couple of built-in operators, which should be 
>> prettystraightforward (either ignore the timers, or fire them at once). I 
>> agreethere might be some edge cases, that theoretically user might want to 
>> waitfor the timer to fire naturally, but: 1. I'm not sure how common 
>> inpractice this will be. If not at all, then why should we be 
>> complicatingthe API/system? That’s fair. However, the specifics are very 
>> importanthere. One thing to think about - on EOF “trigger immediately” will 
>> meanthat the asynchronous wait timeout timers will also fire - which 
>> isundesirable (because they are racing with the last async call). 
>> However,the issue is cleanly resolved by waiting for the timer to be 
>> canceled whenthe last event is processed. (“Wait for” case). Ignoring the 
>> timer has theleast justification. Registering the handler as per Dawid’s 
>> proposal andhaving that handler unregister the timers on EOF makes best 
>> sense. Thissolution also unifies the trigger immediately case as that 
>> handler canreregister the timers for early termination. The proposal: 1. 
>> Operatorreceives EOF 2. EOF timer handler triggers 3. EOF handler adjusts 
>> theregistered timers for early trigger or ignore. If wait-for behavior 
>> isdesired, timers are not changed. This is controlled in client code. 
>> 4.Operator waits for all timers to drain/trigger. (“Always”). There is 
>> nospecial handling for ignore/early trigger. 5. Operator allows job 
>> toproceed with shutdown. The only api change needed is an EOF handler. 
>> Theother agreement we need is that “Wait for” is the desired behavior 
>> inprocessing time and that processing time is fundamentally different 
>> fromevent time in this respect. (I have changed my thinking since the 
>> lastmail). 2. We can always expand the API in the future, and let the 
>> useroverride the default built-in behaviour of the operators via some setter 
>> onthe stream transformation (`SingleOutputStreamOperator`), or via 
>> somecustom API DSL style in each of the operators separately. This is 
>> notrequired. See above. Re forcing the same semantics for processing 
>> timetimers as for event time ones - this is tempting, but indeed I see 
>> apossibility that users need to adhere to some external constraints 
>> whenusing processing time. +1. As above, we should consider the 2 
>> casesfundamentally different in this area. Re: Yun - b) Another issue is 
>> thatwhat if users use timers with different termination actions in the 
>> sameoperator / UDF? For example, users use some kind of timeout (like 
>> throwsexception if some thing not happen after some other thing), and also 
>> somekind of window aggregation logic. In this case, without additional 
>> tags,users might not be able to distinguish which timer should be canceled 
>> andwhich time should be triggered ? as above. The EOF handler makes 
>> thechoice. 4. How could these scenarios adjust their APIs ? From the 
>> currentlisted scenarios, I'm more tend to that as @Dawid pointed out, there 
>> mightbe only one expected behavior for each scenario, thus it does not seems 
>> toneed to allow users to adjust the behavior. Thus @Divye may I have a 
>> doubleconfirmation currently do we have explicit scenarios that is expected 
>> tochange the different behaviors for the same scenario? Wait-for behavior 
>> isprobably the only expected behavior and any alterations should be from 
>> theEOF handler managing the registered timers. Besides @Divye from the 
>> listedscenarios, I have another concern for global configuration is that for 
>> onejob, different operators seems to still might have different 
>> expectedbehaviors. For example, A job using both Window operator 
>> andAsyncWaitOperator might have different requirements for timers 
>> ontermination? Thank you for raising this case. This changed my 
>> thinking.Based on your point, we should try and align on the “Wait-for” with 
>> EOFhandler proposal. I’m withdrawing the “single-runtime-config” 
>> proposal.Best, Divye
>>
>>

Reply via email to