This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 65017010 [ISSUE #934] C++ trans producer should return a SendReceipt 
object in the send call (#936)
65017010 is described below

commit 650170106805f8d7cb4b7265cf88e917567b1d8d
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue Feb 11 11:26:55 2025 +0800

    [ISSUE #934] C++ trans producer should return a SendReceipt object in the 
send call (#936)
---
 .../ExampleProducerWithTransactionalMessage.cpp    |  4 +++-
 cpp/include/rocketmq/Producer.h                    |  2 +-
 cpp/source/rocketmq/Producer.cpp                   |  4 ++--
 cpp/source/rocketmq/ProducerImpl.cpp               | 12 ++++++++---
 cpp/source/rocketmq/include/ProducerImpl.h         | 25 ++++++++++------------
 5 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp 
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index f595c6ef..d2c194d5 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -109,9 +109,11 @@ int main(int argc, char* argv[]) {
     auto transaction = producer.beginTransaction();
     std::error_code ec;
 
-    producer.send(std::move(message), ec, *transaction);
+    SendReceipt send_receipt = producer.send(std::move(message), ec, 
*transaction);
 
     if (!ec) {
+      std::cout << "Send transactional message to " << FLAGS_topic << " OK. "
+                << "Message-ID: " << send_receipt.message_id << std::endl;
       if (!transaction->commit()) {
         std::cerr << "Failed to commit message" << std::endl;
       }
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 6b42843d..d3de06e9 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -65,7 +65,7 @@ public:
 
   std::unique_ptr<Transaction> beginTransaction();
 
-  void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
+  SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 
 private:
   explicit Producer(std::shared_ptr<ProducerImpl> impl) : 
impl_(std::move(impl)) {
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 907d0a28..4f29383f 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -64,8 +64,8 @@ std::unique_ptr<Transaction> Producer::beginTransaction() {
   return impl_->beginTransaction();
 }
 
-void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction) {
-  impl_->send(std::move(message), ec, transaction);
+SendReceipt Producer::send(MessageConstPtr message, std::error_code& ec, 
Transaction& transaction) {
+  return impl_->send(std::move(message), ec, transaction);
 }
 
 ProducerBuilder Producer::newBuilder() {
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index 9b664d59..3de3d37d 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -474,7 +474,7 @@ void ProducerImpl::isolateEndpoint(const std::string& 
target) {
   isolated_endpoints_.insert(target);
 }
 
-void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, 
Transaction& transaction) {
+SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec, 
Transaction& transaction) {
   MiniTransaction mini = {};
   mini.topic = message->topic();
   mini.trace_context = message->traceContext();
@@ -482,13 +482,17 @@ void ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec, Transactio
   if (!message->group().empty()) {
     ec = ErrorCode::MessagePropertyConflictWithType;
     SPDLOG_WARN("FIFO message may not be transactional");
-    return;
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   if (message->deliveryTimestamp().time_since_epoch().count()) {
     ec = ErrorCode::MessagePropertyConflictWithType;
     SPDLOG_WARN("Timed message may not be transactional");
-    return;
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   Message* msg = const_cast<Message*>(message.get());
@@ -501,6 +505,8 @@ void ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec, Transactio
   mini.target = send_receipt.target;
   auto& impl = dynamic_cast<TransactionImpl&>(transaction);
   impl.appendMiniTransaction(mini);
+
+  return send_receipt;
 }
 
 void ProducerImpl::getPublishInfoAsync(const std::string& topic, const 
PublishInfoCallback& cb) {
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index 2c284172..2cb9c44e 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -49,20 +49,20 @@ public:
   void shutdown() override;
 
   /**
-   * Note we requrie application to transfer ownership of the message to send 
to avoid concurrent modification during
-   * sent.
+   * Note we require application to transfer ownership of the message
+   * to send to avoid concurrent modification during sent.
    *
-   * Regardless of the send result, SendReceipt would have the 
std::unique_ptr<const Message>, facilliating
-   * application to conduct customized retry policy.
+   * Regardless of the send result, SendReceipt would have the 
std::unique_ptr<const Message>,
+   * facilitating application to conduct customized retry policy.
    */
   SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
 
   /**
-   * Note we requrie application to transfer ownership of the message to send 
to avoid concurrent modification during
-   * sent.
+   * Note we require application to transfer ownership of the message
+   * to send to avoid concurrent modification during sent.
    *
-   * Regardless of the send result, SendReceipt would have the 
std::unique_ptr<const Message>, facilliating
-   * application to conduct customized retry policy.
+   * Regardless of the send result, SendReceipt would have the 
std::unique_ptr<const Message>,
+   * facilitating application to conduct customized retry policy.
    */
   void send(MessageConstPtr message, SendCallback callback);
 
@@ -74,13 +74,10 @@ public:
   }
 
   /**
-   * Note we requrie application to transfer ownership of the message to send 
to avoid concurrent modification during
-   * sent.
-   *
-   * TODO: Refine this API. Current API is not good enough as it cannot handle 
the message back to its caller on publish
-   * failure.
+   * Note we require application to transfer ownership of the message
+   * to send to avoid concurrent modification during sent.
    */
-  void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
+  SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 
   /**
    * Check if the RPC client for the target host is isolated or not

Reply via email to