nsivabalan commented on a change in pull request #4974:
URL: https://github.com/apache/hudi/pull/4974#discussion_r822159191



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
##########
@@ -395,6 +399,18 @@ public void mergeArchiveFiles(List<FileStatus> 
compactCandidate) throws IOExcept
     // made after the first savepoint present.
     Option<HoodieInstant> firstSavepoint = 
table.getCompletedSavepointTimeline().firstInstant();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > 
maxInstantsToKeep) {
+      // For Merge-On-Read table, inline or async compaction is enabled
+      // We need to make sure that there are enough delta commits in the 
active timeline
+      // to trigger compaction scheduling, when the trigger strategy of 
compaction is
+      // NUM_COMMITS or NUM_AND_TIME.
+      Option<HoodieInstant> oldestInstantToKeepForCompaction =

Review comment:
       minor: consider renaming `oldestInstantToKeepForCompaction` to `oldestInstantToRetain`

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -946,6 +961,152 @@ public void 
testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean 
enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, even after 8 delta commits, 
since the number of delta
+    // commits is still smaller than 8, the archival should not kick in.
+    // The archival should only kick in after the 9th delta commit
+    // instant "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i 
== 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), 
Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instant "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i 
== 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), 
Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      // first 9 delta commits before the completed compaction should be 
archived
+      IntStream.range(1, 10).forEach(j ->

Review comment:
       I guess we are doing this assertion for every iteration from 11 to 19; we can 
probably do it just once, at iteration 11.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
##########
@@ -128,27 +129,25 @@ private HoodieCompactionPlan scheduleCompaction() {
     return new HoodieCompactionPlan();
   }
 
-  private Pair<Integer, String> getLatestDeltaCommitInfo() {
-    Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
-
-    String latestInstantTs;
-    final int deltaCommitsSinceLastCompaction;
-    if (lastCompaction.isPresent()) {
-      latestInstantTs = lastCompaction.get().getTimestamp();
-      deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(latestInstantTs, 
Integer.MAX_VALUE).countInstants();
-    } else {
-      latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
-      deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(latestInstantTs, 
Integer.MAX_VALUE).countInstants();
+  private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        
CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().getTimestamp()));
     }
-    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+    return Option.empty();
   }
 
   private boolean needCompact(CompactionTriggerStrategy 
compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
+    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = 
getLatestDeltaCommitInfo();
+    if (!latestDeltaCommitInfoOption.isPresent()) {
+      return false;

Review comment:
       is this referring to an empty table, where there are no delta commits at all?
   
   does this also cover the scenario where there are no delta commits after the 
last completed compaction?
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
##########
@@ -195,10 +196,76 @@ public static HoodieCompactionPlan 
getCompactionPlan(HoodieTableMetaClient metaC
 
   /**
    * Return all pending compaction instant times.
-   * 
+   *
    * @return
    */
   public static List<HoodieInstant> 
getPendingCompactionInstantTimes(HoodieTableMetaClient metaClient) {
     return 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
   }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest 
completed
+   * compaction commit, the completed compaction commit instant), if the 
latest completed
+   * compaction commit is present; a pair of (timeline containing all the 
delta commits,
+   * the first delta commit instant), if there is no completed compaction 
commit.
+   *
+   * @param activeTimeline Active timeline of a table.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> 
getDeltaCommitsSinceLatestCompaction(
+      HoodieActiveTimeline activeTimeline) {
+    Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
+
+    HoodieInstant latestInstant;
+    if (lastCompaction.isPresent()) {
+      latestInstant = lastCompaction.get();
+      // timeline containing the delta commits after the latest completed 
compaction commit,
+      // and the completed compaction commit instant
+      return Option.of(Pair.of(deltaCommits.findInstantsAfter(
+          latestInstant.getTimestamp(), Integer.MAX_VALUE), 
lastCompaction.get()));
+    } else {
+      if (deltaCommits.countInstants() > 0) {
+        latestInstant = deltaCommits.firstInstant().get();
+        // timeline containing all the delta commits, and the first delta 
commit instant
+        return Option.of(Pair.of(deltaCommits.findInstantsAfterOrEquals(
+            latestInstant.getTimestamp(), Integer.MAX_VALUE), latestInstant));
+      } else {
+        return Option.empty();
+      }
+    }
+  }
+
+  /**
+   * Gets the oldest instant to keep for MOR compaction.
+   * If there is no completed compaction,
+   * num delta commits >= "hoodie.compact.inline.max.delta.commits"
+   * If there is a completed compaction,
+   * num delta commits after latest completed compaction >= 
"hoodie.compact.inline.max.delta.commits"
+   *
+   * @param activeTimeline  Active timeline of a table.
+   * @param maxDeltaCommits Maximum number of delta commits that trigger the 
compaction plan,
+   *                        i.e., "hoodie.compact.inline.max.delta.commits".
+   * @return the oldest instant to keep for MOR compaction.
+   */
+  public static Option<HoodieInstant> getOldestInstantToKeepForCompaction(
+      HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {

Review comment:
       should we not name this as getOldestInstantToKeepFor**Archival**?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -946,6 +961,152 @@ public void 
testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean 
enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, even after 8 delta commits, 
since the number of delta
+    // commits is still smaller than 8, the archival should not kick in.
+    // The archival should only kick in after the 9th delta commit
+    // instant "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i 
== 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), 
Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instant "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i 
== 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), 
Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = 
archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      // first 9 delta commits before the completed compaction should be 
archived
+      IntStream.range(1, 10).forEach(j ->
+          assertFalse(commitsAfterArchival.contains(
+              new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+
+      if (i == 1) {
+        assertEquals(8, originalCommits.size() - commitsAfterArchival.size());
+        // instant from "00000011" should be in the active timeline
+        assertTrue(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000010")));
+        assertTrue(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "00000011")));
+      } else if (i < 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, 
"00000010")));
+        // i == 8 -> ["00000011", "00000018"] should be in the active timeline

Review comment:
       is this comment valid? Please double-check the expected instant range here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to