This is an automated email from the ASF dual-hosted git repository.
sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 31b6fbb3e50 [opt](job) delay Kafka read committed zero-row retries
(#64046)
31b6fbb3e50 is described below
commit 31b6fbb3e50e25227901a74e3fd872adaa6e3d98
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 5 14:16:11 2026 +0800
[opt](job) delay Kafka read committed zero-row retries (#64046)
### What problem does this PR solve?
Kafka routine load with `isolation.level=read_committed` can finish a
task with 0 consumed rows while the task still has positive lag, for
example when upstream transactional records have not been committed yet
and are therefore invisible to the consumer. PR #63664 added an `OtherMsg`
hint for this case, but the renewed task could still be scheduled
immediately whenever the normal EOF heuristic did not apply, causing
repeated zero-row retries. This change reuses the same read_committed
zero-row lag detection to mark the next Kafka routine load task for
delayed scheduling, so it follows the existing `max_batch_interval` delay
path already used by EOF tasks.
---
.../load/routineload/RoutineLoadTaskInfo.java | 9 ++++
.../routineload/kafka/KafkaRoutineLoadJob.java | 10 +++--
.../load/routineload/kafka/KafkaTaskInfo.java | 7 ++++
.../load/routineload/KafkaRoutineLoadJobTest.java | 48 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 872f29e72ed..e812bf3740d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -158,6 +158,10 @@ public abstract class RoutineLoadTaskInfo {
return isEof;
}
+ public boolean isDelaySchedule() { // raw delaySchedule flag only; unlike needDedalySchedule() it ignores isEof
+ return delaySchedule;
+ }
+
public boolean needDedalySchedule() {
return delaySchedule || isEof;
}
@@ -178,6 +182,7 @@ public abstract class RoutineLoadTaskInfo {
public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) {
judgeEof(rlTaskTxnCommitAttachment);
+ this.delaySchedule = shouldDelaySchedule(rlTaskTxnCommitAttachment); // replaces any earlier flag with this commit's verdict
}
private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment)
{
@@ -195,6 +200,10 @@ public abstract class RoutineLoadTaskInfo {
}
}
+ protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) {
+ return false; // base hook: never delay; KafkaTaskInfo overrides it
+ }
+
protected abstract TRoutineLoadTask createRoutineLoadTask() throws
UserException;
public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
index 037b468b53a..dcb683e9e22 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
@@ -374,12 +374,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
private void updateReadCommittedLagHint(RLTaskTxnCommitAttachment
attachment) {
- if (DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT)
- || (attachment.getTotalRows() == 0 && isReadCommitted() &&
hasPositiveLagForTask(attachment))) {
+ if (shouldDelayScheduleForReadCommittedZeroRowsWithLag(attachment)) { // same predicate KafkaTaskInfo.shouldDelaySchedule uses
setOtherMsg(READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE);
}
}
+ boolean
shouldDelayScheduleForReadCommittedZeroRowsWithLag(RLTaskTxnCommitAttachment
attachment) { // package-private so KafkaTaskInfo (same package) can call it
+ return DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT) // an enabled debug point forces true
+ || (attachment.getTotalRows() == 0 && isReadCommitted() &&
hasPositiveLagForTask(attachment)); // otherwise: 0 rows, read_committed isolation, and positive lag
+ }
+
private boolean isReadCommitted() {
return
KAFKA_READ_COMMITTED.equalsIgnoreCase(customProperties.get(KAFKA_ISOLATION_LEVEL));
}
@@ -413,7 +417,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// add new task
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
((KafkaProgress)
progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()),
isMultiTable());
- kafkaTaskInfo.setDelaySchedule(delaySchedule);
+ kafkaTaskInfo.setDelaySchedule(delaySchedule ||
oldKafkaTaskInfo.isDelaySchedule()); // also inherit the old task's own delay flag
// remove old task
routineLoadTaskInfoList.remove(routineLoadTaskInfo);
// add new task
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
index 4364c106529..21aec901571 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
@@ -162,6 +163,12 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
}
+ @Override
+ protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) { // delay the next task when the job reports read_committed zero rows with lag
+ KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob)
routineLoadManager.getJob(jobId); // NOTE(review): dereferenced below without a null check; confirm the job cannot be removed before commit handling
+ return
routineLoadJob.shouldDelayScheduleForReadCommittedZeroRowsWithLag(rlTaskTxnCommitAttachment);
+ }
+
private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob)
throws UserException {
TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(),
id.getLeastSignificantBits());
// plan for each task, in case table has change(rollup or schema
change)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 8ff853e9b42..8b442012ff2 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -261,6 +261,54 @@ public class KafkaRoutineLoadJobTest {
Assert.assertTrue(otherMsg.contains("some records may be in
uncommitted transactions"));
}
+ @Test
+ public void testReadCommittedZeroRowsWithLagDelaysNextTask() throws
UserException {
+ RoutineLoadManager routineLoadManager =
Mockito.mock(RoutineLoadManager.class);
+ Env env = Mockito.mock(Env.class);
+
+ try (MockedStatic<Env> envStatic = Mockito.mockStatic(Env.class)) {
+ envStatic.when(Env::getCurrentEnv).thenReturn(env);
+
Mockito.when(env.getRoutineLoadManager()).thenReturn(routineLoadManager);
+
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+
Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+ Map<String, String> customProperties = Maps.newHashMap();
+ customProperties.put("isolation.level", "read_committed"); // turns on the read_committed branch
+ Deencapsulation.setField(routineLoadJob, "customProperties",
customProperties);
+
+ Map<Integer, Long> cachedPartitionWithLatestOffsets =
Maps.newHashMap();
+ cachedPartitionWithLatestOffsets.put(1, 20L); // latest offset of partition 1
+ Deencapsulation.setField(routineLoadJob,
"cachedPartitionWithLatestOffsets",
+ cachedPartitionWithLatestOffsets);
+
+ Map<Integer, Long> taskProgress = Maps.newHashMap();
+ taskProgress.put(1, 10L); // progress 10 vs. latest 20; presumably yields positive lag
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(taskProgress));
+
+ KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ taskProgress, false, 1000, false);
+ List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new
ArrayList<>();
+ routineLoadTaskInfoList.add(kafkaTaskInfo);
+ Deencapsulation.setField(routineLoadJob,
"routineLoadTaskInfoList", routineLoadTaskInfoList);
+
+ RLTaskTxnCommitAttachment attachment = new
RLTaskTxnCommitAttachment();
+ Deencapsulation.setField(attachment, "progress", new
KafkaProgress(taskProgress));
+ Deencapsulation.setField(attachment, "taskExecutionTimeMs",
+ routineLoadJob.getMaxBatchIntervalS() * 1000); // presumably keeps judgeEof from flagging EOF; confirm
+
+ kafkaTaskInfo.handleTaskByTxnCommitAttachment(attachment); // no row counts are set on the attachment
+
+ Assert.assertFalse(kafkaTaskInfo.getIsEof()); // so the delay must come from the read_committed check, not EOF
+ Assert.assertTrue(kafkaTaskInfo.needDedalySchedule());
+
+ RoutineLoadTaskInfo newTask =
Deencapsulation.invoke(routineLoadJob,
+ "unprotectRenewTask", kafkaTaskInfo, false); // false is presumably the delaySchedule argument; the delay must be inherited
+ Assert.assertTrue(newTask.needDedalySchedule());
+ }
+ }
+
@Test
public void testProcessTimeOutTasks() throws Exception {
RoutineLoadManager routineLoadManager =
Mockito.mock(RoutineLoadManager.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]