This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 77765b23b99 [improvement](partition rebalance) improve partition rebalance choose candidate speed #36509 (#36978) 77765b23b99 is described below commit 77765b23b9967924b25f68851916332b9e0d1e28 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Fri Jun 28 16:31:16 2024 +0800 [improvement](partition rebalance) improve partition rebalance choose candidate speed #36509 (#36978) cherry pick from #36509 --- .../apache/doris/clone/PartitionRebalancer.java | 78 +++++++++++++++------- .../java/org/apache/doris/clone/Rebalancer.java | 3 + .../org/apache/doris/clone/TabletScheduler.java | 13 ++-- .../java/org/apache/doris/clone/PathSlotTest.java | 5 +- .../doris/cluster/DecommissionBackendTest.java | 1 - 5 files changed, 68 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index fd70e5116f2..25d85822edb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -29,17 +29,20 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.common.collect.TreeMultimap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.security.SecureRandom; import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiPredicate; import java.util.stream.Collectors; /* @@ -115,40 +118,64 @@ public class PartitionRebalancer extends Rebalancer { = algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection); List<TabletSchedCtx> alternativeTablets = Lists.newArrayList(); - List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList()); + Set<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toSet()); + Random rand = new SecureRandom(); for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) { // Find all tablets of the specified partition that would have a replica at the source be, // but would not have a replica at the destination be. That is to satisfy the restriction // of having no more than one replica of the same tablet per be. List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium); - List<Long> invalidIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium); - tabletIds.removeAll(invalidIds); - // In-progress tablets can't be the candidate too. - tabletIds.removeAll(inProgressIds); + if (tabletIds.isEmpty()) { + continue; + } + + Set<Long> invalidIds = Sets.newHashSet( + invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium)); + + BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> { + return tabletMeta != null + && tabletMeta.getPartitionId() == move.partitionId + && tabletMeta.getIndexId() == move.indexId + && !invalidIds.contains(tabletId) + && !inProgressIds.contains(tabletId); + }; - Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap(); - for (long tabletId : tabletIds) { + // Random pick one candidate to create tabletSchedCtx + int startIdx = rand.nextInt(tabletIds.size()); + long pickedTabletId = -1L; + TabletMeta pickedTabletMeta = null; + for (int i = startIdx; i < tabletIds.size(); i++) { + long tabletId = tabletIds.get(i); TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta != null && tabletMeta.getPartitionId() == move.partitionId - && tabletMeta.getIndexId() == move.indexId) { - tabletCandidates.put(tabletId, tabletMeta); + if (canMoveTablet.test(tabletId, tabletMeta)) { + pickedTabletId = tabletId; + pickedTabletMeta = tabletMeta; + break; } } - LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move); - if (tabletCandidates.isEmpty()) { - continue; + + if (pickedTabletId == -1L) { + for (int i = 0; i < startIdx; i++) { + long tabletId = tabletIds.get(i); + TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); + if (canMoveTablet.test(tabletId, tabletMeta)) { + pickedTabletId = tabletId; + pickedTabletMeta = tabletMeta; + break; + } + } } - // Random pick one candidate to create tabletSchedCtx - Random rand = new Random(); - Object[] keys = tabletCandidates.keySet().toArray(); - long pickedTabletId = (long) keys[rand.nextInt(keys.length)]; - LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId); + if (pickedTabletId == -1L) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cann't picked tablet id for move {}", move); + } + continue; + } - TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId); TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, - tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), - tabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/, + pickedTabletMeta.getDbId(), pickedTabletMeta.getTableId(), pickedTabletMeta.getPartitionId(), + pickedTabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/, System.currentTimeMillis()); tabletCtx.setTag(clusterStat.getTag()); // Balance task's priority is always LOW @@ -268,7 +295,7 @@ public class PartitionRebalancer extends Rebalancer { List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium() && path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK) .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList()); - long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath); + long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getStorageMedium()); if (pathHash == -1) { throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT, "paths has no available balance slot: " + availPath); @@ -315,6 +342,11 @@ public class PartitionRebalancer extends Rebalancer { } } + @Override + public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) { + movesCacheMap.invalidateTablet(tabletCtx); + } + @Override public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) { super.updateLoadStatistic(statisticMap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java index def9e18a7e3..b7d1e06a5ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java @@ -127,6 +127,9 @@ public abstract class Rebalancer { return -1L; } + public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) { + } + public void onTabletFailed(TabletSchedCtx tabletCtx) { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 715739310bd..5ecbeb89d39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -975,7 +975,9 @@ public class TabletScheduler extends MasterDaemon { if (chosenReplica == null) { return false; } + deleteReplicaInternal(tabletCtx, chosenReplica, "src replica of rebalance", force); + rebalancer.invalidateToDeleteReplicaId(tabletCtx); return true; } @@ -1962,11 +1964,10 @@ public class TabletScheduler extends MasterDaemon { private Map<Long, Slot> pathSlots = Maps.newConcurrentMap(); private long beId; // only use in takeAnAvailBalanceSlotFrom, make pick RR - private long lastPickPathHash; + private Map<TStorageMedium, Long> lastPickPathHashs = Maps.newHashMap(); public PathSlot(Map<Long, TStorageMedium> paths, long beId) { this.beId = beId; - this.lastPickPathHash = -1; for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) { pathSlots.put(entry.getKey(), new Slot(entry.getValue())); } @@ -2109,14 +2110,14 @@ public class TabletScheduler extends MasterDaemon { return -1; } - public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) { + public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, TStorageMedium medium) { if (pathHashs.isEmpty()) { return -1; } Collections.sort(pathHashs); synchronized (this) { - int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1; + int preferSlotIndex = pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1; if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) { preferSlotIndex = 0; } @@ -2124,14 +2125,14 @@ public class TabletScheduler extends MasterDaemon { for (int i = preferSlotIndex; i < pathHashs.size(); i++) { long pathHash = pathHashs.get(i); if (takeBalanceSlot(pathHash) != -1) { - lastPickPathHash = pathHash; + lastPickPathHashs.put(medium, pathHash); return pathHash; } } for (int i = 0; i < preferSlotIndex; i++) { long pathHash = pathHashs.get(i); if (takeBalanceSlot(pathHash) != -1) { - lastPickPathHash = pathHash; + lastPickPathHashs.put(medium, pathHash); return pathHash; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java index e26e3042fb8..99d49ceb30c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java @@ -39,10 +39,11 @@ class PathSlotTest { List<Long> availPathHashs = Lists.newArrayList(); List<Long> expectPathHashs = Lists.newArrayList(); List<Long> gotPathHashs = Lists.newArrayList(); + TStorageMedium medium = TStorageMedium.HDD; long startPath = 10001L; long endPath = 10006L; for (long pathHash = startPath; pathHash < endPath; pathHash++) { - paths.put(pathHash, TStorageMedium.HDD); + paths.put(pathHash, medium); availPathHashs.add(pathHash); expectPathHashs.add(pathHash); } @@ -56,7 +57,7 @@ class PathSlotTest { PathSlot ps = new PathSlot(paths, 1L); for (int i = 0; i < expectPathHashs.size(); i++) { Collections.shuffle(availPathHashs); - gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs)); + gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs, medium)); } Assert.assertEquals(expectPathHashs, gotPathHashs); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 79216f28c40..b917bc41a7e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -49,7 +49,6 @@ public class DecommissionBackendTest extends TestWithFeService { @Override protected void beforeCluster() { FeConstants.runningUnitTest = true; - needCleanDir = false; } @BeforeAll --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org