This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c217d8bb8 [ISSUE #5481] Decrease the repeated consumption probability
of expired message (#5483)
c217d8bb8 is described below
commit c217d8bb80d933da4bf21c36041bae684072c629
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Nov 9 20:02:20 2022 +0800
[ISSUE #5481] Decrease the repeated consumption probability of expired
message (#5483)
* Decrease the repeated consumption probability of expired message
* Polish code
* Make test method name consistent with that of main source file
Co-authored-by: Li Zhanhui <[email protected]>
---
.../consumer/ConsumeMessageConcurrentlyService.java | 7 +++++++
.../rocketmq/client/impl/consumer/ProcessQueue.java | 21 +++++++++++++++++++++
.../client/impl/consumer/ProcessQueueTest.java | 13 +++++++++++++
3 files changed, 41 insertions(+)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 0fbdc5ced..aee699ea2 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -284,6 +284,13 @@ public class ConsumeMessageConcurrentlyService implements
ConsumeMessageService
List<MessageExt> msgBackFailed = new
ArrayList<>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i <
consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
+ // Maybe message is expired and cleaned, just ignore it.
+ if
(!consumeRequest.getProcessQueue().containsMessage(msg)) {
+ log.info("Message is not found in its process queue;
skip send-back-procedure, topic={}, "
+ + "brokerName={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getBrokerName(),
+ msg.getQueueId(), msg.getQueueOffset());
+ continue;
+ }
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 9ffab3cba..0fdec4737 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -334,6 +334,27 @@ public class ProcessQueue {
return result;
}
+ /**
+ * Return the result that whether current message is exist in the process
queue or not.
+ */
+ public boolean containsMessage(MessageExt message) {
+ if (message == null) {
+ // should never reach here.
+ return false;
+ }
+ try {
+ this.treeMapLock.readLock().lockInterruptibly();
+ try {
+ return this.msgTreeMap.containsKey(message.getQueueOffset());
+ } finally {
+ this.treeMapLock.readLock().unlock();
+ }
+ } catch (Throwable t) {
+ log.error("Failed to check message's existence in process queue,
message={}", message, t);
+ }
+ return false;
+ }
+
public boolean hasTempMessage() {
try {
this.treeMapLock.readLock().lockInterruptibly();
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
index a31c5fb25..259d6430b 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@@ -64,6 +65,18 @@ public class ProcessQueueTest {
assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
}
+ @Test
+ public void testContainsMessage() {
+ ProcessQueue pq = new ProcessQueue();
+ final List<MessageExt> messageList = createMessageList(2);
+ final MessageExt message0 = messageList.get(0);
+ final MessageExt message1 = messageList.get(1);
+
+ pq.putMessage(Lists.list(message0));
+ assertThat(pq.containsMessage(message0)).isTrue();
+ assertThat(pq.containsMessage(message1)).isFalse();
+ }
+
@Test
public void testFillProcessQueueInfo() {
ProcessQueue pq = new ProcessQueue();