This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f80750faede [improvement](clone) dead be will abort sched task #36795 
(#36897)
f80750faede is described below

commit f80750faedecb6fe71f5a829e458432e462efc28
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jun 27 13:35:51 2024 +0800

    [improvement](clone) dead be will abort sched task #36795 (#36897)
    
    cherry pick from #36795
---
 .../org/apache/doris/clone/TabletScheduler.java    |   9 ++
 .../apache/doris/common/util/DebugPointUtil.java   |  10 +-
 .../java/org/apache/doris/system/HeartbeatMgr.java |  10 ++
 .../apache/doris/clone/BeDownCancelCloneTest.java  | 148 +++++++++++++++++++++
 .../apache/doris/utframe/MockedBackendFactory.java |   8 ++
 5 files changed, 183 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 094beca0425..b92d9fa86b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1867,10 +1867,13 @@ public class TabletScheduler extends MasterDaemon {
      * If task is timeout, remove the tablet.
      */
     public void handleRunningTablets() {
+        Set<Long> aliveBeIds = 
Sets.newHashSet(Env.getCurrentSystemInfo().getAllBackendIds(true));
         // 1. remove the tablet ctx if timeout
         List<TabletSchedCtx> cancelTablets = Lists.newArrayList();
         synchronized (this) {
             for (TabletSchedCtx tabletCtx : runningTablets.values()) {
+                long srcBeId = tabletCtx.getSrcBackendId();
+                long destBeId = tabletCtx.getDestBackendId();
                 if (Config.disable_tablet_scheduler) {
                     tabletCtx.setErrMsg("tablet scheduler is disabled");
                     cancelTablets.add(tabletCtx);
@@ -1881,6 +1884,12 @@ public class TabletScheduler extends MasterDaemon {
                     tabletCtx.setErrMsg("timeout");
                     cancelTablets.add(tabletCtx);
                     stat.counterCloneTaskTimeout.incrementAndGet();
+                } else if (destBeId > 0 && !aliveBeIds.contains(destBeId)) {
+                    tabletCtx.setErrMsg("dest be " + destBeId + " is dead");
+                    cancelTablets.add(tabletCtx);
+                } else if (srcBeId > 0 && !aliveBeIds.contains(srcBeId)) {
+                    tabletCtx.setErrMsg("src be " + srcBeId + " is dead");
+                    cancelTablets.add(tabletCtx);
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
index da06232f0c0..420cee77631 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
@@ -134,12 +134,18 @@ public class DebugPointUtil {
         addDebugPoint(name, new DebugPoint());
     }
 
-    public static <E> void addDebugPointWithValue(String name, E value) {
+    public static void addDebugPointWithParams(String name, Map<String, 
String> params) {
         DebugPoint debugPoint = new DebugPoint();
-        debugPoint.params.put("value", String.format("%s", value));
+        debugPoint.params = params;
         addDebugPoint(name, debugPoint);
     }
 
+    public static <E> void addDebugPointWithValue(String name, E value) {
+        Map<String, String> params = Maps.newHashMap();
+        params.put("value", String.format("%s", value));
+        addDebugPointWithParams(name, params);
+    }
+
     public static void removeDebugPoint(String name) {
         DebugPoint debugPoint = debugPoints.remove(name);
         LOG.info("remove debug point: name={}, exists={}", name, debugPoint != 
null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 9d13218ae06..9a5058f8d02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.Version;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.persist.HbPackage;
 import org.apache.doris.resource.Tag;
@@ -56,6 +57,7 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -253,6 +255,14 @@ public class HeartbeatMgr extends MasterDaemon {
                     result.setBackendInfo(backendInfo);
                 }
 
+                String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault(
+                        "HeartbeatMgr.BackendHeartbeatHandler", "deadBeIds", 
"");
+                if (!Strings.isNullOrEmpty(debugDeadBeIds)
+                        && 
Arrays.stream(debugDeadBeIds.split(",")).anyMatch(id -> Long.parseLong(id) == 
backendId)) {
+                    
result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
+                    result.getStatus().addToErrorMsgs("debug point 
HeartbeatMgr.deadBeIds set dead be");
+                }
+
                 ok = true;
                 if (result.getStatus().getStatusCode() == TStatusCode.OK) {
                     TBackendInfo tBackendInfo = result.getBackendInfo();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
new file mode 100644
index 00000000000..06cef5e2446
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class BeDownCancelCloneTest extends TestWithFeService {
+
+    @Override
+    protected int backendNum() {
+        return 4;
+    }
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        FeConstants.runningUnitTest = true;
+        FeConstants.default_scheduler_interval_millisecond = 1000;
+        Config.enable_debug_points = true;
+        Config.tablet_checker_interval_ms = 100;
+        Config.tablet_schedule_interval_ms = 100;
+        Config.tablet_repair_delay_factor_second = 1;
+        Config.allow_replica_on_same_host = true;
+        Config.disable_balance = true;
+        Config.schedule_batch_size = 1000;
+        Config.schedule_slot_num_per_hdd_path = 1000;
+        Config.heartbeat_interval_second = 5;
+        Config.max_backend_heartbeat_failure_tolerance_count = 1;
+        Config.min_clone_task_timeout_sec = 20 * 60 * 1000;
+    }
+
+    @Test
+    public void test() throws Exception {
+        connectContext = createDefaultCtx();
+
+        createDatabase("db1");
+        System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+        // create table tbl1
+        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) 
buckets 1;");
+        RebalancerTestUtil.updateReplicaPathHash();
+
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+        Assertions.assertNotNull(tbl);
+        Tablet tablet = tbl.getPartitions().iterator().next()
+                
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+                .getTablets().iterator().next();
+
+        Assertions.assertEquals(3, tablet.getReplicas().size());
+        long destBeId = 
Env.getCurrentSystemInfo().getAllBackendIds(true).stream()
+                .filter(beId -> tablet.getReplicaByBackendId(beId) == null)
+                .findFirst()
+                .orElse(-1L);
+        Assertions.assertTrue(destBeId != -1L);
+        Backend destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
+        Assertions.assertNotNull(destBe);
+        Assertions.assertTrue(destBe.isAlive());
+
+        // add debug point, make clone wait
+        
DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.block");
+
+        // move replica[0] to destBeId
+        Replica srcReplica = tablet.getReplicas().get(0);
+        String moveTabletSql = "ADMIN SET REPLICA STATUS 
PROPERTIES(\"tablet_id\" = \"" + tablet.getId() + "\", "
+                + "\"backend_id\" = \"" + srcReplica.getBackendId() + "\", 
\"status\" = \"drop\")";
+        Assertions.assertNotNull(getSqlStmtExecutor(moveTabletSql));
+        Assertions.assertFalse(srcReplica.isScheduleAvailable());
+
+        Thread.sleep(3000);
+
+        Assertions.assertEquals(0, 
Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).size());
+        Assertions.assertEquals(4, tablet.getReplicas().size());
+        Replica destReplica = tablet.getReplicaByBackendId(destBeId);
+        Assertions.assertNotNull(destReplica);
+        Assertions.assertEquals(Replica.ReplicaState.CLONE, 
destReplica.getState());
+
+        // clone a replica on destBe
+        List<TabletSchedCtx> runningTablets = 
Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
+        Assertions.assertEquals(1, runningTablets.size());
+        Assertions.assertEquals(destBeId, 
runningTablets.get(0).getDestBackendId());
+
+        Map<String, String> params2 = Maps.newHashMap();
+        params2.put("deadBeIds", String.valueOf(destBeId));
+        
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", 
params2);
+
+        Thread.sleep((Config.heartbeat_interval_second
+                * Config.max_backend_heartbeat_failure_tolerance_count + 4) * 
1000L);
+
+        destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
+        Assertions.assertNotNull(destBe);
+        Assertions.assertFalse(destBe.isAlive());
+
+        // delete clone dest task
+        
Assertions.assertFalse(Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).isEmpty());
+
+        // first drop dest replica (its backend is dead) and src replica (it's 
marked as drop),
+        // then re-clone a replica to src be, which is left waiting for cloning.
+        runningTablets = 
Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
+        Assertions.assertEquals(1, runningTablets.size());
+        Assertions.assertEquals(srcReplica.getBackendId(), 
runningTablets.get(0).getDestBackendId());
+
+        
DebugPointUtil.removeDebugPoint("MockedBackendFactory.handleCloneTablet.block");
+        Thread.sleep(2000);
+
+        // destBe is dead, cancel clone task
+        runningTablets = 
Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
+        Assertions.assertEquals(0, runningTablets.size());
+
+        Assertions.assertEquals(3, tablet.getReplicas().size());
+        for (Replica replica : tablet.getReplicas()) {
+            Assertions.assertTrue(replica.getBackendId() != destBeId);
+            Assertions.assertTrue(replica.isScheduleAvailable());
+            Assertions.assertEquals(Replica.ReplicaState.NORMAL, 
replica.getState());
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 735c46c70be..12273331634 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.DiskInfo.DiskState;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.PBackendServiceGrpc;
@@ -235,6 +236,13 @@ public class MockedBackendFactory {
                 }
 
                 private void handleCloneTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
+                    while 
(DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.block")) {
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                            // ignore
+                        }
+                    }
                     TCloneReq req = request.getCloneReq();
                     long dataSize = Math.max(1, 
CatalogTestUtil.getTabletDataSize(req.tablet_id));
                     long pathHash = req.dest_path_hash;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to