This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 31dbd89d6c9 [HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks 
as needed (#13064)
31dbd89d6c9 is described below

commit 31dbd89d6c99f704211aae0bf8dedf5320bab2f4
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jun 4 22:01:58 2025 -0700

    [HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks as needed 
(#13064)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  44 ++++-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  23 +--
 .../hudi/client/timeline/TimestampUtils.java       |  10 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  65 +++++---
 .../rollback/BaseRollbackPlanActionExecutor.java   |  22 ++-
 .../HoodieFlinkMergeOnReadTableCompactor.java      |   3 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   3 +-
 .../HoodieJavaMergeOnReadTableCompactor.java       |   3 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   3 +-
 .../commit/BaseSparkCommitActionExecutor.java      |   9 +-
 .../TestHoodieClientOnMergeOnReadStorage.java      |   3 +-
 .../table/action/compact/TestAsyncCompaction.java  |   2 +-
 .../rollback/TestRollbackPlanActionExecutor.java   | 179 +++++++++++++++++++++
 .../TestHoodieSparkMergeOnReadTableRollback.java   |   5 +-
 .../sink/clustering/HoodieFlinkClusteringJob.java  |   3 +-
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |   2 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |   6 +-
 .../java/org/apache/hudi/util/CompactionUtil.java  |   6 +-
 .../command/procedures/RunCleanProcedure.scala     |   2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   2 +-
 21 files changed, 328 insertions(+), 69 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 1b7e854b0f9..62fa6f98d4a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -80,6 +80,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
@@ -216,7 +217,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     HoodieInstant inflightInstant = 
HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
     if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
       LOG.info("Found Log compaction inflight file. Rolling back the commit 
and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
+          Option.of(txnManager));
       table.getMetaClient().reloadActiveTimeline();
       throw new HoodieException("Execution is aborted since it found an 
Inflight logcompaction,"
           + "log compaction plans are mutable plans, so reschedule another 
logcompaction.");
@@ -290,7 +292,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
+          Option.of(txnManager));
       table.getMetaClient().reloadActiveTimeline();
     }
     compactionTimer = metrics.getCompactionCtx();
@@ -449,7 +452,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
     if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
       if 
(pendingClusteringTimeline.isPendingClusterInstant(inflightInstant.getTimestamp()))
 {
-        table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+        table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
+            Option.of(txnManager));
         table.getMetaClient().reloadActiveTimeline();
       } else {
         throw new HoodieClusteringException("Non clustering replace-commit 
inflight at timestamp " + clusteringInstant);
@@ -483,7 +487,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     HoodieTimeline pendingClusteringTimeline = 
table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
     if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true);
+      table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true,
+          Option.of(txnManager));
       table.getMetaClient().reloadActiveTimeline();
       return true;
     }
@@ -746,7 +751,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       LOG.info("Cleaner started");
       // proceed only if multiple clean schedules are enabled or if there are 
no pending cleans.
       if (scheduleInline) {
-        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), 
TableServiceType.CLEAN);
+        scheduleClean(cleanInstantTime);
         table.getMetaClient().reloadActiveTimeline();
       }
 
@@ -769,6 +774,16 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
     return metadata;
   }
 
+  private void scheduleClean(String cleanInstantTime) {
+    HoodieInstant cleanInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, cleanInstantTime);
+    try {
+      txnManager.beginTransaction(Option.of(cleanInstant), Option.empty());
+      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), 
TableServiceType.CLEAN);
+    } finally {
+      txnManager.endTransaction(Option.of(cleanInstant));
+    }
+  }
+
   /**
    * Trigger archival for the table. This ensures that the number of commits 
do not explode
    * and keep increasing unbounded over time.
@@ -1034,8 +1049,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
                 + "(exists in active timeline: %s), with rollback plan: %s",
             rollbackInstantTime, commitInstantOpt.isPresent(), 
pendingRollbackInfo.isPresent()));
         Option<HoodieRollbackPlan> rollbackPlanOption = 
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
-            .orElseGet(() -> table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(),
-                false));
+            .orElseGet(() -> scheduleRollback(table, rollbackInstantTime, 
commitInstantOpt, skipLocking));
         if (rollbackPlanOption.isPresent()) {
           // There can be a case where the inflight rollback failed after the 
instant files
           // are deleted for commitInstantTime, so that commitInstantOpt is 
empty as it is
@@ -1065,6 +1079,22 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
     }
   }
 
+  private Option<HoodieRollbackPlan> scheduleRollback(HoodieTable table, 
String rollbackInstantTime, Option<HoodieInstant> commitInstantOpt,
+                                                      boolean skipLocking) {
+    HoodieInstant rollbackInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, rollbackInstantTime, 
HoodieTimeline.ROLLBACK_ACTION);
+    try {
+      if (!skipLocking) {
+        txnManager.beginTransaction(Option.of(rollbackInstant), 
Option.empty());
+      }
+      return table.scheduleRollback(context, rollbackInstantTime, 
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(),
+          false);
+    } finally {
+      if (!skipLocking) {
+        txnManager.endTransaction(Option.of(rollbackInstant));
+      }
+    }
+  }
+
   /**
    * Main API to rollback failed bootstrap.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 780d48d16bc..d4f41d4c9fd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -52,6 +52,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -342,8 +343,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   protected abstract void validateTimestamp(HoodieTableMetaClient metaClient, 
String instantTime);
 
   protected void validateTimestampInternal(HoodieTableMetaClient metaClient, 
String instantTime) {
-    if (config.shouldEnableTimestampOrderinValidation() && 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
-      TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+    if (config.shouldEnableTimestampOrderingValidation() && 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      TimestampUtils.validateForLatestTimestamp(Either.left(metaClient), 
metaClient.isMetadataTable(), instantTime);
     }
   }
 
@@ -829,20 +830,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * cleaned)
    */
   public HoodieCleanMetadata clean(String cleanInstantTime) throws 
HoodieIOException {
-    return clean(cleanInstantTime, true, false);
-  }
-
-  /**
-   * Clean up any stale/old files/data lying around (either on file storage or 
index storage) based on the
-   * configurations and CleaningPolicy used. (typically files that no longer 
can be used by a running query can be
-   * cleaned)
-   * @param cleanInstantTime instant time for clean.
-   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
-   * @return instance of {@link HoodieCleanMetadata}.
-   */
-  @Deprecated
-  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
skipLocking) throws HoodieIOException {
-    return clean(cleanInstantTime, true, false);
+    return clean(cleanInstantTime, true);
   }
 
   /**
@@ -853,9 +841,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * of clean.
    * @param cleanInstantTime instant time for clean.
    * @param scheduleInline true if needs to be scheduled inline. false 
otherwise.
-   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
    */
-  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline, boolean skipLocking) throws HoodieIOException {
+  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline) throws HoodieIOException {
     return tableServiceClient.clean(cleanInstantTime, scheduleInline);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
index dfc1578290b..9a380c46707 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
@@ -19,16 +19,18 @@
 package org.apache.hudi.client.timeline;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.ValidationUtils;
 
 public class TimestampUtils {
 
-  public static void validateForLatestTimestamp(HoodieTableMetaClient 
metaClient, String instantTime) {
+  public static void validateForLatestTimestamp(Either<HoodieTableMetaClient, 
HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable, 
String instantTime) {
     // validate that the instant for which requested is about to be created is 
the latest in the timeline.
-    if (!metaClient.isMetadataTable()) { // lets validate data table that 
timestamps are generated in monotically increasing order. 
-      HoodieTableMetaClient reloadedMetaClient = 
HoodieTableMetaClient.reload(metaClient);
-      
reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry
 -> {
+    if (!isMetadataTable) { // lets validate data table that timestamps are 
generated in monotically increasing order.
+      HoodieActiveTimeline reloadedActiveTimeline = 
metaClientOrActiveTimeline.isLeft() ? 
metaClientOrActiveTimeline.asLeft().reloadActiveTimeline() : 
metaClientOrActiveTimeline.asRight();
+      reloadedActiveTimeline.getWriteTimeline().lastInstant().ifPresent(entry 
-> {
         
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(),
 HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
             "Found later commit time " + entry + ", compared to the current 
instant " + instantTime + ", hence failing to create requested commit meta 
file");
       });
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f4b2be343cb..a369b748ee2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2629,7 +2629,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     return props.getInteger(WRITES_FILEID_ENCODING, 
HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
   }
 
-  public Boolean shouldEnableTimestampOrderinValidation() {
+  public Boolean shouldEnableTimestampOrderingValidation() {
     return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8ff7b279024..9fc39341e19 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.timeline.TimestampUtils;
+import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -58,6 +59,7 @@ import 
org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -540,6 +542,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param instantTime Instant Time for scheduling rollback
    * @param instantToRollback instant to be rolled back
    * @param shouldRollbackUsingMarkers uses marker based rollback strategy 
when set to true. uses list based rollback when false.
+   * @param isRestore {@code true} when invoked as part of restore.
    * @return HoodieRollbackPlan containing info on rollback.
    */
   public abstract Option<HoodieRollbackPlan> 
scheduleRollback(HoodieEngineContext context,
@@ -607,12 +610,12 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
                                                     String 
restoreInstantTimestamp,
                                                     String 
savepointToRestoreTimestamp);
 
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
-    rollbackInflightCompaction(inflightInstant, s -> Option.empty());
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant, 
Option<TransactionManager> txnManagerOpt) {
+    rollbackInflightCompaction(inflightInstant, s -> Option.empty(), 
txnManagerOpt);
   }
 
-  public void rollbackInflightLogCompaction(HoodieInstant inflightInstant) {
-    rollbackInflightLogCompaction(inflightInstant, s -> Option.empty());
+  public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, 
Option<TransactionManager> txnManagerOpt) {
+    rollbackInflightLogCompaction(inflightInstant, s -> Option.empty(), 
txnManagerOpt);
   }
 
   /**
@@ -622,9 +625,10 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param inflightInstant Inflight Compaction Instant
    */
   public void rollbackInflightCompaction(HoodieInstant inflightInstant,
-                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc,
+                                         Option<TransactionManager> 
txnManagerOpt) {
     
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
-    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc, 
txnManagerOpt);
   }
 
   /**
@@ -634,8 +638,9 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param getPendingRollbackInstantFunc Function to get rollback instant
    */
   public void rollbackInflightClustering(HoodieInstant inflightInstant,
-                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
-    rollbackInflightClustering(inflightInstant, getPendingRollbackInstantFunc, 
false);
+                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc,
+                                         Option<TransactionManager> 
txnManagerOpt) {
+    rollbackInflightClustering(inflightInstant, getPendingRollbackInstantFunc, 
false, txnManagerOpt);
   }
 
   /**
@@ -645,9 +650,10 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param getPendingRollbackInstantFunc Function to get rollback instant
    */
   public void rollbackInflightClustering(HoodieInstant inflightInstant,
-                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc, boolean 
deleteInstants) {
+                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc, boolean 
deleteInstants,
+                                         Option<TransactionManager> 
txnManagerOpt) {
     
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
-    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc, 
txnManagerOpt);
     if (deleteInstants) {
       // above rollback would still keep requested in the timeline. so, lets 
delete it if if are looking to purge the pending clustering fully.
       getActiveTimeline().deletePending(new 
HoodieInstant(HoodieInstant.State.REQUESTED, inflightInstant.getAction(), 
inflightInstant.getTimestamp()));
@@ -661,12 +667,12 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param getPendingRollbackInstantFunc Function to get rollback instant
    */
   private void rollbackInflightInstant(HoodieInstant inflightInstant,
-                                       Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+                                       Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc,
+                                       Option<TransactionManager> 
txnManagerOpt) {
     final String commitTime = 
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
         -> entry.getRollbackInstant().getTimestamp())
         .orElseGet(HoodieActiveTimeline::createNewInstantTime);
-    scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers(),
-        false);
+    scheduleRollback(commitTime, inflightInstant, txnManagerOpt);
     rollback(context, commitTime, inflightInstant, false, false);
     getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
   }
@@ -677,15 +683,26 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    *
    * @param inflightInstant Inflight Compaction Instant
    */
-  public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, 
Function<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInstantFunc) {
+  public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, 
Function<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInstantFunc,
+                                            Option<TransactionManager> 
txnManagerOpt) {
     final String commitTime = 
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
         -> entry.getRollbackInstant().getTimestamp())
         .orElseGet(HoodieActiveTimeline::createNewInstantTime);
-    scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers(),
-        false);
+    scheduleRollback(commitTime, inflightInstant, txnManagerOpt);
     rollback(context, commitTime, inflightInstant, true, false);
   }
 
+  private void scheduleRollback(String commitTime, HoodieInstant 
inflightInstant, Option<TransactionManager> txnManagerOpt) {
+    HoodieInstant rollbackInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime, 
HoodieTimeline.ROLLBACK_ACTION);
+    try {
+      txnManagerOpt.ifPresent(txnManager -> 
txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty()));
+      scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers(),
+          false);
+    } finally {
+      txnManagerOpt.ifPresent(txnManager -> 
txnManager.endTransaction(Option.of(rollbackInstant)));
+    }
+  }
+
   /**
    * Finalize the written data onto storage. Perform any final cleanups.
    *
@@ -897,15 +914,25 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     }
   }
 
+  /**
+   * Validates that the instantTime is latest in the write timeline. This 
method is specifically used to avoid additional timeline reloads.
+   * If the caller expects the validation to explicitly reload, please use 
{@link #validateForLatestTimestampInternal(Either, boolean, String)}.
+   * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+   * @param instantTime instant time of interest.
+   */
+  public void validateForLatestTimestampWithoutReload(HoodieTableMetaClient 
metaClient, String instantTime) {
+    
validateForLatestTimestampInternal(Either.right(metaClient.getActiveTimeline()),
 metaClient.isMetadataTable(), instantTime);
+  }
+
   /**
    * Validates that the instantTime is latest in the write timeline.
    * @param instantTime instant time of interest.
    */
   public abstract void validateForLatestTimestamp(String instantTime);
 
-  protected void validateForLatestTimestampInternal(String instantTime) {
-    if (this.config.shouldEnableTimestampOrderinValidation() && 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
-      TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+  protected void 
validateForLatestTimestampInternal(Either<HoodieTableMetaClient, 
HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable, 
String instantTime) {
+    if (this.config.shouldEnableTimestampOrderingValidation() && 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      TimestampUtils.validateForLatestTimestamp(metaClientOrActiveTimeline, 
isMetadataTable, instantTime);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index 045e8a5a720..cfda4711558 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -113,7 +114,9 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> 
extends BaseActionExecut
       HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new 
HoodieInstantInfo(instantToRollback.getTimestamp(),
           instantToRollback.getAction()), rollbackRequests, 
LATEST_ROLLBACK_PLAN_VERSION);
       if (!skipTimelinePublish) {
-        table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
+        if (!canProceedWithRollback(rollbackInstant)) {
+          return Option.empty();
+        }
         if 
(table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp()))
 {
           LOG.warn("Request Rollback found with instant time " + 
rollbackInstant + ", hence skipping scheduling rollback");
         } else {
@@ -129,6 +132,23 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> 
extends BaseActionExecut
     }
   }
 
+  private boolean canProceedWithRollback(HoodieInstant rollbackInstant) {
+    if 
(config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      // check for concurrent rollbacks. i.e if the commit being rolledback is 
already rolled back, we can bail out.
+      HoodieTableMetaClient reloadedMetaClient = 
HoodieTableMetaClient.reload(table.getMetaClient());
+      HoodieTimeline reloadedActiveTimeline = 
reloadedMetaClient.getActiveTimeline();
+      if 
(!reloadedActiveTimeline.filterInflightsAndRequested().containsInstant(instantToRollback.getTimestamp())
+          && 
!reloadedActiveTimeline.filterCompletedInstants().containsInstant(instantToRollback.getTimestamp()))
 {
+        // if instant to rollback is already rolled back, we can bail out.
+        return false;
+      } else {
+        // since we had already reloaded the timeline above, lets avoid 
additional reload with validateForLatestTimestamp.
+        table.validateForLatestTimestampWithoutReload(reloadedMetaClient, 
rollbackInstant.getTimestamp());
+      }
+    }
+    return true;
+  }
+
   @Override
   public Option<HoodieRollbackPlan> execute() {
     // Plan a new rollback action
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index 058459b1bf6..ee2f31a9bff 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -49,7 +50,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T>
         ? HoodieTimeline.getCompactionInflightInstant(instantTime)
         : HoodieTimeline.getLogCompactionInflightInstant(instantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant);
+      table.rollbackInflightCompaction(inflightInstant, Option.empty());
       table.getMetaClient().reloadActiveTimeline();
     }
   }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index cc91d5d3436..ff6f49b815a 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -94,7 +95,7 @@ public class HoodieJavaCopyOnWriteTable<T>
 
   @Override
   public void validateForLatestTimestamp(String instantTime) {
-    validateForLatestTimestampInternal(instantTime);
+    validateForLatestTimestampInternal(Either.left(metaClient), 
metaClient.isMetadataTable(), instantTime);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
index d8994ce02c3..06b962aa056 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -48,7 +49,7 @@ public class HoodieJavaMergeOnReadTableCompactor<T>
     }
     HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(instantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant);
+      table.rollbackInflightCompaction(inflightInstant, Option.empty());
       table.getMetaClient().reloadActiveTimeline();
     }
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index fea0a19dcdf..869b9c46824 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -106,7 +107,7 @@ public class HoodieSparkCopyOnWriteTable<T>
 
   @Override
   public void validateForLatestTimestamp(String instantTime) {
-    validateForLatestTimestampInternal(instantTime);
+    validateForLatestTimestampInternal(Either.left(metaClient), 
metaClient.isMetadataTable(), instantTime);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 129ace5f8d1..f9b474eedd4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -140,7 +141,13 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
           .collect(Collectors.toSet());
       pendingClusteringInstantsToRollback.forEach(instant -> {
         String commitTime = HoodieActiveTimeline.createNewInstantTime();
-        table.scheduleRollback(context, commitTime, instant, false, 
config.shouldRollbackUsingMarkers(), false);
+        HoodieInstant rollbackInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime, 
HoodieTimeline.ROLLBACK_ACTION);
+        try {
+          txnManagerOption.ifPresent(txnManager -> 
txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty()));
+          table.scheduleRollback(context, commitTime, instant, false, 
config.shouldRollbackUsingMarkers(), false);
+        } finally {
+          txnManagerOption.ifPresent(txnManager -> 
txnManager.endTransaction(Option.of(rollbackInstant)));
+        }
         table.rollback(context, commitTime, instant, true, true);
       });
       table.getMetaClient().reloadActiveTimeline();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 59421597013..179ee72c531 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -401,7 +402,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends 
HoodieClientTestBase {
 
       // Rollback the log compaction commit.
       HoodieInstant instant = new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionTimeStamp.get());
-      getHoodieTable(metaClient, 
config).rollbackInflightLogCompaction(instant);
+      getHoodieTable(metaClient, 
config).rollbackInflightLogCompaction(instant, Option.of(new 
TransactionManager(config, metaClient.getStorage())));
 
       // Validate timeline.
       HoodieTimeline activeTimeline = metaClient.reloadActiveTimeline();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 6eb9da120ce..310c851683d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -94,7 +94,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, 
metaClient);
 
       hoodieTable.rollbackInflightCompaction(
-          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactionInstantTime));
+          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactionInstantTime), Option.empty());
       metaClient = 
HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(cfg.getBasePath()).build();
       pendingCompactionInstant = 
metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
           .getInstantsAsStream().findFirst().get();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackPlanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackPlanActionExecutor.java
new file mode 100644
index 00000000000..8d9b37a74bd
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackPlanActionExecutor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRollbackPlanActionExecutor extends 
HoodieClientRollbackTestBase {
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initHoodieStorage();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void 
testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() throws 
Exception {
+    final String p1 = "2015/03/16";
+    final String p2 = "2015/03/17";
+    final String p3 = "2016/03/15";
+    // Let's create some commit files and base files
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient)
+        .withPartitionMetaFiles(p1, p2, p3)
+        .addCommit("001")
+        .withBaseFilesInPartition(p1, "id11").getLeft()
+        .withBaseFilesInPartition(p2, "id12").getLeft()
+        .withLogFile(p1, "id11", 3).getLeft();
+
+    Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
+      {
+        put(p1, "id21");
+        put(p2, "id22");
+      }
+    };
+    Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap1 = 
new HashMap<>();
+    partitionAndFileId1.forEach((k, v) -> 
partitionToFilesNameLengthMap1.put(k, Collections.singletonList(Pair.of(v, 
100))));
+    HoodieCommitMetadata commitMetadata = testTable.doWriteOperation("002", 
WriteOperationType.UPSERT, Arrays.asList(p1, p2, p3), 
partitionToFilesNameLengthMap1,
+        false, true);
+
+    HoodieWriteConfig writeConfig = 
getConfigBuilder().withRollbackUsingMarkers(true).withEmbeddedTimelineServerEnabled(false)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build();
+    HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
+
+    // create markers for inflight commit 002
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, 
table, "002");
+    writeMarkers.createIfNotExists(p1, 
commitMetadata.getPartitionToWriteStats().get(p1).get(0).getPath().substring(11),
 IOType.CREATE);
+    writeMarkers.createIfNotExists(p1, 
commitMetadata.getPartitionToWriteStats().get(p2).get(0).getPath().substring(11),
 IOType.CREATE);
+
+    HoodieInstant needRollBackInstant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, "002");
+    String rollbackInstant1 = "003";
+    String rollbackInstant2 = "004";
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      CompletableFuture<Boolean> future1 = CompletableFuture.supplyAsync(new 
Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          // trigger rollback planning by writer 1.
+          BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor 
=
+              new BaseRollbackPlanActionExecutor(context, table.getConfig(), 
table, rollbackInstant2, needRollBackInstant, false,
+                  table.getConfig().shouldRollbackUsingMarkers(), false);
+          HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
+          CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor 
= new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, 
rollbackInstant2, needRollBackInstant, true,
+              false);
+          HoodieRollbackMetadata hoodieRollbackMetadata = 
copyOnWriteRollbackActionExecutor.execute();
+          assertTrue(hoodieRollbackMetadata.getTotalFilesDeleted() > 0);
+          // once rollback planning is complete, count down the latch
+          countDownLatch.countDown();
+          return true;
+        }
+      }, executor);
+
+      CompletableFuture<Boolean> future2 = CompletableFuture.supplyAsync(new 
Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          MockRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor 
=
+              new MockRollbackPlanActionExecutor(context, table.getConfig(), 
table, rollbackInstant1, needRollBackInstant, false,
+                  table.getConfig().shouldRollbackUsingMarkers(), false, 
countDownLatch);
+          // this execute will wait until count down latch reaches 0 
successfully before proceeding.
+          Option<HoodieRollbackPlan> rollbackPlan = 
(Option<HoodieRollbackPlan>) copyOnWriteRollbackPlanActionExecutor.execute();
+          assertTrue(rollbackPlan.isEmpty());
+          return true;
+        }
+      }, executor);
+
+      CompletableFuture.allOf(future1, future2).join();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Mock RollbackPlanActionExecutor to await on count down latch before 
calling super.execute() for rollback planning.
+   */
+  class MockRollbackPlanActionExecutor extends BaseRollbackPlanActionExecutor {
+
+    private CountDownLatch countDownLatch;
+
+    public MockRollbackPlanActionExecutor(HoodieEngineContext context, 
HoodieWriteConfig config, HoodieTable table, String instantTime,
+                                          HoodieInstant instantToRollback, 
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean 
isRestore,
+                                          CountDownLatch countDownLatch) {
+      super(context, config, table, instantTime, instantToRollback, 
skipTimelinePublish, shouldRollbackUsingMarkers, isRestore);
+      this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public Option<HoodieRollbackPlan> execute() {
+      try {
+        if (countDownLatch.await(120, TimeUnit.SECONDS)) {
+          return super.execute();
+        } else {
+          throw new HoodieException("Hitting timeout waiting for countdown 
latch to reach 0.");
+        }
+      } catch (InterruptedException e) {
+        throw new HoodieException("Interrupted exception thrown waiting for 
countdown latch to reach 0.");
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index c08026946c0..cb60a2c1c9a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -331,7 +331,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends TestHoodieSparkRoll
         assertTrue(listAllBaseFilesInPath(hoodieTable).stream()
             .anyMatch(file -> compactedCommitTime.equals(new 
HoodieBaseFile(file).getCommitTime())));
         hoodieTable.rollbackInflightCompaction(new HoodieInstant(
-            HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactedCommitTime));
+            HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactedCommitTime), Option.empty());
         allFiles = listAllBaseFilesInPath(hoodieTable);
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, 
metaClient.getCommitsTimeline(), allFiles);
@@ -1005,8 +1005,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends TestHoodieSparkRoll
       //writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
       // Trigger a rollback of compaction
       table.getActiveTimeline().reload();
-      table.rollbackInflightCompaction(new HoodieInstant(
-          HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
newCommitTime));
+      table.rollbackInflightCompaction(new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
newCommitTime), Option.empty());
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
       table = HoodieSparkTable.create(config, context(), metaClient);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 308ebf898a5..0f63cc31ddf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -285,7 +285,8 @@ public class HoodieFlinkClusteringJob {
       if 
(table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
         LOG.info("Rollback inflight clustering instant: [" + clusteringInstant 
+ "]");
         table.rollbackInflightClustering(inflightInstant,
-            commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false));
+            commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false),
+            Option.empty());
         table.getMetaClient().reloadActiveTimeline();
       }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 74fe7929607..c9c9eb28d35 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -247,7 +247,7 @@ public class HoodieFlinkCompactor {
         HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(timestamp);
         if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
           LOG.info("Rollback inflight compaction instant: [" + timestamp + 
"]");
-          table.rollbackInflightCompaction(inflightInstant);
+          table.rollbackInflightCompaction(inflightInstant, null);
           table.getMetaClient().reloadActiveTimeline();
         }
       });
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 6f0bb97a053..0c735ea305c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -95,7 +95,8 @@ public class ClusteringUtil {
     inflightInstants.forEach(inflightInstant -> {
       LOG.info("Rollback the inflight clustering instant: " + inflightInstant 
+ " for failover");
       table.rollbackInflightClustering(inflightInstant,
-          commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false));
+          commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false),
+          Option.empty());
       table.getMetaClient().reloadActiveTimeline();
     });
   }
@@ -112,7 +113,8 @@ public class ClusteringUtil {
     if 
(table.getMetaClient().reloadActiveTimeline().isPendingClusterInstant(instantTime))
 {
       LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
       table.rollbackInflightClustering(inflightInstant,
-          commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false));
+          commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false),
+          Option.empty());
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 1927645d308..a1918ae447a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -168,7 +168,7 @@ public class CompactionUtil {
     HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(instantTime);
     if 
(table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(inflightInstant))
 {
       LOG.warn("Rollback failed compaction instant: [" + instantTime + "]");
-      table.rollbackInflightCompaction(inflightInstant);
+      table.rollbackInflightCompaction(inflightInstant, Option.empty());
     }
   }
 
@@ -184,7 +184,7 @@ public class CompactionUtil {
             instant.getState() == HoodieInstant.State.INFLIGHT);
     inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
       LOG.info("Rollback the inflight compaction instant: " + inflightInstant 
+ " for failover");
-      table.rollbackInflightCompaction(inflightInstant);
+      table.rollbackInflightCompaction(inflightInstant, Option.empty());
       table.getMetaClient().reloadActiveTimeline();
     });
   }
@@ -207,7 +207,7 @@ public class CompactionUtil {
       int timeout = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
       if (StreamerUtil.instantTimeDiffSeconds(currentTime, 
instant.getTimestamp()) >= timeout) {
         LOG.info("Rollback the inflight compaction instant: " + instant + " 
for timeout(" + timeout + "s)");
-        table.rollbackInflightCompaction(instant);
+        table.rollbackInflightCompaction(instant, Option.empty());
         table.getMetaClient().reloadActiveTimeline();
       }
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 7f7c2fe0825..b0b1ebd4284 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -100,7 +100,7 @@ class RunCleanProcedure extends BaseProcedure with 
ProcedureBuilder with Logging
     try {
       client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
         tableName.asInstanceOf[Option[String]])
-      val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, 
skipLocking)
+      val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine)
 
       if (hoodieCleanMeta == null) Seq.empty
       else Seq(Row(hoodieCleanMeta.getStartCleanTime,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index e297bdae191..f5f0abf0c7e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1899,7 +1899,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
                 new SparkRDDWriteClient(context, 
writeConfig).getTableServiceClient
                   .getPendingRollbackInfo(table.getMetaClient, 
commitToRollback, false)
               }
-            })
+            }, org.apache.hudi.common.util.Option.empty())
           val requestedClustering = 
metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get
           assertTrue(requestedClustering.isRequested)
           assertEquals(

Reply via email to