This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ca4bd831b69d7a13d1fd5f6289cdca5460e9b981 Author: deardeng <565620...@qq.com> AuthorDate: Mon Sep 2 00:29:06 2024 +0800 [fix](cloud) Fix migrate tablets between backends back and forth (#39792) BUG: the cloud rebalancer migrates tablets back and forth: from A to B, then B to A, then A to B, ... The cause is that the tabletToInfightTask map, which tracks in-flight tasks, was keyed by tablet ID alone and so ignored the multi-cluster scenario; in addition, the statRouteInfo function lost the cluster information, which led to inaccurate tablet statistics. --- be/src/cloud/cloud_backend_service.cpp | 5 + .../doris/cloud/catalog/CloudTabletRebalancer.java | 95 +++++++++----- .../multi_cluster/test_warmup_rebalance.groovy | 137 +++++++++++++++++++++ 3 files changed, 205 insertions(+), 32 deletions(-) diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index d91e9e416b8..2dc6d03ebf6 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -180,6 +180,11 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon const TCheckWarmUpCacheAsyncRequest& request) { std::map<int64_t, bool> task_done; _engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done); + DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", { + for (auto& it : task_done) { + it.second = false; + } + }); response.__set_task_done(task_done); Status st = Status::OK(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 73ddbe4c455..fc580c4fc7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -46,6 +46,7 @@ import org.apache.doris.thrift.TWarmUpCacheAsyncRequest; import 
org.apache.doris.thrift.TWarmUpCacheAsyncResponse; import com.google.common.base.Preconditions; +import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,6 +55,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -94,7 +96,7 @@ public class CloudTabletRebalancer extends MasterDaemon { private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>(); - private Map<Long, InfightTask> tabletToInfightTask = new HashMap<Long, InfightTask>(); + private Map<InfightTablet, InfightTask> tabletToInfightTask = new HashMap<>(); private long assignedErrNum = 0; @@ -115,12 +117,39 @@ public class CloudTabletRebalancer extends MasterDaemon { PARTITION } + @Getter + private class InfightTablet { + private final Long tabletId; + private final String clusterId; + + public InfightTablet(Long tabletId, String clusterId) { + this.tabletId = tabletId; + this.clusterId = clusterId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InfightTablet that = (InfightTablet) o; + return tabletId.equals(that.tabletId) && clusterId.equals(that.clusterId); + } + + @Override + public int hashCode() { + return Objects.hash(tabletId, clusterId); + } + } + private class InfightTask { public Tablet pickedTablet; public long srcBe; public long destBe; public boolean isGlobal; - public String clusterId; public Map<Long, List<Tablet>> beToTablets; public long startTimestamp; BalanceType balanceType; @@ -343,41 +372,44 @@ public class CloudTabletRebalancer extends MasterDaemon { } public void checkInflghtWarmUpCacheAsync() { - Map<Long, List<Long>> beToTabletIds = new HashMap<Long, List<Long>>(); + Map<Long, 
List<InfightTask>> beToInfightTasks = new HashMap<Long, List<InfightTask>>(); - for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) { - beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<Long>()); - beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId()); + for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) { + beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>()); + beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue()); } List<UpdateCloudReplicaInfo> infos = new ArrayList<>(); - for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) { + for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) { LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey()); if (destBackend == null) { - for (long tabletId : entry.getValue()) { - tabletToInfightTask.remove(tabletId); + for (InfightTask task : entry.getValue()) { + for (InfightTablet key : tabletToInfightTask.keySet()) { + tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId)); + } } continue; } - - Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey()); + List<Long> tablets = entry.getValue().stream() + .map(task -> task.pickedTablet.getId()).collect(Collectors.toList()); + Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey()); if (taskDone == null) { LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}", entry.getKey(), entry.getValue()); continue; } - + String clusterId = cloudSystemInfoService.getBackend(entry.getKey()).getCloudClusterId(); for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) { - InfightTask task = tabletToInfightTask.get(result.getKey()); - if (result.getValue() - || 
System.currentTimeMillis() / 1000 - task.startTimestamp - > Config.cloud_pre_heating_time_limit_sec) { + InfightTask task = tabletToInfightTask + .getOrDefault(new InfightTablet(result.getKey(), clusterId), null); + if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp + > Config.cloud_pre_heating_time_limit_sec)) { if (!result.getValue()) { LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey()); } - updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos); - tabletToInfightTask.remove(result.getKey()); + updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); + tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); } } } @@ -393,13 +425,13 @@ public class CloudTabletRebalancer extends MasterDaemon { } // recalculate inflight beToTablets, just for print the log - beToTabletIds = new HashMap<Long, List<Long>>(); - for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) { - beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<Long>()); - beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId()); + beToInfightTasks.clear(); + for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) { + beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>()); + beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue()); } - for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) { + for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) { LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); } } @@ -449,7 +481,7 @@ public class CloudTabletRebalancer extends MasterDaemon { } LOG.info("notify decommission response: {} ", response); } catch (RpcException e) { - LOG.info("failed to notify decommission {}", e); + LOG.info("failed to notify decommission", e); return; 
} beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000); @@ -552,8 +584,10 @@ public class CloudTabletRebalancer extends MasterDaemon { fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet, tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets); - if (tabletToInfightTask.containsKey(tablet.getId())) { - InfightTask task = tabletToInfightTask.get(tablet.getId()); + InfightTask task = tabletToInfightTask + .getOrDefault(new InfightTablet(tablet.getId(), cluster), null); + + if (task != null) { fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet, futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); } else { @@ -808,9 +842,7 @@ public class CloudTabletRebalancer extends MasterDaemon { List<Tablet> destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) .get(cloudReplica.getIndexId()).get(destBe); long minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); - if (minBeSize >= maxBeSize) { - return true; - } + return minBeSize >= maxBeSize; } return false; @@ -881,10 +913,9 @@ public class CloudTabletRebalancer extends MasterDaemon { task.srcBe = srcBe; task.destBe = destBe; task.balanceType = balanceType; - task.clusterId = clusterId; task.beToTablets = beToTablets; task.startTimestamp = System.currentTimeMillis() / 1000; - tabletToInfightTask.put(pickedTablet.getId(), task); + tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task); LOG.info("pre cache {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}", pickedTablet.getId(), srcBe, destBe, clusterId, @@ -936,7 +967,7 @@ public class CloudTabletRebalancer extends MasterDaemon { CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0); Backend be = cloudSystemInfoService.getBackend(srcBe); if (be == null) { - LOG.info("backend {} not found", be); + LOG.info("src backend {} not found", srcBe); continue; } String 
clusterId = be.getCloudClusterId(); diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy new file mode 100644 index 00000000000..7e12d9f81f8 --- /dev/null +++ b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper +import org.awaitility.Awaitility; +import static java.util.concurrent.TimeUnit.SECONDS; +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite('test_warmup_rebalance_in_cloud', 'multi_cluster') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'enable_cloud_warm_up_for_rebalance=true', + 'cloud_tablet_rebalancer_interval_second=1', + 'cloud_balance_tablet_percent_per_run=0.5', + 'sys_log_verbose_modules=org', + 'cloud_pre_heating_time_limit_sec=600' + ] + options.setFeNum(2) + options.setBeNum(3) + options.cloudMode = true + options.enableDebugPoints() + def check = { String feLogPath -> + log.info("search fe log path: {}", feLogPath) + Map<String, List<String>> circularRebalanceMap = [:] + boolean isCircularRebalanceDetected = false + + new File(feLogPath).text.tokenize('\n') + .findAll { it =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/ } + .each { line -> + def (tabletId, fromBe, toBe, clusterId) = (line =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/)[0][1..-1] + + String clusterPreCacheKey = "$clusterId-$tabletId" + + if (!circularRebalanceMap.containsKey(clusterPreCacheKey)) { + circularRebalanceMap[clusterPreCacheKey] = new ArrayList<>() + } + + List<String> paths = circularRebalanceMap[clusterPreCacheKey] + + if (paths.contains(toBe)) { + isCircularRebalanceDetected = true + log.info("Circular rebalance detected for tabletId: {}, clusterId: {}", tabletId, clusterId) + assertFalse(true) + } + + paths << fromBe + circularRebalanceMap[clusterPreCacheKey] = paths + + if (!paths.contains(toBe)) { + paths << (toBe as String) + } + } + + if (!isCircularRebalanceDetected) { + log.info("No circular rebalance detected.") + } + } + + docker(options) { + def clusterName = "newcluster1" + // 添加一个新的cluster add_new_cluster + 
cluster.addBackend(2, clusterName) + + def ret = sql_return_maparray """show clusters""" + log.info("show clusters: {}", ret) + assertEquals(2, ret.size()) + + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + sql """set global forward_to_master=false""" + + sql """ + CREATE TABLE table100 ( + class INT, + id INT, + score INT SUM + ) + AGGREGATE KEY(class, id) + DISTRIBUTED BY HASH(class) BUCKETS 48 + """ + + sql """ + INSERT INTO table100 VALUES (1, 1, 100); + """ + + dockerAwaitUntil(5) { + ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100""" + log.info("replica distribution table100: {}", ret) + ret.size() == 5 + } + + sql """use @newcluster1""" + def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """ + assertEquals(5, result.size()) + int replicaNum = 0 + + for (def row : result) { + log.info("replica distribution: ${row} ".toString()) + if (row.CloudClusterName == "newcluster1") { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + assertTrue(replicaNum <= 25 && replicaNum >= 23) + } + } + def fe1 = cluster.getFeByIndex(1) + String feLogPath = fe1.getLogFilePath() + // stop be id 1, 4 + cluster.stopBackends(1, 4) + // check log + sleep(10 * 1000) + check feLogPath + + // start be id 1, 4 + cluster.startBackends(1, 4) + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + // check log + sleep(10 * 1000) + check feLogPath + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org