nsivabalan commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2977749679


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns the most recent log compaction instant from the raw active timeline.
+   *
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return The latest log compaction instant, or empty if no log compaction is present.
+   */
+  public static Option<HoodieInstant> getLastLogCompaction(final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = Option.fromJavaOptional(

Review Comment:
   `lastLogCompactionInstantOpt` 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -160,55 +161,69 @@ private Option<Pair<Integer, String>> getLatestDeltaCommitInfoSinceLastCompactio
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> getLatestDeltaCommitInfoSinceLogCompaction() {
+    HoodieActiveTimeline rawActiveTimeline = table.getMetaClient().getTableFormat()
+        .getTimelineFactory().createActiveTimeline(table.getMetaClient(), false);
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompletedLogCompaction(
+            table.getActiveTimeline().getDeltaCommitTimeline(), rawActiveTimeline);
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().requestedTime()));
+    }
+    return Option.empty();
+  }
+
   private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
-    if (!latestDeltaCommitInfoOption.isPresent()) {
+    Option<Pair<Integer, String>> latestDeltaCommitInfoSinceCompactOption = getLatestDeltaCommitInfoSinceCompaction();
+    if (!latestDeltaCommitInfoSinceCompactOption.isPresent()) {
       return false;
     }
-    Pair<Integer, String> latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
+    Pair<Integer, String> latestDeltaCommitInfoSinceCompact = latestDeltaCommitInfoSinceCompactOption.get();
     if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
-      return true;
+      return needLogCompact(latestDeltaCommitInfoSinceCompact);

Review Comment:
   thnx. this looks neat
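   For readers following the thread: the change above routes LOG_COMPACT scheduling through a threshold check instead of returning `true` unconditionally. A minimal standalone sketch of that kind of gate (the class and method names here are illustrative, not the actual Hudi implementation):

```java
// Standalone sketch of a needLogCompact-style decision: schedule a log
// compaction only once enough delta commits have accumulated since the
// last (log) compaction. "blocksThreshold" stands in for the configured
// LogCompactionBlocksThreshold value; this is not the real Hudi method.
public class LogCompactionGate {

  static boolean needLogCompact(int deltaCommitsSinceLastCompaction, int blocksThreshold) {
    return deltaCommitsSinceLastCompaction >= blocksThreshold;
  }

  public static void main(String[] args) {
    int threshold = 3;
    System.out.println(needLogCompact(2, threshold)); // below threshold: false
    System.out.println(needLogCompact(3, threshold)); // at threshold: true
  }
}
```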



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns the most recent log compaction instant from the raw active timeline.
+   *
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return The latest log compaction instant, or empty if no log compaction is present.
+   */
+  public static Option<HoodieInstant> getLastLogCompaction(final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+    if (!lastLogCompactionInstantOption.isPresent()) {
+      return Option.empty();
+    }
+    String logCompactionTimestamp = lastLogCompactionInstantOption.get().requestedTime();
+    Option<HoodieInstant> completedInstant = Option.fromJavaOptional(
+        rawActiveTimeline
+            .findInstantsInClosedRange(logCompactionTimestamp, logCompactionTimestamp)
+            .getInstantsAsStream()
+            .filter(HoodieInstant::isCompleted)
+            .findFirst()
+    );
+    return completedInstant.isPresent() ? completedInstant : lastLogCompactionInstantOption;
+  }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest completed
+   * log compaction delta commit, the completed log compaction commit instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the delta commits,
+   * the first delta commit instant), if there is no completed log compaction commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompletedLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionOption = getLastLogCompaction(rawActiveTimeline);

Review Comment:
   for longer names, we can shorten the `Option` suffix to `Opt`:
   
   `lastLogCompactionOpt`
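   As a side note for readers, the contract of `getDeltaCommitsSinceLatestCompletedLogCompaction` boils down to keeping the delta-commit instant times that sort after the latest completed log compaction's timestamp. A toy, self-contained sketch of that filtering (the helper below is hypothetical, not a Hudi API):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "delta commits since the latest completed log compaction":
// keep only instant times that sort strictly after the log compaction's
// timestamp. deltaCommitsSince is an illustrative helper, not Hudi code.
public class DeltaCommitsSinceSketch {

  static List<String> deltaCommitsSince(List<String> deltaCommitTimes, String logCompactionTime) {
    List<String> remaining = new ArrayList<>();
    for (String ts : deltaCommitTimes) {
      // Hudi instant times are zero-padded strings, so lexicographic
      // comparison matches chronological order.
      if (ts.compareTo(logCompactionTime) > 0) {
        remaining.add(ts);
      }
    }
    return remaining;
  }
}
```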



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception {
         storageConf, Arrays.asList(dataGen.getPartitionPaths()));
   }
 
+  /**
+   * Verify that log compaction is only scheduled when the number of delta commits since the last
+   * compaction or log compaction meets or exceeds the LogCompactionBlocksThreshold, and that
+   * the counter resets after each log compaction.
+   */
+  @Test
+  public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+    int logCompactionThreshold = 3;
+    HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
+        .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withLogCompactionBlocksThreshold(logCompactionThreshold)
+        .build();
+    HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
+        HoodieIndex.IndexType.INMEMORY).withCompactionConfig(compactionConfig).build();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // Insert
+    int expectedTotalRecs = 100;
+    String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+    insertBatch(config, client, newCommitTime, "000", expectedTotalRecs,
+        SparkRDDWriteClient::insert, false, false, expectedTotalRecs, expectedTotalRecs,
+        1, Option.empty(), INSTANT_GENERATOR);
+
+    // Do enough upserts to trigger compaction (maxNumDeltaCommitsBeforeCompaction=1)
+    String prevCommitTime = newCommitTime;
+    newCommitTime = WriteClientTestUtils.createNewInstantTime();
+    expectedTotalRecs += 50;
+    updateBatch(config, client, newCommitTime, prevCommitTime,
+        Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+        false, false, 50, expectedTotalRecs, 2, config.populateMetaFields(), INSTANT_GENERATOR);
+
+    // Schedule and execute compaction to establish a baseline
+    Option<String> compactionTimeStamp = client.scheduleCompaction(Option.empty());
+    assertTrue(compactionTimeStamp.isPresent());
+    HoodieWriteMetadata result = client.compact(compactionTimeStamp.get());
+    client.commitCompaction(compactionTimeStamp.get(), result, Option.empty());
+
+    // Do (threshold - 1) upserts -- below threshold
+    prevCommitTime = compactionTimeStamp.get();
+    for (int i = 0; i < logCompactionThreshold - 1; i++) {
+      newCommitTime = WriteClientTestUtils.createNewInstantTime();
+      expectedTotalRecs += 50;
+      updateBatch(config, client, newCommitTime, prevCommitTime,
+          Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+          false, false, 50, expectedTotalRecs, i + 3, config.populateMetaFields(), INSTANT_GENERATOR);
+      prevCommitTime = newCommitTime;
+    }
+
+    // Log compaction should NOT be scheduled (below threshold)
+    Option<String> logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty());
+    assertFalse(logCompactionTimeStamp.isPresent(),
+        "Log compaction should not be scheduled when delta commits < threshold");
+
+    // One more upsert to reach the threshold
+    newCommitTime = WriteClientTestUtils.createNewInstantTime();
+    expectedTotalRecs += 50;
+    updateBatch(config, client, newCommitTime, prevCommitTime,
+        Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+        false, false, 50, expectedTotalRecs, logCompactionThreshold + 2, config.populateMetaFields(), INSTANT_GENERATOR);
+    prevCommitTime = newCommitTime;
+
+    // Log compaction SHOULD be scheduled now (at threshold)
+    logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty());
+    assertTrue(logCompactionTimeStamp.isPresent(),
+        "Log compaction should be scheduled when delta commits >= threshold");
+    result = client.logCompact(logCompactionTimeStamp.get());
+    client.commitLogCompaction(logCompactionTimeStamp.get(), result, Option.empty());
+
+    // After log compaction, do (threshold - 1) upserts again -- below threshold
+    prevCommitTime = logCompactionTimeStamp.get();
+    int totalCommitsAfterLogCompact = logCompactionThreshold + 3;
+    for (int i = 0; i < logCompactionThreshold - 1; i++) {
+      newCommitTime = WriteClientTestUtils.createNewInstantTime();
+      expectedTotalRecs += 50;
+      updateBatch(config, client, newCommitTime, prevCommitTime,
+          Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+          false, false, 50, expectedTotalRecs, totalCommitsAfterLogCompact + i,
+          config.populateMetaFields(), INSTANT_GENERATOR);
+      prevCommitTime = newCommitTime;
+    }
+
+    // Log compaction should NOT be scheduled again (counter reset, below threshold)

Review Comment:
   do we have a test where there is a pending compaction and the log blocks threshold is not met?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns the most recent log compaction instant from the raw active timeline.
+   *
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return The latest log compaction instant, or empty if no log compaction is present.
+   */
+  public static Option<HoodieInstant> getLastLogCompaction(final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+    if (!lastLogCompactionInstantOption.isPresent()) {
+      return Option.empty();
+    }
+    String logCompactionTimestamp = lastLogCompactionInstantOption.get().requestedTime();
+    Option<HoodieInstant> completedInstant = Option.fromJavaOptional(

Review Comment:
   `completedInstantOpt` 



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception {
         storageConf, Arrays.asList(dataGen.getPartitionPaths()));
   }
 
+  /**
+   * Verify that log compaction is only scheduled when the number of delta commits since the last
+   * compaction or log compaction meets or exceeds the LogCompactionBlocksThreshold, and that
+   * the counter resets after each log compaction.
+   */
+  @Test
+  public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+    int logCompactionThreshold = 3;

Review Comment:
   instead of an end-to-end functional test, can we try mocking timeline entries and writing tests based on that? Lower test runtimes, and we could cover all the different combinations.
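   To illustrate the suggestion, one low-cost shape for such tests is an in-memory fake timeline: the selection logic in `getLastLogCompaction` (newest pending log compaction, preferring a completed instant at the same timestamp) then becomes directly assertable without a Spark client. Everything below is a hypothetical sketch, not Hudi's classes:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// FakeInstant and lastLogCompaction are illustrative stand-ins, not Hudi
// APIs. The method mirrors the shape of getLastLogCompaction in the PR:
// pick the newest pending log-compaction instant, then prefer a completed
// instant carrying the same timestamp if one exists.
public class FakeTimelineSketch {

  static final class FakeInstant {
    final String timestamp;
    final String action;     // e.g. "logcompaction", "deltacommit"
    final boolean completed;

    FakeInstant(String timestamp, String action, boolean completed) {
      this.timestamp = timestamp;
      this.action = action;
      this.completed = completed;
    }
  }

  static Optional<FakeInstant> lastLogCompaction(List<FakeInstant> timeline) {
    // Newest pending log-compaction instant, if any.
    Optional<FakeInstant> newestPending = timeline.stream()
        .filter(i -> i.action.equals("logcompaction") && !i.completed)
        .max(Comparator.comparing((FakeInstant i) -> i.timestamp));
    if (!newestPending.isPresent()) {
      return Optional.empty();
    }
    String ts = newestPending.get().timestamp;
    // Prefer a completed instant at the same timestamp, mirroring the
    // findInstantsInClosedRange(...) + isCompleted check in the PR.
    Optional<FakeInstant> completedAtTs = timeline.stream()
        .filter(i -> i.timestamp.equals(ts) && i.completed)
        .findFirst();
    return completedAtTs.isPresent() ? completedAtTs : newestPending;
  }
}
```

   Combinations like "pending compaction present, threshold not met" can then be expressed as a few lines of list construction plus assertions, instead of a full insert/upsert cycle.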



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to