This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 650932cc49a [fix](job) fix routine load task schedule stuck after 
create task fail (#60143)
650932cc49a is described below

commit 650932cc49a33be2dc5965eedc1a4d7b628b837e
Author: hui lai <[email protected]>
AuthorDate: Sat Jan 31 05:58:24 2026 +0800

    [fix](job) fix routine load task schedule stuck after create task fail 
(#60143)
    
    ### What problem does this PR solve?
    
    fix routine load task schedule stuck after create task fail:
    ```
    2026-01-21 18:46:11,938 WARN (Routine load task scheduler|52) 
[RoutineLoadTaskScheduler.process():117] Taking routine load task from queue 
has been interrupted
    java.lang.IllegalStateException
            at 
com.google.common.base.Preconditions.checkState(Preconditions.java:499)
            at org.apache.doris.analysis.SlotRef.getTableName(SlotRef.java:356)
            at 
org.apache.doris.rewrite.ExtractCommonFactorsRule.rewriteOrToIn(ExtractCommonFactorsRule.java:536)
            at 
org.apache.doris.rewrite.ExtractCommonFactorsRule.makeCompoundRemaining(ExtractCommonFactorsRule.java:459)
            at 
org.apache.doris.rewrite.ExtractCommonFactorsRule.extractCommonFactors(ExtractCommonFactorsRule.java:205)
            at 
org.apache.doris.rewrite.ExtractCommonFactorsRule.apply(ExtractCommonFactorsRule.java:80)
            at 
org.apache.doris.rewrite.ExprRewriter.applyRuleOnce(ExprRewriter.java:178)
            at 
org.apache.doris.rewrite.ExprRewriter.rewrite(ExprRewriter.java:171)
            at 
org.apache.doris.planner.FileLoadScanNode.initWhereExpr(FileLoadScanNode.java:171)
            at 
org.apache.doris.planner.FileLoadScanNode.initAndSetPrecedingFilter(FileLoadScanNode.java:144)
            at 
org.apache.doris.planner.FileLoadScanNode.initParamCreateContexts(FileLoadScanNode.java:134)
            at 
org.apache.doris.planner.FileLoadScanNode.init(FileLoadScanNode.java:125)
            at 
org.apache.doris.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:307)
            at 
org.apache.doris.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:116)
            at 
org.apache.doris.load.routineload.RoutineLoadJob.plan(RoutineLoadJob.java:1032)
            at 
org.apache.doris.load.routineload.KafkaTaskInfo.rePlan(KafkaTaskInfo.java:136)
            at 
org.apache.doris.load.routineload.KafkaTaskInfo.createRoutineLoadTask(KafkaTaskInfo.java:99)
            at 
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.scheduleOneTask(RoutineLoadTaskScheduler.java:193)
            at 
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.process(RoutineLoadTaskScheduler.java:115)
            at 
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.runAfterCatalogReady(RoutineLoadTaskScheduler.java:84)
            at 
org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58)
            at org.apache.doris.common.util.Daemon.run(Daemon.java:119)
    ```
---
 .../load/routineload/RoutineLoadTaskScheduler.java |   7 +-
 ...est_routine_load_task_exception_recovery.groovy | 101 +++++++++++++++++++++
 2 files changed, 106 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 8be721fba58..4156410ef86 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -195,6 +195,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         try {
             long startTime = System.currentTimeMillis();
             tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
+            if 
(DebugPointUtil.isEnable("FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"))
 {
+                throw new RuntimeException("debug point: 
createRoutineLoadTask.exception");
+            }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("create routine load task cost(ms): {}, job id: {}",
                         (System.currentTimeMillis() - startTime), 
routineLoadTaskInfo.getJobId());
@@ -208,12 +211,12 @@ public class RoutineLoadTaskScheduler extends 
MasterDaemon {
                             new 
ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + 
e.getMessage()),
                             false);
             throw e;
-        } catch (UserException e) {
+        } catch (Exception e) {
             // set BE id to -1 to release the BE slot
             routineLoadTaskInfo.setBeId(-1);
             routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
                     .updateState(JobState.PAUSED,
-                            new ErrorReason(e.getErrorCode(),
+                            new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
                                     "failed to create task: " + 
e.getMessage()), false);
             throw e;
         }
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
new file mode 100644
index 00000000000..a9cd82fd702
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.junit.Assert
+
+suite("test_routine_load_task_exception_recovery", "nonConcurrent") {
+    def kafkaCsvTpoics = [
+        "test_routine_load_task_exception_recovery",
+    ]
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_task_exception_recovery"
+        def job = "test_task_exception_recovery"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // Enable debug point to simulate exception during 
createRoutineLoadTask
+            def injection = 
"FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"
+            try {
+                logger.info("---test task exception recovery: enable debug 
point to simulate exception---")
+                GetDebugPoint().enableDebugPointForAllFEs(injection)
+
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, 
kafkaCsvTpoics)
+
+                def maxWaitCount = 0
+                while (true) {
+                    def res = runSql("show routine load for ${job}")
+                    def routineLoadState = res[0][8].toString()
+                    def otherMsg = res[0][19].toString()
+                    logger.info("Routine load state: ${routineLoadState}, 
error message: ${otherMsg}")
+                    if (routineLoadState == "PAUSED" && 
otherMsg.contains("failed to create task")) {
+                        break
+                    }
+                    Thread.sleep(1000)
+                    if (maxWaitCount++ > 60) {
+                        Assert.fail("Routine load job did not pause as 
expected within timeout")
+                    }
+                }
+            } finally {
+                GetDebugPoint().disableDebugPointForAllFEs(injection)
+            }
+
+            // After disabling the debug point, verify that the routine load 
can recover
+            // and successfully load data
+            logger.info("---test task exception recovery: verify data loading 
after recovery---")
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+
+            // Verify data was loaded
+            def rowCount = sql "select count(*) from ${tableName}"
+            logger.info("Row count: ${rowCount[0][0]}")
+            Assert.assertTrue("Expected at least 2 rows in table", 
rowCount[0][0] >= 2)
+        } finally {
+            sql "stop routine load for ${job}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to