liaoxin01 commented on code in PR #64301:
URL: https://github.com/apache/doris/pull/64301#discussion_r3443174255
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1404,20 +1406,51 @@ public void gsonPostProcess() throws IOException {
}
}
+ /**
+ * Push from cdc_client: fail the running task immediately on a hard write failure.
+ * Reports whose taskId no longer matches the current running task are dropped.
+ */
+ public void reportTaskFailure(TaskFailureRequest request) throws JobException {
+ AbstractStreamingTask task = this.runningStreamTask;
+ if (!(task instanceof StreamingMultiTblTask)) {
+ return;
+ }
+ StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+ if (runningMultiTask.getTaskId() != request.getTaskId()) {
+ return;
+ }
+ writeLock();
+ try {
+ if (this.runningStreamTask == runningMultiTask
+ && runningMultiTask.getTaskId() == request.getTaskId()
+ && TaskStatus.RUNNING.equals(runningMultiTask.getStatus())) {
+ runningMultiTask.onFail(request.getReason());
Review Comment:
[P2] This path releases the job write lock twice. `reportTaskFailure()`
acquires the lock here and its `finally` calls `writeUnlock()`, but
`runningMultiTask.onFail()` reaches `StreamingInsertJob.onStreamTaskFail()`,
whose `finally` already calls `writeUnlock()`. Therefore every accepted failure
report can finish with `IllegalMonitorStateException` after changing the job
state, causing the REST endpoint to return an error and making
retries/observability misleading. Please centralize lock ownership so exactly
one layer releases it.
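Here is a minimal sketch of what "exactly one layer releases it" could look like. It assumes the job lock behaves like a `ReentrantReadWriteLock`; the `IllegalMonitorStateException` above points that way, but that is an assumption. `LockOwnershipSketch`, its fields, and `onStreamTaskFailLocked` are hypothetical stand-ins, not Doris code. The public entry point acquires and releases the write lock. The inner failure handler only checks that the caller holds the lock and never unlocks it.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the job; not the real StreamingInsertJob.
final class LockOwnershipSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final long runningTaskId;
    private String status = "RUNNING";

    LockOwnershipSketch(long runningTaskId) {
        this.runningTaskId = runningTaskId;
    }

    // Entry point: the only layer that locks and unlocks.
    boolean reportTaskFailure(long taskId, String reason) {
        lock.writeLock().lock();
        try {
            // Re-check under the lock; stale or already-finished reports are dropped.
            if (runningTaskId != taskId || !"RUNNING".equals(status)) {
                return false;
            }
            onStreamTaskFailLocked(reason);
            return true;
        } finally {
            lock.writeLock().unlock(); // released exactly once, here
        }
    }

    // Caller must already hold the write lock; this method never unlocks it.
    private void onStreamTaskFailLocked(String reason) {
        if (!lock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("caller must hold the job write lock");
        }
        status = "FAILED: " + reason;
    }

    String status() {
        return status;
    }

    boolean isUnlocked() {
        return !lock.isWriteLocked();
    }

    // Reproduces the reported bug: a second unlock by the same thread throws.
    static boolean doubleUnlockThrows() {
        ReentrantReadWriteLock l = new ReentrantReadWriteLock();
        l.writeLock().lock();
        l.writeLock().unlock();
        try {
            l.writeLock().unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        LockOwnershipSketch job = new LockOwnershipSketch(42L);
        System.out.println(job.reportTaskFailure(42L, "stream load failed"));
        System.out.println(job.status());
        System.out.println(doubleUnlockThrows());
    }
}
```

The `*Locked` suffix is one common convention for "caller holds the lock". The inverse split also works: `onStreamTaskFail()` owns the lock, and `reportTaskFailure()` calls it without locking. In that case the taskId/status re-check has to move inside `onStreamTaskFail()` with the lock.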
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -406,6 +409,13 @@ public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordR
closeJobStreamLoad(writeRecordRequest.getJobId());
String rootCauseMessage =
ExceptionUtils.getRootCauseMessage(ex);
taskErrorMaps.put(writeRecordRequest.getTaskId(),
rootCauseMessage);
Review Comment:
[P2] This is not a fixed small cache: task IDs are unique and this map has
no size bound or expiry. When the push below succeeds, FE immediately
fails/pauses the task and normally never polls `/api/getTaskStatus` for that
old task ID, so the only `remove(taskId)` consumer is bypassed. `/api/close`,
reader release, and creation of the next task also do not clear this entry.
Repeated auto-resume failures therefore retain one error string per historical
task indefinitely. Please remove the entry after FE confirms that it accepted
the push, while retaining it when the push fails as the polling fallback, or
use an explicitly bounded/expiring store.
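The following sketch combines both suggestions under several assumptions. Task IDs are keyed as strings. The FE push is modelled as a hypothetical `BiPredicate` that returns `true` only when FE accepted the report. `TaskErrorStore` is an illustrative name, not an existing class. The entry is removed when the push is accepted and kept for the polling path otherwise. The map is also capped by evicting its oldest entry.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical store for cdc_client task errors; not an existing Doris class.
final class TaskErrorStore {
    private final Map<String, String> errors;

    TaskErrorStore(int maxEntries) {
        // Insertion-ordered map that evicts its oldest entry once it exceeds maxEntries,
        // so errors for task IDs nobody will ever poll cannot pile up forever.
        this.errors = Collections.synchronizedMap(new LinkedHashMap<String, String>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        });
    }

    // Record the error, push it to FE, and drop it only if FE accepted the push.
    void recordAndReport(String taskId, String error, BiPredicate<String, String> pushToFe) {
        errors.put(taskId, error);
        boolean accepted;
        try {
            accepted = pushToFe.test(taskId, error);
        } catch (RuntimeException e) {
            accepted = false; // push failed: keep the entry for the polling fallback
        }
        if (accepted) {
            errors.remove(taskId);
        }
    }

    // Polling fallback (a getTaskStatus-style read): return and clear the entry.
    String pollAndRemove(String taskId) {
        return errors.remove(taskId);
    }

    int size() {
        return errors.size();
    }

    public static void main(String[] args) {
        TaskErrorStore store = new TaskErrorStore(1000);
        store.recordAndReport("task-1", "stream load failed", (id, msg) -> false);
        System.out.println(store.pollAndRemove("task-1"));
    }
}
```

Time-based expiry, such as a periodic sweep of entries older than some window, would bound memory just as well. The size cap is simply the smaller change.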
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -600,4 +603,40 @@ public void commitOffset(
throw new StreamLoadException("Failed to commit offset", ex);
}
}
+
+ /**
+ * Best-effort push: tell the FE a running task hit a hard write failure so it is failed within
+ * seconds instead of waiting out the timeout budget. Never throws; if the push is lost the
+ * progress-aware timeout on the FE is the backstop.
+ */
+ public static void reportTaskFailure(
+ String frontendAddress, String token, String jobId, String taskId, String reason) {
+ try {
+ String url = String.format(REPORT_FAILURE_URL_PATTERN, frontendAddress);
+ String param =
+ OBJECT_MAPPER.writeValueAsString(
+ TaskFailureRequest.builder()
+ .jobId(Long.parseLong(jobId))
+ .taskId(Long.parseLong(taskId))
+ .reason(reason)
+ .build());
+ HttpPutBuilder builder =
+ new HttpPutBuilder()
+ .addCommonHeader()
+ .addBodyContentType()
+ .addTokenAuth(token)
+ .setUrl(url)
+ .setEntity(new StringEntity(param));
+ try (CloseableHttpClient client = HttpUtil.getHttpClient();
Review Comment:
[P1] This synchronous failure report runs on the same CDC worker that
executes `writeRecordsAsync`, but `HttpUtil.getHttpClient()` has a 10-minute
socket timeout. If FE is unreachable, each failed task can pin one of the
default 10 pipeline executor threads for up to 10 minutes; ten such failures
block unrelated CDC jobs from starting. Please use a short timeout specifically
for this best-effort notification, or dispatch it through a separate bounded
executor so failure reporting cannot exhaust the data-processing pool.
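This sketch shows the second option: a small dedicated executor combined with short timeouts. It swaps in the JDK's `java.net.http.HttpClient` in place of the Apache client returned by `HttpUtil.getHttpClient()`. The `FailureReporter` class, the `token` header name, and the timeout values are assumptions for illustration. The bounded queue uses `AbortPolicy`, so a saturated reporter drops the notification instead of blocking a pipeline thread. That fits the best-effort contract, because FE's progress-aware timeout is the backstop.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical best-effort reporter, isolated from the data-processing pool.
final class FailureReporter {
    private final ThreadPoolExecutor executor;
    private final HttpClient client;
    private final Duration requestTimeout;

    FailureReporter(int threads, int queueCapacity, Duration connectTimeout, Duration requestTimeout) {
        this.executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // bounded backlog
                r -> {
                    Thread t = new Thread(r, "cdc-failure-reporter");
                    t.setDaemon(true); // never keeps the JVM alive on shutdown
                    return t;
                },
                new ThreadPoolExecutor.AbortPolicy()); // full queue -> RejectedExecutionException
        this.client = HttpClient.newBuilder().connectTimeout(connectTimeout).build();
        this.requestTimeout = requestTimeout;
    }

    // Short per-request timeout instead of a 10-minute socket timeout.
    static HttpRequest buildRequest(String url, String token, String jsonBody, Duration timeout) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(timeout)
                .header("Content-Type", "application/json")
                .header("token", token) // placeholder header name, an assumption
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Returns immediately; false means the report was dropped because the pool is saturated.
    boolean report(String url, String token, String jsonBody) {
        HttpRequest request = buildRequest(url, token, jsonBody, requestTimeout);
        return trySubmit(() -> {
            try {
                client.send(request, HttpResponse.BodyHandlers.discarding());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Best-effort: swallow; the FE-side timeout is the backstop.
            }
        });
    }

    boolean trySubmit(Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // drop rather than block the calling pipeline thread
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

For example, `new FailureReporter(1, 64, Duration.ofSeconds(2), Duration.ofSeconds(5))` caps the cost of an unreachable FE at one reporter thread and a few seconds per report. All pipeline executor threads stay free for data processing.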
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]