This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 26b324f267e5 fix(concurrency): detect rollback conflicts with ongoing commit operations (#18089)
26b324f267e5 is described below

commit 26b324f267e507399bb5a0d8157a450ccb797d1a
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Mar 19 19:04:45 2026 -0700

    fix(concurrency): detect rollback conflicts with ongoing commit operations (#18089)
---
 .../client/transaction/ConcurrentOperation.java    |  28 +++++-
 .../PreferWriterConflictResolutionStrategy.java    |  15 ++-
 ...urrentFileWritesConflictResolutionStrategy.java |  40 +++++++-
 .../TestConflictResolutionStrategyUtil.java        |  19 ++++
 ...TestPreferWriterConflictResolutionStrategy.java | 106 +++++++++++++++++++++
 5 files changed, 204 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 177119a789ab..80c7df2aca3e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.transaction;
 
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieMetadataWrapper;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -47,6 +48,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_AC
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static 
org.apache.hudi.common.util.CommitUtils.getPartitionAndFileIdWithoutSuffixFromSpecificRecord;
 
 /**
@@ -68,8 +70,11 @@ public class ConcurrentOperation {
   private final String actionType;
   @ToString.Include
   private final String instantTime;
+  private final HoodieTableMetaClient metaClient;
   @Getter
   private Set<Pair<String, String>> mutatedPartitionAndFileIds = 
Collections.emptySet();
+  @Getter
+  private String rolledbackCommit;
 
   public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient 
metaClient) throws IOException {
     // Replace inflight compaction and clustering to requested since inflight 
does not contain the plan.
@@ -82,6 +87,7 @@ public class ConcurrentOperation {
     this.actionState = instant.getState().name();
     this.actionType = instant.getAction();
     this.instantTime = instant.requestedTime();
+    this.metaClient = metaClient;  // needed by init() to read the rollback plan of a pending rollback from the timeline
     init(instant);
   }
 
@@ -91,7 +97,13 @@ public class ConcurrentOperation {
     this.actionState = instant.getState().name();
     this.actionType = instant.getAction();
     this.instantTime = instant.requestedTime();
-    init(instant);
+    this.metaClient = null;  // not needed: this operation is built from commit metadata and never reads the timeline
+    try {
+      init(instant);
+    } catch (IOException e) {
+      // Should never happen: init() only does I/O when reading a rollback plan, which is skipped because metaClient is null
+      throw new RuntimeException("Failed to initialize ConcurrentOperation for 
instant: " + instant, e);
+    }
   }
 
   public String getInstantActionState() {
@@ -106,7 +118,7 @@ public class ConcurrentOperation {
     return instantTime;
   }
 
-  private void init(HoodieInstant instant) {
+  private void init(HoodieInstant instant) throws IOException {
     if (this.metadataWrapper.isAvroMetadata()) {
       switch (getInstantActionType()) {
         case COMPACTION_ACTION:
@@ -128,6 +140,18 @@ public class ConcurrentOperation {
             this.operationType = 
WriteOperationType.fromValue(avroCommitMeta.getOperationType());
           }
           break;
+        case ROLLBACK_ACTION:
+          this.operationType = WriteOperationType.UNKNOWN;
+          if (!instant.isCompleted()) {
+            // Only the requested rollback instant carries the rollback plan (the inflight file is empty), so
+            // always read the plan from the requested instant, whether this instant is requested or inflight.
+            if (this.metaClient != null) {
+              HoodieInstant requested = 
metaClient.getInstantGenerator().getRollbackRequestedInstant(instant);
+              HoodieRollbackPlan rollbackPlan = 
metaClient.getActiveTimeline().readRollbackPlan(requested);
+              this.rolledbackCommit = 
rollbackPlan.getInstantToRollback().getCommitTime();
+            }
+          }
+          break;
         case REPLACE_COMMIT_ACTION:
         case CLUSTERING_ACTION:
           if (instant.isCompleted()) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 2c3942f7169e..36a6d5a81ef8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -72,12 +72,25 @@ public class PreferWriterConflictResolutionStrategy
         && writeConfigOpt.isPresent() && 
writeConfigOpt.get().isClusteringBlockForPendingIngestion();
 
     if (isCurrentOperationClustering || 
COMPACTION_ACTION.equals(currentInstant.getAction())) {
+      // Table service rollbacks are done by table service jobs/writers only, 
not by ingestion threads,
+      // so rollback conflict detection is not needed for table services.
       return getCandidateInstantsForTableServicesCommits(activeTimeline, 
currentInstant, isCurrentOperationClustering, metaClient, writeConfigOpt);
     } else {
-      return getCandidateInstantsForNonTableServicesCommits(activeTimeline, 
currentInstant);
+      return 
Stream.concat(getCandidateInstantsForNonTableServicesCommits(activeTimeline, 
currentInstant),
+          getCandidateInstantsForRollbackConflict(activeTimeline, 
currentInstant));
     }
   }
 
+  private Stream<HoodieInstant> 
getCandidateInstantsForRollbackConflict(HoodieActiveTimeline activeTimeline, 
HoodieInstant currentInstant) {
+    // Add pending (requested or inflight) rollback instants that were created after the current instant.
+    List<HoodieInstant> pendingRollbacks = activeTimeline
+        .findInstantsAfter(currentInstant.requestedTime())
+        .filterPendingRollbackTimeline()
+        .getInstantsAsStream().collect(Collectors.toList());
+    log.info(String.format("Rollback instants that may have conflict with %s 
are %s", currentInstant, pendingRollbacks));
+    return pendingRollbacks.stream();
+  }
+
   private Stream<HoodieInstant> 
getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant) {
 
     // To find out which instants are conflicting, we apply the following logic
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index 9445499ed8a9..e2eaa5310303 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
@@ -130,6 +131,11 @@ public class 
SimpleConcurrentFileWritesConflictResolutionStrategy
 
   @Override
   public boolean hasConflict(ConcurrentOperation thisOperation, 
ConcurrentOperation otherOperation) {
+    // Check for rollback conflicts first
+    if (isRollbackConflict(thisOperation, otherOperation)) {
+      return true;
+    }
+
     // TODO : UUID's can clash even for insert/insert, handle that case.
     Set<Pair<String, String>> partitionAndFileIdsSetForFirstInstant = 
thisOperation.getMutatedPartitionAndFileIds();
     Set<Pair<String, String>> partitionAndFileIdsSetForSecondInstant = 
otherOperation.getMutatedPartitionAndFileIds();
@@ -143,6 +149,38 @@ public class 
SimpleConcurrentFileWritesConflictResolutionStrategy
     return false;
   }
 
+  /**
+   * Check whether there is a rollback operation in progress that tries to 
rollback the commit created by this
+   * operation.
+   *
+   * @param thisOperation first concurrent commit operation
+   * @param otherOperation concurrent rollback operation
+   * @return true if there is a rollback conflict, false otherwise
+   */
+  private boolean isRollbackConflict(ConcurrentOperation thisOperation, 
ConcurrentOperation otherOperation) {
+    // Check if otherOperation is rollback
+    if (isRollbackOperation(otherOperation)) {
+      String rolledbackCommit = otherOperation.getRolledbackCommit();
+      String thisCommitTimestamp = thisOperation.getInstantTimestamp();
+      if (rolledbackCommit != null && 
rolledbackCommit.equals(thisCommitTimestamp)) {
+        log.error("Found rollback conflict: rollback operation " + 
otherOperation
+            + " is rolling back commit " + thisCommitTimestamp + " created by 
operation " + thisOperation);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check if the given operation is a rollback operation.
+   *
+   * @param operation concurrent operation to check
+   * @return true if it's a rollback operation, false otherwise
+   */
+  private boolean isRollbackOperation(ConcurrentOperation operation) {
+    return ROLLBACK_ACTION.equals(operation.getInstantActionType());
+  }
+
   @Override
   public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
       ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
@@ -163,7 +201,7 @@ public class 
SimpleConcurrentFileWritesConflictResolutionStrategy
       // Conflict arises only if the log compaction commit has a lesser 
timestamp compared to compaction commit.
       return thisOperation.getCommitMetadataOption();
     }
-    // just abort the current write if conflicts are found
+    // just abort the current write if conflicts are found (rollback conflicts always end up here).
     throw new HoodieWriteConflictException(new 
ConcurrentModificationException("Cannot resolve conflicts for overlapping 
writes between first operation = " + thisOperation
         + ", second operation = " + otherOperation));
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index ea7ddac7aa4b..fd6237b7e9ca 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -22,7 +22,9 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -296,4 +298,21 @@ public class TestConflictResolutionStrategyUtil {
     replaceMetadata.setOperationType(writeOperationType);
     return replaceMetadata;
   }
+
+  public static void createRollbackRequested(String rollbackInstantTime, 
String commitToRollback, HoodieTableMetaClient metaClient) throws Exception {
+    // Create a rollback plan that targets the specified commit
+    HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+    rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitToRollback, 
"commit"));
+    rollbackPlan.setVersion(TimelineLayoutVersion.CURR_VERSION);
+
+    HoodieTestTable.of(metaClient).addRequestedRollback(rollbackInstantTime, 
rollbackPlan);
+  }
+
+  public static void createRollbackInflight(String rollbackInstantTime, String 
commitToRollback, HoodieTableMetaClient metaClient) throws Exception {
+    // First create the requested rollback, then transition to inflight
+    createRollbackRequested(rollbackInstantTime, commitToRollback, metaClient);
+
+    // Create the inflight rollback file
+    HoodieTestTable.of(metaClient).addInflightRollback(rollbackInstantTime);
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index f59490c77aef..99a39c79d805 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -25,6 +25,7 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -35,6 +36,8 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.List;
@@ -49,6 +52,8 @@ import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterInflight;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterRequested;
+import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackInflight;
+import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackRequested;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 
 public class TestPreferWriterConflictResolutionStrategy extends 
HoodieCommonTestHarness {
@@ -560,4 +565,105 @@ public class TestPreferWriterConflictResolutionStrategy 
extends HoodieCommonTest
     Assertions.assertEquals(currentWriterInstant, 
candidateInstants.get(0).requestedTime());
   }
 
+  /**
+   * Positive test case: ensures that a conflict is flagged for an ongoing rollback that targets the inflight commit.
+   * @param rollbackRequestedOnly if true, creates only .rollback.requested; otherwise also creates .rollback.inflight
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testConcurrentRollbackAndCommitConflict(boolean 
rollbackRequestedOnly) throws Exception {
+    // Create a completed base commit; it only serves as the last successful instant (the rollback targets the inflight commit)
+    String targetCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommit(targetCommitTime, metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+    // Consider commits before this are all successful
+    Option<HoodieInstant> lastSuccessfulInstant = 
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // Start a new commit (inflight ingestion commit)
+    String inflightCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createInflightCommit(inflightCommitTime, metaClient);
+
+    // Start a rollback operation targeting the same commit timestamp as the 
inflight commit
+    String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime();
+    if (rollbackRequestedOnly) {
+      createRollbackRequested(rollbackInstantTime, inflightCommitTime, 
metaClient);
+    } else {
+      createRollbackInflight(rollbackInstantTime, inflightCommitTime, 
metaClient);
+    }
+
+    // Set up the conflict resolution strategy
+    Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, inflightCommitTime));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
PreferWriterConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = 
createCommitMetadata(inflightCommitTime);
+
+    metaClient.reloadActiveTimeline();
+    List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
+        Collectors.toList());
+
+    // The rollback operation should be detected as a candidate instant
+    Assertions.assertTrue(candidateInstants.size() == 1);
+    ConcurrentOperation rollbackOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation commitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
+
+    // The strategy should detect a conflict between the rollback and commit 
operations
+    Assertions.assertTrue(strategy.hasConflict(commitOperation, 
rollbackOperation));
+
+    // Attempting to resolve the conflict should throw an exception
+    try {
+      strategy.resolveConflict(null, commitOperation, rollbackOperation);
+      Assertions.fail("Cannot reach here, rollback and commit should have 
thrown a conflict");
+    } catch (HoodieWriteConflictException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Negative test case: ensures that no conflict is flagged for an ongoing rollback that targets
+   * a different (completed) commit rather than the inflight one.
+   * @param rollbackRequestedOnly if true, creates only .rollback.requested; otherwise also creates .rollback.inflight
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testConcurrentRollbackAndCommitNoConflict(boolean 
rollbackRequestedOnly) throws Exception {
+    // Create two different commits
+    String targetCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommit(targetCommitTime, metaClient);
+    String differentCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommit(differentCommitTime, metaClient);
+
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = 
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // Start a new commit (inflight ingestion commit)
+    String inflightCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createInflightCommit(inflightCommitTime, metaClient);
+
+    // Start a rollback operation targeting a different commit (not the 
inflight one)
+    String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime();
+    if (rollbackRequestedOnly) {
+      createRollbackRequested(rollbackInstantTime, targetCommitTime, 
metaClient);
+    } else {
+      createRollbackInflight(rollbackInstantTime, targetCommitTime, 
metaClient);
+    }
+
+    // Set up the conflict resolution strategy
+    Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, inflightCommitTime));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
PreferWriterConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = 
createCommitMetadata(inflightCommitTime);
+
+    metaClient.reloadActiveTimeline();
+    List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
+        Collectors.toList());
+
+    // The rollback operation should be detected as a candidate instant
+    Assertions.assertTrue(candidateInstants.size() == 1);
+    ConcurrentOperation rollbackOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation commitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
+
+    // The strategy should NOT detect a conflict since the rollback targets a 
different commit
+    Assertions.assertFalse(strategy.hasConflict(commitOperation, 
rollbackOperation));
+  }
 }

Reply via email to