This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6b31a7db6c5 [branch-2.0](colocate group) fix colocate group always 
exclude the same host #33823 (#36503)
6b31a7db6c5 is described below

commit 6b31a7db6c54c6ea28d7e65604ae74d63d58f6b0
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jun 20 17:52:19 2024 +0800

    [branch-2.0](colocate group) fix colocate group always exclude the same 
host #33823 (#36503)
---
 .../clone/ColocateTableCheckerAndBalancer.java     | 25 ++++++-
 .../doris/cluster/DecommissionBackendTest.java     | 86 +++++++++++++++++++++-
 .../apache/doris/utframe/TestWithFeService.java    |  6 +-
 3 files changed, 110 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 456701213ba..1a085ab8104 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -859,6 +859,8 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
 
                 int targetSeqIndex = -1;
                 long minDataSizeDiff = Long.MAX_VALUE;
+                boolean destBeContainsAllBuckets = true;
+                boolean theSameHostContainsAllBuckets = true;
                 for (int seqIndex : seqIndexes) {
                     // the bucket index.
                     // eg: 0 / 3 = 0, so that the bucket index of the 4th 
backend id in flatBackendsPerBucketSeq is 0.
@@ -866,9 +868,15 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                     List<Long> backendsSet = 
backendsPerBucketSeq.get(bucketIndex);
                     List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
                     // the replicas of a tablet can not locate in same Backend 
or same host
-                    if (backendsSet.contains(destBeId) || 
hostsSet.contains(destBe.getHost())) {
+                    if (backendsSet.contains(destBeId)) {
                         continue;
                     }
+                    destBeContainsAllBuckets = false;
+
+                    if (!Config.allow_replica_on_same_host && 
hostsSet.contains(destBe.getHost())) {
+                        continue;
+                    }
+                    theSameHostContainsAllBuckets = false;
 
                     Preconditions.checkState(backendsSet.contains(srcBeId), 
srcBeId);
                     long bucketDataSize =
@@ -895,8 +903,19 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
 
                 if (targetSeqIndex < 0) {
                     // we use next node as dst node
-                    LOG.info("unable to replace backend {} with backend {} in 
colocate group {}",
-                            srcBeId, destBeId, groupId);
+                    String failedReason;
+                    if (destBeContainsAllBuckets) {
+                        failedReason = "dest be contains all the same buckets";
+                    } else if (theSameHostContainsAllBuckets) {
+                        failedReason = "dest be's host contains all the same 
buckets "
+                                + "and 
Config.allow_replica_on_same_host=false";
+                    } else {
+                        failedReason = "dest be has no fit path, maybe disk 
usage is exceeds "
+                                + 
"Config.storage_high_watermark_usage_percent";
+                    }
+                    LOG.info("unable to replace backend {} with dest backend 
{} in colocate group {}, "
+                            + "failed reason: {}",
+                            srcBeId, destBeId, groupId, failedReason);
                     continue;
                 }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index e689723cdf8..79216f28c40 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -22,6 +22,10 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.clone.RebalancerTestUtil;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -39,7 +43,7 @@ import java.util.List;
 public class DecommissionBackendTest extends TestWithFeService {
     @Override
     protected int backendNum() {
-        return 3;
+        return 4;
     }
 
     @Override
@@ -56,10 +60,14 @@ public class DecommissionBackendTest extends 
TestWithFeService {
     @Override
     protected void beforeCreatingConnectContext() throws Exception {
         FeConstants.default_scheduler_interval_millisecond = 1000;
-        FeConstants.tablet_checker_interval_ms = 1000;
+        FeConstants.tablet_checker_interval_ms = 100;
+        FeConstants.tablet_schedule_interval_ms = 100;
         Config.tablet_repair_delay_factor_second = 1;
         Config.allow_replica_on_same_host = true;
         Config.disable_balance = true;
+        Config.schedule_batch_size = 1000;
+        Config.schedule_slot_num_per_hdd_path = 1000;
+        FeConstants.heartbeat_interval_second = 5;
     }
 
     @Test
@@ -76,6 +84,7 @@ public class DecommissionBackendTest extends 
TestWithFeService {
 
         // 3. create table tbl1
         createTable("create table db1.tbl1(k1 int) distributed by hash(k1) 
buckets 3 properties('replication_num' = '1');");
+        RebalancerTestUtil.updateReplicaPathHash();
 
         // 4. query tablet num
         int tabletNum = 
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
@@ -133,6 +142,7 @@ public class DecommissionBackendTest extends 
TestWithFeService {
         createTable("create table db2.tbl1(k1 int) distributed by hash(k1) 
buckets 3 properties('replication_num' = '"
                 + availableBeNum + "');");
         createTable("create table db2.tbl2(k1 int) distributed by hash(k1) 
buckets 3 properties('replication_num' = '1');");
+        RebalancerTestUtil.updateReplicaPathHash();
 
         // 4. query tablet num
         int tabletNum = 
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
@@ -180,9 +190,81 @@ public class DecommissionBackendTest extends 
TestWithFeService {
         // recover tbl1, because tbl1 has more than one replica, so it still 
can be recovered
         Assertions.assertDoesNotThrow(() -> recoverTable("db2.tbl1"));
         Assertions.assertDoesNotThrow(() -> showCreateTable(sql));
+        dropTable("db2.tbl1", false);
 
         addNewBackend();
         Assertions.assertEquals(backendNum(), 
Env.getCurrentSystemInfo().getIdToBackend().size());
     }
 
+    @Test
+    public void testDecommissionBackendWithColocateGroup() throws Exception {
+        // 1. create connect context
+        connectContext = createDefaultCtx();
+
+        ImmutableMap<Long, Backend> idToBackendRef = 
Env.getCurrentSystemInfo().getIdToBackend();
+        Assertions.assertEquals(backendNum(), idToBackendRef.size());
+
+        // 2. create database db4
+        createDatabase("db4");
+        System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+        // 3. create table
+        createTable("CREATE TABLE db4.table1 (\n"
+                + " `c1` varchar(20) NULL,\n"
+                + " `c2` bigint(20) NULL,\n"
+                + " `c3` int(20) not NULL,\n"
+                + " `k4` bitmap BITMAP_UNION NULL,\n"
+                + " `k5` bitmap BITMAP_UNION NULL\n"
+                + ") ENGINE=OLAP\n"
+                + "AGGREGATE KEY(`c1`, `c2`, `c3`)\n"
+                + "COMMENT 'OLAP'\n"
+                + "DISTRIBUTED BY HASH(`c2`) BUCKETS 20\n"
+                + "PROPERTIES(\n"
+                + "  'colocate_with' = 'foo',\n"
+                + "  'replication_num' = '3'\n"
+                + ")"
+                + ";");
+
+        RebalancerTestUtil.updateReplicaPathHash();
+
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db4");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("table1");
+        Assertions.assertNotNull(tbl);
+
+        Partition partition = tbl.getPartitions().iterator().next();
+        Tablet tablet = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+                .iterator().next().getTablets().iterator().next();
+        Assertions.assertNotNull(tablet);
+        Backend srcBackend = 
Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId());
+        Assertions.assertNotNull(srcBackend);
+
+        // 4. query tablet num
+        int tabletNum = 
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
+
+        String decommissionStmtStr = "alter system decommission backend 
\"127.0.0.1:"
+                + srcBackend.getHeartbeatPort() + "\"";
+        AlterSystemStmt decommissionStmt = (AlterSystemStmt) 
parseAndAnalyzeStmt(decommissionStmtStr);
+        
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+        Assertions.assertTrue(srcBackend.isDecommissioned());
+        long startTimestamp = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimestamp < 90000
+            && 
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+            Thread.sleep(1000);
+        }
+
+        Assertions.assertEquals(backendNum() - 1, 
Env.getCurrentSystemInfo().getIdToBackend().size());
+
+        // For now, we have pre-built internal table: analysis_job and 
column_statistics
+        Assertions.assertEquals(tabletNum,
+                Env.getCurrentInvertedIndex().getTabletMetaMap().size());
+
+        for (Replica replica : tablet.getReplicas()) {
+            Assertions.assertTrue(replica.getBackendId() != 
srcBackend.getId());
+        }
+
+        // 6. add backend
+        addNewBackend();
+        Assertions.assertEquals(backendNum(), 
Env.getCurrentSystemInfo().getIdToBackend().size());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 41eb4e6c4e1..c61fcd28afe 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -405,7 +405,7 @@ public abstract class TestWithFeService {
     }
 
     private void checkBEHeartbeat(List<Backend> bes) throws 
InterruptedException {
-        int maxTry = 10;
+        int maxTry = FeConstants.heartbeat_interval_second + 5;
         boolean allAlive = false;
         while (maxTry-- > 0 && !allAlive) {
             Thread.sleep(1000);
@@ -437,7 +437,9 @@ public abstract class TestWithFeService {
     }
 
     protected Backend addNewBackend() throws IOException, InterruptedException 
{
-        return createBackend("127.0.0.1", lastFeRpcPort);
+        Backend be = createBackend("127.0.0.1", lastFeRpcPort);
+        checkBEHeartbeat(Lists.newArrayList(be));
+        return be;
     }
 
     protected Backend createBackend(String beHost, int feRpcPort) throws 
IOException, InterruptedException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to