This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ca4bd831b69d7a13d1fd5f6289cdca5460e9b981
Author: deardeng <565620...@qq.com>
AuthorDate: Mon Sep 2 00:29:06 2024 +0800

    [fix](cloud) Fix migrate tablets between backends back and forth (#39792)
    
    BUG: the cloud rebalancer migrates tablets back and forth: from A to B,
    then from B to A, then from A to B again, and so on.
    
    The cause is that the tabletToInfightTask map, which tracks in-flight
    tasks, was keyed by tablet id only and therefore ignored the
    multi-cluster scenario. In addition, the statRouteInfo function lost
    the cluster information, which led to inaccurate tablet statistics.
---
 be/src/cloud/cloud_backend_service.cpp             |   5 +
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  95 +++++++++-----
 .../multi_cluster/test_warmup_rebalance.groovy     | 137 +++++++++++++++++++++
 3 files changed, 205 insertions(+), 32 deletions(-)

diff --git a/be/src/cloud/cloud_backend_service.cpp 
b/be/src/cloud/cloud_backend_service.cpp
index d91e9e416b8..2dc6d03ebf6 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -180,6 +180,11 @@ void 
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
                                                     const 
TCheckWarmUpCacheAsyncRequest& request) {
     std::map<int64_t, bool> task_done;
     _engine.file_cache_block_downloader().check_download_task(request.tablets, 
&task_done);
+    
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false",
 {
+        for (auto& it : task_done) {
+            it.second = false;
+        }
+    });
     response.__set_task_done(task_done);
 
     Status st = Status::OK();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 73ddbe4c455..fc580c4fc7e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -46,6 +46,7 @@ import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
 import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
 
 import com.google.common.base.Preconditions;
+import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -54,6 +55,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -94,7 +96,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new 
LinkedBlockingQueue<Pair<Long, Long>>();
 
-    private Map<Long, InfightTask> tabletToInfightTask = new HashMap<Long, 
InfightTask>();
+    private Map<InfightTablet, InfightTask> tabletToInfightTask = new 
HashMap<>();
 
     private long assignedErrNum = 0;
 
@@ -115,12 +117,39 @@ public class CloudTabletRebalancer extends MasterDaemon {
         PARTITION
     }
 
+    @Getter
+    private static class InfightTablet { // key of tabletToInfightTask: a tablet id scoped to one cluster id
+        private final Long tabletId;
+        private final String clusterId;
+
+        public InfightTablet(Long tabletId, String clusterId) {
+            this.tabletId = tabletId;
+            this.clusterId = clusterId;
+        }
+
+        @Override
+        public boolean equals(Object o) { // null-safe, consistent with Objects.hash() in hashCode()
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            InfightTablet that = (InfightTablet) o;
+            return Objects.equals(tabletId, that.tabletId) && Objects.equals(clusterId, that.clusterId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tabletId, clusterId);
+        }
+    }
+
     private class InfightTask {
         public Tablet pickedTablet;
         public long srcBe;
         public long destBe;
         public boolean isGlobal;
-        public String clusterId;
         public Map<Long, List<Tablet>> beToTablets;
         public long startTimestamp;
         BalanceType balanceType;
@@ -343,41 +372,44 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     public void checkInflghtWarmUpCacheAsync() {
-        Map<Long, List<Long>> beToTabletIds = new HashMap<Long, List<Long>>();
+        Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, 
List<InfightTask>>();
 
-        for (Map.Entry<Long, InfightTask> entry : 
tabletToInfightTask.entrySet()) {
-            beToTabletIds.putIfAbsent(entry.getValue().destBe, new 
ArrayList<Long>());
-            
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
+        for (Map.Entry<InfightTablet, InfightTask> entry : 
tabletToInfightTask.entrySet()) {
+            beToInfightTasks.putIfAbsent(entry.getValue().destBe, new 
ArrayList<>());
+            
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
         }
 
         List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
-        for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
+        for (Map.Entry<Long, List<InfightTask>> entry : 
beToInfightTasks.entrySet()) {
             LOG.info("before pre cache check dest be {} inflight task num {}", 
entry.getKey(), entry.getValue().size());
             Backend destBackend = 
cloudSystemInfoService.getBackend(entry.getKey());
             if (destBackend == null) {
-                for (long tabletId : entry.getValue()) {
-                    tabletToInfightTask.remove(tabletId);
+                for (InfightTask task : entry.getValue()) {
+                    // Drop this tablet's in-flight entries in every cluster. Use removeIf: calling
+                    // map.remove() inside a keySet() loop throws ConcurrentModificationException.
+                    tabletToInfightTask.keySet().removeIf(key -> key.tabletId.equals(task.pickedTablet.getId()));
                 }
                 continue;
             }
-
-            Map<Long, Boolean> taskDone = 
sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey());
+            List<Long> tablets = entry.getValue().stream()
+                    .map(task -> 
task.pickedTablet.getId()).collect(Collectors.toList());
+            Map<Long, Boolean> taskDone = 
sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
             if (taskDone == null) {
                 LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, 
inFight tasks {}",
                         entry.getKey(), entry.getValue());
                 continue;
             }
-
+            String clusterId = destBackend.getCloudClusterId();
             for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
-                InfightTask task = tabletToInfightTask.get(result.getKey());
-                if (result.getValue()
-                        || System.currentTimeMillis() / 1000 - 
task.startTimestamp
-                            > Config.cloud_pre_heating_time_limit_sec) {
+                InfightTask task = tabletToInfightTask
+                        .getOrDefault(new InfightTablet(result.getKey(), 
clusterId), null);
+                if (task != null && (result.getValue() || 
System.currentTimeMillis() / 1000 - task.startTimestamp
+                            > Config.cloud_pre_heating_time_limit_sec)) {
                     if (!result.getValue()) {
                         LOG.info("{} pre cache timeout, forced to change the 
mapping", result.getKey());
                     }
-                    updateClusterToBeMap(task.pickedTablet, task.destBe, 
task.clusterId, infos);
-                    tabletToInfightTask.remove(result.getKey());
+                    updateClusterToBeMap(task.pickedTablet, task.destBe, 
clusterId, infos);
+                    tabletToInfightTask.remove(new 
InfightTablet(task.pickedTablet.getId(), clusterId));
                 }
             }
         }
@@ -393,13 +425,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
         }
 
         // recalculate inflight beToTablets, just for print the log
-        beToTabletIds = new HashMap<Long, List<Long>>();
-        for (Map.Entry<Long, InfightTask> entry : 
tabletToInfightTask.entrySet()) {
-            beToTabletIds.putIfAbsent(entry.getValue().destBe, new 
ArrayList<Long>());
-            
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
+        beToInfightTasks.clear();
+        for (Map.Entry<InfightTablet, InfightTask> entry : 
tabletToInfightTask.entrySet()) {
+            beToInfightTasks.putIfAbsent(entry.getValue().destBe, new 
ArrayList<>());
+            
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
         }
 
-        for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
+        for (Map.Entry<Long, List<InfightTask>> entry : 
beToInfightTasks.entrySet()) {
             LOG.info("after pre cache check dest be {} inflight task num {}", 
entry.getKey(), entry.getValue().size());
         }
     }
@@ -449,7 +481,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                             }
                             LOG.info("notify decommission response: {} ", 
response);
                         } catch (RpcException e) {
-                            LOG.info("failed to notify decommission {}", e);
+                            LOG.info("failed to notify decommission", e);
                             return;
                         }
                         beToDecommissionedTime.put(beId, 
System.currentTimeMillis() / 1000);
@@ -552,8 +584,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
                         fillBeToTablets(bes.get(0), table.getId(), 
partition.getId(), index.getId(), tablet,
                                 tmpBeToTabletsGlobal, beToTabletsInTable, 
this.partitionToTablets);
 
-                        if (tabletToInfightTask.containsKey(tablet.getId())) {
-                            InfightTask task = 
tabletToInfightTask.get(tablet.getId());
+                        InfightTask task = tabletToInfightTask
+                                .getOrDefault(new 
InfightTablet(tablet.getId(), cluster), null);
+
+                        if (task != null) {
                             fillBeToTablets(task.destBe, table.getId(), 
partition.getId(), index.getId(), tablet,
                                     futureBeToTabletsGlobal, 
futureBeToTabletsInTable, futurePartitionToTablets);
                         } else {
@@ -808,9 +842,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             List<Tablet> destBeTablets = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
                     .get(cloudReplica.getIndexId()).get(destBe);
             long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
-            if (minBeSize >= maxBeSize) {
-                return true;
-            }
+            return minBeSize >= maxBeSize;
         }
 
         return false;
@@ -881,10 +913,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 task.srcBe = srcBe;
                 task.destBe = destBe;
                 task.balanceType = balanceType;
-                task.clusterId = clusterId;
                 task.beToTablets = beToTablets;
                 task.startTimestamp = System.currentTimeMillis() / 1000;
-                tabletToInfightTask.put(pickedTablet.getId(), task);
+                tabletToInfightTask.put(new 
InfightTablet(pickedTablet.getId(), clusterId), task);
 
                 LOG.info("pre cache {} from {} to {}, cluster {} minNum {} 
maxNum {} beNum {} tabletsNum {}, part {}",
                          pickedTablet.getId(), srcBe, destBe, clusterId,
@@ -936,7 +967,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             CloudReplica cloudReplica = (CloudReplica) 
tablet.getReplicas().get(0);
             Backend be = cloudSystemInfoService.getBackend(srcBe);
             if (be == null) {
-                LOG.info("backend {} not found", be);
+                LOG.info("src backend {} not found", srcBe);
                 continue;
             }
             String clusterId = be.getCloudClusterId();
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
new file mode 100644
index 00000000000..7e12d9f81f8
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_warmup_rebalance_in_cloud', 'multi_cluster') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'enable_cloud_warm_up_for_rebalance=true',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'cloud_balance_tablet_percent_per_run=0.5',
+        'sys_log_verbose_modules=org',
+        'cloud_pre_heating_time_limit_sec=600'
+    ]
+    options.setFeNum(2)
+    options.setBeNum(3)
+    options.cloudMode = true
+    options.enableDebugPoints()
+    def check = { String feLogPath ->
+        log.info("search fe log path: {}", feLogPath)
+        Map<String, List<String>> circularRebalanceMap = [:]
+        boolean isCircularRebalanceDetected = false
+
+        new File(feLogPath).text.tokenize('\n')
+        .findAll { it =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), 
cluster ([a-zA-Z0-9_]+)/ }
+        .each { line ->
+            def (tabletId, fromBe, toBe, clusterId) = (line =~ /pre cache 
([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/)[0][1..-1]
+
+            String clusterPreCacheKey = "$clusterId-$tabletId"
+
+            if (!circularRebalanceMap.containsKey(clusterPreCacheKey)) {
+                circularRebalanceMap[clusterPreCacheKey] = new ArrayList<>()
+            }
+
+            List<String> paths = circularRebalanceMap[clusterPreCacheKey]
+
+            if (paths.contains(toBe)) {
+                isCircularRebalanceDetected = true
+                log.info("Circular rebalance detected for tabletId: {}, 
clusterId: {}", tabletId, clusterId)
+                assertFalse(true)
+            }
+
+            paths << fromBe
+            circularRebalanceMap[clusterPreCacheKey] = paths
+
+            if (!paths.contains(toBe)) {
+                paths << (toBe as String)
+            }
+        }
+
+        if (!isCircularRebalanceDetected) {
+            log.info("No circular rebalance detected.")
+        }
+    }
+
+    docker(options) {
+        def clusterName = "newcluster1"
+        // Add a new cluster (add_new_cluster)
+        cluster.addBackend(2, clusterName)
+
+        def ret = sql_return_maparray """show clusters"""
+        log.info("show clusters: {}", ret)
+        assertEquals(2, ret.size())
+
+        
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+        sql """set global forward_to_master=false"""
+
+        sql """
+            CREATE TABLE table100 (
+            class INT,
+            id INT,
+            score INT SUM
+            )
+            AGGREGATE KEY(class, id)
+            DISTRIBUTED BY HASH(class) BUCKETS 48
+        """
+
+        sql """
+        INSERT INTO table100 VALUES (1, 1, 100);
+        """
+
+        dockerAwaitUntil(5) {
+            ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
+            log.info("replica distribution table100: {}", ret)
+            ret.size() == 5
+        }
+
+        sql """use @newcluster1"""
+        def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION 
FROM table100; """
+        assertEquals(5, result.size())
+        int replicaNum = 0
+
+        for (def row : result) {
+            log.info("replica distribution: ${row} ".toString())
+            if (row.CloudClusterName == "newcluster1") {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                assertTrue(replicaNum <= 25 && replicaNum >= 23)
+            }
+        }
+        def fe1 = cluster.getFeByIndex(1)
+        String feLogPath = fe1.getLogFilePath()
+        // stop be id 1, 4
+        cluster.stopBackends(1, 4)
+        // check log
+        sleep(10 * 1000)
+        check feLogPath
+
+        // start be id 1, 4
+        cluster.startBackends(1, 4)
+        
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+        // check log
+        sleep(10 * 1000)
+        check feLogPath
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to