This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d6c5fe2617d branch-3.1: [fix](audit log) fix audit log return rows
incorrect when statement need forward #54548 (#54580)
d6c5fe2617d is described below
commit d6c5fe2617d6acc610833f9bca0c0bd5decd6314
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 14:28:48 2025 +0800
branch-3.1: [fix](audit log) fix audit log return rows incorrect when
statement need forward #54548 (#54580)
Cherry-picked from #54548
Co-authored-by: hui lai <[email protected]>
---
.../java/org/apache/doris/qe/ConnectProcessor.java | 1 +
.../java/org/apache/doris/qe/FEOpExecutor.java | 3 ++
gensrc/thrift/FrontendService.thrift | 1 +
.../insert/test_insert_from_follower.groovy | 62 ++++++++++++++++++++++
4 files changed, 67 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 8ad600f465e..93ce46dd93c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -749,6 +749,7 @@ public abstract class ConnectProcessor {
result.setStatus(ctx.getState().toString());
if (ctx.getState().getStateType() == MysqlStateType.OK) {
result.setStatusCode(0);
+ result.setAffectedRows(ctx.getState().getAffectedRows());
} else {
ErrorCode errorCode = ctx.getState().getErrorCode();
if (errorCode != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index ecf2f0c8428..0c849ef924b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -83,6 +83,9 @@ public class FEOpExecutor {
LOG.info("set txn entry to null");
}
}
+ if (result.isSetAffectedRows()) {
+ ctx.updateReturnRows((int) result.getAffectedRows());
+ }
}
public void cancel() throws Exception {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index fe60c5d1b22..8f896b0b70a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -635,6 +635,7 @@ struct TMasterOpResult {
// transaction load
9: optional TTxnLoadInfo txnLoadInfo;
10: optional i64 groupCommitLoadBeId;
+ 11: optional i64 affectedRows;
}
struct TUpdateExportTaskStatusRequest {
diff --git a/regression-test/suites/load_p0/insert/test_insert_from_follower.groovy b/regression-test/suites/load_p0/insert/test_insert_from_follower.groovy
new file mode 100644
index 00000000000..31b230ceb04
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_from_follower.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_insert_from_follower", "docker") {
+ def options = new ClusterOptions()
+ options.setFeNum(3)
+ options.setBeNum(3)
+ docker(options) {
+ def masterFe = cluster.getMasterFe()
+ def allFes = cluster.getAllFrontends()
+ def followerFes = allFes.findAll { fe -> fe.index != masterFe.index }
+ def followerFe = followerFes[0]
+ logger.info("Master FE: ${masterFe.host}")
+ logger.info("Using follower FE: ${followerFe.host}")
+ // Connect to follower FE
+ def url = String.format(
+ "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+ followerFe.host, followerFe.queryPort)
+ logger.info("Connecting to follower FE: ${url}")
+ context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword)
+
+ sql "drop database if exists test_insert_from_follower"
+ sql "create database test_insert_from_follower"
+ sql "use test_insert_from_follower"
+ def tbl = 'test_insert_from_follower_tbl'
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """
+ CREATE TABLE ${tbl} (
+ `k1` int(11) NULL,
+ `k2` char(5) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="3"
+ );
+ """
+
+ def loadRes = sql """ INSERT INTO ${tbl} (k1, k2) VALUES (1, "a"), (2, "b"), (3, "c"), (4, "e");"""
+ logger.info("loadRes: ${loadRes}")
+ assertTrue(loadRes[0][0] == 4)
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sleep(5000)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]