This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 588e08b  KYLIN-4877 Use all dimension columns as sort columns when saving cuboid data
588e08b is described below
commit 588e08b465115a39810819102074c80161e6c790
Author: Zhichao Zhang <[email protected]>
AuthorDate: Mon Jan 18 17:27:29 2021 +0800

    KYLIN-4877 Use all dimension columns as sort columns when saving cuboid data
---
.../main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java | 4 ++--
.../main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java | 4 ++--
.../main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
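
For context on the change itself: each write path previously sorted rows inside a partition by only the first dimension column (NSparkCubingUtil.getFirstColumn), so Parquet row-group min/max statistics were selective for the first rowkey column only. Sorting by all dimension columns keeps those statistics useful for secondary dimensions as well. Below is a minimal standalone sketch of the two behaviors; this is not Kylin code, and the column names and paths are hypothetical (Kylin's cuboid datasets name dimension columns by their integer ids, which the sketch assumes):

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.col;

    public class SortAllDimsSketch {
        public static void main(String[] args) {
            SparkSession ss = SparkSession.builder().master("local[*]").getOrCreate();
            // Hypothetical cuboid data; dimension columns are named by their ids.
            Dataset<Row> cuboid = ss.read().parquet("/tmp/cuboid");

            // Old behavior: only the first dimension orders rows within a partition.
            Dataset<Row> before = cuboid.sortWithinPartitions(col("1"));

            // New behavior: every dimension column participates in the sort, so
            // Parquet row-group statistics stay selective for later dimensions too.
            Column[] allDims = { col("1"), col("2"), col("3") };
            Dataset<Row> after = cuboid.sortWithinPartitions(allDims);

            after.write().mode("overwrite").parquet("/tmp/cuboid_sorted");
        }
    }
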
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 51f9f2c..b463dad 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -421,7 +421,7 @@ public class CubeBuildJob extends SparkApplication {
                    ss.sparkContext().setJobDescription("build " + layoutEntity.getId() + " from parent " + parentName);
                    Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
                    Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
-                            .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(orderedDims));
+                            .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
                    saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
                } else {
                    Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
@@ -432,7 +432,7 @@ public class CubeBuildJob extends SparkApplication {
                    Dataset<Row> afterSort = afterAgg
                            .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
-                            .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(rowKeys));
+                            .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
                    saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
                }
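
Both hunks lean on NSparkCubingUtil.getColumns(...) returning one Spark Column per dimension id. That helper is outside this diff, so the following is only a plausible reconstruction for readers; the id-to-name mapping is an assumption, and the real implementation may differ:

    import java.util.Set;
    import org.apache.spark.sql.Column;
    import static org.apache.spark.sql.functions.col;

    final class GetColumnsSketch {
        // Assumed behavior: map each dimension id to the Spark column of the same
        // name, preserving the set's iteration order.
        static Column[] getColumns(Set<Integer> indices) {
            return indices.stream()
                    .map(i -> col(String.valueOf(i)))
                    .toArray(Column[]::new);
        }
    }
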
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index a716d6f..12e939d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -99,13 +99,13 @@ public class CubeMergeJob extends SparkApplication {
            Dataset<Row> afterSort;
            if (layout.isTableIndex()) {
                afterSort =
-                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet()));
+                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()));
            } else {
                Set<Integer> dimColumns = layout.getOrderedDimensions().keySet();
                Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, dimColumns,
                        layout.getOrderedMeasures(), spanningTree, false);
                afterSort = afterAgg.sortWithinPartitions(
-                        NSparkCubingUtil.getFirstColumn(dimColumns));
+                        NSparkCubingUtil.getColumns(dimColumns));
            }
            buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index e40ed64..01cc9f1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -157,12 +157,12 @@ public class Repartitioner {
            //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
            data = storage.getFrom(tempPath, ss).repartition(repartitionNum, NSparkCubingUtil.getColumns(getShardByColumns()))
-                    .sortWithinPartitions(sortCols[0]);
+                    .sortWithinPartitions(sortCols);
        } else {
            // repartition for single file size is too small
            logger.info("Cuboid[{}] repartition to {}", cuboid, repartitionNum);
            data = storage.getFrom(tempPath, ss).repartition(repartitionNum)
-                    .sortWithinPartitions(sortCols[0]);
+                    .sortWithinPartitions(sortCols);
        }
        storage.saveTo(path, data, ss);
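
One detail worth noting in the Repartitioner hunks: no new helper is involved. Assuming sortCols is a Column[] (which the new call needs in order to compile), Java binds the array directly to Spark's Column... varargs, so .sortWithinPartitions(sortCols) sorts by every column where the old code used only sortCols[0]. A small standalone illustration with made-up column names:

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.col;

    public class VarargsSortSketch {
        public static void main(String[] args) {
            SparkSession ss = SparkSession.builder().master("local[*]").getOrCreate();
            Dataset<Row> df = ss.range(100).toDF("id")
                    .withColumn("bucket", col("id").mod(10));

            // An array binds directly to sortWithinPartitions(Column... sortExprs),
            // so passing sortCols is the same as listing each column explicitly.
            Column[] sortCols = { col("bucket"), col("id") };
            df.repartition(4, col("bucket")).sortWithinPartitions(sortCols).show();
        }
    }
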