This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 77765b23b99 [improvement](partition rebalance) improve partition rebalance choose candidate speed #36509 (#36978)
77765b23b99 is described below

commit 77765b23b9967924b25f68851916332b9e0d1e28
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Fri Jun 28 16:31:16 2024 +0800

    [improvement](partition rebalance) improve partition rebalance choose candidate speed #36509 (#36978)
    
    cherry pick from #36509
---
 .../apache/doris/clone/PartitionRebalancer.java    | 78 +++++++++++++++-------
 .../java/org/apache/doris/clone/Rebalancer.java    |  3 +
 .../org/apache/doris/clone/TabletScheduler.java    | 13 ++--
 .../java/org/apache/doris/clone/PathSlotTest.java  |  5 +-
 .../doris/cluster/DecommissionBackendTest.java     |  1 -
 5 files changed, 68 insertions(+), 32 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index fd70e5116f2..25d85822edb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -29,17 +29,20 @@ import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.security.SecureRandom;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 
 /*
@@ -115,40 +118,64 @@ public class PartitionRebalancer extends Rebalancer {
                 = algo.getNextMoves(clusterBalanceInfo, 
Config.partition_rebalance_max_moves_num_per_selection);
 
         List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
-        List<Long> inProgressIds = movesInProgressList.stream().map(m -> 
m.tabletId).collect(Collectors.toList());
+        Set<Long> inProgressIds = movesInProgressList.stream().map(m -> 
m.tabletId).collect(Collectors.toSet());
+        Random rand = new SecureRandom();
         for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
             // Find all tablets of the specified partition that would have a 
replica at the source be,
             // but would not have a replica at the destination be. That is to 
satisfy the restriction
             // of having no more than one replica of the same tablet per be.
             List<Long> tabletIds = 
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
-            List<Long> invalidIds = 
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
-            tabletIds.removeAll(invalidIds);
-            // In-progress tablets can't be the candidate too.
-            tabletIds.removeAll(inProgressIds);
+            if (tabletIds.isEmpty()) {
+                continue;
+            }
+
+            Set<Long> invalidIds = Sets.newHashSet(
+                    
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
+
+            BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, 
TabletMeta tabletMeta) -> {
+                return tabletMeta != null
+                        && tabletMeta.getPartitionId() == move.partitionId
+                        && tabletMeta.getIndexId() == move.indexId
+                        && !invalidIds.contains(tabletId)
+                        && !inProgressIds.contains(tabletId);
+            };
 
-            Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
-            for (long tabletId : tabletIds) {
+            // Random pick one candidate to create tabletSchedCtx
+            int startIdx = rand.nextInt(tabletIds.size());
+            long pickedTabletId = -1L;
+            TabletMeta pickedTabletMeta = null;
+            for (int i = startIdx; i < tabletIds.size(); i++) {
+                long tabletId = tabletIds.get(i);
                 TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
-                if (tabletMeta != null && tabletMeta.getPartitionId() == 
move.partitionId
-                        && tabletMeta.getIndexId() == move.indexId) {
-                    tabletCandidates.put(tabletId, tabletMeta);
+                if (canMoveTablet.test(tabletId, tabletMeta)) {
+                    pickedTabletId = tabletId;
+                    pickedTabletMeta = tabletMeta;
+                    break;
                 }
             }
-            LOG.debug("Find {} candidates for move {}", 
tabletCandidates.size(), move);
-            if (tabletCandidates.isEmpty()) {
-                continue;
+
+            if (pickedTabletId == -1L) {
+                for (int i = 0; i < startIdx; i++) {
+                    long tabletId = tabletIds.get(i);
+                    TabletMeta tabletMeta = 
invertedIndex.getTabletMeta(tabletId);
+                    if (canMoveTablet.test(tabletId, tabletMeta)) {
+                        pickedTabletId = tabletId;
+                        pickedTabletMeta = tabletMeta;
+                        break;
+                    }
+                }
             }
 
-            // Random pick one candidate to create tabletSchedCtx
-            Random rand = new Random();
-            Object[] keys = tabletCandidates.keySet().toArray();
-            long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
-            LOG.debug("Picked tablet id for move {}: {}", move, 
pickedTabletId);
+            if (pickedTabletId == -1L) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cann't picked tablet id for move {}", move);
+                }
+                continue;
+            }
 
-            TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
             TabletSchedCtx tabletCtx = new 
TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
-                    tabletMeta.getDbId(), tabletMeta.getTableId(), 
tabletMeta.getPartitionId(),
-                    tabletMeta.getIndexId(), pickedTabletId, null /* replica 
alloc is not used for balance*/,
+                    pickedTabletMeta.getDbId(), pickedTabletMeta.getTableId(), 
pickedTabletMeta.getPartitionId(),
+                    pickedTabletMeta.getIndexId(), pickedTabletId, null /* 
replica alloc is not used for balance*/,
                     System.currentTimeMillis());
             tabletCtx.setTag(clusterStat.getTag());
             // Balance task's priority is always LOW
@@ -268,7 +295,7 @@ public class PartitionRebalancer extends Rebalancer {
             List<Long> availPath = paths.stream().filter(path -> 
path.getStorageMedium() == tabletCtx.getStorageMedium()
                             && path.isFit(tabletCtx.getTabletSize(), false) == 
BalanceStatus.OK)
                     
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
-            long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
+            long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, 
tabletCtx.getStorageMedium());
             if (pathHash == -1) {
                 throw new 
SchedException(SchedException.Status.SCHEDULE_FAILED, 
SchedException.SubCode.WAITING_SLOT,
                         "paths has no available balance slot: " + availPath);
@@ -315,6 +342,11 @@ public class PartitionRebalancer extends Rebalancer {
         }
     }
 
+    @Override
+    public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
+        movesCacheMap.invalidateTablet(tabletCtx);
+    }
+
     @Override
     public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> 
statisticMap) {
         super.updateLoadStatistic(statisticMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index def9e18a7e3..b7d1e06a5ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -127,6 +127,9 @@ public abstract class Rebalancer {
         return -1L;
     }
 
+    public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
+    }
+
     public void onTabletFailed(TabletSchedCtx tabletCtx) {
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 715739310bd..5ecbeb89d39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -975,7 +975,9 @@ public class TabletScheduler extends MasterDaemon {
         if (chosenReplica == null) {
             return false;
         }
+
         deleteReplicaInternal(tabletCtx, chosenReplica, "src replica of 
rebalance", force);
+        rebalancer.invalidateToDeleteReplicaId(tabletCtx);
 
         return true;
     }
@@ -1962,11 +1964,10 @@ public class TabletScheduler extends MasterDaemon {
         private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
         private long beId;
         // only use in takeAnAvailBalanceSlotFrom, make pick RR
-        private long lastPickPathHash;
+        private Map<TStorageMedium, Long> lastPickPathHashs = 
Maps.newHashMap();
 
         public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
             this.beId = beId;
-            this.lastPickPathHash = -1;
             for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
                 pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
             }
@@ -2109,14 +2110,14 @@ public class TabletScheduler extends MasterDaemon {
             return -1;
         }
 
-        public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
+        public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, 
TStorageMedium medium) {
             if (pathHashs.isEmpty()) {
                 return -1;
             }
 
             Collections.sort(pathHashs);
             synchronized (this) {
-                int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
+                int preferSlotIndex = 
pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1;
                 if (preferSlotIndex < 0 || preferSlotIndex >= 
pathHashs.size()) {
                     preferSlotIndex = 0;
                 }
@@ -2124,14 +2125,14 @@ public class TabletScheduler extends MasterDaemon {
                 for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
                     long pathHash = pathHashs.get(i);
                     if (takeBalanceSlot(pathHash) != -1) {
-                        lastPickPathHash = pathHash;
+                        lastPickPathHashs.put(medium, pathHash);
                         return pathHash;
                     }
                 }
                 for (int i = 0; i < preferSlotIndex; i++) {
                     long pathHash = pathHashs.get(i);
                     if (takeBalanceSlot(pathHash) != -1) {
-                        lastPickPathHash = pathHash;
+                        lastPickPathHashs.put(medium, pathHash);
                         return pathHash;
                     }
                 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
index e26e3042fb8..99d49ceb30c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
@@ -39,10 +39,11 @@ class PathSlotTest {
         List<Long> availPathHashs = Lists.newArrayList();
         List<Long> expectPathHashs = Lists.newArrayList();
         List<Long> gotPathHashs = Lists.newArrayList();
+        TStorageMedium medium = TStorageMedium.HDD;
         long startPath = 10001L;
         long endPath = 10006L;
         for (long pathHash = startPath; pathHash < endPath; pathHash++) {
-            paths.put(pathHash, TStorageMedium.HDD);
+            paths.put(pathHash, medium);
             availPathHashs.add(pathHash);
             expectPathHashs.add(pathHash);
         }
@@ -56,7 +57,7 @@ class PathSlotTest {
         PathSlot ps = new PathSlot(paths, 1L);
         for (int i = 0; i < expectPathHashs.size(); i++) {
             Collections.shuffle(availPathHashs);
-            gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
+            gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs, 
medium));
         }
         Assert.assertEquals(expectPathHashs, gotPathHashs);
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 79216f28c40..b917bc41a7e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -49,7 +49,6 @@ public class DecommissionBackendTest extends 
TestWithFeService {
     @Override
     protected void beforeCluster() {
         FeConstants.runningUnitTest = true;
-        needCleanDir = false;
     }
 
     @BeforeAll


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to