1)
It's true that if we handle this entirely in the scheduler we may get a bunch
of slow slots from the RM.
My point is that this isn't necessarily a problem; it depends on how you handle
those slots.
We anyway need to account for the possibility that we're not getting any new
fast slots from the RM.
With that in mind, I'd rather not categorically throw away the slow slots we
got, but try to make use of them somehow.
2)
The idea was to rescale the job when the current stage finishes.
The speculative execution handles slow nodes detected in the current stage;
in the next stage we use _some_ strategy to handle slow nodes (be it
ignoring those, rescaling the job, ...).
3)
It's a fair observation that once you push this to the RM you end up with a
de-facto blocklist :)
On 08/06/2022 17:11, Zhu Zhu wrote:
Thanks Chesnay for the feedback in the vote thread.
(https://lists.apache.org/thread/opc7jg3rpxnwotkb0fcn4wnm02m4397o)
I'd like to continue the discussion in this thread so that the
discussions can be better tracked.
Regarding your questions, here are my thoughts:
1. The current locality mechanism does not work well to avoid
deploying tasks to slow nodes because it cannot proactively reject or
release slots from the slow nodes. And it cannot help at the resource
manager side to avoid allocating free slots or launching new
TaskManagers on slow nodes.
2. Dynamically downscaling or upscaling a batch job is usually
unacceptable because it means re-running the whole stage.
3. Extending the requirement declaration to have a notion of
"undesirable nodes" is an option, and it is actually how we started.
Looking into the implementation details, we found that we need
- a tracker to collect all the undesirable (i.e. detected slow) nodes
- to filter out slots from undesirable nodes when allocating slots
from the SlotPool
- to ask the ResourceManager for slots that are not on the undesirable
nodes. The ResourceManager may further need to ask for new
TaskManagers that are not on the undesirable nodes.
With all this functionality in place, we found that we almost have a
blocklist mechanism. As a blocklist mechanism is a common concept and
may benefit users, we took this chance to propose it.
4. A cluster-wide shared blocklist is not a must for speculative
execution at the moment. It is mainly part of a standalone blocklist
feature to host user-specified block items. To avoid sharing job-specific
blocked items between jobs, one way is to add a nullable JobID tag to the
blocked item.
Thanks,
Zhu
On Tue, Jun 7, 2022 at 10:58, Zhu Zhu <reed...@gmail.com> wrote:
Hi Chesnay,
Would you please take a look at the FLIP and discussion to see if all
your concerns have been addressed?
Thanks,
Zhu
On Sat, May 28, 2022 at 13:26, Zhu Zhu <reed...@gmail.com> wrote:
Regarding the concern of the SlotManager, my two cents:
1. it is necessary for the SlotManager to host blocked slots, in 2 cases:
a. In standalone mode, a taskmanager may be temporarily added to
the blocklist. We do not want the TM to get disconnected and shut down.
So we need to keep its connection to RM and keep hosting its slots.
b. When we want to avoid allocating slots on a slow node but do not
want to kill the currently running tasks on that node (MARK_BLOCKED mode).
There might be a way to keep the connection of a blocked task manager
while hiding its slots from the SlotManager, but I feel it may be even more
complicated.
2. It will not complicate the SlotManager too much. The SlotManager will
be offered a BlockedTaskManagerChecker when created, and just needs
to use it to filter out blocked slots on slot requests. Therefore I think the
complication is acceptable.
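To make point 2 a bit more concrete, here is a rough sketch of the idea
(the checker name comes from this discussion, while the surrounding types
are simplified placeholders rather than the real SlotManager classes):

import java.util.*;

// Simplified placeholder types, not the actual Flink classes.
interface BlockedTaskManagerChecker {
    boolean isBlockedTaskManager(String taskManagerId);
}

class SimplifiedSlotManager {
    // free slots grouped by the task manager that hosts them
    private final Map<String, List<String>> freeSlotsByTaskManager = new HashMap<>();
    private final BlockedTaskManagerChecker blockedChecker;

    SimplifiedSlotManager(BlockedTaskManagerChecker blockedChecker) {
        // the checker is offered when the SlotManager is created
        this.blockedChecker = blockedChecker;
    }

    // Blocked TMs stay registered and their slots are still hosted,
    // but they are skipped when fulfilling a slot request.
    Optional<String> findFreeSlot() {
        return freeSlotsByTaskManager.entrySet().stream()
                .filter(entry -> !blockedChecker.isBlockedTaskManager(entry.getKey()))
                .flatMap(entry -> entry.getValue().stream())
                .findFirst();
    }
}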
Thanks,
Zhu
On Wed, May 25, 2022 at 15:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi everyone,
I've updated the FLIP according to Chesnay's feedback, changes as follows:
1. Change the GET result to a map.
2. Only *endTimestamp* is left in the ADD operation, and the operation is
changed from POST to PUT.
3. Introduce a new slot pool implementation (BlocklistSlotPool) to
encapsulate blocklist-related functions.
4. Remove *mainThread* from BlocklistTracker; instead provide a
*removeTimeoutItems* method to be called by outside components.
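For 3 and 4, roughly speaking, the tracker could end up looking like the
sketch below (names and fields are illustrative only, not the final design):

import java.util.*;

// Illustrative sketch only. The tracker knows nothing about threads or
// executors; an outside component calls removeTimeoutItems() periodically.
class BlocklistTracker {
    private final Map<String, Long> blockedUntilByItemId = new HashMap<>();

    void addOrUpdateItem(String itemId, long endTimestamp) {
        blockedUntilByItemId.put(itemId, endTimestamp);
    }

    // Called from outside, e.g. by a periodic task scheduled by the JM/RM.
    void removeTimeoutItems(long currentTimestamp) {
        blockedUntilByItemId.values().removeIf(end -> end <= currentTimestamp);
    }

    Set<String> getBlockedItemIds() {
        return Collections.unmodifiableSet(blockedUntilByItemId.keySet());
    }
}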
Best,
Lijie
On Mon, May 23, 2022 at 22:51, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Chesnay,
Thanks for feedback.
1. Regarding the TM/Node id. Do you mean special characters may appear in
the REST URL? Actually, I don't think so. The task manager id in the REST API
should be the *ResourceID* of the taskmanager in Flink, which contains no
special characters, and some existing REST APIs already use it, e.g.
GET: http://{jm_rest_address:port}/taskmanagers/<taskmanagerid>. The node
id should be the IP of a machine or a node name in Yarn/Kubernetes, which I
think also contains no special characters.
2. Regarding the GET query responses. I agree with you, it makes sense to
change the GET result to a map.
3. Regarding the endTimestamp. I also agree with you: endTimestamp can
cover everything, and since the endTimestamp is a Unix timestamp, there should
be no timezone issues. But I think PUT and DELETE are enough, no PATCH. The
ADD REST API means add-or-update, and PUT can cover these semantics.
4. Regarding the slot pool/manager. I don't think the current slot pool
and slot manager are able to support the MARK_BLOCKED action (slots that are
already allocated will not be affected). The reasons are as follows:
a) for the slot pool, with the MARK_BLOCKED action, when a slot state changes
from reserved (task assigned) to free (no task assigned), it is necessary to
check whether the slot should be released immediately (it should be released
immediately if the task manager is blocked, otherwise it may be allocated
to other tasks). I think this cannot be supported without being aware of
the blocklist information. Compared to the solution in the FLIP, a more
appropriate/preferred way may be: introduce a new slot pool
implementation for the blocklist (maybe named BlocklistSlotPool, which
extends/wraps the original slot pool), and implement the parts that need
to be aware of the blocklist in this newly introduced slot pool, so that the
original slot pool basically does not need to change (a small sketch follows
after point 5 below).
b) for the slot manager, with the MARK_BLOCKED action, there may be free but
blocked slots in the slot manager (the corresponding TMs cannot be
released/unregistered because there are still running tasks on them).
Therefore, we need to filter out the blocked slots when trying to fulfill
the slot requirements, so the slot manager also needs to know the blocklist
information.
A better way may be to abstract a resource allocation strategy, make
the blocklist a special implementation of it, and then pass the resource
allocation strategy in when constructing the slot manager. Unfortunately,
the data structures in the two existing slot manager
implementations (*DeclarativeSlotManager* and *FineGrainedSlotManager*) are
quite different, so it is not easy to abstract a common resource allocation
strategy, and we prefer to keep the current way (i.e. pass the blocklist
information directly into the slot manager).
5. Regarding the BlocklistTracker. I also agree with you, the BlocklistTracker
does not need to be aware of the executor, and the timeout actions can be
done outside.
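Returning to 4 a), here is a minimal sketch of the wrapping idea (purely
illustrative, with simplified stand-ins instead of the real slot pool
interfaces): when a slot goes from reserved back to free, the
blocklist-aware pool releases it immediately if its task manager is
blocked, otherwise it behaves like the original pool.

import java.util.function.Predicate;

// Simplified stand-ins for the real slot pool types.
interface SimpleSlotPool {
    void returnSlotToFreePool(String slotId, String taskManagerId);
    void releaseSlot(String slotId);
}

class BlocklistSlotPool implements SimpleSlotPool {
    private final SimpleSlotPool wrappedPool;
    private final Predicate<String> isBlockedTaskManager;

    BlocklistSlotPool(SimpleSlotPool wrappedPool,
                      Predicate<String> isBlockedTaskManager) {
        this.wrappedPool = wrappedPool;
        this.isBlockedTaskManager = isBlockedTaskManager;
    }

    @Override
    public void returnSlotToFreePool(String slotId, String taskManagerId) {
        if (isBlockedTaskManager.test(taskManagerId)) {
            // The TM is blocked: release the slot right away so it cannot
            // be handed to other tasks of this job.
            wrappedPool.releaseSlot(slotId);
        } else {
            wrappedPool.returnSlotToFreePool(slotId, taskManagerId);
        }
    }

    @Override
    public void releaseSlot(String slotId) {
        wrappedPool.releaseSlot(slotId);
    }
}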
On Fri, May 20, 2022 at 17:34, Chesnay Schepler <ches...@apache.org> wrote:
I have a number of concerns:
Is the id used for deleting an item the same one sent in the initial request
(and not one returned by Flink)?
I'm very concerned that the tm/node id can contain special characters.
The GET query should return a map, not a list of items. This makes it
easier to work with.
The duality of endTimestamp and duration is also concerning.
If we conclude that endTimestamps can in fact work (and aren't utterly
unusable due to timezones),
then this should be able to cover everything and rid us of some
complexity w.r.t. POSTs to the same ID.
Additions would be a PUT, changes a PATCH, deletes a DELETE.
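For instance, with endTimestamp-only semantics an addition could then be a
single request along these lines (the path follows the pattern discussed in
this thread; the host, item id and body fields are made up for illustration):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BlocklistAddExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint and body fields. endTimestamp is a plain
        // Unix timestamp in milliseconds, so no timezone handling is needed.
        HttpRequest addItem = HttpRequest.newBuilder()
                .uri(URI.create("http://jobmanager:8081/blocklist/taskmanagers/container_01"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(
                        "{\"endTimestamp\": 1654675200000, \"cause\": \"machine hotspot\"}"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(addItem, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}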
I also dislike how we're pushing more functionality into the
slotpool/-manager.
These components are complex enough as-is; instead I'd propose a
separate component that interacts with the SlotPool/-Manager,
for example by removing the slots from that TM.
The reason being that from the slot-pool perspective it is irrelevant
whether a slot is gone because the TM was lost, or because it was blocked.
The BlocklistTracker shouldn't be modeled as a component that is aware of
the concept of main threads.
It really has no business knowing that; all it needs is an executor for
handling timeouts/periodic actions,
and a way to interact with the JM/RM (which internally can handle the
scheduling into the main thread).
On 20/05/2022 07:20, Lijie Wang wrote:
Hi everyone,
I have started a vote for this FLIP [1]. Please cast your vote there or ask
additional questions here.
[1] https://lists.apache.org/thread/3416vks1j35co9608gkmsoplvcjjz7bg
Best, Lijie
On Thu, May 19, 2022 at 17:34, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Konstantin,
We found that the Flink REST URL does not support the ":merge" format, which
will be recognized as a parameter in the URL (because it starts with a colon).
We will keep the previous way, i.e.
POST: http://{jm_rest_address:port}/blocklist/taskmanagers
and the "id" and "merge" flag are put into the request body.
Best,
Lijie
On Wed, May 18, 2022 at 09:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Weihua,
thanks for feedback.
1. Yes, only *Manually* is supported in this FLIP, but it's the first
step towards auto-detection.
2. We will print the blocked nodes in the logs. Maybe we will also put them
into the insufficient-resources exception.
3. No. This FLIP won't change the WebUI. The blocklist information can be
obtained through the REST API and metrics.
Best,
Lijie
On Tue, May 17, 2022 at 21:41, Weihua Hu <huweihua....@gmail.com> wrote:
Hi,
Thanks for creating this FLIP.
We have implemented an automatic blocklist detection mechanism internally,
which is indeed very effective for handling node failures.
Due to the large number of nodes, although SREs already support
automatically taking failed nodes offline, the detection is not 100%
accurate and there is some delay. So the blocklist mechanism can make Flink
jobs recover from failures much faster.
Here are some of my thoughts:
1. In this FLIP, users need to locate machine failures manually, which
carries a certain usage cost.
2. What happens if too many nodes are blocked, resulting in insufficient
resources? Will there be a special exception for the user?
3. Will we display the blocklist information in the WebUI? The blocklist is
at the cluster level, and if multiple users share a cluster, some users may
be a little confused when resources are not enough, or when more resources
are requested.
Also, looking forward to the next FLIP on auto-detection.
Best,
Weihua
On May 16, 2022, at 11:22 PM, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Konstantin,
Maybe change it to the following:
1. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
Merge is not allowed. If the {id} already exists, return error. Otherwise,
create a new item.
2. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
Merge is allowed. If the {id} already exists, merge. Otherwise, create a
new item.
WDYT?
Best,
Lijie
On Mon, May 16, 2022 at 20:07, Konstantin Knauf <kna...@apache.org> wrote:
Hi Lijie,
hm, maybe the following is more appropriate in that case
POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
Best,
Konstantin
On Mon, May 16, 2022 at 07:05, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Konstantin,
thanks for your feedback.
From what I understand, PUT should be idempotent. However, we have a
*timeout* field in the request. This means that initiating the same request
at two different times will lead to different resource status (timestamps
of the items to be removed will be different).
Should we use PUT in this case? WDYT?
Best,
Lijie
On Fri, May 13, 2022 at 17:20, Konstantin Knauf <kna...@apache.org> wrote:
Hi Lijie,
wouldn't the REST API-idiomatic way for an update/replace be a PUT on the
resource?
PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
Best,
Konstantin
On Fri, May 13, 2022 at 11:01, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi everyone,
I've had an offline discussion with Becket Qin and Zhu Zhu, and made the
following changes to the REST API:
1. To avoid ambiguity, only one of *timeout* and *endTimestamp* may be
specified. If both are specified, an error will be returned.
2. If the specified item is already there, the *ADD* operation has two
behaviors: *return error* (the default) or *merge/update*, and we add a
flag to the request body to control it. You can find more details in the
"Public Interface" section.
If there is no more feedback, we will start the vote thread next week.
Best,
Lijie
On Tue, May 10, 2022 at 17:14, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Becket Qin,
Thanks for your suggestions. I have moved the description of
configurations, metrics and REST API into the "Public Interface" section,
and made a few updates according to your suggestions. In this FLIP, there
are no public Java interfaces or pluggables that users need to implement by
themselves.
Answers to your questions:
1. Yes, there are 2 block actions: MARK_BLOCKED and
MARK_BLOCKED_AND_EVACUATE_TASKS (renamed). Currently, block items can only
be added through the REST API, so these 2 actions are mentioned in the REST
API part (which has now been moved to the public interface section).
2. I agree with you. I have changed the "Cause" field to String, and allow
users to specify it via the REST API.
3. Yes, it is useful to allow different timeouts. As mentioned above, we
will introduce 2 fields, *timeout* and *endTimestamp*, into the ADD REST
API to specify when to remove the blocked item. These 2 fields are
optional; if neither is specified, it means that the blocked item is
permanent and will not be removed. If both are specified, the minimum of
*currentTimestamp+timeout* and *endTimestamp* will be used as the time to
remove the blocked item (a small sketch of this rule follows below). To
keep the configuration more minimal, we have removed the
*cluster.resource-blocklist.item.timeout* configuration option.
4. Yes, the block item will be overridden if the specified item already
exists. The ADD operation is *ADD or UPDATE*.
5. Yes. On the JM/RM side, all the blocklist information is maintained in
JMBlocklistHandler/RMBlocklistHandler. The blocklist handler (or an
abstraction of it) will be propagated to the different components.
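To make the removal-time rule in 3 concrete, the computation is roughly the
following (a tiny sketch, not the actual implementation):

// Sketch only: if both fields are given, the earlier of the two wins;
// if neither is given, the item is permanent and is never removed.
class BlockedItemRemovalTime {
    static long computeRemovalTimestamp(long currentTimestamp,
                                        Long timeout,
                                        Long endTimestamp) {
        long byTimeout = timeout == null ? Long.MAX_VALUE : currentTimestamp + timeout;
        long byEndTimestamp = endTimestamp == null ? Long.MAX_VALUE : endTimestamp;
        // Long.MAX_VALUE effectively means "never remove"
        return Math.min(byTimeout, byEndTimestamp);
    }
}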
Best,
Lijie
On Tue, May 10, 2022 at 11:26, Becket Qin <becket....@gmail.com> wrote:
Hi Lijie,
Thanks for updating the FLIP. It looks like the public interface section
did not fully reflect all the user sensible behavior and API. Can you put
everything that users may be aware of there? That would include the REST
API, metrics, configurations, public java Interfaces or pluggables that
users may see or implement by themselves, as well as a brief summary of the
behavior of the public API.
Besides that, I have a few questions:
1. According to the conversation in the discussion thread, it looks like
the BlockAction will have "MARK_BLOCKLISTED" and
"MARK_BLOCKLISTED_AND_EVACUATE_TASKS". Is that the case? If so, can you add
that to the public interface as well?
2. At this point, the "Cause" field in the BlockingItem is a Throwable and
is not reflected in the REST API. Should that be included in the query
response? And should we change that field to be a String so users may
specify the cause via the REST API when they block some nodes / TMs?
3. Would it be useful to allow users to have different timeouts for
different blocked items? So while there is a default timeout, users can
also override it via the REST API when they block an entity.
4. Regarding the ADD operation, if the specified item is already there,
will the block item be overridden? For example, if the user wants to extend
the timeout of a blocked item, can they just issue an ADD command again?
5. I am not quite familiar with the details of this, but is there a source
of truth for the blocked list? I think it might be good to have a single
source of truth for the blocked list and just propagate that list to
different components to take the action of actually blocking the resource.
Thanks,
Jiangjie (Becket) Qin
On Mon, May 9, 2022 at 5:54 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi everyone,
Based on the discussion in the mailing list, I updated the FLIP doc; the
changes include:
1. Changed the description of the motivation section to more clearly
describe the problem this FLIP is trying to solve.
2. Only *Manually* is supported.
3. Adopted some suggestions, such as *endTimestamp*.
Best,
Lijie
On Sat, May 7, 2022 at 19:25, Roman Boyko <ro.v.bo...@gmail.com> wrote:
Hi Lijie!
*a) “Probably storing inside Zookeeper/Configmap might be helpful here.”
Can you explain it in detail? I don't fully understand that. In my opinion,
non-active and active are the same, and no special treatment is required.*
Sorry, this was a misunderstanding from my side. I thought we were talking
about the HA mode (but not about Active and Standalone ResourceManager).
And the original question was - how to handle the blacklisted nodes list at
the moment of leader change? Should we simply forget about them or try to
pre-save that list on the remote storage?
On Sat, 7 May 2022 at 10:51, Yang Wang <danrtsey...@gmail.com> wrote:
Thanks Lijie and ZhuZhu for the explanation.
I just overlooked the "MARK_BLOCKLISTED". For the task level, it is indeed
functionality that the external tools (e.g. kubectl taint) could not
support.
Best,
Yang
On Fri, May 6, 2022 at 22:18, Lijie Wang <wangdachui9...@gmail.com> wrote:
Thanks for your feedback, Jiangang and Martijn.
@Jiangang
"For auto-detecting, I wonder how to make the strategy and mark a node
blocked?"
In fact, we currently plan to not support auto-detection in this FLIP. The
part about auto-detection may be continued in a separate FLIP in the
future. Some people have the same concerns as you, and the correctness and
necessity of auto-detection may require further discussion in the future.
"In session mode, multi jobs can fail on the same bad node and the node
should be marked blocked."
By design, the blocklist information will be shared among all jobs in a
cluster/session. The JM will sync the blocklist information with the RM.
@Martijn
"I agree with Yang Wang on this."
As Zhu Zhu and I mentioned above, we think MARK_BLOCKLISTED (which just
limits the load of the node and does not kill all the processes on it) is
also important, and we think that external systems (*yarn rmadmin or
kubectl taint*) cannot support it. So we think it makes sense even if only
*manual* blocking is supported.
"I also agree with Chesnay that magical mechanisms are indeed super hard to
get right."
Yes, as you see, Jiangang (and a few others) have the same concern.
However, we currently plan to not support auto-detection in this FLIP, only
*manual* blocking. In addition, I'd like to say that the FLIP provides a
mechanism to support MARK_BLOCKLISTED and
MARK_BLOCKLISTED_AND_EVACUATE_TASKS; the auto-detection may be done by
external systems.
Best,
Lijie
On Fri, May 6, 2022 at 19:04, Martijn Visser <mart...@ververica.com> wrote:
"If we only support to block nodes manually, then I could not see the
obvious advantages compared with current SRE's approach (via *yarn rmadmin
or kubectl taint*)."
I agree with Yang Wang on this.
"To me this sounds yet again like one of those magical mechanisms that will
rarely work just right."
I also agree with Chesnay that magical mechanisms are indeed super hard to
get right.
Best regards,
Martijn
On Fri, 6 May 2022 at 12:03, Jiangang Liu <liujiangangp...@gmail.com> wrote:
Thanks for the valuable design. Auto-detection can save us a lot of work.
We have implemented a similar feature in our internal Flink version.
Below is something that I care about:
1. For auto-detecting, I wonder how to make the strategy and mark a node
blocked? Sometimes the blocked node is hard to detect; for example, the
upstream node or the downstream node may be blocked when the network is
unreachable.
2. I see that the strategy is made on the JobMaster side. How about
implementing similar logic in the resource manager? In session mode,
multiple jobs can fail on the same bad node and the node should be marked
blocked. If the job makes the strategy, the node may not be marked blocked
if the failure count doesn't exceed the threshold.
On Thu, May 5, 2022 at 23:35, Zhu Zhu <reed...@gmail.com> wrote:
Thank you for all your feedback!
Besides the answers from Lijie, I'd like to share some of my thoughts:
1. Whether to enable automatic blocklisting
Generally speaking, it is not a goal of FLIP-224. The automatic way should
be something built upon the blocklist mechanism and well decoupled from it.
It was designed to be a configurable blocklist strategy, but I think we can
further decouple it by introducing an abnormal node detector, as Becket
suggested, which just uses the blocklist mechanism once bad nodes are
detected. However, it should be a separate FLIP with further dev
discussions and feedback from users. I also agree with Becket that
different users have different requirements, and we should listen to them.
2. Is it enough to just take away abnormal nodes externally?
My answer is no. As Lijie has mentioned, we need a way to avoid deploying
tasks to temporarily hot nodes. In this case, users may just want to limit
the load of the node and do not want to kill all the processes on it.
Another case is speculative execution [1], which may also leverage this
feature to avoid starting mirror tasks on slow nodes.
Thanks,
Zhu
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job
On Thu, May 5, 2022 at 15:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi everyone,
Thanks for your feedback.
There's one detail that I'd like to re-emphasize here because it can affect
the value and design of the blocklist mechanism (perhaps I should highlight
it in the FLIP). We propose two actions in the FLIP:
1) MARK_BLOCKLISTED: Just mark the task manager or node as blocked. Future
slots should not be allocated from the blocked task manager or node, but
slots that are already allocated will not be affected. A typical
application scenario is to mitigate machine hotspots. In this case, we hope
that subsequent resource allocations will not be on the hot machine, but
tasks currently running on it should not be affected.
2) MARK_BLOCKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as
blocked, and evacuate all tasks on it. Evacuated tasks will be restarted on
non-blocked task managers.
Of the above 2 actions, the former better highlights the value of this
FLIP, because the external system cannot do that.
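Just to illustrate, the two actions could be modeled along these lines (a
sketch for this discussion, not the final interface):

/** Illustrative sketch of the two proposed block actions. */
enum BlockAction {
    /** Only future slot allocations avoid the blocked TM/node; tasks
     *  already running there keep running. */
    MARK_BLOCKLISTED,
    /** Future allocations avoid the blocked TM/node, and all tasks on it
     *  are evacuated and restarted on non-blocked task managers. */
    MARK_BLOCKLISTED_AND_EVACUATE_TASKS
}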
Regarding *Manually* and *Automatically*, I basically agree with @Becket
Qin: different users have different answers. Not all users' deployment
environments have a special external system that can perform the anomaly
detection. In addition, adding pluggable/optional auto-detection doesn't
require much extra work on top of the manual specification.
I will answer your other questions one by one.
@Yangze
a) I think you are right, we do not need to expose the
`cluster.resource-blocklist.item.timeout-check-interval` option to users.
b) We can abstract the `notifyException` into a separate interface (maybe
BlocklistExceptionListener), and the ResourceManagerBlocklistHandler can
implement it in the future.
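Something along these lines, just to sketch the idea (the name and the
exact signature are not decided yet):

// Sketch only: a listener that the ResourceManagerBlocklistHandler could
// implement later, so that other components can report suspicious task
// managers without depending on the handler directly.
interface BlocklistExceptionListener {
    void notifyException(String taskManagerId, Throwable cause);
}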
@Martijn
a) I also think the manual blocking should be done by cluster operators.
b) I think manual blocking makes sense, because according to my experience,
users are often the first to perceive machine problems (because of job
failovers or delays), and they will contact the cluster operators to solve
it, or even tell the cluster operators which machine is problematic. From
this point of view, I think the people who really need the manual blocking
are the users, and it's just performed by the cluster operators, so I think
manual blocking makes sense.
@Chesnay
We need to touch the logic of the JM/SlotPool, because for MARK_BLOCKLISTED
we need to know whether the slot is blocklisted when the task is
FINISHED/CANCELLED/FAILED. If so, the SlotPool should release the slot
directly to avoid assigning other tasks (of this job) to it. If we only
maintain the blocklist information on the RM, the JM needs to retrieve it
by RPC. I think the performance overhead of that is relatively large, so I
think it's worth maintaining the blocklist information on the JM side and
syncing it.
@Роман
a) “Probably storing inside Zookeeper/Configmap might be helpful here.” Can
you explain it in detail? I don't fully understand that. In my opinion,
non-active and active are the same, and no special treatment is required.
b) I agree with you, the `endTimestamp` makes sense; I will add it to the
FLIP.
@Yang
As mentioned above, AFAIK, the external system cannot support the
MARK_BLOCKLISTED action.
Looking forward to your further feedback.
Best,
Lijie
On Tue, May 3, 2022 at 21:09, Yang Wang <danrtsey...@gmail.com> wrote:
Thanks Lijie and Zhu for creating the proposal.
I want to share some thoughts about Flink cluster operations.
In the production environment, the SREs (aka Site Reliability Engineers)
already have many tools to detect unstable nodes, which can take the system
logs/metrics into consideration. Then they use graceful decommission in
YARN and taints in K8s to prevent new allocations on these unstable nodes.
At last, they evict all the containers and pods running on these nodes.
This mechanism also works for planned maintenance. So I am afraid this is
not the typical use case for FLIP-224.
If we only support to block nodes manually, then I could not see the
obvious advantages compared with current SRE's approach (via *yarn rmadmin
or kubectl taint*).
At least, we need to have a pluggable component which could expose the
potentially unstable nodes automatically and block them if enabled
explicitly.
Best,
Yang
On Mon, May 2, 2022 at 16:36, Becket Qin <becket....@gmail.com> wrote:
Thanks for the proposal, Lijie.
This is an interesting feature and discussion, and somewhat related to the
design principle about how people should operate Flink.
I think there are three things involved in this FLIP.
a) Detect and report the unstable node.
b) Collect the information of the unstable node and form a blocklist.
c) Take the action to block nodes.
My two cents:
1. It looks like people all agree that Flink should have c). It is not only
useful for cases of node failures, but also handy for some planned
maintenance.
2. People have different opinions on b), i.e. who should be the brain to
make the decision to block a node. I think this largely depends on who we
talk to. Different users would probably give different answers. For people
who do have a centralized node health management service, letting Flink
just do a) and c) would be preferred. So essentially Flink would be one of
the sources that may detect unstable nodes, report them to that service,
and then take the command from that service to block the problematic nodes.
On the other hand, for users who do not have such a service, simply letting
Flink be clever by itself and block the suspicious nodes might be desired
to ensure the jobs are running smoothly.
So that indicates a) and b) here should be pluggable / optional.
In light of this, maybe it would make sense to have something pluggable
like an UnstableNodeReporter which exposes unstable nodes actively. (A more
general interface would be a JobInfoReporter<T> which can be used to report
any information of type <T>, but I'll just keep the scope relevant to this
FLIP here.) Personally speaking, I think it is OK to have a default
implementation of a reporter which just tells Flink to take action to block
problematic nodes and also unblocks them after a timeout.
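Roughly something like the following, just to illustrate the shape of such
a plugin (the names here are made up for this sketch):

// Sketch of a pluggable reporter: implementations decide when a node is
// unstable and report it; a default implementation could simply ask Flink
// to block the node and unblock it again after a timeout.
interface UnstableNodeReporter {
    void reportUnstableNode(String nodeId, Throwable cause);
}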
Thanks,
Jiangjie (Becket) Qin
On Mon, May 2, 2022 at 3:27 PM Роман Бойко <ro.v.bo...@gmail.com> wrote:
Thanks for the good initiative, Lijie and Zhu!
If it's possible, I'd like to participate in the development.
I agree with the 3rd point of Konstantin's reply - we should consider
somehow moving the information about blocklisted nodes/TMs from the active
ResourceManager to non-active ones. Probably storing it inside
Zookeeper/Configmap might be helpful here.
And I agree with Martijn that a lot of organizations don't want to expose
such an API to the cluster user group. But I think it's necessary to have a
mechanism for unblocking the nodes/TMs anyway, to avoid incorrect automatic
behaviour.
And one other small suggestion - I think it would be better to extend the
*BlocklistedItem* class with an *endTimestamp* field and fill it at item
creation. This simple addition will allow to:
- Provide the ability for users to set up the exact time of blocklist end
through the REST API
- Not be tied to a single value of *cluster.resource-blacklist.item.timeout*
On Mon, 2 May 2022 at 14:17, Chesnay Schepler <ches...@apache.org> wrote:
I do share the concern about blurring the lines a bit.
That said, I'd prefer to not have any auto-detection and only have an
opt-in mechanism to manually block processes/nodes. To me this sounds yet
again like one of those magical mechanisms that will rarely work just
right. An external system can leverage way more information after all.
Moreover, I'm quite concerned about the complexity of this proposal.
Tracking on both the RM/JM side; syncing between components; adjustments to
the slot and resource protocol. In a way it seems overly complicated.
If we look at it purely from an active resource management perspective,
then there isn't really a need to touch the slot protocol at all (or in
fact anything in the JobMaster), because there isn't any point in keeping
around blocked TMs in the first place. They'd just be idling, potentially
being shut down by the RM after a while because of it (unless we _also_
touch that logic). Here the blocking of a process (be it by blocking the
process or node) is equivalent to shutting down the blocked process(es).
Once the block is lifted we can just spin it back up.
And I do wonder whether we couldn't apply the same line of thinking to
standalone resource management. Here, being able to stop/restart a
process/node manually should be a core requirement for a Flink deployment
anyway.
On 02/05/2022 08:49, Martijn Visser wrote:
Hi everyone,
Thanks for creating this FLIP. I can understand the problem and I see value
in the automatic detection and blocklisting. I do have two concerns with
the ability to manually specify resources to be blocked:
* Most organizations explicitly have a separation of concerns, meaning that
there's a group who's responsible for managing a cluster and there's a user
group who uses that cluster. With the introduction of this mechanism, the
latter group can now influence the responsibility of the first group. So it
is possible that someone from the user group blocks something, which causes
an outage (which could result in paging mechanisms triggering etc.) which
impacts the first group.
* How big is the group of people who can go through the process of manually
identifying a node that isn't behaving as it should? I do think this group
is relatively limited. Does it then make sense to introduce such a feature,
which would only be used by a really small user group of Flink? We still
have to maintain, test and support such a feature.
I'm +1 for the auto-detection features, but I'm leaning towards not
exposing this to the user group and having it available strictly for
cluster operators. They could then also set up their paging/metrics/logging
systems to take this into account.
Best regards,
Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser
On Fri, 29 Apr 2022 at 09:39, Yangze Guo <karma...@gmail.com> wrote:
Thanks for driving this, Zhu and Lijie.
+1 for the overall proposal. Just sharing some thoughts here:
- Why do we need to expose
cluster.resource-blacklist.item.timeout-check-interval to the user?
I think the semantics of `cluster.resource-blacklist.item.timeout` is
sufficient for the user. How the timeout is guaranteed is Flink's internal
implementation. I think it will be very confusing and we do not need to
expose it to users.
- The ResourceManager can notify the `BlacklistHandler` of task manager
exceptions as well. For example, a slot allocation might fail in case the
target task manager is busy or has a network jitter. I don't mean we need
to cover this case in this version, but we can also open a
`notifyException` in `ResourceManagerBlacklistHandler`.
- Before we sync the blocklist to the ResourceManager, will the slots of a
blocked task manager continue to be released and allocated?
Best,
Yangze Guo
On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi Konstantin,
Thanks for your feedback. I will respond to your 4 remarks:
1) Thanks for reminding me of the controversy. I think "BlockList" is good
enough, and I will change it in the FLIP.
2) Your suggestion for the REST API is a good idea. Based on the above, I
would change the REST API to the following:
POST/GET <host>/blocklist/nodes
POST/GET <host>/blocklist/taskmanagers
DELETE <host>/blocklist/node/<identifier>
DELETE <host>/blocklist/taskmanager/<identifier>
3) If a node is blocked/blocklisted, it means that all task managers on
this node are blocklisted, and all slots on these TMs are not available.
This is actually a bit like losing the TMs, but these TMs are not really
lost; they are in an unavailable status and are still registered in this
Flink cluster. They will be available again once the corresponding
blocklist item is removed. This behavior is the same in active/non-active
clusters. However, in active clusters these TMs may be released due to idle
timeouts.
4) For the item timeout, I prefer to keep it. The reasons are as follows:
a) The timeout will not affect users adding or removing items via the REST
API, and users can disable it by configuring it to Long.MAX_VALUE.
b) Some node problems can recover after a period of time (such as machine
hotspots), in which case users may prefer that Flink can do this
automatically instead of requiring the user to do it manually.
Best,
Lijie
On Wed, Apr 27, 2022 at 19:23, Konstantin Knauf <kna...@apache.org> wrote:
Hi Lijie,
I think this makes sense, and +1 to only supporting manually blocking
taskmanagers and nodes. Maybe the different strategies can also be
maintained outside of Apache Flink.
A few remarks:
1) Can we use another term than "blacklist" due to the controversy around
the term? [1] There was also a Jira ticket about this topic a while back,
and there was generally a consensus to avoid the terms blacklist &
whitelist [2]. We could use "blocklist", "denylist" or "quarantined".
2) For the REST API, I'd prefer a slightly different design, as verbs like
add/remove are often considered an anti-pattern for REST APIs. POST on a
list item is generally the standard to add items. DELETE on the individual
resource is standard to remove an item.
POST <host>/quarantine/items
DELETE <host>/quarantine/items/<itemidentifier>
We could also consider separating taskmanagers and nodes in the REST API
(and internal data structures). Any opinion on this?
POST/GET <host>/quarantine/nodes
POST/GET <host>/quarantine/taskmanager
DELETE <host>/quarantine/nodes/<identifier>
DELETE <host>/quarantine/taskmanager/<identifier>
3) How would blocking nodes behave with non-active resource managers, i.e.
standalone or reactive mode?
4) To keep the implementation even more minimal, do we need the timeout
behavior? If items are added/removed manually, we could easily delegate
this to the user. In my opinion the timeout behavior would better fit into
specific strategies at a later point.
Looking forward to your thoughts.
Cheers and thank you,
Konstantin
[1]
https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term
[2]
https://issues.apache.org/jira/browse/FLINK-18209
On Wed, Apr 27, 2022 at 04:04, Lijie Wang <wangdachui9...@gmail.com> wrote:
Hi all,
Flink job failures may happen due to cluster node issues (insufficient disk
space, bad hardware, network abnormalities). Flink will take care of the
failures and redeploy the tasks. However, due to data locality and limited
resources, the new tasks are very likely to be redeployed to the same
nodes, which will result in continuous task abnormalities and affect job
progress.
Currently, Flink users need to manually identify the problematic node and
take it offline to solve this problem. But this approach has the following
disadvantages:
1. Taking a node offline can be a heavy process. Users may need to contact
cluster administrators to do this. The operation can even be dangerous and
not allowed during some important business events.
2. Identifying and solving this kind of problem manually would be slow and
a waste of human resources.
To solve this problem, Zhu Zhu and I propose to introduce a blacklist
mechanism for Flink to filter out problematic resources.
You can find more details in FLIP-224 [1]. Looking forward to your
feedback.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blacklist+Mechanism
Best,
Lijie
--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com
--
https://twitter.com/snntrable
https://github.com/knaufk