This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new ce27e1fc64 [ISSUE #8332] fix: ack msg which has reached maxReconsumeTimes ce27e1fc64 is described below commit ce27e1fc643d4b6a47b7a63784fab7e8070322e9 Author: cserwen <cser...@apache.org> AuthorDate: Tue Jul 30 09:11:55 2024 +0800 [ISSUE #8332] fix: ack msg which has reached maxReconsumeTimes Co-authored-by: dengzhiwen1 <dengzhiw...@xiaomi.com> --- .../consumer/ConsumeMessagePopConcurrentlyService.java | 2 +- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java index 3713d1aba4..d519187110 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java @@ -471,7 +471,7 @@ public class ConsumeMessagePopConcurrentlyService implements ConsumeMessageServi processQueue.decFoundMsg(-msgs.size()); } - log.warn("processQueue invalid. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}", + log.warn("processQueue invalid or popTimeout. 
isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}", processQueue.isDropped(), isPopTimeout(), messageQueue, msgs); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 3e832e5a9a..e66a9825f3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -621,10 +621,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private PopResult processPopResult(final PopResult popResult, final SubscriptionData subscriptionData) { if (PopStatus.FOUND == popResult.getPopStatus()) { List<MessageExt> msgFoundList = popResult.getMsgFoundList(); - List<MessageExt> msgListFilterAgain = msgFoundList; + List<MessageExt> msgListFilterAgain = new ArrayList<>(popResult.getMsgFoundList().size()); if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode() && popResult.getMsgFoundList().size() > 0) { - msgListFilterAgain = new ArrayList<>(popResult.getMsgFoundList().size()); for (MessageExt msg : popResult.getMsgFoundList()) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { @@ -632,6 +631,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } } + } else { + msgListFilterAgain.addAll(msgFoundList); } if (!this.filterMessageHookList.isEmpty()) { @@ -649,6 +650,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } + Iterator<MessageExt> iterator = msgListFilterAgain.iterator(); + while (iterator.hasNext()) { + MessageExt msg = iterator.next(); + if (msg.getReconsumeTimes() > defaultMQPushConsumer.getMaxReconsumeTimes()) { + iterator.remove(); + log.info("Reconsume times has reached {}, so ack msg={}", msg.getReconsumeTimes(), msg); + } + } + if 
(msgFoundList.size() != msgListFilterAgain.size()) { for (MessageExt msg : msgFoundList) { if (!msgListFilterAgain.contains(msg)) {