Hi all,

I have a prototype for "Keep track of nodes which are going to be shut down
& avoid scheduling new tasks" (
https://issues.apache.org/jira/browse/SPARK-20628) that I would like to
discuss with the community. I added a WIP PR for that in
https://github.com/apache/spark/pull/19267. The basic idea is implementing
a mechanism similar to YARN's graceful decommission, but for Spark. There
is a design document for this in
https://github.com/apache/spark/files/1349653/Spark_Blacklisting_on_decommissioning-Scope.pdf.
I would like to know the opinion of the list on this approach.

*More details about this proposal*
In the PR we define a HostState type to represent the state of the cluster
nodes, and take actions in
CoarseGrainedSchedulerBackend.handleUpdatedHostState when a node
transitions into a state where it becomes partially or totally
unavailable. Just like in YARN or Mesos, we propose a decommission
mechanism with 2 phases: first a drain phase where the node is still
running but not accepting further work (DECOMMISSIONING in YARN, and DRAIN
in Mesos), followed by a second phase where the executors on the node are
forcibly shut down (DECOMMISSIONED in YARN, and DOWN in Mesos). In this PR
we focus only on YARN, and on the actions taken when a node transitions
into the DECOMMISSIONING state: blacklisting the node when it transitions
to DECOMMISSIONING, and un-blacklisting the node when it gets back to the
normal healthy RUNNING state.
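
To make this concrete, here is a minimal sketch of the shape this could
take. HostState and handleUpdatedHostState are the names used in the PR,
while DecommissionHandler and the blacklistNode/unblacklistNode callbacks
are placeholders for illustration, not the actual Spark blacklisting API:

  sealed trait HostState
  object HostState {
    case object Running extends HostState          // healthy, schedulable
    case object Decommissioning extends HostState  // drain phase: no new tasks
    case object Decommissioned extends HostState   // executors forcibly shut down
  }

  // Mirrors the intent of CoarseGrainedSchedulerBackend.handleUpdatedHostState;
  // the two callbacks abstract over the actual blacklisting calls.
  class DecommissionHandler(
      blacklistNode: String => Unit,
      unblacklistNode: String => Unit) {
    def handleUpdatedHostState(host: String, state: HostState): Unit = state match {
      case HostState.Decommissioning => blacklistNode(host)   // stop scheduling new tasks
      case HostState.Running         => unblacklistNode(host) // node is healthy again
      case HostState.Decommissioned  => ()                    // executors are shut down anyway
    }
  }
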
The decommissioning process would not be initiated by Spark, but by an
operator or an automated system (e.g. the cloud environment where YARN is
running), in response to some relevant event (e.g. a cluster resize
event), and it would consist of running the YARN administrative command
yarn rmadmin -refreshNodes -g for the affected node. Spark would just
react to the node state transition events it receives from the cluster
manager.
To make this extensible to other cluster managers besides YARN, we define
the HostState type in Spark, and keep the interaction with the specifics
of each cluster manager in the corresponding packages. For example, for
YARN, when YarnAllocator gets a node state transition event, it converts
the event from the YARN-specific NodeState into HostState, wraps it into a
HostStatusUpdate message, and sends it to the
CoarseGrainedSchedulerBackend, which then performs the required actions
for that node.
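
As an illustration of that translation step, here is a rough sketch that
builds on the HostState sketch above. NodeReport and NodeState are the
standard YARN records (NodeState.DECOMMISSIONING is only present in Hadoop
versions with graceful decommission support, see below), while
YarnNodeStateForwarder and the sendToDriver parameter are stand-ins for
the PR's actual RPC plumbing:

  import org.apache.hadoop.yarn.api.records.{NodeReport, NodeState}

  // Message sent from the cluster-manager-specific code to the scheduler backend.
  case class HostStatusUpdate(host: String, state: HostState)

  object YarnNodeStateForwarder {
    // Called for each updated NodeReport found in the AllocateResponse that
    // the RM returns on the AM heartbeat.
    def onUpdatedNode(report: NodeReport, sendToDriver: HostStatusUpdate => Unit): Unit = {
      val maybeState = report.getNodeState match {
        case NodeState.DECOMMISSIONING => Some(HostState.Decommissioning)
        case NodeState.DECOMMISSIONED  => Some(HostState.Decommissioned)
        case NodeState.RUNNING         => Some(HostState.Running)
        case _                         => None // other node states are not relevant here
      }
      maybeState.foreach { s =>
        sendToDriver(HostStatusUpdate(report.getNodeId.getHost, s))
      }
    }
  }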

This code works on a modified version of Hadoop 2.7.3 with patches to
support YARN-4676 (basic graceful decommission), and an approximation to
YARN-3224 (when a node transitions into the DECOMMISSIONING state, the
resource manager notifies each relevant application master by adding the
node to the list of updated nodes available in the AllocateResponse
returned by the RM as a response to the AM heartbeat). For these reasons,
this code won't work as-is on vanilla Hadoop. The main problem is that the
decommissioning mechanism for YARN is not completely implemented (see
YARN-914), and some of the parts that are implemented are only available
from YARN 2.9.0 onwards (see YARN-4676). To cope with this, we propose
implementing an administrative command to send node transitions directly
to the Spark driver, as HostStatusUpdate messages addressed to the
CoarseGrainedSchedulerBackend. This command would be similar to yarn
rmadmin -refreshNodes -g, which is currently used for decommissioning
nodes in YARN. When YARN-914 is complete, this could still be used as a
secondary interface for decommissioning nodes, so node transitions could
be signaled either by the cluster manager, or using the administrative
command (either manually or through some automation implemented by the
cloud environment).
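
To give an idea of the shape such a command could take, here is a very
rough sketch that reuses the types from the sketches above. The command
name, its arguments and the sendToDriver placeholder are all hypothetical;
a real implementation would deliver the message to the driver's
CoarseGrainedSchedulerBackend RPC endpoint:

  object DecommissionNodeCommand {
    // Hypothetical usage:
    //   DecommissionNodeCommand <host> <RUNNING|DECOMMISSIONING|DECOMMISSIONED>
    def main(args: Array[String]): Unit = {
      require(args.length == 2, "usage: <host> <state>")
      val Array(host, stateArg) = args
      val state = stateArg.toUpperCase match {
        case "RUNNING"         => HostState.Running
        case "DECOMMISSIONING" => HostState.Decommissioning
        case "DECOMMISSIONED"  => HostState.Decommissioned
      }
      // Same message the YARN path would send, just triggered by an operator
      // or by cloud automation instead of by the cluster manager.
      sendToDriver(HostStatusUpdate(host, state))
    }

    // Placeholder only: a real implementation would send this to the driver
    // over Spark's RPC instead of printing it.
    private def sendToDriver(message: HostStatusUpdate): Unit =
      println(s"would send $message to the driver")
  }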

We would like to get some feedback on this approach in general, and on the
administrative command solution in particular. If that sounds good, then
we will work on modifying this PR so it works on vanilla Hadoop 2.7, and
on implementing the administrative command.

Thanks,

Juan Rodriguez Hortala
