This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 914b97f  KYLIN-5011 Detect and scatter skewed data in dict encoding step (#1662)
914b97f is described below
commit 914b97f5cf2347030525140038d060178b93f955
Author: zhengshengjun <[email protected]>
AuthorDate: Sun Jun 27 21:34:55 2021 +0800
KYLIN-5011 Detect and scatter skewed data in dict encoding step (#1662)
Co-authored-by: Xiaoxiang Yu <[email protected]>
---
.../org/apache/kylin/common/KylinConfigBase.java | 28 ++++++++++
examples/test_case_data/localmeta/kylin.properties | 3 +-
.../org/apache/spark/dict/NBucketDictionary.java | 21 +++++++-
.../org/apache/spark/dict/NGlobalDictionary.java | 6 ++-
.../org/apache/spark/sql/KylinFunctions.scala | 6 ++-
.../catalyst/expressions/KylinExpresssions.scala | 60 ++++++++++++++++++++-
.../engine/spark/builder/CreateFlatTable.scala | 5 +-
.../engine/spark/builder/CubeTableEncoder.scala | 61 +++++++++++++++++++---
.../engine/spark/job/ParentSourceChooser.scala | 2 +-
.../engine/spark/LocalWithSparkSessionTest.java | 2 +-
.../engine/spark/builder/TestCreateFlatTable.scala | 10 +++-
11 files changed, 186 insertions(+), 18 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ce5d5e8..39f8eae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3168,4 +3168,32 @@ public abstract class KylinConfigBase implements Serializable {
    public boolean rePartitionEncodedDatasetWithRowKey() {
        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
    }
+
+    /*
+     * Detect dataset skew in the dictionary encoding step.
+     */
+    public boolean detectDataSkewInDictEncodingEnabled() {
+        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
+    }
+
+    /*
+     * In some data skew cases, the repartition step during dictionary encoding will be slow.
+     * We can sample from the dataset to detect skew. This configuration sets the sample rate.
+     */
+    public double sampleRateInEncodingSkewDetection() {
+        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
+    }
+
+    /*
+     * In Kylin 4, dictionaries are hashed into several buckets, and column data is repartitioned
+     * by the same hash algorithm during the encoding step. In data skew cases, the repartition
+     * step will be very slow. Kylin will automatically sample from the source to detect skewed
+     * data and repartition that skewed data to random partitions.
+     * This configuration sets the skewed-data threshold, valued from 0 to 1.
+     * e.g. if you set this value to 0.05, each value that takes up more than 5% of the total
+     * is regarded as skewed data; as a result, there can be no more than 20 distinct skewed values.
+     */
+    public double skewPercentageThreshHold() {
+        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
+    }
}
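
[The threshold comment above is worth pinning down with numbers. A minimal sketch of the skew test these options drive; the sample counts are hypothetical, and the real detection runs as a Spark aggregation in CubeTableEncoder further below:]

    // Hedged sketch: illustrates the threshold arithmetic only.
    val threshold = 0.05 // kylin.dictionary.data.skew.percentage.threshhold
    val counts = Map("A" -> 900L, "B" -> 40L, "C" -> 60L) // value -> rows seen in the sample
    val total = counts.values.sum // 1000
    // A value is skewed when its share of the sample exceeds the threshold.
    val skewed = counts.filter { case (_, c) => c > total * threshold }.keySet // Set(A, C)
    // Since each skewed value takes up more than 5% of the sample,
    // at most 1 / 0.05 = 20 distinct values can ever qualify.
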
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index d17ce8c..4b015c4 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -162,4 +162,5 @@ kylin.source.jdbc.pass=
kylin.query.auto-sparder-context=false
kylin.metrics.query-cache.expire-seconds=5
-kylin.metrics.query-cache.max-entries=2
\ No newline at end of file
+kylin.metrics.query-cache.max-entries=2
+kylin.dictionary.detect.data.skew.in.encoding=true
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
index bf2a351..2e310ca 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
@@ -19,6 +19,12 @@ package org.apache.spark.dict;
import java.io.IOException;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,8 +42,9 @@ public class NBucketDictionary {
private Object2LongMap<String> absoluteDictMap;
    // Relative dictionary needs to calculate dictionary code according to NGlobalDictMetaInfo's bucketOffsets
private Object2LongMap<String> relativeDictMap;
+ private Object2LongMap<String> skewedDictMap;
-    NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo)
+    NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo, String skewDictStorageFile)
throws IOException {
this.workingDir = workingDir;
this.bucketId = bucketId;
@@ -49,6 +56,15 @@ public class NBucketDictionary {
            this.absoluteDictMap = globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, bucketId);
}
this.relativeDictMap = new Object2LongOpenHashMap<>();
+        if (!StringUtils.isEmpty(skewDictStorageFile)) {
+            FileSystem fs = FileSystem.get(new Configuration());
+            if (fs.exists(new Path(skewDictStorageFile))) {
+                Kryo kryo = new Kryo();
+                Input input = new Input(fs.open(new Path(skewDictStorageFile)));
+                skewedDictMap = (Object2LongMap<String>) kryo.readClassAndObject(input);
+                input.close();
+            }
+        }
}
NBucketDictionary(String workingDir) {
@@ -72,6 +88,9 @@ public class NBucketDictionary {
}
public long encode(Object value) {
+        if (null != skewedDictMap && skewedDictMap.containsKey(value.toString())) {
+            return skewedDictMap.getLong(value.toString());
+        }
return absoluteDictMap.getLong(value.toString());
}
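
[The skewed dictionary read here must match what the build side writes. Both sides go through Kryo's writeClassAndObject/readClassAndObject on an Object2LongOpenHashMap; a minimal round-trip sketch, with local streams standing in for the HDFS calls above and a hypothetical path:]

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import it.unimi.dsi.fastutil.objects.{Object2LongMap, Object2LongOpenHashMap}
    import java.io.{FileInputStream, FileOutputStream}

    val dict = new Object2LongOpenHashMap[String]()
    dict.put("hot-key", 42L)

    val kryo = new Kryo()
    val out = new Output(new FileOutputStream("/tmp/skew_dict")) // hypothetical local path
    kryo.writeClassAndObject(out, dict)                          // writer side (CubeTableEncoder)
    out.close()

    val in = new Input(new FileInputStream("/tmp/skew_dict"))
    val loaded = kryo.readClassAndObject(in).asInstanceOf[Object2LongMap[String]] // reader side (above)
    in.close()
    assert(loaded.getLong("hot-key") == 42L)
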
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
index 651387d..a74022c 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
@@ -39,6 +39,7 @@ public class NGlobalDictionary implements Serializable {
private String sourceTable;
private String sourceColumn;
private boolean isFirst = true;
+ private String skewDictStorageFile;
    public String getResourceDir() {
        return "/" + project + HadoopUtil.GLOBAL_DICT_STORAGE_ROOT + "/" + sourceTable + "/" + sourceColumn + "/";
@@ -71,13 +72,16 @@ public class NGlobalDictionary implements Serializable {
if (metadata != null) {
isFirst = false;
}
+ if (dictInfo.length >= 5) {
+ skewDictStorageFile = dictInfo[4];
+ }
}
    public NBucketDictionary loadBucketDictionary(int bucketId) throws IOException {
if (null == metadata) {
metadata = getMetaInfo();
}
-        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata);
+        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata, skewDictStorageFile);
}
public NBucketDictionary createNewBucketDictionary() {
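
[The new fifth element of dictInfo is how the skew dictionary path reaches executors: it rides along in the SEPARATOR-joined dictParams string built in CubeTableEncoder below. A sketch of the parsing contract, using a stand-in separator since SEPARATOR's actual value is outside this diff:]

    val SEP = "@@@" // stand-in; the real constant is NSparkCubingUtil.SEPARATOR
    val dictParams = Array("project", "TABLE", "COLUMN",
      "hdfs://working/dir", "hdfs://tmp/job/skewed_data/TABLE.COLUMN").mkString(SEP)
    val dictInfo = dictParams.split(SEP)
    // Four-element strings from the non-skew path still parse; the fifth is optional.
    val skewDictStorageFile = if (dictInfo.length >= 5) dictInfo(4) else null
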
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 8e112c1..63c90dc 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, BinaryExpression,
  DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In,
-  KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase, SplitPart, Sum0,
+  KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase, ScatterSkewData, SplitPart, Sum0,
  TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, PreciseCountDistinct}
@@ -44,6 +44,10 @@ object KylinFunctions {
Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr))
}
+ def scatter_skew_data(column: Column, skewDataStorage: Column): Column = {
+ Column(ScatterSkewData(column.expr, skewDataStorage.expr))
+ }
+
// special lit for KYLIN.
def k_lit(literal: Any): Column = literal match {
case c: Column => c
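
[A hedged usage sketch of the new function; the column name, dictionary path, and partition count are hypothetical, and this mirrors how CubeTableEncoder wires it up below:]

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.KylinFunctions.scatter_skew_data
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.StringType

    val spark = SparkSession.builder().master("local[2]").appName("scatter-usage").getOrCreate()
    val df = spark.range(1000L).selectExpr("cast(id % 10 as string) as c") // hypothetical input

    // Values present in the skew dictionary at the given path are replaced with a
    // random salt, so repartitioning by the salted column spreads hot keys out.
    val scattered = df.withColumn("c_scatter",
        scatter_skew_data(col("c").cast(StringType), lit("hdfs://tmp/job/skewed_data/T.c")))
      .repartition(100, col("c_scatter"))
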
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index a1a45fa..d25896c 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode, FalseLiteral}
import org.apache.spark.sql.types._
import org.roaringbitmap.longlong.Roaring64NavigableMap
@@ -484,4 +484,62 @@ case class ApproxCountDistinctDecode(_left: Expression, _right: Expression)
override def dataType: DataType = LongType
override def prettyName: String = "approx_count_distinct_decode"
+}
+
+case class ScatterSkewData(left: Expression, right: Expression) extends BinaryExpression with ExpectsInputTypes {
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode) : ExprCode = {
+
+ val rand = ctx.addMutableState("java.util.Random", "rand")
+    val skewData = ctx.addMutableState("it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap",
+      "skewData")
+ val skewDataStorage = right.simpleString
+
+    val initParamsFuncName = ctx.addNewFunction(s"initParams",
+      s"""
+         | private void initParams() {
+         |   ${rand} = new java.util.Random();
+         |   com.esotericsoftware.kryo.Kryo kryo = new com.esotericsoftware.kryo.Kryo();
+         |   try {
+         |     org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration());
+         |     if (fs.exists(new org.apache.hadoop.fs.Path("${skewDataStorage}"))) {
+         |       com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(
+         |         fs.open(new org.apache.hadoop.fs.Path("${skewDataStorage}")));
+         |       ${skewData} = (it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap<String>) kryo.readClassAndObject(input);
+         |       input.close();
+         |     }
+         |   } catch (java.io.IOException e) {
+         |     throw new java.lang.RuntimeException(e);
+         |   }
+         | }
+      """.stripMargin)
+
+    val addSalt = ctx.addNewFunction(s"addSalt",
+      s"""
+         | private org.apache.spark.unsafe.types.UTF8String addSalt(org.apache.spark.unsafe.types.UTF8String val) {
+         |   if (null != ${skewData} && (null == val || ${skewData}.containsKey(val.toString()))) {
+         |     return org.apache.spark.unsafe.types.UTF8String.fromString(
+         |       java.lang.Integer.toString(${rand}.nextInt()));
+         |   } else {
+         |     return val;
+         |   }
+         | }
+      """.stripMargin)
+
+ ctx.addPartitionInitializationStatement(s"$initParamsFuncName();");
+
+ val leftGen = left.genCode(ctx)
+ val rightGen = right.genCode(ctx)
+ val resultCode = s"""${ev.value} = $addSalt(${leftGen.value});"""
+
+    ev.copy(code = code"""
+      ${leftGen.code}
+      ${rightGen.code}
+      ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
+      $resultCode""", isNull = FalseLiteral)
+
+ }
+
+ override def dataType: DataType = StringType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType)
}
\ No newline at end of file
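
[Stripped of codegen plumbing, the generated addSalt reduces to the following sketch; this is a reading aid, not the committed code. Skewed values, and nulls, are replaced by a random salt so identical hot keys no longer hash to a single partition, while everything else passes through and keeps its deterministic hash:]

    import java.util.Random
    import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap

    class SaltFn(skewData: Object2LongOpenHashMap[String]) {
      private val rand = new Random()
      def addSalt(value: String): String =
        if (skewData != null && (value == null || skewData.containsKey(value)))
          Integer.toString(rand.nextInt()) // scatter skewed (and null) keys randomly
        else
          value                            // non-skewed keys stay deterministic
    }
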
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
index 6e8dc22..3bfd519 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
@@ -36,7 +36,8 @@ import scala.collection.JavaConverters._
class CreateFlatTable(val seg: SegmentInfo,
                      val toBuildTree: SpanningTree,
                      val ss: SparkSession,
-                     val sourceInfo: NBuildSourceInfo) extends Logging {
+                     val sourceInfo: NBuildSourceInfo,
+                     val jobId: String) extends Logging {
import org.apache.kylin.engine.spark.builder.CreateFlatTable._
@@ -108,7 +109,7 @@ class CreateFlatTable(val seg: SegmentInfo,
val matchedCols = filterCols(ds, encodeCols)
var encodeDs = ds
if (!matchedCols.isEmpty) {
-      encodeDs = CubeTableEncoder.encodeTable(ds, seg, matchedCols.asJava)
+      encodeDs = CubeTableEncoder.encodeTable(ds, seg, matchedCols.asJava, jobId)
}
encodeDs
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index d2a34a8..8a5fe20 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -17,8 +17,13 @@
*/
package org.apache.kylin.engine.spark.builder
-import java.util
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Output
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path}
+import java.util
import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.ENCODE_SUFFIX
import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
@@ -27,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.KylinFunctions._
import org.apache.spark.sql.functions.{col, _}
import org.apache.spark.sql.types.StringType
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{Dataset, Row, functions}
import org.apache.spark.utils.SparkVersionUtils
import scala.collection.JavaConverters._
@@ -35,7 +40,7 @@ import scala.collection.mutable._
object CubeTableEncoder extends Logging {
-  def encodeTable(ds: Dataset[Row], seg: SegmentInfo, cols: util.Set[ColumnDesc]): Dataset[Row] = {
+  def encodeTable(ds: Dataset[Row], seg: SegmentInfo, cols: util.Set[ColumnDesc], jobId: String): Dataset[Row] = {
if (SparkVersionUtils.isLessThanSparkVersion("2.4", true)) {
      assert(!ds.sparkSession.conf.get("spark.sql.adaptive.enabled", "false").toBoolean,
        "Parameter 'spark.sql.adaptive.enabled' must be false when encode tables.")
@@ -61,15 +66,57 @@ object CubeTableEncoder extends Logging {
val encodeColRef = convertFromDot(ref.identity)
val columnIndex = structType.fieldIndex(encodeColRef)
-      val dictParams = Array(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
+      var dictParams = Array(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
        .mkString(SEPARATOR)
      val aliasName = structType.apply(columnIndex).name.concat(ENCODE_SUFFIX)
-      val encodeCol = dict_encode(col(encodeColRef).cast(StringType), lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
-      val columns = partitionedDs.schema.map(ty => col(ty.name)) ++ Seq(encodeCol)
+      var encodeCol = dict_encode(col(encodeColRef).cast(StringType), lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
+      val columns = partitionedDs.schema.map(ty => col(ty.name))
+
+      if (seg.kylinconf.detectDataSkewInDictEncodingEnabled()) {
+        //find skewed data in dict-encoding step
+        val castEncodeColRef = col(encodeColRef).cast(StringType)
+        val sampleData = ds.select(castEncodeColRef).sample(seg.kylinconf.sampleRateInEncodingSkewDetection()).cache()
+        val totalCount = sampleData.count();
+        val skewDictStorage = new Path(seg.kylinconf.getJobTmpDir(seg.project) +
+          "/" + jobId + "/skewed_data/" + ref.identity)
+        val skewedDict = new Object2LongOpenHashMap[String]()
+        sampleData.groupBy(encodeColRef)
+          .agg(functions.count(lit(1)).alias("count_value"))
+          .filter(col("count_value") > totalCount * seg.kylinconf.skewPercentageThreshHold())
+          .repartition(enlargedBucketSize, castEncodeColRef)
+          .select(Seq(castEncodeColRef, encodeCol): _*)
+          .collect().foreach(row => skewedDict.put(row.getString(0), row.getLong(1)));
+ sampleData.unpersist()
+
+ //save skewed data dict
+ if (skewedDict.size() > 0) {
+ val kryo = new Kryo()
+ val fs = skewDictStorage.getFileSystem(new Configuration())
+ if (fs.exists(skewDictStorage)) {
+ fs.delete(skewDictStorage, true)
+ }
+ val output = new Output(fs.create(skewDictStorage))
+ kryo.writeClassAndObject(output, skewedDict)
+ output.close()
+          //define repartition expression: repartition skewed data to random partition
+          val scatterColumn = scatter_skew_data(castEncodeColRef, lit(skewDictStorage.toString))
+            .alias("scatter_skew_data_" + ref.columnName)
+
+ //encode cuboid table with skewed data dictionary
+          dictParams = Array(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory, skewDictStorage.toString)
+            .mkString(SEPARATOR)
+          encodeCol = dict_encode(col(encodeColRef).cast(StringType), lit(dictParams), lit(bucketSize).cast(StringType)).alias(aliasName)
+
+          partitionedDs = partitionedDs.select(columns ++ Seq(scatterColumn): _*)
+            .repartition(enlargedBucketSize, col("scatter_skew_data_" + ref.columnName))
+            .select(columns ++ Seq(encodeCol): _*)
+          return partitionedDs;
+ }
+ }
      partitionedDs = partitionedDs
        .repartition(enlargedBucketSize, col(encodeColRef).cast(StringType))
-        .select(columns: _*)
+        .select(columns ++ Seq(encodeCol): _*)
}
)
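
[Net effect: without detected skew the encoder repartitions by the raw column hash exactly as before; with skew it salts hot keys via scatter_skew_data, repartitions by the salted column, and encodes against the merged dictionary. A self-contained sketch of why the salting matters, on hypothetical data with plain Spark:]

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, lit, rand, when}

    val spark = SparkSession.builder().master("local[4]").appName("skew-sketch").getOrCreate()

    // One value dominates: hash-repartitioning by "c" sends ~99% of rows to one task.
    val df = spark.range(1000000L)
      .select(when(rand() < 0.99, lit("hot")).otherwise(col("id").cast("string")).as("c"))

    // Salting only the detected hot key spreads it over all partitions;
    // the remaining values keep their deterministic hash.
    val salted = when(col("c") === "hot", (rand() * 100).cast("int").cast("string"))
      .otherwise(col("c"))
    val balanced = df.withColumn("c_scatter", salted).repartition(100, col("c_scatter"))
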
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index bd54d0b..fdb04ab 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -185,7 +185,7 @@ class ParentSourceChooser(
// sourceInfo.setViewFactTablePath(viewPath)
    // val needJoin = ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree)
-    val flatTable = new CreateFlatTable(segInfo, toBuildTree, ss, sourceInfo)
+    val flatTable = new CreateFlatTable(segInfo, toBuildTree, ss, sourceInfo, jobId)
val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
sourceInfo.setFlatTableDS(afterJoin)
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index cfd84aa..ae2bb15 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -312,7 +312,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
CreateFlatTable flatTable = new CreateFlatTable(
                MetadataConverter.getSegmentInfo(segment.getCubeInstance(), segment.getUuid(),
-                        segment.getName(), segment.getStorageLocationIdentifier()), null, ss, null);
+                        segment.getName(), segment.getStorageLocationIdentifier()), null, ss, null, ss.sparkContext().applicationId());
Dataset<Row> ds = flatTable.generateDataset(false, true);
return ds;
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index f8dd610..df0273c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -96,9 +96,14 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi
val afterJoin1 = generateFlatTable(seg1, cube, true)
afterJoin1.collect()
+
if (SPARK_VERSION.startsWith("2.4")) {
val jobs = helper.getJobsByGroupId(groupId)
- Assert.assertEquals(jobs.length, 15)
+ if (seg1.getConfig.detectDataSkewInDictEncodingEnabled()) {
+ Assert.assertEquals(jobs.length, 18)
+ } else {
+ Assert.assertEquals(jobs.length, 15)
+ }
} else if (SPARK_VERSION.startsWith("3.1")) {
// in Spark 3.x, BroadcastExchangeExec overwrites job group ID
val jobs = helper.getJobsByGroupId(null)
@@ -140,7 +145,8 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi
  private def generateFlatTable(segment: CubeSegment, cube: CubeInstance, needEncode: Boolean): Dataset[Row] = {
    val seg = MetadataConverter.getSegmentInfo(segment.getCubeInstance, segment.getUuid, segment.getName, segment.getStorageLocationIdentifier)
    val spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts))
-    val flatTable = new CreateFlatTable(seg, spanningTree, spark, null)
+    //for test case there is no build job id
+    val flatTable = new CreateFlatTable(seg, spanningTree, spark, null, spark.sparkContext.applicationId)
val afterJoin = flatTable.generateDataset(needEncode)
afterJoin
}