This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 086d09f70e8 branch-3.0: [fix](cloud) fix corner case when warm up data
larger than cache capacity #49050 (#49675)
086d09f70e8 is described below
commit 086d09f70e82e40d2a560636a8049608cd03bfa8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 31 14:40:35 2025 +0800
branch-3.0: [fix](cloud) fix corner case when warm up data larger than
cache capacity #49050 (#49675)
Cherry-picked from #49050
Co-authored-by: zhengyu <[email protected]>
---
.../apache/doris/cloud/CacheHotspotManager.java | 83 +++++++++-----
.../doris/cloud/cache/CacheHotspotManagerTest.java | 124 +++++++++++++++++++++
2 files changed, 182 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index f4c7392eb75..a05518a6ee2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -330,7 +330,7 @@ public class CacheHotspotManager extends MasterDaemon {
return responseList;
}
- private Long getFileCacheCapacity(String clusterName) throws RuntimeException {
+ Long getFileCacheCapacity(String clusterName) throws RuntimeException {
List<Backend> backends = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName);
Long totalFileCache = 0L;
@@ -516,56 +516,89 @@ public class CacheHotspotManager extends MasterDaemon {
}
}
- private Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
+ public List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
+ String dbName = tableTriple.getLeft();
+ String tableName = tableTriple.getMiddle();
+ String partitionName = tableTriple.getRight();
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
+ OlapTable table = (OlapTable) db.getTableNullable(tableName);
+ List<Partition> partitions = new ArrayList<>();
+ if (partitionName.length() != 0) {
+ partitions.add(table.getPartition(partitionName));
+ } else {
+ partitions.addAll(table.getPartitions());
+ }
+ return partitions;
+ }
+
+ public List<Backend> getBackendsFromCluster(String dstClusterName) {
+ return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getBackendsByClusterName(dstClusterName);
+ }
+
+ public Set<Long> getTabletIdsFromBe(long beId) {
+ return ((CloudEnv) Env.getCurrentEnv())
+ .getCloudTabletRebalancer()
+ .getSnapshotTabletsInPrimaryByBeId(beId);
+ }
+
+ public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
+ List<Tablet> tablets = new ArrayList<>();
+ for (MaterializedIndex index : indexes) {
+ tablets.addAll(index.getTablets());
+ }
+ return tablets;
+ }
+
+ public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
List<Triple<String, String, String>> tables,
boolean isForce) throws RuntimeException {
Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
Long totalFileCache = getFileCacheCapacity(dstClusterName);
Long warmUpTotalFileCache = 0L;
+ LOG.info("Start warm up job {}, cluster {}, total cache size: {}",
+ jobId, dstClusterName, totalFileCache);
for (Triple<String, String, String> tableTriple : tables) {
if (warmUpTotalFileCache > totalFileCache) {
+ LOG.info("Warm up size {} exceeds total cache size {}, breaking loop",
+ warmUpTotalFileCache, totalFileCache);
break;
}
- String dbName = tableTriple.getLeft();
- String tableName = tableTriple.getMiddle();
- String partitionName = tableTriple.getRight();
- Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
- OlapTable table = (OlapTable) db.getTableNullable(tableName);
- List<Partition> partitions = new ArrayList<>();
- if (partitionName.length() != 0) {
- partitions.add(table.getPartition(partitionName));
- } else {
- partitions.addAll(table.getPartitions());
- }
- List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getBackendsByClusterName(dstClusterName);
+
+ List<Partition> partitions = getPartitionsFromTriple(tableTriple);
+ LOG.info("Got {} partitions for table {}.{}.{}", partitions.size(),
+ tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
+ List<Backend> backends = getBackendsFromCluster(dstClusterName);
+ LOG.info("Got {} backends for cluster {}", backends.size(), dstClusterName);
List<Partition> warmUpPartitions = new ArrayList<>();
for (Partition partition : partitions) {
Long partitionSize = partition.getDataSize(true);
- if ((warmUpTotalFileCache + partitionSize) > totalFileCache) {
- break;
- }
warmUpTotalFileCache += partitionSize;
warmUpPartitions.add(partition);
+ if (warmUpTotalFileCache > totalFileCache) {
+ LOG.info("Warm up size {} exceeds total cache size {}, current partition size {}",
+ warmUpTotalFileCache, totalFileCache, partitionSize);
+ break;
+ }
}
List<MaterializedIndex> indexes = new ArrayList<>();
for (Partition partition : warmUpPartitions) {
indexes.addAll(partition.getMaterializedIndices(IndexExtState.VISIBLE));
}
- List<Tablet> tablets = new ArrayList<>();
- for (MaterializedIndex index : indexes) {
- tablets.addAll(index.getTablets());
- }
+ LOG.info("Got {} materialized indexes for table {}.{}.{}", indexes.size(),
+ tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
+ List<Tablet> tablets = getTabletsFromIndexs(indexes);
+ LOG.info("Got {} tablets for table {}.{}.{}", tablets.size(),
+ tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
for (Backend backend : backends) {
- Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
- .getCloudTabletRebalancer()
- .getSnapshotTabletsInPrimaryByBeId(backend.getId());
+ Set<Long> beTabletIds = getTabletIdsFromBe(backend.getId());
List<Tablet> warmUpTablets = new ArrayList<>();
for (Tablet tablet : tablets) {
if (beTabletIds.contains(tablet.getId())) {
warmUpTablets.add(tablet);
}
}
+ LOG.info("Assigning {} tablets to backend {}", warmUpTablets.size(), backend.getId());
beToWarmUpTablets.computeIfAbsent(backend.getId(),
k -> new ArrayList<>()).addAll(warmUpTablets);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
new file mode 100644
index 00000000000..ff42ea31bcb
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.cache;
+
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.CacheHotspotManager;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.system.Backend;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.commons.lang3.tuple.Triple;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CacheHotspotManagerTest {
+ private CacheHotspotManager cacheHotspotManager;
+ private CloudSystemInfoService cloudSystemInfoService;
+ private Partition partition;
+
+ @Test
+ public void testWarmUpNewClusterByTable() {
+ partition = new Partition(0, null, null, null);
+ new MockUp<Partition>() {
+
+ @Mock
+ public long getDataSize(boolean singleReplica) {
+ return 10000000L;
+ }
+
+ @Mock
+ public List<MaterializedIndex> getMaterializedIndices(IndexExtState extState) {
+ List<MaterializedIndex> list = new ArrayList<>();
+ MaterializedIndex ind = new MaterializedIndex();
+ list.add(ind);
+ return list;
+ }
+ };
+
+ cloudSystemInfoService = new CloudSystemInfoService();
+ cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService);
+ new MockUp<CacheHotspotManager>() {
+
+ @Mock
+ Long getFileCacheCapacity(String clusterName) throws RuntimeException {
+ return 100L;
+ }
+
+ @Mock
+ List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
+ List<Partition> partitions = new ArrayList<>();
+ partition = new Partition(1, "p1", null, null);
+ partitions.add(partition);
+ return partitions;
+ }
+
+ @Mock
+ List<Backend> getBackendsFromCluster(String dstClusterName) {
+ List<Backend> backends = new ArrayList<>();
+ Backend backend = new Backend(11, dstClusterName, 0);
+ backends.add(backend);
+ return backends;
+ }
+
+ @Mock
+ public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
+ List<Tablet> list = new ArrayList<>();
+ Tablet tablet = new Tablet(1001L);
+ list.add(tablet);
+ return list;
+ }
+
+ @Mock
+ Set<Long> getTabletIdsFromBe(long beId) {
+ Set<Long> tabletIds = new HashSet<Long>();
+ tabletIds.add(1001L);
+ return tabletIds;
+ }
+ };
+
+ // Setup mock data
+ long jobId = 1L;
+ String dstClusterName = "test_cluster";
+ List<Triple<String, String, String>> tables = new ArrayList<>();
+ tables.add(Triple.of("test_db", "test_table", ""));
+
+
+ // force = true
+ Map<Long, List<Tablet>> result = cacheHotspotManager.warmUpNewClusterByTable(
+ jobId, dstClusterName, tables, true);
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(11L).get(0).getId(), 1001L);
+
+ // force = false
+ RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> {
+ cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false);
+ });
+ Assert.assertEquals("The cluster " + dstClusterName + " cache size is not enough", exception.getMessage());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]