This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new b727270f [ISSUE #969] C++ client support attemptId in PushConsumer (#970) b727270f is described below commit b727270f75e728b23c63b977cf6d233444c97e61 Author: lizhimins <707364...@qq.com> AuthorDate: Thu Mar 27 13:44:06 2025 +0800 [ISSUE #969] C++ client support attemptId in PushConsumer (#970) --- cpp/source/base/Message.cpp | 1 - cpp/source/base/Protocol.cpp | 3 +- .../rocketmq/AsyncReceiveMessageCallback.cpp | 40 ++++++++++-------- cpp/source/rocketmq/ProcessQueueImpl.cpp | 48 ++++++++++++++-------- cpp/source/rocketmq/PushConsumerImpl.cpp | 18 ++++---- .../rocketmq/include/AsyncReceiveMessageCallback.h | 10 ++--- cpp/source/rocketmq/include/ProcessQueue.h | 2 +- cpp/source/rocketmq/include/ProcessQueueImpl.h | 6 +-- cpp/source/rocketmq/include/PushConsumerImpl.h | 5 ++- 9 files changed, 78 insertions(+), 55 deletions(-) diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp index cd3cc236..8d82bd3f 100644 --- a/cpp/source/base/Message.cpp +++ b/cpp/source/base/Message.cpp @@ -20,7 +20,6 @@ #include <memory> #include "UniqueIdGenerator.h" -#include "absl/memory/memory.h" ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/source/base/Protocol.cpp b/cpp/source/base/Protocol.cpp index 91a643b3..70562a2b 100644 --- a/cpp/source/base/Protocol.cpp +++ b/cpp/source/base/Protocol.cpp @@ -72,7 +72,8 @@ bool operator==(const rmq::MessageQueue& lhs, const rmq::MessageQueue& rhs) { } std::string simpleNameOf(const rmq::MessageQueue& m) { - return fmt::format("{}{}-{}-{}", m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name()); + return fmt::format("{}@{}@{}@{}", + m.topic().resource_namespace(), m.topic().name(), m.id(), m.broker().name()); } bool operator==(const std::vector<rmq::MessageQueue>& lhs, const std::vector<rmq::MessageQueue>& rhs) { diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp index f68c9f88..d1c3ba30 100644 --- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp +++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp @@ -30,10 +30,14 @@ ROCKETMQ_NAMESPACE_BEGIN AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> process_queue) : process_queue_(std::move(process_queue)) { - receive_message_later_ = std::bind(&AsyncReceiveMessageCallback::checkThrottleThenReceive, this); + + receive_message_later_ = std::bind( + &AsyncReceiveMessageCallback::checkThrottleThenReceive, this, std::placeholders::_1); } -void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const ReceiveMessageResult& result) { +void AsyncReceiveMessageCallback::onCompletion( + const std::error_code& ec, std::string& attempt_id, const ReceiveMessageResult& result) { + std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock(); if (!process_queue) { SPDLOG_INFO("Process queue has been destructed."); @@ -47,18 +51,19 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const if (ec == ErrorCode::TooManyRequests) { SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName()); - receiveMessageLater(std::chrono::milliseconds(20)); + receiveMessageLater(std::chrono::milliseconds(20), attempt_id); return; } if (ec == ErrorCode::NoContent) { - checkThrottleThenReceive(); + checkThrottleThenReceive(""); return; } if (ec) { - SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message()); - receiveMessageLater(std::chrono::seconds (1)); + SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), + ec.message()); + receiveMessageLater(std::chrono::seconds(1), attempt_id); return; } @@ -66,12 +71,12 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const result.source_host, result.messages.size(), process_queue->simpleName()); process_queue->accountCache(result.messages); consumer->getConsumeMessageService()->dispatch(process_queue, result.messages); - checkThrottleThenReceive(); + checkThrottleThenReceive(""); } const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = "receive-later-task"; -void AsyncReceiveMessageCallback::checkThrottleThenReceive() { +void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string attempt_id) { auto process_queue = process_queue_.lock(); if (!process_queue) { SPDLOG_WARN("Process queue should have been destructed"); @@ -82,14 +87,14 @@ void AsyncReceiveMessageCallback::checkThrottleThenReceive() { SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.", process_queue->simpleName()); process_queue->syncIdleState(); - receiveMessageLater(std::chrono::seconds(1)); + receiveMessageLater(std::chrono::seconds(1), attempt_id); } else { // Receive message immediately - receiveMessageImmediately(); + receiveMessageImmediately(attempt_id); } } -void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay) { +void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay, std::string& attempt_id) { auto process_queue = process_queue_.lock(); if (!process_queue) { return; @@ -98,17 +103,18 @@ void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds auto client_instance = process_queue->getClientManager(); std::weak_ptr<AsyncReceiveMessageCallback> receive_callback_weak_ptr(shared_from_this()); - auto task = [receive_callback_weak_ptr]() { + auto task = [receive_callback_weak_ptr, &attempt_id]() { auto async_receive_ptr = receive_callback_weak_ptr.lock(); if (async_receive_ptr) { - async_receive_ptr->checkThrottleThenReceive(); + async_receive_ptr->checkThrottleThenReceive(attempt_id); } }; - client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0)); + client_instance->getScheduler()->schedule( + task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0)); } -void AsyncReceiveMessageCallback::receiveMessageImmediately() { +void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& attempt_id) { auto process_queue_shared_ptr = process_queue_.lock(); if (!process_queue_shared_ptr) { SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles"); @@ -121,7 +127,9 @@ void AsyncReceiveMessageCallback::receiveMessageImmediately() { process_queue_shared_ptr->simpleName()); return; } - impl->receiveMessage(process_queue_shared_ptr->messageQueue(), process_queue_shared_ptr->getFilterExpression()); + + impl->receiveMessage(process_queue_shared_ptr->messageQueue(), + process_queue_shared_ptr->getFilterExpression(), attempt_id); } ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp b/cpp/source/rocketmq/ProcessQueueImpl.cpp index 002325c0..59df8578 100644 --- a/cpp/source/rocketmq/ProcessQueueImpl.cpp +++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp @@ -16,14 +16,13 @@ */ #include "ProcessQueueImpl.h" -#include <atomic> #include <chrono> #include <memory> #include <system_error> #include <utility> +#include "UniqueIdGenerator.h" #include "AsyncReceiveMessageCallback.h" -#include "ClientManagerImpl.h" #include "MetadataConstants.h" #include "Protocol.h" #include "PushConsumerImpl.h" @@ -98,39 +97,39 @@ bool ProcessQueueImpl::shouldThrottle() const { return false; } -void ProcessQueueImpl::receiveMessage() { +void ProcessQueueImpl::receiveMessage(std::string& attempt_id) { auto consumer = consumer_.lock(); if (!consumer) { return; } - - popMessage(); + popMessage(attempt_id); } -void ProcessQueueImpl::popMessage() { +void ProcessQueueImpl::popMessage(std::string& attempt_id) { rmq::ReceiveMessageRequest request; absl::flat_hash_map<std::string, std::string> metadata; auto consumer_client = consumer_.lock(); if (!consumer_client) { return; } + Signature::sign(consumer_client->config(), metadata); - wrapPopMessageRequest(metadata, request); + wrapPopMessageRequest(metadata, request, attempt_id); syncIdleState(); - SPDLOG_DEBUG("Try to pop message from {}", simpleNameOf(message_queue_)); + SPDLOG_DEBUG("Receive message from={}, attemptId={}", simpleNameOf(message_queue_), attempt_id); std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_}; - auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& result) { - std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock(); - if (recv_cb) { - recv_cb->onCompletion(ec, result); + auto callback = + [cb, &attempt_id](const std::error_code& ec, const ReceiveMessageResult& result) { + std::shared_ptr<AsyncReceiveMessageCallback> receive_cb = cb.lock(); + if (receive_cb) { + receive_cb->onCompletion(ec, attempt_id, result); } }; - client_manager_->receiveMessage(urlOf(message_queue_), metadata, request, - absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout + - consumer_client->config().request_timeout), - callback); + auto timeout = absl::ToChronoMilliseconds( + consumer_client->config().subscriber.polling_timeout + consumer_client->config().request_timeout); + client_manager_->receiveMessage(urlOf(message_queue_), metadata, request, timeout, callback); } void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& messages) { @@ -184,8 +183,18 @@ void ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expres } } +void generateAttemptId(std::string& attempt_id) { + const std::string unique_id = UniqueIdGenerator::instance().next(); + if (unique_id.size() < 34) { + return; + } + attempt_id = fmt::format( + "{}-{}-{}-{}-{}", unique_id.substr(0, 8), unique_id.substr(8, 4), + unique_id.substr(12, 4), unique_id.substr(16, 4), unique_id.substr(20, 12)); +} + void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata, - rmq::ReceiveMessageRequest& request) { + rmq::ReceiveMessageRequest& request, std::string& attempt_id) { std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock(); assert(consumer); request.mutable_group()->CopyFrom(consumer->config().subscriber.group); @@ -205,6 +214,11 @@ void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, st auto fraction = invisible_time_ - std::chrono::duration_cast<std::chrono::seconds>(invisible_time_); int32_t nano_seconds = static_cast<int32_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(fraction).count()); request.mutable_invisible_duration()->set_nanos(nano_seconds); + + if (attempt_id.empty()) { + generateAttemptId(attempt_id); + } + request.set_attempt_id(attempt_id); } std::weak_ptr<PushConsumerImpl> ProcessQueueImpl::getConsumer() { diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index 712ac814..68bb9f67 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -16,10 +16,8 @@ */ #include "PushConsumerImpl.h" -#include <atomic> #include <cassert> #include <chrono> -#include <cstdint> #include <cstdlib> #include <string> #include <system_error> @@ -324,9 +322,10 @@ void PushConsumerImpl::syncProcessQueue(const std::string& topic, for (const auto& message_queue : message_queue_list) { if (std::none_of(current.cbegin(), current.cend(), [&](const rmq::MessageQueue& item) { return item == message_queue; })) { - SPDLOG_INFO("Start to receive message from {} according to latest assignment info from load balancer", + SPDLOG_DEBUG("Start to receive message from {} according to latest assignment info from load balancer", simpleNameOf(message_queue)); - if (!receiveMessage(message_queue, filter_expression)) { + std::string attempt_id; + if (!receiveMessage(message_queue, filter_expression, attempt_id)) { if (!active()) { SPDLOG_WARN("Failed to initiate receive message request-response-cycle for {}", simpleNameOf(message_queue)); // TODO: remove it from current assignment such that a second attempt will be made again in the next round. @@ -350,9 +349,9 @@ std::shared_ptr<ProcessQueue> PushConsumerImpl::getOrCreateProcessQueue(const rm process_queue = process_queue_table_.at(simpleNameOf(message_queue)); } else { SPDLOG_INFO("Create ProcessQueue for message queue[{}]", simpleNameOf(message_queue)); - // create ProcessQueue - process_queue = - std::make_shared<ProcessQueueImpl>(message_queue, filter_expression, shared_from_this(), client_manager_); + // create process queue object + process_queue = std::make_shared<ProcessQueueImpl>( + message_queue, filter_expression, shared_from_this(), client_manager_); std::shared_ptr<AsyncReceiveMessageCallback> receive_callback = std::make_shared<AsyncReceiveMessageCallback>(process_queue); process_queue->callback(receive_callback); @@ -363,7 +362,8 @@ std::shared_ptr<ProcessQueue> PushConsumerImpl::getOrCreateProcessQueue(const rm } bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue, - const FilterExpression& filter_expression) { + const FilterExpression& filter_expression, + std::string& attempt_id) { if (!active()) { SPDLOG_INFO("PushConsumer has stopped. Drop further receive message request"); return false; @@ -379,7 +379,7 @@ bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue, SPDLOG_ERROR("Failed to resolve address for brokerName={}", message_queue.broker().name()); return false; } - process_queue_ptr->receiveMessage(); + process_queue_ptr->receiveMessage(attempt_id); return true; } diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h index 5a134428..b19f097b 100644 --- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h +++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h @@ -29,11 +29,11 @@ class AsyncReceiveMessageCallback : public std::enable_shared_from_this<AsyncRec public: explicit AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> process_queue); - void onCompletion(const std::error_code& ec, const ReceiveMessageResult& result); + void onCompletion(const std::error_code& ec, std::string& attempt_id, const ReceiveMessageResult& result); - void receiveMessageLater(std::chrono::milliseconds delay); + void receiveMessageLater(std::chrono::milliseconds delay, std::string& attempt_id); - void receiveMessageImmediately(); + void receiveMessageImmediately(std::string& attempt_id); private: /** @@ -42,9 +42,9 @@ private: */ std::weak_ptr<ProcessQueue> process_queue_; - std::function<void(void)> receive_message_later_; + std::function<void(std::string)> receive_message_later_; - void checkThrottleThenReceive(); + void checkThrottleThenReceive(std::string attempt_id); static const char* RECEIVE_LATER_TASK_NAME; }; diff --git a/cpp/source/rocketmq/include/ProcessQueue.h b/cpp/source/rocketmq/include/ProcessQueue.h index 0e8ce74d..c512be34 100644 --- a/cpp/source/rocketmq/include/ProcessQueue.h +++ b/cpp/source/rocketmq/include/ProcessQueue.h @@ -38,7 +38,7 @@ public: virtual void callback(std::shared_ptr<AsyncReceiveMessageCallback> callback) = 0; - virtual void receiveMessage() = 0; + virtual void receiveMessage(std::string& attempt_id) = 0; virtual std::string topic() const = 0; diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h b/cpp/source/rocketmq/include/ProcessQueueImpl.h index 75a5d2d3..b811e936 100644 --- a/cpp/source/rocketmq/include/ProcessQueueImpl.h +++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h @@ -63,7 +63,7 @@ public: std::shared_ptr<ClientManager> getClientManager() override; - void receiveMessage() override; + void receiveMessage(std::string& attempt_id) override; const std::string& simpleName() const override { return simple_name_; @@ -127,10 +127,10 @@ private: */ std::atomic<uint64_t> cached_message_memory_; - void popMessage(); + void popMessage(std::string& attempt_id); void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata, - rmq::ReceiveMessageRequest& request); + rmq::ReceiveMessageRequest& request, std::string& attempt_id); void wrapFilterExpression(rmq::FilterExpression* filter_expression); }; diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h index 8f360fda..42c3fe49 100644 --- a/cpp/source/rocketmq/include/PushConsumerImpl.h +++ b/cpp/source/rocketmq/include/PushConsumerImpl.h @@ -99,8 +99,9 @@ public: const FilterExpression& filter_expression) LOCKS_EXCLUDED(process_queue_table_mtx_); - bool receiveMessage(const rmq::MessageQueue& message_queue, const FilterExpression& filter_expression) - LOCKS_EXCLUDED(process_queue_table_mtx_); + bool receiveMessage(const rmq::MessageQueue& message_queue, + const FilterExpression& filter_expression, + std::string& attempt_id) LOCKS_EXCLUDED(process_queue_table_mtx_); uint32_t consumeThreadPoolSize() const;