morningman commented on a change in pull request #6916:
URL: https://github.com/apache/incubator-doris/pull/6916#discussion_r739636432



##########
File path: be/src/runtime/data_stream_sender.cpp
##########
@@ -105,17 +105,18 @@ Status DataStreamSender::Channel::init(RuntimeState* 
state) {
     _brpc_request.set_be_number(_be_number);
 
     _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) * 
1000;
-    if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
-        _brpc_stub =
-                state->exec_env()->brpc_stub_cache()->get_stub("127.0.0.1", 
_brpc_dest_addr.port);
-    } else {
-        _brpc_stub = 
state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr);
-    }
 
     // In bucket shuffle join will set fragment_instance_id (-1, -1)
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo 
!= -1);
+    if (_need_close) {
+        _brpc_stub = 
state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr);
+        if (!_brpc_stub) {
+            LOG(WARNING) << "Get rpc stub failed, dest_addr=" << 
_brpc_dest_addr;
+            return Status::InternalError("get rpc stub failed");

Review comment:
       Please return a more useful error message here.

##########
File path: be/src/util/brpc_stub_cache.h
##########
@@ -34,25 +34,32 @@ class BrpcStubCache {
     BrpcStubCache();
     virtual ~BrpcStubCache();
 
-    virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
-        std::lock_guard<SpinLock> l(_lock);
+    virtual std::shared_ptr<PBackendService_Stub> get_stub(const 
butil::EndPoint& endpoint) {
+        std::lock_guard<std::mutex> l(_mutex);
         auto stub_ptr = _stub_map.seek(endpoint);
         if (stub_ptr != nullptr) {
-            return *stub_ptr;
+            if (available(*stub_ptr, endpoint)) {

Review comment:
       It is not good to check it every time we call `get_stub`. I think it 
should be checked only when an error happens.

##########
File path: be/src/util/brpc_stub_cache.cpp
##########
@@ -24,15 +24,12 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT)
 BrpcStubCache::BrpcStubCache() {
     _stub_map.init(239);
     REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() {
-        std::lock_guard<SpinLock> l(_lock);
+        std::lock_guard<std::mutex> l(_mutex);

Review comment:
       Why not use SpinLock?

##########
File path: docs/zh-CN/administrator-guide/http-actions/check-reset-rpc-cache.md
##########
@@ -0,0 +1,46 @@
+---
+{
+    "title": "检查和重置连接缓存",

Review comment:
       You need to add these new docs to the sidebar.

##########
File path: be/src/runtime/runtime_filter_mgr.cpp
##########
@@ -229,9 +229,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             
request_fragment_id->set_hi(targets[i].target_fragment_instance_id.hi);
             
request_fragment_id->set_lo(targets[i].target_fragment_instance_id.lo);
 
-            PBackendService_Stub* stub = 
ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
-                    targets[i].target_fragment_instance_addr);
-            VLOG_NOTICE << "send filter " << 
rpc_contexts[i]->request.filter_id()
+            std::shared_ptr<PBackendService_Stub> stub(
+                    ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
+                            targets[i].target_fragment_instance_addr));
+            LOG(INFO) << "send filter " << rpc_contexts[i]->request.filter_id()

Review comment:
       Do not use INFO level log here.

##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -368,6 +370,10 @@ void NodeChannel::try_send_batch() {
     if (row_batch->num_rows() > 0) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         row_batch->serialize(request.mutable_row_batch());
+        if (request.row_batch().ByteSizeLong() >= 
double(config::brpc_max_body_size) * 0.95f) {

Review comment:
       ByteSizeLong() will recursively call ByteSizeLong() on all embedded 
messages. It is expensive.
   And does this message really help with debugging? First, there is no context 
info in the log. Second, what should we do if we see this message?

##########
File path: be/src/util/brpc_stub_cache.h
##########
@@ -34,25 +34,32 @@ class BrpcStubCache {
     BrpcStubCache();
     virtual ~BrpcStubCache();
 
-    virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
-        std::lock_guard<SpinLock> l(_lock);
+    virtual std::shared_ptr<PBackendService_Stub> get_stub(const 
butil::EndPoint& endpoint) {
+        std::lock_guard<std::mutex> l(_mutex);

Review comment:
       Why not use SpinLock?

##########
File path: be/src/exprs/runtime_filter_rpc.cpp
##########
@@ -39,7 +39,12 @@ struct IRuntimeFilter::rpc_context {
 Status IRuntimeFilter::push_to_remote(RuntimeState* state, const 
TNetworkAddress* addr) {
     DCHECK(is_producer());
     DCHECK(_rpc_context == nullptr);
-    PBackendService_Stub* stub = 
state->exec_env()->brpc_stub_cache()->get_stub(*addr);
+    std::shared_ptr<PBackendService_Stub> stub(
+            state->exec_env()->brpc_stub_cache()->get_stub(*addr));
+    if (!stub) {
+        LOG(WARNING) << "Get rpc stub failed, host=" << addr->hostname << ", 
port=" << addr->port;
+        return Status::InternalError("get rpc stub failed");

Review comment:
       This should return more useful info in the error log, like the IP and the reason.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to