This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b51a4212d682ffbfb1f789290d777edce8707037
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu May 16 17:54:39 2024 +0800

    [fix](txn insert) Fix txn insert values error when connect to follower fe (#34950)
---
 .../insert/BatchInsertIntoTableCommand.java        |  4 +--
 .../trees/plans/commands/insert/InsertUtils.java   | 29 ++++++++++++++++------
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
index 6d4431eec27..4399cd57db4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
@@ -33,7 +33,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.Command;
-import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.commands.NoForward;
 import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
 /**
  * insert into values with in txn model.
  */
-public class BatchInsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {
+public class BatchInsertIntoTableCommand extends Command implements NoForward, Explainable {
 
     public static final Logger LOG = LogManager.getLogger(BatchInsertIntoTableCommand.class);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index ad974e9e7bc..8293183eeb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -24,7 +24,7 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.UserException;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
@@ -54,11 +54,14 @@ import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.qe.MasterTxnExecutor;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TLoadTxnBeginRequest;
+import org.apache.doris.thrift.TLoadTxnBeginResult;
 import org.apache.doris.thrift.TMergeType;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
@@ -182,15 +185,25 @@ public class InsertUtils {
         txnEntry.setDb(dbObj);
         String label = txnEntry.getLabel();
         try {
-            long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
-                    label, new TransactionState.TxnCoordinator(
-                            TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
-                    sourceType, timeoutSecond);
-            txnConf.setTxnId(txnId);
+            long txnId;
             String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+            if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
+                txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
+                        txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
+                        label, new TransactionState.TxnCoordinator(
+                                TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
+                        sourceType, timeoutSecond);
+            } else {
+                MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx);
+                TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
+                request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
+                        .setLabel(label).setUser("").setUserIp("").setPasswd("");
+                TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request);
+                txnId = result.getTxnId();
+            }
+            txnConf.setTxnId(txnId);
             txnConf.setToken(token);
-        } catch (UserException e) {
+        } catch (Exception e) {
             throw new AnalysisException(e.getMessage(), e);
         }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org