This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 967d0a59b9a branch-2.1: [fix](load) reset routine load task EOF sign correctly #50048 (#50179)
967d0a59b9a is described below
commit 967d0a59b9a138260b944f1402b5ab1f2955a699
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Apr 19 17:15:18 2025 +0800
    branch-2.1: [fix](load) reset routine load task EOF sign correctly #50048 (#50179)
Cherry-picked from #50048
Co-authored-by: hui lai <[email protected]>
---
.../load/routineload/RoutineLoadTaskInfo.java | 7 ++++++
.../routine_load/test_routine_load_eof.groovy | 29 ++++++++++++++++++++--
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 5075311299d..0c662ce765d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.ExecuteEnv;
@@ -171,11 +172,17 @@ public abstract class RoutineLoadTaskInfo {
}
    private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+ if (DebugPointUtil.isEnable("RoutineLoadTaskInfo.judgeEof")) {
+ this.isEof = false;
+ return;
+ }
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
        if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows()
                && rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes()
                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs) {
this.isEof = true;
+ } else {
+ this.isEof = false;
}
}
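For readability, this is roughly how judgeEof looks with the patch applied, assembled from the hunk above (the fields referenced here all appear in the hunk; the rest of the class is omitted):

    private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
        // Debug point used by the regression test below: force the non-EOF path
        // so tasks keep being scheduled eagerly.
        if (DebugPointUtil.isEnable("RoutineLoadTaskInfo.judgeEof")) {
            this.isEof = false;
            return;
        }
        RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
        if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows()
                && rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes()
                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs) {
            // The task finished early with a partial batch, so treat the stream
            // as drained (EOF) for now.
            this.isEof = true;
        } else {
            // The fix: clear a previously set EOF sign once a full batch arrives,
            // instead of leaving the stale value in place.
            this.isEof = false;
        }
    }

Without the added else branch, isEof was never reset back to false in this method once it had been set, which is what the subject line's "reset routine load task EOF sign correctly" refers to.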
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index d4078896068..ac0b08248ef 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -20,7 +20,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig
-suite("test_routine_load_eof","p0") {
+suite("test_routine_load_eof","nonConcurrent") {
def kafkaCsvTpoics = [
"test_eof",
]
@@ -52,7 +52,7 @@ suite("test_routine_load_eof","p0") {
producer.send(record)
}
}
- if (count >= 120) {
+ if (count >= 180) {
break
}
count++
@@ -166,6 +166,8 @@ suite("test_routine_load_eof","p0") {
}
break;
}
+ def committedTaskNum1 = 0
+ def committedTaskNum2 = 0
sleep(60 * 1000)
def res = sql "show routine load for ${jobName}"
def statistic = res[0][14].toString()
@@ -174,6 +176,29 @@ suite("test_routine_load_eof","p0") {
if (json.committedTaskNum > 20) {
assertEquals(1, 2)
}
+ committedTaskNum1 = json.committedTaskNum
+ try {
+            GetDebugPoint().enableDebugPointForAllFEs("RoutineLoadTaskInfo.judgeEof")
+ sleep(30 * 1000)
+ res = sql "show routine load for ${jobName}"
+ statistic = res[0][14].toString()
+ json = parseJson(res[0][14])
+            log.info("routine load statistic: ${res[0][14].toString()}".toString())
+ if (json.committedTaskNum - committedTaskNum1 < 20) {
+ assertEquals(1, 2)
+ }
+ committedTaskNum2 = json.committedTaskNum
+ } finally {
+            GetDebugPoint().disableDebugPointForAllFEs("RoutineLoadTaskInfo.judgeEof")
+ }
+ sleep(60 * 1000)
+ res = sql "show routine load for ${jobName}"
+ statistic = res[0][14].toString()
+ json = parseJson(res[0][14])
+        log.info("routine load statistic: ${res[0][14].toString()}".toString())
+ if (json.committedTaskNum - committedTaskNum2 > 20) {
+ assertEquals(1, 2)
+ }
} finally {
sql "stop routine load for ${jobName}"
sql "DROP TABLE IF EXISTS ${tableName}"