This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f8a4636c007 branch-3.0: [feature](restore) introduce
AgentBoundedBatchTask to manage concurrent restore tasks #50740 (#50843)
f8a4636c007 is described below
commit f8a4636c007c5bb07c82ca8b26995b9cc0a3e67b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 13 22:23:00 2025 +0800
branch-3.0: [feature](restore) introduce AgentBoundedBatchTask to manage
concurrent restore tasks #50740 (#50843)
Cherry-picked from #50740
Co-authored-by: walter <[email protected]>
Co-authored-by: wubiao02 <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 9 +
.../java/org/apache/doris/backup/RestoreJob.java | 42 ++-
.../java/org/apache/doris/master/MasterImpl.java | 4 +
.../java/org/apache/doris/task/AgentBatchTask.java | 6 +-
.../apache/doris/task/AgentBoundedBatchTask.java | 281 +++++++++++++++++++++
5 files changed, 313 insertions(+), 29 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 3d8b2d3f3ee..486c1838bfd 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2794,6 +2794,15 @@ public class Config extends ConfigBase {
})
public static int backup_restore_batch_task_num_per_rpc = 10000;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "一个 BE 同时执行的恢复任务的并发数",
+ "The number of concurrent restore tasks per be"})
+ public static int restore_task_concurrency_per_be = 5000;
+
+ @ConfField(mutable = true, description = {"执行 agent task
时,BE心跳超过多长时间,认为BE不可用",
+ "The time after which BE is considered unavailable if the
heartbeat is not received"})
+ public static int agent_task_be_unavailable_heartbeat_timeout_second = 300;
+
@ConfField(description = {"是否开启通过http接口获取log文件的功能",
"Whether to enable the function of getting log files through http
interface"})
public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 44e29e2c106..83569e10eab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -73,6 +73,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentBoundedBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
@@ -680,7 +681,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
Map<Long, TabletRef> tabletBases = new HashMap<>();
// Check and prepare meta objects.
- Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
+ Map<Long, AgentBoundedBatchTask> batchTaskPerTable = new HashMap<>();
// The tables that are restored but not committed, because the table
name may be changed.
List<Table> stagingRestoreTables = Lists.newArrayList();
@@ -949,9 +950,10 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
BackupPartitionInfo backupPartitionInfo
=
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
- AgentBatchTask batchTask =
batchTaskPerTable.get(localTbl.getId());
+ AgentBoundedBatchTask batchTask =
batchTaskPerTable.get(localTbl.getId());
if (batchTask == null) {
- batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ batchTask = new AgentBoundedBatchTask(
+ Config.backup_restore_batch_task_num_per_rpc,
Config.restore_task_concurrency_per_be);
batchTaskPerTable.put(localTbl.getId(), batchTask);
}
createReplicas(db, batchTask, localTbl, restorePart);
@@ -965,9 +967,10 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
if (restoreTbl.getType() == TableType.OLAP) {
OlapTable restoreOlapTable = (OlapTable) restoreTbl;
for (Partition restorePart :
restoreOlapTable.getPartitions()) {
- AgentBatchTask batchTask =
batchTaskPerTable.get(restoreTbl.getId());
+ AgentBoundedBatchTask batchTask =
batchTaskPerTable.get(restoreTbl.getId());
if (batchTask == null) {
- batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ batchTask = new
AgentBoundedBatchTask(Config.backup_restore_batch_task_num_per_rpc,
+ Config.restore_task_concurrency_per_be);
batchTaskPerTable.put(restoreTbl.getId(),
batchTask);
}
createReplicas(db, batchTask, restoreOlapTable,
restorePart, tabletBases);
@@ -1019,7 +1022,6 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
for (AgentTask task : batchTask.getAllTasks()) {
createReplicaTasksLatch.addMark(task.getBackendId(),
task.getTabletId());
((CreateReplicaTask)
task).setLatch(createReplicaTasksLatch);
- AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
}
@@ -1235,7 +1237,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
taskProgress.clear();
taskErrMsg.clear();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
- AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+ Config.backup_restore_batch_task_num_per_rpc,
Config.restore_task_concurrency_per_be);
db.readLock();
try {
for (Map.Entry<IdChain, IdChain> entry :
fileMapping.getMapping().entrySet()) {
@@ -1277,10 +1280,6 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
return;
}
- // send tasks
- for (AgentTask task : batchTask.getAllTasks()) {
- AgentTaskQueue.addTask(task);
- }
AgentTaskExecutor.submit(batchTask);
LOG.info("finished to send snapshot tasks, num: {}. {}",
batchTask.getTaskNum(), this);
}
@@ -1724,7 +1723,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+ Config.backup_restore_batch_task_num_per_rpc,
Config.restore_task_concurrency_per_be);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
@@ -1853,10 +1853,6 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
}
- // send task
- for (AgentTask task : batchTask.getAllTasks()) {
- AgentTaskQueue.addTask(task);
- }
AgentTaskExecutor.submit(batchTask);
state = RestoreJobState.DOWNLOADING;
@@ -1876,7 +1872,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+ Config.backup_restore_batch_task_num_per_rpc,
Config.restore_task_concurrency_per_be);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
@@ -2019,10 +2016,6 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
}
- // send task
- for (AgentTask task : batchTask.getAllTasks()) {
- AgentTaskQueue.addTask(task);
- }
AgentTaskExecutor.submit(batchTask);
state = RestoreJobState.DOWNLOADING;
@@ -2051,7 +2044,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
+ AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
+ Config.backup_restore_batch_task_num_per_rpc,
Config.restore_task_concurrency_per_be);
// tablet id->(be id -> download info)
for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
SnapshotInfo info = cell.getValue();
@@ -2063,10 +2057,6 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.put(signature, info.getTabletId());
}
- // send task
- for (AgentTask task : batchTask.getAllTasks()) {
- AgentTaskQueue.addTask(task);
- }
AgentTaskExecutor.submit(batchTask);
state = RestoreJobState.COMMITTING;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index b0b5a9bde37..213984ae982 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -256,6 +256,7 @@ public class MasterImpl {
createReplicaTask.countDownToZero(task.getBackendId() + ": "
+ request.getTaskStatus().getErrorMsgs().toString());
} else {
+ createReplicaTask.setFinished(true);
long tabletId = createReplicaTask.getTabletId();
if (request.isSetFinishTabletInfos()) {
@@ -590,6 +591,7 @@ public class MasterImpl {
private void finishMakeSnapshot(AgentTask task, TFinishTaskRequest
request) {
SnapshotTask snapshotTask = (SnapshotTask) task;
+ task.setFinished(true);
if (snapshotTask.isCopyTabletTask()) {
snapshotTask.setResultSnapshotPath(request.getSnapshotPath());
snapshotTask.countDown(task.getBackendId(), task.getTabletId());
@@ -609,6 +611,7 @@ public class MasterImpl {
private void finishDownloadTask(AgentTask task, TFinishTaskRequest
request) {
DownloadTask downloadTask = (DownloadTask) task;
+ task.setFinished(true);
if
(Env.getCurrentEnv().getBackupHandler().handleDownloadSnapshotTask(downloadTask,
request)) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.DOWNLOAD,
task.getSignature());
}
@@ -616,6 +619,7 @@ public class MasterImpl {
private void finishMoveDirTask(AgentTask task, TFinishTaskRequest request)
{
DirMoveTask dirMoveTask = (DirMoveTask) task;
+ task.setFinished(true);
if
(Env.getCurrentEnv().getBackupHandler().handleDirMoveTask(dirMoveTask,
request)) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.MOVE,
task.getSignature());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 328e4ba233d..89ea9168c74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -72,10 +72,10 @@ import java.util.stream.Collectors;
public class AgentBatchTask implements Runnable {
private static final Logger LOG =
LogManager.getLogger(AgentBatchTask.class);
- private int batchSize = Integer.MAX_VALUE;
+ protected int batchSize = Integer.MAX_VALUE;
// backendId -> AgentTask List
- private Map<Long, List<AgentTask>> backendIdToTasks;
+ protected Map<Long, List<AgentTask>> backendIdToTasks;
public AgentBatchTask() {
this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
@@ -246,7 +246,7 @@ public class AgentBatchTask implements Runnable {
}
}
- private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
+ protected TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);
tAgentTaskRequest.setSignature(task.getSignature());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBoundedBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBoundedBatchTask.java
new file mode 100644
index 00000000000..f6cd5fd009a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBoundedBatchTask.java
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThriftUtils;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TAgentTaskRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/*
+ * Like AgentBatchTask, but this class is used to submit tasks to BE in a
bounded way, to avoid BE OOM.
+ */
+public class AgentBoundedBatchTask extends AgentBatchTask {
+ private static final Logger LOG =
LogManager.getLogger(AgentBoundedBatchTask.class);
+ private static final int RPC_MAX_RETRY_TIMES = 3;
+
+ private int taskConcurrency;
+ private Map<Long, Integer> backendIdToConsumedTaskIndex;
+ private int beUnavailableMaxLostTimeSecond;
+
+ /**
+     * NOTE:
+     * this class is used to submit tasks to BE in a bounded way,
+     * and it automatically adds each task to the AgentTaskQueue as it is submitted.
+ *
+     * @param batchSize       the max number of tasks to send to a BE in one RPC batch
+     * @param taskConcurrency the max number of tasks allowed to run concurrently on one BE
+ */
+ public AgentBoundedBatchTask(int batchSize, int taskConcurrency) {
+ super(batchSize);
+ this.taskConcurrency = taskConcurrency;
+ this.backendIdToConsumedTaskIndex = new HashMap<>();
+ this.beUnavailableMaxLostTimeSecond =
Config.agent_task_be_unavailable_heartbeat_timeout_second;
+ }
+
+ @Override
+ public void addTask(AgentTask agentTask) {
+ if (agentTask == null) {
+ return;
+ }
+ long backendId = agentTask.getBackendId();
+ if (backendIdToTasks.containsKey(backendId)) {
+ List<AgentTask> tasks = backendIdToTasks.get(backendId);
+ tasks.add(agentTask);
+ } else {
+ List<AgentTask> tasks = new ArrayList<>();
+ tasks.add(agentTask);
+ backendIdToTasks.put(backendId, tasks);
+ }
+ }
+
+ @Override
+ public void run() {
+ int taskNum = getTaskNum();
+ LOG.info("begin to submit tasks to BE. total {} tasks, be task
concurrency: {}", taskNum, taskConcurrency);
+ boolean submitFinished = false;
+ while (getSubmitTaskNum() < taskNum && !submitFinished) {
+ for (Long backendId : backendIdToTasks.keySet()) {
+ int consumedTaskIndex =
backendIdToConsumedTaskIndex.getOrDefault(backendId, 0);
+ if (consumedTaskIndex >=
backendIdToTasks.get(backendId).size()) {
+ LOG.info("backend {} has submitted all tasks, taskNum: {}",
+ backendId, backendIdToTasks.get(backendId).size());
+ continue;
+ }
+
+ boolean ok = false;
+ String errMsg = "";
+ Backend backend = null;
+ List<AgentTask> tasks = new ArrayList<>();
+ List<TAgentTaskRequest> agentTaskRequests = new ArrayList<>();
+ try {
+ backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ tasks = this.backendIdToTasks.getOrDefault(backendId, new
ArrayList<>());
+ if (backend == null) {
+ errMsg = String.format("backend %d is not found",
backendId);
+ throw new RuntimeException(errMsg);
+ }
+ if (!backend.isAlive()) {
+ errMsg = String.format("backend %d is not alive",
backendId);
+ if (System.currentTimeMillis() -
backend.getLastUpdateMs()
+ > beUnavailableMaxLostTimeSecond * 1000) {
+ errMsg = String.format("backend %d is not alive
too long, last update time: %s",
+ backendId,
TimeUtils.longToTimeString(backend.getLastUpdateMs()));
+ throw new RuntimeException(errMsg);
+ }
+ continue;
+ }
+
+ int runningTaskNum = getRunningTaskNum(backendId);
+ LOG.info("backend {} has {} running tasks, task
concurrency: {}",
+ backendId, runningTaskNum, taskConcurrency);
+ int index = consumedTaskIndex;
+ for (; index < tasks.size()
+ && index < consumedTaskIndex + taskConcurrency -
runningTaskNum; index++) {
+
agentTaskRequests.add(toAgentTaskRequest(tasks.get(index)));
+ // add to AgentTaskQueue
+ AgentTaskQueue.addTask(tasks.get(index));
+ if (agentTaskRequests.size() >= batchSize) {
+ submitTasks(backend, agentTaskRequests);
+ agentTaskRequests.clear();
+ }
+ }
+ submitTasks(backend, agentTaskRequests);
+ backendIdToConsumedTaskIndex.put(backendId, index);
+ LOG.info("submit task to backend {} finished, already
submitted task num: {}/{}",
+ backendId, index, tasks.size());
+ ok = true;
+ } catch (Exception e) {
+ LOG.warn("task exec error. backend[{}]", backendId, e);
+ errMsg = String.format("task exec error: %s. backend[%d]",
e.getMessage(), backendId);
+ if (!agentTaskRequests.isEmpty() &&
errMsg.contains("Broken pipe")) {
+ // Log the task binary message size and the max task
type, to help debug the
+ // large thrift message size issue.
+ List<Pair<TTaskType, Long>> taskTypeAndSize =
agentTaskRequests.stream()
+ .map(req -> Pair.of(req.getTaskType(),
ThriftUtils.getBinaryMessageSize(req)))
+ .collect(Collectors.toList());
+ Pair<TTaskType, Long> maxTaskTypeAndSize =
taskTypeAndSize.stream()
+ .max((p1, p2) -> Long.compare(p1.value(),
p2.value()))
+ .orElse(null); // taskTypeAndSize is not empty
+ TTaskType maxType = maxTaskTypeAndSize.first;
+ long maxSize = maxTaskTypeAndSize.second;
+ long totalSize =
taskTypeAndSize.stream().map(Pair::value).reduce(0L, Long::sum);
+ LOG.warn("submit {} tasks to backend[{}], total size:
{}, max task type: {}, size: {}. msg: {}",
+ agentTaskRequests.size(), backendId,
totalSize, maxType, maxSize, e.getMessage());
+ }
+ } finally {
+ if (!ok) {
+ submitFinished = true;
+ LOG.warn("submit task to backend {} failed, errMsg:
{}, cancel all tasks", backendId, errMsg);
+ cancelAllTasks(errMsg);
+ }
+ }
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(3);
+ } catch (InterruptedException e) {
+ String errMsg = "submit task thread is interrupted";
+ LOG.warn(errMsg, e);
+ submitFinished = true;
+ cancelAllTasks(errMsg);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ private static void submitTasks(Backend backend, List<TAgentTaskRequest>
agentTaskRequests) throws Exception {
+ long start = System.currentTimeMillis();
+ if (agentTaskRequests.isEmpty()) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ long size = agentTaskRequests.stream()
+ .map(ThriftUtils::getBinaryMessageSize)
+ .reduce(0L, Long::sum);
+ TTaskType firstTaskType = agentTaskRequests.get(0).getTaskType();
+ LOG.debug("submit {} tasks to backend[{}], total size: {}, first
task type: {}",
+ agentTaskRequests.size(), backend.getId(), size,
firstTaskType);
+ for (TAgentTaskRequest req : agentTaskRequests) {
+ LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+ req.getTaskType(), backend.getId(),
req.getSignature());
+ }
+ }
+
+ MetricRepo.COUNTER_AGENT_TASK_REQUEST_TOTAL.increase(1L);
+
+ BackendService.Client client = null;
+ TNetworkAddress address = null;
+ // create AgentClient
+ String host = FeConstants.runningUnitTest ? "127.0.0.1" :
backend.getHost();
+ address = new TNetworkAddress(host, backend.getBePort());
+ long backendId = backend.getId();
+ boolean ok = false;
+ for (int attempt = 1; attempt <= RPC_MAX_RETRY_TIMES; attempt++) {
+ try {
+ if (client == null) {
+ // borrow new client when previous client request failed
+ client = ClientPool.backendPool.borrowObject(address);
+ }
+ client.submitTasks(agentTaskRequests);
+ ok = true;
+ break;
+ } catch (Exception e) {
+ if (attempt == RPC_MAX_RETRY_TIMES) {
+ LOG.warn("submit task to agent failed. backend[{}],
request size: {}, elapsed:{} ms error: {}",
+ backendId, agentTaskRequests.size(),
System.currentTimeMillis() - start,
+ e.getMessage());
+ throw e;
+ } else {
+ LOG.warn("submit task attempt {} failed, retrying...
backend[{}], error: {}",
+ attempt, backendId, e.getMessage());
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } finally {
+ if (ok) {
+ ClientPool.backendPool.returnObject(address, client);
+ } else {
+ ClientPool.backendPool.invalidateObject(address, client);
+ client = null;
+ }
+ }
+ }
+ }
+
+ private int getSubmitTaskNum() {
+ return backendIdToConsumedTaskIndex.values().stream()
+ .mapToInt(Integer::intValue)
+ .sum();
+ }
+
+ private int getFinishedTaskNum(long backendId) {
+ int count = 0;
+ for (AgentTask agentTask : this.backendIdToTasks.get(backendId)) {
+ if (agentTask.isFinished()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private int getRunningTaskNum(long backendId) {
+ int count = 0;
+ List<AgentTask> tasks = backendIdToTasks.get(backendId);
+ int consumedTaskIndex =
backendIdToConsumedTaskIndex.getOrDefault(backendId, 0);
+        for (int i = 0; i < consumedTaskIndex; i++) {
+            if (!tasks.get(i).isFinished()) {
+                count++;
+            }
+ }
+ return count;
+ }
+
+ private void cancelAllTasks(String errMsg) {
+ for (List<AgentTask> beTasks : backendIdToTasks.values()) {
+ for (AgentTask task : beTasks) {
+ task.failedWithMsg(errMsg);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]