This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop-cpp in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/develop-cpp by this push: new bb533223 feat: revamp TelemetryBidiReactor states and their transition graph bb533223 is described below commit bb533223b8c5db8990c262bb63f831470fedd480 Author: Li Zhanhui <lizhan...@gmail.com> AuthorDate: Tue Mar 26 11:59:55 2024 +0800 feat: revamp TelemetryBidiReactor states and their transition graph Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- cpp/source/client/SessionImpl.cpp | 6 +- cpp/source/client/TelemetryBidiReactor.cpp | 331 +++++++++++++++-------- cpp/source/client/include/TelemetryBidiReactor.h | 61 ++--- 3 files changed, 244 insertions(+), 154 deletions(-) diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp index 0ca3fff2..36416829 100644 --- a/cpp/source/client/SessionImpl.cpp +++ b/cpp/source/client/SessionImpl.cpp @@ -16,12 +16,14 @@ */ #include "SessionImpl.h" + #include "rocketmq/Logger.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -SessionImpl::SessionImpl(std::weak_ptr<Client> client, std::shared_ptr<RpcClient> rpc_client) : client_(client), rpc_client_(rpc_client) { +SessionImpl::SessionImpl(std::weak_ptr<Client> client, std::shared_ptr<RpcClient> rpc_client) + : client_(client), rpc_client_(rpc_client) { telemetry_ = rpc_client->asyncTelemetry(client_); syncSettings(); } @@ -39,8 +41,8 @@ void SessionImpl::syncSettings() { } SessionImpl::~SessionImpl() { - SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); telemetry_->fireClose(); + SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); } ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index d45e5058..61796437 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -35,7 +35,10 @@ ROCKETMQ_NAMESPACE_BEGIN TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, 
rmq::MessagingService::Stub* stub, std::string peer_address) - : client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Active) { + : client_(client), + peer_address_(std::move(peer_address)), + read_state_(StreamState::Created), + write_state_(StreamState::Created) { auto ptr = client_.lock(); auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); context_.set_deadline(deadline); @@ -45,59 +48,67 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, context_.AddMetadata(entry.first, entry.second); } stub->async()->Telemetry(&context_, this); + write_state_ = StreamState::Ready; // Increase hold for write stream. AddHold(); StartCall(); } TelemetryBidiReactor::~TelemetryBidiReactor() { - SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, - static_cast<std::uint8_t>(stream_state_)); + SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={}, WriteStreamState={}", peer_address_, + static_cast<std::uint8_t>(read_state_), static_cast<std::uint8_t>(read_state_)); } bool TelemetryBidiReactor::await() { - absl::MutexLock lk(&server_setting_received_mtx_); - if (server_setting_received_) { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Created != write_state_) { return true; } - server_setting_received_cv_.Wait(&server_setting_received_mtx_); - return server_setting_received_; -} - -void TelemetryBidiReactor::changeStreamStateThenNotify(StreamState state) { - absl::MutexLock lk(&stream_state_mtx_); - if (state == stream_state_) { - return; - } - stream_state_ = state; - stream_state_cv_.SignalAll(); + state_cv_.Wait(&state_mtx_); + return StreamState::Error != write_state_; } void TelemetryBidiReactor::OnWriteDone(bool ok) { SPDLOG_DEBUG("{}#OnWriteDone", peer_address_); - { - bool expected = true; - if (!command_inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { - SPDLOG_WARN("Illegal command-inflight state"); - return; - } - 
} if (!ok) { RemoveHold(); { - absl::MutexLock lk(&writes_mtx_); + absl::MutexLock lk(&state_mtx_); SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); - } - - { - absl::MutexLock lk(&stream_state_mtx_); - if (streamStateGood()) { - stream_state_ = StreamState::WriteFailure; + write_state_ = StreamState::Error; + + // Sync read state. + switch (read_state_) { + case StreamState::Created: + case StreamState::Ready: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + break; + } + case StreamState::Inflight: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closing)); + read_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: { + break; + } } + + state_cv_.SignalAll(); } return; + } else { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Inflight == write_state_) { + write_state_ = StreamState::Ready; + } } // Check if the read stream has started. @@ -114,22 +125,49 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) { void TelemetryBidiReactor::OnReadDone(bool ok) { SPDLOG_DEBUG("{}#OnReadDone", peer_address_); - if (!ok) { - RemoveHold(); - { - absl::MutexLock lk(&stream_state_mtx_); - if (streamStateGood() && stream_state_ != StreamState::Closing) { - stream_state_ = StreamState::ReadFailure; + { + absl::MutexLock lk(&state_mtx_); + if (!ok) { + // Remove read hold. 
+ RemoveHold(); + { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Error)); + read_state_ = StreamState::Error; SPDLOG_WARN("Faild to read from telemetry stream from {}", peer_address_); - } - bool expected = false; - if (command_inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { - // There is no inflight write request, remove write hold on its behalf. - RemoveHold(); + // Sync write state + switch (write_state_) { + case StreamState::Created: { + // Not reachable + break; + } + case StreamState::Ready: { + write_state_ = StreamState::Closed; + // There is no inflight write request, remove write hold on its behalf. + RemoveHold(); + state_cv_.SignalAll(); + break; + } + case StreamState::Inflight: { + write_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: { + break; + } + } } + return; + } else if (StreamState::Closing == read_state_) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + return; } - return; } SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString()); @@ -144,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { auto settings = read_.settings(); SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString()); applySettings(settings); - { - absl::MutexLock lk(&server_setting_received_mtx_); - if (!server_setting_received_) { - server_setting_received_ = true; - server_setting_received_cv_.SignalAll(); - } - } break; } case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: { @@ -206,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { } } - StartRead(&read_); + { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Inflight == read_state_) { + 
SPDLOG_DEBUG("Spawn new read op, read-state={}", static_cast<std::uint8_t>(read_state_)); + StartRead(&read_); + } else if (read_state_ == StreamState::Closing) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + } + } } void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) { @@ -300,30 +342,35 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings } void TelemetryBidiReactor::fireRead() { - { - absl::MutexLock lk(&stream_state_mtx_); - if (!streamStateGood()) { - SPDLOG_WARN("Further read from {} is not allowded due to stream-state={}", peer_address_, - static_cast<std::uint8_t>(stream_state_)); - return; - } - } - - bool expected = false; - if (read_stream_started_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { - // Hold for the read stream. - AddHold(); - StartRead(&read_); + absl::MutexLock lk(&state_mtx_); + if (StreamState::Created != read_state_) { + SPDLOG_DEBUG("Further read from {} is not allowded due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(read_state_)); + return; } + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Ready)); + read_state_ = StreamState::Ready; + AddHold(); + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Inflight)); + read_state_ = StreamState::Inflight; + StartRead(&read_); } void TelemetryBidiReactor::write(TelemetryCommand command) { SPDLOG_DEBUG("{}#write", peer_address_); { - absl::MutexLock lk(&stream_state_mtx_); + absl::MutexLock lk(&state_mtx_); // Reject incoming write commands if the stream state is closing or has witnessed some error. 
- if (!streamStateGood() || StreamState::Closing == stream_state_) { - return; + switch (write_state_) { + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: + return; + default: + // no-op + break; } } @@ -337,84 +384,123 @@ void TelemetryBidiReactor::write(TelemetryCommand command) { void TelemetryBidiReactor::tryWriteNext() { SPDLOG_DEBUG("{}#tryWriteNext", peer_address_); { - absl::MutexLock lk(&stream_state_mtx_); - if (!streamStateGood()) { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Error == write_state_ || StreamState::Closed == write_state_) { SPDLOG_WARN("Further write to {} is not allowded due to stream-state={}", peer_address_, - static_cast<std::uint8_t>(stream_state_)); + static_cast<std::uint8_t>(write_state_)); return; } } - bool closing = false; - { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Closing == stream_state_) { - closing = true; - } - } - { absl::MutexLock lk(&writes_mtx_); - if (writes_.empty() && !closing) { + if (writes_.empty() && StreamState::Closing != write_state_) { SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_); return; } - bool expected = false; - if (command_inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { - if (writes_.empty()) { - // Tell server there is no more write requests. - StartWritesDone(); - } else { - SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); - StartWrite(&(writes_.front())); - } + if (StreamState::Ready == write_state_) { + write_state_ = StreamState::Inflight; + } + + if (writes_.empty()) { + // Tell server there is no more write requests. + StartWritesDone(); } else { - SPDLOG_DEBUG("Another command is already on the wire. 
Peer={}", peer_address_); - return; + SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); + StartWrite(&(writes_.front())); } } } void TelemetryBidiReactor::fireClose() { SPDLOG_INFO("{}#fireClose", peer_address_); + { - absl::MutexLock lk(&stream_state_mtx_); - if (!streamStateGood()) { - SPDLOG_WARN("No futher Read/Write call to {} is allowed due to stream-state={}", peer_address_, - static_cast<std::uint8_t>(stream_state_)); - return; + // Acquire state lock + absl::MutexLock lk(&state_mtx_); + + // Transition read state + switch (read_state_) { + case StreamState::Created: + case StreamState::Ready: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + break; + } + + case StreamState::Inflight: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closing)); + read_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: { + break; + } + case StreamState::Closed: + case StreamState::Error: { + state_cv_.SignalAll(); + break; + } + } + + // Transition write state + switch (write_state_) { + case StreamState::Created: + case StreamState::Ready: + case StreamState::Inflight: { + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closing)); + write_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: { + break; + } + case StreamState::Closed: + case StreamState::Error: { + state_cv_.SignalAll(); + break; + } } } - { - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::Closing; + if (StreamState::Closing == write_state_) { + tryWriteNext(); } - tryWriteNext(); + { - absl::MutexLock lk(&stream_state_mtx_); - if (stream_state_cv_.WaitWithTimeout(&stream_state_mtx_, 
absl::Seconds(3))) { - SPDLOG_WARN("StreamState CondVar timed out before getting signalled"); + // Acquire state lock + absl::MutexLock lk(&state_mtx_); + while ((StreamState::Closed != read_state_ && StreamState::Error != read_state_) || + (StreamState::Closed != write_state_ && StreamState::Error != write_state_)) { + if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { + SPDLOG_WARN("StreamState CondVar timed out before getting signalled: read-state={}, write-state={}", + static_cast<uint8_t>(read_state_), static_cast<uint8_t>(write_state_)); + } } } } void TelemetryBidiReactor::OnWritesDoneDone(bool ok) { SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); + assert(StreamState::Closing == write_state_); + + absl::MutexLock lk(&state_mtx_); // Remove the hold for the write stream. RemoveHold(); if (!ok) { - absl::MutexLock lk(&stream_state_mtx_); - if (streamStateGood()) { - stream_state_ = StreamState::WriteFailure; - } + write_state_ = StreamState::Error; SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_); - return; + } else { + write_state_ = StreamState::Closed; + SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); } - - SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); + state_cv_.SignalAll(); } void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) { @@ -441,7 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { { SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_); - changeStreamStateThenNotify(StreamState::Closed); + absl::MutexLock lk(&state_mtx_); + if (StreamState::Error != read_state_) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + } + if (StreamState::Error != write_state_) { + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + write_state_ = 
StreamState::Closed; + } + state_cv_.SignalAll(); } auto client = client_.lock(); @@ -456,11 +553,13 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_); + if (!ok) { - absl::MutexLock lk(&stream_state_mtx_); - if (streamStateGood()) { - stream_state_ = StreamState::ReadInitialMetadataFailure; - } + absl::MutexLock lk(&state_mtx_); + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Error)); + read_state_ = StreamState::Error; + state_cv_.SignalAll(); SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_); return; } @@ -468,8 +567,4 @@ void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { SPDLOG_DEBUG("Received initial metadata from {}", peer_address_); } -bool TelemetryBidiReactor::streamStateGood() { - return StreamState::Active == stream_state_ || StreamState::Closing == stream_state_; -} - ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index 63c6f5e6..77ee484f 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -34,21 +34,30 @@ ROCKETMQ_NAMESPACE_BEGIN enum class StreamState : std::uint8_t { - Active = 0, - - /// Once the stream enters this state, new write requests shall be rejected - /// and once currently pending requests are written, write stream should be - /// closed as soon as possible. - Closing = 1, - - // Once stream state reaches one of the following, Start* should not be - // called. - Closed = 2, - ReadInitialMetadataFailure = 3, - ReadFailure = 4, - WriteFailure = 5, + Created = 0, + Ready = 1, + Inflight = 2, + Closing = 3, + Closed = 4, + Error = 5, }; +/// write-stream-state: created --> ready --> inflight --> ready --> ... 
+/// --> error +/// --> closing --> closed +/// --> closing --> closed +/// --> error +/// +/// +/// read-stream-state: created --> ready --> inflight --> inflight +/// --> closing --> closed +/// --> error +/// --> closed +/// requirements: +/// 1. fireClose blocks until the bidi-reactor is closed; +/// 2. when the session is closed and the client is still active, recreate a new session to accept incoming commands from +/// the server; 3. after writing the first Settings telemetry command, launch the read-direction stream. +/// class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>, public std::enable_shared_from_this<TelemetryBidiReactor> { public: @@ -131,20 +140,10 @@ private: */ std::string peer_address_; - /** - * @brief Indicate if there is a command being written to network. - */ - std::atomic_bool command_inflight_{false}; - - std::atomic_bool read_stream_started_{false}; - - StreamState stream_state_ GUARDED_BY(stream_state_mtx_); - absl::Mutex stream_state_mtx_; - absl::CondVar stream_state_cv_; - - bool server_setting_received_ GUARDED_BY(server_setting_received_mtx_){false}; - absl::Mutex server_setting_received_mtx_; - absl::CondVar server_setting_received_cv_; + StreamState read_state_ GUARDED_BY(state_mtx_); + StreamState write_state_ GUARDED_BY(state_mtx_); + absl::Mutex state_mtx_; + absl::CondVar state_cv_; void changeStreamStateThenNotify(StreamState state); @@ -158,19 +157,13 @@ private: void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client); - /** - * Indicate if the underlying gRPC bidirectional stream is good enough to fire - * further Start* calls. - */ - bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_); - /// Start the read stream. /// /// Once got the OnReadDone and status is OK, call StartRead immediately. void fireRead(); /// Attempt to write pending telemetry command to server.
- void tryWriteNext(); + void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_); }; ROCKETMQ_NAMESPACE_END \ No newline at end of file