This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new ed241cd KYLIN-4925 Use Spark 3.1 for Kylin 4.0 (#1601)
ed241cd is described below
commit ed241cd753efd40720d205d90e0ea54c46b0f9ee
Author: Congling Xia <[email protected]>
AuthorDate: Sat Jun 26 23:32:49 2021 +0800
KYLIN-4925 Use Spark 3.1 for Kylin 4.0 (#1601)
---
.travis.yml | 3 +-
assembly/pom.xml | 2 +-
engine-flink/pom.xml | 4 +-
engine-spark/pom.xml | 6 +-
kylin-it/pom.xml | 8 +-
kylin-spark-project/kylin-spark-common/pom.xml | 60 ++++-
.../spark/common/util/KylinDateTimeUtils.scala | 8 +-
.../spark/dict/NGlobalDictBuilderAssist.scala | 2 +-
.../org/apache/spark/sql/KylinFunctions.scala | 2 +-
.../catalyst/expressions/KylinExpresssions.scala | 8 +-
.../catalyst/expressions/TimestampAddImpl.scala | 6 +-
.../catalyst/expressions/TimestampDiffImpl.scala | 11 +-
.../sql/execution/datasource/FilePruner.scala | 34 ++-
.../execution/datasource/KylinSourceStrategy.scala | 29 ++-
.../org/apache/spark/utils/KylinReflectUtils.scala | 24 +-
.../spark/monitor/MonitorExecutorExtension.scala | 0
.../sql/catalyst/expressions/ExpressionUtils.scala | 10 +-
.../sql/execution/KylinFileSourceScanExec.scala | 9 +-
.../spark/sql/execution/KylinJoinSelection.scala | 0
.../sql/execution/datasource/FilterExt.scala} | 22 +-
.../spark/sql/hive/utils/QueryMetricUtils.scala | 1 +
.../spark/monitor/MonitorExecutorExtension.scala | 8 +-
.../sql/catalyst/expressions/ExpressionUtils.scala | 12 +-
.../sql/execution/KylinFileSourceScanExec.scala | 55 +++--
.../spark/sql/execution/KylinJoinSelection.scala | 249 +++++++++++++++++++++
.../spark/sql/hive/utils/QueryMetricUtils.scala | 52 +++--
kylin-spark-project/kylin-spark-engine/pom.xml | 32 ++-
.../spark/SparkBatchCubingEngineParquet.java | 2 +-
.../kylin/engine/spark/job/NSparkCubingJob.java | 2 +-
.../kylin/engine/spark/job/NSparkCubingUtil.java | 2 +-
.../kylin/cluster/ClusterInfoFetcherFactory.scala | 2 +-
.../spark/builder/CubeDictionaryBuilder.scala | 2 +-
.../kylin/engine/spark/job/CuboidAggregator.scala | 18 +-
.../kylin/query/runtime/ExpressionConverter.scala | 5 +-
.../apache/spark/sql/udf/TimestampAddImpl.scala | 5 +-
.../apache/spark/sql/udf/TimestampDiffImpl.scala | 11 +-
.../org/apache/spark/util/KylinReflectUtils.scala | 58 -----
.../engine/spark/LocalWithSparkSessionTest.java | 2 +-
.../kylin/engine/spark/job/JobStepFactoryTest.java | 2 +-
.../kylin/engine/spark/job/SparkCubingJobTest.java | 2 +-
.../engine/spark/builder/TestCreateFlatTable.scala | 27 ++-
kylin-spark-project/kylin-spark-metadata/pom.xml | 38 +++-
.../org/apache/spark/sql/utils/SparkTypeUtil.scala | 17 +-
.../engine/spark/cross/CrossDateTimeUtils.scala | 52 +++++
.../engine/spark/cross/CrossDateTimeUtils.scala | 55 +++++
kylin-spark-project/kylin-spark-query/pom.xml | 77 ++++---
.../kylin/query/runtime/ExpressionConverter.scala | 6 +-
.../kylin/query/runtime/SparderRexVisitor.scala | 6 +-
.../sql/hive/KylinHiveSessionStateBuilder.scala | 0
.../sql/hive/KylinHiveSessionStateBuilder.scala | 14 +-
kylin-spark-project/kylin-spark-test/pom.xml | 10 +-
.../kylin/engine/spark2/NBuildAndQueryTest.java | 2 +-
.../spark2/NManualBuildAndQueryCuboidTest.java | 2 +-
kylin-spark-project/pom.xml | 12 +-
metastore-hbase/pom.xml | 2 +-
metrics-reporter-kafka/pom.xml | 2 +-
parquet-assembly/pom.xml | 8 +
pom.xml | 77 ++++++-
server-base/pom.xml | 4 +-
.../apache/kylin/rest/service/QueryService.java | 2 +-
server/pom.xml | 8 +-
source-kafka/pom.xml | 2 +-
storage-hbase/pom.xml | 2 +-
stream-receiver/pom.xml | 2 +-
stream-source-kafka/pom.xml | 2 +-
65 files changed, 904 insertions(+), 295 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 3dcb586..46236bc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -46,7 +46,8 @@ before_script:
script:
  # mvn clean org.jacoco:jacoco-maven-plugin:prepare-agent test coveralls:report -e
# Skip coveralls temporarily, fix it asap
- - mvn clean test
+ - mvn clean test -q
+ - mvn clean test -q -Psandbox -Pspark3
  - if [[ -n "${TRAVIS_PULL_REQUEST_SLUG}" && "${TRAVIS_PULL_REQUEST_SLUG}" != "${TRAVIS_REPO_SLUG}" ]]; then
      echo "The pull request from ${TRAVIS_PULL_REQUEST_SLUG} is an EXTERNAL pull request. Skip sonar analysis.";
    else
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 041b2c2..2035e84 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -169,7 +169,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml
index b10f3e5..2a99fe3 100644
--- a/engine-flink/pom.xml
+++ b/engine-flink/pom.xml
@@ -60,13 +60,13 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.11</artifactId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
</dependency>
<!-- Hadoop dependency -->
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index cc1d9d6..af6486c 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -46,19 +46,19 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index b7986ab..56a3dfb 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -272,7 +272,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
@@ -292,7 +292,7 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
@@ -305,7 +305,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
@@ -318,7 +318,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
diff --git a/kylin-spark-project/kylin-spark-common/pom.xml
b/kylin-spark-project/kylin-spark-common/pom.xml
index 44be54b..b027793 100644
--- a/kylin-spark-project/kylin-spark-common/pom.xml
+++ b/kylin-spark-project/kylin-spark-common/pom.xml
@@ -17,7 +17,8 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>Apache Kylin 4.X - Common</name>
@@ -41,6 +42,7 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-common</artifactId>
<type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -48,6 +50,60 @@
<version>${project.version}</version>
</dependency>
</dependencies>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/${spark.version.dir}</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compile-version-dependent-source</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <sourceDir>${spark.version.dir}</sourceDir>
+ </configuration>
+ </execution>
+ <execution>
+ <id>compile-common-scala-source</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <sourceDir>scala</sourceDir>
+ </configuration>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
index a5a6451..c03c1d0 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
@@ -19,7 +19,7 @@
package org.apache.kylin.engine.spark.common.util
import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
object KylinDateTimeUtils {
val MICROS_PER_MILLIS: Long = 1000L
@@ -34,7 +34,7 @@ object KylinDateTimeUtils {
def addMonths(timestamp: Long, m: Int): Long = {
// spark ts unit is microsecond
val ms = timestamp / 1000
- val day0 = DateTimeUtils.millisToDays(ms)
+ val day0 = CrossDateTimeUtils.millisToDays(ms)
val millis = ms - day0 * MILLIS_PER_DAY
val x = dateAddMonths(day0, m)
(x * MILLIS_PER_DAY + millis) * 1000
@@ -63,9 +63,9 @@ object KylinDateTimeUtils {
def subtractMonths(t0: Long, t1: Long): Int = {
val millis0 = floorMod(t0, MILLIS_PER_DAY)
- val d0 = DateTimeUtils.millisToDays(t0)
+ val d0 = CrossDateTimeUtils.millisToDays(t0)
val millis1 = floorMod(t1, MILLIS_PER_DAY)
- val d1 = DateTimeUtils.millisToDays(t1)
+ val d1 = CrossDateTimeUtils.millisToDays(t1)
var x = dateSubtractMonths(d0, d1)
val d2 = dateAddMonths(d1, x)
if (x > 0 && d2 == d0 && millis0 < millis1) x -= 1
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index dbac8b8..3ce9bca 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -55,7 +55,7 @@ object NGlobalDictBuilderAssist extends Logging {
existsDictDs
      .repartition(bucketPartitionSize, col(existsDictDs.schema.head.name).cast(StringType))
.foreachPartition {
- iter =>
+ iter: Iterator[(String, Long)] =>
val partitionID = TaskContext.get().partitionId()
logInfo(s"Rebuild partition dict col: ${ref.identity}, partitionId:
$partitionID")
val d = broadcastDict.value
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 22cfb0d..8e112c1 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -51,7 +51,7 @@ object KylinFunctions {
case _ => Column(Literal(literal))
}
-  def k_like(left: Column, right: Column): Column = Column(Like(left.expr, right.expr))
+  def k_like(left: Column, right: Column): Column = Column(new Like(left.expr, right.expr))
   def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list))
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index 331309a..a1a45fa 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -302,11 +302,11 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte
     val globalDictClass = classOf[NGlobalDictionary].getName
     val bucketDictClass = classOf[NBucketDictionary].getName
-    val globalDictTerm = ctx.addMutableState(globalDictClass, s"${mid.simpleString.replace("[", "").replace("]", "")}_globalDict")
-    val bucketDictTerm = ctx.addMutableState(bucketDictClass, s"${mid.simpleString.replace("[", "").replace("]", "")}_bucketDict")
+    val globalDictTerm = ctx.addMutableState(globalDictClass, s"${ExpressionUtils.simpleString(mid).replace("[", "").replace("]", "")}_globalDict")
+    val bucketDictTerm = ctx.addMutableState(bucketDictClass, s"${ExpressionUtils.simpleString(mid).replace("[", "").replace("]", "")}_bucketDict")
-    val dictParamsTerm = mid.simpleString
-    val bucketSizeTerm = right.simpleString.toInt
+    val dictParamsTerm = ExpressionUtils.simpleString(mid)
+    val bucketSizeTerm = ExpressionUtils.simpleString(right).toInt
     val initBucketDictFuncName = ctx.addNewFunction(s"init${bucketDictTerm.replace("[", "").replace("]", "")}BucketDict",
s"""
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
index 55f683c..e1a4d35 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
-import java.util.{Calendar, Locale, TimeZone}
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import java.util.{Calendar, Locale, TimeZone}
object TimestampAddImpl {
private val localCalendar = new ThreadLocal[Calendar] {
@@ -34,7 +34,7 @@ object TimestampAddImpl {
calendar.clear()
addTime("DAY", time, calendar)
addTime(unit, increment, calendar)
- DateTimeUtils.millisToDays(calendar.getTimeInMillis)
+ CrossDateTimeUtils.millisToDays(calendar.getTimeInMillis)
}
// add long on DateType
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
index e85abe0..cbf42cf 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
@@ -19,17 +19,16 @@
package org.apache.spark.sql.catalyst.expressions
import java.util.Locale
-
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
object TimestampDiffImpl {
// TimestampType -> DateType
def evaluate(unit: String, timestamp: Long, date: Int): Long = {
val before = timestamp / MICROS_PER_MILLIS
- val after = DateTimeUtils.daysToMillis(date)
+ val after = CrossDateTimeUtils.daysToMillis(date)
convertDuration(unit, before, after)
}
@@ -42,14 +41,14 @@ object TimestampDiffImpl {
// DateType -> DateType
def evaluate(unit: String, date1: Int, date2: Int): Long = {
- val before = DateTimeUtils.daysToMillis(date1)
- val after = DateTimeUtils.daysToMillis(date2)
+ val before = CrossDateTimeUtils.daysToMillis(date1)
+ val after = CrossDateTimeUtils.daysToMillis(date2)
convertDuration(unit, before, after)
}
// DateType -> TimestampType
def evaluate(unit: String, date: Int, timestamp: Long): Long = {
- val before = DateTimeUtils.daysToMillis(date)
+ val before = CrossDateTimeUtils.daysToMillis(date)
val after = timestamp / MICROS_PER_MILLIS
convertDuration(unit, before, after)
}
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 91faab4..2784170 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -19,7 +19,6 @@
package org.apache.spark.sql.execution.datasource
import java.sql.{Date, Timestamp}
-
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.kylin.common.util.DateFormat
import org.apache.kylin.cube.cuboid.Cuboid
@@ -29,7 +28,7 @@ import org.apache.kylin.engine.spark.metadata.MetadataConverter
import org.apache.kylin.metadata.model.{PartitionDesc, SegmentStatusEnum}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EmptyRow, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EmptyRow, Expression, ExpressionUtils, Literal}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
@@ -346,7 +345,7 @@ class FilePruner(cubeInstance: CubeInstance,
segDirs
} else {
val translatedFilter = filters.map(filter => convertCastFilter(filter))
- .flatMap(DataSourceStrategy.translateFilter)
+ .flatMap(ExpressionUtils.translateFilter)
if (translatedFilter.isEmpty) {
logInfo("Can not use filters to prune segments.")
segDirs
@@ -357,8 +356,8 @@ class FilePruner(cubeInstance: CubeInstance,
          val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
SegFilters(tsRange.startValue, tsRange.endValue, pattern)
.foldFilter(reducedFilter) match {
- case Trivial(true) => true
- case Trivial(false) => false
+ case AlwaysTrue => true
+ case AlwaysFalse => false
}
}
}
@@ -555,22 +554,20 @@ case class SegFilters(start: Long, end: Long, pattern: String) extends Logging {
}
case And(left: Filter, right: Filter) =>
And(foldFilter(left), foldFilter(right)) match {
- case And(Trivial(false), _) => Trivial(false)
- case And(_, Trivial(false)) => Trivial(false)
- case And(Trivial(true), right) => right
- case And(left, Trivial(true)) => left
+ case And(AlwaysFalse, _) => Trivial(false)
+ case And(_, AlwaysFalse) => Trivial(false)
+ case And(AlwaysTrue, right) => right
+ case And(left, AlwaysTrue) => left
case other => other
}
case Or(left: Filter, right: Filter) =>
Or(foldFilter(left), foldFilter(right)) match {
- case Or(Trivial(true), _) => Trivial(true)
- case Or(_, Trivial(true)) => Trivial(true)
- case Or(Trivial(false), right) => right
- case Or(left, Trivial(false)) => left
+ case Or(AlwaysTrue, _) => Trivial(true)
+ case Or(_, AlwaysTrue) => Trivial(true)
+ case Or(AlwaysFalse, right) => right
+ case Or(left, AlwaysFalse) => left
case other => other
}
- case trivial: Trivial =>
- trivial
case unsupportedFilter =>
// return 'true' to scan all partitions
// currently unsupported filters are:
@@ -581,8 +578,7 @@ case class SegFilters(start: Long, end: Long, pattern: String) extends Logging {
Trivial(true)
}
}
-}
-
-case class Trivial(value: Boolean) extends Filter {
- override def references: Array[String] = findReferences(value)
+ def Trivial(value: Boolean): Filter = {
+ if (value) AlwaysTrue else AlwaysFalse
+ }
}
\ No newline at end of file
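A minimal usage sketch (assumed variable names segFilters and reducedFilter, not part of this commit) showing how the folded result is consumed now that the Trivial(...) helper merely maps a Boolean onto AlwaysTrue/AlwaysFalse:

    // hypothetical illustration: decide whether a segment survives pruning
    val keepSegment: Boolean = segFilters.foldFilter(reducedFilter) match {
      case AlwaysTrue  => true   // filter cannot prune this segment, scan it
      case AlwaysFalse => false  // segment provably matches nothing, skip it
    }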
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
index 9e9a29c..0713b7c 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
@@ -17,14 +17,16 @@
*/
package org.apache.spark.sql.execution.datasource
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.{Strategy, execution}
import org.apache.spark.sql.execution.{KylinFileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.utils.KylinReflectUtils
/**
 * A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -119,15 +121,34 @@ object KylinSourceStrategy extends Strategy with Logging {
val outputAttributes = readDataColumns ++ partitionColumns
// to trigger setShufflePartitions
        filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq)
- val scan =
- new KylinFileSourceScanExec(
+ val className = "org.apache.spark.sql.execution.KylinFileSourceScanExec"
+        val (scan: KylinFileSourceScanExec, ignored: Class[_]) = if (SPARK_VERSION.startsWith("2.4")) {
+ KylinReflectUtils.createObject(
+ className,
fsRelation,
outputAttributes,
outputSchema,
partitionKeyFilters.toSeq,
filePruner.getShardSpec,
dataFilters,
- table.map(_.identifier))
+ table.map(_.identifier)
+ )
+ } else if (SPARK_VERSION.startsWith("3.1")) {
+ KylinReflectUtils.createObject(
+ className,
+ fsRelation,
+ outputAttributes,
+ outputSchema,
+ partitionKeyFilters.toSeq,
+ filePruner.getShardSpec,
+ None,
+ dataFilters,
+ table.map(_.identifier),
+ java.lang.Boolean.TRUE
+ )
+ } else {
+ throw new UnsupportedOperationException(s"Spark version
${SPARK_VERSION} is not supported.")
+ }
        val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
        val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
index 783eeb4..28b3a6e 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
@@ -27,19 +27,21 @@ object KylinReflectUtils {
private val rm = universe.runtimeMirror(getClass.getClassLoader)
  def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = {
- if (SPARK_VERSION.startsWith("2.4")) {
- var className: String =
- "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
- if (!"hive".equals(sparkContext.getConf
- .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) {
- className = "org.apache.spark.sql.hive.KylinSessionStateBuilder"
- }
- val tuple = createObject(className, kylinSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
+ var className: String =
+ "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
+ if (!"hive".equals(sparkContext.getConf
+ .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) {
+ className = "org.apache.spark.sql.hive.KylinSessionStateBuilder"
+ }
+
+ val (instance, clazz) = if (SPARK_VERSION.startsWith("2.4")) {
+ createObject(className, kylinSession, None)
+ } else if (SPARK_VERSION.startsWith("3.1")) {
+ createObject(className, kylinSession, None, Map.empty)
} else {
- throw new UnsupportedOperationException("Spark version not supported")
+ throw new UnsupportedOperationException(s"Spark version ${SPARK_VERSION}
not supported")
}
+ clazz.getMethod("build").invoke(instance)
}
def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
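A rough sketch (assumed argument names, mirroring the call site added to KylinSourceStrategy above, not part of this commit) of how createObject is driven with a fully qualified class name plus constructor arguments to build the Spark-version-specific scan node:

    // illustrative only: reflectively instantiate the version-dependent class
    val className = "org.apache.spark.sql.execution.KylinFileSourceScanExec"
    val (instance, clazz) = KylinReflectUtils.createObject(className,
      fsRelation, outputAttributes, outputSchema, partitionKeyFilters.toSeq,
      filePruner.getShardSpec, dataFilters, table.map(_.identifier))
    val scan = instance.asInstanceOf[KylinFileSourceScanExec]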
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/monitor/MonitorExecutorExtension.scala
similarity index 100%
copy from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
copy to
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/monitor/MonitorExecutorExtension.scala
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
similarity index 91%
copy from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
copy to
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index d9bdcc5..232f6cc 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -21,7 +21,9 @@ package org.apache.spark.sql.catalyst.expressions
import scala.util.{Failure, Success, Try}
import scala.reflect.ClassTag
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FunctionBuilder, expressions}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
object ExpressionUtils {
@@ -78,6 +80,12 @@ ExpressionUtils {
(name, (expressionInfo[T](name), builder))
}
+ def simpleString(expression: Expression): String = expression.simpleString
+
+ def translateFilter(expression: Expression): Option[Filter] = {
+ DataSourceStrategy.translateFilter(expression)
+ }
+
  private def expressionInfo[T <: Expression : ClassTag](name: String): ExpressionInfo = {
val clazz = scala.reflect.classTag[T].runtimeClass
val df = clazz.getAnnotation(classOf[ExpressionDescription])
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
similarity index 98%
copy from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
copy to
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 90ff597..0fbb39d 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.kylin.common.KylinConfig
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionUtils, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
@@ -57,7 +57,7 @@ class KylinFileSourceScanExec(
ret
}
- private lazy val inputRDD: RDD[InternalRow] = {
+ private lazy val _inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -77,11 +77,12 @@ class KylinFileSourceScanExec(
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
- inputRDD :: Nil
+ _inputRDD :: Nil
}
@transient
-  private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  private val pushedDownFilters = dataFilters
+    .flatMap(ExpressionUtils.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinJoinSelection.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
similarity index 100%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinJoinSelection.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala
similarity index 68%
copy from
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
copy to
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala
index 9f709f4..d9ca8ab 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala
@@ -14,17 +14,23 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-package org.apache.kylin.cluster
+package org.apache.spark.sql.execution.datasource
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.util.KylinReflectUtils
+import org.apache.spark.sql.sources.Filter
+case class AlwaysTrue() extends Filter {
+ override def references: Array[String] = Array.empty
+}
+
+object AlwaysTrue extends AlwaysTrue {
+}
-object ClusterInfoFetcherFactory {
- def create(kylinConfig: KylinConfig): ClusterInfoFetcher = {
-
KylinReflectUtils.createObject(kylinConfig.getClusterInfoFetcherClassName)._1.asInstanceOf[ClusterInfoFetcher]
- }
+case class AlwaysFalse() extends Filter {
+ override def references: Array[String] = Array.empty
}
+
+object AlwaysFalse extends AlwaysFalse {
+}
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
similarity index 99%
copy from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
copy to
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index c928e84..cabe9c6 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -42,6 +42,7 @@ object QueryMetricUtils extends Logging {
// There is only 'numOutputRows' metric in HiveTableScanExec
(exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l)
}
+
val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1))
.filter(_ >= 0L).toList.asJava
val scanFiles = metrics.map(metrics =>
java.lang.Long.valueOf(metrics._2))
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala
similarity index 90%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala
index d291faf..984fe45 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala
@@ -18,10 +18,12 @@
package org.apache.spark.memory
+import java.util
+
+import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
-import org.apache.spark.util.RpcUtils
-import org.apache.spark.{ExecutorPlugin, SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv}
class MonitorExecutorExtension extends ExecutorPlugin with Logging {
@@ -31,7 +33,7 @@ class MonitorExecutorExtension extends ExecutorPlugin with Logging {
val sparkConf: SparkConf = env.conf
- override def init(): Unit = {
+  override def init(pluginContext: PluginContext, extraConf: util.Map[String, String]): Unit = {
initMonitorEnv()
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
similarity index 91%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index d9bdcc5..8405493 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -22,6 +22,8 @@ import scala.util.{Failure, Success, Try}
import scala.reflect.ClassTag
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
object ExpressionUtils {
@@ -78,6 +80,12 @@ ExpressionUtils {
(name, (expressionInfo[T](name), builder))
}
+ def simpleString(expr: Expression): String = expr.simpleString(1)
+
+ def translateFilter(expr: Expression): Option[Filter] ={
+    DataSourceStrategy.translateFilter(expr, supportNestedPredicatePushdown = true)
+ }
+
  private def expressionInfo[T <: Expression : ClassTag](name: String): ExpressionInfo = {
val clazz = scala.reflect.classTag[T].runtimeClass
val df = clazz.getAnnotation(classOf[ExpressionDescription])
@@ -91,7 +99,9 @@ ExpressionUtils {
df.arguments(),
df.examples(),
df.note(),
- df.since())
+ df.group(),
+ df.since(),
+ df.deprecated())
} else {
      // This exists for the backward compatibility with old `ExpressionDescription`s defining
// the extended description in `extended()`.
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
similarity index 86%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 90ff597..957944b 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -21,43 +21,55 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.kylin.common.KylinConfig
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionUtils, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec}
import org.apache.spark.sql.types.StructType
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.TimeUnit.NANOSECONDS
+import scala.collection.mutable.{ArrayBuffer, HashMap}
class KylinFileSourceScanExec(
@transient override val relation: HadoopFsRelation,
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
- val optionalShardSpec: Option[ShardSpec],
+ optionalShardSpec: Option[ShardSpec],
+ ignoredNumCoalescedBuckets: Option[Int],
override val dataFilters: Seq[Expression],
-    override val tableIdentifier: Option[TableIdentifier]) extends FileSourceScanExec(
-  relation, output, requiredSchema, partitionFilters, None, dataFilters, tableIdentifier) {
+    override val tableIdentifier: Option[TableIdentifier],
+    ignoredDisableBucketedScan: Boolean = true) extends FileSourceScanExec(
+  relation, output, requiredSchema, partitionFilters, None, None, dataFilters, tableIdentifier, true) {
+
+ private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
- @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+ private def sendDriverMetrics(): Unit = {
+ driverMetrics.foreach(e => metrics(e._1).add(e._2))
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+ metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+ }
+
+ @transient lazy val _selectedPartitions: Seq[PartitionDirectory] = {
    val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
-    val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
- metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
- metrics("metadataTime").add(timeTakenMs)
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
- metrics("numFiles") :: metrics("metadataTime") :: Nil)
+ driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
+ driverMetrics("filesSize") = ret.map(_.files.map(_.getLen).sum).sum
+ if (relation.partitionSchemaOption.isDefined) {
+ driverMetrics("numPartitions") = ret.length
+ }
+    val timeTakenMs = NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs)
+ driverMetrics("metadataTime") = timeTakenMs
ret
}
- private lazy val inputRDD: RDD[InternalRow] = {
+ private lazy val _inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -68,20 +80,23 @@ class KylinFileSourceScanExec(
options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
- optionalShardSpec match {
+ val readRDD = optionalShardSpec match {
      case Some(spec) if KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled =>
- createShardingReadRDD(spec, readFile, selectedPartitions, relation)
+ createShardingReadRDD(spec, readFile, _selectedPartitions, relation)
case _ =>
- createNonShardingReadRDD(readFile, selectedPartitions, relation)
+ createNonShardingReadRDD(readFile, _selectedPartitions, relation)
}
+ sendDriverMetrics()
+ readRDD
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
- inputRDD :: Nil
+ _inputRDD :: Nil
}
@transient
-  private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  private val pushedDownFilters = dataFilters
+    .flatMap(ExpressionUtils.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
new file mode 100644
index 0000000..243ffd6
--- /dev/null
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.execution
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.{SparkSession, Strategy}
+
+import javax.annotation.concurrent.GuardedBy
+
+/**
+ * .
+ */
+case class KylinJoinSelection(session: SparkSession) extends Strategy
+ with JoinSelectionHelper
+ with PredicateHelper
+ with Logging {
+
+ val conf: SQLConf = session.sessionState.conf
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+
+ // If it is an equi-join, we first look at the join hints w.r.t. the
following order:
+ // 1. broadcast hint: pick broadcast hash join if the join type is
supported. If both sides
+ // have the broadcast hints, choose the smaller side (based on stats)
to broadcast.
+ // 2. sort merge hint: pick sort merge join if join keys are sortable.
+ // 3. shuffle hash hint: We pick shuffle hash join if the join type is
supported. If both
+ // sides have the shuffle hash hints, choose the smaller side (based
on stats) as the
+ // build side.
+ // 4. shuffle replicate NL hint: pick cartesian product if join type is
inner like.
+ //
+ // If there is no hint or the hints are not applicable, we follow these
rules one by one:
+ // 1. Pick broadcast hash join if one side is small enough to broadcast,
and the join type
+ // is supported. If both sides are small, choose the smaller side
(based on stats)
+ // to broadcast.
+ // 2. Pick shuffle hash join if one side is small enough to build local
hash map, and is
+ // much smaller than the other side, and
`spark.sql.join.preferSortMergeJoin` is false.
+ // 3. Pick sort merge join if the join keys are sortable.
+ // 4. Pick cartesian product if join type is inner like.
+ // 5. Pick broadcast nested loop join as the final solution. It may OOM
but we don't have
+ // other choice.
+    case j@ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
+ def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
+        getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
+ buildSide =>
+ Seq(joins.BroadcastHashJoinExec(
+ leftKeys,
+ rightKeys,
+ joinType,
+ buildSide,
+ nonEquiCond,
+ planLater(left),
+ planLater(right)))
+ }
+ }
+
+ def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
+        getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
+ buildSide =>
+ Seq(joins.ShuffledHashJoinExec(
+ leftKeys,
+ rightKeys,
+ joinType,
+ buildSide,
+ nonEquiCond,
+ planLater(left),
+ planLater(right)))
+ }
+ }
+
+ def createSortMergeJoin() = {
+ if (RowOrdering.isOrderable(leftKeys)) {
+ Some(Seq(joins.SortMergeJoinExec(
+            leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
+ } else {
+ None
+ }
+ }
+
+ def createCartesianProduct() = {
+ if (joinType.isInstanceOf[InnerLike]) {
+          // `CartesianProductExec` can't implicitly evaluate equal join condition, here we should
+          // pass the original condition which includes both equal and non-equal conditions.
+          Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
+ } else {
+ None
+ }
+ }
+
+ def createJoinWithoutHint() = {
+ createBroadcastHashJoin(false)
+ .orElse {
+ if (!conf.preferSortMergeJoin) {
+ createShuffleHashJoin(false)
+ } else {
+ None
+ }
+ }
+ .orElse(createSortMergeJoin())
+ .orElse(createCartesianProduct())
+ .getOrElse {
+ // This join could be very slow or OOM
+ val buildSide = getSmallerSide(left, right)
+ Seq(joins.BroadcastNestedLoopJoinExec(
+            planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
+ }
+ }
+
+ createBroadcastHashJoin(true)
+ .orElse {
+ if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None
+ }
+ .orElse(createShuffleHashJoin(true))
+ .orElse {
+          if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None
+ }
+ .getOrElse(createJoinWithoutHint())
+
+ case j@ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
+      Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight,
+        None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = true))
+
+ // If it is not an equi-join, we first look at the join hints w.r.t. the
following order:
+ // 1. broadcast hint: pick broadcast nested loop join. If both sides
have the broadcast
+ // hints, choose the smaller side (based on stats) to broadcast for
inner and full joins,
+ // choose the left side for right join, and choose right side for
left join.
+ // 2. shuffle replicate NL hint: pick cartesian product if join type is
inner like.
+ //
+ // If there is no hint or the hints are not applicable, we follow these
rules one by one:
+ // 1. Pick broadcast nested loop join if one side is small enough to
broadcast. If only left
+ // side is broadcast-able and it's left join, or only right side is
broadcast-able and
+ // it's right join, we skip this rule. If both sides are small,
broadcasts the smaller
+ // side for inner and full joins, broadcasts the left side for right
join, and broadcasts
+ // right side for left join.
+ // 2. Pick cartesian product if join type is inner like.
+ // 3. Pick broadcast nested loop join as the final solution. It may OOM
but we don't have
+ // other choice. It broadcasts the smaller side for inner and full
joins, broadcasts the
+ // left side for right join, and broadcasts right side for left join.
+ case logical.Join(left, right, joinType, condition, hint) =>
+      val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
+ getSmallerSide(left, right)
+ } else {
+ // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to
broadcast left side if
+ // it's a right join, and broadcast right side if it's a left join.
+ // TODO: revisit it. If left side is much smaller than the right side,
it may be better
+ // to broadcast the left side even if it's a left join.
+ if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight
+ }
+
+ def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
+ val maybeBuildSide = if (buildLeft && buildRight) {
+ Some(desiredBuildSide)
+ } else if (buildLeft) {
+ Some(BuildLeft)
+ } else if (buildRight) {
+ Some(BuildRight)
+ } else {
+ None
+ }
+
+ maybeBuildSide.map { buildSide =>
+ Seq(joins.BroadcastNestedLoopJoinExec(
+ planLater(left), planLater(right), buildSide, joinType, condition))
+ }
+ }
+
+ def createCartesianProduct() = {
+ if (joinType.isInstanceOf[InnerLike]) {
+          Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
+ } else {
+ None
+ }
+ }
+
+ def createJoinWithoutHint() = {
+        createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
+ .orElse(createCartesianProduct())
+ .getOrElse {
+ // This join could be very slow or OOM
+ Seq(joins.BroadcastNestedLoopJoinExec(
+              planLater(left), planLater(right), desiredBuildSide, joinType, condition))
+ }
+ }
+
+      createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
+ .orElse {
+          if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None
+ }
+ .getOrElse(createJoinWithoutHint())
+
+    // --- Cases where this strategy does not apply ---------------------------------------------
+ case _ => Nil
+ }
+
+  override def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
+ val size = plan.stats.sizeInBytes
+    size >= 0 && size <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(size.toLong)
+ }
+}
+
+object JoinMemoryManager extends Logging {
+
+ @GuardedBy("this")
+ private[this] var memoryUsed: Long = 0
+
+ def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized {
+ assert(numBytesToAcquire >= 0)
+ val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed)
+ if (enoughMemory) {
+ memoryUsed += numBytesToAcquire
+ logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used
$memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.")
+ } else {
+ logInfo("Driver memory is not enough for BHJ.")
+ }
+ enoughMemory
+ }
+
+ private def maxMemoryJoinCanUse: Long = {
+    val joinMemoryFraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction
+ (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong
+ }
+
+ def releaseAllMemory(): Unit = synchronized {
+ memoryUsed = 0
+ }
+
+}
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
similarity index 61%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index c928e84..0fc917f 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -19,29 +19,22 @@
package org.apache.spark.sql.hive.utils
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, KylinFileSourceScanExec, SparkPlan}
import org.apache.spark.sql.hive.execution.HiveTableScanExec
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.seqAsJavaListConverter
object QueryMetricUtils extends Logging {
+
   def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], java.util.List[java.lang.Long],
-    java.util.List[java.lang.Long], java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = {
+      java.util.List[java.lang.Long], java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = {
try {
val metrics = plan.collect {
- case exec: KylinFileSourceScanExec =>
-          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
-            exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l)
-        case exec: FileSourceScanExec =>
-          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
-            exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l)
-        case exec: HiveTableScanExec =>
-          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          // There is only 'numOutputRows' metric in HiveTableScanExec
-          (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l)
+        case exec: AdaptiveSparkPlanExec => metricLine(recursiveGetSparkPlan(exec.executedPlan))
+ case exec: SparkPlan => metricLine(exec)
}
+
val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1))
.filter(_ >= 0L).toList.asJava
val scanFiles = metrics.map(metrics =>
java.lang.Long.valueOf(metrics._2))
@@ -58,7 +51,36 @@ object QueryMetricUtils extends Logging {
case throwable: Throwable =>
logWarning("Error occurred when collect query scan metrics.",
throwable)
        (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava,
-          List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava)
+          List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava)
+ }
+ }
+
+ private def metricLine(exec: SparkPlan) = {
+ (
+ exec.metrics.get("numOutputRows").map(_.value).getOrElse(-1L),
+ exec.metrics.get("numFiles").map(_.value).getOrElse(-1L),
+ exec.metrics.get("metadataTime").map(_.value).getOrElse(-1L),
+ exec.metrics.get("scanTime").map(_.value).getOrElse(-1L),
+ exec.metrics.get("filesSize").map(_.value).getOrElse(-1L)
+ )
+ }
+
+ private def recursiveGetSparkPlan(sparkPlan: SparkPlan): SparkPlan = {
+ sparkPlan match {
+ case exec: ShuffleQueryStageExec =>
+ recursiveGetSparkPlan(exec.plan)
+ case exec: KylinFileSourceScanExec =>
+ exec
+ case exec: FileSourceScanExec =>
+ exec
+ case exec: HiveTableScanExec =>
+ exec
+ case _ => {
+ if (sparkPlan.children.isEmpty) {
+ return null
+ }
+ recursiveGetSparkPlan(sparkPlan.children.head)
+ }
}
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/pom.xml
b/kylin-spark-project/kylin-spark-engine/pom.xml
index 9954afe..de582b8 100644
--- a/kylin-spark-project/kylin-spark-engine/pom.xml
+++ b/kylin-spark-project/kylin-spark-engine/pom.xml
@@ -63,6 +63,21 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
@@ -79,16 +94,29 @@
</dependency>
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.scalamock</groupId>
- <artifactId>scalamock_2.11</artifactId>
+ <artifactId>scalamock_${scala.binary.version}</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
index b6f4bb8..3b353ed 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
@@ -26,7 +26,7 @@ import org.apache.kylin.engine.IBatchCubingEngine;
import org.apache.kylin.engine.spark.job.NSparkMergingJob;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.spark_project.guava.collect.Sets;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
public class SparkBatchCubingEngineParquet implements IBatchCubingEngine {
@Override
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index da00f6d..886e476 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -37,9 +37,9 @@ import
org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.spark_project.guava.base.Preconditions;
/**
*
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
index 146a1f2..2b10518 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
@@ -21,8 +21,8 @@ package org.apache.kylin.engine.spark.job;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.spark.sql.Column;
-import org.spark_project.guava.collect.Sets;
import java.util.Collection;
import java.util.LinkedHashSet;
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
index 9f709f4..092eeef 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
@@ -19,7 +19,7 @@
package org.apache.kylin.cluster
import org.apache.kylin.common.KylinConfig
-import org.apache.spark.util.KylinReflectUtils
+import org.apache.spark.utils.KylinReflectUtils
object ClusterInfoFetcherFactory {
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index dea6ede..f7ec55d 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -83,7 +83,7 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
.filter(dictCol.isNotNull)
.repartition(bucketPartitionSize, dictCol)
.foreachPartition {
- iter =>
+ iter: Iterator[Row] =>
DictHelper.genDict(columnName, broadcastDict, iter)
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index 84beaf2..35ac1af 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -28,10 +28,11 @@ import org.apache.kylin.measure.bitmap.BitmapMeasureType
import org.apache.kylin.measure.hllc.HLLCMeasureType
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.functions.{col, _}
-import org.apache.spark.sql.types.{StringType, _}
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DoubleType, FloatType, ShortType, StringType, _}
import org.apache.spark.sql.udaf._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -72,8 +73,19 @@ object CuboidAggregator {
        val colIndex = dataSet.schema.fieldNames.zipWithIndex.map(tp => (tp._2, tp._1)).toMap
columns.appendAll(measure.pra.map(p =>col(p.id.toString)))
} else {
- val value = measure.pra.head.asInstanceOf[LiteralColumnDesc].value
-        columns.append(new Column(Literal.create(value, measure.pra.head.dataType)))
+ var value = measure.pra.head.asInstanceOf[LiteralColumnDesc].value
+ value = measure.pra.head.dataType match {
+ case BooleanType => value.asInstanceOf[String].toBoolean
+ case ByteType => value.asInstanceOf[String].toByte
+ case ShortType => value.asInstanceOf[String].toShort
+ case IntegerType | DateType => value.asInstanceOf[String].toInt
+ case LongType | TimestampType => value.asInstanceOf[String].toLong
+ case FloatType => value.asInstanceOf[String].toFloat
+ case DoubleType => value.asInstanceOf[String].toDouble
+ case BinaryType => value.asInstanceOf[String].toArray
+ case StringType => value.asInstanceOf[UTF8String]
+ }
+        columns.append(new Column(Literal.create(value, measure.pra.head.dataType)))
}
}
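The hunk above works around stricter literal typing in Spark 3.x: the measure parameter value arrives as
a string, so it is coerced to the declared Spark SQL type before Literal.create. A simplified sketch of
the same idea (the helper name and the fallback branch are assumptions for illustration, not the actual
CuboidAggregator code path):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    // Coerce a raw string parameter to the declared type before building a Literal.
    def coerce(raw: String, dt: DataType): Any = dt match {
      case BooleanType              => raw.toBoolean
      case IntegerType | DateType   => raw.toInt
      case LongType | TimestampType => raw.toLong
      case DoubleType               => raw.toDouble
      case StringType               => UTF8String.fromString(raw)
      case _                        => raw // assumption: pass through otherwise
    }

    val lit = Literal.create(coerce("100", LongType), LongType) // Literal of 100L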
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
index 49aa30f..a08affe 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
@@ -24,11 +24,11 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.SqlKind._
import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
import org.apache.kylin.query.util.UnsupportedSparkFunctionException
import org.apache.spark.sql.Column
import org.apache.spark.sql.KylinFunctions._
import org.apache.spark.sql.catalyst.expressions.{If, IfNull, StringLocate}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.utils.SparkTypeUtil
@@ -213,8 +213,7 @@ object ExpressionConverter {
// time_funcs
case "current_date" =>
k_lit(
- DateTimeUtils.dateToString(
- DateTimeUtils.millisToDays(System.currentTimeMillis())))
+ CrossDateTimeUtils.dateToString())
case "current_timestamp" =>
current_timestamp()
case "to_timestamp" =>
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
index 7413e4b..3beddcf 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
@@ -19,9 +19,8 @@
package org.apache.spark.sql.udf
import java.util.{Calendar, Locale, TimeZone}
-
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils.{MICROS_PER_MILLIS, MONTHS_PER_QUARTER}
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
object TimestampAddImpl {
private val localCalendar = new ThreadLocal[Calendar] {
@@ -35,7 +34,7 @@ object TimestampAddImpl {
calendar.clear()
addTime("DAY", time, calendar)
addTime(unit, increment, calendar)
- DateTimeUtils.millisToDays(calendar.getTimeInMillis)
+ CrossDateTimeUtils.millisToDays(calendar.getTimeInMillis)
}
// add long on DateType
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
index f0d4d9e..f8453a2 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
@@ -19,17 +19,16 @@
package org.apache.spark.sql.udf
import java.util.Locale
-
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
object TimestampDiffImpl {
// TimestampType -> DateType
def evaluate(unit: String, timestamp: Long, date: Int): Long = {
val before = timestamp / MICROS_PER_MILLIS
- val after = DateTimeUtils.daysToMillis(date)
+ val after = CrossDateTimeUtils.daysToMillis(date)
convertDuration(unit, before, after)
}
@@ -42,14 +41,14 @@ object TimestampDiffImpl {
// DateType -> DateType
def evaluate(unit: String, date1: Int, date2: Int): Long = {
- val before = DateTimeUtils.daysToMillis(date1)
- val after = DateTimeUtils.daysToMillis(date2)
+ val before = CrossDateTimeUtils.daysToMillis(date1)
+ val after = CrossDateTimeUtils.daysToMillis(date2)
convertDuration(unit, before, after)
}
// DateType -> TimestampType
def evaluate(unit: String, date: Int, timestamp: Long): Long = {
- val before = DateTimeUtils.daysToMillis(date)
+ val before = CrossDateTimeUtils.daysToMillis(date)
val after = timestamp / MICROS_PER_MILLIS
convertDuration(unit, before, after)
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala
deleted file mode 100644
index eef75cb..0000000
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.spark.util
-
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.{SPARK_VERSION, SparkContext}
-
-import scala.reflect.runtime.universe
-
-object KylinReflectUtils {
- private val rm = universe.runtimeMirror(getClass.getClassLoader)
-
- def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any =
{
- if (SPARK_VERSION.startsWith("2.4")) {
- var className: String =
- "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
- if (!"hive".equals(sparkContext.getConf
- .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) {
- className = "org.apache.spark.sql.hive.KylinSessionStateBuilder"
- }
- val tuple = createObject(className, kylinSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
- } else {
- throw new UnsupportedOperationException("Spark version not supported")
- }
- }
-
- def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
- val clazz = Utils.classForName(className)
- val ctor = clazz.getConstructors.head
- ctor.setAccessible(true)
- (ctor.newInstance(conArgs: _*), clazz)
- }
-
- def createObject(className: String): (Any, Class[_]) = {
- val clazz = Utils.classForName(className)
- val ctor = clazz.getConstructors.head
- ctor.setAccessible(true)
- (ctor.newInstance(), clazz)
- }
-}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index a0c49e8..cfd84aa 100644
---
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -55,6 +55,7 @@ import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KylinSparkEnv;
@@ -70,7 +71,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Sets;
import java.io.File;
import java.io.IOException;
diff --git
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
index f788d15..8bf5fb8 100644
---
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
+++
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
@@ -30,9 +30,9 @@ import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
-import org.spark_project.guava.collect.Sets;
import java.io.IOException;
import java.util.Set;
diff --git
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
index 0cbe4c0..91768ac 100644
---
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
+++
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -49,7 +50,6 @@ import org.junit.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Sets;
import java.util.HashSet;
import java.util.Map;
diff --git
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index 3ecdfb5..f8dd610 100644
---
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -20,9 +20,9 @@ package org.apache.kylin.engine.spark.builder
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone, UUID}
-
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.cube.{CubeInstance, CubeManager, CubeSegment}
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
import org.apache.kylin.engine.spark.job.KylinBuildEnv
import org.apache.kylin.engine.spark.metadata.MetadataConverter
import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree
@@ -30,7 +30,7 @@ import org.apache.kylin.job.engine.JobEngineConfig
import org.apache.kylin.job.impl.threadpool.DefaultScheduler
import org.apache.kylin.job.lock.MockJobLock
import org.apache.kylin.metadata.model.SegmentRange
-import org.apache.spark.InfoHelper
+import org.apache.spark.{InfoHelper, SPARK_VERSION}
import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
import org.apache.spark.sql.{Dataset, Row}
import org.junit.Assert
@@ -96,17 +96,32 @@ class TestCreateFlatTable extends SparderBaseFunSuite with
SharedSparkSession wi
val afterJoin1 = generateFlatTable(seg1, cube, true)
afterJoin1.collect()
- val jobs = helper.getJobsByGroupId(groupId)
- Assert.assertEquals(jobs.length, 15)
+ if (SPARK_VERSION.startsWith("2.4")) {
+ val jobs = helper.getJobsByGroupId(groupId)
+ Assert.assertEquals(jobs.length, 15)
+ } else if (SPARK_VERSION.startsWith("3.1")) {
+ // in Spark 3.x, BroadcastExchangeExec overwrites job group ID
+ val jobs = helper.getJobsByGroupId(null)
+      Assert.assertEquals(6, jobs.count(_.jobGroup.exists(_.endsWith(groupId))))
+      Assert.assertEquals(9, jobs.count(_.description.exists(_.contains("broadcast exchange"))))
+ }
DefaultScheduler.destroyInstance()
}
private def checkFilterCondition(ds: Dataset[Row], seg: CubeSegment) = {
val queryExecution = ds.queryExecution.simpleString
- val startTime = dateFormat.format(seg.getTSRange.start.v)
- val endTime = dateFormat.format(seg.getTSRange.end.v)
+ var startTime = dateFormat.format(seg.getTSRange.start.v)
+ var endTime = dateFormat.format(seg.getTSRange.end.v)
//Test Filter Condition
+
+ // dates will not be converted to string by default since spark 3.0.0.
+ // see https://issues.apache.org/jira/browse/SPARK-27638 for details.
+    if (SPARK_VERSION.startsWith("3.") && conf.get("spark.sql.legacy.typeCoercion.datetimeToString.enabled", "false") == "false") {
+ startTime = CrossDateTimeUtils.stringToDate(startTime).get.toString
+ endTime = CrossDateTimeUtils.stringToDate(endTime).get.toString
+ }
+
Assert.assertTrue(queryExecution.contains(startTime))
Assert.assertTrue(queryExecution.contains(endTime))
}
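The test change above follows SPARK-27638: since Spark 3.0 a DATE column compared with a string is no
longer cast to string by default, so the filter in the plan carries the date's internal day count rather
than the formatted string. A rough illustration of the difference (local session, column name and value
are assumptions):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val ds = Seq("2012-01-01").toDF("cal_dt").select(col("cal_dt").cast("date").as("cal_dt"))
    val plan = ds.filter(col("cal_dt") >= "2012-01-01").queryExecution.simpleString
    // Spark 2.4 (or spark.sql.legacy.typeCoercion.datetimeToString.enabled=true):
    //   the plan contains the original string "2012-01-01"
    // Spark 3.x default: the string is cast to DATE, so the plan shows the day count (e.g. 15340)
    println(plan)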
diff --git a/kylin-spark-project/kylin-spark-metadata/pom.xml
b/kylin-spark-project/kylin-spark-metadata/pom.xml
index 9cdd824..71b8b4e 100644
--- a/kylin-spark-project/kylin-spark-metadata/pom.xml
+++ b/kylin-spark-project/kylin-spark-metadata/pom.xml
@@ -39,21 +39,55 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-cube</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/${spark.version.dir}</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
- <id>scala-compile-first</id>
+ <id>compile-version-dependent-source</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <sourceDir>${spark.version.dir}</sourceDir>
+ </configuration>
+ </execution>
+ <execution>
+ <id>compile-common-scala-source</id>
<phase>process-resources</phase>
<goals>
- <goal>add-source</goal>
<goal>compile</goal>
</goals>
+ <configuration>
+ <sourceDir>scala</sourceDir>
+ </configuration>
</execution>
<execution>
<id>scala-test-compile</id>
diff --git
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
index 2e189ec..486e358 100644
---
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
+++
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.utils
import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.spark.unsafe.types.UTF8String
import org.apache.kylin.common.util.DateFormat
import org.apache.spark.sql.Column
import org.apache.spark.internal.Logging
@@ -28,19 +27,23 @@ import java.math.BigDecimal
import org.apache.calcite.util.NlsString
import org.apache.calcite.rel.`type`.RelDataType
import java.sql.{Date, Timestamp, Types}
+import java.time.ZoneId
import java.util.regex.Pattern
import org.apache.spark.sql.functions.col
import org.apache.calcite.avatica.util.TimeUnitRange
import org.apache.calcite.rex.RexLiteral
-import java.util.{GregorianCalendar, Locale, TimeZone}
+import java.util.{GregorianCalendar, Locale}
import org.apache.kylin.engine.spark.metadata.FunctionDesc
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.kylin.metadata.datatype.DataType
import org.apache.spark.sql.types._
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
object SparkTypeUtil extends Logging {
+ private def defaultZoneId = ZoneId.systemDefault()
+ private def UTC = ZoneId.of("UTC")
+
val DATETIME_FAMILY = List("time", "date", "timestamp", "datetime")
def isDateTimeFamilyType(dataType: String): Boolean = {
@@ -167,9 +170,9 @@ object SparkTypeUtil extends Logging {
s.getValue
case g: GregorianCalendar =>
if (literal.getTypeName.getName.equals("DATE")) {
-          new Date(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).get / 1000)
+          new Date(CrossDateTimeUtils.stringToTimestamp(literal).get / 1000)
} else {
-          new Timestamp(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).get / 1000)
+          new Timestamp(CrossDateTimeUtils.stringToTimestamp(literal).get / 1000)
}
case range: TimeUnitRange =>
// Extract(x from y) in where clause
@@ -259,7 +262,7 @@ object SparkTypeUtil extends Logging {
val time = DateFormat.stringToDate(string).getTime
if (toCalcite) {
          //current date is local timezone, org.apache.calcite.avatica.util.AbstractCursor.DateFromNumberAccessor need to utc
- DateTimeUtils.stringToDate(UTF8String.fromString(string)).get
+ CrossDateTimeUtils.stringToDate(string).get
} else {
// ms to s
time / 1000
@@ -277,7 +280,7 @@ object SparkTypeUtil extends Logging {
var ts = s.asInstanceOf[Timestamp].toString
if (toCalcite) {
          // current ts is local timezone ,org.apache.calcite.avatica.util.AbstractCursor.TimeFromNumberAccessor need to utc
-          DateTimeUtils.stringToTimestamp(UTF8String.fromString(ts), TimeZone.getTimeZone("UTC")).get / 1000
+ CrossDateTimeUtils.stringToTimestamp(ts, UTC).get / 1000
} else {
// ms to s
s.asInstanceOf[Timestamp].getTime / 1000
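The two hunks above route all string-to-date/timestamp parsing through the new CrossDateTimeUtils facade;
the explicit UTC zone is kept because Calcite's avatica cursor accessors interpret the values as UTC. A
small sketch of that call pattern (values are illustrative):

    import java.time.ZoneId
    import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils

    val UTC = ZoneId.of("UTC")
    // stringToTimestamp yields microseconds since epoch; the "/ 1000" mirrors the code above
    val toCalcite: Long = CrossDateTimeUtils.stringToTimestamp("2012-01-01 00:00:00", UTC).get / 1000
    val days: Int = CrossDateTimeUtils.stringToDate("2012-01-01").get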
diff --git
a/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
b/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
new file mode 100644
index 0000000..54dd480
--- /dev/null
+++
b/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark.cross
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp}
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.time.ZoneId
+import java.util.TimeZone
+
+object CrossDateTimeUtils {
+ def stringToTimestamp(value: Any): Option[SQLTimestamp] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), TimeZone.getDefault)
+ }
+
+ def stringToTimestamp(value: Any, zoneId: ZoneId): Option[SQLTimestamp] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), TimeZone.getTimeZone(zoneId))
+ }
+
+ def stringToDate(value: Any): Option[SQLDate] = {
+ DateTimeUtils.stringToDate(UTF8String.fromString(value.toString))
+ }
+
+ def millisToDays(millis: Long): Int = {
+ DateTimeUtils.millisToDays(millis)
+ }
+
+ def daysToMillis(days: Int): Long = {
+ DateTimeUtils.daysToMillis(days)
+ }
+
+ def dateToString(): String = {
+    DateTimeUtils.dateToString(DateTimeUtils.millisToDays(System.currentTimeMillis()))
+ }
+}
diff --git
a/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
b/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
new file mode 100644
index 0000000..2ef6b18
--- /dev/null
+++
b/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark.cross
+
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.time.ZoneId
+import java.util.TimeZone
+
+object CrossDateTimeUtils {
+ val DEFAULT_TZ_ID = TimeZone.getDefault.toZoneId
+
+ def stringToTimestamp(value: Any): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), DEFAULT_TZ_ID)
+ }
+
+ def stringToTimestamp(value: Any, zoneId: ZoneId): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), zoneId)
+ }
+
+ def stringToDate(value: Any): Option[Int] = {
+    DateTimeUtils.stringToDate(UTF8String.fromString(value.toString), DEFAULT_TZ_ID)
+ }
+
+ def millisToDays(millis: Long): Int = {
+ DateTimeUtils.microsToDays(millis * 1000, DEFAULT_TZ_ID)
+ }
+
+ def daysToMillis(days: Int): Long = {
+ DateTimeUtils.daysToMicros(days, DEFAULT_TZ_ID)
+ }
+
+ def dateToString(): String = {
+ TimestampFormatter
+ .apply("yyyy-MM-dd", DEFAULT_TZ_ID, isParsing = false)
+ .format(DateTimeUtils.currentTimestamp())
+ }
+}
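Together with the spark24 variant above, this gives callers a single version-neutral entry point; Maven's
${spark.version.dir} property (see the build-helper-maven-plugin changes elsewhere in this commit) decides
which of the two files gets compiled. A brief sketch of call sites that stay identical across Spark 2.4
and 3.1 (input values are assumptions):

    import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils

    val today: String        = CrossDateTimeUtils.dateToString()            // "yyyy-MM-dd"
    val days: Option[Int]    = CrossDateTimeUtils.stringToDate("2021-06-26")
    val micros: Option[Long] = CrossDateTimeUtils.stringToTimestamp("2021-06-26 23:32:49")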
diff --git a/kylin-spark-project/kylin-spark-query/pom.xml
b/kylin-spark-project/kylin-spark-query/pom.xml
index 2e6f7e4..5a6cff0 100644
--- a/kylin-spark-project/kylin-spark-query/pom.xml
+++ b/kylin-spark-project/kylin-spark-query/pom.xml
@@ -71,7 +71,12 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>provided</scope>
</dependency>
@@ -87,16 +92,28 @@
</dependency>
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
<type>test-jar</type>
- <scope>provided</scope>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalamock</groupId>
- <artifactId>scalamock_2.11</artifactId>
+ <artifactId>scalamock_${scala.binary.version}</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
@@ -125,36 +142,20 @@
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- </configuration>
- </plugin>
- <!--<plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
- </plugin>-->
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <configuration>
- <reportsDirectory>
- ${project.build.directory}/surefire-reports
- </reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>SparkTestSuite.txt</filereports>
- <stdout>I</stdout>
- </configuration>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
- <id>test</id>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
<goals>
- <goal>test</goal>
+ <goal>add-source</goal>
</goals>
+ <configuration>
+ <sources>
+ <source>src/main/${spark.version.dir}</source>
+ </sources>
+ </configuration>
</execution>
</executions>
</plugin>
@@ -163,12 +164,24 @@
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
- <id>scala-compile-first</id>
+ <id>compile-version-dependent-source</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <sourceDir>${spark.version.dir}</sourceDir>
+ </configuration>
+ </execution>
+ <execution>
+ <id>compile-common-scala-source</id>
<phase>process-resources</phase>
<goals>
- <goal>add-source</goal>
<goal>compile</goal>
</goals>
+ <configuration>
+ <sourceDir>scala</sourceDir>
+ </configuration>
</execution>
<execution>
<id>scala-test-compile</id>
diff --git
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
index bad566f..764807d 100644
---
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
+++
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.kylin.query.exception.UnsupportedSparkFunctionException
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{If, IfNull, StringLocate}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.KylinFunctions._
import org.apache.spark.sql.utils.SparkTypeUtil
@@ -216,9 +216,7 @@ object ExpressionConverter {
callUDF("split_part", args: _*)
// time_funcs
case "current_date" =>
- k_lit(
- DateTimeUtils.dateToString(
- DateTimeUtils.millisToDays(System.currentTimeMillis())))
+ k_lit(CrossDateTimeUtils.dateToString())
case "current_timestamp" =>
current_timestamp()
case "to_timestamp" =>
diff --git
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
index dcb4f14..a690f45 100644
---
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
+++
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
@@ -28,13 +28,13 @@ import org.apache.calcite.sql.SqlKind._
import org.apache.calcite.sql.`type`.{BasicSqlType, IntervalSqlType,
SqlTypeFamily, SqlTypeName}
import org.apache.calcite.sql.fun.SqlDatetimeSubtractionOperator
import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
import org.apache.spark.sql.KylinFunctions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DateType, LongType, TimestampType}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.utils.SparkTypeUtil
-import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -312,9 +312,9 @@ class SparderRexVisitor(
case literalSql: BasicSqlType => {
literalSql.getSqlTypeName match {
case SqlTypeName.DATE =>
- return Some(DateTimeUtils.stringToTime(literal.toString))
+              return Some(DateTimeUtils.toJavaDate(CrossDateTimeUtils.stringToDate(literal).get))
case SqlTypeName.TIMESTAMP =>
-              return Some(DateTimeUtils.toJavaTimestamp(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).head))
+              return Some(DateTimeUtils.toJavaTimestamp(CrossDateTimeUtils.stringToTimestamp(literal).head))
case _ =>
}
}
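As above, Calcite DATE/TIMESTAMP literals are now parsed through CrossDateTimeUtils and converted back to
java.sql types via DateTimeUtils helpers that exist in both Spark lines. A condensed sketch of the
conversion (the literal strings are assumptions):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils

    val javaDate = DateTimeUtils.toJavaDate(CrossDateTimeUtils.stringToDate("2012-01-01").get)
    val javaTs   = DateTimeUtils.toJavaTimestamp(CrossDateTimeUtils.stringToTimestamp("2012-01-01 00:00:00").get)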
diff --git
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
b/kylin-spark-project/kylin-spark-query/src/main/spark24/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
similarity index 100%
copy from
kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
copy to
kylin-spark-project/kylin-spark-query/src/main/spark24/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
diff --git
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
b/kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
similarity index 79%
rename from
kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
rename to
kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
index 21c6372..5641723 100644
---
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
+++
b/kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
@@ -28,14 +28,15 @@ import
org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}
* @param parentState
*/
class KylinHiveSessionStateBuilder(sparkSession: SparkSession,
- parentState: Option[SessionState] = None)
- extends HiveSessionStateBuilder(sparkSession, parentState) {
+ parentState: Option[SessionState] = None,
+ options: Map[String, String] = Map.empty)
+ extends HiveSessionStateBuilder(sparkSession, parentState, options) {
private def externalCatalog: HiveExternalCatalog =
session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
override protected def newBuilder: NewBuilder =
- new KylinHiveSessionStateBuilder(_, _)
+ new KylinHiveSessionStateBuilder(_, _, options)
}
@@ -46,10 +47,11 @@ class KylinHiveSessionStateBuilder(sparkSession:
SparkSession,
* @param parentState
*/
class KylinSessionStateBuilder(sparkSession: SparkSession,
- parentState: Option[SessionState] = None)
- extends BaseSessionStateBuilder(sparkSession, parentState) {
+ parentState: Option[SessionState] = None,
+ options: Map[String, String] = Map.empty)
+ extends BaseSessionStateBuilder(sparkSession, parentState, options) {
override protected def newBuilder: NewBuilder =
- new KylinSessionStateBuilder(_, _)
+ new KylinSessionStateBuilder(_, _, options)
}
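The extra constructor parameter mirrors Spark 3.1, where BaseSessionStateBuilder and
HiveSessionStateBuilder take an additional options map; forwarding it in newBuilder keeps cloned sessions
consistent. A hedged sketch of how such a builder is driven (Kylin actually instantiates it reflectively
through KylinReflectUtils, so this direct call is illustrative only):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.hive.KylinHiveSessionStateBuilder

    val spark = SparkSession.builder().master("local[1]").enableHiveSupport().getOrCreate()
    // build() produces the SessionState that the Kylin session installs for this SparkSession
    val sessionState = new KylinHiveSessionStateBuilder(spark, None, Map.empty).build()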
diff --git a/kylin-spark-project/kylin-spark-test/pom.xml
b/kylin-spark-project/kylin-spark-test/pom.xml
index 4221983..6f2f101 100644
--- a/kylin-spark-project/kylin-spark-test/pom.xml
+++ b/kylin-spark-project/kylin-spark-test/pom.xml
@@ -73,7 +73,7 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.calcite</groupId>
@@ -84,7 +84,7 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.calcite</groupId>
@@ -114,6 +114,12 @@
<!-- test -->
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-spark-engine</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
diff --git
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index b91a680..cb1b049 100644
---
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark2;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.Quadruple;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -37,7 +38,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Lists;
import java.util.Map;
import java.io.File;
diff --git
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
index a02672b..1bc6d11 100644
---
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
+++
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
@@ -30,6 +30,7 @@ import
org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -47,7 +48,6 @@ import org.junit.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Sets;
import java.util.List;
import java.util.Map;
diff --git a/kylin-spark-project/pom.xml b/kylin-spark-project/pom.xml
index 3b52556..3d4911f 100644
--- a/kylin-spark-project/pom.xml
+++ b/kylin-spark-project/pom.xml
@@ -63,7 +63,7 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<artifactId>jetty-plus</artifactId>
@@ -107,25 +107,25 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-unsafe_2.11</artifactId>
+ <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.11</artifactId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
@@ -140,7 +140,7 @@
<!--Env & Test-->
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.11</artifactId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
diff --git a/metastore-hbase/pom.xml b/metastore-hbase/pom.xml
index 0945cca..209e99e 100644
--- a/metastore-hbase/pom.xml
+++ b/metastore-hbase/pom.xml
@@ -108,7 +108,7 @@
<!-- <!– Spark dependency –>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.spark</groupId>-->
-<!-- <artifactId>spark-core_2.11</artifactId>-->
+<!-- <artifactId>spark-core_${scala.binary.version}</artifactId>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
</dependencies>
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
index 1c39f08..5c549e3 100644
--- a/metrics-reporter-kafka/pom.xml
+++ b/metrics-reporter-kafka/pom.xml
@@ -39,7 +39,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
diff --git a/parquet-assembly/pom.xml b/parquet-assembly/pom.xml
index 6b34823..f2c795c 100644
--- a/parquet-assembly/pom.xml
+++ b/parquet-assembly/pom.xml
@@ -105,6 +105,14 @@
<pattern>org.roaringbitmap</pattern>
<shadedPattern>${shadeBase}.org.roaringbitmap</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+              <shadedPattern>${shadeBase}.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.codahale.metrics</pattern>
+              <shadedPattern>${shadeBase}.com.codahale.metrics</shadedPattern>
+ </relocation>
</relocations>
<filters>
<filter>
diff --git a/pom.xml b/pom.xml
index d83b0fc..6885f86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
<!-- Spark versions -->
<spark.version>2.4.7</spark.version>
+ <spark.version.dir>spark24</spark.version.dir>
<janino.version>3.0.16</janino.version>
<kryo.version>4.0.0</kryo.version>
@@ -92,6 +93,7 @@
<!-- Scala versions -->
<scala.version>2.11.8</scala.version>
+ <scala.binary.version>2.11</scala.binary.version>
<reflections.version>0.9.10</reflections.version>
@@ -519,6 +521,11 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop2.version}</version>
</dependency>
@@ -729,7 +736,8 @@
<artifactId>avatica</artifactId>
<version>${avatica.version}</version>
</dependency>
- <!-- Workaround for hive 0.14 avatica dependency -->
+ <!-- Workaround for hive 0.14 avatica dependency
+ WARNING: fasterxml jackson library may conflict
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
@@ -741,6 +749,7 @@
</exclusion>
</exclusions>
</dependency>
+ -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
@@ -775,9 +784,13 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <exclusion>
<artifactId>jetty-plus</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
@@ -819,20 +832,42 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.11</artifactId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
@@ -849,19 +884,19 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.11</artifactId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
</dependency>
@@ -1159,19 +1194,19 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-unsafe_2.11</artifactId>
+ <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
@@ -2085,9 +2120,29 @@
</lifecycleMappingMetadata>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ </plugin>
</plugins>
</pluginManagement>
</build>
</profile>
+ <profile>
+ <id>spark3</id>
+ <properties>
+ <scala.version>2.12.10</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <spark.version>3.1.1</spark.version>
+ <spark.version.dir>spark31</spark.version.dir>
+ <jackson.version>2.10.0</jackson.version>
+ </properties>
+ <!--
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ -->
+ </profile>
</profiles>
</project>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index a5411c8..e9fc1b9 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -210,12 +210,12 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8a039c0..6af1d53 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -483,7 +483,7 @@ public class QueryService extends BasicService {
logger.warn("Write metric error.", th);
}
if (sqlResponse.getIsException())
-                throw new InternalErrorException(sqlResponse.getExceptionMessage());
+                throw new InternalErrorException(sqlResponse.getExceptionMessage(), sqlResponse.getThrowable());
return sqlResponse;
diff --git a/server/pom.xml b/server/pom.xml
index 82114fb..fc7a60c 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -277,7 +277,7 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
@@ -325,19 +325,19 @@
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.11</artifactId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index f54e5d7..2b61666 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -44,7 +44,7 @@
<!-- Provided -->
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
</dependency>
<!-- Env & Test -->
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 02d08d5..21c39c4 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -113,7 +113,7 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index 0e2c258..48f0e13 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -53,7 +53,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
diff --git a/stream-source-kafka/pom.xml b/stream-source-kafka/pom.xml
index cdec1d4..6b7f1d2 100644
--- a/stream-source-kafka/pom.xml
+++ b/stream-source-kafka/pom.xml
@@ -59,7 +59,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
</dependency>
<dependency>