github-actions[bot] commented on code in PR #32804: URL: https://github.com/apache/doris/pull/32804#discussion_r1565143956
########## be/src/cloud/cloud_backend_service.cpp: ########## @@ -45,4 +57,137 @@ return Status::OK(); } +void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&, + const TSyncLoadForTabletsRequest& request) { + auto f = [this, tablet_ids = request.tablet_ids]() { + std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t tablet_id) { + CloudTabletSPtr tablet; + auto result = _engine.tablet_mgr().get_tablet(tablet_id, true); + if (!result.has_value()) { + return; + } + Status st = result.value()->sync_rowsets(-1, true); + if (!st.ok()) { + LOG_WARNING("failed to sync load for tablet").error(st); + } + }); + }; + static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f))); +} + +void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response, + const TGetTopNHotPartitionsRequest& request) { + TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables); + response.file_cache_size = io::FileCacheFactory::instance()->get_capacity(); + response.__isset.hot_tables = !response.hot_tables.empty(); +} + +void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, + const TWarmUpTabletsRequest& request) { + Status st; + auto* manager = CloudWarmUpManager::instance(); + switch (request.type) { + case TWarmUpTabletsRequestType::SET_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_JOB") + .tag("job_id", request.job_id); + st = manager->check_and_set_job_id(request.job_id); + if (!st) { + LOG_WARNING("SET_JOB failed.").error(st); + break; + } + [[fallthrough]]; + } + case TWarmUpTabletsRequestType::SET_BATCH: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_BATCH") + .tag("job_id", request.job_id) + .tag("batch_id", request.batch_id) + .tag("jobs size", request.job_metas.size()); + bool retry = false; + st = manager->check_and_set_batch_id(request.job_id, request.batch_id, &retry); + if (!retry && st) { + manager->add_job(request.job_metas); + } else { + if (retry) { + LOG_WARNING("retry the job.") + .tag("job_id", request.job_id) + .tag("batch_id", request.batch_id); + } else { + LOG_WARNING("SET_BATCH failed.").error(st); + } + } + break; + } + case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: { + auto [job_id, batch_id, pending_job_size, finish_job_size] = + manager->get_current_job_state(); + LOG_INFO("receive the warm up request.") + .tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE") + .tag("job_id", job_id) + .tag("batch_id", batch_id) + .tag("pending_job_size", pending_job_size) + .tag("finish_job_size", finish_job_size); + response.__set_job_id(job_id); + response.__set_batch_id(batch_id); + response.__set_pending_job_size(pending_job_size); + response.__set_finish_job_size(finish_job_size); + break; + } + case TWarmUpTabletsRequestType::CLEAR_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "CLEAR_JOB") + .tag("job_id", request.job_id); + st = manager->clear_job(request.job_id); + break; + } + default: + DCHECK(false); + }; + st.to_thrift(&response.status); +} + +void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& response, + const TPreCacheAsyncRequest& request) { + std::string brpc_addr = fmt::format("{}:{}", request.host, request.brpc_port); + Status st = Status::OK(); + TStatus t_status; + std::shared_ptr<PBackendService_Stub> brpc_stub = + _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr); + if (!brpc_stub) { + st = Status::RpcError("Address {} is wrong", brpc_addr); + return; + } + brpc::Controller cntl; + PGetFileCacheMetaRequest brpc_request; + std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), + [&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); }); + PGetFileCacheMetaResponse brpc_response; + brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr); + if (!cntl.Failed()) { + std::vector<FileCacheBlockMeta> metas; + std::transform(brpc_response.file_cache_segment_metas().cbegin(), + brpc_response.file_cache_segment_metas().cend(), std::back_inserter(metas), + [](const FileCacheBlockMeta& meta) { return meta; }); + io::DownloadTask download_task(std::move(metas)); + io::FileCacheBlockDownloader::instance()->submit_download_task(download_task); + } else { + st = Status::RpcError("{} isn't connected", brpc_addr); + } + st.to_thrift(&t_status); + response.status = t_status; +} + +void CloudBackendService::check_pre_cache(TCheckPreCacheResponse& response, Review Comment: warning: method 'check_pre_cache' can be made static [readability-convert-member-functions-to-static] ```suggestion static void CloudBackendService::check_pre_cache(TCheckPreCacheResponse& response, ``` ########## be/src/cloud/cloud_backend_service.cpp: ########## @@ -45,4 +57,137 @@ Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* return Status::OK(); } +void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&, + const TSyncLoadForTabletsRequest& request) { + auto f = [this, tablet_ids = request.tablet_ids]() { + std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t tablet_id) { + CloudTabletSPtr tablet; + auto result = _engine.tablet_mgr().get_tablet(tablet_id, true); + if (!result.has_value()) { + return; + } + Status st = result.value()->sync_rowsets(-1, true); + if (!st.ok()) { + LOG_WARNING("failed to sync load for tablet").error(st); + } + }); + }; + static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f))); +} + +void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response, + const TGetTopNHotPartitionsRequest& request) { + TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables); + response.file_cache_size = io::FileCacheFactory::instance()->get_capacity(); + response.__isset.hot_tables = !response.hot_tables.empty(); +} + +void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, + const TWarmUpTabletsRequest& request) { + Status st; + auto* manager = CloudWarmUpManager::instance(); + switch (request.type) { + case TWarmUpTabletsRequestType::SET_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_JOB") + .tag("job_id", request.job_id); + st = manager->check_and_set_job_id(request.job_id); + if (!st) { + LOG_WARNING("SET_JOB failed.").error(st); + break; + } + [[fallthrough]]; + } + case TWarmUpTabletsRequestType::SET_BATCH: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_BATCH") + .tag("job_id", request.job_id) + .tag("batch_id", request.batch_id) + .tag("jobs size", request.job_metas.size()); + bool retry = false; + st = manager->check_and_set_batch_id(request.job_id, request.batch_id, &retry); + if (!retry && st) { + manager->add_job(request.job_metas); + } else { + if (retry) { + LOG_WARNING("retry the job.") + .tag("job_id", request.job_id) + .tag("batch_id", request.batch_id); + } else { + LOG_WARNING("SET_BATCH failed.").error(st); + } + } + break; + } + case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: { + auto [job_id, batch_id, pending_job_size, finish_job_size] = + manager->get_current_job_state(); + LOG_INFO("receive the warm up request.") + .tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE") + .tag("job_id", job_id) + .tag("batch_id", batch_id) + .tag("pending_job_size", pending_job_size) + .tag("finish_job_size", finish_job_size); + response.__set_job_id(job_id); + response.__set_batch_id(batch_id); + response.__set_pending_job_size(pending_job_size); + response.__set_finish_job_size(finish_job_size); + break; + } + case TWarmUpTabletsRequestType::CLEAR_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "CLEAR_JOB") + .tag("job_id", request.job_id); + st = manager->clear_job(request.job_id); + break; + } + default: + DCHECK(false); + }; + st.to_thrift(&response.status); +} + +void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& response, Review Comment: warning: method 'pre_cache_async' can be made static [readability-convert-member-functions-to-static] ```suggestion static void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& response, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org