chenlinzhong commented on code in PR #16639:
URL: https://github.com/apache/doris/pull/16639#discussion_r1103938347


##########
be/src/service/internal_service.cpp:
##########
@@ -423,200 +480,233 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
                                              const PTabletKeyLookupRequest* request,
                                              PTabletKeyLookupResponse* response,
                                              google::protobuf::Closure* done) {
-    [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-    brpc::ClosureGuard guard(done);
-    Status st = _tablet_fetch_data(request, response);
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->tablet_fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        brpc::ClosureGuard guard(done);
+        Status st = _tablet_fetch_data(request, response);
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
                                     const PProxyRequest* request, PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    DorisMetrics::instance()->get_info->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {

Review Comment:
   ok



##########
be/src/service/internal_service.cpp:
##########
@@ -286,125 +334,134 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
     }
 }
 
-void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller,
                                                 const PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* result,
                                                 google::protobuf::Closure* done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st = Status::OK();
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        _exec_env->fragment_mgr()->cancel(tid);
-    }
-
-    // TODO: the logic seems useless, cancel only return Status::OK. remove it
-    st.to_protobuf(result->mutable_status());
+    DorisMetrics::instance()->cancel_plan_fragment->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
+        // TODO: the logic seems useless, cancel only return Status::OK. remove it
+        st.to_protobuf(result->mutable_status());
+    });
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller,
                                       const PFetchDataRequest* request, PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    DorisMetrics::instance()->fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, result, done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller,
                                               const PFetchTableSchemaRequest* request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) {
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    DorisMetrics::instance()->fetch_table_schema->increment(1);
+    _light_work_pool.offer([request, result, done]() {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to