This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this
push:
new 276f906 KYLIN-4747 Use the first dimension column as sort column
within a partition
276f906 is described below
commit 276f906a35ce9f868b444b0a95c4d57648e07b23
Author: Zhichao Zhang <[email protected]>
AuthorDate: Fri Sep 4 09:28:10 2020 +0800
KYLIN-4747 Use the first dimension column as sort column within a partition
---
.../apache/kylin/engine/spark/job/NSparkCubingUtil.java | 14 ++++----------
.../org/apache/kylin/engine/spark/job/CubeBuildJob.java | 4 ++--
.../org/apache/kylin/engine/spark/job/CubeMergeJob.java | 13 ++++++++-----
.../org/apache/kylin/engine/spark/utils/Repartitioner.java | 4 ++--
4 files changed, 16 insertions(+), 19 deletions(-)
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
index 5438896..6a1f9f1 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
@@ -23,8 +23,8 @@ import org.apache.kylin.metadata.model.Segments;
import org.apache.spark.sql.Column;
import org.spark_project.guava.collect.Sets;
+import java.util.Collection;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -51,7 +51,7 @@ public class NSparkCubingUtil {
return getColumns(ret);
}
- public static Column[] getColumns(Set<Integer> indices) {
+ public static Column[] getColumns(Collection<Integer> indices) {
Column[] ret = new Column[indices.size()];
int index = 0;
for (Integer i : indices) {
@@ -61,14 +61,8 @@ public class NSparkCubingUtil {
return ret;
}
- public static Column[] getColumns(List<Integer> indices) {
- Column[] ret = new Column[indices.size()];
- int index = 0;
- for (Integer i : indices) {
- ret[index] = new Column(String.valueOf(i));
- index++;
- }
- return ret;
+ public static Column getFirstColumn(Collection<Integer> indices) {
+ return getColumns(indices)[0];
}
private static final Pattern DOT_PATTERN =
Pattern.compile("(\\S+)\\.(\\D+)");
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index bbf50e8..954434d 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -310,7 +310,7 @@ public class CubeBuildJob extends SparkApplication {
ss.sparkContext().setJobDescription("build " +
layoutEntity.getId() + " from parent " + parentName);
Set<Integer> orderedDims =
layoutEntity.getOrderedDimensions().keySet();
Dataset<Row> afterSort =
afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
-
.sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
+
.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(orderedDims));
saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
} else {
Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent,
dimIndexes, cuboid.getOrderedMeasures(),
@@ -321,7 +321,7 @@ public class CubeBuildJob extends SparkApplication {
Dataset<Row> afterSort = afterAgg
.select(NSparkCubingUtil.getColumns(rowKeys,
layoutEntity.getOrderedMeasures().keySet()))
-
.sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
+
.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(rowKeys));
saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 3d54492..2772118 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.job;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.kylin.cube.CubeInstance;
@@ -91,17 +92,19 @@ public class CubeMergeJob extends SparkApplication {
Dataset<Row> afterSort;
if (layout.isTableIndex()) {
- afterSort =
afterMerge.sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()));
+ afterSort =
+
afterMerge.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet()));
} else {
- Column[] dimsCols =
NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet());
- Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge,
layout.getOrderedDimensions().keySet(),
+ Set<Integer> dimColumns =
layout.getOrderedDimensions().keySet();
+ Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge,
dimColumns,
layout.getOrderedMeasures(), spanningTree, false);
- afterSort = afterAgg.sortWithinPartitions(dimsCols);
+ afterSort = afterAgg.sortWithinPartitions(
+ NSparkCubingUtil.getFirstColumn(dimColumns));
}
buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity()
{
@Override
public String getName() {
- return "merge-layout-" + layout.getId();
+ return "merge-cuboid-" + layout.getId();
}
@Override
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index efaa7d0..453704e 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -153,12 +153,12 @@ public class Repartitioner {
//ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled",
"false");
data = storage.getFrom(tempPath,
ss).repartition(repartitionNum,
NSparkCubingUtil.getColumns(getShardByColumns()))
- .sortWithinPartitions(sortCols);
+ .sortWithinPartitions(sortCols[0]);
} else {
// repartition for single file size is too small
logger.info("repartition to {}", repartitionNum);
data = storage.getFrom(tempPath,
ss).repartition(repartitionNum)
- .sortWithinPartitions(sortCols);
+ .sortWithinPartitions(sortCols[0]);
}
storage.saveTo(path, data, ss);