Hi all,

Thank you Panagiotis for proposing this. From the size of the thread, this is a much-needed feature in Flink! Some thoughts, to extend those already adeptly summarised by Piotr, Matthias and Jing.

- scope of FLIP: +1 to scoping this FLIP to observability around a restart. That would include adding metadata and exposing that metadata to external systems. IMO, introducing a new restart strategy solves different problems, has a much larger scope, and should be covered in a separate FLIP.
- failure handling: At the moment, we propose transitioning the Flink job to a terminal FAILED state when JobListener fails, even when the job could have transitioned to RESTARTING->RUNNING. If we are keeping in line with the scope of adding metadata/observability around job restarts, we should not affect the running of the Flink job itself. Could I propose we instead log at WARN/ERROR?
- immutable context: +1 to keeping the contract clear via return types.
- async operation: +1 to adding the ioExecutor to the context. However, given that we don't want to block the actual job restart on adding metadata or calling external services, should we consider returning immediately and letting the futures complete independently?
- independent vs ordered execution: Should we consider making the order of execution deterministic (use a List instead of a Set)?

Once again, thank you for working on this.

Regards,
Hong

> On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID> wrote:
>
> Hi,
>
> Thanks Panagiotis for this FLIP and thanks for all the valuable discussions.
> I'd like to share my two cents:
>
> - FailureListenerContext#addTag and FailureListenerContext#getTags. It
> seems that we have to call getTags() and then do the removal ourselves if we
> want to delete any tags (according to the javadoc in the FLIP). It looks
> inconsistent to me too. Either offer addTag(), deleteTag(), and let
> getTags() return an immutable collection, or offer only getTags() and have
> it return a mutable collection.
>
> - label vs tag. Label is a great idea +1. AFAIC, tag could be a special
> case of label, i.e. key="tag". It is convenient to offer the xxxTag()
> method if the user only needs one label. I would love to have both of them.
> Another thought is that tag implicitly contains the meaning of "immutable". > > - +1 for a separate FLIP of customized restart strategy. Attention should > be taken to make sure it works well with Flink built-in restartStrategy in > order to have the single source of truth. > > - execution order. The default independent execution should be fine. > According to the FailureListener interface definition in the FLIP, users > should be able to easily build a listener chain[1] to offer sequential > execution, e.g. public FailureListener(FailureListener nextListener). > Another option is to modify the interface or provide another interface > alongside the current one to extend the method to support ListenerChain, > i.e. void onFailure(Throwable cause, FailureListenerContext context, > ListenerChain listenerChain). Users can also mix them up. > > - naming. Afaiu, the pluggable extension is not limited to failure > enrichment. Conceptually it can do everything for the given failure, e.g. > start counting metric as the FLIP described, calling an external system, > sending notification to slack channel, etc. you name it. It sounds to me > more like a FailureActionListener - it can trigger actions based on > failure. Failure enrichment is one type of action. > > Best regards, > Jing > > [1] https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern > > On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl > <matthias.p...@aiven.io.invalid> wrote: > >> Thanks for the proposal, Panagiotis. A lot of good points have been already >> shared. I just want to add my view on some of the items: >> >> - independent execution vs ordered execution: I prefer the listeners being >> processed independently from each other because it adds less complexity >> code-wise. The use case Piotr described (where you want to reuse some other >> classifier) is the only one I can think of where we actually need >> classifiers depending on each other. 
Supporting such a use case right from >> the start feels a bit over-engineered and could be covered in a follow-up >> FLIP if we really come to that point where such a feature is requested by >> users. >> >> - key/value pairs instead of plain labels: I think that's a good idea. >> key/value pairs are more expressive. +1 >> >> - extending the FLIP to cover restart strategy: I understand Gen's concern >> about introducing too many different types of plugins. But I would still >> favor not extending the FLIP in this regard. A pluggable restart strategy >> sounds reasonable. But an error classifier and a restart strategy are still >> different enough to justify separate plugins, IMHO. And therefore, I would >> think that covering the restart strategy in a separate FLIP is the better >> option for the sake of simplicity. >> >> - immutable context: Passing in an immutable context and returning data >> through the interface method's return value sounds like a better approach >> to harden the contract of the interface. +1 for that proposal >> >> - async operation: I think David is right. An async interface makes the >> listener implementations more robust when it comes to heavy IO operations. >> The ioExecutor can be passed through the context object. +1 >> >> Matthias >> >> On Tue, Mar 21, 2023 at 2:09 PM David Morávek <david.mora...@gmail.com> >> wrote: >> >>> *@Piotr* >>> >>> >>>> I was thinking about actually defining the order of the >>>> classifiers/handlers and not allowing them to be asynchronous. >>>> Asynchronousity would create some problems: when to actually return the >>>> error to the user? After all async responses will get back? Before, but >>>> without classified exception? It would also add implementation >> complexity >>>> and I think we can always expand the API with async version in the >> future >>>> if needed. 
>>>
>>> As long as the classifiers need to talk to an external system, we by
>>> definition need to allow them to be asynchronous to unblock the main thread
>>> for handling other RPCs. Exposing ioExecutor via the context proposed above
>>> would be great.
>>>
>>>> After all async responses will get back
>>>
>>> This would be the same if we trigger them synchronously one by one, with a
>>> caveat that synchronous execution might take significantly longer and
>>> introduce unnecessary downtime to a job.
>>>
>>> D.
>>>
>>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com> wrote:
>>>
>>>> Hi Piotr,
>>>>
>>>> It's fine with me to have a separate FLIP to extend this `FailureListener`
>>>> to support custom restart strategies.
>>>>
>>>> What I am a bit concerned about is that if we just treat the
>>>> `FailureListener` as an error classifier which is not crucial to the Flink
>>>> framework process, we may design it to run asynchronously and not trigger
>>>> Flink failures. This may be a blocker if we later want to enable it to
>>>> support custom restart strategies.
>>>>
>>>> Thanks,
>>>> Zhu
>>>>
>>>> On Tue, Mar 21, 2023 at 7:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>
>>>>> Hi Panagiotis,
>>>>>
>>>>> Thanks for the proposal. This is a very valuable feature and will be a
>>>>> good add-on for Flink.
>>>>>
>>>>> I also think that it would be great if we could consider how to make it
>>>>> possible for users to customize the failure handling in this FLIP. It's
>>>>> highly related to the problem we want to address in this FLIP and could
>>>>> avoid refactoring the interfaces proposed in this FLIP too soon.
>>>>>
>>>>> Currently it treats all kinds of exceptions the same. However, some kinds
>>>>> of exceptions are actually not recoverable at all.
>>>>> It could let users
>>>>> customize the failure handling logic to fail fast for certain known
>>>>> unrecoverable exceptions, so that these kinds of jobs get noticed
>>>>> and recovered more quickly.
>>>>>
>>>>> Regards,
>>>>> Dian
>>>>>
>>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo <luogen...@gmail.com> wrote:
>>>>>
>>>>>> Hi Panagiotis,
>>>>>>
>>>>>> Thanks for the proposal.
>>>>>>
>>>>>> It's useful to enrich the information so that users can see more
>>>>>> clearly why the job is failing, especially platform developers who
>>>>>> need to provide the information to their end users.
>>>>>> As for this FLIP itself, I'd prefer the naming `FailureEnricher`
>>>>>> proposed by David, as the plugin doesn't really handle the failure.
>>>>>>
>>>>>> However, like Zhu and Lijie said, I also joined a discussion
>>>>>> recently about customized failure handling, e.g. counting the
>>>>>> failure rate of pipeline regions separately, failing the job
>>>>>> when a specific error occurs, and so on.
>>>>>> I suppose a custom restart strategy, or I'd call it a custom
>>>>>> failure "handler", is indeed necessary. It can also enrich the
>>>>>> information as the currently proposed handler does.
>>>>>>
>>>>>> To avoid adding too many plugin interfaces, which may confuse users
>>>>>> and make the ExecutionFailureHandler more complex,
>>>>>> I think it'd be better to consider the requirements at the same time.
>>>>>>
>>>>>> IMO, we can add a handler interface, then make the current restart
>>>>>> strategy and the enricher both types of the handler. The handlers
>>>>>> execute in sequence, and the failure is considered unrecoverable if
>>>>>> any of the handlers decides so.
>>>>>> In this way, users can also implement a handler using the enriched
>>>>>> information provided by the previous handlers, e.g. fail the job and
>>>>>> send a notification if too many failures are caused by the end users.
>>>>>>
>>>>>> Best,
>>>>>> Gen
>>>>>>
>>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Panagiotis,
>>>>>>>
>>>>>>> Thanks for your proposal. It is valuable to analyze the reason for
>>>>>>> failure with a user plug-in.
>>>>>>>
>>>>>>> Making the context immutable could make the contract stronger.
>>>>>>> Letting the listener return an enriching result may be a better way.
>>>>>>>
>>>>>>> IIUC, listeners could do two things: add more information (tags/labels)
>>>>>>> to the FailureHandlingResult, and push data out of Flink (metrics or
>>>>>>> something).
>>>>>>> IMO, we could split these two types into Listener and Advisor (maybe
>>>>>>> other names). The Listener just pushes the data out and returns nothing
>>>>>>> to Flink, so we can run these async and don't have to wait for the
>>>>>>> Listener's result.
>>>>>>> The Advisor returns rich information to the FailureHandlingResult, and
>>>>>>> it should have lighter logic.
>>>>>>>
>>>>>>> Supporting a custom restart strategy is also valuable. In this design,
>>>>>>> we use RestartStrategy to construct a FailureHandlingResult, and then
>>>>>>> pass it to the Listener.
>>>>>>> My question is, should we change the restart strategy interface to
>>>>>>> support the custom restart strategy, or keep the current restart
>>>>>>> strategy and let the later Listener enrich the restartable information
>>>>>>> in the FailureHandlingResult? The latter may cause some confusion when
>>>>>>> we use a custom restart strategy: the default Flink restart strategy
>>>>>>> also runs but does not take effect.
>>>>>>>
>>>>>>> Best,
>>>>>>> Weihua
>>>>>>>
>>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <wangdachui9...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Panagiotis,
>>>>>>>>
>>>>>>>> Thanks for driving this.
>>>>>>>>
>>>>>>>> +1 for supporting custom restart strategies; we did receive such
>>>>>>>> requests on the user mailing list [1][2].
>>>>>>>>
>>>>>>>> Besides, in the current design, the plugin will only do some
>>>>>>>> statistical and classification work, and will not affect the
>>>>>>>> *FailureHandlingResult*. Just listening, no handling: it doesn't
>>>>>>>> quite match the title.
>>>>>>>>
>>>>>>>> [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
>>>>>>>> [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Lijie
>>>>>>>>
>>>>>>>> On Mon, Mar 20, 2023 at 9:39 PM Zhu Zhu <reed...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Panagiotis,
>>>>>>>>>
>>>>>>>>> Thanks for creating this proposal! It's good to enable Flink to
>>>>>>>>> handle different errors in different ways, in a pluggable way.
>>>>>>>>>
>>>>>>>>> There are requests for flexible restart strategies from time to time,
>>>>>>>>> for different strategies of restart backoff time, or to suppress
>>>>>>>>> restarting on certain errors. Therefore, I think it's better that the
>>>>>>>>> proposed failure handling plugin can also support custom restart
>>>>>>>>> strategies.
>>>>>>>>>
>>>>>>>>> Maybe we can call it FailureHandlingAdvisor, which provides more
>>>>>>>>> information (labels) and gives advice (restart backoff time, whether
>>>>>>>>> to restart)? I do not have a strong opinion though, any explanatory
>>>>>>>>> name would be good.
>>>>>>>>>
>>>>>>>>> To avoid unexpected mutation, how about making the context immutable
>>>>>>>>> and letting the plugin return an immutable result? i.e. remove the
>>>>>>>>> setters from the context, and let the plugin method return a result
>>>>>>>>> which contains `labels`, `canRestart` and `restartBackoffTime`.
>>>>>>>>> Flink should
>>>>>>>>> apply the result to the context before invoking the next plugin, so
>>>>>>>>> that the next plugin will see the updated context.
>>>>>>>>>
>>>>>>>>> The plugin should avoid taking too much time to return the result,
>>>>>>>>> because it will block the RPC and result in instability. However, it
>>>>>>>>> can still perform heavy actions in a different thread. The context
>>>>>>>>> can provide an `ioExecutor` to the plugins for reuse.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Zhu
>>>>>>>>>
>>>>>>>>> On Mon, Mar 20, 2023 at 8:21 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Panagiotis,
>>>>>>>>>>
>>>>>>>>>> Thank you for your answer. I agree that `FailureListener` could be
>>>>>>>>>> stateless. I then have some thoughts as follows:
>>>>>>>>>>
>>>>>>>>>> 1. I see that listeners and tag collections are associated. When the
>>>>>>>>>> JobManager fails and restarts, how can the new listener be
>>>>>>>>>> associated with the tag collection from before the failover? Is it
>>>>>>>>>> by listener loading order?
>>>>>>>>>>
>>>>>>>>>> 2. The tag collection may grow too large and cause a JobManager OOM.
>>>>>>>>>> Do we need to provide a management class that supports some
>>>>>>>>>> obsolescence strategies instead of a direct Collection?
>>>>>>>>>>
>>>>>>>>>> 3. Is it possible to provide a more complex data structure than a
>>>>>>>>>> simple string collection for tags in listeners, such as key-value?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Shammon FY
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Panagiotis,
>>>>>>>>>>>
>>>>>>>>>>> Thank you for kicking off this discussion. Overall, the proposed
>>>>>>>>>>> feature of this FLIP makes sense to me.
>>>>>>>>>>> We have also discussed similar requirements
>>>>>>>>>>> with our users and developers, and I believe it will help many
>>>>>>>>>>> users.
>>>>>>>>>>>
>>>>>>>>>>> In terms of FLIP content, I have some thoughts:
>>>>>>>>>>>
>>>>>>>>>>> (1) For the FailureListenerContext interface, the methods
>>>>>>>>>>> FailureListenerContext#addTag and FailureListenerContext#getTags
>>>>>>>>>>> look very inconsistent because they imply specific implementation
>>>>>>>>>>> details, and not all FailureListeners need to handle them, so we
>>>>>>>>>>> shouldn't put them in the interface. Minor: The comment "UDF
>>>>>>>>>>> loading" in the getUserClassLoader() method looks like a typo; IIUC
>>>>>>>>>>> it should return the classloader of the current job.
>>>>>>>>>>>
>>>>>>>>>>> (2) Regarding the implementation in
>>>>>>>>>>> ExecutionFailureHandler#handleFailure, some custom listeners may
>>>>>>>>>>> have heavy IO operations, such as reporting to their monitoring
>>>>>>>>>>> system. The current logic appears to run in the JobMaster's main
>>>>>>>>>>> thread, and it is recommended not to do this kind of processing in
>>>>>>>>>>> the main thread.
>>>>>>>>>>>
>>>>>>>>>>> (3) The results of FailureListener's processing and the
>>>>>>>>>>> FailureHandlingResult returned by ExecutionFailureHandler are not
>>>>>>>>>>> related in the current design. I think these two are closely
>>>>>>>>>>> related: the motivation of this FLIP is to make current failure
>>>>>>>>>>> handling more flexible. From this perspective, different listeners
>>>>>>>>>>> should have the opportunity to affect the job's failure handling
>>>>>>>>>>> flow.
For example, a Flink job is configured >> with >>> a >>>>>>>>>>> RestartStrategy with huge numbers retry , but the Kafka >>>> topic of >>>>>>>>> Source has >>>>>>>>>>> been deleted, the job will failover continuously. In this >>>> case, >>>>>> the >>>>>>>>> user >>>>>>>>>>> should have their listener to determine whether this >>> failure >>>> is >>>>>>>>> recoverable >>>>>>>>>>> or unrecoverable, and then wrap the processing result >> into >>>>>>>>>>> FailureHandlingResult.unrecoverable(xx) and pass it to >>>> JobMaster, >>>>>>>> this >>>>>>>>>>> approach will be more flexible. >>>>>>>>>>> >>>>>>>>>>> (4) All FLIPs have an important section named Public >>>> Interfaces. >>>>>>>>> Current >>>>>>>>>>> FLIP mixes the interface section and the implementation >>>> section >>>>>>>>> together. >>>>>>>>>>> It is better for us to refer to the FLIP template[1] and >>>> separate >>>>>>>> them, >>>>>>>>>>> this will make the entire FLIP clearer. >>>>>>>>>>> >>>>>>>>>>> In addition, regarding the FLIP process, there is a small >>>>>>> suggestion: >>>>>>>>> The >>>>>>>>>>> community generally creates a JIRA issue after the FLIP >>> vote >>>> is >>>>>>>> passed, >>>>>>>>>>> instead of during the FLIP preparation phase because the >>>> FLIP may >>>>>>> be >>>>>>>>>>> rejected. Although this FLIP is very reasonable, it's >>> better >>>> to >>>>>>>> follow >>>>>>>>> the >>>>>>>>>>> process. >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> >>>>>>>>>>> Leonard >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template >>>>>>>>>>> >>>>>>>>>>> On Mon, Mar 20, 2023 at 7:04 PM David Morávek < >>>> d...@apache.org> >>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> however listeners can use previous state >> (tags/labels) >>> to >>>>>> make >>>>>>>>>>> decisions >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> That sounds like a very fragile contract. 
We should >>> either >>>>>> allow >>>>>>>>> passing >>>>>>>>>>>> tags between listeners and then need to define ordering >>> or >>>> make >>>>>>> all >>>>>>>>> of >>>>>>>>>>> them >>>>>>>>>>>> independent. I prefer the latter because it allows us >> to >>>>>>>> parallelize >>>>>>>>>>> things >>>>>>>>>>>> if needed (if all listeners trigger an RCP to the >>> external >>>>>>> system, >>>>>>>>> for >>>>>>>>>>>> example). >>>>>>>>>>>> >>>>>>>>>>>> Can you expand on why we need more than one classifier >> to >>>> be >>>>>> able >>>>>>>> to >>>>>>>>>>> output >>>>>>>>>>>> the same tag? >>>>>>>>>>>> >>>>>>>>>>>> system ones come first and then the ones loaded from >> the >>>> plugin >>>>>>>>> manager >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Since they're returned as a Set, the order is >> completely >>>>>>>>>>> non-deterministic, >>>>>>>>>>>> no matter in which order they're loaded. >>>>>>>>>>>> >>>>>>>>>>>> just communicating with external monitoring/alerting >>>> systems >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> That makes the need for pushing things out of the main >>>> thread >>>>>>> even >>>>>>>>>>>> stronger. This almost sounds like we need to return a >>>>>>>>> CompletableFuture >>>>>>>>>>> for >>>>>>>>>>>> the per-throwable classification because an external >>> system >>>>>> might >>>>>>>>> take a >>>>>>>>>>>> significant time to respond. We need to unblock the >> main >>>> thread >>>>>>> for >>>>>>>>> other >>>>>>>>>>>> RPCs. >>>>>>>>>>>> >>>>>>>>>>>> Also, in the proposal, this happens in the failure >>>> handler. If >>>>>>>>> that's the >>>>>>>>>>>> case, this might block the job from being restarted (if >>> the >>>>>>> restart >>>>>>>>>>>> strategy allows for another restart), which would be >>> great >>>> to >>>>>>> avoid >>>>>>>>>>> because >>>>>>>>>>>> it can introduce extra downtime. >>>>>>>>>>>> >>>>>>>>>>>> This raises another question: what should happen if the >>>>>>>>> classification >>>>>>>>>>>> fails? 
Crashing the job (which is what's currently >>>> proposed) >>>>>>> seems >>>>>>>>> very >>>>>>>>>>>> dangerous if this might depend on an external system. >>>>>>>>>>>> >>>>>>>>>>>> Thats a valid point, passing the JobGraph containing >> all >>>> the >>>>>>> above >>>>>>>>>>>>> information is also something to consider >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> We should avoid passing JG around because it's mutable >>>> (which >>>>>> we >>>>>>>>> must fix >>>>>>>>>>>> in the long term), and letting users change it might >> have >>>>>>>>> consequences. >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> D. >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis >> < >>>>>>>>>>> pga...@apache.org> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hey David, Shammon, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the valuable comments! >>>>>>>>>>>>> I am glad you find this proposal useful, some >> thoughts: >>>>>>>>>>>>> >>>>>>>>>>>>> @Shammon >>>>>>>>>>>>> >>>>>>>>>>>>> 1. How about adding more job information in >>>>>>>>> FailureListenerContext? For >>>>>>>>>>>>>> example, job vertext, subtask, taskmanager >> location. >>>> And >>>>>> then >>>>>>>>> user >>>>>>>>>>> can >>>>>>>>>>>> do >>>>>>>>>>>>>> more statistics according to different dimensions. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Thats a valid point, passing the JobGraph containing >>> all >>>> the >>>>>>>> above >>>>>>>>>>>>> information >>>>>>>>>>>>> is also something to consider, I was mostly trying to >>> be >>>>>>>>> conservative: >>>>>>>>>>>>> i.e., passingly only the information we need, and >>> extend >>>> as >>>>>> we >>>>>>>> see >>>>>>>>> fit >>>>>>>>>>>>> >>>>>>>>>>>>> 2. Users may want to save results in listener, and >> then >>>> they >>>>>>> can >>>>>>>>> get >>>>>>>>>>> the >>>>>>>>>>>>>> historical results even jabmanager failover. Can we >>>>>> provide a >>>>>>>>> unified >>>>>>>>>>>>>> implementation for data storage requirements? 
>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> The idea is to store only the output of the Listeners >>>> (tags) >>>>>>> and >>>>>>>>> treat >>>>>>>>>>>> them >>>>>>>>>>>>> as stateless. >>>>>>>>>>>>> Tags are be stored along with HistoryEntries, and >> will >>> be >>>>>>>> available >>>>>>>>>>>> through >>>>>>>>>>>>> the HistoryServer >>>>>>>>>>>>> even after a JM dies. >>>>>>>>>>>>> >>>>>>>>>>>>> @David >>>>>>>>>>>>> >>>>>>>>>>>>> 1) Should we also consider adding labels? The >>>> combination of >>>>>>> tags >>>>>>>>> and >>>>>>>>>>>>>> labels seems to be what most systems offer; >>> sometimes, >>>> they >>>>>>>> offer >>>>>>>>>>>> labels >>>>>>>>>>>>>> only (key=value pairs) because tags can be >>> implemented >>>>>> using >>>>>>>>> those, >>>>>>>>>>> but >>>>>>>>>>>>> not >>>>>>>>>>>>>> the other way around. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Indeed changing tags to k:v labels could be more >>>> expressive, >>>>>> I >>>>>>>>> like it! >>>>>>>>>>>>> Let's see what others think. >>>>>>>>>>>>> >>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined >>> models >>>>>>>>> ("listeners") >>>>>>>>>>>> are >>>>>>>>>>>>>> going to be, it would be great to keep the >>>> interfaces/data >>>>>>>>> structures >>>>>>>>>>>>>> immutable so we can push things over to the I/O >>>> threads. >>>>>>> Also, >>>>>>>> it >>>>>>>>>>>> sounds >>>>>>>>>>>>>> off to call the main interface a Listener since >> it's >>>>>> supposed >>>>>>>> to >>>>>>>>>>>> enhance >>>>>>>>>>>>>> the original throwable with additional metadata. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> The idea was for the name to be generic as there >> could >>> be >>>>>>>> Listener >>>>>>>>>>>>> implementations >>>>>>>>>>>>> just communicating with external monitoring/alerting >>>> systems >>>>>>> and >>>>>>>> no >>>>>>>>>>>>> metadata output >>>>>>>>>>>>> -- but lets rethink that. For immutability, see >> below: >>>>>>>>>>>>> >>>>>>>>>>>>> 3) You're proposing to support a set of listeners. 
>>> Since >>>>>> you're >>>>>>>>> passing >>>>>>>>>>>> the >>>>>>>>>>>>>> mutable context around, which includes tags set by >>> the >>>>>>> previous >>>>>>>>>>>> listener, >>>>>>>>>>>>>> do you expect users to make any assumptions about >> the >>>> order >>>>>>> in >>>>>>>>> which >>>>>>>>>>>>>> listeners are executed? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> In the existing proposal we are not making any >>>> assumptions >>>>>>> about >>>>>>>>> the >>>>>>>>>>>> order >>>>>>>>>>>>> of listeners, >>>>>>>>>>>>> (system ones come first and then the ones loaded from >>> the >>>>>>> plugin >>>>>>>>>>> manager) >>>>>>>>>>>>> however listeners can use previous state >> (tags/labels) >>> to >>>>>> make >>>>>>>>>>> decisions: >>>>>>>>>>>>> e.g., wont assign *UNKNOWN* failureType when we have >>>> already >>>>>>> seen >>>>>>>>> *USER >>>>>>>>>>>> *or >>>>>>>>>>>>> the other way around -- when we have seen *UNKNOWN* >>>> remove in >>>>>>>>> favor of >>>>>>>>>>>>> *USER* >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Panagiotis >>>>>>>>>>>>> >>>>>>>>>>>>> On Sun, Mar 19, 2023 at 10:42 AM David Morávek < >>>>>>> d...@apache.org> >>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Panagiotis, >>>>>>>>>>>>>> >>>>>>>>>>>>>> This is an excellent proposal and something >> everyone >>>> trying >>>>>>> to >>>>>>>>>>> provide >>>>>>>>>>>>>> "Flink as a service" needs to solve at some point. >> I >>>> have a >>>>>>>>> couple of >>>>>>>>>>>>>> questions: >>>>>>>>>>>>>> >>>>>>>>>>>>>> If I understand the proposal correctly, this is >> just >>>> about >>>>>>>> adding >>>>>>>>>>> tags >>>>>>>>>>>> to >>>>>>>>>>>>>> the Throwable by running a tuple of (Throwable, >>>>>>> FailureContext) >>>>>>>>>>>> through a >>>>>>>>>>>>>> user-defined model. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1) Should we also consider adding labels? 
The >>>> combination >>>>>> of >>>>>>>>> tags and >>>>>>>>>>>>>> labels seems to be what most systems offer; >>> sometimes, >>>> they >>>>>>>> offer >>>>>>>>>>>> labels >>>>>>>>>>>>>> only (key=value pairs) because tags can be >>> implemented >>>>>> using >>>>>>>>> those, >>>>>>>>>>> but >>>>>>>>>>>>> not >>>>>>>>>>>>>> the other way around. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined >>>> models >>>>>>>>>>> ("listeners") >>>>>>>>>>>>> are >>>>>>>>>>>>>> going to be, it would be great to keep the >>>> interfaces/data >>>>>>>>> structures >>>>>>>>>>>>>> immutable so we can push things over to the I/O >>>> threads. >>>>>>> Also, >>>>>>>> it >>>>>>>>>>>> sounds >>>>>>>>>>>>>> off to call the main interface a Listener since >> it's >>>>>> supposed >>>>>>>> to >>>>>>>>>>>> enhance >>>>>>>>>>>>>> the original throwable with additional metadata. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'd propose something along the lines of (we should >>>> have >>>>>>> better >>>>>>>>>>> names, >>>>>>>>>>>>> this >>>>>>>>>>>>>> is just to outline the idea): >>>>>>>>>>>>>> >>>>>>>>>>>>>> interface FailureEnricher { >>>>>>>>>>>>>> >>>>>>>>>>>>>> ThrowableWithTagsAndLabels >> enrichFailure(Throwable >>>> cause, >>>>>>>>>>>>>> ImmutableContextualMetadataAboutTheThrowable >>> context); >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> The names should change; this is just to outline >> the >>>> idea. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 3) You're proposing to support a set of listeners. >>>> Since >>>>>>> you're >>>>>>>>>>> passing >>>>>>>>>>>>> the >>>>>>>>>>>>>> mutable context around, which includes tags set by >>> the >>>>>>> previous >>>>>>>>>>>> listener, >>>>>>>>>>>>>> do you expect users to make any assumptions about >> the >>>> order >>>>>>> in >>>>>>>>> which >>>>>>>>>>>>>> listeners are executed? 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> *@Shammon* >>>>>>>>>>>>>> >>>>>>>>>>>>>> Users may want to save results in listener, and >> then >>>> they >>>>>> can >>>>>>>>> get the >>>>>>>>>>>>>>> historical results even jabmanager failover. Can >> we >>>>>>> provide a >>>>>>>>>>> unified >>>>>>>>>>>>>>> implementation for data storage requirements? >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> I think we should explicitly state that all >>>> "listeners" are >>>>>>>>> treated >>>>>>>>>>> as >>>>>>>>>>>>>> stateless. I don't see any strong reason for >>>> snapshotting >>>>>>> them. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best, >>>>>>>>>>>>>> D. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Sat, Mar 18, 2023 at 1:00 AM Shammon FY < >>>>>>> zjur...@gmail.com> >>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Panagiotis >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thank you for starting this discussion. I think >>> this >>>> FLIP >>>>>>> is >>>>>>>>>>> valuable >>>>>>>>>>>>> and >>>>>>>>>>>>>>> can help user to analyze the causes of job >> failover >>>>>> better! >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I have two comments as follows >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. How about adding more job information in >>>>>>>>> FailureListenerContext? >>>>>>>>>>>> For >>>>>>>>>>>>>>> example, job vertext, subtask, taskmanager >>> location. >>>> And >>>>>>> then >>>>>>>>> user >>>>>>>>>>>> can >>>>>>>>>>>>> do >>>>>>>>>>>>>>> more statistics according to different >> dimensions. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2. Users may want to save results in listener, >> and >>>> then >>>>>>> they >>>>>>>>> can >>>>>>>>>>> get >>>>>>>>>>>>> the >>>>>>>>>>>>>>> historical results even jabmanager failover. Can >> we >>>>>>> provide a >>>>>>>>>>> unified >>>>>>>>>>>>>>> implementation for data storage requirements? 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> shammon FY >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Saturday, March 18, 2023, Panagiotis >>> Garefalakis < >>>>>>>>>>>> pga...@apache.org >>>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi everyone, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> This FLIP [1] proposes a pluggable interface >> for >>>>>> failure >>>>>>>>> handling >>>>>>>>>>>>>>> allowing >>>>>>>>>>>>>>>> users to implement custom failure logic using >> the >>>>>> plugin >>>>>>>>>>> framework. >>>>>>>>>>>>>>>> Motivated by existing proposals [2] and tickets >>>> [3], >>>>>> this >>>>>>>>> enables >>>>>>>>>>>>>>> use-cases >>>>>>>>>>>>>>>> like: assigning particular types to failures >>> (e.g., >>>>>> User >>>>>>> or >>>>>>>>>>>> System), >>>>>>>>>>>>>>>> emitting custom metrics per type (e.g., >>>> application or >>>>>>>>> platform), >>>>>>>>>>>>> even >>>>>>>>>>>>>>>> exposing errors to downstream consumers (e.g., >>>>>>> notification >>>>>>>>>>>> systems). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks to Piotr and Anton for the initial >> reviews >>>> and >>>>>>>>>>> discussions! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> For anyone interested, the starting point would >>> be >>>> the >>>>>>> FLIP >>>>>>>>> [1] >>>>>>>>>>>> that >>>>>>>>>>>>> I >>>>>>>>>>>>>>>> created, >>>>>>>>>>>>>>>> describing the motivation and the proposed >>> changes >>>>>> (part >>>>>>> of >>>>>>>>> the >>>>>>>>>>>> core, >>>>>>>>>>>>>>>> runtime and web). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The intuition behind this FLIP is being able to >>>> execute >>>>>>>>> custom >>>>>>>>>>>> logic >>>>>>>>>>>>> on >>>>>>>>>>>>>>>> failures by exposing a FailureListener >> interface. >>>>>>>>> Implementation >>>>>>>>>>> by >>>>>>>>>>>>>> users >>>>>>>>>>>>>>>> can be simply loaded to the system as Jar >> files. 
>>>>>>>>> FailureListeners >>>>>>>>>>>> may >>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>> decide to assign failure tags to errors >>> (expressed >>>> as >>>>>>>>> strings), >>>>>>>>>>>>>>>> that will then be exposed as metadata by the >>>> UI/Rest >>>>>>>>> interfaces. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Feedback is always appreciated! Looking forward >>> to >>>> your >>>>>>>>> thoughts! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304% >>>>>>>>>>>>>>>> 3A+Pluggable+failure+handling+for+Apache+Flink >>>>>>>>>>>>>>>> [2] >>>>>>>>>>>>>>>> >>>>>>>> https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67 >>>>>>>>>>>>>>>> Hmjgy0-hRDeuFnrMgT4 >>>>>>>>>>>>>>>> [3] >>>> https://issues.apache.org/jira/browse/FLINK-20833 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Panagiotis >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>> >>> >>
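
For readers skimming the thread, the interface shape that several replies lean towards (an immutable context that exposes an ioExecutor, key/value labels returned from the method rather than set on the context, an asynchronous result, and a failing plugin that is logged rather than failing the job) can be sketched roughly as follows. This is only an illustration under stated assumptions: FailureEnricher and enrichFailure are the placeholder names from David's outline, while Context, runAll, the deterministic List ordering, and the "later entry wins" merge rule are hypothetical choices made here, not the FLIP's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class FailureEnricherSketch {

    /** Hypothetical read-only context: accessors only, no setters. */
    public interface Context {
        Executor ioExecutor();
    }

    /** Placeholder name from the thread; this signature is an assumption. */
    public interface FailureEnricher {
        CompletableFuture<Map<String, String>> enrichFailure(Throwable cause, Context context);
    }

    /**
     * Runs every enricher independently and merges the returned labels in list order
     * (on a key collision the later enricher wins). An enricher that throws or
     * completes exceptionally is logged and contributes no labels.
     */
    public static CompletableFuture<Map<String, String>> runAll(
            List<FailureEnricher> enrichers, Throwable cause, Context context) {
        List<CompletableFuture<Map<String, String>>> results = new ArrayList<>();
        for (FailureEnricher enricher : enrichers) {
            CompletableFuture<Map<String, String>> result;
            try {
                result = enricher.enrichFailure(cause, context);
            } catch (RuntimeException e) {
                // A synchronous failure is treated like an asynchronous one.
                result = CompletableFuture.failedFuture(e);
            }
            results.add(result.exceptionally(error -> {
                System.err.println("WARN: failure enricher failed: " + error);
                return Collections.emptyMap();
            }));
        }
        return CompletableFuture.allOf(results.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Map<String, String> merged = new LinkedHashMap<>();
                    for (CompletableFuture<Map<String, String>> result : results) {
                        merged.putAll(result.join()); // already complete at this point
                    }
                    return Collections.unmodifiableMap(merged);
                });
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // stand-in for a real I/O thread pool
        Context context = () -> direct;
        FailureEnricher typeLabel = (cause, ctx) ->
                CompletableFuture.supplyAsync(() -> Map.of("type", "USER"), ctx.ioExecutor());
        FailureEnricher broken = (cause, ctx) -> {
            throw new IllegalStateException("boom");
        };
        Map<String, String> labels =
                runAll(List.of(typeLabel, broken), new RuntimeException("task failed"), context)
                        .join();
        System.out.println(labels);
    }
}
```

In this sketch the caller receives a future and is free to proceed with the restart without waiting for it, which is one way to read the "let futures complete independently" suggestion; whether the restart should wait for labels is exactly the open question in the thread.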