This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0fd3c2c6ca3 [HUDI-5878] Optimize the txn metadata for
BaseCommitActionExecutor (#8099)
0fd3c2c6ca3 is described below
commit 0fd3c2c6ca39fb82135efc3042682cb15439779a
Author: Danny Chan <[email protected]>
AuthorDate: Mon Mar 6 19:23:10 2023 +0800
[HUDI-5878] Optimize the txn metadata for BaseCommitActionExecutor (#8099)
---
.../action/commit/BaseCommitActionExecutor.java | 34 +++++++++++++---------
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 1e92f802274..8f3a0244d2e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -62,6 +63,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -77,9 +79,9 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
protected final Option<Map<String, String>> extraMetadata;
protected final WriteOperationType operationType;
protected final TaskContextSupplier taskContextSupplier;
- protected final TransactionManager txnManager;
- protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxn;
- protected Set<String> pendingInflightAndRequestedInstants;
+ protected final Option<TransactionManager> txnManagerOption;
+ protected final Option<Pair<HoodieInstant, Map<String, String>>>
lastCompletedTxn;
+ protected final Set<String> pendingInflightAndRequestedInstants;
public BaseCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table, String
instantTime, WriteOperationType operationType,
@@ -89,11 +91,16 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
this.extraMetadata = extraMetadata;
this.taskContextSupplier = context.getTaskContextSupplier();
// TODO : Remove this once we refactor and move out autoCommit method from
here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
- this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
- this.lastCompletedTxn = txnManager.isOptimisticConcurrencyControlEnabled()
- ?
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient()) :
Option.empty();
- this.pendingInflightAndRequestedInstants =
TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
- this.pendingInflightAndRequestedInstants.remove(instantTime);
+ this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new
TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
+ if (this.txnManagerOption.isPresent() &&
this.txnManagerOption.get().isOptimisticConcurrencyControlEnabled()) {
+ // these txn metadata are only needed for auto commit when optimistic
concurrent control is also enabled
+ this.lastCompletedTxn =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+ this.pendingInflightAndRequestedInstants =
TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
+ this.pendingInflightAndRequestedInstants.remove(instantTime);
+ } else {
+ this.lastCompletedTxn = Option.empty();
+ this.pendingInflightAndRequestedInstants = Collections.emptySet();
+ }
if (!table.getStorageLayout().writeOperationSupported(operationType)) {
throw new UnsupportedOperationException("Executor " +
this.getClass().getSimpleName()
+ " is not compatible with table layout " +
table.getStorageLayout().getClass().getSimpleName());
@@ -180,16 +187,18 @@ public abstract class BaseCommitActionExecutor<T, I, K,
O, R>
protected void autoCommit(Option<Map<String, String>> extraMetadata,
HoodieWriteMetadata<O> result) {
final Option<HoodieInstant> inflightInstant = Option.of(new
HoodieInstant(State.INFLIGHT,
getCommitActionType(), instantTime));
- this.txnManager.beginTransaction(inflightInstant,
+ ValidationUtils.checkState(this.txnManagerOption.isPresent(), "The
transaction manager has not been initialized");
+ TransactionManager txnManager = this.txnManagerOption.get();
+ txnManager.beginTransaction(inflightInstant,
lastCompletedTxn.isPresent() ?
Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
setCommitMetadata(result);
// reload active timeline so as to get all updates after current
transaction have started. hence setting last arg to true.
- TransactionUtils.resolveWriteConflictIfAny(table,
this.txnManager.getCurrentTransactionOwner(),
- result.getCommitMetadata(), config,
this.txnManager.getLastCompletedTransactionOwner(), true,
pendingInflightAndRequestedInstants);
+ TransactionUtils.resolveWriteConflictIfAny(table,
txnManager.getCurrentTransactionOwner(),
+ result.getCommitMetadata(), config,
txnManager.getLastCompletedTransactionOwner(), true,
pendingInflightAndRequestedInstants);
commit(extraMetadata, result);
} finally {
- this.txnManager.endTransaction(inflightInstant);
+ txnManager.endTransaction(inflightInstant);
}
}
@@ -290,5 +299,4 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
+ " write statuses");
}
}
-
}