This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new c246dfff [ISSUE #1001] C++ client should block on build when sync settings failed (#1002) c246dfff is described below commit c246dfff883cfb0f929bbd0dbf4e443e1a6fe82a Author: lizhimins <707364...@qq.com> AuthorDate: Tue May 20 16:21:43 2025 +0800 [ISSUE #1001] C++ client should block on build when sync settings failed (#1002) --- .github/workflows/cpp_build.yml | 2 +- cpp/source/client/ClientManagerImpl.cpp | 7 ++--- cpp/source/client/TelemetryBidiReactor.cpp | 36 ++++++++++++++++------ cpp/source/client/include/TelemetryBidiReactor.h | 20 +++++++----- cpp/source/rocketmq/ClientImpl.cpp | 34 ++++++++++++-------- cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp | 2 +- cpp/source/rocketmq/ProducerImpl.cpp | 2 +- cpp/source/rocketmq/PushConsumerImpl.cpp | 1 - .../rocketmq/include/ConsumeMessageServiceImpl.h | 4 +-- 9 files changed, 69 insertions(+), 39 deletions(-) diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml index 5dc0302b..667b6af9 100644 --- a/.github/workflows/cpp_build.yml +++ b/.github/workflows/cpp_build.yml @@ -11,7 +11,7 @@ jobs: # Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved # Remove macos-11 since there is no such runner available # os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022] - os: [ubuntu-20.04, ubuntu-22.04, windows-2019] + os: [ubuntu-22.04, windows-2019] steps: - uses: actions/checkout@v2 - name: Compile On Linux diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 3b22ddd6..95aaa4f6 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -94,22 +94,21 @@ void ClientManagerImpl::start() { SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed)); return; } + state_.store(State::STARTING, std::memory_order_relaxed); callback_thread_pool_->start(); - scheduler_->start(); std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this(); - auto heartbeat_functor = [client_instance_weak_ptr]() { auto client_instance = client_instance_weak_ptr.lock(); if (client_instance) { client_instance->doHeartbeat(); } }; - heartbeat_task_id_ = - scheduler_->schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10)); + heartbeat_task_id_ = scheduler_->schedule( + heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10)); SPDLOG_DEBUG("Heartbeat task-id={}", heartbeat_task_id_); state_.store(State::STARTED, std::memory_order_relaxed); diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index 74e8689c..2e12f437 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -38,7 +38,6 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, auto ptr = client_.lock(); auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); context_.set_deadline(deadline); - sync_settings_future_ = sync_settings_promise_.get_future(); Metadata metadata; Signature::sign(ptr->config(), metadata); for (const auto& entry : metadata) { @@ -56,8 +55,19 @@ TelemetryBidiReactor::~TelemetryBidiReactor() { } bool TelemetryBidiReactor::awaitApplyingSettings() { - sync_settings_future_.get(); - return true; + auto settings_future = sync_settings_promise_.get_future(); + std::future_status status = settings_future.wait_for(std::chrono::seconds(3)); + if (status == std::future_status::ready) { + if (settings_future.get()) { + return true; + } + } + { + absl::MutexLock lk(&state_mtx_); + state_ = StreamState::Closed; + state_cv_.SignalAll(); + } + return false; } void TelemetryBidiReactor::OnWriteDone(bool ok) { @@ -283,21 +293,29 @@ void TelemetryBidiReactor::tryWriteNext() { static_cast<std::uint8_t>(state_)); return; } + if (writes_.empty()) { - SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_); + SPDLOG_DEBUG("No pending TelemetryCommand to write. Peer={}", peer_address_); return; } if (!writes_.empty()) { - SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString()); - AddHold(); - StartWrite(&(writes_.front())); + SPDLOG_DEBUG("Writing TelemetryCommand to {}: {}", peer_address_, writes_.front().ShortDebugString()); + if (StreamState::Ready == state_) { + AddHold(); + StartWrite(&(writes_.front())); + } else { + SPDLOG_WARN("Writing TelemetryCommand error due to unexpected state. State={}, Peer={}", + static_cast<uint8_t>(state_), peer_address_); + } } } void TelemetryBidiReactor::signalClose() { absl::MutexLock lk(&state_mtx_); - state_ = StreamState::Closing; + if (state_ == StreamState::Ready) { + state_ = StreamState::Closing; + } } void TelemetryBidiReactor::close() { @@ -361,7 +379,7 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { if (!ok) { // for read stream // Remove the hold corresponding to AddHold in TelemetryBidiReactor::TelemetryBidiReactor. - RemoveHold(); + // RemoveHold(); SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_), static_cast<std::uint8_t>(StreamState::Closing)); diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index 4bd58f96..3217fd4b 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -40,19 +40,25 @@ enum class StreamState : std::uint8_t Closed = 2, }; -/// stream-state: ready --> closing --> closed +/// TelemetryBidiReactor: Manages a bidirectional gRPC stream for telemetry data /// -/// requirement: -/// 1, close --> blocking wait till bidireactor is closed; -/// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from -/// server +/// Stream State Transitions: +/// Ready --> Closing --> Closed /// +/// Key Features: +/// 1. Close Operation: Performs a blocking wait until the bidirectional reactor is fully closed. +/// 2. Session Management: If the session closes while the client is still active, +/// it automatically initiates the creation of a new session to maintain +/// communication with the server. +/// +/// The reactor handles reading from and writing to the stream, manages stream state, +/// and applies settings received from the server. class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>, public std::enable_shared_from_this<TelemetryBidiReactor> { public: TelemetryBidiReactor(std::weak_ptr<Client> client, rmq::MessagingService::Stub* stub, std::string peer_address); - ~TelemetryBidiReactor(); + ~TelemetryBidiReactor() override; /// Notifies the application that all operations associated with this RPC /// have completed and all Holds have been removed. OnDone provides the RPC @@ -125,7 +131,6 @@ private: absl::CondVar state_cv_; std::promise<bool> sync_settings_promise_; - std::future<bool> sync_settings_future_; void applySettings(const rmq::Settings& settings); @@ -137,6 +142,7 @@ private: /// Attempt to write pending telemetry command to server. void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_); + void signalClose(); }; diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp index 7f91b048..a5cfae8d 100644 --- a/cpp/source/rocketmq/ClientImpl.cpp +++ b/cpp/source/rocketmq/ClientImpl.cpp @@ -106,26 +106,30 @@ void ClientImpl::start() { name_server_resolver_->start(); client_config_.client_id = clientId(); - if (!client_manager_) { - client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, client_config_.withSsl); + client_manager_ = std::make_shared<ClientManagerImpl>( + client_config_.resource_namespace, client_config_.withSsl); + client_manager_->start(); } - client_manager_->start(); const auto& endpoint = name_server_resolver_->resolve(); if (endpoint.empty()) { SPDLOG_ERROR("Failed to resolve name server address"); - abort(); + return; } - createSession(endpoint, false); - { - absl::MutexLock lk(&session_map_mtx_); - session_map_[endpoint]->await(); + while (true) { + createSession(endpoint, false); + { + absl::MutexLock lk(&session_map_mtx_); + if (session_map_.contains(endpoint) && session_map_[endpoint]->await()) { + break; + } + session_map_.erase(endpoint); + } } std::weak_ptr<ClientImpl> ptr(self()); - { // Query routes for topics of interest in synchronous std::vector<std::string> topics; @@ -164,8 +168,9 @@ void ClientImpl::start() { } }; - route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME, - std::chrono::seconds(10), std::chrono::seconds(30)); + route_update_handle_ = client_manager_->getScheduler()->schedule( + route_update_functor, UPDATE_ROUTE_TASK_NAME, + std::chrono::seconds(10), std::chrono::seconds(30)); auto telemetry_functor = [ptr]() { std::shared_ptr<ClientImpl> base = ptr.lock(); @@ -597,8 +602,11 @@ void ClientImpl::notifyClientTermination(const NotifyClientTerminationRequest& r Signature::sign(client_config_, metadata); for (const auto& endpoint : endpoints) { - client_manager_->notifyClientTermination(endpoint, metadata, request, - absl::ToChronoMilliseconds(client_config_.request_timeout)); + std::error_code ec = client_manager_->notifyClientTermination( + endpoint, metadata, request,absl::ToChronoMilliseconds(client_config_.request_timeout)); + if (ec) { + SPDLOG_WARN("Notify client termination error, ErrorCode={}, Endpoint={}", ec.message(), endpoint); + } } } diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp index 2b5e7c70..57268f35 100644 --- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp +++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp @@ -72,7 +72,7 @@ void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_q return; } - for (auto message : messages) { + for (const auto& message : messages) { auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message); pool_->submit([consume_task]() { consume_task->process(); }); } diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 7ea7d4f6..b7dd82e8 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -56,7 +56,7 @@ void ProducerImpl::start() { State expecting = State::STARTING; if (!state_.compare_exchange_strong(expecting, State::STARTED)) { - SPDLOG_ERROR("Start with unexpected state. Expecting: {}, Actual: {}", State::STARTING, + SPDLOG_ERROR("Producer started with an unexpected state. Expecting: {}, Actual: {}", State::STARTING, state_.load(std::memory_order_relaxed)); return; } diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index 68bb9f67..acc08177 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -65,7 +65,6 @@ void PushConsumerImpl::start() { if (!message_listener_) { SPDLOG_ERROR("Required message listener is missing"); abort(); - return; } client_config_.subscriber.group.set_resource_namespace(resourceNamespace()); diff --git a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h index f2f54bbd..ee0e78b4 100644 --- a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h +++ b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h @@ -43,8 +43,8 @@ public: * Make it noncopyable. */ ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl &other) = delete; - ConsumeMessageServiceImpl & - operator=(const ConsumeMessageServiceImpl &other) = delete; + + ConsumeMessageServiceImpl &operator=(const ConsumeMessageServiceImpl &other) = delete; void start() override;