kbuci commented on code in PR #18306:
URL: https://github.com/apache/hudi/pull/18306#discussion_r2956942797


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -363,6 +368,59 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
     }
   }
 
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest completed
+   * log compaction delta commit, the completed log compaction commit instant), if the latest completed
+   * log compaction commit is present; a pair of (timeline containing all the delta commits,
+   * the first delta commit instant), if there is no completed log compaction commit.
+   *
+   * @param deltaCommitTimeline Active timeline of table that contains only delta commits.
+   * @param rawActiveTimeline Active timeline of table, that has current and previous states of each instant.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestLogCompaction(
+      final HoodieTimeline deltaCommitTimeline,
+      final HoodieActiveTimeline rawActiveTimeline) {
+    Option<HoodieInstant> lastLogCompactionInstantOption = Option.fromJavaOptional(
+        rawActiveTimeline
+            .filterPendingLogCompactionTimeline()
+            .getReverseOrderedInstants()
+            .findFirst()
+    );
+
+    if (lastLogCompactionInstantOption.isPresent()) {
+      // Search for a corresponding completed delta commit for the latest log compact instant observed.
+      // If a delta commit is found, then that means the last compact instant was completed.
+      // Otherwise return empty, since that means there is a pending log compaction that has not
+      // been executed yet. The latter scenario should not happen in practice, as log compaction
+      // scheduling is only done after previous log compact pending instants have been executed or
+      // rolled back.
+      String lastLogCompactionTimestamp = lastLogCompactionInstantOption.get().requestedTime();
+      Option<HoodieInstant> lastCompletedLogCompactionInstantOption = Option.fromJavaOptional(
+          deltaCommitTimeline
+              .filterCompletedInstants()
+              .filter(hoodieInstant -> hoodieInstant.requestedTime().equals(lastLogCompactionTimestamp))
+              .getInstantsAsStream()
+              .findFirst()
+      );
+      if (lastCompletedLogCompactionInstantOption.isPresent()) {
+        HoodieInstant lastCompletedLogCompactionInstant = lastCompletedLogCompactionInstantOption.get();
+        return Option.of(Pair.of(deltaCommitTimeline.findInstantsAfter(
+            lastCompletedLogCompactionInstant.requestedTime(), Integer.MAX_VALUE), lastCompletedLogCompactionInstant));
+      } else {
+        LOG.info("Last log compaction instant {}, is in pending state so returning empty value.", lastLogCompactionTimestamp);

Review Comment:
   This should only happen if scheduling of a log compaction is attempted 
while a pending log compaction still exists in the timeline (from a prior 
failed attempt, or because the table service platform is still working on 
it). I kept this log line to help with debugging, in case a user is confused 
about why their job isn't scheduling a new log compaction. If you think it 
might be too noisy, though, I can remove it.
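
   For illustration, the three outcomes discussed here (no log compaction yet, latest log compaction completed, latest log compaction still pending) can be sketched with simplified stand-in types. This is a minimal sketch, not the PR's code: `LogCompactionSketch`, `DeltaCommit`, and the plain `List<String>` of log compaction requested times are hypothetical and do not use Hudi's timeline classes. It assumes requested times compare lexicographically.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class LogCompactionSketch {

  // Hypothetical stand-in for a delta commit instant on the timeline.
  static final class DeltaCommit {
    final String requestedTime;
    final boolean completed;

    DeltaCommit(String requestedTime, boolean completed) {
      this.requestedTime = requestedTime;
      this.completed = completed;
    }
  }

  // Returns the requested times of delta commits after the latest completed log
  // compaction, all delta commits if no log compaction exists, or empty if the
  // latest log compaction has no completed delta commit (i.e. it is still pending).
  static Optional<List<String>> deltaCommitsSinceLatestLogCompaction(
      List<DeltaCommit> deltaCommits, List<String> logCompactionRequestedTimes) {
    Optional<String> latest = logCompactionRequestedTimes.stream().max(String::compareTo);
    if (!latest.isPresent()) {
      return Optional.of(
          deltaCommits.stream().map(dc -> dc.requestedTime).collect(Collectors.toList()));
    }
    String ts = latest.get();
    boolean hasCompletedDeltaCommit = deltaCommits.stream()
        .anyMatch(dc -> dc.completed && dc.requestedTime.equals(ts));
    if (!hasCompletedDeltaCommit) {
      // This is the branch the log line under discussion covers.
      System.out.println("Last log compaction instant " + ts
          + " is in pending state so returning empty value.");
      return Optional.empty();
    }
    return Optional.of(deltaCommits.stream()
        .map(dc -> dc.requestedTime)
        .filter(t -> t.compareTo(ts) > 0)
        .collect(Collectors.toList()));
  }

  public static void main(String[] args) {
    List<DeltaCommit> commits = java.util.Arrays.asList(
        new DeltaCommit("001", true),
        new DeltaCommit("002", true),
        new DeltaCommit("003", true));
    // Log compaction "004" was requested but has no completed delta commit yet.
    System.out.println(
        deltaCommitsSinceLatestLogCompaction(commits, java.util.Arrays.asList("002", "004")));
  }
}
```

   In this sketch the pending case prints the message and returns an empty result, which matches the situation where a job declines to schedule a new log compaction.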



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

