This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f64c93ee899c fix(clustering): When inferring whether an instant is
clustering, do not fail if replacecommit was rolled back already (by a
concurrent writer) (#18288)
f64c93ee899c is described below
commit f64c93ee899c88c28d2d882b33389c820511bdc8
Author: Krishen <[email protected]>
AuthorDate: Wed Mar 11 12:45:33 2026 -0700
fix(clustering): When inferring whether an instant is clustering, do not
fail if replacecommit was rolled back already (by a concurrent writer) (#18288)
If getClusteringPlan is called after the target instant is rolled back by a
concurrent writer, a runtime exception is thrown. This causes the following
important use cases to fail:
- Ingestion checking whether other replacecommits are from clustering (via
  ClusteringUtils.getAllFileGroupsInPendingClusteringPlans)
- Clustering jobs calling ClusteringUtils.getAllPendingClusteringPlans to
  find failed clustering attempts to roll back
- File system view initialization calling
  ClusteringUtils.getAllFileGroupsInPendingClusteringPlans to track file
  groups involved in pending clustering
In all of these cases, between the time the timeline is loaded and the time
getClusteringPlan is called, the instant can be rolled back by a concurrent
writer, causing the requested metadata file to no longer exist.
Summary and Changelog
Update ClusteringUtils.getClusteringPlan to gracefully handle the case
where a clustering/replacecommit instant is rolled back by a concurrent writer
between timeline load and metadata read:
- The method that directly reads requested replace metadata now catches both
  IOException and HoodieIOException.
- When a HoodieTableMetaClient is available, the active timeline is reloaded
  on error and the instant's presence is re-checked. If the instant is no
  longer in the timeline, the error is suppressed and an empty Option is
  returned instead of throwing.
- When metaClient is not available (e.g. internal timeline methods in
  BaseTimelineV1/BaseTimelineV2 that call isClusteringInstant without a
  metaClient reference), the original exception behavior is preserved.
- A new overload accepting Option<HoodieTableMetaClient> is introduced to
  allow callers to opt into error recovery.
- Added unit tests covering: non-existent instant, deleted requested file
  (simulated rollback), and getAllPendingClusteringPlans gracefully skipping
  a rolled-back instant.
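The recovery flow described above can be sketched as follows. This is a minimal, self-contained illustration: Timeline, MetaClient, and readPlan here are hypothetical stand-ins for the Hudi types involved, not Hudi's actual classes.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the active timeline: just a set of instant times.
class Timeline {
  private final Set<String> instants = new HashSet<>();
  void add(String instant) { instants.add(instant); }
  void remove(String instant) { instants.remove(instant); }
  boolean containsInstant(String instant) { return instants.contains(instant); }
}

// Hypothetical stand-in for HoodieTableMetaClient.
class MetaClient {
  final Timeline timeline = new Timeline();
  Timeline reloadActiveTimeline() { return timeline; }
}

public class RollbackTolerantRead {
  // Stand-in for reading requested replace metadata: throws when the backing
  // file is gone (simulating deletion by a concurrent rollback).
  static String readPlan(String instant, Set<String> filesOnDisk) throws IOException {
    if (!filesOnDisk.contains(instant)) {
      throw new IOException("requested file missing for " + instant);
    }
    return "plan-for-" + instant;
  }

  // The recovery pattern: on IOException, reload the timeline; if the instant
  // is no longer present, treat it as rolled back and return empty. Otherwise
  // it is a genuine error, so rethrow.
  static Optional<String> getPlan(String instant, Set<String> filesOnDisk,
                                  Optional<MetaClient> metaClientOpt) throws IOException {
    try {
      return Optional.of(readPlan(instant, filesOnDisk));
    } catch (IOException e) {
      if (metaClientOpt.isPresent()
          && !metaClientOpt.get().reloadActiveTimeline().containsInstant(instant)) {
        return Optional.empty(); // concurrently rolled back: suppress
      }
      throw e; // instant still in timeline: rethrow
    }
  }

  public static void main(String[] args) throws IOException {
    MetaClient metaClient = new MetaClient();
    Set<String> filesOnDisk = new HashSet<>();

    // Happy path: instant and its requested file both exist.
    metaClient.timeline.add("001");
    filesOnDisk.add("001");
    System.out.println(getPlan("001", filesOnDisk, Optional.of(metaClient)).isPresent());

    // Concurrent rollback: file and instant both gone -> empty, no throw.
    filesOnDisk.remove("001");
    metaClient.timeline.remove("001");
    System.out.println(getPlan("001", filesOnDisk, Optional.of(metaClient)).isPresent());
  }
}
```

The key design point mirrored here is that the timeline is reloaded before deciding to suppress: a stale in-memory timeline could still list the instant, so only a fresh read distinguishes a rollback from a real I/O failure.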
We shouldn't have to handle the case where a replacecommit requested
instant is deleted and later reappears, since that should only happen if:
- DFS APIs are behaving incorrectly (showing a file again after it is
  already deleted)
- A user misconfiguration or a HUDI bug caused a replacecommit instant with
  the same instant time to be re-created
Impact
No public API changes. The existing
getClusteringPlan(HoodieTableMetaClient, HoodieInstant) and
getClusteringPlan(HoodieTimeline, HoodieInstant, InstantGenerator) signatures
are unchanged. A new overload getClusteringPlan(HoodieTimeline, HoodieInstant,
InstantGenerator, Option<HoodieTableMetaClient>) is added.
Behavioral change: getClusteringPlan now returns Option.empty() instead of
throwing when the instant was concurrently rolled back and metaClient is
available for verification. This also prevents file system view initialization
from failing when it calls getAllFileGroupsInPendingClusteringPlans during a
concurrent rollback.
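From a caller's perspective, the behavioral change means code that scans pending instants can simply skip empty results instead of aborting the whole scan. A minimal sketch of that pattern, using hypothetical stand-ins (getClusteringPlan and the instant strings below are simplifications, not Hudi's actual API):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class SkipRolledBack {
  // Stand-in for the new getClusteringPlan behavior: returns empty when the
  // instant was concurrently rolled back, instead of throwing.
  static Optional<String> getClusteringPlan(String instant, Set<String> liveInstants) {
    return liveInstants.contains(instant)
        ? Optional.of("plan-" + instant)
        : Optional.empty();
  }

  public static void main(String[] args) {
    List<String> pending = Arrays.asList("001", "002", "003");
    // "002" was rolled back by a concurrent writer after the timeline was loaded.
    Set<String> liveInstants = new HashSet<>(Arrays.asList("001", "003"));

    // Mirrors getAllPendingClusteringPlans: empties from rolled-back instants
    // are filtered out, so the surviving plans are still returned.
    List<String> plans = pending.stream()
        .map(instant -> getClusteringPlan(instant, liveInstants))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
    System.out.println(plans);
  }
}
```

Before this fix, the stand-in above would instead model a throwing call, and the single rolled-back instant "002" would fail the entire stream pipeline.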
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../apache/hudi/common/util/ClusteringUtils.java | 29 ++++-
.../hudi/common/util/TestClusteringUtils.java | 143 +++++++++++++++++++++
2 files changed, 169 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5ee87f5ea1e6..013d568f27fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -246,7 +246,7 @@ public class ClusteringUtils {
    * @return option of the replace metadata if present, else empty
    */
   public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
-    return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant, metaClient.getInstantGenerator());
+    return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant, metaClient.getInstantGenerator(), Option.of(metaClient));
   }
 
   /**
@@ -269,14 +269,37 @@ public class ClusteringUtils {
    * @return {@link Option} of {@link Pair} of {@link HoodieInstant} and {@link HoodieClusteringPlan}
    */
   public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant, InstantGenerator factory) {
+    return getClusteringPlan(timeline, pendingReplaceInstant, factory, Option.empty());
+  }
+
+  /**
+   * Get Clustering plan from timeline with optional metaClient for error recovery.
+   * When metaClient is provided, if reading the clustering plan fails due to the instant being
+   * rolled back by a concurrent writer, the timeline is reloaded and an empty option is returned
+   * instead of throwing an exception.
+   */
+  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(
+      HoodieTimeline timeline, HoodieInstant pendingReplaceInstant, InstantGenerator factory, Option<HoodieTableMetaClient> metaClientOpt) {
     try {
       Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(timeline, pendingReplaceInstant, factory);
       if (requestedReplaceMetadata.isPresent() && WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType())) {
         return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.get().getClusteringPlan()));
       }
       return Option.empty();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.requestedTime(), e);
+    } catch (IOException | HoodieIOException e) {
+      if (metaClientOpt.isPresent() && !metaClientOpt.get().reloadActiveTimeline().containsInstant(pendingReplaceInstant)) {
+        log.warn("Error reading requested replace metadata {} due to it no longer being in the timeline. "
+            + "This could be due to the instant being rolled back by a concurrent writer",
+            pendingReplaceInstant, e);
+        return Option.empty();
+      }
+      final IOException ioException;
+      if (e instanceof HoodieIOException) {
+        ioException = ((HoodieIOException) e).getIOException();
+      } else {
+        ioException = (IOException) e;
+      }
+      throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.requestedTime(), ioException);
     }
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 668a6c61f113..6d580e3b5c56 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -35,6 +35,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -49,8 +52,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -231,6 +237,143 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
         "retain the first replace commit after the last complete clean ");
   }
 
+  /**
+   * Getting a clustering plan for an instant whose requested file was never written (e.g. the instant
+   * was rolled back before we could read it) should return empty rather than throw an exception.
+   */
+  @Test
+  public void testGetClusteringPlanOnNonExistentInstant() {
+    HoodieInstant requestedInstant = INSTANT_GENERATOR.createNewInstant(
+        HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "001");
+    assertFalse(ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).isPresent());
+  }
+
+  /**
+   * Simulates a concurrent rollback: a clustering plan is created, then its requested file is
+   * deleted before getClusteringPlan reads it. The method should return empty instead of throwing.
+   */
+  @Test
+  public void testGetClusteringPlanAfterRequestedFileDeleted() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    // Verify the plan is readable before deletion
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(metaClient, requestedInstant);
+    assertTrue(plan.isPresent());
+
+    // Delete the requested file to simulate a concurrent rollback
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(requestedInstant));
+    assertTrue(metaClient.getStorage().deleteFile(instantFilePath));
+
+    // Should return empty rather than throw
+    assertFalse(ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).isPresent());
+  }
+
+  /**
+   * When one instant among several pending clustering instants is rolled back concurrently,
+   * getAllPendingClusteringPlans should still return the plans for the remaining valid instants.
+   */
+  @Test
+  public void testGetAllPendingClusteringPlansWithRolledBackInstant() throws Exception {
+    String partitionPath = "partition1";
+
+    List<String> fileIds1 = new ArrayList<>();
+    fileIds1.add(UUID.randomUUID().toString());
+    createRequestedClusterInstant(partitionPath, "1", fileIds1);
+
+    List<String> fileIds2 = new ArrayList<>();
+    fileIds2.add(UUID.randomUUID().toString());
+    HoodieInstant secondInstant = createRequestedClusterInstant(partitionPath, "2", fileIds2);
+
+    metaClient.reloadActiveTimeline();
+
+    // Delete the second instant's requested file to simulate rollback
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(secondInstant));
+    assertTrue(metaClient.getStorage().deleteFile(instantFilePath));
+
+    // Should still return the first plan without throwing
+    List<Pair<HoodieInstant, HoodieClusteringPlan>> plans =
+        ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    assertEquals(1, plans.size());
+    assertEquals("1", plans.get(0).getLeft().requestedTime());
+  }
+
+  /**
+   * When the requested file is corrupted but the instant is still in the timeline,
+   * getClusteringPlan should rethrow the exception (genuine error, not a rollback).
+   */
+  @Test
+  public void testGetClusteringPlanRethrowsWhenInstantStillInTimeline() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    // Corrupt the requested file by overwriting with invalid (non-empty) content
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(requestedInstant));
+    java.io.OutputStream out = metaClient.getStorage().create(instantFilePath, true);
+    out.write(new byte[] {0, 1, 2, 3});
+    out.close();
+
+    assertThrows(HoodieIOException.class,
+        () -> ClusteringUtils.getClusteringPlan(metaClient, requestedInstant));
+  }
+
+  /**
+   * The 3-arg overload (without metaClient) should throw on a missing instant since
+   * it has no metaClient to verify rollback.
+   */
+  @Test
+  public void testGetClusteringPlanWithoutMetaClientThrowsOnMissingInstant() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    // Delete the requested file
+    StoragePath instantFilePath = new StoragePath(
+        metaClient.getTimelinePath(),
+        INSTANT_FILE_NAME_GENERATOR.getFileName(requestedInstant));
+    assertTrue(metaClient.getStorage().deleteFile(instantFilePath));
+
+    // The 3-arg overload without metaClient should throw
+    assertThrows(HoodieIOException.class,
+        () -> ClusteringUtils.getClusteringPlan(
+            metaClient.getActiveTimeline(), requestedInstant, INSTANT_GENERATOR));
+  }
+
+  /**
+   * The 3-arg overload (without metaClient) should return the plan successfully on the happy path.
+   */
+  @Test
+  public void testGetClusteringPlanViaTimelineOverload() throws Exception {
+    String partitionPath = "partition1";
+    List<String> fileIds = new ArrayList<>();
+    fileIds.add(UUID.randomUUID().toString());
+    String clusterTime = "1";
+    HoodieInstant requestedInstant = createRequestedClusterInstant(partitionPath, clusterTime, fileIds);
+    metaClient.reloadActiveTimeline();
+
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(
+        metaClient.getActiveTimeline(), requestedInstant, INSTANT_GENERATOR);
+    assertTrue(plan.isPresent());
+    assertEquals(clusterTime, plan.get().getLeft().requestedTime());
+  }
+
   private void validateClusteringInstant(List<String> fileIds, String partitionPath,
                                          String expectedInstantTime,
                                          Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
     for (String fileId : fileIds) {