This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ce27e1fc64 [ISSUE #8332] fix: ack msg which has reached 
maxReconsumeTimes
ce27e1fc64 is described below

commit ce27e1fc643d4b6a47b7a63784fab7e8070322e9
Author: cserwen <cser...@apache.org>
AuthorDate: Tue Jul 30 09:11:55 2024 +0800

    [ISSUE #8332] fix: ack msg which has reached maxReconsumeTimes
    
    Co-authored-by: dengzhiwen1 <dengzhiw...@xiaomi.com>
---
 .../consumer/ConsumeMessagePopConcurrentlyService.java     |  2 +-
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java    | 14 ++++++++++++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
index 3713d1aba4..d519187110 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
@@ -471,7 +471,7 @@ public class ConsumeMessagePopConcurrentlyService 
implements ConsumeMessageServi
                     processQueue.decFoundMsg(-msgs.size());
                 }
 
-                log.warn("processQueue invalid. isDropped={}, isPopTimeout={}, 
messageQueue={}, msgs={}",
+                log.warn("processQueue invalid or popTimeout. isDropped={}, 
isPopTimeout={}, messageQueue={}, msgs={}",
                         processQueue.isDropped(), isPopTimeout(), 
messageQueue, msgs);
             }
         }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 3e832e5a9a..e66a9825f3 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -621,10 +621,9 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     private PopResult processPopResult(final PopResult popResult, final 
SubscriptionData subscriptionData) {
         if (PopStatus.FOUND == popResult.getPopStatus()) {
             List<MessageExt> msgFoundList = popResult.getMsgFoundList();
-            List<MessageExt> msgListFilterAgain = msgFoundList;
+            List<MessageExt> msgListFilterAgain = new 
ArrayList<>(popResult.getMsgFoundList().size());
             if (!subscriptionData.getTagsSet().isEmpty() && 
!subscriptionData.isClassFilterMode()
                 && popResult.getMsgFoundList().size() > 0) {
-                msgListFilterAgain = new 
ArrayList<>(popResult.getMsgFoundList().size());
                 for (MessageExt msg : popResult.getMsgFoundList()) {
                     if (msg.getTags() != null) {
                         if 
(subscriptionData.getTagsSet().contains(msg.getTags())) {
@@ -632,6 +631,8 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                         }
                     }
                 }
+            } else {
+                msgListFilterAgain.addAll(msgFoundList);
             }
 
             if (!this.filterMessageHookList.isEmpty()) {
@@ -649,6 +650,15 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                 }
             }
 
+            Iterator<MessageExt> iterator = msgListFilterAgain.iterator();
+            while (iterator.hasNext()) {
+                MessageExt msg = iterator.next();
+                if (msg.getReconsumeTimes() > 
defaultMQPushConsumer.getMaxReconsumeTimes()) {
+                    iterator.remove();
+                    log.info("Reconsume times has reached {}, so ack msg={}", 
msg.getReconsumeTimes(), msg);
+                }
+            }
+
             if (msgFoundList.size() != msgListFilterAgain.size()) {
                 for (MessageExt msg : msgFoundList) {
                     if (!msgListFilterAgain.contains(msg)) {

Reply via email to