gavinchou commented on code in PR #27555: URL: https://github.com/apache/doris/pull/27555#discussion_r1413150384
########## be/src/agent/task_worker_pool.cpp: ########## @@ -324,1709 +145,1552 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) .error(result.status); try_time += 1; } - sleep(config::sleep_one_second); + sleep(1); } } -void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } +Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id, + const TSchemaHash schema_hash, TTabletInfo* tablet_info) { + tablet_info->__set_tablet_id(tablet_id); + tablet_info->__set_schema_hash(schema_hash); + return engine.tablet_manager()->report_tablet_info(tablet_info); +} - agent_task_req = _tasks.front(); - _tasks.pop_front(); - } +void random_sleep(int second) { + Random rnd(UnixMillis()); + sleep(rnd.Uniform(second) + 1); +} - auto& alter_inverted_index_rq = agent_task_req.alter_inverted_index_req; - LOG(INFO) << "get alter inverted index task. 
signature=" << agent_task_req.signature - << ", tablet_id=" << alter_inverted_index_rq.tablet_id - << ", job_id=" << alter_inverted_index_rq.job_id; +void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature, + const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) { + Status status; - Status status = Status::OK(); - TabletSharedPtr tablet_ptr = StorageEngine::instance()->tablet_manager()->get_tablet( - alter_inverted_index_rq.tablet_id); - if (tablet_ptr != nullptr) { - EngineIndexChangeTask engine_task(alter_inverted_index_rq); - status = StorageEngine::instance()->execute_task(&engine_task); - } else { - status = - Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); - } + std::string_view process_name; + switch (task_type) { + case TTaskType::ALTER: + process_name = "alter tablet"; + break; + default: + std::string task_name; + EnumToString(TTaskType, task_type, task_name); + LOG(WARNING) << "schema change type invalid. 
type: " << task_name + << ", signature: " << signature; + status = Status::NotSupported("Schema change type invalid"); + break; + } - // Return result to fe - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - std::vector<TTabletInfo> finish_tablet_infos; - if (!status.ok()) { - LOG(WARNING) << "failed to alter inverted index task, signature=" - << agent_task_req.signature - << ", tablet_id=" << alter_inverted_index_rq.tablet_id - << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; - } else { - LOG(INFO) << "successfully alter inverted index task, signature=" - << agent_task_req.signature - << ", tablet_id=" << alter_inverted_index_rq.tablet_id - << ", job_id=" << alter_inverted_index_rq.job_id; - TTabletInfo tablet_info; - status = _get_tablet_info(alter_inverted_index_rq.tablet_id, - alter_inverted_index_rq.schema_hash, agent_task_req.signature, - &tablet_info); - if (status.ok()) { - finish_tablet_infos.push_back(tablet_info); - } - finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); - } - finish_task_request.__set_task_status(status.to_thrift()); - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + // Check last schema change status, if failed delete tablet file + // Do not need to adjust delete success or not + // Because if delete failed create rollup will failed + TTabletId new_tablet_id = 0; + TSchemaHash new_schema_hash = 0; + if (status.ok()) { + new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; + new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash; + EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2); + status = engine_task.execute(); } -} -void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { - while 
(_is_work) { - TAgentTaskRequest agent_task_req; - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - agent_task_req = _tasks.front(); - _tasks.pop_front(); - } - LOG(INFO) << "get update tablet meta task. signature=" << agent_task_req.signature; - - Status status; - auto& update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; - for (auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { - TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - tablet_meta_info.tablet_id); - if (tablet == nullptr) { - status = Status::NotFound("tablet not found"); - LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id=" - << tablet_meta_info.tablet_id; - continue; - } - bool need_to_save = false; - if (tablet_meta_info.__isset.storage_policy_id) { - tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id); - need_to_save = true; - } - if (tablet_meta_info.__isset.is_in_memory) { - tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory( - tablet_meta_info.is_in_memory); - std::shared_lock rlock(tablet->get_header_lock()); - for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory); - } - tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); - need_to_save = true; - } - if (tablet_meta_info.__isset.compaction_policy) { - if (tablet_meta_info.compaction_policy != "size_based" && - tablet_meta_info.compaction_policy != "time_series") { - status = Status::InvalidArgument( - "invalid compaction policy, only support for size_based or " - "time_series"); - continue; - } - tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy); - need_to_save = true; - } - if 
(tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { - status = Status::InvalidArgument( - "only time series compaction policy support time series config"); - continue; - } - tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes( - tablet_meta_info.time_series_compaction_goal_size_mbytes); - need_to_save = true; - } - if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { - status = Status::InvalidArgument( - "only time series compaction policy support time series config"); - continue; - } - tablet->tablet_meta()->set_time_series_compaction_file_count_threshold( - tablet_meta_info.time_series_compaction_file_count_threshold); - need_to_save = true; - } - if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { - status = Status::InvalidArgument( - "only time series compaction policy support time series config"); - continue; - } - tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds( - tablet_meta_info.time_series_compaction_time_threshold_seconds); - need_to_save = true; - } - if (tablet_meta_info.__isset.replica_id) { - tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); - } - if (tablet_meta_info.__isset.binlog_config) { - // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums - auto& t_binlog_config = tablet_meta_info.binlog_config; - if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds || - !t_binlog_config.__isset.max_bytes || - !t_binlog_config.__isset.max_history_nums) { - status = Status::InvalidArgument("invalid binlog config, some fields not set"); - LOG(WARNING) << fmt::format( - "invalid binlog config, some fields not set, tablet_id={}, " - "t_binlog_config={}", - 
tablet_meta_info.tablet_id, - apache::thrift::ThriftDebugString(t_binlog_config)); - continue; - } + if (status.ok()) { + s_report_version.fetch_add(1, std::memory_order_relaxed); + } - BinlogConfig new_binlog_config; - new_binlog_config = tablet_meta_info.binlog_config; - LOG(INFO) << fmt::format( - "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, " - "new_binlog_config={}", - tablet_meta_info.tablet_id, - tablet->tablet_meta()->binlog_config().to_string(), - new_binlog_config.to_string()); - tablet->set_binlog_config(new_binlog_config); - need_to_save = true; - } - if (tablet_meta_info.__isset.enable_single_replica_compaction) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->tablet_meta() - ->mutable_tablet_schema() - ->set_enable_single_replica_compaction( - tablet_meta_info.enable_single_replica_compaction); - for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_enable_single_replica_compaction( - tablet_meta_info.enable_single_replica_compaction); - } - tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction( - tablet_meta_info.enable_single_replica_compaction); - need_to_save = true; - } + // Return result to fe + finish_task_request->__set_backend(BackendOptions::get_local_backend()); + finish_task_request->__set_report_version(s_report_version); + finish_task_request->__set_task_type(task_type); + finish_task_request->__set_signature(signature); - if (tablet_meta_info.__isset.skip_write_index_on_load) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load( - tablet_meta_info.skip_write_index_on_load); - for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_skip_write_index_on_load( - tablet_meta_info.skip_write_index_on_load); - } - tablet->tablet_schema_unlocked()->set_skip_write_index_on_load( - 
tablet_meta_info.skip_write_index_on_load); - need_to_save = true; - } - if (need_to_save) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->save_meta(); - } + std::vector<TTabletInfo> finish_tablet_infos; + if (status.ok()) { + TTabletInfo tablet_info; + status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info); + if (status.ok()) { + finish_tablet_infos.push_back(tablet_info); } + } - LOG(INFO) << "finish update tablet meta task. signature=" << agent_task_req.signature; - if (agent_task_req.signature != -1) { - TFinishTaskRequest finish_task_request; - finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } + if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) { + LOG_WARNING("failed to {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id) + .error(status); + } else { + finish_task_request->__set_finish_tablet_infos(finish_tablet_infos); + LOG_INFO("successfully {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id); } + finish_task_request->__set_task_status(status.to_thrift()); } -void TaskWorkerPool::_check_consistency_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - TCheckConsistencyReq check_consistency_req; - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - 
return; - } +Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req, + TabletSharedPtr& tablet, DataDir** dest_store) { + int64_t tablet_id = req.tablet_id; + tablet = engine.tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + return Status::InternalError("could not find tablet {}", tablet_id); + } - agent_task_req = _tasks.front(); - check_consistency_req = agent_task_req.check_consistency_req; - _tasks.pop_front(); + if (req.__isset.data_dir) { + // request specify the data dir + *dest_store = engine.get_store(req.data_dir); + if (*dest_store == nullptr) { + return Status::InternalError("could not find data dir {}", req.data_dir); } + } else { + // this is a storage medium + // get data dir by storage medium - uint32_t checksum = 0; - EngineChecksumTask engine_task(check_consistency_req.tablet_id, - check_consistency_req.schema_hash, - check_consistency_req.version, &checksum); - Status status = StorageEngine::instance()->execute_task(&engine_task); - if (!status.ok()) { - LOG_WARNING("failed to check consistency") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", check_consistency_req.tablet_id) - .error(status); - } else { - LOG_INFO("successfully check consistency") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", check_consistency_req.tablet_id) - .tag("checksum", checksum); + // judge case when no need to migrate + uint32_t count = engine.available_storage_medium_type_count(); + if (count <= 1) { + return Status::InternalError("available storage medium type count is less than 1"); } - - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum)); - 
finish_task_request.__set_request_version(check_consistency_req.version); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } -} - -void TaskWorkerPool::_report_task_worker_thread_callback() { - StorageEngine::instance()->register_report_listener(this); - TReportRequest request; - while (_is_work) { - _is_doing_work = false; - { - // wait at most report_task_interval_seconds, or being notified - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait_for( - worker_thread_lock, std::chrono::seconds(config::report_task_interval_seconds)); + // check current tablet storage medium + TStorageMedium::type storage_medium = req.storage_medium; + TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium(); + if (src_storage_medium == storage_medium) { + return Status::InternalError("tablet is already on specified storage medium {}", + storage_medium); } - if (!_is_work) { - break; + // get a random store of specified storage medium + auto stores = engine.get_stores_for_create_tablet(storage_medium); + if (stores.empty()) { + return Status::InternalError("failed to get root path for create tablet"); } - if (_master_info.network_address.port == 0) { - // port == 0 means not received heartbeat yet - // sleep a short time and try again - LOG(INFO) - << "waiting to receive first heartbeat from frontend before doing task report"; - continue; - } + *dest_store = stores[0]; + } + if (tablet->data_dir()->path() == (*dest_store)->path()) { + LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path()); + return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}", + tablet->data_dir()->path()); + } - _is_doing_work = true; - // See _random_sleep() comment in _report_disk_state_worker_thread_callback - _random_sleep(5); - { - std::lock_guard<std::mutex> 
task_signatures_lock(_s_task_signatures_lock); - request.__set_tasks(_s_task_signatures); - request.__set_backend(BackendOptions::get_local_backend()); - } - _handle_report(request, ReportType::TASK); + // check local disk capacity + int64_t tablet_size = tablet->tablet_local_size(); + if ((*dest_store)->reach_capacity_limit(tablet_size)) { + return Status::InternalError("reach the capacity limit of path {}, tablet_size={}", + (*dest_store)->path(), tablet_size); } - StorageEngine::instance()->deregister_report_listener(this); + return Status::OK(); } -/// disk state report thread will report disk state at a configurable fix interval. -void TaskWorkerPool::_report_disk_state_worker_thread_callback() { - StorageEngine::instance()->register_report_listener(this); - - while (_is_work) { - _is_doing_work = false; - { - // wait at most report_disk_state_interval_seconds, or being notified - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait_for( - worker_thread_lock, - std::chrono::seconds(config::report_disk_state_interval_seconds)); - } - if (!_is_work) { - break; - } - - if (_master_info.network_address.port == 0) { - // port == 0 means not received heartbeat yet - LOG(INFO) - << "waiting to receive first heartbeat from frontend before doing disk report"; - continue; - } +// Return `true` if report success +bool handle_report(const TReportRequest& request, const TMasterInfo& master_info, + std::string_view name) { + TMasterResult result; + Status status = MasterServerClient::instance()->report(request, &result); + if (!status.ok()) [[unlikely]] { + LOG_WARNING("failed to report {}", name) + .tag("host", master_info.network_address.hostname) + .tag("port", master_info.network_address.port) + .error(status); + return false; + } - _is_doing_work = true; - // Random sleep 1~5 seconds before doing report. 
- // In order to avoid the problem that the FE receives many report requests at the same time - // and can not be processed. - _random_sleep(5); - - TReportRequest request; - request.__set_backend(BackendOptions::get_local_backend()); - request.__isset.disks = true; - - std::vector<DataDirInfo> data_dir_infos; - static_cast<void>(StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, - true /* update */)); - - for (auto& root_path_info : data_dir_infos) { - TDisk disk; - disk.__set_root_path(root_path_info.path); - disk.__set_path_hash(root_path_info.path_hash); - disk.__set_storage_medium(root_path_info.storage_medium); - disk.__set_disk_total_capacity(root_path_info.disk_capacity); - disk.__set_data_used_capacity(root_path_info.local_used_capacity); - disk.__set_remote_used_capacity(root_path_info.remote_used_capacity); - disk.__set_disk_available_capacity(root_path_info.available); - disk.__set_trash_used_capacity(root_path_info.trash_used_capacity); - disk.__set_used(root_path_info.is_used); - request.disks[root_path_info.path] = disk; - } - request.__set_num_cores(CpuInfo::num_cores()); - request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 - ? 
config::pipeline_executor_size - : CpuInfo::num_cores()); - _handle_report(request, ReportType::DISK); + else if (result.status.status_code != TStatusCode::OK) [[unlikely]] { + LOG_WARNING("failed to report {}", name) + .tag("host", master_info.network_address.hostname) + .tag("port", master_info.network_address.port) + .error(result.status); + return false; } - StorageEngine::instance()->deregister_report_listener(this); + + return true; } -void TaskWorkerPool::_report_tablet_worker_thread_callback() { - StorageEngine::instance()->register_report_listener(this); +void _submit_task(const TAgentTaskRequest& task, + std::function<Status(const TAgentTaskRequest&)> submit_op) { + const TTaskType::type task_type = task.task_type; + int64_t signature = task.signature; - while (_is_work) { - _is_doing_work = false; + std::string type_str; + EnumToString(TTaskType, task_type, type_str); + VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature; - { - // wait at most report_tablet_interval_seconds, or being notified - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait_for( - worker_thread_lock, - std::chrono::seconds(config::report_tablet_interval_seconds)); - } - if (!_is_work) { - break; - } + if (!register_task_info(task_type, signature)) { + LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature); + return; + } - if (_master_info.network_address.port == 0) { - // port == 0 means not received heartbeat yet - LOG(INFO) << "waiting to receive first heartbeat from frontend before doing tablet " - "report"; - continue; - } + // Set the receiving time of task so that we can determine whether it is timed out later + (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr)); + auto st = submit_op(task); + if (!st.ok()) [[unlikely]] { + LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature); + return; + } - 
_is_doing_work = true; - // See _random_sleep() comment in _report_disk_state_worker_thread_callback - _random_sleep(5); - - TReportRequest request; - request.__set_backend(BackendOptions::get_local_backend()); - request.__isset.tablets = true; - - uint64_t report_version = _s_report_version; - static_cast<void>( - StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info( - &request.tablets)); - if (report_version < _s_report_version) { - // TODO llj This can only reduce the possibility for report error, but can't avoid it. - // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this - // report, and the report version has a small probability that it has not been updated in time. When FE - // receives this report, it is possible to delete the new tablet. - LOG(WARNING) << "report version " << report_version << " change to " - << _s_report_version; - DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); - continue; - } - int64_t max_compaction_score = - std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), - DorisMetrics::instance()->tablet_base_max_compaction_score->value()); - request.__set_tablet_max_compaction_score(max_compaction_score); - request.__set_report_version(report_version); - - // report storage policy and resource - auto& storage_policy_list = request.storage_policy; - for (auto [id, version] : get_storage_policy_ids()) { - auto& storage_policy = storage_policy_list.emplace_back(); - storage_policy.__set_id(id); - storage_policy.__set_version(version); - } - request.__isset.storage_policy = true; - auto& resource_list = request.resource; - for (auto [id, version] : get_storage_resource_ids()) { - auto& resource = resource_list.emplace_back(); - resource.__set_id(id); - resource.__set_version(version); - } - request.__isset.resource = true; + LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); +} - 
_handle_report(request, ReportType::TABLET); +bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version"); + +bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX"); Review Comment: Is the uppercase naming necessary? Also, consider using a single string for the metric name instead, so that it is easier to search for, e.g. g_publish_version_latency("doris_pk_publish_version"). ########## be/src/agent/task_worker_pool.cpp: ########## @@ -324,1709 +145,1552 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) .error(result.status); try_time += 1; } - sleep(config::sleep_one_second); + sleep(1); } } -void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } +Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id, + const TSchemaHash schema_hash, TTabletInfo* tablet_info) { + tablet_info->__set_tablet_id(tablet_id); + tablet_info->__set_schema_hash(schema_hash); + return engine.tablet_manager()->report_tablet_info(tablet_info); +} - agent_task_req = _tasks.front(); - _tasks.pop_front(); - } +void random_sleep(int second) { + Random rnd(UnixMillis()); + sleep(rnd.Uniform(second) + 1); +} - auto& alter_inverted_index_rq = agent_task_req.alter_inverted_index_req; - LOG(INFO) << "get alter inverted index task. 
signature=" << agent_task_req.signature - << ", tablet_id=" << alter_inverted_index_rq.tablet_id - << ", job_id=" << alter_inverted_index_rq.job_id; +void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature, + const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) { + Status status; - Status status = Status::OK(); - TabletSharedPtr tablet_ptr = StorageEngine::instance()->tablet_manager()->get_tablet( - alter_inverted_index_rq.tablet_id); - if (tablet_ptr != nullptr) { - EngineIndexChangeTask engine_task(alter_inverted_index_rq); - status = StorageEngine::instance()->execute_task(&engine_task); - } else { - status = - Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); - } + std::string_view process_name; + switch (task_type) { + case TTaskType::ALTER: + process_name = "alter tablet"; + break; + default: + std::string task_name; + EnumToString(TTaskType, task_type, task_name); + LOG(WARNING) << "schema change type invalid. 
type: " << task_name + << ", signature: " << signature; + status = Status::NotSupported("Schema change type invalid"); + break; + } - // Return result to fe - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - std::vector<TTabletInfo> finish_tablet_infos; - if (!status.ok()) { - LOG(WARNING) << "failed to alter inverted index task, signature=" - << agent_task_req.signature - << ", tablet_id=" << alter_inverted_index_rq.tablet_id - << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; - } else { - LOG(INFO) << "successfully alter inverted index task, signature=" - << agent_task_req.signature - << ", tablet_id=" << alter_inverted_index_rq.tablet_id - << ", job_id=" << alter_inverted_index_rq.job_id; - TTabletInfo tablet_info; - status = _get_tablet_info(alter_inverted_index_rq.tablet_id, - alter_inverted_index_rq.schema_hash, agent_task_req.signature, - &tablet_info); - if (status.ok()) { - finish_tablet_infos.push_back(tablet_info); - } - finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); - } - finish_task_request.__set_task_status(status.to_thrift()); - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + // Check last schema change status, if failed delete tablet file + // Do not need to adjust delete success or not + // Because if delete failed create rollup will failed + TTabletId new_tablet_id = 0; + TSchemaHash new_schema_hash = 0; + if (status.ok()) { + new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; + new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash; + EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2); + status = engine_task.execute(); } -} -void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { - while 
(_is_work) { - TAgentTaskRequest agent_task_req; - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - agent_task_req = _tasks.front(); - _tasks.pop_front(); - } - LOG(INFO) << "get update tablet meta task. signature=" << agent_task_req.signature; - - Status status; - auto& update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; - for (auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { - TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - tablet_meta_info.tablet_id); - if (tablet == nullptr) { - status = Status::NotFound("tablet not found"); - LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id=" - << tablet_meta_info.tablet_id; - continue; - } - bool need_to_save = false; - if (tablet_meta_info.__isset.storage_policy_id) { - tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id); - need_to_save = true; - } - if (tablet_meta_info.__isset.is_in_memory) { - tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory( - tablet_meta_info.is_in_memory); - std::shared_lock rlock(tablet->get_header_lock()); - for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory); - } - tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); - need_to_save = true; - } - if (tablet_meta_info.__isset.compaction_policy) { - if (tablet_meta_info.compaction_policy != "size_based" && - tablet_meta_info.compaction_policy != "time_series") { - status = Status::InvalidArgument( - "invalid compaction policy, only support for size_based or " - "time_series"); - continue; - } - tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy); - need_to_save = true; - } - if 
(tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { - status = Status::InvalidArgument( - "only time series compaction policy support time series config"); - continue; - } - tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes( - tablet_meta_info.time_series_compaction_goal_size_mbytes); - need_to_save = true; - } - if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { - status = Status::InvalidArgument( - "only time series compaction policy support time series config"); - continue; - } - tablet->tablet_meta()->set_time_series_compaction_file_count_threshold( - tablet_meta_info.time_series_compaction_file_count_threshold); - need_to_save = true; - } - if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { - status = Status::InvalidArgument( - "only time series compaction policy support time series config"); - continue; - } - tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds( - tablet_meta_info.time_series_compaction_time_threshold_seconds); - need_to_save = true; - } - if (tablet_meta_info.__isset.replica_id) { - tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); - } - if (tablet_meta_info.__isset.binlog_config) { - // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums - auto& t_binlog_config = tablet_meta_info.binlog_config; - if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds || - !t_binlog_config.__isset.max_bytes || - !t_binlog_config.__isset.max_history_nums) { - status = Status::InvalidArgument("invalid binlog config, some fields not set"); - LOG(WARNING) << fmt::format( - "invalid binlog config, some fields not set, tablet_id={}, " - "t_binlog_config={}", - 
tablet_meta_info.tablet_id, - apache::thrift::ThriftDebugString(t_binlog_config)); - continue; - } + if (status.ok()) { + s_report_version.fetch_add(1, std::memory_order_relaxed); + } - BinlogConfig new_binlog_config; - new_binlog_config = tablet_meta_info.binlog_config; - LOG(INFO) << fmt::format( - "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, " - "new_binlog_config={}", - tablet_meta_info.tablet_id, - tablet->tablet_meta()->binlog_config().to_string(), - new_binlog_config.to_string()); - tablet->set_binlog_config(new_binlog_config); - need_to_save = true; - } - if (tablet_meta_info.__isset.enable_single_replica_compaction) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->tablet_meta() - ->mutable_tablet_schema() - ->set_enable_single_replica_compaction( - tablet_meta_info.enable_single_replica_compaction); - for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_enable_single_replica_compaction( - tablet_meta_info.enable_single_replica_compaction); - } - tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction( - tablet_meta_info.enable_single_replica_compaction); - need_to_save = true; - } + // Return result to fe + finish_task_request->__set_backend(BackendOptions::get_local_backend()); + finish_task_request->__set_report_version(s_report_version); + finish_task_request->__set_task_type(task_type); + finish_task_request->__set_signature(signature); - if (tablet_meta_info.__isset.skip_write_index_on_load) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load( - tablet_meta_info.skip_write_index_on_load); - for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_skip_write_index_on_load( - tablet_meta_info.skip_write_index_on_load); - } - tablet->tablet_schema_unlocked()->set_skip_write_index_on_load( - 
tablet_meta_info.skip_write_index_on_load); - need_to_save = true; - } - if (need_to_save) { - std::shared_lock rlock(tablet->get_header_lock()); - tablet->save_meta(); - } + std::vector<TTabletInfo> finish_tablet_infos; + if (status.ok()) { + TTabletInfo tablet_info; + status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info); + if (status.ok()) { + finish_tablet_infos.push_back(tablet_info); } + } - LOG(INFO) << "finish update tablet meta task. signature=" << agent_task_req.signature; - if (agent_task_req.signature != -1) { - TFinishTaskRequest finish_task_request; - finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } + if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) { + LOG_WARNING("failed to {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id) + .error(status); + } else { + finish_task_request->__set_finish_tablet_infos(finish_tablet_infos); + LOG_INFO("successfully {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id); } + finish_task_request->__set_task_status(status.to_thrift()); } -void TaskWorkerPool::_check_consistency_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - TCheckConsistencyReq check_consistency_req; - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - 
return; - } +Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req, + TabletSharedPtr& tablet, DataDir** dest_store) { + int64_t tablet_id = req.tablet_id; + tablet = engine.tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + return Status::InternalError("could not find tablet {}", tablet_id); + } - agent_task_req = _tasks.front(); - check_consistency_req = agent_task_req.check_consistency_req; - _tasks.pop_front(); + if (req.__isset.data_dir) { + // request specify the data dir + *dest_store = engine.get_store(req.data_dir); + if (*dest_store == nullptr) { + return Status::InternalError("could not find data dir {}", req.data_dir); } + } else { + // this is a storage medium + // get data dir by storage medium - uint32_t checksum = 0; - EngineChecksumTask engine_task(check_consistency_req.tablet_id, - check_consistency_req.schema_hash, - check_consistency_req.version, &checksum); - Status status = StorageEngine::instance()->execute_task(&engine_task); - if (!status.ok()) { - LOG_WARNING("failed to check consistency") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", check_consistency_req.tablet_id) - .error(status); - } else { - LOG_INFO("successfully check consistency") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", check_consistency_req.tablet_id) - .tag("checksum", checksum); + // judge case when no need to migrate + uint32_t count = engine.available_storage_medium_type_count(); + if (count <= 1) { + return Status::InternalError("available storage medium type count is less than 1"); } - - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum)); - 
finish_task_request.__set_request_version(check_consistency_req.version); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } -} - -void TaskWorkerPool::_report_task_worker_thread_callback() { - StorageEngine::instance()->register_report_listener(this); - TReportRequest request; - while (_is_work) { - _is_doing_work = false; - { - // wait at most report_task_interval_seconds, or being notified - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait_for( - worker_thread_lock, std::chrono::seconds(config::report_task_interval_seconds)); + // check current tablet storage medium + TStorageMedium::type storage_medium = req.storage_medium; + TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium(); + if (src_storage_medium == storage_medium) { + return Status::InternalError("tablet is already on specified storage medium {}", + storage_medium); } - if (!_is_work) { - break; + // get a random store of specified storage medium + auto stores = engine.get_stores_for_create_tablet(storage_medium); + if (stores.empty()) { + return Status::InternalError("failed to get root path for create tablet"); } - if (_master_info.network_address.port == 0) { - // port == 0 means not received heartbeat yet - // sleep a short time and try again - LOG(INFO) - << "waiting to receive first heartbeat from frontend before doing task report"; - continue; - } + *dest_store = stores[0]; + } + if (tablet->data_dir()->path() == (*dest_store)->path()) { + LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path()); + return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}", + tablet->data_dir()->path()); + } - _is_doing_work = true; - // See _random_sleep() comment in _report_disk_state_worker_thread_callback - _random_sleep(5); - { - std::lock_guard<std::mutex> 
task_signatures_lock(_s_task_signatures_lock); - request.__set_tasks(_s_task_signatures); - request.__set_backend(BackendOptions::get_local_backend()); - } - _handle_report(request, ReportType::TASK); + // check local disk capacity + int64_t tablet_size = tablet->tablet_local_size(); + if ((*dest_store)->reach_capacity_limit(tablet_size)) { + return Status::InternalError("reach the capacity limit of path {}, tablet_size={}", + (*dest_store)->path(), tablet_size); } - StorageEngine::instance()->deregister_report_listener(this); + return Status::OK(); } -/// disk state report thread will report disk state at a configurable fix interval. -void TaskWorkerPool::_report_disk_state_worker_thread_callback() { - StorageEngine::instance()->register_report_listener(this); - - while (_is_work) { - _is_doing_work = false; - { - // wait at most report_disk_state_interval_seconds, or being notified - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait_for( - worker_thread_lock, - std::chrono::seconds(config::report_disk_state_interval_seconds)); - } - if (!_is_work) { - break; - } - - if (_master_info.network_address.port == 0) { - // port == 0 means not received heartbeat yet - LOG(INFO) - << "waiting to receive first heartbeat from frontend before doing disk report"; - continue; - } +// Return `true` if report success +bool handle_report(const TReportRequest& request, const TMasterInfo& master_info, + std::string_view name) { + TMasterResult result; + Status status = MasterServerClient::instance()->report(request, &result); + if (!status.ok()) [[unlikely]] { + LOG_WARNING("failed to report {}", name) + .tag("host", master_info.network_address.hostname) + .tag("port", master_info.network_address.port) + .error(status); + return false; + } - _is_doing_work = true; - // Random sleep 1~5 seconds before doing report. 
- // In order to avoid the problem that the FE receives many report requests at the same time - // and can not be processed. - _random_sleep(5); - - TReportRequest request; - request.__set_backend(BackendOptions::get_local_backend()); - request.__isset.disks = true; - - std::vector<DataDirInfo> data_dir_infos; - static_cast<void>(StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, - true /* update */)); - - for (auto& root_path_info : data_dir_infos) { - TDisk disk; - disk.__set_root_path(root_path_info.path); - disk.__set_path_hash(root_path_info.path_hash); - disk.__set_storage_medium(root_path_info.storage_medium); - disk.__set_disk_total_capacity(root_path_info.disk_capacity); - disk.__set_data_used_capacity(root_path_info.local_used_capacity); - disk.__set_remote_used_capacity(root_path_info.remote_used_capacity); - disk.__set_disk_available_capacity(root_path_info.available); - disk.__set_trash_used_capacity(root_path_info.trash_used_capacity); - disk.__set_used(root_path_info.is_used); - request.disks[root_path_info.path] = disk; - } - request.__set_num_cores(CpuInfo::num_cores()); - request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 - ? 
config::pipeline_executor_size - : CpuInfo::num_cores()); - _handle_report(request, ReportType::DISK); + else if (result.status.status_code != TStatusCode::OK) [[unlikely]] { + LOG_WARNING("failed to report {}", name) + .tag("host", master_info.network_address.hostname) + .tag("port", master_info.network_address.port) + .error(result.status); + return false; } - StorageEngine::instance()->deregister_report_listener(this); + + return true; } -void TaskWorkerPool::_report_tablet_worker_thread_callback() { - StorageEngine::instance()->register_report_listener(this); +void _submit_task(const TAgentTaskRequest& task, + std::function<Status(const TAgentTaskRequest&)> submit_op) { + const TTaskType::type task_type = task.task_type; + int64_t signature = task.signature; - while (_is_work) { - _is_doing_work = false; + std::string type_str; + EnumToString(TTaskType, task_type, type_str); + VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature; - { - // wait at most report_tablet_interval_seconds, or being notified - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait_for( - worker_thread_lock, - std::chrono::seconds(config::report_tablet_interval_seconds)); - } - if (!_is_work) { - break; - } + if (!register_task_info(task_type, signature)) { + LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature); + return; + } - if (_master_info.network_address.port == 0) { - // port == 0 means not received heartbeat yet - LOG(INFO) << "waiting to receive first heartbeat from frontend before doing tablet " - "report"; - continue; - } + // Set the receiving time of task so that we can determine whether it is timed out later + (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr)); + auto st = submit_op(task); + if (!st.ok()) [[unlikely]] { + LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature); + return; + } - 
_is_doing_work = true; - // See _random_sleep() comment in _report_disk_state_worker_thread_callback - _random_sleep(5); - - TReportRequest request; - request.__set_backend(BackendOptions::get_local_backend()); - request.__isset.tablets = true; - - uint64_t report_version = _s_report_version; - static_cast<void>( - StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info( - &request.tablets)); - if (report_version < _s_report_version) { - // TODO llj This can only reduce the possibility for report error, but can't avoid it. - // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this - // report, and the report version has a small probability that it has not been updated in time. When FE - // receives this report, it is possible to delete the new tablet. - LOG(WARNING) << "report version " << report_version << " change to " - << _s_report_version; - DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); - continue; - } - int64_t max_compaction_score = - std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), - DorisMetrics::instance()->tablet_base_max_compaction_score->value()); - request.__set_tablet_max_compaction_score(max_compaction_score); - request.__set_report_version(report_version); - - // report storage policy and resource - auto& storage_policy_list = request.storage_policy; - for (auto [id, version] : get_storage_policy_ids()) { - auto& storage_policy = storage_policy_list.emplace_back(); - storage_policy.__set_id(id); - storage_policy.__set_version(version); - } - request.__isset.storage_policy = true; - auto& resource_list = request.resource; - for (auto [id, version] : get_storage_resource_ids()) { - auto& resource = resource_list.emplace_back(); - resource.__set_id(id); - resource.__set_version(version); - } - request.__isset.resource = true; + LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); +} - 
_handle_report(request, ReportType::TABLET); +bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version"); + +bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX"); +bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY"); +bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD"); +bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD"); +bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT"); +bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT"); +bvar::Adder<uint64_t> MOVE_count("task", "MOVE"); +bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION"); +bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY"); +bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF"); +bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE"); +bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE"); +bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION"); +bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK"); +bvar::Adder<uint64_t> DELETE_count("task", "DELETE"); +bvar::Adder<uint64_t> PUSH_count("task", "PUSH"); +bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO"); +bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE"); +bvar::Adder<uint64_t> CLONE_count("task", "CLONE"); +bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE"); +bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG"); + +void add_task_count(const TAgentTaskRequest& task, int n) { + // clang-format off + switch (task.task_type) { + #define ADD_TASK_COUNT(type) \ + case TTaskType::type: \ + type##_count << n; \ + return; + ADD_TASK_COUNT(ALTER_INVERTED_INDEX) + ADD_TASK_COUNT(CHECK_CONSISTENCY) + ADD_TASK_COUNT(UPLOAD) + ADD_TASK_COUNT(DOWNLOAD) + ADD_TASK_COUNT(MAKE_SNAPSHOT) + ADD_TASK_COUNT(RELEASE_SNAPSHOT) + ADD_TASK_COUNT(MOVE) + 
ADD_TASK_COUNT(COMPACTION) + ADD_TASK_COUNT(PUSH_STORAGE_POLICY) + ADD_TASK_COUNT(PUSH_COOLDOWN_CONF) + ADD_TASK_COUNT(CREATE) + ADD_TASK_COUNT(DROP) + ADD_TASK_COUNT(PUBLISH_VERSION) + ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK) + ADD_TASK_COUNT(UPDATE_TABLET_META_INFO) + ADD_TASK_COUNT(ALTER) + ADD_TASK_COUNT(CLONE) + ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE) + ADD_TASK_COUNT(GC_BINLOG) + #undef ADD_TASK_COUNT + case TTaskType::REALTIME_PUSH: + case TTaskType::PUSH: + if (task.push_req.push_type == TPushType::LOAD_V2) { + PUSH_count << n; + } else if (task.push_req.push_type == TPushType::DELETE) { + DELETE_count << n; + } + return; + default: + return; } - StorageEngine::instance()->deregister_report_listener(this); + // clang-format on } -void TaskWorkerPool::_upload_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - TUploadReq upload_request; - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - - agent_task_req = _tasks.front(); - upload_request = agent_task_req.upload_req; - _tasks.pop_front(); - } +bvar::Adder<uint64_t> report_task_total("report", "task_total"); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org