This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new 23365b6 KYLIN-5011 HotFix for scatter skew data in dict-encoding step
23365b6 is described below
commit 23365b6cae8939e6bbefb328d1c80d4236afda34
Author: yaqian.zhang <[email protected]>
AuthorDate: Thu Aug 12 16:36:53 2021 +0800
KYLIN-5011 HotFix for scatter skew data in dict-encoding step
---
.../org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index c9cea1f..cf5d07c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -53,7 +53,7 @@ object CubeTableEncoder extends Logging {
val bucketThreshold = seg.kylinconf.getGlobalDictV2ThresholdBucketSize
val minBucketSize: Long = sourceCnt / bucketThreshold
- var repartitionSizeAfterEncode = 0;
+ var repartitionSizeAfterEncode = 0
cols.asScala.foreach(
ref => {
val globalDict = new NGlobalDictionary(seg.project,
ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
@@ -72,11 +72,12 @@ object CubeTableEncoder extends Logging {
var encodeCol = dict_encode(col(encodeColRef).cast(StringType),
lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
val columns = partitionedDs.schema.map(ty => col(ty.name))
+ var scatterSkewedData = false
if (seg.kylinconf.detectDataSkewInDictEncodingEnabled()) {
//find skewed data in dict-encoding step
val castEncodeColRef = col(encodeColRef).cast(StringType)
val sampleData =
ds.select(castEncodeColRef).sample(seg.kylinconf.sampleRateInEncodingSkewDetection()).cache()
- val totalCount = sampleData.count();
+ val totalCount = sampleData.count()
val skewDictStorage = new
Path(seg.kylinconf.getJobTmpDir(seg.project) +
"/" + jobId + "/skewed_data/" + ref.identity)
val skewedDict = new Object2LongOpenHashMap[String]()
@@ -90,6 +91,7 @@ object CubeTableEncoder extends Logging {
//save skewed data dict
if (skewedDict.size() > 0) {
+ scatterSkewedData = true
val kryo = new Kryo()
val fs = skewDictStorage.getFileSystem(new Configuration())
if (fs.exists(skewDictStorage)) {
@@ -112,7 +114,8 @@ object CubeTableEncoder extends Logging {
.repartition(enlargedBucketSize, col("scatter_skew_data_" +
ref.columnName))
.select(columns ++ Seq(encodeCol): _*)
}
- } else {
+ }
+ if (!scatterSkewedData) {
partitionedDs = partitionedDs
.repartition(enlargedBucketSize,
col(encodeColRef).cast(StringType))
.select(columns ++ Seq(encodeCol): _*)