This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9ba276029311 feat(table-services): Support clustering file groups with
earlier instants times first (#18174)
9ba276029311 is described below
commit 9ba2760293115d68e679b53f1004fa20b411fd2e
Author: Krishen <[email protected]>
AuthorDate: Thu Mar 5 14:17:53 2026 -0800
feat(table-services): Support clustering file groups with earlier instants
times first (#18174)
New config hoodie.clustering.plan.strategy.file.slices.sort.by (default
SIZE). Accepts a comma-separated list of sort fields. Setting it to
INSTANT_TIME,SIZE causes PartitionAwareClusteringPlanStrategy to sort file
slices by base file commit time ascending first, then by file size descending,
so older data is clustered first.
Changelog:
ClusteringFileSliceSortByField: New enum defining the available sort
fields: INSTANT_TIME (commit time ascending) and SIZE (file size descending).
ClusteringFileSliceComparator: New utility that parses the comma-separated
config value into a composite Comparator<FileSlice>, combining individual field
comparators in the specified order.
HoodieClusteringConfig: Added PLAN_STRATEGY_FILE_SLICES_SORT_BY config
property (key hoodie.clustering.plan.strategy.file.slices.sort.by, default
SIZE) and Builder.withFileSlicesSortBy(String).
HoodieWriteConfig: Added getFileSlicesSortBy().
PartitionAwareClusteringPlanStrategy: Replaced inline comparator logic with
a call to ClusteringFileSliceComparator.buildComparator(writeConfig).
TestSparkSizeBasedClusteringPlanStrategy: Updated tests to use
withFileSlicesSortBy("INSTANT_TIME,SIZE" / "SIZE"):
testSortByInstantTimeThenSize, testSortBySizeOnly,
testCommitTimeOrderingWithSameSizes,
testSortingBehaviorComparisonInstantTimeVsSizeOnly.
---
.../apache/hudi/config/HoodieClusteringConfig.java | 16 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../cluster/ClusteringFileSliceComparator.java | 69 ++++++++
.../cluster/ClusteringFileSliceSortByField.java | 38 +++++
.../PartitionAwareClusteringPlanStrategy.java | 8 +-
.../TestSparkSizeBasedClusteringPlanStrategy.java | 175 ++++++++++++++++++++-
6 files changed, 304 insertions(+), 6 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index b489ae646fa4..eb47688f2aea 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -217,6 +217,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("Whether to generate clustering plan when there is
only one file group involved, by default true");
  // Controls the order in which file slices within a partition are considered when
  // packing them into clustering groups; parsed by ClusteringFileSliceComparator.
  public static final ConfigProperty<String> PLAN_STRATEGY_FILE_SLICES_SORT_BY = ConfigProperty
      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "file.slices.sort.by")
      .defaultValue("SIZE")
      .markAdvanced()
      .sinceVersion("1.2.0")
      .withDocumentation("Comma-separated list of fields to sort file slices by when packing files together within a partition "
          + "to create clustering groups. "
          + "Available fields: INSTANT_TIME (sort by commit time ascending, so that older data files are clustered first), "
          + "SIZE (sort by file size descending). For example, 'INSTANT_TIME,SIZE' sorts by commit time first then by size. "
          + "Default 'SIZE' sorts by file size only.");

public static final ConfigProperty<String> PLAN_STRATEGY_SORT_COLUMNS =
ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns")
.noDefaultValue()
@@ -594,6 +605,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
    /**
     * Sets {@link #PLAN_STRATEGY_FILE_SLICES_SORT_BY}: a comma-separated list of sort fields
     * (e.g. "INSTANT_TIME,SIZE") used to order file slices before they are packed into
     * clustering groups. Field names must match {@code ClusteringFileSliceSortByField} values.
     */
    public Builder withFileSlicesSortBy(String sortByFields) {
      clusteringConfig.setValue(PLAN_STRATEGY_FILE_SLICES_SORT_BY, sortByFields);
      return this;
    }
+
public Builder withInlineClustering(Boolean inlineClustering) {
clusteringConfig.setValue(INLINE_CLUSTERING,
String.valueOf(inlineClustering));
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 144d62468c19..f961fca2c314 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2020,6 +2020,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getLong(HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES);
}
  /**
   * Returns the comma-separated list of clustering file-slice sort fields configured via
   * {@code hoodie.clustering.plan.strategy.file.slices.sort.by} (default "SIZE").
   */
  public String getFileSlicesSortBy() {
    return getString(HoodieClusteringConfig.PLAN_STRATEGY_FILE_SLICES_SORT_BY);
  }
+
public int getTargetPartitionsForClustering() {
return getInt(HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceComparator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceComparator.java
new file mode 100644
index 000000000000..bd06aae4290e
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceComparator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
+
+/**
+ * Builds a {@link Comparator} for {@link FileSlice} based on the
+ * {@link
org.apache.hudi.config.HoodieClusteringConfig#PLAN_STRATEGY_FILE_SLICES_SORT_BY}
config.
+ * The comparators for each field are combined in the order specified, so
earlier fields take priority.
+ */
+public class ClusteringFileSliceComparator {
+
+ public static Comparator<FileSlice> buildComparator(HoodieWriteConfig
config) {
+ String sortByFields = config.getFileSlicesSortBy();
+
+ List<ClusteringFileSliceSortByField> fields =
Arrays.stream(sortByFields.split(","))
+ .map(String::trim)
+ .map(s -> ClusteringFileSliceSortByField.valueOf(s.toUpperCase()))
+ .collect(Collectors.toList());
+
+ if (fields.isEmpty()) {
+ throw new HoodieClusteringException("At least one sort field must be
specified in: " + sortByFields);
+ }
+
+ Comparator<FileSlice> comparator = comparatorForField(fields.get(0),
config);
+ for (int i = 1; i < fields.size(); i++) {
+ comparator = comparator.thenComparing(comparatorForField(fields.get(i),
config));
+ }
+ return comparator;
+ }
+
+ private static Comparator<FileSlice>
comparatorForField(ClusteringFileSliceSortByField field, HoodieWriteConfig
config) {
+ switch (field) {
+ case INSTANT_TIME:
+ return Comparator.comparing(fileSlice ->
+ fileSlice.getBaseFile().map(baseFile ->
baseFile.getCommitTime()).orElse(""));
+ case SIZE:
+ return Comparator.comparing(
+ (FileSlice fileSlice) -> fileSlice.getBaseFile().map(baseFile ->
baseFile.getFileSize()).orElse(config.getParquetMaxFileSize()),
+ Comparator.reverseOrder());
+ default:
+ throw new HoodieClusteringException("Unknown file slice sort field: "
+ field);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceSortByField.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceSortByField.java
new file mode 100644
index 000000000000..991471a7907d
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringFileSliceSortByField.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+
/**
 * Fields by which file slices can be sorted when creating clustering groups.
 * Parsed from the comma-separated value of
 * {@code hoodie.clustering.plan.strategy.file.slices.sort.by} by ClusteringFileSliceComparator.
 */
@EnumDescription("Fields by which file slices are sorted when creating clustering groups. "
    + "Multiple fields can be specified as a comma-separated list to define sort priority.")
public enum ClusteringFileSliceSortByField {

  @EnumFieldDescription("Sort by the commit/instant time of the file slice's base file in ascending order, "
      + "so that older data files are clustered first (e.g. to reduce stitching lag).")
  INSTANT_TIME,

  @EnumFieldDescription("Sort by the file size of the file slice's base file in descending order, "
      + "so that larger files are clustered first.")
  SIZE
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 0d445a7e00f9..08d25c7907e1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy;
+import org.apache.hudi.table.action.cluster.ClusteringFileSliceComparator;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
import org.apache.hudi.util.Lazy;
@@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -68,11 +70,9 @@ public abstract class
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
- // Sort fileSlices before dividing, which makes dividing more compact
+ Comparator<FileSlice> sortedFileSlicesComparator =
ClusteringFileSliceComparator.buildComparator(writeConfig);
List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
- sortedFileSlices.sort((o1, o2) -> (int)
- ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize()
: writeConfig.getParquetMaxFileSize())
- - (o1.getBaseFile().isPresent() ?
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+ sortedFileSlices.sort(sortedFileSlicesComparator);
long totalSizeSoFar = 0;
boolean partialScheduled = false;
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
index 1f95ea472f7a..d7f7cfc4a3d2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
@@ -19,10 +19,12 @@
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
@@ -82,10 +84,179 @@ public class TestSparkSizeBasedClusteringPlanStrategy {
Assertions.assertEquals(1,
clusteringGroups.get(1).getNumOutputFileGroups());
}
  /**
   * With INSTANT_TIME,SIZE sorting and max one group of 750 bytes, only the two slices
   * from the earliest commit "001" (500 + 200 bytes) fit the single group; the rest
   * are left unscheduled, so the plan reports partial scheduling.
   */
  @Test
  public void testSortByInstantTimeThenSize() {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("")
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
            .withClusteringMaxBytesInGroup(750)
            .withClusteringTargetFileMaxBytes(1000)
            .withClusteringPlanSmallFileLimit(50)
            .withClusteringMaxNumGroups(1)
            .withFileSlicesSortBy("INSTANT_TIME,SIZE")
            .build())
        .build();

    SparkSizeBasedClusteringPlanStrategy planStrategy = new SparkSizeBasedClusteringPlanStrategy(table, context, config);

    // Sizes deliberately do not correlate with commit times, so ordering by size
    // alone would pick a different set of slices.
    ArrayList<FileSlice> fileSlices = new ArrayList<>();
    fileSlices.add(createFileSliceWithCommitTime(400, "003"));
    fileSlices.add(createFileSliceWithCommitTime(500, "001"));
    fileSlices.add(createFileSliceWithCommitTime(200, "001"));
    fileSlices.add(createFileSliceWithCommitTime(100, "002"));
    fileSlices.add(createFileSliceWithCommitTime(300, "002"));

    Pair<Stream<HoodieClusteringGroup>, Boolean> result =
        planStrategy.buildClusteringGroupsForPartition("p0", fileSlices);
    List<HoodieClusteringGroup> clusteringGroups =
        ((Stream<HoodieClusteringGroup>) result.getLeft()).collect(Collectors.toList());

    Assertions.assertEquals(1, clusteringGroups.size());

    List<HoodieSliceInfo> slicesInPlan = clusteringGroups.get(0).getSlices();
    Assertions.assertEquals(2, slicesInPlan.size());
    for (HoodieSliceInfo slice : slicesInPlan) {
      Assertions.assertTrue(slice.getDataFilePath().contains("001"),
          "Expected only slices from earliest commit '001', but found: " + slice.getDataFilePath());
    }

    Assertions.assertTrue(result.getRight(), "Should indicate partial scheduling since not all slices were processed");
  }
+
  /**
   * With the default SIZE-only sorting, a clustering plan is still produced; this test
   * only checks that grouping succeeds (at least one non-empty group), not slice order.
   */
  @Test
  public void testSortBySizeOnly() {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("")
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
            .withClusteringMaxBytesInGroup(2000)
            .withClusteringTargetFileMaxBytes(1000)
            .withClusteringPlanSmallFileLimit(500)
            .withFileSlicesSortBy("SIZE")
            .build())
        .build();

    SparkSizeBasedClusteringPlanStrategy planStrategy = new SparkSizeBasedClusteringPlanStrategy(table, context, config);

    ArrayList<FileSlice> fileSlices = new ArrayList<>();
    fileSlices.add(createFileSliceWithCommitTime(400, "003"));
    fileSlices.add(createFileSliceWithCommitTime(200, "001"));
    fileSlices.add(createFileSliceWithCommitTime(300, "002"));
    fileSlices.add(createFileSliceWithCommitTime(500, "001"));

    Stream<HoodieClusteringGroup> clusteringGroupStream =
        (Stream<HoodieClusteringGroup>) planStrategy.buildClusteringGroupsForPartition("p0", fileSlices).getLeft();
    List<HoodieClusteringGroup> clusteringGroups = clusteringGroupStream.collect(Collectors.toList());

    Assertions.assertTrue(clusteringGroups.size() > 0);

    HoodieClusteringGroup firstGroup = clusteringGroups.get(0);
    Assertions.assertTrue(firstGroup.getSlices().size() > 0);
  }
+
  /**
   * When all slices have identical sizes, INSTANT_TIME,SIZE sorting must order groups
   * strictly by commit time ascending: each 300-byte slice lands in its own group
   * (max group size 300) and the groups come out in commit order 001, 002, 003.
   */
  @Test
  public void testCommitTimeOrderingWithSameSizes() {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("")
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
            .withClusteringMaxBytesInGroup(300)
            .withClusteringTargetFileMaxBytes(1000)
            .withClusteringPlanSmallFileLimit(1000)
            .withFileSlicesSortBy("INSTANT_TIME,SIZE")
            .build())
        .build();

    SparkSizeBasedClusteringPlanStrategy planStrategy = new SparkSizeBasedClusteringPlanStrategy(table, context, config);

    // Same size everywhere, added out of commit order on purpose.
    ArrayList<FileSlice> fileSlices = new ArrayList<>();
    fileSlices.add(createFileSliceWithCommitTime(300, "003"));
    fileSlices.add(createFileSliceWithCommitTime(300, "001"));
    fileSlices.add(createFileSliceWithCommitTime(300, "002"));

    Stream<HoodieClusteringGroup> clusteringGroupStream =
        (Stream<HoodieClusteringGroup>) planStrategy.buildClusteringGroupsForPartition("p0", fileSlices).getLeft();
    List<HoodieClusteringGroup> clusteringGroups = clusteringGroupStream.collect(Collectors.toList());

    Assertions.assertTrue(clusteringGroups.get(0).getSlices().get(0).getDataFilePath().contains("001"));
    Assertions.assertTrue(clusteringGroups.get(1).getSlices().get(0).getDataFilePath().contains("002"));
    Assertions.assertTrue(clusteringGroups.get(2).getSlices().get(0).getDataFilePath().contains("003"));
  }
+
  /**
   * Runs the same input through two strategies — one sorted by INSTANT_TIME,SIZE and one by
   * SIZE only — and checks both produce the same total number of slices but in different
   * group order: commit order (001, 002, 003) vs. size-descending order (200, 150, 100 bytes
   * => commits 002, 003, 001). Max group size 200 forces one slice per group.
   */
  @Test
  public void testSortingBehaviorComparisonInstantTimeVsSizeOnly() {
    HoodieWriteConfig configEnabled = HoodieWriteConfig.newBuilder()
        .withPath("")
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
            .withClusteringMaxBytesInGroup(200)
            .withClusteringTargetFileMaxBytes(1000)
            .withClusteringPlanSmallFileLimit(1000)
            .withFileSlicesSortBy("INSTANT_TIME,SIZE")
            .build())
        .build();

    HoodieWriteConfig configDisabled = HoodieWriteConfig.newBuilder()
        .withPath("")
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
            .withClusteringMaxBytesInGroup(200)
            .withClusteringTargetFileMaxBytes(1000)
            .withClusteringPlanSmallFileLimit(1000)
            .withFileSlicesSortBy("SIZE")
            .build())
        .build();

    SparkSizeBasedClusteringPlanStrategy planStrategyEnabled = new SparkSizeBasedClusteringPlanStrategy(table, context, configEnabled);
    SparkSizeBasedClusteringPlanStrategy planStrategyDisabled = new SparkSizeBasedClusteringPlanStrategy(table, context, configDisabled);

    ArrayList<FileSlice> fileSlicesEnabled = new ArrayList<>();
    ArrayList<FileSlice> fileSlicesDisabled = new ArrayList<>();

    // Sizes chosen so size order (200, 150, 100) differs from commit order (001, 002, 003).
    String[] commitTimes = {"001", "002", "003"};
    long[] fileSizes = {100, 200, 150};

    for (int i = 0; i < commitTimes.length; i++) {
      fileSlicesEnabled.add(createFileSliceWithCommitTime(fileSizes[i], commitTimes[i]));
      fileSlicesDisabled.add(createFileSliceWithCommitTime(fileSizes[i], commitTimes[i]));
    }

    Stream<HoodieClusteringGroup> streamEnabled =
        (Stream<HoodieClusteringGroup>) planStrategyEnabled.buildClusteringGroupsForPartition("p0", fileSlicesEnabled).getLeft();
    List<HoodieClusteringGroup> groupsEnabled = streamEnabled.collect(Collectors.toList());

    Stream<HoodieClusteringGroup> streamDisabled =
        (Stream<HoodieClusteringGroup>) planStrategyDisabled.buildClusteringGroupsForPartition("p0", fileSlicesDisabled).getLeft();
    List<HoodieClusteringGroup> groupsDisabled = streamDisabled.collect(Collectors.toList());

    Assertions.assertTrue(groupsEnabled.size() > 0);
    Assertions.assertTrue(groupsDisabled.size() > 0);

    // Sorting must never change how many slices get scheduled, only their order.
    int totalFilesEnabled = groupsEnabled.stream().mapToInt(g -> g.getSlices().size()).sum();
    int totalFilesDisabled = groupsDisabled.stream().mapToInt(g -> g.getSlices().size()).sum();

    Assertions.assertEquals(totalFilesEnabled, totalFilesDisabled);
    Assertions.assertEquals(3, totalFilesEnabled);

    // INSTANT_TIME,SIZE: groups follow commit time ascending.
    Assertions.assertTrue(groupsEnabled.get(0).getSlices().get(0).getDataFilePath().contains("001"));
    Assertions.assertTrue(groupsEnabled.get(1).getSlices().get(0).getDataFilePath().contains("002"));
    Assertions.assertTrue(groupsEnabled.get(2).getSlices().get(0).getDataFilePath().contains("003"));

    // SIZE only: groups follow file size descending (200 -> 150 -> 100 bytes).
    Assertions.assertTrue(groupsDisabled.get(0).getSlices().get(0).getDataFilePath().contains("002"));
    Assertions.assertTrue(groupsDisabled.get(1).getSlices().get(0).getDataFilePath().contains("003"));
    Assertions.assertTrue(groupsDisabled.get(2).getSlices().get(0).getDataFilePath().contains("001"));
  }
+
  /**
   * Creates a test file slice with the given base file size and the default commit time "001".
   */
  private FileSlice createFileSlice(long baseFileSize) {
    return createFileSliceWithCommitTime(baseFileSize, "001");
  }
+
+ private FileSlice createFileSliceWithCommitTime(long baseFileSize, String
commitTime) {
String fileId = FSUtils.createNewFileId(FSUtils.createNewFileIdPfx(), 0);
- FileSlice fs = new FileSlice("p0", "001", fileId);
- HoodieBaseFile f = new HoodieBaseFile(fileId);
+ FileSlice fs = new FileSlice("p0", commitTime, fileId);
+ String basePath = "/test/path/" + fileId + "_" + commitTime + ".parquet";
+ HoodieBaseFile f = new HoodieBaseFile(basePath);
f.setFileSize(baseFileSize);
fs.setBaseFile(f);
return fs;