This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new f5c058527ce [improvement](balance) partition rebalance chose disk by rr #36826 (#36901) f5c058527ce is described below commit f5c058527cee79ef4518e1e0726b05fcb3fca9f7 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Jun 27 13:37:07 2024 +0800 [improvement](balance) partition rebalance chose disk by rr #36826 (#36901) cherry pick from #36826 --- .../apache/doris/clone/PartitionRebalancer.java | 5 +- .../org/apache/doris/clone/TabletScheduler.java | 48 +++++++++------- .../java/org/apache/doris/clone/PathSlotTest.java | 64 ++++++++++++++++++++++ 3 files changed, 93 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index a6b8bf04b12..fd70e5116f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Random; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -266,9 +265,9 @@ public class PartitionRebalancer extends Rebalancer { Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe); List<RootPathLoadStatistic> paths = beStat.getPathStatistics(); - Set<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium() + List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium() && path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK) - .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet()); + .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList()); long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath); if (pathHash == -1) { throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT, diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 8ae51be7a96..3341f5bb305 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1933,9 +1933,12 @@ public class TabletScheduler extends MasterDaemon { // path hash -> slot num private Map<Long, Slot> pathSlots = Maps.newConcurrentMap(); private long beId; + // only use in takeAnAvailBalanceSlotFrom, make pick RR + private long lastPickPathHash; public PathSlot(Map<Long, TStorageMedium> paths, long beId) { this.beId = beId; + this.lastPickPathHash = -1; for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) { pathSlots.put(entry.getKey(), new Slot(entry.getValue())); } @@ -2046,19 +2049,6 @@ public class TabletScheduler extends MasterDaemon { return num; } - /** - * get path whose balance slot num is larger than 0 - */ - public synchronized Set<Long> getAvailPathsForBalance() { - Set<Long> pathHashs = Sets.newHashSet(); - for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) { - if (entry.getValue().getAvailableBalance() > 0) { - pathHashs.add(entry.getKey()); - } - } - return pathHashs; - } - public synchronized List<List<String>> getSlotInfo(long beId) { List<List<String>> results = Lists.newArrayList(); pathSlots.forEach((key, value) -> { @@ -2091,15 +2081,31 @@ public class TabletScheduler extends MasterDaemon { return -1; } - public synchronized long takeAnAvailBalanceSlotFrom(Set<Long> pathHashs) { - for (Long pathHash : pathHashs) { - Slot slot = pathSlots.get(pathHash); - if (slot == null) { - continue; + public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) { + if (pathHashs.isEmpty()) { + return -1; + } + + Collections.sort(pathHashs); + synchronized (this) { + int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1; + if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) { + preferSlotIndex = 0; } - if (slot.balanceUsed < slot.getBalanceTotal()) { - slot.balanceUsed++; - return pathHash; + + for (int i = preferSlotIndex; i < pathHashs.size(); i++) { + long pathHash = pathHashs.get(i); + if (takeBalanceSlot(pathHash) != -1) { + lastPickPathHash = pathHash; + return pathHash; + } + } + for (int i = 0; i < preferSlotIndex; i++) { + long pathHash = pathHashs.get(i); + if (takeBalanceSlot(pathHash) != -1) { + lastPickPathHash = pathHash; + return pathHash; + } } } return -1; diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java new file mode 100644 index 00000000000..e26e3042fb8 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.clone.TabletScheduler.PathSlot; +import org.apache.doris.common.Config; +import org.apache.doris.thrift.TStorageMedium; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +class PathSlotTest { + + @Test + public void test() { + Config.balance_slot_num_per_path = 2; + Map<Long, TStorageMedium> paths = Maps.newHashMap(); + List<Long> availPathHashs = Lists.newArrayList(); + List<Long> expectPathHashs = Lists.newArrayList(); + List<Long> gotPathHashs = Lists.newArrayList(); + long startPath = 10001L; + long endPath = 10006L; + for (long pathHash = startPath; pathHash < endPath; pathHash++) { + paths.put(pathHash, TStorageMedium.HDD); + availPathHashs.add(pathHash); + expectPathHashs.add(pathHash); + } + for (long pathHash = startPath; pathHash < endPath; pathHash++) { + expectPathHashs.add(pathHash); + } + for (long pathHash = startPath; pathHash < endPath; pathHash++) { + expectPathHashs.add(-1L); + } + + PathSlot ps = new PathSlot(paths, 1L); + for (int i = 0; i < expectPathHashs.size(); i++) { + Collections.shuffle(availPathHashs); + gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs)); + } + Assert.assertEquals(expectPathHashs, gotPathHashs); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org