This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b428f1e  KYLIN-4893 Optimize query performance when using shard by column
b428f1e is described below
commit b428f1e1ab6a48101bec03e515d135c57af32878
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Feb 3 15:20:01 2021 +0800
KYLIN-4893 Optimize query performance when using shard by column
(cherry picked from commit bd5ab5e61ca4dc0c5ccabb66b17b4be1642ce13d)
(cherry picked from commit 8fa9d8d210b2755325999ed3e7496a320e3bd7f9)
---
.../org/apache/kylin/common/KylinConfigBase.java | 24 +++++++++++---
.../sql/execution/datasource/FilePruner.scala | 38 +++++++++++++++-------
.../datasource/ResetShufflePartition.scala | 6 ++--
3 files changed, 50 insertions(+), 18 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4941595..0fd24e2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2883,22 +2883,36 @@ public abstract class KylinConfigBase implements Serializable {
private String getLogPropertyFile(String filename) {
if (isDevEnv()) {
- return Paths.get(getKylinHomeWithoutWarn(), "build", "conf").toString() + File.separator + filename;
+ return Paths.get(getKylinHomeWithoutWarn(),
+ "build", "conf").toString() + File.separator + filename;
} else {
- return Paths.get(getKylinHomeWithoutWarn(), "conf").toString() + File.separator + filename;
+ return Paths.get(getKylinHomeWithoutWarn(),
+ "conf").toString() + File.separator + filename;
}
}
public int getQueryPartitionSplitSizeMB() {
- return Integer.parseInt(getOptional("kylin.query.spark-engine.partition-split-size-mb", "64"));
+ return Integer.parseInt(getOptional("kylin.query.spark-engine.partition-split-size-mb",
+ "64"));
+ }
+
+ /**
+ * The max size in mb handled per task when using shard by column,
+ * if the sharding size exceeds this value, it will fall back to non-sharding read RDD
+ */
+ public int getMaxShardingSizeMBPerTask() {
+ return Integer.parseInt(getOptional("kylin.query.spark-engine.max-sharding-size-mb",
+ "64"));
}
public boolean isShardingJoinOptEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.query.spark-engine.expose-sharding-trait", "true"));
+ return Boolean.parseBoolean(getOptional("kylin.query.spark-engine.expose-sharding-trait",
+ "true"));
}
public int getSparkSqlShufflePartitions() {
- return Integer.parseInt(getOptional("kylin.query.spark-engine.spark-sql-shuffle-partitions", "-1"));
+ return Integer.parseInt(getOptional("kylin.query.spark-engine.spark-sql-shuffle-partitions",
+ "-1"));
}
public Map<String, String> getQuerySparkConf() {
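
All four getters above share the same getOptional(key, default) pattern, and the new kylin.query.spark-engine.max-sharding-size-mb value is meant to be converted from MB to bytes before any file-size comparison. A minimal standalone Scala sketch of that pattern; the Map is a hypothetical stand-in for KylinConfig's property store, and only the key name and the "64" default come from the diff:

    // Minimal sketch of the getOptional-with-default pattern used above.
    // `props` is a hypothetical stand-in for KylinConfig's backing properties.
    object ConfigSketch {
      private val props = Map.empty[String, String] // no overrides -> defaults apply

      private def getOptional(key: String, dft: String): String =
        props.getOrElse(key, dft)

      // Mirrors the new getter: defaults to 64 MB per task.
      def maxShardingSizeMBPerTask: Int =
        getOptional("kylin.query.spark-engine.max-sharding-size-mb", "64").toInt

      def main(args: Array[String]): Unit = {
        // FilePruner converts the MB value to bytes before comparing shard sizes.
        val maxBytes: Long = maxShardingSizeMBPerTask.toLong * 1024 * 1024
        println(s"max sharding size per task: $maxBytes bytes") // 67108864
      }
    }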
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index f0f7916..513b47f 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasource
import java.sql.{Date, Timestamp}
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.DateFormat
import org.apache.kylin.cube.cuboid.Cuboid
import org.apache.kylin.cube.CubeInstance
@@ -169,28 +170,39 @@ class FilePruner(cubeInstance: CubeInstance,
}
private def genShardSpec(selected: Seq[SegmentDirectory]): Option[ShardSpec] = {
- if (selected.isEmpty) {
+ if (!KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled || selected.isEmpty) {
None
} else {
val segments = selected.par.map { segDir =>
cubeInstance.getSegment(segDir.segmentName, SegmentStatusEnum.READY);
- }.toIterator.toSeq
+ }.seq
val shardNum = segments.head.getCuboidShardNum(layoutEntity.getId).toInt
// the shard num of all layout in segments must be the same
if (layoutEntity.getShardByColumns.isEmpty || segments.exists(
_.getCuboidShardNum(layoutEntity.getId).toInt != shardNum)) {
- logInfo("Shard by column is empty or segments have the different
number of shard, skip " +
- "shard join.")
+ logInfo("Shard by column is empty or segments have the different
number of shard, " +
+ "skip shard join.")
None
} else {
- val sortColumns = if (segments.length == 1) {
- layoutEntity.getOrderedDimensions.keySet().asScala.map(_.toString).toSeq
+ // calculate the file size for each partition
+ val partitionSizePerId = selected.flatMap(_.files).map( f =>
+ (FilePruner.getPartitionId(f.getPath), f.getLen)
+ ).groupBy(_._1).mapValues(_.map(_._2).sum)
+ // if there are some partition ids which the file size exceeds the threshold
+ if (partitionSizePerId.exists(_._2 > FilePruner.MAX_SHARDING_SIZE_PER_TASK)) {
+ logInfo(s"There are some partition ids which the file size exceeds
the " +
+ s"threshold size ${FilePruner.MAX_SHARDING_SIZE_PER_TASK}, skip
shard join.")
+ None
} else {
- logInfo("Sort order will lost in multiple segments.")
- Seq.empty[String]
+ val sortColumns = if (segments.length == 1) {
+ layoutEntity.getOrderedDimensions.keySet().asScala.map(_.toString).toSeq
+ } else {
+ logInfo("Sort order will lost in multiple segments.")
+ Seq.empty[String]
+ }
+ Some(ShardSpec(shardNum, shardBySchema.fieldNames.toSeq, sortColumns))
}
- Some(ShardSpec(shardNum, shardBySchema.fieldNames.toSeq, sortColumns))
}
}
}
@@ -245,8 +257,8 @@ class FilePruner(cubeInstance: CubeInstance,
// generate the ShardSpec
shardSpec = genShardSpec(selected)
// QueryContextFacade.current().record("shard_pruning")
- val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum
- logInfo(s"totalFileSize is ${totalFileSize}")
+ val totalFileSize = selected.flatMap(_.files).map(_.getLen).sum
+ logInfo(s"After files pruning, total file size is ${totalFileSize}")
setShufflePartitions(totalFileSize, session)
logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble /
1000000} ms")
if (selected.isEmpty) {
@@ -434,6 +446,10 @@ class FilePruner(cubeInstance: CubeInstance,
}
object FilePruner {
+
+ val MAX_SHARDING_SIZE_PER_TASK: Long = KylinConfig.getInstanceFromEnv
+ .getMaxShardingSizeMBPerTask * 1024 * 1024
+
def getPartitionId(p: Path): Int = {
// path like: part-00001-91f13932-3d5e-4f85-9a56-d1e2b47d0ccb-c000.snappy.parquet
// we need to get 00001.
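
Taken together, the FilePruner changes gate the shard join on per-shard data volume: file lengths are grouped by the partition id parsed from the Parquet file name, and if any shard's total exceeds MAX_SHARDING_SIZE_PER_TASK the pruner falls back to a non-sharded read. A self-contained Scala sketch of that grouping; file names and sizes are invented for illustration, and the real code works on FileStatus/Path values rather than strings:

    // Sketch of the new per-shard size check in FilePruner.genShardSpec.
    // File names follow the Spark convention noted above:
    //   part-00001-<uuid>-c000.snappy.parquet -> partition id 1
    object ShardSizeCheckSketch {
      // assumed 64 MB default (kylin.query.spark-engine.max-sharding-size-mb)
      val MaxShardingBytesPerTask: Long = 64L * 1024 * 1024

      def partitionId(fileName: String): Int =
        fileName.split("-")(1).toInt // "part-00001-..." -> 1

      def main(args: Array[String]): Unit = {
        // (file name, length in bytes); sizes are invented for the example
        val files = Seq(
          ("part-00000-aaaa-c000.snappy.parquet", 30L * 1024 * 1024),
          ("part-00001-bbbb-c000.snappy.parquet", 40L * 1024 * 1024),
          ("part-00001-cccc-c000.snappy.parquet", 40L * 1024 * 1024))
        val sizePerShard: Map[Int, Long] = files
          .groupBy { case (name, _) => partitionId(name) }
          .map { case (id, group) => id -> group.map(_._2).sum }
        // shard 1 totals 80 MB > 64 MB, so the shard join would be skipped
        val skipShardJoin = sizePerShard.values.exists(_ > MaxShardingBytesPerTask)
        println(s"sizes=$sizePerShard skipShardJoin=$skipShardJoin")
      }
    }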
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index b048283..1549c45 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.utils.SparderUtils
trait ResetShufflePartition extends Logging {
- val PARTITION_SPLIT_BYTES: Long = KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
+ val PARTITION_SPLIT_BYTES: Long =
+ KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
@@ -37,6 +38,7 @@ trait ResetShufflePartition extends Logging {
}
// when hitting cube, this will override the value of 'spark.sql.shuffle.partitions'
sparkSession.conf.set("spark.sql.shuffle.partitions", partitionsNum.toString)
- logInfo(s"Set partition to $partitionsNum, total bytes
${QueryContextFacade.current().getSourceScanBytes}")
+ logInfo(s"Set partition to $partitionsNum, " +
+ s"total bytes ${QueryContextFacade.current().getSourceScanBytes}")
}
}
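
For reference, ResetShufflePartition sizes the shuffle from the bytes scanned after pruning: roughly one partition per PARTITION_SPLIT_BYTES (64 MB by default), unless kylin.query.spark-engine.spark-sql-shuffle-partitions pins a fixed count (-1 means derive it from the scan). A Scala sketch of that arithmetic; the cap on total cores is an assumed detail, not taken from the diff:

    // Sketch of the partition-count arithmetic behind setShufflePartitions.
    // Only the 64 MB split size and the -1 sentinel come from the diff;
    // the core cap is an assumption for illustration.
    object ShufflePartitionSketch {
      val PartitionSplitBytes: Long = 64L * 1024 * 1024 // 64MB

      def partitionsFor(totalScanBytes: Long, configured: Int, totalCores: Int): Int =
        if (configured != -1) configured // pinned via spark-sql-shuffle-partitions
        else math.min(totalScanBytes / PartitionSplitBytes + 1, totalCores.toLong).toInt

      def main(args: Array[String]): Unit = {
        println(partitionsFor(300L * 1024 * 1024, -1, 200)) // 300 MB / 64 MB + 1 = 5
        println(partitionsFor(300L * 1024 * 1024, 16, 200)) // fixed by config -> 16
      }
    }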