This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 15dbae44743ef61d5b0be5ff1bf8e2fc388f9343
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Sat Apr 13 22:24:06 2024 +0800

    feat: use shared_ptr for message internally
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 cpp/.clang-format                          |  11 +++
 cpp/.gitignore                             |   1 +
 cpp/source/rocketmq/ProducerImpl.cpp       | 110 ++++++++++++++---------------
 cpp/source/rocketmq/include/ProducerImpl.h |   8 +--
 cpp/source/rocketmq/include/SendContext.h  |   9 ++-
 5 files changed, 74 insertions(+), 65 deletions(-)

diff --git a/cpp/.clang-format b/cpp/.clang-format
index fb63bcfb..4d705bad 100644
--- a/cpp/.clang-format
+++ b/cpp/.clang-format
@@ -17,6 +17,17 @@ AllowShortFunctionsOnASingleLine: false
 IndentCaseBlocks: false
 IndentCaseLabels: true
 ReflowComments: true
+AlignConsecutiveDeclarations:
+  Enabled: true
+  AcrossEmptyLines: false
+  AcrossComments: false
+AlignConsecutiveAssignments:
+  Enabled: true
+  AcrossEmptyLines: false
+  AcrossComments: false
+AlignTrailingComments:
+  Kind: Always
+  OverEmptyLines: 2
 ...
 
 ---
diff --git a/cpp/.gitignore b/cpp/.gitignore
index b7f10c0f..26ace723 100644
--- a/cpp/.gitignore
+++ b/cpp/.gitignore
@@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp
 /compile_commands.json
 /.cache/
 .clangd
+tools/send_fifo.sh
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index 73130161..8b11106b 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -16,9 +16,9 @@
  */
 #include "ProducerImpl.h"
 
-#include <algorithm>
 #include <apache/rocketmq/v2/definition.pb.h>
 
+#include <algorithm>
 #include <atomic>
 #include <cassert>
 #include <chrono>
@@ -115,7 +115,8 @@ void ProducerImpl::validate(const Message& message, 
std::error_code& ec) {
   }
 }
 
-void ProducerImpl::wrapSendMessageRequest(const Message& message, 
SendMessageRequest& request,
+void ProducerImpl::wrapSendMessageRequest(const Message&           message,
+                                          SendMessageRequest&      request,
                                           const rmq::MessageQueue& 
message_queue) {
   auto msg = new rmq::Message();
 
@@ -151,13 +152,14 @@ void ProducerImpl::wrapSendMessageRequest(const Message& 
message, SendMessageReq
       
system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
       auto mutable_delivery_timestamp = 
system_properties->mutable_delivery_timestamp();
       
mutable_delivery_timestamp->set_seconds(std::chrono::duration_cast<std::chrono::seconds>(duration).count());
-      
mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count()
 % 1000000000);
+      
mutable_delivery_timestamp->set_nanos(std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count()
 %
+                                            1000000000);
     }
   }
 
   // Born-time
-  auto duration = absl::Now() - absl::UnixEpoch();
-  int64_t seconds = absl::ToInt64Seconds(duration);
+  auto    duration = absl::Now() - absl::UnixEpoch();
+  int64_t seconds  = absl::ToInt64Seconds(duration);
   system_properties->mutable_born_timestamp()->set_seconds(seconds);
   
system_properties->mutable_born_timestamp()->set_nanos(absl::ToInt64Nanoseconds(duration
 - absl::Seconds(seconds)));
 
@@ -200,7 +202,7 @@ void ProducerImpl::wrapSendMessageRequest(const Message& 
message, SendMessageReq
   SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString());
 }
 
-SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) 
noexcept {
+SendReceipt ProducerImpl::send(MessageConstSharedPtr message, std::error_code& 
ec) noexcept {
   ensureRunning(ec);
   if (ec) {
     return {};
@@ -218,14 +220,14 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
     return {};
   }
 
-  auto mtx = std::make_shared<absl::Mutex>();
-  auto cv = std::make_shared<absl::CondVar>();
-  bool          completed = false;
-  SendReceipt   send_receipt;
+  auto        mtx       = std::make_shared<absl::Mutex>();
+  auto        cv        = std::make_shared<absl::CondVar>();
+  bool        completed = false;
+  SendReceipt send_receipt;
 
   // Define callback
   auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) {
-    ec = code;
+    ec           = code;
     send_receipt = receipt;
     {
       absl::MutexLock lk(mtx.get());
@@ -246,7 +248,7 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
   return send_receipt;
 }
 
-void ProducerImpl::send(MessageConstPtr message, SendCallback cb) {
+void ProducerImpl::send(MessageConstSharedPtr message, SendCallback cb) {
   std::error_code ec;
   ensureRunning(ec);
   if (ec) {
@@ -254,13 +256,10 @@ void ProducerImpl::send(MessageConstPtr message, 
SendCallback cb) {
     cb(ec, send_receipt);
   }
 
-  std::string topic = message->topic();
+  std::string                 topic = message->topic();
   std::weak_ptr<ProducerImpl> client(shared_from_this());
 
-  // Walk-around: Use capture by move when C++14 is possible
-  const Message* msg = message.release();
-  auto callback = [client, cb, msg](const std::error_code& ec, const 
TopicPublishInfoPtr& publish_info) {
-    MessageConstPtr ptr(msg);
+  auto callback = [=](const std::error_code& ec, const TopicPublishInfoPtr& 
publish_info) mutable {
     // No route entries of the given topic is available
     if (ec) {
       SendReceipt send_receipt;
@@ -275,16 +274,16 @@ void ProducerImpl::send(MessageConstPtr message, 
SendCallback cb) {
       return;
     }
 
-    auto publisher = client.lock();
+    auto                           publisher = client.lock();
     std::vector<rmq::MessageQueue> message_queue_list;
-    if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
+    if (!publish_info->selectMessageQueues(message->group(), 
message_queue_list)) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
       cb(ec, send_receipt);
       return;
     }
 
-    publisher->send0(std::move(ptr), cb, message_queue_list);
+    publisher->send0(std::move(message), cb, message_queue_list);
   };
 
   getPublishInfoAsync(topic, callback);
@@ -306,8 +305,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> 
context) {
   {
     // Trace Send RPC
     if (!context->message_->traceContext().empty() && client_config_.sampler_) 
{
-      auto span_context = 
opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext());
-      auto span = opencensus::trace::Span::BlankSpan();
+      auto span_context     = 
opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext());
+      auto span             = opencensus::trace::Span::BlankSpan();
       std::string span_name = resourceNamespace() + "/" + 
context->message_->topic() + " " +
                               
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
       if (span_context.IsValid()) {
@@ -349,8 +348,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> 
context) {
   client_manager_->send(target, metadata, request, callback);
 }
 
-void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, 
std::vector<rmq::MessageQueue> list) {
-  SendReceipt send_receipt;
+void ProducerImpl::send0(MessageConstSharedPtr&& message, SendCallback 
callback, std::vector<rmq::MessageQueue> list) {
+  SendReceipt     send_receipt;
   std::error_code ec;
   validate(*message, ec);
   if (ec) {
@@ -370,7 +369,7 @@ void ProducerImpl::send0(MessageConstPtr message, 
SendCallback callback, std::ve
 
 bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, 
TransactionState resolution) {
   EndTransactionRequest request;
-  const std::string& topic = transaction.topic;
+  const std::string&    topic = transaction.topic;
   request.mutable_topic()->set_name(topic);
   request.mutable_topic()->set_resource_namespace(resourceNamespace());
   request.set_message_id(transaction.message_id);
@@ -390,8 +389,8 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& 
transaction, Transacti
   absl::flat_hash_map<std::string, std::string> metadata;
   Signature::sign(config(), metadata);
   bool completed = false;
-  bool success = false;
-  auto span = opencensus::trace::Span::BlankSpan();
+  bool success   = false;
+  auto span      = opencensus::trace::Span::BlankSpan();
   if (!transaction.trace_context.empty() && client_config_.sampler_) {
     // Trace transactional message
     opencensus::trace::SpanContext span_context =
@@ -399,9 +398,10 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& 
transaction, Transacti
     std::string trace_operation_name = TransactionState::COMMIT == resolution
                                            ? 
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
                                            : 
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
-    std::string span_name = resourceNamespace() + "/" + transaction.topic + " 
" + trace_operation_name;
+    std::string span_name            = resourceNamespace() + "/" + 
transaction.topic + " " + trace_operation_name;
     if (span_context.IsValid()) {
-      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context, {client_config_.sampler_.get()});
+      span =
+          opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context, {client_config_.sampler_.get()});
     } else {
       span = opencensus::trace::Span::StartSpan(span_name, nullptr, 
{client_config_.sampler_.get()});
     }
@@ -410,9 +410,9 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& 
transaction, Transacti
     // TracingUtility::addUniversalSpanAttributes(message, config(), span);
   }
 
-  auto mtx = std::make_shared<absl::Mutex>();
-  auto cv = std::make_shared<absl::CondVar>();
-  const auto& endpoint = transaction.target;
+  auto                        mtx      = std::make_shared<absl::Mutex>();
+  auto                        cv       = std::make_shared<absl::CondVar>();
+  const auto&                 endpoint = transaction.target;
   std::weak_ptr<ProducerImpl> publisher(shared_from_this());
 
   auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, 
const EndTransactionResponse& response) {
@@ -463,10 +463,8 @@ void ProducerImpl::isolateEndpoint(const std::string& 
target) {
   isolated_endpoints_.insert(target);
 }
 
-void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, 
Transaction& transaction) {
-  MiniTransaction mini = {};
-  mini.topic = message->topic();
-  mini.trace_context = message->traceContext();
+void ProducerImpl::send(MessageConstSharedPtr message, std::error_code& ec, 
Transaction& transaction) {
+  MiniTransaction mini = {.topic = message->topic(), .trace_context = 
message->traceContext()};
 
   if (!message->group().empty()) {
     ec = ErrorCode::MessagePropertyConflictWithType;
@@ -480,15 +478,15 @@ void ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec, Transactio
     return;
   }
 
-  Message* msg = const_cast<Message*>(message.get());
+  Message* msg                          = const_cast<Message*>(message.get());
   msg->mutableExtension().transactional = true;
 
   SendReceipt send_receipt = send(std::move(message), ec);
 
-  mini.message_id = send_receipt.message_id;
+  mini.message_id     = send_receipt.message_id;
   mini.transaction_id = send_receipt.transaction_id;
-  mini.target = send_receipt.target;
-  auto& impl = dynamic_cast<TransactionImpl&>(transaction);
+  mini.target         = send_receipt.target;
+  auto& impl          = dynamic_cast<TransactionImpl&>(transaction);
   impl.appendMiniTransaction(mini);
 }
 
@@ -531,16 +529,16 @@ void ProducerImpl::cachePublishInfo(const std::string& 
topic, TopicPublishInfoPt
 }
 
 TopicPublishInfoPtr ProducerImpl::getPublishInfo(const std::string& topic) {
-  bool complete = false;
-  auto mtx = std::make_shared<absl::Mutex>();
-  auto cv = std::make_shared<absl::CondVar>();
+  bool                complete = false;
+  auto                mtx      = std::make_shared<absl::Mutex>();
+  auto                cv       = std::make_shared<absl::CondVar>();
   TopicPublishInfoPtr topic_publish_info;
-  std::error_code error_code;
-  auto cb = [&, mtx, cv](const std::error_code& ec, const TopicPublishInfoPtr& 
ptr) {
+  std::error_code     error_code;
+  auto                cb = [&, mtx, cv](const std::error_code& ec, const 
TopicPublishInfoPtr& ptr) {
     absl::MutexLock lk(mtx.get());
     topic_publish_info = ptr;
-    error_code = ec;
-    complete = true;
+    error_code         = ec;
+    complete           = true;
     cv->SignalAll();
   };
   getPublishInfoAsync(topic, cb);
@@ -564,19 +562,19 @@ void 
ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
     std::weak_ptr<ProducerImpl> producer(shared_from_this());
 
     MiniTransaction transaction = {};
-    transaction.topic = message->topic();
-    transaction.message_id = message->id();
-    transaction.transaction_id = message->extension().transaction_id;
-    transaction.trace_context = message->traceContext();
-    transaction.target = message->extension().target_endpoint;
-    TransactionState state = transaction_checker_(*message);
+    transaction.topic           = message->topic();
+    transaction.message_id      = message->id();
+    transaction.transaction_id  = message->extension().transaction_id;
+    transaction.trace_context   = message->traceContext();
+    transaction.target          = message->extension().target_endpoint;
+    TransactionState state      = transaction_checker_(*message);
     endTransaction0(transaction, state);
   } else {
     SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr");
   }
 }
 
-void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
+void ProducerImpl::topicsOfInterest(std::vector<std::string>& topics) {
   absl::MutexLock lk(&topics_mtx_);
   for (auto& topic : topics_) {
     if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
@@ -585,9 +583,9 @@ void 
ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
   }
 }
 
-void ProducerImpl::withTopics(const std::vector<std::string> &topics) {
+void ProducerImpl::withTopics(const std::vector<std::string>& topics) {
   absl::MutexLock lk(&topics_mtx_);
-  for (auto &topic: topics) {
+  for (auto& topic : topics) {
     if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) {
       topics_.push_back(topic);
     }
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a93..9b467aa6 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -53,9 +53,9 @@ public:
 
   void shutdown() override;
 
-  SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
+  SendReceipt send(MessageConstSharedPtr message, std::error_code& ec) 
noexcept;
 
-  void send(MessageConstPtr message, SendCallback callback);
+  void send(MessageConstSharedPtr message, SendCallback callback);
 
   void setTransactionChecker(TransactionChecker checker);
 
@@ -64,7 +64,7 @@ public:
     return absl::make_unique<TransactionImpl>(producer);
   }
 
-  void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
+  void send(MessageConstSharedPtr message, std::error_code& ec, Transaction& 
transaction);
 
   /**
    * Check if the RPC client for the target host is isolated or not
@@ -160,7 +160,7 @@ private:
 
   void validate(const Message& message, std::error_code& ec);
 
-  void send0(MessageConstPtr message, SendCallback callback, 
std::vector<rmq::MessageQueue> list);
+  void send0(MessageConstSharedPtr&& message, SendCallback callback, 
std::vector<rmq::MessageQueue> list);
 
   void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) 
LOCKS_EXCLUDED(isolated_endpoints_mtx_);
 
diff --git a/cpp/source/rocketmq/include/SendContext.h 
b/cpp/source/rocketmq/include/SendContext.h
index 4c05cebe..760c6558 100644
--- a/cpp/source/rocketmq/include/SendContext.h
+++ b/cpp/source/rocketmq/include/SendContext.h
@@ -19,12 +19,11 @@
 #include <memory>
 #include <system_error>
 
+#include "Protocol.h"
+#include "TransactionImpl.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/synchronization/mutex.h"
 #include "opencensus/trace/span.h"
-
-#include "Protocol.h"
-#include "TransactionImpl.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
@@ -37,7 +36,7 @@ class ProducerImpl;
 class SendContext : public std::enable_shared_from_this<SendContext> {
 public:
   SendContext(std::weak_ptr<ProducerImpl> producer,
-              MessageConstPtr message,
+              MessageConstSharedPtr&& message,
               SendCallback callback,
               std::vector<rmq::MessageQueue> candidates)
       : producer_(std::move(producer)),
@@ -57,7 +56,7 @@ public:
   }
 
   std::weak_ptr<ProducerImpl> producer_;
-  MessageConstPtr message_;
+  MessageConstSharedPtr message_;
   std::size_t attempt_times_{0};
   SendCallback callback_;
 

Reply via email to