This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c246dfff [ISSUE #1001] C++ client should block on build when sync settings failed (#1002)
c246dfff is described below

commit c246dfff883cfb0f929bbd0dbf4e443e1a6fe82a
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue May 20 16:21:43 2025 +0800

    [ISSUE #1001] C++ client should block on build when sync settings failed (#1002)
---
 .github/workflows/cpp_build.yml                    |  2 +-
 cpp/source/client/ClientManagerImpl.cpp            |  7 ++---
 cpp/source/client/TelemetryBidiReactor.cpp         | 36 ++++++++++++++++------
 cpp/source/client/include/TelemetryBidiReactor.h   | 20 +++++++-----
 cpp/source/rocketmq/ClientImpl.cpp                 | 34 ++++++++++++--------
 cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp  |  2 +-
 cpp/source/rocketmq/ProducerImpl.cpp               |  2 +-
 cpp/source/rocketmq/PushConsumerImpl.cpp           |  1 -
 .../rocketmq/include/ConsumeMessageServiceImpl.h   |  4 +--
 9 files changed, 69 insertions(+), 39 deletions(-)

diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 5dc0302b..667b6af9 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -11,7 +11,7 @@ jobs:
         # Disable VS 2022 before 
https://github.com/bazelbuild/bazel/issues/18592 issue is solved
         # Remove macos-11 since there is no such runner available
         # os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, 
windows-2022]
-        os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
+        os: [ubuntu-22.04, windows-2019]
     steps:
       - uses: actions/checkout@v2
       - name: Compile On Linux
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 3b22ddd6..95aaa4f6 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -94,22 +94,21 @@ void ClientManagerImpl::start() {
     SPDLOG_WARN("Unexpected client instance state: {}", 
state_.load(std::memory_order_relaxed));
     return;
   }
+
   state_.store(State::STARTING, std::memory_order_relaxed);
 
   callback_thread_pool_->start();
-
   scheduler_->start();
 
   std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = 
shared_from_this();
-
   auto heartbeat_functor = [client_instance_weak_ptr]() {
     auto client_instance = client_instance_weak_ptr.lock();
     if (client_instance) {
       client_instance->doHeartbeat();
     }
   };
-  heartbeat_task_id_ =
-      scheduler_->schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, 
std::chrono::seconds(1), std::chrono::seconds(10));
+  heartbeat_task_id_ = scheduler_->schedule(
+      heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), 
std::chrono::seconds(10));
   SPDLOG_DEBUG("Heartbeat task-id={}", heartbeat_task_id_);
 
   state_.store(State::STARTED, std::memory_order_relaxed);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index 74e8689c..2e12f437 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -38,7 +38,6 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
   auto ptr = client_.lock();
   auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
   context_.set_deadline(deadline);
-  sync_settings_future_ = sync_settings_promise_.get_future();
   Metadata metadata;
   Signature::sign(ptr->config(), metadata);
   for (const auto& entry : metadata) {
@@ -56,8 +55,19 @@ TelemetryBidiReactor::~TelemetryBidiReactor() {
 }
 
 bool TelemetryBidiReactor::awaitApplyingSettings() {
-  sync_settings_future_.get();
-  return true;
+  auto settings_future = sync_settings_promise_.get_future();
+  std::future_status status = 
settings_future.wait_for(std::chrono::seconds(3));
+  if (status == std::future_status::ready) {
+    if (settings_future.get()) {
+      return true;
+    }
+  }
+  {
+    absl::MutexLock lk(&state_mtx_);
+    state_ = StreamState::Closed;
+    state_cv_.SignalAll();
+  }
+  return false;
 }
 
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
@@ -283,21 +293,29 @@ void TelemetryBidiReactor::tryWriteNext() {
                 static_cast<std::uint8_t>(state_));
     return;
   }
+
   if (writes_.empty()) {
-    SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
+    SPDLOG_DEBUG("No pending TelemetryCommand to write. Peer={}", 
peer_address_);
     return;
   }
 
   if (!writes_.empty()) {
-    SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().ShortDebugString());
-    AddHold();
-    StartWrite(&(writes_.front()));
+    SPDLOG_DEBUG("Writing TelemetryCommand to {}: {}", peer_address_, 
writes_.front().ShortDebugString());
+    if (StreamState::Ready == state_) {
+      AddHold();
+      StartWrite(&(writes_.front()));
+    } else {
+      SPDLOG_WARN("Writing TelemetryCommand error due to unexpected state. 
State={}, Peer={}",
+                  static_cast<uint8_t>(state_), peer_address_);
+    }
   }
 }
 
 void TelemetryBidiReactor::signalClose() {
   absl::MutexLock lk(&state_mtx_);
-  state_ = StreamState::Closing;
+  if (state_ == StreamState::Ready) {
+    state_ = StreamState::Closing;
+  }
 }
 
 void TelemetryBidiReactor::close() {
@@ -361,7 +379,7 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
   if (!ok) {
     // for read stream
     // Remove the hold corresponding to AddHold in 
TelemetryBidiReactor::TelemetryBidiReactor.
-    RemoveHold();
+    // RemoveHold();
 
     SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_),
                  static_cast<std::uint8_t>(StreamState::Closing));
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h
index 4bd58f96..3217fd4b 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -40,19 +40,25 @@ enum class StreamState : std::uint8_t
   Closed = 2,
 };
 
-/// stream-state: ready --> closing --> closed
+/// TelemetryBidiReactor: Manages a bidirectional gRPC stream for telemetry 
data
 ///
-/// requirement:
-///    1, close --> blocking wait till bidireactor is closed;
-///    2, when session is closed and client is still active, recreate a new 
session to accept incoming commands from
-///    server
+/// Stream State Transitions:
+///    Ready --> Closing --> Closed
 ///
+/// Key Features:
+///    1. Close Operation: Performs a blocking wait until the bidirectional 
reactor is fully closed.
+///    2. Session Management: If the session closes while the client is still 
active,
+///       it automatically initiates the creation of a new session to maintain
+///       communication with the server.
+///
+/// The reactor handles reading from and writing to the stream, manages stream 
state,
+/// and applies settings received from the server.
 class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
                              public 
std::enable_shared_from_this<TelemetryBidiReactor> {
 public:
   TelemetryBidiReactor(std::weak_ptr<Client> client, 
rmq::MessagingService::Stub* stub, std::string peer_address);
 
-  ~TelemetryBidiReactor();
+  ~TelemetryBidiReactor() override;
 
   /// Notifies the application that all operations associated with this RPC
   /// have completed and all Holds have been removed. OnDone provides the RPC
@@ -125,7 +131,6 @@ private:
   absl::CondVar state_cv_;
 
   std::promise<bool> sync_settings_promise_;
-  std::future<bool> sync_settings_future_;
 
   void applySettings(const rmq::Settings& settings);
 
@@ -137,6 +142,7 @@ private:
 
   /// Attempt to write pending telemetry command to server.
   void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
+
   void signalClose();
 };
 
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index 7f91b048..a5cfae8d 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -106,26 +106,30 @@ void ClientImpl::start() {
   name_server_resolver_->start();
 
   client_config_.client_id = clientId();
-
   if (!client_manager_) {
-    client_manager_ = 
std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, 
client_config_.withSsl);
+    client_manager_ = std::make_shared<ClientManagerImpl>(
+        client_config_.resource_namespace, client_config_.withSsl);
+    client_manager_->start();
   }
-  client_manager_->start();
 
   const auto& endpoint = name_server_resolver_->resolve();
   if (endpoint.empty()) {
     SPDLOG_ERROR("Failed to resolve name server address");
-    abort();
+    return;
   }
 
-  createSession(endpoint, false);
-  {
-    absl::MutexLock lk(&session_map_mtx_);
-    session_map_[endpoint]->await();
+  while (true) {
+    createSession(endpoint, false);
+    {
+      absl::MutexLock lk(&session_map_mtx_);
+      if (session_map_.contains(endpoint) && session_map_[endpoint]->await()) {
+        break;
+      }
+      session_map_.erase(endpoint);
+    }
   }
 
   std::weak_ptr<ClientImpl> ptr(self());
-
   {
     // Query routes for topics of interest in synchronous
     std::vector<std::string> topics;
@@ -164,8 +168,9 @@ void ClientImpl::start() {
     }
   };
 
-  route_update_handle_ = 
client_manager_->getScheduler()->schedule(route_update_functor, 
UPDATE_ROUTE_TASK_NAME,
-                                                                   
std::chrono::seconds(10), std::chrono::seconds(30));
+  route_update_handle_ = client_manager_->getScheduler()->schedule(
+      route_update_functor, UPDATE_ROUTE_TASK_NAME,
+      std::chrono::seconds(10), std::chrono::seconds(30));
 
   auto telemetry_functor = [ptr]() {
     std::shared_ptr<ClientImpl> base = ptr.lock();
@@ -597,8 +602,11 @@ void ClientImpl::notifyClientTermination(const NotifyClientTerminationRequest& r
   Signature::sign(client_config_, metadata);
 
   for (const auto& endpoint : endpoints) {
-    client_manager_->notifyClientTermination(endpoint, metadata, request,
-                                             
absl::ToChronoMilliseconds(client_config_.request_timeout));
+    std::error_code ec = client_manager_->notifyClientTermination(
+        endpoint, metadata, 
request,absl::ToChronoMilliseconds(client_config_.request_timeout));
+    if (ec) {
+      SPDLOG_WARN("Notify client termination error, ErrorCode={}, 
Endpoint={}", ec.message(), endpoint);
+    }
   }
 }
 
diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index 2b5e7c70..57268f35 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -72,7 +72,7 @@ void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_q
     return;
   }
 
-  for (auto message : messages) {
+  for (const auto& message : messages) {
     auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), 
process_queue, message);
     pool_->submit([consume_task]() { consume_task->process(); });
   }
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 7ea7d4f6..b7dd82e8 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -56,7 +56,7 @@ void ProducerImpl::start() {
 
   State expecting = State::STARTING;
   if (!state_.compare_exchange_strong(expecting, State::STARTED)) {
-    SPDLOG_ERROR("Start with unexpected state. Expecting: {}, Actual: {}", 
State::STARTING,
+    SPDLOG_ERROR("Producer started with an unexpected state. Expecting: {}, 
Actual: {}", State::STARTING,
                  state_.load(std::memory_order_relaxed));
     return;
   }
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 68bb9f67..acc08177 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -65,7 +65,6 @@ void PushConsumerImpl::start() {
   if (!message_listener_) {
     SPDLOG_ERROR("Required message listener is missing");
     abort();
-    return;
   }
 
   client_config_.subscriber.group.set_resource_namespace(resourceNamespace());
diff --git a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
index f2f54bbd..ee0e78b4 100644
--- a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
+++ b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -43,8 +43,8 @@ public:
    * Make it noncopyable.
    */
   ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl &other) = delete;
-  ConsumeMessageServiceImpl &
-  operator=(const ConsumeMessageServiceImpl &other) = delete;
+
+  ConsumeMessageServiceImpl &operator=(const ConsumeMessageServiceImpl &other) 
= delete;
 
   void start() override;
 

Reply via email to