This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2db999a1a25 [MINOR] Renaming TransactionManager methods to begin/end x
StateChange (#13387)
2db999a1a25 is described below
commit 2db999a1a253fb222f1e4a397e46f92f2b1a846d
Author: vinoth chandar <[email protected]>
AuthorDate: Mon Jun 2 16:55:14 2025 -0700
[MINOR] Renaming TransactionManager methods to begin/end x StateChange
(#13387)
* [MINOR] Renaming TransactionManager methods to begin/end x StateChange
- The begin/end "Transaction" naming is confusing.
- The new naming aligns with when these methods are called: whenever an
action's state changes.
* Log message cleanup
---
.../hudi/client/BaseHoodieTableServiceClient.java | 24 +++++-----
.../apache/hudi/client/BaseHoodieWriteClient.java | 24 +++++-----
.../timeline/versioning/v1/TimelineArchiverV1.java | 4 +-
.../timeline/versioning/v2/TimelineArchiverV2.java | 4 +-
.../DirectMarkerTransactionManager.java | 2 +-
.../client/transaction/TransactionManager.java | 40 ++++++++--------
.../table/action/clean/CleanActionExecutor.java | 4 +-
.../action/commit/BaseCommitActionExecutor.java | 4 +-
.../action/index/AbstractIndexingCatchupTask.java | 4 +-
.../table/action/index/RunIndexActionExecutor.java | 4 +-
.../action/restore/BaseRestoreActionExecutor.java | 4 +-
.../rollback/BaseRollbackActionExecutor.java | 4 +-
.../hudi/client/TestBaseHoodieWriteClient.java | 4 +-
.../client/transaction/TestTransactionManager.java | 54 +++++++++++-----------
.../hudi/client/HoodieFlinkTableServiceClient.java | 8 ++--
15 files changed, 94 insertions(+), 94 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 54898d1123b..1a4e21c6242 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -349,7 +349,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
final HoodieInstant compactionInstant =
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
try {
- this.txnManager.beginTransaction(Option.of(compactionInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadata(table, compactionCommitTime, metadata);
@@ -357,7 +357,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
LOG.debug("Compaction {} finished with result: {}",
compactionCommitTime, metadata);
} finally {
- this.txnManager.endTransaction(Option.of(compactionInstant));
+ this.txnManager.endStateChange(Option.of(compactionInstant));
releaseResources(compactionCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
@@ -410,7 +410,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
final HoodieInstant logCompactionInstant =
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION,
logCompactionCommitTime);
try {
- this.txnManager.beginTransaction(Option.of(logCompactionInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(logCompactionInstant),
Option.empty());
preCommit(metadata);
finalizeWrite(table, logCompactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
@@ -419,7 +419,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
CompactHelpers.getInstance().completeInflightLogCompaction(table,
logCompactionCommitTime, metadata);
LOG.debug("Log Compaction {} finished with result {}",
logCompactionCommitTime, metadata);
} finally {
- this.txnManager.endTransaction(Option.of(logCompactionInstant));
+ this.txnManager.endStateChange(Option.of(logCompactionInstant));
releaseResources(logCompactionCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table,
logCompactionCommitTime)
@@ -537,7 +537,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
final HoodieInstant clusteringInstant =
ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
table.getActiveTimeline(),
table.getMetaClient().getInstantGenerator()).get();
try {
- this.txnManager.beginTransaction(Option.of(clusteringInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(clusteringInstant),
Option.empty());
finalizeWrite(table, clusteringCommitTime, writeStats);
// Only in some cases conflict resolution needs to be performed.
@@ -555,7 +555,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering
inflight to complete: " + clusteringCommitTime, e);
} finally {
- this.txnManager.endTransaction(Option.of(clusteringInstant));
+ this.txnManager.endStateChange(Option.of(clusteringInstant));
releaseResources(clusteringCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table,
clusteringCommitTime)
@@ -652,7 +652,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
// so it is handled differently to avoid locking for planning.
return scheduleCleaning(createTable(config, storageConf),
providedInstantTime);
}
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
try {
Option<String> option;
HoodieTable<?, ?, ?, ?> table = createTable(config, storageConf);
@@ -688,7 +688,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
return option;
} finally {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
@@ -817,7 +817,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
private Option<String> scheduleCleaning(HoodieTable<?, ?, ?, ?> table,
Option<String> suppliedCleanInstant) {
Option<HoodieCleanerPlan> cleanerPlan = table.createCleanerPlan(context,
Option.empty());
if (cleanerPlan.isPresent()) {
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
try {
String cleanInstantTime = suppliedCleanInstant.orElseGet(() ->
createNewInstantTime(false));
final HoodieInstant cleanInstant =
table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, cleanInstantTime);
@@ -833,7 +833,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
LOG.error("Got exception when saving cleaner requested file", e);
throw e;
} finally {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
return Option.empty();
@@ -1121,14 +1121,14 @@ public abstract class BaseHoodieTableServiceClient<I,
T, O> extends BaseHoodieCl
return false;
}
if (!skipLocking) {
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
} finally {
if (!skipLocking) {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 80f12c1357e..7a298e6ae45 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -252,7 +252,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
extraMetadata, operationType, config.getWriteSchema(),
commitActionType));
HoodieInstant inflightInstant =
table.getMetaClient().createNewInstant(State.INFLIGHT, commitActionType,
instantTime);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table,
heartbeatClient, config);
- this.txnManager.beginTransaction(Option.of(inflightInstant),
+ this.txnManager.beginStateChange(Option.of(inflightInstant),
lastCompletedTxnAndMetadata.isPresent() ?
Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(metadata);
@@ -265,7 +265,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime, e);
} finally {
- this.txnManager.endTransaction(Option.of(inflightInstant));
+ this.txnManager.endStateChange(Option.of(inflightInstant));
releaseResources(instantTime);
}
@@ -859,12 +859,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
if (failedRestore.isPresent() &&
savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table,
failedRestore.get()))) {
return Pair.of(failedRestore.get().requestedTime(),
Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(),
failedRestore.get())));
}
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
try {
final String restoreInstantTimestamp = createNewInstantTime();
return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context,
restoreInstantTimestamp, savepointToRestoreTimestamp));
} finally {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
@@ -969,7 +969,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
- txnManager.beginTransaction(Option.empty(),
lastCompletedTxnAndMetadata.map(Pair::getLeft));
+ txnManager.beginStateChange(Option.empty(),
lastCompletedTxnAndMetadata.map(Pair::getLeft));
String instantTime;
HoodieInstant instant = null;
try {
@@ -992,7 +992,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
metaClient.getActiveTimeline().createNewInstant(instant);
}
} finally {
- txnManager.endTransaction(Option.ofNullable(instant));
+ txnManager.endStateChange(Option.ofNullable(instant));
}
return instantTime;
}
@@ -1024,14 +1024,14 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @return instant time for the requested INDEX action
*/
public Option<String> scheduleIndexing(List<MetadataPartitionType>
partitionTypes, List<String> partitionPaths) {
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
try {
String instantTime = createNewInstantTime(false);
Option<HoodieIndexPlan> indexPlan = createTable(config)
.scheduleIndexing(context, instantTime, partitionTypes,
partitionPaths);
return indexPlan.map(plan -> instantTime);
} finally {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
@@ -1054,7 +1054,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
HoodieTable table = createTable(config);
String dropInstant = createNewInstantTime();
HoodieInstant ownerInstant =
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.INDEXING_ACTION, dropInstant);
- this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
+ this.txnManager.beginStateChange(Option.of(ownerInstant), Option.empty());
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping
partitions from metadata table: " + config.getTableName());
HoodieTableMetaClient metaClient = table.getMetaClient();
@@ -1087,7 +1087,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
}
}
} finally {
- this.txnManager.endTransaction(Option.of(ownerInstant));
+ this.txnManager.endStateChange(Option.of(ownerInstant));
}
}
@@ -1298,11 +1298,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
}
private void executeUsingTxnManager(Option<HoodieInstant> ownerInstant,
Runnable r) {
- this.txnManager.beginTransaction(ownerInstant, Option.empty());
+ this.txnManager.beginStateChange(ownerInstant, Option.empty());
try {
r.run();
} finally {
- this.txnManager.endTransaction(ownerInstant);
+ this.txnManager.endStateChange(ownerInstant);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index c0aa11a2aea..8da69585b04 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -139,7 +139,7 @@ public class TimelineArchiverV1<T extends
HoodieAvroPayload, I, K, O> implements
try {
if (acquireLock) {
// there is no owner or instant time per se for archival.
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
}
List<HoodieInstant> instantsToArchive = getInstantsToArchive();
if (!instantsToArchive.isEmpty()) {
@@ -156,7 +156,7 @@ public class TimelineArchiverV1<T extends
HoodieAvroPayload, I, K, O> implements
} finally {
close();
if (acquireLock) {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index 65e08920ae4..e18617232e5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -95,7 +95,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload,
I, K, O> implements
try {
if (acquireLock) {
// there is no owner or instant time per se for archival.
- txnManager.beginTransaction(Option.empty(), Option.empty());
+ txnManager.beginStateChange(Option.empty(), Option.empty());
}
} catch (HoodieLockException e) {
LOG.error("Fail to begin transaction", e);
@@ -123,7 +123,7 @@ public class TimelineArchiverV2<T extends
HoodieAvroPayload, I, K, O> implements
return instantsToArchive.size();
} finally {
if (acquireLock) {
- txnManager.endTransaction(Option.empty());
+ txnManager.endStateChange(Option.empty());
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index d75d0a648dc..02b027f12d3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -51,7 +51,7 @@ public class DirectMarkerTransactionManager extends
TransactionManager {
LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and "
+ filePath);
lockManager.lock();
- reset(currentTxnOwnerInstant,
Option.of(getInstant(newTxnOwnerInstantTime, instantGenerator)),
Option.empty());
+ reset(changeActionInstant, Option.of(getInstant(newTxnOwnerInstantTime,
instantGenerator)), Option.empty());
LOG.info("Transaction started for " + newTxnOwnerInstantTime + " and " +
filePath);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index b725243fca0..da1f6639752 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -38,8 +38,8 @@ public class TransactionManager implements Serializable {
protected static final Logger LOG =
LoggerFactory.getLogger(TransactionManager.class);
protected final LockManager lockManager;
protected final boolean isLockRequired;
- protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
- private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
+ protected Option<HoodieInstant> changeActionInstant = Option.empty();
+ private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
this(new LockManager(config, storage), config.isLockRequired());
@@ -50,34 +50,34 @@ public class TransactionManager implements Serializable {
this.isLockRequired = isLockRequired;
}
- public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
- Option<HoodieInstant>
lastCompletedTxnOwnerInstant) {
+ public void beginStateChange(Option<HoodieInstant> changeActionInstant,
+ Option<HoodieInstant>
lastCompletedActionInstant) {
if (isLockRequired) {
- LOG.info("Transaction starting for " + newTxnOwnerInstant
- + " with latest completed transaction instant " +
lastCompletedTxnOwnerInstant);
+ LOG.info("State change starting for {} with latest completed action
instant {}",
+ changeActionInstant, lastCompletedActionInstant);
lockManager.lock();
- reset(currentTxnOwnerInstant, newTxnOwnerInstant,
lastCompletedTxnOwnerInstant);
- LOG.info("Transaction started for " + newTxnOwnerInstant
- + " with latest completed transaction instant " +
lastCompletedTxnOwnerInstant);
+ reset(this.changeActionInstant, changeActionInstant,
lastCompletedActionInstant);
+ LOG.info("State change started for {} with latest completed action
instant {}",
+ changeActionInstant, lastCompletedActionInstant);
}
}
- public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
+ public void endStateChange(Option<HoodieInstant> changeActionInstant) {
if (isLockRequired) {
- LOG.info("Transaction ending with transaction owner " +
currentTxnOwnerInstant);
- if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
+ LOG.info("State change ending for action instant {}",
changeActionInstant);
+ if (reset(changeActionInstant, Option.empty(), Option.empty())) {
lockManager.unlock();
- LOG.info("Transaction ended with transaction owner " +
currentTxnOwnerInstant);
+ LOG.info("State change ended for action instant {}",
changeActionInstant);
}
}
}
protected synchronized boolean reset(Option<HoodieInstant> callerInstant,
- Option<HoodieInstant>
newTxnOwnerInstant,
- Option<HoodieInstant>
lastCompletedTxnOwnerInstant) {
- if (!this.currentTxnOwnerInstant.isPresent() ||
this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
- this.currentTxnOwnerInstant = newTxnOwnerInstant;
- this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
+ Option<HoodieInstant>
changeActionInstant,
+ Option<HoodieInstant>
lastCompletedActionInstant) {
+ if (!this.changeActionInstant.isPresent() ||
this.changeActionInstant.get().equals(callerInstant.get())) {
+ this.changeActionInstant = changeActionInstant;
+ this.lastCompletedActionInstant = lastCompletedActionInstant;
return true;
}
return false;
@@ -95,11 +95,11 @@ public class TransactionManager implements Serializable {
}
public Option<HoodieInstant> getLastCompletedTransactionOwner() {
- return lastCompletedTxnOwnerInstant;
+ return lastCompletedActionInstant;
}
public Option<HoodieInstant> getCurrentTransactionOwner() {
- return currentTxnOwnerInstant;
+ return changeActionInstant;
}
public boolean isLockRequired() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index d37c40960c9..bed24daa7e7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -220,13 +220,13 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
cleanStats,
cleanerPlan.getExtraMetadata()
);
- this.txnManager.beginTransaction(Option.of(inflightInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(inflightInstant),
Option.empty());
writeTableMetadata(metadata, inflightInstant.requestedTime());
table.getActiveTimeline().transitionCleanInflightToComplete(false,
inflightInstant, Option.of(metadata));
LOG.info("Marked clean started on {} as complete",
inflightInstant.requestedTime());
return metadata;
} finally {
- this.txnManager.endTransaction(Option.ofNullable(inflightInstant));
+ this.txnManager.endStateChange(Option.ofNullable(inflightInstant));
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index dd547b7c03c..549afc7d90d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -199,7 +199,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
getCommitActionType(), instantTime));
ValidationUtils.checkState(this.txnManagerOption.isPresent(), "The
transaction manager has not been initialized");
TransactionManager txnManager = this.txnManagerOption.get();
- txnManager.beginTransaction(inflightInstant,
+ txnManager.beginStateChange(inflightInstant,
lastCompletedTxn.isPresent() ?
Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
setCommitMetadata(result);
@@ -208,7 +208,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
result.getCommitMetadata(), config,
txnManager.getLastCompletedTransactionOwner(), false,
pendingInflightAndRequestedInstants);
commit(result);
} finally {
- txnManager.endTransaction(inflightInstant);
+ txnManager.endStateChange(inflightInstant);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
index c5b22e56709..45b51256711 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
@@ -107,7 +107,7 @@ public abstract class AbstractIndexingCatchupTask
implements IndexingCatchupTask
}
try {
// we need take a lock here as inflight writer could also try to
update the timeline
- transactionManager.beginTransaction(Option.of(instant),
Option.empty());
+ transactionManager.beginStateChange(Option.of(instant),
Option.empty());
LOG.info("Updating metadata table for instant: " + instant);
switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
@@ -133,7 +133,7 @@ public abstract class AbstractIndexingCatchupTask
implements IndexingCatchupTask
} catch (IOException e) {
throw new HoodieIndexException(String.format("Could not update
metadata partition for instant: %s", instant), e);
} finally {
- transactionManager.endTransaction(Option.of(instant));
+ transactionManager.endStateChange(Option.of(instant));
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index bceb50d8417..fe02732c3da 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -266,13 +266,13 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
HoodieIndexCommitMetadata
indexCommitMetadata) throws IOException {
try {
// update the table config and timeline in a lock as there could be
another indexer running
- txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
+ txnManager.beginStateChange(Option.of(indexInstant), Option.empty());
updateMetadataPartitionsTableConfig(table.getMetaClient(),
finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
table.getActiveTimeline().saveAsComplete(false,
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT,
INDEXING_ACTION, indexInstant.requestedTime()),
Option.of(indexCommitMetadata));
} finally {
- txnManager.endTransaction(Option.of(indexInstant));
+ txnManager.endStateChange(Option.of(indexInstant));
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 831fe0616d6..36b4aae550e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -152,10 +152,10 @@ public abstract class BaseRestoreActionExecutor<T, I, K,
O> extends BaseActionEx
*/
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata,
HoodieInstant restoreInflightInstant) {
try {
- this.txnManager.beginTransaction(Option.of(restoreInflightInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(restoreInflightInstant),
Option.empty());
writeTableMetadata(restoreMetadata);
} finally {
- this.txnManager.endTransaction(Option.of(restoreInflightInstant));
+ this.txnManager.endStateChange(Option.of(restoreInflightInstant));
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 2f22d27bd8a..2c17de655bc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -270,7 +270,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K,
O> extends BaseActionE
boolean enableLocking = (!skipLocking && !skipTimelinePublish);
try {
if (enableLocking) {
- this.txnManager.beginTransaction(Option.of(inflightInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(inflightInstant),
Option.empty());
}
// If publish the rollback to the timeline, we first write the rollback
metadata to metadata table
@@ -293,7 +293,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K,
O> extends BaseActionE
}
} finally {
if (enableLocking) {
- this.txnManager.endTransaction(Option.of(inflightInstant));
+ this.txnManager.endStateChange(Option.of(inflightInstant));
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 75dd7c0cc50..04e3ab8fe26 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -143,9 +143,9 @@ class TestBaseHoodieWriteClient extends
HoodieCommonTestHarness {
HoodieInstant expectedInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieActiveTimeline.COMMIT_ACTION, instantTime,
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
InOrder inOrder = Mockito.inOrder(transactionManager, timeGenerator);
- inOrder.verify(transactionManager).beginTransaction(Option.empty(),
Option.empty());
+ inOrder.verify(transactionManager).beginStateChange(Option.empty(),
Option.empty());
inOrder.verify(timeGenerator).generateTime(true);
-
inOrder.verify(transactionManager).endTransaction(Option.of(expectedInstant));
+
inOrder.verify(transactionManager).endStateChange(Option.of(expectedInstant));
}
private static class TestWriteClient extends BaseHoodieWriteClient<String,
String, String, String> {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index a1487042119..4e2bb4dcfe6 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -84,26 +84,26 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
public void testSingleWriterTransaction() {
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
- transactionManager.beginTransaction(newTxnOwnerInstant,
lastCompletedInstant);
- transactionManager.endTransaction(newTxnOwnerInstant);
+ transactionManager.beginStateChange(newTxnOwnerInstant,
lastCompletedInstant);
+ transactionManager.endStateChange(newTxnOwnerInstant);
}
@Test
public void testSingleWriterNestedTransaction() {
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
- transactionManager.beginTransaction(newTxnOwnerInstant,
lastCompletedInstant);
+ transactionManager.beginStateChange(newTxnOwnerInstant,
lastCompletedInstant);
Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003");
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004");
assertThrows(HoodieLockException.class, () -> {
- transactionManager.beginTransaction(newTxnOwnerInstant1,
lastCompletedInstant1);
+ transactionManager.beginStateChange(newTxnOwnerInstant1,
lastCompletedInstant1);
});
- transactionManager.endTransaction(newTxnOwnerInstant);
+ transactionManager.endStateChange(newTxnOwnerInstant);
assertDoesNotThrow(() -> {
- transactionManager.endTransaction(newTxnOwnerInstant1);
+ transactionManager.endStateChange(newTxnOwnerInstant1);
});
}
@@ -124,7 +124,7 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
// to join the sync up point.
Thread writer1 = new Thread(() -> {
assertDoesNotThrow(() -> {
- transactionManager.beginTransaction(newTxnOwnerInstant1,
lastCompletedInstant1);
+ transactionManager.beginStateChange(newTxnOwnerInstant1,
lastCompletedInstant1);
});
latch.countDown();
try {
@@ -137,7 +137,7 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
//
}
assertDoesNotThrow(() -> {
- transactionManager.endTransaction(newTxnOwnerInstant1);
+ transactionManager.endStateChange(newTxnOwnerInstant1);
});
writer1Completed.set(true);
});
@@ -153,10 +153,10 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
//
}
assertDoesNotThrow(() -> {
- transactionManager.beginTransaction(newTxnOwnerInstant2,
lastCompletedInstant2);
+ transactionManager.beginStateChange(newTxnOwnerInstant2,
lastCompletedInstant2);
});
assertDoesNotThrow(() -> {
- transactionManager.endTransaction(newTxnOwnerInstant2);
+ transactionManager.endStateChange(newTxnOwnerInstant2);
});
writer2Completed.set(true);
});
@@ -179,17 +179,17 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
}
@Test
- public void testEndTransactionByDiffOwner() throws InterruptedException {
+ public void testEndStateChangeByDiffOwner() throws InterruptedException {
// 1. Begin and end by the same transaction owner
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
- transactionManager.beginTransaction(newTxnOwnerInstant,
lastCompletedInstant);
+ transactionManager.beginStateChange(newTxnOwnerInstant,
lastCompletedInstant);
CountDownLatch countDownLatch = new CountDownLatch(1);
// Another writer thread
Thread writer2 = new Thread(() -> {
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003");
- transactionManager.endTransaction(newTxnOwnerInstant1);
+ transactionManager.endStateChange(newTxnOwnerInstant1);
countDownLatch.countDown();
});
@@ -199,7 +199,7 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent());
- transactionManager.endTransaction(newTxnOwnerInstant);
+ transactionManager.endStateChange(newTxnOwnerInstant);
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
}
@@ -209,46 +209,46 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
// 1. Begin and end by the same transaction owner
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
- transactionManager.beginTransaction(newTxnOwnerInstant,
lastCompletedInstant);
+ transactionManager.beginStateChange(newTxnOwnerInstant,
lastCompletedInstant);
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() ==
newTxnOwnerInstant);
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() ==
lastCompletedInstant);
- transactionManager.endTransaction(newTxnOwnerInstant);
+ transactionManager.endStateChange(newTxnOwnerInstant);
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
// 2. Begin transaction with a new txn owner, but end transaction with
wrong owner
lastCompletedInstant = getInstant("0000002");
newTxnOwnerInstant = getInstant("0000003");
- transactionManager.beginTransaction(newTxnOwnerInstant,
lastCompletedInstant);
- transactionManager.endTransaction(getInstant("0000004"));
+ transactionManager.beginStateChange(newTxnOwnerInstant,
lastCompletedInstant);
+ transactionManager.endStateChange(getInstant("0000004"));
// Owner reset would not happen as the end txn was invoked with an
incorrect current txn owner
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() ==
newTxnOwnerInstant);
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() ==
lastCompletedInstant);
- transactionManager.endTransaction(newTxnOwnerInstant);
+ transactionManager.endStateChange(newTxnOwnerInstant);
// 3. But, we should be able to begin a new transaction for a new owner
lastCompletedInstant = getInstant("0000003");
newTxnOwnerInstant = getInstant("0000004");
- transactionManager.beginTransaction(newTxnOwnerInstant,
lastCompletedInstant);
+ transactionManager.beginStateChange(newTxnOwnerInstant,
lastCompletedInstant);
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() ==
newTxnOwnerInstant);
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() ==
lastCompletedInstant);
- transactionManager.endTransaction(newTxnOwnerInstant);
+ transactionManager.endStateChange(newTxnOwnerInstant);
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
// 4. Transactions with new instants but with same timestamps should
properly reset owners
- transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
+ transactionManager.beginStateChange(getInstant("0000005"), Option.empty());
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
- transactionManager.endTransaction(getInstant("0000005"));
+ transactionManager.endStateChange(getInstant("0000005"));
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
// 6. Transactions with no owners should also go through
- transactionManager.beginTransaction(Option.empty(), Option.empty());
+ transactionManager.beginStateChange(Option.empty(), Option.empty());
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
- transactionManager.endTransaction(Option.empty());
+ transactionManager.endStateChange(Option.empty());
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
}
@@ -258,9 +258,9 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
public void testTransactionsWithUncheckedLockProviderRuntimeException() {
assertThrows(RuntimeException.class, () -> {
try {
- transactionManager.beginTransaction(Option.empty(), Option.empty());
+ transactionManager.beginStateChange(Option.empty(), Option.empty());
} finally {
- transactionManager.endTransaction(Option.empty());
+ transactionManager.endStateChange(Option.empty());
}
});
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index ade05a28ce4..0602fe7e279 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -84,7 +84,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
final HoodieInstant compactionInstant =
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
try {
- this.txnManager.beginTransaction(Option.of(compactionInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
@@ -93,7 +93,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
LOG.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
} finally {
- this.txnManager.endTransaction(Option.of(compactionInstant));
+ this.txnManager.endStateChange(Option.of(compactionInstant));
}
WriteMarkersFactory
.get(config.getMarkersType(), table, compactionCommitTime)
@@ -125,7 +125,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
}
try {
- this.txnManager.beginTransaction(Option.of(clusteringInstant),
Option.empty());
+ this.txnManager.beginStateChange(Option.of(clusteringInstant),
Option.empty());
finalizeWrite(table, clusteringCommitTime, writeStats);
// Only in some cases conflict resolution needs to be performed.
// So, check if preCommit method that does conflict resolution needs to
be triggered.
@@ -147,7 +147,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
throw new HoodieClusteringException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at
time " + clusteringCommitTime, e);
} finally {
- this.txnManager.endTransaction(Option.of(clusteringInstant));
+ this.txnManager.endStateChange(Option.of(clusteringInstant));
}
WriteMarkersFactory.get(config.getMarkersType(), table,
clusteringCommitTime)