This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 28a0054  Unify standard and FIFO message processing implementation
28a0054 is described below

commit 28a0054ce8b41cbbcad9dbef43c188f55dabc621
Author: Li Zhanhui <[email protected]>
AuthorDate: Mon May 23 10:33:29 2022 +0800

    Unify standard and FIFO message processing implementation
---
 src/main/cpp/client/ClientManagerImpl.cpp          |  29 +-
 src/main/cpp/client/include/ClientManager.h        |   2 +-
 src/main/cpp/client/include/ClientManagerImpl.h    |   9 +-
 .../cpp/rocketmq/AsyncReceiveMessageCallback.cpp   |   3 +-
 .../cpp/rocketmq/ConsumeFifoMessageService.cpp     | 322 ---------------------
 .../cpp/rocketmq/ConsumeMessageServiceBase.cpp     | 104 -------
 .../cpp/rocketmq/ConsumeMessageServiceImpl.cpp     | 125 ++++++++
 .../cpp/rocketmq/ConsumeStandardMessageService.cpp | 275 ------------------
 src/main/cpp/rocketmq/ConsumeTask.cpp              | 167 +++++++++++
 src/main/cpp/rocketmq/PushConsumerImpl.cpp         |  41 +--
 .../rocketmq/include/ConsumeFifoMessageService.h   |  55 ----
 .../cpp/rocketmq/include/ConsumeMessageService.h   |  27 +-
 .../rocketmq/include/ConsumeMessageServiceBase.h   | 101 -------
 .../rocketmq/include/ConsumeMessageServiceImpl.h   |  85 ++++++
 .../include/ConsumeStandardMessageService.h        |  46 ---
 src/main/cpp/rocketmq/include/ConsumeTask.h        |  87 ++++++
 src/main/cpp/rocketmq/include/PushConsumerImpl.h   |   4 +-
 .../cpp/rocketmq/mocks/include/PushConsumerMock.h  |   2 -
 18 files changed, 526 insertions(+), 958 deletions(-)

diff --git a/src/main/cpp/client/ClientManagerImpl.cpp 
b/src/main/cpp/client/ClientManagerImpl.cpp
index 25fc559..a5fe96a 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -957,10 +957,11 @@ void ClientManagerImpl::endTransaction(
   client->asyncEndTransaction(request, invocation_context);
 }
 
-void ClientManagerImpl::forwardMessageToDeadLetterQueue(
-    const std::string& target_host, const Metadata& metadata, const 
ForwardMessageToDeadLetterQueueRequest& request,
-    std::chrono::milliseconds timeout,
-    const std::function<void(const 
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) {
+void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& 
target_host,
+                                                        const Metadata& 
metadata,
+                                                        const 
ForwardMessageToDeadLetterQueueRequest& request,
+                                                        
std::chrono::milliseconds timeout,
+                                                        const 
std::function<void(const std::error_code&)>& cb) {
   SPDLOG_DEBUG("ForwardMessageToDeadLetterQueue Request: {}", 
request.DebugString());
   auto client = getRpcClient(target_host);
   auto invocation_context = new 
InvocationContext<ForwardMessageToDeadLetterQueueResponse>();
@@ -977,12 +978,28 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(
     if (!invocation_context->status.ok()) {
       SPDLOG_WARN("Failed to transmit SendMessageToDeadLetterQueueRequest to 
host={}",
                   invocation_context->remote_address);
-      cb(invocation_context);
+      std::error_code ec = ErrorCode::BadRequest;
+      cb(ec);
       return;
     }
 
     SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from 
server[host={}]", invocation_context->remote_address);
-    cb(invocation_context);
+    // Map the broker status to a client error code; every case ends in break so none can fall through.
+    std::error_code ec;
+    switch (invocation_context->response.status().code()) {
+      case rmq::Code::OK:
+        break;
+      case rmq::Code::INTERNAL_SERVER_ERROR:
+        ec = ErrorCode::ServiceUnavailable;
+        break;
+      case rmq::Code::TOO_MANY_REQUESTS:
+        ec = ErrorCode::TooManyRequest;
+        break;
+      default:
+        ec = ErrorCode::NotImplemented;
+        break;
+    }
+    cb(ec);
   };
   invocation_context->callback = callback;
   client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
diff --git a/src/main/cpp/client/include/ClientManager.h 
b/src/main/cpp/client/include/ClientManager.h
index c85c7ec..99ec6ad 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -72,7 +72,7 @@ public:
   virtual void forwardMessageToDeadLetterQueue(
       const std::string& target_host, const Metadata& metadata, const 
ForwardMessageToDeadLetterQueueRequest& request,
       std::chrono::milliseconds timeout,
-      const std::function<void(const 
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) = 0;
+      const std::function<void(const std::error_code&)>& cb) = 0;
 
   virtual void endTransaction(const std::string& target_host, const Metadata& 
metadata,
                               const EndTransactionRequest& request, 
std::chrono::milliseconds timeout,
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h 
b/src/main/cpp/client/include/ClientManagerImpl.h
index 50d8b05..63b0713 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -150,10 +150,11 @@ public:
                                const ChangeInvisibleDurationRequest&, 
std::chrono::milliseconds timeout,
                                const std::function<void(const 
std::error_code&)>&) override;
 
-  void forwardMessageToDeadLetterQueue(
-      const std::string& target_host, const Metadata& metadata, const 
ForwardMessageToDeadLetterQueueRequest& request,
-      std::chrono::milliseconds timeout,
-      const std::function<void(const 
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) override;
+  void forwardMessageToDeadLetterQueue(const std::string& target_host,
+                                       const Metadata& metadata,
+                                       const 
ForwardMessageToDeadLetterQueueRequest& request,
+                                       std::chrono::milliseconds timeout,
+                                       const std::function<void(const 
std::error_code&)>& cb) override;
 
   /**
    * End a transaction asynchronously.
diff --git a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp 
b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
index a9df915..c1b57d4 100644
--- a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -52,7 +52,8 @@ void AsyncReceiveMessageCallback::onCompletion(const 
std::error_code& ec, const
   SPDLOG_DEBUG("Receive messages from broker[host={}] returns with 
status=FOUND, msgListSize={}, queue={}",
                result.source_host, result.messages.size(), 
process_queue->simpleName());
   process_queue->cacheMessages(result.messages);
-  consumer->getConsumeMessageService()->signalDispatcher();
+
+  consumer->getConsumeMessageService()->dispatch(process_queue, 
result.messages);
   checkThrottleThenReceive();
 }
 
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp 
b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
deleted file mode 100644
index aa99543..0000000
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <chrono>
-#include <functional>
-#include <limits>
-#include <system_error>
-
-#include "TracingUtility.h"
-#include "absl/strings/str_join.h"
-
-#include "opencensus/trace/propagation/trace_context.h"
-#include "opencensus/trace/span.h"
-
-#include "ConsumeFifoMessageService.h"
-#include "MixAll.h"
-#include "ProcessQueue.h"
-#include "PushConsumerImpl.h"
-#include "rocketmq/MessageListener.h"
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ConsumeFifoMessageService::ConsumeFifoMessageService(std::weak_ptr<PushConsumerImpl>
 consumer, int thread_count,
-                                                     MessageListener 
message_listener)
-    : ConsumeMessageServiceBase(std::move(consumer), thread_count, 
message_listener) {
-}
-
-void ConsumeFifoMessageService::start() {
-  ConsumeMessageServiceBase::start();
-  State expected = State::STARTING;
-  if (state_.compare_exchange_strong(expected, State::STARTED)) {
-    SPDLOG_DEBUG("ConsumeMessageOrderlyService started");
-  }
-}
-
-void ConsumeFifoMessageService::shutdown() {
-  // Wait till consume-message-orderly-service has fully started; otherwise, 
we may potentially miss closing resources
-  // in concurrent scenario.
-  while (State::STARTING == state_.load(std::memory_order_relaxed)) {
-    absl::SleepFor(absl::Milliseconds(10));
-  }
-
-  State expected = State::STARTED;
-  if (state_.compare_exchange_strong(expected, STOPPING)) {
-    ConsumeMessageServiceBase::shutdown();
-    SPDLOG_INFO("ConsumeMessageOrderlyService shut down");
-  }
-}
-
-void ConsumeFifoMessageService::submitConsumeTask0(const 
std::shared_ptr<PushConsumerImpl>& consumer,
-                                                   const 
std::weak_ptr<ProcessQueue>& process_queue,
-                                                   MessageConstSharedPtr 
message) {
-  // In case custom executor is used.
-  const auto& custom_executor = consumer->customExecutor();
-  if (custom_executor) {
-    std::function<void(void)> consume_task =
-        std::bind(&ConsumeFifoMessageService::consumeTask, this, 
process_queue, message);
-    custom_executor(consume_task);
-    SPDLOG_DEBUG("Submit FIFO consume task to custom executor");
-    return;
-  }
-
-  // submit batch message
-  std::function<void(void)> consume_task =
-      std::bind(&ConsumeFifoMessageService::consumeTask, this, process_queue, 
message);
-  SPDLOG_DEBUG("Submit FIFO consume task to thread pool");
-  pool_->submit(consume_task);
-}
-
-void ConsumeFifoMessageService::submitConsumeTask(const 
std::weak_ptr<ProcessQueue>& process_queue) {
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    SPDLOG_INFO("Process queue has destructed");
-    return;
-  }
-
-  auto consumer = consumer_.lock();
-  if (!consumer) {
-    SPDLOG_INFO("Consumer has destructed");
-    return;
-  }
-
-  if (process_queue_ptr->bindFifoConsumeTask()) {
-    std::vector<MessageConstSharedPtr> messages;
-    process_queue_ptr->take(1, messages);
-    if (!messages.empty()) {
-      assert(1 == messages.size());
-      submitConsumeTask0(consumer, process_queue, 
std::move(*messages.begin()));
-    }
-  }
-}
-
-void ConsumeFifoMessageService::consumeTask(const std::weak_ptr<ProcessQueue>& 
process_queue,
-                                            MessageConstSharedPtr message) {
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    return;
-  }
-  const std::string& topic = message->topic();
-  ConsumeResult result;
-  std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
-  // consumer might have been destructed.
-  if (!consumer) {
-    return;
-  }
-
-  std::shared_ptr<RateLimiter<10>> rate_limiter = rateLimiter(topic);
-  if (rate_limiter) {
-    rate_limiter->acquire();
-    SPDLOG_DEBUG("Rate-limit permit acquired");
-  }
-
-  // Record await-consumption-span
-  if (message->traceContext().has_value()) {
-    auto span_context = 
opencensus::trace::propagation::FromTraceParentHeader(message->traceContext().value());
-
-    auto span = opencensus::trace::Span::BlankSpan();
-    std::string span_name =
-        consumer->resourceNamespace() + "/" + topic + " " + 
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
-    if (span_context.IsValid()) {
-      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context, traceSampler());
-    } else {
-      span = opencensus::trace::Span::StartSpan(span_name, nullptr, 
{traceSampler()});
-    }
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
-                      MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
-                      MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
-    TracingUtility::addUniversalSpanAttributes(*message, consumer->config(), 
span);
-    // 
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, 
message.extension.store_time);
-    absl::Time decoded_timestamp = 
absl::FromChrono(message->extension().decode_time);
-    span.AddAnnotation(
-        MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
-        {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
-          
opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp
 - absl::UnixEpoch()))}});
-    span.End();
-    // message.message.traceContext()
-    // MessageAccessor::setTraceContext(const_cast<MessageExt&>(message),
-    //                                  
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
-  }
-
-  auto span_context = 
opencensus::trace::propagation::FromTraceParentHeader(message->traceContext().value());
-  auto span = opencensus::trace::Span::BlankSpan();
-  std::string span_name = consumer->resourceNamespace() + "/" + 
message->topic() + " " +
-                          
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
-  if (span_context.IsValid()) {
-    span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context);
-  } else {
-    span = opencensus::trace::Span::StartSpan(span_name);
-  }
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
-                    MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
-                    MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
-  TracingUtility::addUniversalSpanAttributes(*message, consumer->config(), 
span);
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 
std::to_string(message->extension().delivery_attempt));
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
-                    MixAll::format(message->extension().store_time));
-
-  // MessageAccessor::setTraceContext(const_cast<MessageExt&>(message),
-  //                                  
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
-  auto steady_start = std::chrono::steady_clock::now();
-
-  try {
-    result = message_listener_(*message);
-  } catch (...) {
-    result = ConsumeResult::FAILURE;
-    SPDLOG_ERROR("Business FIFO callback raised an exception when 
consumeMessage");
-  }
-
-  switch (result) {
-    case ConsumeResult::SUCCESS:
-      span.SetStatus(opencensus::trace::StatusCode::OK);
-      break;
-    case ConsumeResult::FAILURE:
-      span.SetStatus(opencensus::trace::StatusCode::UNKNOWN);
-      break;
-  }
-  span.End();
-
-  auto duration = std::chrono::steady_clock::now() - steady_start;
-
-  // Log client consume-time costs
-  SPDLOG_DEBUG("Business callback spent {}ms processing message[Topic={}, 
MessageId={}].",
-               MixAll::millisecondsOf(duration), message->topic(), 
message->id());
-
-  if (result == ConsumeResult::SUCCESS) {
-    // Release message number and memory quota
-    process_queue_ptr->release(message->body().size());
-
-    // Ensure current message is acked before moving to the next message.
-    auto callback = std::bind(&ConsumeFifoMessageService::onAck, this, 
process_queue, message, std::placeholders::_1);
-    consumer->ack(*message, callback);
-  } else {
-    const Message* ptr = message.get();
-    Message* raw = const_cast<Message*>(ptr);
-    raw->mutableExtension().delivery_attempt++;
-
-    if (message->extension().delivery_attempt < 
consumer->maxDeliveryAttempts()) {
-      auto task = std::bind(&ConsumeFifoMessageService::scheduleConsumeTask, 
this, process_queue, message);
-      consumer->schedule("Scheduled-Consume-FIFO-Message-Task", task, 
std::chrono::seconds(1));
-    } else {
-      auto callback = 
std::bind(&ConsumeFifoMessageService::onForwardToDeadLetterQueue, this, 
process_queue, message,
-                                std::placeholders::_1);
-      consumer->forwardToDeadLetterQueue(*message, callback);
-    }
-  }
-}
-
-void ConsumeFifoMessageService::onAck(const std::weak_ptr<ProcessQueue>& 
process_queue,
-                                      MessageConstSharedPtr message,
-                                      const std::error_code& ec) {
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    SPDLOG_WARN("ProcessQueue has destructed.");
-    return;
-  }
-
-  if (ec) {
-    SPDLOG_WARN("Failed to acknowledge FIFO message[MessageQueue={}, 
MsgId={}]. Cause: {}",
-                process_queue_ptr->simpleName(), message->id(), ec.message());
-    auto consumer = consumer_.lock();
-    if (!consumer) {
-      SPDLOG_WARN("Consumer instance has destructed");
-      return;
-    }
-    auto task = std::bind(&ConsumeFifoMessageService::scheduleAckTask, this, 
process_queue, message);
-    int32_t duration = 100;
-    consumer->schedule("Ack-FIFO-Message-On-Failure", task, 
std::chrono::milliseconds(duration));
-    SPDLOG_INFO("Scheduled to ack message[Topic={}, MessageId={}] in {}ms", 
message->topic(), message->id(), duration);
-  } else {
-    SPDLOG_DEBUG("Acknowledge FIFO message[MessageQueue={}, MsgId={}] OK", 
process_queue_ptr->simpleName(),
-                 message->id());
-    process_queue_ptr->unbindFifoConsumeTask();
-    submitConsumeTask(process_queue);
-  }
-}
-
-void ConsumeFifoMessageService::onForwardToDeadLetterQueue(const 
std::weak_ptr<ProcessQueue>& process_queue,
-                                                           
MessageConstSharedPtr message,
-                                                           bool ok) {
-  if (ok) {
-    SPDLOG_DEBUG("Forward message[Topic={}, MessagId={}] to DLQ OK", 
message->topic(), message->id());
-    auto process_queue_ptr = process_queue.lock();
-    if (process_queue_ptr) {
-      process_queue_ptr->unbindFifoConsumeTask();
-    }
-    return;
-  }
-
-  SPDLOG_INFO("Failed to forward message[topic={}, MessageId={}] to DLQ", 
message->topic(), message->id());
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    SPDLOG_INFO("Abort further attempts considering its process queue has 
destructed");
-    return;
-  }
-
-  auto consumer = consumer_.lock();
-  assert(consumer);
-
-  auto task = 
std::bind(&ConsumeFifoMessageService::scheduleForwardDeadLetterQueueTask, this, 
process_queue, message);
-  consumer->schedule("Scheduled-Forward-DLQ-Task", task, 
std::chrono::milliseconds(100));
-}
-
-void ConsumeFifoMessageService::scheduleForwardDeadLetterQueueTask(const 
std::weak_ptr<ProcessQueue>& process_queue,
-                                                                   
MessageConstSharedPtr message) {
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    return;
-  }
-  auto consumer = consumer_.lock();
-  assert(consumer);
-  auto callback = 
std::bind(&ConsumeFifoMessageService::onForwardToDeadLetterQueue, this, 
process_queue, message,
-                            std::placeholders::_1);
-  consumer->forwardToDeadLetterQueue(*message, callback);
-}
-
-void ConsumeFifoMessageService::scheduleAckTask(const 
std::weak_ptr<ProcessQueue>& process_queue,
-                                                MessageConstSharedPtr message) 
{
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    return;
-  }
-
-  auto callback = std::bind(&ConsumeFifoMessageService::onAck, this, 
process_queue, message, std::placeholders::_1);
-  auto consumer = consumer_.lock();
-  if (consumer) {
-    consumer->ack(*message, callback);
-  }
-}
-
-void ConsumeFifoMessageService::scheduleConsumeTask(const 
std::weak_ptr<ProcessQueue>& process_queue,
-                                                    MessageConstSharedPtr 
message) {
-  auto consumer_ptr = consumer_.lock();
-  if (!consumer_ptr) {
-    return;
-  }
-
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    return;
-  }
-
-  submitConsumeTask0(consumer_ptr, process_queue_ptr, message);
-  SPDLOG_INFO("Business callback failed to process FIFO messages. Re-submit 
consume task back to thread pool");
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeMessageServiceBase.cpp 
b/src/main/cpp/rocketmq/ConsumeMessageServiceBase.cpp
deleted file mode 100644
index 7190422..0000000
--- a/src/main/cpp/rocketmq/ConsumeMessageServiceBase.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeMessageServiceBase.h"
-#include "LoggerImpl.h"
-#include "PushConsumerImpl.h"
-#include "ThreadPoolImpl.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ConsumeMessageServiceBase::ConsumeMessageServiceBase(std::weak_ptr<PushConsumerImpl>
 consumer, int thread_count,
-                                                     MessageListener 
message_listener)
-    : state_(State::CREATED), thread_count_(thread_count), 
pool_(absl::make_unique<ThreadPoolImpl>(thread_count_)),
-      consumer_(std::move(consumer)), message_listener_(message_listener) {
-}
-
-void ConsumeMessageServiceBase::start() {
-  State expected = State::CREATED;
-  if (state_.compare_exchange_strong(expected, State::STARTING, 
std::memory_order_relaxed)) {
-    pool_->start();
-    dispatch_thread_ = std::thread([this] {
-      State current_state = state_.load(std::memory_order_relaxed);
-      while (State::STOPPED != current_state && State::STOPPING != 
current_state) {
-        dispatch();
-        {
-          absl::MutexLock lk(&dispatch_mtx_);
-          dispatch_cv_.WaitWithTimeout(&dispatch_mtx_, 
absl::Milliseconds(100));
-        }
-
-        // Update current state
-        current_state = state_.load(std::memory_order_relaxed);
-      }
-    });
-  }
-}
-
-void ConsumeMessageServiceBase::signalDispatcher() {
-  absl::MutexLock lk(&dispatch_mtx_);
-  // Wake up dispatch_thread_
-  dispatch_cv_.Signal();
-}
-
-void ConsumeMessageServiceBase::throttle(const std::string& topic, 
std::uint32_t threshold) {
-  absl::MutexLock lk(&rate_limiter_table_mtx_);
-  std::shared_ptr<RateLimiter<10>> rate_limiter = 
std::make_shared<RateLimiter<10>>(threshold);
-  rate_limiter_table_.insert_or_assign(topic, rate_limiter);
-  rate_limiter_observer_.subscribe(rate_limiter);
-}
-
-void ConsumeMessageServiceBase::shutdown() {
-  State expected = State::STOPPING;
-  if (state_.compare_exchange_strong(expected, State::STOPPED, 
std::memory_order_relaxed)) {
-    pool_->shutdown();
-    {
-      absl::MutexLock lk(&dispatch_mtx_);
-      dispatch_cv_.SignalAll();
-    }
-
-    if (dispatch_thread_.joinable()) {
-      dispatch_thread_.join();
-    }
-
-    rate_limiter_observer_.stop();
-  }
-}
-
-bool ConsumeMessageServiceBase::hasConsumeRateLimiter(const std::string& 
topic) const {
-  absl::MutexLock lk(&rate_limiter_table_mtx_);
-  return rate_limiter_table_.contains(topic);
-}
-
-std::shared_ptr<RateLimiter<10>> ConsumeMessageServiceBase::rateLimiter(const 
std::string& topic) const {
-  if (!hasConsumeRateLimiter(topic)) {
-    return nullptr;
-  }
-  absl::MutexLock lk(&rate_limiter_table_mtx_);
-  return rate_limiter_table_[topic];
-}
-
-void ConsumeMessageServiceBase::dispatch() {
-  std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
-  if (!consumer) {
-    SPDLOG_WARN("The consumer has already destructed");
-    return;
-  }
-
-  auto callback = [this](const std::shared_ptr<ProcessQueue>& process_queue) { 
submitConsumeTask(process_queue); };
-  consumer->iterateProcessQueue(callback);
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp 
b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
new file mode 100644
index 0000000..9c9ad49
--- /dev/null
+++ b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConsumeMessageServiceImpl.h"
+
+#include "ConsumeTask.h"
+#include "LoggerImpl.h"
+#include "PushConsumerImpl.h"
+#include "ThreadPoolImpl.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+// Creates the service in the CREATED state. The worker pool is constructed here but only started by start().
+ConsumeMessageServiceImpl::ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl> consumer,
+                                                     int thread_count,
+                                                     MessageListener message_listener)
+    : state_(State::CREATED),
+      thread_count_(thread_count),
+      // Size the pool from the parameter rather than thread_count_: members are initialized in declaration
+      // order, so reading thread_count_ here is only safe if the header declares it before pool_.
+      pool_(absl::make_unique<ThreadPoolImpl>(thread_count)),
+      consumer_(std::move(consumer)),
+      // The listener is taken by value, so move it into place instead of copying it a second time.
+      message_listener_(std::move(message_listener)) {
+}
+
+void ConsumeMessageServiceImpl::start() {
+  State expected = State::CREATED;
+  if (state_.compare_exchange_strong(expected, State::STARTING, 
std::memory_order_relaxed)) {
+    pool_->start();
+  }
+}
+
+// Stops the worker pool at most once. Calling it before start() or after a previous shutdown() is a no-op.
+void ConsumeMessageServiceImpl::shutdown() {
+  // start() leaves state_ at STARTING and nothing in this translation unit moves it to STOPPING, so a
+  // STOPPING -> STOPPED compare-exchange could never succeed and the pool would never be shut down.
+  // Transition to STOPPED from any state other than CREATED (never started) or STOPPED (already done).
+  State current = state_.load(std::memory_order_relaxed);
+  while (State::CREATED != current && State::STOPPED != current) {
+    // On failure compare_exchange_weak refreshes `current`, so the loop condition re-checks the new state.
+    if (state_.compare_exchange_weak(current, State::STOPPED, std::memory_order_relaxed)) {
+      pool_->shutdown();
+      break;
+    }
+  }
+}
+
+// Wraps freshly received messages into ConsumeTask objects and submits them to the thread pool.
+// When the subscriber is configured as FIFO, the whole batch becomes a single task; otherwise one task is
+// submitted per message. If the owning consumer has already been destroyed, the batch is dropped.
+void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_queue,
+                                         std::vector<MessageConstSharedPtr> messages) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  if (consumer->config().subscriber.fifo) {
+    // `messages` is our own by-value copy, so it can be moved into the task.
+    auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, std::move(messages));
+    pool_->submit([consume_task]() { consume_task->process(); });
+    return;
+  }
+
+  // Bind by const reference: copying the shared_ptr per iteration costs an atomic ref-count round trip.
+  for (const auto& message : messages) {
+    auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message);
+    pool_->submit([consume_task]() { consume_task->process(); });
+  }
+}
+
+void ConsumeMessageServiceImpl::submit(std::shared_ptr<ConsumeTask> task) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  pool_->submit([task]() { task->process(); });
+}
+
+void ConsumeMessageServiceImpl::ack(const Message& message, 
std::function<void(const std::error_code&)> cb) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  consumer->ack(message, cb);
+}
+
+void ConsumeMessageServiceImpl::nack(const Message& message, 
std::function<void(const std::error_code&)> cb) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  consumer->nack(message, cb);
+}
+
+void ConsumeMessageServiceImpl::forward(const Message& message, 
std::function<void(const std::error_code&)> cb) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+  consumer->forwardToDeadLetterQueue(message, cb);
+}
+
+void ConsumeMessageServiceImpl::schedule(std::shared_ptr<ConsumeTask> task, 
std::chrono::milliseconds delay) {
+}
+
+std::size_t ConsumeMessageServiceImpl::maxDeliveryAttempt() {
+  std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
+  if (!consumer) {
+    SPDLOG_WARN("The consumer has already destructed");
+    return 0;
+  }
+
+  return consumer->maxDeliveryAttempts();
+}
+
+bool ConsumeMessageServiceImpl::preHandle(const Message& message) {
+  return true;
+}
+
+bool ConsumeMessageServiceImpl::postHandle(const Message& message, 
ConsumeResult result) {
+  return true;
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp 
b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
deleted file mode 100644
index b478df9..0000000
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeStandardMessageService.h"
-
-#include <limits>
-#include <string>
-#include <system_error>
-#include <utility>
-
-#include "LoggerImpl.h"
-#include "MessageExt.h"
-#include "MixAll.h"
-#include "Protocol.h"
-#include "PushConsumerImpl.h"
-#include "TracingUtility.h"
-#include "UtilAll.h"
-#include "absl/memory/memory.h"
-#include "absl/strings/str_join.h"
-#include "absl/time/time.h"
-#include "absl/types/span.h"
-#include "rocketmq/Message.h"
-#include "rocketmq/MessageListener.h"
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ConsumeStandardMessageService::ConsumeStandardMessageService(std::weak_ptr<PushConsumerImpl>
 consumer, int thread_count,
-                                                             MessageListener 
message_listener)
-    : ConsumeMessageServiceBase(std::move(consumer), thread_count, 
message_listener) {
-}
-
-void ConsumeStandardMessageService::start() {
-  ConsumeMessageServiceBase::start();
-  State expected = State::STARTING;
-  if (state_.compare_exchange_strong(expected, State::STARTED)) {
-    SPDLOG_DEBUG("ConsumeMessageConcurrentlyService started");
-  }
-}
-
-void ConsumeStandardMessageService::shutdown() {
-  while (State::STARTING == state_.load(std::memory_order_relaxed)) {
-    absl::SleepFor(absl::Milliseconds(10));
-  }
-
-  State expected = State::STARTED;
-  if (state_.compare_exchange_strong(expected, State::STOPPING)) {
-    ConsumeMessageServiceBase::shutdown();
-    SPDLOG_DEBUG("ConsumeMessageConcurrentlyService shut down");
-  }
-}
-
-void ConsumeStandardMessageService::submitConsumeTask(const 
std::weak_ptr<ProcessQueue>& process_queue) {
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr) {
-    SPDLOG_WARN("ProcessQueue was destructed. It is likely that client should 
have shutdown.");
-    return;
-  }
-  std::shared_ptr<PushConsumerImpl> consumer = 
process_queue_ptr->getConsumer().lock();
-
-  if (!consumer) {
-    return;
-  }
-
-  std::string topic = process_queue_ptr->topic();
-  bool has_more = true;
-  std::weak_ptr<ConsumeStandardMessageService> service(shared_from_this());
-  while (has_more) {
-    std::vector<MessageConstSharedPtr> messages;
-    uint32_t batch_size = 1;
-    has_more = process_queue_ptr->take(batch_size, messages);
-    if (messages.empty()) {
-      assert(!has_more);
-      break;
-    }
-
-    // In case custom executor is used.
-    const Executor& custom_executor = consumer->customExecutor();
-    if (custom_executor) {
-      std::function<void(void)> consume_task =
-          std::bind(&ConsumeStandardMessageService::consumeTask, service, 
process_queue, messages);
-      custom_executor(consume_task);
-      SPDLOG_DEBUG("Submit consumer task to custom executor with 
message-batch-size={}", messages.size());
-      continue;
-    }
-
-    // submit batch message
-    std::function<void(void)> consume_task =
-        std::bind(&ConsumeStandardMessageService::consumeTask, service, 
process_queue_ptr, messages);
-    SPDLOG_DEBUG("Submit consumer task to thread pool with 
message-batch-size={}", messages.size());
-    pool_->submit(consume_task);
-  }
-}
-
-void 
ConsumeStandardMessageService::consumeTask(std::weak_ptr<ConsumeStandardMessageService>
 service,
-                                                const 
std::weak_ptr<ProcessQueue>& process_queue,
-                                                const 
std::vector<MessageConstSharedPtr>& msgs) {
-  auto process_queue_ptr = process_queue.lock();
-  if (!process_queue_ptr || msgs.empty()) {
-    return;
-  }
-
-  auto svc = service.lock();
-  if (!svc) {
-    return;
-  }
-
-  auto process_queue_shared_ptr = process_queue.lock();
-  if (!process_queue_shared_ptr) {
-    return;
-  }
-
-  svc->consume(process_queue_shared_ptr, msgs);
-}
-
-void ConsumeStandardMessageService::consume(const 
std::shared_ptr<ProcessQueue>& process_queue,
-                                            const 
std::vector<MessageConstSharedPtr>& msgs) {
-  std::string topic = (*msgs.begin())->topic();
-  ConsumeResult status;
-
-  std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
-  // consumer might have been destructed.
-  if (!consumer) {
-    return;
-  }
-
-  std::shared_ptr<RateLimiter<10>> rate_limiter = rateLimiter(topic);
-  if (rate_limiter) {
-    // Acquire permits one-by-one to avoid large batch hungry issue.
-    for (std::size_t i = 0; i < msgs.size(); i++) {
-      rate_limiter->acquire();
-    }
-    SPDLOG_DEBUG("{} rate-limit permits acquired", msgs.size());
-  }
-
-  // Record await-consumption-span
-  {
-    for (const auto& msg : msgs) {
-      if (!msg->traceContext().has_value()) {
-        continue;
-      }
-      auto span_context = 
opencensus::trace::propagation::FromTraceParentHeader(msg->traceContext().value());
-
-      auto span = opencensus::trace::Span::BlankSpan();
-      std::string span_name = consumer->resourceNamespace() + "/" + 
msg->topic() + " " +
-                              
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
-      if (span_context.IsValid()) {
-        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context, traceSampler());
-      } else {
-        span = opencensus::trace::Span::StartSpan(span_name, nullptr, 
{traceSampler()});
-      }
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
-                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
-                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
-      TracingUtility::addUniversalSpanAttributes(*msg, consumer->config(), 
span);
-      // 
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, 
msg.getStoreTimestamp());
-      absl::Time decoded_timestamp = 
absl::FromChrono(msg->extension().decode_time);
-      span.AddAnnotation(
-          MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
-          {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
-            
opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp
 - absl::UnixEpoch()))}});
-      span.End();
-      // MessageAccessor::setTraceContext(const_cast<MessageExt&>(msg),
-      //                                  
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
-    }
-  }
-
-  // Trace start of consume message
-  std::vector<opencensus::trace::Span> spans;
-  {
-    for (const auto& msg : msgs) {
-      if (!msg->traceContext().has_value()) {
-        continue;
-      }
-      auto span_context = 
opencensus::trace::propagation::FromTraceParentHeader(msg->traceContext().value());
-      auto span = opencensus::trace::Span::BlankSpan();
-      std::string span_name = consumer->resourceNamespace() + "/" + 
msg->topic() + " " +
-                              
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
-      if (span_context.IsValid()) {
-        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context);
-      } else {
-        span = opencensus::trace::Span::StartSpan(span_name);
-      }
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
-                        
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
-                        
MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
-      TracingUtility::addUniversalSpanAttributes(*msg, consumer->config(), 
span);
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 
msg->extension().delivery_attempt);
-      // 
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, 
msg.extension.store_time);
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, 
msgs.size());
-      spans.emplace_back(std::move(span));
-      // MessageAccessor::setTraceContext(const_cast<MessageExt&>(msg),
-      //                                  
opencensus::trace::propagation::ToTraceParentHeader(span.context()));
-    }
-  }
-
-  auto steady_start = std::chrono::steady_clock::now();
-
-  try {
-    // TODO:
-    status = message_listener_(**msgs.begin());
-  } catch (...) {
-    status = ConsumeResult::FAILURE;
-    SPDLOG_ERROR("Business callback raised an exception when consumeMessage");
-  }
-
-  auto duration = std::chrono::steady_clock::now() - steady_start;
-
-  // Trace end of consumption
-  {
-    for (auto& span : spans) {
-      switch (status) {
-        case ConsumeResult::SUCCESS:
-          span.SetStatus(opencensus::trace::StatusCode::OK);
-          break;
-        case ConsumeResult::FAILURE:
-          span.SetStatus(opencensus::trace::StatusCode::UNKNOWN);
-          break;
-      }
-      span.End();
-    }
-  }
-
-  // Log client consume-time costs
-  SPDLOG_DEBUG("Business callback spent {}ms processing {} messages.", 
MixAll::millisecondsOf(duration), msgs.size());
-
-  for (const auto& msg : msgs) {
-    const std::string& message_id = msg->id();
-
-    // Release message number and memory quota
-    process_queue->release(msg->body().size());
-
-    if (status == ConsumeResult::SUCCESS) {
-      auto callback = [process_queue, message_id](const std::error_code& ec) {
-        if (ec) {
-          SPDLOG_WARN("Failed to acknowledge message[MessageQueue={}, 
MsgId={}]. Cause: {}",
-                      process_queue->simpleName(), message_id, ec.message());
-        } else {
-          SPDLOG_DEBUG("Acknowledge message[MessageQueue={}, MsgId={}] OK", 
process_queue->simpleName(), message_id);
-        }
-      };
-      consumer->ack(*msg, callback);
-    } else {
-      auto callback = [process_queue, message_id](const std::error_code& ec) {
-        if (ec) {
-          SPDLOG_WARN(
-              "Failed to negative acknowledge message[MessageQueue={}, 
MsgId={}]. Cause: {} Message will be "
-              "re-consumed after default invisible time",
-              process_queue->simpleName(), message_id, ec.message());
-          return;
-        }
-
-        SPDLOG_DEBUG("Nack message[MessageQueue={}, MsgId={}] OK", 
process_queue->simpleName(), message_id);
-      };
-      consumer->nack(*msg, callback);
-    }
-  }
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeTask.cpp 
b/src/main/cpp/rocketmq/ConsumeTask.cpp
new file mode 100644
index 0000000..dede4a4
--- /dev/null
+++ b/src/main/cpp/rocketmq/ConsumeTask.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeTask.h"
+
+#include "LoggerImpl.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+// Build a task that carries exactly one message.
+//
+// `fifo_` is not assigned here, so it keeps whatever initial value the class
+// declaration gives it.
+//
+// @param service        Service used later to submit/schedule/ack this task.
+// @param process_queue  Queue the message was taken from.
+// @param message        The single message to process.
+ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
+                         std::weak_ptr<ProcessQueue> process_queue,
+                         MessageConstSharedPtr message)
+    : service_(std::move(service)), process_queue_(std::move(process_queue)) {
+  // All parameters are by-value sinks: move them rather than paying for an
+  // extra reference-count round trip on each.
+  messages_.emplace_back(std::move(message));
+}
+
+// Build a task that carries a batch of messages, processed head first.
+//
+// @param service        Service used later to submit/schedule/ack this task.
+// @param process_queue  Queue the messages were taken from.
+// @param messages       Messages to process.
+ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
+                         std::weak_ptr<ProcessQueue> process_queue,
+                         std::vector<MessageConstSharedPtr> messages)
+    : service_(std::move(service)), process_queue_(std::move(process_queue)), messages_(std::move(messages)) {
+  // A task holding more than one message is flagged as FIFO. This is computed
+  // in the body, after every member is initialized, so it does not depend on
+  // the declaration order of `fifo_` and `messages_`.
+  fifo_ = messages_.size() > 1;
+}
+
+// Drop the message at the head of this task and release its body size back to
+// the process queue it came from.
+void ConsumeTask::pop() {
+  assert(!messages_.empty());
+  if (messages_.empty()) {
+    // assert() is compiled out in release builds; never index an empty vector.
+    return;
+  }
+
+  // Only the release needs a live process queue. The head message has to be
+  // removed even when the queue is gone; otherwise the submit() that callers
+  // issue after pop() would hand the very same message to the listener again.
+  if (auto process_queue = process_queue_.lock()) {
+    process_queue->release(messages_[0]->body().size());
+  }
+
+  messages_.erase(messages_.begin());
+}
+
+// Hand this task back to the service for execution; does nothing once the
+// service has been destroyed.
+void ConsumeTask::submit() {
+  if (auto service = service_.lock()) {
+    service->submit(shared_from_this());
+  }
+}
+
+// Ask the service to schedule this task with a one-second delay; does nothing
+// once the service has been destroyed.
+void ConsumeTask::schedule() {
+  if (auto service = service_.lock()) {
+    service->schedule(shared_from_this(), std::chrono::seconds(1));
+  }
+}
+
+// Completion handler of an ack request.
+//
+// @param task  The task whose head message was acknowledged.
+// @param ec    Outcome of the ack; a set error code means it failed.
+void ConsumeTask::onAck(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
+  if (ec) {
+    SPDLOG_WARN("Failed to ack message. Cause: {}", ec.message());
+  }
+
+  if (task->fifo_ && ec) {
+    // FIFO: the head message stays in place until its ack succeeds. Retry
+    // through schedule() only. Falling through to submit() as well would queue
+    // the task twice, producing two ack requests and two pop() calls.
+    task->next_step_ = NextStep::Ack;
+    task->schedule();
+    return;
+  }
+
+  // Either this is not FIFO or the ack succeeded: move on to the next message.
+  task->pop();
+  task->next_step_ = NextStep::Consume;
+  task->submit();
+}
+
+// Completion handler of a nack request (non-FIFO tasks only). Whether or not
+// the nack succeeded, the head message is dropped and the task is re-submitted
+// to consume the next one.
+void ConsumeTask::onNack(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
+  ConsumeTask& current = *task;
+  assert(!current.fifo_);
+  assert(!current.messages_.empty());
+
+  const bool failed = static_cast<bool>(ec);
+  if (failed) {
+    SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}", current.messages_[0]->id(), ec.message());
+  }
+
+  current.pop();
+  current.next_step_ = NextStep::Consume;
+  current.submit();
+}
+
+// Completion handler of a forward-to-DLQ request (FIFO tasks only). On success
+// the head message is dropped and the task is re-submitted; on failure the
+// forward step is scheduled again.
+void ConsumeTask::onForward(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
+  ConsumeTask& current = *task;
+  assert(current.fifo_);
+  assert(!current.messages_.empty());
+
+  if (!ec) {
+    SPDLOG_DEBUG("Message[message-id={}] forwarded to DLQ", current.messages_[0]->id());
+    current.pop();
+    current.next_step_ = NextStep::Consume;
+    current.submit();
+    return;
+  }
+
+  SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ", current.messages_[0]->id());
+  current.next_step_ = NextStep::Forward;
+  current.schedule();
+}
+
+// Execute the step recorded in `next_step_` against the head message:
+//   - Consume: run the listener, then ack on success; on failure either
+//              schedule a retry (FIFO) or nack (non-FIFO).
+//   - Ack:     re-issue the ack for the head message.
+//   - Forward: re-issue the forward-to-DLQ request for the head message.
+// Returns immediately when the service is gone or no message is left.
+void ConsumeTask::process() {
+  auto svc = service_.lock();
+  if (!svc) {
+    SPDLOG_DEBUG("ConsumeMessageService has destructed");
+    return;
+  }
+
+  if (messages_.empty()) {
+    SPDLOG_DEBUG("No more messages to process");
+    return;
+  }
+
+  // Bound into the completion callbacks below so the task stays alive until
+  // they have run.
+  auto self = shared_from_this();
+
+  switch (next_step_) {
+    case NextStep::Consume: {
+      const auto& listener = svc->listener();
+      auto it = messages_.begin();
+      SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
+      svc->preHandle(**it);
+
+      // The listener is user code: an exception escaping it must not unwind
+      // into the worker thread. Treat it as a failed consumption instead.
+      ConsumeResult result = ConsumeResult::FAILURE;
+      try {
+        result = listener(**it);
+      } catch (...) {
+        result = ConsumeResult::FAILURE;
+        SPDLOG_ERROR("Business callback raised an exception when consumeMessage");
+      }
+
+      svc->postHandle(**it, result);
+      switch (result) {
+        case ConsumeResult::SUCCESS: {
+          auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
+          svc->ack(**it, callback);
+          break;
+        }
+        case ConsumeResult::FAILURE: {
+          if (fifo_) {
+            next_step_ = NextStep::Consume;
+            // Increase delivery attempts.
+            auto raw = const_cast<Message*>((*it).get());
+            raw->mutableExtension().delivery_attempt++;
+            schedule();
+          } else {
+            // For standard way of processing, Nack to server.
+            auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
+            svc->nack(**it, callback);
+          }
+          break;
+        }
+      }
+      break;
+    }
+
+    case NextStep::Ack: {
+      assert(!messages_.empty());
+      auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
+      svc->ack(*messages_[0], callback);
+      break;
+    }
+    case NextStep::Forward: {
+      assert(!messages_.empty());
+      auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
+      svc->forward(*messages_[0], callback);
+      break;
+    }
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp 
b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index cd94d9f..89dc112 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -24,8 +24,7 @@
 
 #include "AsyncReceiveMessageCallback.h"
 #include "ClientManagerFactory.h"
-#include "ConsumeFifoMessageService.h"
-#include "ConsumeStandardMessageService.h"
+#include "ConsumeMessageServiceImpl.h"
 #include "MixAll.h"
 #include "ProcessQueueImpl.h"
 #include "Protocol.h"
@@ -72,27 +71,11 @@ void PushConsumerImpl::start() {
 
   fetchRoutes();
 
-  if (client_config_.subscriber.fifo) {
-    SPDLOG_INFO("start orderly consume service: {}", 
client_config_.subscriber.group.name());
-    consume_message_service_ =
-        std::make_shared<ConsumeFifoMessageService>(shared_from_this(), 
consume_thread_pool_size_, message_listener_);
-    consume_batch_size_ = 1;
-  } else {
-    // For backward compatibility, by default, 
ConsumeMessageConcurrentlyService is assumed.
-    SPDLOG_INFO("start concurrently consume service: {}", 
client_config_.subscriber.group.name());
-    consume_message_service_ = std::make_shared<ConsumeStandardMessageService>(
-        shared_from_this(), consume_thread_pool_size_, message_listener_);
-  }
+  SPDLOG_INFO("start concurrently consume service: {}", 
client_config_.subscriber.group.name());
+  consume_message_service_ =
+      std::make_shared<ConsumeMessageServiceImpl>(shared_from_this(), 
consume_thread_pool_size_, message_listener_);
   consume_message_service_->start();
 
-  {
-    // Set consumer throttling
-    absl::MutexLock lock(&throttle_table_mtx_);
-    for (const auto& item : throttle_table_) {
-      consume_message_service_->throttle(item.first, item.second);
-    }
-  }
-
   // Heartbeat depends on initialization of consume-message-service
   heartbeat();
 
@@ -394,7 +377,8 @@ void PushConsumerImpl::nack(const Message& message, const 
std::function<void(con
                                            
absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
 }
 
-void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message, const 
std::function<void(bool)>& cb) {
+void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message,
+                                                const std::function<void(const 
std::error_code&)>& cb) {
   std::string target_host = message.extension().target_endpoint;
 
   absl::flat_hash_map<std::string, std::string> metadata;
@@ -449,19 +433,6 @@ std::size_t PushConsumerImpl::getProcessQueueTableSize() {
 void PushConsumerImpl::setThrottle(const std::string& topic, uint32_t 
threshold) {
   absl::MutexLock lock(&throttle_table_mtx_);
   throttle_table_.emplace(topic, threshold);
-  // If consumer has started, update it dynamically.
-  if (getConsumeMessageService()) {
-    getConsumeMessageService()->throttle(topic, threshold);
-  }
-}
-
-void PushConsumerImpl::iterateProcessQueue(const 
std::function<void(std::shared_ptr<ProcessQueue>)>& callback) {
-  absl::MutexLock lock(&process_queue_table_mtx_);
-  for (const auto& item : process_queue_table_) {
-    if (item.second->hasPendingMessages()) {
-      callback(item.second);
-    }
-  }
 }
 
 void PushConsumerImpl::fetchRoutes() {
diff --git a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h 
b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
deleted file mode 100644
index 5f95d96..0000000
--- a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ConsumeMessageServiceBase.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ConsumeFifoMessageService : public ConsumeMessageServiceBase,
-                                  public 
std::enable_shared_from_this<ConsumeFifoMessageService> {
-public:
-  ConsumeFifoMessageService(std::weak_ptr<PushConsumerImpl> consumer, int 
thread_count,
-                            MessageListener message_listener);
-  void start() override;
-
-  void shutdown() override;
-
-  /**
-   * @brief Entry of ConsumeMessageService
-   *
-   * @param process_queue
-   */
-  void submitConsumeTask(const std::weak_ptr<ProcessQueue>& process_queue) 
override;
-
-private:
-  void consumeTask(const std::weak_ptr<ProcessQueue>& process_queue, 
MessageConstSharedPtr message);
-
-  void submitConsumeTask0(const std::shared_ptr<PushConsumerImpl>& consumer,
-                          const std::weak_ptr<ProcessQueue>& process_queue,
-                          MessageConstSharedPtr message);
-
-  void scheduleAckTask(const std::weak_ptr<ProcessQueue>& process_queue, 
MessageConstSharedPtr message);
-
-  void onAck(const std::weak_ptr<ProcessQueue>& process_queue, 
MessageConstSharedPtr message, const std::error_code& ec);
-
-  void scheduleConsumeTask(const std::weak_ptr<ProcessQueue>& process_queue, 
MessageConstSharedPtr message);
-
-  void onForwardToDeadLetterQueue(const std::weak_ptr<ProcessQueue>& 
process_queue, MessageConstSharedPtr message, bool ok);
-
-  void scheduleForwardDeadLetterQueueTask(const std::weak_ptr<ProcessQueue>& 
process_queue, MessageConstSharedPtr message);
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageService.h 
b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
index 0c541ea..39d9d8e 100644
--- a/src/main/cpp/rocketmq/include/ConsumeMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
@@ -17,12 +17,16 @@
 #pragma once
 
 #include <cstdint>
+#include <memory>
+#include <system_error>
 
 #include "ProcessQueue.h"
 #include "rocketmq/MessageListener.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
+class ConsumeTask;
+
 class ConsumeMessageService {
 public:
   virtual ~ConsumeMessageService() = default;
@@ -38,11 +42,28 @@ public:
    */
   virtual void shutdown() = 0;
 
-  virtual void submitConsumeTask(const std::weak_ptr<ProcessQueue>& 
process_queue) = 0;
+  virtual void dispatch(std::shared_ptr<ProcessQueue> process_queue, 
std::vector<MessageConstSharedPtr> messages) = 0;
+
+  virtual void submit(std::shared_ptr<ConsumeTask> task) = 0;
+
+  virtual MessageListener& listener() = 0;
+
+  virtual bool preHandle(const Message& message) = 0;
+
+  virtual bool postHandle(const Message& message, ConsumeResult result) = 0;
 
-  virtual void signalDispatcher() = 0;
+  virtual void ack(const Message& message, std::function<void(const 
std::error_code& ec)> cb) = 0;
 
-  virtual void throttle(const std::string& topic, std::uint32_t threshold) = 0;
+  virtual void nack(const Message& message, std::function<void(const 
std::error_code& ec)> cb) = 0;
+
+  virtual void forward(const Message& message, std::function<void(const 
std::error_code& ec)> cb) = 0;
+
+  virtual void schedule(std::shared_ptr<ConsumeTask> task, 
std::chrono::milliseconds delay) = 0;
+
+  virtual std::size_t maxDeliveryAttempt() = 0;
 };
 
+using ConsumeMessageServiceWeakPtr = std::weak_ptr<ConsumeMessageService>;
+using ConsumeMessageServiceSharedPtr = std::shared_ptr<ConsumeMessageService>;
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageServiceBase.h 
b/src/main/cpp/rocketmq/include/ConsumeMessageServiceBase.h
deleted file mode 100644
index 9e9ff43..0000000
--- a/src/main/cpp/rocketmq/include/ConsumeMessageServiceBase.h
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <memory>
-#include <mutex>
-#include <string>
-#include <system_error>
-
-#include "absl/container/flat_hash_map.h"
-
-#include "ConsumeMessageService.h"
-#include "RateLimiter.h"
-#include "ThreadPool.h"
-#include "rocketmq/State.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class PushConsumerImpl;
-
-class ConsumeMessageServiceBase : public ConsumeMessageService {
-public:
-  ConsumeMessageServiceBase(std::weak_ptr<PushConsumerImpl> consumer, int 
thread_count,
-                            MessageListener message_listener);
-
-  ~ConsumeMessageServiceBase() override = default;
-
-  /**
-   * Make it noncopyable.
-   */
-  ConsumeMessageServiceBase(const ConsumeMessageServiceBase& other) = delete;
-  ConsumeMessageServiceBase& operator=(const ConsumeMessageServiceBase& other) 
= delete;
-
-  /**
-   * Start the dispatcher thread, which will dispatch messages in process 
queue to thread pool in form of runnable
-   * functor.
-   */
-  void start() override;
-
-  /**
-   * Stop the dispatcher thread and then reset the thread pool.
-   */
-  void shutdown() override;
-
-  /**
-   * Signal dispatcher thread to check new pending messages.
-   */
-  void signalDispatcher() override;
-
-  /**
-   * Set throttle threshold per topic.
-   *
-   * @param topic
-   * @param threshold
-   */
-  void throttle(const std::string& topic, std::uint32_t threshold) override;
-
-  bool hasConsumeRateLimiter(const std::string& topic) const 
LOCKS_EXCLUDED(rate_limiter_table_mtx_);
-
-  std::shared_ptr<RateLimiter<10>> rateLimiter(const std::string& topic) const 
LOCKS_EXCLUDED(rate_limiter_table_mtx_);
-
-protected:
-  RateLimiterObserver rate_limiter_observer_;
-
-  mutable absl::flat_hash_map<std::string, std::shared_ptr<RateLimiter<10>>>
-      rate_limiter_table_ GUARDED_BY(rate_limiter_table_mtx_);
-  mutable absl::Mutex rate_limiter_table_mtx_; // Protects rate_limiter_table_
-
-  std::atomic<State> state_;
-
-  int thread_count_;
-  std::unique_ptr<ThreadPool> pool_;
-  std::weak_ptr<PushConsumerImpl> consumer_;
-
-  absl::Mutex dispatch_mtx_;
-  std::thread dispatch_thread_;
-  absl::CondVar dispatch_cv_;
-
-  MessageListener message_listener_;
-
-  /**
-   * Dispatch messages to thread pool. Implementation of this function should 
be sub-class specific.
-   */
-  void dispatch();
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h 
b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
new file mode 100644
index 0000000..7d6c34a
--- /dev/null
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <system_error>
+
+#include "ConsumeMessageService.h"
+#include "RateLimiter.h"
+#include "ThreadPool.h"
+#include "absl/container/flat_hash_map.h"
+#include "rocketmq/State.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class PushConsumerImpl;
+
+/**
+ * Implementation of ConsumeMessageService that runs ConsumeTask objects on a
+ * thread pool and relays ack/nack/forward requests to the owning
+ * PushConsumerImpl, which is held through a weak pointer.
+ */
+class ConsumeMessageServiceImpl : public ConsumeMessageService,
+                                  public 
std::enable_shared_from_this<ConsumeMessageServiceImpl> {
+public:
+  ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl> consumer,
+                            int thread_count,
+                            MessageListener message_listener);
+
+  ~ConsumeMessageServiceImpl() override = default;
+
+  /**
+   * Make it noncopyable.
+   */
+  ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl& other) = delete;
+  ConsumeMessageServiceImpl& operator=(const ConsumeMessageServiceImpl& other) 
= delete;
+
+  void start() override;
+
+  void shutdown() override;
+
+  /**
+   * @return Reference to the message listener stored in this service.
+   */
+  MessageListener& listener() override {
+    return message_listener_;
+  }
+
+  /**
+   * Hook that receives a message; the current implementation always returns
+   * true.
+   */
+  bool preHandle(const Message& message) override;
+
+  /**
+   * Hook that receives a message and the listener's result; the current
+   * implementation always returns true.
+   */
+  bool postHandle(const Message& message, ConsumeResult result) override;
+
+  /**
+   * Queue the task's process() on the thread pool. The task is dropped if the
+   * consumer has already been destroyed.
+   */
+  void submit(std::shared_ptr<ConsumeTask> task) override;
+
+  /**
+   * NOTE(review): only the tail of the definition is visible in this patch
+   * excerpt; it wraps each message in its own ConsumeTask and queues it on the
+   * thread pool.
+   */
+  void dispatch(std::shared_ptr<ProcessQueue> process_queue, 
std::vector<MessageConstSharedPtr> messages) override;
+
+  /**
+   * Relay to the consumer's ack(). If the consumer has already been destroyed
+   * nothing happens and cb is not invoked.
+   */
+  void ack(const Message& message, std::function<void(const std::error_code&)> 
cb) override;
+
+  /**
+   * Relay to the consumer's nack(). If the consumer has already been destroyed
+   * nothing happens and cb is not invoked.
+   */
+  void nack(const Message& message, std::function<void(const 
std::error_code&)> cb) override;
+
+  /**
+   * Relay to the consumer's forwardToDeadLetterQueue(). If the consumer has
+   * already been destroyed nothing happens and cb is not invoked.
+   */
+  void forward(const Message& message, std::function<void(const 
std::error_code&)> cb) override;
+
+  /**
+   * NOTE: the definition in this commit has an empty body, so the task is not
+   * run and the delay is ignored.
+   */
+  void schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds 
delay) override;
+
+  /**
+   * @return The consumer's maxDeliveryAttempts(), or 0 (with a warning logged)
+   *         if the consumer has already been destroyed.
+   */
+  std::size_t maxDeliveryAttempt() override;
+
+protected:
+  std::atomic<State> state_;
+
+  int thread_count_;
+  // Thread pool that runs the submitted tasks.
+  std::unique_ptr<ThreadPool> pool_;
+  // Owning consumer; every use locks it first and backs off if it is gone.
+  std::weak_ptr<PushConsumerImpl> consumer_;
+
+  // Listener returned by listener().
+  MessageListener message_listener_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeStandardMessageService.h 
b/src/main/cpp/rocketmq/include/ConsumeStandardMessageService.h
deleted file mode 100644
index e98f257..0000000
--- a/src/main/cpp/rocketmq/include/ConsumeStandardMessageService.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "ConsumeMessageServiceBase.h"
-#include "ProcessQueue.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ConsumeStandardMessageService : public ConsumeMessageServiceBase,
-                                      public 
std::enable_shared_from_this<ConsumeStandardMessageService> {
-public:
-  ConsumeStandardMessageService(std::weak_ptr<PushConsumerImpl> consumer, int 
thread_count,
-                                MessageListener message_listener);
-
-  ~ConsumeStandardMessageService() override = default;
-
-  void start() override;
-
-  void shutdown() override;
-
-  void submitConsumeTask(const std::weak_ptr<ProcessQueue>& process_queue) 
override;
-
-private:
-  static void consumeTask(std::weak_ptr<ConsumeStandardMessageService> service,
-                          const std::weak_ptr<ProcessQueue>& process_queue,
-                          const std::vector<MessageConstSharedPtr>& msgs);
-
-  void consume(const std::shared_ptr<ProcessQueue>& process_queue, const 
std::vector<MessageConstSharedPtr>& msgs);
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/ConsumeTask.h 
b/src/main/cpp/rocketmq/include/ConsumeTask.h
new file mode 100644
index 0000000..80ca971
--- /dev/null
+++ b/src/main/cpp/rocketmq/include/ConsumeTask.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "ConsumeMessageService.h"
+#include "rocketmq/Message.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class ProcessQueue;
+
+/**
+ * @brief Next operation a consume-task should perform.
+ */
+enum class NextStep : std::uint8_t
+{
+  /**
+   * @brief Continue consuming the remaining messages.
+   */
+  Consume = 0,
+
+  /**
+   * @brief Ack the head message, i.e. messages_[0].
+   */
+  Ack,
+
+  /**
+   * @brief Forward the head message, i.e. messages_[0], to the dead-letter queue.
+   */
+  Forward,
+};
+
+class ConsumeTask : public std::enable_shared_from_this<ConsumeTask> {  // Bundles a service, a process queue and the message(s) to consume.
+public:
+  ConsumeTask(ConsumeMessageServiceWeakPtr service,
+              std::weak_ptr<ProcessQueue> process_queue,
+              MessageConstSharedPtr message);  // Single-message form.
+
+  ConsumeTask(ConsumeMessageServiceWeakPtr service,
+              std::weak_ptr<ProcessQueue> process_queue,
+              std::vector<MessageConstSharedPtr> messages);  // Batch form.
+
+  void process();  // NOTE(review): presumably acts according to next_step_ - confirm in ConsumeTask.cpp.
+
+  void submit();  // NOTE(review): presumably hands this task to ConsumeMessageService::submit - confirm.
+
+  void schedule();  // NOTE(review): presumably hands this task to ConsumeMessageService::schedule - confirm.
+
+private:
+  ConsumeMessageServiceWeakPtr service_;  // Weak handle to the consume message service.
+  std::weak_ptr<ProcessQueue> process_queue_;  // Non-owning; the queue may already be gone when lock()ed.
+  std::vector<MessageConstSharedPtr> messages_;  // messages_[0] is the head message.
+  bool fifo_{false};  // Defaults to false.
+  NextStep next_step_{NextStep::Consume};  // Defaults to Consume.
+
+  /**
+   * @brief Called once messages_[0] has completed its life-cycle.
+   */
+  void pop();
+
+  static void onAck(std::shared_ptr<ConsumeTask> task, const std::error_code& 
ec);  // Static handler; the task is passed in explicitly alongside the error code.
+
+  static void onNack(std::shared_ptr<ConsumeTask> task, const std::error_code& 
ec);  // Static handler; the task is passed in explicitly alongside the error code.
+
+  static void onForward(std::shared_ptr<ConsumeTask> task, const 
std::error_code& ec);  // Static handler; the task is passed in explicitly alongside the error code.
+};
+
+using ConsumeTaskSharedPtr = std::shared_ptr<ConsumeTask>;
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h 
b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 58ba4e5..cf5fe50 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -119,7 +119,7 @@ public:
 
   void nack(const Message& message, const std::function<void(const 
std::error_code&)>& callback);
 
-  void forwardToDeadLetterQueue(const Message& message, const 
std::function<void(bool)>& cb);
+  void forwardToDeadLetterQueue(const Message& message, const 
std::function<void(const std::error_code&)>& cb);  // cb is handed a std::error_code.
 
   void wrapAckMessageRequest(const Message& msg, AckMessageRequest& request);
 
@@ -154,8 +154,6 @@ public:
     return MixAll::DEFAULT_CACHED_MESSAGE_MEMORY;
   }
 
-  void iterateProcessQueue(const 
std::function<void(std::shared_ptr<ProcessQueue>)>& callback);
-
   MessageListener& messageListener() {
     return message_listener_;
   }
diff --git a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h 
b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
index 0a9a361..c6050ae 100644
--- a/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
+++ b/src/main/cpp/rocketmq/mocks/include/PushConsumerMock.h
@@ -25,8 +25,6 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class PushConsumerMock : virtual public PushConsumer, virtual public 
ConsumerMock {
 public:
-  MOCK_METHOD(void, iterateProcessQueue, (const 
std::function<void(std::shared_ptr<ProcessQueue>)>&), (override));
-
   MOCK_METHOD(MessageModel, messageModel, (), (const override));
 
   MOCK_METHOD(void, ack, (const MQMessageExt&, const std::function<void(const 
std::error_code&)>&), (override));

Reply via email to