GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/3696#discussion_r21844488
--- Diff:
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---
@@ -110,4 +113,165 @@ class ExecutorRunnable(
nmClient.startContainer(container, ctx)
}
+ private def prepareCommand(
+ masterAddress: String,
+ slaveId: String,
+ hostname: String,
+ executorMemory: Int,
+ executorCores: Int,
+ appId: String,
+ localResources: HashMap[String, LocalResource]): List[String] = {
+ // Extra options for the JVM
+ val javaOpts = ListBuffer[String]()
+
+ // Set the environment variable through a command prefix
+ // to append to the existing value of the variable
+ var prefixEnv: Option[String] = None
+
+ // Set the JVM memory
+ val executorMemoryString = executorMemory + "m"
+ javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+
+ // Set extra Java options for the executor, if defined
+ sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
+ javaOpts += opts
+ }
+ sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
+ javaOpts += opts
+ }
+ sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
+ prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+ }
+
+ javaOpts += "-Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+
+ // Certain configs need to be passed here because they are needed before the Executor
+ // registers with the Scheduler and transfers the spark configs. Since the Executor backend
+ // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+ // authentication settings.
+ sparkConf.getAll.
+ filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+ foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+
+ sparkConf.getAkkaConf.
+ foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+
+ // Commenting it out for now - so that people can refer to the properties if required. Remove
+ // it once cpuset version is pushed out.
+ // The context is, default gc for server class machines end up using all cores to do gc - hence
+ // if there are multiple containers in same node, spark gc affects all other containers'
+ // performance (which can also be other spark containers)
+ // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+ // multi-tenant environments. Not sure how default java gc behaves if it is limited to a subset
+ // of cores on a node.
+ /*
+ else {
+ // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+ // It might be possible that other modes/config is being done in
+ // spark.executor.extraJavaOptions, so we don't want to mess with it.
+ // In our experiments, using the (default) throughput collector has severe perf ramifications
+ // on multi-tenant machines
+ // The options are based on
+ // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
+ // %20the%20Concurrent%20Low%20Pause%20Collector|outline
+ javaOpts += " -XX:+UseConcMarkSweepGC "
+ javaOpts += " -XX:+CMSIncrementalMode "
+ javaOpts += " -XX:+CMSIncrementalPacing "
+ javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+ javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
+ }
+ */
+
+ // For log4j configuration to reference
+ javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+
+ val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
+ "-server",
+ // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+ // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
+ // an inconsistent state.
+ // TODO: If the OOM is not recoverable by rescheduling it on a different node, then do
+ // 'something' to fail the job ... akin to blacklisting trackers in mapred ?
+ "-XX:OnOutOfMemoryError='kill %p'") ++
+ javaOpts ++
+ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+ masterAddress.toString,
+ slaveId.toString,
+ hostname.toString,
+ executorCores.toString,
+ appId,
+ "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+ // TODO: it would be nicer to just make sure there are no null commands here
+ commands.map(s => if (s == null) "null" else s).toList
+ }
+
+ private def setupDistributedCache(
+ file: String,
--- End diff --
Indentation is wrong here. tabs vs. spaces?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]