This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9d45269b041 [Feature](cloud) Add cloud report for clean up expired tablets (#42066) 9d45269b041 is described below commit 9d45269b041441d08d032a79a1504d67ecc8b103 Author: deardeng <565620...@qq.com> AuthorDate: Tue Nov 5 00:35:17 2024 +0800 [Feature](cloud) Add cloud report for clean up expired tablets (#42066) 1. add tablet report to fe in cloud 2. clean up expired tablet in be --- be/src/agent/agent_server.cpp | 15 +- be/src/agent/heartbeat_server.cpp | 6 + be/src/agent/task_worker_pool.cpp | 61 ++++++++ be/src/agent/task_worker_pool.h | 4 + be/src/cloud/cloud_tablet.cpp | 8 ++ be/src/cloud/cloud_tablet.h | 3 + be/src/cloud/cloud_tablet_mgr.cpp | 72 +++++++++- be/src/cloud/cloud_tablet_mgr.h | 16 +++ be/src/cloud/config.cpp | 1 + be/src/cloud/config.h | 2 + be/src/http/action/tablets_info_action.cpp | 20 +-- be/src/olap/base_tablet.h | 3 + be/src/olap/tablet.cpp | 4 - be/src/olap/tablet.h | 3 - be/src/service/http_service.cpp | 2 +- .../apache/doris/cloud/CacheHotspotManager.java | 4 +- .../doris/cloud/catalog/CloudTabletRebalancer.java | 42 +++++- .../doris/cloud/master/CloudReportHandler.java | 89 ++++++++++++ .../java/org/apache/doris/master/MasterImpl.java | 3 +- .../org/apache/doris/master/ReportHandler.java | 19 ++- .../java/org/apache/doris/qe/StmtExecutor.java | 2 +- .../java/org/apache/doris/system/HeartbeatMgr.java | 1 + .../org/apache/doris/clone/RepairVersionTest.java | 10 +- gensrc/thrift/HeartbeatService.thrift | 2 + gensrc/thrift/MasterService.thrift | 2 + .../test_clean_tablet_when_drop_force_table.groovy | 140 ++++++++++++++++++ .../test_clean_tablet_when_rebalance.groovy | 158 +++++++++++++++++++++ 27 files changed, 650 insertions(+), 42 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 9d36148b64f..361a8ab93a9 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -33,6 +33,7 @@ #include 
"agent/utils.h" #include "agent/workload_group_listener.h" #include "agent/workload_sched_policy_listener.h" +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -193,7 +194,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); })); _report_workers.push_back(std::make_unique<ReportWorker>( - "REPORT_OLAP_TABLE", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); })); + "REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); })); // clang-format on } @@ -211,6 +212,10 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_ "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count, [&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); }); + // cloud, drop tablet just clean clear_cache, so just one thread do it + _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>( + "DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); }); + _report_workers.push_back(std::make_unique<ReportWorker>( "REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); })); @@ -218,6 +223,14 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_ _report_workers.push_back(std::make_unique<ReportWorker>( "REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); })); + + if (config::enable_cloud_tablet_report) { + _report_workers.push_back(std::make_unique<ReportWorker>( + 
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds, + [&engine, &master_info = _master_info] { + report_tablet_callback(engine, master_info); + })); + } } // TODO(lingbin): each task in the batch may have it own status or FE must check and diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 146604aaab2..78002ed08fe 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -26,6 +26,7 @@ #include <ostream> #include <string> +#include "cloud/cloud_tablet_mgr.h" #include "cloud/config.h" #include "common/config.h" #include "common/status.h" @@ -275,6 +276,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st; } + if (master_info.__isset.tablet_report_inactive_duration_ms) { + doris::g_tablet_report_inactive_duration_ms = + master_info.tablet_report_inactive_duration_ms; + } + if (need_report) { LOG(INFO) << "Master FE is changed or restarted. 
report tablet and disk info immediately"; _engine.notify_listeners(); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 5906511ce15..d9efe6dbedd 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -48,6 +48,8 @@ #include "cloud/cloud_delete_task.h" #include "cloud/cloud_engine_calc_delete_bitmap_task.h" #include "cloud/cloud_schema_change_job.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -116,6 +118,10 @@ bool register_task_info(const TTaskType::type task_type, int64_t signature) { // no need to report task of these types return true; } + if (task_type == TTaskType::type::DROP && config::is_cloud_mode()) { + // cloud no need to report drop task status + return true; + } if (signature == -1) { // No need to report task with unintialized signature return true; @@ -1134,6 +1140,46 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf } } +void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) { + // Random sleep 1~5 seconds before doing report. + // In order to avoid the problem that the FE receives many report requests at the same time + // and can not be processed. 
+ if (config::report_random_wait) { + random_sleep(5); + } + + TReportRequest request; + request.__set_backend(BackendOptions::get_local_backend()); + request.__isset.tablets = true; + + increase_report_version(); + uint64_t report_version; + uint64_t total_num_tablets = 0; + for (int i = 0; i < 5; i++) { + request.tablets.clear(); + report_version = s_report_version; + engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets); + if (report_version == s_report_version) { + break; + } + } + + if (report_version < s_report_version) { + LOG(WARNING) << "report version " << report_version << " change to " << s_report_version; + DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); + return; + } + + request.__set_report_version(report_version); + request.__set_num_tablets(total_num_tablets); + + bool succ = handle_report(request, master_info, "tablet"); + report_tablet_total << 1; + if (!succ) [[unlikely]] { + report_tablet_failed << 1; + } +} + void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { const auto& upload_request = req.upload_req; @@ -1610,6 +1656,21 @@ void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { remove_task_info(req.task_type, req.signature); } +void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { + const auto& drop_tablet_req = req.drop_tablet_req; + DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", { + LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed") + .tag("tablet_id", drop_tablet_req.tablet_id); + return; + }); + // 1. erase lru from tablet mgr + // TODO(dx) clean tablet file cache + // get tablet's info(such as cachekey, tablet id, rsid) + engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); + // 2. 
gen clean file cache task + return; +} + void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) { const auto& push_req = req.push_req; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index f51d6c2a4c0..c50ac57ffe9 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -155,6 +155,8 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req); +void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req); + void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req); void push_callback(StorageEngine& engine, const TAgentTaskRequest& req); @@ -188,6 +190,8 @@ void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info); +void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo& master_info); + void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req); } // namespace doris diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 54ea450f204..b467703637c 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -897,4 +897,12 @@ Status CloudTablet::sync_meta() { return Status::OK(); } +void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) { + std::shared_lock rdlock(_meta_lock); + tablet_info->__set_total_version_count(_tablet_meta->version_count()); + tablet_info->__set_tablet_id(_tablet_meta->tablet_id()); + // Currently, this information will not be used by the cloud report, + // but it may be used in the future. 
+} + } // namespace doris diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index a79d25f7540..5f4785b62d2 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -196,10 +196,13 @@ public: int64_t last_base_compaction_success_time_ms = 0; int64_t last_cumu_compaction_success_time_ms = 0; int64_t last_cumu_no_suitable_version_ms = 0; + int64_t last_access_time_ms = 0; // Return merged extended schema TabletSchemaSPtr merged_tablet_schema() const override; + void build_tablet_report_info(TTabletInfo* tablet_info); + private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index e5c31785c1e..7ecb72e62fd 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -28,6 +28,7 @@ #include "runtime/memory/cache_policy.h" namespace doris { +uint64_t g_tablet_report_inactive_duration_ms = 0; namespace { // port from @@ -142,6 +143,12 @@ CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) CloudTabletMgr::~CloudTabletMgr() = default; +void set_tablet_access_time_ms(CloudTablet* tablet) { + using namespace std::chrono; + int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + tablet->last_access_time_ms = now; +} + Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data) { // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` @@ -181,8 +188,11 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet), CachePriority::NORMAL); - auto ret = std::shared_ptr<CloudTablet>( - tablet.get(), [this, handle](...) 
{ _cache->release(handle); }); + auto ret = + std::shared_ptr<CloudTablet>(tablet.get(), [this, handle](CloudTablet* tablet) { + set_tablet_access_time_ms(tablet); + _cache->release(handle); + }); _tablet_map->put(std::move(tablet)); return ret; }; @@ -191,12 +201,16 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i if (tablet == nullptr) { return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); } + set_tablet_access_time_ms(tablet.get()); return tablet; } CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get(); - auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, - [this, handle](...) { _cache->release(handle); }); + set_tablet_access_time_ms(tablet_raw_ptr); + auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, handle](CloudTablet* tablet) { + set_tablet_access_time_ms(tablet); + _cache->release(handle); + }); return tablet; } @@ -357,4 +371,54 @@ Status CloudTabletMgr::get_topn_tablets_to_compact( return Status::OK(); } +void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info, + uint64_t* tablet_num) { + DCHECK(tablets_info != nullptr); + VLOG_NOTICE << "begin to build all report cloud tablets info"; + + HistogramStat tablet_version_num_hist; + + auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) { + auto tablet = tablet_wk.lock(); + if (!tablet) return; + (*tablet_num)++; + TTabletInfo tablet_info; + tablet->build_tablet_report_info(&tablet_info); + using namespace std::chrono; + int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) { + // the tablet is still being accessed and used in recently, so not report it + return; + } + auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; + // On the cloud, a specific BE has only one tablet replica; + // there are no multiple 
replicas for a specific BE. + // This is only to reuse the non-cloud report protocol. + tablet_version_num_hist.add(tablet_info.total_version_count); + t_tablet.tablet_infos.emplace_back(std::move(tablet_info)); + }; + + auto weak_tablets = get_weak_tablets(); + std::for_each(weak_tablets.begin(), weak_tablets.end(), handler); + + DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( + tablet_version_num_hist); + LOG(INFO) << "success to build all cloud report tablets info. all_tablet_count=" << *tablet_num + << " exceed drop time limit count=" << tablets_info->size(); +} + +void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info) { + auto weak_tablets = get_weak_tablets(); + for (auto& weak_tablet : weak_tablets) { + auto tablet = weak_tablet.lock(); + if (tablet == nullptr) { + continue; + } + if (tablets_info->size() >= num_tablets) { + return; + } + tablets_info->push_back(tablet->get_tablet_info()); + } +} + } // namespace doris diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index 976d483b36c..903f372cbde 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -17,6 +17,9 @@ #pragma once +#include <gen_cpp/MasterService_types.h> +#include <gen_cpp/Types_types.h> + #include <functional> #include <memory> #include <vector> @@ -31,6 +34,8 @@ class CloudStorageEngine; class LRUCachePolicy; class CountDownLatch; +extern uint64_t g_tablet_report_inactive_duration_ms; + class CloudTabletMgr { public: CloudTabletMgr(CloudStorageEngine& engine); @@ -65,6 +70,17 @@ public: std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t* max_score); + /** + * Gets tablets info and total tablet num that are reported + * + * @param tablets_info used by report + * @param tablet_num tablets in be tabletMgr, total num + */ + void build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info, + uint64_t* tablet_num); + + void get_tablet_info(int64_t 
num_tablets, std::vector<TabletInfo>* tablets_info); + private: CloudStorageEngine& _engine; diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index e724dbea84e..32e3250f87c 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -75,4 +75,5 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120"); DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true"); +DEFINE_mBool(enable_cloud_tablet_report, "true"); } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 86197f924d0..8af967afb8c 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -108,4 +108,6 @@ DECLARE_mInt32(tablet_txn_info_min_expired_seconds); DECLARE_mBool(enable_use_cloud_unique_id_from_fe); +DECLARE_Bool(enable_cloud_tablet_report); + } // namespace doris::config diff --git a/be/src/http/action/tablets_info_action.cpp b/be/src/http/action/tablets_info_action.cpp index 9c27c1de9a0..672b03ce6ce 100644 --- a/be/src/http/action/tablets_info_action.cpp +++ b/be/src/http/action/tablets_info_action.cpp @@ -24,6 +24,8 @@ #include <string> #include <vector> +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" #include "cloud/config.h" #include "http/http_channel.h" #include "http/http_headers.h" @@ -51,12 +53,6 @@ void TabletsInfoAction::handle(HttpRequest* req) { EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) { EasyJson tablets_info_ej; - if (config::is_cloud_mode()) { - // TODO(plat1ko): CloudStorageEngine - tablets_info_ej["msg"] = "TabletsInfoAction::get_tablets_info is not implemented"; - tablets_info_ej["code"] = 0; - return tablets_info_ej; - } int64_t number; std::string msg; @@ -74,9 +70,15 @@ EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) { msg = "Parameter Error"; } std::vector<TabletInfo> tablets_info; - TabletManager* tablet_manager = - ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager(); - 
tablet_manager->obtain_specific_quantity_tablets(tablets_info, number); + if (!config::is_cloud_mode()) { + TabletManager* tablet_manager = + ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager(); + tablet_manager->obtain_specific_quantity_tablets(tablets_info, number); + } else { + CloudTabletMgr& cloud_tablet_manager = + ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr(); + cloud_tablet_manager.get_tablet_info(number, &tablets_info); + } tablets_info_ej["msg"] = msg; tablets_info_ej["code"] = 0; diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 11eae0e47f9..b5da0e3bf06 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -293,6 +293,9 @@ public: Status show_nested_index_file(std::string* json_meta); + TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); } + TabletInfo get_tablet_info() const { return TabletInfo(tablet_id(), tablet_uid()); } + protected: // Find the missed versions until the spec_version. 
// diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 7c69ba54831..b1d4c9dfb89 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1205,10 +1205,6 @@ Status Tablet::_contains_version(const Version& version) { return Status::OK(); } -TabletInfo Tablet::get_tablet_info() const { - return TabletInfo(tablet_id(), tablet_uid()); -} - std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compaction() { std::vector<RowsetSharedPtr> candidate_rowsets; if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e181af3d4d3..f5866c67641 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -115,7 +115,6 @@ public: DataDir* data_dir() const { return _data_dir; } int64_t replica_id() const { return _tablet_meta->replica_id(); } - TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); } const std::string& tablet_path() const { return _tablet_path; } @@ -279,8 +278,6 @@ public: void check_tablet_path_exists(); - TabletInfo get_tablet_info() const; - std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction(); diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 7704d07b6f9..e7b920796a1 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -387,7 +387,7 @@ void HttpService::register_local_handler(StorageEngine& engine) { _ev_http_server->register_handler(HttpMethod::POST, "/api/pad_rowset", pad_rowset_action); ReportAction* report_tablet_action = _pool.add(new ReportAction( - _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_OLAP_TABLE")); + _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_OLAP_TABLET")); _ev_http_server->register_handler(HttpMethod::GET, "/api/report/tablet", report_tablet_action); 
ReportAction* report_disk_action = _pool.add(new ReportAction( diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index b35a3b9e911..0b83baa94d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -429,7 +429,7 @@ public class CacheHotspotManager extends MasterDaemon { for (Backend backend : backends) { Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv()) .getCloudTabletRebalancer() - .getSnapshotTabletsByBeId(backend.getId()); + .getSnapshotTabletsInPrimaryByBeId(backend.getId()); List<Tablet> warmUpTablets = new ArrayList<>(); for (Tablet tablet : tablets) { if (beTabletIds.contains(tablet.getId())) { @@ -559,7 +559,7 @@ public class CacheHotspotManager extends MasterDaemon { for (Backend backend : backends) { Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv()) .getCloudTabletRebalancer() - .getSnapshotTabletsByBeId(backend.getId()); + .getSnapshotTabletsInPrimaryByBeId(backend.getId()); List<Tablet> warmUpTablets = new ArrayList<>(); for (Tablet tablet : tablets) { if (beTabletIds.contains(tablet.getId())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 78947afdb11..8e5033470b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -73,6 +73,10 @@ public class CloudTabletRebalancer extends MasterDaemon { private volatile ConcurrentHashMap<Long, List<Tablet>> beToColocateTabletsGlobal = new ConcurrentHashMap<Long, List<Tablet>>(); + // used for cloud tablet report + private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobalInSecondary = + new 
ConcurrentHashMap<Long, List<Tablet>>(); + private Map<Long, List<Tablet>> futureBeToTabletsGlobal; private Map<String, List<Long>> clusterToBes; @@ -164,7 +168,7 @@ public class CloudTabletRebalancer extends MasterDaemon { public boolean srcDecommissioned; } - public Set<Long> getSnapshotTabletsByBeId(Long beId) { + public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) { Set<Long> tabletIds = Sets.newHashSet(); List<Tablet> tablets = beToTabletsGlobal.get(beId); if (tablets != null) { @@ -183,6 +187,24 @@ public class CloudTabletRebalancer extends MasterDaemon { return tabletIds; } + public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) { + Set<Long> tabletIds = Sets.newHashSet(); + List<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId); + if (tablets != null) { + for (Tablet tablet : tablets) { + tabletIds.add(tablet.getId()); + } + } + return tabletIds; + } + + public Set<Long> getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) { + Set<Long> tabletIds = Sets.newHashSet(); + tabletIds.addAll(getSnapshotTabletsInPrimaryByBeId(beId)); + tabletIds.addAll(getSnapshotTabletsInSecondaryByBeId(beId)); + return tabletIds; + } + public int getTabletNumByBackendId(long beId) { List<Tablet> tablets = beToTabletsGlobal.get(beId); List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId); @@ -617,6 +639,8 @@ public class CloudTabletRebalancer extends MasterDaemon { public void statRouteInfo() { ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new ConcurrentHashMap<Long, List<Tablet>>(); + ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobalInSecondary + = new ConcurrentHashMap<Long, List<Tablet>>(); ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal = new ConcurrentHashMap<Long, List<Tablet>>(); @@ -641,11 +665,8 @@ public class CloudTabletRebalancer extends MasterDaemon { continue; } if (allBes.contains(beId)) { - List<Tablet> colocateTablets = tmpBeToColocateTabletsGlobal.get(beId); - if 
(colocateTablets == null) { - colocateTablets = new ArrayList<Tablet>(); - tmpBeToColocateTabletsGlobal.put(beId, colocateTablets); - } + List<Tablet> colocateTablets = + tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>()); colocateTablets.add(tablet); } continue; @@ -657,6 +678,14 @@ public class CloudTabletRebalancer extends MasterDaemon { continue; } + Backend secondaryBe = replica.getSecondaryBackend(cluster); + long secondaryBeId = secondaryBe == null ? -1L : secondaryBe.getId(); + if (allBes.contains(secondaryBeId)) { + List<Tablet> tablets = tmpBeToTabletsGlobalInSecondary + .computeIfAbsent(secondaryBeId, k -> new ArrayList<>()); + tablets.add(tablet); + } + InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster); InfightTask task = tabletToInfightTask.get(taskKey); long futureBeId = task == null ? beId : task.destBe; @@ -670,6 +699,7 @@ public class CloudTabletRebalancer extends MasterDaemon { }); beToTabletsGlobal = tmpBeToTabletsGlobal; + beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary; beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java new file mode 100644 index 00000000000..6564bd7d3a5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.master; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.master.ReportHandler; +import org.apache.doris.system.Backend; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.DropReplicaTask; +import org.apache.doris.thrift.TTablet; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class CloudReportHandler extends ReportHandler { + private static final Logger LOG = LogManager.getLogger(CloudReportHandler.class); + + @Override + public void tabletReport(long backendId, Map<Long, TTablet> backendTablets, + Map<Long, Long> backendPartitionsVersion, long backendReportVersion, long numTablets) { + long start = System.currentTimeMillis(); + LOG.info("backend[{}] have {} tablet(s), {} need deal tablet(s). 
report version: {}", + backendId, numTablets, backendTablets.size(), backendReportVersion); + // current be useful + Set<Long> tabletIdsInFe = ((CloudEnv) Env.getCurrentEnv()).getCloudTabletRebalancer() + .getSnapshotTabletsInPrimaryAndSecondaryByBeId(backendId); + + Set<Long> tabletIdsInBe = backendTablets.keySet(); + // handle (be - meta) + Set<Long> tabletIdsNeedDrop = diffTablets(tabletIdsInFe, tabletIdsInBe); + // drop agent task + deleteFromBackend(backendId, tabletIdsNeedDrop); + + Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + LOG.info("finished to handle task report from backend {}-{}, " + + "diff task num: {}, cost: {} ms.", + backendId, be != null ? be.getHost() : "", + tabletIdsNeedDrop.size(), + (System.currentTimeMillis() - start)); + } + + // tabletIdsInFe, tablet is used in Primary or Secondary + // tabletIdsInBe, tablet report exceed time, need to check + // returns tabletIds need to drop + private Set<Long> diffTablets(Set<Long> tabletIdsInFe, Set<Long> tabletIdsInBe) { + // tabletsInBe - tabletsInFe + Set<Long> result = new HashSet<>(tabletIdsInBe); + result.removeAll(tabletIdsInFe); + return result; + } + + private static void deleteFromBackend(long backendId, Set<Long> tabletIdsWillDrop) { + int deleteFromBackendCounter = 0; + AgentBatchTask batchTask = new AgentBatchTask(); + for (Long tabletId : tabletIdsWillDrop) { + DropReplicaTask task = new DropReplicaTask(backendId, tabletId, -1, -1, false); + batchTask.addTask(task); + LOG.info("delete tablet[{}] from backend[{}]", tabletId, backendId); + ++deleteFromBackendCounter; + } + + if (batchTask.getTaskNum() != 0) { + AgentTaskExecutor.submit(batchTask); + } + + LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 35b6a230b24..a4bbe763f60 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudTablet; +import org.apache.doris.cloud.master.CloudReportHandler; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.load.DeleteJob; @@ -76,7 +77,7 @@ import java.util.stream.Collectors; public class MasterImpl { private static final Logger LOG = LogManager.getLogger(MasterImpl.class); - private ReportHandler reportHandler = new ReportHandler(); + private ReportHandler reportHandler = Config.isCloudMode() ? new CloudReportHandler() : new ReportHandler(); public MasterImpl() { reportHandler.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index d773cb5850e..1c8f51bd4eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -162,6 +162,7 @@ public class ReportHandler extends Daemon { Map<Long, TTablet> tablets = null; Map<Long, Long> partitionsVersion = null; long reportVersion = -1; + long numTablets = 0; ReportType reportType = null; @@ -188,6 +189,12 @@ public class ReportHandler extends Daemon { Env.getCurrentSystemInfo().updateBackendReportVersion(beId, reportVersion, -1L, -1L, false); } + if (tablets == null) { + numTablets = request.isSetNumTablets() ? request.getNumTablets() : 0; + } else { + numTablets = request.isSetNumTablets() ? 
request.getNumTablets() : tablets.size(); + } + if (request.isSetPartitionsVersion()) { partitionsVersion = request.getPartitionsVersion(); } @@ -206,7 +213,7 @@ public class ReportHandler extends Daemon { ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, tablets, partitionsVersion, reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(), - request.getPipelineExecutorSize()); + request.getPipelineExecutorSize(), numTablets); try { putToQueue(reportTask); } catch (Exception e) { @@ -294,12 +301,13 @@ public class ReportHandler extends Daemon { private List<TStorageResource> storageResources; private int cpuCores; private int pipelineExecutorSize; + private long numTablets; public ReportTask(long beId, ReportType reportType, Map<TTaskType, Set<Long>> tasks, Map<String, TDisk> disks, Map<Long, TTablet> tablets, Map<Long, Long> partitionsVersion, long reportVersion, List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores, - int pipelineExecutorSize) { + int pipelineExecutorSize, long numTablets) { this.beId = beId; this.reportType = reportType; this.tasks = tasks; @@ -311,6 +319,7 @@ public class ReportHandler extends Daemon { this.storageResources = storageResources; this.cpuCores = cpuCores; this.pipelineExecutorSize = pipelineExecutorSize; + this.numTablets = numTablets; } @Override @@ -336,7 +345,7 @@ public class ReportHandler extends Daemon { if (partitions == null) { partitions = Maps.newHashMap(); } - ReportHandler.tabletReport(beId, tablets, partitions, reportVersion); + tabletReport(beId, tablets, partitions, reportVersion, numTablets); } } } @@ -471,8 +480,8 @@ public class ReportHandler extends Daemon { } // public for fe ut - public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, - Map<Long, Long> backendPartitionsVersion, long backendReportVersion) { + public void tabletReport(long backendId, Map<Long, TTablet> backendTablets, + Map<Long, 
Long> backendPartitionsVersion, long backendReportVersion, long numTablets) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 494232ef38c..f4c33d75cdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2584,7 +2584,7 @@ public class StmtExecutor { List<Long> tabletIdList = new ArrayList<Long>(); Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv()) .getCloudTabletRebalancer() - .getSnapshotTabletsByBeId(backend.getId()); + .getSnapshotTabletsInPrimaryByBeId(backend.getId()); allTabletIds.forEach(tabletId -> { if (beTabletIds.contains(tabletId)) { tabletIdList.add(tabletId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 89f55239f7f..fb6853e83c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -253,6 +253,7 @@ public class HeartbeatMgr extends MasterDaemon { if (Config.isCloudMode()) { String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); copiedMasterInfo.setCloudUniqueId(cloudUniqueId); + copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds); } THeartbeatResult result; if (!FeConstants.runningUnitTest) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java index 1ac497dbebe..423c839faa2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -105,8 +105,8 @@ public class RepairVersionTest extends TestWithFeService { Map<Long, TTablet> tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); + ReportHandler reportHandler = new ReportHandler(); + reportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L, tablets.size()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); @@ -135,12 +135,12 @@ public class RepairVersionTest extends TestWithFeService { tTablet.addToTabletInfos(tTabletInfo); Map<Long, TTablet> tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); + ReportHandler reportHandler = new ReportHandler(); + reportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L, tablets.size()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); + reportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L, tablets.size()); Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index c03f04a6543..acdc608f21b 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -41,6 +41,8 @@ struct TMasterInfo { 9: optional list<TFrontendInfo> frontend_infos 10: optional string 
meta_service_endpoint; 11: optional string cloud_unique_id; + // See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning + 12: optional i64 tablet_report_inactive_duration_ms; } struct TBackendInfo { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index ecedf0ee1af..9d8cd9111ba 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -114,6 +114,8 @@ struct TReportRequest { 11: i32 num_cores 12: i32 pipeline_executor_size 13: optional map<Types.TPartitionId, Types.TVersion> partitions_version + // tablet num in be, in cloud num_tablets may not eq tablet_list.size() + 14: optional i64 num_tablets } struct TMasterResult { diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy new file mode 100644 index 00000000000..4dc847d603a --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_tablet_when_drop_force_table', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=5' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1' + ] + options.setFeNum(3) + options.setBeNum(3) + options.cloudMode = true + options.enableDebugPoints() + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromFe = { table -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : host + def ret = [:] + result.each { + ret[it.TabletId] = bes[it.BackendId] + } + ret + } + + def getTabletAndBeHostFromBe = { -> + def bes = cluster.getAllBackends() + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def testCase = { table, waitTime, useDp=false-> + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (1, 1), (2, 2), (3, 3) + """ + + for (int i = 0; i < 5; i++) { + sql """ + select * from $table + """ + } + + // before drop table force + def beforeGetFromFe = 
getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe.call() + logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + beforeGetFromFe.each { + assertTrue(beforeGetFromBe.containsKey(it.Key)) + assertEquals(beforeGetFromBe[it.Key], it.Value) + } + if (useDp) { + GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") + } + // after drop table force + + sql """ + DROP TABLE $table FORCE + """ + def futrue + if (useDp) { + futrue = thread { + sleep(10 * 1000) + GetDebugPoint().disableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") + } + } + def start = System.currentTimeMillis() / 1000 + // tablet can't find in be + dockerAwaitUntil(50) { + def beTablets = getTabletAndBeHostFromBe.call().keySet() + logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets) + beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe.call().containsKey(it) } + } + logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 1000 - start) + assertTrue(System.currentTimeMillis() / 1000 - start > waitTime) + if (useDp) { + futrue.get() + } + } + + docker(options) { + // because rehash_tablet_after_be_dead_seconds=5 + testCase("test_clean_tablet_when_drop_force_table_1", 5) + // report retry + testCase("test_clean_tablet_when_drop_force_table_2", 10, true) + } +} diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy new file mode 100644 index 00000000000..4a44b317cc2 --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_tablet_when_rebalance', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + def rehashTime = 100 + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime") + options.beConfigs += [ + 'report_tablet_interval_seconds=1' + ] + options.setFeNum(3) + options.setBeNum(3) + options.cloudMode = true + options.enableDebugPoints() + + def choseDeadBeIndex = 1 + def table = "test_clean_tablet_when_rebalance" + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromFe = { -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : host + def ret = [:] + result.each { + ret[it.TabletId] = bes[it.BackendId] + } + ret + } + + def getTabletAndBeHostFromBe = { -> + def bes = cluster.getAllBackends() + def ret = [:] + bes.each { be -> + // 
{"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def testCase = { deadTime -> + boolean beDeadLong = deadTime > rehashTime ? true : false + logger.info("begin exec beDeadLong {}", beDeadLong) + + for (int i = 0; i < 5; i++) { + sql """ + select * from $table + """ + } + + def beforeGetFromFe = getTabletAndBeHostFromFe() + def beforeGetFromBe = getTabletAndBeHostFromBe.call() + logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + beforeGetFromFe.each { + assertTrue(beforeGetFromBe.containsKey(it.Key)) + assertEquals(beforeGetFromBe[it.Key], it.Value) + } + + cluster.stopBackends(choseDeadBeIndex) + dockerAwaitUntil(50) { + def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") + .collect { it.BackendId } + .unique() + logger.info("bes {}", bes) + bes.size() == 2 + } + + if (beDeadLong) { + setFeConfig('enable_cloud_partition_balance', false) + setFeConfig('enable_cloud_table_balance', false) + setFeConfig('enable_cloud_global_balance', false) + } + sleep(deadTime * 1000) + + cluster.startBackends(choseDeadBeIndex) + + dockerAwaitUntil(50) { + def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") + .collect { it.BackendId } + .unique() + logger.info("bes {}", bes) + bes.size() == (beDeadLong ? 
2 : 3) + } + for (int i = 0; i < 5; i++) { + sql """ + select * from $table + """ + sleep(1000) + } + beforeGetFromFe = getTabletAndBeHostFromFe() + beforeGetFromBe = getTabletAndBeHostFromBe.call() + logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + beforeGetFromFe.each { + assertTrue(beforeGetFromBe.containsKey(it.Key)) + assertEquals(beforeGetFromBe[it.Key], it.Value) + } + } + + docker(options) { + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (1, 1), (2, 2), (3, 3) + """ + // 'rehash_tablet_after_be_dead_seconds=100' + // be-1 dead, but not dead for a long time + testCase(5) + // be-1 dead, and dead for a long time + testCase(200) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org