This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8341c13d06 [ISSUE #8402] Fix init retry topic offset incorrect when 
EscapeBridge enabled (#8404)
8341c13d06 is described below

commit 8341c13d064c96e9ef70da4fcf17e49d3e1847f9
Author: imzs <i...@foxmail.com>
AuthorDate: Tue Jul 23 10:24:12 2024 +0800

    [ISSUE #8402] Fix init retry topic offset incorrect when EscapeBridge 
enabled (#8404)
---
 .../broker/processor/PopMessageProcessor.java      | 10 ++--
 .../broker/processor/PopReviveService.java         |  4 +-
 .../broker/processor/PopMessageProcessorTest.java  | 63 +++++++++++++++++++++-
 3 files changed, 68 insertions(+), 9 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0304a5dab0..89b4c39d72 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -723,8 +723,8 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
 
     private long getInitOffset(String topic, String group, int queueId, int 
initMode, boolean init) {
         long offset;
-        if (ConsumeInitMode.MIN == initMode) {
-            return 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+        if (ConsumeInitMode.MIN == initMode || 
topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            offset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
         } else {
             if 
(this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() &&
                 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 
0 &&
@@ -738,10 +738,10 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                     offset = 0;
                 }
             }
-            if (init) {
-                this.brokerController.getConsumerOffsetManager().commitOffset(
+        }
+        if (init) { // whichever initMode
+            this.brokerController.getConsumerOffsetManager().commitOffset(
                     "getPopOffset", group, topic, queueId, offset);
-            }
         }
         return offset;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index e3ba492f28..8074af23bf 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -125,7 +125,7 @@ public class PopReviveService extends ServiceThread {
             msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, 
String.valueOf(popCheckPoint.getPopTime()));
         }
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
+        addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
         PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
         PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, 
putMessageResult.getPutMessageStatus());
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -153,7 +153,7 @@ public class PopReviveService extends ServiceThread {
         }
     }
 
-    private void addRetryTopicIfNoExit(String topic, String consumerGroup) {
+    public void addRetryTopicIfNotExist(String topic, String consumerGroup) {
         if (brokerController != null) {
             TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(topic);
             if (topicConfig != null) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index d8c8fa1034..8a2ce8a2ba 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -40,6 +41,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,6 +55,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -151,14 +154,70 @@ public class PopMessageProcessorTest {
         assertThat(response.getRemark()).contains("pop message is forbidden 
because timerWheelEnable is false");
     }
 
+    @Test
+    public void testGetInitOffset_retryTopic() throws RemotingCommandException 
{
+        when(messageStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
+        String newGroup = group + "-" + System.currentTimeMillis();
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, newGroup);
+        long minOffset = 100L;
+        when(messageStore.getMinOffsetInQueue(retryTopic, 
0)).thenReturn(minOffset);
+        
brokerController.getTopicConfigManager().getTopicConfigTable().put(retryTopic, 
new TopicConfig(retryTopic, 1, 1));
+        GetMessageResult getMessageResult = createGetMessageResult(0);
+        when(messageStore.getMessageAsync(eq(newGroup), anyString(), anyInt(), 
anyLong(), anyInt(), any()))
+                
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+
+        long offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
+        Assert.assertEquals(-1, offset);
+
+        RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, 
ConsumeInitMode.MAX);
+        popMessageProcessor.processRequest(handlerContext, request);
+        offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
+        Assert.assertEquals(minOffset, offset);
+
+        when(messageStore.getMinOffsetInQueue(retryTopic, 
0)).thenReturn(minOffset * 2);
+        popMessageProcessor.processRequest(handlerContext, request);
+        offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
+        Assert.assertEquals(minOffset, offset); // will not entry 
getInitOffset() again
+        messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent 
UnnecessaryStubbingException
+    }
+
+    @Test
+    public void testGetInitOffset_normalTopic() throws 
RemotingCommandException {
+        long maxOffset = 999L;
+        when(messageStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
+        when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset);
+        String newGroup = group + "-" + System.currentTimeMillis();
+        GetMessageResult getMessageResult = createGetMessageResult(0);
+        when(messageStore.getMessageAsync(eq(newGroup), anyString(), anyInt(), 
anyLong(), anyInt(), any()))
+                
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+
+        long offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
+        Assert.assertEquals(-1, offset);
+
+        RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, 
ConsumeInitMode.MAX);
+        popMessageProcessor.processRequest(handlerContext, request);
+        offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
+        Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false
+
+        when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset 
* 2);
+        popMessageProcessor.processRequest(handlerContext, request);
+        offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
+        Assert.assertEquals(maxOffset - 1, offset); // will not entry 
getInitOffset() again
+        messageStore.getMaxOffsetInQueue(topic, 0); // prevent 
UnnecessaryStubbingException
+    }
+
     private RemotingCommand createPopMsgCommand() {
+        return createPopMsgCommand(group, topic, -1, ConsumeInitMode.MAX);
+    }
+
+    private RemotingCommand createPopMsgCommand(String group, String topic, 
int queueId, int initMode) {
         PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
         requestHeader.setConsumerGroup(group);
         requestHeader.setMaxMsgNums(30);
-        requestHeader.setQueueId(-1);
+        requestHeader.setQueueId(queueId);
         requestHeader.setTopic(topic);
         requestHeader.setInvisibleTime(10_000);
-        requestHeader.setInitMode(ConsumeInitMode.MAX);
+        requestHeader.setInitMode(initMode);
         requestHeader.setOrder(false);
         requestHeader.setPollTime(15_000);
         requestHeader.setBornTime(System.currentTimeMillis());

Reply via email to