This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d9d22ed81b3 [improve](streaming-job) make from-to streaming task
timeout progress-aware (#64301)
d9d22ed81b3 is described below
commit d9d22ed81b3ce154f602ff10d598c54285e4e9f6
Author: wudi <[email protected]>
AuthorDate: Tue Jun 23 10:04:27 2026 +0800
[improve](streaming-job) make from-to streaming task timeout progress-aware
(#64301)
## Problem
During a from-to streaming job's full (snapshot) sync, a data-skewed
table can produce a single split of 100GB+ that keeps one task busy for
well over an hour. The task timeout is a static
`streaming_task_timeout_multiplier * maxInterval` measured from task
start, so such a task is killed and retried in a loop even though it
keeps making steady progress. The only workaround today is to manually
raise `streaming_task_timeout_multiplier`.
## Solution
Make the timeout progress-aware. cdc_client publishes the read-end
progress (`scannedRows`) per running task and exposes it via
`/api/getTaskStatus/{taskId}`. Whenever the timeout check runs, the FE
first checks locally whether the task has gone a full timeout window
without observed progress; only then does it pull the status as a
heartbeat. Any advance renews the deadline, so a large split that keeps
scanning is never killed. When progress has not advanced (or the status
cannot be fetched), the task times out as before, preserving the safety
net against an unresponsive task.
- The window is floored by a new `streaming_task_min_timeout_sec`
(default 300s), mirroring `routine_load_task_min_timeout_sec`, so a
small `maxInterval` cannot shrink it below the time a task can
legitimately make no progress (e.g. a snapshot split's write tail after
the scan finishes).
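- Write-side hard failures surface within seconds: cdc_client pushes them
to the FE through the new `/api/streaming/report_task_failure` endpoint,
independent of the timeout path. If that push is lost, the recorded fail
reason is returned by `/api/getTaskStatus` and reported when the task
times out.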
- The heartbeat state on the FE task is transient and never persisted,
so FE replay is unaffected.
---
.../main/java/org/apache/doris/common/Config.java | 6 ++
.../apache/doris/job/cdc/StreamingTaskStatus.java | 50 ++++++++++++
.../doris/job/cdc/request/TaskFailureRequest.java | 46 +++++++++++
.../doris/httpv2/rest/StreamingJobAction.java | 40 ++++++++--
.../insert/streaming/StreamingInsertJob.java | 45 +++++++++--
.../insert/streaming/StreamingMultiTblTask.java | 85 +++++++++++----------
.../StreamingMultiTblTaskTimeoutTest.java | 89 ++++++++++++++++++++++
.../cdcclient/controller/ClientController.java | 5 ++
.../cdcclient/service/PipelineCoordinator.java | 37 ++++++++-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 41 ++++++++++
.../org/apache/doris/cdcclient/utils/HttpUtil.java | 6 +-
11 files changed, 390 insertions(+), 60 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 33432425a53..ff1b7502f23 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1193,6 +1193,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int streaming_task_timeout_multiplier = 10;
+ /**
+ * Lower bound, in seconds, of a streaming task's no-progress timeout (floors multiplier * maxInterval).
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int streaming_task_min_timeout_sec = 300;
+
@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_light_rpc_timeout_sec = 90;
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/StreamingTaskStatus.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/StreamingTaskStatus.java
new file mode 100644
index 00000000000..16c339dc24c
--- /dev/null
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/StreamingTaskStatus.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.cdc;
+
+// cdc_client -> FE running task status, pulled by the FE only once the task's local
+// no-progress budget is exceeded. scannedRows is the read-end heartbeat that renews the
+// deadline (< 0 means no progress info: not scanning, scan finished, or write failed);
+// failReason carries the recorded write error so a timeout can report the real cause.
+public class StreamingTaskStatus {
+ private long scannedRows = -1;
+ private String failReason = "";
+
+ public StreamingTaskStatus() {}
+
+ public StreamingTaskStatus(long scannedRows, String failReason) {
+ this.scannedRows = scannedRows;
+ this.failReason = failReason;
+ }
+
+ public long getScannedRows() {
+ return scannedRows;
+ }
+
+ public void setScannedRows(long scannedRows) {
+ this.scannedRows = scannedRows;
+ }
+
+ public String getFailReason() {
+ return failReason;
+ }
+
+ public void setFailReason(String failReason) {
+ this.failReason = failReason;
+ }
+}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/TaskFailureRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/TaskFailureRequest.java
new file mode 100644
index 00000000000..487b891c889
--- /dev/null
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/TaskFailureRequest.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.cdc.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+// cdc_client -> FE push (PUT /api/streaming/report_task_failure): report a hard write failure
+// of a running task so it is failed within seconds instead of waiting out the timeout budget.
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class TaskFailureRequest {
+ public long jobId;
+ public long taskId;
+ public String reason;
+
+ @Override
+ public String toString() {
+ return "TaskFailureRequest{"
+ + "jobId=" + jobId
+ + ", taskId=" + taskId
+ + ", reason='" + reason + "'"
+ + "}";
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
index 34d7abe3133..a27973d5350 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
@@ -22,6 +22,7 @@ import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.TaskFailureRequest;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import com.google.common.base.Strings;
@@ -39,17 +40,40 @@ public class StreamingJobAction extends RestBaseController {
@RequestMapping(path = "/api/streaming/commit_offset", method =
RequestMethod.PUT)
public Object commitOffset(@RequestBody CommitOffsetRequest offsetRequest,
HttpServletRequest request) {
+ checkAuth(request);
+ return updateOffset(offsetRequest);
+ }
+
+ @RequestMapping(path = "/api/streaming/report_task_failure", method =
RequestMethod.PUT)
+ public Object reportTaskFailure(@RequestBody TaskFailureRequest
failureRequest, HttpServletRequest request) {
+ checkAuth(request);
+ return failTask(failureRequest);
+ }
+
+ private void checkAuth(HttpServletRequest request) {
String authToken = request.getHeader("token");
- // if auth token is not null, check it first
- if (!Strings.isNullOrEmpty(authToken)) {
- if (!checkClusterToken(authToken)) {
- throw new UnauthorizedException("Invalid token: " + authToken);
- }
- return updateOffset(offsetRequest);
- } else {
- // only use for token
+ if (Strings.isNullOrEmpty(authToken)) {
throw new UnauthorizedException("Miss token");
}
+ if (!checkClusterToken(authToken)) {
+ throw new UnauthorizedException("Invalid token: " + authToken);
+ }
+ }
+
+ private Object failTask(TaskFailureRequest failureRequest) {
+ AbstractJob job =
Env.getCurrentEnv().getJobManager().getJob(failureRequest.getJobId());
+ if (!(job instanceof StreamingInsertJob)) {
+ return ResponseEntityBuilder
+ .okWithCommonError("Job " + failureRequest.getJobId() + "
is not a streaming job");
+ }
+ try {
+ LOG.info("Reporting task failure with {}",
failureRequest.toString());
+ ((StreamingInsertJob) job).reportTaskFailure(failureRequest);
+ return ResponseEntityBuilder.ok("Task failure reported
successfully");
+ } catch (Exception e) {
+ LOG.warn("Failed to report task failure for job {}: {}",
failureRequest.getJobId(), e.getMessage());
+ return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+ }
}
private Object updateOffset(CommitOffsetRequest offsetRequest) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index be14f79fa34..be9e6d22f22 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -39,7 +39,9 @@ import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.TaskFailureRequest;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.IntervalUnit;
@@ -139,7 +141,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private String tvfType;
private Map<String, String> originTvfProps;
@Getter
- AbstractStreamingTask runningStreamTask;
+ volatile AbstractStreamingTask runningStreamTask;
SourceOffsetProvider offsetProvider;
@Getter
@Setter
@@ -1404,20 +1406,51 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ /**
+ * Push from cdc_client: fail the running task immediately on a hard write failure.
+ * Stale reports (other taskId, replaced or non-RUNNING task) are dropped; re-checked under the write lock.
+ */
+ public void reportTaskFailure(TaskFailureRequest request) throws JobException {
+ AbstractStreamingTask task = this.runningStreamTask;
+ if (!(task instanceof StreamingMultiTblTask)) {
+ return;
+ }
+ StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+ if (runningMultiTask.getTaskId() != request.getTaskId()) {
+ return;
+ }
+ writeLock();
+ try {
+ if (this.runningStreamTask == runningMultiTask
+ && runningMultiTask.getTaskId() == request.getTaskId()
+ && TaskStatus.RUNNING.equals(runningMultiTask.getStatus())) {
+ runningMultiTask.onFail(request.getReason());
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* The current streamingTask times out; create a new streamingTask.
* Only applies to StreamingMultiTask.
*/
public void processTimeoutTasks() throws JobException {
- if (!(runningStreamTask instanceof StreamingMultiTblTask)) {
+ AbstractStreamingTask task = this.runningStreamTask;
+ if (!(task instanceof StreamingMultiTblTask)) {
+ return;
+ }
+ StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+ if (!runningMultiTask.isLocalTimeout()) {
return;
}
+ StreamingTaskStatus status = runningMultiTask.fetchTaskStatus();
writeLock();
try {
- StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask)
this.runningStreamTask;
- if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
- && runningMultiTask.isTimeout()) {
- String timeoutReason = runningMultiTask.getTimeoutReason();
+ if (this.runningStreamTask == runningMultiTask
+ && TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
+ && runningMultiTask.isTimeout(status)) {
+ String timeoutReason = status == null ? "" :
status.getFailReason();
if (StringUtils.isEmpty(timeoutReason)) {
timeoutReason = "task failed cause timeout";
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 9077ad01a82..85525beb1f8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -25,6 +25,7 @@ import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
@@ -83,7 +84,9 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
private long loadBytes = 0L;
private long filteredRows = 0L;
private long loadedRows = 0L;
- private long runningBackendId;
+ private volatile long runningBackendId;
+ long lastScannedRows = -1;
+ long lastProgressMs = 0;
public StreamingMultiTblTask(Long jobId,
long taskId,
@@ -111,8 +114,9 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
log.info("streaming multi task has been canceled, task id is {}",
getTaskId());
return;
}
- this.status = TaskStatus.RUNNING;
this.startTimeMs = System.currentTimeMillis();
+ this.lastProgressMs = this.startTimeMs;
+ this.status = TaskStatus.RUNNING;
this.runningOffset = offsetProvider.getNextOffset(null,
sourceProperties);
log.info("streaming multi task {} get running offset: {}", taskId,
runningOffset.toString());
}
@@ -361,15 +365,29 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
return Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
}
- public boolean isTimeout() {
+ // Local pre-check, no RPC: true once no progress was seen for a full timeout window (false while pending); gates the status pull.
+ boolean isLocalTimeout() {
+ if (startTimeMs == null) {
+ return false;
+ }
+ return System.currentTimeMillis() - lastProgressMs > getTaskTimeoutMs();
+ }
+
+ boolean isTimeout(StreamingTaskStatus status) {
if (startTimeMs == null) {
// It's still pending, waiting for scheduling.
return false;
}
+ long now = System.currentTimeMillis();
+ if (status != null && status.getScannedRows() > lastScannedRows) {
+ lastScannedRows = status.getScannedRows();
+ lastProgressMs = now;
+ }
long timeoutMs = getTaskTimeoutMs();
- long elapsed = System.currentTimeMillis() - startTimeMs;
+ long elapsed = now - lastProgressMs;
if (elapsed > timeoutMs) {
- log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms",
taskId, elapsed, timeoutMs);
+ log.info("Task {} timeout detected: no progress for {}ms,
timeoutMs={}ms",
+ taskId, elapsed, timeoutMs);
return true;
}
return false;
@@ -377,56 +395,39 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
// Read multiplier live so config changes affect already-running tasks.
private long getTaskTimeoutMs() {
- return Config.streaming_task_timeout_multiplier *
jobProperties.getMaxIntervalSecond() * 1000L;
+ return Math.max(
+ Config.streaming_task_timeout_multiplier *
jobProperties.getMaxIntervalSecond() * 1000L,
+ Config.streaming_task_min_timeout_sec * 1000L);
}
- /**
- * When a task encounters a write error, it will time out.
- * The job needs to obtain the reason for the timeout,
- * such as a data quality error, and needs to expose it to the user.
- */
- public String getTimeoutReason() {
+ StreamingTaskStatus fetchTaskStatus() {
if (runningBackendId <= 0) {
- log.info("No running backend for task {}", runningBackendId);
- return "";
+ return null;
}
Backend backend =
Env.getCurrentSystemInfo().getBackend(runningBackendId);
+ if (backend == null) {
+ return null;
+ }
try {
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
- .setApi("/api/getFailReason/" + getTaskId())
+ .setApi("/api/getTaskStatus/" + getTaskId())
.build();
TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
- InternalService.PRequestCdcClientResult result = null;
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance()
.requestCdcClient(address, request,
Config.streaming_cdc_light_rpc_timeout_sec);
- result = future.get(Config.streaming_cdc_light_rpc_timeout_sec,
TimeUnit.SECONDS);
- TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- log.warn("Failed to get task timeout reason, {}",
result.getStatus().getErrorMsgs(0));
- return "";
+ PRequestCdcClientResult result =
future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
+ if (TStatusCode.findByValue(result.getStatus().getStatusCode()) !=
TStatusCode.OK) {
+ return null;
}
- String response = result.getResponse();
- try {
- ResponseBody<String> responseObj = objectMapper.readValue(
- response,
- new TypeReference<ResponseBody<String>>() {
- }
- );
- if (responseObj.getCode() == RestApiStatusCode.OK.code) {
- return responseObj.getData();
- }
- } catch (JsonProcessingException e) {
- log.warn("Failed to get task timeout reason, response: {}",
response);
- }
- } catch (TimeoutException te) {
- log.warn("cdc_client RPC timeout api=/api/getFailReason jobId={}
taskId={} backend={}:{} "
- + "timeout_sec={}",
- getJobId(), getTaskId(), backend.getHost(),
backend.getBrpcPort(),
- Config.streaming_cdc_light_rpc_timeout_sec);
- } catch (ExecutionException | InterruptedException ex) {
- log.warn("Send get task fail reason request failed: ", ex);
+ ResponseBody<StreamingTaskStatus> body = objectMapper.readValue(
+ result.getResponse(),
+ new TypeReference<ResponseBody<StreamingTaskStatus>>() {
+ });
+ return body.getCode() == RestApiStatusCode.OK.code ?
body.getData() : null;
+ } catch (Exception e) {
+ log.warn("fetch task status failed, job {} task {}", getJobId(),
getTaskId(), e);
+ return null;
}
- return "";
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTaskTimeoutTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTaskTimeoutTest.java
new file mode 100644
index 00000000000..453c4ef202e
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTaskTimeoutTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class StreamingMultiTblTaskTimeoutTest {
+
+ @Before
+ public void setup() {
+ Config.streaming_task_timeout_multiplier = 10;
+ Config.streaming_task_min_timeout_sec = 300;
+ }
+
+ private StreamingMultiTblTask newTask(long lastProgressMsAgo, long
intervalSec) {
+ StreamingJobProperties props =
Mockito.mock(StreamingJobProperties.class);
+ Mockito.when(props.getMaxIntervalSecond()).thenReturn(intervalSec);
+ StreamingMultiTblTask t = new StreamingMultiTblTask(
+ 1L, 1L, null, null, null, "db", null, props, null, null);
+ long now = System.currentTimeMillis();
+ t.startTimeMs = now - lastProgressMsAgo;
+ t.lastProgressMs = t.startTimeMs;
+ t.lastScannedRows = 1000;
+ return t;
+ }
+
+ private StreamingTaskStatus status(long scanned) {
+ StreamingTaskStatus s = new StreamingTaskStatus();
+ s.setScannedRows(scanned);
+ return s;
+ }
+
+ @Test
+ public void readAdvancingRenewsDeadline() {
+ StreamingMultiTblTask t = newTask(10 * 3600_000L, 60L);
+ Assert.assertFalse(t.isTimeout(status(2000)));
+ }
+
+ @Test
+ public void noProgressWithinBudgetNotTimeout() {
+ StreamingMultiTblTask t = newTask(5 * 60_000L, 60L);
+ Assert.assertFalse(t.isTimeout(status(1000)));
+ }
+
+ @Test
+ public void noProgressOverBudgetTimeout() {
+ StreamingMultiTblTask t = newTask(11 * 60_000L, 60L);
+ Assert.assertTrue(t.isTimeout(status(1000)));
+ }
+
+ @Test
+ public void smallIntervalFlooredByMinTimeout() {
+ StreamingMultiTblTask t = newTask(4 * 60_000L, 1L);
+ Assert.assertFalse(t.isTimeout(status(1000)));
+ }
+
+ @Test
+ public void nullProgressBehavesLikeOldTimeout() {
+ StreamingMultiTblTask t = newTask(11 * 60_000L, 60L);
+ Assert.assertTrue(t.isTimeout(null));
+ }
+
+ @Test
+ public void localTimeoutGatesProgressPull() {
+ Assert.assertFalse(newTask(5 * 60_000L, 60L).isLocalTimeout());
+ Assert.assertTrue(newTask(11 * 60_000L, 60L).isLocalTimeout());
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 22509a55e98..1ac874ceeaa 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -165,6 +165,11 @@ public class ClientController {
return
RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
}
+ @RequestMapping(path = "/api/getTaskStatus/{taskId}", method =
RequestMethod.POST)
+ public Object getTaskStatus(@PathVariable("taskId") String taskId) {
+ return RestResponse.success(pipelineCoordinator.getTaskStatus(taskId));
+ }
+
@RequestMapping(path = "/api/getTaskOffset/{taskId}", method =
RequestMethod.POST)
public Object getTaskIdOffset(@PathVariable("taskId") String taskId) {
return
RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId));
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index ebbd9c4acf1..51871db92b1 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -29,6 +29,7 @@ import
org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
@@ -54,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME;
@@ -61,6 +63,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import io.debezium.data.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,8 +82,11 @@ public class PipelineCoordinator {
// taskId -> list of split offsets (accumulates all splits processed in
one task)
private final Map<String, List<Map<String, String>>> taskOffsetCache =
new ConcurrentHashMap<>();
- // taskId -> writeFailReason
- private final Map<String, String> taskErrorMaps = new
ConcurrentHashMap<>();
+ // taskId -> writeFailReason, size-bounded (1000 entries) so reasons that are never consumed are
+ // evicted instead of accumulating; taskProgressMap holds taskId -> scannedRows of running tasks.
+ private final Cache<String, String> taskErrorMaps =
+ CacheBuilder.newBuilder().maximumSize(1000).build();
+ private final Map<String, AtomicLong> taskProgressMap = new ConcurrentHashMap<>();
private final ThreadPoolExecutor executor;
private static final int QUEUE_CAPACITY = 128;
private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -406,6 +413,13 @@ public class PipelineCoordinator {
closeJobStreamLoad(writeRecordRequest.getJobId());
String rootCauseMessage =
ExceptionUtils.getRootCauseMessage(ex);
taskErrorMaps.put(writeRecordRequest.getTaskId(),
rootCauseMessage);
+ taskProgressMap.remove(writeRecordRequest.getTaskId());
+ DorisBatchStreamLoad.reportTaskFailure(
+ writeRecordRequest.getFrontendAddress(),
+ writeRecordRequest.getToken(),
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ rootCauseMessage);
LOG.error(
"Failed to process async write record,
jobId={} taskId={}",
writeRecordRequest.getJobId(),
@@ -579,6 +593,10 @@ public class PipelineCoordinator {
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
+ taskProgressMap
+ .computeIfAbsent(
+ writeRecordRequest.getTaskId(), k ->
new AtomicLong())
+ .set(scannedRows);
}
}
}
@@ -620,6 +638,7 @@ public class PipelineCoordinator {
scannedRows,
batchStreamLoad.getLoadStatistic(),
tableSchemas);
+ taskProgressMap.remove(currentTaskId);
}
public static boolean isHeartbeatEvent(SourceRecord record) {
@@ -729,10 +748,22 @@ public class PipelineCoordinator {
}
public String getTaskFailReason(String taskId) {
- String taskReason = taskErrorMaps.remove(taskId);
+ String taskReason = taskErrorMaps.getIfPresent(taskId);
+ taskErrorMaps.invalidate(taskId);
return taskReason == null ? "" : taskReason;
}
+ public StreamingTaskStatus getTaskStatus(String taskId) {
+ // Consumes any recorded fail reason; reports scannedRows=-1 on failure or when no progress is published, so the FE won't renew the deadline.
+ String reason = taskErrorMaps.getIfPresent(taskId);
+ taskErrorMaps.invalidate(taskId);
+ if (StringUtils.isNotEmpty(reason)) {
+ return new StreamingTaskStatus(-1, reason);
+ }
+ AtomicLong scannedRows = taskProgressMap.get(taskId);
+ return new StreamingTaskStatus(scannedRows == null ? -1 : scannedRows.get(), "");
+ }
+
/**
* Clean up reader resources: commit source offset and finish split
records.
*
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index b1d1cd8ba03..207813a523c 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -21,6 +21,7 @@ import org.apache.doris.cdcclient.common.Env;
import org.apache.doris.cdcclient.exception.StreamLoadException;
import org.apache.doris.cdcclient.utils.HttpUtil;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.TaskFailureRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
@@ -76,6 +77,10 @@ public class DorisBatchStreamLoad implements Serializable {
private final byte[] lineDelimiter = "\n".getBytes();
private static final String LOAD_URL_PATTERN =
"http://%s/api/%s/%s/_stream_load";
private static final String COMMIT_URL_PATTERN =
"http://%s/api/streaming/commit_offset";
+ private static final String REPORT_FAILURE_URL_PATTERN =
+ "http://%s/api/streaming/report_task_failure";
+ // best-effort notification: 60s socket timeout (vs HttpUtil's 10-min default) so an unreachable FE can't pin the data-write thread
+ private static final int REPORT_FAILURE_TIMEOUT_MS = 60 * 1000;
private String hostPort;
@Setter private String frontendAddress;
private Map<String, BatchRecordBuffer> bufferMap = new
ConcurrentHashMap<>();
@@ -600,4 +605,40 @@ public class DorisBatchStreamLoad implements Serializable {
throw new StreamLoadException("Failed to commit offset", ex);
}
}
+
+ /**
+ * Best-effort push: tell the FE a running task hit a hard write failure so it is failed within
+ * seconds instead of waiting out the timeout budget. The body is encoded as UTF-8 because the
+ * reason is an arbitrary root-cause message. Never throws; if the push is lost the FE timeout is the backstop.
+ */
+ public static void reportTaskFailure(
+ String frontendAddress, String token, String jobId, String taskId, String reason) {
+ try {
+ String url = String.format(REPORT_FAILURE_URL_PATTERN, frontendAddress);
+ String param =
+ OBJECT_MAPPER.writeValueAsString(
+ TaskFailureRequest.builder()
+ .jobId(Long.parseLong(jobId))
+ .taskId(Long.parseLong(taskId))
+ .reason(reason)
+ .build());
+ HttpPutBuilder builder =
+ new HttpPutBuilder()
+ .addCommonHeader()
+ .addBodyContentType()
+ .addTokenAuth(token)
+ .setUrl(url)
+ .setEntity(new StringEntity(param, "UTF-8"));
+ try (CloseableHttpClient client = HttpUtil.getHttpClient(REPORT_FAILURE_TIMEOUT_MS);
+ CloseableHttpResponse resp = client.execute(builder.build())) {
+ LOG.info(
+ "report task failure jobId={} taskId={} status={}",
+ jobId,
+ taskId,
+ resp.getStatusLine().getStatusCode());
+ }
+ } catch (Exception ex) {
+ LOG.warn("report task failure failed, jobId={} taskId={}", jobId, taskId, ex);
+ }
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
index 05407b2c89d..88c8accf65f 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
@@ -31,6 +31,10 @@ public class HttpUtil {
private static int socketTimeout = 10 * 60 * 1000; // stream load timeout
10 min
public static CloseableHttpClient getHttpClient() {
+ return getHttpClient(socketTimeout);
+ }
+
+ public static CloseableHttpClient getHttpClient(int socketTimeoutMs) {
return HttpClients.custom()
// default timeout 3s, maybe report 307 error when fe busy
.setRequestExecutor(new
HttpRequestExecutor(waitForContinueTimeout))
@@ -47,7 +51,7 @@ public class HttpUtil {
RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectTimeout)
- .setSocketTimeout(socketTimeout)
+ .setSocketTimeout(socketTimeoutMs)
.build())
.addInterceptorLast(new RequestContent(true))
.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]