This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c217d8bb8 [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
c217d8bb8 is described below

commit c217d8bb80d933da4bf21c36041bae684072c629
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Nov 9 20:02:20 2022 +0800

    [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
    
    * Decrease the repeated consumption probability of expired message
    
    * Polish code
    
    * Make test method name consistent with that of main source file
    
    Co-authored-by: Li Zhanhui <[email protected]>
---
 .../consumer/ConsumeMessageConcurrentlyService.java |  7 +++++++
 .../rocketmq/client/impl/consumer/ProcessQueue.java | 21 +++++++++++++++++++++
 .../client/impl/consumer/ProcessQueueTest.java      | 13 +++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 0fbdc5ced..aee699ea2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -284,6 +284,13 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
                 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                     MessageExt msg = consumeRequest.getMsgs().get(i);
+                    // Maybe message is expired and cleaned, just ignore it.
+                    if (!consumeRequest.getProcessQueue().containsMessage(msg)) {
+                        log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "
+                                + "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),
+                            msg.getQueueId(), msg.getQueueOffset());
+                        continue;
+                    }
                     boolean result = this.sendMessageBack(msg, context);
                     if (!result) {
                         msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 9ffab3cba..0fdec4737 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -334,6 +334,27 @@ public class ProcessQueue {
         return result;
     }
 
+    /**
+     * Return the result that whether current message is exist in the process queue or not.
+     */
+    public boolean containsMessage(MessageExt message) {
+        if (message == null) {
+            // should never reach here.
+            return false;
+        }
+        try {
+            this.treeMapLock.readLock().lockInterruptibly();
+            try {
+                return this.msgTreeMap.containsKey(message.getQueueOffset());
+            } finally {
+                this.treeMapLock.readLock().unlock();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to check message's existence in process queue, message={}", message, t);
+        }
+        return false;
+    }
+
     public boolean hasTempMessage() {
         try {
             this.treeMapLock.readLock().lockInterruptibly();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
index a31c5fb25..259d6430b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.assertj.core.util.Lists;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -64,6 +65,18 @@ public class ProcessQueueTest {
         assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
     }
 
+    @Test
+    public void testContainsMessage() {
+        ProcessQueue pq = new ProcessQueue();
+        final List<MessageExt> messageList = createMessageList(2);
+        final MessageExt message0 = messageList.get(0);
+        final MessageExt message1 = messageList.get(1);
+
+        pq.putMessage(Lists.list(message0));
+        assertThat(pq.containsMessage(message0)).isTrue();
+        assertThat(pq.containsMessage(message1)).isFalse();
+    }
+
     @Test
     public void testFillProcessQueueInfo() {
         ProcessQueue pq = new ProcessQueue();

Reply via email to