This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db999a1a25 [MINOR] Renaming TransactionManager methods to begin/end x StateChange (#13387)
2db999a1a25 is described below

commit 2db999a1a253fb222f1e4a397e46f92f2b1a846d
Author: vinoth chandar <[email protected]>
AuthorDate: Mon Jun 2 16:55:14 2025 -0700

    [MINOR] Renaming TransactionManager methods to begin/end x StateChange (#13387)
    
    * [MINOR] Renaming TransactionManager methods to begin/end x StateChange
    
     - begin/end Transaction is confusing.
     - Naming aligns with how these methods are called, whenever action state changes
    
    * Log message cleanup
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 24 +++++-----
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 24 +++++-----
 .../timeline/versioning/v1/TimelineArchiverV1.java |  4 +-
 .../timeline/versioning/v2/TimelineArchiverV2.java |  4 +-
 .../DirectMarkerTransactionManager.java            |  2 +-
 .../client/transaction/TransactionManager.java     | 40 ++++++++--------
 .../table/action/clean/CleanActionExecutor.java    |  4 +-
 .../action/commit/BaseCommitActionExecutor.java    |  4 +-
 .../action/index/AbstractIndexingCatchupTask.java  |  4 +-
 .../table/action/index/RunIndexActionExecutor.java |  4 +-
 .../action/restore/BaseRestoreActionExecutor.java  |  4 +-
 .../rollback/BaseRollbackActionExecutor.java       |  4 +-
 .../hudi/client/TestBaseHoodieWriteClient.java     |  4 +-
 .../client/transaction/TestTransactionManager.java | 54 +++++++++++-----------
 .../hudi/client/HoodieFlinkTableServiceClient.java |  8 ++--
 15 files changed, 94 insertions(+), 94 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 54898d1123b..1a4e21c6242 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -349,7 +349,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
     final HoodieInstant compactionInstant = 
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
     try {
-      this.txnManager.beginTransaction(Option.of(compactionInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
       writeTableMetadata(table, compactionCommitTime, metadata);
@@ -357,7 +357,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
       LOG.debug("Compaction {} finished with result: {}", 
compactionCommitTime, metadata);
     } finally {
-      this.txnManager.endTransaction(Option.of(compactionInstant));
+      this.txnManager.endStateChange(Option.of(compactionInstant));
       releaseResources(compactionCommitTime);
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, 
compactionCommitTime)
@@ -410,7 +410,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     final HoodieInstant logCompactionInstant = 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION,
         logCompactionCommitTime);
     try {
-      this.txnManager.beginTransaction(Option.of(logCompactionInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(logCompactionInstant), 
Option.empty());
       preCommit(metadata);
       finalizeWrite(table, logCompactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
@@ -419,7 +419,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       CompactHelpers.getInstance().completeInflightLogCompaction(table, 
logCompactionCommitTime, metadata);
       LOG.debug("Log Compaction {} finished with result {}", 
logCompactionCommitTime, metadata);
     } finally {
-      this.txnManager.endTransaction(Option.of(logCompactionInstant));
+      this.txnManager.endStateChange(Option.of(logCompactionInstant));
       releaseResources(logCompactionCommitTime);
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionCommitTime)
@@ -537,7 +537,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     final HoodieInstant clusteringInstant = 
ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
         table.getActiveTimeline(), 
table.getMetaClient().getInstantGenerator()).get();
     try {
-      this.txnManager.beginTransaction(Option.of(clusteringInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(clusteringInstant), 
Option.empty());
 
       finalizeWrite(table, clusteringCommitTime, writeStats);
       // Only in some cases conflict resolution needs to be performed.
@@ -555,7 +555,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     } catch (Exception e) {
       throw new HoodieClusteringException("unable to transition clustering 
inflight to complete: " + clusteringCommitTime, e);
     } finally {
-      this.txnManager.endTransaction(Option.of(clusteringInstant));
+      this.txnManager.endStateChange(Option.of(clusteringInstant));
       releaseResources(clusteringCommitTime);
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, 
clusteringCommitTime)
@@ -652,7 +652,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       // so it is handled differently to avoid locking for planning.
       return scheduleCleaning(createTable(config, storageConf), 
providedInstantTime);
     }
-    txnManager.beginTransaction(Option.empty(), Option.empty());
+    txnManager.beginStateChange(Option.empty(), Option.empty());
     try {
       Option<String> option;
       HoodieTable<?, ?, ?, ?> table = createTable(config, storageConf);
@@ -688,7 +688,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
 
       return option;
     } finally {
-      txnManager.endTransaction(Option.empty());
+      txnManager.endStateChange(Option.empty());
     }
   }
 
@@ -817,7 +817,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
   private Option<String> scheduleCleaning(HoodieTable<?, ?, ?, ?> table, 
Option<String> suppliedCleanInstant) {
     Option<HoodieCleanerPlan> cleanerPlan = table.createCleanerPlan(context, 
Option.empty());
     if (cleanerPlan.isPresent()) {
-      txnManager.beginTransaction(Option.empty(), Option.empty());
+      txnManager.beginStateChange(Option.empty(), Option.empty());
       try {
         String cleanInstantTime = suppliedCleanInstant.orElseGet(() -> 
createNewInstantTime(false));
         final HoodieInstant cleanInstant = 
table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.CLEAN_ACTION, cleanInstantTime);
@@ -833,7 +833,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
         LOG.error("Got exception when saving cleaner requested file", e);
         throw e;
       } finally {
-        txnManager.endTransaction(Option.empty());
+        txnManager.endStateChange(Option.empty());
       }
     }
     return Option.empty();
@@ -1121,14 +1121,14 @@ public abstract class BaseHoodieTableServiceClient<I, 
T, O> extends BaseHoodieCl
           return false;
         }
         if (!skipLocking) {
-          txnManager.beginTransaction(Option.empty(), Option.empty());
+          txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
           rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
           rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
         } finally {
           if (!skipLocking) {
-            txnManager.endTransaction(Option.empty());
+            txnManager.endStateChange(Option.empty());
           }
         }
       }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 80f12c1357e..7a298e6ae45 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -252,7 +252,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
                 extraMetadata, operationType, config.getWriteSchema(), 
commitActionType));
     HoodieInstant inflightInstant = 
table.getMetaClient().createNewInstant(State.INFLIGHT, commitActionType, 
instantTime);
     HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, 
heartbeatClient, config);
-    this.txnManager.beginTransaction(Option.of(inflightInstant),
+    this.txnManager.beginStateChange(Option.of(inflightInstant),
         lastCompletedTxnAndMetadata.isPresent() ? 
Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(metadata);
@@ -265,7 +265,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + 
config.getBasePath() + " at time " + instantTime, e);
     } finally {
-      this.txnManager.endTransaction(Option.of(inflightInstant));
+      this.txnManager.endStateChange(Option.of(inflightInstant));
       releaseResources(instantTime);
     }
 
@@ -859,12 +859,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     if (failedRestore.isPresent() && 
savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table,
 failedRestore.get()))) {
       return Pair.of(failedRestore.get().requestedTime(), 
Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(), 
failedRestore.get())));
     }
-    txnManager.beginTransaction(Option.empty(), Option.empty());
+    txnManager.beginStateChange(Option.empty(), Option.empty());
     try {
       final String restoreInstantTimestamp = createNewInstantTime();
       return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, 
restoreInstantTimestamp, savepointToRestoreTimestamp));
     } finally {
-      txnManager.endTransaction(Option.empty());
+      txnManager.endStateChange(Option.empty());
     }
   }
 
@@ -969,7 +969,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.COMMIT_ACTION, () -> 
tableServiceClient.rollbackFailedWrites(metaClient));
 
-    txnManager.beginTransaction(Option.empty(), 
lastCompletedTxnAndMetadata.map(Pair::getLeft));
+    txnManager.beginStateChange(Option.empty(), 
lastCompletedTxnAndMetadata.map(Pair::getLeft));
     String instantTime;
     HoodieInstant instant = null;
     try {
@@ -992,7 +992,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
         metaClient.getActiveTimeline().createNewInstant(instant);
       }
     } finally {
-      txnManager.endTransaction(Option.ofNullable(instant));
+      txnManager.endStateChange(Option.ofNullable(instant));
     }
     return instantTime;
   }
@@ -1024,14 +1024,14 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @return instant time for the requested INDEX action
    */
   public Option<String> scheduleIndexing(List<MetadataPartitionType> 
partitionTypes, List<String> partitionPaths) {
-    txnManager.beginTransaction(Option.empty(), Option.empty());
+    txnManager.beginStateChange(Option.empty(), Option.empty());
     try {
       String instantTime = createNewInstantTime(false);
       Option<HoodieIndexPlan> indexPlan = createTable(config)
           .scheduleIndexing(context, instantTime, partitionTypes, 
partitionPaths);
       return indexPlan.map(plan -> instantTime);
     } finally {
-      txnManager.endTransaction(Option.empty());
+      txnManager.endStateChange(Option.empty());
     }
   }
 
@@ -1054,7 +1054,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     HoodieTable table = createTable(config);
     String dropInstant = createNewInstantTime();
     HoodieInstant ownerInstant = 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.INDEXING_ACTION, dropInstant);
-    this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
+    this.txnManager.beginStateChange(Option.of(ownerInstant), Option.empty());
     try {
       context.setJobStatus(this.getClass().getSimpleName(), "Dropping 
partitions from metadata table: " + config.getTableName());
       HoodieTableMetaClient metaClient = table.getMetaClient();
@@ -1087,7 +1087,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
         }
       }
     } finally {
-      this.txnManager.endTransaction(Option.of(ownerInstant));
+      this.txnManager.endStateChange(Option.of(ownerInstant));
     }
   }
 
@@ -1298,11 +1298,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   }
 
   private void executeUsingTxnManager(Option<HoodieInstant> ownerInstant, 
Runnable r) {
-    this.txnManager.beginTransaction(ownerInstant, Option.empty());
+    this.txnManager.beginStateChange(ownerInstant, Option.empty());
     try {
       r.run();
     } finally {
-      this.txnManager.endTransaction(ownerInstant);
+      this.txnManager.endStateChange(ownerInstant);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index c0aa11a2aea..8da69585b04 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -139,7 +139,7 @@ public class TimelineArchiverV1<T extends 
HoodieAvroPayload, I, K, O> implements
     try {
       if (acquireLock) {
         // there is no owner or instant time per se for archival.
-        txnManager.beginTransaction(Option.empty(), Option.empty());
+        txnManager.beginStateChange(Option.empty(), Option.empty());
       }
       List<HoodieInstant> instantsToArchive = getInstantsToArchive();
       if (!instantsToArchive.isEmpty()) {
@@ -156,7 +156,7 @@ public class TimelineArchiverV1<T extends 
HoodieAvroPayload, I, K, O> implements
     } finally {
       close();
       if (acquireLock) {
-        txnManager.endTransaction(Option.empty());
+        txnManager.endStateChange(Option.empty());
       }
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index 65e08920ae4..e18617232e5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -95,7 +95,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, 
I, K, O> implements
     try {
       if (acquireLock) {
         // there is no owner or instant time per se for archival.
-        txnManager.beginTransaction(Option.empty(), Option.empty());
+        txnManager.beginStateChange(Option.empty(), Option.empty());
       }
     } catch (HoodieLockException e) {
       LOG.error("Fail to begin transaction", e);
@@ -123,7 +123,7 @@ public class TimelineArchiverV2<T extends 
HoodieAvroPayload, I, K, O> implements
       return instantsToArchive.size();
     } finally {
       if (acquireLock) {
-        txnManager.endTransaction(Option.empty());
+        txnManager.endStateChange(Option.empty());
       }
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index d75d0a648dc..02b027f12d3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -51,7 +51,7 @@ public class DirectMarkerTransactionManager extends 
TransactionManager {
       LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " 
+ filePath);
       lockManager.lock();
 
-      reset(currentTxnOwnerInstant, 
Option.of(getInstant(newTxnOwnerInstantTime, instantGenerator)), 
Option.empty());
+      reset(changeActionInstant, Option.of(getInstant(newTxnOwnerInstantTime, 
instantGenerator)), Option.empty());
       LOG.info("Transaction started for " + newTxnOwnerInstantTime + " and " + 
filePath);
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index b725243fca0..da1f6639752 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -38,8 +38,8 @@ public class TransactionManager implements Serializable {
   protected static final Logger LOG = 
LoggerFactory.getLogger(TransactionManager.class);
   protected final LockManager lockManager;
   protected final boolean isLockRequired;
-  protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
-  private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
+  protected Option<HoodieInstant> changeActionInstant = Option.empty();
+  private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
     this(new LockManager(config, storage), config.isLockRequired());
@@ -50,34 +50,34 @@ public class TransactionManager implements Serializable {
     this.isLockRequired = isLockRequired;
   }
 
-  public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
-                               Option<HoodieInstant> 
lastCompletedTxnOwnerInstant) {
+  public void beginStateChange(Option<HoodieInstant> changeActionInstant,
+                               Option<HoodieInstant> 
lastCompletedActionInstant) {
     if (isLockRequired) {
-      LOG.info("Transaction starting for " + newTxnOwnerInstant
-          + " with latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
+      LOG.info("State change starting for {} with latest completed action 
instant {}",
+          changeActionInstant, lastCompletedActionInstant);
       lockManager.lock();
-      reset(currentTxnOwnerInstant, newTxnOwnerInstant, 
lastCompletedTxnOwnerInstant);
-      LOG.info("Transaction started for " + newTxnOwnerInstant
-          + " with latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
+      reset(this.changeActionInstant, changeActionInstant, 
lastCompletedActionInstant);
+      LOG.info("State change started for {} with latest completed action 
instant {}",
+          changeActionInstant, lastCompletedActionInstant);
     }
   }
 
-  public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
+  public void endStateChange(Option<HoodieInstant> changeActionInstant) {
     if (isLockRequired) {
-      LOG.info("Transaction ending with transaction owner " + 
currentTxnOwnerInstant);
-      if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
+      LOG.info("State change ending for action instant {}", 
changeActionInstant);
+      if (reset(changeActionInstant, Option.empty(), Option.empty())) {
         lockManager.unlock();
-        LOG.info("Transaction ended with transaction owner " + 
currentTxnOwnerInstant);
+        LOG.info("State change ended for action instant {}", 
changeActionInstant);
       }
     }
   }
 
   protected synchronized boolean reset(Option<HoodieInstant> callerInstant,
-                                       Option<HoodieInstant> 
newTxnOwnerInstant,
-                                       Option<HoodieInstant> 
lastCompletedTxnOwnerInstant) {
-    if (!this.currentTxnOwnerInstant.isPresent() || 
this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
-      this.currentTxnOwnerInstant = newTxnOwnerInstant;
-      this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
+                                       Option<HoodieInstant> 
changeActionInstant,
+                                       Option<HoodieInstant> 
lastCompletedActionInstant) {
+    if (!this.changeActionInstant.isPresent() || 
this.changeActionInstant.get().equals(callerInstant.get())) {
+      this.changeActionInstant = changeActionInstant;
+      this.lastCompletedActionInstant = lastCompletedActionInstant;
       return true;
     }
     return false;
@@ -95,11 +95,11 @@ public class TransactionManager implements Serializable {
   }
 
   public Option<HoodieInstant> getLastCompletedTransactionOwner() {
-    return lastCompletedTxnOwnerInstant;
+    return lastCompletedActionInstant;
   }
 
   public Option<HoodieInstant> getCurrentTransactionOwner() {
-    return currentTxnOwnerInstant;
+    return changeActionInstant;
   }
 
   public boolean isLockRequired() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index d37c40960c9..bed24daa7e7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -220,13 +220,13 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
           cleanStats,
           cleanerPlan.getExtraMetadata()
       );
-      this.txnManager.beginTransaction(Option.of(inflightInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(inflightInstant), 
Option.empty());
       writeTableMetadata(metadata, inflightInstant.requestedTime());
       table.getActiveTimeline().transitionCleanInflightToComplete(false, 
inflightInstant, Option.of(metadata));
       LOG.info("Marked clean started on {} as complete", 
inflightInstant.requestedTime());
       return metadata;
     } finally {
-      this.txnManager.endTransaction(Option.ofNullable(inflightInstant));
+      this.txnManager.endStateChange(Option.ofNullable(inflightInstant));
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index dd547b7c03c..549afc7d90d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -199,7 +199,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
         getCommitActionType(), instantTime));
     ValidationUtils.checkState(this.txnManagerOption.isPresent(), "The 
transaction manager has not been initialized");
     TransactionManager txnManager = this.txnManagerOption.get();
-    txnManager.beginTransaction(inflightInstant,
+    txnManager.beginStateChange(inflightInstant,
         lastCompletedTxn.isPresent() ? 
Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
     try {
       setCommitMetadata(result);
@@ -208,7 +208,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
           result.getCommitMetadata(), config, 
txnManager.getLastCompletedTransactionOwner(), false, 
pendingInflightAndRequestedInstants);
       commit(result);
     } finally {
-      txnManager.endTransaction(inflightInstant);
+      txnManager.endStateChange(inflightInstant);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
index c5b22e56709..45b51256711 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
@@ -107,7 +107,7 @@ public abstract class AbstractIndexingCatchupTask 
implements IndexingCatchupTask
         }
         try {
           // we need take a lock here as inflight writer could also try to 
update the timeline
-          transactionManager.beginTransaction(Option.of(instant), 
Option.empty());
+          transactionManager.beginStateChange(Option.of(instant), 
Option.empty());
           LOG.info("Updating metadata table for instant: " + instant);
           switch (instant.getAction()) {
             case HoodieTimeline.COMMIT_ACTION:
@@ -133,7 +133,7 @@ public abstract class AbstractIndexingCatchupTask 
implements IndexingCatchupTask
         } catch (IOException e) {
           throw new HoodieIndexException(String.format("Could not update 
metadata partition for instant: %s", instant), e);
         } finally {
-          transactionManager.endTransaction(Option.of(instant));
+          transactionManager.endStateChange(Option.of(instant));
         }
       }
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index bceb50d8417..fe02732c3da 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -266,13 +266,13 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
                                             HoodieIndexCommitMetadata 
indexCommitMetadata) throws IOException {
     try {
       // update the table config and timeline in a lock as there could be 
another indexer running
-      txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
+      txnManager.beginStateChange(Option.of(indexInstant), Option.empty());
       updateMetadataPartitionsTableConfig(table.getMetaClient(),
           
finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
       table.getActiveTimeline().saveAsComplete(false, 
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, 
INDEXING_ACTION, indexInstant.requestedTime()),
           Option.of(indexCommitMetadata));
     } finally {
-      txnManager.endTransaction(Option.of(indexInstant));
+      txnManager.endStateChange(Option.of(indexInstant));
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 831fe0616d6..36b4aae550e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -152,10 +152,10 @@ public abstract class BaseRestoreActionExecutor<T, I, K, 
O> extends BaseActionEx
    */
   private void writeToMetadata(HoodieRestoreMetadata restoreMetadata, 
HoodieInstant restoreInflightInstant) {
     try {
-      this.txnManager.beginTransaction(Option.of(restoreInflightInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(restoreInflightInstant), 
Option.empty());
       writeTableMetadata(restoreMetadata);
     } finally {
-      this.txnManager.endTransaction(Option.of(restoreInflightInstant));
+      this.txnManager.endStateChange(Option.of(restoreInflightInstant));
     }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 2f22d27bd8a..2c17de655bc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -270,7 +270,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K, 
O> extends BaseActionE
     boolean enableLocking = (!skipLocking && !skipTimelinePublish);
     try {
       if (enableLocking) {
-        this.txnManager.beginTransaction(Option.of(inflightInstant), 
Option.empty());
+        this.txnManager.beginStateChange(Option.of(inflightInstant), 
Option.empty());
       }
 
       // If publish the rollback to the timeline, we first write the rollback 
metadata to metadata table
@@ -293,7 +293,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K, 
O> extends BaseActionE
       }
     } finally {
       if (enableLocking) {
-        this.txnManager.endTransaction(Option.of(inflightInstant));
+        this.txnManager.endStateChange(Option.of(inflightInstant));
       }
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 75dd7c0cc50..04e3ab8fe26 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -143,9 +143,9 @@ class TestBaseHoodieWriteClient extends 
HoodieCommonTestHarness {
     HoodieInstant expectedInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieActiveTimeline.COMMIT_ACTION, instantTime, 
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
 
     InOrder inOrder = Mockito.inOrder(transactionManager, timeGenerator);
-    inOrder.verify(transactionManager).beginTransaction(Option.empty(), 
Option.empty());
+    inOrder.verify(transactionManager).beginStateChange(Option.empty(), 
Option.empty());
     inOrder.verify(timeGenerator).generateTime(true);
-    
inOrder.verify(transactionManager).endTransaction(Option.of(expectedInstant));
+    
inOrder.verify(transactionManager).endStateChange(Option.of(expectedInstant));
   }
 
   private static class TestWriteClient extends BaseHoodieWriteClient<String, 
String, String, String> {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index a1487042119..4e2bb4dcfe6 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -84,26 +84,26 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
   public void testSingleWriterTransaction() {
     Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
     Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
-    transactionManager.beginTransaction(newTxnOwnerInstant, 
lastCompletedInstant);
-    transactionManager.endTransaction(newTxnOwnerInstant);
+    transactionManager.beginStateChange(newTxnOwnerInstant, 
lastCompletedInstant);
+    transactionManager.endStateChange(newTxnOwnerInstant);
   }
 
   @Test
   public void testSingleWriterNestedTransaction() {
     Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
     Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
-    transactionManager.beginTransaction(newTxnOwnerInstant, 
lastCompletedInstant);
+    transactionManager.beginStateChange(newTxnOwnerInstant, 
lastCompletedInstant);
 
     Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003");
     Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004");
 
     assertThrows(HoodieLockException.class, () -> {
-      transactionManager.beginTransaction(newTxnOwnerInstant1, 
lastCompletedInstant1);
+      transactionManager.beginStateChange(newTxnOwnerInstant1, 
lastCompletedInstant1);
     });
 
-    transactionManager.endTransaction(newTxnOwnerInstant);
+    transactionManager.endStateChange(newTxnOwnerInstant);
     assertDoesNotThrow(() -> {
-      transactionManager.endTransaction(newTxnOwnerInstant1);
+      transactionManager.endStateChange(newTxnOwnerInstant1);
     });
   }
 
@@ -124,7 +124,7 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
     // to join the sync up point.
     Thread writer1 = new Thread(() -> {
       assertDoesNotThrow(() -> {
-        transactionManager.beginTransaction(newTxnOwnerInstant1, 
lastCompletedInstant1);
+        transactionManager.beginStateChange(newTxnOwnerInstant1, 
lastCompletedInstant1);
       });
       latch.countDown();
       try {
@@ -137,7 +137,7 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
         //
       }
       assertDoesNotThrow(() -> {
-        transactionManager.endTransaction(newTxnOwnerInstant1);
+        transactionManager.endStateChange(newTxnOwnerInstant1);
       });
       writer1Completed.set(true);
     });
@@ -153,10 +153,10 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
         //
       }
       assertDoesNotThrow(() -> {
-        transactionManager.beginTransaction(newTxnOwnerInstant2, 
lastCompletedInstant2);
+        transactionManager.beginStateChange(newTxnOwnerInstant2, 
lastCompletedInstant2);
       });
       assertDoesNotThrow(() -> {
-        transactionManager.endTransaction(newTxnOwnerInstant2);
+        transactionManager.endStateChange(newTxnOwnerInstant2);
       });
       writer2Completed.set(true);
     });
@@ -179,17 +179,17 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
   }
 
   @Test
-  public void testEndTransactionByDiffOwner() throws InterruptedException {
+  public void testEndStateChangeByDiffOwner() throws InterruptedException {
     // 1. Begin and end by the same transaction owner
     Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
     Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
-    transactionManager.beginTransaction(newTxnOwnerInstant, 
lastCompletedInstant);
+    transactionManager.beginStateChange(newTxnOwnerInstant, 
lastCompletedInstant);
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
     // Another writer thread
     Thread writer2 = new Thread(() -> {
       Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003");
-      transactionManager.endTransaction(newTxnOwnerInstant1);
+      transactionManager.endStateChange(newTxnOwnerInstant1);
       countDownLatch.countDown();
     });
 
@@ -199,7 +199,7 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
     
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
-    transactionManager.endTransaction(newTxnOwnerInstant);
+    transactionManager.endStateChange(newTxnOwnerInstant);
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
   }
@@ -209,46 +209,46 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
     // 1. Begin and end by the same transaction owner
     Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
     Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
-    transactionManager.beginTransaction(newTxnOwnerInstant, 
lastCompletedInstant);
+    transactionManager.beginStateChange(newTxnOwnerInstant, 
lastCompletedInstant);
     Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == 
newTxnOwnerInstant);
     
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == 
lastCompletedInstant);
-    transactionManager.endTransaction(newTxnOwnerInstant);
+    transactionManager.endStateChange(newTxnOwnerInstant);
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
     // 2. Begin transaction with a new txn owner, but end transaction with 
wrong owner
     lastCompletedInstant = getInstant("0000002");
     newTxnOwnerInstant = getInstant("0000003");
-    transactionManager.beginTransaction(newTxnOwnerInstant, 
lastCompletedInstant);
-    transactionManager.endTransaction(getInstant("0000004"));
+    transactionManager.beginStateChange(newTxnOwnerInstant, 
lastCompletedInstant);
+    transactionManager.endStateChange(getInstant("0000004"));
     // Owner reset would not happen as the end txn was invoked with an 
incorrect current txn owner
     Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == 
newTxnOwnerInstant);
     
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == 
lastCompletedInstant);
-    transactionManager.endTransaction(newTxnOwnerInstant);
+    transactionManager.endStateChange(newTxnOwnerInstant);
 
     // 3. But, we should be able to begin a new transaction for a new owner
     lastCompletedInstant = getInstant("0000003");
     newTxnOwnerInstant = getInstant("0000004");
-    transactionManager.beginTransaction(newTxnOwnerInstant, 
lastCompletedInstant);
+    transactionManager.beginStateChange(newTxnOwnerInstant, 
lastCompletedInstant);
     Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == 
newTxnOwnerInstant);
     
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == 
lastCompletedInstant);
-    transactionManager.endTransaction(newTxnOwnerInstant);
+    transactionManager.endStateChange(newTxnOwnerInstant);
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
     // 4. Transactions with new instants but with same timestamps should 
properly reset owners
-    transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
+    transactionManager.beginStateChange(getInstant("0000005"), Option.empty());
     
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
-    transactionManager.endTransaction(getInstant("0000005"));
+    transactionManager.endStateChange(getInstant("0000005"));
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
     // 6. Transactions with no owners should also go through
-    transactionManager.beginTransaction(Option.empty(), Option.empty());
+    transactionManager.beginStateChange(Option.empty(), Option.empty());
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
-    transactionManager.endTransaction(Option.empty());
+    transactionManager.endStateChange(Option.empty());
     
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
   }
@@ -258,9 +258,9 @@ public class TestTransactionManager extends 
HoodieCommonTestHarness {
   public void testTransactionsWithUncheckedLockProviderRuntimeException() {
     assertThrows(RuntimeException.class, () -> {
       try {
-        transactionManager.beginTransaction(Option.empty(), Option.empty());
+        transactionManager.beginStateChange(Option.empty(), Option.empty());
       } finally {
-        transactionManager.endTransaction(Option.empty());
+        transactionManager.endStateChange(Option.empty());
       }
     });
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index ade05a28ce4..0602fe7e279 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -84,7 +84,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = 
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
     try {
-      this.txnManager.beginTransaction(Option.of(compactionInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
       // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a
@@ -93,7 +93,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       LOG.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
     } finally {
-      this.txnManager.endTransaction(Option.of(compactionInstant));
+      this.txnManager.endStateChange(Option.of(compactionInstant));
     }
     WriteMarkersFactory
         .get(config.getMarkersType(), table, compactionCommitTime)
@@ -125,7 +125,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
     }
 
     try {
-      this.txnManager.beginTransaction(Option.of(clusteringInstant), 
Option.empty());
+      this.txnManager.beginStateChange(Option.of(clusteringInstant), 
Option.empty());
       finalizeWrite(table, clusteringCommitTime, writeStats);
       // Only in some cases conflict resolution needs to be performed.
       // So, check if preCommit method that does conflict resolution needs to 
be triggered.
@@ -147,7 +147,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       throw new HoodieClusteringException(
           "Failed to commit " + table.getMetaClient().getBasePath() + " at 
time " + clusteringCommitTime, e);
     } finally {
-      this.txnManager.endTransaction(Option.of(clusteringInstant));
+      this.txnManager.endStateChange(Option.of(clusteringInstant));
     }
 
     WriteMarkersFactory.get(config.getMarkersType(), table, 
clusteringCommitTime)


Reply via email to