This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new bdda09744d KYLIN-5966 Reorder the loading order of gluten jar
bdda09744d is described below

commit bdda09744da19b06a9aade8b95615b51766f81d1
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue Sep 3 17:42:51 2024 +0800

    KYLIN-5966 Reorder the loading order of gluten jar
    
    1. Reorder the loading order of gluten jar
    2. Set default properties
    
    ---------
    Co-authored-by: Zhiting Guo <[email protected]>
    Co-authored-by: zhong.zhu <[email protected]>
---
 src/assembly/pom.xml                               | 16 +++++
 .../src/main/resources/kylin-defaults0.properties  | 71 +++++++++++++++++++---
 .../kylin/job/execution/NSparkExecutableTest.java  |  4 +-
 .../test_case_data/localmeta/kylin.properties      |  8 +++
 .../test_case_data/sandbox/kylin.properties        |  7 +++
 .../kylin/job/execution/NSparkExecutable.java      | 44 +++++++++++---
 .../scala/org/apache/spark/sql/KylinSession.scala  | 26 ++++++--
 7 files changed, 152 insertions(+), 24 deletions(-)
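
For readers skimming the patch below: the NSparkExecutable and KylinSession changes add the gluten jar to the Spark classpaths only when spark.gluten.enabled is true, and in the client-mode branches the jar is placed ahead of the other job jars so it loads first. A minimal, self-contained Java sketch of that ordering rule (class and method names here are illustrative, not the actual Kylin API):

    import java.nio.file.Paths;
    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    // Illustrative sketch of the ordering applied by this patch: when gluten is
    // enabled, the gluten jar goes first on the executor extra classpath.
    // GlutenClasspathSketch/buildExecutorClasspath are hypothetical names.
    public class GlutenClasspathSketch {
        static String buildExecutorClasspath(boolean glutenEnabled, String glutenJar, Set<String> jars) {
            // Insertion order is preserved, so the gluten jar stays first when enabled.
            Set<String> ordered = new LinkedHashSet<>();
            if (glutenEnabled) {
                ordered.add(glutenJar);
            }
            ordered.addAll(jars);
            // Only file names are joined (with ':') for the executor-side classpath.
            return ordered.stream()
                    .map(jar -> Paths.get(jar).getFileName().toString())
                    .collect(Collectors.joining(":"));
        }

        public static void main(String[] args) {
            Set<String> jars = new LinkedHashSet<>();
            jars.add("/opt/kylin/lib/kylin-job.jar");
            System.out.println(buildExecutorClasspath(true, "/opt/kylin/lib/ext/gluten.jar", jars));
            // prints: gluten.jar:kylin-job.jar
        }
    }

Using a LinkedHashSet keeps the prepended jar first while still de-duplicating entries, which mirrors how the patch builds the jar set before joining it into a classpath string.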

diff --git a/src/assembly/pom.xml b/src/assembly/pom.xml
index a4f5fb7d03..41997bff66 100644
--- a/src/assembly/pom.xml
+++ b/src/assembly/pom.xml
@@ -56,6 +56,16 @@
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-spark</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>delta-core_2.12</artifactId>
+                    <groupId>io.delta</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>delta-storage</artifactId>
+                    <groupId>io.delta</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
@@ -80,6 +90,12 @@
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-sparder</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>delta-core_2.12</artifactId>
+                    <groupId>io.delta</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index 9e8c482a57..b89043ff69 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -75,7 +75,6 @@ kylin.engine.spark-conf.spark.eventLog.dir=${kylin.env.hdfs-working-dir}/spark-h
 kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${KYLIN_HOME}/logs
 kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
 kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=job -Dkap.spark.project=${job.project} -Dkap.spark.identifier=${job.id} -Dkap.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone} -Dkap.spark.mountDir=${job.mountDir}
-kylin.engine.spark-conf.spark.yarn.dist.files=${kylin.log.spark-executor-properties-file},${kylin.log.spark-appmaster-properties-file}
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 kylin.engine.spark-conf.spark.hadoop.hive.exec.scratchdir=${kylin.env.hdfs-working-dir}/hive-scratch
 kylin.engine.spark-conf.spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true
@@ -117,11 +116,6 @@ kylin.engine.spark-conf.spark.sql.optimizer.dynamicPartitionPruning.enabled=fals
 # for storageV3
 kylin.engine.spark-conf.spark.databricks.delta.legacy.allowAmbiguousPathsInCreateTable=true
 
-# gluten
-kylin.engine.spark-conf.spark.gluten.enabled=false
-kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
-kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
-
 # ==================== QUERY SPARK CONTEXT & COLUMNAR STORAGE ====================
 kylin.storage.quota-in-giga-bytes=10240
 
@@ -198,10 +192,6 @@ kylin.storage.columnar.spark-conf.spark.sql.legacy.timeParserPolicy=LEGACY
 # spark3 close DPP feature
 kylin.storage.columnar.spark-conf.spark.sql.optimizer.dynamicPartitionPruning.enabled=false
 
-# gluten
-kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
-kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
-
 # ==================== JOB SCHEDULER ====================
 
 # max job retry on error, default 0: no retry
@@ -481,3 +471,64 @@ kylin.streaming.spark-conf.spark.sql.hive.metastore.version=1.2.2
 kylin.streaming.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${KYLIN_HOME}/logs
 kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=streaming_job
+
+
+# gluten & internal table
+## for query
+kylin.storage.columnar.spark-conf.spark.gluten.enabled=true
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.libpath=${KYLIN_HOME}/server/libch.so
+kylin.storage.columnar.spark-conf.spark.gluten.sql.countDistinctWithoutExpand=true
+kylin.storage.columnar.spark-conf.spark.gluten.sql.enable.native.validation=false
+kylin.storage.columnar.spark-conf.spark.memory.offHeap.enabled=true
+kylin.storage.columnar.spark-conf.spark.memory.offHeap.size=12g
+kylin.storage.columnar.spark-conf.spark.plugins=org.apache.gluten.GlutenPlugin
+kylin.storage.columnar.spark-conf.spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager
+kylin.storage.columnar.spark-conf.spark.sql.autoBroadcastJoinThreshold=100MB
+kylin.storage.columnar.spark-conf.spark.sql.decimalOperations.allowPrecisionLoss=false
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type=hdfs_gluten
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path=/tmp/ch_metadata_kylin
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type=cache
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk=hdfs
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path=/tmp/hdfs_cache_kylin
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size=256Gi
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes=main
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes.main.disk=hdfs_cache
+kylin.storage.columnar.spark-conf.spark.sql.catalog.INTERNAL_CATALOG=org.apache.spark.sql.execution.datasources.v2.kyinternal.KyinternalCatalog
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.path=/tmp/gluten_default
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.tmp_path=/tmp/kyligence_glt/tmp_ch
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
+
+## on yarn
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.libhdfs3_conf=/etc/hadoop/conf/hdfs-site.xml
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.executor.libpath=libch.so
+kylin.storage.columnar.spark-conf.spark.executorEnv.LD_PRELOAD=$PWD/libch.so
+kylin.query.engine.sparder-additional-files=${KYLIN_HOME}/server/libch.so,${KYLIN_HOME}/server/conf/spark-executor-log4j.xml,${KYLIN_HOME}/server/conf/spark-appmaster-log4j.xml,${KYLIN_HOME}/lib/libasyncProfiler-linux-x64.so,${KYLIN_HOME}/lib/libasyncProfiler-linux-arm64.so
+kylin.storage.columnar.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.use_current_directory_as_tmp=true
+kylin.storage.columnar.spark-conf.spark.gluten.sql.executor.jar.path=${KYLIN_HOME}/lib/ext/gluten.jar
+
+## for build
+kylin.engine.spark-conf.spark.gluten.enabled=true
+kylin.engine.spark-conf.spark.gluten.sql.columnar.libpath=${KYLIN_HOME}/server/libch.so
+kylin.engine.spark-conf.spark.gluten.sql.countDistinctWithoutExpand=true
+kylin.engine.spark-conf.spark.gluten.sql.enable.native.validation=false
+kylin.engine.spark-conf.spark.memory.offHeap.enabled=true
+kylin.engine.spark-conf.spark.memory.offHeap.size=12g
+kylin.engine.spark-conf.spark.plugins=org.apache.gluten.GlutenPlugin
+kylin.engine.spark-conf.spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type=hdfs_gluten
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path=/tmp/kyligence_glt/ch_metadata
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes.main.disk=hdfs
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.hdfs_main.volumes=main
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.path=/tmp/gluten_default
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.tmp_path=/tmp/kyligence_glt/tmp_ch
+kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.columnar.pre.rules=org.apache.spark.sql.execution.gluten.ConvertKylinFileSourceToGlutenRule
+kylin.engine.spark-conf.spark.gluten.sql.columnar.extended.expressions.transformer=org.apache.spark.sql.catalyst.expressions.gluten.CustomerExpressionTransformer
+## on yarn
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.libhdfs3_conf=/etc/hadoop/conf/hdfs-site.xml
+kylin.engine.spark-conf.spark.gluten.sql.columnar.executor.libpath=libch.so
+kylin.engine.spark-conf.spark.executorEnv.LD_PRELOAD=$PWD/libch.so
+kylin.engine.spark-conf.spark.yarn.dist.files=${KYLIN_HOME}/server/libch.so,${kylin.log.spark-executor-properties-file},${kylin.log.spark-appmaster-properties-file}
+kylin.engine.spark-conf.spark.gluten.sql.columnar.backend.ch.runtime_config.use_current_directory_as_tmp=true
+kylin.engine.spark-conf.spark.gluten.sql.driver.jar.path=${KYLIN_HOME}/lib/ext/gluten.jar
+kylin.engine.spark-conf.spark.gluten.sql.executor.jar.path=${KYLIN_HOME}/lib/ext/gluten.jar
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
index 183fe9244f..45ff34e22c 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/job/execution/NSparkExecutableTest.java
@@ -154,7 +154,7 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
             String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
 
             Assert.assertNotNull(cmd);
-            Assert.assertTrue(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
+            Assert.assertTrue(cmd.contains("spark.plugins=," + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
         }
 
         overwriteSystemProp("kylin.engine.spark-conf.spark.plugins",
@@ -180,7 +180,7 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
             String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
 
             Assert.assertNotNull(cmd);
-            Assert.assertFalse(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
+            Assert.assertFalse(cmd.contains("spark.plugins=," + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
         }
 
         overwriteSystemProp("kylin.engine.spark-conf.spark.driver.extraJavaOptions",
diff --git a/src/examples/test_case_data/localmeta/kylin.properties b/src/examples/test_case_data/localmeta/kylin.properties
index 2eb95ecf29..090977bbf0 100755
--- a/src/examples/test_case_data/localmeta/kylin.properties
+++ b/src/examples/test_case_data/localmeta/kylin.properties
@@ -170,3 +170,11 @@ kylin.storage.columnar.spark-conf.spark.executor.instances=1
 kylin.storage.columnar.spark-conf.spark.executor.memory=512m
 kylin.storage.columnar.spark-conf.spark.executor.memoryOverhead=512m
 kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=../../build/spark/hive_1_2_2/*
+
+# disable gluten
+kylin.storage.columnar.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.plugins=
+kylin.storage.columnar.spark-conf.spark.plugins=
+kylin.engine.spark-conf.spark.memory.offHeap.enabled=false
+kylin.storage.columnar.spark-conf.spark.sql.decimalOperations.allowPrecisionLoss=true
diff --git a/src/examples/test_case_data/sandbox/kylin.properties b/src/examples/test_case_data/sandbox/kylin.properties
index 854b2ae343..f0818905f1 100644
--- a/src/examples/test_case_data/sandbox/kylin.properties
+++ b/src/examples/test_case_data/sandbox/kylin.properties
@@ -190,3 +190,10 @@ kylin.streaming.spark.job-jar=../assembly/target/kylin-assembly-5.0.0-SNAPSHOT-j
 
 kylin.query.escape-default-keyword=true
 kylin.monitor.enabled=false
+
+# disable gluten
+kylin.storage.columnar.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.gluten.enabled=false
+kylin.engine.spark-conf.spark.plugins=
+kylin.storage.columnar.spark-conf.spark.plugins=
+kylin.engine.spark-conf.spark.memory.offHeap.enabled=false
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
index 07dd582b6e..73ab5ac470 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
@@ -888,12 +888,16 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         // (assembled in the kylinJobJar)
         // will be in NM container's classpath.
 
+        Set<String> sparkJars = Sets.newLinkedHashSet();
+        boolean glutenEnabled = Boolean.parseBoolean(sparkConf.get("spark.gluten.enabled"));
         // Cluster mode.
         if (isClusterMode(sparkConf)) {
             // On yarn cluster mode,
             // application jar (kylinJobJar here) would ln as '__app__.jar'.
-            Set<String> sparkJars = Sets.newLinkedHashSet();
             sparkJars.add(APP_JAR_NAME);
+            if (glutenEnabled) {
+                sparkJars.add("gluten.jar");
+            }
             sparkJars.addAll(getSparkJars(kylinConf, sparkConf));
             final String jointJarNames = sparkJars.stream().map(jar -> Paths.get(jar).getFileName().toString()).sorted()
                     .collect(Collectors.joining(COLON));
@@ -903,12 +907,33 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         }
 
         // Client mode.
-        Set<String> sparkJars = getSparkJars(kylinConf, sparkConf);
-        sparkConf.put(DRIVER_EXTRA_CLASSPATH,
-                sparkJars.stream().sorted(Comparator.comparing(jar -> jar.substring(jar.lastIndexOf(PATH_DELIMITER))))
-                        .collect(Collectors.joining(COLON)));
-        sparkConf.put(EXECUTOR_EXTRA_CLASSPATH, sparkJars.stream().map(jar -> Paths.get(jar).getFileName().toString())
-                .sorted().collect(Collectors.joining(COLON)));
+        sparkJars.addAll(getSparkJars(kylinConf, sparkConf));
+        if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
+            if (glutenEnabled) {
+                sparkJars.add(sparkConf.get("spark.gluten.sql.driver.jar.path"));
+            }
+            sparkConf.put(DRIVER_EXTRA_CLASSPATH,
+                sparkJars.stream().sorted(
+                    Comparator.comparing(jar -> jar.substring(jar.lastIndexOf(PATH_DELIMITER))))
+                    .collect(Collectors.joining(COLON)));
+            sparkConf.put(EXECUTOR_EXTRA_CLASSPATH,
+                sparkJars.stream().map(jar -> Paths.get(jar).getFileName().toString())
+                    .sorted().collect(Collectors.joining(COLON)));
+        } else {
+            String driverCp = sparkJars.stream().sorted(
+                Comparator.comparing(jar -> jar.substring(jar.lastIndexOf(PATH_DELIMITER))))
+                .collect(Collectors.joining(COLON));
+            String executorCp = sparkJars.stream()
+                .map(jar -> Paths.get(jar).getFileName().toString())
+                .sorted().collect(Collectors.joining(COLON));
+            if (glutenEnabled) {
+                driverCp = sparkConf.get("spark.gluten.sql.driver.jar.path") + COLON + driverCp;
+                executorCp = sparkConf.get("spark.gluten.sql.executor.jar.path")
+                    + COLON + executorCp;
+            }
+            sparkConf.put(DRIVER_EXTRA_CLASSPATH, driverCp);
+            sparkConf.put(EXECUTOR_EXTRA_CLASSPATH, executorCp);
+        }
     }
 
     private void rewriteDriverLog4jConf(StringBuilder sb, KylinConfig config, Map<String, String> sparkConf) {
@@ -959,6 +984,11 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         filePaths.add(sparkConf.get(SPARK_FILES_1));
         filePaths.add(sparkConf.get(SPARK_FILES_2));
 
+        if (sparkConf.get(SPARK_MASTER).startsWith("yarn")
+                && Boolean.parseBoolean(sparkConf.get("spark.gluten.enabled"))) {
+            filePaths.add(sparkConf.get("spark.gluten.sql.driver.jar.path"));
+        }
+
         LinkedHashSet<String> sparkFiles = filePaths.stream() //
                 .filter(StringUtils::isNotEmpty) //
                 .flatMap(p -> Arrays.stream(StringUtils.split(p, COMMA))) //
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
index eb79cee6a7..10520c603b 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -115,7 +115,10 @@ class KylinSession(
 object KylinSession extends Logging {
   val NORMAL_FAIR_SCHEDULER_FILE_NAME: String = "/fairscheduler.xml"
   val QUERY_LIMIT_FAIR_SCHEDULER_FILE_NAME: String = "/query-limit-fair-scheduler.xml"
+  val SPARK_MASTER = "spark.master"
   val SPARK_PLUGINS_KEY = "spark.plugins"
+  val SPARK_YARN_DIST_FILE = "spark.yarn.dist.files"
+  val SPARK_EXECUTOR_JAR_PATH = "spark.gluten.sql.executor.jar.path"
 
   implicit class KylinBuilder(builder: Builder) {
     var queryCluster: Boolean = true
@@ -249,7 +252,7 @@ object KylinSession extends Logging {
       }
 
       // the length of the `podNamePrefix` needs to be less than or equal to 47
-      sparkConf.get("spark.master") match {
+      sparkConf.get(SPARK_MASTER) match {
         case v if v.startsWith("k8s") =>
           val appName = sparkConf.get("spark.app.name", System.getenv("HOSTNAME"))
           val podNamePrefix = generateExecutorPodNamePrefixForK8s(appName)
@@ -292,7 +295,7 @@ object KylinSession extends Logging {
       }
 
       if (!"true".equalsIgnoreCase(System.getProperty("spark.local"))) {
-        if (sparkConf.get("spark.master").startsWith("yarn")) {
+        if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
           // TODO Less elegant implementation.
           val applicationJar = KylinConfig.getInstanceFromEnv.getKylinJobJarPath
           val yarnDistJarsConf = "spark.yarn.dist.jars"
@@ -302,14 +305,14 @@ object KylinSession extends Logging {
             applicationJar
           }
           sparkConf.set(yarnDistJarsConf, distJars)
-          sparkConf.set("spark.yarn.dist.files", kapConfig.sparderFiles())
+          sparkConf.set(SPARK_YARN_DIST_FILE, kapConfig.sparderFiles())
         } else {
           sparkConf.set("spark.jars", kapConfig.sparderJars)
           sparkConf.set("spark.files", kapConfig.sparderFiles())
         }
 
         // spark on k8s with client mode, set the spark.driver.host = local ip
-        if (sparkConf.get("spark.master").startsWith("k8s") && 
"client".equals(sparkConf.get("spark.submit.deployMode", "client"))) {
+        if (sparkConf.get(SPARK_MASTER).startsWith("k8s") && 
"client".equals(sparkConf.get("spark.submit.deployMode", "client"))) {
           if (!sparkConf.contains("spark.driver.host")) {
             sparkConf.set("spark.driver.host", 
AddressUtil.getLocalHostExactAddress)
           }
@@ -317,7 +320,20 @@ object KylinSession extends Logging {
 
         var extraJars = Paths.get(KylinConfig.getInstanceFromEnv.getKylinJobJarPath).getFileName.toString
         if (KylinConfig.getInstanceFromEnv.queryUseGlutenEnabled) {
-          extraJars = "gluten.jar:" + extraJars
+          if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
+            val distFiles = sparkConf.get(SPARK_YARN_DIST_FILE)
+            if (distFiles.isEmpty) {
+              sparkConf.set(SPARK_YARN_DIST_FILE,
+                sparkConf.get(SPARK_EXECUTOR_JAR_PATH))
+            } else {
+              sparkConf.set(SPARK_YARN_DIST_FILE,
+                sparkConf.get(SPARK_EXECUTOR_JAR_PATH) + "," + distFiles)
+            }
+            extraJars = "gluten.jar" + File.pathSeparator + extraJars
+          } else {
+            extraJars = sparkConf.get(SPARK_EXECUTOR_JAR_PATH) +
+              File.pathSeparator + extraJars
+          }
         }
         sparkConf.set("spark.executor.extraClassPath", extraJars)
 
