This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 8341c13d06 [ISSUE #8402] Fix init retry topic offset incorrect when EscapeBridge enabled (#8404) 8341c13d06 is described below commit 8341c13d064c96e9ef70da4fcf17e49d3e1847f9 Author: imzs <i...@foxmail.com> AuthorDate: Tue Jul 23 10:24:12 2024 +0800 [ISSUE #8402] Fix init retry topic offset incorrect when EscapeBridge enabled (#8404) --- .../broker/processor/PopMessageProcessor.java | 10 ++-- .../broker/processor/PopReviveService.java | 4 +- .../broker/processor/PopMessageProcessorTest.java | 63 +++++++++++++++++++++- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 0304a5dab0..89b4c39d72 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -723,8 +723,8 @@ public class PopMessageProcessor implements NettyRequestProcessor { private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) { long offset; - if (ConsumeInitMode.MIN == initMode) { - return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); + if (ConsumeInitMode.MIN == initMode || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); } else { if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() && this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 0 && @@ -738,10 +738,10 @@ public class PopMessageProcessor implements NettyRequestProcessor { offset = 0; } } - if (init) { - this.brokerController.getConsumerOffsetManager().commitOffset( + } + if (init) { // whichever initMode + this.brokerController.getConsumerOffsetManager().commitOffset( "getPopOffset", group, topic, queueId, offset); - } } return offset; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index e3ba492f28..8074af23bf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -125,7 +125,7 @@ public class PopReviveService extends ServiceThread { msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime())); } msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId()); + addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId()); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus()); if (brokerController.getBrokerConfig().isEnablePopLog()) { @@ -153,7 +153,7 @@ public class PopReviveService extends ServiceThread { } } - private void addRetryTopicIfNoExit(String topic, String consumerGroup) { + public void addRetryTopicIfNotExist(String topic, String consumerGroup) { if (brokerController != null) { TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); if (topicConfig != null) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java index d8c8fa1034..8a2ce8a2ba 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.message.MessageDecoder; @@ -40,6 +41,7 @@ import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.DefaultMappedFile; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,6 +55,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -151,14 +154,70 @@ public class PopMessageProcessorTest { assertThat(response.getRemark()).contains("pop message is forbidden because timerWheelEnable is false"); } + @Test + public void testGetInitOffset_retryTopic() throws RemotingCommandException { + when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); + String newGroup = group + "-" + System.currentTimeMillis(); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, newGroup); + long minOffset = 100L; + when(messageStore.getMinOffsetInQueue(retryTopic, 0)).thenReturn(minOffset); + brokerController.getTopicConfigManager().getTopicConfigTable().put(retryTopic, new TopicConfig(retryTopic, 1, 1)); + GetMessageResult getMessageResult = createGetMessageResult(0); + when(messageStore.getMessageAsync(eq(newGroup), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(getMessageResult)); + + long offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); + Assert.assertEquals(-1, offset); + + RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, ConsumeInitMode.MAX); + popMessageProcessor.processRequest(handlerContext, request); + offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); + Assert.assertEquals(minOffset, offset); + + when(messageStore.getMinOffsetInQueue(retryTopic, 0)).thenReturn(minOffset * 2); + popMessageProcessor.processRequest(handlerContext, request); + offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); + Assert.assertEquals(minOffset, offset); // will not entry getInitOffset() again + messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent UnnecessaryStubbingException + } + + @Test + public void testGetInitOffset_normalTopic() throws RemotingCommandException { + long maxOffset = 999L; + when(messageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); + when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset); + String newGroup = group + "-" + System.currentTimeMillis(); + GetMessageResult getMessageResult = createGetMessageResult(0); + when(messageStore.getMessageAsync(eq(newGroup), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(getMessageResult)); + + long offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); + Assert.assertEquals(-1, offset); + + RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, ConsumeInitMode.MAX); + popMessageProcessor.processRequest(handlerContext, request); + offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); + Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false + + when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset * 2); + popMessageProcessor.processRequest(handlerContext, request); + offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); + Assert.assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() again + messageStore.getMaxOffsetInQueue(topic, 0); // prevent UnnecessaryStubbingException + } + private RemotingCommand createPopMsgCommand() { + return createPopMsgCommand(group, topic, -1, ConsumeInitMode.MAX); + } + + private RemotingCommand createPopMsgCommand(String group, String topic, int queueId, int initMode) { PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); requestHeader.setConsumerGroup(group); requestHeader.setMaxMsgNums(30); - requestHeader.setQueueId(-1); + requestHeader.setQueueId(queueId); requestHeader.setTopic(topic); requestHeader.setInvisibleTime(10_000); - requestHeader.setInitMode(ConsumeInitMode.MAX); + requestHeader.setInitMode(initMode); requestHeader.setOrder(false); requestHeader.setPollTime(15_000); requestHeader.setBornTime(System.currentTimeMillis());