This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6e7ac45735 [HUDI-3884] Support archival beyond savepoint commits (#5837)
6e7ac45735 is described below
commit 6e7ac457352e007939ba3c44c9dc197de7b88ed3
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Jul 26 00:12:29 2022 +0530
[HUDI-3884] Support archival beyond savepoint commits (#5837)
Co-authored-by: sivabalan <[email protected]>
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 55 +++++---
.../apache/hudi/config/HoodieArchivalConfig.java | 21 ++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 6 +-
.../java/org/apache/hudi/table/HoodieTable.java | 6 +-
.../hudi/table/action/clean/CleanPlanner.java | 6 +-
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 144 ++++++++++++++++++---
.../apache/hudi/common/model/HoodieFileGroup.java | 20 +--
.../table/timeline/HoodieDefaultTimeline.java | 36 ++++--
.../hudi/common/table/timeline/HoodieTimeline.java | 9 ++
.../hudi/common/model/TestHoodieFileGroup.java | 26 ++++
.../table/timeline/TestHoodieActiveTimeline.java | 51 ++++++++
.../hudi/common/testutils/FileCreateUtils.java | 15 +++
.../hudi/common/testutils/HoodieTestTable.java | 23 +++-
.../hudi/common/testutils/MockHoodieTimeline.java | 6 +
14 files changed, 364 insertions(+), 60 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c53554d8e0..2992f4abd4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -64,6 +64,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -76,12 +77,14 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* Archiver to bound the growth of files under .hoodie meta path.
@@ -409,9 +412,11 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterInflights().firstInstant();
- // We cannot have any holes in the commit timeline. We cannot archive any commits which are
- // made after the first savepoint present.
+ // NOTE: We cannot have any holes in the commit timeline.
+ // We cannot archive any commits which are made after the first savepoint present,
+ // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+ Set<String> savepointTimestamps = table.getSavepointTimestamps();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
// For Merge-On-Read table, inline or async compaction is enabled
// We need to make sure that there are enough delta commits in the active timeline
@@ -428,28 +433,33 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
.filter(s -> {
- // if no savepoint present, then don't filter
- return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+ if (config.shouldArchiveBeyondSavepoint()) {
+ // skip savepoint commits and proceed further
+ return !savepointTimestamps.contains(s.getTimestamp());
+ } else {
+ // if no savepoint present, then don't filter
+ // stop at first savepoint commit
+ return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+ }
}).filter(s -> {
// Ensure commits >= oldest pending compaction commit is retained
return oldestPendingCompactionAndReplaceInstant
- .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+ .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}).filter(s -> {
// We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
// get archived, i.e, instants after the oldestInflight are retained on the timeline
if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
return oldestInflightCommitInstant.map(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+ compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}
return true;
}).filter(s ->
oldestInstantToRetainForCompaction.map(instantToRetain ->
- HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+ compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
);
-
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
return Stream.empty();
@@ -479,7 +489,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
instants = Stream.empty();
} else {
LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
- instants = instants.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+ instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN,
latestCompactionTime.get()));
}
} catch (Exception e) {
@@ -487,18 +497,29 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
}
}
- // If this is a metadata table, do not archive the commits that live in data set
- // active timeline. This is required by metadata table,
- // see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
- Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
- if (earliestActiveDatasetCommit.isPresent()) {
- instants = instants.filter(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
+ Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
+
+ if (config.shouldArchiveBeyondSavepoint()) {
+ // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+ // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
+ Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
+ if (firstNonSavepointCommit.isPresent()) {
+ String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
+ instants = instants.filter(instant ->
+ compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
+ }
+ } else {
+ // Do not archive the commits that live in data set active timeline.
+ // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
+ if (earliestActiveDatasetCommit.isPresent()) {
+ instants = instants.filter(instant ->
+ compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
+ }
}
}
@@ -589,7 +610,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
}
List<HoodieInstant> instantsToBeDeleted =
- instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
+ instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(),
LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
for (HoodieInstant deleteInstant : instantsToBeDeleted) {
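The filtering change above reduces to two modes. A minimal standalone sketch of the decision (plain Java with simplified types; `isCandidateForArchival` and the lexicographic `compareTo` stand in for the archiver internals and `HoodieTimeline.compareTimestamps`, and are illustrative assumptions, not part of this patch):

    import java.util.Optional;
    import java.util.Set;

    class ArchivalFilterSketch {
      // True if the instant with this timestamp may be archived.
      static boolean isCandidateForArchival(String timestamp,
                                            boolean archiveBeyondSavepoint,
                                            Set<String> savepointTimestamps,
                                            Optional<String> firstSavepoint) {
        if (archiveBeyondSavepoint) {
          // Skip only the savepointed instants themselves; everything else is eligible.
          return !savepointTimestamps.contains(timestamp);
        }
        // Legacy behavior: archive only instants strictly before the first savepoint.
        return firstSavepoint.map(sp -> timestamp.compareTo(sp) < 0).orElse(true);
      }
    }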
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 32bccc3a3d..3244b42283 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import javax.annotation.concurrent.Immutable;
+
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -34,8 +35,8 @@ import java.util.Properties;
*/
@Immutable
@ConfigClassProperty(name = "Archival Configs",
- groupName = ConfigGroups.Names.WRITE_CLIENT,
- description = "Configurations that control archival.")
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Configurations that control archival.")
public class HoodieArchivalConfig extends HoodieConfig {
public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
@@ -92,6 +93,13 @@ public class HoodieArchivalConfig extends HoodieConfig {
.withDocumentation("When enable, hoodie will auto merge several small
archive files into larger one. It's"
+ " useful when storage scheme doesn't support append operation.");
+ public static final ConfigProperty<Boolean> ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
+ .key("hoodie.archive.beyond.savepoint")
+ .defaultValue(false)
+ .sinceVersion("0.12.0")
+ .withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits. "
+ + "If disabled, archival will stop at the earliest savepoint commit.");
+
/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
@@ -107,7 +115,9 @@ public class HoodieArchivalConfig extends HoodieConfig {
*/
@Deprecated
public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
- /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
+ /**
+ * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+ */
@Deprecated
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
/**
@@ -186,6 +196,11 @@ public class HoodieArchivalConfig extends HoodieConfig {
return this;
}
+ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
+ archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint));
+ return this;
+ }
+
public HoodieArchivalConfig build() {
archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
return archivalConfig;
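As a usage note, the new flag can be enabled through the builder method added above, or by setting the key `hoodie.archive.beyond.savepoint` directly. A hedged sketch (the path and retention numbers are placeholder values, not taken from this patch):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // hypothetical base path
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(20, 30) // min/max commits to keep, placeholder values
            .withArchiveBeyondSavepoint(true) // sets hoodie.archive.beyond.savepoint
            .build())
        .build();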
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f787232c50..4902c3861f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1209,7 +1209,11 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public boolean getArchiveMergeEnable() {
- return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
+ return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
+ }
+
+ public boolean shouldArchiveBeyondSavepoint() {
+ return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
}
public long getArchiveMergeSmallFileLimitBytes() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 1e68f820d9..5ca3aee764 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -368,10 +368,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
}
/**
- * Get the list of savepoints in this table.
+ * Get the list of savepoint timestamps in this table.
*/
- public List<String> getSavepoints() {
- return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ public Set<String> getSavepointTimestamps() {
+ return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
}
public HoodieActiveTimeline getActiveTimeline() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 79eef43b3c..f837d08afc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -104,7 +104,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
* Get the list of data file names savepointed.
*/
public Stream<String> getSavepointedDataFiles(String savepointTime) {
- if (!hoodieTable.getSavepoints().contains(savepointTime)) {
+ if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
throw new HoodieSavepointException(
"Could not get data files for savepoint " + savepointTime + ". No
such savepoint.");
}
@@ -227,7 +227,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
+ " file versions. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
- List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+ List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
@@ -295,7 +295,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
- List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+ List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 4f41c4a44d..0af05b2d6b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.utils.MetadataConversionUtils;
@@ -44,9 +45,9 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -65,7 +66,9 @@ import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -88,6 +91,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
+import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -180,6 +184,33 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
long size,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+ return initTestTableAndGetWriteConfig(
+ enableMetadata,
+ minArchivalCommits,
+ maxArchivalCommits,
+ maxDeltaCommits,
+ maxDeltaCommitsMetadataTable,
+ tableType,
+ enableArchiveMerge,
+ archiveFilesBatch,
+ size,
+ failedWritesCleaningPolicy,
+ writeConcurrencyMode,
+ ARCHIVE_BEYOND_SAVEPOINT.defaultValue());
+ }
+
+ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
+ int minArchivalCommits,
+ int maxArchivalCommits,
+ int maxDeltaCommits,
+ int maxDeltaCommitsMetadataTable,
+ HoodieTableType tableType,
+ boolean enableArchiveMerge,
+ int archiveFilesBatch,
+ long size,
+ HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ WriteConcurrencyMode writeConcurrencyMode,
+ boolean archiveProceedBeyondSavepoints) throws Exception {
init(tableType);
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
@@ -188,7 +219,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
.withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
.withArchiveMergeSmallFileLimit(size)
- .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
+ .archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
+ .withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -249,6 +281,59 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
+ boolean enableMetadata = false;
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, HoodieTableType.COPY_ON_WRITE,
+ false, 10, 209715200, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
+
+ // min archival commits is 2 and max archival commits is 4. So, after the 5th commit, 3 commits will be archived.
+ for (int i = 1; i < 5; i++) {
+ testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+ }
+
+ // savepoint 3rd commit
+ String commitToSavepoint = String.format("%08d", 3);
+ HoodieSavepointMetadata savepointMetadata = testTable.doSavepoint(commitToSavepoint);
+ testTable.addSavepoint(commitToSavepoint, savepointMetadata);
+
+ for (int i = 5; i < 7; i++) {
+ testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+ }
+ // trigger archival
+ Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+ List<HoodieInstant> originalCommits = commitsList.getKey();
+ List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+ if (archiveBeyondSavepoint) {
+ // retains only two commits, C3 and C6, plus the savepoint for C3.
+ verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000004", "00000005")),
+ Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000006")).stream(), getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
+ .collect(Collectors.toList()), commitsAfterArchival);
+ } else {
+ // archives only C1 and C2; stops at the first savepointed commit C3.
+ verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")),
+ Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000005", "00000006")).stream(),
+ getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
+ .collect(Collectors.toList()), commitsAfterArchival);
+ }
+
+ for (int i = 7; i < 10; i++) {
+ testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+ }
+
+ // once the savepoint is removed, C3 will be archived.
+ testTable.deleteSavepoint(commitToSavepoint);
+ commitsList = archiveAndGetCommitsList(writeConfig);
+ originalCommits = commitsList.getKey();
+ commitsAfterArchival = commitsList.getValue();
+
+ metaClient.reloadActiveTimeline();
+ verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006", "00000007")),
+ getActiveCommitInstants(Arrays.asList("00000008", "00000009")), commitsAfterArchival);
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
@@ -563,13 +648,22 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
assertEquals(originalCommits, commitsAfterArchival);
}
+ private static Stream<Arguments> archiveCommitSavepointNoHoleParams() {
+ return Arrays.stream(new Boolean[][] {
+ {true, true},
+ {false, true},
+ {true, false},
+ {false, false}
+ }).map(Arguments::of);
+ }
+
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception {
+ @MethodSource("archiveCommitSavepointNoHoleParams")
+ public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable, boolean archiveBeyondSavepoint) throws Exception {
init();
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
- .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).withArchiveBeyondSavepoint(archiveBeyondSavepoint).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
@@ -596,14 +690,30 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
assertTrue(archiver.archiveIfRequired(context));
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
- assertEquals(5, timeline.countInstants(),
- "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
- assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
- "Archived commits should always be safe");
- assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
- "Archived commits should always be safe");
- assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
- "Archived commits should always be safe");
+ if (archiveBeyondSavepoint) {
+ // commits in active timeline = 101 and 105.
+ assertEquals(2, timeline.countInstants(),
+ "Since archiveBeyondSavepoint config is enabled, we will archive commits 102, 103 and 104");
+ assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
+ "Savepointed commits should always be safe");
+ assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
+ "102 expected to be archived");
+ assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
+ "103 expected to be archived");
+ assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "104")),
+ "104 expected to be archived");
+ assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")),
+ "105 expected to be in active timeline");
+ } else {
+ assertEquals(5, timeline.countInstants(),
+ "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
+ assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
+ "Archived commits should always be safe");
+ assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
+ "Archived commits should always be safe");
+ assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
+ "Archived commits should always be safe");
+ }
}
@ParameterizedTest
@@ -934,7 +1044,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieInstant firstInstant = metaClient.reloadActiveTimeline().firstInstant().get();
expectedArchivedInstants = expectedArchivedInstants.stream()
.filter(entry -> HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN, firstInstant.getTimestamp()
- )).collect(Collectors.toList());
+ )).collect(Collectors.toList());
expectedArchivedInstants.forEach(entry -> assertTrue(metaClient.getArchivedTimeline().containsInstant(entry)));
}
@@ -1283,7 +1393,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline();
expectedArchivedInstants.forEach(entry -> {
// check safety
- if (entry.getAction() != HoodieTimeline.ROLLBACK_ACTION) {
+ if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()), "Archived commits should always be safe");
}
}
@@ -1315,6 +1425,10 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
return getActiveCommitInstants(commitTimes, HoodieTimeline.COMMIT_ACTION);
}
+ private List<HoodieInstant> getActiveSavepointedCommitInstants(List<String> commitTimes) {
+ return getActiveCommitInstants(commitTimes, HoodieTimeline.SAVEPOINT_ACTION);
+ }
+
private List<HoodieInstant> getActiveCommitInstants(List<String> commitTimes, String action) {
List<HoodieInstant> allInstants = new ArrayList<>();
commitTimes.forEach(entry -> allInstants.add(new HoodieInstant(State.COMPLETED, action, entry)));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index bf6ce611e8..9e407aa766 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -28,6 +28,10 @@ import java.util.List;
import java.util.TreeMap;
import java.util.stream.Stream;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
+
/**
* A set of data/base files + set of log files, that make up an unit for all operations.
*/
@@ -118,21 +122,22 @@ public class HoodieFileGroup implements Serializable {
* some log files, that are based off a commit or delta commit.
*/
private boolean isFileSliceCommitted(FileSlice slice) {
- String maxCommitTime = lastInstant.get().getTimestamp();
- return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime())
- && HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime);
+ if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
+ return false;
+ }
+ return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime());
}
/**
- * Get all the the file slices including in-flight ones as seen in underlying file-system.
+ * Get all the file slices including in-flight ones as seen in underlying file system.
*/
public Stream<FileSlice> getAllFileSlicesIncludingInflight() {
return fileSlices.values().stream();
}
/**
- * Get latest file slices including in-flight ones.
+ * Get the latest file slices including inflight ones.
*/
public Option<FileSlice> getLatestFileSlicesIncludingInflight() {
return Option.fromJavaOptional(getAllFileSlicesIncludingInflight().findFirst());
@@ -169,8 +174,7 @@ public class HoodieFileGroup implements Serializable {
* Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
*/
public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
- return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline
- .compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
+ return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
}
/**
@@ -181,7 +185,7 @@ public class HoodieFileGroup implements Serializable {
*/
public Option<FileSlice> getLatestFileSliceBefore(String maxInstantTime) {
return Option.fromJavaOptional(getAllFileSlices().filter(
- slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, maxInstantTime))
+ slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN, maxInstantTime))
.findFirst());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index ac1dd007d0..e7970baf67 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.reverse;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* HoodieDefaultTimeline is a default implementation of the HoodieTimeline. It provides methods to inspect a
@@ -118,7 +119,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
Option<HoodieInstant> earliestPending = getWriteTimeline().filterInflightsAndRequested().firstInstant();
if (earliestPending.isPresent()) {
return getWriteTimeline().filterCompletedInstants()
- .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp()));
+ .filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp()));
}
return getWriteTimeline().filterCompletedInstants();
}
@@ -156,34 +157,34 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
return new HoodieDefaultTimeline(instants.stream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
+ .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
details);
}
@Override
public HoodieTimeline findInstantsAfter(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
+ .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
}
@Override
public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) {
return new HoodieDefaultTimeline(instants.stream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime))
+ .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime))
.limit(numCommits), details);
}
@Override
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
+ .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
details);
}
@Override
public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)),
+ .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)),
details);
}
@@ -362,11 +363,28 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public boolean isBeforeTimelineStarts(String instant) {
- Option<HoodieInstant> firstCommit = firstInstant();
- return firstCommit.isPresent()
- && HoodieTimeline.compareTimestamps(instant, LESSER_THAN, firstCommit.get().getTimestamp());
+ Option<HoodieInstant> firstNonSavepointCommit = getFirstNonSavepointCommit();
+ return firstNonSavepointCommit.isPresent()
+ && compareTimestamps(instant, LESSER_THAN, firstNonSavepointCommit.get().getTimestamp());
}
+ public Option<HoodieInstant> getFirstNonSavepointCommit() {
+ Option<HoodieInstant> firstCommit = firstInstant();
+ Set<String> savepointTimestamps = instants.stream()
+ .filter(entry -> entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+ .map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toSet());
+ Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
+ if (!savepointTimestamps.isEmpty()) {
+ // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+ // So, the first non-savepoint commit is considered as beginning of the active timeline.
+ firstNonSavepointCommit = Option.fromJavaOptional(instants.stream()
+ .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+ .findFirst());
+ }
+ return firstNonSavepointCommit;
+ }
+
@Override
public Option<byte[]> getInstantDetails(HoodieInstant instant) {
return details.apply(instant);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index c3fbd97312..e52a279596 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -305,6 +305,15 @@ public interface HoodieTimeline extends Serializable {
*/
boolean isBeforeTimelineStarts(String ts);
+ /**
+ * First non-savepoint commit in the active data timeline. Examples:
+ * 1. An active data timeline C1, C2, C3, C4, C5 returns C1.
+ * 2. If archival is allowed beyond savepoint and let's say C1, C2, C4 have been archived
+ * while C3, C5 have been savepointed, then for the data timeline
+ * C3, C3_Savepoint, C5, C5_Savepoint, C6, C7 returns C6.
+ */
+ Option<HoodieInstant> getFirstNonSavepointCommit();
+
/**
* Read the completed instant details.
*/
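To make example 2 in the javadoc above concrete, here is a hedged, self-contained sketch of the same selection rule over plain (timestamp, action) pairs (assumes a recent JDK for records; the Hudi types are deliberately not used):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    class FirstNonSavepointCommitSketch {
      record Instant(String timestamp, String action) {}

      static Optional<Instant> firstNonSavepointCommit(List<Instant> instants) {
        // Collect the timestamps of all savepoint instants.
        Set<String> savepointed = new HashSet<>();
        for (Instant i : instants) {
          if (i.action().equals("savepoint")) {
            savepointed.add(i.timestamp());
          }
        }
        // First instant whose timestamp is not savepointed.
        return instants.stream()
            .filter(i -> !savepointed.contains(i.timestamp()))
            .findFirst();
      }

      public static void main(String[] args) {
        List<Instant> timeline = List.of(
            new Instant("03", "commit"), new Instant("03", "savepoint"),
            new Instant("05", "commit"), new Instant("05", "savepoint"),
            new Instant("06", "commit"), new Instant("07", "commit"));
        // Prints 06, matching example 2 in the javadoc above.
        System.out.println(firstNonSavepointCommit(timeline).get().timestamp());
      }
    }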
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
index 8ea9ad94ab..91a2019f10 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
@@ -18,10 +18,15 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.MockHoodieTimeline;
+
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,4 +52,25 @@ public class TestHoodieFileGroup {
assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("001"));
assertTrue((new HoodieFileGroup(fileGroup)).getLatestFileSlice().get().getBaseInstantTime().equals("001"));
}
+
+ @Test
+ public void testCommittedFileSlicesWithSavepointAndHoles() {
+ MockHoodieTimeline activeTimeline = new MockHoodieTimeline(Stream.of(
+ new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+ new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+ new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+ new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+ new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well
+ ).collect(Collectors.toList()));
+ HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", activeTimeline.filterCompletedAndCompactionInstants());
+ for (int i = 0; i < 7; i++) {
+ HoodieBaseFile baseFile = new HoodieBaseFile("data_1_0" + i);
+ fileGroup.addBaseFile(baseFile);
+ }
+ List<FileSlice> allFileSlices = fileGroup.getAllFileSlices().collect(Collectors.toList());
+ assertEquals(6, allFileSlices.size());
+ assertTrue(!allFileSlices.stream().anyMatch(s -> s.getBaseInstantTime().equals("06")));
+ assertEquals(7, fileGroup.getAllFileSlicesIncludingInflight().count());
+ assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("05"));
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 5692337471..628aeb8e80 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -262,6 +262,57 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
assertEquals(instant7.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
}
+ @Test
+ public void testTimelineWithSavepointAndHoles() {
+ timeline = new MockHoodieTimeline(Stream.of(
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well
+ ).collect(Collectors.toList()));
+ assertTrue(timeline.isBeforeTimelineStarts("00"));
+ assertTrue(timeline.isBeforeTimelineStarts("01"));
+ assertTrue(timeline.isBeforeTimelineStarts("02"));
+ assertTrue(timeline.isBeforeTimelineStarts("03"));
+ assertTrue(timeline.isBeforeTimelineStarts("04"));
+ assertFalse(timeline.isBeforeTimelineStarts("05"));
+ assertFalse(timeline.isBeforeTimelineStarts("06"));
+
+ // with an inflight savepoint in between
+ timeline = new MockHoodieTimeline(Stream.of(
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+ new HoodieInstant(State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05")
+ ).collect(Collectors.toList()));
+ assertTrue(timeline.isBeforeTimelineStarts("00"));
+ assertTrue(timeline.isBeforeTimelineStarts("01"));
+ assertTrue(timeline.isBeforeTimelineStarts("02"));
+ assertTrue(timeline.isBeforeTimelineStarts("03"));
+ assertTrue(timeline.isBeforeTimelineStarts("04"));
+ assertFalse(timeline.isBeforeTimelineStarts("05"));
+ assertFalse(timeline.isBeforeTimelineStarts("06"));
+
+ // with a pending replacecommit after savepoints
+ timeline = new MockHoodieTimeline(Stream.of(
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+ new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05"),
+ new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, "06")
+ ).collect(Collectors.toList()));
+ assertTrue(timeline.isBeforeTimelineStarts("00"));
+ assertTrue(timeline.isBeforeTimelineStarts("01"));
+ assertTrue(timeline.isBeforeTimelineStarts("02"));
+ assertTrue(timeline.isBeforeTimelineStarts("03"));
+ assertTrue(timeline.isBeforeTimelineStarts("04"));
+ assertFalse(timeline.isBeforeTimelineStarts("05"));
+ assertFalse(timeline.isBeforeTimelineStarts("06"));
+ }
+
@Test
public void testTimelineGetOperations() {
List<HoodieInstant> allInstants = getAllInstants();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 290753ef52..f631ec94b0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
@@ -156,6 +158,10 @@ public class FileCreateUtils {
}
}
+ public static void createSavepointCommit(String basePath, String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+ createMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata).get());
+ }
+
public static void createCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION, fs);
}
@@ -285,6 +291,10 @@ public class FileCreateUtils {
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
}
+ public static void createInflightSavepoint(String basePath, String instantTime) throws IOException {
+ createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION);
+ }
+
public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);
@@ -439,4 +449,9 @@ public class FileCreateUtils {
public static void deleteDeltaCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
deleteMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION, fs);
}
+
+ public static void deleteSavepointCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
+ deleteMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION, fs);
+ deleteMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, fs);
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 412a69c94c..1351d16812 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -99,6 +99,7 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCom
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightSavepoint;
import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
@@ -109,6 +110,8 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRe
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRollbackFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRestoreFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createSavepointCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.deleteSavepointCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
@@ -199,6 +202,12 @@ public class HoodieTestTable {
return this;
}
+ public HoodieTestTable addSavepointCommit(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+ createInflightSavepoint(basePath, instantTime);
+ createSavepointCommit(basePath, instantTime, savepointMetadata);
+ return this;
+ }
+
public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime,
HoodieTestTableState testTableState) {
String actionType = getCommitActionType(operationType, metaClient.getTableType());
@@ -394,7 +403,7 @@ public class HoodieTestTable {
public HoodieSavepointMetadata getSavepointMetadata(String instant, Map<String, List<String>> partitionToFilesMeta) {
HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata();
- savepointMetadata.setSavepointedAt(Long.valueOf(instant));
+ savepointMetadata.setSavepointedAt(12345L);
Map<String, HoodieSavepointPartitionMetadata> partitionMetadataMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry : partitionToFilesMeta.entrySet()) {
HoodieSavepointPartitionMetadata savepointPartitionMetadata = new HoodieSavepointPartitionMetadata();
@@ -404,6 +413,7 @@ public class HoodieTestTable {
}
savepointMetadata.setPartitionMetadata(partitionMetadataMap);
savepointMetadata.setSavepointedBy("test");
+ savepointMetadata.setComments("test_comment");
return savepointMetadata;
}
@@ -454,6 +464,17 @@ public class HoodieTestTable {
return this;
}
+ public HoodieTestTable addSavepoint(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+ createInflightSavepoint(basePath, instantTime);
+ createSavepointCommit(basePath, instantTime, savepointMetadata);
+ return this;
+ }
+
+ public HoodieTestTable deleteSavepoint(String instantTime) throws IOException {
+ deleteSavepointCommit(basePath, instantTime, fs);
+ return this;
+ }
+
public HoodieTestTable forCommit(String instantTime) {
currentInstantTime = instantTime;
return this;
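For reference, the new savepoint helpers are intended to compose as in the archiver test earlier in this patch; a hedged sketch of the call sequence (the testTable setup and the archival runs are assumed, not shown here):

    // Savepoint commit "00000003" so that archival must respect it.
    String instant = "00000003";
    HoodieSavepointMetadata savepointMetadata = testTable.doSavepoint(instant);
    testTable.addSavepoint(instant, savepointMetadata);
    // ... run archival and assert the savepointed commit is retained ...
    testTable.deleteSavepoint(instant);
    // ... run archival again and assert the commit can now be archived ...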
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
index 5da6b325f3..4014531809 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import java.util.Comparator;
+import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -38,4 +39,9 @@ public class MockHoodieTimeline extends HoodieActiveTimeline {
inflights.map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s)))
.sorted(Comparator.comparing(HoodieInstant::getFileName)).collect(Collectors.toList()));
}
+
+ public MockHoodieTimeline(List<HoodieInstant> instants) {
+ super();
+ this.setInstants(instants);
+ }
}