This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a363f417 Fix crash issue of TelemetryBidiReactor and MetricBidiReactor (#809)
a363f417 is described below

commit a363f417c76cd3ae6777e433b64456f30d30b5b1
Author: Wang Chuan <0902h...@gmail.com>
AuthorDate: Tue Aug 6 17:44:51 2024 +0800

    Fix crash issue of TelemetryBidiReactor and MetricBidiReactor (#809)
    
    * Fix crash issue of TelemetryBidiReactor and MetricBidiReactor
    
    * Update TelemetryBidiReactor.cpp
    
    * Update TelemetryBidiReactor.h
    
    * fix: remove macos-11 as there is no runner of this type available
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    Co-authored-by: Zhanhui Li <lizhan...@gmail.com>
---
 .github/workflows/cpp_build.yml                  |   3 +-
 cpp/source/client/SessionImpl.cpp                |   4 +-
 cpp/source/client/TelemetryBidiReactor.cpp       | 330 +++++------------------
 cpp/source/client/include/TelemetryBidiReactor.h |  52 +---
 cpp/source/stats/MetricBidiReactor.cpp           |   6 +-
 cpp/source/stats/OpencensusExporter.cpp          |  18 +-
 cpp/source/stats/include/MetricBidiReactor.h     |   4 +-
 7 files changed, 104 insertions(+), 313 deletions(-)

diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 785995bd..7973881a 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -9,8 +9,9 @@ jobs:
       fail-fast: false
       matrix:
         # Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
+        # Remove macos-11 since there is no such runner available
         # os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
-        os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019]
+        os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
     steps:
       - uses: actions/checkout@v2
       - name: Compile On Linux
diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp
index 36416829..b3f8b73b 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -29,7 +29,7 @@ SessionImpl::SessionImpl(std::weak_ptr<Client> client, 
std::shared_ptr<RpcClient
 }
 
 bool SessionImpl::await() {
-  return telemetry_->await();
+  return telemetry_->awaitApplyingSettings();
 }
 
 void SessionImpl::syncSettings() {
@@ -41,7 +41,7 @@ void SessionImpl::syncSettings() {
 }
 
 SessionImpl::~SessionImpl() {
-  telemetry_->fireClose();
+  telemetry_->close();
   SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
 }
 
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index d75ac361..e0a83a28 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -37,87 +37,50 @@ 
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
                                            std::string peer_address)
     : client_(client),
       peer_address_(std::move(peer_address)),
-      read_state_(StreamState::Created),
-      write_state_(StreamState::Created) {
+      state_(StreamState::Ready) {
   auto ptr = client_.lock();
   auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
   context_.set_deadline(deadline);
+  sync_settings_future_ = sync_settings_promise_.get_future();
   Metadata metadata;
   Signature::sign(ptr->config(), metadata);
   for (const auto& entry : metadata) {
     context_.AddMetadata(entry.first, entry.second);
   }
   stub->async()->Telemetry(&context_, this);
-  write_state_ = StreamState::Ready;
-  // Increase hold for write stream.
+  StartRead(&read_);
+  // for read stream
   AddHold();
   StartCall();
 }
 
 TelemetryBidiReactor::~TelemetryBidiReactor() {
-  SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={}, 
WriteStreamState={}", peer_address_,
-              static_cast<std::uint8_t>(read_state_), 
static_cast<std::uint8_t>(read_state_));
+  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", 
peer_address_, static_cast<std::uint8_t>(state_));
 }
 
-bool TelemetryBidiReactor::await() {
-  absl::MutexLock lk(&state_mtx_);
-  if (StreamState::Created != write_state_) {
-    return true;
-  }
-
-  state_cv_.Wait(&state_mtx_);
-  return StreamState::Error != write_state_;
+bool TelemetryBidiReactor::awaitApplyingSettings() {
+  sync_settings_future_.get();
+  return true;
 }
 
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
   SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
 
-  if (!ok) {
-    RemoveHold();
-    {
-      absl::MutexLock lk(&state_mtx_);
-      SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().DebugString(), peer_address_);
-      write_state_ = StreamState::Error;
-
-      // Sync read state.
-      switch (read_state_) {
-        case StreamState::Created:
-        case StreamState::Ready: {
-          SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                       static_cast<std::uint8_t>(StreamState::Closed));
-          read_state_ = StreamState::Closed;
-          break;
-        }
-        case StreamState::Inflight: {
-          SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                       static_cast<std::uint8_t>(StreamState::Closing));
-          read_state_ = StreamState::Closing;
-          break;
-        }
-        case StreamState::Closing:
-        case StreamState::Error:
-        case StreamState::Closed: {
-          break;
-        }
-      }
+  // for write stream
+  RemoveHold();
 
-      state_cv_.SignalAll();
-    }
+  if (!ok) {
+    SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().DebugString(), peer_address_);
+    signalClose();
     return;
-  } else {
-    absl::MutexLock lk(&state_mtx_);
-    if (StreamState::Inflight == write_state_) {
-      write_state_ = StreamState::Ready;
-    }
   }
 
-  // Check if the read stream has started.
-  fireRead();
-
   // Remove the command that has been written to server.
   {
     absl::MutexLock lk(&writes_mtx_);
-    writes_.pop_front();
+    if (!writes_.empty()) {
+      writes_.pop_front();
+    }
   }
 
   tryWriteNext();
@@ -125,55 +88,26 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
 
 void TelemetryBidiReactor::OnReadDone(bool ok) {
   SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
+  if (!ok) {
+    // for read stream
+    RemoveHold();
+    SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
+    signalClose();
+    return;
+  }
+
   {
     absl::MutexLock lk(&state_mtx_);
-    if (!ok) {
-      // Remove read hold.
-      RemoveHold();
-      {
-        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                     static_cast<std::uint8_t>(StreamState::Error));
-        read_state_ = StreamState::Error;
-        SPDLOG_WARN("Failed to read from telemetry stream from {}", 
peer_address_);
-
-        // Sync write state
-        switch (write_state_) {
-          case StreamState::Created: {
-            // Not reachable
-            break;
-          }
-          case StreamState::Ready: {
-            write_state_ = StreamState::Closed;
-            // There is no inflight write request, remove write hold on its 
behalf.
-            RemoveHold();
-            state_cv_.SignalAll();
-            break;
-          }
-          case StreamState::Inflight: {
-            write_state_ = StreamState::Closing;
-            break;
-          }
-          case StreamState::Closing:
-          case StreamState::Error:
-          case StreamState::Closed: {
-            break;
-          }
-        }
-      }
-      return;
-    } else if (StreamState::Closing == read_state_) {
-      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                   static_cast<std::uint8_t>(StreamState::Closed));
-      read_state_ = StreamState::Closed;
-      state_cv_.SignalAll();
+    if (StreamState::Ready != state_) {
       return;
     }
   }
 
   SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, 
read_.DebugString());
-  auto ptr = client_.lock();
-  if (!ptr) {
+  auto client = client_.lock();
+  if (!client) {
     SPDLOG_INFO("Client for {} has destructed", peer_address_);
+    signalClose();
     return;
   }
 
@@ -182,14 +116,10 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       auto settings = read_.settings();
       SPDLOG_INFO("Received settings from {}: {}", peer_address_, 
settings.DebugString());
       applySettings(settings);
+      sync_settings_promise_.set_value(true);
       break;
     }
     case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
-      auto client = client_.lock();
-      if (!client) {
-        fireClose();
-        return;
-      }
       SPDLOG_DEBUG("Receive orphan transaction command: {}", 
read_.DebugString());
       auto message = 
client->manager()->wrapMessage(read_.release_verify_message_command()->message());
       auto raw = const_cast<Message*>(message.get());
@@ -209,19 +139,13 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
     }
 
     case rmq::TelemetryCommand::kVerifyMessageCommand: {
-      auto client = client_.lock();
-      if (!client) {
-        fireClose();
-        return;
-      }
-
       std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
       auto cb = [ptr](TelemetryCommand command) {
         auto reactor = ptr.lock();
         if (!reactor) {
           return;
         }
-        reactor->onVerifyMessageResult(std::move(command));
+        reactor->write(std::move(command));
       };
       auto message = 
client->manager()->wrapMessage(read_.verify_message_command().message());
       auto raw = const_cast<Message*>(message.get());
@@ -239,14 +163,9 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
 
   {
     absl::MutexLock lk(&state_mtx_);
-    if (StreamState::Inflight == read_state_) {
-      SPDLOG_DEBUG("Spawn new read op, read-state={}", 
static_cast<std::uint8_t>(read_state_));
+    if (StreamState::Ready == state_) {
+      SPDLOG_DEBUG("Spawn new read op, state={}", 
static_cast<std::uint8_t>(state_));
       StartRead(&read_);
-    } else if (read_state_ == StreamState::Closing) {
-      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                   static_cast<std::uint8_t>(StreamState::Closed));
-      read_state_ = StreamState::Closed;
-      state_cv_.SignalAll();
     }
   }
 }
@@ -341,36 +260,13 @@ void TelemetryBidiReactor::applySubscriptionConfig(const 
rmq::Settings& settings
   client->config().subscriber.receive_batch_size = 
settings.subscription().receive_batch_size();
 }
 
-void TelemetryBidiReactor::fireRead() {
-  absl::MutexLock lk(&state_mtx_);
-  if (StreamState::Created != read_state_) {
-    SPDLOG_DEBUG("Further read from {} is not allowded due to 
stream-state={}", peer_address_,
-                 static_cast<std::uint8_t>(read_state_));
-    return;
-  }
-  SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-               static_cast<std::uint8_t>(StreamState::Ready));
-  read_state_ = StreamState::Ready;
-  AddHold();
-  SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-               static_cast<std::uint8_t>(StreamState::Inflight));
-  read_state_ = StreamState::Inflight;
-  StartRead(&read_);
-}
-
 void TelemetryBidiReactor::write(TelemetryCommand command) {
   SPDLOG_DEBUG("{}#write", peer_address_);
   {
     absl::MutexLock lk(&state_mtx_);
     // Reject incoming write commands if the stream state is closing or has 
witnessed some error.
-    switch (write_state_) {
-      case StreamState::Closing:
-      case StreamState::Error:
-      case StreamState::Closed:
-        return;
-      default:
-        // no-op
-        break;
+    if (StreamState::Ready != state_) {
+      return;
     }
   }
 
@@ -383,134 +279,57 @@ void TelemetryBidiReactor::write(TelemetryCommand 
command) {
 
 void TelemetryBidiReactor::tryWriteNext() {
   SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
-  {
-    absl::MutexLock lk(&state_mtx_);
-    if (StreamState::Error == write_state_ || StreamState::Closed == 
write_state_) {
-      SPDLOG_WARN("Further write to {} is not allowded due to 
stream-state={}", peer_address_,
-                  static_cast<std::uint8_t>(write_state_));
-      return;
-    }
+  absl::MutexLock lk(&writes_mtx_);
+  if (StreamState::Ready != state_) {
+    SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}", 
peer_address_,
+                static_cast<std::uint8_t>(state_));
+    return;
+  }
+  if (writes_.empty()) {
+    SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
+    return;
   }
 
-  {
-    absl::MutexLock lk(&writes_mtx_);
-    if (writes_.empty() && StreamState::Closing != write_state_) {
-      SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
-      return;
-    }
-
-    if (StreamState::Ready == write_state_) {
-      write_state_ = StreamState::Inflight;
-    }
-
-    if (writes_.empty()) {
-      // Tell server there is no more write requests.
-      StartWritesDone();
-    } else {
-      SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
-      StartWrite(&(writes_.front()));
-    }
+  if (!writes_.empty()) {
+    SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
+    AddHold();
+    StartWrite(&(writes_.front()));
   }
 }
 
-void TelemetryBidiReactor::fireClose() {
+void TelemetryBidiReactor::signalClose() {
+  absl::MutexLock lk(&state_mtx_);
+  state_ = StreamState::Closing;
+}
+
+void TelemetryBidiReactor::close() {
   SPDLOG_INFO("{}#fireClose", peer_address_);
 
   {
-    // Acquire state lock
     absl::MutexLock lk(&state_mtx_);
-
-    // Transition read state
-    switch (read_state_) {
-      case StreamState::Created:
-      case StreamState::Ready: {
-        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                     static_cast<std::uint8_t>(StreamState::Closed));
-        read_state_ = StreamState::Closed;
-        state_cv_.SignalAll();
-        break;
-      }
-
-      case StreamState::Inflight: {
-        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                     static_cast<std::uint8_t>(StreamState::Closing));
-        read_state_ = StreamState::Closing;
-        break;
-      }
-      case StreamState::Closing: {
-        break;
-      }
-      case StreamState::Closed:
-      case StreamState::Error: {
-        state_cv_.SignalAll();
-        break;
-      }
-    }
-
-    // Transition write state
-    switch (write_state_) {
-      case StreamState::Created:
-      case StreamState::Ready:
-      case StreamState::Inflight: {
-        SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                     static_cast<std::uint8_t>(StreamState::Closing));
-        write_state_ = StreamState::Closing;
-        break;
-      }
-      case StreamState::Closing: {
-        break;
-      }
-      case StreamState::Closed:
-      case StreamState::Error: {
-        state_cv_.SignalAll();
-        break;
-      }
+    if (state_ == StreamState::Ready) {
+      state_ = StreamState::Closing;
     }
   }
 
-  if (StreamState::Closing == write_state_) {
-    tryWriteNext();
+  {
+    absl::MutexLock lk(&writes_mtx_);
+    writes_.clear();
   }
+  context_.TryCancel();
 
   {
     // Acquire state lock
     absl::MutexLock lk(&state_mtx_);
-    while ((StreamState::Closed != read_state_ && StreamState::Error != 
read_state_) ||
-           (StreamState::Closed != write_state_ && StreamState::Error != 
write_state_)) {
+    while (StreamState::Closed != state_) {
       if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
-        SPDLOG_WARN("StreamState CondVar timed out before getting signalled: 
read-state={}, write-state={}",
-                    static_cast<uint8_t>(read_state_), 
static_cast<uint8_t>(write_state_));
+        SPDLOG_WARN("StreamState CondVar timed out before getting signalled: 
state={}",
+                    static_cast<uint8_t>(state_));
       }
     }
   }
 }
 
-void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
-  SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
-  assert(StreamState::Closing == write_state_);
-
-  absl::MutexLock lk(&state_mtx_);
-  // Remove the hold for the write stream.
-  RemoveHold();
-
-  if (!ok) {
-    write_state_ = StreamState::Error;
-    SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
-  } else {
-    write_state_ = StreamState::Closed;
-    SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
-  }
-  state_cv_.SignalAll();
-}
-
-void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
-  {
-    absl::MutexLock lk(&writes_mtx_);
-    writes_.emplace_back(command);
-  }
-  tryWriteNext();
-}
-
 /// Notifies the application that all operations associated with this RPC
 /// have completed and all Holds have been removed. OnDone provides the RPC
 /// status outcome for both successful and failed RPCs and will be called in
@@ -524,20 +343,9 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
     SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, 
status.error_details={}", peer_address_,
                 status.error_code(), status.error_message(), 
status.error_details());
   }
-
   {
-    SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
     absl::MutexLock lk(&state_mtx_);
-    if (StreamState::Error != read_state_) {
-      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                   static_cast<std::uint8_t>(StreamState::Closed));
-      read_state_ = StreamState::Closed;
-    }
-    if (StreamState::Error != write_state_) {
-      SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                   static_cast<std::uint8_t>(StreamState::Closed));
-      write_state_ = StreamState::Closed;
-    }
+    state_ = StreamState::Closed;
     state_cv_.SignalAll();
   }
 
@@ -555,12 +363,14 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool 
ok) {
   SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
 
   if (!ok) {
-    absl::MutexLock lk(&state_mtx_);
-    SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
-                 static_cast<std::uint8_t>(StreamState::Error));
-    read_state_ = StreamState::Error;
-    state_cv_.SignalAll();
+    // for read stream
+    // Remove the hold corresponding to AddHold in 
TelemetryBidiReactor::TelemetryBidiReactor.
+    RemoveHold();
+
+    SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_),
+                 static_cast<std::uint8_t>(StreamState::Closing));
     SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
+    signalClose();
     return;
   }
 
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h
index 3bdbe3d3..4bd58f96 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <chrono>
 #include <cstdint>
+#include <future>
 #include <list>
 #include <memory>
 #include <utility>
@@ -34,29 +35,17 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 enum class StreamState : std::uint8_t
 {
-  Created = 0,
-  Ready = 1,
-  Inflight = 2,
-  Closing = 3,
-  Closed = 4,
-  Error = 5,
+  Ready = 0,
+  Closing = 1,
+  Closed = 2,
 };
 
-/// write-stream-state: created --> ready --> inflight --> ready --> ...
-///                                                    --> error
-///                                                    --> closing --> closed
-///                                       --> closing  --> closed
-///                                                    --> error
+/// stream-state: ready --> closing --> closed
 ///
-///
-/// read-stream-state: created --> ready --> inflight --> inflight
-///                                                   --> closing --> closed
-///                                                   --> error
-///                                      --> closed
 /// requirement:
-///    1, fireClose --> blocking await till bidireactor is closed;
+///    1, close --> blocking wait till bidireactor is closed;
 ///    2, when session is closed and client is still active, recreate a new 
session to accept incoming commands from
-///    server 3, after writing the first Settings telemetry command, launch 
the read directional stream
+///    server
 ///
 class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
                              public 
std::enable_shared_from_this<TelemetryBidiReactor> {
@@ -97,21 +86,12 @@ public:
   ///               will succeed, and any further Start* should not be called.
   void OnWriteDone(bool ok) override;
 
-  /// Notifies the application that a StartWritesDone operation completed. Note
-  /// that this is only used on explicit StartWritesDone operations and not for
-  /// those that are implicitly invoked as part of a StartWriteLast.
-  ///
-  /// \param[in] ok Was it successful? If false, the application will later see
-  ///               the failure reflected as a bad status in OnDone and no
-  ///               further Start* should be called.
-  void OnWritesDoneDone(bool ok) override;
-
   /// Core API method to initiate this bidirectional stream.
   void write(TelemetryCommand command);
 
-  bool await();
+  bool awaitApplyingSettings();
 
-  void fireClose();
+  void close();
 
 private:
   grpc::ClientContext context_;
@@ -140,14 +120,12 @@ private:
    */
   std::string peer_address_;
 
-  StreamState read_state_ GUARDED_BY(state_mtx_);
-  StreamState write_state_ GUARDED_BY(state_mtx_);
+  StreamState state_ GUARDED_BY(state_mtx_);
   absl::Mutex state_mtx_;
   absl::CondVar state_cv_;
 
-  void changeStreamStateThenNotify(StreamState state);
-
-  void onVerifyMessageResult(TelemetryCommand command);
+  std::promise<bool> sync_settings_promise_;
+  std::future<bool> sync_settings_future_;
 
   void applySettings(const rmq::Settings& settings);
 
@@ -157,13 +135,9 @@ private:
 
   void applySubscriptionConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
 
-  /// Start the read stream.
-  ///
-  /// Once got the OnReadDone and status is OK, call StartRead immediately.
-  void fireRead();
-
   /// Attempt to write pending telemetry command to server.
   void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
+  void signalClose();
 };
 
 ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/stats/MetricBidiReactor.cpp b/cpp/source/stats/MetricBidiReactor.cpp
index e6921378..e03e7c61 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -25,12 +25,10 @@
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, 
std::weak_ptr<OpencensusExporter> exporter)
+MetricBidiReactor::MetricBidiReactor(std::shared_ptr<Client> client, 
std::shared_ptr<OpencensusExporter> exporter)
     : client_(client), exporter_(exporter) {
-  auto ptr = client_.lock();
-
   Metadata metadata;
-  Signature::sign(ptr->config(), metadata);
+  Signature::sign(client->config(), metadata);
 
   for (const auto& entry : metadata) {
     context_.AddMetadata(entry.first, entry.second);
diff --git a/cpp/source/stats/OpencensusExporter.cpp b/cpp/source/stats/OpencensusExporter.cpp
index effe5120..2c4c187d 100644
--- a/cpp/source/stats/OpencensusExporter.cpp
+++ b/cpp/source/stats/OpencensusExporter.cpp
@@ -167,12 +167,16 @@ void OpencensusExporter::ExportViewData(
     const std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>& data) {
   opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
   wrap(data, request);
-  std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
   if (!bidi_reactor_) {
-    bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter);
+    auto ptr = client_.lock();
+    if (ptr) {
+      bidi_reactor_ = absl::make_unique<MetricBidiReactor>(ptr, 
shared_from_this());
+    } else {
+      SPDLOG_INFO("did not create stream since the client is no longer 
available.");
+    }
   }
 
-  if (request.metrics_size()) {
+  if (request.metrics_size() && bidi_reactor_) {
     SPDLOG_DEBUG("ExportMetricRequest: {}", request.DebugString());
     bidi_reactor_->write(request);
   } else {
@@ -181,8 +185,12 @@ void OpencensusExporter::ExportViewData(
 }
 
 void OpencensusExporter::resetStream() {
-  std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
-  bidi_reactor_.reset(new MetricBidiReactor(client_, exporter));
+  auto ptr = client_.lock();
+  if (ptr) {
+    bidi_reactor_.reset(new MetricBidiReactor(ptr, shared_from_this()));
+  } else {
+    SPDLOG_INFO("did not reset stream since the client is no longer 
available.");
+  }
 }
 
 ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/stats/include/MetricBidiReactor.h b/cpp/source/stats/include/MetricBidiReactor.h
index e4d75344..ae49f6e8 100644
--- a/cpp/source/stats/include/MetricBidiReactor.h
+++ b/cpp/source/stats/include/MetricBidiReactor.h
@@ -36,8 +36,8 @@ class MetricBidiReactor
     : public grpc::ClientBidiReactor<ExportMetricsServiceRequest,
                                      ExportMetricsServiceResponse> {
 public:
-  MetricBidiReactor(std::weak_ptr<Client> client,
-                    std::weak_ptr<OpencensusExporter> exporter);
+  MetricBidiReactor(std::shared_ptr<Client> client,
+                    std::shared_ptr<OpencensusExporter> exporter);
 
   /// Notifies the application that a StartRead operation completed.
   ///

Reply via email to