This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new d7ca7a5 KYLIN-4918 Support Cube Level configuration in FilePruner
d7ca7a5 is described below
commit d7ca7a5918f28ec7bf8de187014451c5709456e4
Author: zhengshengjun <[email protected]>
AuthorDate: Fri Mar 5 11:25:38 2021 +0800
KYLIN-4918 Support Cube Level configuration in FilePruner
---
.../spark/sql/execution/datasource/FilePruner.scala | 15 +++++++--------
.../sql/execution/datasource/ResetShufflePartition.scala | 12 +++++-------
2 files changed, 12 insertions(+), 15 deletions(-)
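In short: FilePruner previously read its tuning knobs from the global
KylinConfig.getInstanceFromEnv, so cube-level overrides of those properties
were ignored. This commit switches it to cubeInstance.getConfig, which layers
the cube's own overrides on top of the global config. A minimal before/after
sketch of the idea (Scala; names taken from the diff below):

    // Before: one global threshold shared by every cube.
    val globalMax: Long =
      KylinConfig.getInstanceFromEnv.getMaxShardingSizeMBPerTask * 1024 * 1024

    // After: the cube's own config, so a per-cube override of the
    // corresponding property takes effect.
    val cubeLevelMax: Long =
      cubeInstance.getConfig.getMaxShardingSizeMBPerTask * 1024 * 1024
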
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index f3fe76d..b6008c2 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.execution.datasource
import java.sql.{Date, Timestamp}
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.DateFormat
import org.apache.kylin.cube.cuboid.Cuboid
import org.apache.kylin.cube.CubeInstance
@@ -78,6 +77,9 @@ class FilePruner(cubeInstance: CubeInstance,
val options: Map[String, String])
extends FileIndex with ResetShufflePartition with Logging {
+ val MAX_SHARDING_SIZE_PER_TASK: Long =
+ cubeInstance.getConfig.getMaxShardingSizeMBPerTask * 1024 * 1024
+
private lazy val segmentDirs: Seq[SegmentDirectory] = {
cubeInstance.getSegments.asScala
.filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => {
@@ -170,7 +172,7 @@ class FilePruner(cubeInstance: CubeInstance,
}
private def genShardSpec(selected: Seq[SegmentDirectory]): Option[ShardSpec] = {
- if (!KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled || selected.isEmpty) {
+ if (!cubeInstance.getConfig.isShardingJoinOptEnabled || selected.isEmpty) {
None
} else {
val segments = selected.par.map { segDir =>
@@ -190,9 +192,9 @@ class FilePruner(cubeInstance: CubeInstance,
(FilePruner.getPartitionId(f.getPath), f.getLen)
).groupBy(_._1).mapValues(_.map(_._2).sum)
// if there are some partition ids which the file size exceeds the threshold
- if (partitionSizePerId.exists(_._2 > FilePruner.MAX_SHARDING_SIZE_PER_TASK)) {
+ if (partitionSizePerId.exists(_._2 > MAX_SHARDING_SIZE_PER_TASK)) {
logInfo(s"There are some partition ids which the file size exceeds
the " +
- s"threshold size ${FilePruner.MAX_SHARDING_SIZE_PER_TASK}, skip
shard join.")
+ s"threshold size ${MAX_SHARDING_SIZE_PER_TASK}, skip shard join.")
None
} else {
val sortColumns = if (segments.length == 1) {
@@ -259,7 +261,7 @@ class FilePruner(cubeInstance: CubeInstance,
// QueryContextFacade.current().record("shard_pruning")
val totalFileSize = selected.flatMap(_.files).map(_.getLen).sum
logInfo(s"After files pruning, total file size is ${totalFileSize}")
- setShufflePartitions(totalFileSize, session)
+ setShufflePartitions(totalFileSize, session, cubeInstance.getConfig)
logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble /
1000000} ms")
if (selected.isEmpty) {
val value = Seq.empty[PartitionDirectory]
@@ -447,9 +449,6 @@ class FilePruner(cubeInstance: CubeInstance,
object FilePruner {
- val MAX_SHARDING_SIZE_PER_TASK: Long = KylinConfig.getInstanceFromEnv
- .getMaxShardingSizeMBPerTask * 1024 * 1024
-
def getPartitionId(p: Path): Int = {
// path like: part-00001-91f13932-3d5e-4f85-9a56-d1e2b47d0ccb-c000.snappy.parquet
// we need to get 00001.
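For context, the guard in genShardSpec above sums file sizes per shard id and
skips the shard join when any shard exceeds the (now cube-level) cap. A
self-contained sketch of that check, with made-up file names and sizes:

    object ShardSizeCheckSketch {
      // Mirrors FilePruner.getPartitionId: "part-00001-...-c000.snappy.parquet" -> 1
      def partitionId(name: String): Int = name.split("-")(1).toInt

      def main(args: Array[String]): Unit = {
        val maxShardingSizePerTask = 64L * 1024 * 1024 // illustrative 64 MB cap
        val files = Seq(
          ("part-00001-aaa-c000.snappy.parquet", 40L * 1024 * 1024),
          ("part-00001-bbb-c000.snappy.parquet", 30L * 1024 * 1024),
          ("part-00002-ccc-c000.snappy.parquet", 10L * 1024 * 1024))
        // Sum file sizes per shard id, as the diff does with groupBy/mapValues.
        val sizePerId = files
          .map { case (name, len) => (partitionId(name), len) }
          .groupBy(_._1).mapValues(_.map(_._2).sum)
        // Any shard over the cap means the pruner returns None (no shard join).
        val skipShardJoin = sizePerId.exists(_._2 > maxShardingSizePerTask)
        println(s"$sizePerId skip=$skipShardJoin") // shard 1 holds 70 MB > 64 MB
      }
    }
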
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index 1549c45..6724bce 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -23,17 +23,15 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.utils.SparderUtils
trait ResetShufflePartition extends Logging {
- val PARTITION_SPLIT_BYTES: Long =
- KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
- def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
+ def setShufflePartitions(bytes: Long, sparkSession: SparkSession, conf: KylinConfig): Unit = {
QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
val defaultParallelism = SparderUtils.getTotalCore(sparkSession.sparkContext.getConf)
- val kylinConfig = KylinConfig.getInstanceFromEnv
- val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
- kylinConfig.getSparkSqlShufflePartitions
+ val partitionsNum = if (conf.getSparkSqlShufflePartitions != -1) {
+ conf.getSparkSqlShufflePartitions
} else {
- Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
+ Math.min(QueryContextFacade.current().getSourceScanBytes /
+ (conf.getQueryPartitionSplitSizeMB * 1024 * 1024) + 1,
defaultParallelism).toInt
}
// when hitting cube, this will override the value of 'spark.sql.shuffle.partitions'
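
For context, the partition-count rule in setShufflePartitions after this
change, with illustrative numbers (not from the patch): when
conf.getSparkSqlShufflePartitions is -1, the count is derived from the bytes
scanned so far, capped at the total executor cores.

    // Illustrative numbers only.
    val sourceScanBytes = 1000L * 1024 * 1024  // bytes scanned by the query
    val splitBytes = 64L * 1024 * 1024         // conf.getQueryPartitionSplitSizeMB = 64
    val defaultParallelism = 8L                // total executor cores
    val partitionsNum =
      Math.min(sourceScanBytes / splitBytes + 1, defaultParallelism).toInt
    // 1000 MB / 64 MB + 1 = 16, capped at 8 cores -> partitionsNum = 8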