Hi everyone,
Thanks for the valuable comments! I'm excited to see this is an area of interest for the community. Summarizing some of the main points raised, along with my thoughts:

- Labels (key/value pairs) are more expressive than tags (strings), so using the former is a good idea. I am also debating whether we want to return multiple KV pairs per Listener (one could argue that we could split the logic into multiple Listeners to support that).
- An immutable context, along with data returned through the interface method implementations, is a better approach than a mutable Collection.
- Listener execution should be independent. However, we need a way to enforce that a label key/key-prefix is only assigned to a single Listener; I am thinking of a validation step at both the Listener init and runtime stages.
- We want Listener operations to be async, as sync execution could block the main thread. Exposing an ioExecutor pool through the context could be an elegant solution here.
- Listener errors must not fail jobs: we should log them and keep the job alive.
- We need better naming and a clearer separation/description of the public interfaces.
- Even though custom restart strategies share some properties with Listeners, they would probably need a separate interface with a different return type anyway (a restart strategy, not labels). In general, they are different and complex enough to justify their own FLIP (which can also be a follow-up).

What do people think? I am planning to modify the FLIP to reflect these changes if they make sense to everyone.

Cheers,
Panagiotis

On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh <hlteo...@gmail.com> wrote:
> Hi all, > > Thank you Panagiotis for proposing this. From the size of the thread, this > is a much-needed feature in Flink! > Some thoughts, to extend those already adeptly summarised by Piotr, > Matthias and Jing. > > - scope of FLIP: +1 to scoping this FLIP to observability around a > restart. That would include adding metadata + exposing metadata to external > systems.
IMO, introducing a new restart strategy solves different problems, > is much larger in scope and should be covered in a separate FLIP. > > - failure handling: At the moment, we propose transitioning the Flink job > to a terminal FAILED state when JobListener fails, when the job could have > transitioned to RESTARTING->RUNNING. If we are keeping in line with the > scope to add metadata/observability around job restarts, we should not be > affecting the running of the Flink job itself. Could I propose we instead > log a WARN/ERROR? > > - immutable context: +1 to keeping the contract clear via return types. > - async operation: +1 to adding ioExecutor to context; however, given we > don’t want to block the actual job restart on adding metadata / calling > external services, should we consider returning and letting futures > complete independently? > > - independent vs ordered execution: Should we consider making the order of > execution deterministic (use a List instead of a Set)? > > > Once again, thank you for working on this. > > Regards, > Hong > > > > On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID> wrote: > > > > Hi, > > > > Thanks Panagiotis for this FLIP and thanks for all the valuable discussions. > > I'd like to share my two cents: > > > > - FailureListenerContext#addTag and FailureListenerContext#getTags. It > > seems that we have to call getTags() and then remove entries from it if we > > want to delete any tags (according to the javadoc in the FLIP). That seems > > inconsistent to me too. Either offer addTag(), deleteTag(), and let > > getTags() return an immutable collection, or offer only getTags() and return a > > mutable collection. > > > > - label vs tag. Label is a great idea, +1. AFAIC, a tag could be a special > > case of a label, i.e. key="tag". It is convenient to offer the xxxTag() > > method if the user only needs one label. I would love to have both of > them. > > Another thought is that a tag implicitly carries the meaning of > "immutable".
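A minimal Java sketch of the revised contract summarized above (immutable context, labels returned through a future, blocking work on an I/O executor). All names here are hypothetical: `FailureEnricher`, `Context`, `getOutputKeys`, `processFailure` and `getIoExecutor` are placeholders for illustration, not an agreed or existing Flink API, and the USER/SYSTEM rule is made up.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical plugin contract; all names are placeholders, not an existing Flink API.
interface FailureEnricher {

    /** Label keys this enricher may emit, so key clashes can be detected up front. */
    Set<String> getOutputKeys();

    /** Classifies a failure without blocking the caller; completes with key/value labels. */
    CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context);

    /** Read-only view of the failure: no setters, so enrichers cannot mutate shared state. */
    interface Context {
        String getJobName();

        /** Pool for blocking work, e.g. calls to external monitoring systems. */
        Executor getIoExecutor();
    }
}

// Illustrative enricher; the USER/SYSTEM rule is invented for this example.
final class FailureTypeEnricher implements FailureEnricher {
    static final String KEY = "failureType";

    @Override
    public Set<String> getOutputKeys() {
        return Set.of(KEY);
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context) {
        // The classification runs on the I/O executor, not on the calling thread.
        return CompletableFuture.supplyAsync(
                () -> Map.of(KEY, cause instanceof IllegalArgumentException ? "USER" : "SYSTEM"),
                context.getIoExecutor());
    }
}
```

Because `Context` has no setters, an enricher can only influence the outcome through its returned labels, which is the contract Matthias, Weihua and Zhu also argue for further down the thread.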
> > > > - +1 for a separate FLIP for a customized restart strategy. Care should > > be taken to make sure it works well with Flink's built-in restartStrategy > in > > order to have a single source of truth. > > > > - execution order. The default independent execution should be fine. > > According to the FailureListener interface definition in the FLIP, users > > should be able to easily build a listener chain[1] to offer sequential > > execution, e.g. public FailureListener(FailureListener nextListener). > > Another option is to modify the interface or provide another interface > > alongside the current one to extend the method to support ListenerChain, > > i.e. void onFailure(Throwable cause, FailureListenerContext context, > > ListenerChain listenerChain). Users can also mix them up. > > > > - naming. Afaiu, the pluggable extension is not limited to failure > > enrichment. Conceptually it can do anything for the given failure, e.g. > > start counting metrics as described in the FLIP, calling an external system, > > sending a notification to a Slack channel, etc., you name it. It sounds to me > > more like a FailureActionListener - it can trigger actions based on > > a failure. Failure enrichment is one type of action. > > > > Best regards, > > Jing > > > > [1] https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern > > > > On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl > > <matthias.p...@aiven.io.invalid> wrote: > > > >> Thanks for the proposal, Panagiotis. A lot of good points have been > already > >> shared. I just want to add my view on some of the items: > >> > >> - independent execution vs ordered execution: I prefer the listeners > being > >> processed independently from each other because it adds less complexity > >> code-wise. The use case Piotr described (where you want to reuse some > other > >> classifier) is the only one I can think of where we actually need > >> classifiers depending on each other.
Supporting such a use case right > from > >> the start feels a bit over-engineered and could be covered in a > follow-up > >> FLIP if we really come to the point where such a feature is requested > by > >> users. > >> > >> - key/value pairs instead of plain labels: I think that's a good idea. > >> key/value pairs are more expressive. +1 > >> > >> - extending the FLIP to cover restart strategy: I understand Gen's > concern > >> about introducing too many different types of plugins. But I would still > >> favor not extending the FLIP in this regard. A pluggable restart > strategy > >> sounds reasonable. But an error classifier and a restart strategy are > still > >> different enough to justify separate plugins, IMHO. And therefore, I > would > >> think that covering the restart strategy in a separate FLIP is the > better > >> option for the sake of simplicity. > >> > >> - immutable context: Passing in an immutable context and returning data > >> through the interface method's return value sounds like a better > approach > >> to harden the contract of the interface. +1 for that proposal > >> > >> - async operation: I think David is right. An async interface makes the > >> listener implementations more robust when it comes to heavy IO > operations. > >> The ioExecutor can be passed through the context object. +1 > >> > >> Matthias > >> > >> On Tue, Mar 21, 2023 at 2:09 PM David Morávek <david.mora...@gmail.com> > >> wrote: > >> > >>> *@Piotr* > >>> > >>> > >>>> I was thinking about actually defining the order of the > >>>> classifiers/handlers and not allowing them to be asynchronous. > >>>> Asynchronicity would create some problems: when to actually return > the > >>>> error to the user? After all async responses will get back? Before, > but > >>>> without the classified exception? It would also add implementation > >> complexity > >>>> and I think we can always expand the API with an async version in the > >> future > >>>> if needed.
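For comparison, Jing's chain-of-responsibility suggestion for ordered execution could look roughly like this. `ChainedFailureListener`, `handle` and the two concrete listeners are hypothetical; the `List<String>` only records what each link did, standing in for real side effects such as metrics or notifications.

```java
import java.util.List;

// Hypothetical chain-of-responsibility variant of a failure listener (not Flink API).
abstract class ChainedFailureListener {
    private final ChainedFailureListener next; // null marks the end of the chain

    protected ChainedFailureListener(ChainedFailureListener next) {
        this.next = next;
    }

    /** Runs this link, then forwards to the next link unless this one stops the chain. */
    final void onFailure(Throwable cause, List<String> actions) {
        boolean proceed = handle(cause, actions);
        if (proceed && next != null) {
            next.onFailure(cause, actions);
        }
    }

    /** Performs this link's action; returning false stops the remaining links. */
    protected abstract boolean handle(Throwable cause, List<String> actions);
}

// Records a metric-like action for every failure and always continues.
final class CountingListener extends ChainedFailureListener {
    CountingListener(ChainedFailureListener next) {
        super(next);
    }

    @Override
    protected boolean handle(Throwable cause, List<String> actions) {
        actions.add("count:" + cause.getClass().getSimpleName());
        return true;
    }
}

// Sends a (simulated) notification; stops the chain when the cause is an Error.
final class NotifyingListener extends ChainedFailureListener {
    NotifyingListener(ChainedFailureListener next) {
        super(next);
    }

    @Override
    protected boolean handle(Throwable cause, List<String> actions) {
        actions.add("notify");
        return !(cause instanceof Error);
    }
}
```

The order becomes explicit in how the chain is wired, which is exactly the coupling Matthias and David prefer to avoid by default.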
> >>> > >>> > >>> As long as the classifiers need to talk to an external system, we by > >>> definition need to allow them to be asynchronous to unblock the main > >> thread > >>> for handling other RPCs. Exposing ioExecutor via the context proposed > >> above > >>> would be great. > >>> > >>> After all async responses will get back > >>> > >>> > >>> This would be the same if we trigger them synchronously one by one, > with > >> the > >>> caveat that synchronous execution might take significantly longer and > >>> introduce unnecessary downtime to a job. > >>> > >>> D. > >>> > >>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com> wrote: > >>> > >>>> Hi Piotr, > >>>> > >>>> It's fine with me to have a separate FLIP to extend this > >> `FailureListener` > >>>> to support custom restart strategies. > >>>> > >>>> What I was a bit concerned about is that if we just treat the > >> `FailureListener` > >>>> as an error classifier which is not crucial to the Flink framework > process, > >>>> we may design it to run asynchronously and not trigger Flink failures. > >>>> This may be a blocker if later we want to enable it to support custom > >>>> restart strategies. > >>>> > >>>> Thanks, > >>>> Zhu > >>>> > >>>> Dian Fu <dian0511...@gmail.com> wrote on Tue, Mar 21, 2023 at 19:53: > >>>>> > >>>>> Hi Panagiotis, > >>>>> > >>>>> Thanks for the proposal. This is a very valuable feature and will be > >> a > >>>> good > >>>>> add-on for Flink. > >>>>> > >>>>> I also think that it will be great if we can consider how to make it > >>>>> possible for users to customize the failure handling in this FLIP. > >> It's > >>>>> highly related to the problem we want to address in this FLIP and > >> could > >>>>> avoid refactoring the interfaces proposed in this FLIP too quickly. > >>>>> > >>>>> Currently it treats all kinds of exceptions the same. However, some > >>> kinds > >>>>> of exceptions are actually not recoverable at all.
It could let users customize the failure handling logic to fail fast for certain known > >>>>> unrecoverable exceptions and finally make these kinds of jobs get > >>> noticed > >>>>> and recovered more quickly. > >>>>> > >>>>> Regards, > >>>>> Dian > >>>>> > >>>>> > >>>>> > >>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo <luogen...@gmail.com> wrote: > >>>>> > >>>>>> Hi Panagiotis, > >>>>>> > >>>>>> Thanks for the proposal. > >>>>>> > >>>>>> It's useful to enrich the information so that users can be more > >>>>>> clear about why the job is failing, especially platform developers who > >>>>>> need to provide the information to their end users. > >>>>>> And for this FLIP, I'd prefer the naming `FailureEnricher` > >>>>>> proposed by David, as the plugin doesn't really handle the failure. > >>>>>> > >>>>>> However, like Zhu and Lijie said, I also joined a discussion > >>>>>> recently about customized failure handling, e.g. counting the > >>>>>> failure rate of pipeline regions separately, and failing the job > >>>>>> when a specific error occurs, and so on. > >>>>>> I suppose a custom restart strategy, or I'd call it a custom > >>>>>> failure "handler", is indeed necessary. It can also enrich the > >>>>>> information as the currently proposed handler does. > >>>>>> > >>>>>> To avoid adding too many plugin interfaces which may confuse users > >>>>>> and make the ExecutionFailureHandler more complex, > >>>>>> I think it'd be better to consider the requirements at the same > >> time. > >>>>>> > >>>>>> IMO, we can add a handler interface, then make the current restart > >>>>>> strategy and the enricher both types of the handler. The handlers > >>>>>> execute in sequence, and the failure is considered unrecoverable if > >>>>>> any of the handlers decides so. > >>>>>> In this way, users can also implement a handler using the enriched > >>>>>> information provided by the previous handlers, e.g.
fail the job > >> and > >>>>>> send a notification if too many failures are caused by the end > >> users. > >>>>>> > >>>>>> Best, > >>>>>> Gen > >>>>>> > >>>>>> > >>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com > >>> > >>>> wrote: > >>>>>> > >>>>>>> Hi Panagiotis, > >>>>>>> > >>>>>>> Thanks for your proposal. It is valuable to analyze the reason > >> for > >>>>>>> failure with the user plug-in. > >>>>>>> > >>>>>>> Making the context immutable could make the contract stronger. > >>>>>>> Letting the listener return an enriching result may be a better > >>> way. > >>>>>>> > >>>>>>> IIUC, listeners could do two things: add more information > >>>>>> (tags/labels) > >>>>>>> to FailureHandlingResult, and push data out of Flink (metrics or > >>>>>>> something). > >>>>>>> IMO, we could split these two types into Listener and Advisor > >>> (maybe > >>>>>>> other names). The Listener just pushes the data out and returns > >>>> nothing > >>>>>> to > >>>>>>> Flink, so we can run these async and don't have to wait for > >>>> Listener's > >>>>>>> result. > >>>>>>> The Advisor returns rich information to the FailureHandlingResult, > >>>> and it > >>>>>>> should > >>>>>>> have lighter logic. > >>>>>>> > >>>>>>> > >>>>>>> Supporting a custom restart strategy is also valuable. In this > >>>> design, we > >>>>>>> use > >>>>>>> RestartStrategy to construct a FailureHandlingResult, and then > >> pass > >>>> it to > >>>>>>> Listener. > >>>>>>> My question is, should we change the restart strategy interface > >> to > >>>>>> support > >>>>>>> the > >>>>>>> custom restart strategy, or keep the current restart strategy and > >>>> let the > >>>>>>> later > >>>>>>> Listener add the restartable information to > >>> FailureHandlingResult? > >>>> The > >>>>>>> latter > >>>>>>> may cause some confusion when we use a custom restart strategy: > >>>>>>> the default Flink restart strategy also runs but does not take > >>>> effect.
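Weihua's split could be sketched as follows, assuming hypothetical `FailureListener`/`FailureAdvisor` interfaces and a plain `Map<String, String>` in place of the real `FailureHandlingResult`. Listeners are fired on an I/O executor and never awaited; advisors run inline and their labels are merged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Pushes data out of Flink (metrics, alerts) and returns nothing.
interface FailureListener {
    void onFailure(Throwable cause);
}

// Returns labels synchronously, so it must stay cheap.
interface FailureAdvisor {
    Map<String, String> advise(Throwable cause);
}

// Hypothetical dispatcher combining both kinds of plugins.
final class FailureDispatcher {
    private final List<FailureListener> listeners;
    private final List<FailureAdvisor> advisors;
    private final Executor ioExecutor;

    FailureDispatcher(List<FailureListener> listeners, List<FailureAdvisor> advisors, Executor ioExecutor) {
        this.listeners = listeners;
        this.advisors = advisors;
        this.ioExecutor = ioExecutor;
    }

    /** Fires listeners without waiting, then collects advisor labels inline. */
    Map<String, String> handle(Throwable cause) {
        for (FailureListener listener : listeners) {
            // Fire-and-forget: an exception thrown by a listener only completes this
            // discarded future exceptionally and never reaches the caller.
            CompletableFuture.runAsync(() -> listener.onFailure(cause), ioExecutor);
        }
        Map<String, String> labels = new HashMap<>();
        for (FailureAdvisor advisor : advisors) {
            labels.putAll(advisor.advise(cause)); // later advisors win on key clashes
        }
        return labels;
    }
}
```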
> >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Weihua > >>>>>>> > >>>>>>> > >>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang < > >>>> wangdachui9...@gmail.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Panagiotis, > >>>>>>>> > >>>>>>>> Thanks for driving this. > >>>>>>>> > >>>>>>>> +1 for supporting custom restart strategies; we did receive such > >>>> requests > >>>>>>>> from the user mailing list [1][2]. > >>>>>>>> > >>>>>>>> Besides, in the current design, the plugin will only do some > >>>> statistical > >>>>>> and > >>>>>>>> classification work, and will not affect the > >>>> *FailureHandlingResult*. > >>>>>>> It is just > >>>>>>>> listening, not handling, so it doesn't quite match the title. > >>>>>>>> > >>>>>>>> [1] > >>>> https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1 > >>>>>>>> [2] > >>>> https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Lijie > >>>>>>>> > >>>>>>>> Zhu Zhu <reed...@gmail.com> wrote on Mon, Mar 20, 2023 at 21:39: > >>>>>>>> > >>>>>>>>> Hi Panagiotis, > >>>>>>>>> > >>>>>>>>> Thanks for creating this proposal! It's good to enable Flink > >> to > >>>>>> handle > >>>>>>>>> different errors in different ways, in a pluggable way. > >>>>>>>>> > >>>>>>>>> There are requests for flexible restart strategies from time > >> to > >>>> time, > >>>>>>> for > >>>>>>>>> different strategies of restart backoff time, or to suppress > >>>>>> restarting > >>>>>>>>> on certain errors. Therefore, I think it's better that the > >>>> proposed > >>>>>>>>> failure handling plugin can also support custom restart > >>>> strategies. > >>>>>>>>> > >>>>>>>>> Maybe we can call it FailureHandlingAdvisor which provides > >> more > >>>>>>>>> information (labels) and gives advice (restart backoff time, > >>>> whether > >>>>>>>>> to restart)? I do not have a strong opinion though; any > >>>> explanatory > >>>>>>>>> name would be good.
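Zhu's advisor idea, together with the immutable result (`labels`, `canRestart`, `restartBackoffTime`) Zhu proposes next and Gen's rule that a failure becomes unrecoverable as soon as any handler says so, might fold like this. `Advice`, `Advisor` and `AdvisorPipeline` are hypothetical, and letting the longest backoff win is an assumption of this sketch rather than something proposed in the thread.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Immutable advice returned by each (hypothetical) advisor.
final class Advice {
    final Map<String, String> labels;
    final boolean canRestart;
    final Duration restartBackoff;

    Advice(Map<String, String> labels, boolean canRestart, Duration restartBackoff) {
        this.labels = Map.copyOf(labels);
        this.canRestart = canRestart;
        this.restartBackoff = restartBackoff;
    }
}

interface Advisor {
    /** Sees the advice accumulated so far (read-only) and returns its own advice. */
    Advice advise(Throwable cause, Advice soFar);
}

final class AdvisorPipeline {
    /** Applies advisors in list order, merging each result before the next one runs. */
    static Advice fold(Throwable cause, Advice initial, List<Advisor> advisors) {
        Advice current = initial;
        for (Advisor advisor : advisors) {
            Advice next = advisor.advise(cause, current);
            Map<String, String> labels = new HashMap<>(current.labels);
            labels.putAll(next.labels);
            current = new Advice(
                    labels,
                    current.canRestart && next.canRestart, // any "no" makes it unrecoverable
                    max(current.restartBackoff, next.restartBackoff)); // assumption: longest backoff wins
        }
        return current;
    }

    private static Duration max(Duration a, Duration b) {
        return a.compareTo(b) >= 0 ? a : b;
    }
}
```

Leonard's deleted-Kafka-topic example maps onto an advisor that returns `canRestart = false`.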
> >>>>>>>>> > >>>>>>>>> To avoid unexpected mutation, how about making the context > >>>> immutable > >>>>>>>>> and letting the plugin return an immutable result? i.e., remove > >> the > >>>>>> setters > >>>>>>>>> from the context, and let the plugin method return a result > >>> which > >>>>>>>>> contains `labels`, `canRestart` and `restartBackoffTime`. > >> Flink > >>>>>> should > >>>>>>>>> apply the result to the context before invoking the next > >>> plugin, > >>>> so > >>>>>>>>> that the next plugin will see the updated context. > >>>>>>>>> > >>>>>>>>> The plugin should avoid taking too much time to return the > >>>> result, > >>>>>>>> because > >>>>>>>>> it will block the RPC and result in instability. However, it > >>> can > >>>>>> still > >>>>>>>>> perform heavy actions in a different thread. The context can > >>>> provide > >>>>>> an > >>>>>>>>> `ioExecutor` to the plugins for reuse. > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> Zhu > >>>>>>>>> > >>>>>>>>> Shammon FY <zjur...@gmail.com> wrote on Mon, Mar 20, 2023 at 20:21: > >>>>>>>>>> > >>>>>>>>>> Hi Panagiotis > >>>>>>>>>> > >>>>>>>>>> Thank you for your answer. I agree that `FailureListener` > >>>> could be > >>>>>>>>>> stateless; I have some thoughts as follows: > >>>>>>>>>> > >>>>>>>>>> 1. I see that listeners and tag collections are associated. > >>>> When > >>>>>>>>> JobManager > >>>>>>>>>> fails and restarts, how can the new listener be associated > >>>> with the > >>>>>>> tag > >>>>>>>>>> collection from before the failover? Does it depend on the listener loading order? > >>>>>>>>>> > >>>>>>>>>> 2. The tag collection may become too large, causing a > >>>> JobManager > >>>>>>>> OOM. > >>>>>>>>> Do > >>>>>>>>>> we need to provide a management class that supports some > >>>>>> obsolescence > >>>>>>>>>> strategies instead of a direct Collection? > >>>>>>>>>> > >>>>>>>>>> 3. Is it possible to provide a more complex data structure > >>>> than a > >>>>>>>> simple > >>>>>>>>>> string collection for tags in listeners, such as key-value?
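For Shammon's second question, one possible obsolescence strategy is a fixed-capacity history that evicts the oldest entries first. `BoundedFailureHistory` and `FailureRecord` are hypothetical names, and oldest-first eviction is only one of several reasonable policies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// One stored failure: when it happened and the labels produced for it.
final class FailureRecord {
    final long timestampMillis;
    final Map<String, String> labels;

    FailureRecord(long timestampMillis, Map<String, String> labels) {
        this.timestampMillis = timestampMillis;
        this.labels = Map.copyOf(labels);
    }
}

// Keeps at most `capacity` records, evicting the oldest first.
final class BoundedFailureHistory {
    private final int capacity;
    private final Deque<FailureRecord> records = new ArrayDeque<>();

    BoundedFailureHistory(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    synchronized void add(FailureRecord record) {
        if (records.size() == capacity) {
            records.removeFirst(); // drop the oldest entry
        }
        records.addLast(record);
    }

    /** Returns an immutable snapshot, oldest first. */
    synchronized List<FailureRecord> snapshot() {
        return List.copyOf(records);
    }
}
```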
> >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Shammon FY > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu < > >>> xbjt...@gmail.com> > >>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi, Panagiotis > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Thank you for kicking off this discussion. Overall, the > >>>> proposed > >>>>>>>>> feature of > >>>>>>>>>>> this FLIP makes sense to me. We have also discussed > >> similar > >>>>>>>>> requirements > >>>>>>>>>>> with our users and developers, and I believe it will help > >>>> many > >>>>>>> users. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> In terms of FLIP content, I have some thoughts: > >>>>>>>>>>> > >>>>>>>>>>> (1) For the FailureListenerContext interface, the > >>> methods > >>>>>>>>>>> FailureListenerContext#addTag and > >>>> FailureListenerContext#getTags > >>>>>>> look > >>>>>>>>> very > >>>>>>>>>>> inconsistent because they imply specific implementation > >>>> details, > >>>>>>> and > >>>>>>>>> not > >>>>>>>>>>> all FailureListeners need to handle them; we shouldn't > >> put > >>>> them > >>>>>> in > >>>>>>>> the > >>>>>>>>>>> interface. Minor: The comment "UDF loading" in the > >>>>>>>> getUserClassLoader() > >>>>>>>>>>> method looks like a typo; IIUC, it should return the > >>>> classloader > >>>>>> of > >>>>>>>> the > >>>>>>>>>>> current job. > >>>>>>>>>>> > >>>>>>>>>>> (2) Regarding the implementation in > >>>>>>>>> ExecutionFailureHandler#handleFailure, > >>>>>>>>>>> some custom listeners may have heavy IO operations, such > >> as > >>>>>>> reporting > >>>>>>>>> to > >>>>>>>>>>> their monitoring system. The current logic appears to be > >>>>>> processing > >>>>>>>> in > >>>>>>>>> the > >>>>>>>>>>> JobMaster's main thread, and it is recommended not to do > >>> this > >>>>>> kind > >>>>>>> of > >>>>>>>>>>> processing in the main thread.
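Leonard's second point is about keeping slow listener I/O off the JobMaster's main thread. A common pattern is to compute on an I/O pool and hop back to a single-threaded executor before touching shared state. In this sketch, a plain single-thread `ExecutorService` merely stands in for Flink's main-thread executor, and `MainThreadHandoff.enrichThenApply` is a hypothetical helper.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

final class MainThreadHandoff {
    /**
     * Runs the (possibly blocking) classifier on the I/O executor and applies its labels
     * on the main-thread executor, so shared state is only touched from one thread.
     */
    static CompletableFuture<Void> enrichThenApply(
            Throwable cause,
            Function<Throwable, Map<String, String>> classifier,
            Consumer<Map<String, String>> applyOnMainThread,
            Executor ioExecutor,
            Executor mainThreadExecutor) {
        return CompletableFuture
                .supplyAsync(() -> classifier.apply(cause), ioExecutor)
                .thenAcceptAsync(applyOnMainThread, mainThreadExecutor);
    }
}
```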
> >>>>>>>>>>> > >>>>>>>>>>> (3) The results of FailureListener's processing and the > >>>>>>>>>>> FailureHandlingResult returned by ExecutionFailureHandler > >>>> are not > >>>>>>>>> related. > >>>>>>>>>>> I think these two are closely related; the motivation of > >>> this > >>>>>> FLIP > >>>>>>> is > >>>>>>>>> to > >>>>>>>>>>> make the current failure handling more flexible. From this > >>>>>> perspective, > >>>>>>>>>>> different listeners should have the opportunity to affect > >>> the > >>>>>> job's > >>>>>>>>> failure > >>>>>>>>>>> handling flow. For example, if a Flink job is configured > >> with > >>> a > >>>>>>>>>>> RestartStrategy allowing a huge number of retries, but the Kafka > >>>> topic of > >>>>>>>>> the Source has > >>>>>>>>>>> been deleted, the job will fail over continuously. In this > >>>> case, > >>>>>> the > >>>>>>>>> user > >>>>>>>>>>> should have their listener determine whether this > >>> failure > >>>> is > >>>>>>>>> recoverable > >>>>>>>>>>> or unrecoverable, and then wrap the processing result > >> into > >>>>>>>>>>> FailureHandlingResult.unrecoverable(xx) and pass it to > >>>> the JobMaster; > >>>>>>>> this > >>>>>>>>>>> approach will be more flexible. > >>>>>>>>>>> > >>>>>>>>>>> (4) All FLIPs have an important section named Public > >>>> Interfaces. > >>>>>>>>> The current > >>>>>>>>>>> FLIP mixes the interface section and the implementation > >>>> section > >>>>>>>>> together. > >>>>>>>>>>> It is better for us to refer to the FLIP template[1] and > >>>> separate > >>>>>>>> them; > >>>>>>>>>>> this will make the entire FLIP clearer. > >>>>>>>>>>> > >>>>>>>>>>> In addition, regarding the FLIP process, there is a small > >>>>>>> suggestion: > >>>>>>>>> The > >>>>>>>>>>> community generally creates a JIRA issue after the FLIP > >>> vote > >>>> is > >>>>>>>> passed, > >>>>>>>>>>> instead of during the FLIP preparation phase, because the > >>>> FLIP may > >>>>>>> be > >>>>>>>>>>> rejected.
Although this FLIP is very reasonable, it's > >>> better > >>>> to > >>>>>>>> follow > >>>>>>>>> the > >>>>>>>>>>> process. > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> > >>>>>>>>>>> Leonard > >>>>>>>>>>> > >>>>>>>>>>> [1] > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template > >>>>>>>>>>> > >>>>>>>>>>> On Mon, Mar 20, 2023 at 7:04 PM David Morávek < > >>>> d...@apache.org> > >>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> however listeners can use previous state > >> (tags/labels) > >>> to > >>>>>> make > >>>>>>>>>>> decisions > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> That sounds like a very fragile contract. We should > >>> either > >>>>>> allow > >>>>>>>>> passing > >>>>>>>>>>>> tags between listeners, and then we need to define the ordering, > >>> or > >>>> make > >>>>>>> all > >>>>>>>>> of > >>>>>>>>>>> them > >>>>>>>>>>>> independent. I prefer the latter because it allows us > >> to > >>>>>>>> parallelize > >>>>>>>>>>> things > >>>>>>>>>>>> if needed (if all listeners trigger an RPC to the > >>> external > >>>>>>> system, > >>>>>>>>> for > >>>>>>>>>>>> example). > >>>>>>>>>>>> > >>>>>>>>>>>> Can you expand on why we need more than one classifier > >> to > >>>> be > >>>>>> able > >>>>>>>> to > >>>>>>>>>>> output > >>>>>>>>>>>> the same tag? > >>>>>>>>>>>> > >>>>>>>>>>>> system ones come first and then the ones loaded from > >> the > >>>> plugin > >>>>>>>>> manager > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Since they're returned as a Set, the order is > >> completely > >>>>>>>>>>> non-deterministic, > >>>>>>>>>>>> no matter in which order they're loaded. > >>>>>>>>>>>> > >>>>>>>>>>>> just communicating with external monitoring/alerting > >>>> systems > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> That makes the need for pushing things out of the main > >>>> thread > >>>>>>> even > >>>>>>>>>>>> stronger.
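David's preference for independent listeners, which can run in parallel, combined with the point that a failing classification must not crash the job, could be sketched like this. `ParallelEnrichment.enrichAll` is hypothetical, the timeout is an arbitrary caller-supplied value, and `System.err` stands in for real WARN logging.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ParallelEnrichment {
    /**
     * Waits for enrichment futures that are already running in parallel and merges their
     * labels once all are done. A failed or timed-out future contributes no labels instead
     * of failing the whole result; merging follows list order, so the outcome is deterministic.
     */
    static CompletableFuture<Map<String, String>> enrichAll(
            List<CompletableFuture<Map<String, String>>> pending, long timeoutMillis) {
        List<CompletableFuture<Map<String, String>>> guarded = new ArrayList<>();
        for (CompletableFuture<Map<String, String>> future : pending) {
            guarded.add(future
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .exceptionally(error -> {
                        System.err.println("Failure enrichment failed, ignoring: " + error);
                        return Map.of();
                    }));
        }
        return CompletableFuture.allOf(guarded.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, String> merged = new HashMap<>();
                    guarded.forEach(f -> merged.putAll(f.join()));
                    return merged;
                });
    }
}
```

Merging in list order (rather than iterating a `Set`) also keeps the result deterministic when two enrichers emit the same key.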
This almost sounds like we need to return a > >>>>>>>>> CompletableFuture > >>>>>>>>>>> for > >>>>>>>>>>>> the per-throwable classification because an external > >>> system > >>>>>> might > >>>>>>>>> take a > >>>>>>>>>>>> significant time to respond. We need to unblock the > >> main > >>>> thread > >>>>>>> for > >>>>>>>>> other > >>>>>>>>>>>> RPCs. > >>>>>>>>>>>> > >>>>>>>>>>>> Also, in the proposal, this happens in the failure > >>>> handler. If > >>>>>>>>> that's the > >>>>>>>>>>>> case, this might block the job from being restarted (if > >>> the > >>>>>>> restart > >>>>>>>>>>>> strategy allows for another restart), which would be > >>> great > >>>> to > >>>>>>> avoid > >>>>>>>>>>> because > >>>>>>>>>>>> it can introduce extra downtime. > >>>>>>>>>>>> > >>>>>>>>>>>> This raises another question: what should happen if the > >>>>>>>>> classification > >>>>>>>>>>>> fails? Crashing the job (which is what's currently > >>>> proposed) > >>>>>>> seems > >>>>>>>>> very > >>>>>>>>>>>> dangerous if this might depend on an external system. > >>>>>>>>>>>> > >>>>>>>>>>>> That's a valid point; passing the JobGraph containing > >> all > >>>> the > >>>>>>> above > >>>>>>>>>>>>> information is also something to consider > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> We should avoid passing JG around because it's mutable > >>>> (which > >>>>>> we > >>>>>>>>> must fix > >>>>>>>>>>>> in the long term), and letting users change it might > >> have > >>>>>>>>> consequences. > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> D. > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis > >> < > >>>>>>>>>>> pga...@apache.org> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hey David, Shammon, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the valuable comments! > >>>>>>>>>>>>> I am glad you find this proposal useful; some > >> thoughts: > >>>>>>>>>>>>> > >>>>>>>>>>>>> @Shammon > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1.
How about adding more job information in > >>>>>>>>> FailureListenerContext? For > >>>>>>>>>>>>>> example, job vertex, subtask, TaskManager > >> location. > >>>> Then > >>>>>> users > >>>>>>>>> can > >>>>>>>>>>> compute > >>>>>>>>>>>> more > >>>>>>>>>>>>>> statistics along different dimensions. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> That's a valid point; passing the JobGraph containing > >>> all > >>>> the > >>>>>>>> above > >>>>>>>>>>>>> information > >>>>>>>>>>>>> is also something to consider. I was mostly trying to > >>> be > >>>>>>>>> conservative, > >>>>>>>>>>>>> i.e., passing only the information we need and > >>> extending > >>>> as > >>>>>> we > >>>>>>>> see > >>>>>>>>> fit. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2. Users may want to save results in the listener, and > >> then > >>>> they > >>>>>>> can > >>>>>>>>> get > >>>>>>>>>>> the > >>>>>>>>>>>>>> historical results even after a JobManager failover. Can we > >>>>>> provide a > >>>>>>>>> unified > >>>>>>>>>>>>>> implementation for data storage requirements? > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> The idea is to store only the output of the Listeners > >>>> (tags) > >>>>>>> and > >>>>>>>>> treat > >>>>>>>>>>>> them > >>>>>>>>>>>>> as stateless. > >>>>>>>>>>>>> Tags will be stored along with HistoryEntries and > >> will > >>> be > >>>>>>>> available > >>>>>>>>>>>> through > >>>>>>>>>>>>> the HistoryServer > >>>>>>>>>>>>> even after a JM dies. > >>>>>>>>>>>>> > >>>>>>>>>>>>> @David > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1) Should we also consider adding labels? The > >>>> combination of > >>>>>>> tags > >>>>>>>>> and > >>>>>>>>>>>>>> labels seems to be what most systems offer; > >>> sometimes, > >>>> they > >>>>>>>> offer > >>>>>>>>>>>> labels > >>>>>>>>>>>>>> only (key=value pairs) because tags can be > >>> implemented > >>>>>> using > >>>>>>>>> those, > >>>>>>>>>>> but > >>>>>>>>>>>>> not > >>>>>>>>>>>>>> the other way around.
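David's observation that tags can be built from labels, but not the other way around, and Jing's idea of treating a tag as a label under a fixed key (`key="tag"`), can be illustrated with an immutable label set. `Labels` and the reserved `"tag"` key are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Immutable key/value labels; every "with" call returns a new instance.
final class Labels {
    static final String TAG_KEY = "tag"; // reserved key for plain tags (assumption)

    private final Map<String, String> entries;

    private Labels(Map<String, String> entries) {
        this.entries = Collections.unmodifiableMap(new LinkedHashMap<>(entries));
    }

    static Labels empty() {
        return new Labels(Map.of());
    }

    /** Returns a copy with the given pair added (or replaced); this instance is unchanged. */
    Labels with(String key, String value) {
        Map<String, String> copy = new LinkedHashMap<>(entries);
        copy.put(key, value);
        return new Labels(copy);
    }

    /** A plain tag is just a label under the reserved key, so only one tag fits per set. */
    Labels withTag(String tag) {
        return with(TAG_KEY, tag);
    }

    Map<String, String> asMap() {
        return entries;
    }
}
```

Going the other way would mean packing key and value into one string and parsing it back out, which is why labels are the more general primitive.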
> >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Indeed, changing tags to k:v labels could be more > >>>> expressive; > >>>>>> I > >>>>>>>>> like it! > >>>>>>>>>>>>> Let's see what others think. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined > >>> models > >>>>>>>>> ("listeners") > >>>>>>>>>>>> are > >>>>>>>>>>>>>> going to be, it would be great to keep the > >>>> interfaces/data > >>>>>>>>> structures > >>>>>>>>>>>>>> immutable so we can push things over to the I/O > >>>> threads. > >>>>>>> Also, > >>>>>>>> it > >>>>>>>>>>>> sounds > >>>>>>>>>>>>>> off to call the main interface a Listener since > >> it's > >>>>>> supposed > >>>>>>>> to > >>>>>>>>>>>> enhance > >>>>>>>>>>>>>> the original throwable with additional metadata. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> The idea was for the name to be generic, as there > >> could > >>> be > >>>>>>>> Listener > >>>>>>>>>>>>> implementations > >>>>>>>>>>>>> just communicating with external monitoring/alerting > >>>> systems > >>>>>>> and > >>>>>>>> no > >>>>>>>>>>>>> metadata output > >>>>>>>>>>>>> -- but let's rethink that. For immutability, see > >> below: > >>>>>>>>>>>>> > >>>>>>>>>>>>> 3) You're proposing to support a set of listeners. > >>> Since > >>>>>> you're > >>>>>>>>> passing > >>>>>>>>>>>> the > >>>>>>>>>>>>>> mutable context around, which includes tags set by > >>> the > >>>>>>> previous > >>>>>>>>>>>> listener, > >>>>>>>>>>>>>> do you expect users to make any assumptions about > >> the > >>>> order > >>>>>>> in > >>>>>>>>> which > >>>>>>>>>>>>>> listeners are executed?
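Order-dependent rules such as "don't report UNKNOWN once USER has been seen" (Panagiotis's example in the reply below) can also be kept with independent listeners by resolving conflicting values after they are collected. The precedence list and `FailureTypeResolver` are hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;

final class FailureTypeResolver {
    // Hypothetical precedence: earlier entries win over later ones.
    private static final List<String> PRECEDENCE = List.of("USER", "SYSTEM", "UNKNOWN");

    /** Picks the highest-precedence type among candidates; unknown values are ignored. */
    static Optional<String> resolve(Collection<String> candidates) {
        return PRECEDENCE.stream().filter(candidates::contains).findFirst();
    }
}
```

The outcome no longer depends on which listener ran first, so listeners can stay independent.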
> >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> In the existing proposal, we are not making any > >>>> assumptions > >>>>>>> about > >>>>>>>>> the > >>>>>>>>>>>> order > >>>>>>>>>>>>> of listeners > >>>>>>>>>>>>> (system ones come first and then the ones loaded from > >>> the > >>>>>>> plugin > >>>>>>>>>>> manager); > >>>>>>>>>>>>> however, listeners can use previous state > >> (tags/labels) > >>> to > >>>>>> make > >>>>>>>>>>> decisions: > >>>>>>>>>>>>> e.g., we won't assign the *UNKNOWN* failureType when we have > >>>> already > >>>>>>> seen > >>>>>>>>> *USER*, > >>>>>>>>>>>> or > >>>>>>>>>>>>> the other way around -- when we have seen *UNKNOWN*, we remove it in > >>>> favor of > >>>>>>>>> *USER*. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>> Panagiotis > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Sun, Mar 19, 2023 at 10:42 AM David Morávek < > >>>>>>> d...@apache.org> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Panagiotis, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> This is an excellent proposal and something > >> everyone > >>>> trying > >>>>>>> to > >>>>>>>>>>> provide > >>>>>>>>>>>>>> "Flink as a service" needs to solve at some point. > >> I > >>>> have a > >>>>>>>>> couple of > >>>>>>>>>>>>>> questions: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> If I understand the proposal correctly, this is > >> just > >>>> about > >>>>>>>> adding > >>>>>>>>>>> tags > >>>>>>>>>>>> to > >>>>>>>>>>>>>> the Throwable by running a tuple of (Throwable, > >>>>>>> FailureContext) > >>>>>>>>>>>> through a > >>>>>>>>>>>>>> user-defined model. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1) Should we also consider adding labels?
The > >>>> combination > >>>>>> of > >>>>>>>>> tags and > >>>>>>>>>>>>>> labels seems to be what most systems offer; > >>> sometimes, > >>>> they > >>>>>>>> offer > >>>>>>>>>>>> labels > >>>>>>>>>>>>>> only (key=value pairs) because tags can be > >>> implemented > >>>>>> using > >>>>>>>>> those, > >>>>>>>>>>> but > >>>>>>>>>>>>> not > >>>>>>>>>>>>>> the other way around. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined > >>>> models > >>>>>>>>>>> ("listeners") > >>>>>>>>>>>>> are > >>>>>>>>>>>>>> going to be, it would be great to keep the > >>>> interfaces/data > >>>>>>>>> structures > >>>>>>>>>>>>>> immutable so we can push things over to the I/O > >>>> threads. > >>>>>>> Also, > >>>>>>>> it > >>>>>>>>>>>> sounds > >>>>>>>>>>>>>> off to call the main interface a Listener since > >> it's > >>>>>> supposed > >>>>>>>> to > >>>>>>>>>>>> enhance > >>>>>>>>>>>>>> the original throwable with additional metadata. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I'd propose something along the lines of (we should > >>>> have > >>>>>>> better > >>>>>>>>>>> names, > >>>>>>>>>>>>> this > >>>>>>>>>>>>>> is just to outline the idea): > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> interface FailureEnricher { > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> ThrowableWithTagsAndLabels > >> enrichFailure(Throwable > >>>> cause, > >>>>>>>>>>>>>> ImmutableContextualMetadataAboutTheThrowable > >>> context); > >>>>>>>>>>>>>> } > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> The names should change; this is just to outline > >> the > >>>> idea. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 3) You're proposing to support a set of listeners. > >>>> Since > >>>>>>> you're > >>>>>>>>>>> passing > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> mutable context around, which includes tags set by > >>> the > >>>>>>> previous > >>>>>>>>>>>> listener, > >>>>>>>>>>>>>> do you expect users to make any assumptions about > >> the > >>>> order > >>>>>>> in > >>>>>>>>> which > >>>>>>>>>>>>>> listeners are executed? 
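The summary at the top of this thread proposes that each label key be owned by exactly one listener, validated when listeners are loaded and again when results come back. A minimal sketch of both checks, assuming exact-key matching (not prefixes) and a hypothetical `KeyOwnership` registry:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical registry enforcing that each label key has exactly one owning listener.
final class KeyOwnership {
    private final Map<String, String> ownerByKey = new HashMap<>();

    /** Init-time check: rejects the listener if any declared key is already owned. */
    void register(String listenerName, Set<String> declaredKeys) {
        for (String key : declaredKeys) {
            String owner = ownerByKey.get(key);
            if (owner != null) {
                throw new IllegalStateException(
                        "Label key '" + key + "' is already owned by " + owner
                                + ", cannot register it for " + listenerName);
            }
        }
        for (String key : declaredKeys) {
            ownerByKey.put(key, listenerName);
        }
    }

    /** Runtime check: keeps only the labels whose keys this listener declared. */
    Map<String, String> filter(String listenerName, Map<String, String> labels) {
        Map<String, String> accepted = new HashMap<>();
        labels.forEach((key, value) -> {
            if (listenerName.equals(ownerByKey.get(key))) {
                accepted.put(key, value);
            }
        });
        return accepted;
    }
}
```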
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *@Shammon*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Users may want to save results in a listener, so that they can get the historical results even after a JobManager failover. Can we provide a unified implementation for data storage requirements?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think we should explicitly state that all "listeners" are treated as stateless. I don't see any strong reason for snapshotting them.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> D.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Panagiotis,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thank you for starting this discussion. I think this FLIP is valuable and can help users analyze the causes of job failovers better!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have two comments:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. How about adding more job information to FailureListenerContext? For example, the job vertex, subtask, and TaskManager location. Users could then compute more statistics along different dimensions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. Users may want to save results in a listener, so that they can get the historical results even after a JobManager failover. Can we provide a unified implementation for data storage requirements?
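To illustrate Shammon's first point together with the stateless treatment suggested above, here is a Java sketch that assumes a hypothetical `TaskFailureContext` carrying the job vertex, subtask index, and TaskManager location (not an existing Flink class), plus a hypothetical `DimensionLabels` helper. The labels are a pure function of the failure and its context, so there is no listener state to snapshot.

```java
import java.util.Map;

// Hypothetical context with the extra job information suggested above.
final class TaskFailureContext {
    final String jobVertexName;
    final int subtaskIndex;
    final String taskManagerLocation;

    TaskFailureContext(String jobVertexName, int subtaskIndex, String taskManagerLocation) {
        this.jobVertexName = jobVertexName;
        this.subtaskIndex = subtaskIndex;
        this.taskManagerLocation = taskManagerLocation;
    }
}

// Stateless labelling: the output depends only on the arguments, so no
// listener state has to be snapshotted or restored.
final class DimensionLabels {
    private DimensionLabels() {}

    static Map<String, String> labelsFor(Throwable cause, TaskFailureContext ctx) {
        return Map.of(
                "exception", cause.getClass().getSimpleName(),
                "vertex", ctx.jobVertexName,
                "subtask", Integer.toString(ctx.subtaskIndex),
                "taskManager", ctx.taskManagerLocation);
    }
}
```

Grouping failures by the `vertex` or `taskManager` label would then give the kind of per-dimension statistics mentioned in point 1.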
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> shammon FY
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This FLIP [1] proposes a pluggable interface for failure handling, allowing users to implement custom failure logic using the plugin framework. Motivated by existing proposals [2] and tickets [3], this enables use cases such as assigning particular types to failures (e.g., User or System), emitting custom metrics per type (e.g., application or platform), and even exposing errors to downstream consumers (e.g., notification systems).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks to Piotr and Anton for the initial reviews and discussions!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For anyone interested, the starting point would be the FLIP [1] that I created, describing the motivation and the proposed changes (part of the core, runtime and web).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The intuition behind this FLIP is being able to execute custom logic on failures by exposing a FailureListener interface. Implementations by users can simply be loaded into the system as JAR files.
> >>>>>>>>>>>>>>>> FailureListeners may also decide to assign failure tags to errors (expressed as strings), which will then be exposed as metadata by the UI/REST interfaces.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Feedback is always appreciated! Looking forward to your thoughts!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
> >>>>>>>>>>>>>>>> [2] https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-20833
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>> Panagiotis
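A rough Java sketch of the original proposal's flow, under stated assumptions. `FailureListener` here is a hypothetical single-method interface returning string tags, and `FailureListeners` is a hypothetical helper. Discovery uses plain `java.util.ServiceLoader` as a stand-in for Flink's plugin framework, not as a description of how the FLIP loads JARs. Following the point raised elsewhere in this thread that listener errors must not fail jobs, an exception thrown by one listener is logged and skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical listener contract: inspect a failure and return string tags
// such as "USER" or "SYSTEM".
interface FailureListener {
    Set<String> onFailure(Throwable cause);
}

final class FailureListeners {
    private static final Logger LOG = Logger.getLogger(FailureListeners.class.getName());

    private FailureListeners() {}

    // Stand-in for plugin discovery: finds implementations registered under
    // META-INF/services on the given class loader.
    static List<FailureListener> discover(ClassLoader loader) {
        List<FailureListener> found = new ArrayList<>();
        ServiceLoader.load(FailureListener.class, loader).forEach(found::add);
        return found;
    }

    // Collects tags from all listeners; a listener that throws is logged and
    // skipped, so it cannot change the outcome of the job itself.
    static Set<String> collectTags(List<FailureListener> listeners, Throwable cause) {
        Set<String> tags = new TreeSet<>();
        for (FailureListener listener : listeners) {
            try {
                tags.addAll(listener.onFailure(cause));
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING,
                        "Failure listener " + listener.getClass().getName() + " failed", e);
            }
        }
        return Collections.unmodifiableSet(tags);
    }
}
```

With no provider files on the class path, `discover` simply returns an empty list; in a plugin setup it would be called with each plugin's class loader.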