dataroaring commented on code in PR #16639:
URL: https://github.com/apache/doris/pull/16639#discussion_r1103931956


##########
be/src/util/doris_metrics.cpp:
##########
@@ -197,6 +230,37 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     _server_metric_entity = _metric_registry.register_entity("server");
 
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
fragment_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_data);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_data_by_http);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, exec_plan_fragment);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
exec_plan_fragment_prepare);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
exec_plan_fragment_start);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cancel_plan_fragment);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_data);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_table_schema);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_open);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
tablet_writer_add_block);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
tablet_writer_add_block_by_http);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, tablet_writer_cancel);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_info);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, update_cache);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fetch_cache);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clear_cache);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, merge_filter);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, apply_filter);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_block);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, transmit_block_by_http);

Review Comment:
   Actually, brpc already exposes these metrics through bvar.



##########
be/src/common/config.h:
##########
@@ -35,8 +35,10 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1, which 
means the number of bthreads is #cpu-cores
-CONF_Int32(brpc_num_threads, "-1");
+// the number of bthreads for brpc, the default value is set to 32
+// brpc only for network service send or accept request
+// no more  process any logic
+CONF_Int32(brpc_num_threads, "32");

Review Comment:
   We do not need to change the default value here; e.g., if a user runs the
server on a machine with 128 cores, this change would break the configuration
they currently rely on.



##########
be/src/service/internal_service.cpp:
##########
@@ -423,200 +480,233 @@ void 
PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
                                              const PTabletKeyLookupRequest* 
request,
                                              PTabletKeyLookupResponse* 
response,
                                              google::protobuf::Closure* done) {
-    [[maybe_unused]] brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
-    brpc::ClosureGuard guard(done);
-    Status st = _tablet_fetch_data(request, response);
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->tablet_fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        [[maybe_unused]] brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
+        brpc::ClosureGuard guard(done);
+        Status st = _tablet_fetch_data(request, response);
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
                                     const PProxyRequest* request, 
PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), 
&partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    DorisMetrics::instance()->get_info->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to 
get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is 
to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
-    }
-    Status::OK().to_protobuf(response->mutable_status());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, 
google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    DorisMetrics::instance()->update_cache->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
                                        const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    DorisMetrics::instance()->fetch_cache->increment(1);
+    _heavy_work_pool.offer([this, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
 }

Review Comment:
   If `offer` fails, what does brpc end up returning in `result`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to