This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

The following commit(s) were added to refs/heads/master by this push:
     new 65017010 [ISSUE #934] C++ trans producer should return a SendReceipt object in the send call (#936)
65017010 is described below

commit 650170106805f8d7cb4b7265cf88e917567b1d8d
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue Feb 11 11:26:55 2025 +0800

    [ISSUE #934] C++ trans producer should return a SendReceipt object in the send call (#936)
---
 .../ExampleProducerWithTransactionalMessage.cpp    |  4 +++-
 cpp/include/rocketmq/Producer.h                    |  2 +-
 cpp/source/rocketmq/Producer.cpp                   |  4 ++--
 cpp/source/rocketmq/ProducerImpl.cpp               | 12 ++++++++---
 cpp/source/rocketmq/include/ProducerImpl.h         | 25 ++++++++++------------
 5 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index f595c6ef..d2c194d5 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -109,9 +109,11 @@ int main(int argc, char* argv[]) {

     auto transaction = producer.beginTransaction();
     std::error_code ec;
-    producer.send(std::move(message), ec, *transaction);
+    SendReceipt send_receipt = producer.send(std::move(message), ec, *transaction);

     if (!ec) {
+      std::cout << "Send transactional message to " << FLAGS_topic << " OK. "
+                << "Message-ID: " << send_receipt.message_id << std::endl;
       if (!transaction->commit()) {
         std::cerr << "Failed to commit message" << std::endl;
       }
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 6b42843d..d3de06e9 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -65,7 +65,7 @@ public:

   std::unique_ptr<Transaction> beginTransaction();

-  void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
+  SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);

 private:
   explicit Producer(std::shared_ptr<ProducerImpl> impl) : impl_(std::move(impl)) {
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 907d0a28..4f29383f 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -64,8 +64,8 @@ std::unique_ptr<Transaction> Producer::beginTransaction() {
   return impl_->beginTransaction();
 }

-void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
-  impl_->send(std::move(message), ec, transaction);
+SendReceipt Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
+  return impl_->send(std::move(message), ec, transaction);
 }

 ProducerBuilder Producer::newBuilder() {
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 9b664d59..3de3d37d 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -474,7 +474,7 @@ void ProducerImpl::isolateEndpoint(const std::string& target) {
   isolated_endpoints_.insert(target);
 }

-void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
+SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
   MiniTransaction mini = {};
   mini.topic = message->topic();
   mini.trace_context = message->traceContext();
@@ -482,13 +482,17 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio
   if (!message->group().empty()) {
     ec = ErrorCode::MessagePropertyConflictWithType;
     SPDLOG_WARN("FIFO message may not be transactional");
-    return;
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }

   if (message->deliveryTimestamp().time_since_epoch().count()) {
     ec = ErrorCode::MessagePropertyConflictWithType;
     SPDLOG_WARN("Timed message may not be transactional");
-    return;
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }

   Message* msg = const_cast<Message*>(message.get());
@@ -501,6 +505,8 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio
   mini.target = send_receipt.target;
   auto& impl = dynamic_cast<TransactionImpl&>(transaction);
   impl.appendMiniTransaction(mini);
+
+  return send_receipt;
 }

 void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) {
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index 2c284172..2cb9c44e 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -49,20 +49,20 @@ public:
   void shutdown() override;

   /**
-   * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
-   * sent.
+   * Note we require application to transfer ownership of the message
+   * to send to avoid concurrent modification during sent.
    *
-   * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
-   * application to conduct customized retry policy.
+   * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
+   * facilitating application to conduct customized retry policy.
    */
   SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;

   /**
-   * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
-   * sent.
+   * Note we require application to transfer ownership of the message
+   * to send to avoid concurrent modification during sent.
    *
-   * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
-   * application to conduct customized retry policy.
+   * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
+   * facilitating application to conduct customized retry policy.
    */
   void send(MessageConstPtr message, SendCallback callback);

@@ -74,13 +74,10 @@ public:
   }

   /**
-   * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
-   * sent.
-   *
-   * TODO: Refine this API. Current API is not good enough as it cannot handle the message back to its caller on publish
-   * failure.
+   * Note we require application to transfer ownership of the message
+   * to send to avoid concurrent modification during sent.
    */
-  void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
+  SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);

   /**
    * Check if the RPC client for the target host is isolated or not