This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 959edeb  KYLIN-4829 Support to use thread-level SparkSession to execute query
959edeb is described below
commit 959edebdb262afd63308ba32031abcf889670245
Author: Zhichao Zhang <[email protected]>
AuthorDate: Mon Nov 30 17:16:29 2020 +0800
KYLIN-4829 Support to use thread-level SparkSession to execute query
---
kylin-spark-project/kylin-spark-common/pom.xml | 2 +-
.../datasource/ResetShufflePartition.scala | 13 +-
.../org/apache/spark/utils/SparderUtils.scala} | 34 ++---
kylin-spark-project/kylin-spark-engine/pom.xml | 2 +-
.../scala/org/apache/spark/sql/KylinSparkEnv.scala | 75 +----------
kylin-spark-project/kylin-spark-query/pom.xml | 5 +
.../kylin/query/monitor/SparderContextCanary.java | 10 +-
.../kylin/query/pushdown/SparkSubmitter.java | 1 +
.../scala/org/apache/kylin/query/UdfManager.scala | 31 ++++-
.../kylin/query/pushdown/SparkSqlClient.scala | 7 +-
.../kylin/query/runtime/plans/ResultPlan.scala | 15 ++-
.../scala/org/apache/spark/sql/KylinSession.scala | 2 +-
.../org/apache/spark/sql/SparderContext.scala | 51 ++++----
.../apache/spark/sql/SparderContextFacade.scala} | 36 +++---
.../query/monitor/SparderContextCanaryTest.java | 13 +-
.../apache/spark/sql/SparderContextFacadeTest.java | 143 +++++++++++++++++++++
kylin-spark-project/kylin-spark-test/pom.xml | 6 +-
17 files changed, 280 insertions(+), 166 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-common/pom.xml b/kylin-spark-project/kylin-spark-common/pom.xml
index 9309647..44be54b 100644
--- a/kylin-spark-project/kylin-spark-common/pom.xml
+++ b/kylin-spark-project/kylin-spark-common/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-metadata</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index aaed1d9..b048283 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -17,25 +17,26 @@
*/
package org.apache.spark.sql.execution.datasource
-import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
+import org.apache.kylin.common.{KylinConfig, QueryContextFacade}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
+import org.apache.spark.utils.SparderUtils
trait ResetShufflePartition extends Logging {
+  val PARTITION_SPLIT_BYTES: Long = KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
- val defaultParallelism = sparkSession.sparkContext.defaultParallelism
+    val defaultParallelism = SparderUtils.getTotalCore(sparkSession.sparkContext.getConf)
val kylinConfig = KylinConfig.getInstanceFromEnv
val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
kylinConfig.getSparkSqlShufflePartitions
} else {
-      Math.min(QueryContextFacade.current().getSourceScanBytes / (
-        KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 * 2) + 1,
+      Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
         defaultParallelism).toInt
}
-    //sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
-    //  partitionsNum.toString)
+    // when hitting cube, this will override the value of 'spark.sql.shuffle.partitions'
+    sparkSession.conf.set("spark.sql.shuffle.partitions", partitionsNum.toString)
     logInfo(s"Set partition to $partitionsNum, total bytes ${QueryContextFacade.current().getSourceScanBytes}")
}
}
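The sizing rule above, restated as a standalone sketch. Here scanBytes and totalCores are stand-ins for the values Kylin reads from QueryContextFacade and SparderUtils, and the 64 MB default mirrors KylinConfig's getQueryPartitionSplitSizeMB:

    // Partition count grows with the scanned bytes but never exceeds the
    // cores available to Sparder; always at least one partition.
    def shufflePartitions(scanBytes: Long, totalCores: Int, splitSizeMB: Long = 64): Int = {
      val splitBytes = splitSizeMB * 1024 * 1024
      Math.min(scanBytes / splitBytes + 1, totalCores.toLong).toInt
    }

For example, a 1 GB scan with the 64 MB split yields min(17, totalCores) partitions.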
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/SparderUtils.scala
similarity index 51%
copy from kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
copy to kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/SparderUtils.scala
index 664e6be..fdc83cc 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/SparderUtils.scala
@@ -16,25 +16,27 @@
* limitations under the License.
*/
-package org.apache.kylin.query.pushdown;
+package org.apache.spark.utils
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.spark.metadata.cube.StructField;
-import org.apache.spark.sql.SparderContext;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.spark.SparkConf
-import java.util.List;
-import java.util.UUID;
+object SparderUtils {
-public class SparkSubmitter {
-    public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
-
- public static PushdownResponse submitPushDownTask(String sql) {
- SparkSession ss = SparderContext.getSparkSession();
-        Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
- return new PushdownResponse(pair.getSecond(), pair.getFirst());
+ def getTotalCore(sparkConf: SparkConf): Int = {
+ if (sparkConf.get("spark.master").startsWith("local")) {
+ return 1
}
+ val instances = getExecutorNum(sparkConf)
+ val cores = sparkConf.get("spark.executor.cores").toInt
+ Math.max(instances * cores, 1)
+ }
+ def getExecutorNum(sparkConf: SparkConf): Int = {
+ if (sparkConf.get("spark.dynamicAllocation.enabled", "false").toBoolean) {
+      val maxExecutors = sparkConf.get("spark.dynamicAllocation.maxExecutors", Int.MaxValue.toString).toInt
+ maxExecutors
+ } else {
+ sparkConf.get("spark.executor.instances").toInt
+ }
+ }
}
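A hedged usage sketch of the new SparderUtils helper; the conf values are hypothetical, and with spark.dynamicAllocation.enabled=true the executor count instead comes from spark.dynamicAllocation.maxExecutors:

    import org.apache.spark.SparkConf
    import org.apache.spark.utils.SparderUtils

    // 4 executors x 2 cores = 8 total cores; any "local" master reports 1
    val conf = new SparkConf()
      .set("spark.master", "yarn")
      .set("spark.executor.instances", "4")
      .set("spark.executor.cores", "2")
    println(SparderUtils.getTotalCore(conf)) // 8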
diff --git a/kylin-spark-project/kylin-spark-engine/pom.xml b/kylin-spark-project/kylin-spark-engine/pom.xml
index 3632a4c..9954afe 100644
--- a/kylin-spark-project/kylin-spark-engine/pom.xml
+++ b/kylin-spark-project/kylin-spark-engine/pom.xml
@@ -40,7 +40,7 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-metadata</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/KylinSparkEnv.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/KylinSparkEnv.scala
index bb39dc5..3ef6104 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/KylinSparkEnv.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/KylinSparkEnv.scala
@@ -25,19 +25,11 @@ object KylinSparkEnv extends Logging {
@volatile
private var spark: SparkSession = _
- val _cuboid = new ThreadLocal[Dataset[Row]]
val _needCompute = new ThreadLocal[JBoolean] {
override protected def initialValue = false
}
- @volatile
- private var initializingThread: Thread = null
-
def getSparkSession: SparkSession = withClassLoad {
- if (spark == null || spark.sparkContext.isStopped) {
- logInfo("Init spark.")
- initSpark()
- }
spark
}
@@ -45,76 +37,15 @@ object KylinSparkEnv extends Logging {
spark = sparkSession
}
- def init(): Unit = withClassLoad {
- getSparkSession
- }
-
def withClassLoad[T](body: => T): T = {
-    // val originClassLoad = Thread.currentThread().getContextClassLoader
+    // val originClassLoad = Thread.currentThread().getContextClassLoader
     // fixme aron
-    // Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
+    // Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
     val t = body
-    // Thread.currentThread().setContextClassLoader(originClassLoad)
+    // Thread.currentThread().setContextClassLoader(originClassLoad)
t
}
-  def initSpark(): Unit = withClassLoad {
-    this.synchronized {
-      if (initializingThread == null && (spark == null || spark.sparkContext.isStopped)) {
-        initializingThread = new Thread(new Runnable {
-          override def run(): Unit = {
-            try {
-              val sparkSession = System.getProperty("spark.local") match {
-                case "true" =>
-                  SparkSession.builder
-                    .master("local")
-                    .appName("sparder-test-sql-context")
-                    .enableHiveSupport()
-                    .getOrCreate()
-                case _ =>
-                  SparkSession.builder
-                    .appName("sparder-sql-context")
-                    .master("yarn-client")
-                    //if user defined other master in kylin.properties,
-                    // it will get overwrite later in org.apache.spark.sql.KylinSession.KylinBuilder.initSparkConf
-                    .enableHiveSupport()
-                    .getOrCreate()
-              }
-              spark = sparkSession
-              logInfo("Spark context started successfully with stack trace:")
-              logInfo(Thread.currentThread().getStackTrace.mkString("\n"))
-              logInfo(
-                "Class loader: " + Thread
-                  .currentThread()
-                  .getContextClassLoader
-                  .toString)
-            } catch {
-              case throwable: Throwable =>
-                logError("Error for initializing spark ", throwable)
-            } finally {
-              logInfo("Setting initializing Spark thread to null.")
-              initializingThread = null
-            }
-          }
-        })
-
-        logInfo("Initializing Spark thread starting.")
-        initializingThread.start()
-      }
-
-      if (initializingThread != null) {
-        logInfo("Initializing Spark, waiting for done.")
-        initializingThread.join()
-      }
-    }
-  }
-
- def getCuboid: Dataset[Row] = _cuboid.get
-
- def setCuboid(cuboid: Dataset[Row]): Unit = {
- _cuboid.set(cuboid)
- }
-
def skipCompute(): Unit = {
_needCompute.set(true)
}
diff --git a/kylin-spark-project/kylin-spark-query/pom.xml b/kylin-spark-project/kylin-spark-query/pom.xml
index 59493eb..2e6f7e4 100644
--- a/kylin-spark-project/kylin-spark-query/pom.xml
+++ b/kylin-spark-project/kylin-spark-query/pom.xml
@@ -106,6 +106,11 @@
<artifactId>kylin-core-storage</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ </dependency>
+
<!--For update spark job info from cluster-->
<dependency>
<groupId>org.apache.kylin</groupId>
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
index d0950c1..4066574 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
@@ -21,7 +21,6 @@ package org.apache.kylin.query.monitor;
import org.apache.kylin.common.KylinConfig;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.KylinSparkEnv;
import org.apache.spark.sql.SparderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +86,8 @@ public class SparderContextCanary {
             errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
} else {
try {
-                JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparderContext.getSparkSession().sparkContext());
+                JavaSparkContext jsc =
+                        JavaSparkContext.fromSparkContext(SparderContext.getOriginalSparkSession().sparkContext());
jsc.setLocalProperty("spark.scheduler.pool", "vip_tasks");
long t = System.currentTimeMillis();
@@ -118,11 +118,7 @@ public class SparderContextCanary {
try {
                 // Take repair action if error accumulated exceeds threshold
logger.warn("Repairing sparder context");
- if ("true".equals(System.getProperty("spark.local"))) {
-                    SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
- } else {
- SparderContext.restartSpark();
- }
+ SparderContext.restartSpark();
} catch (Throwable th) {
logger.error("Restart sparder context failed.", th);
}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 664e6be..2d31822 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -34,6 +34,7 @@ public class SparkSubmitter {
public static PushdownResponse submitPushDownTask(String sql) {
SparkSession ss = SparderContext.getSparkSession();
         Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
+ SparderContext.closeThreadSparkSession();
return new PushdownResponse(pair.getSecond(), pair.getFirst());
}
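Because getSparkSession() now returns a thread-local clone (see the SparderContext changes below), each pushdown call must release that clone, or the InternalThreadLocal pins it to the pooled worker thread. A defensive sketch of the acquire/use/release pattern; the try/finally is an assumption for safety, while the committed code above simply calls closeThreadSparkSession() after executeSql returns:

    val ss = SparderContext.getSparkSession    // this thread's cloned session
    try {
      ss.sql("select 1").collect()             // hypothetical query
    } finally {
      SparderContext.closeThreadSparkSession() // drop the clone from the thread cache
    }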
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/UdfManager.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/UdfManager.scala
index 4044e58..18c01c3 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/UdfManager.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/UdfManager.scala
@@ -24,15 +24,18 @@ import java.util.concurrent.atomic.AtomicReference
import org.apache.kylin.shaded.com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.kylin.metadata.datatype.DataType
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{FunctionEntity, KylinFunctions, SparkSession}
+import org.apache.spark.sql.{FunctionEntity, KylinFunctions, SparderContextFacade, SparkSession}
import org.apache.spark.sql.catalyst.expressions.SparderAggFun
class UdfManager(sparkSession: SparkSession) extends Logging {
private var udfCache: Cache[String, String] = _
-  KylinFunctions.builtin.foreach { case FunctionEntity(name, info, builder) =>
-    sparkSession.sessionState.functionRegistry.registerFunction(name, info, builder)
+  private def registerBuiltInFunc(): Unit = {
+    KylinFunctions.builtin.foreach { case FunctionEntity(name, info, builder) =>
+      sparkSession.sessionState.functionRegistry.registerFunction(name, info, builder)
+    }
   }
+
udfCache = CacheBuilder.newBuilder
.maximumSize(100)
.expireAfterWrite(1, TimeUnit.HOURS)
@@ -74,14 +77,34 @@ object UdfManager {
private val defaultSparkSession: AtomicReference[SparkSession] =
new AtomicReference[SparkSession]
+ /**
+ * create UdfManager for original SparkSession
+ */
def create(sparkSession: SparkSession): Unit = {
val manager = new UdfManager(sparkSession)
+ manager.registerBuiltInFunc
defaultManager.set(manager)
defaultSparkSession.set(sparkSession)
}
- def register(dataType: DataType, func: String): String = {
+ /**
+ * register for original SparkSession
+ */
+ def registerForOriginal(dataType: DataType, func: String): String = {
defaultManager.get().doRegister(dataType, func)
}
+ /**
+ * create UdfManager for thread local SparkSession
+ */
+ def createWithoutBuildInFunc(sparkSession: SparkSession): UdfManager = {
+ new UdfManager(sparkSession)
+ }
+
+ /**
+ * register for thread local SparkSession
+ */
+ def register(dataType: DataType, func: String): String = {
+ SparderContextFacade.current().getSecond.doRegister(dataType, func)
+ }
}
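The registration split keeps the heavyweight work on the shared session: create() registers the KylinFunctions built-ins once at startup, thread-local clones inherit that catalog through cloneSession(), and createWithoutBuildInFunc() only attaches a per-thread manager for dynamically registered aggregates. A sketch of the two paths, where originalSession and dataType are hypothetical stand-ins:

    // once, on the original session created at startup
    UdfManager.create(originalSession)

    // on a query thread: resolves the caller's cloned session via the facade
    val funcName = UdfManager.register(dataType, "SUM")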
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 5ce4f8f..0d8b769 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -64,9 +64,9 @@ object SparkSqlClient {
             val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
             val sourceTableSize = ResourceDetectUtils.getResourceSize(paths: _*) + "b"
             val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
-            //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
-            //  partitions)
-            logger.info(s"Auto set spark.sql.shuffle.partitions $partitions")
+            df.sparkSession.conf.set("spark.sql.shuffle.partitions", partitions)
+            logger.info(s"Auto set spark.sql.shuffle.partitions to $partitions, the total sources " +
+              s"size is ${sourceTableSize}")
           } catch {
             case e: Throwable =>
               logger.error("Auto set spark.sql.shuffle.partitions failed.", e)
@@ -103,7 +103,6 @@ object SparkSqlClient {
}
else throw e
} finally {
-            //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
HadoopUtil.setCurrentConfiguration(null)
}
}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 65ae2b0..d840207 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparderContext}
import org.apache.spark.sql.hive.utils.QueryMetricUtils
import org.apache.spark.sql.utils.SparkTypeUtil
+import org.apache.spark.utils.SparderUtils
import scala.collection.JavaConverters._
@@ -41,7 +42,6 @@ object ResultType extends Enumeration {
}
object ResultPlan extends Logging {
-  val PARTITION_SPLIT_BYTES: Long = KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
def collectEnumerable(
df: DataFrame,
@@ -70,17 +70,18 @@ object ResultPlan extends Logging {
       kylinConfig = ProjectManager.getInstance(kylinConfig).getProject(projectName).getConfig
}
var pool = "heavy_tasks"
+    val sparderTotalCores = SparderUtils.getTotalCore(df.sparkSession.sparkContext.getConf)
+    // this value of partition num only takes effect when querying from snapshot tables
val partitionsNum =
if (kylinConfig.getSparkSqlShufflePartitions != -1) {
kylinConfig.getSparkSqlShufflePartitions
} else {
-        Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
-          SparderContext.getTotalCore).toInt
+        sparderTotalCores
}
if (QueryContextFacade.current().isHighPriorityQuery) {
pool = "vip_tasks"
- } else if (partitionsNum <= SparderContext.getTotalCore) {
+ } else if (partitionsNum <= sparderTotalCores) {
pool = "lightweight_tasks"
}
@@ -96,8 +97,8 @@ object ResultPlan extends Logging {
QueryContextFacade.current().setSparkPool(pool)
val queryId = QueryContextFacade.current().getQueryId
     sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
-    //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
-    //  partitionsNum.toString)
+    df.sparkSession.conf.set("spark.sql.shuffle.partitions", partitionsNum.toString)
+ logInfo(s"Set partition to $partitionsNum")
QueryContextFacade.current().setDataset(df)
sparkContext.setJobGroup(jobGroup,
@@ -153,7 +154,6 @@ object ResultPlan extends Logging {
val r = body
// remember clear local properties.
df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", null)
-    //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
SparderContext.setDF(df)
TableScanPlan.cacheDf.get().clear()
HadoopUtil.setCurrentConfiguration(null)
@@ -178,6 +178,7 @@ object ResultPlan extends Logging {
}
}
SparderContext.cleanQueryInfo()
+ SparderContext.closeThreadSparkSession()
result
}
}
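The pool choice that falls out of this hunk, restated as a standalone sketch (names hypothetical):

    def choosePool(isHighPriority: Boolean, partitionsNum: Int, totalCores: Int): String =
      if (isHighPriority) "vip_tasks"                           // explicit high-priority queries
      else if (partitionsNum <= totalCores) "lightweight_tasks" // fits the cluster in one wave
      else "heavy_tasks"                                        // default for large scans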
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
index 7ae0937..e3c532e 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -23,7 +23,7 @@ import java.nio.file.Paths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.query.{UdfManager}
+import org.apache.kylin.query.UdfManager
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 638a9ac..ba4c7b7 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -50,7 +50,7 @@ object SparderContext extends Logging {
@volatile
var master_app_url: String = _
- def getSparkSession: SparkSession = withClassLoad {
+ def getOriginalSparkSession: SparkSession = withClassLoad {
if (spark == null || spark.sparkContext.isStopped) {
logInfo("Init spark.")
initSpark()
@@ -58,6 +58,16 @@ object SparderContext extends Logging {
spark
}
+ def getSparkSession: SparkSession = {
+ logInfo(s"Current thread ${Thread.currentThread().getId} create a
SparkSession.")
+ SparderContextFacade.current().getFirst
+ }
+
+ def closeThreadSparkSession(): Unit = {
+ logInfo(s"Remove SparkSession from thread ${Thread.currentThread().getId}")
+ SparderContextFacade.remove()
+ }
+
def setSparkSession(sparkSession: SparkSession): Unit = {
spark = sparkSession
UdfManager.create(sparkSession)
@@ -93,35 +103,26 @@ object SparderContext extends Logging {
}
}
+ def stopSpark(): Unit = withClassLoad {
+ this.synchronized {
+ if (spark != null && !spark.sparkContext.isStopped) {
+ Utils.tryWithSafeFinally {
+ spark.stop()
+ } {
+ SparkContext.clearActiveContext
+ }
+ }
+ }
+ }
+
def init(): Unit = withClassLoad {
- getSparkSession
+ getOriginalSparkSession
}
def getSparkConf(key: String): String = {
getSparkSession.sparkContext.conf.get(key)
}
- def getTotalCore: Int = {
- val sparkConf = getSparkSession.sparkContext.getConf
- if (sparkConf.get("spark.master").startsWith("local")) {
- return 1
- }
- val instances = getExecutorNum(sparkConf)
- val cores = sparkConf.get("spark.executor.cores").toInt
- Math.max(instances * cores, 1)
- }
-
- def getExecutorNum(sparkConf: SparkConf): Int = {
- if (sparkConf.get("spark.dynamicAllocation.enabled", "false").toBoolean) {
-      val maxExecutors = sparkConf.get("spark.dynamicAllocation.maxExecutors", Int.MaxValue.toString).toInt
-      logInfo(s"Use spark.dynamicAllocation.maxExecutors:$maxExecutors as num instances of executors.")
- maxExecutors
- } else {
- sparkConf.get("spark.executor.instances").toInt
- }
- }
-
-
def initSpark(): Unit = withClassLoad {
this.synchronized {
       if (initializingThread == null && (spark == null || spark.sparkContext.isStopped)) {
@@ -240,10 +241,10 @@ object SparderContext extends Logging {
* @return The body return
*/
def withClassLoad[T](body: => T): T = {
- // val originClassLoad = Thread.currentThread().getContextClassLoader
+ // val originClassLoad = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
val t = body
- // Thread.currentThread().setContextClassLoader(originClassLoad)
+ // Thread.currentThread().setContextClassLoader(originClassLoad)
t
}
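The key behavioral change in this file: getSparkSession now hands out a per-thread clone, while getOriginalSparkSession keeps the old shared-session semantics. cloneSession() shares the SparkContext and catalog but gives each clone its own SQLConf, which is what lets concurrent queries tune spark.sql.shuffle.partitions without racing one another. A minimal isolation sketch in plain Spark, independent of Kylin:

    import org.apache.spark.sql.SparkSession

    val base = SparkSession.builder().master("local").appName("clone-demo").getOrCreate()
    val a = base.cloneSession() // shares SparkContext, separate SQLConf
    val b = base.cloneSession()
    a.conf.set("spark.sql.shuffle.partitions", "5")
    b.conf.set("spark.sql.shuffle.partitions", "50")
    assert(a.conf.get("spark.sql.shuffle.partitions") == "5")  // b's setting did not leak
    assert(b.conf.get("spark.sql.shuffle.partitions") == "50")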
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
similarity index 50%
copy from kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
copy to kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
index 664e6be..386e9fc 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
@@ -16,25 +16,29 @@
* limitations under the License.
*/
-package org.apache.kylin.query.pushdown;
+package org.apache.spark.sql
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.spark.metadata.cube.StructField;
-import org.apache.spark.sql.SparderContext;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.spark.internal.Logging
-import java.util.List;
-import java.util.UUID;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal
+import org.apache.kylin.common.util.Pair
+import org.apache.kylin.query.UdfManager
-public class SparkSubmitter {
-    public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
+object SparderContextFacade extends Logging {
- public static PushdownResponse submitPushDownTask(String sql) {
- SparkSession ss = SparderContext.getSparkSession();
-        Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
- return new PushdownResponse(pair.getSecond(), pair.getFirst());
+  final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, UdfManager]] =
+    new InternalThreadLocal[Pair[SparkSession, UdfManager]]()
+
+ def current(): Pair[SparkSession, UdfManager] = {
+ if (CURRENT_SPARKSESSION.get() == null) {
+ val spark = SparderContext.getOriginalSparkSession.cloneSession()
+ CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
+ UdfManager.createWithoutBuildInFunc(spark)))
}
+ CURRENT_SPARKSESSION.get()
+ }
-}
+ def remove(): Unit = {
+ CURRENT_SPARKSESSION.remove()
+ }
+}
\ No newline at end of file
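Taken together, a hedged sketch of one query thread's lifecycle against the facade (Pair is org.apache.kylin.common.util.Pair):

    val pair = SparderContextFacade.current() // clones the original session lazily on first use
    val session = pair.getFirst               // this thread's SparkSession
    val udfManager = pair.getSecond           // its companion UdfManager
    // ... run the query on `session` ...
    SparderContextFacade.remove()             // release the clone once the query finishes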
diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
index 7a22892..a27264a 100644
--- a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
+++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
@@ -22,7 +22,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.TempMetadataBuilder;
import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.spark.sql.KylinSparkEnv;
import org.apache.spark.sql.SparderContext;
import org.junit.After;
import org.junit.Assert;
@@ -34,11 +33,19 @@ public class SparderContextCanaryTest extends LocalWithSparkSessionTest {
@Before
public void setup() throws SchedulerException {
super.setup();
- SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
+ KylinConfig conf = KylinConfig.getInstanceFromEnv();
+ // the default value of kylin.query.spark-conf.spark.master is yarn,
+ // which will read from kylin-defaults.properties
+ conf.setProperty("kylin.query.spark-conf.spark.master", "local");
+ // create a new SparkSession of Sparder
+ SparderContext.initSpark();
}
@After
public void after() {
+ SparderContext.stopSpark();
+ KylinConfig.getInstanceFromEnv()
+ .setProperty("kylin.query.spark-conf.spark.master", "yarn");
super.after();
}
@@ -49,7 +56,7 @@ public class SparderContextCanaryTest extends LocalWithSparkSessionTest {
Assert.assertTrue(SparderContext.isSparkAvailable());
         // stop sparder and check again, the sparder context should auto-restart
- SparderContext.getSparkSession().stop();
+ SparderContext.getOriginalSparkSession().stop();
Assert.assertFalse(SparderContext.isSparkAvailable());
SparderContextCanary.monitor();
diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/spark/sql/SparderContextFacadeTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/spark/sql/SparderContextFacadeTest.java
new file mode 100644
index 0000000..9d6b606
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/spark/sql/SparderContextFacadeTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
+import org.apache.kylin.job.exception.SchedulerException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparderContextFacadeTest extends LocalWithSparkSessionTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(SparderContextFacadeTest.class);
+ private static final Integer TEST_SIZE = 16 * 1024 * 1024;
+
+ @Override
+ @Before
+ public void setup() throws SchedulerException {
+ super.setup();
+ KylinConfig conf = KylinConfig.getInstanceFromEnv();
+ // the default value of kylin.query.spark-conf.spark.master is yarn,
+ // which will read from kylin-defaults.properties
+ conf.setProperty("kylin.query.spark-conf.spark.master", "local");
+ // Init Sparder
+ SparderContext.getOriginalSparkSession();
+ }
+
+ @After
+ public void after() {
+ SparderContext.stopSpark();
+ KylinConfig.getInstanceFromEnv()
+ .setProperty("kylin.query.spark-conf.spark.master", "yarn");
+ super.after();
+ }
+
+ @Test
+    public void testThreadSparkSession() throws InterruptedException, ExecutionException {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 1,
+ TimeUnit.DAYS, new LinkedBlockingQueue<>(5));
+
+ // test the thread local SparkSession
+        CompletionService<Throwable> service = runThreadSparkSessionTest(executor, false);
+
+ for (int i = 1; i <= 5; i++) {
+ Assert.assertNull(service.take().get());
+ }
+
+        // test the original SparkSession; it must throw errors.
+ service = runThreadSparkSessionTest(executor, true);
+ boolean hasError = false;
+ for (int i = 1; i <= 5; i++) {
+ if (service.take().get() != null) {
+ hasError = true;
+ }
+ }
+ Assert.assertTrue(hasError);
+
+ executor.shutdown();
+ }
+
+    protected CompletionService<Throwable> runThreadSparkSessionTest(ThreadPoolExecutor executor,
+            boolean isOriginal) {
+ List<TestCallable> tasks = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+            tasks.add(new TestCallable(String.valueOf(TEST_SIZE * i), String.valueOf(i), isOriginal));
+ }
+
+        CompletionService<Throwable> service = new ExecutorCompletionService<>(executor);
+ for (TestCallable task : tasks) {
+ service.submit(task);
+ }
+ return service;
+ }
+
+ class TestCallable implements Callable<Throwable> {
+
+ private String maxPartitionBytes = null;
+ private String shufflePartitions = null;
+ private boolean isOriginal = false;
+
+        TestCallable(String maxPartitionBytes, String shufflePartitions, boolean isOriginal) {
+ this.maxPartitionBytes = maxPartitionBytes;
+ this.shufflePartitions = shufflePartitions;
+ this.isOriginal = isOriginal;
+ }
+
+ @Override
+ public Throwable call() throws Exception {
+ try {
+ SparkSession ss = null;
+ if (!this.isOriginal) {
+ ss = SparderContext.getSparkSession();
+ } else {
+ ss = SparderContext.getOriginalSparkSession();
+ }
+ ss.conf().set("spark.sql.files.maxPartitionBytes",
this.maxPartitionBytes);
+ ss.conf().set("spark.sql.shuffle.partitions",
this.shufflePartitions);
+
+ Thread.sleep((new Random()).nextInt(2) * 1000L);
+ Assert.assertEquals(this.maxPartitionBytes,
+ ss.conf().get("spark.sql.files.maxPartitionBytes"));
+ Assert.assertEquals(this.shufflePartitions,
+ ss.conf().get("spark.sql.shuffle.partitions"));
+ } catch (Throwable th) {
+ logger.error("Test thread local SparkSession error: ", th);
+ return th;
+ }
+ logger.info("Test thread local SparkSession successfully: {}");
+ return null;
+ }
+ }
+}
diff --git a/kylin-spark-project/kylin-spark-test/pom.xml b/kylin-spark-project/kylin-spark-test/pom.xml
index a3fb20f..4221983 100644
--- a/kylin-spark-project/kylin-spark-test/pom.xml
+++ b/kylin-spark-project/kylin-spark-test/pom.xml
@@ -47,12 +47,12 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-engine</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-query</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
@@ -142,7 +142,7 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-query</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>