kbuci commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2998503660


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java:
##########
@@ -236,6 +236,93 @@ public void testLogCompactionOnMORTable() throws Exception {
         storageConf, Arrays.asList(dataGen.getPartitionPaths()));
   }
 
+  /**
+   * Verify that log compaction is only scheduled when the number of delta commits since the last
+   * compaction or log compaction meets or exceeds the LogCompactionBlocksThreshold, and that
+   * the counter resets after each log compaction.
+   */
+  @Test
+  public void testLogCompactionSchedulingRespectsThreshold() throws Exception {
+    int logCompactionThreshold = 3;

Review Comment:
   Sure, removed the functional test and added unit tests with mocks instead.
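
   For reference, the rule stated in the Javadoc above (schedule only once the delta commits since the last compaction or log compaction reach the threshold, then reset) can be sketched in plain Java. This is only an illustration under simplifying assumptions: `LogCompactionThresholdSketch`, `Action`, and both methods are hypothetical names, not Hudi classes and not the code in this PR.

```java
import java.util.Arrays;
import java.util.List;

class LogCompactionThresholdSketch {
  // Hypothetical stand-in for a completed timeline action; not a Hudi type.
  enum Action { DELTA_COMMIT, COMPACTION, LOG_COMPACTION }

  // Counts delta commits after the most recent compaction or log compaction.
  // Any compaction-like action resets the counter to zero.
  static int deltaCommitsSinceLastCompaction(List<Action> timeline) {
    int count = 0;
    for (Action action : timeline) {
      if (action == Action.DELTA_COMMIT) {
        count++;
      } else {
        count = 0;
      }
    }
    return count;
  }

  // Scheduling is allowed only when the count meets or exceeds the threshold.
  static boolean shouldSchedule(List<Action> timeline, int threshold) {
    return deltaCommitsSinceLastCompaction(timeline) >= threshold;
  }

  public static void main(String[] args) {
    List<Action> timeline = Arrays.asList(
        Action.DELTA_COMMIT, Action.DELTA_COMMIT, Action.DELTA_COMMIT);
    System.out.println(shouldSchedule(timeline, 3));
  }
}
```

   A unit test with mocks would exercise the same three cases: below the threshold, exactly at the threshold, and immediately after a log compaction resets the count.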



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +367,67 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns the most recent log compaction instant from the raw active timeline.
+   *
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return The latest log compaction instant, or empty if no log compaction is present.
+   */
+  public static Option<HoodieInstant> getLastLogCompaction(final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+    if (!lastLogCompactionInstantOption.isPresent()) {
+      return Option.empty();
+    }
+    String logCompactionTimestamp = lastLogCompactionInstantOption.get().requestedTime();
+    Option<HoodieInstant> completedInstant = Option.fromJavaOptional(
+        rawActiveTimeline
+            .findInstantsInClosedRange(logCompactionTimestamp, logCompactionTimestamp)
+            .getInstantsAsStream()
+            .filter(HoodieInstant::isCompleted)
+            .findFirst()
+    );
+    return completedInstant.isPresent() ? completedInstant : lastLogCompactionInstantOption;
+  }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest completed
+   * log compaction delta commit, the completed log compaction commit instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the delta commits,
+   * the first delta commit instant), if there is no completed log compaction commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompletedLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionOption = getLastLogCompaction(rawActiveTimeline);

Review Comment:
   Sure, updated the names.
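
   For readers skimming the hunk, the lookup in `getLastLogCompaction` takes the latest pending log compaction entry and prefers a completed state with the same timestamp when one exists. A rough, self-contained sketch of that selection follows; `LastLogCompactionSketch` and `InstantState` are hypothetical simplifications using `java.util.Optional`, not Hudi's `HoodieInstant` or timeline API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class LastLogCompactionSketch {
  // Hypothetical simplified instant state: a requested time plus a completed flag.
  static final class InstantState {
    final String requestedTime;
    final boolean completed;

    InstantState(String requestedTime, boolean completed) {
      this.requestedTime = requestedTime;
      this.completed = completed;
    }
  }

  // Input is assumed to hold every recorded state (pending and completed)
  // of log compaction instants, similar in spirit to a raw timeline.
  static Optional<InstantState> lastLogCompaction(List<InstantState> rawStates) {
    // Latest pending state, ordered by requested time.
    Optional<InstantState> latestPending = rawStates.stream()
        .filter(state -> !state.completed)
        .max(Comparator.comparing((InstantState state) -> state.requestedTime));
    if (!latestPending.isPresent()) {
      return Optional.empty();
    }
    String timestamp = latestPending.get().requestedTime;
    // Prefer the completed state of the same instant when it exists.
    Optional<InstantState> completed = rawStates.stream()
        .filter(state -> state.completed && state.requestedTime.equals(timestamp))
        .findFirst();
    return completed.isPresent() ? completed : latestPending;
  }
}
```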


