This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 60910a42e6c [fix](cloud) Fix cloud decommission and check wal (#47187)
60910a42e6c is described below

commit 60910a42e6c9797aae674e175a169f9c78cc74b2
Author: deardeng <deng...@selectdb.com>
AuthorDate: Wed Jan 29 10:07:09 2025 +0800

    [fix](cloud) Fix cloud decommission and check wal (#47187)
---
 .../doris/cloud/catalog/CloudClusterChecker.java   |  15 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  89 +++---
 .../org/apache/doris/load/GroupCommitManager.java  |   6 +
 .../main/java/org/apache/doris/system/Backend.java |  14 +
 .../org/apache/doris/regression/suite/Suite.groovy |  16 +-
 .../node_mgr/test_cloud_decommission.groovy        | 301 +++++++++++++++++++++
 6 files changed, 395 insertions(+), 46 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index b6756fb5cdf..fb0803bed17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -185,9 +185,22 @@ public class CloudClusterChecker extends MasterDaemon {
                     } catch (UserException e) {
                         LOG.warn("failed to register water shed txn id, 
decommission be {}", be.getId(), e);
                     }
-                    be.setDecommissioned(true);
+                    be.setDecommissioning(true);
                 }
             }
+
+            if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED) {
+                // When the synchronization status of the node is "NODE_STATUS_DECOMMISSIONED",
+                // it indicates that the conditions for decommissioning have
+                // already been checked in CloudTabletRebalancer.java,
+                // such as the tablets having been successfully migrated and no remnants of WAL on the backend (BE).
+                if (!be.isDecommissioned()) {
+                    LOG.warn("impossible status, somewhere has bug,  backend: 
{} status: {}", be, status);
+                }
+                be.setDecommissioned(true);
+                // edit log
+                Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
+            }
         }
     }
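
The hunk above splits decommission into two phases: a transient "decommissioning" flag set while the rebalancer drains tablets and WALs, and the durable "decommissioned" flag that is only set (and edit-logged) once the meta service reports NODE_STATUS_DECOMMISSIONED. A minimal sketch of that lifecycle, with simplified names that are not the actual Doris classes:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hedged sketch of the two-phase decommission state; names are illustrative.
    final class DecommissionLifecycle {
        private final AtomicBoolean decommissioning = new AtomicBoolean(false); // transient, in-memory
        private final AtomicBoolean decommissioned = new AtomicBoolean(false);  // durable, edit-logged

        void onStatusDecommissioning() {
            decommissioning.set(true); // rebalancer starts draining tablets and WALs
        }

        void onStatusDecommissioned(Runnable persistToEditLog) {
            if (decommissioned.compareAndSet(false, true)) {
                persistToEditLog.run(); // survives an FE restart; the transient flag does not
            }
        }
    }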
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 8e5033470b0..2bd6d8e0256 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -483,46 +483,53 @@ public class CloudTabletRebalancer extends MasterDaemon {
                     LOG.info("backend {} not found", beId);
                     continue;
                 }
-                if ((backend.isDecommissioned() && tabletNum == 0 && !backend.isActive())
-                        || (backend.isDecommissioned() && beList.size() == 1)) {
-                    LOG.info("check decommission be {} state {} tabletNum {} isActive {} beList {}",
-                            backend.getId(), backend.isDecommissioned(), tabletNum, backend.isActive(), beList);
-                    if (!beToDecommissionedTime.containsKey(beId)) {
-                        LOG.info("prepare to notify meta service be {} decommissioned", backend.getId());
-                        Cloud.AlterClusterRequest.Builder builder =
-                                Cloud.AlterClusterRequest.newBuilder();
-                        builder.setCloudUniqueId(Config.cloud_unique_id);
-                        builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
-
-                        Cloud.ClusterPB.Builder clusterBuilder =
-                                Cloud.ClusterPB.newBuilder();
-                        clusterBuilder.setClusterName(backend.getCloudClusterName());
-                        clusterBuilder.setClusterId(backend.getCloudClusterId());
-                        clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
-
-                        Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
-                        nodeBuilder.setIp(backend.getHost());
-                        nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
-                        nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
-                        nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
-
-                        clusterBuilder.addNodes(nodeBuilder);
-                        builder.setCluster(clusterBuilder);
-
-                        Cloud.AlterClusterResponse response;
-                        try {
-                            response = MetaServiceProxy.getInstance().alterCluster(builder.build());
-                            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
-                                LOG.warn("notify decommission response: {}", response);
-                            }
-                            LOG.info("notify decommission response: {} ", response);
-                        } catch (RpcException e) {
-                            LOG.info("failed to notify decommission", e);
-                            return;
-                        }
-                        beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
+                if (!backend.isDecommissioning()) {
+                    continue;
+                }
+                // here check wal
+                long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
+                LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
+                        backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
+                if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
+                    continue;
+                }
+                if (beToDecommissionedTime.containsKey(beId)) {
+                    continue;
+                }
+                LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
+                Cloud.AlterClusterRequest.Builder builder =
+                        Cloud.AlterClusterRequest.newBuilder();
+                builder.setCloudUniqueId(Config.cloud_unique_id);
+                builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
+
+                Cloud.ClusterPB.Builder clusterBuilder =
+                        Cloud.ClusterPB.newBuilder();
+                clusterBuilder.setClusterName(backend.getCloudClusterName());
+                clusterBuilder.setClusterId(backend.getCloudClusterId());
+                clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
+
+                Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
+                nodeBuilder.setIp(backend.getHost());
+                nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
+                nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
+                nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
+
+                clusterBuilder.addNodes(nodeBuilder);
+                builder.setCluster(clusterBuilder);
+
+                Cloud.AlterClusterResponse response;
+                try {
+                    response = MetaServiceProxy.getInstance().alterCluster(builder.build());
+                    if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                        LOG.warn("notify decommission response: {}", response);
+                        continue;
                     }
+                    LOG.info("notify decommission response: {} ", response);
+                } catch (RpcException e) {
+                    LOG.warn("failed to notify decommission", e);
+                    continue;
                 }
+                beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
             }
         }
     }
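
The early-continue chain introduced above reduces to a single readiness predicate: a draining BE may be reported to the meta service once it holds no tablets, is inactive, and has an empty WAL queue, or when it is the last BE in the list. A hedged restatement of just that predicate (names are illustrative, not the real class):

    // Equivalent form of the readiness gate in the hunk above.
    final class DecommissionReadiness {
        static boolean ready(long tabletNum, boolean active, long walNum, int beListSize) {
            // Mirrors: if ((tabletNum != 0 || active || walNum != 0) && beListSize != 1) continue;
            return (tabletNum == 0 && !active && walNum == 0) || beListSize == 1;
        }
    }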
@@ -884,7 +891,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 LOG.info("backend {} not found", be);
                 continue;
             }
-            if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioned()
+            if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning()
                     && !backend.isSmoothUpgradeSrc()) {
                 destBe = be;
                 minTabletsNum = tabletNum;
@@ -898,7 +905,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 LOG.info("backend {} not found", be);
                 continue;
             }
-            if (backend.isDecommissioned() && tabletNum > 0) {
+            if (backend.isDecommissioning() && tabletNum > 0) {
                 srcBe = be;
                 srcDecommissioned = true;
                 break;
@@ -967,7 +974,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         for (Long be : bes) {
             long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
             Backend backend = cloudSystemInfoService.getBackend(be);
-            if (backend != null && !backend.isDecommissioned()) {
+            if (backend != null && !backend.isDecommissioning()) {
                 beNum++;
             }
             totalTabletsNum += tabletNum;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 1844ec5bf2b..cb0219e2b20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.SlidingWindowCounter;
 import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@@ -128,6 +129,11 @@ public class GroupCommitManager {
     }
 
     public long getAllWalQueueSize(Backend backend) {
+        long getAllWalQueueSizeDP = DebugPointUtil.getDebugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
+        if (getAllWalQueueSizeDP > 0) {
+            LOG.info("backend id:" + backend.getHost() + ",use dp all wal 
size:" + getAllWalQueueSizeDP);
+            return getAllWalQueueSizeDP;
+        }
         PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
                 .setTableId(-1)
                 .build();
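
The debug point added here lets regression tests force a fake WAL queue size, so decommission can be delayed without real group-commit traffic: getDebugParamOrDefault returns the injected value when the point is enabled, else the default (-1L). The guarded-override pattern as a self-contained sketch; debugParamOrDefault below is a stand-in, not the real DebugPointUtil API:

    // Sketch of the guarded debug-point override used above.
    final class WalQueueSizeSketch {
        static long debugParamOrDefault(String name, long def) {
            String v = System.getProperty(name); // stand-in for the FE debug-point registry
            return v == null ? def : Long.parseLong(v);
        }

        static long getAllWalQueueSize() {
            long dp = debugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
            if (dp > 0) {
                return dp; // test-injected value short-circuits the per-BE RPCs
            }
            return 0L;     // the real method sums WAL queue sizes over RPC here
        }
    }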
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index c864e1ba2ae..0bd8377bddb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -97,6 +97,8 @@ public class Backend implements Writable {
     @SerializedName("isDecommissioned")
     private AtomicBoolean isDecommissioned;
 
+    private AtomicBoolean isDecommissioning = new AtomicBoolean(false);
+
     // rootPath -> DiskInfo
     @SerializedName("disksRef")
     private volatile ImmutableMap<String, DiskInfo> disksRef;
@@ -404,6 +406,14 @@ public class Backend implements Writable {
         return false;
     }
 
+    public boolean setDecommissioning(boolean isDecommissioning) {
+        if (this.isDecommissioning.compareAndSet(!isDecommissioning, isDecommissioning)) {
+            LOG.warn("{} set decommissioning: {}", this.toString(), isDecommissioning);
+            return true;
+        }
+        return false;
+    }
+
     public void setHost(String host) {
         this.host = host;
     }
@@ -490,6 +500,10 @@ public class Backend implements Writable {
         return this.isDecommissioned.get();
     }
 
+    public boolean isDecommissioning() {
+        return this.isDecommissioning.get();
+    }
+
     public boolean isQueryAvailable() {
         return isAlive() && !isQueryDisabled() && !isShutDown.get();
     }
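
setDecommissioning() returns true only when the call actually flips the flag, because AtomicBoolean.compareAndSet fails if the current value already equals the target; repeated calls with the same argument are silent no-ops, so only real transitions get logged. A standalone demonstration of the idiom:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Demo of the compareAndSet transition idiom used by setDecommissioning().
    public class CasFlagDemo {
        private final AtomicBoolean flag = new AtomicBoolean(false);

        // True only when the value actually changes.
        boolean set(boolean value) {
            return flag.compareAndSet(!value, value);
        }

        public static void main(String[] args) {
            CasFlagDemo d = new CasFlagDemo();
            System.out.println(d.set(true));  // true  (false -> true)
            System.out.println(d.set(true));  // false (already true)
            System.out.println(d.set(false)); // true  (true -> false)
        }
    }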
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 3d616654057..972feca3ef7 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -2462,7 +2462,7 @@ class Suite implements GroovyInterceptable {
         }
     }
 
-    def get_cluster = { be_unique_id ->
+    def get_cluster = { be_unique_id, MetaService ms = null ->
         def jsonOutput = new JsonOutput()
         def map = [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ]
         def js = jsonOutput.toJson(map)
@@ -2470,7 +2470,11 @@ class Suite implements GroovyInterceptable {
 
         def add_cluster_api = { request_body, check_func ->
             httpTest {
-                endpoint context.config.metaServiceHttpAddress
+                if (ms) {
+                    endpoint ms.host + ':' + ms.httpPort
+                } else {
+                    endpoint context.config.metaServiceHttpAddress
+                }
                 uri "/MetaService/http/get_cluster?token=${token}"
                 body request_body
                 check check_func
@@ -2643,7 +2647,7 @@ class Suite implements GroovyInterceptable {
         }
     }
 
-    def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
+    def d_node = { be_unique_id, ip, port, cluster_name, cluster_id, MetaService ms = null ->
         def jsonOutput = new JsonOutput()
         def clusterInfo = [
                      type: "COMPUTE",
@@ -2663,7 +2667,11 @@ class Suite implements GroovyInterceptable {
 
         def d_cluster_api = { request_body, check_func ->
             httpTest {
-                endpoint context.config.metaServiceHttpAddress
+                if (ms) {
+                    endpoint ms.host + ':' + ms.httpPort
+                } else {
+                    endpoint context.config.metaServiceHttpAddress
+                }
                 uri "/MetaService/http/decommission_node?token=${token}"
                 body request_body
                 check check_func
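
Both helpers now take an optional MetaService argument so docker suites running several meta services can target a specific instance; when it is omitted, the endpoint falls back to context.config.metaServiceHttpAddress. The same fallback pattern in Java, with hypothetical names:

    // Hypothetical Java equivalent of the optional meta-service endpoint fallback.
    final class EndpointFallback {
        static String endpoint(String msHost, Integer msHttpPort, String defaultAddress) {
            return (msHost != null && msHttpPort != null)
                    ? msHost + ":" + msHttpPort
                    : defaultAddress; // suite-configured default
        }
    }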
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_cloud_decommission.groovy b/regression-test/suites/cloud_p0/node_mgr/test_cloud_decommission.groovy
new file mode 100644
index 00000000000..a8173108f0b
--- /dev/null
+++ b/regression-test/suites/cloud_p0/node_mgr/test_cloud_decommission.groovy
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("cloud_decommission", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def checkStatus = { ms, decommissionBeUniqueId, decommissionBe ->
+        boolean found = false
+        dockerAwaitUntil(100) {
+            found = false
+            def resp = get_cluster.call(decommissionBeUniqueId, ms)
+            resp.each { cluster ->
+                cluster.nodes.each { node ->
+                    if (node.ip as String == decommissionBe.Host as String && node.heartbeat_port as Integer == decommissionBe.HeartbeatPort as Integer && node.status as String == "NODE_STATUS_DECOMMISSIONED") {
+                        found = true
+                    }
+                }
+            }
+            found
+        }
+
+        assertTrue(found)
+    }
+
+    def dropAndCheckBe = { host, heartbeatPort ->
+        sql """ ALTER SYSTEM DROPP BACKEND "${host}:${heartbeatPort}" """
+        dockerAwaitUntil(100) {
+            def result = sql_return_maparray """ SHOW BACKENDS """ 
+            log.info("show backends result {}", result)
+            def ret = result.find { it.Host == host && it.HeartbeatPort == heartbeatPort }
+            ret == null
+        }
+    }
+
+    def check = { Closure beforeDecommissionActionSupplier, Closure afterDecommissionActionSupplier, int beNum ->
+        def begin = System.currentTimeMillis()
+        setFeConfig("cloud_balance_tablet_percent_per_run", 0.5)
+
+        // in docker, the BE cluster name is compute_cluster
+        sql """ use @compute_cluster """
+
+        def result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
decommission_table """
+        assertEquals(result.size(), beNum)
+        dockerAwaitUntil(100) {
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM decommission_table """
+            if (beNum == 3) {
+                result.every { Integer.valueOf((String) it.ReplicaNum) >= 15 
&& Integer.valueOf((String) it.ReplicaNum) <= 17 }
+            } else {
+                // beNum == 2
+                result.every { Integer.valueOf((String) it.ReplicaNum) >= 23 
&& Integer.valueOf((String) it.ReplicaNum) <= 25 }
+            }
+        }
+
+        if (beforeDecommissionActionSupplier) {
+            beforeDecommissionActionSupplier()
+        }
+        // idx = 1, decommission one be via the meta service http api
+        def decommissionBeFirstIdx = 1
+        def decommissionBeFirst = cluster.getBeByIndex(decommissionBeFirstIdx)
+        log.info(" decommissionBeFirst: {} ", decommissionBeFirst.host)
+        def showBes = sql_return_maparray """SHOW BACKENDS"""
+        log.info(" showBes: {} ", showBes)
+        def firstDecommissionBe = showBes.find {
+            it.Host == decommissionBeFirst.host
+        }
+        assertNotNull(firstDecommissionBe)
+        log.info("first decommission be {}", firstDecommissionBe)
+        def jsonSlurper = new JsonSlurper()
+        def jsonObject = jsonSlurper.parseText(firstDecommissionBe.Tag)
+        String firstDecommissionBeCloudClusterId = jsonObject.compute_group_id
+        String firstDecommissionBeUniqueId = jsonObject.cloud_unique_id
+        String firstDecommissionBeClusterName = jsonObject.compute_group_name
+
+        def ms = cluster.getAllMetaservices().get(0)
+        logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
+        d_node.call(firstDecommissionBeUniqueId, firstDecommissionBe.Host, firstDecommissionBe.HeartbeatPort,
+                firstDecommissionBeClusterName, firstDecommissionBeCloudClusterId, ms)
+
+        dockerAwaitUntil(100) {
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM decommission_table """ 
+            result.any { Integer.valueOf((String) it.ReplicaNum) == 0 }
+        }
+
+        checkStatus(ms, firstDecommissionBeUniqueId, firstDecommissionBe)
+
+        result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
decommission_table """ 
+        assertEquals(result.size(), beNum)
+        for (row : result) {
+            log.info("replica distribution: ${row} ".toString())
+        }
+        if (afterDecommissionActionSupplier) {
+            afterDecommissionActionSupplier()
+        }
+
+        // Drop the selected backend
+        dropAndCheckBe(firstDecommissionBe.Host, firstDecommissionBe.HeartbeatPort)
+
+        if (beforeDecommissionActionSupplier) {
+            beforeDecommissionActionSupplier()
+        }
+        // idx = 2, decommission one be via sql
+        def decommissionBeSecondIdx = 2
+        def secondDecommissionBe = cluster.getBeByIndex(decommissionBeSecondIdx)
+
+        // Decommission the selected backend
+        sql """ ALTER SYSTEM DECOMMISSION BACKEND 
"$secondDecommissionBe.Host:$secondDecommissionBe.HeartbeatPort" """
+
+        result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
decommission_table """ 
+        assertEquals(result.size(), beNum - 1)
+
+        dockerAwaitUntil(100) {
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM decommission_table """ 
+            log.info("show replica result {}", result)
+            def ret = result.findAll { Integer.valueOf((String) it.ReplicaNum) 
== 0 }
+            log.info("ret {}", ret)
+            // be has been droped
+            ret.size() == beNum - 2
+        }
+        def secondDecommissionBeUniqueId = firstDecommissionBeUniqueId
+        checkStatus(ms, secondDecommissionBeUniqueId, secondDecommissionBe)
+
+        for (row : result) {
+            log.info("replica distribution: ${row} ".toString())
+        }
+        if (afterDecommissionActionSupplier) {
+            afterDecommissionActionSupplier()
+        }
+        // Drop the selected backend
+        dropAndCheckBe(secondDecommissionBe.Host, secondDecommissionBe.HeartbeatPort)
+
+        def showComputeGroup = sql_return_maparray """ SHOW COMPUTE GROUPS """
+        log.info("show compute group {}", showComputeGroup)
+        // when 2 bes are decommissioned, the compute group will be deleted
+        def bes = sql_return_maparray """ SHOW BACKENDS """
+        if (bes.size() == 0) {
+            assertEquals(0, showComputeGroup.size())
+        }
+
+        System.currentTimeMillis() - begin
+    }
+
+    def checkDifferentAction = { Closure beforeDecommissionActionSupplier, Closure afterDecommissionActionSupplier, int atLeastCost, int waitTime, int beNum ->
+        def begin = System.currentTimeMillis()
+        def cost = check.call(beforeDecommissionActionSupplier, afterDecommissionActionSupplier, beNum)
+        log.info("in check, inner cost {}", cost)
+        cost = System.currentTimeMillis() - begin
+        log.info("in check, outter cost {}", cost)
+        assertTrue(waitTime > atLeastCost)
+        // decommission 2 bes
+        assertTrue(cost >= 2 * waitTime)
+        cost
+    }
+
+    def checkWal = { int atLeastCost, int beNum -> 
+        def future = null
+        // 25s
+        def waitTime = 25 * 1000
+        
+        Closure beforeClosure = {
+            log.info("before wal closure")
+            
GetDebugPoint().enableDebugPointForAllFEs("FE.GET_ALL_WAL_QUEUE_SIZE", 
[value:5])
+            futrue = thread {
+                Thread.sleep(waitTime)
+                cluster.clearFrontendDebugPoints()
+            }
+        } as Closure
+
+        Closure afterClosure = {
+            log.info("after wal closure")
+            assertNotNull(future)
+            future.get()
+        } as Closure
+
+        checkDifferentAction(beforeClosure, afterClosure, atLeastCost as int, waitTime as int, beNum)
+    }
+
+    def checkTxnNotFinish = { int atLeastCost, int beNum -> 
+        def future = null
+        def waitTime = 30 * 1000
+        // check txn not finish 
+        Closure beforeClosure = {
+            log.info("before insert closure")
+            // the inserts finish after waitTime
+            future = thread {
+                // insert once per second for waitTime seconds
+                for (int i = 1; i <= waitTime / 1000; i++) {
+                    Thread.sleep(1 * 1000)
+                    sql """insert into decommission_table values ($i + 1, $i * 2, $i + 3)"""
+                }
+            }
+        }
+        Closure afterClosure = { ->
+            log.info("after insert closure")
+            assertNotNull(future)
+            future.get()
+        }
+        checkDifferentAction(beforeClosure, afterClosure, atLeastCost as int, waitTime as int, beNum)
+    }
+
+    def createTestTable = { ->
+        sql """
+            CREATE TABLE decommission_table (
+            class INT,
+            id INT,
+            score INT SUM
+            )
+            AGGREGATE KEY(class, id)
+            DISTRIBUTED BY HASH(class) BUCKETS 48
+        """ 
+    }
+
+    def checkFENormalAfterRestart = {
+        cluster.restartFrontends()
+        def reconnectFe = {
+            sleep(5000)
+            logger.info("Reconnecting to a new frontend...")
+            def newFe = cluster.getMasterFe()
+            if (newFe) {
+                logger.info("New frontend found: 
${newFe.host}:${newFe.httpPort}")
+                def url = String.format(
+                        "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+                        newFe.host, newFe.queryPort)
+                url = context.config.buildUrlWithDb(url, context.dbName)
+                context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword)
+                logger.info("Successfully reconnected to the new frontend")
+            } else {
+                logger.error("No new frontend found to reconnect")
+            }
+        }
+        reconnectFe()
+        def ret = sql """show frontends;"""
+        assertEquals(2, ret.size())
+    }
+
+    def clusterOptions = [
+        new ClusterOptions(),
+        new ClusterOptions(),
+    ]
+
+    for (int i = 0; i < clusterOptions.size(); i++) {
+        log.info("begin {} step", i + 1)
+        clusterOptions[i].feConfigs += [
+            'sys_log_verbose_modules=org',
+            'heartbeat_interval_second=1',
+            'cloud_tablet_rebalancer_interval_second=1',
+            'cloud_cluster_check_interval_second=1'
+        ]
+        clusterOptions[i].setFeNum(2)
+        // cluster has 3 bes
+        // cluster has 2 bes; after decommissioning 2 nodes and dropping 2 nodes, the compute group name will be deleted from the fe
+        int beNum = i == 0 ? 3 : 2
+        clusterOptions[i].setBeNum(beNum)
+        clusterOptions[i].cloudMode = true
+        // clusterOptions[i].connectToFollower = true
+        clusterOptions[i].enableDebugPoints()
+
+        def noWalCheckCost = 0
+        docker(clusterOptions[i]) {
+            createTestTable.call()
+            noWalCheckCost = check(null, null, beNum)
+            checkFENormalAfterRestart.call()
+        }
+        log.info("no wal check cost {}", noWalCheckCost)
+
+        def walCheckCost = 0
+        docker(clusterOptions[i]) {
+            createTestTable.call()
+            walCheckCost = checkWal(noWalCheckCost as int, beNum as int)
+            checkFENormalAfterRestart.call()
+        }
+        log.info("wal check cost {}", walCheckCost)
+
+        def txnCheckCost = 0
+        docker(clusterOptions[i]) {
+            createTestTable.call()
+            txnCheckCost = checkTxnNotFinish(noWalCheckCost as int, beNum as int)
+            checkFENormalAfterRestart.call()
+        } 
+        log.info("txn check cost {}", txnCheckCost)
+        log.info("finish {} step", i + 1)
+    }
+}
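
The suite's timing assertion in checkDifferentAction (cost >= 2 * waitTime) encodes that two BEs are decommissioned per run and each decommission is blocked for roughly waitTime by the injected WAL count or the in-flight inserts; waitTime must also dominate the no-op baseline (atLeastCost). A compact restatement of that argument, with hypothetical names:

    // Hedged sketch of the suite's timing assertion: two blocked decommissions
    // must together take at least twice the injected wait window.
    final class TimingCheck {
        static void assertBlockedTwice(long totalCostMillis, long waitTimeMillis, long baselineMillis) {
            assert waitTimeMillis > baselineMillis;       // the window must dominate the no-op cost
            assert totalCostMillis >= 2 * waitTimeMillis; // one window per decommissioned BE
        }
    }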


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
