EmmyMiao87 commented on a change in pull request #6342:
URL: https://github.com/apache/incubator-doris/pull/6342#discussion_r679074901
##########
File path:
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
##########
@@ -344,30 +345,54 @@ public long getMinTaskBeId(String clusterName) throws
LoadException {
// check if the specified BE is available for running task
// return true if it is available. return false if otherwise.
// throw exception if unrecoverable errors happen.
- public boolean checkBeToTask(long beId, String clusterName) throws
LoadException {
+ public long getAvailableBeForTask(long previoudBeId, String clusterName)
throws LoadException {
List<Long> beIdsInCluster =
Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
if (beIdsInCluster == null) {
throw new LoadException("The " + clusterName + " has been
deleted");
}
- if (!beIdsInCluster.contains(beId)) {
- return false;
+ if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) {
+ return -1L;
}
// check if be has idle slot
readLock();
try {
- int idleTaskNum = 0;
Map<Long, Integer> beIdToConcurrentTasks =
getBeCurrentTasksNumMap();
- if (beIdToConcurrentTasks.containsKey(beId)) {
- idleTaskNum = beIdToMaxConcurrentTasks.get(beId) -
beIdToConcurrentTasks.get(beId);
- } else {
- idleTaskNum = Config.max_routine_load_task_num_per_be;
+ // 1. Find if the given BE id has available slots
+ if (previoudBeId != -1L) {
+ int idleTaskNum = 0;
+ if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
+ idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) -
beIdToConcurrentTasks.get(previoudBeId);
+ } else {
+ idleTaskNum = Config.max_routine_load_task_num_per_be;
+ }
+ if (idleTaskNum > 0) {
+ return previoudBeId;
+ }
}
- if (idleTaskNum > 0) {
- return true;
+
+ // 2. The given BE id does not have available slots, find a BE
with min tasks
+ updateBeIdToMaxConcurrentTasks();
+ int idleTaskNum = 0;
+ long resultBeId = -1L;
+ int maxIdleSlotNum = 0;
+ for (Long beId : beIdsInCluster) {
Review comment:
This logic is consistent with the logic of the manager's getMinTaskBeId function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]