This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new bdda09744d KYLIN-5966 Reorder the loading order of gluten jar
bdda09744d is described below
commit bdda09744da19b06a9aade8b95615b51766f81d1
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue Sep 3 17:42:51 2024 +0800
KYLIN-5966 Reorder the loading order of gluten jar
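1. Change the order in which the gluten jar is placed on the Spark driver/executor classpath (NSparkExecutable, KylinSession)
2. Set default gluten and internal-table properties in kylin-defaults0.properties, and disable gluten in the localmeta/sandbox test configurations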
---------
Co-authored-by: Zhiting Guo <[email protected]>
Co-authored-by: zhong.zhu <[email protected]>
---
src/assembly/pom.xml | 16 +++++
.../src/main/resources/kylin-defaults0.properties | 71 +++++++++++++++++++---
.../kylin/job/execution/NSparkExecutableTest.java | 4 +-
.../test_case_data/localmeta/kylin.properties | 8 +++
.../test_case_data/sandbox/kylin.properties | 7 +++
.../kylin/job/execution/NSparkExecutable.java | 44 +++++++++++---
.../scala/org/apache/spark/sql/KylinSession.scala | 26 ++++++--
7 files changed, 152 insertions(+), 24 deletions(-)
diff --git a/src/assembly/pom.xml b/src/assembly/pom.xml
index a4f5fb7d03..41997bff66 100644
--- a/src/assembly/pom.xml
+++ b/src/assembly/pom.xml
@@ -56,6 +56,16 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-spark</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>delta-core_2.12</artifactId>
+ <groupId>io.delta</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>delta-storage</artifactId>
+ <groupId>io.delta</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -80,6 +90,12 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-sparder</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>delta-core_2.12</artifactId>
+ <groupId>io.delta</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties
b/src/core-common/src/main/resources/kylin-defaults0.properties
index 9e8c482a57..b89043ff69 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -75,7 +75,6 @@
kylin.engine.spark-conf.spark.eventLog.dir=${kylin.env.hdfs-working-dir}/spark-h
kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8
-Dhdp.version=current -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=${KYLIN_HOME}/logs
kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-Dlog4j.configurationFile=spark-appmaster-log4j.xml
kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8
-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml
-Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir}
-Dkap.metadata.identifier=${kylin.metadata.url.identifier}
-Dkap.spark.category=job -Dkap.spark.project=${job.project}
-Dkap.spark.identifier=${job.id} -Dkap.spark.jobName=${job.stepId}
-Duser.timezone=${user.timezone} -Dkap.spark.mountDir=${job.mountDir}
-kylin.engine.spark-conf.spark.yarn.dist.files=${kylin.log.spark-executor-properties-file},${kylin.log.spark-appmaster-properties-file}
kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
kylin.engine.spark-conf.spark.hadoop.hive.exec.scratchdir=${kylin.env.hdfs-working-dir}/hive-scratch
kylin.engine.spark-conf.spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true
@@ -117,11 +116,6 @@
kylin.engine.spark-conf.spark.sql.optimizer.dynamicPartitionPruning.enabled=fals
# for storageV3
kylin.engine.spark-conf.spark.databricks.delta.legacy.allowAmbiguousPathsInCreateTable=true
-# gluten
-kylin.engine.spark-conf.spark.gluten.enabled=false
-kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
-kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
-
# ==================== QUERY SPARK CONTEXT & COLUMNAR STORAGE
====================
kylin.storage.quota-in-giga-bytes=10240
@@ -198,10 +192,6 @@
kylin.storage.columnar.spark-conf.spark.sql.legacy.timeParserPolicy=LEGACY
# spark3 close DPP feature
kylin.storage.columnar.spark-conf.spark.sql.optimizer.dynamicPartitionPruning.enabled=false
-# gluten
-kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
-kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
-
# ==================== JOB SCHEDULER ====================
# max job retry on error, default 0: no retry
@@ -481,3 +471,64 @@
kylin.streaming.spark-conf.spark.sql.hive.metastore.version=1.2.2
kylin.streaming.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8
-Dhdp.version=current -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=${KYLIN_HOME}/logs
kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8
-Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir}
-Dkap.metadata.identifier=${kylin.metadata.url.identifier}
-Dkap.spark.category=streaming_job
+
+
+# gluten & internal table
+## for query
+kylin.storage.columnar.spark-conf.spark.gluten.enabled=true
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.libpath=${KYLIN_HOME}/server/libch.so
+kylin.storage.columnar.spark-conf.spark.gluten.sql.countDistinctWithoutExpand=true
+kylin.storage.columnar.spark-conf.spark.gluten.sql.enable.native.validation=false
+kylin.storage.columnar.spark-conf.spark.memory.offHeap.enabled=true
+kylin.storage.columnar.spark-conf.spark.memory.offHeap.size=12g
+kylin.storage.columnar.spark-conf.spark.plugins=org.apache.gluten.GlutenPlugin
+kylin.storage.columnar.spark-conf.spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager
+kylin.storage.columnar.spark-conf.spark.sql.autoBroadcastJoinThreshold=100MB
+kylin.storage.columnar.spark-conf.spark.sql.decimalOperations.allowPrecisionLoss=false
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type=hdfs_gluten
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path=/tmp/ch_metadata_kylin
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type=cache
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk=hdfs
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path=/tmp/hdfs_cache_kylin
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size=256Gi
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes=main
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes.main.disk=hdfs_cache
+kylin.storage.columnar.spark-conf.spark.sql.catalog.INTERNAL_CATALOG=org.apache.spark.sql.execution.datasources.v2.kyinternal.KyinternalCatalog
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.path=/tmp/gluten_default
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.tmp_path=/tmp/kyligence_glt/tmp_ch
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
+
+## on yarn
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.libhdfs3_conf=/etc/hadoop/conf/hdfs-site.xml
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.executor.libpath=libch.so
+kylin.storage.columnar.spark-conf.spark.executorEnv.LD_PRELOAD=$PWD/libch.so
+kylin.query.engine.sparder-additional-files=${KYLIN_HOME}/server/libch.so,${KYLIN_HOME}/server/conf/spark-executor-log4j.xml,${KYLIN_HOME}/server/conf/spark-appmaster-log4j.xml,${KYLIN_HOME}/lib/libasyncProfiler-linux-x64.so,${KYLIN_HOME}/lib/libasyncProfiler-linux-arm64.so
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.use_current_directory_as_tmp=true
+kylin.storage.columnar.spark-conf.spark.gluten.sql.executor.jar.path=${KYLIN_HOME}/lib/ext/gluten.jar
+
+## for build
+kylin.engine.spark-conf.spark.gluten.enabled=true
+kylin.engine.spark-conf.spark.gluten.sql.columnar.libpath=${KYLIN_HOME}/server/libch.so
+kylin.engine.spark-conf.spark.gluten.sql.countDistinctWithoutExpand=true
+kylin.engine.spark-conf.spark.gluten.sql.enable.native.validation=false
+kylin.engine.spark-conf.spark.memory.offHeap.enabled=true
+kylin.engine.spark-conf.spark.memory.offHeap.size=12g
+kylin.engine.spark-conf.spark.plugins=org.apache.gluten.GlutenPlugin
+kylin.engine.spark-conf.spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type=hdfs_gluten
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path=/tmp/kyligence_glt/ch_metadata
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes.main.disk=hdfs
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes=main
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.path=/tmp/gluten_default
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.tmp_path=/tmp/kyligence_glt/tmp_ch
+kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
+kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
+## on yarn
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.libhdfs3_conf=/etc/hadoop/conf/hdfs-site.xml
+kylin.engine.spark-conf.spark.gluten.sql.columnar.executor.libpath=libch.so
+kylin.engine.spark-conf.spark.executorEnv.LD_PRELOAD=$PWD/libch.so
+kylin.engine.spark-conf.spark.yarn.dist.files=${KYLIN_HOME}/server/libch.so,${kylin.log.spark-executor-properties-file},${kylin.log.spark-appmaster-properties-file}
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.use_current_directory_as_tmp=true
+kylin.engine.spark-conf.spark.gluten.sql.driver.jar.path=${KYLIN_HOME}/lib/ext/gluten.jar
+kylin.engine.spark-conf.spark.gluten.sql.executor.jar.path=${KYLIN_HOME}/lib/ext/gluten.jar
diff --git
a/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
b/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
index 183fe9244f..45ff34e22c 100644
---
a/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
+++
b/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
@@ -154,7 +154,7 @@ public class NSparkExecutableTest extends
NLocalFileMetadataTestCase {
String cmd = (String)
sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
Assert.assertNotNull(cmd);
- Assert.assertTrue(cmd.contains("spark.plugins=" +
BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
+ Assert.assertTrue(cmd.contains("spark.plugins=," +
BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
}
overwriteSystemProp("kylin.engine.spark-conf.spark.plugins",
@@ -180,7 +180,7 @@ public class NSparkExecutableTest extends
NLocalFileMetadataTestCase {
String cmd = (String)
sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
Assert.assertNotNull(cmd);
- Assert.assertFalse(cmd.contains("spark.plugins=" +
BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
+ Assert.assertFalse(cmd.contains("spark.plugins=," +
BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
}
overwriteSystemProp("kylin.engine.spark-conf.spark.driver.extraJavaOptions",
diff --git a/src/examples/test_case_data/localmeta/kylin.properties
b/src/examples/test_case_data/localmeta/kylin.properties
index 2eb95ecf29..090977bbf0 100755
--- a/src/examples/test_case_data/localmeta/kylin.properties
+++ b/src/examples/test_case_data/localmeta/kylin.properties
@@ -170,3 +170,11 @@
kylin.storage.columnar.spark-conf.spark.executor.instances=1
kylin.storage.columnar.spark-conf.spark.executor.memory=512m
kylin.storage.columnar.spark-conf.spark.executor.memoryOverhead=512m
kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=../../build/spark/hive_1_2_2/*
+
+# disable gluten
+kylin.storage.columnar.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.plugins=
+kylin.storage.columnar.spark-conf.spark.plugins=
+kylin.engine.spark-conf.spark.memory.offHeap.enabled=false
+kylin.storage.columnar.spark-conf.spark.sql.decimalOperations.allowPrecisionLoss=true
diff --git a/src/examples/test_case_data/sandbox/kylin.properties
b/src/examples/test_case_data/sandbox/kylin.properties
index 854b2ae343..f0818905f1 100644
--- a/src/examples/test_case_data/sandbox/kylin.properties
+++ b/src/examples/test_case_data/sandbox/kylin.properties
@@ -190,3 +190,10 @@
kylin.streaming.spark.job-jar=../assembly/target/kylin-assembly-5.0.0-SNAPSHOT-j
kylin.query.escape-default-keyword=true
kylin.monitor.enabled=false
+
+# disable gluten
+kylin.storage.columnar.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.plugins=
+kylin.storage.columnar.spark-conf.spark.plugins=
+kylin.engine.spark-conf.spark.memory.offHeap.enabled=false
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
index 07dd582b6e..73ab5ac470 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
@@ -888,12 +888,16 @@ public class NSparkExecutable extends AbstractExecutable
implements ChainedStage
// (assembled in the kylinJobJar)
// will be in NM container's classpath.
+ Set<String> sparkJars = Sets.newLinkedHashSet();
+ boolean glutenEnabled =
Boolean.parseBoolean(sparkConf.get("spark.gluten.enabled"));
// Cluster mode.
if (isClusterMode(sparkConf)) {
// On yarn cluster mode,
// application jar (kylinJobJar here) would ln as '__app__.jar'.
- Set<String> sparkJars = Sets.newLinkedHashSet();
sparkJars.add(APP_JAR_NAME);
+ if (glutenEnabled) {
+ sparkJars.add("gluten.jar");
+ }
sparkJars.addAll(getSparkJars(kylinConf, sparkConf));
final String jointJarNames = sparkJars.stream().map(jar ->
Paths.get(jar).getFileName().toString()).sorted()
.collect(Collectors.joining(COLON));
@@ -903,12 +907,33 @@ public class NSparkExecutable extends AbstractExecutable
implements ChainedStage
}
// Client mode.
- Set<String> sparkJars = getSparkJars(kylinConf, sparkConf);
- sparkConf.put(DRIVER_EXTRA_CLASSPATH,
- sparkJars.stream().sorted(Comparator.comparing(jar ->
jar.substring(jar.lastIndexOf(PATH_DELIMITER))))
- .collect(Collectors.joining(COLON)));
- sparkConf.put(EXECUTOR_EXTRA_CLASSPATH, sparkJars.stream().map(jar ->
Paths.get(jar).getFileName().toString())
- .sorted().collect(Collectors.joining(COLON)));
+ sparkJars.addAll(getSparkJars(kylinConf, sparkConf));
+ if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
+ if (glutenEnabled) {
+
sparkJars.add(sparkConf.get("spark.gluten.sql.driver.jar.path"));
+ }
+ sparkConf.put(DRIVER_EXTRA_CLASSPATH,
+ sparkJars.stream().sorted(
+ Comparator.comparing(jar ->
jar.substring(jar.lastIndexOf(PATH_DELIMITER))))
+ .collect(Collectors.joining(COLON)));
+ sparkConf.put(EXECUTOR_EXTRA_CLASSPATH,
+ sparkJars.stream().map(jar ->
Paths.get(jar).getFileName().toString())
+ .sorted().collect(Collectors.joining(COLON)));
+ } else {
+ String driverCp = sparkJars.stream().sorted(
+ Comparator.comparing(jar ->
jar.substring(jar.lastIndexOf(PATH_DELIMITER))))
+ .collect(Collectors.joining(COLON));
+ String executorCp = sparkJars.stream()
+ .map(jar -> Paths.get(jar).getFileName().toString())
+ .sorted().collect(Collectors.joining(COLON));
+ if (glutenEnabled) {
+ driverCp = sparkConf.get("spark.gluten.sql.driver.jar.path") +
COLON + driverCp;
+ executorCp =
sparkConf.get("spark.gluten.sql.executor.jar.path")
+ + COLON + executorCp;
+ }
+ sparkConf.put(DRIVER_EXTRA_CLASSPATH, driverCp);
+ sparkConf.put(EXECUTOR_EXTRA_CLASSPATH, executorCp);
+ }
}
private void rewriteDriverLog4jConf(StringBuilder sb, KylinConfig config,
Map<String, String> sparkConf) {
@@ -959,6 +984,11 @@ public class NSparkExecutable extends AbstractExecutable
implements ChainedStage
filePaths.add(sparkConf.get(SPARK_FILES_1));
filePaths.add(sparkConf.get(SPARK_FILES_2));
+ if (sparkConf.get(SPARK_MASTER).startsWith("yarn")
+ &&
Boolean.parseBoolean(sparkConf.get("spark.gluten.enabled"))) {
+ filePaths.add(sparkConf.get("spark.gluten.sql.driver.jar.path"));
+ }
+
LinkedHashSet<String> sparkFiles = filePaths.stream() //
.filter(StringUtils::isNotEmpty) //
.flatMap(p -> Arrays.stream(StringUtils.split(p, COMMA))) //
diff --git
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
index eb79cee6a7..10520c603b 100644
---
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -115,7 +115,10 @@ class KylinSession(
object KylinSession extends Logging {
val NORMAL_FAIR_SCHEDULER_FILE_NAME: String = "/fairscheduler.xml"
val QUERY_LIMIT_FAIR_SCHEDULER_FILE_NAME: String =
"/query-limit-fair-scheduler.xml"
+ val SPARK_MASTER = "spark.master"
val SPARK_PLUGINS_KEY = "spark.plugins"
+ val SPARK_YARN_DIST_FILE = "spark.yarn.dist.files"
+ val SPARK_EXECUTOR_JAR_PATH = "spark.gluten.sql.executor.jar.path"
implicit class KylinBuilder(builder: Builder) {
var queryCluster: Boolean = true
@@ -249,7 +252,7 @@ object KylinSession extends Logging {
}
// the length of the `podNamePrefix` needs to be less than or equal to 47
- sparkConf.get("spark.master") match {
+ sparkConf.get(SPARK_MASTER) match {
case v if v.startsWith("k8s") =>
val appName = sparkConf.get("spark.app.name",
System.getenv("HOSTNAME"))
val podNamePrefix = generateExecutorPodNamePrefixForK8s(appName)
@@ -292,7 +295,7 @@ object KylinSession extends Logging {
}
if (!"true".equalsIgnoreCase(System.getProperty("spark.local"))) {
- if (sparkConf.get("spark.master").startsWith("yarn")) {
+ if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
// TODO Less elegant implementation.
val applicationJar =
KylinConfig.getInstanceFromEnv.getKylinJobJarPath
val yarnDistJarsConf = "spark.yarn.dist.jars"
@@ -302,14 +305,14 @@ object KylinSession extends Logging {
applicationJar
}
sparkConf.set(yarnDistJarsConf, distJars)
- sparkConf.set("spark.yarn.dist.files", kapConfig.sparderFiles())
+ sparkConf.set(SPARK_YARN_DIST_FILE, kapConfig.sparderFiles())
} else {
sparkConf.set("spark.jars", kapConfig.sparderJars)
sparkConf.set("spark.files", kapConfig.sparderFiles())
}
// spark on k8s with client mode, set the spark.driver.host = local ip
- if (sparkConf.get("spark.master").startsWith("k8s") &&
"client".equals(sparkConf.get("spark.submit.deployMode", "client"))) {
+ if (sparkConf.get(SPARK_MASTER).startsWith("k8s") &&
"client".equals(sparkConf.get("spark.submit.deployMode", "client"))) {
if (!sparkConf.contains("spark.driver.host")) {
sparkConf.set("spark.driver.host",
AddressUtil.getLocalHostExactAddress)
}
@@ -317,7 +320,20 @@ object KylinSession extends Logging {
var extraJars =
Paths.get(KylinConfig.getInstanceFromEnv.getKylinJobJarPath).getFileName.toString
if (KylinConfig.getInstanceFromEnv.queryUseGlutenEnabled) {
- extraJars = "gluten.jar:" + extraJars
+ if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
+ val distFiles = sparkConf.get(SPARK_YARN_DIST_FILE)
+ if (distFiles.isEmpty) {
+ sparkConf.set(SPARK_YARN_DIST_FILE,
+ sparkConf.get(SPARK_EXECUTOR_JAR_PATH))
+ } else {
+ sparkConf.set(SPARK_YARN_DIST_FILE,
+ sparkConf.get(SPARK_EXECUTOR_JAR_PATH) + "," + distFiles)
+ }
+ extraJars = "gluten.jar" + File.pathSeparator + extraJars
+ } else {
+ extraJars = sparkConf.get(SPARK_EXECUTOR_JAR_PATH) +
+ File.pathSeparator + extraJars
+ }
}
sparkConf.set("spark.executor.extraClassPath", extraJars)