This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 05605d99a982d70aaf8c07bea673e86089677ef0
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri May 17 11:49:22 2024 +0800

    [opt](routine-load) optimize routine load task allocation algorithm (#34778)
---
 .../doris/load/routineload/RoutineLoadManager.java     | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 7eb7806f515..356262f8c2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -488,6 +488,7 @@ public class RoutineLoadManager implements Writable {
         readLock();
         try {
             Map<Long, Integer> beIdToConcurrentTasks = 
getBeCurrentTasksNumMap();
+            int previousBeIdleTaskNum = 0;
 
             // 1. Find if the given BE id has more than half of available slots
             if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
@@ -495,22 +496,22 @@ public class RoutineLoadManager implements Writable {
                 Backend previousBackend = 
Env.getCurrentSystemInfo().getBackend(previousBeId);
                 // check previousBackend is not null && load available
                 if (previousBackend != null && 
previousBackend.isLoadAvailable()) {
-                    int idleTaskNum = 0;
                     if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
-                        idleTaskNum = 0;
+                        previousBeIdleTaskNum = 0;
                     } else if 
(beIdToConcurrentTasks.containsKey(previousBeId)) {
-                        idleTaskNum = 
beIdToMaxConcurrentTasks.get(previousBeId)
+                        previousBeIdleTaskNum = 
beIdToMaxConcurrentTasks.get(previousBeId)
                                 - beIdToConcurrentTasks.get(previousBeId);
                     } else {
-                        idleTaskNum = 
beIdToMaxConcurrentTasks.get(previousBeId);
+                        previousBeIdleTaskNum = 
beIdToMaxConcurrentTasks.get(previousBeId);
                     }
-                    if (idleTaskNum > (Config.max_routine_load_task_num_per_be 
>> 1)) {
+                    if (previousBeIdleTaskNum == 
Config.max_routine_load_task_num_per_be) {
                         return previousBeId;
                     }
                 }
             }
 
-            // 2. The given BE id does not have available slots, find a BE 
with min tasks
+            // 2. we believe that the benefits of load balance outweigh the 
benefits of object pool cache,
+            //    so we try to find the one with the most idle slots as much 
as possible
             // 3. The previous BE is not in cluster && is not load available, 
find a new BE with min tasks
             int idleTaskNum = 0;
             long resultBeId = -1L;
@@ -530,6 +531,11 @@ public class RoutineLoadManager implements Writable {
                     maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
                 }
             }
+            // 4. on the basis of selecting the maximum idle slot be,
+            //    try to reuse the object cache as much as possible
+            if (previousBeIdleTaskNum == maxIdleSlotNum) {
+                return previousBeId;
+            }
             return resultBeId;
         } finally {
             readUnlock();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to