github-actions[bot] commented on code in PR #32804:
URL: https://github.com/apache/doris/pull/32804#discussion_r1565143956


##########
be/src/cloud/cloud_backend_service.cpp:
##########
@@ -45,4 +57,137 @@
     return Status::OK();
 }
 
+void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&,
+                                                const 
TSyncLoadForTabletsRequest& request) {
+    auto f = [this, tablet_ids = request.tablet_ids]() {
+        std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t 
tablet_id) {
+            CloudTabletSPtr tablet;
+            auto result = _engine.tablet_mgr().get_tablet(tablet_id, true);
+            if (!result.has_value()) {
+                return;
+            }
+            Status st = result.value()->sync_rowsets(-1, true);
+            if (!st.ok()) {
+                LOG_WARNING("failed to sync load for tablet").error(st);
+            }
+        });
+    };
+    
static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f)));
+}
+
+void 
CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& 
response,
+                                                   const 
TGetTopNHotPartitionsRequest& request) {
+    TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables);
+    response.file_cache_size = 
io::FileCacheFactory::instance()->get_capacity();
+    response.__isset.hot_tables = !response.hot_tables.empty();
+}
+
+void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
+                                          const TWarmUpTabletsRequest& 
request) {
+    Status st;
+    auto* manager = CloudWarmUpManager::instance();
+    switch (request.type) {
+    case TWarmUpTabletsRequestType::SET_JOB: {
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "SET_JOB")
+                .tag("job_id", request.job_id);
+        st = manager->check_and_set_job_id(request.job_id);
+        if (!st) {
+            LOG_WARNING("SET_JOB failed.").error(st);
+            break;
+        }
+        [[fallthrough]];
+    }
+    case TWarmUpTabletsRequestType::SET_BATCH: {
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "SET_BATCH")
+                .tag("job_id", request.job_id)
+                .tag("batch_id", request.batch_id)
+                .tag("jobs size", request.job_metas.size());
+        bool retry = false;
+        st = manager->check_and_set_batch_id(request.job_id, request.batch_id, 
&retry);
+        if (!retry && st) {
+            manager->add_job(request.job_metas);
+        } else {
+            if (retry) {
+                LOG_WARNING("retry the job.")
+                        .tag("job_id", request.job_id)
+                        .tag("batch_id", request.batch_id);
+            } else {
+                LOG_WARNING("SET_BATCH failed.").error(st);
+            }
+        }
+        break;
+    }
+    case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: {
+        auto [job_id, batch_id, pending_job_size, finish_job_size] =
+                manager->get_current_job_state();
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE")
+                .tag("job_id", job_id)
+                .tag("batch_id", batch_id)
+                .tag("pending_job_size", pending_job_size)
+                .tag("finish_job_size", finish_job_size);
+        response.__set_job_id(job_id);
+        response.__set_batch_id(batch_id);
+        response.__set_pending_job_size(pending_job_size);
+        response.__set_finish_job_size(finish_job_size);
+        break;
+    }
+    case TWarmUpTabletsRequestType::CLEAR_JOB: {
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "CLEAR_JOB")
+                .tag("job_id", request.job_id);
+        st = manager->clear_job(request.job_id);
+        break;
+    }
+    default:
+        DCHECK(false);
+    };
+    st.to_thrift(&response.status);
+}
+
+void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& response,
+                                          const TPreCacheAsyncRequest& 
request) {
+    std::string brpc_addr = fmt::format("{}:{}", request.host, 
request.brpc_port);
+    Status st = Status::OK();
+    TStatus t_status;
+    std::shared_ptr<PBackendService_Stub> brpc_stub =
+            
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
+    if (!brpc_stub) {
+        st = Status::RpcError("Address {} is wrong", brpc_addr);
+        return;
+    }
+    brpc::Controller cntl;
+    PGetFileCacheMetaRequest brpc_request;
+    std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
+                  [&](int64_t tablet_id) { 
brpc_request.add_tablet_ids(tablet_id); });
+    PGetFileCacheMetaResponse brpc_response;
+    brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, 
&brpc_response, nullptr);
+    if (!cntl.Failed()) {
+        std::vector<FileCacheBlockMeta> metas;
+        std::transform(brpc_response.file_cache_segment_metas().cbegin(),
+                       brpc_response.file_cache_segment_metas().cend(), 
std::back_inserter(metas),
+                       [](const FileCacheBlockMeta& meta) { return meta; });
+        io::DownloadTask download_task(std::move(metas));
+        
io::FileCacheBlockDownloader::instance()->submit_download_task(download_task);
+    } else {
+        st = Status::RpcError("{} isn't connected", brpc_addr);
+    }
+    st.to_thrift(&t_status);
+    response.status = t_status;
+}
+
+void CloudBackendService::check_pre_cache(TCheckPreCacheResponse& response,

Review Comment:
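   warning: method 'check_pre_cache' can be made static [readability-convert-member-functions-to-static]
   
   Note: in C++ the `static` keyword is only valid on the member's declaration inside the class definition (the header), not on this out-of-class definition, so the suggestion below must be applied to the declaration instead of being committed verbatim.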
   
   ```suggestion
   static void CloudBackendService::check_pre_cache(TCheckPreCacheResponse& 
response,
   ```
   



##########
be/src/cloud/cloud_backend_service.cpp:
##########
@@ -45,4 +57,137 @@ Status 
CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv*
     return Status::OK();
 }
 
+void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&,
+                                                const 
TSyncLoadForTabletsRequest& request) {
+    auto f = [this, tablet_ids = request.tablet_ids]() {
+        std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t 
tablet_id) {
+            CloudTabletSPtr tablet;
+            auto result = _engine.tablet_mgr().get_tablet(tablet_id, true);
+            if (!result.has_value()) {
+                return;
+            }
+            Status st = result.value()->sync_rowsets(-1, true);
+            if (!st.ok()) {
+                LOG_WARNING("failed to sync load for tablet").error(st);
+            }
+        });
+    };
+    
static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f)));
+}
+
+void 
CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& 
response,
+                                                   const 
TGetTopNHotPartitionsRequest& request) {
+    TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables);
+    response.file_cache_size = 
io::FileCacheFactory::instance()->get_capacity();
+    response.__isset.hot_tables = !response.hot_tables.empty();
+}
+
+void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
+                                          const TWarmUpTabletsRequest& 
request) {
+    Status st;
+    auto* manager = CloudWarmUpManager::instance();
+    switch (request.type) {
+    case TWarmUpTabletsRequestType::SET_JOB: {
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "SET_JOB")
+                .tag("job_id", request.job_id);
+        st = manager->check_and_set_job_id(request.job_id);
+        if (!st) {
+            LOG_WARNING("SET_JOB failed.").error(st);
+            break;
+        }
+        [[fallthrough]];
+    }
+    case TWarmUpTabletsRequestType::SET_BATCH: {
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "SET_BATCH")
+                .tag("job_id", request.job_id)
+                .tag("batch_id", request.batch_id)
+                .tag("jobs size", request.job_metas.size());
+        bool retry = false;
+        st = manager->check_and_set_batch_id(request.job_id, request.batch_id, 
&retry);
+        if (!retry && st) {
+            manager->add_job(request.job_metas);
+        } else {
+            if (retry) {
+                LOG_WARNING("retry the job.")
+                        .tag("job_id", request.job_id)
+                        .tag("batch_id", request.batch_id);
+            } else {
+                LOG_WARNING("SET_BATCH failed.").error(st);
+            }
+        }
+        break;
+    }
+    case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: {
+        auto [job_id, batch_id, pending_job_size, finish_job_size] =
+                manager->get_current_job_state();
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE")
+                .tag("job_id", job_id)
+                .tag("batch_id", batch_id)
+                .tag("pending_job_size", pending_job_size)
+                .tag("finish_job_size", finish_job_size);
+        response.__set_job_id(job_id);
+        response.__set_batch_id(batch_id);
+        response.__set_pending_job_size(pending_job_size);
+        response.__set_finish_job_size(finish_job_size);
+        break;
+    }
+    case TWarmUpTabletsRequestType::CLEAR_JOB: {
+        LOG_INFO("receive the warm up request.")
+                .tag("request_type", "CLEAR_JOB")
+                .tag("job_id", request.job_id);
+        st = manager->clear_job(request.job_id);
+        break;
+    }
+    default:
+        DCHECK(false);
+    };
+    st.to_thrift(&response.status);
+}
+
+void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& response,

Review Comment:
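   warning: method 'pre_cache_async' can be made static [readability-convert-member-functions-to-static]
   
   Note: in C++ the `static` keyword is only valid on the member's declaration inside the class definition (the header), not on this out-of-class definition, so the suggestion below must be applied to the declaration instead of being committed verbatim.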
   
   ```suggestion
   static void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& 
response,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to