This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6b31a7db6c5 [branch-2.0](colocate group) fix colocate group always exclude the same host #33823 (#36503)
6b31a7db6c5 is described below

commit 6b31a7db6c54c6ea28d7e65604ae74d63d58f6b0
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jun 20 17:52:19 2024 +0800

    [branch-2.0](colocate group) fix colocate group always exclude the same host #33823 (#36503)
---
 .../clone/ColocateTableCheckerAndBalancer.java  | 25 ++++++-
 .../doris/cluster/DecommissionBackendTest.java  | 86 +++++++++++++++++++++-
 .../apache/doris/utframe/TestWithFeService.java |  6 +-
 3 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 456701213ba..1a085ab8104 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -859,6 +859,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
 
                 int targetSeqIndex = -1;
                 long minDataSizeDiff = Long.MAX_VALUE;
+                boolean destBeContainsAllBuckets = true;
+                boolean theSameHostContainsAllBuckets = true;
                 for (int seqIndex : seqIndexes) {
                     // the bucket index.
                     // eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
@@ -866,9 +868,15 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                     List<Long> backendsSet = backendsPerBucketSeq.get(bucketIndex);
                     List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
                     // the replicas of a tablet can not locate in same Backend or same host
-                    if (backendsSet.contains(destBeId) || hostsSet.contains(destBe.getHost())) {
+                    if (backendsSet.contains(destBeId)) {
                         continue;
                     }
+                    destBeContainsAllBuckets = false;
+
+                    if (!Config.allow_replica_on_same_host && hostsSet.contains(destBe.getHost())) {
+                        continue;
+                    }
+                    theSameHostContainsAllBuckets = false;
 
                     Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
                     long bucketDataSize =
@@ -895,8 +903,19 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
 
                 if (targetSeqIndex < 0) {
                     // we use next node as dst node
-                    LOG.info("unable to replace backend {} with backend {} in colocate group {}",
-                            srcBeId, destBeId, groupId);
+                    String failedReason;
+                    if (destBeContainsAllBuckets) {
+                        failedReason = "dest be contains all the same buckets";
+                    } else if (theSameHostContainsAllBuckets) {
+                        failedReason = "dest be's host contains all the same buckets "
+                                + "and Config.allow_replica_on_same_host=false";
+                    } else {
+                        failedReason = "dest be has no fit path, maybe disk usage is exceeds "
+                                + "Config.storage_high_watermark_usage_percent";
+                    }
+                    LOG.info("unable to replace backend {} with dest backend {} in colocate group {}, "
+                            + "failed reason: {}",
+                            srcBeId, destBeId, groupId, failedReason);
                     continue;
                 }
 
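A minimal, self-contained sketch (not the Doris source) of the selection rule the hunks above change, with plain lists standing in for the per-bucket backend/host sequences and a boolean parameter standing in for Config.allow_replica_on_same_host: a bucket is skipped outright only when the destination backend already holds it, a same-host collision blocks it only when same-host replicas are disallowed, and two flags record why every bucket was rejected so a precise reason can be logged.

    import java.util.List;

    // Sketch only, not Doris code: illustrates the bucket-selection rule changed above.
    public class ColocateBucketPickSketch {

        // Returns either the first bucket index the destination backend could take over,
        // or the reason none qualified (mirroring the failedReason strings added in the patch).
        static String pickBucket(long destBeId, String destHost, boolean allowReplicaOnSameHost,
                                 List<List<Long>> backendsPerBucket, List<List<String>> hostsPerBucket) {
            boolean destBeContainsAllBuckets = true;
            boolean sameHostContainsAllBuckets = true;
            for (int i = 0; i < backendsPerBucket.size(); i++) {
                if (backendsPerBucket.get(i).contains(destBeId)) {
                    continue; // destination backend already holds a replica of this bucket
                }
                destBeContainsAllBuckets = false;
                if (!allowReplicaOnSameHost && hostsPerBucket.get(i).contains(destHost)) {
                    continue; // same-host collision only blocks the bucket when disallowed
                }
                sameHostContainsAllBuckets = false;
                return "picked bucket " + i;
            }
            if (destBeContainsAllBuckets) {
                return "dest be contains all the same buckets";
            }
            if (sameHostContainsAllBuckets) {
                return "dest be's host contains all the same buckets and same-host replicas are disallowed";
            }
            return "no bucket qualified for other reasons";
        }

        public static void main(String[] args) {
            // Bucket 0 already has backend 3; bucket 1 only collides with backend 3 by host ("h3").
            List<List<Long>> backends = List.of(List.of(1L, 2L, 3L), List.of(1L, 2L, 4L));
            List<List<String>> hosts = List.of(List.of("h1", "h2", "h3"), List.of("h1", "h2", "h3"));
            // Before the fix the host check was unconditional, so backend 3 on "h3" could never
            // take bucket 1 even with same-host replicas allowed; with the fix it can.
            System.out.println(pickBucket(3L, "h3", true, backends, hosts));  // picked bucket 1
            System.out.println(pickBucket(3L, "h3", false, backends, hosts)); // host-collision reason
        }
    }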
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index e689723cdf8..79216f28c40 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -22,6 +22,10 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.clone.RebalancerTestUtil;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -39,7 +43,7 @@ import java.util.List;
 public class DecommissionBackendTest extends TestWithFeService {
     @Override
     protected int backendNum() {
-        return 3;
+        return 4;
     }
 
     @Override
@@ -56,10 +60,14 @@ public class DecommissionBackendTest extends TestWithFeService {
     @Override
     protected void beforeCreatingConnectContext() throws Exception {
         FeConstants.default_scheduler_interval_millisecond = 1000;
-        FeConstants.tablet_checker_interval_ms = 1000;
+        FeConstants.tablet_checker_interval_ms = 100;
+        FeConstants.tablet_schedule_interval_ms = 100;
         Config.tablet_repair_delay_factor_second = 1;
         Config.allow_replica_on_same_host = true;
         Config.disable_balance = true;
+        Config.schedule_batch_size = 1000;
+        Config.schedule_slot_num_per_hdd_path = 1000;
+        FeConstants.heartbeat_interval_second = 5;
     }
 
     @Test
@@ -76,6 +84,7 @@ public class DecommissionBackendTest extends TestWithFeService {
 
         // 3. create table tbl1
         createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+        RebalancerTestUtil.updateReplicaPathHash();
 
         // 4. query tablet num
         int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size();
@@ -133,6 +142,7 @@ public class DecommissionBackendTest extends TestWithFeService {
         createTable("create table db2.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '"
                 + availableBeNum + "');");
         createTable("create table db2.tbl2(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+        RebalancerTestUtil.updateReplicaPathHash();
 
         // 4. query tablet num
        int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size();
@@ -180,9 +190,81 @@ public class DecommissionBackendTest extends TestWithFeService {
         // recover tbl1, because tbl1 has more than one replica, so it still can be recovered
         Assertions.assertDoesNotThrow(() -> recoverTable("db2.tbl1"));
         Assertions.assertDoesNotThrow(() -> showCreateTable(sql));
+        dropTable("db2.tbl1", false);
 
         addNewBackend();
         Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size());
     }
 
+    @Test
+    public void testDecommissionBackendWithColocateGroup() throws Exception {
+        // 1. create connect context
+        connectContext = createDefaultCtx();
+
+        ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend();
+        Assertions.assertEquals(backendNum(), idToBackendRef.size());
+
+        // 2. create database db1
+        createDatabase("db4");
+        System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+        // 3. create table
+        createTable("CREATE TABLE db4.table1 (\n"
+                + " `c1` varchar(20) NULL,\n"
+                + " `c2` bigint(20) NULL,\n"
+                + " `c3` int(20) not NULL,\n"
+                + " `k4` bitmap BITMAP_UNION NULL,\n"
+                + " `k5` bitmap BITMAP_UNION NULL\n"
+                + ") ENGINE=OLAP\n"
+                + "AGGREGATE KEY(`c1`, `c2`, `c3`)\n"
+                + "COMMENT 'OLAP'\n"
+                + "DISTRIBUTED BY HASH(`c2`) BUCKETS 20\n"
+                + "PROPERTIES(\n"
+                + " 'colocate_with' = 'foo',\n"
+                + " 'replication_num' = '3'\n"
+                + ")"
+                + ";");
+
+        RebalancerTestUtil.updateReplicaPathHash();
+
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db4");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("table1");
+        Assertions.assertNotNull(tbl);
+
+        Partition partition = tbl.getPartitions().iterator().next();
+        Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+                .iterator().next().getTablets().iterator().next();
+        Assertions.assertNotNull(tablet);
+        Backend srcBackend = Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId());
+        Assertions.assertNotNull(srcBackend);
+
+        // 4. query tablet num
+        int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size();
+
+        String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:"
+                + srcBackend.getHeartbeatPort() + "\"";
+        AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
+        Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+        Assertions.assertTrue(srcBackend.isDecommissioned());
+        long startTimestamp = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimestamp < 90000
+                && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+            Thread.sleep(1000);
+        }
+
+        Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size());
+
+        // For now, we have pre-built internal table: analysis_job and column_statistics
+        Assertions.assertEquals(tabletNum,
+                Env.getCurrentInvertedIndex().getTabletMetaMap().size());
+
+        for (Replica replica : tablet.getReplicas()) {
+            Assertions.assertTrue(replica.getBackendId() != srcBackend.getId());
+        }
+
+        // 6. add backend
+        addNewBackend();
+        Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size());
+    }
 }
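The new test above waits for the decommission to complete by polling Env.getCurrentSystemInfo().getIdToBackend() for up to 90 seconds with a one-second sleep per attempt, and the TestWithFeService change below sizes its heartbeat retries from FeConstants.heartbeat_interval_second in the same spirit. A small generic sketch (not Doris code) of that bounded-polling pattern, using a made-up helper name (waitUntil) purely for illustration:

    import java.util.function.BooleanSupplier;

    // Sketch only, not Doris code: the bounded-polling pattern the test relies on.
    public final class PollingWaitSketch {

        // Polls condition every intervalMs until it returns true or timeoutMs elapses.
        // Returns whether the condition held by the deadline.
        static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long intervalMs)
                throws InterruptedException {
            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < timeoutMs) {
                if (condition.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(intervalMs);
            }
            return condition.getAsBoolean();
        }

        public static void main(String[] args) throws InterruptedException {
            long flipAt = System.currentTimeMillis() + 300;
            // The condition becomes true after roughly 300 ms, well inside the 2-second budget,
            // just as the test expects the backend to disappear well inside its 90-second budget.
            boolean ok = waitUntil(() -> System.currentTimeMillis() >= flipAt, 2000, 50);
            System.out.println("condition met: " + ok);
        }
    }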
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 41eb4e6c4e1..c61fcd28afe 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -405,7 +405,7 @@ public abstract class TestWithFeService {
     }
 
     private void checkBEHeartbeat(List<Backend> bes) throws InterruptedException {
-        int maxTry = 10;
+        int maxTry = FeConstants.heartbeat_interval_second + 5;
         boolean allAlive = false;
         while (maxTry-- > 0 && !allAlive) {
             Thread.sleep(1000);
@@ -437,7 +437,9 @@ public abstract class TestWithFeService {
     }
 
     protected Backend addNewBackend() throws IOException, InterruptedException {
-        return createBackend("127.0.0.1", lastFeRpcPort);
+        Backend be = createBackend("127.0.0.1", lastFeRpcPort);
+        checkBEHeartbeat(Lists.newArrayList(be));
+        return be;
     }
 
     protected Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org