This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e7ac45735 [HUDI-3884] Support archival beyond savepoint commits (#5837)
6e7ac45735 is described below

commit 6e7ac457352e007939ba3c44c9dc197de7b88ed3
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Jul 26 00:12:29 2022 +0530

    [HUDI-3884] Support archival beyond savepoint commits (#5837)
    
    
    Co-authored-by: sivabalan <[email protected]>
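
    The behavior is controlled by the new write config "hoodie.archive.beyond.savepoint"
    (default: false, since 0.12.0). A minimal sketch of enabling it through the config
    builders touched by this change (the table name and base path are illustrative):

        // assumes basePath points at an existing Hudi table
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .forTable("trips")
            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
                .archiveCommitsWith(2, 5)             // min/max commits to keep on the active timeline
                .withArchiveBeyondSavepoint(true)     // skip savepointed commits instead of stopping at the first one
                .build())
            .build();

    With the flag enabled, archival skips savepointed commits and proceeds past them;
    with it disabled (the default), archival still stops at the earliest savepoint commit.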
---
 .../apache/hudi/client/HoodieTimelineArchiver.java |  55 +++++---
 .../apache/hudi/config/HoodieArchivalConfig.java   |  21 ++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   6 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   6 +-
 .../hudi/table/action/clean/CleanPlanner.java      |   6 +-
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 144 ++++++++++++++++++---
 .../apache/hudi/common/model/HoodieFileGroup.java  |  20 +--
 .../table/timeline/HoodieDefaultTimeline.java      |  36 ++++--
 .../hudi/common/table/timeline/HoodieTimeline.java |   9 ++
 .../hudi/common/model/TestHoodieFileGroup.java     |  26 ++++
 .../table/timeline/TestHoodieActiveTimeline.java   |  51 ++++++++
 .../hudi/common/testutils/FileCreateUtils.java     |  15 +++
 .../hudi/common/testutils/HoodieTestTable.java     |  23 +++-
 .../hudi/common/testutils/MockHoodieTimeline.java  |   6 +
 14 files changed, 364 insertions(+), 60 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c53554d8e0..2992f4abd4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -64,6 +64,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -76,12 +77,14 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
@@ -409,9 +412,11 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
             .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
            .filterInflights().firstInstant();
 
-    // We cannot have any holes in the commit timeline. We cannot archive any commits which are
-    // made after the first savepoint present.
+    // NOTE: We cannot have any holes in the commit timeline.
+    // We cannot archive any commits which are made after the first savepoint present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+    Set<String> savepointTimestamps = table.getSavepointTimestamps();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
       // For Merge-On-Read table, inline or async compaction is enabled
       // We need to make sure that there are enough delta commits in the active timeline
@@ -428,28 +433,33 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
-            // if no savepoint present, then don't filter
-            return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+            if (config.shouldArchiveBeyondSavepoint()) {
+              // skip savepoint commits and proceed further
+              return !savepointTimestamps.contains(s.getTimestamp());
+            } else {
+              // if no savepoint present, then don't filter
+              // stop at first savepoint commit
+              return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+            }
           }).filter(s -> {
             // Ensure commits >= oldest pending compaction commit is retained
             return oldestPendingCompactionAndReplaceInstant
-                .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
           }).filter(s -> {
             // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
             // get archived, i.e, instants after the oldestInflight are retained on the timeline
             if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
               return oldestInflightCommitInstant.map(instant ->
-                      HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                      compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                   .orElse(true);
             }
             return true;
           }).filter(s ->
               oldestInstantToRetainForCompaction.map(instantToRetain ->
-                      HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+                      compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
-
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
       return Stream.empty();
@@ -479,7 +489,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
           instants = Stream.empty();
         } else {
           LOG.info("Limiting archiving of instants to latest compaction on 
metadata table at " + latestCompactionTime.get());
-          instants = instants.filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.LESSER_THAN,
+          instants = instants.filter(instant -> 
compareTimestamps(instant.getTimestamp(), LESSER_THAN,
               latestCompactionTime.get()));
         }
       } catch (Exception e) {
@@ -487,18 +497,29 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
       }
     }
 
-    // If this is a metadata table, do not archive the commits that live in data set
-    // active timeline. This is required by metadata table,
-    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
     if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
           .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
           .setConf(metaClient.getHadoopConf())
           .build();
-      Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
-      if (earliestActiveDatasetCommit.isPresent()) {
-        instants = instants.filter(instant ->
-            HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
+      Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
+
+      if (config.shouldArchiveBeyondSavepoint()) {
+        // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+        // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
+        Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
+        if (firstNonSavepointCommit.isPresent()) {
+          String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
+          instants = instants.filter(instant ->
+              compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
+        }
+      } else {
+        // Do not archive the commits that live in data set active timeline.
+        // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
+        if (earliestActiveDatasetCommit.isPresent()) {
+          instants = instants.filter(instant ->
+              compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
+        }
+        }
       }
     }
 
@@ -589,7 +610,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     }
 
     List<HoodieInstant> instantsToBeDeleted =
-        instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
+        instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(),
             LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
 
     for (HoodieInstant deleteInstant : instantsToBeDeleted) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 32bccc3a3d..3244b42283 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 
 import javax.annotation.concurrent.Immutable;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -34,8 +35,8 @@ import java.util.Properties;
  */
 @Immutable
 @ConfigClassProperty(name = "Archival Configs",
-        groupName = ConfigGroups.Names.WRITE_CLIENT,
-        description = "Configurations that control archival.")
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that control archival.")
 public class HoodieArchivalConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
@@ -92,6 +93,13 @@ public class HoodieArchivalConfig extends HoodieConfig {
       .withDocumentation("When enable, hoodie will auto merge several small 
archive files into larger one. It's"
           + " useful when storage scheme doesn't support append operation.");
 
+  public static final ConfigProperty<Boolean> ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
+      .key("hoodie.archive.beyond.savepoint")
+      .defaultValue(false)
+      .sinceVersion("0.12.0")
+      .withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits. "
+          + "If disabled, archival will stop at the earliest savepoint commit.");
+
   /**
    * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
    */
@@ -107,7 +115,9 @@ public class HoodieArchivalConfig extends HoodieConfig {
    */
   @Deprecated
   public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
-  /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
+  /**
+   * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+   */
   @Deprecated
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
   /**
@@ -186,6 +196,11 @@ public class HoodieArchivalConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
+      archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint));
+      return this;
+    }
+
     public HoodieArchivalConfig build() {
       archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
       return archivalConfig;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f787232c50..4902c3861f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1209,7 +1209,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean getArchiveMergeEnable() {
-    return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
+    return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
+  }
+
+  public boolean shouldArchiveBeyondSavepoint() {
+    return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
   }
 
   public long getArchiveMergeSmallFileLimitBytes() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 1e68f820d9..5ca3aee764 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -368,10 +368,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   }
 
   /**
-   * Get the list of savepoints in this table.
+   * Get the list of savepoint timestamps in this table.
    */
-  public List<String> getSavepoints() {
-    return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+  public Set<String> getSavepointTimestamps() {
+    return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
   }
 
   public HoodieActiveTimeline getActiveTimeline() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 79eef43b3c..f837d08afc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -104,7 +104,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * Get the list of data file names savepointed.
    */
   public Stream<String> getSavepointedDataFiles(String savepointTime) {
-    if (!hoodieTable.getSavepoints().contains(savepointTime)) {
+    if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
       throw new HoodieSavepointException(
           "Could not get data files for savepoint " + savepointTime + ". No 
such savepoint.");
     }
@@ -227,7 +227,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
         + " file versions. ");
     List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
-    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+    List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
 
@@ -295,7 +295,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
-    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+    List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 4f41c4a44d..0af05b2d6b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
@@ -44,9 +45,9 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -65,7 +66,9 @@ import org.apache.log4j.Logger;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -88,6 +91,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
+import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -180,6 +184,33 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
                                                            long size,
                                                            
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                            
WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+    return initTestTableAndGetWriteConfig(
+        enableMetadata,
+        minArchivalCommits,
+        maxArchivalCommits,
+        maxDeltaCommits,
+        maxDeltaCommitsMetadataTable,
+        tableType,
+        enableArchiveMerge,
+        archiveFilesBatch,
+        size,
+        failedWritesCleaningPolicy,
+        writeConcurrencyMode,
+        ARCHIVE_BEYOND_SAVEPOINT.defaultValue());
+  }
+
+  private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean 
enableMetadata,
+                                                           int 
minArchivalCommits,
+                                                           int 
maxArchivalCommits,
+                                                           int maxDeltaCommits,
+                                                           int 
maxDeltaCommitsMetadataTable,
+                                                           HoodieTableType 
tableType,
+                                                           boolean 
enableArchiveMerge,
+                                                           int 
archiveFilesBatch,
+                                                           long size,
+                                                           
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                           
WriteConcurrencyMode writeConcurrencyMode,
+                                                           boolean 
archiveProceedBeyondSavepoints) throws Exception {
     init(tableType);
     HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(basePath)
         
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
@@ -188,7 +219,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
             .withArchiveMergeEnable(enableArchiveMerge)
             .withArchiveMergeFilesBatchSize(archiveFilesBatch)
             .withArchiveMergeSmallFileLimit(size)
-            .archiveCommitsWith(minArchivalCommits, 
maxArchivalCommits).build())
+            .archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
+            
.withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -249,6 +281,59 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws 
Exception {
+    boolean enableMetadata = false;
+    HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, 
HoodieTableType.COPY_ON_WRITE,
+        false, 10, 209715200, HoodieFailedWritesCleaningPolicy.EAGER, 
WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
+
+    // min archival commits is 2 and max archival commits is 4. and so, after 
5th commit, 3 commits will be archived.
+    for (int i = 1; i < 5; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : 
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+    }
+
+    // savepoint 3rd commit
+    String commitToSavepoint = String.format("%08d", 3);
+    HoodieSavepointMetadata savepointMetadata = 
testTable.doSavepoint(commitToSavepoint);
+    testTable.addSavepoint(commitToSavepoint, savepointMetadata);
+
+    for (int i = 5; i < 7; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 
2);
+    }
+    // trigger archival
+    Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+    List<HoodieInstant> originalCommits = commitsList.getKey();
+    List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+    if (archiveBeyondSavepoint) {
+      // retains only 2 commits, C3 and C6, and the savepointed commit for C3.
+      verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000004", "00000005")),
+          Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000006")).stream(), getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
+              .collect(Collectors.toList()), commitsAfterArchival);
+    } else {
+      // archives only C1 and C2. stops at first savepointed commit C3.
+      verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
+          Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000005", "00000006")).stream(),
+                  
getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
+              .collect(Collectors.toList()), commitsAfterArchival);
+    }
+
+    for (int i = 7; i < 10; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 
2);
+    }
+
+    // once savepoint is removed. C3 will be archived.
+    testTable.deleteSavepoint(commitToSavepoint);
+    commitsList = archiveAndGetCommitsList(writeConfig);
+    originalCommits = commitsList.getKey();
+    commitsAfterArchival = commitsList.getValue();
+
+    metaClient.reloadActiveTimeline();
+    verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002","00000003", "00000004", "00000005", "00000006", "00000007")),
+        getActiveCommitInstants(Arrays.asList("00000008", "00000009")), 
commitsAfterArchival);
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean 
enableArchiveMerge) throws Exception {
@@ -563,13 +648,22 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     assertEquals(originalCommits, commitsAfterArchival);
   }
 
+  private static Stream<Arguments> archiveCommitSavepointNoHoleParams() {
+    return Arrays.stream(new Boolean[][] {
+        {true, true},
+        {false, true},
+        {true, false},
+        {false, false}
+    }).map(Arguments::of);
+  }
+
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) 
throws Exception {
+  @MethodSource("archiveCommitSavepointNoHoleParams")
+  public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable, 
boolean archiveBeyondSavepoint) throws Exception {
     init();
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 
2).forTable("test-trip-table")
-        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 
5).build())
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 
5).withArchiveBeyondSavepoint(archiveBeyondSavepoint).build())
         
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
@@ -596,14 +690,30 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count 
should match");
     assertTrue(archiver.archiveIfRequired(context));
     timeline = 
metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
-    assertEquals(5, timeline.countInstants(),
-        "Since we have a savepoint at 101, we should never archive any commit 
after 101 (we only archive 100)");
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "101")),
-        "Archived commits should always be safe");
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "102")),
-        "Archived commits should always be safe");
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "103")),
-        "Archived commits should always be safe");
+    if (archiveBeyondSavepoint) {
+      // commits in active timeline = 101 and 105.
+      assertEquals(2, timeline.countInstants(),
+          "Since archiveBeyondSavepoint config is enabled, we will archive 
commits 102, 103 ");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "101")),
+          "Savepointed commits should always be safe");
+      assertFalse(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "102")),
+          "102 expected to be archived");
+      assertFalse(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "103")),
+          "103 expected to be archived");
+      assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "104")),
+          "104 expected to be archived");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")),
+          "105 expected to be in active timeline");
+    } else {
+      assertEquals(5, timeline.countInstants(),
+          "Since we have a savepoint at 101, we should never archive any 
commit after 101 (we only archive 100)");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "101")),
+          "Archived commits should always be safe");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "102")),
+          "Archived commits should always be safe");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "103")),
+          "Archived commits should always be safe");
+    }
   }
 
   @ParameterizedTest
@@ -934,7 +1044,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     HoodieInstant firstInstant = 
metaClient.reloadActiveTimeline().firstInstant().get();
     expectedArchivedInstants = expectedArchivedInstants.stream()
         .filter(entry -> 
HoodieTimeline.compareTimestamps(entry.getTimestamp(), 
HoodieTimeline.LESSER_THAN, firstInstant.getTimestamp()
-    )).collect(Collectors.toList());
+        )).collect(Collectors.toList());
     expectedArchivedInstants.forEach(entry -> 
assertTrue(metaClient.getArchivedTimeline().containsInstant(entry)));
   }
 
@@ -1283,7 +1393,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline();
     expectedArchivedInstants.forEach(entry -> {
           // check safety
-          if (entry.getAction() != HoodieTimeline.ROLLBACK_ACTION) {
+          if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
             
assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()), 
"Archived commits should always be safe");
           }
         }
@@ -1315,6 +1425,10 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     return getActiveCommitInstants(commitTimes, HoodieTimeline.COMMIT_ACTION);
   }
 
+  private List<HoodieInstant> getActiveSavepointedCommitInstants(List<String> 
commitTimes) {
+    return getActiveCommitInstants(commitTimes, 
HoodieTimeline.SAVEPOINT_ACTION);
+  }
+
   private List<HoodieInstant> getActiveCommitInstants(List<String> 
commitTimes, String action) {
     List<HoodieInstant> allInstants = new ArrayList<>();
     commitTimes.forEach(entry -> allInstants.add(new 
HoodieInstant(State.COMPLETED, action, entry)));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index bf6ce611e8..9e407aa766 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -28,6 +28,10 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
+
 /**
  * A set of data/base files + set of log files, that make up an unit for all operations.
  */
@@ -118,21 +122,22 @@ public class HoodieFileGroup implements Serializable {
    * some log files, that are based off a commit or delta commit.
    */
   private boolean isFileSliceCommitted(FileSlice slice) {
-    String maxCommitTime = lastInstant.get().getTimestamp();
-    return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime())
-        && HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime);
+    if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
+      return false;
+    }
 
+    return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime());
   }
 
   /**
-   * Get all the the file slices including in-flight ones as seen in underlying file-system.
+   * Get all the file slices including in-flight ones as seen in underlying file system.
    */
   public Stream<FileSlice> getAllFileSlicesIncludingInflight() {
     return fileSlices.values().stream();
   }
 
   /**
-   * Get latest file slices including in-flight ones.
+   * Get the latest file slices including inflight ones.
    */
   public Option<FileSlice> getLatestFileSlicesIncludingInflight() {
     return Option.fromJavaOptional(getAllFileSlicesIncludingInflight().findFirst());
@@ -169,8 +174,7 @@ public class HoodieFileGroup implements Serializable {
    * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime.
    */
   public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
-    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline
-        .compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
+    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
   }
 
   /**
@@ -181,7 +185,7 @@ public class HoodieFileGroup implements Serializable {
    */
   public Option<FileSlice> getLatestFileSliceBefore(String maxInstantTime) {
     return Option.fromJavaOptional(getAllFileSlices().filter(
-        slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, maxInstantTime))
+        slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN, maxInstantTime))
         .findFirst());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index ac1dd007d0..e7970baf67 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.Collections.reverse;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * HoodieDefaultTimeline is a default implementation of the HoodieTimeline. It provides methods to inspect a
@@ -118,7 +119,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     Option<HoodieInstant> earliestPending = getWriteTimeline().filterInflightsAndRequested().firstInstant();
     if (earliestPending.isPresent()) {
       return getWriteTimeline().filterCompletedInstants()
-          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp()));
+          .filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp()));
     }
     return getWriteTimeline().filterCompletedInstants();
   }
@@ -156,34 +157,34 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   @Override
   public HoodieDefaultTimeline findInstantsAfter(String instantTime, int 
numCommits) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN, instantTime)).limit(numCommits),
+        .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, 
instantTime)).limit(numCommits),
         details);
   }
 
   @Override
   public HoodieTimeline findInstantsAfter(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN, instantTime)), details);
+        .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, 
instantTime)), details);
   }
 
   @Override
   public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, 
int numCommits) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, commitTime))
+        .filter(s -> compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, commitTime))
         .limit(numCommits), details);
   }
 
   @Override
   public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
-            .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, instantTime)),
+            .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instantTime)),
             details);
   }
 
   @Override
   public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN_OR_EQUALS, instantTime)),
+        .filter(s -> compareTimestamps(s.getTimestamp(), 
LESSER_THAN_OR_EQUALS, instantTime)),
         details);
   }
 
@@ -362,11 +363,28 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 
   @Override
   public boolean isBeforeTimelineStarts(String instant) {
-    Option<HoodieInstant> firstCommit = firstInstant();
-    return firstCommit.isPresent()
-        && HoodieTimeline.compareTimestamps(instant, LESSER_THAN, firstCommit.get().getTimestamp());
+    Option<HoodieInstant> firstNonSavepointCommit = getFirstNonSavepointCommit();
+    return firstNonSavepointCommit.isPresent()
+        && compareTimestamps(instant, LESSER_THAN, firstNonSavepointCommit.get().getTimestamp());
   }
 
+  public Option<HoodieInstant> getFirstNonSavepointCommit() {
+    Option<HoodieInstant> firstCommit = firstInstant();
+    Set<String> savepointTimestamps = instants.stream()
+        .filter(entry -> entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+    Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
+    if (!savepointTimestamps.isEmpty()) {
+      // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+      // So, the first non-savepoint commit is considered as beginning of the active timeline.
+      firstNonSavepointCommit = Option.fromJavaOptional(instants.stream()
+          .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+          .findFirst());
+    }
+    return firstNonSavepointCommit;
+  }
+  
   @Override
   public Option<byte[]> getInstantDetails(HoodieInstant instant) {
     return details.apply(instant);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index c3fbd97312..e52a279596 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -305,6 +305,15 @@ public interface HoodieTimeline extends Serializable {
    */
   boolean isBeforeTimelineStarts(String ts);
 
+  /**
+   * First non-savepoint commit in the active data timeline. Examples:
+   * 1. An active data timeline C1, C2, C3, C4, C5 returns C1.
+   * 2. If archival is allowed beyond savepoint and let's say C1, C2, C4 have been archived
+   * while C3, C5 have been savepointed, then for the data timeline
+   * C3, C3_Savepoint, C5, C5_Savepoint, C6, C7 returns C6.
+   */
+  Option<HoodieInstant> getFirstNonSavepointCommit();
+
   /**
    * Read the completed instant details.
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
index 8ea9ad94ab..91a2019f10 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
@@ -18,10 +18,15 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.MockHoodieTimeline;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,4 +52,25 @@ public class TestHoodieFileGroup {
     
assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("001"));
     assertTrue((new 
HoodieFileGroup(fileGroup)).getLatestFileSlice().get().getBaseInstantTime().equals("001"));
   }
+
+  @Test
+  public void testCommittedFileSlicesWithSavepointAndHoles() {
+    MockHoodieTimeline activeTimeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.SAVEPOINT_ACTION, "01"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.SAVEPOINT_ACTION, "03"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT 
as well
+    ).collect(Collectors.toList()));
+    HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", 
activeTimeline.filterCompletedAndCompactionInstants());
+    for (int i = 0; i < 7; i++) {
+      HoodieBaseFile baseFile = new HoodieBaseFile("data_1_0" + i);
+      fileGroup.addBaseFile(baseFile);
+    }
+    List<FileSlice> allFileSlices = 
fileGroup.getAllFileSlices().collect(Collectors.toList());
+    assertEquals(6, allFileSlices.size());
+    assertTrue(!allFileSlices.stream().anyMatch(s -> 
s.getBaseInstantTime().equals("06")));
+    assertEquals(7, fileGroup.getAllFileSlicesIncludingInflight().count());
+    
assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("05"));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 5692337471..628aeb8e80 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -262,6 +262,57 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     assertEquals(instant7.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
   }
 
+  @Test
+  public void testTimelineWithSavepointAndHoles() {
+    timeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, 
"01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, 
"03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") 
// this can be DELTA_COMMIT/REPLACE_COMMIT as well
+    ).collect(Collectors.toList()));
+    assertTrue(timeline.isBeforeTimelineStarts("00"));
+    assertTrue(timeline.isBeforeTimelineStarts("01"));
+    assertTrue(timeline.isBeforeTimelineStarts("02"));
+    assertTrue(timeline.isBeforeTimelineStarts("03"));
+    assertTrue(timeline.isBeforeTimelineStarts("04"));
+    assertFalse(timeline.isBeforeTimelineStarts("05"));
+    assertFalse(timeline.isBeforeTimelineStarts("06"));
+
+    // with an inflight savepoint in between
+    timeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, 
"01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, 
"03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05")
+    ).collect(Collectors.toList()));
+    assertTrue(timeline.isBeforeTimelineStarts("00"));
+    assertTrue(timeline.isBeforeTimelineStarts("01"));
+    assertTrue(timeline.isBeforeTimelineStarts("02"));
+    assertTrue(timeline.isBeforeTimelineStarts("03"));
+    assertTrue(timeline.isBeforeTimelineStarts("04"));
+    assertFalse(timeline.isBeforeTimelineStarts("05"));
+    assertFalse(timeline.isBeforeTimelineStarts("06"));
+
+    // with a pending replacecommit after savepoints
+    timeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, 
"01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, 
"03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05"),
+        new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, "06")
+    ).collect(Collectors.toList()));
+    assertTrue(timeline.isBeforeTimelineStarts("00"));
+    assertTrue(timeline.isBeforeTimelineStarts("01"));
+    assertTrue(timeline.isBeforeTimelineStarts("02"));
+    assertTrue(timeline.isBeforeTimelineStarts("03"));
+    assertTrue(timeline.isBeforeTimelineStarts("04"));
+    assertFalse(timeline.isBeforeTimelineStarts("05"));
+    assertFalse(timeline.isBeforeTimelineStarts("06"));
+  }
+
   @Test
   public void testTimelineGetOperations() {
     List<HoodieInstant> allInstants = getAllInstants();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 290753ef52..f631ec94b0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
@@ -156,6 +158,10 @@ public class FileCreateUtils {
     }
   }
 
+  public static void createSavepointCommit(String basePath, String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata).get());
+  }
+
   public static void createCommit(String basePath, String instantTime, 
FileSystem fs) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION, fs);
   }
@@ -285,6 +291,10 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, 
HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
   }
 
+  public static void createInflightSavepoint(String basePath, String instantTime) throws IOException {
+    createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION);
+  }
+
   public static void createPartitionMetaFile(String basePath, String 
partitionPath) throws IOException {
     Path parentPath = Paths.get(basePath, partitionPath);
     Files.createDirectories(parentPath);
@@ -439,4 +449,9 @@ public class FileCreateUtils {
   public static void deleteDeltaCommit(String basePath, String instantTime, 
FileSystem fs) throws IOException {
     deleteMetaFile(basePath, instantTime, 
HoodieTimeline.DELTA_COMMIT_EXTENSION, fs);
   }
+
+  public static void deleteSavepointCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
+    deleteMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION, fs);
+    deleteMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, fs);
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 412a69c94c..1351d16812 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -99,6 +99,7 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCom
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightSavepoint;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
@@ -109,6 +110,8 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRe
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRollbackFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRestoreFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createSavepointCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.deleteSavepointCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
 import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
@@ -199,6 +202,12 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable addSavepointCommit(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+    createInflightSavepoint(basePath, instantTime);
+    createSavepointCommit(basePath, instantTime, savepointMetadata);
+    return this;
+  }
+
   public HoodieCommitMetadata createCommitMetadata(WriteOperationType 
operationType, String commitTime,
                                                    HoodieTestTableState 
testTableState) {
     String actionType = getCommitActionType(operationType, 
metaClient.getTableType());
@@ -394,7 +403,7 @@ public class HoodieTestTable {
 
   public HoodieSavepointMetadata getSavepointMetadata(String instant, 
Map<String, List<String>> partitionToFilesMeta) {
     HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata();
-    savepointMetadata.setSavepointedAt(Long.valueOf(instant));
+    savepointMetadata.setSavepointedAt(12345L);
     Map<String, HoodieSavepointPartitionMetadata> partitionMetadataMap = new 
HashMap<>();
     for (Map.Entry<String, List<String>> entry : 
partitionToFilesMeta.entrySet()) {
       HoodieSavepointPartitionMetadata savepointPartitionMetadata = new 
HoodieSavepointPartitionMetadata();
@@ -404,6 +413,7 @@ public class HoodieTestTable {
     }
     savepointMetadata.setPartitionMetadata(partitionMetadataMap);
     savepointMetadata.setSavepointedBy("test");
+    savepointMetadata.setComments("test_comment");
     return savepointMetadata;
   }
 
@@ -454,6 +464,17 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable addSavepoint(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+    createInflightSavepoint(basePath, instantTime);
+    createSavepointCommit(basePath, instantTime, savepointMetadata);
+    return this;
+  }
+
+  public HoodieTestTable deleteSavepoint(String instantTime) throws IOException {
+    deleteSavepointCommit(basePath, instantTime, fs);
+    return this;
+  }
+
   public HoodieTestTable forCommit(String instantTime) {
     currentInstantTime = instantTime;
     return this;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
index 5da6b325f3..4014531809 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 
 import java.util.Comparator;
+import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -38,4 +39,9 @@ public class MockHoodieTimeline extends HoodieActiveTimeline {
             inflights.map(s -> new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, s)))
         
.sorted(Comparator.comparing(HoodieInstant::getFileName)).collect(Collectors.toList()));
   }
+
+  public MockHoodieTimeline(List<HoodieInstant> instants) {
+    super();
+    this.setInstants(instants);
+  }
 }
