This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 05605d99a982d70aaf8c07bea673e86089677ef0
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri May 17 11:49:22 2024 +0800

    [opt](routine-load) optimize routine load task allocation algorithm (#34778)
---
 .../doris/load/routineload/RoutineLoadManager.java | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 7eb7806f515..356262f8c2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -488,6 +488,7 @@ public class RoutineLoadManager implements Writable {
         readLock();
         try {
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
+            int previousBeIdleTaskNum = 0;
 
             // 1. Find if the given BE id has more than half of available slots
             if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
@@ -495,22 +496,22 @@ public class RoutineLoadManager implements Writable {
                 Backend previousBackend = Env.getCurrentSystemInfo().getBackend(previousBeId);
                 // check previousBackend is not null && load available
                 if (previousBackend != null && previousBackend.isLoadAvailable()) {
-                    int idleTaskNum = 0;
                     if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
-                        idleTaskNum = 0;
+                        previousBeIdleTaskNum = 0;
                     } else if (beIdToConcurrentTasks.containsKey(previousBeId)) {
-                        idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId)
+                        previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId)
                                 - beIdToConcurrentTasks.get(previousBeId);
                     } else {
-                        idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId);
+                        previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId);
                     }
-                    if (idleTaskNum > (Config.max_routine_load_task_num_per_be >> 1)) {
+                    if (previousBeIdleTaskNum == Config.max_routine_load_task_num_per_be) {
                         return previousBeId;
                     }
                 }
             }
 
-            // 2. The given BE id does not have available slots, find a BE with min tasks
+            // 2. we believe that the benefits of load balance outweigh the benefits of object pool cache,
+            // so we try to find the one with the most idle slots as much as possible
             // 3. The previous BE is not in cluster && is not load available, find a new BE with min tasks
             int idleTaskNum = 0;
             long resultBeId = -1L;
@@ -530,6 +531,11 @@ public class RoutineLoadManager implements Writable {
                     maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
                 }
             }
+            // 4. on the basis of selecting the maximum idle slot be,
+            // try to reuse the object cache as much as possible
+            if (previousBeIdleTaskNum == maxIdleSlotNum) {
+                return previousBeId;
+            }
             return resultBeId;
         } finally {
             readUnlock();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org