This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fd3c2c6ca3 [HUDI-5878] Optimize the txn metadata for 
BaseCommitActionExecutor (#8099)
0fd3c2c6ca3 is described below

commit 0fd3c2c6ca39fb82135efc3042682cb15439779a
Author: Danny Chan <[email protected]>
AuthorDate: Mon Mar 6 19:23:10 2023 +0800

    [HUDI-5878] Optimize the txn metadata for BaseCommitActionExecutor (#8099)
---
 .../action/commit/BaseCommitActionExecutor.java    | 34 +++++++++++++---------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 1e92f802274..8f3a0244d2e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
@@ -62,6 +63,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,9 +79,9 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
   protected final Option<Map<String, String>> extraMetadata;
   protected final WriteOperationType operationType;
   protected final TaskContextSupplier taskContextSupplier;
-  protected final TransactionManager txnManager;
-  protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxn;
-  protected Set<String> pendingInflightAndRequestedInstants;
+  protected final Option<TransactionManager> txnManagerOption;
+  protected final Option<Pair<HoodieInstant, Map<String, String>>> 
lastCompletedTxn;
+  protected final Set<String> pendingInflightAndRequestedInstants;
 
   public BaseCommitActionExecutor(HoodieEngineContext context, 
HoodieWriteConfig config,
                                   HoodieTable<T, I, K, O> table, String 
instantTime, WriteOperationType operationType,
@@ -89,11 +91,16 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
     // TODO : Remove this once we refactor and move out autoCommit method from 
here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
-    this.txnManager = new TransactionManager(config, 
table.getMetaClient().getFs());
-    this.lastCompletedTxn = txnManager.isOptimisticConcurrencyControlEnabled() 
-        ? 
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient()) : 
Option.empty();
-    this.pendingInflightAndRequestedInstants = 
TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
-    this.pendingInflightAndRequestedInstants.remove(instantTime);
+    this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new 
TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
+    if (this.txnManagerOption.isPresent() && 
this.txnManagerOption.get().isOptimisticConcurrencyControlEnabled()) {
+      // these txn metadata are only needed for auto commit when optimistic 
concurrent control is also enabled
+      this.lastCompletedTxn = 
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+      this.pendingInflightAndRequestedInstants = 
TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
+      this.pendingInflightAndRequestedInstants.remove(instantTime);
+    } else {
+      this.lastCompletedTxn = Option.empty();
+      this.pendingInflightAndRequestedInstants = Collections.emptySet();
+    }
     if (!table.getStorageLayout().writeOperationSupported(operationType)) {
       throw new UnsupportedOperationException("Executor " + 
this.getClass().getSimpleName()
           + " is not compatible with table layout " + 
table.getStorageLayout().getClass().getSimpleName());
@@ -180,16 +187,18 @@ public abstract class BaseCommitActionExecutor<T, I, K, 
O, R>
   protected void autoCommit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata<O> result) {
     final Option<HoodieInstant> inflightInstant = Option.of(new 
HoodieInstant(State.INFLIGHT,
         getCommitActionType(), instantTime));
-    this.txnManager.beginTransaction(inflightInstant,
+    ValidationUtils.checkState(this.txnManagerOption.isPresent(), "The 
transaction manager has not been initialized");
+    TransactionManager txnManager = this.txnManagerOption.get();
+    txnManager.beginTransaction(inflightInstant,
         lastCompletedTxn.isPresent() ? 
Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
     try {
       setCommitMetadata(result);
       // reload active timeline so as to get all updates after current 
transaction have started. hence setting last arg to true.
-      TransactionUtils.resolveWriteConflictIfAny(table, 
this.txnManager.getCurrentTransactionOwner(),
-          result.getCommitMetadata(), config, 
this.txnManager.getLastCompletedTransactionOwner(), true, 
pendingInflightAndRequestedInstants);
+      TransactionUtils.resolveWriteConflictIfAny(table, 
txnManager.getCurrentTransactionOwner(),
+          result.getCommitMetadata(), config, 
txnManager.getLastCompletedTransactionOwner(), true, 
pendingInflightAndRequestedInstants);
       commit(extraMetadata, result);
     } finally {
-      this.txnManager.endTransaction(inflightInstant);
+      txnManager.endTransaction(inflightInstant);
     }
   }
 
@@ -290,5 +299,4 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
           + " write statuses");
     }
   }
-
 }

Reply via email to