Hello again everyone,

The FLIP is now updated based on our discussion!

In short, FLIP-304 [1] proposes adding a pluggable interface that lets
users add custom logic and enrich failures with custom metadata labels.
As discussed, custom restart strategies will be part of a different effort.

Every pluggable FailureEnricher:
- Is triggered on every global/non-global failure
- Receives a Throwable cause and an immutable Context
- Performs asynchronous execution (separate IoExecutor) to avoid blocking
  the main thread for RPCs
- Is completely independent from other Enrichers
- Emits failure labels/tags for its unique, pre-defined keys (defined at
  startup time)

Check the link for implementation details and please let me know what you
think :)

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers

Panagiotis

On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu <reed...@gmail.com> wrote:

> Hi Panagiotis,
>
> How about introducing a config option to control which error handling
> plugins should be used? It is more flexible for deployments. Additionally,
> it can also enable users to explicitly specify the order in which the
> plugins take effect.
>
> Thanks,
> Zhu
>
> Gen Luo <luogen...@gmail.com> wrote on Mon, Mar 27, 2023 at 15:02:
> >
> > Thanks for the summary!
> >
> > Also +1 to support custom restart strategies in a different FLIP,
> > as long as we can make sure that the plugin interface won't be
> > changed when the restart strategy interface is introduced.
> >
> > To achieve this, maybe we should think through how the handler
> > would cooperate with the restart strategy: would it execute
> > before the strategy (e.g. some strategy may use the tag), or after
> > it (e.g. some metric reporting handler may use the handling result)?
> > Though we can implement it one way, and extend it if the other is
> > really needed by someone.
> >
> > Besides, instead of using either of the names, shall we just make
> > them two subclasses named FailureEnricher and FailureListener?
> > The former executes synchronously and can modify the context,
> > while the latter executes asynchronously and has a read-only view
> > of the context. In this way we can make sure a handler behaves in
> > the expected way.
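
To make the FailureEnricher contract summarised at the top of this message a bit more concrete, here is a minimal Java sketch. It is only an illustration under assumptions: the names `FailureEnricherSketch`, `ContextSketch`, `TypeEnricher` and the `"type"` label key are hypothetical and are not taken from the FLIP; the FLIP page [1] has the actual interface.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Immutable, read-only view handed to every enricher (hypothetical shape). */
final class ContextSketch {
    private final boolean globalFailure;
    private final Executor ioExecutor;

    ContextSketch(boolean globalFailure, Executor ioExecutor) {
        this.globalFailure = globalFailure;
        this.ioExecutor = ioExecutor;
    }

    boolean isGlobalFailure() { return globalFailure; }

    /** Executor for heavy or blocking work, so the main thread stays free for RPCs. */
    Executor ioExecutor() { return ioExecutor; }
}

/** Hypothetical enricher contract: declare keys up front, return labels asynchronously. */
interface FailureEnricherSketch {
    /** Keys this enricher may emit; declared once, at startup time. */
    Set<String> outputKeys();

    /** Inspects the failure and returns labels for the declared keys. */
    CompletableFuture<Map<String, String>> processFailure(Throwable cause, ContextSketch context);
}

/** Example enricher that labels a failure as USER or SYSTEM (illustrative rule only). */
final class TypeEnricher implements FailureEnricherSketch {
    @Override
    public Set<String> outputKeys() {
        return Set.of("type");
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, ContextSketch context) {
        // Run the classification on the provided executor instead of the caller's thread.
        return CompletableFuture.supplyAsync(
                () -> Map.of("type", cause instanceof IllegalArgumentException ? "USER" : "SYSTEM"),
                context.ioExecutor());
    }
}
```

Because each enricher only returns a map for its own keys and never mutates the context, enrichers stay independent of each other and the caller can merge the results once the futures complete.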
> >
> > On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > +1 to support custom restart strategies in a different FLIP.
> > >
> > > It's fine to have a different plugin for custom restart strategy.
> > > If so, since we do not treat the FLIP-304 plugin as a common failure
> > > handler, but instead mainly target adding labels to errors, I would
> > > +1 for the name `FailureEnricher`.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > David Morávek <d...@apache.org> wrote on Thu, Mar 23, 2023 at 15:51:
> > > >
> > > > > One additional remark on introducing it as an async operation: We
> > > > > would need a new configuration parameter to define the timeout for
> > > > > such a listener call, wouldn't we?
> > > >
> > > > This could be left up to the implementor to handle.
> > > >
> > > > > What about adding an extra method getNamespace() to the Listener
> > > > > interface which returns an Optional<String>.
> > > >
> > > > I'd avoid mixing an additional concept into this. We can simply have
> > > > a new method that returns a set of keys the listener can output. We
> > > > can validate this at the JM startup time and fail fast (since it's a
> > > > configuration error) if there is an overlap. If the listener outputs
> > > > a key that it is not allowed to, I wouldn't be afraid to call into a
> > > > fatal error handler since it's an invalid implementation.
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl
> > > > <matthias.p...@aiven.io.invalid> wrote:
> > > >
> > > > > Sounds good.
> > > > > Two points I want to add:
> > > > >
> > > > > > - Listener execution should be independent; however we need a
> > > > > > way to enforce a Label key/key-prefix is only assigned to a
> > > > > > single Listener, thinking of a validation step both at Listener
> > > > > > init and runtime stages
> > > > >
> > > > > What about adding an extra method getNamespace() to the Listener
> > > > > interface which returns an Optional<String>. Therefore, the
> > > > > implementation/the user can decide depending on the use case whether
> > > > > it's necessary to have separate namespaces for the key/value pairs
> > > > > or not. On the Flink side, we would just merge the different maps
> > > > > considering their namespaces.
> > > > >
> > > > > A flaw of this approach is that if a user decides to use the same
> > > > > namespace for multiple listeners, how is an error in one of the
> > > > > listeners represented in the outcome? We would have to overwrite
> > > > > either the successful listener's result or the failed ones. I wanted
> > > > > to share it, anyway.
> > > > >
> > > > > One additional remark on introducing it as an async operation: We
> > > > > would need a new configuration parameter to define the timeout for
> > > > > such a listener call, wouldn't we?
> > > > >
> > > > > Matthias
> > > > >
> > > > > On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis
> > > > > <pga...@apache.org> wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > Thanks for the valuable comments!
> > > > > > Excited to see this is an area of interest for the community!
> > > > > >
> > > > > > Summarizing some of the main points raised along with my thoughts:
> > > > > >
> > > > > > - Labels (Key/Value) pairs are more expressive than Tags (Strings)
> > > > > >   so using the former is a good idea. I am also debating if we
> > > > > >   want to return multiple KV pairs per Listener (one could argue
> > > > > >   that we could split the logic in multiple Listeners to support
> > > > > >   that)
> > > > > > - An immutable context along with data returned using the
> > > > > >   interface method implementations is a better approach than a
> > > > > >   mutable Collection
> > > > > > - Listener execution should be independent; however we need a way
> > > > > >   to enforce a Label key/key-prefix is only assigned to a single
> > > > > >   Listener, thinking of a validation step both at Listener init
> > > > > >   and runtime stages
> > > > > > - We want to perform async Listener operations as sync could block
> > > > > >   the main thread; exposing an ioExecutor pool through the context
> > > > > >   could be an elegant solution here
> > > > > > - Make sure Listener errors are not failing jobs: make sure to log
> > > > > >   and keep the job alive
> > > > > > - We need better naming / public interface separation/description
> > > > > > - Even though custom restart strategies share some properties with
> > > > > >   Listeners, they would probably need a separate interface with a
> > > > > >   different return type anyway (restart strategy not labels) and
> > > > > >   in general they are different and complex enough to justify
> > > > > >   their own FLIP (that can also be a follow-up).
> > > > > >
> > > > > > What do people think? I am planning to modify the FLIP to reflect
> > > > > > these changes if they make sense to everyone.
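
The validation step mentioned in the summary above (each Label key assigned to a single Listener, checked at init and again at runtime) could look roughly like the following Java sketch. This is an assumption-laden illustration, not the FLIP's implementation: `LabelKeyValidation`, `validateUniqueKeys` and `validateEmitted` are hypothetical names, and listeners are identified here by plain strings for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper illustrating the two validation stages discussed in the thread. */
final class LabelKeyValidation {

    private LabelKeyValidation() {}

    /**
     * Startup check: fails fast if two listeners declare the same key.
     * Input maps a listener name to the keys it declares; returns key -> owning listener.
     */
    static Map<String, String> validateUniqueKeys(Map<String, Set<String>> declaredKeysByListener) {
        Map<String, String> ownerByKey = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : declaredKeysByListener.entrySet()) {
            for (String key : entry.getValue()) {
                String previousOwner = ownerByKey.putIfAbsent(key, entry.getKey());
                if (previousOwner != null) {
                    throw new IllegalStateException(
                            "Label key '" + key + "' is declared by both '"
                                    + previousOwner + "' and '" + entry.getKey() + "'");
                }
            }
        }
        return ownerByKey;
    }

    /** Runtime check: rejects emitted labels whose key the listener did not declare. */
    static void validateEmitted(Set<String> declaredKeys, Map<String, String> emittedLabels) {
        for (String key : emittedLabels.keySet()) {
            if (!declaredKeys.contains(key)) {
                throw new IllegalStateException("Undeclared label key emitted: '" + key + "'");
            }
        }
    }
}
```

Whether a runtime violation should only be logged or escalated to a fatal error handler is a design choice the thread leaves open; the sketch simply throws so the caller can decide.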
> > > > > >
> > > > > > Cheers,
> > > > > > Panagiotis
> > > > > >
> > > > > > On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh <hlteo...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Thank you Panagiotis for proposing this. From the size of the
> > > > > > > thread, this is a much needed feature in Flink!
> > > > > > > Some thoughts, to extend those already adeptly summarised by
> > > > > > > Piotr, Matthias and Jing.
> > > > > > >
> > > > > > > - scope of FLIP: +1 to scoping this FLIP to observability around
> > > > > > > a restart. That would include adding metadata + exposing
> > > > > > > metadata to external systems. IMO, introducing a new restart
> > > > > > > strategy solves different problems, is much larger scope and
> > > > > > > should be covered in a separate FLIP.
> > > > > > >
> > > > > > > - failure handling: At the moment, we propose transitioning the
> > > > > > > Flink job to a terminal FAILED state when JobListener fails,
> > > > > > > when the job could have transitioned to RESTARTING->RUNNING. If
> > > > > > > we are keeping in line with the scope to add
> > > > > > > metadata/observability around job restarts, we should not be
> > > > > > > affecting the running of the Flink job itself. Could I propose
> > > > > > > we instead log WARN/ERROR.
> > > > > > >
> > > > > > > - immutable context: +1 to keeping the contract clear via return
> > > > > > > types.
> > > > > > > - async operation: +1 to adding ioExecutor to context, however,
> > > > > > > given we don't want to block the actual job restart on adding
> > > > > > > metadata / calling external services, should we consider
> > > > > > > returning and letting futures complete independently?
> > > > > > >
> > > > > > > - independent vs ordered execution: Should we consider making
> > > > > > > the order of execution deterministic (use a List instead of
> > > > > > > Set)?
> > > > > > >
> > > > > > > Once again, thank you for working on this.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Hong
> > > > > > >
> > > > > > > > On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Thanks Panagiotis for this FLIP and thanks for all valuable
> > > > > > > > discussions. I'd like to share my two cents:
> > > > > > > >
> > > > > > > > - FailureListenerContext#addTag and
> > > > > > > > FailureListenerContext#getTags. It seems that we have to call
> > > > > > > > getTags() and then do remove activities if we want to delete
> > > > > > > > any tags (according to the javadoc in the FLIP). It is
> > > > > > > > inconsistent for me too. Either offer addTag(), deleteTag(),
> > > > > > > > and let getTags() return an immutable collection, or offer
> > > > > > > > getTags() only to return a mutable collection.
> > > > > > > >
> > > > > > > > - label vs tag. Label is a great idea +1. AFAIC, tag could be
> > > > > > > > a special case of label, i.e. key="tag". It is convenient to
> > > > > > > > offer the xxxTag() method if the user only needs one label. I
> > > > > > > > would love to have both of them. Another thought is that tag
> > > > > > > > implicitly contains the meaning of "immutable".
> > > > > > > >
> > > > > > > > - +1 for a separate FLIP of customized restart strategy.
> > > > > > > > Attention should be taken to make sure it works well with
> > > > > > > > Flink built-in restartStrategy in order to have the single
> > > > > > > > source of truth.
> > > > > > > >
> > > > > > > > - execution order.
> > > > > > > > The default independent execution should be fine.
> > > > > > > > According to the FailureListener interface definition in the
> > > > > > > > FLIP, users should be able to easily build a listener chain[1]
> > > > > > > > to offer sequential execution, e.g. public
> > > > > > > > FailureListener(FailureListener nextListener).
> > > > > > > > Another option is to modify the interface or provide another
> > > > > > > > interface alongside the current one to extend the method to
> > > > > > > > support ListenerChain, i.e. void onFailure(Throwable cause,
> > > > > > > > FailureListenerContext context, ListenerChain listenerChain).
> > > > > > > > Users can also mix them up.
> > > > > > > >
> > > > > > > > - naming. Afaiu, the pluggable extension is not limited to
> > > > > > > > failure enrichment. Conceptually it can do everything for the
> > > > > > > > given failure, e.g. start counting metric as the FLIP
> > > > > > > > described, calling an external system, sending notification to
> > > > > > > > slack channel, etc. you name it. It sounds to me more like a
> > > > > > > > FailureActionListener - it can trigger actions based on
> > > > > > > > failure. Failure enrichment is one type of action.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Jing
> > > > > > > >
> > > > > > > > [1] https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
> > > > > > > >
> > > > > > > > On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl
> > > > > > > > <matthias.p...@aiven.io.invalid> wrote:
> > > > > > > >
> > > > > > > >> Thanks for the proposal, Panagiotis. A lot of good points
> > > > > > > >> have been already shared.
> > > > > > > >> I just want to add my view on some of the items:
> > > > > > > >>
> > > > > > > >> - independent execution vs ordered execution: I prefer the
> > > > > > > >> listeners being processed independently from each other
> > > > > > > >> because it adds less complexity code-wise. The use case Piotr
> > > > > > > >> described (where you want to reuse some other classifier) is
> > > > > > > >> the only one I can think of where we actually need
> > > > > > > >> classifiers depending on each other. Supporting such a use
> > > > > > > >> case right from the start feels a bit over-engineered and
> > > > > > > >> could be covered in a follow-up FLIP if we really come to
> > > > > > > >> that point where such a feature is requested by users.
> > > > > > > >>
> > > > > > > >> - key/value pairs instead of plain labels: I think that's a
> > > > > > > >> good idea. key/value pairs are more expressive. +1
> > > > > > > >>
> > > > > > > >> - extending the FLIP to cover restart strategy: I understand
> > > > > > > >> Gen's concern about introducing too many different types of
> > > > > > > >> plugins. But I would still favor not extending the FLIP in
> > > > > > > >> this regard. A pluggable restart strategy sounds reasonable.
> > > > > > > >> But an error classifier and a restart strategy are still
> > > > > > > >> different enough to justify separate plugins, IMHO. And
> > > > > > > >> therefore, I would think that covering the restart strategy
> > > > > > > >> in a separate FLIP is the better option for the sake of
> > > > > > > >> simplicity.
> > > > > > > >>
> > > > > > > >> - immutable context: Passing in an immutable context and
> > > > > > > >> returning data through the interface method's return value
> > > > > > > >> sounds like a better approach to harden the contract of the
> > > > > > > >> interface. +1 for that proposal
> > > > > > > >>
> > > > > > > >> - async operation: I think David is right. An async interface
> > > > > > > >> makes the listener implementations more robust when it comes
> > > > > > > >> to heavy IO operations. The ioExecutor can be passed through
> > > > > > > >> the context object. +1
> > > > > > > >>
> > > > > > > >> Matthias
> > > > > > > >>
> > > > > > > >> On Tue, Mar 21, 2023 at 2:09 PM David Morávek
> > > > > > > >> <david.mora...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> *@Piotr*
> > > > > > > >>>
> > > > > > > >>>> I was thinking about actually defining the order of the
> > > > > > > >>>> classifiers/handlers and not allowing them to be
> > > > > > > >>>> asynchronous. Asynchronicity would create some problems:
> > > > > > > >>>> when to actually return the error to the user? After all
> > > > > > > >>>> async responses will get back? Before, but without
> > > > > > > >>>> classified exception? It would also add implementation
> > > > > > > >>>> complexity and I think we can always expand the API with
> > > > > > > >>>> async version in the future if needed.
> > > > > > > >>>
> > > > > > > >>> As long as the classifiers need to talk to an external
> > > > > > > >>> system, we by definition need to allow them to be
> > > > > > > >>> asynchronous to unblock the main thread for handling other
> > > > > > > >>> RPCs. Exposing ioExecutor via the context proposed above
> > > > > > > >>> would be great.
> > > > > > > >>>
> > > > > > > >>>> After all async responses will get back
> > > > > > > >>>
> > > > > > > >>> This would be the same if we trigger them synchronously one
> > > > > > > >>> by one, with a caveat that synchronous execution might take
> > > > > > > >>> significantly longer and introduce unnecessary downtime to a
> > > > > > > >>> job.
> > > > > > > >>>
> > > > > > > >>> D.
> > > > > > > >>>
> > > > > > > >>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>>> Hi Piotr,
> > > > > > > >>>>
> > > > > > > >>>> It's fine to me to have a separate FLIP to extend this
> > > > > > > >>>> `FailureListener` to support custom restart strategy.
> > > > > > > >>>>
> > > > > > > >>>> What I was a bit concerned about is that if we just treat
> > > > > > > >>>> the `FailureListener` as an error classifier which is not
> > > > > > > >>>> crucial to Flink framework process, we may design it to run
> > > > > > > >>>> asynchronously and not trigger Flink failures. This may be
> > > > > > > >>>> a blocker if later we want to enable it to support custom
> > > > > > > >>>> restart strategy.
> > > > > > > >>>>
> > > > > > > >>>> Thanks,
> > > > > > > >>>> Zhu
> > > > > > > >>>>
> > > > > > > >>>> Dian Fu <dian0511...@gmail.com> wrote on Tue, Mar 21, 2023 at 19:53:
> > > > > > > >>>>>
> > > > > > > >>>>> Hi Panagiotis,
> > > > > > > >>>>>
> > > > > > > >>>>> Thanks for the proposal. This is a very valuable feature
> > > > > > > >>>>> and will be a good add-on for Flink.
> > > > > > > >>>>>
> > > > > > > >>>>> I also think that it will be great if we can consider how
> > > > > > > >>>>> to make it possible for users to customize the failure
> > > > > > > >>>>> handling in this FLIP.
> > > > > > > >>>>> It's highly related to the problem we want to address in
> > > > > > > >>>>> this FLIP and could avoid refactoring the interfaces
> > > > > > > >>>>> proposed in this FLIP too quickly.
> > > > > > > >>>>>
> > > > > > > >>>>> Currently it treats all kinds of exceptions the same.
> > > > > > > >>>>> However, some kinds of exceptions are actually not
> > > > > > > >>>>> recoverable at all. It could let users customize the
> > > > > > > >>>>> failure handling logic to fail fast for certain known
> > > > > > > >>>>> unrecoverable exceptions and finally make these kinds of
> > > > > > > >>>>> jobs get noticed and recovered more quickly.
> > > > > > > >>>>>
> > > > > > > >>>>> Regards,
> > > > > > > >>>>> Dian
> > > > > > > >>>>>
> > > > > > > >>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo
> > > > > > > >>>>> <luogen...@gmail.com> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> Hi Panagiotis,
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thanks for the proposal.
> > > > > > > >>>>>>
> > > > > > > >>>>>> It's useful to enrich the information so that users can
> > > > > > > >>>>>> be more clear why the job is failing, especially platform
> > > > > > > >>>>>> developers who need to provide the information to their
> > > > > > > >>>>>> end users.
> > > > > > > >>>>>> And for the very FLIP, I'd prefer the naming
> > > > > > > >>>>>> `FailureEnricher` proposed by David, as the plugin
> > > > > > > >>>>>> doesn't really handle the failure.
> > > > > > > >>>>>>
> > > > > > > >>>>>> However, like Zhu and Lijie said, I also joined a
> > > > > > > >>>>>> discussion recently about customized failure handling,
> > > > > > > >>>>>> e.g. counting the failure rate of pipeline regions
> > > > > > > >>>>>> separately, and failing the job when a specific error
> > > > > > > >>>>>> occurs, and so on.
> > > > > > > >>>>>> I suppose a custom restart strategy, or I'd call it a
> > > > > > > >>>>>> custom failure "handler", is indeed necessary. It can
> > > > > > > >>>>>> also enrich the information as the current proposed
> > > > > > > >>>>>> handler does.
> > > > > > > >>>>>>
> > > > > > > >>>>>> To avoid adding too many plugin interfaces which may
> > > > > > > >>>>>> confuse users and make the ExecutionFailureHandler more
> > > > > > > >>>>>> complex, I think it'd be better to consider the
> > > > > > > >>>>>> requirements at the same time.
> > > > > > > >>>>>>
> > > > > > > >>>>>> IMO, we can add a handler interface, then make the
> > > > > > > >>>>>> current restart strategy and the enricher both types of
> > > > > > > >>>>>> the handler. The handlers execute in sequence, and the
> > > > > > > >>>>>> failure is considered unrecoverable if any of the
> > > > > > > >>>>>> handlers decides.
> > > > > > > >>>>>> In this way, users can also implement a handler using the
> > > > > > > >>>>>> enriched information provided by the previous handlers,
> > > > > > > >>>>>> e.g. fail the job and send a notification if too many
> > > > > > > >>>>>> failures are caused by the end users.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Best,
> > > > > > > >>>>>> Gen
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu
> > > > > > > >>>>>> <huweihua....@gmail.com> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hi Panagiotis,
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks for your proposal. It is valuable to analyze the
> > > > > > > >>>>>>> reason for failure with the user plug-in.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Making the context immutable could make the contract
> > > > > > > >>>>>>> stronger. Letting the listener return an enriching
> > > > > > > >>>>>>> result may be a better way.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> IIUC, listeners could do two things: enrich more
> > > > > > > >>>>>>> information (tags/labels) to FailureHandlingResult, and
> > > > > > > >>>>>>> push data out of Flink (metrics or something).
> > > > > > > >>>>>>> IMO, we could split these two types into Listener and
> > > > > > > >>>>>>> Advisor (maybe other names). The Listener just pushes
> > > > > > > >>>>>>> the data out and returns nothing to Flink, so we can run
> > > > > > > >>>>>>> these async and don't have to wait for the Listener's
> > > > > > > >>>>>>> result.
> > > > > > > >>>>>>> The Advisor returns rich information to the
> > > > > > > >>>>>>> FailureHandlingResult, and it should have a lighter
> > > > > > > >>>>>>> logic.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Supporting a custom restart strategy is also valuable.
> > > > > > > >>>>>>> In this design, we use RestartStrategy to construct a
> > > > > > > >>>>>>> FailureHandlingResult, and then pass it to Listener.
> > > > > > > >>>>>>> My question is, should we change the restart strategy
> > > > > > > >>>>>>> interface to support the custom restart strategy, or
> > > > > > > >>>>>>> keep the current restart strategy and let the later
> > > > > > > >>>>>>> Listener enrich the restartable information to
> > > > > > > >>>>>>> FailureHandlingResult? The latter may cause some
> > > > > > > >>>>>>> confusion when we use a custom restart strategy.
> > > > > > > >>>>>>> The default Flink restart strategy also runs but does
> > > > > > > >>>>>>> not take effect.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Best,
> > > > > > > >>>>>>> Weihua
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang
> > > > > > > >>>>>>> <wangdachui9...@gmail.com> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>> Hi Panagiotis,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Thanks for driving this.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> +1 for supporting custom restart strategy, we did
> > > > > > > >>>>>>>> receive such requests from the user mailing list
> > > > > > > >>>>>>>> [1][2].
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Besides, in the current design, the plugin will only do
> > > > > > > >>>>>>>> some statistical and classification work, and will not
> > > > > > > >>>>>>>> affect the *FailureHandlingResult*. Just listening, no
> > > > > > > >>>>>>>> handling, it doesn't quite match the title.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
> > > > > > > >>>>>>>> [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Best,
> > > > > > > >>>>>>>> Lijie
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Zhu Zhu <reed...@gmail.com> wrote on Mon, Mar 20, 2023 at 21:39:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Hi Panagiotis,
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Thanks for creating this proposal! It's good to enable
> > > > > > > >>>>>>>>> Flink to handle different errors in different ways,
> > > > > > > >>>>>>>>> through a pluggable way.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> There are requests for flexible restart strategies
> > > > > > > >>>>>>>>> from time to time, for different strategies of restart
> > > > > > > >>>>>>>>> backoff time, or to suppress restarting on certain
> > > > > > > >>>>>>>>> errors. Therefore, I think it's better that the
> > > > > > > >>>>>>>>> proposed failure handling plugin can also support
> > > > > > > >>>>>>>>> custom restart strategies.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Maybe we can call it FailureHandlingAdvisor which
> > > > > > > >>>>>>>>> provides more information (labels) and gives advice
> > > > > > > >>>>>>>>> (restart backoff time, whether to restart)? I do not
> > > > > > > >>>>>>>>> have a strong opinion though, any explanatory name
> > > > > > > >>>>>>>>> would be good.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> To avoid unexpected mutation, how about making the
> > > > > > > >>>>>>>>> context immutable and letting the plugin return an
> > > > > > > >>>>>>>>> immutable result? i.e. remove the setters from the
> > > > > > > >>>>>>>>> context, and let the plugin method return a result
> > > > > > > >>>>>>>>> which contains `labels`, `canRestart` and
> > > > > > > >>>>>>>>> `restartBackoffTime`. Flink should apply the result to
> > > > > > > >>>>>>>>> the context before invoking the next plugin, so that
> > > > > > > >>>>>>>>> the next plugin will see the updated context.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> The plugin should avoid taking too much time to return
> > > > > > > >>>>>>>>> the result, because it will block the RPC and result
> > > > > > > >>>>>>>>> in instability.
> > > > > > > >>>>>>>>> However, it can still perform heavy actions in a
> > > > > > > >>>>>>>>> different thread. The context can provide an
> > > > > > > >>>>>>>>> `ioExecutor` to the plugins for reuse.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Thanks,
> > > > > > > >>>>>>>>> Zhu
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Shammon FY <zjur...@gmail.com> wrote on Mon, Mar 20, 2023 at 20:21:
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Hi Panagiotis
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Thank you for your answer. I agree that
> > > > > > > >>>>>>>>>> `FailureListener` could be stateless, then I have
> > > > > > > >>>>>>>>>> some thoughts as follows
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> 1. I see that listeners and tag collections are
> > > > > > > >>>>>>>>>> associated. When JobManager fails and restarts, how
> > > > > > > >>>>>>>>>> can the new listener be associated with the tag
> > > > > > > >>>>>>>>>> collection before failover? Is the listener loading
> > > > > > > >>>>>>>>>> order?
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> 2. The tag collection may be too large, resulting in
> > > > > > > >>>>>>>>>> the JobManager OOM, do we need to provide a
> > > > > > > >>>>>>>>>> management class that supports some obsolescence
> > > > > > > >>>>>>>>>> strategies instead of a direct Collection?
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> 3. Is it possible to provide a more complex data
> > > > > > > >>>>>>>>>> structure than a simple string collection for tags in
> > > > > > > >>>>>>>>>> listeners, such as key-value?
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Best,
> > > > > > > >>>>>>>>>> Shammon FY
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu
> > > > > > > >>>>>>>>>> <xbjt...@gmail.com> wrote:
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> Hi Panagiotis,
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Thank you for kicking off this discussion. Overall,
> > > > > > > >>>>>>>>>>> the proposed feature of this FLIP makes sense to me.
> > > > > > > >>>>>>>>>>> We have also discussed similar requirements with our
> > > > > > > >>>>>>>>>>> users and developers, and I believe it will help
> > > > > > > >>>>>>>>>>> many users.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> In terms of FLIP content, I have some thoughts:
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> (1) For the FailureListenerContext interface, the
> > > > > > > >>>>>>>>>>> methods FailureListenerContext#addTag and
> > > > > > > >>>>>>>>>>> FailureListenerContext#getTags look very
> > > > > > > >>>>>>>>>>> inconsistent because they imply specific
> > > > > > > >>>>>>>>>>> implementation details, and not all FailureListeners
> > > > > > > >>>>>>>>>>> need to handle them, so we shouldn't put them in the
> > > > > > > >>>>>>>>>>> interface. Minor: The comment "UDF loading" in the
> > > > > > > >>>>>>>>>>> getUserClassLoader() method looks like a typo, IIUC
> > > > > > > >>>>>>>>>>> it should return the classloader of the current job.
>
> (2) Regarding the implementation in ExecutionFailureHandler#handleFailure, some custom listeners may perform heavy I/O operations, such as reporting to their monitoring system. The current logic appears to run in the JobMaster's main thread, and it is recommended not to do this kind of processing in the main thread.
>
> (3) In the current proposal, the results of the FailureListener's processing and the FailureHandlingResult returned by ExecutionFailureHandler are not connected. I think these two are closely related: the motivation of this FLIP is to make the current failure handling more flexible. From this perspective, different listeners should have the opportunity to affect the job's failure handling flow. For example, suppose a Flink job is configured with a RestartStrategy that allows a huge number of retries, but the Kafka topic of the source has been deleted; the job will fail over continuously.
> In this case, the user should have a listener that determines whether the failure is recoverable or unrecoverable, wraps the result into FailureHandlingResult.unrecoverable(xx), and passes it to the JobMaster. This approach will be more flexible.
>
> (4) All FLIPs have an important section named Public Interfaces. The current FLIP mixes the interface section and the implementation section together. It is better to follow the FLIP template [1] and separate them; this will make the entire FLIP clearer.
>
> In addition, regarding the FLIP process, a small suggestion: the community generally creates a JIRA issue after the FLIP vote has passed, rather than during the FLIP preparation phase, because the FLIP may be rejected. Although this FLIP is very reasonable, it's better to follow the process.
>
> Best,
> Leonard
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>
> On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:
>
> > however listeners can use previous state (tags/labels) to make decisions
>
> That sounds like a very fragile contract. We should either allow passing tags between listeners, and then need to define ordering, or make all of them independent. I prefer the latter because it allows us to parallelize things if needed (if all listeners trigger an RPC to the external system, for example).
>
> Can you expand on why we need more than one classifier to be able to output the same tag?
>
> > system ones come first and then the ones loaded from the plugin manager
>
> Since they're returned as a Set, the order is completely non-deterministic, no matter in which order they're loaded.
>
> > just communicating with external monitoring/alerting systems
>
> That makes the need for pushing things out of the main thread even stronger. This almost sounds like we need to return a CompletableFuture for the per-throwable classification because an external system might take a significant time to respond. We need to unblock the main thread for other RPCs.
>
> Also, in the proposal, this happens in the failure handler. If that's the case, this might block the job from being restarted (if the restart strategy allows for another restart), which would be great to avoid because it can introduce extra downtime.
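
The CompletableFuture idea in the message above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the FLIP's design: the class `AsyncClassificationSketch`, the method `classifyAsync`, the timeout parameter, and the "fall back to no labels" policy are all hypothetical. The sketch only shows how a slow or failing classifier can be kept off the calling thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper, not a Flink class.
class AsyncClassificationSketch {

    /**
     * Runs a possibly slow classifier on the given I/O executor and returns
     * immediately. A timeout or a classifier error yields an empty label map
     * (an assumed policy) instead of propagating the error to the caller.
     */
    static CompletableFuture<Map<String, String>> classifyAsync(
            Function<Throwable, Map<String, String>> classifier,
            Throwable cause,
            ExecutorService ioExecutor,
            long timeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> classifier.apply(cause), ioExecutor)
                .completeOnTimeout(Map.of(), timeoutMillis, TimeUnit.MILLISECONDS)
                .exceptionally(error -> Map.of());
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(2);
        try {
            Function<Throwable, Map<String, String>> healthy =
                    t -> Map.of("type", "SYSTEM");
            Function<Throwable, Map<String, String>> broken =
                    t -> { throw new IllegalStateException("monitoring system down"); };
            RuntimeException failure = new RuntimeException("task failed");

            // The caller decides when (or whether) to wait for the labels.
            System.out.println(classifyAsync(healthy, failure, io, 1000).join());
            System.out.println(classifyAsync(broken, failure, io, 1000).join());
        } finally {
            io.shutdown();
        }
    }
}
```

Whether an empty result is the right fallback is exactly the open question in the thread; the sketch only shows that the choice can be made without blocking the caller.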
>
> This raises another question: what should happen if the classification fails? Crashing the job (which is what's currently proposed) seems very dangerous if this might depend on an external system.
>
> > That's a valid point, passing the JobGraph containing all the above information is also something to consider
>
> We should avoid passing the JobGraph around because it's mutable (which we must fix in the long term), and letting users change it might have consequences.
>
> Best,
> D.
>
> On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <pga...@apache.org> wrote:
>
> Hey David, Shammon,
>
> Thanks for the valuable comments!
> I am glad you find this proposal useful. Some thoughts:
>
> @Shammon
>
> > 1. How about adding more job information in FailureListenerContext? For example, job vertex, subtask, taskmanager location.
> > And then users can do more statistics according to different dimensions.
>
> That's a valid point; passing the JobGraph containing all the above information is also something to consider. I was mostly trying to be conservative, i.e., passing only the information we need and extending as we see fit.
>
> > 2. Users may want to save results in the listener, so that they can get the historical results even after a JobManager failover. Can we provide a unified implementation for data storage requirements?
>
> The idea is to store only the output of the Listeners (tags) and treat them as stateless. Tags are stored along with HistoryEntries, and will be available through the HistoryServer even after a JM dies.
>
> @David
>
> > 1) Should we also consider adding labels? The combination of tags and labels seems to be what most systems offer; sometimes, they offer labels only (key=value pairs) because tags can be implemented using those, but not the other way around.
>
> Indeed, changing tags to k:v labels could be more expressive, I like it! Let's see what others think.
>
> > 2) Since we can not predict how heavy user-defined models ("listeners") are going to be, it would be great to keep the interfaces/data structures immutable so we can push things over to the I/O threads. Also, it sounds off to call the main interface a Listener since it's supposed to enhance the original throwable with additional metadata.
>
> The idea was for the name to be generic, as there could be Listener implementations that just communicate with external monitoring/alerting systems and produce no metadata output, but let's rethink that. For immutability, see below:
>
> > 3) You're proposing to support a set of listeners. Since you're passing the mutable context around, which includes tags set by the previous listener, do you expect users to make any assumptions about the order in which listeners are executed?
>
> In the existing proposal we are not making any assumptions about the order of listeners (system ones come first and then the ones loaded from the plugin manager); however, listeners can use previous state (tags/labels) to make decisions: e.g., a listener won't assign the *UNKNOWN* failureType when we have already seen *USER*, or, the other way around, when we have seen *UNKNOWN* it is removed in favor of *USER*.
>
> Cheers,
> Panagiotis
>
> On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org> wrote:
>
> Hi Panagiotis,
>
> This is an excellent proposal and something everyone trying to provide "Flink as a service" needs to solve at some point.
> I have a couple of questions:
>
> If I understand the proposal correctly, this is just about adding tags to the Throwable by running a tuple of (Throwable, FailureContext) through a user-defined model.
>
> 1) Should we also consider adding labels? The combination of tags and labels seems to be what most systems offer; sometimes, they offer labels only (key=value pairs) because tags can be implemented using those, but not the other way around.
>
> 2) Since we can not predict how heavy user-defined models ("listeners") are going to be, it would be great to keep the interfaces/data structures immutable so we can push things over to the I/O threads.
> Also, it sounds off to call the main interface a Listener since it's supposed to enhance the original throwable with additional metadata.
>
> I'd propose something along the lines of (we should have better names, this is just to outline the idea):
>
> interface FailureEnricher {
>
>   ThrowableWithTagsAndLabels enrichFailure(Throwable cause, ImmutableContextualMetadataAboutTheThrowable context);
> }
>
> The names should change; this is just to outline the idea.
>
> 3) You're proposing to support a set of listeners. Since you're passing the mutable context around, which includes tags set by the previous listener, do you expect users to make any assumptions about the order in which listeners are executed?
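
The FailureEnricher outline in the message above could look roughly like this in plain Java. This is only a sketch under stated assumptions: `FailureContext`, `FailureTypeEnricher`, the `failureType` label key, and the use of a `Map<String, String>` for labels are hypothetical stand-ins for the placeholder names in the outline, not the FLIP's final API. The point is that an enricher reads an immutable context and returns its labels instead of mutating shared state.

```java
import java.util.Map;

/** Hypothetical immutable context: final fields, no setters. */
final class FailureContext {
    private final String jobName;
    private final boolean globalFailure;

    FailureContext(String jobName, boolean globalFailure) {
        this.jobName = jobName;
        this.globalFailure = globalFailure;
    }

    String jobName() { return jobName; }
    boolean isGlobalFailure() { return globalFailure; }
}

/** Assumed contract: a function from (cause, context) to key=value labels. */
interface FailureEnricher {
    Map<String, String> enrichFailure(Throwable cause, FailureContext context);
}

/** Example enricher: argument errors are labeled USER, everything else UNKNOWN. */
final class FailureTypeEnricher implements FailureEnricher {
    @Override
    public Map<String, String> enrichFailure(Throwable cause, FailureContext context) {
        String type = (cause instanceof IllegalArgumentException) ? "USER" : "UNKNOWN";
        return Map.of("failureType", type); // Map.of returns an unmodifiable map
    }
}

class FailureEnricherDemo {
    public static void main(String[] args) {
        FailureEnricher enricher = new FailureTypeEnricher();
        FailureContext context = new FailureContext("demo-job", false);
        System.out.println(
                enricher.enrichFailure(new IllegalArgumentException("bad input"), context));
        System.out.println(
                enricher.enrichFailure(new RuntimeException("boom"), context));
    }
}
```

Because each enricher depends only on its inputs, several of them can be invoked in any order, or in parallel on I/O threads, without an ordering contract.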
>
> *@Shammon*
>
> > Users may want to save results in the listener, so that they can get the historical results even after a JobManager failover. Can we provide a unified implementation for data storage requirements?
>
> I think we should explicitly state that all "listeners" are treated as stateless. I don't see any strong reason for snapshotting them.
>
> Best,
> D.
>
> On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:
>
> Hi Panagiotis,
>
> Thank you for starting this discussion. I think this FLIP is valuable and can help users better analyze the causes of job failover!
>
> I have two comments as follows:
>
> 1. How about adding more job information in FailureListenerContext? For example, job vertex, subtask, taskmanager location.
> And then users can do more statistics according to different dimensions.
>
> 2. Users may want to save results in the listener, so that they can get the historical results even after a JobManager failover. Can we provide a unified implementation for data storage requirements?
>
> Best,
> Shammon FY
>
> On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org> wrote:
>
> Hi everyone,
>
> This FLIP [1] proposes a pluggable interface for failure handling, allowing users to implement custom failure logic using the plugin framework.
> Motivated by existing proposals [2] and tickets [3], this enables use-cases like: assigning particular types to failures (e.g., User or System), emitting custom metrics per type (e.g., application or platform), even exposing errors to downstream consumers (e.g., notification systems).
>
> Thanks to Piotr and Anton for the initial reviews and discussions!
>
> For anyone interested, the starting point would be the FLIP [1] that I created, describing the motivation and the proposed changes (part of the core, runtime and web).
>
> The intuition behind this FLIP is being able to execute custom logic on failures by exposing a FailureListener interface.
> User implementations can simply be loaded into the system as Jar files. FailureListeners may also decide to assign failure tags to errors (expressed as strings), which will then be exposed as metadata by the UI/REST interfaces.
>
> Feedback is always appreciated! Looking forward to your thoughts!
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
> [2] https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
> [3] https://issues.apache.org/jira/browse/FLINK-20833
>
> Cheers,
> Panagiotis