Thanks for the proposal, Lijie.

This is an interesting feature and discussion, and somewhat related to the
design principle about how people should operate Flink.

I think there are three things involved in this FLIP.
     a) Detect and report unstable nodes.
     b) Collect the information about unstable nodes and form a blocklist.
     c) Take action to block the nodes.

My two cents:

1. It looks like people all agree that Flink should have c). It is not only
useful in cases of node failure, but also handy for planned
maintenance.

2. People have different opinions on b), i.e. who should be the brain that
makes the decision to block a node. I think this largely depends on who we
talk to; different users would probably give different answers. For people
who do have a centralized node health management service, letting Flink do
just a) and c) would be preferred. Essentially, Flink would be one of the
sources that may detect unstable nodes, report them to that service, and
then take the command from that service to block the problematic nodes. On
the other hand, for users who do not have such a service, simply letting
Flink be clever by itself and block the suspicious nodes might be desirable
to ensure the jobs keep running smoothly.

So that indicates a) and b) here should be pluggable / optional.

In light of this, maybe it would make sense to have something pluggable
like an UnstableNodeReporter which exposes unstable nodes actively. (A more
general interface would be JobInfoReporter<T>, which could be used to report
any information of type T. But I'll keep the scope relevant to this FLIP
here.) Personally speaking, I think it is OK to have a default
implementation of the reporter which just tells Flink to take action to block
problematic nodes and also unblocks them after a timeout.
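
To make this concrete, here is a rough sketch of what such interfaces might
look like (all names and signatures below are placeholders I'm making up, not
part of the FLIP):

    /** General form: reports job information of type T to some external service. */
    public interface JobInfoReporter<T> {
        void report(T info);
    }

    /** Specialization in scope for this FLIP: actively exposes suspected unstable nodes. */
    interface UnstableNodeReporter extends JobInfoReporter<String> {
        // report(nodeId) would be invoked when Flink detects a potentially unstable node.
    }

Making the reporter the pluggable seam would keep a) and b) optional while c)
stays built into Flink.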

Thanks,

Jiangjie (Becket) Qin


On Mon, May 2, 2022 at 3:27 PM Роман Бойко <ro.v.bo...@gmail.com> wrote:

> Thanks for the good initiative, Lijie and Zhu!
>
> If possible, I'd like to participate in the development.
>
> I agree with the 3rd point of Konstantin's reply - we should consider how to
> propagate the information about blocklisted nodes/TMs from the active
> ResourceManager to non-active ones. Storing it in Zookeeper/ConfigMap might
> be helpful here.
>
> And I agree with Martijn that a lot of organizations don't want to expose
> such an API to the cluster user group. But I think a mechanism for unblocking
> nodes/TMs is necessary anyway, to recover from incorrect automatic behaviour.
>
> And one more small suggestion - I think it would be better to extend the
> *BlocklistedItem* class with an *endTimestamp* field and fill it at item
> creation (see the sketch after this list). This simple addition would allow
> us to:
>
>    - Provide users the ability to set the exact end time of a blocklist
>    entry through the REST API
>
>    - Not be tied to a single value of
>    *cluster.resource-blacklist.item.timeout*
>
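> A rough sketch of what I mean (the field and accessor below are placeholders,
> not taken from the FLIP):
>
>     public class BlocklistedItem {
>
>         // Absolute time (epoch millis) at which this item expires and the
>         // blocked node/TM becomes available again.
>         private final long endTimestamp;
>
>         public BlocklistedItem(long endTimestamp) {
>             this.endTimestamp = endTimestamp;
>         }
>
>         public long getEndTimestamp() {
>             return endTimestamp;
>         }
>     }
>
> The timeout-based path would then just set endTimestamp to
> System.currentTimeMillis() + timeout, while the REST API could accept an
> explicit end time instead.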
>
> On Mon, 2 May 2022 at 14:17, Chesnay Schepler <ches...@apache.org> wrote:
>
> > I do share the concern about blurring the lines a bit.
> >
> > That said, I'd prefer to not have any auto-detection and only have an
> > opt-in mechanism
> > to manually block processes/nodes. To me this sounds yet again like one
> > of those
> > magical mechanisms that will rarely work just right.
> > An external system can leverage way more information after all.
> >
> > Moreover, I'm quite concerned about the complexity of this proposal.
> > Tracking on both the RM/JM side; syncing between components; adjustments
> > to the
> > slot and resource protocol.
> >
> > In a way it seems overly complicated.
> >
> > If we look at it purely from an active resource management perspective,
> > then there isn't really a need to touch the slot protocol at all (or in
> > fact anything in the JobMaster), because there isn't any point in keeping
> > blocked TMs around in the first place.
> > They'd just be idling, and potentially be shut down by the RM after a
> > while because of it (unless we _also_ touch that logic).
> > Here blocking a process (be it by blocking the process or the node) is
> > equivalent to shutting down the blocked process(es).
> > Once the block is lifted we can just spin them back up.
> >
> > And I do wonder whether we couldn't apply the same line of thinking to
> > standalone resource management.
> > Here being able to stop/restart a process/node manually should be a core
> > requirement for a Flink deployment anyway.
> >
> >
> > On 02/05/2022 08:49, Martijn Visser wrote:
> > > Hi everyone,
> > >
> > > Thanks for creating this FLIP. I can understand the problem and I see
> > > value in the automatic detection and blocklisting. I do have two concerns
> > > with the ability to manually specify resources to be blocked:
> > >
> > > * Most organizations explicitly have a separation of concerns, meaning
> > > that there's a group who's responsible for managing a cluster and a user
> > > group who uses that cluster. With the introduction of this mechanism, the
> > > latter group can now influence the responsibility of the first group. So
> > > it would be possible for someone from the user group to block something,
> > > causing an outage (which could result in paging mechanisms triggering
> > > etc.) that impacts the first group.
> > > * How big is the group of people who can go through the process of
> > > manually identifying a node that isn't behaving as it should? I do think
> > > this group is relatively limited. Does it then make sense to introduce
> > > such a feature, which would only be used by a really small user group of
> > > Flink? We still have to maintain, test and support such a feature.
> > >
> > > I'm +1 for the autodetection features, but I'm leaning towards not
> > > exposing this to the user group and instead having it available strictly
> > > for cluster operators. They could then also set up their
> > > paging/metrics/logging system to take this into account.
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > > https://github.com/MartijnVisser
> > >
> > >
> > > On Fri, 29 Apr 2022 at 09:39, Yangze Guo <karma...@gmail.com> wrote:
> > >
> > >> Thanks for driving this, Zhu and Lijie.
> > >>
> > >> +1 for the overall proposal. Just sharing my two cents here:
> > >>
> > >> - Why do we need to expose
> > >> cluster.resource-blacklist.item.timeout-check-interval to the user?
> > >> I think the semantics of `cluster.resource-blacklist.item.timeout` is
> > >> sufficient for the user. How the timeout is enforced is Flink's internal
> > >> implementation detail; exposing it would be very confusing, so we do not
> > >> need to expose it to users.
> > >>
> > >> - The ResourceManager could notify the `BlacklistHandler` of task
> > >> manager exceptions as well. For example, slot allocation might fail
> > >> because the target task manager is busy or has a network jitter. I don't
> > >> mean we need to cover this case in this version, but we could also open
> > >> a `notifyException` in `ResourceManagerBlacklistHandler` (see the sketch
> > >> below).
> > >>
> > >> - Before we sync the blocklist to the ResourceManager, will the slots of
> > >> a blocked task manager continue to be released and allocated?
> > >>
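> > >> A minimal sketch of the `notifyException` idea mentioned above (the
> > >> signature is just a placeholder, not something from the FLIP):
> > >>
> > >>     public interface ResourceManagerBlacklistHandler {
> > >>
> > >>         // Lets the ResourceManager report an exception observed on a
> > >>         // task manager (e.g. a failed slot allocation) to the
> > >>         // blocklist logic.
> > >>         void notifyException(String taskManagerId, Throwable cause);
> > >>     }
> > >>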
> > >> Best,
> > >> Yangze Guo
> > >>
> > >> On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang <wangdachui9...@gmail.com>
> > >> wrote:
> > >>> Hi Konstantin,
> > >>>
> > >>> Thanks for your feedback. I will respond to your 4 remarks:
> > >>>
> > >>>
> > >>> 1) Thanks for reminding me of the controversy. I think "blocklist" is
> > >>> good enough, and I will change it in the FLIP.
> > >>>
> > >>>
> > >>> 2) Your suggestion for the REST API is a good idea. Based on the above,
> > >>> I would change the REST API as follows:
> > >>>
> > >>> POST/GET <host>/blocklist/nodes
> > >>>
> > >>> POST/GET <host>/blocklist/taskmanagers
> > >>>
> > >>> DELETE <host>/blocklist/node/<identifier>
> > >>>
> > >>> DELETE <host>/blocklist/taskmanager/<identifier>
> > >>>
> > >>>
> > >>> 3) If a node is blocklisted, it means that all task managers on this
> > >>> node are blocklisted, and all slots on these TMs are unavailable. This
> > >>> is actually a bit like the TMs being lost, but these TMs are not really
> > >>> lost: they are in an unavailable status and still registered in the
> > >>> Flink cluster. They will become available again once the corresponding
> > >>> blocklist item is removed. This behavior is the same in active and
> > >>> non-active clusters. However, in active clusters these TMs may be
> > >>> released due to idle timeouts.
> > >>>
> > >>>
> > >>> 4) For the item timeout, I prefer to keep it. The reasons are as
> > >>> follows:
> > >>> a) The timeout will not affect users adding or removing items via the
> > >>> REST API, and users can disable it by configuring it to Long.MAX_VALUE.
> > >>>
> > >>> b) Some node problems recover after a period of time (such as machine
> > >>> hotspots), in which case users may prefer that Flink handles this
> > >>> automatically instead of requiring them to do it manually.
> > >>>
> > >>>
> > >>> Best,
> > >>>
> > >>> Lijie
> > >>>
> > >>> Konstantin Knauf <kna...@apache.org> wrote on Wed, Apr 27, 2022 at 19:23:
> > >>>
> > >>>> Hi Lijie,
> > >>>>
> > >>>> I think this makes sense, and +1 to only supporting manual blocking of
> > >>>> taskmanagers and nodes. Maybe the different strategies can also be
> > >>>> maintained outside of Apache Flink.
> > >>>>
> > >>>> A few remarks:
> > >>>>
> > >>>> 1) Can we use another term than "blacklist" due to the controversy
> > >>>> around the term? [1] There was also a Jira ticket about this topic a
> > >>>> while back, and there was general consensus to avoid the terms
> > >>>> blacklist & whitelist [2]. We could use "blocklist", "denylist" or
> > >>>> "quarantined".
> > >>>> 2) For the REST API, I'd prefer a slightly different design, as verbs
> > >>>> like add/remove are often considered an anti-pattern for REST APIs.
> > >>>> POST on a list resource is generally the standard way to add items;
> > >>>> DELETE on the individual resource is the standard way to remove an
> > >>>> item.
> > >>>>
> > >>>> POST <host>/quarantine/items
> > >>>> DELETE <host>/quarantine/items/<itemidentifier>
> > >>>>
> > >>>> We could also consider separating taskmanagers and nodes in the REST
> > >>>> API (and internal data structures). Any opinion on this?
> > >>>>
> > >>>> POST/GET <host>/quarantine/nodes
> > >>>> POST/GET <host>/quarantine/taskmanager
> > >>>> DELETE <host>/quarantine/nodes/<identifier>
> > >>>> DELETE <host>/quarantine/taskmanager/<identifier>
> > >>>>
> > >>>> 3) How would blocking nodes behave with non-active resource managers,
> > >>>> i.e. standalone or reactive mode?
> > >>>>
> > >>>> 4) To keep the implementation even more minimal, do we need the
> > >>>> timeout behavior? If items are added/removed manually, we could easily
> > >>>> delegate this to the user. In my opinion the timeout behavior would
> > >>>> better fit into specific strategies at a later point.
> > >>>>
> > >>>> Looking forward to your thoughts.
> > >>>>
> > >>>> Cheers and thank you,
> > >>>>
> > >>>> Konstantin
> > >>>>
> > >>>> [1]
> > >>>> https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term
> > >>>> [2] https://issues.apache.org/jira/browse/FLINK-18209
> > >>>>
> > >>>> On Wed, Apr 27, 2022 at 04:04, Lijie Wang <wangdachui9...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>> Flink job failures may happen due to cluster node issues
> > >>>>> (insufficient disk space, bad hardware, network abnormalities). Flink
> > >>>>> will take care of the failures and redeploy the tasks. However, due
> > >>>>> to data locality and limited resources, the new tasks are very likely
> > >>>>> to be redeployed to the same nodes, which will result in continuous
> > >>>>> task abnormalities and affect job progress.
> > >>>>>
> > >>>>> Currently, Flink users need to manually identify the problematic node
> > >>>>> and take it offline to solve this problem. But this approach has the
> > >>>>> following disadvantages:
> > >>>>>
> > >>>>> 1. Taking a node offline can be a heavy process. Users may need to
> > >>>>> contact cluster administrators to do this. The operation can even be
> > >>>>> dangerous and not allowed during some important business events.
> > >>>>>
> > >>>>> 2. Identifying and solving this kind of problem manually would be
> > >>>>> slow and a waste of human resources.
> > >>>>>
> > >>>>> To solve this problem, Zhu Zhu and I propose to introduce a blacklist
> > >>>>> mechanism for Flink to filter out problematic resources.
> > >>>>>
> > >>>>>
> > >>>>> You can find more details in FLIP-224 [1]. Looking forward to your
> > >>>>> feedback.
> > >>>>>
> > >>>>> [1]
> > >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blacklist+Mechanism
> > >>>>>
> > >>>>> Best,
> > >>>>>
> > >>>>> Lijie
> > >>>>>
> >
> >
>
