This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 303a2179543c4570794621097a338d0f1e4271db Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Aug 27 20:56:47 2022 +0800 [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed(addendum) (#5721) --- .../tubemq-client-cpp/src/baseconsumer.cc | 68 +++++++++++++--------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc index c7152a16e..5fbff8ddc 100644 --- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc +++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc @@ -127,27 +127,23 @@ void BaseConsumer::ShutDown() { // 3. close all brokers closeAllBrokers(); // 4. check master hb thread status - int check_count = 5; - while (master_hb_status_.Get() != 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(30)); - if (--check_count <= 0) { - LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s", - master_hb_status_.Get(), client_uuid_.c_str()); - break; - } - } - check_count = 5; - while (master_reg_status_.Get() != 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(30)); - if (--check_count <= 0) { - LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s", - master_reg_status_.Get(), client_uuid_.c_str()); - break; + int check_count = 0; + while (master_hb_status_.Get() != 0 || master_reg_status_.Get() != 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(40)); + if (++check_count % 10 == 0) { + if (check_count >= 1000) { + LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, exit, client=%s", + master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str()); + break; + } else { + LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, continue, client=%s", + master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str()); + } } } // 
5. join hb thread; - heart_beat_timer_ = nullptr; rebalance_thread_ptr_->join(); + heart_beat_timer_ = nullptr; rebalance_thread_ptr_ = nullptr; // 6. remove client stub TubeMQService::Instance()->RmvClientObj(shared_from_this()); @@ -539,21 +535,26 @@ void BaseConsumer::heartBeat2Master() { req_protocol->request_id_ = request->request_id_; req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500; // send message to target - auto self = shared_from_this(); AsyncRequest(request, req_protocol) - .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) { + .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) { + if (GetClientIndex() == tb_config::kInvalidValue || + !TubeMQService::Instance()->IsRunning() || + !isClientRunning()) { + master_hb_status_.CompareAndSet(1, 0); + return; + } if (error.Value() != err_code::kErrSuccess) { master_sh_retry_cnt_++; LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s", target_ip.c_str(), target_port, error.Message().c_str(), client_uuid_.c_str()); if (master_sh_retry_cnt_ >= tb_config::kMaxMasterHBRetryCount) { - LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s", - master_sh_retry_cnt_, client_uuid_.c_str()); - master_sh_retry_cnt_ = 0; - is_master_actived_.Set(false); - asyncRegister2Master(true); - master_hb_status_.CompareAndSet(1, 0); - return; + LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s", + master_sh_retry_cnt_, client_uuid_.c_str()); + master_sh_retry_cnt_ = 0; + is_master_actived_.Set(false); + asyncRegister2Master(true); + master_hb_status_.CompareAndSet(1, 0); + return; } } else { // process response @@ -578,7 +579,14 @@ void BaseConsumer::heartBeat2Master() { } } } + if (GetClientIndex() == tb_config::kInvalidValue || + !TubeMQService::Instance()->IsRunning() || + !isClientRunning()) { + master_hb_status_.CompareAndSet(1, 0); + return; + } 
heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms())); + auto self = shared_from_this(); heart_beat_timer_->async_wait([self, this](const std::error_code& ec) { if (ec) { return; @@ -786,9 +794,13 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) { req_protocol->request_id_ = request->request_id_; req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500; // send message to target - auto self = shared_from_this(); AsyncRequest(request, req_protocol) - .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) { + .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) { + if (GetClientIndex() == tb_config::kInvalidValue || + !TubeMQService::Instance()->IsRunning() || + !isClientRunning()) { + return; + } if (error.Value() != err_code::kErrSuccess) { LOG_WARN("[Heartbeat2Broker] request network to failure (%s), ression is %s", broker_info.GetAddrInfo().c_str(), error.Message().c_str());