morningman commented on a change in pull request #6916: URL: https://github.com/apache/incubator-doris/pull/6916#discussion_r739636432
########## File path: be/src/runtime/data_stream_sender.cpp ########## @@ -105,17 +105,18 @@ Status DataStreamSender::Channel::init(RuntimeState* state) { _brpc_request.set_be_number(_be_number); _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) * 1000; - if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { - _brpc_stub = - state->exec_env()->brpc_stub_cache()->get_stub("127.0.0.1", _brpc_dest_addr.port); - } else { - _brpc_stub = state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr); - } // In bucket shuffle join will set fragment_instance_id (-1, -1) // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" // so the empty channel not need call function close_internal() _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1); + if (_need_close) { + _brpc_stub = state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr); + if (!_brpc_stub) { + LOG(WARNING) << "Get rpc stub failed, dest_addr=" << _brpc_dest_addr; + return Status::InternalError("get rpc stub failed"); Review comment: Please return a more useful error message. ########## File path: be/src/util/brpc_stub_cache.h ########## @@ -34,25 +34,32 @@ class BrpcStubCache { BrpcStubCache(); virtual ~BrpcStubCache(); - virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) { - std::lock_guard<SpinLock> l(_lock); + virtual std::shared_ptr<PBackendService_Stub> get_stub(const butil::EndPoint& endpoint) { + std::lock_guard<std::mutex> l(_mutex); auto stub_ptr = _stub_map.seek(endpoint); if (stub_ptr != nullptr) { - return *stub_ptr; + if (available(*stub_ptr, endpoint)) { Review comment: It is not good to check the stub's availability every time we call `get_stub`. I think it should be checked when an error happens instead.
########## File path: be/src/util/brpc_stub_cache.cpp ########## @@ -24,15 +24,12 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT) BrpcStubCache::BrpcStubCache() { _stub_map.init(239); REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { - std::lock_guard<SpinLock> l(_lock); + std::lock_guard<std::mutex> l(_mutex); Review comment: Why not use SpinLock? ########## File path: docs/zh-CN/administrator-guide/http-actions/check-reset-rpc-cache.md ########## @@ -0,0 +1,46 @@ +--- +{ + "title": "检查和重置连接缓存", Review comment: You need to add these new docs to the sidebar. ########## File path: be/src/runtime/runtime_filter_mgr.cpp ########## @@ -229,9 +229,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ request_fragment_id->set_hi(targets[i].target_fragment_instance_id.hi); request_fragment_id->set_lo(targets[i].target_fragment_instance_id.lo); - PBackendService_Stub* stub = ExecEnv::GetInstance()->brpc_stub_cache()->get_stub( - targets[i].target_fragment_instance_addr); - VLOG_NOTICE << "send filter " << rpc_contexts[i]->request.filter_id() + std::shared_ptr<PBackendService_Stub> stub( + ExecEnv::GetInstance()->brpc_stub_cache()->get_stub( + targets[i].target_fragment_instance_addr)); + LOG(INFO) << "send filter " << rpc_contexts[i]->request.filter_id() Review comment: Do not use INFO-level logging here. ########## File path: be/src/exec/tablet_sink.cpp ########## @@ -368,6 +370,10 @@ void NodeChannel::try_send_batch() { if (row_batch->num_rows() > 0) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); row_batch->serialize(request.mutable_row_batch()); + if (request.row_batch().ByteSizeLong() >= double(config::brpc_max_body_size) * 0.95f) { Review comment: ByteSizeLong() recursively calls ByteSizeLong() on all embedded messages, so it is expensive. Also, does this message really help with debugging? First, there is no context info in the log. Second, what should we do if we see this message?
########## File path: be/src/util/brpc_stub_cache.h ########## @@ -34,25 +34,32 @@ class BrpcStubCache { BrpcStubCache(); virtual ~BrpcStubCache(); - virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) { - std::lock_guard<SpinLock> l(_lock); + virtual std::shared_ptr<PBackendService_Stub> get_stub(const butil::EndPoint& endpoint) { + std::lock_guard<std::mutex> l(_mutex); Review comment: Why not use SpinLock? ########## File path: be/src/exprs/runtime_filter_rpc.cpp ########## @@ -39,7 +39,12 @@ struct IRuntimeFilter::rpc_context { Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) { DCHECK(is_producer()); DCHECK(_rpc_context == nullptr); - PBackendService_Stub* stub = state->exec_env()->brpc_stub_cache()->get_stub(*addr); + std::shared_ptr<PBackendService_Stub> stub( + state->exec_env()->brpc_stub_cache()->get_stub(*addr)); + if (!stub) { + LOG(WARNING) << "Get rpc stub failed, host=" << addr->hostname << ", port=" << addr->port; + return Status::InternalError("get rpc stub failed"); Review comment: The returned error should carry more useful information, such as the IP and the reason for the failure, not just the log line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org