This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit a44f304826ef1d0a1e134bb8734446133405016c Author: Henry2SS <45096548+henry...@users.noreply.github.com> AuthorDate: Sat Apr 9 19:02:55 2022 +0800 [fix](routine load) Routine load task doesn't reallocate when previous BE is down. (#8824) If the previous BE is not alive, another available BE should be assigned instead. --- .../doris/load/routineload/RoutineLoadManager.java | 36 ++++++++++++---------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index ed6e2eaf81..2d6c13adab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -41,6 +41,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -410,36 +411,39 @@ public class RoutineLoadManager implements Writable { // check if the specified BE is available for running task // return true if it is available. return false if otherwise. // throw exception if unrecoverable errors happen. 
- public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException { + public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException { List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true); if (beIdsInCluster == null) { throw new LoadException("The " + clusterName + " has been deleted"); } - if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) { - return -1L; - } - // check if be has idle slot readLock(); try { Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap(); + // 1. Find if the given BE id has available slots - if (previoudBeId != -1L) { - int idleTaskNum = 0; - if (!beIdToMaxConcurrentTasks.containsKey(previoudBeId)) { - idleTaskNum = 0; - } else if (beIdToConcurrentTasks.containsKey(previoudBeId)) { - idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId); - } else { - idleTaskNum = Config.max_routine_load_task_num_per_be; - } - if (idleTaskNum > 0) { - return previoudBeId; + if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) { + // get the previousBackend info + Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId); + // check previousBackend is not null && load available + if (previousBackend != null && previousBackend.isLoadAvailable()) { + int idleTaskNum = 0; + if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) { + idleTaskNum = 0; + } else if (beIdToConcurrentTasks.containsKey(previousBeId)) { + idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId) - beIdToConcurrentTasks.get(previousBeId); + } else { + idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId); + } + if (idleTaskNum > 0) { + return previousBeId; + } } } // 2. The given BE id does not have available slots, find a BE with min tasks + // 3. 
The previous BE is not in the cluster or is not load available, find a new BE with min tasks int idleTaskNum = 0; long resultBeId = -1L; int maxIdleSlotNum = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org