To expand a bit for transparency: Zhu Zhu and I had a long discussion (literally spanning days) about this FLIP and its relation to speculative execution.

The gist of it is that speculative execution doesn't strictly need the blocklist, just _some_ mechanism to select/request slots from other nodes. The blocklist is, however, currently the easiest way to implement it because of the technical debt we have in the scheduler components (e.g., the adaptive scheduler not being used for all streaming jobs, or the default scheduler not being fully integrated into declarative resource management). Because of that I was worried that we might expand the API now to unlock speculative execution, but then end up not actually requiring it down the line (while still being stuck with it).
This should give us quite a bit more freedom as to how we implement it, which is particularly important because we have already identified some limitations in the current design (e.g., blocks not being scoped to jobs, resourceID-based blocks interfering with hard-coded resource IDs, potentially blocking more slots than necessary).

Shall we start a new vote thread, since the design changed quite a bit?

On 15/06/2022 09:33, Zhu Zhu wrote:
Hi everyone,
Thank you all for the feedback!

We have received concerns that the blocklist feature is not strongly
required except for the needs of speculative execution. So as the
first step, we will limit the scope of FLIP-224 to only support
speculative execution, and therefore not add public interfaces for
users to manipulate the blocklist directly.

If we later receive strong requirements for a blocklist from users, we
will open it up to users in another FLIP, with well-designed public
interfaces and web UI.

Thanks,
Zhu

Zhu Zhu <reed...@gmail.com> 于2022年6月10日周五 18:45写道:
1) With the declarative slot allocation protocol, it's not easy to
keep the slow slots (which may have already satisfied the slot
request) while still asking for more slots from the resource manager.
We are also wary of using detected slow slots, since that may add
load to the slow nodes and further slow down the tasks on them;
we found that slow nodes are mostly caused by heavy loads.

2) I see your point. The blocker here is that batch jobs currently
use the DefaultScheduler. The scheduler does not see slots directly and
does not know which of them are from slow nodes, so it's hard for it
to rescale a vertex according to this information. Besides that, it
cannot help with the stage that first observed and was affected by the
slow nodes.

Thanks,
Zhu

Chesnay Schepler <ches...@apache.org> 于2022年6月10日周五 17:04写道:
1)
It's true that if we handle this entirely in the scheduler we may get a bunch
of slow slots from the RM.
My point is that this isn't necessarily a problem; it depends on how you handle
those slots.

We need to account for the possibility that we're not getting any new
fast slots from the RM anyway.
With that in mind, I'd rather not categorically throw away the slow slots we
got, but try to make use of them somehow.

2)
The idea was to rescale the job when the current stage finishes.
Speculative execution handles slow nodes detected in the current stage;
in the next stage we use _some_ strategy to handle slow nodes (be it
ignoring them, rescaling the job, ...).

3)
It's a fair observation that once you push this to the RM you end up with a 
de-facto blocklist :)

On 08/06/2022 17:11, Zhu Zhu wrote:

Thanks Chesnay for the feedback in the vote thread.
(https://lists.apache.org/thread/opc7jg3rpxnwotkb0fcn4wnm02m4397o)

I'd like to continue the discussion in this thread so that the
discussions can be better tracked.

Regarding your questions, here are my thoughts:
1. The current locality mechanism does not work well to avoid
deploying tasks to slow nodes because it cannot proactively reject or
release slots from the slow nodes. And it cannot help at the resource
manager side to avoid allocating free slots or launching new
TaskManagers on slow nodes.
2. Dynamically downscaling or upscaling a batch job is usually
unacceptable because it means re-running the whole stage.
3. Extending the requirement declaration to have a notion of
"undesirable nodes" is an option, and it is actually how we started.
Considering the implementation details, we found we need:
- a tracker to collect all the undesirable nodes (detected slow nodes)
- to filter out slots from undesirable nodes when allocating slots
from the SlotPool
- to ask the ResourceManager for slots that are not on the undesirable
nodes; the ResourceManager may further need to ask for new
TaskManagers that are not on the undesirable nodes.
Then, with all this functionality, we found that we almost have a
blocklist mechanism. As a blocklist mechanism is a common concept and
may benefit users, we took this chance to propose it.
4. A cluster-wide shared blocklist is not a must for speculative
execution at the moment. It is mainly part of a standalone blocklist
feature to host user-specified block items. To avoid sharing job-specific
blocked items between jobs, one way is to add a nullable
JobID tag to the blocked item.
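As a rough illustration of the nullable-JobID idea, here is a minimal Python sketch; the names `BlockedItem` and `items_for_job` are invented here for clarity and are not Flink's actual (Java) API:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass(frozen=True)
class BlockedItem:
    resource_id: str
    job_id: Optional[str] = None  # None => cluster-wide item

def items_for_job(items: List[BlockedItem], job_id: str) -> List[BlockedItem]:
    # A job sees cluster-wide items plus items tagged with its own JobID,
    # so job-specific blocks are not shared between jobs.
    return [i for i in items if i.job_id is None or i.job_id == job_id]

items = [
    BlockedItem("tm-1"),                  # user-specified, shared by all jobs
    BlockedItem("tm-2", job_id="job-A"),  # job-specific blocked item
]
print([i.resource_id for i in items_for_job(items, "job-B")])  # ['tm-1']
```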

Thanks,
Zhu

Zhu Zhu <reed...@gmail.com> 于2022年6月7日周二 10:58写道:

Hi Chesnay,

Would you please take a look at the FLIP and discussion to see if all
your concerns have been addressed?

Thanks,
Zhu

Zhu Zhu <reed...@gmail.com> 于2022年5月28日周六 13:26写道:

Regarding the concern of the SlotManager, my two cents:
1. it is necessary for the SlotManager to host blocked slots, in 2 cases:
   a. In standalone mode, a taskmanager may be temporarily added to
the blocklist. We do not want the TM to get disconnected and shut down,
so we need to keep its connection to the RM and keep hosting its slots.
   b. When we want to avoid allocating slots on a slow node but do not
want to kill the currently running tasks on that node (MARK_BLOCKED mode).

There is possibly a way to keep the connection of a blocked task manager
while hiding its slots from the SlotManager, but I feel it may be even more
complicated.

2. It will not complicate the SlotManager too much. The SlotManager will
be given a BlockedTaskManagerChecker when created, and just needs
to use it to filter out blocked slots on slot requests. Therefore I think the
added complexity is acceptable.
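The filtering described above can be sketched in a few lines of Python; this is an illustration only (the real `BlockedTaskManagerChecker` is a Java interface passed to the SlotManager, and `fulfillable_slots` is a made-up name):

```python
from typing import Callable, Dict, List

# Checker: returns True if the given task manager is currently blocked.
BlockedTaskManagerChecker = Callable[[str], bool]

def fulfillable_slots(free_slots: Dict[str, List[str]],
                      is_blocked: BlockedTaskManagerChecker) -> List[str]:
    """Filter out free slots that live on blocked task managers."""
    return [slot
            for tm, slots in free_slots.items()
            if not is_blocked(tm)
            for slot in slots]

blocked = {"tm-2"}
free = {"tm-1": ["slot-a"], "tm-2": ["slot-b", "slot-c"]}
print(fulfillable_slots(free, lambda tm: tm in blocked))  # ['slot-a']
```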

Thanks,
Zhu

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月25日周三 15:26写道:

Hi everyone,

I've updated the FLIP according to Chesnay's feedback; the changes are as follows:
1. Changed the GET result to a map.
2. Kept only *endTimestamp* in the ADD operation, and changed the request
method from POST to PUT.
3. Introduced a new slot pool implementation (BlocklistSlotPool) to
encapsulate the blocklist-related functions.
4. Removed *mainThread* from the BlocklistTracker; instead it provides a
*removeTimeoutItems* method to be called by outside components.
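The *removeTimeoutItems* idea in point 4 (the tracker owns no executor; an outside caller drives expiry) can be sketched as follows. This is a hedged Python illustration, not Flink's Java implementation; all names besides `removeTimeoutItems` are invented:

```python
class BlocklistTracker:
    """Tracks blocked items. Expiry is driven by an external caller,
    so the tracker needs no executor or main-thread handle of its own."""

    def __init__(self):
        self._items = {}  # item id -> end timestamp (unix millis)

    def add_or_update(self, item_id: str, end_timestamp: int) -> None:
        self._items[item_id] = end_timestamp

    def remove_timeout_items(self, now_millis: int) -> list:
        """Remove and return all items whose end timestamp has passed."""
        expired = [i for i, end in self._items.items() if end <= now_millis]
        for i in expired:
            del self._items[i]
        return expired

tracker = BlocklistTracker()
tracker.add_or_update("tm-1", end_timestamp=1_000)
tracker.add_or_update("node-7", end_timestamp=5_000)
# An outside component (e.g. a periodic timer) calls this:
print(tracker.remove_timeout_items(now_millis=2_000))  # ['tm-1']
```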

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月23日周一 22:51写道:

Hi Chesnay,

Thanks for the feedback.

1. Regarding the TM/node id. Do you mean special characters may appear in
the REST URL? Actually, I don't think so. The task manager id in the REST API
should be the *ResourceID* of the taskmanager in Flink, which should contain no
special characters, and some existing REST APIs already use it, e.g.
GET: http://{jm_rest_address:port}/taskmanagers/<taskmanagerid>. The node
id should be the IP of a machine or a node name in Yarn/Kubernetes, so I think it
should also contain no special characters.

2. Regarding the GET query responses. I agree with you; it makes sense to
change the GET result to a map.

3. Regarding the endTimestamp. I also agree with you: endTimestamp can
cover everything, and since endTimestamp is a unix timestamp, there should be
no timezone issues. But I think PUT and DELETE are enough, with no PATCH. The
add REST API is add-or-update, and PUT can cover these semantics.

4. Regarding the slot pool/manager. I don't think the current slot pool
and slot manager are able to support the MARK_BLOCKED (slots that are
already allocated will not be affected) action. The reasons are as
follows:

a) For the slot pool, with the MARK_BLOCKED action, when a slot's state changes
from reserved (task assigned) to free (no task assigned), it is necessary to
check whether the slot should be released immediately (it should be released
immediately if the task manager is blocked, otherwise it may be allocated
to other tasks). I think this cannot be supported without being aware of
the blocklist information. Compared to the solution in the FLIP, a more
appropriate/preferred way may be: introduce a new slot pool
implementation for the blocklist (maybe named BlocklistSlotPool, which
extends/wraps the original slot pool), and implement the parts that need
to be aware of the blocklist in this newly introduced slot pool, so that the
original slot pool basically does not need to change.

b) For the slot manager, with the MARK_BLOCKED action, there may be free but
blocked slots in the slot manager (the corresponding TMs cannot be
released/unregistered because there are still running tasks on them).
Therefore, we need to filter out the blocked slots when trying to fulfill
the slot requirements, so the slot manager also needs to know the blocklist
information.
A better way may be to abstract a resource allocation strategy, make
the blocklist a special implementation, and pass the resource
allocation strategy in when constructing the slot manager. Unfortunately,
the data structures in the two existing slot manager
implementations (*DeclarativeSlotManager* and *FineGrainedSlotManager*) are
quite different, so it is not easy to abstract a common resource allocation
strategy, and we prefer to keep the current way (i.e. pass the blocklist
information directly into the slot manager).
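The release-on-free behavior described in point a) can be sketched roughly as below. This is an illustrative Python sketch under assumed names; the real BlocklistSlotPool is a Java class wrapping Flink's actual slot pool:

```python
class SlotPool:
    """Simplified stand-in for the original slot pool."""
    def __init__(self):
        self.free_slots = set()
    def release_slot(self, slot):   # give the slot back to the RM entirely
        self.free_slots.discard(slot)
    def free_slot(self, slot):      # task finished; slot becomes free again
        self.free_slots.add(slot)

class BlocklistSlotPool(SlotPool):
    """Wrapper that is aware of the blocklist; the base pool is unchanged."""
    def __init__(self, is_blocked_tm):
        super().__init__()
        self._is_blocked_tm = is_blocked_tm
    def free_slot(self, slot, tm):
        if self._is_blocked_tm(tm):
            # MARK_BLOCKED: a slot freed on a blocked TM must not be
            # handed to other tasks, so release it immediately.
            self.release_slot(slot)
        else:
            super().free_slot(slot)

pool = BlocklistSlotPool(is_blocked_tm=lambda tm: tm == "tm-2")
pool.free_slot("slot-a", tm="tm-1")  # stays in the pool
pool.free_slot("slot-b", tm="tm-2")  # released: its TM is blocked
print(sorted(pool.free_slots))  # ['slot-a']
```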


5. Regarding the BlocklistTracker. I also agree with you: the BlocklistTracker
does not need to be aware of the executor, and the timeout actions can be
done outside.

Chesnay Schepler <ches...@apache.org> 于2022年5月20日周五 17:34写道:

I have a number of concerns:

Is the id used for deleting an item the same as the one sent in the initial
request (and not one returned by Flink)?
I'm very concerned that the tm/node id can contain special characters.

The GET query should return a map, not a list of items. This makes it
easier to work with.

The duality of endTimestamp and duration is also concerning.
If we conclude that endTimestamps can in fact work (and aren't utterly
unusable due to timezones),
then this should be able to cover everything and rid us of some
complexity w.r.t. POSTs to the same ID.
Additions would be a PUT, changes a PATCH, deletes a DELETE.


I also dislike how we're pushing more functionality into the
slotpool/-manager.
These components are complex enough as-is; instead I'd propose a
separate component that interacts with the SlotPool/-Manager,
for example by removing the slots from that TM.
The reason is that, from the slot-pool perspective, it is irrelevant
whether a slot is gone because the TM was lost or because it was blocked.


The BlocklistTracker shouldn't be modeled as a component that is aware of
the concept of main threads.
It really has no business knowing that; all it needs is an executor for
handling timeouts/periodic actions,
and a way to interact with the JM/RM (which can internally handle the
scheduling onto the main thread).


On 20/05/2022 07:20, Lijie Wang wrote:

Hi everyone,

I have started a vote for this FLIP [1]. Please cast your vote there or ask
additional questions here.

[1] https://lists.apache.org/thread/3416vks1j35co9608gkmsoplvcjjz7bg

Best, Lijie

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月19日周四 17:34写道:

Hi Konstantin,

We found that the Flink REST URL does not support the ":merge" format,
which would be recognized as a parameter in the URL (because it starts
with a colon).

We will keep the previous way, i.e.

POST: http://{jm_rest_address:port}/blocklist/taskmanagers
and the "id" and "merge" flag are put into the request body.

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月18日周三 09:35写道:

Hi Weihua,
thanks for the feedback.

1. Yes, only *manual* blocking is supported in this FLIP, but it's the first
step towards auto-detection.
2. We will print the blocked nodes in the logs, and maybe also include them
in the exception for insufficient resources.
3. No. This FLIP won't change the WebUI. The blocklist information can be
obtained through the REST API and metrics.

Best,
Lijie

Weihua Hu <huweihua....@gmail.com> 于2022年5月17日周二 21:41写道:

Hi,
Thanks for creating this FLIP.
We have implemented an automatic blocklist detection mechanism
internally, which is indeed very effective for handling node failures.
Due to the large number of nodes, although SREs already support
automatically taking failed nodes offline, the detection is not 100%
accurate and there is some delay. So the blocklist mechanism can make
a Flink job recover from failures much faster.

Here are some of my thoughts:
1. In this FLIP, users need to locate machine failures manually, which
carries a certain usage cost.
2. What happens if too many nodes are blocked, resulting in insufficient
resources? Will there be a special exception for the user?
3. Will we display the blocklist information in the WebUI? The blocklist
is at the cluster level, and if multiple users share a cluster, some users
may be a little confused when resources are not enough, or when more
resources are requested than expected.

Also, looking forward to the next FLIP on auto-detection.

Best,
Weihua

2022年5月16日 下午11:22,Lijie Wang <wangdachui9...@gmail.com> 写道:

Hi Konstantin,

Maybe change it to the following:

1. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
Merge is not allowed. If the {id} already exists, return an error. Otherwise,
create a new item.

2. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
Merge is allowed. If the {id} already exists, merge. Otherwise, create a
new item.

WDYT?

Best,
Lijie

Konstantin Knauf <kna...@apache.org> 于2022年5月16日周一 20:07写道:

Hi Lijie,

hm, maybe the following is more appropriate in that case:

POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge

Best,

Konstantin

Am Mo., 16. Mai 2022 um 07:05 Uhr schrieb Lijie Wang <wangdachui9...@gmail.com>:

Hi Konstantin,
thanks for your feedback.

From what I understand, PUT should be idempotent. However, we have a
*timeout* field in the request. This means that initiating the same request
at two different times will lead to different resource states (the
timestamps at which the items are to be removed will differ).

Should we use PUT in this case? WDYT?

Best,
Lijie

Konstantin Knauf <kna...@apache.org> 于2022年5月13日周五 17:20写道:

Hi Lijie,

wouldn't the REST API-idiomatic way for an update/replace be a PUT on
the resource?

PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}

Best,

Konstantin



Am Fr., 13. Mai 2022 um 11:01 Uhr schrieb Lijie Wang <wangdachui9...@gmail.com>:

Hi everyone,

I've had an offline discussion with Becket Qin and Zhu Zhu, and made the
following changes to the REST API:
1. To avoid ambiguity, only one of *timeout* and *endTimestamp* may be
specified. If both are specified, an error will be returned.
2. If the specified item is already there, the *ADD* operation has two
behaviors: *return error* (the default) or *merge/update*, and we add a
flag to the request body to control it. You can find more details in the
"Public Interface" section.

If there is no more feedback, we will start the vote thread next week.

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月10日周二 17:14写道:

Hi Becket Qin,

Thanks for your suggestions. I have moved the descriptions of the
configurations, metrics and REST API into the "Public Interface" section,
and made a few updates according to your suggestions. In this FLIP, there
are no public Java interfaces or pluggables that users need to implement
themselves.

Answers to your questions:
1. Yes, there are 2 block actions: MARK_BLOCKED and
MARK_BLOCKED_AND_EVACUATE_TASKS (now renamed). Currently, block items can
only be added through the REST API, so these 2 actions are mentioned in
the REST API part (which has now been moved to the public interface
section).
2. I agree with you. I have changed the "Cause" field to a String, and
allow users to specify it via the REST API.
3. Yes, it is useful to allow different timeouts. As mentioned above, we
will introduce 2 fields, *timeout* and *endTimestamp*, into the ADD REST
API to specify when to remove the blocked item. These 2 fields are
optional: if neither is specified, the blocked item is permanent and will
not be removed; if both are specified, the minimum of
*currentTimestamp + timeout* and *endTimestamp* will be used as the time
to remove the blocked item. To keep the configuration minimal, we have
removed the *cluster.resource-blocklist.item.timeout* configuration option.
4. Yes, the block item will be overridden if the specified item already
exists. The ADD operation is *ADD or UPDATE*.
5. Yes. On the JM/RM side, all the blocklist information is maintained in
the JMBlocklistHandler/RMBlocklistHandler. The blocklist handler (or
interfaces abstracted from it) will be propagated to the different
components.

Best,
Lijie
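The timeout/endTimestamp rule described above (remove the item at the minimum of *currentTimestamp + timeout* and *endTimestamp*, or never if neither is given) can be sketched as follows; this is an illustration only, not Flink's actual implementation:

```python
from typing import Optional

def removal_timestamp(current_ts: int,
                      timeout: Optional[int] = None,
                      end_timestamp: Optional[int] = None) -> Optional[int]:
    """Unix timestamp at which a blocked item is removed,
    or None if the item is permanent (neither field specified)."""
    candidates = []
    if timeout is not None:
        candidates.append(current_ts + timeout)
    if end_timestamp is not None:
        candidates.append(end_timestamp)
    return min(candidates) if candidates else None

print(removal_timestamp(1_000))                                    # None
print(removal_timestamp(1_000, timeout=500))                       # 1500
print(removal_timestamp(1_000, timeout=500, end_timestamp=1_200))  # 1200
```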

Becket Qin <becket....@gmail.com> 于2022年5月10日周二 11:26写道:

Hi Lijie,

Thanks for updating the FLIP. It looks like the public interface section
does not fully reflect all the user-visible behavior and API. Can you put
everything that users may be aware of there? That would include the REST
API, metrics, configurations, and public Java interfaces or pluggables
that users may see or implement themselves, as well as a brief summary of
the behavior of the public API.

Besides that, I have a few questions:

1. According to the conversation in the discussion thread, it looks like
the BlockAction will have "MARK_BLOCKLISTED" and
"MARK_BLOCKLISTED_AND_EVACUATE_TASKS". Is that the case? If so, can you
add that to the public interface as well?

2. At this point, the "Cause" field in the BlockingItem is a Throwable and
is not reflected in the REST API. Should that be included in the query
response? And should we change that field to a String so users may specify
the cause via the REST API when they block some nodes / TMs?

3. Would it be useful to allow users to have different timeouts for
different blocked items? So while there is a default timeout, users can
also override it via the REST API when they block an entity.

4. Regarding the ADD operation, if the specified item is already there,
will the block item be overridden? For example, if the user wants to
extend the timeout of a blocked item, can they just issue an ADD command
again?

5. I am not quite familiar with the details of this, but is there a source
of truth for the blocklist? I think it might be good to have a single
source of truth for the blocklist and just propagate that list to the
different components, which then take the action of actually blocking the
resource.

Thanks,

Jiangjie (Becket) Qin

On Mon, May 9, 2022 at 5:54 PM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi everyone,

Based on the discussion on the mailing list, I updated the FLIP doc; the
changes include:
1. Changed the description of the motivation section to more clearly
describe the problem this FLIP is trying to solve.
2. Only *manual* blocking is supported.
3. Adopted some suggestions, such as *endTimestamp*.

Best,
Lijie


Roman Boyko <ro.v.bo...@gmail.com> 于2022年5月7日周六 19:25写道:

Hi Lijie!

*a) "Probably storing inside Zookeeper/Configmap might be helpful here."
Can you explain it in detail? I don't fully understand that. In my
opinion, non-active and active are the same, and no special treatment is
required.*

Sorry, this was a misunderstanding on my side. I thought we were talking
about HA mode (not about the Active and Standalone ResourceManager). And
the original question was: how should we handle the list of blocklisted
nodes at the moment of a leader change? Should we simply forget about them
or try to pre-save that list on remote storage?

On Sat, 7 May 2022 at 10:51, Yang Wang <danrtsey...@gmail.com> wrote:

Thanks Lijie and Zhu Zhu for the explanation.

I just overlooked the "MARK_BLOCKLISTED" action. For the task level, it is
indeed functionality that external tools (e.g. kubectl taint) could not
support.


Best,
Yang

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月6日周五 22:18写道:

Thanks for your feedback, Jiangang and Martijn.

@Jiangang

"For auto-detecting, I wonder how to make the strategy and mark a node
blocked?"

In fact, we currently plan not to support auto-detection in this FLIP. The
auto-detection part may be continued in a separate FLIP in the future.
Some people have the same concerns as you, and the correctness and
necessity of auto-detection may require further discussion.

"In session mode, multiple jobs can fail on the same bad node and the node
should be marked blocked."

By design, the blocklist information will be shared among all jobs in a
cluster/session. The JM will sync the blocklist information with the RM.

@Martijn

"I agree with Yang Wang on this."

As Zhu Zhu and I mentioned above, we think MARK_BLOCKLISTED (which just
limits the load of the node and does not kill all the processes on it) is
also important, and we think that external systems (*yarn rmadmin or
kubectl taint*) cannot support it. So we think the FLIP makes sense even
with only *manual* blocking.

"I also agree with Chesnay that magical mechanisms are indeed super hard
to get right."

Yes, as you see, Jiangang (and a few others) have the same concern.
However, we currently plan not to support auto-detection in this FLIP,
only *manual* blocking. In addition, I'd like to point out that the FLIP
provides a mechanism to support MARK_BLOCKLISTED and
MARK_BLOCKLISTED_AND_EVACUATE_TASKS; the auto-detection may be done by
external systems.

Best,
Lijie

Martijn Visser <mart...@ververica.com> 于2022年5月6日周五 19:04写道:

"If we only support blocking nodes manually, then I could not see the
obvious advantages compared with the current SRE approach (via *yarn
rmadmin or kubectl taint*)."

I agree with Yang Wang on this.

"To me this sounds yet again like one of those magical mechanisms that
will rarely work just right."

I also agree with Chesnay that magical mechanisms are indeed super hard to
get right.

Best regards,

Martijn

On Fri, 6 May 2022 at 12:03, Jiangang Liu <liujiangangp...@gmail.com> wrote:

Thanks for the valuable design. Auto-detection can save us a great deal of
work. We have implemented a similar feature in our internal Flink version.
Below are the things I care about:

1. For auto-detection, I wonder how to define the strategy and mark a node
blocked? Sometimes the blocked node is hard to detect; for example, the
upstream node or the downstream node may be marked blocked when the
network is unreachable.
2. I see that the strategy is made on the JobMaster side. How about
implementing similar logic in the resource manager? In session mode,
multiple jobs can fail on the same bad node, and the node should be marked
blocked. If the job makes the decision, the node may not be marked blocked
if the failure count doesn't exceed the threshold.


Zhu Zhu <reed...@gmail.com> 于2022年5月5日周四 23:35写道:

Thank you for all your feedback!

Besides the answers from Lijie, I'd like to share some of my thoughts:

1. Whether to enable automatic blocklisting
Generally speaking, it is not a goal of FLIP-224. The automatic way should
be something built upon the blocklist mechanism and well decoupled from
it. It was designed as a configurable blocklist strategy, but I think we
can further decouple it by introducing an abnormal node detector, as
Becket suggested, which just uses the blocklist mechanism once bad nodes
are detected. However, that should be a separate FLIP with further dev
discussion and feedback from users. I also agree with Becket that
different users have different requirements, and we should listen to them.

2. Is it enough to just take away abnormal nodes externally?
My answer is no. As Lijie has mentioned, we need a way to avoid deploying
tasks to temporarily hot nodes. In this case, users may just want to limit
the load of the node and do not want to kill all the processes on it.
Another case is speculative execution [1], which may also leverage this
feature to avoid starting mirror tasks on slow nodes.

Thanks,
Zhu

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job

Lijie Wang <wangdachui9...@gmail.com> 于2022年5月5日周四 15:56写道:

Hi everyone,

Thanks for your feedback.

There's one detail that I'd like to re-emphasize here because it can
affect the value and design of the blocklist mechanism (perhaps I should
highlight it in the FLIP). We propose two actions in the FLIP:

1) MARK_BLOCKLISTED: Just mark the task manager or node as blocked. Future
slots should not be allocated from the blocked task manager or node, but
slots that are already allocated will not be affected. A typical
application scenario is to mitigate machine hotspots. In this case, we
hope that subsequent resource allocations will not be on the hot machine,
but tasks currently running on it should not be affected.

2) MARK_BLOCKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as
blocked, and evacuate all tasks on it. Evacuated tasks will be restarted
on non-blocked task managers.

Of the above 2 actions, the former may better highlight the value of this
FLIP, because an external system cannot do that.

Regarding *manual* vs. *automatic*, I basically agree with @Becket Qin:
different users have different answers. Not all users' deployment
environments have a special external system that can perform the anomaly
detection. In addition, adding pluggable/optional auto-detection doesn't
require much extra work on top of manual specification.

I will answer your other questions one by one.

@Yangze

a) I think you are right; we do not need to expose
`cluster.resource-blocklist.item.timeout-check-interval` to users.
b) We can abstract `notifyException` into a separate interface (maybe
BlocklistExceptionListener), and the ResourceManagerBlocklistHandler can
implement it in the future.

@Martijn

a) I also think the manual blocking should be done by cluster operators.
b) I think manual blocking makes sense, because in my experience, users
are often the first to perceive machine problems (because of job failover
or delay), and they will contact cluster operators to solve them, or even
tell the cluster operators which machine is problematic. From this point
of view, I think the people who really need the manual blocking are the
users; it's just performed by the cluster operators. So I think the manual
blocking makes sense.

@Chesnay

We need to touch the logic of the JM/SlotPool because, for
MARK_BLOCKLISTED, we need to know whether the slot is blocklisted when the
task is FINISHED/CANCELLED/FAILED. If so, the SlotPool should release the
slot directly to avoid assigning other tasks (of this job) to it. If we
only maintain the blocklist information on the RM, the JM needs to
retrieve it by RPC. I think the performance overhead of that is relatively
large, so I think it's worth maintaining the blocklist information on the
JM side and syncing it.

@Роман

a) "Probably storing inside Zookeeper/Configmap might be helpful here."
Can you explain it in detail? I don't fully understand that. In my
opinion, non-active and active are the same, and no special treatment is
required.
b) I agree with you, the `endTimestamp` makes sense; I will add it to the
FLIP.

@Yang

As mentioned above, AFAIK, the external system cannot support the
MARK_BLOCKLISTED action.

Looking forward to your further feedback.


Best,

Lijie


Yang Wang <danrtsey...@gmail.com> 于2022年5月3日周二 21:09写道:

Thanks Lijie and Zhu for creating the proposal.

I want to share some thoughts about Flink cluster operations.

In the production environment, the SRE (aka Site Reliability Engineer)
already has many tools to detect unstable nodes, which can take the system
logs/metrics into consideration. Then they use graceful decommission in
YARN and taints in K8s to prevent new allocations on these unstable nodes.
At last, they will evict all the containers and pods running on these
nodes. This mechanism also works for planned maintenance. So I am afraid
this is not the typical use case for FLIP-224.

If we only support blocking nodes manually, then I could not see the
obvious advantages compared with the current SRE approach (via *yarn
rmadmin or kubectl taint*). At the very least, we need a pluggable
component which could expose potentially unstable nodes automatically and
block them if explicitly enabled.

Best,
Yang



Becket Qin <becket....@gmail.com> 于2022年5月2日周一 16:36写道:

Thanks for the proposal, Lijie.

This is an interesting feature and discussion, and somewhat related to the
design principle about how people should operate Flink.

I think there are three things involved in this FLIP:
a) Detect and report the unstable node.
b) Collect the information of the unstable node and form a blocklist.
c) Take the action to block nodes.

My two cents:

1. It looks like people all agree that Flink should have c). It is not
only useful for cases of node failures, but also handy for some planned
maintenance.

2. People have different opinions on b), i.e. who should be the brain that
makes the decision to block a node. I think this largely depends on who we
talk to. Different users would probably give different answers. For people
who do have a centralized node health management service, letting Flink do
just a) and c) would be preferred. So essentially Flink would be one of
the sources that may detect unstable nodes, report them to that service,
and then take the command from that service to block the problematic
nodes. On the other hand, for users who do not have such a service, simply
letting Flink be clever by itself and block the suspicious nodes might be
desired to ensure the jobs are running smoothly.

So that indicates a) and b) here should be pluggable / optional. In light
of this, maybe it would make sense to have something pluggable like an
UnstableNodeReporter which exposes unstable nodes actively. (A more
general interface would be JobInfoReporter<T>, which can be used to report
any information of type <T>, but I'll just keep the scope relevant to this
FLIP here.) Personally speaking, I think it is OK to have a default
implementation of a reporter which just tells Flink to take action to
block problematic nodes and also unblocks them after a timeout.

Thanks,

Jiangjie (Becket) Qin
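The pluggable reporter suggested above could look roughly like the following Python sketch. Everything here is hypothetical: `UnstableNodeReporter` is only being proposed in this thread and no such interface exists in Flink, and the threshold-based default is just one possible detection policy:

```python
from abc import ABC, abstractmethod
from typing import Dict, List

class UnstableNodeReporter(ABC):
    """Pluggable detector that actively exposes unstable nodes."""
    @abstractmethod
    def unstable_nodes(self) -> List[str]:
        ...

class ThresholdReporter(UnstableNodeReporter):
    """Possible default: report nodes whose failure count crosses a threshold."""
    def __init__(self, failure_counts: Dict[str, int], threshold: int):
        self._counts = failure_counts
        self._threshold = threshold
    def unstable_nodes(self) -> List[str]:
        return [n for n, c in self._counts.items() if c >= self._threshold]

reporter = ThresholdReporter({"node-1": 5, "node-2": 1}, threshold=3)
print(reporter.unstable_nodes())  # ['node-1']
```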


On Mon, May 2, 2022 at 3:27 PM Роман Бойко <

ro.v.bo...@gmail.com

wrote:

Thanks for good initiative, Lijie and Zhu!

If it's possible I'd like to participate in

development.

I agree with 3rd point of Konstantin's reply -

we

should

consider

to move

somehow the information of blocklisted

nodes/TMs

from

active

ResourceManager to non-active ones. Probably

storing

inside

Zookeeper/Configmap might be helpful here.

And I agree with Martijn that a lot of

organizations

don't

want

to

expose

such API for a cluster user group. But I think

it's

necessary

to

have the

mechanism for unblocking the nodes/TMs anyway

for

avoiding

incorrect

automatic behaviour.

And one more small suggestion - I think it would be better to extend the *BlocklistedItem* class with an *endTimestamp* field and fill it at item creation. This simple addition would allow us to:

    - Provide users the ability to set the exact end time of a blocklist entry through the REST API
    - Avoid being tied to a single value of *cluster.resource-blacklist.item.timeout*
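As a rough illustration of this suggestion: the class name follows the FLIP's *BlocklistedItem*, but the fields, constructors, and methods below are hypothetical, not the actual implementation.

```java
/**
 * Sketch of a blocklist entry with an explicit end timestamp.
 * Illustrative only; not the actual FLIP-224 class.
 */
public class BlocklistedItem {
    private final String identifier;  // node id or task manager id
    private final long endTimestamp;  // epoch millis at which the block expires

    /** Default path: end time derived from the cluster-wide timeout. */
    BlocklistedItem(String identifier, long createdAtMillis, long timeoutMillis) {
        this(identifier, createdAtMillis + timeoutMillis);
    }

    /**
     * REST path: the user supplies the exact end time, decoupled from
     * cluster.resource-blacklist.item.timeout.
     */
    BlocklistedItem(String identifier, long endTimestamp) {
        this.identifier = identifier;
        this.endTimestamp = endTimestamp;
    }

    String getIdentifier() {
        return identifier;
    }

    /** The periodic timeout check only needs to compare against endTimestamp. */
    boolean isExpired(long nowMillis) {
        return nowMillis >= endTimestamp;
    }
}
```

With both constructors present, the timeout-based and exact-end-time paths converge on the same expiry check.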


On Mon, 2 May 2022 at 14:17, Chesnay Schepler <ches...@apache.org> wrote:

I do share the concern about blurring the lines a bit.

That said, I'd prefer not to have any auto-detection and to only have an opt-in mechanism to manually block processes/nodes. To me this sounds yet again like one of those magical mechanisms that will rarely work just right. An external system can leverage way more information, after all.

Moreover, I'm quite concerned about the complexity of this proposal: tracking on both the RM/JM side; syncing between components; adjustments to the slot and resource protocol. In a way it seems overly complicated.

If we look at it purely from an active resource management perspective, then there isn't really a need to touch the slot protocol at all (or in fact anything in the JobMaster), because there isn't any point in keeping blocked TMs around in the first place. They'd just be idling, potentially being shut down by the RM after a while because of it (unless we _also_ touch that logic).
Here the blocking of a process (be it by blocking the process or node) is equivalent to shutting down the blocked process(es). Once the block is lifted we can just spin it back up.

And I do wonder whether we couldn't apply the same line of thinking to standalone resource management. Here, being able to stop/restart a process/node manually should be a core requirement for a Flink deployment anyway.


On 02/05/2022 08:49, Martijn Visser wrote:

Hi everyone,

Thanks for creating this FLIP. I can understand the problem and I see value in the automatic detection and blocklisting. However, I have two concerns with the ability to manually specify resources to be blocked:

* Most organizations explicitly have a separation of concerns, meaning that there's a group who's responsible for managing a cluster and a user group who uses that cluster. With the introduction of this mechanism, the latter group can now influence the responsibility of the first group. So it's possible that someone from the user group blocks something, causing an outage (which could result in a paging mechanism triggering, etc.) that impacts the first group.
* How big is the group of people who can go through the process of manually identifying a node that isn't behaving as it should? I do think this group is relatively limited. Does it then make sense to introduce such a feature, which would only be used by a really small group of Flink users? We would still have to maintain, test and support such a feature.

I'm +1 for the autodetection features, but I'm leaning towards not exposing this to the user group and instead having it available strictly for cluster operators. They could then also set up their paging/metrics/logging system to take this into account.

Best regards,
Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 29 Apr 2022 at 09:39, Yangze Guo <karma...@gmail.com> wrote:

Thanks for driving this, Zhu and Lijie.

+1 for the overall proposal. Just sharing my two cents here:

- Why do we need to expose cluster.resource-blacklist.item.timeout-check-interval to the user? I think the semantics of `cluster.resource-blacklist.item.timeout` is sufficient for the user. How the timeout is guaranteed is Flink's internal implementation. I think it would be very confusing, and we do not need to expose it to users.

- The ResourceManager can notify the `BlacklistHandler` of a task manager exception as well. For example, slot allocation might fail in case the target task manager is busy or has a network jitter. I don't mean we need to cover this case in this version, but we could also open a `notifyException` in `ResourceManagerBlacklistHandler`.

- Before we sync the blocklist to the ResourceManager, will the slots of a blocked task manager continue to be released and allocated?

Best,
Yangze Guo

On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Konstantin,

Thanks for your feedback. I will respond to your 4 remarks:

1) Thanks for reminding me of the controversy. I think “BlockList” is good enough, and I will change it in the FLIP.


2) Your suggestion for the REST API is a good idea. Based on the above, I would change the REST API as follows:

POST/GET <host>/blocklist/nodes
POST/GET <host>/blocklist/taskmanagers
DELETE <host>/blocklist/node/<identifier>
DELETE <host>/blocklist/taskmanager/<identifier>
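As an illustration, a client could target these endpoints as sketched below. The paths are the ones proposed above; the host address, class name, and JSON payload handling are assumptions made for the example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch of client-side requests against the proposed blocklist endpoints. */
public class BlocklistRequests {
    // Assumed JobManager REST address; adjust for a real deployment.
    private static final String HOST = "http://jobmanager:8081";

    /** POST <host>/blocklist/taskmanagers — add a task manager to the blocklist. */
    static HttpRequest blockTaskManager(String jsonBody) {
        return HttpRequest.newBuilder(URI.create(HOST + "/blocklist/taskmanagers"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** DELETE <host>/blocklist/taskmanager/<identifier> — lift the block. */
    static HttpRequest unblockTaskManager(String identifier) {
        return HttpRequest.newBuilder(URI.create(HOST + "/blocklist/taskmanager/" + identifier))
                .DELETE()
                .build();
    }
}
```

Note how addition POSTs to the collection while removal DELETEs the individual resource, matching the REST conventions discussed earlier in the thread.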

3) If a node is blocked/blocklisted, it means that all task managers on this node are blocklisted, and all slots on these TMs are unavailable. This is actually a bit like a TM loss, but these TMs are not really lost: they are in an unavailable status and still registered in the Flink cluster. They will become available again once the corresponding blocklist item is removed. This behavior is the same in active and non-active clusters. However, in active clusters, these TMs may be released due to idle timeouts.


4) For the item timeout, I prefer to keep it. The reasons are as follows:

a) The timeout will not affect users adding or removing items via the REST API, and users can disable it by configuring it to Long.MAX_VALUE.

b) Some node problems can recover after a period of time (such as machine hotspots), in which case users may prefer that Flink handle this automatically instead of requiring the user to do it manually.

Best,

Lijie

Konstantin Knauf <kna...@apache.org> wrote on Wed, 27 Apr 2022 at 19:23:

Hi Lijie,

I think this makes sense, and +1 to only supporting the manual blocking of taskmanagers and nodes. Maybe the different strategies can also be maintained outside of Apache Flink.

A few remarks:

1) Can we use another term than "blacklist" due to the controversy around the term [1]? There was also a Jira ticket about this topic a while back, and there was generally a consensus to avoid the terms blacklist & whitelist [2]. We could use "blocklist", "denylist" or "quarantined".

2) For the REST API, I'd prefer a slightly different design, as verbs like add/remove are often considered an anti-pattern for REST APIs. POST on a list is generally the standard way to add items; DELETE on the individual resource is the standard way to remove an item.

POST <host>/quarantine/items
DELETE <host>/quarantine/items/<itemidentifier>

We could also consider separating taskmanagers and nodes in the REST API (and internal data structures). Any opinion on this?

POST/GET <host>/quarantine/nodes
POST/GET <host>/quarantine/taskmanager
DELETE <host>/quarantine/nodes/<identifier>
DELETE <host>/quarantine/taskmanager/<identifier>

3) How would blocking nodes behave with non-active resource managers, i.e. standalone or reactive mode?

4) To keep the implementation even more minimal, do we need the timeout behavior? If items are added/removed manually, we could easily delegate this to the user. In my opinion, the timeout behavior would fit better into specific strategies at a later point.

Looking forward to your thoughts.

Cheers and thank you,

Konstantin

[1] https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term
[2] https://issues.apache.org/jira/browse/FLINK-18209

On Wed, 27 Apr 2022 at 04:04, Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi all,

Flink job failures may happen due to cluster node issues (insufficient disk space, bad hardware, network abnormalities). Flink will take care of the failures and redeploy the tasks. However, due to data locality and limited resources, the new tasks are very likely to be redeployed to the same nodes, which will result in continuous task abnormalities and affect job progress.

Currently, Flink users need to manually identify the problematic node and take it offline to solve this problem. But this approach has the following disadvantages:

1. Taking a node offline can be a heavy process. Users may need to contact cluster administrators to do this. The operation can even be dangerous and not allowed during some important business events.

2. Identifying and solving this kind of problem manually would be slow and a waste of human resources.

To solve this problem, Zhu Zhu and I propose to introduce a blacklist mechanism for Flink to filter out problematic resources.

You can find more details in FLIP-224 [1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blacklist+Mechanism

Best,

Lijie

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

--
https://twitter.com/snntrable
https://github.com/knaufk




