This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch fifo_opt in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 15dbae44743ef61d5b0be5ff1bf8e2fc388f9343 Author: Li Zhanhui <lizhan...@gmail.com> AuthorDate: Sat Apr 13 22:24:06 2024 +0800 feat: use shared_ptr for message internally Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- cpp/.clang-format | 11 +++ cpp/.gitignore | 1 + cpp/source/rocketmq/ProducerImpl.cpp | 110 ++++++++++++++--------------- cpp/source/rocketmq/include/ProducerImpl.h | 8 +-- cpp/source/rocketmq/include/SendContext.h | 9 ++- 5 files changed, 74 insertions(+), 65 deletions(-) diff --git a/cpp/.clang-format b/cpp/.clang-format index fb63bcfb..4d705bad 100644 --- a/cpp/.clang-format +++ b/cpp/.clang-format @@ -17,6 +17,17 @@ AllowShortFunctionsOnASingleLine: false IndentCaseBlocks: false IndentCaseLabels: true ReflowComments: true +AlignConsecutiveDeclarations: + Enabled: true + AcrossEmptyLines: false + AcrossComments: false +AlignConsecutiveAssignments: + Enabled: true + AcrossEmptyLines: false + AcrossComments: false +AlignTrailingComments: + Kind: Always + OverEmptyLines: 2 ... --- diff --git a/cpp/.gitignore b/cpp/.gitignore index b7f10c0f..26ace723 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp /compile_commands.json /.cache/ .clangd +tools/send_fifo.sh diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 73130161..8b11106b 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -16,9 +16,9 @@ */ #include "ProducerImpl.h" -#include <algorithm> #include <apache/rocketmq/v2/definition.pb.h> +#include <algorithm> #include <atomic> #include <cassert> #include <chrono> @@ -115,7 +115,8 @@ void ProducerImpl::validate(const Message& message, std::error_code& ec) { } } -void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request, +void ProducerImpl::wrapSendMessageRequest(const Message& message, + SendMessageRequest& request, const rmq::MessageQueue& message_queue) { auto msg = new rmq::Message(); @@ -151,13 +152,14 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()); auto mutable_delivery_timestamp = system_properties->mutable_delivery_timestamp(); mutable_delivery_timestamp->set_seconds(std::chrono::duration_cast<std::chrono::seconds>(duration).count()); - mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() % 1000000000); + mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() % + 1000000000); } } // Born-time - auto duration = absl::Now() - absl::UnixEpoch(); - int64_t seconds = absl::ToInt64Seconds(duration); + auto duration = absl::Now() - absl::UnixEpoch(); + int64_t seconds = absl::ToInt64Seconds(duration); system_properties->mutable_born_timestamp()->set_seconds(seconds); system_properties->mutable_born_timestamp()->set_nanos(absl::ToInt64Nanoseconds(duration - absl::Seconds(seconds))); @@ -200,7 +202,7 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString()); } -SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept { +SendReceipt ProducerImpl::send(MessageConstSharedPtr message, std::error_code& ec) noexcept { ensureRunning(ec); if (ec) { return {}; @@ -218,14 +220,14 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe return {}; } - auto mtx = std::make_shared<absl::Mutex>(); - auto cv = std::make_shared<absl::CondVar>(); - bool completed = false; - SendReceipt send_receipt; + auto mtx = std::make_shared<absl::Mutex>(); + auto cv = std::make_shared<absl::CondVar>(); + bool completed = false; + SendReceipt send_receipt; // Define callback auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) { - ec = code; + ec = code; send_receipt = receipt; { absl::MutexLock lk(mtx.get()); @@ -246,7 +248,7 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe return send_receipt; } -void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { +void ProducerImpl::send(MessageConstSharedPtr message, SendCallback cb) { std::error_code ec; ensureRunning(ec); if (ec) { @@ -254,13 +256,10 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { cb(ec, send_receipt); } - std::string topic = message->topic(); + std::string topic = message->topic(); std::weak_ptr<ProducerImpl> client(shared_from_this()); - // Walk-around: Use capture by move when C++14 is possible - const Message* msg = message.release(); - auto callback = [client, cb, msg](const std::error_code& ec, const TopicPublishInfoPtr& publish_info) { - MessageConstPtr ptr(msg); + auto callback = [=](const std::error_code& ec, const TopicPublishInfoPtr& publish_info) mutable { // No route entries of the given topic is available if (ec) { SendReceipt send_receipt; @@ -275,16 +274,16 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { return; } - auto publisher = client.lock(); + auto publisher = client.lock(); std::vector<rmq::MessageQueue> message_queue_list; - if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) { + if (!publish_info->selectMessageQueues(message->group(), message_queue_list)) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; cb(ec, send_receipt); return; } - publisher->send0(std::move(ptr), cb, message_queue_list); + publisher->send0(std::move(message), cb, message_queue_list); }; getPublishInfoAsync(topic, callback); @@ -306,8 +305,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { { // Trace Send RPC if (!context->message_->traceContext().empty() && client_config_.sampler_) { - auto span_context = opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext()); - auto span = opencensus::trace::Span::BlankSpan(); + auto span_context = opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext()); + auto span = opencensus::trace::Span::BlankSpan(); std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " + MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION; if (span_context.IsValid()) { @@ -349,8 +348,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { client_manager_->send(target, metadata, request, callback); } -void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list) { - SendReceipt send_receipt; +void ProducerImpl::send0(MessageConstSharedPtr&& message, SendCallback callback, std::vector<rmq::MessageQueue> list) { + SendReceipt send_receipt; std::error_code ec; validate(*message, ec); if (ec) { @@ -370,7 +369,7 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, TransactionState resolution) { EndTransactionRequest request; - const std::string& topic = transaction.topic; + const std::string& topic = transaction.topic; request.mutable_topic()->set_name(topic); request.mutable_topic()->set_resource_namespace(resourceNamespace()); request.set_message_id(transaction.message_id); @@ -390,8 +389,8 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, Transacti absl::flat_hash_map<std::string, std::string> metadata; Signature::sign(config(), metadata); bool completed = false; - bool success = false; - auto span = opencensus::trace::Span::BlankSpan(); + bool success = false; + auto span = opencensus::trace::Span::BlankSpan(); if (!transaction.trace_context.empty() && client_config_.sampler_) { // Trace transactional message opencensus::trace::SpanContext span_context = @@ -399,9 +398,10 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, Transacti std::string trace_operation_name = TransactionState::COMMIT == resolution ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION; - std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name; + std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name; if (span_context.IsValid()) { - span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()}); + span = + opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()}); } else { span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()}); } @@ -410,9 +410,9 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, Transacti // TracingUtility::addUniversalSpanAttributes(message, config(), span); } - auto mtx = std::make_shared<absl::Mutex>(); - auto cv = std::make_shared<absl::CondVar>(); - const auto& endpoint = transaction.target; + auto mtx = std::make_shared<absl::Mutex>(); + auto cv = std::make_shared<absl::CondVar>(); + const auto& endpoint = transaction.target; std::weak_ptr<ProducerImpl> publisher(shared_from_this()); auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) { @@ -463,10 +463,8 @@ void ProducerImpl::isolateEndpoint(const std::string& target) { isolated_endpoints_.insert(target); } -void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) { - MiniTransaction mini = {}; - mini.topic = message->topic(); - mini.trace_context = message->traceContext(); +void ProducerImpl::send(MessageConstSharedPtr message, std::error_code& ec, Transaction& transaction) { + MiniTransaction mini = {.topic = message->topic(), .trace_context = message->traceContext()}; if (!message->group().empty()) { ec = ErrorCode::MessagePropertyConflictWithType; @@ -480,15 +478,15 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio return; } - Message* msg = const_cast<Message*>(message.get()); + Message* msg = const_cast<Message*>(message.get()); msg->mutableExtension().transactional = true; SendReceipt send_receipt = send(std::move(message), ec); - mini.message_id = send_receipt.message_id; + mini.message_id = send_receipt.message_id; mini.transaction_id = send_receipt.transaction_id; - mini.target = send_receipt.target; - auto& impl = dynamic_cast<TransactionImpl&>(transaction); + mini.target = send_receipt.target; + auto& impl = dynamic_cast<TransactionImpl&>(transaction); impl.appendMiniTransaction(mini); } @@ -531,16 +529,16 @@ void ProducerImpl::cachePublishInfo(const std::string& topic, TopicPublishInfoPt } TopicPublishInfoPtr ProducerImpl::getPublishInfo(const std::string& topic) { - bool complete = false; - auto mtx = std::make_shared<absl::Mutex>(); - auto cv = std::make_shared<absl::CondVar>(); + bool complete = false; + auto mtx = std::make_shared<absl::Mutex>(); + auto cv = std::make_shared<absl::CondVar>(); TopicPublishInfoPtr topic_publish_info; - std::error_code error_code; - auto cb = [&, mtx, cv](const std::error_code& ec, const TopicPublishInfoPtr& ptr) { + std::error_code error_code; + auto cb = [&, mtx, cv](const std::error_code& ec, const TopicPublishInfoPtr& ptr) { absl::MutexLock lk(mtx.get()); topic_publish_info = ptr; - error_code = ec; - complete = true; + error_code = ec; + complete = true; cv->SignalAll(); }; getPublishInfoAsync(topic, cb); @@ -564,19 +562,19 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message) std::weak_ptr<ProducerImpl> producer(shared_from_this()); MiniTransaction transaction = {}; - transaction.topic = message->topic(); - transaction.message_id = message->id(); - transaction.transaction_id = message->extension().transaction_id; - transaction.trace_context = message->traceContext(); - transaction.target = message->extension().target_endpoint; - TransactionState state = transaction_checker_(*message); + transaction.topic = message->topic(); + transaction.message_id = message->id(); + transaction.transaction_id = message->extension().transaction_id; + transaction.trace_context = message->traceContext(); + transaction.target = message->extension().target_endpoint; + TransactionState state = transaction_checker_(*message); endTransaction0(transaction, state); } else { SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr"); } } -void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) { +void ProducerImpl::topicsOfInterest(std::vector<std::string>& topics) { absl::MutexLock lk(&topics_mtx_); for (auto& topic : topics_) { if (std::find(topics.begin(), topics.end(), topic) == topics.end()) { @@ -585,9 +583,9 @@ void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) { } } -void ProducerImpl::withTopics(const std::vector<std::string> &topics) { +void ProducerImpl::withTopics(const std::vector<std::string>& topics) { absl::MutexLock lk(&topics_mtx_); - for (auto &topic: topics) { + for (auto& topic : topics) { if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) { topics_.push_back(topic); } diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h index d7260a93..9b467aa6 100644 --- a/cpp/source/rocketmq/include/ProducerImpl.h +++ b/cpp/source/rocketmq/include/ProducerImpl.h @@ -53,9 +53,9 @@ public: void shutdown() override; - SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept; + SendReceipt send(MessageConstSharedPtr message, std::error_code& ec) noexcept; - void send(MessageConstPtr message, SendCallback callback); + void send(MessageConstSharedPtr message, SendCallback callback); void setTransactionChecker(TransactionChecker checker); @@ -64,7 +64,7 @@ public: return absl::make_unique<TransactionImpl>(producer); } - void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction); + void send(MessageConstSharedPtr message, std::error_code& ec, Transaction& transaction); /** * Check if the RPC client for the target host is isolated or not @@ -160,7 +160,7 @@ private: void validate(const Message& message, std::error_code& ec); - void send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list); + void send0(MessageConstSharedPtr&& message, SendCallback callback, std::vector<rmq::MessageQueue> list); void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) LOCKS_EXCLUDED(isolated_endpoints_mtx_); diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h index 4c05cebe..760c6558 100644 --- a/cpp/source/rocketmq/include/SendContext.h +++ b/cpp/source/rocketmq/include/SendContext.h @@ -19,12 +19,11 @@ #include <memory> #include <system_error> +#include "Protocol.h" +#include "TransactionImpl.h" #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" #include "opencensus/trace/span.h" - -#include "Protocol.h" -#include "TransactionImpl.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" @@ -37,7 +36,7 @@ class ProducerImpl; class SendContext : public std::enable_shared_from_this<SendContext> { public: SendContext(std::weak_ptr<ProducerImpl> producer, - MessageConstPtr message, + MessageConstSharedPtr&& message, SendCallback callback, std::vector<rmq::MessageQueue> candidates) : producer_(std::move(producer)), @@ -57,7 +56,7 @@ public: } std::weak_ptr<ProducerImpl> producer_; - MessageConstPtr message_; + MessageConstSharedPtr message_; std::size_t attempt_times_{0}; SendCallback callback_;