This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b51a4212d682ffbfb1f789290d777edce8707037
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu May 16 17:54:39 2024 +0800

    [fix](txn insert) Fix txn insert values error when connect to follower fe (#34950)
---
 .../insert/BatchInsertIntoTableCommand.java        |  4 +--
 .../trees/plans/commands/insert/InsertUtils.java   | 29 ++++++++++++++++------
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
index 6d4431eec27..4399cd57db4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
@@ -33,7 +33,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.Command;
-import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.commands.NoForward;
 import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
 /**
  * insert into values with in txn model.
  */
-public class BatchInsertIntoTableCommand extends Command implements 
ForwardWithSync, Explainable {
+public class BatchInsertIntoTableCommand extends Command implements NoForward, 
Explainable {
 
     public static final Logger LOG = 
LogManager.getLogger(BatchInsertIntoTableCommand.class);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index ad974e9e7bc..8293183eeb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -24,7 +24,7 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.UserException;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
@@ -54,11 +54,14 @@ import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.qe.MasterTxnExecutor;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TLoadTxnBeginRequest;
+import org.apache.doris.thrift.TLoadTxnBeginResult;
 import org.apache.doris.thrift.TMergeType;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
@@ -182,15 +185,25 @@ public class InsertUtils {
         txnEntry.setDb(dbObj);
         String label = txnEntry.getLabel();
         try {
-            long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
-                    label, new TransactionState.TxnCoordinator(
-                            TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
-                    sourceType, timeoutSecond);
-            txnConf.setTxnId(txnId);
+            long txnId;
             String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
+                txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
+                        txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
+                        label, new TransactionState.TxnCoordinator(
+                                TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        sourceType, timeoutSecond);
+            } else {
+                MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(ctx);
+                TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
+                
request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
+                        
.setLabel(label).setUser("").setUserIp("").setPasswd("");
+                TLoadTxnBeginResult result = 
masterTxnExecutor.beginTxn(request);
+                txnId = result.getTxnId();
+            }
+            txnConf.setTxnId(txnId);
             txnConf.setToken(token);
-        } catch (UserException e) {
+        } catch (Exception e) {
             throw new AnalysisException(e.getMessage(), e);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to