> One additional remark on introducing it as an async operation: We would need a new configuration parameter to define the timeout for such a listener call, wouldn't we?
This could be left up to the implementor to handle.

> What about adding an extra method getNamespace() to the Listener interface which returns an Optional<String>.

I'd avoid mixing an additional concept into this. We can simply have a new method that returns the set of keys the listener can output. We can validate this at JM startup time and fail fast if there is an overlap (since it's a configuration error). If a listener outputs a key that it is not allowed to, I wouldn't be afraid to call into the fatal error handler, since that's an invalid implementation.

Best,
D.

On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:

Sounds good. Two points I want to add:

> - Listener execution should be independent; however, we need a way to enforce that a Label key/key-prefix is only assigned to a single Listener, thinking of a validation step at both the Listener init and runtime stages

What about adding an extra method getNamespace() to the Listener interface which returns an Optional<String>? That way, the implementation (or the user) can decide, depending on the use case, whether it's necessary to have separate namespaces for the key/value pairs or not. On the Flink side, we would just merge the different maps considering their namespaces.

A flaw of this approach: if a user decides to use the same namespace for multiple listeners, how is an error in one of the listeners represented in the outcome? We would have to overwrite either the successful listener's result or the failed ones. I wanted to share it anyway.

One additional remark on introducing it as an async operation: We would need a new configuration parameter to define the timeout for such a listener call, wouldn't we?

Matthias

On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis <pga...@apache.org> wrote:

Hi everyone,

Thanks for the valuable comments! Excited to see this is an area of interest for the community!
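Here is a minimal Java sketch of the key-ownership check David describes at the top of this thread. It assumes a hypothetical `FailureEnricher` interface whose `getOutputKeys()` method declares every label key the plugin may emit. Neither name is a confirmed Flink API. Overlapping keys are rejected once, at startup. An undeclared key at runtime is treated as a broken implementation; where David suggests calling the fatal error handler, the sketch simply throws.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OutputKeyValidation {

    /** Hypothetical plugin contract: each enricher declares every label key it may emit. */
    interface FailureEnricher {
        String name();

        Set<String> getOutputKeys();
    }

    /** Builds a trivial enricher for the demo; only the declared keys matter here. */
    static FailureEnricher enricher(String name, Set<String> keys) {
        return new FailureEnricher() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public Set<String> getOutputKeys() {
                return keys;
            }
        };
    }

    /** Startup check: a key declared by two enrichers is a configuration error, so fail fast. */
    static void validateNoOverlap(List<FailureEnricher> enrichers) {
        Map<String, String> ownerByKey = new HashMap<>();
        for (FailureEnricher enricher : enrichers) {
            for (String key : enricher.getOutputKeys()) {
                String previousOwner = ownerByKey.putIfAbsent(key, enricher.name());
                if (previousOwner != null) {
                    throw new IllegalStateException("Key '" + key + "' is declared by both "
                            + previousOwner + " and " + enricher.name());
                }
            }
        }
    }

    /** Runtime check: emitting an undeclared key means the implementation itself is broken. */
    static void validateOutput(FailureEnricher enricher, Map<String, String> labels) {
        for (String key : labels.keySet()) {
            if (!enricher.getOutputKeys().contains(key)) {
                throw new IllegalStateException(
                        enricher.name() + " emitted undeclared key '" + key + "'");
            }
        }
    }

    public static void main(String[] args) {
        FailureEnricher classifier = enricher("type-classifier", Set.of("failure.type"));
        FailureEnricher alerting = enricher("alerting", Set.of("alert.id"));
        validateNoOverlap(List.of(classifier, alerting)); // disjoint keys: passes
        validateOutput(classifier, Map.of("failure.type", "USER")); // declared key: passes
        System.out.println("validation passed");
    }
}
```

Declaring keys up front turns the namespace question into a plain configuration error that is caught before any job runs. It also avoids the "who wins on a shared namespace" problem Matthias raises.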
Summarizing some of the main points raised, along with my thoughts:

- Labels (key/value pairs) are more expressive than Tags (strings), so using the former is a good idea. I am also debating whether we want to return multiple KV pairs per Listener (one could argue that we could split the logic into multiple Listeners to support that).
- An immutable context, along with data returned through the interface method implementations, is a better approach than a mutable Collection.
- Listener execution should be independent; however, we need a way to enforce that a Label key/key-prefix is only assigned to a single Listener, thinking of a validation step at both the Listener init and runtime stages.
- We want to perform Listener operations asynchronously, since sync operations could block the main thread. Exposing an ioExecutor pool through the context could be an elegant solution here.
- Make sure Listener errors do not fail jobs: log them and keep the job alive.
- We need better naming / public interface separation/description.
- Even though custom restart strategies share some properties with Listeners, they would probably need a separate interface with a different return type anyway (a restart strategy, not labels). In general, they are different and complex enough to justify their own FLIP (which can also be a follow-up).

What do people think? I am planning to modify the FLIP to reflect these changes if they make sense to everyone.

Cheers,
Panagiotis

On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh <hlteo...@gmail.com> wrote:

Hi all,

Thank you Panagiotis for proposing this. From the size of the thread, this is a much-needed feature in Flink! Some thoughts, to extend those already adeptly summarised by Piotr, Matthias and Jing.

- scope of FLIP: +1 to scoping this FLIP to observability around a restart.
  That would include adding metadata + exposing metadata to external systems. IMO, introducing a new restart strategy solves different problems, has a much larger scope, and should be covered in a separate FLIP.

- failure handling: At the moment, we propose transitioning the Flink job to a terminal FAILED state when the JobListener fails, even though the job could have transitioned to RESTARTING->RUNNING. If we are keeping in line with the scope of adding metadata/observability around job restarts, we should not be affecting the running of the Flink job itself. Could I propose we instead log WARN/ERROR?

- immutable context: +1 to keeping the contract clear via return types.

- async operation: +1 to adding an ioExecutor to the context. However, given we don't want to block the actual job restart on adding metadata / calling external services, should we consider returning and letting futures complete independently?

- independent vs ordered execution: Should we consider making the order of execution deterministic (use a List instead of a Set)?

Once again, thank you for working on this.

Regards,
Hong

On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID> wrote:

Hi,

Thanks Panagiotis for this FLIP and thanks for all the valuable discussions. I'd like to share my two cents:

- FailureListenerContext#addTag and FailureListenerContext#getTags. It seems that we have to call getTags() and then remove entries if we want to delete any tags (according to the javadoc in the FLIP). This is inconsistent to me too. Either offer addTag() and deleteTag() and let getTags() return an immutable collection, or offer only getTags() returning a mutable collection.

- label vs tag. Label is a great idea +1.
  AFAIC, a tag could be a special case of a label, i.e. key="tag". It is convenient to offer the xxxTag() methods if the user only needs one label. I would love to have both of them. Another thought is that a tag implicitly carries the meaning of "immutable".

- +1 for a separate FLIP for customized restart strategies. Care should be taken to make sure it works well with Flink's built-in restartStrategy, in order to have a single source of truth.

- execution order. The default independent execution should be fine. According to the FailureListener interface definition in the FLIP, users should be able to easily build a listener chain[1] to get sequential execution, e.g. public FailureListener(FailureListener nextListener). Another option is to modify the interface, or provide another interface alongside the current one, that extends the method to support a ListenerChain, i.e. void onFailure(Throwable cause, FailureListenerContext context, ListenerChain listenerChain). Users can also mix them.

- naming. Afaiu, the pluggable extension is not limited to failure enrichment. Conceptually it can do anything for the given failure, e.g. start counting a metric as the FLIP describes, call an external system, send a notification to a Slack channel, etc., you name it. It sounds to me more like a FailureActionListener: it can trigger actions based on the failure. Failure enrichment is one type of action.

Best regards,
Jing

[1] https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern

On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:

Thanks for the proposal, Panagiotis. A lot of good points have already been shared.
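Jing's listener-chain suggestion above uses the chain-of-responsibility pattern: a listener is constructed with the next listener. It could look roughly like the following Java sketch. The `ChainedFailureListener` name, the `failure.type` key and the rule mapping `IllegalArgumentException` to `USER` are all illustrative assumptions. Following Jing's proposal, a tag is modeled as a label with the fixed key `tag`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ListenerChainSketch {

    /** Hypothetical chained listener: each link enriches the labels, then hands over to the next. */
    abstract static class ChainedFailureListener {
        private final ChainedFailureListener next; // null marks the end of the chain

        ChainedFailureListener(ChainedFailureListener next) {
            this.next = next;
        }

        /** Adds labels based on the cause and on whatever earlier links already produced. */
        protected abstract void enrich(Throwable cause, Map<String, String> labels);

        /** Entry point: runs every link in construction order and returns a read-only result. */
        final Map<String, String> onFailure(Throwable cause) {
            Map<String, String> labels = new HashMap<>();
            for (ChainedFailureListener link = this; link != null; link = link.next) {
                link.enrich(cause, labels);
            }
            return Collections.unmodifiableMap(labels);
        }
    }

    /** A tag modeled as a label with the fixed key "tag". */
    static void addTag(Map<String, String> labels, String tag) {
        labels.put("tag", tag);
    }

    static final class TypeClassifier extends ChainedFailureListener {
        TypeClassifier(ChainedFailureListener next) {
            super(next);
        }

        @Override
        protected void enrich(Throwable cause, Map<String, String> labels) {
            // Illustrative rule only: treat argument errors as user errors.
            labels.put("failure.type", cause instanceof IllegalArgumentException ? "USER" : "UNKNOWN");
        }
    }

    static final class OwnerNotifier extends ChainedFailureListener {
        OwnerNotifier(ChainedFailureListener next) {
            super(next);
        }

        @Override
        protected void enrich(Throwable cause, Map<String, String> labels) {
            // Safe to read the earlier link's output because the chain order is fixed.
            if ("USER".equals(labels.get("failure.type"))) {
                addTag(labels, "notify-owner");
            }
        }
    }

    public static void main(String[] args) {
        ChainedFailureListener chain = new TypeClassifier(new OwnerNotifier(null));
        Map<String, String> labels = chain.onFailure(new IllegalArgumentException("bad config"));
        System.out.println(labels.get("failure.type") + " " + labels.get("tag")); // USER notify-owner
    }
}
```

The chain buys a deterministic order, which is what lets the second link read the first link's output. That same coupling is what Matthias and David argue against when they prefer independent listeners.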
I just want to add my view on some of the items:

- independent execution vs ordered execution: I prefer the listeners being processed independently of each other because it adds less complexity code-wise. The use case Piotr described (where you want to reuse some other classifier) is the only one I can think of where we actually need classifiers that depend on each other. Supporting such a use case right from the start feels a bit over-engineered and could be covered in a follow-up FLIP if we really reach the point where users request such a feature.

- key/value pairs instead of plain labels: I think that's a good idea. key/value pairs are more expressive. +1

- extending the FLIP to cover restart strategies: I understand Gen's concern about introducing too many different types of plugins. But I would still favor not extending the FLIP in this regard. A pluggable restart strategy sounds reasonable, but an error classifier and a restart strategy are still different enough to justify separate plugins, IMHO. Therefore, I think covering the restart strategy in a separate FLIP is the better option for the sake of simplicity.

- immutable context: Passing in an immutable context and returning data through the interface method's return value sounds like a better approach to harden the contract of the interface. +1 for that proposal

- async operation: I think David is right. An async interface makes the listener implementations more robust when it comes to heavy IO operations. The ioExecutor can be passed through the context object.
  +1

Matthias

On Tue, Mar 21, 2023 at 2:09 PM David Morávek <david.mora...@gmail.com> wrote:

*@Piotr*

> I was thinking about actually defining the order of the classifiers/handlers and not allowing them to be asynchronous. Asynchronicity would create some problems: when to actually return the error to the user? After all async responses will get back? Before, but without classified exception? It would also add implementation complexity and I think we can always expand the API with async version in the future if needed.

As long as the classifiers need to talk to an external system, we by definition need to allow them to be asynchronous to unblock the main thread for handling other RPCs. Exposing the ioExecutor via the context, as proposed above, would be great.

> After all async responses will get back

This would be the same if we triggered them synchronously one by one, with the caveat that synchronous execution might take significantly longer and introduce unnecessary downtime to a job.

D.

On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com> wrote:

Hi Piotr,

It's fine with me to have a separate FLIP to extend this `FailureListener` to support custom restart strategies.

What I was a bit concerned about is that if we just treat the `FailureListener` as an error classifier that is not crucial to the Flink framework process, we may design it to run asynchronously and not trigger Flink failures.
This may be a blocker if we later want to enable it to support custom restart strategies.

Thanks,
Zhu

On Tue, Mar 21, 2023 at 7:53 PM Dian Fu <dian0511...@gmail.com> wrote:

Hi Panagiotis,

Thanks for the proposal. This is a very valuable feature and will be a good add-on for Flink.

I also think it would be great if this FLIP could consider how to make it possible for users to customize the failure handling. It's highly related to the problem we want to address in this FLIP and could avoid refactoring the interfaces proposed in this FLIP too quickly.

Currently, it treats all kinds of exceptions the same. However, some kinds of exceptions are actually not recoverable at all. Letting users customize the failure handling logic to fail fast for certain known unrecoverable exceptions would ultimately make these kinds of jobs get noticed and recovered more quickly.

Regards,
Dian

On Tue, Mar 21, 2023 at 4:36 PM Gen Luo <luogen...@gmail.com> wrote:

Hi Panagiotis,

Thanks for the proposal.

It's useful to enrich the information so that users can be clearer about why the job is failing, especially platform developers who need to provide the information to their end users. And for this FLIP, I'd prefer the naming `FailureEnricher` proposed by David, as the plugin doesn't really handle the failure.
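Dian's point above about failing fast on known unrecoverable exceptions can be sketched as a classifier that walks the cause chain. The list of unrecoverable types is an assumption supplied by whoever deploys the plugin. `SourceTopicDeletedException` is a hypothetical stand-in for a case such as a deleted source topic. Nothing here is an existing Flink class.

```java
import java.util.List;
import java.util.Optional;

public class UnrecoverableClassifier {

    /** Hypothetical exception standing in for "the source topic was deleted". */
    static final class SourceTopicDeletedException extends RuntimeException {
        SourceTopicDeletedException(String topic) {
            super("Topic " + topic + " no longer exists");
        }
    }

    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains

    private final List<Class<? extends Throwable>> unrecoverableTypes;

    UnrecoverableClassifier(List<Class<? extends Throwable>> unrecoverableTypes) {
        this.unrecoverableTypes = List.copyOf(unrecoverableTypes);
    }

    /** Walks the cause chain and returns the first cause matching a configured type, if any. */
    Optional<Throwable> findUnrecoverableCause(Throwable failure) {
        Throwable current = failure;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            for (Class<? extends Throwable> type : unrecoverableTypes) {
                if (type.isInstance(current)) {
                    return Optional.of(current);
                }
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** A restart is only worth attempting when no known unrecoverable cause is present. */
    boolean isRecoverable(Throwable failure) {
        return !findUnrecoverableCause(failure).isPresent();
    }

    public static void main(String[] args) {
        UnrecoverableClassifier classifier = new UnrecoverableClassifier(
                List.<Class<? extends Throwable>>of(SourceTopicDeletedException.class));
        Throwable wrapped = new RuntimeException("Source failed",
                new SourceTopicDeletedException("orders"));
        System.out.println(classifier.isRecoverable(wrapped)); // false
        System.out.println(classifier.isRecoverable(new java.io.IOException("timeout"))); // true
    }
}
```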
However, as Zhu and Lijie said, I also recently joined a discussion about customized failure handling, e.g. counting the failure rate of pipeline regions separately, failing the job when a specific error occurs, and so on. I suppose a custom restart strategy, or what I'd call a custom failure "handler", is indeed necessary. It can also enrich the information, as the currently proposed handler does.

To avoid adding too many plugin interfaces, which may confuse users and make the ExecutionFailureHandler more complex, I think it'd be better to consider these requirements at the same time.

IMO, we can add a handler interface, then make the current restart strategy and the enricher both types of handler. The handlers execute in sequence, and the failure is considered unrecoverable if any of the handlers decides so. This way, users can also implement a handler that uses the enriched information provided by the previous handlers, e.g. fail the job and send a notification if too many failures are caused by the end users.

Best,
Gen

On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com> wrote:

Hi Panagiotis,

Thanks for your proposal. It is valuable to analyze the reason for a failure with a user plug-in.

Making the context immutable could make the contract stronger. Letting the listener return an enriching result may be a better way.
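A minimal sketch of the contract Weihua describes: an immutable context goes in and an enriching result comes out. `FailureContext` and `FailureEnricher` are hypothetical names. The two context fields are placeholders for whatever job information the FLIP ends up exposing.

```java
import java.util.Map;

public class ImmutableContextSketch {

    /** Hypothetical read-only context: final fields and getters, no setters or shared collections. */
    static final class FailureContext {
        private final String jobName;
        private final String taskManagerLocation;

        FailureContext(String jobName, String taskManagerLocation) {
            this.jobName = jobName;
            this.taskManagerLocation = taskManagerLocation;
        }

        String jobName() {
            return jobName;
        }

        String taskManagerLocation() {
            return taskManagerLocation;
        }
    }

    /** Hypothetical contract: the plugin hands its labels back instead of mutating the context. */
    interface FailureEnricher {
        Map<String, String> enrich(Throwable cause, FailureContext context);
    }

    public static void main(String[] args) {
        FailureEnricher locationEnricher = (cause, context) -> Map.of(
                "failure.job", context.jobName(),
                "failure.host", context.taskManagerLocation(),
                "failure.exception", cause.getClass().getSimpleName());
        Map<String, String> labels = locationEnricher.enrich(
                new IllegalStateException("checkpoint declined"),
                new FailureContext("orders-job", "tm-3"));
        // Map.of returns an unmodifiable map, so the caller cannot alter the result in place either.
        System.out.println(labels.get("failure.exception")); // IllegalStateException
    }
}
```

Because nothing in the contract is mutable, the call can be moved to another thread without extra synchronization. That is the property David's immutability argument relies on.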
IIUC, listeners could do two things: enrich the FailureHandlingResult with more information (tags/labels), and push data out of Flink (metrics or something). IMO, we could split these two types into Listener and Advisor (maybe other names). The Listener just pushes the data out and returns nothing to Flink, so we can run these asynchronously and don't have to wait for the Listener's result. The Advisor returns rich information to the FailureHandlingResult, and it should have lighter logic.

Supporting a custom restart strategy is also valuable. In this design, we use the RestartStrategy to construct a FailureHandlingResult, and then pass it to the Listener. My question is: should we change the restart strategy interface to support custom restart strategies, or keep the current restart strategy and let the later Listener enrich the FailureHandlingResult with restartability information? The latter may cause some confusion when we use a custom restart strategy: the default Flink restart strategy also runs but does not take effect.

Best,
Weihua

On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Panagiotis,

Thanks for driving this.
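Weihua's proposed split above separates a fire-and-forget Listener from a lightweight Advisor whose labels feed the handling result. It might look like this sketch. Both interfaces and `handleFailure` are hypothetical. A plain `ExecutorService` stands in for the `ioExecutor` discussed in the thread.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ListenerAdvisorSplit {

    /** Hypothetical: pushes data out of Flink (metrics, alerts) and returns nothing. */
    interface FailureListener {
        void onFailure(Throwable cause);
    }

    /** Hypothetical: cheap, synchronous logic whose labels go into the handling result. */
    interface FailureAdvisor {
        Map<String, String> advise(Throwable cause);
    }

    static Map<String, String> handleFailure(
            Throwable cause,
            List<FailureAdvisor> advisors,
            List<FailureListener> listeners,
            ExecutorService ioExecutor) {
        // Listeners are fire-and-forget: submitted to the IO pool and never awaited.
        for (FailureListener listener : listeners) {
            ioExecutor.execute(() -> {
                try {
                    listener.onFailure(cause);
                } catch (RuntimeException e) {
                    // A broken listener must not affect failure handling; just report it.
                    System.err.println("Failure listener threw, ignoring: " + e);
                }
            });
        }
        // Advisors run inline on the calling thread, so they have to stay lightweight.
        Map<String, String> labels = new HashMap<>();
        for (FailureAdvisor advisor : advisors) {
            labels.putAll(advisor.advise(cause));
        }
        return labels;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        Map<String, String> labels = handleFailure(
                new IllegalStateException("boom"),
                List.<FailureAdvisor>of(cause -> Map.of("failure.type", "USER")),
                List.<FailureListener>of(cause -> System.out.println("metric pushed for: " + cause.getMessage())),
                ioExecutor);
        System.out.println(labels); // {failure.type=USER}
        ioExecutor.shutdown();
        ioExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```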
+1 for supporting custom restart strategies; we did receive such requests on the user mailing list [1][2].

Besides, in the current design, the plugin only does some statistical and classification work and does not affect the *FailureHandlingResult*. It's just listening, no handling, so it doesn't quite match the title.

[1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
[2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx

Best,
Lijie

On Mon, Mar 20, 2023 at 9:39 PM Zhu Zhu <reed...@gmail.com> wrote:

Hi Panagiotis,

Thanks for creating this proposal! It's good to enable Flink to handle different errors in different ways, in a pluggable way.

There are requests for flexible restart strategies from time to time, for different restart backoff strategies, or to suppress restarting on certain errors. Therefore, I think it's better if the proposed failure handling plugin can also support custom restart strategies.

Maybe we can call it FailureHandlingAdvisor, which provides more information (labels) and gives advice (restart backoff time, whether to restart)? I do not have a strong opinion though; any explanatory name would be good.
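A sketch of the advisor Zhu describes here, combined with Gen's rule that a failure is unrecoverable as soon as any handler says so. `Advice` carries the labels, the `canRestart` flag and the restart backoff mentioned in the thread. The class and interface names are hypothetical. Resolving conflicting backoffs by taking the longest one is an assumption, not something proposed in the thread.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AdvisorPipeline {

    /** Hypothetical immutable advice: labels plus a restart decision and backoff. */
    static final class Advice {
        final Map<String, String> labels;
        final boolean canRestart;
        final Duration restartBackoff;

        Advice(Map<String, String> labels, boolean canRestart, Duration restartBackoff) {
            this.labels = Map.copyOf(labels);
            this.canRestart = canRestart;
            this.restartBackoff = restartBackoff;
        }
    }

    /** Hypothetical plugin: reads the advice accumulated so far and returns its own. */
    interface FailureHandlingAdvisor {
        Advice advise(Throwable cause, Advice soFar);
    }

    /** Runs advisors in order, applying each result before the next advisor is invoked. */
    static Advice runInOrder(Throwable cause, Advice initial, List<FailureHandlingAdvisor> advisors) {
        Advice current = initial;
        for (FailureHandlingAdvisor advisor : advisors) {
            Advice next = advisor.advise(cause, current);
            Map<String, String> mergedLabels = new HashMap<>(current.labels);
            mergedLabels.putAll(next.labels);
            current = new Advice(
                    mergedLabels,
                    current.canRestart && next.canRestart, // a single veto makes it unrecoverable
                    longer(current.restartBackoff, next.restartBackoff)); // assumption: longest wins
        }
        return current;
    }

    private static Duration longer(Duration a, Duration b) {
        return a.compareTo(b) >= 0 ? a : b;
    }

    public static void main(String[] args) {
        // Starting point, e.g. what the configured restart strategy decided on its own.
        Advice initial = new Advice(Map.of(), true, Duration.ofSeconds(1));
        FailureHandlingAdvisor classifier = (cause, soFar) ->
                new Advice(Map.of("failure.type", "USER"), true, Duration.ofSeconds(10));
        FailureHandlingAdvisor vetoUserErrors = (cause, soFar) ->
                new Advice(Map.of(), !"USER".equals(soFar.labels.get("failure.type")), Duration.ZERO);
        Advice result = runInOrder(new IllegalArgumentException("bad input"), initial,
                List.of(classifier, vetoUserErrors));
        System.out.println(result.canRestart + " " + result.restartBackoff.getSeconds()); // false 10
    }
}
```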
To avoid unexpected mutation, how about making the context immutable and letting the plugin return an immutable result? i.e. remove the setters from the context, and let the plugin method return a result which contains `labels`, `canRestart` and `restartBackoffTime`. Flink should apply the result to the context before invoking the next plugin, so that the next plugin will see the updated context.

The plugin should avoid taking too much time to return the result, because it will block the RPC and result in instability. However, it can still perform heavy actions in a different thread. The context can provide an `ioExecutor` to the plugins for reuse.

Thanks,
Zhu

On Mon, Mar 20, 2023 at 8:21 PM Shammon FY <zjur...@gmail.com> wrote:

Hi Panagiotis,

Thank you for your answer. I agree that `FailureListener` could be stateless. I have some thoughts, as follows:

1. I see that listeners and tag collections are associated. When the JobManager fails and restarts, how can the new listener be associated with the tag collection from before the failover? Does it depend on the listener loading order?

2. The tag collection may become too large and cause a JobManager OOM. Do we need to provide a management class that supports some obsolescence strategies instead of a direct Collection?

3. Is it possible to provide a more complex data structure than a simple string collection for tags in listeners, such as key-value?

Best,
Shammon FY

On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote:

Hi, Panagiotis

Thank you for kicking off this discussion. Overall, the proposed feature of this FLIP makes sense to me. We have also discussed similar requirements with our users and developers, and I believe it will help many users.

In terms of FLIP content, I have some thoughts:

(1) For the FailureListenerContext interface, the methods FailureListenerContext#addTag and FailureListenerContext#getTags look very inconsistent because they imply specific implementation details, and not all FailureListeners need to handle them, so we shouldn't put them in the interface.
Minor: The comment "UDF loading" in the getUserClassLoader() method looks like a typo; IIUC, it should return the classloader of the current job.

(2) Regarding the implementation in ExecutionFailureHandler#handleFailure, some custom listeners may have heavy IO operations, such as reporting to their monitoring system. The current logic appears to process them in the JobMaster's main thread, and it is recommended not to do this kind of processing in the main thread.

(3) The results of the FailureListener's processing and the FailureHandlingResult returned by the ExecutionFailureHandler are not related. I think these two are closely related; the motivation of this FLIP is to make current failure handling more flexible. From this perspective, different listeners should have the opportunity to affect the job's failure handling flow. For example, if a Flink job is configured with a RestartStrategy with a huge number of retries, but the Kafka topic of the Source has been deleted, the job will fail over continuously.
In this case, the user should have their listener determine whether this failure is recoverable or unrecoverable, and then wrap the processing result into FailureHandlingResult.unrecoverable(xx) and pass it to the JobMaster. This approach would be more flexible.

(4) All FLIPs have an important section named Public Interfaces. The current FLIP mixes the interface section and the implementation section together. It would be better to refer to the FLIP template[1] and separate them; this will make the entire FLIP clearer.

In addition, a small suggestion regarding the FLIP process: the community generally creates a JIRA issue after the FLIP vote has passed, rather than during the FLIP preparation phase, because the FLIP may be rejected. Although this FLIP is very reasonable, it's better to follow the process.
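Shammon's second question above concerns a tag collection that grows until the JobManager runs out of memory. A bounded store is one way to address it. This sketch assumes a simple "drop the oldest failure first" policy built on `LinkedHashMap.removeEldestEntry`. `BoundedLabelHistory` is a hypothetical name. The sketch says nothing about how the FLIP actually persists labels (the thread mentions storing them with the history entries).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedLabelHistory {

    private final Map<Long, Map<String, String>> entries;
    private long nextId = 0;

    BoundedLabelHistory(int maxEntries) {
        // Insertion-ordered map that drops its oldest entry once the bound is exceeded.
        this.entries = new LinkedHashMap<Long, Map<String, String>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Map<String, String>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Records the labels of one failure and returns the id assigned to it. */
    long record(Map<String, String> labels) {
        long id = nextId++;
        entries.put(id, Map.copyOf(labels));
        return id;
    }

    int retainedCount() {
        return entries.size();
    }

    boolean contains(long id) {
        return entries.containsKey(id);
    }

    public static void main(String[] args) {
        BoundedLabelHistory history = new BoundedLabelHistory(2);
        long first = history.record(Map.of("failure.type", "USER"));
        history.record(Map.of("failure.type", "SYSTEM"));
        history.record(Map.of("failure.type", "USER")); // exceeds the bound, evicts the first entry
        System.out.println(history.retainedCount() + " " + history.contains(first)); // 2 false
    }
}
```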
Best,
Leonard

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:

> however listeners can use previous state (tags/labels) to make decisions

That sounds like a very fragile contract. We should either allow passing tags between listeners, in which case we need to define an ordering, or make all of them independent. I prefer the latter because it allows us to parallelize things if needed (if all listeners trigger an RPC to the external system, for example).

Can you expand on why we need more than one classifier to be able to output the same tag?

> system ones come first and then the ones loaded from the plugin manager

Since they're returned as a Set, the order is completely non-deterministic, no matter in which order they're loaded.
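David's argument for independent listeners is that they can run in parallel. The sketch below assumes a hypothetical `Classifier` interface and uses an `ExecutorService` to stand in for the IO pool. It starts every classifier at once and merges the maps only after all futures complete, without blocking the calling thread.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelEnrichment {

    /** Hypothetical independent classifier: sees only the throwable, never another classifier's output. */
    interface Classifier {
        Map<String, String> classify(Throwable cause);
    }

    /** Starts every classifier at once on the IO pool and merges the results when all are done. */
    static CompletableFuture<Map<String, String>> classifyInParallel(
            Throwable cause, List<Classifier> classifiers, ExecutorService ioExecutor) {
        List<CompletableFuture<Map<String, String>>> futures = classifiers.stream()
                .map(classifier -> CompletableFuture.supplyAsync(() -> classifier.classify(cause), ioExecutor))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // Assuming each key is owned by a single classifier, merge order does not matter.
                    Map<String, String> merged = new HashMap<>();
                    futures.forEach(future -> merged.putAll(future.join())); // already complete
                    return merged;
                });
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        List<Classifier> classifiers = List.<Classifier>of(
                cause -> Map.of("failure.type", "USER"),
                cause -> Map.of("failure.exception", cause.getClass().getSimpleName()));
        Map<String, String> labels =
                classifyInParallel(new IllegalStateException("boom"), classifiers, ioExecutor).join();
        System.out.println(labels.size()); // 2
        ioExecutor.shutdown();
    }
}
```

No classifier sees another's output, so the merged result does not depend on iteration order, which is why a `Set` of plugins is unproblematic here.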
> just communicating with external monitoring/alerting systems

That makes the need for pushing things out of the main thread even stronger. This almost sounds like we need to return a CompletableFuture for the per-throwable classification, because an external system might take a significant time to respond. We need to unblock the main thread for other RPCs.

Also, in the proposal, this happens in the failure handler. If that's the case, this might block the job from being restarted (if the restart strategy allows for another restart), which would be great to avoid because it can introduce extra downtime.

This raises another question: what should happen if the classification fails? Crashing the job (which is what's currently proposed) seems very dangerous if this might depend on an external system.

> That's a valid point, passing the JobGraph containing all the above information is also something to consider

We should avoid passing the JG around because it's mutable (which we must fix in the long term), and letting users change it might have consequences.
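The failure question raised above asks what happens if classification fails or an external system is slow. One answer is to handle it per enricher, so that neither outcome blocks or crashes the job. This sketch uses the JDK's `CompletableFuture.completeOnTimeout` and `exceptionally` (Java 9+). The `isolate` helper and the per-call timeout are assumptions. The timeout is also one possible answer to Matthias's question of whether a timeout setting is needed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class IsolatedEnrichment {

    /**
     * Wraps one enricher's result so that a failure or a slow external system yields no
     * labels instead of failing the job. The timeout is a plain parameter here (an
     * assumption), not a Flink configuration option.
     */
    static CompletableFuture<Map<String, String>> isolate(
            String enricherName, CompletableFuture<Map<String, String>> labels, long timeoutMillis) {
        return labels
                // completeOnTimeout completes the given future itself if it is still pending.
                .completeOnTimeout(Map.of(), timeoutMillis, TimeUnit.MILLISECONDS)
                .exceptionally(error -> {
                    System.err.println("Enricher " + enricherName
                            + " failed; continuing without its labels: " + error);
                    return Map.of();
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, String>> failing =
                CompletableFuture.failedFuture(new IllegalStateException("monitoring system down"));
        CompletableFuture<Map<String, String>> neverCompletes = new CompletableFuture<>();
        CompletableFuture<Map<String, String>> healthy =
                CompletableFuture.completedFuture(Map.of("failure.type", "USER"));

        System.out.println(isolate("alerting", failing, 100).join()); // {}
        System.out.println(isolate("slow-lookup", neverCompletes, 100).join()); // {}
        System.out.println(isolate("classifier", healthy, 100).join()); // {failure.type=USER}
    }
}
```

Here an enricher's failure only costs its own labels, which matches Hong's and Panagiotis's "log and keep the job alive" direction rather than transitioning the job to FAILED.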
Best,
D.

On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <pga...@apache.org> wrote:

Hey David, Shammon,

Thanks for the valuable comments! I am glad you find this proposal useful. Some thoughts:

@Shammon

> 1. How about adding more job information in FailureListenerContext? For example, job vertex, subtask, taskmanager location. And then user can do more statistics according to different dimensions.

That's a valid point; passing the JobGraph containing all the above information is also something to consider. I was mostly trying to be conservative, i.e., passing only the information we need and extending as we see fit.

> 2. Users may want to save results in listener, and then they can get the historical results even after JobManager failover. Can we provide a unified implementation for data storage requirements?

The idea is to store only the output of the Listeners (tags) and treat them as stateless.
Tags are stored along with HistoryEntries and will be available through the HistoryServer even after a JM dies.

@David

> 1) Should we also consider adding labels? The combination of tags and labels seems to be what most systems offer; sometimes, they offer labels only (key=value pairs) because tags can be implemented using those, but not the other way around.

Indeed, changing tags to k:v labels could be more expressive. I like it! Let's see what others think.

> 2) Since we can not predict how heavy user-defined models ("listeners") are going to be, it would be great to keep the interfaces/data structures immutable so we can push things over to the I/O threads. Also, it sounds off to call the main interface a Listener since it's supposed to enhance the original throwable with additional metadata.
The idea was for the name to be generic, as there could be Listener
implementations that just communicate with external monitoring/alerting
systems and output no metadata, but let's rethink that. For immutability,
see below:

> 3) You're proposing to support a set of listeners. Since you're passing the
> mutable context around, which includes tags set by the previous listener,
> do you expect users to make any assumptions about the order in which
> listeners are executed?

In the existing proposal we are not making any assumptions about the order
of listeners (system ones come first, then the ones loaded from the plugin
manager). However, listeners can use previous state (tags/labels) to make
decisions. For example, a listener won't assign the *UNKNOWN* failureType
when it has already seen *USER*. The reverse also works: when it has seen
*UNKNOWN*, it removes it in favor of *USER*.

Cheers,
Panagiotis

On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org> wrote:
Hi Panagiotis,

This is an excellent proposal and something everyone trying to provide
"Flink as a service" needs to solve at some point. I have a couple of
questions:

If I understand the proposal correctly, this is just about adding tags to
the Throwable by running a tuple of (Throwable, FailureContext) through a
user-defined model.

1) Should we also consider adding labels? The combination of tags and
labels seems to be what most systems offer; sometimes, they offer labels
only (key=value pairs) because tags can be implemented using those, but not
the other way around.

2) Since we cannot predict how heavy user-defined models ("listeners") are
going to be, it would be great to keep the interfaces/data structures
immutable so we can push things over to the I/O threads. Also, it sounds
off to call the main interface a Listener since it's supposed to enhance
the original throwable with additional metadata.
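The point about immutability and I/O threads can be sketched as follows. The sketch assumes each enricher is a pure function of an immutable context that returns its own labels. `AsyncEnricher`, `AsyncEnrichment.enrichAll`, and the `timeoutMillis` parameter are hypothetical. The sketch uses only standard `java.util.concurrent` APIs (Java 16+) and is not the mechanism adopted by the FLIP.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/** Hypothetical enricher: a pure function of immutable inputs, so it may run on any thread. */
interface AsyncEnricher {
    Map<String, String> enrich(Throwable cause, Map<String, String> context);
}

final class AsyncEnrichment {
    private AsyncEnrichment() {}

    /**
     * Runs each enricher independently on the given I/O executor and merges their labels.
     * An enricher that throws, or does not finish within timeoutMillis (an assumed
     * parameter), contributes no labels. On key collisions the later enricher in the
     * list wins; a stricter design could reject overlapping keys instead.
     */
    static CompletableFuture<Map<String, String>> enrichAll(
            Throwable cause,
            Map<String, String> context,
            List<AsyncEnricher> enrichers,
            Executor ioExecutor,
            long timeoutMillis) {
        // Freeze the context once so no enricher can observe another one's changes.
        Map<String, String> frozen = Map.copyOf(context);
        List<CompletableFuture<Map<String, String>>> futures = enrichers.stream()
                .map(enricher -> CompletableFuture
                        .supplyAsync(() -> enricher.enrich(cause, frozen), ioExecutor)
                        // The timed-out task keeps running; only its result is dropped.
                        .completeOnTimeout(Map.of(), timeoutMillis, TimeUnit.MILLISECONDS)
                        .exceptionally(error -> Map.of()))
                .toList();
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<String, String> merged = new HashMap<>();
                    futures.forEach(f -> merged.putAll(f.join()));
                    return Map.copyOf(merged);
                });
    }
}
```

Each enricher only reads a frozen copy of the context, so the enrichers can run concurrently on the I/O executor without coordination. Merging happens once, after all of them have completed, failed, or timed out.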
I'd propose something along the lines of (we should have better names, this
is just to outline the idea):

    interface FailureEnricher {

        ThrowableWithTagsAndLabels enrichFailure(Throwable cause,
            ImmutableContextualMetadataAboutTheThrowable context);
    }

The names should change; this is just to outline the idea.

3) You're proposing to support a set of listeners. Since you're passing the
mutable context around, which includes tags set by the previous listener,
do you expect users to make any assumptions about the order in which
listeners are executed?

*@Shammon*

> Users may want to save results in the listener, so that they can get the
> historical results even after a JobManager failover. Can we provide a
> unified implementation for data storage requirements?

I think we should explicitly state that all "listeners" are treated as
stateless. I don't see any strong reason for snapshotting them.

Best,
D.
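David's outline compiles once its placeholder types exist. The minimal sketch below (Java 16+) fills them in with two hypothetical records. `FailureMetadata` stands in for `ImmutableContextualMetadataAboutTheThrowable`, and `EnrichedFailure` stands in for `ThrowableWithTagsAndLabels`. `OomEnricher` is an invented example implementation. To match the idea of treating enrichers as stateless, it keeps no fields and returns a new immutable value instead of mutating shared state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for ImmutableContextualMetadataAboutTheThrowable. */
record FailureMetadata(String jobId, long timestampMillis) {}

/** Hypothetical stand-in for ThrowableWithTagsAndLabels: the cause plus immutable tags/labels. */
record EnrichedFailure(Throwable cause, Set<String> tags, Map<String, String> labels) {
    EnrichedFailure {
        Objects.requireNonNull(cause, "cause");
        tags = Set.copyOf(tags);       // defensive, immutable copies
        labels = Map.copyOf(labels);
    }

    /** Returns a new instance with one more label; the receiver is never modified. */
    EnrichedFailure withLabel(String key, String value) {
        Map<String, String> copy = new HashMap<>(labels);
        copy.put(key, value);
        return new EnrichedFailure(cause, tags, copy);
    }
}

/** The outlined interface, with the placeholder types filled in. */
interface FailureEnricher {
    EnrichedFailure enrichFailure(Throwable cause, FailureMetadata context);
}

/** Example stateless enricher: tags OutOfMemoryErrors and records the job id as a label. */
final class OomEnricher implements FailureEnricher {
    @Override
    public EnrichedFailure enrichFailure(Throwable cause, FailureMetadata context) {
        Set<String> tags = new HashSet<>();
        if (cause instanceof OutOfMemoryError) {
            tags.add("OOM");
        }
        return new EnrichedFailure(cause, tags, Map.of("jobId", context.jobId()));
    }
}
```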
On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:

Hi Panagiotis,

Thank you for starting this discussion. I think this FLIP is valuable and
can help users analyze the causes of job failovers better!

I have two comments:

1. How about adding more job information in FailureListenerContext? For
example, job vertex, subtask, TaskManager location. Then users can do more
statistics according to different dimensions.

2. Users may want to save results in the listener, so that they can get the
historical results even after a JobManager failover. Can we provide a
unified implementation for data storage requirements?
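Shammon's first suggestion is to expose job vertex, subtask and TaskManager location in the context. That could look roughly like the immutable record below. `FailureContext`, its factory methods and `statisticsKey()` are hypothetical names, used for illustration only (Java 16+). The actual fields would be whatever the FLIP settles on, possibly derived from the JobGraph as mentioned earlier in the thread.

```java
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical immutable failure context (names are illustrative, not Flink API) carrying
 * the extra dimensions suggested here: job vertex, subtask and TaskManager location.
 */
record FailureContext(
        String jobId,
        Optional<String> jobVertexName,
        Optional<Integer> subtaskIndex,
        Optional<String> taskManagerLocation) {

    FailureContext {
        Objects.requireNonNull(jobId, "jobId");
        Objects.requireNonNull(jobVertexName, "jobVertexName");
        Objects.requireNonNull(subtaskIndex, "subtaskIndex");
        Objects.requireNonNull(taskManagerLocation, "taskManagerLocation");
    }

    /** A failure not tied to a single task, e.g. one raised on the JobManager. */
    static FailureContext forJob(String jobId) {
        return new FailureContext(jobId, Optional.empty(), Optional.empty(), Optional.empty());
    }

    /** A failure raised by one subtask running on a known TaskManager. */
    static FailureContext forTask(String jobId, String vertex, int subtask, String tmLocation) {
        return new FailureContext(
                jobId, Optional.of(vertex), Optional.of(subtask), Optional.of(tmLocation));
    }

    /** Grouping key a listener could use to aggregate failures per vertex/subtask/host. */
    String statisticsKey() {
        return jobVertexName.orElse("<job>")
                + "#" + subtaskIndex.map(i -> Integer.toString(i)).orElse("*")
                + "@" + taskManagerLocation.orElse("<unknown>");
    }
}
```

`Optional` components make it explicit that job-level failures carry no task dimensions. Listeners that only care about the job can ignore them.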
Best,
shammon FY

On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org> wrote:

Hi everyone,

This FLIP [1] proposes a pluggable interface for failure handling, allowing
users to implement custom failure logic using the plugin framework.
Motivated by existing proposals [2] and tickets [3], this enables use cases
like assigning particular types to failures (e.g., User or System), emitting
custom metrics per type (e.g., application or platform), and even exposing
errors to downstream consumers (e.g., notification systems).

Thanks to Piotr and Anton for the initial reviews and discussions!

For anyone interested, the starting point would be the FLIP [1] that I
created, describing the motivation and the proposed changes (part of the
core, runtime and web).
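The first two use cases listed above are assigning a type such as User or System, then emitting metrics per type. They could be combined as in this sketch (Java 9+). The `FailureListener` shape shown here, the `UserOrSystemClassifier` rule and the `TagCounter` class are all hypothetical. In particular, treating `IllegalArgumentException` and `ClassCastException` as user errors is an arbitrary assumption. A real deployment would also report counts through Flink's metric system rather than an in-memory map.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical shape of the proposed listener: returns string tags for a failure. */
interface FailureListener {
    Set<String> onFailure(Throwable cause);
}

/** Assigns USER or SYSTEM; which exception types count as "user" is an assumption. */
final class UserOrSystemClassifier implements FailureListener {
    @Override
    public Set<String> onFailure(Throwable cause) {
        boolean user = cause instanceof IllegalArgumentException
                || cause instanceof ClassCastException;
        return Set.of(user ? "USER" : "SYSTEM");
    }
}

/** Thread-safe failure counts per tag, standing in for per-type custom metrics. */
final class TagCounter {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    void record(Set<String> tags) {
        tags.forEach(tag -> counts.computeIfAbsent(tag, k -> new LongAdder()).increment());
    }

    long count(String tag) {
        LongAdder adder = counts.get(tag);
        return adder == null ? 0 : adder.sum();
    }
}
```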
The intuition behind this FLIP is being able to execute custom logic on
failures by exposing a FailureListener interface. Implementations by users
can simply be loaded into the system as JAR files. FailureListeners may also
decide to assign failure tags to errors (expressed as strings), which will
then be exposed as metadata by the UI/REST interfaces.

Feedback is always appreciated! Looking forward to your thoughts!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
[2] https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
[3] https://issues.apache.org/jira/browse/FLINK-20833

Cheers,
Panagiotis
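Several FailureListeners may tag the same failure. Panagiotis's earlier answer on ordering says system listeners come first, then plugin-loaded ones, and each can react to labels assigned before it. That suggests writing listeners so the final result does not depend on their order. The sketch below illustrates this with the *UNKNOWN*/*USER* example from that answer. `LabelingListener`, `ListenerChain` and the rule that `IllegalArgumentException` marks a user error are hypothetical (Java 10+).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical listener that sees the labels assigned so far and returns updated labels. */
interface LabelingListener {
    Map<String, String> onFailure(Throwable cause, Map<String, String> labelsSoFar);
}

final class ListenerChain {
    static final String FAILURE_TYPE = "failureType";

    private ListenerChain() {}

    /** Runs system listeners first, then plugin-loaded ones, threading labels through. */
    static Map<String, String> run(
            Throwable cause, List<LabelingListener> system, List<LabelingListener> plugins) {
        List<LabelingListener> ordered = new ArrayList<>(system);
        ordered.addAll(plugins);
        Map<String, String> labels = Map.of();
        for (LabelingListener listener : ordered) {
            // Each listener receives an immutable map and must return a new one.
            labels = Map.copyOf(listener.onFailure(cause, labels));
        }
        return labels;
    }

    /** Assigns UNKNOWN only when no earlier listener has assigned a type. */
    static final LabelingListener DEFAULT_TYPE = (cause, labels) -> {
        if (labels.containsKey(FAILURE_TYPE)) {
            return labels;
        }
        Map<String, String> out = new HashMap<>(labels);
        out.put(FAILURE_TYPE, "UNKNOWN");
        return out;
    };

    /** Assigns USER for user-code errors, replacing UNKNOWN but never another type. */
    static final LabelingListener USER_CODE = (cause, labels) -> {
        // Assumption for illustration: IllegalArgumentException signals a user error.
        boolean userError = cause instanceof IllegalArgumentException;
        String current = labels.get(FAILURE_TYPE);
        if (!userError || (current != null && !current.equals("UNKNOWN"))) {
            return labels;
        }
        Map<String, String> out = new HashMap<>(labels);
        out.put(FAILURE_TYPE, "USER");
        return out;
    };
}
```

Running the two listeners in either order yields `failureType=USER` for a user error, because each one checks what is already there before writing.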