Sounds good to me as well!

Best,
Piotrek
On Thu, Dec 8, 2022 at 09:53 Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Sounds like a good plan to me.
>
> On 08/12/2022 08:58, Yun Gao wrote:
> > Hi Dawid,
> >
> > Many thanks for the discussion, and sorry for the delayed response; I was hesitant on some points.
> >
> > But as a whole, after some more thought, I first agree that adding the trigger() / cancel() methods to some kind of timer object is not necessary for us to achieve exactly-once for the operators. We could follow the direction of "modifying the implementation of the operators" to achieve the same target.
> >
> > But continuing in this direction, it now looks to me that it is also not necessary to add the callback to the timer services:
> > 1. For InternalTimerService, the operators could just call `InternalTimerService#forEachProcessingTimer()` on finish to handle the pending timers.
> > 2. For the timers registered to the underlying ProcessingTimeService, at least in the currently listed scenarios, the operator itself knows what the remaining work is (e.g., the FileWriter knows whether it has an in-progress file to flush).
> >
> > Operators could handle the remaining timers in the finish() method.
> >
> > Then the only interface we need to consider is the one added to the ProcessFunction. The current interface also looks OK to me.
> >
> > If you think the above option works, I could first build a PoC demonstrating that it is sufficient to only modify the operator implementations to handle the remaining work properly on finish(). If there are new issues I'll post them here and we could have some more discussion.
> >
> > Best,
> > Yun Gao
> >
> > ------------------Original Mail ------------------
> > Sender: Dawid Wysakowicz <dwysakow...@apache.org>
> > Send Date: Fri Dec 2 21:21:25 2022
> > Recipients: Dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
> >
>> Ad. 1
>>
>> I'd start with ProcessingTimeService, as that's the only public interface. It is exposed in the Sink V2 interface. In this scenario it would be the Sink interface that needs to extend from an EOFTimersHandler. I believe it would be hard to pass it from there to the ProcessingTimeService, as that is passed in from the outside, e.g. in the ProcessingTimeServiceAware. For that reason I'd go with a registration method in that interface.
>>
>> In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can extend from EOFTimersHandler. I'd do that because ProcessFunction does not have an init/open method where we could register the handler.
>>
>> On the operator level I'd have a registration method in InternalTimerService. I believe that's the only way to handle the above ProcessFunction approach. E.g. in KeyedProcessOperator you need to check whether the UDF extends from the interface, not the operator itself.
>>
>> Ad. 2
>>
>> I'd go with
>>
>> *(Keyed)ProcessFunction:*
>>
>> interface EOFTimersHandler {
>>     void handleProcessingTimer(long timestamp, Context ctx);
>> }
>>
>> interface Context {
>>     <X> void output(OutputTag<X> outputTag, X value);
>>     K getCurrentKey();
>>     // we can extend it for waitFor later
>> }
>>
>> *ProcessingTimeService:*
>>
>> interface EOFTimersHandler {
>>     void handleProcessingTimer(long timestamp, Context ctx);
>> }
>>
>> interface Context {
>>     // we can extend it for waitFor later
>> }
>>
>> *InternalTimerService:*
>>
>> interface EOFTimersHandler {
>>     void handleProcessingTimer(InternalTimer<K, N> timer, Context ctx);
>> }
>>
>> interface Context {
>>     // we can extend it for waitFor later
>> }
>>
>> Personally I'd not try to unify those places too much. They also have different visibilities (public/internal) and access to different sets of metadata (key/namespace).
>>
>>
>> Ad. 3
>>
>> I don't like having the trigger/cancel methods, because:
>>
>> 1. I don't like the back and forth between the system and the UDF.
>>
>> 2. Yes, the biggest issue I have is the possibility of registering new timers. I am trying to be on the safe side here. I don't like the idea of dropping them, because it again makes assumptions about what users do with those timers. What if they e.g. emit a counter once it reaches a certain threshold? We'd need an additional flag in the method indicating that this is the final timer. My sentiment is that we're making it questionably easier to trigger a timer at the cost of opening up for unforeseen problems with follow-up registrations.
>>
>> Best,
>>
>> Dawid
>>
>> On 30/11/2022 12:13, Yun Gao wrote:
>>
>> Hi Dawid, Piotr,
>>
>> Many thanks for the discussion! As a whole I think we already agree on the callback option, and I don't oppose modifying the current internal implementation. But from my side it is still not clear what interfaces are actually being proposed. Let me first try to summarize that a bit:
>>
>> 1) Which object do the handlers register on?
>>
>> It seems there are two options: one is the timer services (InternalTimerService / ProcessingTimeService, or some equivalent after refactoring), the other is the lifecycle of the operator. I'm now leaning towards the latter; what do you think about this part?
>>
>> 2) What is the interface of the handler?
>>
>> Option 1 is:
>>
>> interface SomeHandlerName {
>>     void processingTimer(Timer timer);
>> }
>>
>> interface Timer {
>>     long getTimestamp();
>>     void trigger();
>>     void cancel();
>>     // Other actions if required.
>> }
>>
>> But it seems there is controversy on whether to add actions to the timer class. Without them, as I understand it, the interfaces of Option 2 are:
>>
>> interface SomeHandlerName {
>>     void processTimer(Timer timer);
>> }
>>
>> interface KeyedSomeHandlerName<KEY, NAMESPACE> {
>>     void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx);
>> }
>>
>> interface Timer {
>>     long getTimestamp();
>> }
>>
>> interface KeyedTimer<KEY, NAMESPACE> extends Timer {
>>     KEY getKey();
>>     NAMESPACE getNamespace();
>> }
>>
>> interface Context {
>>     void executeAtScheduledTime(Consumer<Timer> handler);
>> }
>>
>> As Piotr has pointed out, if we could eliminate the namespace logic, we could then remove the namespace-related type parameter and method from the interfaces. Do I understand correctly?
>>
>> Besides, I still don't fully get why we should not add the actions to the timer class, considering that in most cases users could implement their logic by simply calling timer.trigger(). (I agree the repeated registration is indeed a problem, but I think we could ignore the timers registered during termination.) Could you enlighten me a bit further on this part?
>>
>> Best,
>> Yun Gao
>>
>> ------------------------------------------------------------------
>> From: Piotr Nowojski <pnowoj...@apache.org>
>> Send Time: 2022 Nov. 30 (Wed.) 17:10
>> To: dev <dev@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>>
>> Hi,
>>
>> I have a couple of remarks.
>>
>> First a general one. For me the important part in the design of this API is how to expose it to Flink users in the public interfaces, namely ProcessFunction and StreamOperator. InternalTimerService is an internal class, so we can change it and break it as needed in the future.
>>
>> Registering a handler as proposed by Dawid:
>>
>> interface SomeHandlerName {
>>     void onTimer(/* whatever type it is */ timer, Context ctx);
>> }
>>
>> makes sense to me. For the InternalTimerService I think it doesn't matter too much what we do. We could provide a similar interface as for the ProcessFunction/StreamOperator, but it doesn't have to be the same one. On the contrary, I think it shouldn't be the same, as part of this effort we shouldn't be exposing the concept of `Namespaces` to the public-facing API.
>>
>> Re the "waitFor": theoretically I see arguments why users might want to use this, but I'm also not convinced that it's necessary in practice. I would be +1 either way. The first version can be without this functionality and we can add it later (given that we design a good place to add it in the future, like the `Context` proposed by Dawid). But I'm also fine adding it now if others insist.
>>
>> Best,
>> Piotrek
>>
>> On Wed, Nov 30, 2022 at 09:18 Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>> WindowOperator is not implemented by users. I can see that for InternalTimerService we'll need
>>
>> interface PendingTimerProcessor<KEY, NAMESPACE> {
>>     void onTimer(InternalTimer<KEY, NAMESPACE> timer);
>> }
>>
>> // e.g. in WindowOperator:
>> void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
>>     doHandleTimer(timer);
>> }
>>
>> I don't see a problem with that.
>>
>> As you said, ProcessingTimeService is a user-facing interface and completely unrelated to the InternalTimerService. I don't see a reason why we'd need to unify those.
>>
>> As for the waitFor behaviour: personally, I have not been convinced it is necessary. Maybe it's just my lack of vision, but I can't think of a scenario where I'd use it. Still, if we need it, I'd go for something like:
>>
>> void onTimer(/* whatever type it is */ timer, Context ctx) {}
>>
>> interface Context {
>>     void executeAtScheduledTime(Consumer<Timer> handler);
>> }
>>
>> That way you have independent, simple interfaces that need to work only in a single well-defined scenario, and you don't need to match one interface to multiple different cases.
>>
>> Best,
>> Dawid
>>
>> On 30/11/2022 07:27, Yun Gao wrote:
>>
>> Hi Dawid,
>>
>> Thanks for the comments! As a whole I'm also open on the API, and I also prefer simple but flexible interfaces, but there still seem to be some problems with just letting users implement the termination actions.
>>
>> Let's take the WindowOperator as an example. As seen in [1], the timer processing logic needs to acquire the key / namespace information bound to the timer (which is only supported by the InternalTimerService). Thus if we want users to implement the same logic on termination, we either let users trigger the timer handler directly, or we also allow users to access this information. If we go in the latter direction, we might need to provide interfaces like:
>>
>> interface PendingTimerProcessor<KEY, NAMESPACE> {
>>     void onTimer(Timer<KEY, NAMESPACE> timer); // e.g. implemented as doHandleTimer(timer)
>> }
>>
>> interface Timer<KEY, NAMESPACE> {
>>     long getTimestamp();
>>     KEY getKey();
>>     NAMESPACE getNamespace();
>> }
>>
>> Then we'll have the issue that the interface needs to handle both the InternalTimerService and the raw ProcessingTimeService cases; the latter has no key and namespace information attached, and it would also be a bit inconsistent for users to have to set the KEY and NAMESPACE types.
>>
>> Besides, it looks to me that if we want to implement behaviors like waiting for the timer, we might not be able to simply reuse the timer handler, so every operator author would have to re-implement such waiting logic.
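Dawid's optional `executeAtScheduledTime` context could let the runtime implement the "wait for" logic once, which speaks to Yun's concern that every operator author would otherwise re-implement it. The following is a minimal sketch, not Flink code: `PendingTimer`, `ScheduledContext`, `PendingTimerHandler` and `EndOfInputRunner` are hypothetical names, and a `ScheduledExecutorService` stands in for the task's timer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical pending processing-time timer: only its scheduled time (epoch millis).
record PendingTimer(long timestamp) {}

// Hypothetical shape of Dawid's optional Context: defer work to the timer's original time.
interface ScheduledContext {
    void executeAtScheduledTime(Consumer<PendingTimer> handler);
}

// Hypothetical handler called once per pending timer at end of input.
interface PendingTimerHandler {
    void onTimer(PendingTimer timer, ScheduledContext ctx);
}

// Hypothetical runtime side: the waiting logic lives here, not in every operator.
final class EndOfInputRunner {
    private EndOfInputRunner() {}

    static void drain(List<PendingTimer> timers, PendingTimerHandler handler) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<ScheduledFuture<?>> deferred = new ArrayList<>();
        try {
            for (PendingTimer timer : timers) {
                // Each callback gets a context bound to its own timer.
                handler.onTimer(timer, action -> {
                    long delay = Math.max(0L, timer.timestamp() - System.currentTimeMillis());
                    deferred.add(executor.schedule(
                            () -> action.accept(timer), delay, TimeUnit.MILLISECONDS));
                });
            }
            // The task may only finish once every deferred action has run.
            for (ScheduledFuture<?> future : deferred) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for timers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("deferred timer action failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A handler that never calls `executeAtScheduledTime` gets plain fire-now or drop behaviour; only handlers that ask for waiting pay for it, which keeps the common case as simple as Dawid and Piotr want.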
>>
>> > Moreover it still has the downside that if you call back to the `onTimer` method after `trigger` you have access to the Context, which lets you register new timers.
>>
>> I think we could simply drop the timers registered after we start processing the pending timers on termination. Logically there should be no new data after termination.
>>
>> > I think I am not convinced by these arguments. First of all I'm afraid there is no clear distinction in that area between what is runtime and what is not. I always found `AbstractStreamOperator(*)` to be part of the runtime or Flink's internals, and thus I don't find `InternalTimerService` a utility, but a vital part of the system. Let's be honest, it is impossible to implement an operator without extending from `AbstractStreamOperator*`. What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this?:
>>
>> I think the original paragraph only explained that the interface is harder to support if we allow users to implement arbitrary logic. But since we are now on the same page about the callback option, users can always implement arbitrary logic whether or not we support timer.trigger(), so I think there is no divergence on this point any more. I also believe we'll end up with logic similar to the proposed one that drains all the timers and processes them.
>>
>> Best,
>> Yun Gao
>>
>> [1] https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>
>> ------------------------------------------------------------------
>> From: Dawid Wysakowicz <dwysakow...@apache.org>
>> Send Time: 2022 Nov. 28 (Mon.) 23:33
>> To: dev <dev@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>>
>> Do we really need to have separate methods for triggering/waiting/cancelling? To me it sounds rather counterintuitive. Why can't users just execute whatever they want in the handler itself instead of additional back and forth with the system? Moreover it still has the downside that if you call back to the `onTimer` method after `trigger` you have access to the Context, which lets you register new timers.
>>
>> I find the following approach much simpler:
>>
>> void onTimer(...) {
>>     doHandleTimer(timestamp);
>> }
>>
>> void processPendingTimer(...) {
>>     // trigger
>>     doHandleTimer(timestamp);
>>     // for cancel, simply do nothing...
>> }
>>
>> > Sorry, I might not have made it very clear here. I think the difficulty with supporting setting the currentKey is a special issue for the callback options (no matter what the interface is), since they allow users to execute logic other than the one registered with the timers. The complexity comes from the fact that we currently have two levels of timer services: the ProcessingTimeService (with no key) and the InternalTimerService (with a key). Currently only the ProcessingTimeService is exposed to the runtime, and the InternalTimerService is more of a utility for implementing the operator. So with the current code, the runtime could only access the ProcessingTimeService on termination.
>>
>> I think I am not convinced by these arguments. First of all I'm afraid there is no clear distinction in that area between what is runtime and what is not. I always found `AbstractStreamOperator(*)` to be part of the runtime or Flink's internals, and thus I don't find `InternalTimerService` a utility, but a vital part of the system. Let's be honest, it is impossible to implement an operator without extending from `AbstractStreamOperator*`.
>>
>> What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this?:
>>
>> AbstractStreamOperator#finish() {
>>     internalTimerService.finish();
>> }
>>
>> InternalTimerService#finish() {
>>     InternalTimer<K, N> timer;
>>     while ((timer = processingTimeTimersQueue.peek()) != null) {
>>         keyContext.setCurrentKey(timer.getKey());
>>         processingTimeTimersQueue.poll();
>>         onEndOfInputHandler.processPendingTimer(timer);
>>     }
>> }
>>
>> > If we only execute some predefined actions, we do not need to worry about the implementation of InternalTimerService and can just execute the registered timers. But if we allow users to execute arbitrary logic, we also need to be aware of the InternalTimerServices and parse the key from the timers stored in them. I think we will always have a way to overcome this issue, but supporting the callback options would be more complex.
>>
>> I am not sure "predefined actions" would be good enough that we do not need to set a key. As a user I'd anyhow expect the proper key to be set in processPendingTimer.
>>
>> Best,
>> Dawid
>>
>> On 24/11/2022 08:51, Yun Gao wrote:
>>
>> Hi Piotr / Divye,
>>
>> Many thanks for the discussion!
>>
>> First, IMO it seems we have reached consensus on the high-level API: most operators usually have only one reasonable action for the pending timers on termination, so we could let each operator implement its own action with the low-level interface provided. The only exception is the ProcessFunction, with which users might register customized timers, so users might also define the actions on termination. (If I have misunderstood anything here, please correct me.)
>>
>> For the low-level API, I can see the benefits of the callback options: since in most cases an operator has only one action for all its timers, it's a waste to store the same flag for every timer, which would also require a lot of code / state format changes. But since it is enough for most users to simply trigger / cancel the timers, it would be redundant for users to implement the logic twice. Thus perhaps we might combine the benefits of the two options. We might have a separate interface:
>>
>> public interface TimerHandlersOnTermination {
>>     void processPendingTimer(Timer timer, long currentTime);
>> }
>>
>> public interface Timer {
>>     long getRegisteredTimestamp();
>>     void trigger();
>>     void waitFor();
>>     void cancel();
>> }
>>
>> Then if an operator has implemented the TimerHandlersOnTermination interface, on termination we could call processPendingTimer(xx) for every pending timer. Users might simply trigger / waitFor / cancel it, or execute some other logic if needed. For the ProcessFunction we might have a similar interface to processPendingTimer, except that we might need to provide a Context / Collector to the ProcessFunction. Do you think this would be a good direction?
>>
>> Also @Piotr:
>>
>> > I don't see a problem here. Interface doesn't have to reflect that, only the runtime must set the correct key context before executing the handler dealing with the processing time timers at the end of input/time.
>>
>> Sorry, I might not have made it very clear here. I think the difficulty with supporting setting the currentKey is a special issue for the callback options (no matter what the interface is), since they allow users to execute logic other than the one registered with the timers. The complexity comes from the fact that we currently have two levels of timer services: the ProcessingTimeService (with no key) and the InternalTimerService (with a key). Currently only the ProcessingTimeService is exposed to the runtime, and the InternalTimerService is more of a utility for implementing the operator. So with the current code, the runtime could only access the ProcessingTimeService on termination. If we only execute some predefined actions, we do not need to worry about the implementation of InternalTimerService and can just execute the registered timers. But if we allow users to execute arbitrary logic, we also need to be aware of the InternalTimerServices and parse the key from the timers stored in them. I think we will always have a way to overcome this issue, but supporting the callback options would be more complex.
>>
>> Best,
>> Yun Gao
>>
>> ------------------------------------------------------------------
>> From: Divye Kapoor <dkap...@pinterest.com.INVALID>
>> Send Time: 2022 Nov. 24 (Thu.) 08:50
>> To: dev <dev@flink.apache.org>
>> Cc: Xenon Development Team <xenon-...@pinterest.com>
>> Subject: Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>>
>> Sounds good. Looks like we're on the same page.
>>
>> Thanks!
>> Divye
>>
>> On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> Hi Divye,
>>
>> I think we are mostly on the same page. Just to clarify/rephrase:
>>
>> > One thing to think about - on EOF "trigger immediately" will mean that the asynchronous wait timeout timers will also fire - which is undesirable
>>
>> I didn't mean to fire all timers immediately in all of the built-in operators. Just that each built-in operator can have a hard-coded way (without a way for users to change it) to handle those timers. Windowed operators would trigger the lingering timers (flush outputs); AsyncWaitOperator could just ignore them. In the same way that users could register EOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we (as Flink developers) could use the same mechanism to implement any behaviour we want for the built-in operators. There should be no need to add any separate mechanism.
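Piotr's point that built-in operators can reuse the same handler mechanism, each with one hard-coded behaviour, fits Dawid's `InternalTimerService#finish()` loop: the runtime sets the key before every callback, so the handler never parses keys itself. A minimal sketch, assuming hypothetical `KeyedTimer`, `EndOfInputTimerHandler` and `TimerServiceSketch` types in place of Flink's internal classes (namespaces omitted), with a window-like operator that flushes and an async-wait-like operator that drops its timeout timers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical keyed processing-time timer; the namespace is left out on purpose.
record KeyedTimer<K>(long timestamp, K key) {}

// Hypothetical end-of-input callback; each built-in operator hard-codes one implementation.
interface EndOfInputTimerHandler<K> {
    void processPendingTimer(KeyedTimer<K> timer);
}

// Hypothetical stand-in for the keyed timer service inside an operator.
final class TimerServiceSketch<K> {
    private final PriorityQueue<KeyedTimer<K>> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp(), b.timestamp()));
    private K currentKey;

    void register(KeyedTimer<K> timer) {
        queue.add(timer);
    }

    K currentKey() {
        return currentKey;
    }

    // Same shape as the finish() loop in the thread: set the key, remove, then call back.
    void finish(EndOfInputTimerHandler<K> handler) {
        KeyedTimer<K> timer;
        while ((timer = queue.peek()) != null) {
            currentKey = timer.key();
            queue.poll();
            handler.processPendingTimer(timer);
        }
    }
}

// Window-like operator: flushes every lingering window at end of input.
final class WindowLikeOperator {
    final TimerServiceSketch<String> timers = new TimerServiceSketch<>();
    final List<String> output = new ArrayList<>();

    void finish() {
        timers.finish(timer -> output.add("flush window for key " + timers.currentKey()));
    }
}

// AsyncWait-like operator: timeout timers are dropped, so nothing fires at end of input.
final class AsyncWaitLikeOperator {
    final TimerServiceSketch<String> timers = new TimerServiceSketch<>();
    int droppedTimeouts = 0;

    void finish() {
        timers.finish(timer -> droppedTimeouts++);
    }
}
```

Because each behaviour lives in the operator's own `finish()`, no user-facing switch is needed for the built-in operators; only `ProcessFunction` users, who register their own timers, would supply a handler themselves.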
>>
>> Best,
>> Piotrek
>>
>> On Wed, Nov 23, 2022 at 08:21 Divye Kapoor <dkap...@pinterest.com.invalid> wrote:
>>
>> Thanks Yun/Piotrek,
>>
>> Some brief comments inline below.
>>
>> On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> > Hi,
>> >
>> > All in all I would agree with Dawid's proposal.
>>
>> +1
>>
>> > We can add the flexibility of how to deal with the timers in the low-level API via adding a handler - if someone needs to customize it, they will always have a workaround. Note: after giving it more thought, I agree that registering some handlers is better than overloading the register timer method and modifying the timer's state.
>>
>> +1.
>>
>> > At the same time, we can force the most sensible semantics we can think of for the couple of built-in operators, which should be pretty straightforward (either ignore the timers, or fire them at once). I agree there might be some edge cases where, theoretically, a user might want to wait for the timer to fire naturally, but:
>> >
>> > 1. I'm not sure how common in practice this will be. If not at all, then why should we be complicating the API/system?
>>
>> That's fair. However, the specifics are very important here. One thing to think about - on EOF "trigger immediately" will mean that the asynchronous wait timeout timers will also fire - which is undesirable (because they are racing with the last async call). However, the issue is cleanly resolved by waiting for the timer to be canceled when the last event is processed ("Wait for" case). Ignoring the timer has the least justification. Registering the handler as per Dawid's proposal and having that handler unregister the timers on EOF makes the most sense. This solution also unifies the trigger-immediately case, as that handler can re-register the timers for early termination.
>>
>> The proposal:
>>
>> 1. Operator receives EOF.
>> 2. EOF timer handler triggers.
>> 3. EOF handler adjusts the registered timers for early trigger or ignore. If wait-for behavior is desired, timers are not changed. This is controlled in client code.
>> 4. Operator waits for all timers to drain/trigger ("Always"). There is no special handling for ignore/early trigger.
>> 5. Operator allows the job to proceed with shutdown.
>>
>> The only API change needed is an EOF handler. The other agreement we need is that "Wait for" is the desired behavior in processing time, and that processing time is fundamentally different from event time in this respect. (I have changed my thinking since the last mail.)
>>
>> > 2. We can always expand the API in the future, and let the user override the default built-in behaviour of the operators via some setter on the stream transformation (`SingleOutputStreamOperator`), or via some custom API DSL style in each of the operators separately.
>>
>> This is not required. See above.
>>
>> > Re forcing the same semantics for processing time timers as for event time ones - this is tempting, but indeed I see a possibility that users need to adhere to some external constraints when using processing time.
>>
>> +1. As above, we should consider the 2 cases fundamentally different in this area.
>>
>> Re: Yun -
>>
>> > b) Another issue is: what if users use timers with different termination actions in the same operator / UDF? For example, users use some kind of timeout (like throwing an exception if something does not happen after some other thing), and also some kind of window aggregation logic. In this case, without additional tags, users might not be able to distinguish which timer should be canceled and which timer should be triggered?
>>
>> As above. The EOF handler makes the choice.
>>
>> > 4. How could these scenarios adjust their APIs?
>> >
>> > From the currently listed scenarios, I'm more inclined to think, as @Dawid pointed out, that there might be only one expected behavior for each scenario, so there seems to be no need to allow users to adjust the behavior. Thus @Divye, may I double-check: do we currently have explicit scenarios that are expected to change the behavior for the same scenario?
>>
>> Wait-for behavior is probably the only expected behavior, and any alterations should come from the EOF handler managing the registered timers.
>>
>> > Besides, @Divye, from the listed scenarios, another concern I have with a global configuration is that within one job, different operators might still have different expected behaviors. For example, might a job using both a Window operator and an AsyncWaitOperator have different requirements for timers on termination?
>>
>> Thank you for raising this case. This changed my thinking. Based on your point, we should try to align on the "Wait-for" with EOF handler proposal. I'm withdrawing the "single-runtime-config" proposal.
>>
>> Best,
>> Divye
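Divye's five-step flow, together with his answer to Yun's mixed-timer question (the EOF handler makes the choice), can be sketched as follows. This is a minimal simulation under stated assumptions, not Flink code: `LabeledTimer`, `DrainingTimerService`, `EofHandler` and `OperatorSketch` are hypothetical, timers carry a label (the "additional tag" Yun mentions) so one handler can cancel a timeout timer and re-register a window timer, and a simulated clock replaces real waiting so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical timer with a label, so one EOF handler can treat timer kinds differently.
record LabeledTimer(String label, long timestamp) {}

// Hypothetical processing-time timer service driven by a simulated clock.
final class DrainingTimerService {
    private final PriorityQueue<LabeledTimer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp(), b.timestamp()));
    private final List<String> fired = new ArrayList<>();
    private long now;

    DrainingTimerService(long startTime) {
        this.now = startTime;
    }

    long now() { return now; }

    List<String> fired() { return fired; }

    void register(String label, long timestamp) {
        queue.add(new LabeledTimer(label, timestamp));
    }

    // Returns true if at least one timer with this label was removed.
    boolean cancel(String label) {
        return queue.removeIf(t -> t.label().equals(label));
    }

    // Step 4: "wait" for every remaining timer by advancing the simulated clock.
    void drain() {
        LabeledTimer timer;
        while ((timer = queue.poll()) != null) {
            now = Math.max(now, timer.timestamp());
            fired.add(timer.label() + "@" + now);
        }
    }
}

// Hypothetical EOF handler: the only new hook in Divye's proposal.
interface EofHandler {
    void onEndOfInput(DrainingTimerService timers);
}

final class OperatorSketch {
    private final DrainingTimerService timers;
    private final EofHandler handler;
    private boolean finished;

    OperatorSketch(DrainingTimerService timers, EofHandler handler) {
        this.timers = timers;
        this.handler = handler;
    }

    boolean isFinished() { return finished; }

    // Step 1: the operator receives end of input.
    void endOfInput() {
        handler.onEndOfInput(timers); // Steps 2-3: handler cancels or re-registers timers.
        timers.drain();               // Step 4: always wait for the remaining timers.
        finished = true;              // Step 5: shutdown may proceed.
    }
}
```

For example, a handler that cancels the "async-timeout" timer, re-registers the "window" timer at the current time and leaves a "flush" timer alone yields ignore, early-trigger and wait-for behaviour respectively, while `endOfInput()` itself has a single code path: the runtime always waits, and the policy lives entirely in the handler.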