This is an automated email from the ASF dual-hosted git repository. liulijia pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d2f5d6202f8 [fix](Prepared Statment) Fix exec prepared insert stmt in non master error (#48740) d2f5d6202f8 is described below commit d2f5d6202f8c1c5470be7f957d77a7d14db5cc72 Author: Lijia Liu <liutang...@yeah.net> AuthorDate: Mon Mar 10 11:09:46 2025 +0800 [fix](Prepared Statment) Fix exec prepared insert stmt in non master error (#48740) ### What problem does this PR solve? bp #48689 for 2.1 Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --------- Co-authored-by: lucianljliu <lucianlj...@tencent.com> Co-authored-by: liulijia <liulijia1...@gmail.com> --- .../trees/plans/commands/PrepareCommand.java | 2 +- .../java/org/apache/doris/qe/ConnectContext.java | 7 +++++ .../java/org/apache/doris/qe/ConnectProcessor.java | 28 +++++++++++++++++-- .../java/org/apache/doris/qe/MasterOpExecutor.java | 7 +++++ .../org/apache/doris/qe/MysqlConnectProcessor.java | 31 +++++++++++++++++++--- .../java/org/apache/doris/qe/StmtExecutor.java | 12 +++++++-- gensrc/thrift/FrontendService.thrift | 1 + 7 files changed, 80 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java index a844dcb9500..ebddcd68845 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java @@ -109,7 +109,7 @@ public class PrepareCommand extends Command { } ctx.addPreparedStatementContext(name, new PreparedStatementContext(this, ctx, ctx.getStatementContext(), name)); - if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE) { + if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE && !ctx.isProxy()) { executor.sendStmtPrepareOK(Integer.parseInt(name), labels); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index a82d5e09299..c677edf5d6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -69,6 +69,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.netty.util.concurrent.FastThreadLocal; +import lombok.Getter; +import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -76,6 +78,7 @@ import org.json.JSONObject; import org.xnio.StreamConnection; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -239,6 +242,10 @@ public class ConnectContext { // it's default thread-safe private boolean isProxy = false; + @Getter + @Setter + private ByteBuffer prepareExecuteBuffer; + private MysqlHandshakePacket mysqlHandshakePacket; public void setUserQueryTimeout(int queryTimeout) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index cc75c72ec6f..85104262014 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -61,6 +61,7 @@ import org.apache.doris.nereids.parser.Dialect; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; +import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; import org.apache.doris.plugin.DialectConverterPlugin; @@ -87,6 +88,7 @@ import org.apache.thrift.TException; import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -693,7 +695,6 @@ public abstract class ConnectProcessor { ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser)); ctx.setThreadLocalInfo(); - StmtExecutor executor = null; try { // 0 for compatibility. int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0; @@ -722,7 +723,25 @@ public abstract class ConnectProcessor { queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } - executor.execute(queryId); + if (request.isSetPrepareExecuteBuffer()) { + ctx.setCommand(MysqlCommand.COM_STMT_PREPARE); + executor.execute(); + ctx.setCommand(MysqlCommand.COM_STMT_EXECUTE); + String preparedStmtId = executor.getPrepareStmtName(); + PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(preparedStmtId); + if (preparedStatementContext == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Something error, just support nereids preparedStmtId:{}", preparedStmtId); + } + throw new RuntimeException("Prepare failed when proxy execute"); + } + handleExecute(preparedStatementContext.command, Long.parseLong(preparedStmtId), + preparedStatementContext, + ByteBuffer.wrap(request.getPrepareExecuteBuffer()).order(ByteOrder.LITTLE_ENDIAN), queryId); + } else { + executor.execute(queryId); + } + } catch (IOException e) { // Client failed. LOG.warn("Process one query failed because IOException: ", e); @@ -772,6 +791,11 @@ public abstract class ConnectProcessor { throw new NotImplementedException("Not Impl processOnce"); } + protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx, + ByteBuffer packetBuf, TUniqueId queryId) { + throw new NotSupportedException("Just MysqlConnectProcessor support execute"); + } + private Map<String, LiteralExpr> userVariableFromThrift(Map<String, TExprNode> thriftMap) throws TException { try { Map<String, LiteralExpr> userVariables = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 23f11f41173..3cf5f2edc4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; +import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprNode; @@ -183,6 +184,12 @@ public class MasterOpExecutor { if (null != ctx.queryId()) { params.setQueryId(ctx.queryId()); } + if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) { + if (null != ctx.getPrepareExecuteBuffer()) { + params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer()); + } + } + return params; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java index f40bae578a4..d6304b8c844 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java @@ -46,6 +46,7 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.PlaceholderId; import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand; import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -170,7 +171,18 @@ public class MysqlConnectProcessor extends ConnectProcessor { } } - private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx) { + private String getHexStr(ByteBuffer packetBuf) { + byte[] bytes = packetBuf.array(); + StringBuilder hex = new StringBuilder(); + for (int i = packetBuf.position(); i < packetBuf.limit(); ++i) { + hex.append(String.format("%02X ", bytes[i])); + } + return hex.toString(); + } + + @Override + protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx, + ByteBuffer packetBuf, TUniqueId queryId) { int paramCount = prepareCommand.placeholderCount(); LOG.debug("execute prepared statement {}, paramCount {}", stmtId, paramCount); // null bitmap @@ -178,6 +190,13 @@ public class MysqlConnectProcessor extends ConnectProcessor { try { StatementContext statementContext = prepCtx.statementContext; if (paramCount > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("execute param buf: {}, array: {}", packetBuf, getHexStr(packetBuf)); + } + if (!ctx.isProxy()) { + ctx.setPrepareExecuteBuffer(packetBuf.duplicate()); + } + byte[] nullbitmapData = new byte[(paramCount + 7) / 8]; packetBuf.get(nullbitmapData); // new_params_bind_flag @@ -218,7 +237,12 @@ public class MysqlConnectProcessor extends ConnectProcessor { stmt.setOrigStmt(prepareCommand.getOriginalStmt()); executor = new StmtExecutor(ctx, stmt); ctx.setExecutor(executor); - executor.execute(); + if (null != queryId) { + executor.execute(queryId); + } else { + executor.execute(); + } + if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) { stmtStr = executeStmt.toSql(); stmtStr = stmtStr + " /*originalSql = " + prepareCommand.getOriginalStmt().originStmt + "*/"; @@ -233,6 +257,7 @@ public class MysqlConnectProcessor extends ConnectProcessor { if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) { auditAfterExec(stmtStr, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); } + ctx.setPrepareExecuteBuffer(null); } // process COM_EXECUTE, parse binary row data @@ -266,7 +291,7 @@ public class MysqlConnectProcessor extends ConnectProcessor { "msg: Not supported such prepared statement"); return; } - handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext); + handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext, packetBuf, null); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5f3bf4c1b1e..55dc0dfaba3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -143,6 +143,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.minidump.MinidumpUtils; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook; +import org.apache.doris.nereids.trees.expressions.Placeholder; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand; @@ -267,6 +268,7 @@ public class StmtExecutor { private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; private String stmtName; + private String prepareStmtName; // for proxy private StatementBase prepareStmt = null; private String mysqlLoadId; // Distinguish from prepare and execute command @@ -681,8 +683,10 @@ public class StmtExecutor { } long stmtId = Config.prepared_stmt_start_id > 0 ? Config.prepared_stmt_start_id : context.getPreparedStmtId(); - logicalPlan = new PrepareCommand(String.valueOf(stmtId), - logicalPlan, statementContext.getPlaceholders(), originStmt); + this.prepareStmtName = String.valueOf(stmtId); + List<Placeholder> placeholders = context == null + ? statementContext.getPlaceholders() : context.getStatementContext().getPlaceholders(); + logicalPlan = new PrepareCommand(prepareStmtName, logicalPlan, placeholders, originStmt); } // when we in transaction mode, we only support insert into command and transaction command if (context.isTxnModel()) { @@ -3477,4 +3481,8 @@ public class StmtExecutor { context.getMysqlChannel().sendOnePacket(byteBuffer); } } + + public String getPrepareStmtName() { + return this.prepareStmtName; + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 746d5d7e5d8..64f6578f68f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -571,6 +571,7 @@ struct TMasterOpRequest { // transaction load 29: optional TTxnLoadInfo txnLoadInfo 30: optional TGroupCommitInfo groupCommitInfo + 31: optional binary prepareExecuteBuffer } struct TColumnDefinition { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org