This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f64c93ee899c fix(clustering): When inferring whether an instant is clustering, do not fail if replacecommit was rolled back already (by a concurrent writer) (#18288)
f64c93ee899c is described below

commit f64c93ee899c88c28d2d882b33389c820511bdc8
Author: Krishen <[email protected]>
AuthorDate: Wed Mar 11 12:45:33 2026 -0700

    fix(clustering): When inferring whether an instant is clustering, do not fail if replacecommit was rolled back already (by a concurrent writer) (#18288)
    
    If getClusteringPlan is called after the target instant has been rolled back by a concurrent writer, a runtime exception is thrown. This causes the following important use cases to fail:
    
    - Ingestion checking whether other replacecommits are from clustering (via ClusteringUtils.getAllFileGroupsInPendingClusteringPlans)
    - Clustering jobs calling ClusteringUtils.getAllPendingClusteringPlans to find failed clustering attempts to roll back
    - File system view initialization calling ClusteringUtils.getAllFileGroupsInPendingClusteringPlans to track file groups involved in pending clustering
    
    In all of these cases, a concurrent writer can roll back the instant after the timeline is loaded but before getClusteringPlan is called, so the requested metadata file no longer exists when it is read.
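    
    The race above is an ordinary check-then-act (TOCTOU) problem. The sketch below reproduces it with plain java.nio.file rather than Hudi APIs; the directory standing in for the timeline, the class TimelineRaceDemo, and the "<time>.replacecommit.requested" file names are hypothetical. A snapshot taken at load time still lists an instant whose file a concurrent writer has since deleted, so the later read fails.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model, not Hudi code: the "timeline" is a plain directory and each
// pending instant is a file named "<time>.replacecommit.requested".
class TimelineRaceDemo {

  // "Load the timeline": snapshot the instant files that exist right now.
  static List<Path> loadTimeline(Path timelineDir) throws IOException {
    try (Stream<Path> files = Files.list(timelineDir)) {
      return files.sorted().collect(Collectors.toList());
    }
  }

  // Read an instant's requested metadata; false means the file vanished after the snapshot.
  static boolean readSucceeds(Path instantFile) throws IOException {
    try {
      Files.readAllBytes(instantFile);
      return true;
    } catch (NoSuchFileException e) {
      return false;
    }
  }

  // Load the timeline, let a "concurrent writer" roll back instant 002, then read each instant.
  static List<Boolean> simulate() {
    try {
      Path dir = Files.createTempDirectory("timeline");
      Path first = Files.write(dir.resolve("001.replacecommit.requested"), new byte[] {1});
      Path second = Files.write(dir.resolve("002.replacecommit.requested"), new byte[] {2});
      List<Path> snapshot = loadTimeline(dir); // both instants are visible here
      Files.delete(second);                    // concurrent rollback removes 002
      List<Boolean> results = new ArrayList<>();
      for (Path instantFile : snapshot) {
        results.add(readSucceeds(instantFile));
      }
      Files.delete(first);                     // clean up the temp directory
      Files.delete(dir);
      return results;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(simulate()); // prints [true, false]
  }
}
```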
    
    Summary and Changelog
    Update ClusteringUtils.getClusteringPlan to gracefully handle the case where a clustering/replacecommit instant is rolled back by a concurrent writer between timeline load and metadata read.
    
    - The method that directly reads requested replace metadata now catches both IOException and HoodieIOException.
    - When a HoodieTableMetaClient is available, the active timeline is reloaded on error and the instant's presence is re-checked. If the instant is no longer in the timeline, the error is suppressed and an empty Option is returned; otherwise a HoodieIOException wrapping the underlying IOException is thrown as before.
    - When a metaClient is not available (e.g. internal timeline methods in BaseTimelineV1/BaseTimelineV2 that call isClusteringInstant without a metaClient reference), the original exception behavior is preserved.
    - A new overload accepting Option<HoodieTableMetaClient> is introduced to allow callers to opt into error recovery.
    - Added unit tests covering: a non-existent instant, a deleted requested file (simulated rollback), getAllPendingClusteringPlans gracefully skipping a rolled-back instant, a corrupted requested file whose instant is still in the timeline (exception rethrown), and the 3-arg overload on both the happy path and a deleted requested file (exception thrown).
    
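    A minimal, Hudi-independent sketch of the recovery pattern in this list, assuming hypothetical stand-ins: PlanReader for the requested-metadata read, TimelineReloader for reloadActiveTimeline().containsInstant(...), UncheckedPlanException for HoodieIOException, and java.util.Optional in place of Hudi's Option. It shows the old overload delegating with an empty reloader, the timeline re-check before an error is suppressed, and the unwrapping that avoids wrapping an IOException twice.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch, not Hudi code: all type names below are illustrative stand-ins.
class PlanRecoverySketch {

  /** Stand-in for HoodieIOException: an unchecked wrapper around an IOException. */
  static class UncheckedPlanException extends RuntimeException {
    UncheckedPlanException(String message, IOException cause) {
      super(message, cause);
    }

    IOException getIOException() {
      return (IOException) getCause();
    }
  }

  /** Stand-in for reading an instant's requested replace metadata. */
  interface PlanReader {
    String read(String instant) throws IOException;
  }

  /** Stand-in for reloading the active timeline and listing the instants it contains. */
  interface TimelineReloader {
    Set<String> reload();
  }

  // Existing-style overload: no way to re-check the timeline, so failures are rethrown.
  static Optional<String> getPlan(PlanReader reader, String instant) {
    return getPlan(reader, instant, Optional.empty());
  }

  // New overload: callers that can reload the timeline opt into recovery.
  static Optional<String> getPlan(PlanReader reader, String instant,
                                  Optional<TimelineReloader> reloaderOpt) {
    try {
      return Optional.ofNullable(reader.read(instant));
    } catch (IOException | UncheckedPlanException e) {
      // A fresh timeline without the instant means it was rolled back concurrently.
      if (reloaderOpt.isPresent() && !reloaderOpt.get().reload().contains(instant)) {
        return Optional.empty();
      }
      // Genuine error: unwrap so the IOException is not wrapped twice, then rethrow.
      IOException io = (e instanceof UncheckedPlanException)
          ? ((UncheckedPlanException) e).getIOException()
          : (IOException) e;
      throw new UncheckedPlanException("Error reading clustering plan " + instant, io);
    }
  }
}
```

    Passing the reloader as an optional argument keeps existing callers on the original throw-on-error behavior, while callers that hold a metaClient opt in explicitly.
    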
    We shouldn't have to handle the case where a replacecommit requested instant is deleted and later reappears, since that should only happen if:
    
    - DFS APIs are behaving incorrectly (showing a file again after it has already been deleted)
    - A user misconfiguration or a Hudi bug caused a replacecommit instant with the same instant time to be re-created
    
    Impact
    No breaking public API changes. The existing getClusteringPlan(HoodieTableMetaClient, HoodieInstant) and getClusteringPlan(HoodieTimeline, HoodieInstant, InstantGenerator) signatures are unchanged; a new public overload getClusteringPlan(HoodieTimeline, HoodieInstant, InstantGenerator, Option<HoodieTableMetaClient>) is added.
    
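    Behavioral change: getClusteringPlan now returns Option.empty() instead of throwing when the instant was concurrently rolled back and a metaClient is available for verification. Because the getClusteringPlan(HoodieTableMetaClient, HoodieInstant) overload now passes Option.of(metaClient), its callers get this behavior without code changes. This also prevents file system view initialization from failing when it calls getAllFileGroupsInPendingClusteringPlans during a concurrent rollback.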
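    
    On the caller side, a scan over pending instants only needs to skip empty results. The sketch below is loosely modeled on getAllFileGroupsInPendingClusteringPlans but is not its implementation: PendingFileGroupsSketch, fileGroupsInPendingPlans, and the planLookup function are hypothetical, a plan is reduced to the list of file group IDs it touches, and an empty Optional stands for an instant that was rolled back after the timeline was loaded.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch, not Hudi code: maps each file group ID to the pending instant that plans it.
class PendingFileGroupsSketch {

  static Map<String, String> fileGroupsInPendingPlans(
      List<String> pendingInstants, Function<String, Optional<List<String>>> planLookup) {
    Map<String, String> fileGroupToInstant = new HashMap<>();
    for (String instant : pendingInstants) {
      // A rolled-back instant yields an empty Optional and is skipped,
      // instead of aborting the whole scan.
      planLookup.apply(instant).ifPresent(fileGroupIds ->
          fileGroupIds.forEach(fileGroupId -> fileGroupToInstant.put(fileGroupId, instant)));
    }
    return fileGroupToInstant;
  }

  public static void main(String[] args) {
    Map<String, List<String>> plansOnStorage = new HashMap<>();
    plansOnStorage.put("001", Arrays.asList("fg-a", "fg-b"));
    // "002" was listed when the timeline was loaded but has since been rolled back.
    List<String> pendingAtLoadTime = Arrays.asList("001", "002");
    Map<String, String> result = fileGroupsInPendingPlans(
        pendingAtLoadTime, instant -> Optional.ofNullable(plansOnStorage.get(instant)));
    System.out.println(result.get("fg-a")); // prints 001
  }
}
```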
    
    
    ---------
    
    Co-authored-by: Krishen Bhan <“[email protected]”>
---
 .../apache/hudi/common/util/ClusteringUtils.java   |  29 ++++-
 .../hudi/common/util/TestClusteringUtils.java      | 143 +++++++++++++++++++++
 2 files changed, 169 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5ee87f5ea1e6..013d568f27fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -246,7 +246,7 @@ public class ClusteringUtils {
    * @return option of the replace metadata if present, else empty
    */
   public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
-    return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant, metaClient.getInstantGenerator());
+    return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant, metaClient.getInstantGenerator(), Option.of(metaClient));
   }
 
   /**
@@ -269,14 +269,37 @@ public class ClusteringUtils {
    * @return {@link Option} of {@link Pair} of {@link HoodieInstant} and {@link HoodieClusteringPlan}
    */
   public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant, InstantGenerator factory) {
+    return getClusteringPlan(timeline, pendingReplaceInstant, factory, Option.empty());
+  }
+
+  /**
+   * Get Clustering plan from timeline with optional metaClient for error recovery.
+   * When metaClient is provided, if reading the clustering plan fails due to the instant being
+   * rolled back by a concurrent writer, the timeline is reloaded and an empty option is returned
+   * instead of throwing an exception.
+   */
+  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceInstant, InstantGenerator factory, Option<HoodieTableMetaClient> metaClientOpt) {
     try {
       Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(timeline, pendingReplaceInstant, factory);
       if (requestedReplaceMetadata.isPresent() && WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType())) {
         return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.get().getClusteringPlan()));
       }
       return Option.empty();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.requestedTime(), e);
+    } catch (IOException | HoodieIOException e) {
+      if (metaClientOpt.isPresent() && !metaClientOpt.get().reloadActiveTimeline().containsInstant(pendingReplaceInstant)) {
+        log.warn("Error reading requested replace metadata {} due to it no longer being in the timeline. "
+            + "This could be due to the instant being rolled back by a concurrent writer",
+            pendingReplaceInstant, e);
+        return Option.empty();
+      }
+      final IOException ioException;
+      if (e instanceof HoodieIOException) {
+        ioException = ((HoodieIOException) e).getIOException();
+      } else {
+        ioException = (IOException) e;
+      }
+      throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.requestedTime(), ioException);
     }
   }
 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 668a6c61f113..6d580e3b5c56 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -35,6 +35,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -49,8 +52,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -231,6 +237,143 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
         "retain the first replace commit after the last complete clean ");
   }
 
+  /**
+   * Getting a clustering plan for an instant whose requested file was never written (e.g. the instant
+   * was rolled back before we could read it) should return empty rather than throw an exception.
+   */
+  @Test
+  public void testGetClusteringPlanOnNonExistentInstant() {
+    HoodieInstant requestedInstant = INSTANT_GENERATOR.createNewInstant(
+        HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "001");
+    assertFalse(ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).isPresent());
+  }
+
+  /**
+   * Simulates a concurrent rollback: a clustering plan is created, then its requested file is
+   * deleted before getClusteringPlan reads it. The method should return empty instead of throwing.
+   */
+  @Test
+  public void testGetClusteringPlanAfterRequestedFileDeleted() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    // Verify the plan is readable before deletion
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(metaClient, requestedInstant);
+    assertTrue(plan.isPresent());
+
+    // Delete the requested file to simulate a concurrent rollback
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(requestedInstant));
+    assertTrue(metaClient.getStorage().deleteFile(instantFilePath));
+
+    // Should return empty rather than throw
+    assertFalse(ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).isPresent());
+  }
+
+  /**
+   * When one instant among several pending clustering instants is rolled back concurrently,
+   * getAllPendingClusteringPlans should still return the plans for the remaining valid instants.
+   */
+  @Test
+  public void testGetAllPendingClusteringPlansWithRolledBackInstant() throws Exception {
+    String partitionPath = "partition1";
+
+    List<String> fileIds1 = new ArrayList<>();
+    fileIds1.add(UUID.randomUUID().toString());
+    createRequestedClusterInstant(partitionPath, "1", fileIds1);
+
+    List<String> fileIds2 = new ArrayList<>();
+    fileIds2.add(UUID.randomUUID().toString());
+    HoodieInstant secondInstant = createRequestedClusterInstant(partitionPath, "2", fileIds2);
+
+    metaClient.reloadActiveTimeline();
+
+    // Delete the second instant's requested file to simulate rollback
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(secondInstant));
+    assertTrue(metaClient.getStorage().deleteFile(instantFilePath));
+
+    // Should still return the first plan without throwing
+    List<Pair<HoodieInstant, HoodieClusteringPlan>> plans =
+        ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    assertEquals(1, plans.size());
+    assertEquals("1", plans.get(0).getLeft().requestedTime());
+  }
+
+  /**
+   * When the requested file is corrupted but the instant is still in the timeline,
+   * getClusteringPlan should rethrow the exception (genuine error, not a rollback).
+   */
+  @Test
+  public void testGetClusteringPlanRethrowsWhenInstantStillInTimeline() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    // Corrupt the requested file by overwriting with invalid (non-empty) content
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(requestedInstant));
+    java.io.OutputStream out = metaClient.getStorage().create(instantFilePath, true);
+    out.write(new byte[] {0, 1, 2, 3});
+    out.close();
+
+    assertThrows(HoodieIOException.class,
+        () -> ClusteringUtils.getClusteringPlan(metaClient, requestedInstant));
+  }
+
+  /**
+   * The 3-arg overload (without metaClient) should throw on a missing instant since
+   * it has no metaClient to verify rollback.
+   */
+  @Test
+  public void testGetClusteringPlanWithoutMetaClientThrowsOnMissingInstant() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    // Delete the requested file
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(requestedInstant));
+    assertTrue(metaClient.getStorage().deleteFile(instantFilePath));
+
+    // The 3-arg overload without metaClient should throw
+    assertThrows(HoodieIOException.class,
+        () -> ClusteringUtils.getClusteringPlan(
+            metaClient.getActiveTimeline(), requestedInstant, INSTANT_GENERATOR));
+  }
+
+  /**
+   * The 3-arg overload (without metaClient) should return the plan successfully on the happy path.
+   */
+  @Test
+  public void testGetClusteringPlanViaTimelineOverload() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(
+        metaClient.getActiveTimeline(), requestedInstant, INSTANT_GENERATOR);
+    assertTrue(plan.isPresent());
+    assertEquals(clusterTime, plan.get().getLeft().requestedTime());
+  }
+
   private void validateClusteringInstant(List<String> fileIds, String partitionPath,
                                          String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
     for (String fileId : fileIds) {
