Hi everyone,

Thanks for the valuable comments!
Excited to see this is an area of interest for the community!

Summarizing some of the main points raised along with my thoughts:

   - Labels (key/value pairs) are more expressive than Tags (strings), so
   using the former is a good idea. I am also debating whether we want to
   return multiple KV pairs per Listener (one could argue that we could
   split the logic into multiple Listeners to support that)
   - An immutable context, with data returned through the interface
   method's return value, is a better approach than a mutable Collection
   - Listener execution should be independent. However, we need a way to
   enforce that a Label key/key-prefix is only assigned to a single
   Listener; I am thinking of a validation step at both the Listener init
   and runtime stages
   - We want to perform Listener operations asynchronously, as sync calls
   could block the main thread. Exposing an ioExecutor pool through the
   context could be an elegant solution here
   - Listener errors should not fail jobs: log the error and keep the job
   alive
   - We need better naming and a clearer separation/description of the
   public interfaces

   - Even though custom restart strategies share some properties with
   Listeners, they would probably need a separate interface with a
   different return type anyway (a restart strategy, not labels). In
   general they are different and complex enough to justify their own FLIP
   (which can also be a follow-up).
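
To make the points above more concrete, here is a rough Java sketch of how
they could fit together. It is only an illustration under my own
assumptions, not the FLIP's final interface: the names LabelListener,
ListenerContext, outputKeys, validateUniqueKeys and collectLabels are all
placeholders, and the System.err call stands in for a real logger.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical read-only context: no setters, only what a Listener may use. */
interface ListenerContext {
    Executor ioExecutor();
}

/** Hypothetical contract: labels are returned, never written to shared state. */
interface LabelListener {
    /** Label keys this Listener may emit; each key must have a single owner. */
    Set<String> outputKeys();

    CompletableFuture<Map<String, String>> onFailure(Throwable cause, ListenerContext ctx);
}

final class LabelListenerSketch {

    /** Init-time validation: reject two Listeners claiming the same label key. */
    static void validateUniqueKeys(Collection<LabelListener> listeners) {
        Set<String> seen = new HashSet<>();
        for (LabelListener l : listeners) {
            for (String key : l.outputKeys()) {
                if (!seen.add(key)) {
                    throw new IllegalArgumentException("Label key claimed twice: " + key);
                }
            }
        }
    }

    /** Runs all Listeners independently and merges their labels once all complete. */
    static CompletableFuture<Map<String, String>> collectLabels(
            Throwable cause, ListenerContext ctx, List<LabelListener> listeners) {
        CompletableFuture<Map<String, String>> merged =
                CompletableFuture.completedFuture(new HashMap<>());
        for (LabelListener l : listeners) {
            CompletableFuture<Map<String, String>> safe = invoke(l, cause, ctx)
                    .exceptionally(t -> {
                        // A failing Listener is logged and skipped; the job is unaffected.
                        System.err.println("WARN: listener failed, ignoring: " + t);
                        return Map.of();
                    });
            merged = merged.thenCombine(safe, (acc, labels) -> {
                // Runtime validation: drop keys the Listener did not declare.
                labels.forEach((k, v) -> {
                    if (l.outputKeys().contains(k)) {
                        acc.put(k, v);
                    }
                });
                return acc;
            });
        }
        return merged;
    }

    /** Turns a synchronous throw into a failed future so it is handled uniformly. */
    private static CompletableFuture<Map<String, String>> invoke(
            LabelListener l, Throwable cause, ListenerContext ctx) {
        try {
            return l.onFailure(cause, ctx);
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(2);
        ListenerContext ctx = () -> io;
        LabelListener typeListener = new LabelListener() {
            public Set<String> outputKeys() { return Set.of("failureType"); }
            public CompletableFuture<Map<String, String>> onFailure(Throwable c, ListenerContext x) {
                // Heavy work (e.g. calling an external system) runs on the I/O pool.
                return CompletableFuture.supplyAsync(
                        () -> Map.of("failureType", "USER"), x.ioExecutor());
            }
        };
        validateUniqueKeys(List.of(typeListener));
        System.out.println(
                collectLabels(new RuntimeException("boom"), ctx, List.of(typeListener)).join());
        io.shutdown();
    }
}
```

Merging only after every future completes keeps the Listeners independent
of each other. Whether the restart should wait for that merge or proceed
right away is still open (see Hong's question below about letting futures
complete independently).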


What do people think? I am planning to modify the FLIP to reflect these
changes if they make sense to everyone.

Cheers,
Panagiotis

On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh <hlteo...@gmail.com> wrote:

> Hi all,
>
> Thank you Panagiotis for proposing this. From the size of the thread, this
> is a much needed feature in Flink!
> Some thoughts, to extend those already adeptly summarised by Piotr,
> Matthias and Jing.
>
> - scope of FLIP: +1 to scoping this FLIP to observability around a
> restart. That would include adding metadata + exposing metadata to external
> systems. IMO, introducing a new restart strategy solves different problems,
> is much larger scope and should be covered in a separate FLIP.
>
> - failure handling: At the moment, we propose transitioning the Flink job
> to a terminal FAILED state when JobListener fails, when the job could have
> transitioned to RESTARTING->RUNNING. If we are keeping in line with the
> scope to add metadata/observability around job restarts, we should not be
> affecting the running of the Flink job itself. Could I propose we instead
> log at WARN/ERROR level?
>
> - immutable context: +1 to keeping the contract clear via return types.
> - async operation: +1 to adding ioExecutor to context, however, given we
> don’t want to block the actual job restart on adding metadata / calling
> external services, should we consider returning and letting futures
> complete independently?
>
> - independent vs ordered execution: Should we consider making the order of
> execution deterministic (use a List instead of Set)?
>
>
> Once again, thank you for working on this.
>
> Regards,
> Hong
>
>
> > On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID> wrote:
> >
> > Hi,
> >
> > Thanks Panagiotis for this FLIP and thanks for all valuable discussions.
> > I'd like to share my two cents:
> >
> > - FailureListenerContext#addTag and FailureListenerContext#getTags. It
> > seems that we have to call getTags() and then do remove activities if we
> > want to delete any tags (according to the javadoc in the FLIP).  It is
> > inconsistent for me too. Either offer addTag(), deleteTag(), and let
> > getTags() return immutable collection, or offer getTags() only to return
> > mutable collection.
> >
> > - label vs tag. Label is a great idea +1. AFAIC, tag could be a special
> > case of label, i.e. key="tag". It is convenient to offer the xxxTag()
> > method if the user only needs one label. I would love to have both of
> > them. Another thought is that tag implicitly contains the meaning of
> > "immutable".
> >
> > - +1 for a separate FLIP for a customized restart strategy. Care should
> > be taken to make sure it works well with Flink's built-in restart
> > strategy in order to have a single source of truth.
> >
> > - execution order. The default independent execution should be fine.
> > According to the FailureListener interface definition in the FLIP, users
> > should be able to easily build a listener chain[1] to offer sequential
> > execution, e.g. public FailureListener(FailureListener nextListener).
> > Another option is to modify the interface or provide another interface
> > alongside the current one to extend the method to support ListenerChain,
> > i.e. void onFailure(Throwable cause, FailureListenerContext context,
> > ListenerChain listenerChain). Users can also mix them up.
> >
> > - naming. AFAIU, the pluggable extension is not limited to failure
> > enrichment. Conceptually it can do everything for the given failure, e.g.
> > start counting metrics as the FLIP describes, call an external system,
> > send a notification to a Slack channel, etc., you name it. It sounds to me
> > more like a FailureActionListener - it can trigger actions based on
> > failure. Failure enrichment is one type of action.
> >
> > Best regards,
> > Jing
> >
> > [1] https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
> >
> > On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl
> > <matthias.p...@aiven.io.invalid> wrote:
> >
> >> Thanks for the proposal, Panagiotis. A lot of good points have already
> >> been shared. I just want to add my view on some of the items:
> >>
> >> - independent execution vs ordered execution: I prefer the listeners
> >> being processed independently from each other because it adds less
> >> complexity code-wise. The use case Piotr described (where you want to
> >> reuse some other classifier) is the only one I can think of where we
> >> actually need classifiers depending on each other. Supporting such a use
> >> case right from the start feels a bit over-engineered and could be
> >> covered in a follow-up FLIP if we really come to that point where such a
> >> feature is requested by users.
> >>
> >> - key/value pairs instead of plain labels: I think that's a good idea.
> >> key/value pairs are more expressive. +1
> >>
> >> - extending the FLIP to cover restart strategy: I understand Gen's
> >> concern about introducing too many different types of plugins. But I
> >> would still favor not extending the FLIP in this regard. A pluggable
> >> restart strategy sounds reasonable. But an error classifier and a
> >> restart strategy are still different enough to justify separate plugins,
> >> IMHO. And therefore, I would think that covering the restart strategy in
> >> a separate FLIP is the better option for the sake of simplicity.
> >>
> >> - immutable context: Passing in an immutable context and returning data
> >> through the interface method's return value sounds like a better
> >> approach to harden the contract of the interface. +1 for that proposal
> >>
> >> - async operation: I think David is right. An async interface makes the
> >> listener implementations more robust when it comes to heavy IO
> >> operations. The ioExecutor can be passed through the context object. +1
> >>
> >> Matthias
> >>
> >> On Tue, Mar 21, 2023 at 2:09 PM David Morávek <david.mora...@gmail.com>
> >> wrote:
> >>
> >>> *@Piotr*
> >>>
> >>>
> >>>> I was thinking about actually defining the order of the
> >>>> classifiers/handlers and not allowing them to be asynchronous.
> >>>> Asynchronicity would create some problems: when to actually return the
> >>>> error to the user? After all async responses will get back? Before, but
> >>>> without classified exception? It would also add implementation
> >>>> complexity and I think we can always expand the API with an async
> >>>> version in the future if needed.
> >>>
> >>>
> >>> As long as the classifiers need to talk to an external system, we by
> >>> definition need to allow them to be asynchronous to unblock the main
> >>> thread for handling other RPCs. Exposing ioExecutor via the context
> >>> proposed above would be great.
> >>>
> >>>> After all async responses will get back
> >>>
> >>>
> >>> This would be the same if we trigger them synchronously one by one,
> >>> with a caveat that synchronous execution might take significantly
> >>> longer and introduce unnecessary downtime to a job.
> >>>
> >>> D.
> >>>
> >>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com> wrote:
> >>>
> >>>> Hi Piotr,
> >>>>
> >>>> It's fine with me to have a separate FLIP to extend this
> >>>> `FailureListener` to support custom restart strategy.
> >>>>
> >>>> What I am a bit concerned about is that if we just treat the
> >>>> `FailureListener` as an error classifier which is not crucial to the
> >>>> Flink framework process, we may design it to run asynchronously and not
> >>>> trigger Flink failures. This may be a blocker if later we want to enable
> >>>> it to support custom restart strategy.
> >>>>
> >>>> Thanks,
> >>>> Zhu
> >>>>
> >>>> Dian Fu <dian0511...@gmail.com> wrote on Tue, Mar 21, 2023 at 19:53:
> >>>>>
> >>>>> Hi Panagiotis,
> >>>>>
> >>>>> Thanks for the proposal. This is a very valuable feature and will be a
> >>>>> good add-on for Flink.
> >>>>>
> >>>>> I also think that it will be great if we can consider how to make it
> >>>>> possible for users to customize the failure handling in this FLIP. It's
> >>>>> highly related to the problem we want to address in this FLIP and could
> >>>>> avoid refactoring the interfaces proposed in this FLIP too quickly.
> >>>>>
> >>>>> Currently it treats all kinds of exceptions the same. However, some
> >>>>> kinds of exceptions are actually not recoverable at all. It could let
> >>>>> users customize the failure handling logic to fail fast for certain
> >>>>> known unrecoverable exceptions, so that these kinds of jobs get noticed
> >>>>> and recovered more quickly.
> >>>>>
> >>>>> Regards,
> >>>>> Dian
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo <luogen...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Panagiotis,
> >>>>>>
> >>>>>> Thanks for the proposal.
> >>>>>>
> >>>>>> It's useful to enrich the information so that users can be
> >>>>>> clearer about why the job is failing, especially platform developers
> >>>>>> who need to provide the information to their end users.
> >>>>>> As for this FLIP itself, I'd prefer the naming `FailureEnricher`
> >>>>>> proposed by David, as the plugin doesn't really handle the failure.
> >>>>>>
> >>>>>> However, like Zhu and Lijie said, I also joined a discussion
> >>>>>> recently about customized failure handling, e.g. counting the
> >>>>>> failure rate of pipeline regions separately, and failing the job
> >>>>>> when a specific error occurs, and so on.
> >>>>>> I suppose a custom restart strategy, or I'd call it a custom
> >>>>>> failure "handler", is indeed necessary. It can also enrich the
> >>>>>> information as the current proposed handler does.
> >>>>>>
> >>>>>> To avoid adding too many plugin interfaces which may confuse users
> >>>>>> and make the ExecutionFailureHandler more complex,
> >>>>>> I think it'd be better to consider the requirements at the same time.
> >>>>>>
> >>>>>> IMO, we can add a handler interface, then make the current restart
> >>>>>> strategy and the enricher both types of the handler. The handlers
> >>>>>> execute in sequence, and the failure is considered unrecoverable if
> >>>>>> any of the handlers decides so.
> >>>>>> In this way, users can also implement a handler using the enriched
> >>>>>> information provided by the previous handlers, e.g. fail the job and
> >>>>>> send a notification if too many failures are caused by the end users.
> >>>>>>
> >>>>>> Best,
> >>>>>> Gen
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Panagiotis,
> >>>>>>>
> >>>>>>> Thanks for your proposal. It is valuable to analyze the reason for
> >>>>>>> failure with the user plug-in.
> >>>>>>>
> >>>>>>> Making the context immutable could make the contract stronger.
> >>>>>>> Letting the listener return an enriching result may be a better way.
> >>>>>>>
> >>>>>>> IIUC, listeners could do two things: enrich the
> >>>>>>> FailureHandlingResult with more information (tags/labels), and push
> >>>>>>> data out of Flink (metrics or something).
> >>>>>>> IMO, we could split these two types into Listener and Advisor (maybe
> >>>>>>> other names). The Listener just pushes the data out and returns
> >>>>>>> nothing to Flink, so we can run these async and don't have to wait for
> >>>>>>> the Listener's result.
> >>>>>>> The Advisor returns rich information to the FailureHandlingResult, and
> >>>>>>> it should have lighter logic.
> >>>>>>>
> >>>>>>>
> >>>>>>> Supporting a custom restart strategy is also valuable. In this design,
> >>>>>>> we use RestartStrategy to construct a FailureHandlingResult, and then
> >>>>>>> pass it to the Listener.
> >>>>>>> My question is, should we change the restart strategy interface to
> >>>>>>> support the custom restart strategy, or keep the current restart
> >>>>>>> strategy and let the later Listener enrich the restartable information
> >>>>>>> in the FailureHandlingResult? The latter may cause some confusion when
> >>>>>>> we use a custom restart strategy: the default Flink restart strategy
> >>>>>>> also runs but does not take effect.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Weihua
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <wangdachui9...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Panagiotis,
> >>>>>>>>
> >>>>>>>> Thanks for driving this.
> >>>>>>>>
> >>>>>>>> +1 for supporting custom restart strategy, we did receive such
> >>>>>>>> requests from the user mailing list [1][2].
> >>>>>>>>
> >>>>>>>> Besides, in the current design, the plugin will only do some
> >>>>>>>> statistical and classification work, and will not affect the
> >>>>>>>> *FailureHandlingResult*. Just listening, no handling: it doesn't quite
> >>>>>>>> match the title.
> >>>>>>>>
> >>>>>>>> [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
> >>>>>>>> [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Lijie
> >>>>>>>>
> >>>>>>>> Zhu Zhu <reed...@gmail.com> wrote on Mon, Mar 20, 2023 at 21:39:
> >>>>>>>>
> >>>>>>>>> Hi Panagiotis,
> >>>>>>>>>
> >>>>>>>>> Thanks for creating this proposal! It's good to enable Flink to
> >>>>>>>>> handle different errors in different ways, in a pluggable way.
> >>>>>>>>>
> >>>>>>>>> There are requests for flexible restart strategies from time to time,
> >>>>>>>>> for different strategies of restart backoff time, or to suppress
> >>>>>>>>> restarting on certain errors. Therefore, I think it's better that the
> >>>>>>>>> proposed failure handling plugin can also support custom restart
> >>>>>>>>> strategies.
> >>>>>>>>>
> >>>>>>>>> Maybe we can call it FailureHandlingAdvisor which provides more
> >>>>>>>>> information (labels) and gives advice (restart backoff time, whether
> >>>>>>>>> to restart)? I do not have a strong opinion though, any explanatory
> >>>>>>>>> name would be good.
> >>>>>>>>>
> >>>>>>>>> To avoid unexpected mutation, how about making the context immutable
> >>>>>>>>> and letting the plugin return an immutable result? i.e. remove the
> >>>>>>>>> setters from the context, and let the plugin method return a result
> >>>>>>>>> which contains `labels`, `canRestart` and `restartBackoffTime`. Flink
> >>>>>>>>> should apply the result to the context before invoking the next
> >>>>>>>>> plugin, so that the next plugin will see the updated context.
> >>>>>>>>>
> >>>>>>>>> The plugin should avoid taking too much time to return the result,
> >>>>>>>>> because it will block the RPC and result in instability. However, it
> >>>>>>>>> can still perform heavy actions in a different thread. The context can
> >>>>>>>>> provide an `ioExecutor` to the plugins for reuse.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Zhu
> >>>>>>>>>
> >>>>>>>>> Shammon FY <zjur...@gmail.com> wrote on Mon, Mar 20, 2023 at 20:21:
> >>>>>>>>>>
> >>>>>>>>>> Hi Panagiotis
> >>>>>>>>>>
> >>>>>>>>>> Thank you for your answer. I agree that `FailureListener` could be
> >>>>>>>>>> stateless. I have some further thoughts:
> >>>>>>>>>>
> >>>>>>>>>> 1. I see that listeners and tag collections are associated. When the
> >>>>>>>>>> JobManager fails and restarts, how can the new listener be associated
> >>>>>>>>>> with the tag collection from before the failover? Is it by listener
> >>>>>>>>>> loading order?
> >>>>>>>>>>
> >>>>>>>>>> 2. The tag collection may grow too large and cause a JobManager OOM.
> >>>>>>>>>> Do we need to provide a management class that supports some
> >>>>>>>>>> obsolescence strategies instead of a direct Collection?
> >>>>>>>>>>
> >>>>>>>>>> 3. Is it possible to provide a more complex data structure than a
> >>>>>>>>>> simple string collection for tags in listeners, such as key-value?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Shammon FY
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Panagiotis,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you for kicking off this discussion. Overall, the proposed
> >>>>>>>>>>> feature of this FLIP makes sense to me. We have also discussed similar
> >>>>>>>>>>> requirements with our users and developers, and I believe it will help
> >>>>>>>>>>> many users.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> In terms of FLIP content, I have some thoughts:
> >>>>>>>>>>>
> >>>>>>>>>>> (1) For the FailureListenerContext interface, the methods
> >>>>>>>>>>> FailureListenerContext#addTag and FailureListenerContext#getTags look
> >>>>>>>>>>> very inconsistent because they imply specific implementation details,
> >>>>>>>>>>> and not all FailureListeners need to handle them, so we shouldn't put
> >>>>>>>>>>> them in the interface. Minor: The comment "UDF loading" in the
> >>>>>>>>>>> getUserClassLoader() method looks like a typo, IIUC it should return
> >>>>>>>>>>> the classloader of the current job.
> >>>>>>>>>>>
> >>>>>>>>>>> (2) Regarding the implementation in
> >>>>>>>>>>> ExecutionFailureHandler#handleFailure, some custom listeners may have
> >>>>>>>>>>> heavy IO operations, such as reporting to their monitoring system. The
> >>>>>>>>>>> current logic appears to be processing in the JobMaster's main thread,
> >>>>>>>>>>> and it is recommended not to do this kind of processing in the main
> >>>>>>>>>>> thread.
> >>>>>>>>>>>
> >>>>>>>>>>> (3) The results of FailureListener's processing and the
> >>>>>>>>>>> FailureHandlingResult returned by ExecutionFailureHandler are
> >>>>>>>>>>> currently not related. I think these two are closely related, as the
> >>>>>>>>>>> motivation of this FLIP is to make current failure handling more
> >>>>>>>>>>> flexible. From this perspective, different listeners should have the
> >>>>>>>>>>> opportunity to affect the job's failure handling flow. For example, a
> >>>>>>>>>>> Flink job is configured with a RestartStrategy with a huge number of
> >>>>>>>>>>> retries, but the Kafka topic of the Source has been deleted, so the
> >>>>>>>>>>> job will fail over continuously. In this case, the user should have
> >>>>>>>>>>> their listener determine whether this failure is recoverable or
> >>>>>>>>>>> unrecoverable, and then wrap the processing result into
> >>>>>>>>>>> FailureHandlingResult.unrecoverable(xx) and pass it to the JobMaster;
> >>>>>>>>>>> this approach will be more flexible.
> >>>>>>>>>>>
> >>>>>>>>>>> (4) All FLIPs have an important section named Public Interfaces. The
> >>>>>>>>>>> current FLIP mixes the interface section and the implementation
> >>>>>>>>>>> section together. It is better for us to refer to the FLIP template[1]
> >>>>>>>>>>> and separate them, this will make the entire FLIP clearer.
> >>>>>>>>>>>
> >>>>>>>>>>> In addition, regarding the FLIP process, there is a small suggestion:
> >>>>>>>>>>> The community generally creates a JIRA issue after the FLIP vote is
> >>>>>>>>>>> passed, instead of during the FLIP preparation phase because the FLIP
> >>>>>>>>>>> may be rejected. Although this FLIP is very reasonable, it's better to
> >>>>>>>>>>> follow the process.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>>
> >>>>>>>>>>> Leonard
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> however listeners can use previous state (tags/labels) to make
> >>>>>>>>>>>>> decisions
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> That sounds like a very fragile contract. We should either allow
> >>>>>>>>>>>> passing tags between listeners and then need to define ordering, or
> >>>>>>>>>>>> make all of them independent. I prefer the latter because it allows us
> >>>>>>>>>>>> to parallelize things if needed (if all listeners trigger an RPC to
> >>>>>>>>>>>> the external system, for example).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Can you expand on why we need more than one classifier to be able to
> >>>>>>>>>>>> output the same tag?
> >>>>>>>>>>>>
> >>>>>>>>>>>>> system ones come first and then the ones loaded from the plugin
> >>>>>>>>>>>>> manager
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Since they're returned as a Set, the order is completely
> >>>>>>>>>>>> non-deterministic, no matter in which order they're loaded.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> just communicating with external monitoring/alerting systems
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> That makes the need for pushing things out of the main thread even
> >>>>>>>>>>>> stronger. This almost sounds like we need to return a
> >>>>>>>>>>>> CompletableFuture for the per-throwable classification because an
> >>>>>>>>>>>> external system might take a significant time to respond. We need to
> >>>>>>>>>>>> unblock the main thread for other RPCs.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also, in the proposal, this happens in the failure handler. If that's
> >>>>>>>>>>>> the case, this might block the job from being restarted (if the
> >>>>>>>>>>>> restart strategy allows for another restart), which would be great to
> >>>>>>>>>>>> avoid because it can introduce extra downtime.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This raises another question: what should happen if the
> >>>>>>>>>>>> classification fails? Crashing the job (which is what's currently
> >>>>>>>>>>>> proposed) seems very dangerous if this might depend on an external
> >>>>>>>>>>>> system.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> That's a valid point, passing the JobGraph containing all the above
> >>>>>>>>>>>>> information is also something to consider
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> We should avoid passing JG around because it's mutable (which we must
> >>>>>>>>>>>> fix in the long term), and letting users change it might have
> >>>>>>>>>>>> consequences.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> D.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis
> >>>>>>>>>>>> <pga...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hey David, Shammon,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the valuable comments!
> >>>>>>>>>>>>> I am glad you find this proposal useful. Some thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Shammon
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. How about adding more job information in FailureListenerContext?
> >>>>>>>>>>>>>> For example, job vertex, subtask, taskmanager location. And then
> >>>>>>>>>>>>>> user can do more statistics according to different dimensions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> That's a valid point, passing the JobGraph containing all the above
> >>>>>>>>>>>>> information is also something to consider. I was mostly trying to be
> >>>>>>>>>>>>> conservative: i.e., passing only the information we need, and
> >>>>>>>>>>>>> extending as we see fit
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Users may want to save results in listener, and then they can get
> >>>>>>>>>>>>>> the historical results even after jobmanager failover. Can we
> >>>>>>>>>>>>>> provide a unified implementation for data storage requirements?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The idea is to store only the output of the Listeners (tags) and
> >>>>>>>>>>>>> treat them as stateless.
> >>>>>>>>>>>>> Tags are stored along with HistoryEntries, and will be available
> >>>>>>>>>>>>> through the HistoryServer even after a JM dies.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @David
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) Should we also consider adding labels? The combination of tags
> >>>>>>>>>>>>>> and labels seems to be what most systems offer; sometimes, they
> >>>>>>>>>>>>>> offer labels only (key=value pairs) because tags can be implemented
> >>>>>>>>>>>>>> using those, but not the other way around.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Indeed changing tags to k:v labels could be more expressive, I like
> >>>>>>>>>>>>> it! Let's see what others think.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined models
> >>>>>>>>>>>>>> ("listeners") are going to be, it would be great to keep the
> >>>>>>>>>>>>>> interfaces/data structures immutable so we can push things over to
> >>>>>>>>>>>>>> the I/O threads. Also, it sounds off to call the main interface a
> >>>>>>>>>>>>>> Listener since it's supposed to enhance the original throwable with
> >>>>>>>>>>>>>> additional metadata.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The idea was for the name to be generic as there could be Listener
> >>>>>>>>>>>>> implementations just communicating with external monitoring/alerting
> >>>>>>>>>>>>> systems and no metadata output, but let's rethink that. For
> >>>>>>>>>>>>> immutability, see below:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3) You're proposing to support a set of listeners. Since you're
> >>>>>>>>>>>>>> passing the mutable context around, which includes tags set by the
> >>>>>>>>>>>>>> previous listener, do you expect users to make any assumptions about
> >>>>>>>>>>>>>> the order in which listeners are executed?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In the existing proposal we are not making any assumptions about the
> >>>>>>>>>>>>> order of listeners (system ones come first and then the ones loaded
> >>>>>>>>>>>>> from the plugin manager), however listeners can use previous state
> >>>>>>>>>>>>> (tags/labels) to make decisions: e.g., we won't assign the *UNKNOWN*
> >>>>>>>>>>>>> failureType when we have already seen *USER*, or the other way
> >>>>>>>>>>>>> around: when we have seen *UNKNOWN*, remove it in favor of *USER*
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Panagiotis
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Panagiotis,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This is an excellent proposal and something everyone trying to
> >>>>>>>>>>>>>> provide "Flink as a service" needs to solve at some point. I have a
> >>>>>>>>>>>>>> couple of questions:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> If I understand the proposal correctly, this is just about adding
> >>>>>>>>>>>>>> tags to the Throwable by running a tuple of (Throwable,
> >>>>>>>>>>>>>> FailureContext) through a user-defined model.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) Should we also consider adding labels? The combination of tags
> >>>>>>>>>>>>>> and labels seems to be what most systems offer; sometimes, they
> >>>>>>>>>>>>>> offer labels only (key=value pairs) because tags can be implemented
> >>>>>>>>>>>>>> using those, but not the other way around.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined models
> >>>>>>>>>>>>>> ("listeners") are going to be, it would be great to keep the
> >>>>>>>>>>>>>> interfaces/data structures immutable so we can push things over to
> >>>>>>>>>>>>>> the I/O threads. Also, it sounds off to call the main interface a
> >>>>>>>>>>>>>> Listener since it's supposed to enhance the original throwable with
> >>>>>>>>>>>>>> additional metadata.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'd propose something along the lines of (we should have better
> >>>>>>>>>>>>>> names, this is just to outline the idea):
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> interface FailureEnricher {
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>   ThrowableWithTagsAndLabels enrichFailure(
> >>>>>>>>>>>>>>       Throwable cause,
> >>>>>>>>>>>>>>       ImmutableContextualMetadataAboutTheThrowable context);
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The names should change; this is just to outline the idea.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3) You're proposing to support a set of listeners. Since you're
> >>>>>>>>>>>>>> passing the mutable context around, which includes tags set by the
> >>>>>>>>>>>>>> previous listener, do you expect users to make any assumptions about
> >>>>>>>>>>>>>> the order in which listeners are executed?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *@Shammon*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Users may want to save results in listener, and then they
> >>>>>>>>>>>>>>> can get the historical results even after a jobmanager
> >>>>>>>>>>>>>>> failover. Can we provide a unified implementation for data
> >>>>>>>>>>>>>>> storage requirements?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think we should explicitly state that all "listeners" are
> >>>>>>>>>>>>>> treated as stateless. I don't see any strong reason for
> >>>>>>>>>>>>>> snapshotting them.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> D.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Panagiotis
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thank you for starting this discussion. I think this FLIP is
> >>>>>>>>>>>>>>> valuable and can help users analyze the causes of job
> >>>>>>>>>>>>>>> failover better!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have two comments as follows
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. How about adding more job information in
> >>>>>>>>>>>>>>> FailureListenerContext? For example, job vertex, subtask,
> >>>>>>>>>>>>>>> taskmanager location. Users could then compute statistics
> >>>>>>>>>>>>>>> along different dimensions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. Users may want to save results in listener, and then they
> >>>>>>>>>>>>>>> can get the historical results even after a jobmanager
> >>>>>>>>>>>>>>> failover. Can we provide a unified implementation for data
> >>>>>>>>>>>>>>> storage requirements?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> shammon FY
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This FLIP [1] proposes a pluggable interface for failure
> >>>>>>>>>>>>>>>> handling allowing users to implement custom failure logic
> >>>>>>>>>>>>>>>> using the plugin framework. Motivated by existing proposals
> >>>>>>>>>>>>>>>> [2] and tickets [3], this enables use-cases like: assigning
> >>>>>>>>>>>>>>>> particular types to failures (e.g., User or System),
> >>>>>>>>>>>>>>>> emitting custom metrics per type (e.g., application or
> >>>>>>>>>>>>>>>> platform), even exposing errors to downstream consumers
> >>>>>>>>>>>>>>>> (e.g., notification systems).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks to Piotr and Anton for the initial reviews and
> >>>>>>>>>>>>>>>> discussions!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For anyone interested, the starting point would be the FLIP
> >>>>>>>>>>>>>>>> [1] that I created, describing the motivation and the
> >>>>>>>>>>>>>>>> proposed changes (part of the core, runtime and web).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The intuition behind this FLIP is being able to execute
> >>>>>>>>>>>>>>>> custom logic on failures by exposing a FailureListener
> >>>>>>>>>>>>>>>> interface. Implementations by users can simply be loaded
> >>>>>>>>>>>>>>>> into the system as Jar files. FailureListeners may also
> >>>>>>>>>>>>>>>> decide to assign failure tags to errors (expressed as
> >>>>>>>>>>>>>>>> strings), which will then be exposed as metadata by the
> >>>>>>>>>>>>>>>> UI/Rest interfaces.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Feedback is always appreciated! Looking forward to your
> >>>>>>>>>>>>>>>> thoughts!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
> >>>>>>>>>>>>>>>> [2] https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-20833
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>> Panagiotis
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
>
>
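
For reference, the direction summarized in this thread (an immutable context, labels returned from the method, independent and async execution, one owner per label key, and errors that are logged rather than failing the job) could look roughly like the Java sketch below. Only the name FailureEnricher comes from the thread; Context, outputKeys, of, validateKeys and enrichAll are hypothetical names made up for illustration and are not part of the FLIP or of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

final class FailureEnricherSketch {

    /** Hypothetical immutable context; the single field is only an illustration. */
    static final class Context {
        final String jobName;
        Context(String jobName) { this.jobName = jobName; }
    }

    /** Hypothetical enricher: returns labels instead of mutating shared state. */
    interface FailureEnricher {
        Set<String> outputKeys();
        Map<String, String> enrich(Throwable cause, Context context);
    }

    /** Convenience factory so that an enricher can be written as a lambda. */
    static FailureEnricher of(Set<String> keys,
            BiFunction<Throwable, Context, Map<String, String>> fn) {
        return new FailureEnricher() {
            @Override public Set<String> outputKeys() { return keys; }
            @Override public Map<String, String> enrich(Throwable cause, Context context) {
                return fn.apply(cause, context);
            }
        };
    }

    /** Init-time validation: every label key may be claimed by one enricher only. */
    static void validateKeys(List<FailureEnricher> enrichers) {
        Set<String> claimed = new HashSet<>();
        for (FailureEnricher enricher : enrichers) {
            for (String key : enricher.outputKeys()) {
                if (!claimed.add(key)) {
                    throw new IllegalArgumentException("Label key claimed twice: " + key);
                }
            }
        }
    }

    /** Runs one enricher; a failing enricher is logged and yields no labels. */
    private static Map<String, String> safeEnrich(
            FailureEnricher enricher, Throwable cause, Context context) {
        try {
            Map<String, String> labels = new HashMap<>(enricher.enrich(cause, context));
            // Runtime validation: drop any key the enricher did not declare.
            labels.keySet().retainAll(enricher.outputKeys());
            return labels;
        } catch (RuntimeException e) {
            System.err.println("Enricher failed, ignoring its labels: " + e);
            return new HashMap<>();
        }
    }

    /** Runs all enrichers independently on the given executor and merges the labels. */
    static Map<String, String> enrichAll(List<FailureEnricher> enrichers,
            Throwable cause, Context context, Executor ioExecutor) {
        validateKeys(enrichers);
        List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>();
        for (FailureEnricher enricher : enrichers) {
            futures.add(CompletableFuture.supplyAsync(
                    () -> safeEnrich(enricher, cause, context), ioExecutor));
        }
        Map<String, String> merged = new TreeMap<>();
        for (CompletableFuture<Map<String, String>> future : futures) {
            merged.putAll(future.join());
        }
        return merged;
    }
}
```

Because keys are disjoint by construction, the merge order does not matter, which is what makes the enrichers order-independent. The join() calls block the caller here only to keep the sketch short; a real implementation would presumably compose the futures instead.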
