yihua commented on code in PR #18322:
URL: https://github.com/apache/hudi/pull/18322#discussion_r3036403910


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -153,9 +154,72 @@ public static Option<HoodieInstant> 
getEarliestCommitToRetain(
       earliestCommitToRetain = 
Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i 
-> compareTimestamps(i.requestedTime(),
           GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
     }
+
+    // Apply maxCommitsToClean cap if configured and applicable

Review Comment:
   🤖 When `previousEarliestCommitToRetain` is empty (e.g., first clean ever, or 
first clean after the table was created with cleaner disabled from the start), 
the cap won't apply and all eligible commits will be cleaned in one shot. Is 
that the intended behavior? If so, it might be worth a log message indicating 
the cap is being skipped due to missing previous clean metadata.
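
A minimal sketch of the suggested log message, using only the JDK so it stands alone. The class `CapSkipLoggingSketch`, the method `shouldApplyCap`, and the use of `java.util.logging` are assumptions for illustration; the PR itself would use Hudi's `Option` type and its existing logger.

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: decides whether the cap applies
// and logs the case where it is configured but has to be skipped.
final class CapSkipLoggingSketch {
  private static final Logger LOG = Logger.getLogger(CapSkipLoggingSketch.class.getName());

  static boolean shouldApplyCap(long maxCommitsToClean, Optional<String> previousEarliestCommitToRetain) {
    if (maxCommitsToClean == Long.MAX_VALUE) {
      // Cap is not configured, so there is nothing to report.
      return false;
    }
    if (!previousEarliestCommitToRetain.isPresent()) {
      // Cap is configured but there is no previous clean to count from.
      LOG.info("maxCommitsToClean=" + maxCommitsToClean
          + " is configured, but no previous clean metadata was found; skipping the cap for this clean");
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(shouldApplyCap(50L, Optional.empty()));
    System.out.println(shouldApplyCap(50L, Optional.of("20000000000000")));
  }
}
```

With a message like this, an operator who sets the cap can tell from the logs why the first clean was not limited.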



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -175,9 +177,10 @@ private List<String> 
getPartitionPathsForCleanByCommits(Option<HoodieInstant> in
     }

Review Comment:
   🤖 Setting `lastCompletedClean` and `lastCleanMetadata` as side effects of 
`getEarliestCommitToRetain()` and then relying on them here creates an implicit 
ordering dependency. If `getPartitionPathsForCleanByCommits` is called before 
`getEarliestCommitToRetain`, these fields are empty and the fallback fetches 
the data again. That is correct, but the coupling is fragile. Would it be 
cleaner to extract the metadata fetch into a dedicated initialization method 
called once?
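
One way to picture the dedicated initialization is a small load-once holder, sketched below with JDK types only. `LastCleanInfoSketch` and its `Supplier` loader are hypothetical stand-ins for the timeline read in `CleanPlanner`; this is an illustration of the idea, not the planner's actual structure.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical holder: loads the last clean's metadata at most once and caches it.
final class LastCleanInfoSketch {
  // Stands in for reading the last completed clean's metadata from the timeline.
  private final Supplier<Optional<String>> loader;
  private Optional<String> cached = Optional.empty();
  private boolean loaded = false;

  LastCleanInfoSketch(Supplier<Optional<String>> loader) {
    this.loader = loader;
  }

  // Every caller goes through this accessor, so the order of calls no longer matters.
  synchronized Optional<String> previousEarliestCommitToRetain() {
    if (!loaded) {
      cached = loader.get();
      loaded = true;
    }
    return cached;
  }

  public static void main(String[] args) {
    LastCleanInfoSketch info = new LastCleanInfoSketch(() -> Optional.of("20000000000005"));
    System.out.println(info.previousEarliestCommitToRetain());
    System.out.println(info.previousEarliestCommitToRetain());
  }
}
```

Both planner methods would then read through the same accessor instead of one of them populating fields for the other.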



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -590,13 +590,30 @@ public Pair<Boolean, List<CleanFileInfo>> 
getDeletePaths(String partitionPath, O
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     if (!earliestCommitToRetain.isPresent()) {
+      // Get the previous clean's earliest commit to retain, if available
+      Option<String> previousEarliestCommitToRetain = Option.empty();
+      Option<HoodieInstant> lastClean = 
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+      if (lastClean.isPresent()) {
+        try {
+          HoodieCleanMetadata cleanMetadata = 
hoodieTable.getActiveTimeline().readCleanMetadata(lastClean.get());
+          if (cleanMetadata.getEarliestCommitToRetain() != null
+              && !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) {
+            previousEarliestCommitToRetain = 
Option.of(cleanMetadata.getEarliestCommitToRetain());
+          }
+        } catch (Exception e) {
+          log.warn("Failed to read previous clean metadata, proceeding without 
capping commits to clean", e);

Review Comment:
   🤖 I think there's a middle ground here. The current fallback to uncapped 
cleaning when deserialization fails makes sense for robustness, but it might be 
worth logging at WARN level with a more explicit message like "falling back to 
uncapped clean" so operators can notice it. An error/exception seems too 
aggressive since the clean can still make progress, but silently ignoring the 
cap could surprise users who set it intentionally.



##########
hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java:
##########
@@ -56,4 +67,201 @@ void rollbackFailedWrites_CommitWithLazyPolicy() {
     
assertFalse(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.LAZY,
 HoodieActiveTimeline.COMMIT_ACTION, rollbackFunction));
     verify(rollbackFunction, never()).apply();
   }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_NoCapping() {
+    // Test scenario: 20 commits, retain 12, clean 8 commits, 
maxCommitsToClean=50
+    // Expected: No capping needed, should return the originally calculated 
earliest commit
+    HoodieTimeline timeline = createMockTimeline(20);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000005"), // previous clean's earliest commit to 
retain (commit 5)
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // With 20 commits and retain 12, earliest commit to retain should be 
commit at index 8 (20-12=8)
+    assertEquals("20000000000008", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_WithCapping() {
+    // Test scenario: 1000 commits, retain 12, would clean 988 commits, 
maxCommitsToClean=50
+    // Expected: Should cap to clean only 50 commits
+    HoodieTimeline timeline = createMockTimeline(1000);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean's earliest commit to 
retain (commit 0)
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // Should cap at 50 commits from commit 0, so earliest commit to retain 
should be commit 50
+    // (Clean commits 0-49, retain from 50 onwards)
+    assertEquals("20000000000050", result.get().requestedTime());
+
+    result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000050"), // previous clean's earliest commit to 
retain (commit 50)
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // Should cap at 50 commits from commit 50, so earliest commit to retain 
should be commit 100
+    // (Clean commits 50-99, retain from 100 onwards)
+    assertEquals("20000000000100", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_ExactBoundary() {
+    // Test scenario: Clean exactly maxCommitsToClean commits
+    HoodieTimeline timeline = createMockTimeline(100);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean at commit 0
+        88L // maxCommitsToClean (exactly the number of commits eligible: 88)
+    );
+
+    assertTrue(result.isPresent());
+    // With 100 commits and retain 12, earliest would be commit 88
+    // With previous clean at 0 and max 88 to clean, we can clean commits 
0-87, so earliest to retain is 88
+    assertEquals("20000000000088", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_NoPreviousClean() {
+    // Test scenario: No previous clean metadata available
+    HoodieTimeline timeline = createMockTimeline(100);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.empty(), // no previous clean
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // Without previous clean, capping should not apply
+    // With 100 commits and retain 12, earliest commit to retain should be 
commit 88
+    assertEquals("20000000000088", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_DefaultValue() {
+    // Test scenario: maxCommitsToClean is set to default Long.MAX_VALUE (no 
capping)
+    HoodieTimeline timeline = createMockTimeline(1000);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean at commit 0
+        Long.MAX_VALUE // no capping
+    );
+
+    assertTrue(result.isPresent());
+    // With no capping and 1000 commits retain 12, earliest should be commit 
988
+    assertEquals("20000000000988", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_KeepLatestByHours() 
{
+    // Test scenario: KEEP_LATEST_BY_HOURS policy with capping
+    // We're testing that the capping logic is invoked for 
KEEP_LATEST_BY_HOURS policy
+    // Since KEEP_LATEST_BY_HOURS may return empty for test timelines, we just 
verify no exception
+    HoodieTimeline timeline = createMockTimeline(100);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS,
+        12, // commits to retain (not used for BY_HOURS)
+        Instant.now(),
+        1, // 1 hour retained - will likely return empty for mock timestamps
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean at first commit
+        20L // maxCommitsToClean
+    );
+
+    // For KEEP_LATEST_BY_HOURS with mock timestamps, result may be empty
+    // The important thing is that the method executes without error and 
capping logic is available
+    // The actual capping behavior for KEEP_LATEST_BY_HOURS is tested in 
integration tests
+    // This unit test just verifies the code path doesn't throw exceptions
+    assertFalse(result.isPresent() && result.get().requestedTime().isEmpty());
+  }
+
+  /**

Review Comment:
   🤖 The mock sets up `getInstantsAsStream()` with 
`thenReturn(instants.stream())`, which captures a single `Stream` instance. 
Java streams can only be consumed once. If the production code (or the capping 
logic) calls `getInstantsAsStream()` on the same `completedTimeline` mock more 
than once, the second call gets an already-consumed stream, and operating on it 
typically fails with an `IllegalStateException` instead of returning the 
instants. Consider using `thenAnswer(inv -> instants.stream())` instead so 
a fresh stream is created on each invocation.
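
The difference can be reproduced without Mockito. In the sketch below, a `Supplier` that always hands back the same `Stream` plays the role of `thenReturn(instants.stream())`, and `instants::stream` plays the role of `thenAnswer(...)`. The class and method names are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Illustration only: compares one shared Stream with a fresh Stream per call.
final class StreamReuseSketch {
  // Uses the source twice and reports whether the second use was rejected.
  static boolean secondUseFails(Supplier<Stream<String>> source) {
    source.get().findFirst();
    try {
      source.get().findFirst();
      return false;
    } catch (IllegalStateException e) {
      // The JDK's stream implementation rejects reuse of a consumed stream.
      return true;
    }
  }

  public static void main(String[] args) {
    List<String> instants = Arrays.asList("20000000000000", "20000000000001");
    Stream<String> shared = instants.stream();
    System.out.println(secondUseFails(() -> shared));     // same instance both times
    System.out.println(secondUseFails(instants::stream)); // new instance each time
  }
}
```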



##########
hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java:
##########
@@ -56,4 +67,201 @@ void rollbackFailedWrites_CommitWithLazyPolicy() {
     
assertFalse(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.LAZY,
 HoodieActiveTimeline.COMMIT_ACTION, rollbackFunction));
     verify(rollbackFunction, never()).apply();
   }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_NoCapping() {
+    // Test scenario: 20 commits, retain 12, clean 8 commits, 
maxCommitsToClean=50
+    // Expected: No capping needed, should return the originally calculated 
earliest commit
+    HoodieTimeline timeline = createMockTimeline(20);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000005"), // previous clean's earliest commit to 
retain (commit 5)
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // With 20 commits and retain 12, earliest commit to retain should be 
commit at index 8 (20-12=8)
+    assertEquals("20000000000008", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_WithCapping() {
+    // Test scenario: 1000 commits, retain 12, would clean 988 commits, 
maxCommitsToClean=50
+    // Expected: Should cap to clean only 50 commits
+    HoodieTimeline timeline = createMockTimeline(1000);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean's earliest commit to 
retain (commit 0)
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // Should cap at 50 commits from commit 0, so earliest commit to retain 
should be commit 50
+    // (Clean commits 0-49, retain from 50 onwards)
+    assertEquals("20000000000050", result.get().requestedTime());
+
+    result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000050"), // previous clean's earliest commit to 
retain (commit 50)
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // Should cap at 50 commits from commit 50, so earliest commit to retain 
should be commit 100
+    // (Clean commits 50-99, retain from 100 onwards)
+    assertEquals("20000000000100", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_ExactBoundary() {
+    // Test scenario: Clean exactly maxCommitsToClean commits
+    HoodieTimeline timeline = createMockTimeline(100);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean at commit 0
+        88L // maxCommitsToClean (exactly the number of commits eligible: 88)
+    );
+
+    assertTrue(result.isPresent());
+    // With 100 commits and retain 12, earliest would be commit 88
+    // With previous clean at 0 and max 88 to clean, we can clean commits 
0-87, so earliest to retain is 88
+    assertEquals("20000000000088", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_NoPreviousClean() {
+    // Test scenario: No previous clean metadata available
+    HoodieTimeline timeline = createMockTimeline(100);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.empty(), // no previous clean
+        50L // maxCommitsToClean
+    );
+
+    assertTrue(result.isPresent());
+    // Without previous clean, capping should not apply
+    // With 100 commits and retain 12, earliest commit to retain should be 
commit 88
+    assertEquals("20000000000088", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_DefaultValue() {
+    // Test scenario: maxCommitsToClean is set to default Long.MAX_VALUE (no 
capping)
+    HoodieTimeline timeline = createMockTimeline(1000);
+
+    Option<HoodieInstant> result = CleanerUtils.getEarliestCommitToRetain(
+        timeline,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
+        12, // commits to retain
+        Instant.now(),
+        24, // hours retained
+        HoodieTimelineTimeZone.UTC,
+        Option.of("20000000000000"), // previous clean at commit 0
+        Long.MAX_VALUE // no capping
+    );
+
+    assertTrue(result.isPresent());
+    // With no capping and 1000 commits retain 12, earliest should be commit 
988
+    assertEquals("20000000000988", result.get().requestedTime());
+  }
+
+  @Test
+  void testGetEarliestCommitToRetain_WithMaxCommitsToClean_KeepLatestByHours() 
{
+    // Test scenario: KEEP_LATEST_BY_HOURS policy with capping

Review Comment:
   🤖 This assertion is a no-op: when `result` is empty, `result.isPresent()` is 
false, the `&&` short-circuits to false, and `assertFalse(false)` always 
passes. When `result` is present, `requestedTime()` is never empty for a valid 
`HoodieInstant`. This test doesn't actually verify that the capping logic works 
for `KEEP_LATEST_BY_HOURS`. Could you either construct a timeline where the 
BY_HOURS policy returns a meaningful result that would be capped, or remove 
this test if it can't be made to test the actual behavior?
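
To make the no-op concrete, the sketch below evaluates the asserted condition for both possible shapes of `result`, with `Optional<String>` standing in for `Option<HoodieInstant>` and its `requestedTime()`. The class name and the sample timestamp are assumptions for illustration.

```java
import java.util.Optional;

// Illustration only: the condition passed to assertFalse in the test.
final class NoOpAssertionSketch {
  static boolean assertedCondition(Optional<String> requestedTime) {
    return requestedTime.isPresent() && requestedTime.get().isEmpty();
  }

  public static void main(String[] args) {
    // Empty result: the && short-circuits.
    System.out.println(assertedCondition(Optional.empty()));
    // Present result with a non-empty timestamp.
    System.out.println(assertedCondition(Optional.of("20000000000020")));
  }
}
```

The condition is false in both cases, so `assertFalse` cannot fail regardless of whether the cap was applied.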



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -153,9 +154,72 @@ public static Option<HoodieInstant> 
getEarliestCommitToRetain(
       earliestCommitToRetain = 
Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i 
-> compareTimestamps(i.requestedTime(),
           GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
     }
+
+    // Apply maxCommitsToClean cap if configured and applicable
+    if (earliestCommitToRetain.isPresent()
+        && (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS || 
cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS)
+        && maxCommitsToClean != Long.MAX_VALUE
+        && previousEarliestCommitToRetain.isPresent()) {
+      earliestCommitToRetain = capCommitsToClean(completedCommitsTimeline, 
earliestCommitToRetain.get(),
+          previousEarliestCommitToRetain.get(), maxCommitsToClean);
+    }
+
     return earliestCommitToRetain;
   }
 
+  /**
+   * Cap the number of commits to clean based on maxCommitsToClean 
configuration.
+   * This prevents cleaning too many commits in a single clean operation, 
which can cause
+   * performance issues or timeouts when there's a large backlog of commits to 
clean.
+   *
+   * <p>Algorithm: This method compares the number of commits that would be 
cleaned (those between
+   * previousEarliestCommitToRetain and calculatedEarliestCommitToRetain) 
against maxCommitsToClean.
+   * If the number exceeds the cap, it adjusts the earliest commit to retain 
such that only
+   * maxCommitsToClean commits are cleaned in this operation. The next clean 
will continue
+   * from where this one left off.
+   *
+   * <p>Example: If there are commits [0, 1, 2, ..., 99] and:
+   * <ul>
+   *   <li>calculatedEarliestCommitToRetain = commit 88 (would clean commits 
0-87, keeping 88+)</li>
+   *   <li>previousEarliestCommitToRetain = commit 0 (last clean retained from 
commit 0 onwards)</li>
+   *   <li>maxCommitsToClean = 50</li>
+   * </ul>
+   * Then this method returns commit 50 as the new earliest to retain, meaning 
we only clean
+   * commits 0-49 in this operation. The next clean will use commit 50 as the 
previous earliest
+   * and can clean the remaining commits 50-87 (capped again if needed).
+   *
+   * @param completedCommitsTimeline Timeline of completed commits
+   * @param calculatedEarliestCommitToRetain The earliest commit to retain 
calculated by policy
+   * @param previousEarliestCommitToRetain The earliest commit to retain from 
previous clean
+   * @param maxCommitsToClean Maximum number of commits to clean in one 
operation
+   * @return Adjusted earliest commit to retain that respects the cap
+   */
+  private static Option<HoodieInstant> capCommitsToClean(HoodieTimeline 
completedCommitsTimeline,
+      HoodieInstant calculatedEarliestCommitToRetain, String 
previousEarliestCommitToRetain,
+      long maxCommitsToClean) {
+    // Get all commits before the calculated earliest commit to retain
+    HoodieTimeline commitsToClean = 
completedCommitsTimeline.findInstantsBefore(calculatedEarliestCommitToRetain.requestedTime());
+
+    // Filter to get only commits after (or equal to) the previous earliest 
commit to retain
+    List<HoodieInstant> commitsEligibleForCleaning = 
commitsToClean.getInstantsAsStream()
+        .filter(instant -> compareTimestamps(instant.requestedTime(), 
GREATER_THAN_OR_EQUALS, previousEarliestCommitToRetain))
+        .collect(Collectors.toList());

Review Comment:
   🤖 If `maxCommitsToClean` is set to 0, `commitsEligibleForCleaning.get((int) 
0)` returns the first eligible instant, which is at (or right after) 
`previousEarliestCommitToRetain`, meaning the earliest commit to retain never 
advances and the cleaner makes zero progress indefinitely. Could you add 
validation (either here or in the config definition) to ensure 
`maxCommitsToClean >= 1`?
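
A simplified sketch of the capping step with the requested validation follows. It works on plain timestamp strings rather than `HoodieInstant`, and it assumes the cap is applied by indexing the eligible list at `maxCommitsToClean`, as the `get((int) 0)` case above implies. `CapCommitsSketch`, `commits`, and `capEarliestToRetain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the capping logic; not the PR's implementation.
final class CapCommitsSketch {
  // Builds timestamps shaped like the ones in the tests, e.g. 20000000000050.
  static List<String> commits(int n) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(String.format("2%013d", i));
    }
    return out;
  }

  static String capEarliestToRetain(List<String> sortedCommits, String calculated,
                                    String previous, long maxCommitsToClean) {
    if (maxCommitsToClean < 1) {
      // A cap of 0 would pin the earliest commit to retain and stall the cleaner.
      throw new IllegalArgumentException("maxCommitsToClean must be >= 1, got " + maxCommitsToClean);
    }
    // Commits in [previous, calculated) are the ones this clean could remove.
    List<String> eligible = new ArrayList<>();
    for (String c : sortedCommits) {
      if (c.compareTo(previous) >= 0 && c.compareTo(calculated) < 0) {
        eligible.add(c);
      }
    }
    if (eligible.size() <= maxCommitsToClean) {
      return calculated; // within the cap, keep the policy's answer
    }
    return eligible.get((int) maxCommitsToClean); // first commit past the cap
  }

  public static void main(String[] args) {
    System.out.println(capEarliestToRetain(commits(100), "20000000000088", "20000000000000", 50L));
  }
}
```

The same check could instead live in the config definition, which would reject the value before any clean is planned.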



