This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new a363f417 Fix crash issue of TelemetryBidiReactor and MetricBidiReactor (#809) a363f417 is described below commit a363f417c76cd3ae6777e433b64456f30d30b5b1 Author: Wang Chuan <0902h...@gmail.com> AuthorDate: Tue Aug 6 17:44:51 2024 +0800 Fix crash issue of TelemetryBidiReactor and MetricBidiReactor (#809) * Fix crash issue of TelemetryBidiReactor and MetricBidiReactor * Update TelemetryBidiReactor.cpp * Update TelemetryBidiReactor.h * fix: remove macos-11 as there is no runner of this type available Signed-off-by: Zhanhui Li <lizhan...@gmail.com> --------- Signed-off-by: Zhanhui Li <lizhan...@gmail.com> Co-authored-by: Zhanhui Li <lizhan...@gmail.com> --- .github/workflows/cpp_build.yml | 3 +- cpp/source/client/SessionImpl.cpp | 4 +- cpp/source/client/TelemetryBidiReactor.cpp | 330 +++++------------------ cpp/source/client/include/TelemetryBidiReactor.h | 52 +--- cpp/source/stats/MetricBidiReactor.cpp | 6 +- cpp/source/stats/OpencensusExporter.cpp | 18 +- cpp/source/stats/include/MetricBidiReactor.h | 4 +- 7 files changed, 104 insertions(+), 313 deletions(-) diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml index 785995bd..7973881a 100644 --- a/.github/workflows/cpp_build.yml +++ b/.github/workflows/cpp_build.yml @@ -9,8 +9,9 @@ jobs: fail-fast: false matrix: # Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved + # Remove macos-11 since there is no such runner available # os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022] - os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019] + os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019] steps: - uses: actions/checkout@v2 - name: Compile On Linux diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp index 36416829..b3f8b73b 100644 --- a/cpp/source/client/SessionImpl.cpp +++ b/cpp/source/client/SessionImpl.cpp @@ -29,7 +29,7 @@ SessionImpl::SessionImpl(std::weak_ptr<Client> client, std::shared_ptr<RpcClient } bool SessionImpl::await() { - return telemetry_->await(); + return telemetry_->awaitApplyingSettings(); } void SessionImpl::syncSettings() { @@ -41,7 +41,7 @@ void SessionImpl::syncSettings() { } SessionImpl::~SessionImpl() { - telemetry_->fireClose(); + telemetry_->close(); SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); } diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index d75ac361..e0a83a28 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -37,87 +37,50 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, std::string peer_address) : client_(client), peer_address_(std::move(peer_address)), - read_state_(StreamState::Created), - write_state_(StreamState::Created) { + state_(StreamState::Ready) { auto ptr = client_.lock(); auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); context_.set_deadline(deadline); + sync_settings_future_ = sync_settings_promise_.get_future(); Metadata metadata; Signature::sign(ptr->config(), metadata); for (const auto& entry : metadata) { context_.AddMetadata(entry.first, entry.second); } stub->async()->Telemetry(&context_, this); - write_state_ = StreamState::Ready; - // Increase hold for write stream. + StartRead(&read_); + // for read stream AddHold(); StartCall(); } TelemetryBidiReactor::~TelemetryBidiReactor() { - SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={}, WriteStreamState={}", peer_address_, - static_cast<std::uint8_t>(read_state_), static_cast<std::uint8_t>(read_state_)); + SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, static_cast<std::uint8_t>(state_)); } -bool TelemetryBidiReactor::await() { - absl::MutexLock lk(&state_mtx_); - if (StreamState::Created != write_state_) { - return true; - } - - state_cv_.Wait(&state_mtx_); - return StreamState::Error != write_state_; +bool TelemetryBidiReactor::awaitApplyingSettings() { + sync_settings_future_.get(); + return true; } void TelemetryBidiReactor::OnWriteDone(bool ok) { SPDLOG_DEBUG("{}#OnWriteDone", peer_address_); - if (!ok) { - RemoveHold(); - { - absl::MutexLock lk(&state_mtx_); - SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); - write_state_ = StreamState::Error; - - // Sync read state. - switch (read_state_) { - case StreamState::Created: - case StreamState::Ready: { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closed)); - read_state_ = StreamState::Closed; - break; - } - case StreamState::Inflight: { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closing)); - read_state_ = StreamState::Closing; - break; - } - case StreamState::Closing: - case StreamState::Error: - case StreamState::Closed: { - break; - } - } + // for write stream + RemoveHold(); - state_cv_.SignalAll(); - } + if (!ok) { + SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); + signalClose(); return; - } else { - absl::MutexLock lk(&state_mtx_); - if (StreamState::Inflight == write_state_) { - write_state_ = StreamState::Ready; - } } - // Check if the read stream has started. - fireRead(); - // Remove the command that has been written to server. { absl::MutexLock lk(&writes_mtx_); - writes_.pop_front(); + if (!writes_.empty()) { + writes_.pop_front(); + } } tryWriteNext(); @@ -125,55 +88,26 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) { void TelemetryBidiReactor::OnReadDone(bool ok) { SPDLOG_DEBUG("{}#OnReadDone", peer_address_); + if (!ok) { + // for read stream + RemoveHold(); + SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); + signalClose(); + return; + } + { absl::MutexLock lk(&state_mtx_); - if (!ok) { - // Remove read hold. - RemoveHold(); - { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Error)); - read_state_ = StreamState::Error; - SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); - - // Sync write state - switch (write_state_) { - case StreamState::Created: { - // Not reachable - break; - } - case StreamState::Ready: { - write_state_ = StreamState::Closed; - // There is no inflight write request, remove write hold on its behalf. - RemoveHold(); - state_cv_.SignalAll(); - break; - } - case StreamState::Inflight: { - write_state_ = StreamState::Closing; - break; - } - case StreamState::Closing: - case StreamState::Error: - case StreamState::Closed: { - break; - } - } - } - return; - } else if (StreamState::Closing == read_state_) { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closed)); - read_state_ = StreamState::Closed; - state_cv_.SignalAll(); + if (StreamState::Ready != state_) { return; } } SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString()); - auto ptr = client_.lock(); - if (!ptr) { + auto client = client_.lock(); + if (!client) { SPDLOG_INFO("Client for {} has destructed", peer_address_); + signalClose(); return; } @@ -182,14 +116,10 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { auto settings = read_.settings(); SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString()); applySettings(settings); + sync_settings_promise_.set_value(true); break; } case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: { - auto client = client_.lock(); - if (!client) { - fireClose(); - return; - } SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString()); auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message()); auto raw = const_cast<Message*>(message.get()); @@ -209,19 +139,13 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { } case rmq::TelemetryCommand::kVerifyMessageCommand: { - auto client = client_.lock(); - if (!client) { - fireClose(); - return; - } - std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this()); auto cb = [ptr](TelemetryCommand command) { auto reactor = ptr.lock(); if (!reactor) { return; } - reactor->onVerifyMessageResult(std::move(command)); + reactor->write(std::move(command)); }; auto message = client->manager()->wrapMessage(read_.verify_message_command().message()); auto raw = const_cast<Message*>(message.get()); @@ -239,14 +163,9 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { { absl::MutexLock lk(&state_mtx_); - if (StreamState::Inflight == read_state_) { - SPDLOG_DEBUG("Spawn new read op, read-state={}", static_cast<std::uint8_t>(read_state_)); + if (StreamState::Ready == state_) { + SPDLOG_DEBUG("Spawn new read op, state={}", static_cast<std::uint8_t>(state_)); StartRead(&read_); - } else if (read_state_ == StreamState::Closing) { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closed)); - read_state_ = StreamState::Closed; - state_cv_.SignalAll(); } } } @@ -341,36 +260,13 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings client->config().subscriber.receive_batch_size = settings.subscription().receive_batch_size(); } -void TelemetryBidiReactor::fireRead() { - absl::MutexLock lk(&state_mtx_); - if (StreamState::Created != read_state_) { - SPDLOG_DEBUG("Further read from {} is not allowded due to stream-state={}", peer_address_, - static_cast<std::uint8_t>(read_state_)); - return; - } - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Ready)); - read_state_ = StreamState::Ready; - AddHold(); - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Inflight)); - read_state_ = StreamState::Inflight; - StartRead(&read_); -} - void TelemetryBidiReactor::write(TelemetryCommand command) { SPDLOG_DEBUG("{}#write", peer_address_); { absl::MutexLock lk(&state_mtx_); // Reject incoming write commands if the stream state is closing or has witnessed some error. - switch (write_state_) { - case StreamState::Closing: - case StreamState::Error: - case StreamState::Closed: - return; - default: - // no-op - break; + if (StreamState::Ready != state_) { + return; } } @@ -383,134 +279,57 @@ void TelemetryBidiReactor::write(TelemetryCommand command) { void TelemetryBidiReactor::tryWriteNext() { SPDLOG_DEBUG("{}#tryWriteNext", peer_address_); - { - absl::MutexLock lk(&state_mtx_); - if (StreamState::Error == write_state_ || StreamState::Closed == write_state_) { - SPDLOG_WARN("Further write to {} is not allowded due to stream-state={}", peer_address_, - static_cast<std::uint8_t>(write_state_)); - return; - } + absl::MutexLock lk(&writes_mtx_); + if (StreamState::Ready != state_) { + SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(state_)); + return; + } + if (writes_.empty()) { + SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_); + return; } - { - absl::MutexLock lk(&writes_mtx_); - if (writes_.empty() && StreamState::Closing != write_state_) { - SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_); - return; - } - - if (StreamState::Ready == write_state_) { - write_state_ = StreamState::Inflight; - } - - if (writes_.empty()) { - // Tell server there is no more write requests. - StartWritesDone(); - } else { - SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); - StartWrite(&(writes_.front())); - } + if (!writes_.empty()) { + SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); + AddHold(); + StartWrite(&(writes_.front())); } } -void TelemetryBidiReactor::fireClose() { +void TelemetryBidiReactor::signalClose() { + absl::MutexLock lk(&state_mtx_); + state_ = StreamState::Closing; +} + +void TelemetryBidiReactor::close() { SPDLOG_INFO("{}#fireClose", peer_address_); { - // Acquire state lock absl::MutexLock lk(&state_mtx_); - - // Transition read state - switch (read_state_) { - case StreamState::Created: - case StreamState::Ready: { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closed)); - read_state_ = StreamState::Closed; - state_cv_.SignalAll(); - break; - } - - case StreamState::Inflight: { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closing)); - read_state_ = StreamState::Closing; - break; - } - case StreamState::Closing: { - break; - } - case StreamState::Closed: - case StreamState::Error: { - state_cv_.SignalAll(); - break; - } - } - - // Transition write state - switch (write_state_) { - case StreamState::Created: - case StreamState::Ready: - case StreamState::Inflight: { - SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closing)); - write_state_ = StreamState::Closing; - break; - } - case StreamState::Closing: { - break; - } - case StreamState::Closed: - case StreamState::Error: { - state_cv_.SignalAll(); - break; - } + if (state_ == StreamState::Ready) { + state_ = StreamState::Closing; } } - if (StreamState::Closing == write_state_) { - tryWriteNext(); + { + absl::MutexLock lk(&writes_mtx_); + writes_.clear(); } + context_.TryCancel(); { // Acquire state lock absl::MutexLock lk(&state_mtx_); - while ((StreamState::Closed != read_state_ && StreamState::Error != read_state_) || - (StreamState::Closed != write_state_ && StreamState::Error != write_state_)) { + while (StreamState::Closed != state_) { if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { - SPDLOG_WARN("StreamState CondVar timed out before getting signalled: read-state={}, write-state={}", - static_cast<uint8_t>(read_state_), static_cast<uint8_t>(write_state_)); + SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}", + static_cast<uint8_t>(state_)); } } } } -void TelemetryBidiReactor::OnWritesDoneDone(bool ok) { - SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); - assert(StreamState::Closing == write_state_); - - absl::MutexLock lk(&state_mtx_); - // Remove the hold for the write stream. - RemoveHold(); - - if (!ok) { - write_state_ = StreamState::Error; - SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_); - } else { - write_state_ = StreamState::Closed; - SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); - } - state_cv_.SignalAll(); -} - -void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) { - { - absl::MutexLock lk(&writes_mtx_); - writes_.emplace_back(command); - } - tryWriteNext(); -} - /// Notifies the application that all operations associated with this RPC /// have completed and all Holds have been removed. OnDone provides the RPC /// status outcome for both successful and failed RPCs and will be called in @@ -524,20 +343,9 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_, status.error_code(), status.error_message(), status.error_details()); } - { - SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_); absl::MutexLock lk(&state_mtx_); - if (StreamState::Error != read_state_) { - SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closed)); - read_state_ = StreamState::Closed; - } - if (StreamState::Error != write_state_) { - SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Closed)); - write_state_ = StreamState::Closed; - } + state_ = StreamState::Closed; state_cv_.SignalAll(); } @@ -555,12 +363,14 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_); if (!ok) { - absl::MutexLock lk(&state_mtx_); - SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), - static_cast<std::uint8_t>(StreamState::Error)); - read_state_ = StreamState::Error; - state_cv_.SignalAll(); + // for read stream + // Remove the hold corresponding to AddHold in TelemetryBidiReactor::TelemetryBidiReactor. + RemoveHold(); + + SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_), + static_cast<std::uint8_t>(StreamState::Closing)); SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_); + signalClose(); return; } diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index 3bdbe3d3..4bd58f96 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -19,6 +19,7 @@ #include <atomic> #include <chrono> #include <cstdint> +#include <future> #include <list> #include <memory> #include <utility> @@ -34,29 +35,17 @@ ROCKETMQ_NAMESPACE_BEGIN enum class StreamState : std::uint8_t { - Created = 0, - Ready = 1, - Inflight = 2, - Closing = 3, - Closed = 4, - Error = 5, + Ready = 0, + Closing = 1, + Closed = 2, }; -/// write-stream-state: created --> ready --> inflight --> ready --> ... -/// --> error -/// --> closing --> closed -/// --> closing --> closed -/// --> error +/// stream-state: ready --> closing --> closed /// -/// -/// read-stream-state: created --> ready --> inflight --> inflight -/// --> closing --> closed -/// --> error -/// --> closed /// requirement: -/// 1, fireClose --> blocking await till bidireactor is closed; +/// 1, close --> blocking wait till bidireactor is closed; /// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from -/// server 3, after writing the first Settings telemetry command, launch the read directional stream +/// server /// class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>, public std::enable_shared_from_this<TelemetryBidiReactor> { @@ -97,21 +86,12 @@ public: /// will succeed, and any further Start* should not be called. void OnWriteDone(bool ok) override; - /// Notifies the application that a StartWritesDone operation completed. Note - /// that this is only used on explicit StartWritesDone operations and not for - /// those that are implicitly invoked as part of a StartWriteLast. - /// - /// \param[in] ok Was it successful? If false, the application will later see - /// the failure reflected as a bad status in OnDone and no - /// further Start* should be called. - void OnWritesDoneDone(bool ok) override; - /// Core API method to initiate this bidirectional stream. void write(TelemetryCommand command); - bool await(); + bool awaitApplyingSettings(); - void fireClose(); + void close(); private: grpc::ClientContext context_; @@ -140,14 +120,12 @@ private: */ std::string peer_address_; - StreamState read_state_ GUARDED_BY(state_mtx_); - StreamState write_state_ GUARDED_BY(state_mtx_); + StreamState state_ GUARDED_BY(state_mtx_); absl::Mutex state_mtx_; absl::CondVar state_cv_; - void changeStreamStateThenNotify(StreamState state); - - void onVerifyMessageResult(TelemetryCommand command); + std::promise<bool> sync_settings_promise_; + std::future<bool> sync_settings_future_; void applySettings(const rmq::Settings& settings); @@ -157,13 +135,9 @@ private: void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client); - /// Start the read stream. - /// - /// Once got the OnReadDone and status is OK, call StartRead immediately. - void fireRead(); - /// Attempt to write pending telemetry command to server. void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_); + void signalClose(); }; ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/stats/MetricBidiReactor.cpp b/cpp/source/stats/MetricBidiReactor.cpp index e6921378..e03e7c61 100644 --- a/cpp/source/stats/MetricBidiReactor.cpp +++ b/cpp/source/stats/MetricBidiReactor.cpp @@ -25,12 +25,10 @@ ROCKETMQ_NAMESPACE_BEGIN -MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter) +MetricBidiReactor::MetricBidiReactor(std::shared_ptr<Client> client, std::shared_ptr<OpencensusExporter> exporter) : client_(client), exporter_(exporter) { - auto ptr = client_.lock(); - Metadata metadata; - Signature::sign(ptr->config(), metadata); + Signature::sign(client->config(), metadata); for (const auto& entry : metadata) { context_.AddMetadata(entry.first, entry.second); diff --git a/cpp/source/stats/OpencensusExporter.cpp b/cpp/source/stats/OpencensusExporter.cpp index effe5120..2c4c187d 100644 --- a/cpp/source/stats/OpencensusExporter.cpp +++ b/cpp/source/stats/OpencensusExporter.cpp @@ -167,12 +167,16 @@ void OpencensusExporter::ExportViewData( const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) { opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request; wrap(data, request); - std::weak_ptr<OpencensusExporter> exporter{shared_from_this()}; if (!bidi_reactor_) { - bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter); + auto ptr = client_.lock(); + if (ptr) { + bidi_reactor_ = absl::make_unique<MetricBidiReactor>(ptr, shared_from_this()); + } else { + SPDLOG_INFO("did not create stream since the client is no longer available."); + } } - if (request.metrics_size()) { + if (request.metrics_size() && bidi_reactor_) { SPDLOG_DEBUG("ExportMetricRequest: {}", request.DebugString()); bidi_reactor_->write(request); } else { @@ -181,8 +185,12 @@ void OpencensusExporter::ExportViewData( } void OpencensusExporter::resetStream() { - std::weak_ptr<OpencensusExporter> exporter{shared_from_this()}; - bidi_reactor_.reset(new MetricBidiReactor(client_, exporter)); + auto ptr = client_.lock(); + if (ptr) { + bidi_reactor_.reset(new MetricBidiReactor(ptr, shared_from_this())); + } else { + SPDLOG_INFO("did not reset stream since the client is no longer available."); + } } ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/stats/include/MetricBidiReactor.h b/cpp/source/stats/include/MetricBidiReactor.h index e4d75344..ae49f6e8 100644 --- a/cpp/source/stats/include/MetricBidiReactor.h +++ b/cpp/source/stats/include/MetricBidiReactor.h @@ -36,8 +36,8 @@ class MetricBidiReactor : public grpc::ClientBidiReactor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> { public: - MetricBidiReactor(std::weak_ptr<Client> client, - std::weak_ptr<OpencensusExporter> exporter); + MetricBidiReactor(std::shared_ptr<Client> client, + std::shared_ptr<OpencensusExporter> exporter); /// Notifies the application that a StartRead operation completed. ///