This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 28a0054 Unify standard and FIFO message processing implementation
28a0054 is described below
commit 28a0054ce8b41cbbcad9dbef43c188f55dabc621
Author: Li Zhanhui <[email protected]>
AuthorDate: Mon May 23 10:33:29 2022 +0800
Unify standard and FIFO message processing implementation
---
src/main/cpp/client/ClientManagerImpl.cpp | 29 +-
src/main/cpp/client/include/ClientManager.h | 2 +-
src/main/cpp/client/include/ClientManagerImpl.h | 9 +-
.../cpp/rocketmq/AsyncReceiveMessageCallback.cpp | 3 +-
.../cpp/rocketmq/ConsumeFifoMessageService.cpp | 322 ---------------------
.../cpp/rocketmq/ConsumeMessageServiceBase.cpp | 104 -------
.../cpp/rocketmq/ConsumeMessageServiceImpl.cpp | 125 ++++++++
.../cpp/rocketmq/ConsumeStandardMessageService.cpp | 275 ------------------
src/main/cpp/rocketmq/ConsumeTask.cpp | 167 +++++++++++
src/main/cpp/rocketmq/PushConsumerImpl.cpp | 41 +--
.../rocketmq/include/ConsumeFifoMessageService.h | 55 ----
.../cpp/rocketmq/include/ConsumeMessageService.h | 27 +-
.../rocketmq/include/ConsumeMessageServiceBase.h | 101 -------
.../rocketmq/include/ConsumeMessageServiceImpl.h | 85 ++++++
.../include/ConsumeStandardMessageService.h | 46 ---
src/main/cpp/rocketmq/include/ConsumeTask.h | 87 ++++++
src/main/cpp/rocketmq/include/PushConsumerImpl.h | 4 +-
.../cpp/rocketmq/mocks/include/PushConsumerMock.h | 2 -
18 files changed, 526 insertions(+), 958 deletions(-)
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp
b/src/main/cpp/client/ClientManagerImpl.cpp
index 25fc559..a5fe96a 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -957,10 +957,11 @@ void ClientManagerImpl::endTransaction(
client->asyncEndTransaction(request, invocation_context);
}
-void ClientManagerImpl::forwardMessageToDeadLetterQueue(
- const std::string& target_host, const Metadata& metadata, const
ForwardMessageToDeadLetterQueueRequest& request,
- std::chrono::milliseconds timeout,
- const std::function<void(const
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) {
+void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string&
target_host,
+ const Metadata&
metadata,
+ const
ForwardMessageToDeadLetterQueueRequest& request,
+
std::chrono::milliseconds timeout,
+ const
std::function<void(const std::error_code&)>& cb) {
SPDLOG_DEBUG("ForwardMessageToDeadLetterQueue Request: {}",
request.DebugString());
auto client = getRpcClient(target_host);
auto invocation_context = new
InvocationContext<ForwardMessageToDeadLetterQueueResponse>();
@@ -977,12 +978,28 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(
if (!invocation_context->status.ok()) {
SPDLOG_WARN("Failed to transmit SendMessageToDeadLetterQueueRequest to
host={}",
invocation_context->remote_address);
- cb(invocation_context);
+ std::error_code ec = ErrorCode::BadRequest;
+ cb(ec);
return;
}
SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from
server[host={}]", invocation_context->remote_address);
- cb(invocation_context);
+ std::error_code ec;
+ switch (invocation_context->response.status().code()) {
+ case rmq::Code::OK: {
+ break;
+ }
+ case rmq::Code::INTERNAL_SERVER_ERROR: {
+ ec = ErrorCode::ServiceUnavailable;
+ }
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ ec = ErrorCode::TooManyRequest;
+ }
+ default: {
+ ec = ErrorCode::NotImplemented;
+ }
+ }
+ cb(ec);
};
invocation_context->callback = callback;
client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
diff --git a/src/main/cpp/client/include/ClientManager.h
b/src/main/cpp/client/include/ClientManager.h
index c85c7ec..99ec6ad 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -72,7 +72,7 @@ public:
virtual void forwardMessageToDeadLetterQueue(
const std::string& target_host, const Metadata& metadata, const
ForwardMessageToDeadLetterQueueRequest& request,
std::chrono::milliseconds timeout,
- const std::function<void(const
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) = 0;
+ const std::function<void(const std::error_code&)>& cb) = 0;
virtual void endTransaction(const std::string& target_host, const Metadata&
metadata,
const EndTransactionRequest& request,
std::chrono::milliseconds timeout,
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h
b/src/main/cpp/client/include/ClientManagerImpl.h
index 50d8b05..63b0713 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -150,10 +150,11 @@ public:
const ChangeInvisibleDurationRequest&,
std::chrono::milliseconds timeout,
const std::function<void(const
std::error_code&)>&) override;
- void forwardMessageToDeadLetterQueue(
- const std::string& target_host, const Metadata& metadata, const
ForwardMessageToDeadLetterQueueRequest& request,
- std::chrono::milliseconds timeout,
- const std::function<void(const
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) override;
+ void forwardMessageToDeadLetterQueue(const std::string& target_host,
+ const Metadata& metadata,
+ const
ForwardMessageToDeadLetterQueueRequest& request,
+ std::chrono::milliseconds timeout,
+ const std::function<void(const
std::error_code&)>& cb) override;
/**
* End a transaction asynchronously.
diff --git a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
index a9df915..c1b57d4 100644
--- a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -52,7 +52,8 @@ void AsyncReceiveMessageCallback::onCompletion(const
std::error_code& ec, const
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with
status=FOUND, msgListSize={}, queue={}",
result.source_host, result.messages.size(),
process_queue->simpleName());
process_queue->cacheMessages(result.messages);
- consumer->getConsumeMessageService()->signalDispatcher();
+
+ consumer->getConsumeMessageService()->dispatch(process_queue,
result.messages);
checkThrottleThenReceive();
}
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
deleted file mode 100644
index aa99543..0000000
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <chrono>
-#include <functional>
-#include <limits>
-#include <system_error>
-
-#include "TracingUtility.h"
-#include "absl/strings/str_join.h"
-
-#include "opencensus/trace/propagation/trace_context.h"
-#include "opencensus/trace/span.h"
-
-#include "ConsumeFifoMessageService.h"
-#include "MixAll.h"
-#include "ProcessQueue.h"
-#include "PushConsumerImpl.h"
-#include "rocketmq/MessageListener.h"
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ConsumeFifoMessageService::ConsumeFifoMessageService(std::weak_ptr<PushConsumerImpl>
consumer, int thread_count,
- MessageListener
message_listener)
- : ConsumeMessageServiceBase(std::move(consumer), thread_count,
message_listener) {
-}
-
-void ConsumeFifoMessageService::start() {
- ConsumeMessageServiceBase::start();
- State expected = State::STARTING;
- if (state_.compare_exchange_strong(expected, State::STARTED)) {
- SPDLOG_DEBUG("ConsumeMessageOrderlyService started");
- }
-}
-
-void ConsumeFifoMessageService::shutdown() {
- // Wait till consume-message-orderly-service has fully started; otherwise,
we may potentially miss closing resources
- // in concurrent scenario.
- while (State::STARTING == state_.load(std::memory_order_relaxed)) {
- absl::SleepFor(absl::Milliseconds(10));
- }
-
- State expected = State::STARTED;
- if (state_.compare_exchange_strong(expected, STOPPING)) {
- ConsumeMessageServiceBase::shutdown();
- SPDLOG_INFO("ConsumeMessageOrderlyService shut down");
- }
-}
-
-void ConsumeFifoMessageService::submitConsumeTask0(const
std::shared_ptr<PushConsumerImpl>& consumer,
- const
std::weak_ptr<ProcessQueue>& process_queue,
- MessageConstSharedPtr
message) {
- // In case custom executor is used.
- const auto& custom_executor = consumer->customExecutor();
- if (custom_executor) {
- std::function<void(void)> consume_task =
- std::bind(&ConsumeFifoMessageService::consumeTask, this,
process_queue, message);
- custom_executor(consume_task);
- SPDLOG_DEBUG("Submit FIFO consume task to custom executor");
- return;
- }
-
- // submit batch message
- std::function<void(void)> consume_task =
- std::bind(&ConsumeFifoMessageService::consumeTask, this, process_queue,
message);
- SPDLOG_DEBUG("Submit FIFO consume task to thread pool");
- pool_->submit(consume_task);
-}
-
-void ConsumeFifoMessageService::submitConsumeTask(const
std::weak_ptr<ProcessQueue>& process_queue) {
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- SPDLOG_INFO("Process queue has destructed");
- return;
- }
-
- auto consumer = consumer_.lock();
- if (!consumer) {
- SPDLOG_INFO("Consumer has destructed");
- return;
- }
-
- if (process_queue_ptr->bindFifoConsumeTask()) {
- std::vector<MessageConstSharedPtr> messages;
- process_queue_ptr->take(1, messages);
- if (!messages.empty()) {
- assert(1 == messages.size());
- submitConsumeTask0(consumer, process_queue,
std::move(*messages.begin()));
- }
- }
-}
-
-void ConsumeFifoMessageService::consumeTask(const std::weak_ptr<ProcessQueue>&
process_queue,
- MessageConstSharedPtr message) {
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- return;
- }
- const std::string& topic = message->topic();
- ConsumeResult result;
- std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
- // consumer might have been destructed.
- if (!consumer) {
- return;
- }
-
- std::shared_ptr<RateLimiter<10>> rate_limiter = rateLimiter(topic);
- if (rate_limiter) {
- rate_limiter->acquire();
- SPDLOG_DEBUG("Rate-limit permit acquired");
- }
-
- // Record await-consumption-span
- if (message->traceContext().has_value()) {
- auto span_context =
opencensus::trace::propagation::FromTraceParentHeader(message->traceContext().value());
-
- auto span = opencensus::trace::Span::BlankSpan();
- std::string span_name =
- consumer->resourceNamespace() + "/" + topic + " " +
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
- if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name,
span_context, traceSampler());
- } else {
- span = opencensus::trace::Span::StartSpan(span_name, nullptr,
{traceSampler()});
- }
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
- MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
- MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
- TracingUtility::addUniversalSpanAttributes(*message, consumer->config(),
span);
- //
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
message.extension.store_time);
- absl::Time decoded_timestamp =
absl::FromChrono(message->extension().decode_time);
- span.AddAnnotation(
- MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
- {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
-
opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp
- absl::UnixEpoch()))}});
- span.End();
- // message.message.traceContext()
- // MessageAccessor::setTraceContext(const_cast<MessageExt&>(message),
- //
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
- }
-
- auto span_context =
opencensus::trace::propagation::FromTraceParentHeader(message->traceContext().value());
- auto span = opencensus::trace::Span::BlankSpan();
- std::string span_name = consumer->resourceNamespace() + "/" +
message->topic() + " " +
-
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
- if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name,
span_context);
- } else {
- span = opencensus::trace::Span::StartSpan(span_name);
- }
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
- MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
- MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
- TracingUtility::addUniversalSpanAttributes(*message, consumer->config(),
span);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT,
std::to_string(message->extension().delivery_attempt));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
- MixAll::format(message->extension().store_time));
-
- // MessageAccessor::setTraceContext(const_cast<MessageExt&>(message),
- //
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
- auto steady_start = std::chrono::steady_clock::now();
-
- try {
- result = message_listener_(*message);
- } catch (...) {
- result = ConsumeResult::FAILURE;
- SPDLOG_ERROR("Business FIFO callback raised an exception when
consumeMessage");
- }
-
- switch (result) {
- case ConsumeResult::SUCCESS:
- span.SetStatus(opencensus::trace::StatusCode::OK);
- break;
- case ConsumeResult::FAILURE:
- span.SetStatus(opencensus::trace::StatusCode::UNKNOWN);
- break;
- }
- span.End();
-
- auto duration = std::chrono::steady_clock::now() - steady_start;
-
- // Log client consume-time costs
- SPDLOG_DEBUG("Business callback spent {}ms processing message[Topic={},
MessageId={}].",
- MixAll::millisecondsOf(duration), message->topic(),
message->id());
-
- if (result == ConsumeResult::SUCCESS) {
- // Release message number and memory quota
- process_queue_ptr->release(message->body().size());
-
- // Ensure current message is acked before moving to the next message.
- auto callback = std::bind(&ConsumeFifoMessageService::onAck, this,
process_queue, message, std::placeholders::_1);
- consumer->ack(*message, callback);
- } else {
- const Message* ptr = message.get();
- Message* raw = const_cast<Message*>(ptr);
- raw->mutableExtension().delivery_attempt++;
-
- if (message->extension().delivery_attempt <
consumer->maxDeliveryAttempts()) {
- auto task = std::bind(&ConsumeFifoMessageService::scheduleConsumeTask,
this, process_queue, message);
- consumer->schedule("Scheduled-Consume-FIFO-Message-Task", task,
std::chrono::seconds(1));
- } else {
- auto callback =
std::bind(&ConsumeFifoMessageService::onForwardToDeadLetterQueue, this,
process_queue, message,
- std::placeholders::_1);
- consumer->forwardToDeadLetterQueue(*message, callback);
- }
- }
-}
-
-void ConsumeFifoMessageService::onAck(const std::weak_ptr<ProcessQueue>&
process_queue,
- MessageConstSharedPtr message,
- const std::error_code& ec) {
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- SPDLOG_WARN("ProcessQueue has destructed.");
- return;
- }
-
- if (ec) {
- SPDLOG_WARN("Failed to acknowledge FIFO message[MessageQueue={},
MsgId={}]. Cause: {}",
- process_queue_ptr->simpleName(), message->id(), ec.message());
- auto consumer = consumer_.lock();
- if (!consumer) {
- SPDLOG_WARN("Consumer instance has destructed");
- return;
- }
- auto task = std::bind(&ConsumeFifoMessageService::scheduleAckTask, this,
process_queue, message);
- int32_t duration = 100;
- consumer->schedule("Ack-FIFO-Message-On-Failure", task,
std::chrono::milliseconds(duration));
- SPDLOG_INFO("Scheduled to ack message[Topic={}, MessageId={}] in {}ms",
message->topic(), message->id(), duration);
- } else {
- SPDLOG_DEBUG("Acknowledge FIFO message[MessageQueue={}, MsgId={}] OK",
process_queue_ptr->simpleName(),
- message->id());
- process_queue_ptr->unbindFifoConsumeTask();
- submitConsumeTask(process_queue);
- }
-}
-
-void ConsumeFifoMessageService::onForwardToDeadLetterQueue(const
std::weak_ptr<ProcessQueue>& process_queue,
-
MessageConstSharedPtr message,
- bool ok) {
- if (ok) {
- SPDLOG_DEBUG("Forward message[Topic={}, MessagId={}] to DLQ OK",
message->topic(), message->id());
- auto process_queue_ptr = process_queue.lock();
- if (process_queue_ptr) {
- process_queue_ptr->unbindFifoConsumeTask();
- }
- return;
- }
-
- SPDLOG_INFO("Failed to forward message[topic={}, MessageId={}] to DLQ",
message->topic(), message->id());
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- SPDLOG_INFO("Abort further attempts considering its process queue has
destructed");
- return;
- }
-
- auto consumer = consumer_.lock();
- assert(consumer);
-
- auto task =
std::bind(&ConsumeFifoMessageService::scheduleForwardDeadLetterQueueTask, this,
process_queue, message);
- consumer->schedule("Scheduled-Forward-DLQ-Task", task,
std::chrono::milliseconds(100));
-}
-
-void ConsumeFifoMessageService::scheduleForwardDeadLetterQueueTask(const
std::weak_ptr<ProcessQueue>& process_queue,
-
MessageConstSharedPtr message) {
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- return;
- }
- auto consumer = consumer_.lock();
- assert(consumer);
- auto callback =
std::bind(&ConsumeFifoMessageService::onForwardToDeadLetterQueue, this,
process_queue, message,
- std::placeholders::_1);
- consumer->forwardToDeadLetterQueue(*message, callback);
-}
-
-void ConsumeFifoMessageService::scheduleAckTask(const
std::weak_ptr<ProcessQueue>& process_queue,
- MessageConstSharedPtr message)
{
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- return;
- }
-
- auto callback = std::bind(&ConsumeFifoMessageService::onAck, this,
process_queue, message, std::placeholders::_1);
- auto consumer = consumer_.lock();
- if (consumer) {
- consumer->ack(*message, callback);
- }
-}
-
-void ConsumeFifoMessageService::scheduleConsumeTask(const
std::weak_ptr<ProcessQueue>& process_queue,
- MessageConstSharedPtr
message) {
- auto consumer_ptr = consumer_.lock();
- if (!consumer_ptr) {
- return;
- }
-
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- return;
- }
-
- submitConsumeTask0(consumer_ptr, process_queue_ptr, message);
- SPDLOG_INFO("Business callback failed to process FIFO messages. Re-submit
consume task back to thread pool");
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeMessageServiceBase.cpp
b/src/main/cpp/rocketmq/ConsumeMessageServiceBase.cpp
deleted file mode 100644
index 7190422..0000000
--- a/src/main/cpp/rocketmq/ConsumeMessageServiceBase.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeMessageServiceBase.h"
-#include "LoggerImpl.h"
-#include "PushConsumerImpl.h"
-#include "ThreadPoolImpl.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ConsumeMessageServiceBase::ConsumeMessageServiceBase(std::weak_ptr<PushConsumerImpl>
consumer, int thread_count,
- MessageListener
message_listener)
- : state_(State::CREATED), thread_count_(thread_count),
pool_(absl::make_unique<ThreadPoolImpl>(thread_count_)),
- consumer_(std::move(consumer)), message_listener_(message_listener) {
-}
-
-void ConsumeMessageServiceBase::start() {
- State expected = State::CREATED;
- if (state_.compare_exchange_strong(expected, State::STARTING,
std::memory_order_relaxed)) {
- pool_->start();
- dispatch_thread_ = std::thread([this] {
- State current_state = state_.load(std::memory_order_relaxed);
- while (State::STOPPED != current_state && State::STOPPING !=
current_state) {
- dispatch();
- {
- absl::MutexLock lk(&dispatch_mtx_);
- dispatch_cv_.WaitWithTimeout(&dispatch_mtx_,
absl::Milliseconds(100));
- }
-
- // Update current state
- current_state = state_.load(std::memory_order_relaxed);
- }
- });
- }
-}
-
-void ConsumeMessageServiceBase::signalDispatcher() {
- absl::MutexLock lk(&dispatch_mtx_);
- // Wake up dispatch_thread_
- dispatch_cv_.Signal();
-}
-
-void ConsumeMessageServiceBase::throttle(const std::string& topic,
std::uint32_t threshold) {
- absl::MutexLock lk(&rate_limiter_table_mtx_);
- std::shared_ptr<RateLimiter<10>> rate_limiter =
std::make_shared<RateLimiter<10>>(threshold);
- rate_limiter_table_.insert_or_assign(topic, rate_limiter);
- rate_limiter_observer_.subscribe(rate_limiter);
-}
-
-void ConsumeMessageServiceBase::shutdown() {
- State expected = State::STOPPING;
- if (state_.compare_exchange_strong(expected, State::STOPPED,
std::memory_order_relaxed)) {
- pool_->shutdown();
- {
- absl::MutexLock lk(&dispatch_mtx_);
- dispatch_cv_.SignalAll();
- }
-
- if (dispatch_thread_.joinable()) {
- dispatch_thread_.join();
- }
-
- rate_limiter_observer_.stop();
- }
-}
-
-bool ConsumeMessageServiceBase::hasConsumeRateLimiter(const std::string&
topic) const {
- absl::MutexLock lk(&rate_limiter_table_mtx_);
- return rate_limiter_table_.contains(topic);
-}
-
-std::shared_ptr<RateLimiter<10>> ConsumeMessageServiceBase::rateLimiter(const
std::string& topic) const {
- if (!hasConsumeRateLimiter(topic)) {
- return nullptr;
- }
- absl::MutexLock lk(&rate_limiter_table_mtx_);
- return rate_limiter_table_[topic];
-}
-
-void ConsumeMessageServiceBase::dispatch() {
- std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
- if (!consumer) {
- SPDLOG_WARN("The consumer has already destructed");
- return;
- }
-
- auto callback = [this](const std::shared_ptr<ProcessQueue>& process_queue) {
submitConsumeTask(process_queue); };
- consumer->iterateProcessQueue(callback);
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
new file mode 100644
index 0000000..9c9ad49
--- /dev/null
+++ b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConsumeMessageServiceImpl.h"
+
+#include "ConsumeTask.h"
+#include "LoggerImpl.h"
+#include "PushConsumerImpl.h"
+#include "ThreadPoolImpl.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+ConsumeMessageServiceImpl::ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl>
consumer,
+ int thread_count,
+ MessageListener
message_listener)
+ : state_(State::CREATED),
+ thread_count_(thread_count),
+ pool_(absl::make_unique<ThreadPoolImpl>(thread_count_)),
+ consumer_(std::move(consumer)),
+ message_listener_(message_listener) {
+}
+
+void ConsumeMessageServiceImpl::start() {
+ State expected = State::CREATED;
+ if (state_.compare_exchange_strong(expected, State::STARTING,
std::memory_order_relaxed)) {
+ pool_->start();
+ }
+}
+
+void ConsumeMessageServiceImpl::shutdown() {
+ State expected = State::STOPPING;
+ if (state_.compare_exchange_strong(expected, State::STOPPED,
std::memory_order_relaxed)) {
+ pool_->shutdown();
+ }
+}
+
+void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue>
process_queue,
+ std::vector<MessageConstSharedPtr>
messages) {
+ auto consumer = consumer_.lock();
+ if (!consumer) {
+ return;
+ }
+
+ if (consumer->config().subscriber.fifo) {
+ auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(),
process_queue, std::move(messages));
+ pool_->submit([consume_task]() { consume_task->process(); });
+ return;
+ }
+
+ for (auto message : messages) {
+ auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(),
process_queue, message);
+ pool_->submit([consume_task]() { consume_task->process(); });
+ }
+}
+
+void ConsumeMessageServiceImpl::submit(std::shared_ptr<ConsumeTask> task) {
+ auto consumer = consumer_.lock();
+ if (!consumer) {
+ return;
+ }
+
+ pool_->submit([task]() { task->process(); });
+}
+
+void ConsumeMessageServiceImpl::ack(const Message& message,
std::function<void(const std::error_code&)> cb) {
+ auto consumer = consumer_.lock();
+ if (!consumer) {
+ return;
+ }
+
+ consumer->ack(message, cb);
+}
+
+void ConsumeMessageServiceImpl::nack(const Message& message,
std::function<void(const std::error_code&)> cb) {
+ auto consumer = consumer_.lock();
+ if (!consumer) {
+ return;
+ }
+
+ consumer->nack(message, cb);
+}
+
+void ConsumeMessageServiceImpl::forward(const Message& message,
std::function<void(const std::error_code&)> cb) {
+ auto consumer = consumer_.lock();
+ if (!consumer) {
+ return;
+ }
+ consumer->forwardToDeadLetterQueue(message, cb);
+}
+
+void ConsumeMessageServiceImpl::schedule(std::shared_ptr<ConsumeTask> task,
std::chrono::milliseconds delay) {
+}
+
+std::size_t ConsumeMessageServiceImpl::maxDeliveryAttempt() {
+ std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
+ if (!consumer) {
+ SPDLOG_WARN("The consumer has already destructed");
+ return 0;
+ }
+
+ return consumer->maxDeliveryAttempts();
+}
+
+bool ConsumeMessageServiceImpl::preHandle(const Message& message) {
+ return true;
+}
+
+bool ConsumeMessageServiceImpl::postHandle(const Message& message,
ConsumeResult result) {
+ return true;
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
deleted file mode 100644
index b478df9..0000000
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeStandardMessageService.h"
-
-#include <limits>
-#include <string>
-#include <system_error>
-#include <utility>
-
-#include "LoggerImpl.h"
-#include "MessageExt.h"
-#include "MixAll.h"
-#include "Protocol.h"
-#include "PushConsumerImpl.h"
-#include "TracingUtility.h"
-#include "UtilAll.h"
-#include "absl/memory/memory.h"
-#include "absl/strings/str_join.h"
-#include "absl/time/time.h"
-#include "absl/types/span.h"
-#include "rocketmq/Message.h"
-#include "rocketmq/MessageListener.h"
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ConsumeStandardMessageService::ConsumeStandardMessageService(std::weak_ptr<PushConsumerImpl>
consumer, int thread_count,
- MessageListener
message_listener)
- : ConsumeMessageServiceBase(std::move(consumer), thread_count,
message_listener) {
-}
-
-void ConsumeStandardMessageService::start() {
- ConsumeMessageServiceBase::start();
- State expected = State::STARTING;
- if (state_.compare_exchange_strong(expected, State::STARTED)) {
- SPDLOG_DEBUG("ConsumeMessageConcurrentlyService started");
- }
-}
-
-void ConsumeStandardMessageService::shutdown() {
- while (State::STARTING == state_.load(std::memory_order_relaxed)) {
- absl::SleepFor(absl::Milliseconds(10));
- }
-
- State expected = State::STARTED;
- if (state_.compare_exchange_strong(expected, State::STOPPING)) {
- ConsumeMessageServiceBase::shutdown();
- SPDLOG_DEBUG("ConsumeMessageConcurrentlyService shut down");
- }
-}
-
-void ConsumeStandardMessageService::submitConsumeTask(const
std::weak_ptr<ProcessQueue>& process_queue) {
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr) {
- SPDLOG_WARN("ProcessQueue was destructed. It is likely that client should
have shutdown.");
- return;
- }
- std::shared_ptr<PushConsumerImpl> consumer =
process_queue_ptr->getConsumer().lock();
-
- if (!consumer) {
- return;
- }
-
- std::string topic = process_queue_ptr->topic();
- bool has_more = true;
- std::weak_ptr<ConsumeStandardMessageService> service(shared_from_this());
- while (has_more) {
- std::vector<MessageConstSharedPtr> messages;
- uint32_t batch_size = 1;
- has_more = process_queue_ptr->take(batch_size, messages);
- if (messages.empty()) {
- assert(!has_more);
- break;
- }
-
- // In case custom executor is used.
- const Executor& custom_executor = consumer->customExecutor();
- if (custom_executor) {
- std::function<void(void)> consume_task =
- std::bind(&ConsumeStandardMessageService::consumeTask, service,
process_queue, messages);
- custom_executor(consume_task);
- SPDLOG_DEBUG("Submit consumer task to custom executor with
message-batch-size={}", messages.size());
- continue;
- }
-
- // submit batch message
- std::function<void(void)> consume_task =
- std::bind(&ConsumeStandardMessageService::consumeTask, service,
process_queue_ptr, messages);
- SPDLOG_DEBUG("Submit consumer task to thread pool with
message-batch-size={}", messages.size());
- pool_->submit(consume_task);
- }
-}
-
-void
ConsumeStandardMessageService::consumeTask(std::weak_ptr<ConsumeStandardMessageService>
service,
- const
std::weak_ptr<ProcessQueue>& process_queue,
- const
std::vector<MessageConstSharedPtr>& msgs) {
- auto process_queue_ptr = process_queue.lock();
- if (!process_queue_ptr || msgs.empty()) {
- return;
- }
-
- auto svc = service.lock();
- if (!svc) {
- return;
- }
-
- auto process_queue_shared_ptr = process_queue.lock();
- if (!process_queue_shared_ptr) {
- return;
- }
-
- svc->consume(process_queue_shared_ptr, msgs);
-}
-
-void ConsumeStandardMessageService::consume(const
std::shared_ptr<ProcessQueue>& process_queue,
- const
std::vector<MessageConstSharedPtr>& msgs) {
- std::string topic = (*msgs.begin())->topic();
- ConsumeResult status;
-
- std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
- // consumer might have been destructed.
- if (!consumer) {
- return;
- }
-
- std::shared_ptr<RateLimiter<10>> rate_limiter = rateLimiter(topic);
- if (rate_limiter) {
- // Acquire permits one-by-one to avoid large batch hungry issue.
- for (std::size_t i = 0; i < msgs.size(); i++) {
- rate_limiter->acquire();
- }
- SPDLOG_DEBUG("{} rate-limit permits acquired", msgs.size());
- }
-
- // Record await-consumption-span
- {
- for (const auto& msg : msgs) {
- if (!msg->traceContext().has_value()) {
- continue;
- }
- auto span_context =
opencensus::trace::propagation::FromTraceParentHeader(msg->traceContext().value());
-
- auto span = opencensus::trace::Span::BlankSpan();
- std::string span_name = consumer->resourceNamespace() + "/" +
msg->topic() + " " +
-
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
- if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name,
span_context, traceSampler());
- } else {
- span = opencensus::trace::Span::StartSpan(span_name, nullptr,
{traceSampler()});
- }
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
- MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
- MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
- TracingUtility::addUniversalSpanAttributes(*msg, consumer->config(),
span);
- //
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
msg.getStoreTimestamp());
- absl::Time decoded_timestamp =
absl::FromChrono(msg->extension().decode_time);
- span.AddAnnotation(
- MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
- {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
-
opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp
- absl::UnixEpoch()))}});
- span.End();
- // MessageAccessor::setTraceContext(const_cast<MessageExt&>(msg),
- //
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
- }
- }
-
- // Trace start of consume message
- std::vector<opencensus::trace::Span> spans;
- {
- for (const auto& msg : msgs) {
- if (!msg->traceContext().has_value()) {
- continue;
- }
- auto span_context =
opencensus::trace::propagation::FromTraceParentHeader(msg->traceContext().value());
- auto span = opencensus::trace::Span::BlankSpan();
- std::string span_name = consumer->resourceNamespace() + "/" +
msg->topic() + " " +
-
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
- if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name,
span_context);
- } else {
- span = opencensus::trace::Span::StartSpan(span_name);
- }
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
-
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
-
MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
- TracingUtility::addUniversalSpanAttributes(*msg, consumer->config(),
span);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT,
msg->extension().delivery_attempt);
- //
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
msg.extension.store_time);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE,
msgs.size());
- spans.emplace_back(std::move(span));
- // MessageAccessor::setTraceContext(const_cast<MessageExt&>(msg),
- //
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
- }
- }
-
- auto steady_start = std::chrono::steady_clock::now();
-
- try {
- // TODO:
- status = message_listener_(**msgs.begin());
- } catch (...) {
- status = ConsumeResult::FAILURE;
- SPDLOG_ERROR("Business callback raised an exception when consumeMessage");
- }
-
- auto duration = std::chrono::steady_clock::now() - steady_start;
-
- // Trace end of consumption
- {
- for (auto& span : spans) {
- switch (status) {
- case ConsumeResult::SUCCESS:
- span.SetStatus(opencensus::trace::StatusCode::OK);
- break;
- case ConsumeResult::FAILURE:
- span.SetStatus(opencensus::trace::StatusCode::UNKNOWN);
- break;
- }
- span.End();
- }
- }
-
- // Log client consume-time costs
- SPDLOG_DEBUG("Business callback spent {}ms processing {} messages.",
MixAll::millisecondsOf(duration), msgs.size());
-
- for (const auto& msg : msgs) {
- const std::string& message_id = msg->id();
-
- // Release message number and memory quota
- process_queue->release(msg->body().size());
-
- if (status == ConsumeResult::SUCCESS) {
- auto callback = [process_queue, message_id](const std::error_code& ec) {
- if (ec) {
- SPDLOG_WARN("Failed to acknowledge message[MessageQueue={},
MsgId={}]. Cause: {}",
- process_queue->simpleName(), message_id, ec.message());
- } else {
- SPDLOG_DEBUG("Acknowledge message[MessageQueue={}, MsgId={}] OK",
process_queue->simpleName(), message_id);
- }
- };
- consumer->ack(*msg, callback);
- } else {
- auto callback = [process_queue, message_id](const std::error_code& ec) {
- if (ec) {
- SPDLOG_WARN(
- "Failed to negative acknowledge message[MessageQueue={},
MsgId={}]. Cause: {} Message will be "
- "re-consumed after default invisible time",
- process_queue->simpleName(), message_id, ec.message());
- return;
- }
-
- SPDLOG_DEBUG("Nack message[MessageQueue={}, MsgId={}] OK",
process_queue->simpleName(), message_id);
- };
- consumer->nack(*msg, callback);
- }
- }
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeTask.cpp
b/src/main/cpp/rocketmq/ConsumeTask.cpp
new file mode 100644
index 0000000..dede4a4
--- /dev/null
+++ b/src/main/cpp/rocketmq/ConsumeTask.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeTask.h"
+
+#include "LoggerImpl.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
+ std::weak_ptr<ProcessQueue> process_queue,
+ MessageConstSharedPtr message)
+ : service_(service), process_queue_(std::move(process_queue)) {
+ messages_.emplace_back(message);
+}
+
+ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
+ std::weak_ptr<ProcessQueue> process_queue,
+ std::vector<MessageConstSharedPtr> messages)
+ : service_(service), process_queue_(std::move(process_queue)),
messages_(std::move(messages)) {
+ fifo_ = messages_.size() > 1;
+}
+
+void ConsumeTask::pop() {
+ assert(!messages_.empty());
+ auto process_queue = process_queue_.lock();
+ if (!process_queue) {
+ return;
+ }
+
+ process_queue->release(messages_[0]->body().size());
+
+ messages_.erase(messages_.begin());
+}
+
+void ConsumeTask::submit() {
+ auto svc = service_.lock();
+ if (!svc) {
+ return;
+ }
+ svc->submit(shared_from_this());
+}
+
+void ConsumeTask::schedule() {
+ auto svc = service_.lock();
+ if (!svc) {
+ return;
+ }
+
+ svc->schedule(shared_from_this(), std::chrono::seconds(1));
+}
+
+void ConsumeTask::onAck(std::shared_ptr<ConsumeTask> task, const
std::error_code& ec) {
+ if (task->fifo_ && ec) {
+ auto service = task->service_.lock();
+ task->next_step_ = NextStep::Ack;
+ task->schedule();
+ } else {
+ // If it is not FIFO or ack operation succeeded
+ task->pop();
+ task->next_step_ = NextStep::Consume;
+ }
+ task->submit();
+}
+
+void ConsumeTask::onNack(std::shared_ptr<ConsumeTask> task, const
std::error_code& ec) {
+ assert(!task->fifo_);
+ assert(!task->messages_.empty());
+ if (ec) {
+ SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}",
task->messages_[0]->id(), ec.message());
+ }
+ task->pop();
+ task->next_step_ = NextStep::Consume;
+ task->submit();
+}
+
+void ConsumeTask::onForward(std::shared_ptr<ConsumeTask> task, const
std::error_code& ec) {
+ assert(task->fifo_);
+ assert(!task->messages_.empty());
+ if (ec) {
+ SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ",
task->messages_[0]->id());
+ task->next_step_ = NextStep::Forward;
+ task->schedule();
+ } else {
+ SPDLOG_DEBUG("Message[message-id={}] forwarded to DLQ",
task->messages_[0]->id());
+ task->pop();
+ task->next_step_ = NextStep::Consume;
+ task->submit();
+ }
+}
+
+void ConsumeTask::process() {
+ auto svc = service_.lock();
+ if (!svc) {
+ SPDLOG_DEBUG("ConsumeMessageService has destructed");
+ return;
+ }
+
+ if (messages_.empty()) {
+ SPDLOG_DEBUG("No more messages to process");
+ return;
+ }
+
+ auto self = shared_from_this();
+
+ switch (next_step_) {
+ case NextStep::Consume: {
+ const auto& listener = svc->listener();
+ auto it = messages_.begin();
+ SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
+ svc->preHandle(**it);
+ auto result = listener(**it);
+ svc->postHandle(**it, result);
+ switch (result) {
+ case ConsumeResult::SUCCESS: {
+ auto callback = std::bind(&ConsumeTask::onAck, self,
std::placeholders::_1);
+ svc->ack(**it, callback);
+ break;
+ }
+ case ConsumeResult::FAILURE: {
+ if (fifo_) {
+ next_step_ = NextStep::Consume;
+ // Increase delivery attempts.
+ auto raw = const_cast<Message*>((*it).get());
+ raw->mutableExtension().delivery_attempt++;
+ schedule();
+ } else {
+ // For standard way of processing, Nack to server.
+ auto callback = std::bind(&ConsumeTask::onNack, self,
std::placeholders::_1);
+ svc->nack(**it, callback);
+ }
+ break;
+ }
+ }
+ break;
+ }
+
+ case NextStep::Ack: {
+ assert(!messages_.empty());
+ auto callback = std::bind(&ConsumeTask::onAck, self,
std::placeholders::_1);
+ svc->ack(*messages_[0], callback);
+ break;
+ }
+ case NextStep::Forward: {
+ assert(!messages_.empty());
+ auto callback = std::bind(&ConsumeTask::onForward, self,
std::placeholders::_1);
+ svc->forward(*messages_[0], callback);
+ break;
+ }
+ }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index cd94d9f..89dc112 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -24,8 +24,7 @@
#include "AsyncReceiveMessageCallback.h"
#include "ClientManagerFactory.h"
-#include "ConsumeFifoMessageService.h"
-#include "ConsumeStandardMessageService.h"
+#include "ConsumeMessageServiceImpl.h"
#include "MixAll.h"
#include "ProcessQueueImpl.h"
#include "Protocol.h"
@@ -72,27 +71,11 @@ void PushConsumerImpl::start() {
fetchRoutes();
- if (client_config_.subscriber.fifo) {
- SPDLOG_INFO("start orderly consume service: {}",
client_config_.subscriber.group.name());
- consume_message_service_ =
- std::make_shared<ConsumeFifoMessageService>(shared_from_this(),
consume_thread_pool_size_, message_listener_);
- consume_batch_size_ = 1;
- } else {
- // For backward compatibility, by default,
ConsumeMessageConcurrentlyService is assumed.
- SPDLOG_INFO("start concurrently consume service: {}",
client_config_.subscriber.group.name());
- consume_message_service_ = std::make_shared<ConsumeStandardMessageService>(
- shared_from_this(), consume_thread_pool_size_, message_listener_);
- }
+ SPDLOG_INFO("start concurrently consume service: {}",
client_config_.subscriber.group.name());
+ consume_message_service_ =
+ std::make_shared<ConsumeMessageServiceImpl>(shared_from_this(),
consume_thread_pool_size_, message_listener_);
consume_message_service_->start();
- {
- // Set consumer throttling
- absl::MutexLock lock(&throttle_table_mtx_);
- for (const auto& item : throttle_table_) {
- consume_message_service_->throttle(item.first, item.second);
- }
- }
-
// Heartbeat depends on initialization of consume-message-service
heartbeat();
@@ -394,7 +377,8 @@ void PushConsumerImpl::nack(const Message& message, const
std::function<void(con
absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
}
-void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message, const
std::function<void(bool)>& cb) {
+void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message,
+ const std::function<void(const
std::error_code&)>& cb) {
std::string target_host = message.extension().target_endpoint;
absl::flat_hash_map<std::string, std::string> metadata;
@@ -449,19 +433,6 @@ std::size_t PushConsumerImpl::getProcessQueueTableSize() {
void PushConsumerImpl::setThrottle(const std::string& topic, uint32_t
threshold) {
absl::MutexLock lock(&throttle_table_mtx_);
throttle_table_.emplace(topic, threshold);
- // If consumer has started, update it dynamically.
- if (getConsumeMessageService()) {
- getConsumeMessageService()->throttle(topic, threshold);
- }
-}
-
-void PushConsumerImpl::iterateProcessQueue(const
std::function<void(std::shared_ptr<ProcessQueue>)>& callback) {
- absl::MutexLock lock(&process_queue_table_mtx_);
- for (const auto& item : process_queue_table_) {
- if (item.second->hasPendingMessages()) {
- callback(item.second);
- }
- }
}
void PushConsumerImpl::fetchRoutes() {
diff --git a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
deleted file mode 100644
index 5f95d96..0000000
--- a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeMessageServiceBase.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ConsumeFifoMessageService : public ConsumeMessageServiceBase,
- public
std::enable_shared_from_this<ConsumeFifoMessageService> {
-public:
- ConsumeFifoMessageService(std::weak_ptr<PushConsumerImpl> consumer, int
thread_count,
- MessageListener message_listener);
- void start() override;
-
- void shutdown() override;
-
- /**
- * @brief Entry of ConsumeMessageService
- *
- * @param process_queue
- */
- void submitConsumeTask(const std::weak_ptr<ProcessQueue>& process_queue)
override;
-
-private:
- void consumeTask(const std::weak_ptr<ProcessQueue>& process_queue,
MessageConstSharedPtr message);
-
- void submitConsumeTask0(const std::shared_ptr<PushConsumerImpl>& consumer,
- const std::weak_ptr<ProcessQueue>& process_queue,
- MessageConstSharedPtr message);
-
- void scheduleAckTask(const std::weak_ptr<ProcessQueue>& process_queue,
MessageConstSharedPtr message);
-
- void onAck(const std::weak_ptr<ProcessQueue>& process_queue,
MessageConstSharedPtr message, const std::error_code& ec);
-
- void scheduleConsumeTask(const std::weak_ptr<ProcessQueue>& process_queue,
MessageConstSharedPtr message);
-
- void onForwardToDeadLetterQueue(const std::weak_ptr<ProcessQueue>&
process_queue, MessageConstSharedPtr message, bool ok);
-
- void scheduleForwardDeadLetterQueueTask(const std::weak_ptr<ProcessQueue>&
process_queue, MessageConstSharedPtr message);
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageService.h
b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
index 0c541ea..39d9d8e 100644
--- a/src/main/cpp/rocketmq/include/ConsumeMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
@@ -17,12 +17,16 @@
#pragma once
#include <cstdint>
+#include <memory>
+#include <system_error>
#include "ProcessQueue.h"
#include "rocketmq/MessageListener.h"
ROCKETMQ_NAMESPACE_BEGIN
+class ConsumeTask;
+
class ConsumeMessageService {
public:
virtual ~ConsumeMessageService() = default;
@@ -38,11 +42,28 @@ public:
*/
virtual void shutdown() = 0;
- virtual void submitConsumeTask(const std::weak_ptr<ProcessQueue>&
process_queue) = 0;
+ virtual void dispatch(std::shared_ptr<ProcessQueue> process_queue,
std::vector<MessageConstSharedPtr> messages) = 0;
+
+ virtual void submit(std::shared_ptr<ConsumeTask> task) = 0;
+
+ virtual MessageListener& listener() = 0;
+
+ virtual bool preHandle(const Message& message) = 0;
+
+ virtual bool postHandle(const Message& message, ConsumeResult result) = 0;
- virtual void signalDispatcher() = 0;
+ virtual void ack(const Message& message, std::function<void(const
std::error_code& ec)> cb) = 0;
- virtual void throttle(const std::string& topic, std::uint32_t threshold) = 0;
+ virtual void nack(const Message& message, std::function<void(const
std::error_code& ec)> cb) = 0;
+
+ virtual void forward(const Message& message, std::function<void(const
std::error_code& ec)> cb) = 0;
+
+ virtual void schedule(std::shared_ptr<ConsumeTask> task,
std::chrono::milliseconds delay) = 0;
+
+ virtual std::size_t maxDeliveryAttempt() = 0;
};
+using ConsumeMessageServiceWeakPtr = std::weak_ptr<ConsumeMessageService>;
+using ConsumeMessageServiceSharedPtr = std::shared_ptr<ConsumeMessageService>;
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageServiceBase.h
b/src/main/cpp/rocketmq/include/ConsumeMessageServiceBase.h
deleted file mode 100644
index 9e9ff43..0000000
--- a/src/main/cpp/rocketmq/include/ConsumeMessageServiceBase.h
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <memory>
-#include <mutex>
-#include <string>
-#include <system_error>
-
-#include "absl/container/flat_hash_map.h"
-
-#include "ConsumeMessageService.h"
-#include "RateLimiter.h"
-#include "ThreadPool.h"
-#include "rocketmq/State.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class PushConsumerImpl;
-
-class ConsumeMessageServiceBase : public ConsumeMessageService {
-public:
- ConsumeMessageServiceBase(std::weak_ptr<PushConsumerImpl> consumer, int
thread_count,
- MessageListener message_listener);
-
- ~ConsumeMessageServiceBase() override = default;
-
- /**
- * Make it noncopyable.
- */
- ConsumeMessageServiceBase(const ConsumeMessageServiceBase& other) = delete;
- ConsumeMessageServiceBase& operator=(const ConsumeMessageServiceBase& other)
= delete;
-
- /**
- * Start the dispatcher thread, which will dispatch messages in process
queue to thread pool in form of runnable
- * functor.
- */
- void start() override;
-
- /**
- * Stop the dispatcher thread and then reset the thread pool.
- */
- void shutdown() override;
-
- /**
- * Signal dispatcher thread to check new pending messages.
- */
- void signalDispatcher() override;
-
- /**
- * Set throttle threshold per topic.
- *
- * @param topic
- * @param threshold
- */
- void throttle(const std::string& topic, std::uint32_t threshold) override;
-
- bool hasConsumeRateLimiter(const std::string& topic) const
LOCKS_EXCLUDED(rate_limiter_table_mtx_);
-
- std::shared_ptr<RateLimiter<10>> rateLimiter(const std::string& topic) const
LOCKS_EXCLUDED(rate_limiter_table_mtx_);
-
-protected:
- RateLimiterObserver rate_limiter_observer_;
-
- mutable absl::flat_hash_map<std::string, std::shared_ptr<RateLimiter<10>>>
- rate_limiter_table_ GUARDED_BY(rate_limiter_table_mtx_);
- mutable absl::Mutex rate_limiter_table_mtx_; // Protects rate_limiter_table_
-
- std::atomic<State> state_;
-
- int thread_count_;
- std::unique_ptr<ThreadPool> pool_;
- std::weak_ptr<PushConsumerImpl> consumer_;
-
- absl::Mutex dispatch_mtx_;
- std::thread dispatch_thread_;
- absl::CondVar dispatch_cv_;
-
- MessageListener message_listener_;
-
- /**
- * Dispatch messages to thread pool. Implementation of this function should
be sub-class specific.
- */
- void dispatch();
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
new file mode 100644
index 0000000..7d6c34a
--- /dev/null
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <system_error>
+
+#include "ConsumeMessageService.h"
+#include "RateLimiter.h"
+#include "ThreadPool.h"
+#include "absl/container/flat_hash_map.h"
+#include "rocketmq/State.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class PushConsumerImpl;
+
+class ConsumeMessageServiceImpl : public ConsumeMessageService,
+ public
std::enable_shared_from_this<ConsumeMessageServiceImpl> {
+public:
+ ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl> consumer,
+ int thread_count,
+ MessageListener message_listener);
+
+ ~ConsumeMessageServiceImpl() override = default;
+
+ /**
+ * Make it noncopyable.
+ */
+ ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl& other) = delete;
+ ConsumeMessageServiceImpl& operator=(const ConsumeMessageServiceImpl& other)
= delete;
+
+ void start() override;
+
+ void shutdown() override;
+
+ MessageListener& listener() override {
+ return message_listener_;
+ }
+
+ bool preHandle(const Message& message) override;
+
+ bool postHandle(const Message& message, ConsumeResult result) override;
+
+ void submit(std::shared_ptr<ConsumeTask> task) override;
+
+ void dispatch(std::shared_ptr<ProcessQueue> process_queue,
std::vector<MessageConstSharedPtr> messages) override;
+
+ void ack(const Message& message, std::function<void(const std::error_code&)>
cb) override;
+
+ void nack(const Message& message, std::function<void(const
std::error_code&)> cb) override;
+
+ void forward(const Message& message, std::function<void(const
std::error_code&)> cb) override;
+
+ void schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds
delay) override;
+
+ std::size_t maxDeliveryAttempt() override;
+
+protected:
+ std::atomic<State> state_;
+
+ int thread_count_;
+ std::unique_ptr<ThreadPool> pool_;
+ std::weak_ptr<PushConsumerImpl> consumer_;
+
+ MessageListener message_listener_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeStandardMessageService.h
b/src/main/cpp/rocketmq/include/ConsumeStandardMessageService.h
deleted file mode 100644
index e98f257..0000000
--- a/src/main/cpp/rocketmq/include/ConsumeStandardMessageService.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "ConsumeMessageServiceBase.h"
-#include "ProcessQueue.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ConsumeStandardMessageService : public ConsumeMessageServiceBase,
- public
std::enable_shared_from_this<ConsumeStandardMessageService> {
-public:
- ConsumeStandardMessageService(std::weak_ptr<PushConsumerImpl> consumer, int
thread_count,
- MessageListener message_listener);
-
- ~ConsumeStandardMessageService() override = default;
-
- void start() override;
-
- void shutdown() override;
-
- void submitConsumeTask(const std::weak_ptr<ProcessQueue>& process_queue)
override;
-
-private:
- static void consumeTask(std::weak_ptr<ConsumeStandardMessageService> service,
- const std::weak_ptr<ProcessQueue>& process_queue,
- const std::vector<MessageConstSharedPtr>& msgs);
-
- void consume(const std::shared_ptr<ProcessQueue>& process_queue, const
std::vector<MessageConstSharedPtr>& msgs);
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeTask.h
b/src/main/cpp/rocketmq/include/ConsumeTask.h
new file mode 100644
index 0000000..80ca971
--- /dev/null
+++ b/src/main/cpp/rocketmq/include/ConsumeTask.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "ConsumeMessageService.h"
+#include "rocketmq/Message.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class ProcessQueue;
+
+/**
+ * @brief Operation to take for the consume-task.
+ */
+enum class NextStep : std::uint8_t
+{
+ /**
+ * @brief Continue to consume the remaining messages.
+ */
+ Consume = 0,
+
+ /**
+ * @brief Ack the head, aka, messages_[0].
+ */
+ Ack,
+
+ /**
+ * @brief Forward the head, aka, messages_[0], to dead-letter-queue.
+ */
+ Forward,
+};
+
+class ConsumeTask : public std::enable_shared_from_this<ConsumeTask> {
+public:
+ ConsumeTask(ConsumeMessageServiceWeakPtr service,
+ std::weak_ptr<ProcessQueue> process_queue,
+ MessageConstSharedPtr message);
+
+ ConsumeTask(ConsumeMessageServiceWeakPtr service,
+ std::weak_ptr<ProcessQueue> process_queue,
+ std::vector<MessageConstSharedPtr> messages);
+
+ void process();
+
+ void submit();
+
+ void schedule();
+
+private:
+ ConsumeMessageServiceWeakPtr service_;
+ std::weak_ptr<ProcessQueue> process_queue_;
+ std::vector<MessageConstSharedPtr> messages_;
+ bool fifo_{false};
+ NextStep next_step_{NextStep::Consume};
+
+ /**
+ * @brief messages_[0] has completed its life-cycle.
+ */
+ void pop();
+
+ static void onAck(std::shared_ptr<ConsumeTask> task, const std::error_code&
ec);
+
+ static void onNack(std::shared_ptr<ConsumeTask> task, const std::error_code&
ec);
+
+ static void onForward(std::shared_ptr<ConsumeTask> task, const
std::error_code& ec);
+};
+
+using ConsumeTaskSharedPtr = std::shared_ptr<ConsumeTask>;
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 58ba4e5..cf5fe50 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -119,7 +119,7 @@ public:
void nack(const Message& message, const std::function<void(const
std::error_code&)>& callback);
- void forwardToDeadLetterQueue(const Message& message, const
std::function<void(bool)>& cb);
+ void forwardToDeadLetterQueue(const Message& message, const
std::function<void(const std::error_code&)>& cb);
void wrapAckMessageRequest(const Message& msg, AckMessageRequest& request);
@@ -154,8 +154,6 @@ public:
return MixAll::DEFAULT_CACHED_MESSAGE_MEMORY;
}
- void iterateProcessQueue(const
std::function<void(std::shared_ptr<ProcessQueue>)>& callback);
-
MessageListener& messageListener() {
return message_listener_;
}
diff --git a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
index 0a9a361..c6050ae 100644
--- a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
+++ b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
@@ -25,8 +25,6 @@ ROCKETMQ_NAMESPACE_BEGIN
class PushConsumerMock : virtual public PushConsumer, virtual public
ConsumerMock {
public:
- MOCK_METHOD(void, iterateProcessQueue, (const
std::function<void(std::shared_ptr<ProcessQueue>)>&), (override));
-
MOCK_METHOD(MessageModel, messageModel, (), (const override));
MOCK_METHOD(void, ack, (const MQMessageExt&, const std::function<void(const
std::error_code&)>&), (override));