This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de1fcd37ce7 [fix](agent) cancel agent task when it is rejected by
agent-task-pool (#51138)
de1fcd37ce7 is described below
commit de1fcd37ce7c6cb1f217c46a342a73c5aaa05bc1
Author: walter <[email protected]>
AuthorDate: Sat May 24 11:31:18 2025 +0800
[fix](agent) cancel agent task when it is rejected by agent-task-pool
(#51138)
---
.../java/org/apache/doris/master/ReportHandler.java | 10 +---------
.../main/java/org/apache/doris/task/AgentTask.java | 11 +++++++++++
.../java/org/apache/doris/task/AgentTaskExecutor.java | 19 ++++++++++++++++---
.../java/org/apache/doris/task/AgentTaskQueue.java | 13 +++++++++++++
4 files changed, 41 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 7fc7e62fab0..813e041a072 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -642,14 +642,7 @@ public class ReportHandler extends Daemon {
AgentBatchTask batchTask = new
AgentBatchTask(Config.report_resend_batch_task_num_per_rpc);
long taskReportTime = System.currentTimeMillis();
for (AgentTask task : diffTasks) {
- // these tasks no need to do diff
- // 1. CREATE
- // 2. SYNC DELETE
- // 3. CHECK_CONSISTENCY
- // 4. STORAGE_MDEIUM_MIGRATE
- if (task.getTaskType() == TTaskType.CREATE
- || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
- || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE)
{
+ if (!task.isNeedResendType()) {
continue;
}
@@ -658,7 +651,6 @@ public class ReportHandler extends Daemon {
MetricRepo.COUNTER_AGENT_TASK_RESEND_TOTAL.getOrAdd(task.getTaskType().toString()).increase(1L);
batchTask.addTask(task);
}
-
}
if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
index f29ab3ef2b4..b5cc8fc2085 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
@@ -145,6 +145,17 @@ public abstract class AgentTask {
return isFinished;
}
+ public boolean isNeedResendType() {
+ // these task types do not need to be diffed and resent:
+ // 1. CREATE
+ // 2. SYNC DELETE
+ // 3. CHECK_CONSISTENCY
+ // 4. STORAGE_MEDIUM_MIGRATE
+ return !(taskType == TTaskType.CREATE
+ || taskType == TTaskType.CHECK_CONSISTENCY
+ || taskType == TTaskType.STORAGE_MEDIUM_MIGRATE);
+ }
+
public boolean shouldResend(long currentTimeMillis) {
return createTime == -1 || currentTimeMillis - createTime >
Config.agent_task_resend_wait_time_ms;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
index 8297ef2fff8..c8f5c977848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
@@ -21,21 +21,34 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
public class AgentTaskExecutor {
- private static final ExecutorService EXECUTOR =
ThreadPoolManager.newDaemonCacheThreadPool(
+ private static final ExecutorService EXECUTOR =
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
Config.max_agent_task_threads_num, "agent-task-pool", true);
public AgentTaskExecutor() {
-
}
public static void submit(AgentBatchTask task) {
if (task == null) {
return;
}
- EXECUTOR.submit(task);
+ try {
+ EXECUTOR.submit(task);
+ } catch (RejectedExecutionException e) {
+ String msg = "Task is rejected, because the agent-task-pool is
full, "
+ + "consider increasing the max_agent_task_threads_num
config";
+ for (AgentTask t : task.getAllTasks()) {
+ // Skip the task if it is a resend type and already exists in
the queue, because it will be
+ // re-submitted to the executor later.
+ if (t.isNeedResendType() && AgentTaskQueue.contains(t)) {
+ continue;
+ }
+ t.failedWithMsg(msg);
+ }
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index bd68d87f191..97e1a3cc676 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -132,6 +132,19 @@ public class AgentTaskQueue {
}
}
+ public static synchronized boolean contains(AgentTask task) {
+ long backendId = task.getBackendId();
+ TTaskType type = task.getTaskType();
+ long signature = task.getSignature();
+
+ if (!tasks.contains(backendId, type)) {
+ return false;
+ }
+
+ Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
+ return signatureMap.containsKey(signature);
+ }
+
public static synchronized AgentTask getTask(long backendId, TTaskType
type, long signature) {
if (!tasks.contains(backendId, type)) {
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]