This is an automated email from the ASF dual-hosted git repository.
sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e5f3badd010 [fix](job) lock routine load task renew on submit failure (#64731)
e5f3badd010 is described below
commit e5f3badd0109e312167f242df5aa53adb86806d8
Author: hui lai <[email protected]>
AuthorDate: Wed Jun 24 10:58:39 2026 +0800
[fix](job) lock routine load task renew on submit failure (#64731)
### What problem does this PR solve?
Routine load submit failures can renew a task directly from the
scheduler after the task has begun a transaction. That path mutates the
job's `routineLoadTaskInfoList` without holding the job write lock,
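racing with scheduler idle-slot counting that reads the same list. This
PR takes the job write lock around the submit-failure renew path, matching
the existing timeout and transaction-status renew paths. While holding the
lock, it also skips the renew when the job is no longer RUNNING or the task
has already been removed from the job's task list. New unit tests cover the
locking behavior and both skip cases.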
---
.../load/routineload/RoutineLoadTaskScheduler.java | 42 ++++++---
.../routineload/RoutineLoadTaskSchedulerTest.java | 103 +++++++++++++++++++++
2 files changed, 130 insertions(+), 15 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 4de5f22c026..c901cd41610 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -250,22 +250,34 @@ public class RoutineLoadTaskScheduler extends
MasterDaemon {
routineLoadTaskInfo.getBeId(), errorMsg);
routineLoadTaskInfo.setBeId(-1);
RoutineLoadJob routineLoadJob =
routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
- routineLoadJob.setOtherMsg(errorMsg);
-
- // Check if this is a resource pressure error that should not be
immediately rescheduled
- if (errorMsg.contains("TOO_MANY_TASKS") ||
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
- // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED
error),
- // but txn has already begun. Here we will still set the
ExecuteStartTime of
- // this task, which means we "assume" that this task has been
successfully submitted.
- // And this task will then be aborted because of a timeout.
- // In this way, we can prevent the entire job from being paused
due to submit errors,
- // and we can also relieve the pressure on BE by waiting for the
timeout period.
-
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
- return;
- }
+ RoutineLoadTaskInfo newTask;
+
+ routineLoadJob.writeLock();
+ try {
+ routineLoadJob.setOtherMsg(errorMsg);
+
+ // Check if this is a resource pressure error that should not be
immediately rescheduled
+ if (errorMsg.contains("TOO_MANY_TASKS") ||
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
+ // submit task failed (such as
TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED error),
+ // but txn has already begun. Here we will still set the
ExecuteStartTime of
+ // this task, which means we "assume" that this task has been
successfully submitted.
+ // And this task will then be aborted because of a timeout.
+ // In this way, we can prevent the entire job from being
paused due to submit errors,
+ // and we can also relieve the pressure on BE by waiting for
the timeout period.
+
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
+ return;
+ }
- // for other errors (network issues, BE restart, etc.), reschedule
immediately
- RoutineLoadTaskInfo newTask =
routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
+ if (routineLoadJob.getState() != JobState.RUNNING
+ ||
!routineLoadJob.containsTask(routineLoadTaskInfo.getId())) {
+ return;
+ }
+
+ // for other errors (network issues, BE restart, etc.), reschedule
immediately
+ newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo,
false);
+ } finally {
+ routineLoadJob.writeUnlock();
+ }
addTaskInQueue(newTask);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c3370221334..c99877c2cc4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -33,13 +33,16 @@ import org.apache.doris.thrift.BackendService;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -108,4 +111,104 @@ public class RoutineLoadTaskSchedulerTest {
routineLoadTaskScheduler.runAfterCatalogReady();
}
}
+
+ @Test
+ public void testSubmitTaskFailureRenewsTaskWithJobWriteLock() {
+ ConcurrentMap<Integer, Long> partitionIdToOffset =
Maps.newConcurrentMap();
+ partitionIdToOffset.put(1, 100L);
+ KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ partitionIdToOffset, false, -1, false);
+ routineLoadTaskInfo.setBeId(100L);
+
+ LockCheckingKafkaRoutineLoadJob routineLoadJob = new
LockCheckingKafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.RUNNING);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+ Lists.newArrayList(routineLoadTaskInfo));
+ Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+ RoutineLoadTaskScheduler routineLoadTaskScheduler = new
RoutineLoadTaskScheduler(routineLoadManager);
+ Deencapsulation.invoke(routineLoadTaskScheduler,
"handleSubmitTaskFailure",
+ routineLoadTaskInfo, "network error");
+
+ Assert.assertTrue(routineLoadJob.isRenewCalledWithWriteLock());
+ List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
+ Deencapsulation.getField(routineLoadJob,
"routineLoadTaskInfoList");
+ Assert.assertEquals(1, routineLoadTaskInfoList.size());
+ Assert.assertNotSame(routineLoadTaskInfo,
routineLoadTaskInfoList.get(0));
+
+ LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+ Deencapsulation.getField(routineLoadTaskScheduler,
"needScheduleTasksQueue");
+ Assert.assertSame(routineLoadTaskInfoList.get(0),
needScheduleTasksQueue.peek());
+ }
+
+ @Test
+ public void testSubmitTaskFailureSkipsRenewWhenTaskRemoved() {
+ ConcurrentMap<Integer, Long> partitionIdToOffset =
Maps.newConcurrentMap();
+ partitionIdToOffset.put(1, 100L);
+ KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ partitionIdToOffset, false, -1, false);
+ routineLoadTaskInfo.setBeId(100L);
+
+ LockCheckingKafkaRoutineLoadJob routineLoadJob = new
LockCheckingKafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.RUNNING);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
Lists.newArrayList());
+ Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+ RoutineLoadTaskScheduler routineLoadTaskScheduler = new
RoutineLoadTaskScheduler(routineLoadManager);
+ Deencapsulation.invoke(routineLoadTaskScheduler,
"handleSubmitTaskFailure",
+ routineLoadTaskInfo, "network error");
+
+ Assert.assertFalse(routineLoadJob.isRenewCalled());
+ LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+ Deencapsulation.getField(routineLoadTaskScheduler,
"needScheduleTasksQueue");
+ Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+ }
+
+ @Test
+ public void testSubmitTaskFailureSkipsRenewWhenJobPaused() {
+ ConcurrentMap<Integer, Long> partitionIdToOffset =
Maps.newConcurrentMap();
+ partitionIdToOffset.put(1, 100L);
+ KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ partitionIdToOffset, false, -1, false);
+ routineLoadTaskInfo.setBeId(100L);
+
+ LockCheckingKafkaRoutineLoadJob routineLoadJob = new
LockCheckingKafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.PAUSED);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+ Lists.newArrayList(routineLoadTaskInfo));
+ Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+ RoutineLoadTaskScheduler routineLoadTaskScheduler = new
RoutineLoadTaskScheduler(routineLoadManager);
+ Deencapsulation.invoke(routineLoadTaskScheduler,
"handleSubmitTaskFailure",
+ routineLoadTaskInfo, "network error");
+
+ Assert.assertFalse(routineLoadJob.isRenewCalled());
+ LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+ Deencapsulation.getField(routineLoadTaskScheduler,
"needScheduleTasksQueue");
+ Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+ }
+
+ private static class LockCheckingKafkaRoutineLoadJob extends
KafkaRoutineLoadJob {
+ private boolean renewCalled;
+ private boolean renewCalledWithWriteLock;
+
+ @Override
+ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo
routineLoadTaskInfo,
+ boolean delaySchedule) {
+ renewCalled = true;
+ renewCalledWithWriteLock = lock.isWriteLockedByCurrentThread();
+ return super.unprotectRenewTask(routineLoadTaskInfo,
delaySchedule);
+ }
+
+ private boolean isRenewCalled() {
+ return renewCalled;
+ }
+
+ private boolean isRenewCalledWithWriteLock() {
+ return renewCalledWithWriteLock;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]