This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d45269b041 [Feature](cloud) Add cloud report for clean up expired 
tablets (#42066)
9d45269b041 is described below

commit 9d45269b041441d08d032a79a1504d67ecc8b103
Author: deardeng <565620...@qq.com>
AuthorDate: Tue Nov 5 00:35:17 2024 +0800

    [Feature](cloud) Add cloud report for clean up expired tablets (#42066)
    
    1. In cloud mode, add tablet reporting from BE to FE
    2. Clean up expired tablets in BE
---
 be/src/agent/agent_server.cpp                      |  15 +-
 be/src/agent/heartbeat_server.cpp                  |   6 +
 be/src/agent/task_worker_pool.cpp                  |  61 ++++++++
 be/src/agent/task_worker_pool.h                    |   4 +
 be/src/cloud/cloud_tablet.cpp                      |   8 ++
 be/src/cloud/cloud_tablet.h                        |   3 +
 be/src/cloud/cloud_tablet_mgr.cpp                  |  72 +++++++++-
 be/src/cloud/cloud_tablet_mgr.h                    |  16 +++
 be/src/cloud/config.cpp                            |   1 +
 be/src/cloud/config.h                              |   2 +
 be/src/http/action/tablets_info_action.cpp         |  20 +--
 be/src/olap/base_tablet.h                          |   3 +
 be/src/olap/tablet.cpp                             |   4 -
 be/src/olap/tablet.h                               |   3 -
 be/src/service/http_service.cpp                    |   2 +-
 .../apache/doris/cloud/CacheHotspotManager.java    |   4 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  42 +++++-
 .../doris/cloud/master/CloudReportHandler.java     |  89 ++++++++++++
 .../java/org/apache/doris/master/MasterImpl.java   |   3 +-
 .../org/apache/doris/master/ReportHandler.java     |  19 ++-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +-
 .../java/org/apache/doris/system/HeartbeatMgr.java |   1 +
 .../org/apache/doris/clone/RepairVersionTest.java  |  10 +-
 gensrc/thrift/HeartbeatService.thrift              |   2 +
 gensrc/thrift/MasterService.thrift                 |   2 +
 .../test_clean_tablet_when_drop_force_table.groovy | 140 ++++++++++++++++++
 .../test_clean_tablet_when_rebalance.groovy        | 158 +++++++++++++++++++++
 27 files changed, 650 insertions(+), 42 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 9d36148b64f..361a8ab93a9 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -33,6 +33,7 @@
 #include "agent/utils.h"
 #include "agent/workload_group_listener.h"
 #include "agent/workload_sched_policy_listener.h"
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
@@ -193,7 +194,7 @@ void AgentServer::start_workers(StorageEngine& engine, 
ExecEnv* exec_env) {
             "REPORT_DISK_STATE", _master_info, 
config::report_disk_state_interval_seconds, [&engine, &master_info = 
_master_info] { report_disk_callback(engine, master_info); }));
 
     _report_workers.push_back(std::make_unique<ReportWorker>(
-            "REPORT_OLAP_TABLE", _master_info, 
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { 
report_tablet_callback(engine, master_info); }));
+            "REPORT_OLAP_TABLET", _master_info, 
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { 
report_tablet_callback(engine, master_info); }));
     // clang-format on
 }
 
@@ -211,6 +212,10 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& 
engine, ExecEnv* exec_
             "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
             [&engine](auto&& task) { return 
calc_delete_bitmap_callback(engine, task); });
 
+    // In cloud mode, dropping a tablet only clears it from the cache, so one worker thread is enough
+    _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
+            "DROP_TABLE", 1, [&engine](auto&& task) { return 
drop_tablet_callback(engine, task); });
+
     _report_workers.push_back(std::make_unique<ReportWorker>(
             "REPORT_TASK", _master_info, config::report_task_interval_seconds,
             [&master_info = _master_info] { report_task_callback(master_info); 
}));
@@ -218,6 +223,14 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& 
engine, ExecEnv* exec_
     _report_workers.push_back(std::make_unique<ReportWorker>(
             "REPORT_DISK_STATE", _master_info, 
config::report_disk_state_interval_seconds,
             [&engine, &master_info = _master_info] { 
report_disk_callback(engine, master_info); }));
+
+    if (config::enable_cloud_tablet_report) {
+        _report_workers.push_back(std::make_unique<ReportWorker>(
+                "REPORT_OLAP_TABLET", _master_info, 
config::report_tablet_interval_seconds,
+                [&engine, &master_info = _master_info] {
+                    report_tablet_callback(engine, master_info);
+                }));
+    }
 }
 
 // TODO(lingbin): each task in the batch may have it own status or FE must 
check and
diff --git a/be/src/agent/heartbeat_server.cpp 
b/be/src/agent/heartbeat_server.cpp
index 146604aaab2..78002ed08fe 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <string>
 
+#include "cloud/cloud_tablet_mgr.h"
 #include "cloud/config.h"
 #include "common/config.h"
 #include "common/status.h"
@@ -275,6 +276,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
         LOG(INFO) << "set config cloud_unique_id " << 
master_info.cloud_unique_id << " " << st;
     }
 
+    if (master_info.__isset.tablet_report_inactive_duration_ms) {
+        doris::g_tablet_report_inactive_duration_ms =
+                master_info.tablet_report_inactive_duration_ms;
+    }
+
     if (need_report) {
         LOG(INFO) << "Master FE is changed or restarted. report tablet and 
disk info immediately";
         _engine.notify_listeners();
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 5906511ce15..d9efe6dbedd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -48,6 +48,8 @@
 #include "cloud/cloud_delete_task.h"
 #include "cloud/cloud_engine_calc_delete_bitmap_task.h"
 #include "cloud/cloud_schema_change_job.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
@@ -116,6 +118,10 @@ bool register_task_info(const TTaskType::type task_type, 
int64_t signature) {
         // no need to report task of these types
         return true;
     }
+    if (task_type == TTaskType::type::DROP && config::is_cloud_mode()) {
+        // In cloud mode, the status of drop tasks does not need to be reported
+        return true;
+    }
 
     if (signature == -1) { // No need to report task with unintialized 
signature
         return true;
@@ -1134,6 +1140,46 @@ void report_tablet_callback(StorageEngine& engine, const 
TMasterInfo& master_inf
     }
 }
 
+void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& 
master_info) {
+    // Random sleep 1~5 seconds before doing report.
+    // In order to avoid the problem that the FE receives many report requests 
at the same time
+    // and can not be processed.
+    if (config::report_random_wait) {
+        random_sleep(5);
+    }
+
+    TReportRequest request;
+    request.__set_backend(BackendOptions::get_local_backend());
+    request.__isset.tablets = true;
+
+    increase_report_version();
+    uint64_t report_version;
+    uint64_t total_num_tablets = 0;
+    for (int i = 0; i < 5; i++) {
+        request.tablets.clear();
+        report_version = s_report_version;
+        engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, 
&total_num_tablets);
+        if (report_version == s_report_version) {
+            break;
+        }
+    }
+
+    if (report_version < s_report_version) {
+        LOG(WARNING) << "report version " << report_version << " change to " 
<< s_report_version;
+        
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
+        return;
+    }
+
+    request.__set_report_version(report_version);
+    request.__set_num_tablets(total_num_tablets);
+
+    bool succ = handle_report(request, master_info, "tablet");
+    report_tablet_total << 1;
+    if (!succ) [[unlikely]] {
+        report_tablet_failed << 1;
+    }
+}
+
 void upload_callback(StorageEngine& engine, ExecEnv* env, const 
TAgentTaskRequest& req) {
     const auto& upload_request = req.upload_req;
 
@@ -1610,6 +1656,21 @@ void drop_tablet_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
     remove_task_info(req.task_type, req.signature);
 }
 
+void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& 
req) {
+    const auto& drop_tablet_req = req.drop_tablet_req;
+    DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
+        LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
+                .tag("tablet_id", drop_tablet_req.tablet_id);
+        return;
+    });
+    // 1. erase the tablet from the tablet mgr (LRU cache)
+    // TODO(dx) clean the tablet's file cache:
+    // get the tablet's info (such as cache key, tablet id, rsid)
+    engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
+    // 2. (not implemented yet) generate a task to clean the file cache
+    return;
+}
+
 void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
     const auto& push_req = req.push_req;
 
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index f51d6c2a4c0..c50ac57ffe9 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -155,6 +155,8 @@ void create_tablet_callback(StorageEngine& engine, const 
TAgentTaskRequest& req)
 
 void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req);
 
+void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& 
req);
+
 void clear_transaction_task_callback(StorageEngine& engine, const 
TAgentTaskRequest& req);
 
 void push_callback(StorageEngine& engine, const TAgentTaskRequest& req);
@@ -188,6 +190,8 @@ void report_disk_callback(CloudStorageEngine& engine, const 
TMasterInfo& master_
 
 void report_tablet_callback(StorageEngine& engine, const TMasterInfo& 
master_info);
 
+void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& 
master_info);
+
 void calc_delete_bitmap_callback(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
 
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 54ea450f204..b467703637c 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -897,4 +897,12 @@ Status CloudTablet::sync_meta() {
     return Status::OK();
 }
 
+void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
+    std::shared_lock rdlock(_meta_lock);
+    tablet_info->__set_total_version_count(_tablet_meta->version_count());
+    tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
+    // Currently, this information will not be used by the cloud report,
+    // but it may be used in the future.
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index a79d25f7540..5f4785b62d2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -196,10 +196,13 @@ public:
     int64_t last_base_compaction_success_time_ms = 0;
     int64_t last_cumu_compaction_success_time_ms = 0;
     int64_t last_cumu_no_suitable_version_ms = 0;
+    int64_t last_access_time_ms = 0;
 
     // Return merged extended schema
     TabletSchemaSPtr merged_tablet_schema() const override;
 
+    void build_tablet_report_info(TTabletInfo* tablet_info);
+
 private:
     // FIXME(plat1ko): No need to record base size if rowsets are ordered by 
version
     void update_base_size(const Rowset& rs);
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index e5c31785c1e..7ecb72e62fd 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -28,6 +28,7 @@
 #include "runtime/memory/cache_policy.h"
 
 namespace doris {
+uint64_t g_tablet_report_inactive_duration_ms = 0;
 namespace {
 
 // port from
@@ -142,6 +143,12 @@ CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)
 
 CloudTabletMgr::~CloudTabletMgr() = default;
 
+void set_tablet_access_time_ms(CloudTablet* tablet) {
+    using namespace std::chrono;
+    int64_t now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+    tablet->last_access_time_ms = now;
+}
+
 Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t 
tablet_id,
                                                                 bool 
warmup_data) {
     // LRU value type. `Value`'s lifetime MUST NOT be longer than 
`CloudTabletMgr`
@@ -181,8 +188,11 @@ Result<std::shared_ptr<CloudTablet>> 
CloudTabletMgr::get_tablet(int64_t tablet_i
 
             auto* handle = _cache->insert(key, value.release(), 1, 
sizeof(CloudTablet),
                                           CachePriority::NORMAL);
-            auto ret = std::shared_ptr<CloudTablet>(
-                    tablet.get(), [this, handle](...) { 
_cache->release(handle); });
+            auto ret =
+                    std::shared_ptr<CloudTablet>(tablet.get(), [this, 
handle](CloudTablet* tablet) {
+                        set_tablet_access_time_ms(tablet);
+                        _cache->release(handle);
+                    });
             _tablet_map->put(std::move(tablet));
             return ret;
         };
@@ -191,12 +201,16 @@ Result<std::shared_ptr<CloudTablet>> 
CloudTabletMgr::get_tablet(int64_t tablet_i
         if (tablet == nullptr) {
             return ResultError(Status::InternalError("failed to get tablet 
{}", tablet_id));
         }
+        set_tablet_access_time_ms(tablet.get());
         return tablet;
     }
 
     CloudTablet* tablet_raw_ptr = 
reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
-    auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr,
-                                               [this, handle](...) { 
_cache->release(handle); });
+    set_tablet_access_time_ms(tablet_raw_ptr);
+    auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, 
handle](CloudTablet* tablet) {
+        set_tablet_access_time_ms(tablet);
+        _cache->release(handle);
+    });
     return tablet;
 }
 
@@ -357,4 +371,54 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(
     return Status::OK();
 }
 
+void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, 
TTablet>* tablets_info,
+                                                   uint64_t* tablet_num) {
+    DCHECK(tablets_info != nullptr);
+    VLOG_NOTICE << "begin to build all report cloud tablets info";
+
+    HistogramStat tablet_version_num_hist;
+
+    auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
+        auto tablet = tablet_wk.lock();
+        if (!tablet) return;
+        (*tablet_num)++;
+        TTabletInfo tablet_info;
+        tablet->build_tablet_report_info(&tablet_info);
+        using namespace std::chrono;
+        int64_t now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+        if (now - g_tablet_report_inactive_duration_ms * 1000 < 
tablet->last_access_time_ms) {
+            // the tablet was accessed within the inactive duration, so do not report it
+            return;
+        }
+        auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
+        // On the cloud, a specific BE has only one tablet replica;
+        // there are no multiple replicas for a specific BE.
+        // This is only to reuse the non-cloud report protocol.
+        tablet_version_num_hist.add(tablet_info.total_version_count);
+        t_tablet.tablet_infos.emplace_back(std::move(tablet_info));
+    };
+
+    auto weak_tablets = get_weak_tablets();
+    std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);
+
+    DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
+            tablet_version_num_hist);
+    LOG(INFO) << "success to build all cloud report tablets info. 
all_tablet_count=" << *tablet_num
+              << " exceed drop time limit count=" << tablets_info->size();
+}
+
+void CloudTabletMgr::get_tablet_info(int64_t num_tablets, 
std::vector<TabletInfo>* tablets_info) {
+    auto weak_tablets = get_weak_tablets();
+    for (auto& weak_tablet : weak_tablets) {
+        auto tablet = weak_tablet.lock();
+        if (tablet == nullptr) {
+            continue;
+        }
+        if (tablets_info->size() >= num_tablets) {
+            return;
+        }
+        tablets_info->push_back(tablet->get_tablet_info());
+    }
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h
index 976d483b36c..903f372cbde 100644
--- a/be/src/cloud/cloud_tablet_mgr.h
+++ b/be/src/cloud/cloud_tablet_mgr.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include <gen_cpp/MasterService_types.h>
+#include <gen_cpp/Types_types.h>
+
 #include <functional>
 #include <memory>
 #include <vector>
@@ -31,6 +34,8 @@ class CloudStorageEngine;
 class LRUCachePolicy;
 class CountDownLatch;
 
+extern uint64_t g_tablet_report_inactive_duration_ms;
+
 class CloudTabletMgr {
 public:
     CloudTabletMgr(CloudStorageEngine& engine);
@@ -65,6 +70,17 @@ public:
                                        
std::vector<std::shared_ptr<CloudTablet>>* tablets,
                                        int64_t* max_score);
 
+    /**
+     * Builds info for the tablets to report (those not accessed within the inactive duration)
+     *
+     * @param tablets_info output: info of the tablets to be reported
+     * @param tablet_num output: incremented once for every live tablet in this BE's tablet mgr
+     */
+    void build_all_report_tablets_info(std::map<TTabletId, TTablet>* 
tablets_info,
+                                       uint64_t* tablet_num);
+
+    void get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* 
tablets_info);
+
 private:
     CloudStorageEngine& _engine;
 
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index e724dbea84e..32e3250f87c 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -75,4 +75,5 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
 
 DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");
 
+DEFINE_mBool(enable_cloud_tablet_report, "true");
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 86197f924d0..8af967afb8c 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -108,4 +108,6 @@ DECLARE_mInt32(tablet_txn_info_min_expired_seconds);
 
 DECLARE_mBool(enable_use_cloud_unique_id_from_fe);
 
+DECLARE_Bool(enable_cloud_tablet_report);
+
 } // namespace doris::config
diff --git a/be/src/http/action/tablets_info_action.cpp 
b/be/src/http/action/tablets_info_action.cpp
index 9c27c1de9a0..672b03ce6ce 100644
--- a/be/src/http/action/tablets_info_action.cpp
+++ b/be/src/http/action/tablets_info_action.cpp
@@ -24,6 +24,8 @@
 #include <string>
 #include <vector>
 
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
 #include "cloud/config.h"
 #include "http/http_channel.h"
 #include "http/http_headers.h"
@@ -51,12 +53,6 @@ void TabletsInfoAction::handle(HttpRequest* req) {
 
 EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) {
     EasyJson tablets_info_ej;
-    if (config::is_cloud_mode()) {
-        // TODO(plat1ko): CloudStorageEngine
-        tablets_info_ej["msg"] = "TabletsInfoAction::get_tablets_info is not 
implemented";
-        tablets_info_ej["code"] = 0;
-        return tablets_info_ej;
-    }
 
     int64_t number;
     std::string msg;
@@ -74,9 +70,15 @@ EasyJson TabletsInfoAction::get_tablets_info(string 
tablet_num_to_return) {
         msg = "Parameter Error";
     }
     std::vector<TabletInfo> tablets_info;
-    TabletManager* tablet_manager =
-            
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager();
-    tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
+    if (!config::is_cloud_mode()) {
+        TabletManager* tablet_manager =
+                
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager();
+        tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
+    } else {
+        CloudTabletMgr& cloud_tablet_manager =
+                
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr();
+        cloud_tablet_manager.get_tablet_info(number, &tablets_info);
+    }
 
     tablets_info_ej["msg"] = msg;
     tablets_info_ej["code"] = 0;
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 11eae0e47f9..b5da0e3bf06 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -293,6 +293,9 @@ public:
 
     Status show_nested_index_file(std::string* json_meta);
 
+    TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); }
+    TabletInfo get_tablet_info() const { return TabletInfo(tablet_id(), 
tablet_uid()); }
+
 protected:
     // Find the missed versions until the spec_version.
     //
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 7c69ba54831..b1d4c9dfb89 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1205,10 +1205,6 @@ Status Tablet::_contains_version(const Version& version) 
{
     return Status::OK();
 }
 
-TabletInfo Tablet::get_tablet_info() const {
-    return TabletInfo(tablet_id(), tablet_uid());
-}
-
 std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_cumulative_compaction() {
     std::vector<RowsetSharedPtr> candidate_rowsets;
     if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index e181af3d4d3..f5866c67641 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -115,7 +115,6 @@ public:
 
     DataDir* data_dir() const { return _data_dir; }
     int64_t replica_id() const { return _tablet_meta->replica_id(); }
-    TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); }
 
     const std::string& tablet_path() const { return _tablet_path; }
 
@@ -279,8 +278,6 @@ public:
 
     void check_tablet_path_exists();
 
-    TabletInfo get_tablet_info() const;
-
     std::vector<RowsetSharedPtr> 
pick_candidate_rowsets_to_cumulative_compaction();
     std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
     std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction();
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 7704d07b6f9..e7b920796a1 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -387,7 +387,7 @@ void HttpService::register_local_handler(StorageEngine& 
engine) {
     _ev_http_server->register_handler(HttpMethod::POST, "/api/pad_rowset", 
pad_rowset_action);
 
     ReportAction* report_tablet_action = _pool.add(new ReportAction(
-            _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, 
"REPORT_OLAP_TABLE"));
+            _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, 
"REPORT_OLAP_TABLET"));
     _ev_http_server->register_handler(HttpMethod::GET, "/api/report/tablet", 
report_tablet_action);
 
     ReportAction* report_disk_action = _pool.add(new ReportAction(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index b35a3b9e911..0b83baa94d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -429,7 +429,7 @@ public class CacheHotspotManager extends MasterDaemon {
         for (Backend backend : backends) {
             Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
                                     .getCloudTabletRebalancer()
-                                    .getSnapshotTabletsByBeId(backend.getId());
+                                    
.getSnapshotTabletsInPrimaryByBeId(backend.getId());
             List<Tablet> warmUpTablets = new ArrayList<>();
             for (Tablet tablet : tablets) {
                 if (beTabletIds.contains(tablet.getId())) {
@@ -559,7 +559,7 @@ public class CacheHotspotManager extends MasterDaemon {
             for (Backend backend : backends) {
                 Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
                                         .getCloudTabletRebalancer()
-                                        
.getSnapshotTabletsByBeId(backend.getId());
+                                        
.getSnapshotTabletsInPrimaryByBeId(backend.getId());
                 List<Tablet> warmUpTablets = new ArrayList<>();
                 for (Tablet tablet : tablets) {
                     if (beTabletIds.contains(tablet.getId())) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 78947afdb11..8e5033470b0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -73,6 +73,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
     private volatile ConcurrentHashMap<Long, List<Tablet>> 
beToColocateTabletsGlobal =
             new ConcurrentHashMap<Long, List<Tablet>>();
 
+    // used for cloud tablet report
+    private volatile ConcurrentHashMap<Long, List<Tablet>> 
beToTabletsGlobalInSecondary =
+            new ConcurrentHashMap<Long, List<Tablet>>();
+
     private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
 
     private Map<String, List<Long>> clusterToBes;
@@ -164,7 +168,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         public boolean srcDecommissioned;
     }
 
-    public Set<Long> getSnapshotTabletsByBeId(Long beId) {
+    public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
         Set<Long> tabletIds = Sets.newHashSet();
         List<Tablet> tablets = beToTabletsGlobal.get(beId);
         if (tablets != null) {
@@ -183,6 +187,24 @@ public class CloudTabletRebalancer extends MasterDaemon {
         return tabletIds;
     }
 
+    public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
+        Set<Long> tabletIds = Sets.newHashSet();
+        List<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
+        if (tablets != null) {
+            for (Tablet tablet : tablets) {
+                tabletIds.add(tablet.getId());
+            }
+        }
+        return tabletIds;
+    }
+
+    public Set<Long> getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) {
+        Set<Long> tabletIds = Sets.newHashSet();
+        tabletIds.addAll(getSnapshotTabletsInPrimaryByBeId(beId));
+        tabletIds.addAll(getSnapshotTabletsInSecondaryByBeId(beId));
+        return tabletIds;
+    }
+
     public int getTabletNumByBackendId(long beId) {
         List<Tablet> tablets = beToTabletsGlobal.get(beId);
         List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
@@ -617,6 +639,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public void statRouteInfo() {
         ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new 
ConcurrentHashMap<Long, List<Tablet>>();
+        ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobalInSecondary
+                = new ConcurrentHashMap<Long, List<Tablet>>();
         ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal
                 = new ConcurrentHashMap<Long, List<Tablet>>();
 
@@ -641,11 +665,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
                             continue;
                         }
                         if (allBes.contains(beId)) {
-                            List<Tablet> colocateTablets = 
tmpBeToColocateTabletsGlobal.get(beId);
-                            if (colocateTablets == null) {
-                                colocateTablets = new ArrayList<Tablet>();
-                                tmpBeToColocateTabletsGlobal.put(beId, 
colocateTablets);
-                            }
+                            List<Tablet> colocateTablets =
+                                    
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>());
                             colocateTablets.add(tablet);
                         }
                         continue;
@@ -657,6 +678,14 @@ public class CloudTabletRebalancer extends MasterDaemon {
                         continue;
                     }
 
+                    Backend secondaryBe = replica.getSecondaryBackend(cluster);
+                    long secondaryBeId = secondaryBe == null ? -1L : 
secondaryBe.getId();
+                    if (allBes.contains(secondaryBeId)) {
+                        List<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
+                                .computeIfAbsent(secondaryBeId, k -> new 
ArrayList<>());
+                        tablets.add(tablet);
+                    }
+
                     InfightTablet taskKey = new InfightTablet(tablet.getId(), 
cluster);
                     InfightTask task = tabletToInfightTask.get(taskKey);
                     long futureBeId = task == null ? beId : task.destBe;
@@ -670,6 +699,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         });
 
         beToTabletsGlobal = tmpBeToTabletsGlobal;
+        beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
         beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java
new file mode 100644
index 00000000000..6564bd7d3a5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.master;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.master.ReportHandler;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.DropReplicaTask;
+import org.apache.doris.thrift.TTablet;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tablet report handler used when the FE runs in cloud mode.
+ *
+ * <p>Every tablet reported by a backend that the {@code CloudTabletRebalancer} snapshot no longer
+ * assigns to that backend (neither as primary nor as secondary) is dropped from it with a
+ * {@link DropReplicaTask}.
+ */
+public class CloudReportHandler extends ReportHandler {
+    private static final Logger LOG = LogManager.getLogger(CloudReportHandler.class);
+
+    /**
+     * Drops from the backend every reported tablet that the FE no longer assigns to it.
+     *
+     * @param backendId id of the reporting backend
+     * @param backendTablets reported tablets keyed by tablet id; in cloud mode this may contain
+     *                       fewer entries than {@code numTablets}
+     * @param backendPartitionsVersion not used by this handler
+     * @param backendReportVersion report version, only logged
+     * @param numTablets total number of tablets on the backend, only logged
+     */
+    @Override
+    public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
+                             Map<Long, Long> backendPartitionsVersion, long backendReportVersion, long numTablets) {
+        long start = System.currentTimeMillis();
+        LOG.info("backend[{}] has {} tablet(s), {} of them reported for check. report version: {}",
+                backendId, numTablets, backendTablets.size(), backendReportVersion);
+        // Tablets the FE currently assigns to this backend, as primary or as secondary.
+        Set<Long> tabletIdsInFe = ((CloudEnv) Env.getCurrentEnv()).getCloudTabletRebalancer()
+                .getSnapshotTabletsInPrimaryAndSecondaryByBeId(backendId);
+        // NOTE(review): if this snapshot can be empty while the rebalancer has not computed it yet
+        // (e.g. on a newly elected master), every reported tablet becomes a drop candidate -- confirm.
+
+        // (tablets reported by the BE) - (tablets assigned by the FE) = tablets to drop
+        Set<Long> tabletIdsNeedDrop = diffTablets(tabletIdsInFe, backendTablets.keySet());
+        deleteFromBackend(backendId, tabletIdsNeedDrop);
+
+        Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+        LOG.info("finished to handle tablet report from backend {}-{}, "
+                + "tablets to drop: {}, cost: {} ms.",
+                backendId, be != null ? be.getHost() : "",
+                tabletIdsNeedDrop.size(),
+                (System.currentTimeMillis() - start));
+    }
+
+    /**
+     * Computes {@code tabletIdsInBe - tabletIdsInFe} without modifying either input.
+     *
+     * @param tabletIdsInFe tablet ids the FE assigns to the backend (primary or secondary)
+     * @param tabletIdsInBe tablet ids reported by the backend
+     * @return a new set with the reported tablet ids that the FE does not assign to the backend
+     */
+    private static Set<Long> diffTablets(Set<Long> tabletIdsInFe, Set<Long> tabletIdsInBe) {
+        Set<Long> result = new HashSet<>(tabletIdsInBe);
+        result.removeAll(tabletIdsInFe);
+        return result;
+    }
+
+    /**
+     * Sends one batch of {@link DropReplicaTask}s that drop the given tablets from the backend.
+     * Does nothing when {@code tabletIdsWillDrop} is empty.
+     */
+    private static void deleteFromBackend(long backendId, Set<Long> tabletIdsWillDrop) {
+        if (tabletIdsWillDrop.isEmpty()) {
+            return;
+        }
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (Long tabletId : tabletIdsWillDrop) {
+            batchTask.addTask(new DropReplicaTask(backendId, tabletId, -1, -1, false));
+            LOG.info("delete tablet[{}] from backend[{}]", tabletId, backendId);
+        }
+        AgentTaskExecutor.submit(batchTask);
+        LOG.info("delete {} tablet(s) from backend[{}]", batchTask.getTaskNum(), backendId);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 35b6a230b24..a4bbe763f60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cloud.catalog.CloudTablet;
+import org.apache.doris.cloud.master.CloudReportHandler;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.load.DeleteJob;
@@ -76,7 +77,7 @@ import java.util.stream.Collectors;
 public class MasterImpl {
     private static final Logger LOG = LogManager.getLogger(MasterImpl.class);
 
-    private ReportHandler reportHandler = new ReportHandler();
+    private ReportHandler reportHandler =  Config.isCloudMode() ? new 
CloudReportHandler() : new ReportHandler();
 
     public MasterImpl() {
         reportHandler.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d773cb5850e..1c8f51bd4eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -162,6 +162,7 @@ public class ReportHandler extends Daemon {
         Map<Long, TTablet> tablets = null;
         Map<Long, Long> partitionsVersion = null;
         long reportVersion = -1;
+        long numTablets = 0;
 
         ReportType reportType = null;
 
@@ -188,6 +189,12 @@ public class ReportHandler extends Daemon {
             Env.getCurrentSystemInfo().updateBackendReportVersion(beId, 
reportVersion, -1L, -1L, false);
         }
 
+        if (tablets == null) {
+            numTablets = request.isSetNumTablets() ? request.getNumTablets() : 
0;
+        } else {
+            numTablets = request.isSetNumTablets() ? request.getNumTablets() : 
tablets.size();
+        }
+
         if (request.isSetPartitionsVersion()) {
             partitionsVersion = request.getPartitionsVersion();
         }
@@ -206,7 +213,7 @@ public class ReportHandler extends Daemon {
 
         ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, 
tablets, partitionsVersion,
                 reportVersion, request.getStoragePolicy(), 
request.getResource(), request.getNumCores(),
-                request.getPipelineExecutorSize());
+                request.getPipelineExecutorSize(), numTablets);
         try {
             putToQueue(reportTask);
         } catch (Exception e) {
@@ -294,12 +301,13 @@ public class ReportHandler extends Daemon {
         private List<TStorageResource> storageResources;
         private int cpuCores;
         private int pipelineExecutorSize;
+        private long numTablets;
 
         public ReportTask(long beId, ReportType reportType, Map<TTaskType, 
Set<Long>> tasks,
                 Map<String, TDisk> disks, Map<Long, TTablet> tablets,
                 Map<Long, Long> partitionsVersion, long reportVersion,
                 List<TStoragePolicy> storagePolicies, List<TStorageResource> 
storageResources, int cpuCores,
-                int pipelineExecutorSize) {
+                int pipelineExecutorSize, long numTablets) {
             this.beId = beId;
             this.reportType = reportType;
             this.tasks = tasks;
@@ -311,6 +319,7 @@ public class ReportHandler extends Daemon {
             this.storageResources = storageResources;
             this.cpuCores = cpuCores;
             this.pipelineExecutorSize = pipelineExecutorSize;
+            this.numTablets = numTablets;
         }
 
         @Override
@@ -336,7 +345,7 @@ public class ReportHandler extends Daemon {
                     if (partitions == null) {
                         partitions = Maps.newHashMap();
                     }
-                    ReportHandler.tabletReport(beId, tablets, partitions, 
reportVersion);
+                    tabletReport(beId, tablets, partitions, reportVersion, 
numTablets);
                 }
             }
         }
@@ -471,8 +480,8 @@ public class ReportHandler extends Daemon {
     }
 
     // public for fe ut
-    public static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets,
-            Map<Long, Long> backendPartitionsVersion, long 
backendReportVersion) {
+    public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
+            Map<Long, Long> backendPartitionsVersion, long 
backendReportVersion, long numTablets) {
         long start = System.currentTimeMillis();
         LOG.info("backend[{}] reports {} tablet(s). report version: {}",
                 backendId, backendTablets.size(), backendReportVersion);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 494232ef38c..f4c33d75cdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2584,7 +2584,7 @@ public class StmtExecutor {
                 List<Long> tabletIdList = new ArrayList<Long>();
                 Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
                                            .getCloudTabletRebalancer()
-                                           
.getSnapshotTabletsByBeId(backend.getId());
+                                           
.getSnapshotTabletsInPrimaryByBeId(backend.getId());
                 allTabletIds.forEach(tabletId -> {
                     if (beTabletIds.contains(tabletId)) {
                         tabletIdList.add(tabletId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 89f55239f7f..fb6853e83c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -253,6 +253,7 @@ public class HeartbeatMgr extends MasterDaemon {
                 if (Config.isCloudMode()) {
                     String cloudUniqueId = 
backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
                     copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
+                    
copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds);
                 }
                 THeartbeatResult result;
                 if (!FeConstants.runningUnitTest) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
index 1ac497dbebe..423c839faa2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -105,8 +105,8 @@ public class RepairVersionTest extends TestWithFeService {
         Map<Long, TTablet> tablets = Maps.newHashMap();
         tablets.put(tablet.getId(), tTablet);
         Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
-
-        ReportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L);
+        ReportHandler reportHandler = new ReportHandler();
+        reportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L, tablets.size());
 
         Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
         Assertions.assertEquals(-1L, replica.getLastFailedVersion());
@@ -135,12 +135,12 @@ public class RepairVersionTest extends TestWithFeService {
         tTablet.addToTabletInfos(tTabletInfo);
         Map<Long, TTablet> tablets = Maps.newHashMap();
         tablets.put(tablet.getId(), tTablet);
-
-        ReportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L);
+        ReportHandler reportHandler = new ReportHandler();
+        reportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L, tablets.size());
         Assertions.assertEquals(-1L, replica.getLastFailedVersion());
 
         DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", 
new DebugPoint());
-        ReportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L);
+        reportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L, tablets.size());
         Assertions.assertEquals(replica.getVersion() + 1, 
replica.getLastFailedVersion());
 
         Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift
index c03f04a6543..acdc608f21b 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -41,6 +41,8 @@ struct TMasterInfo {
     9: optional list<TFrontendInfo> frontend_infos
     10: optional string meta_service_endpoint;
     11: optional string cloud_unique_id;
+    // See the FE configuration item rehash_tablet_after_be_dead_seconds in Config.java for its meaning
+    12: optional i64 tablet_report_inactive_duration_ms;
 }
 
 struct TBackendInfo {
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index ecedf0ee1af..9d8cd9111ba 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -114,6 +114,8 @@ struct TReportRequest {
     11: i32 num_cores
     12: i32 pipeline_executor_size
     13: optional map<Types.TPartitionId, Types.TVersion> partitions_version
+    // Total number of tablets on the BE. In cloud mode num_tablets may differ from tablet_list.size().
+    14: optional i64 num_tablets
 }
 
 struct TMasterResult {
diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
new file mode 100644
index 00000000000..4dc847d603a
--- /dev/null
+++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+
+suite('test_clean_tablet_when_drop_force_table', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=5'
+    ]
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1'
+    ]
+    options.setFeNum(3)
+    options.setBeNum(3)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    // backend id -> host, as listed by SHOW BACKENDS
+    def backendIdToHost = { ->
+        def spb = sql_return_maparray """SHOW BACKENDS"""
+        def beIdToHost = [:]
+        spb.each {
+            beIdToHost[it.BackendId] = it.Host
+        }
+        beIdToHost
+    }
+
+    // tablet id -> host of the backend the FE assigns the tablet to
+    def getTabletAndBeHostFromFe = { table ->
+        def result = sql_return_maparray """SHOW TABLETS FROM $table"""
+        def bes = backendIdToHost.call()
+        def ret = [:]
+        result.each {
+            ret[it.TabletId] = bes[it.BackendId]
+        }
+        ret
+    }
+
+    // tablet id (as String) -> host, for every tablet currently present on any backend
+    def getTabletAndBeHostFromBe = { ->
+        def bes = cluster.getAllBackends()
+        def ret = [:]
+        bes.each { be ->
+            // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
+            def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
+            def tablets = data.tablets.collect { it.tablet_id as String }
+            tablets.each {
+                ret[it] = data.host
+            }
+        }
+        ret
+    }
+
+    // Creates `table`, drops it with FORCE, waits until no backend holds its tablets and asserts
+    // that the cleanup took longer than `waitTime` seconds. With `useDp`, the debug point
+    // WorkPoolCloudDropTablet.drop_tablet_callback.failed stays enabled for about 10 seconds after
+    // the drop, so the cleanup has to be retried by a later report.
+    def testCase = { table, waitTime, useDp = false ->
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (1, 1), (2, 2), (3, 3)
+        """
+
+        for (int i = 0; i < 5; i++) {
+            sql """
+                select * from $table
+            """
+        }
+
+        // before drop table force: each tablet must be on the backend the FE assigns it to
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        def beforeGetFromBe = getTabletAndBeHostFromBe.call()
+        logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe)
+        beforeGetFromFe.each {
+            assertTrue(beforeGetFromBe.containsKey(it.key))
+            assertEquals(beforeGetFromBe[it.key], it.value)
+        }
+        if (useDp) {
+            GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
+        }
+
+        // after drop table force
+        sql """
+            DROP TABLE $table FORCE
+        """
+        def future = null
+        if (useDp) {
+            future = thread {
+                sleep(10 * 1000)
+                GetDebugPoint().disableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
+            }
+        }
+        def start = System.currentTimeMillis() / 1000
+        // wait until none of the dropped table's tablets is present on any backend
+        dockerAwaitUntil(50) {
+            def beTablets = getTabletAndBeHostFromBe.call().keySet()
+            logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets)
+            // check against this poll's snapshot instead of re-querying every backend per tablet
+            beforeGetFromFe.keySet().every { !beTablets.contains(it) }
+        }
+        logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 1000 - start)
+        assertTrue(System.currentTimeMillis() / 1000 - start > waitTime)
+        if (useDp) {
+            future.get()
+        }
+    }
+
+    docker(options) {
+        // because rehash_tablet_after_be_dead_seconds=5
+        testCase("test_clean_tablet_when_drop_force_table_1", 5)
+        // report retry
+        testCase("test_clean_tablet_when_drop_force_table_2", 10, true)
+    }
+}
diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
new file mode 100644
index 00000000000..4a44b317cc2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+
+suite('test_clean_tablet_when_rebalance', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    def rehashTime = 100
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime")
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1'
+    ]
+    options.setFeNum(3)
+    options.setBeNum(3)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    def choseDeadBeIndex = 1
+    def table = "test_clean_tablet_when_rebalance"
+
+    // backend id -> host, as listed by SHOW BACKENDS
+    def backendIdToHost = { ->
+        def spb = sql_return_maparray """SHOW BACKENDS"""
+        def beIdToHost = [:]
+        spb.each {
+            beIdToHost[it.BackendId] = it.Host
+        }
+        beIdToHost
+    }
+
+    // tablet id -> host of the backend the FE assigns the tablet to
+    def getTabletAndBeHostFromFe = { ->
+        def result = sql_return_maparray """SHOW TABLETS FROM $table"""
+        def bes = backendIdToHost.call()
+        def ret = [:]
+        result.each {
+            ret[it.TabletId] = bes[it.BackendId]
+        }
+        ret
+    }
+
+    // tablet id (as String) -> host, for every tablet currently present on any backend
+    def getTabletAndBeHostFromBe = { ->
+        def bes = cluster.getAllBackends()
+        def ret = [:]
+        bes.each { be ->
+            // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
+            def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
+            def tablets = data.tablets.collect { it.tablet_id as String }
+            tablets.each {
+                ret[it] = data.host
+            }
+        }
+        ret
+    }
+
+    // Asserts that every tablet the FE knows for `table` is present on the backend the FE assigns it to.
+    def assertFeTabletsOnExpectedBe = { String phase ->
+        def tabletsFromFe = getTabletAndBeHostFromFe.call()
+        def tabletsFromBe = getTabletAndBeHostFromBe.call()
+        logger.info("{} fe tablets {}, be tablets {}", phase, tabletsFromFe, tabletsFromBe)
+        tabletsFromFe.each {
+            assertTrue(tabletsFromBe.containsKey(it.key))
+            assertEquals(tabletsFromBe[it.key], it.value)
+        }
+    }
+
+    // Stops backend `choseDeadBeIndex` for `deadTime` seconds and restarts it. If it was dead longer
+    // than rehashTime, the table's tablets are expected to stay on the other two backends; otherwise
+    // they are expected to be spread over all three backends. FE and BE must agree before and after.
+    def testCase = { deadTime ->
+        boolean beDeadLong = deadTime > rehashTime
+        logger.info("begin exec beDeadLong {}", beDeadLong)
+
+        for (int i = 0; i < 5; i++) {
+            sql """
+                select * from $table
+            """
+        }
+
+        assertFeTabletsOnExpectedBe("before")
+
+        cluster.stopBackends(choseDeadBeIndex)
+        dockerAwaitUntil(50) {
+            def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
+                    .collect { it.BackendId }
+                    .unique()
+            logger.info("bes {}", bes)
+            bes.size() == 2
+        }
+
+        if (beDeadLong) {
+            // presumably keeps the balancer from moving tablets back onto the restarted backend
+            setFeConfig('enable_cloud_partition_balance', false)
+            setFeConfig('enable_cloud_table_balance', false)
+            setFeConfig('enable_cloud_global_balance', false)
+        }
+        sleep(deadTime * 1000)
+
+        cluster.startBackends(choseDeadBeIndex)
+
+        dockerAwaitUntil(50) {
+            def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
+                    .collect { it.BackendId }
+                    .unique()
+            logger.info("bes {}", bes)
+            bes.size() == (beDeadLong ? 2 : 3)
+        }
+        for (int i = 0; i < 5; i++) {
+            sql """
+                select * from $table
+            """
+            sleep(1000)
+        }
+        assertFeTabletsOnExpectedBe("after")
+    }
+
+    docker(options) {
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        sql """
+            insert into $table values (1, 1), (2, 2), (3, 3)
+        """
+        // rehash_tablet_after_be_dead_seconds is rehashTime (100)
+        // be-1 dead, but not dead for a long time
+        testCase(5)
+        // be-1 dead, and dead for a long time
+        testCase(200)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to