This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f8a4636c007 branch-3.0: [feature](restore) introduce 
AgentBoundedBatchTask to manage concurrent restore tasks #50740 (#50843)
f8a4636c007 is described below

commit f8a4636c007c5bb07c82ca8b26995b9cc0a3e67b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 13 22:23:00 2025 +0800

    branch-3.0: [feature](restore) introduce AgentBoundedBatchTask to manage 
concurrent restore tasks #50740 (#50843)
    
    Cherry-picked from #50740
    
    Co-authored-by: walter <[email protected]>
    Co-authored-by: wubiao02 <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |   9 +
 .../java/org/apache/doris/backup/RestoreJob.java   |  42 ++-
 .../java/org/apache/doris/master/MasterImpl.java   |   4 +
 .../java/org/apache/doris/task/AgentBatchTask.java |   6 +-
 .../apache/doris/task/AgentBoundedBatchTask.java   | 281 +++++++++++++++++++++
 5 files changed, 313 insertions(+), 29 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 3d8b2d3f3ee..486c1838bfd 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2794,6 +2794,15 @@ public class Config extends ConfigBase {
     })
     public static int backup_restore_batch_task_num_per_rpc = 10000;
 
+    // Max number of submitted-but-unfinished restore tasks per BE. RestoreJob reads it when it creates
+    // each AgentBoundedBatchTask, so a runtime change only affects batches created afterwards.
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "一个 BE 同时执行的恢复任务的并发数",
+            "The number of concurrent restore tasks per be"})
+    public static int restore_task_concurrency_per_be = 5000;
+
+    // While AgentBoundedBatchTask submits tasks, a BE that is not alive is waited for until its last
+    // update (Backend.getLastUpdateMs) is older than this many seconds; then the whole batch is cancelled.
+    @ConfField(mutable = true, description = {"执行 agent task 时,BE心跳超过多长时间,认为BE不可用",
+            "The time after which BE is considered unavailable if the heartbeat is not received"})
+    public static int agent_task_be_unavailable_heartbeat_timeout_second = 300;
+
     @ConfField(description = {"是否开启通过http接口获取log文件的功能",
             "Whether to enable the function of getting log files through http 
interface"})
     public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 44e29e2c106..83569e10eab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -73,6 +73,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentBoundedBatchTask;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
@@ -680,7 +681,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         Map<Long, TabletRef> tabletBases = new HashMap<>();
 
         // Check and prepare meta objects.
-        Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
+        Map<Long, AgentBoundedBatchTask> batchTaskPerTable = new HashMap<>();
 
         // The tables that are restored but not committed, because the table 
name may be changed.
         List<Table> stagingRestoreTables = Lists.newArrayList();
@@ -949,9 +950,10 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                 BackupPartitionInfo backupPartitionInfo
                         = 
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
 
-                AgentBatchTask batchTask = 
batchTaskPerTable.get(localTbl.getId());
+                AgentBoundedBatchTask batchTask = 
batchTaskPerTable.get(localTbl.getId());
                 if (batchTask == null) {
-                    batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+                    batchTask = new AgentBoundedBatchTask(
+                            Config.backup_restore_batch_task_num_per_rpc, 
Config.restore_task_concurrency_per_be);
                     batchTaskPerTable.put(localTbl.getId(), batchTask);
                 }
                 createReplicas(db, batchTask, localTbl, restorePart);
@@ -965,9 +967,10 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                 if (restoreTbl.getType() == TableType.OLAP) {
                     OlapTable restoreOlapTable = (OlapTable) restoreTbl;
                     for (Partition restorePart : 
restoreOlapTable.getPartitions()) {
-                        AgentBatchTask batchTask = 
batchTaskPerTable.get(restoreTbl.getId());
+                        AgentBoundedBatchTask batchTask = 
batchTaskPerTable.get(restoreTbl.getId());
                         if (batchTask == null) {
-                            batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+                            batchTask = new 
AgentBoundedBatchTask(Config.backup_restore_batch_task_num_per_rpc,
+                                    Config.restore_task_concurrency_per_be);
                             batchTaskPerTable.put(restoreTbl.getId(), 
batchTask);
                         }
                         createReplicas(db, batchTask, restoreOlapTable, 
restorePart, tabletBases);
@@ -1019,7 +1022,6 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                 for (AgentTask task : batchTask.getAllTasks()) {
                     createReplicaTasksLatch.addMark(task.getBackendId(), 
task.getTabletId());
                     ((CreateReplicaTask) 
task).setLatch(createReplicaTasksLatch);
-                    AgentTaskQueue.addTask(task);
                 }
                 AgentTaskExecutor.submit(batchTask);
             }
@@ -1235,7 +1237,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         taskProgress.clear();
         taskErrMsg.clear();
         Multimap<Long, Long> bePathsMap = HashMultimap.create();
-        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+        AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+                Config.backup_restore_batch_task_num_per_rpc, 
Config.restore_task_concurrency_per_be);
         db.readLock();
         try {
             for (Map.Entry<IdChain, IdChain> entry : 
fileMapping.getMapping().entrySet()) {
@@ -1277,10 +1280,6 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             return;
         }
 
-        // send tasks
-        for (AgentTask task : batchTask.getAllTasks()) {
-            AgentTaskQueue.addTask(task);
-        }
         AgentTaskExecutor.submit(batchTask);
         LOG.info("finished to send snapshot tasks, num: {}. {}", 
batchTask.getTaskNum(), this);
     }
@@ -1724,7 +1723,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+        AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+                Config.backup_restore_batch_task_num_per_rpc, 
Config.restore_task_concurrency_per_be);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
 
@@ -1853,10 +1853,6 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             }
         }
 
-        // send task
-        for (AgentTask task : batchTask.getAllTasks()) {
-            AgentTaskQueue.addTask(task);
-        }
         AgentTaskExecutor.submit(batchTask);
 
         state = RestoreJobState.DOWNLOADING;
@@ -1876,7 +1872,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+        AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+                Config.backup_restore_batch_task_num_per_rpc, 
Config.restore_task_concurrency_per_be);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
 
@@ -2019,10 +2016,6 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             }
         }
 
-        // send task
-        for (AgentTask task : batchTask.getAllTasks()) {
-            AgentTaskQueue.addTask(task);
-        }
         AgentTaskExecutor.submit(batchTask);
 
         state = RestoreJobState.DOWNLOADING;
@@ -2051,7 +2044,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new 
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+        AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+                Config.backup_restore_batch_task_num_per_rpc, 
Config.restore_task_concurrency_per_be);
         // tablet id->(be id -> download info)
         for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
             SnapshotInfo info = cell.getValue();
@@ -2063,10 +2057,6 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
             unfinishedSignatureToId.put(signature, info.getTabletId());
         }
 
-        // send task
-        for (AgentTask task : batchTask.getAllTasks()) {
-            AgentTaskQueue.addTask(task);
-        }
         AgentTaskExecutor.submit(batchTask);
 
         state = RestoreJobState.COMMITTING;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index b0b5a9bde37..213984ae982 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -256,6 +256,7 @@ public class MasterImpl {
                 createReplicaTask.countDownToZero(task.getBackendId() + ": "
                         + request.getTaskStatus().getErrorMsgs().toString());
             } else {
+                createReplicaTask.setFinished(true);
                 long tabletId = createReplicaTask.getTabletId();
 
                 if (request.isSetFinishTabletInfos()) {
@@ -590,6 +591,7 @@ public class MasterImpl {
 
     private void finishMakeSnapshot(AgentTask task, TFinishTaskRequest 
request) {
         SnapshotTask snapshotTask = (SnapshotTask) task;
+        task.setFinished(true);
         if (snapshotTask.isCopyTabletTask()) {
             snapshotTask.setResultSnapshotPath(request.getSnapshotPath());
             snapshotTask.countDown(task.getBackendId(), task.getTabletId());
@@ -609,6 +611,7 @@ public class MasterImpl {
 
     private void finishDownloadTask(AgentTask task, TFinishTaskRequest 
request) {
         DownloadTask downloadTask = (DownloadTask) task;
+        task.setFinished(true);
         if 
(Env.getCurrentEnv().getBackupHandler().handleDownloadSnapshotTask(downloadTask,
 request)) {
             AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.DOWNLOAD, 
task.getSignature());
         }
@@ -616,6 +619,7 @@ public class MasterImpl {
 
     private void finishMoveDirTask(AgentTask task, TFinishTaskRequest request) 
{
         DirMoveTask dirMoveTask = (DirMoveTask) task;
+        task.setFinished(true);
         if 
(Env.getCurrentEnv().getBackupHandler().handleDirMoveTask(dirMoveTask, 
request)) {
             AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.MOVE, 
task.getSignature());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 328e4ba233d..89ea9168c74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -72,10 +72,10 @@ import java.util.stream.Collectors;
 public class AgentBatchTask implements Runnable {
     private static final Logger LOG = 
LogManager.getLogger(AgentBatchTask.class);
 
-    private int batchSize = Integer.MAX_VALUE;
+    protected int batchSize = Integer.MAX_VALUE;
 
     // backendId -> AgentTask List
-    private Map<Long, List<AgentTask>> backendIdToTasks;
+    protected Map<Long, List<AgentTask>> backendIdToTasks;
 
     public AgentBatchTask() {
         this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
@@ -246,7 +246,7 @@ public class AgentBatchTask implements Runnable {
         }
     }
 
-    private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
+    protected TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
         TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
         tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);
         tAgentTaskRequest.setSignature(task.getSignature());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBoundedBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBoundedBatchTask.java
new file mode 100644
index 00000000000..f6cd5fd009a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBoundedBatchTask.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThriftUtils;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TAgentTaskRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/*
+ * Like AgentBatchTask, but submits tasks to BEs in a bounded way to avoid BE OOM.
+ *
+ * At most taskConcurrency submitted-but-unfinished tasks are kept per BE. run() works in rounds:
+ * each round tops every BE up to its concurrency limit, then sleeps a few seconds so that earlier
+ * tasks can finish (a task counts as finished once AgentTask.isFinished() returns true).
+ * Tasks are added to AgentTaskQueue by this class right before they are sent, so callers do not
+ * need to add them themselves. If a BE is missing, cannot be reached, or stays not alive for
+ * longer than Config.agent_task_be_unavailable_heartbeat_timeout_second, every task of the batch
+ * is failed via AgentTask.failedWithMsg().
+ *
+ * NOTE(review): a task that is never marked finished (e.g. one reported as failed by the BE whose
+ * finish handler does not call setFinished(true)) keeps occupying a concurrency slot; if enough of
+ * them pile up, run() keeps waiting for slots that never free up. Confirm every task type sent
+ * through this class is eventually marked finished.
+ */
+public class AgentBoundedBatchTask extends AgentBatchTask {
+    private static final Logger LOG = LogManager.getLogger(AgentBoundedBatchTask.class);
+    private static final int RPC_MAX_RETRY_TIMES = 3;
+    // Pause between two submission rounds, giving in-flight tasks time to finish.
+    private static final long SUBMIT_ROUND_INTERVAL_SECOND = 3;
+    // Back-off between two RPC attempts against the same BE.
+    private static final long RPC_RETRY_INTERVAL_MS = 200;
+
+    // Max number of submitted-but-unfinished tasks per BE (always >= 1).
+    private final int taskConcurrency;
+    // backendId -> number of leading tasks of backendIdToTasks.get(backendId) already submitted.
+    private final Map<Long, Integer> backendIdToConsumedTaskIndex;
+    // How long (seconds since its last update) a not-alive BE is waited for before cancelling.
+    private final int beUnavailableMaxLostTimeSecond;
+
+    /**
+     * Creates a bounded batch task.
+     *
+     * <p>Tasks are added to AgentTaskQueue by this class when they are sent to the BE.
+     *
+     * @param batchSize       the max number of tasks sent to a BE in one RPC
+     * @param taskConcurrency the max number of submitted-but-unfinished tasks per BE; values below 1
+     *                        are treated as 1, otherwise no task could ever be sent and
+     *                        {@link #run()} would never return
+     */
+    public AgentBoundedBatchTask(int batchSize, int taskConcurrency) {
+        super(batchSize);
+        this.taskConcurrency = Math.max(1, taskConcurrency);
+        this.backendIdToConsumedTaskIndex = new HashMap<>();
+        this.beUnavailableMaxLostTimeSecond = Config.agent_task_be_unavailable_heartbeat_timeout_second;
+    }
+
+    /**
+     * Queues a task under its backend; null is ignored. Nothing is sent until {@link #run()}.
+     */
+    @Override
+    public void addTask(AgentTask agentTask) {
+        if (agentTask == null) {
+            return;
+        }
+        backendIdToTasks.computeIfAbsent(agentTask.getBackendId(), k -> new ArrayList<>()).add(agentTask);
+    }
+
+    /**
+     * Submits all queued tasks in rounds, keeping at most taskConcurrency unfinished tasks per BE.
+     *
+     * <p>Returns once every task has been sent, or right after all tasks have been failed because
+     * some BE could not be used or the thread was interrupted.
+     */
+    @Override
+    public void run() {
+        int taskNum = getTaskNum();
+        LOG.info("begin to submit tasks to BE. total {} tasks, be task concurrency: {}",
+                taskNum, taskConcurrency);
+        while (getSubmitTaskNum() < taskNum) {
+            for (Long backendId : backendIdToTasks.keySet()) {
+                String errMsg = submitNextRound(backendId);
+                if (errMsg != null) {
+                    // The batch is cancelled: stop immediately instead of feeding the other BEs.
+                    LOG.warn("submit task to backend {} failed, errMsg: {}, cancel all tasks",
+                            backendId, errMsg);
+                    cancelAllTasks(errMsg);
+                    return;
+                }
+            }
+            if (getSubmitTaskNum() >= taskNum) {
+                // Everything has been sent; no need to wait for another round.
+                break;
+            }
+            try {
+                TimeUnit.SECONDS.sleep(SUBMIT_ROUND_INTERVAL_SECOND);
+            } catch (InterruptedException e) {
+                String errMsg = "submit task thread is interrupted";
+                LOG.warn(errMsg, e);
+                cancelAllTasks(errMsg);
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Tops one backend up to its concurrency limit with its next unsent tasks.
+     *
+     * @return null if the round succeeded or the backend was skipped for now (all of its tasks are
+     *         already sent, or it is not alive but still within the unavailable timeout);
+     *         otherwise the message with which the whole batch must be cancelled
+     */
+    private String submitNextRound(long backendId) {
+        List<AgentTask> tasks = backendIdToTasks.get(backendId);
+        int consumedTaskIndex = backendIdToConsumedTaskIndex.getOrDefault(backendId, 0);
+        if (consumedTaskIndex >= tasks.size()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("backend {} has submitted all tasks, taskNum: {}", backendId, tasks.size());
+            }
+            return null;
+        }
+
+        List<TAgentTaskRequest> agentTaskRequests = new ArrayList<>();
+        try {
+            Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+            if (backend == null) {
+                return String.format("backend %d is not found", backendId);
+            }
+            if (!backend.isAlive()) {
+                // Multiply as long so that a large timeout cannot overflow int milliseconds.
+                if (System.currentTimeMillis() - backend.getLastUpdateMs() > beUnavailableMaxLostTimeSecond * 1000L) {
+                    return String.format("backend %d is not alive too long, last update time: %s",
+                            backendId, TimeUtils.longToTimeString(backend.getLastUpdateMs()));
+                }
+                // The BE may come back soon: skip it in this round and retry in the next one.
+                LOG.info("backend {} is not alive, retry submitting its tasks in the next round", backendId);
+                return null;
+            }
+
+            int runningTaskNum = getRunningTaskNum(backendId);
+            LOG.info("backend {} has {} running tasks, task concurrency: {}",
+                    backendId, runningTaskNum, taskConcurrency);
+            // Free slots on this BE; computed in long so huge concurrency values cannot overflow.
+            long window = Math.max(0L, (long) taskConcurrency - runningTaskNum);
+            int end = (int) Math.min(tasks.size(), consumedTaskIndex + window);
+            for (int i = consumedTaskIndex; i < end; i++) {
+                AgentTask task = tasks.get(i);
+                agentTaskRequests.add(toAgentTaskRequest(task));
+                // add to AgentTaskQueue before the task is sent
+                AgentTaskQueue.addTask(task);
+                if (agentTaskRequests.size() >= batchSize) {
+                    submitTasks(backend, agentTaskRequests);
+                    agentTaskRequests.clear();
+                }
+            }
+            submitTasks(backend, agentTaskRequests);
+            backendIdToConsumedTaskIndex.put(backendId, end);
+            LOG.info("submit task to backend {} finished, already submitted task num: {}/{}",
+                    backendId, end, tasks.size());
+            return null;
+        } catch (Exception e) {
+            LOG.warn("task exec error. backend[{}]", backendId, e);
+            logRequestSizesOnBrokenPipe(backendId, agentTaskRequests, e);
+            return String.format("task exec error: %s. backend[%d]", e.getMessage(), backendId);
+        }
+    }
+
+    /**
+     * On a "Broken pipe" error, logs the binary size of the requests that were being sent and the
+     * largest task type, to help debug oversized thrift messages.
+     */
+    private static void logRequestSizesOnBrokenPipe(long backendId, List<TAgentTaskRequest> agentTaskRequests,
+            Exception e) {
+        String msg = e.getMessage();
+        if (agentTaskRequests.isEmpty() || msg == null || !msg.contains("Broken pipe")) {
+            return;
+        }
+        List<Pair<TTaskType, Long>> taskTypeAndSize = agentTaskRequests.stream()
+                .map(req -> Pair.of(req.getTaskType(), ThriftUtils.getBinaryMessageSize(req)))
+                .collect(Collectors.toList());
+        Pair<TTaskType, Long> maxTaskTypeAndSize = taskTypeAndSize.stream()
+                .max((p1, p2) -> Long.compare(p1.value(), p2.value()))
+                .orElse(null);  // taskTypeAndSize is not empty, checked above
+        long totalSize = taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
+        LOG.warn("submit {} tasks to backend[{}], total size: {}, max task type: {}, size: {}. msg: {}",
+                agentTaskRequests.size(), backendId, totalSize, maxTaskTypeAndSize.first,
+                maxTaskTypeAndSize.second, msg);
+    }
+
+    /**
+     * Sends the given requests to the backend in one RPC, retrying up to RPC_MAX_RETRY_TIMES times
+     * with a freshly borrowed client after each failure. Does nothing for an empty list.
+     *
+     * @throws Exception the last RPC error once all attempts failed, or the current error if the
+     *                   thread is interrupted while backing off (the interrupt flag is restored)
+     */
+    private static void submitTasks(Backend backend, List<TAgentTaskRequest> agentTaskRequests) throws Exception {
+        if (agentTaskRequests.isEmpty()) {
+            return;
+        }
+        long start = System.currentTimeMillis();
+        long backendId = backend.getId();
+        if (LOG.isDebugEnabled()) {
+            long size = agentTaskRequests.stream()
+                    .map(ThriftUtils::getBinaryMessageSize)
+                    .reduce(0L, Long::sum);
+            TTaskType firstTaskType = agentTaskRequests.get(0).getTaskType();
+            LOG.debug("submit {} tasks to backend[{}], total size: {}, first task type: {}",
+                    agentTaskRequests.size(), backendId, size, firstTaskType);
+            for (TAgentTaskRequest req : agentTaskRequests) {
+                LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+                        req.getTaskType(), backendId, req.getSignature());
+            }
+        }
+
+        MetricRepo.COUNTER_AGENT_TASK_REQUEST_TOTAL.increase(1L);
+
+        String host = FeConstants.runningUnitTest ? "127.0.0.1" : backend.getHost();
+        TNetworkAddress address = new TNetworkAddress(host, backend.getBePort());
+        for (int attempt = 1; ; attempt++) {
+            BackendService.Client client = null;
+            boolean ok = false;
+            try {
+                client = ClientPool.backendPool.borrowObject(address);
+                client.submitTasks(agentTaskRequests);
+                ok = true;
+                return;
+            } catch (Exception e) {
+                if (attempt >= RPC_MAX_RETRY_TIMES) {
+                    LOG.warn("submit task to agent failed. backend[{}], request size: {}, elapsed:{} ms error: {}",
+                            backendId, agentTaskRequests.size(), System.currentTimeMillis() - start,
+                            e.getMessage());
+                    throw e;
+                }
+                LOG.warn("submit task attempt {} failed, retrying... backend[{}], error: {}",
+                        attempt, backendId, e.getMessage());
+                try {
+                    Thread.sleep(RPC_RETRY_INTERVAL_MS);
+                } catch (InterruptedException ie) {
+                    // Honor the interrupt: give up instead of continuing to retry.
+                    Thread.currentThread().interrupt();
+                    e.addSuppressed(ie);
+                    throw e;
+                }
+            } finally {
+                // A client whose request failed may be broken: drop it instead of returning it.
+                // Nothing to release when borrowing itself failed.
+                if (client != null) {
+                    if (ok) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, client);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Returns the number of tasks already submitted, summed over all backends. */
+    private int getSubmitTaskNum() {
+        return backendIdToConsumedTaskIndex.values().stream()
+                .mapToInt(Integer::intValue)
+                .sum();
+    }
+
+    /** Returns how many of the tasks already submitted to the backend are not finished yet. */
+    private int getRunningTaskNum(long backendId) {
+        List<AgentTask> tasks = backendIdToTasks.get(backendId);
+        int consumedTaskIndex = backendIdToConsumedTaskIndex.getOrDefault(backendId, 0);
+        int count = 0;
+        for (AgentTask task : tasks.subList(0, consumedTaskIndex)) {
+            if (!task.isFinished()) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /** Fails every task of the batch (sent or not) with the given message. */
+    private void cancelAllTasks(String errMsg) {
+        for (List<AgentTask> beTasks : backendIdToTasks.values()) {
+            for (AgentTask task : beTasks) {
+                task.failedWithMsg(errMsg);
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to