Hi all,

Thank you Panagiotis for proposing this. From the size of the thread, this is a 
much needed feature in Flink!
Some thoughts, to extend those already adeptly summarised by Piotr, Matthias 
and Jing.

- scope of FLIP: +1 to scoping this FLIP to observability around a restart. 
That would include adding metadata + exposing metadata to external systems. 
IMO, introducing a new restart strategy solves different problems, is much 
larger scope and should be covered in a separate FLIP.

- failure handling: At the moment, we propose transitioning the Flink job to a 
terminal FAILED state when JobListener fails, when the job could have 
transitioned to RESTARTING->RUNNING. If we are keeping in line with the scope 
to add metadata/observability around job restarts, we should not be affecting 
the running of the Flink job itself. Could I propose we instead log at WARN/ERROR?

- immutable context: +1 to keeping the contract clear via return types. 
- async operation: +1 to adding ioExecutor to the context. However, given we
don’t want to block the actual job restart on adding metadata / calling
external services, should we consider returning and letting futures complete
independently?

- independent vs ordered execution: Should we consider making the order of 
execution deterministic (use a List instead of Set)?
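
To make the failure handling, async and ordering points concrete, here is a minimal Java sketch. It is not the FLIP's API: FailureListenerSketch, ListenerRunnerSketch and notifyListeners are hypothetical names used only for illustration. It assumes listeners return labels, are kept in a List, run on an ioExecutor-like Executor, and that a failing listener is recorded as a warning instead of failing the job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical contract (not the FLIP's interface): a listener returns
// labels instead of mutating a shared context.
interface FailureListenerSketch {
    Map<String, String> onFailure(Throwable cause);
}

// Hypothetical runner illustrating ordering, async execution and isolation.
final class ListenerRunnerSketch {
    // A List rather than a Set, so registration order is preserved.
    private final List<FailureListenerSketch> listeners;
    private final Executor ioExecutor;
    // Stand-in for WARN log lines; a real implementation would use a logger.
    private final List<String> warnings =
            Collections.synchronizedList(new ArrayList<>());

    ListenerRunnerSketch(List<FailureListenerSketch> listeners, Executor ioExecutor) {
        this.listeners = new ArrayList<>(listeners);
        this.ioExecutor = ioExecutor;
    }

    // Returns immediately; the caller can proceed with the restart while
    // the listeners complete independently on the executor.
    CompletableFuture<Map<String, String>> notifyListeners(Throwable cause) {
        List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>();
        for (FailureListenerSketch listener : listeners) {
            futures.add(CompletableFuture
                    .supplyAsync(() -> listener.onFailure(cause), ioExecutor)
                    // A failing listener is recorded and contributes no labels;
                    // it never propagates into the job's failure handling.
                    .exceptionally(error -> {
                        warnings.add("WARN: listener failed: " + error);
                        return Collections.<String, String>emptyMap();
                    }));
        }
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // Merge in list order, so later listeners win deterministically.
                    Map<String, String> merged = new LinkedHashMap<>();
                    for (CompletableFuture<Map<String, String>> future : futures) {
                        merged.putAll(future.join());
                    }
                    return merged;
                });
    }

    List<String> warnings() {
        return new ArrayList<>(warnings);
    }
}

class ListenerRunnerDemo {
    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        try {
            List<FailureListenerSketch> listeners = new ArrayList<>();
            listeners.add(cause -> Collections.singletonMap("type", "UNKNOWN"));
            listeners.add(cause -> { throw new IllegalStateException("external system down"); });
            listeners.add(cause -> Collections.singletonMap("type", "USER"));

            ListenerRunnerSketch runner = new ListenerRunnerSketch(listeners, ioExecutor);
            Map<String, String> labels =
                    runner.notifyListeners(new RuntimeException("task failed")).join();
            System.out.println(labels + " warnings=" + runner.warnings().size());
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

In this sketch the listeners may finish in any order, but the labels are merged in list order, so the result stays deterministic without serialising the calls. The second listener's exception only produces a warning entry; the returned future still completes normally.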


Once again, thank you for working on this.

Regards,
Hong


> On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID> wrote:
> 
> Hi,
> 
> Thanks Panagiotis for this FLIP and thanks for all valuable discussions.
> I'd like to share my two cents:
> 
> - FailureListenerContext#addTag and FailureListenerContext#getTags. It
> seems that we have to call getTags() and then remove entries from the
> result if we want to delete any tags (according to the javadoc in the
> FLIP). It seems inconsistent to me too. Either offer addTag(), deleteTag(),
> and let getTags() return an immutable collection, or offer only getTags()
> and have it return a mutable collection.
> 
> - label vs tag. Label is a great idea +1. AFAIC, tag could be a special
> case of label, i.e. key="tag". It is convenient to offer the xxxTag()
> method if the user only needs one label. I would love to have both of them.
> Another thought is that tag implicitly contains the meaning of "immutable".
> 
> - +1 for a separate FLIP of customized restart strategy. Attention should
> be taken to make sure it works well with Flink built-in restartStrategy in
> order to have the single source of truth.
> 
> - execution order. The default independent execution should be fine.
> According to the FailureListener interface definition in the FLIP, users
> should be able to easily build a listener chain[1] to offer sequential
> execution, e.g. public FailureListener(FailureListener nextListener).
> Another option is to modify the interface or provide another interface
> alongside the current one to extend the method to support ListenerChain,
> i.e. void onFailure(Throwable cause, FailureListenerContext context,
> ListenerChain listenerChain). Users can also mix them up.
> 
> - naming. Afaiu, the pluggable extension is not limited to failure
> enrichment. Conceptually it can do everything for the given failure, e.g.
> start counting metric as the FLIP described, calling an external system,
> sending notification to slack channel, etc. you name it. It sounds to me
> more like a FailureActionListener - it can trigger actions based on
> failure. Failure enrichment is one type of action.
> 
> Best regards,
> Jing
> 
> [1] https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
> 
> On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl
> <matthias.p...@aiven.io.invalid> wrote:
> 
>> Thanks for the proposal, Panagiotis. A lot of good points have been already
>> shared. I just want to add my view on some of the items:
>> 
>> - independent execution vs ordered execution: I prefer the listeners being
>> processed independently from each other because it adds less complexity
>> code-wise. The use case Piotr described (where you want to reuse some other
>> classifier) is the only one I can think of where we actually need
>> classifiers depending on each other. Supporting such a use case right from
>> the start feels a bit over-engineered and could be covered in a follow-up
>> FLIP if we really come to that point where such a feature is requested by
>> users.
>> 
>> - key/value pairs instead of plain labels: I think that's a good idea.
>> key/value pairs are more expressive. +1
>> 
>> - extending the FLIP to cover restart strategy: I understand Gen's concern
>> about introducing too many different types of plugins. But I would still
>> favor not extending the FLIP in this regard. A pluggable restart strategy
>> sounds reasonable. But an error classifier and a restart strategy are still
>> different enough to justify separate plugins, IMHO. And therefore, I would
>> think that covering the restart strategy in a separate FLIP is the better
>> option for the sake of simplicity.
>> 
>> - immutable context: Passing in an immutable context and returning data
>> through the interface method's return value sounds like a better approach
>> to harden the contract of the interface. +1 for that proposal
>> 
>> - async operation: I think David is right. An async interface makes the
>> listener implementations more robust when it comes to heavy IO operations.
>> The ioExecutor can be passed through the context object. +1
>> 
>> Matthias
>> 
>> On Tue, Mar 21, 2023 at 2:09 PM David Morávek <david.mora...@gmail.com>
>> wrote:
>> 
>>> *@Piotr*
>>> 
>>> 
>>>> I was thinking about actually defining the order of the
>>>> classifiers/handlers and not allowing them to be asynchronous.
>>>> Asynchronicity would create some problems: when to actually return the
>>>> error to the user? After all async responses get back? Before, but
>>>> without the classified exception? It would also add implementation
>>>> complexity, and I think we can always expand the API with an async
>>>> version in the future if needed.
>>> 
>>> 
>>> As long as the classifiers need to talk to an external system, we by
>>> definition need to allow them to be asynchronous to unblock the main
>>> thread for handling other RPCs. Exposing ioExecutor via the context
>>> proposed above would be great.
>>> 
>>>> After all async responses will get back
>>> 
>>> 
>>> This would be the same if we trigger them synchronously one by one, with
>>> a caveat that synchronous execution might take significantly longer and
>>> introduce unnecessary downtime to a job.
>>> 
>>> D.
>>> 
>>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com> wrote:
>>> 
>>>> Hi Piotr,
>>>> 
>>>> It's fine to me to have a separate FLIP to extend this `FailureListener`
>>>> to support custom restart strategy.
>>>> 
>>>> What I was a bit concerned about is that if we just treat the
>>>> `FailureListener` as an error classifier which is not crucial to the
>>>> Flink framework process, we may design it to run asynchronously and not
>>>> trigger Flink failures. This may be a blocker if later we want to enable
>>>> it to support custom restart strategy.
>>>> 
>>>> Thanks,
>>>> Zhu
>>>> 
>>>> On Tue, Mar 21, 2023 at 7:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>> 
>>>>> Hi Panagiotis,
>>>>> 
>>>>> Thanks for the proposal. This is a very valuable feature and will be a
>>>>> good add-on for Flink.
>>>>> 
>>>>> I also think that it will be great if we can consider how to make it
>>>>> possible for users to customize the failure handling in this FLIP. It's
>>>>> highly related to the problem we want to address in this FLIP and could
>>>>> avoid refactoring the interfaces proposed in this FLIP too quickly.
>>>>> 
>>>>> Currently it treats all kinds of exceptions the same. However, some kinds
>>>>> of exceptions are actually not recoverable at all. It could let users
>>>>> customize the failure handling logic to fail fast for certain known
>>>>> unrecoverable exceptions and finally make these kinds of jobs get noticed
>>>>> and recovered more quickly.
>>>>> 
>>>>> Regards,
>>>>> Dian
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo <luogen...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Panagiotis,
>>>>>> 
>>>>>> Thanks for the proposal.
>>>>>> 
>>>>>> It's useful to enrich the information so that users can be more
>>>>>> clear why the job is failing, especially platform developers who
>>>>>> need to provide the information to their end users.
>>>>>> And for the very FLIP, I'd prefer the naming `FailureEnricher`
>>>>>> proposed by David, as the plugin doesn't really handle the failure.
>>>>>> 
>>>>>> However, like Zhu and Lijie said, I also joined a discussion
>>>>>> recently about customized failure handling, e.g. counting the
>>>>>> failure rate of pipeline regions separately, and failing the job
>>>>>> when a specific error occurs, and so on.
>>>>>> I suppose a custom restart strategy, or I'd call it a custom
>>>>>> failure "handler", is indeed necessary. It can also enrich the
>>>>>> information as the current proposed handler does.
>>>>>> 
>>>>>> To avoid adding too many plugin interfaces which may confuse users
>>>>>> and make the ExecutionFailureHandler more complex,
>>>>>> I think it'd be better to consider the requirements at the same time.
>>>>>> 
>>>>>> IMO, we can add a handler interface, then make the current restart
>>>>>> strategy and the enricher both types of the handler. The handlers
>>>>>> execute in sequence, and the failure is considered unrecoverable if
>>>>>> any of the handlers decides so.
>>>>>> In this way, users can also implement a handler using the enriched
>>>>>> information provided by the previous handlers, e.g. fail the job and
>>>>>> send a notification if too many failures are caused by the end users.
>>>>>> 
>>>>>> Best,
>>>>>> Gen
>>>>>> 
>>>>>> 
>>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Panagiotis,
>>>>>>> 
>>>>>>> Thanks for your proposal. It is valuable to analyze the reason for
>>>>>>> failure with the user plug-in.
>>>>>>> 
>>>>>>> Making the context immutable could make the contract stronger.
>>>>>>> Letting the listener return an enriching result may be a better way.
>>>>>>> 
>>>>>>> IIUC, listeners could do two things: enrich more information
>>>>>>> (tags/labels) to FailureHandlingResult, and push data out of Flink
>>>>>>> (metrics or something).
>>>>>>> IMO, we could split these two types into Listener and Advisor (maybe
>>>>>>> other names). The Listener just pushes the data out and returns nothing
>>>>>>> to Flink, so we can run these async and don't have to wait for the
>>>>>>> Listener's result.
>>>>>>> The Advisor returns rich information to the FailureHandlingResult, and
>>>>>>> it should have lighter logic.
>>>>>>> 
>>>>>>> 
>>>>>>> Supporting a custom restart strategy is also valuable. In this design,
>>>>>>> we use RestartStrategy to construct a FailureHandlingResult, and then
>>>>>>> pass it to the Listener.
>>>>>>> My question is, should we change the restart strategy interface to
>>>>>>> support the custom restart strategy, or keep the current restart
>>>>>>> strategy and let the later Listener enrich the restartable information
>>>>>>> to FailureHandlingResult? The latter may cause some confusion when we
>>>>>>> use a custom restart strategy: the default Flink restart strategy also
>>>>>>> runs but does not take effect.
>>>>>>> 
>>>>>>> 
>>>>>>> Best,
>>>>>>> Weihua
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <wangdachui9...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Panagiotis,
>>>>>>>> 
>>>>>>>> Thanks for driving this.
>>>>>>>> 
>>>>>>>> +1 for supporting custom restart strategy, we did receive such requests
>>>>>>>> from the user mailing list [1][2].
>>>>>>>> 
>>>>>>>> Besides, in the current design, the plugin will only do some statistical
>>>>>>>> and classification work, and will not affect the *FailureHandlingResult*.
>>>>>>>> Just listening, no handling; it doesn't quite match the title.
>>>>>>>> 
>>>>>>>> [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
>>>>>>>> [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Lijie
>>>>>>>> 
>>>>>>>> On Mon, Mar 20, 2023 at 9:39 PM Zhu Zhu <reed...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi Panagiotis,
>>>>>>>>> 
>>>>>>>>> Thanks for creating this proposal! It's good to enable Flink to handle
>>>>>>>>> different errors in different ways, through a pluggable way.
>>>>>>>>> 
>>>>>>>>> There are requests for flexible restart strategies from time to time,
>>>>>>>>> for different strategies of restart backoff time, or to suppress
>>>>>>>>> restarting on certain errors. Therefore, I think it's better that the
>>>>>>>>> proposed failure handling plugin can also support custom restart
>>>>>>>>> strategies.
>>>>>>>>> 
>>>>>>>>> Maybe we can call it FailureHandlingAdvisor which provides more
>>>>>>>>> information (labels) and gives advice (restart backoff time, whether
>>>>>>>>> to restart)? I do not have a strong opinion though, any explanatory
>>>>>>>>> name would be good.
>>>>>>>>> 
>>>>>>>>> To avoid unexpected mutation, how about making the context immutable
>>>>>>>>> and letting the plugin return an immutable result? i.e. remove the
>>>>>>>>> setters from the context, and let the plugin method return a result
>>>>>>>>> which contains `labels`, `canRestart` and `restartBackoffTime`. Flink
>>>>>>>>> should apply the result to the context before invoking the next plugin,
>>>>>>>>> so that the next plugin will see the updated context.
>>>>>>>>> 
>>>>>>>>> The plugin should avoid taking too much time to return the result,
>>>>>>>>> because it will block the RPC and result in instability. However, it
>>>>>>>>> can still perform heavy actions in a different thread. The context can
>>>>>>>>> provide an `ioExecutor` to the plugins for reuse.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Zhu
>>>>>>>>> 
>>>>>>>>> On Mon, Mar 20, 2023 at 8:21 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Panagiotis
>>>>>>>>>> 
>>>>>>>>>> Thank you for your answer. I agree that `FailureListener` could be
>>>>>>>>>> stateless, then I have some thoughts as follows
>>>>>>>>>> 
>>>>>>>>>> 1. I see that listeners and tag collections are associated. When
>>>>>>>>>> JobManager fails and restarts, how can the new listener be associated
>>>>>>>>>> with the tag collection before failover? Is it by listener loading
>>>>>>>>>> order?
>>>>>>>>>> 
>>>>>>>>>> 2. The tag collection may be too large, resulting in the JobManager
>>>>>>>>>> OOM, do we need to provide a management class that supports some
>>>>>>>>>> obsolescence strategies instead of a direct Collection?
>>>>>>>>>> 
>>>>>>>>>> 3. Is it possible to provide a more complex data structure than a
>>>>>>>>>> simple string collection for tags in listeners, such as key-value?
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Shammon FY
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi,Panagiotis
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Thank you for kicking off this discussion. Overall, the proposed
>>>>>>>>>>> feature of this FLIP makes sense to me. We have also discussed similar
>>>>>>>>>>> requirements with our users and developers, and I believe it will help
>>>>>>>>>>> many users.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> In terms of FLIP content, I have some thoughts:
>>>>>>>>>>> 
>>>>>>>>>>> (1) For the FailureListenerContext interface, the methods
>>>>>>>>>>> FailureListenerContext#addTag and FailureListenerContext#getTags look
>>>>>>>>>>> very inconsistent because they imply specific implementation details,
>>>>>>>>>>> and not all FailureListeners need to handle them; we shouldn't put them
>>>>>>>>>>> in the interface. Minor: The comment "UDF loading" in the
>>>>>>>>>>> getUserClassLoader() method looks like a typo, IIUC it should return
>>>>>>>>>>> the classloader of the current job.
>>>>>>>>>>> 
>>>>>>>>>>> (2) Regarding the implementation in
>>>>>>>>>>> ExecutionFailureHandler#handleFailure, some custom listeners may have
>>>>>>>>>>> heavy IO operations, such as reporting to their monitoring system. The
>>>>>>>>>>> current logic appears to be processing in the JobMaster's main thread,
>>>>>>>>>>> and it is recommended not to do this kind of processing in the main
>>>>>>>>>>> thread.
>>>>>>>>>>> 
>>>>>>>>>>> (3) The results of FailureListener's processing and the
>>>>>>>>>>> FailureHandlingResult returned by ExecutionFailureHandler are not
>>>>>>>>>>> related in the current design. I think these two should be closely
>>>>>>>>>>> related; the motivation of this FLIP is to make current failure
>>>>>>>>>>> handling more flexible. From this perspective, different listeners
>>>>>>>>>>> should have the opportunity to affect the job's failure handling flow.
>>>>>>>>>>> For example, a Flink job is configured with a RestartStrategy with a
>>>>>>>>>>> huge number of retries, but the Kafka topic of the Source has been
>>>>>>>>>>> deleted, so the job will fail over continuously. In this case, the user
>>>>>>>>>>> should have their listener determine whether this failure is
>>>>>>>>>>> recoverable or unrecoverable, and then wrap the processing result into
>>>>>>>>>>> FailureHandlingResult.unrecoverable(xx) and pass it to JobMaster; this
>>>>>>>>>>> approach will be more flexible.
>>>>>>>>>>> 
>>>>>>>>>>> (4) All FLIPs have an important section named Public Interfaces. The
>>>>>>>>>>> current FLIP mixes the interface section and the implementation section
>>>>>>>>>>> together. It is better for us to refer to the FLIP template[1] and
>>>>>>>>>>> separate them, this will make the entire FLIP clearer.
>>>>>>>>>>> 
>>>>>>>>>>> In addition, regarding the FLIP process, there is a small suggestion:
>>>>>>>>>>> The community generally creates a JIRA issue after the FLIP vote is
>>>>>>>>>>> passed, instead of during the FLIP preparation phase because the FLIP
>>>>>>>>>>> may be rejected. Although this FLIP is very reasonable, it's better to
>>>>>>>>>>> follow the process.
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> 
>>>>>>>>>>> Leonard
>>>>>>>>>>> 
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> however listeners can use previous state (tags/labels) to make
>>>>>>>>>>>>> decisions
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> That sounds like a very fragile contract. We should either allow
>>>>>>>>>>>> passing tags between listeners and then need to define ordering, or
>>>>>>>>>>>> make all of them independent. I prefer the latter because it allows us
>>>>>>>>>>>> to parallelize things if needed (if all listeners trigger an RPC to
>>>>>>>>>>>> the external system, for example).
>>>>>>>>>>>> 
>>>>>>>>>>>> Can you expand on why we need more than one classifier to be able to
>>>>>>>>>>>> output the same tag?
>>>>>>>>>>>> 
>>>>>>>>>>>>> system ones come first and then the ones loaded from the plugin
>>>>>>>>>>>>> manager
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Since they're returned as a Set, the order is completely
>>>>>>>>>>>> non-deterministic, no matter in which order they're loaded.
>>>>>>>>>>>> 
>>>>>>>>>>>>> just communicating with external monitoring/alerting systems
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> That makes the need for pushing things out of the main thread even
>>>>>>>>>>>> stronger. This almost sounds like we need to return a
>>>>>>>>>>>> CompletableFuture for the per-throwable classification because an
>>>>>>>>>>>> external system might take a significant time to respond. We need to
>>>>>>>>>>>> unblock the main thread for other RPCs.
>>>>>>>>>>>> 
>>>>>>>>>>>> Also, in the proposal, this happens in the failure handler. If that's
>>>>>>>>>>>> the case, this might block the job from being restarted (if the
>>>>>>>>>>>> restart strategy allows for another restart), which would be great to
>>>>>>>>>>>> avoid because it can introduce extra downtime.
>>>>>>>>>>>> 
>>>>>>>>>>>> This raises another question: what should happen if the
>>>>>>>>>>>> classification fails? Crashing the job (which is what's currently
>>>>>>>>>>>> proposed) seems very dangerous if this might depend on an external
>>>>>>>>>>>> system.
>>>>>>>>>>>> 
>>>>>>>>>>>>> That's a valid point, passing the JobGraph containing all the above
>>>>>>>>>>>>> information is also something to consider
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> We should avoid passing JG around because it's mutable (which we must
>>>>>>>>>>>> fix in the long term), and letting users change it might have
>>>>>>>>>>>> consequences.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> D.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <pga...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hey David, Shammon,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the valuable comments!
>>>>>>>>>>>>> I am glad you find this proposal useful, some thoughts:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> @Shammon
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. How about adding more job information in FailureListenerContext?
>>>>>>>>>>>>>> For example, job vertex, subtask, taskmanager location. And then
>>>>>>>>>>>>>> users can do more statistics according to different dimensions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> That's a valid point, passing the JobGraph containing all the above
>>>>>>>>>>>>> information is also something to consider. I was mostly trying to be
>>>>>>>>>>>>> conservative, i.e., passing only the information we need, and
>>>>>>>>>>>>> extending as we see fit.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2. Users may want to save results in listener, and then they can get
>>>>>>>>>>>>>> the historical results even after JobManager failover. Can we
>>>>>>>>>>>>>> provide a unified implementation for data storage requirements?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The idea is to store only the output of the Listeners (tags) and
>>>>>>>>>>>>> treat them as stateless.
>>>>>>>>>>>>> Tags are stored along with HistoryEntries, and will be available
>>>>>>>>>>>>> through the HistoryServer even after a JM dies.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> @David
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1) Should we also consider adding labels? The combination of tags
>>>>>>>>>>>>>> and labels seems to be what most systems offer; sometimes, they
>>>>>>>>>>>>>> offer labels only (key=value pairs) because tags can be implemented
>>>>>>>>>>>>>> using those, but not the other way around.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Indeed, changing tags to k:v labels could be more expressive, I like
>>>>>>>>>>>>> it! Let's see what others think.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined models
>>>>>>>>>>>>>> ("listeners") are going to be, it would be great to keep the
>>>>>>>>>>>>>> interfaces/data structures immutable so we can push things over to
>>>>>>>>>>>>>> the I/O threads. Also, it sounds off to call the main interface a
>>>>>>>>>>>>>> Listener since it's supposed to enhance the original throwable with
>>>>>>>>>>>>>> additional metadata.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The idea was for the name to be generic as there could be Listener
>>>>>>>>>>>>> implementations just communicating with external monitoring/alerting
>>>>>>>>>>>>> systems and no metadata output -- but let's rethink that. For
>>>>>>>>>>>>> immutability, see below:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3) You're proposing to support a set of listeners. Since you're
>>>>>>>>>>>>>> passing the mutable context around, which includes tags set by the
>>>>>>>>>>>>>> previous listener, do you expect users to make any assumptions about
>>>>>>>>>>>>>> the order in which listeners are executed?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> In the existing proposal we are not making any assumptions about the
>>>>>>>>>>>>> order of listeners (system ones come first and then the ones loaded
>>>>>>>>>>>>> from the plugin manager); however, listeners can use previous state
>>>>>>>>>>>>> (tags/labels) to make decisions: e.g., we won't assign *UNKNOWN*
>>>>>>>>>>>>> failureType when we have already seen *USER*, or the other way around
>>>>>>>>>>>>> -- when we have seen *UNKNOWN*, remove it in favor of *USER*.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Panagiotis
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Panagiotis,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> This is an excellent proposal and something everyone trying to
>>>>>>>>>>>>>> provide "Flink as a service" needs to solve at some point. I have a
>>>>>>>>>>>>>> couple of questions:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If I understand the proposal correctly, this is just about adding
>>>>>>>>>>>>>> tags to the Throwable by running a tuple of (Throwable,
>>>>>>>>>>>>>> FailureContext) through a user-defined model.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1) Should we also consider adding labels? The combination of tags
>>>>>>>>>>>>>> and labels seems to be what most systems offer; sometimes, they
>>>>>>>>>>>>>> offer labels only (key=value pairs) because tags can be implemented
>>>>>>>>>>>>>> using those, but not the other way around.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined models
>>>>>>>>>>>>>> ("listeners") are going to be, it would be great to keep the
>>>>>>>>>>>>>> interfaces/data structures immutable so we can push things over to
>>>>>>>>>>>>>> the I/O threads. Also, it sounds off to call the main interface a
>>>>>>>>>>>>>> Listener since it's supposed to enhance the original throwable with
>>>>>>>>>>>>>> additional metadata.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I'd propose something along the lines of (we should have better
>>>>>>>>>>>>>> names, this is just to outline the idea):
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> interface FailureEnricher {
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>  ThrowableWithTagsAndLabels enrichFailure(Throwable cause,
>>>>>>>>>>>>>>      ImmutableContextualMetadataAboutTheThrowable context);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The names should change; this is just to outline the idea.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3) You're proposing to support a set of listeners. Since you're
>>>>>>>>>>>>>> passing the mutable context around, which includes tags set by the
>>>>>>>>>>>>>> previous listener, do you expect users to make any assumptions about
>>>>>>>>>>>>>> the order in which listeners are executed?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> *@Shammon*
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Users may want to save results in listener, and then they can get
>>>>>>>>>>>>>>> the historical results even after JobManager failover. Can we
>>>>>>>>>>>>>>> provide a unified implementation for data storage requirements?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I think we should explicitly state that all "listeners" are treated
>>>>>>>>>>>>>> as stateless. I don't see any strong reason for snapshotting them.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> D.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Panagiotis
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thank you for starting this discussion. I think this FLIP is
>>>>>>>>>>>>>>> valuable and can help users to analyze the causes of job failover
>>>>>>>>>>>>>>> better!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I have two comments as follows
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1. How about adding more job information in FailureListenerContext?
>>>>>>>>>>>>>>> For example, job vertex, subtask, taskmanager location. And then
>>>>>>>>>>>>>>> users can do more statistics according to different dimensions.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 2. Users may want to save results in listener, and then they can get
>>>>>>>>>>>>>>> the historical results even after JobManager failover. Can we
>>>>>>>>>>>>>>> provide a unified implementation for data storage requirements?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> shammon FY
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> This FLIP [1] proposes a pluggable interface for failure handling
>>>>>>>>>>>>>>>> allowing users to implement custom failure logic using the plugin
>>>>>>>>>>>>>>>> framework.
>>>>>>>>>>>>>>>> Motivated by existing proposals [2] and tickets [3], this enables
>>>>>>>>>>>>>>>> use-cases like: assigning particular types to failures (e.g., User
>>>>>>>>>>>>>>>> or System), emitting custom metrics per type (e.g., application or
>>>>>>>>>>>>>>>> platform), even exposing errors to downstream consumers (e.g.,
>>>>>>>>>>>>>>>> notification systems).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks to Piotr and Anton for the initial reviews and discussions!
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> For anyone interested, the starting point would be the FLIP [1] that
>>>>>>>>>>>>>>>> I created, describing the motivation and the proposed changes (part
>>>>>>>>>>>>>>>> of the core, runtime and web).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> The intuition behind this FLIP is being able to execute custom logic
>>>>>>>>>>>>>>>> on failures by exposing a FailureListener interface. Implementations
>>>>>>>>>>>>>>>> by users can be simply loaded to the system as Jar files.
>>>>>>>>>>>>>>>> FailureListeners may also decide to assign failure tags to errors
>>>>>>>>>>>>>>>> (expressed as strings), that will then be exposed as metadata by the
>>>>>>>>>>>>>>>> UI/Rest interfaces.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Feedback is always appreciated! Looking forward to your thoughts!
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-20833
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Panagiotis
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> 
