This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new b727270f [ISSUE #969] C++ client support attemptId in PushConsumer (#970)
b727270f is described below

commit b727270f75e728b23c63b977cf6d233444c97e61
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Mar 27 13:44:06 2025 +0800

    [ISSUE #969] C++ client support attemptId in PushConsumer (#970)
---
 cpp/source/base/Message.cpp                        |  1 -
 cpp/source/base/Protocol.cpp                       |  3 +-
 .../rocketmq/AsyncReceiveMessageCallback.cpp       | 40 ++++++++++--------
 cpp/source/rocketmq/ProcessQueueImpl.cpp           | 48 ++++++++++++++--------
 cpp/source/rocketmq/PushConsumerImpl.cpp           | 18 ++++----
 .../rocketmq/include/AsyncReceiveMessageCallback.h | 10 ++---
 cpp/source/rocketmq/include/ProcessQueue.h         |  2 +-
 cpp/source/rocketmq/include/ProcessQueueImpl.h     |  6 +--
 cpp/source/rocketmq/include/PushConsumerImpl.h     |  5 ++-
 9 files changed, 78 insertions(+), 55 deletions(-)

diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp
index cd3cc236..8d82bd3f 100644
--- a/cpp/source/base/Message.cpp
+++ b/cpp/source/base/Message.cpp
@@ -20,7 +20,6 @@
 #include <memory>
 
 #include "UniqueIdGenerator.h"
-#include "absl/memory/memory.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/source/base/Protocol.cpp b/cpp/source/base/Protocol.cpp
index 91a643b3..70562a2b 100644
--- a/cpp/source/base/Protocol.cpp
+++ b/cpp/source/base/Protocol.cpp
@@ -72,7 +72,8 @@ bool operator==(const rmq::MessageQueue& lhs, const rmq::MessageQueue& rhs) {
 }
 
 std::string simpleNameOf(const rmq::MessageQueue& m) {
-  return fmt::format("{}{}-{}-{}", m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name());
+  return fmt::format("{}@{}@{}@{}",
+                     m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name());
 }
 
 bool operator==(const std::vector<rmq::MessageQueue>& lhs, const std::vector<rmq::MessageQueue>& rhs) {
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index f68c9f88..d1c3ba30 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -30,10 +30,14 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue>
 process_queue)
     : process_queue_(std::move(process_queue)) {
-  receive_message_later_ = std::bind(&AsyncReceiveMessageCallback::checkThrottleThenReceive, this);
+
+  receive_message_later_ = std::bind(
+      &AsyncReceiveMessageCallback::checkThrottleThenReceive, this, std::placeholders::_1);
 }
 
-void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const ReceiveMessageResult& result) {
+void AsyncReceiveMessageCallback::onCompletion(
+    const std::error_code& ec, std::string& attempt_id, const ReceiveMessageResult& result) {
+
   std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock();
   if (!process_queue) {
     SPDLOG_INFO("Process queue has been destructed.");
@@ -47,18 +51,19 @@ void AsyncReceiveMessageCallback::onCompletion(const 
std::error_code& ec, const
 
   if (ec == ErrorCode::TooManyRequests) {
     SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName());
-    receiveMessageLater(std::chrono::milliseconds(20));
+    receiveMessageLater(std::chrono::milliseconds(20), attempt_id);
     return;
   }
 
   if (ec == ErrorCode::NoContent) {
-    checkThrottleThenReceive();
+    checkThrottleThenReceive("");
     return;
   }
 
   if (ec) {
-    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
-    receiveMessageLater(std::chrono::seconds (1));
+    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(),
+                ec.message());
+    receiveMessageLater(std::chrono::seconds(1), attempt_id);
     return;
   }
 
@@ -66,12 +71,12 @@ void AsyncReceiveMessageCallback::onCompletion(const 
std::error_code& ec, const
                result.source_host, result.messages.size(), 
process_queue->simpleName());
   process_queue->accountCache(result.messages);
   consumer->getConsumeMessageService()->dispatch(process_queue, 
result.messages);
-  checkThrottleThenReceive();
+  checkThrottleThenReceive("");
 }
 
 const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = 
"receive-later-task";
 
-void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
+void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string 
attempt_id) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     SPDLOG_WARN("Process queue should have been destructed");
@@ -82,14 +87,14 @@ void 
AsyncReceiveMessageCallback::checkThrottleThenReceive() {
     SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
                 process_queue->simpleName());
     process_queue->syncIdleState();
-    receiveMessageLater(std::chrono::seconds(1));
+    receiveMessageLater(std::chrono::seconds(1), attempt_id);
   } else {
     // Receive message immediately
-    receiveMessageImmediately();
+    receiveMessageImmediately(attempt_id);
   }
 }
 
-void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay) {
+void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay, std::string& attempt_id) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     return;
@@ -98,17 +103,18 @@ void 
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
   auto client_instance = process_queue->getClientManager();
   std::weak_ptr<AsyncReceiveMessageCallback> 
receive_callback_weak_ptr(shared_from_this());
 
-  auto task = [receive_callback_weak_ptr]() {
+  auto task = [receive_callback_weak_ptr, &attempt_id]() {
     auto async_receive_ptr = receive_callback_weak_ptr.lock();
     if (async_receive_ptr) {
-      async_receive_ptr->checkThrottleThenReceive();
+      async_receive_ptr->checkThrottleThenReceive(attempt_id);
     }
   };
 
-  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, 
delay, std::chrono::seconds(0));
+  client_instance->getScheduler()->schedule(
+      task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
 }
 
-void AsyncReceiveMessageCallback::receiveMessageImmediately() {
+void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& 
attempt_id) {
   auto process_queue_shared_ptr = process_queue_.lock();
   if (!process_queue_shared_ptr) {
     SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles");
@@ -121,7 +127,9 @@ void 
AsyncReceiveMessageCallback::receiveMessageImmediately() {
                 process_queue_shared_ptr->simpleName());
     return;
   }
-  impl->receiveMessage(process_queue_shared_ptr->messageQueue(), 
process_queue_shared_ptr->getFilterExpression());
+
+  impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
+                       process_queue_shared_ptr->getFilterExpression(), 
attempt_id);
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 002325c0..59df8578 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -16,14 +16,13 @@
  */
 #include "ProcessQueueImpl.h"
 
-#include <atomic>
 #include <chrono>
 #include <memory>
 #include <system_error>
 #include <utility>
 
+#include "UniqueIdGenerator.h"
 #include "AsyncReceiveMessageCallback.h"
-#include "ClientManagerImpl.h"
 #include "MetadataConstants.h"
 #include "Protocol.h"
 #include "PushConsumerImpl.h"
@@ -98,39 +97,39 @@ bool ProcessQueueImpl::shouldThrottle() const {
   return false;
 }
 
-void ProcessQueueImpl::receiveMessage() {
+void ProcessQueueImpl::receiveMessage(std::string& attempt_id) {
   auto consumer = consumer_.lock();
   if (!consumer) {
     return;
   }
-
-  popMessage();
+  popMessage(attempt_id);
 }
 
-void ProcessQueueImpl::popMessage() {
+void ProcessQueueImpl::popMessage(std::string& attempt_id) {
   rmq::ReceiveMessageRequest request;
   absl::flat_hash_map<std::string, std::string> metadata;
   auto consumer_client = consumer_.lock();
   if (!consumer_client) {
     return;
   }
+
   Signature::sign(consumer_client->config(), metadata);
-  wrapPopMessageRequest(metadata, request);
+  wrapPopMessageRequest(metadata, request, attempt_id);
   syncIdleState();
-  SPDLOG_DEBUG("Try to pop message from {}", simpleNameOf(message_queue_));
+  SPDLOG_DEBUG("Receive message from={}, attemptId={}", 
simpleNameOf(message_queue_), attempt_id);
 
   std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
-  auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& 
result) {
-    std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock();
-    if (recv_cb) {
-      recv_cb->onCompletion(ec, result);
+  auto callback =
+      [cb, &attempt_id](const std::error_code& ec, const ReceiveMessageResult& 
result) {
+    std::shared_ptr<AsyncReceiveMessageCallback> receive_cb = cb.lock();
+    if (receive_cb) {
+      receive_cb->onCompletion(ec, attempt_id, result);
     }
   };
 
-  client_manager_->receiveMessage(urlOf(message_queue_), metadata, request,
-                                  absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout +
-                                                             consumer_client->config().request_timeout),
-                                  callback);
+  auto timeout = absl::ToChronoMilliseconds(
+      consumer_client->config().subscriber.polling_timeout + 
consumer_client->config().request_timeout);
+  client_manager_->receiveMessage(urlOf(message_queue_), metadata, request, 
timeout, callback);
 }
 
 void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& 
messages) {
@@ -184,8 +183,18 @@ void 
ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expres
   }
 }
 
+void generateAttemptId(std::string& attempt_id) {
+  const std::string unique_id = UniqueIdGenerator::instance().next();
+  if (unique_id.size() < 34) {
+    return;
+  }
+  attempt_id = fmt::format(
+      "{}-{}-{}-{}-{}", unique_id.substr(0, 8), unique_id.substr(8, 4),
+      unique_id.substr(12, 4), unique_id.substr(16, 4), unique_id.substr(20, 12));
+}
+
 void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, 
std::string>& metadata,
-                                             rmq::ReceiveMessageRequest& 
request) {
+                                             rmq::ReceiveMessageRequest& 
request, std::string& attempt_id) {
   std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
   assert(consumer);
   request.mutable_group()->CopyFrom(consumer->config().subscriber.group);
@@ -205,6 +214,11 @@ void 
ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, st
   auto fraction = invisible_time_ - 
std::chrono::duration_cast<std::chrono::seconds>(invisible_time_);
   int32_t nano_seconds = 
static_cast<int32_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(fraction).count());
   request.mutable_invisible_duration()->set_nanos(nano_seconds);
+
+  if (attempt_id.empty()) {
+    generateAttemptId(attempt_id);
+  }
+  request.set_attempt_id(attempt_id);
 }
 
 std::weak_ptr<PushConsumerImpl> ProcessQueueImpl::getConsumer() {
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 712ac814..68bb9f67 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -16,10 +16,8 @@
  */
 #include "PushConsumerImpl.h"
 
-#include <atomic>
 #include <cassert>
 #include <chrono>
-#include <cstdint>
 #include <cstdlib>
 #include <string>
 #include <system_error>
@@ -324,9 +322,10 @@ void PushConsumerImpl::syncProcessQueue(const std::string& 
topic,
   for (const auto& message_queue : message_queue_list) {
     if (std::none_of(current.cbegin(), current.cend(),
                      [&](const rmq::MessageQueue& item) { return item == 
message_queue; })) {
-      SPDLOG_INFO("Start to receive message from {} according to latest assignment info from load balancer",
+      SPDLOG_DEBUG("Start to receive message from {} according to latest assignment info from load balancer",
                   simpleNameOf(message_queue));
-      if (!receiveMessage(message_queue, filter_expression)) {
+      std::string attempt_id;
+      if (!receiveMessage(message_queue, filter_expression, attempt_id)) {
         if (!active()) {
           SPDLOG_WARN("Failed to initiate receive message request-response-cycle for {}", simpleNameOf(message_queue));
           // TODO: remove it from current assignment such that a second 
attempt will be made again in the next round.
@@ -350,9 +349,9 @@ std::shared_ptr<ProcessQueue> 
PushConsumerImpl::getOrCreateProcessQueue(const rm
       process_queue = process_queue_table_.at(simpleNameOf(message_queue));
     } else {
       SPDLOG_INFO("Create ProcessQueue for message queue[{}]", 
simpleNameOf(message_queue));
-      // create ProcessQueue
-      process_queue =
-          std::make_shared<ProcessQueueImpl>(message_queue, filter_expression, 
shared_from_this(), client_manager_);
+      // create process queue object
+      process_queue = std::make_shared<ProcessQueueImpl>(
+          message_queue, filter_expression, shared_from_this(), 
client_manager_);
       std::shared_ptr<AsyncReceiveMessageCallback> receive_callback =
           std::make_shared<AsyncReceiveMessageCallback>(process_queue);
       process_queue->callback(receive_callback);
@@ -363,7 +362,8 @@ std::shared_ptr<ProcessQueue> 
PushConsumerImpl::getOrCreateProcessQueue(const rm
 }
 
 bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue,
-                                      const FilterExpression& 
filter_expression) {
+                                      const FilterExpression& 
filter_expression,
+                                      std::string& attempt_id) {
   if (!active()) {
     SPDLOG_INFO("PushConsumer has stopped. Drop further receive message request");
     return false;
@@ -379,7 +379,7 @@ bool PushConsumerImpl::receiveMessage(const 
rmq::MessageQueue& message_queue,
     SPDLOG_ERROR("Failed to resolve address for brokerName={}", 
message_queue.broker().name());
     return false;
   }
-  process_queue_ptr->receiveMessage();
+  process_queue_ptr->receiveMessage(attempt_id);
   return true;
 }
 
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index 5a134428..b19f097b 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -29,11 +29,11 @@ class AsyncReceiveMessageCallback : public 
std::enable_shared_from_this<AsyncRec
 public:
   explicit AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> 
process_queue);
 
-  void onCompletion(const std::error_code& ec, const ReceiveMessageResult& 
result);
+  void onCompletion(const std::error_code& ec, std::string& attempt_id, const 
ReceiveMessageResult& result);
 
-  void receiveMessageLater(std::chrono::milliseconds delay);
+  void receiveMessageLater(std::chrono::milliseconds delay, std::string& 
attempt_id);
 
-  void receiveMessageImmediately();
+  void receiveMessageImmediately(std::string& attempt_id);
 
 private:
   /**
@@ -42,9 +42,9 @@ private:
    */
   std::weak_ptr<ProcessQueue> process_queue_;
 
-  std::function<void(void)> receive_message_later_;
+  std::function<void(std::string)> receive_message_later_;
 
-  void checkThrottleThenReceive();
+  void checkThrottleThenReceive(std::string attempt_id);
 
   static const char* RECEIVE_LATER_TASK_NAME;
 };
diff --git a/cpp/source/rocketmq/include/ProcessQueue.h b/cpp/source/rocketmq/include/ProcessQueue.h
index 0e8ce74d..c512be34 100644
--- a/cpp/source/rocketmq/include/ProcessQueue.h
+++ b/cpp/source/rocketmq/include/ProcessQueue.h
@@ -38,7 +38,7 @@ public:
 
   virtual void callback(std::shared_ptr<AsyncReceiveMessageCallback> callback) 
= 0;
 
-  virtual void receiveMessage() = 0;
+  virtual void receiveMessage(std::string& attempt_id) = 0;
 
   virtual std::string topic() const = 0;
 
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 75a5d2d3..b811e936 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -63,7 +63,7 @@ public:
 
   std::shared_ptr<ClientManager> getClientManager() override;
 
-  void receiveMessage() override;
+  void receiveMessage(std::string& attempt_id) override;
 
   const std::string& simpleName() const override {
     return simple_name_;
@@ -127,10 +127,10 @@ private:
    */
   std::atomic<uint64_t> cached_message_memory_;
 
-  void popMessage();
+  void popMessage(std::string& attempt_id);
 
   void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& 
metadata,
-                             rmq::ReceiveMessageRequest& request);
+                             rmq::ReceiveMessageRequest& request, std::string& 
attempt_id);
 
   void wrapFilterExpression(rmq::FilterExpression* filter_expression);
 };
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 8f360fda..42c3fe49 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -99,8 +99,9 @@ public:
                                                         const 
FilterExpression& filter_expression)
       LOCKS_EXCLUDED(process_queue_table_mtx_);
 
-  bool receiveMessage(const rmq::MessageQueue& message_queue, const FilterExpression& filter_expression)
-      LOCKS_EXCLUDED(process_queue_table_mtx_);
+  bool receiveMessage(const rmq::MessageQueue& message_queue,
+                      const FilterExpression& filter_expression,
+                      std::string& attempt_id) LOCKS_EXCLUDED(process_queue_table_mtx_);
 
   uint32_t consumeThreadPoolSize() const;
 

Reply via email to