This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 054f910957 [ISSUE #9246] Support init offset mode in PopConsumerService (#9247) 054f910957 is described below commit 054f910957fa92e0c78f59b014a22107ce8b92ca Author: lizhimins <707364...@qq.com> AuthorDate: Sat Mar 15 15:02:06 2025 +0800 [ISSUE #9246] Support init offset mode in PopConsumerService (#9247) --- .../rocketmq/broker/pop/PopConsumerContext.java | 9 +++++++- .../rocketmq/broker/pop/PopConsumerService.java | 27 ++++++++++++++-------- .../broker/processor/PopMessageProcessor.java | 5 ++-- .../broker/pop/PopConsumerContextTest.java | 3 ++- .../broker/pop/PopConsumerServiceTest.java | 9 ++++---- 5 files changed, 36 insertions(+), 17 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java index 09bc4e6b47..0ad8bacab1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java @@ -35,6 +35,8 @@ public class PopConsumerContext { private final boolean fifo; + private final int initMode; + private final String attemptId; private final AtomicLong restCount; @@ -50,13 +52,14 @@ public class PopConsumerContext { private List<PopConsumerRecord> popConsumerRecordList; public PopConsumerContext(String clientHost, - long popTime, long invisibleTime, String groupId, boolean fifo, String attemptId) { + long popTime, long invisibleTime, String groupId, boolean fifo, int initMode, String attemptId) { this.clientHost = clientHost; this.popTime = popTime; this.invisibleTime = invisibleTime; this.groupId = groupId; this.fifo = fifo; + this.initMode = initMode; this.attemptId = attemptId; this.restCount = new AtomicLong(0); this.startOffsetInfo = new StringBuilder(); @@ -120,6 +123,10 @@ public class PopConsumerContext { return fifo; } + public int getInitMode() { + return initMode; + } + public long getInvisibleTime() { return invisibleTime; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 1f0125412a..dde13a5ed7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageAccessor; @@ -197,7 +198,18 @@ public class PopConsumerService extends ServiceThread { return context; } - public Long getPopOffset(String groupId, String topicId, int queueId) { + public long getPopOffset(String groupId, String topicId, int queueId, int initMode) { + long offset = this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId); + if (offset < 0L) { + try { + offset = this.brokerController.getPopMessageProcessor() + .getInitOffset(topicId, groupId, queueId, initMode, true); + log.info("PopConsumerService init offset, groupId={}, topicId={}, queueId={}, init={}, offset={}", + groupId, topicId, queueId, ConsumeInitMode.MIN == initMode ? "min" : "max", offset); + } catch (ConsumeQueueException e) { + throw new RuntimeException(e); + } + } Long resetOffset = this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId); if (resetOffset != null) { @@ -206,7 +218,7 @@ public class PopConsumerService extends ServiceThread { this.brokerController.getConsumerOffsetManager() .commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset); } - return resetOffset; + return resetOffset != null ? resetOffset : offset; } public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost, @@ -215,9 +227,6 @@ public class PopConsumerService extends ServiceThread { log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}", groupId, topicId, offset, queueId, batchSize, filter != null); - Long resetOffset = this.getPopOffset(groupId, topicId, queueId); - final long currentOffset = resetOffset != null ? resetOffset : offset; - CompletableFuture<GetMessageResult> getMessageFuture = brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter); @@ -240,7 +249,7 @@ public class PopConsumerService extends ServiceThread { log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " + "groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}", - groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset()); + groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset()); return brokerController.getMessageStore().getMessageAsync( groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter); @@ -299,7 +308,7 @@ public class PopConsumerService extends ServiceThread { result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId)); return CompletableFuture.completedFuture(result); } else { - long consumeOffset = brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId); + final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode()); return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter) .thenApply(getMessageResult -> addGetMessageResult( result, getMessageResult, topicId, queueId, retryType, consumeOffset)); @@ -308,11 +317,11 @@ public class PopConsumerService extends ServiceThread { } public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime, - String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, + String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode, MessageFilter filter) { PopConsumerContext popConsumerContext = - new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, attemptId); + new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId); TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId); if (topicConfig == null || !consumerLockService.tryLock(groupId, topicId)) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index b84afe2194..dd8314b7e0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -383,7 +383,8 @@ public class PopMessageProcessor implements NettyRequestProcessor { CompletableFuture<PopConsumerContext> popAsyncFuture = brokerController.getPopConsumerService().popAsync( RemotingHelper.parseChannelRemoteAddr(channel), beginTimeMills, requestHeader.getInvisibleTime(), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getAttemptId(), messageFilter); + requestHeader.getMaxMsgNums(), requestHeader.isOrder(), + requestHeader.getAttemptId(), requestHeader.getInitMode(), messageFilter); popAsyncFuture.thenApply(result -> { if (result.isFound()) { @@ -888,7 +889,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { } } - private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) + public long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) throws ConsumeQueueException { long offset; if (ConsumeInitMode.MIN == initMode || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java index 554933eabc..6f009423f9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.pop; +import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -29,7 +30,7 @@ public class PopConsumerContextTest { public void consumerContextTest() { long popTime = System.currentTimeMillis(); PopConsumerContext context = new PopConsumerContext("127.0.0.1:6789", - popTime, 20_000, "GroupId", true, "attemptId"); + popTime, 20_000, "GroupId", true, ConsumeInitMode.MIN, "attemptId"); Assert.assertFalse(context.isFound()); Assert.assertEquals("127.0.0.1:6789", context.getClientHost()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 2b930d5852..7fb619f740 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -190,7 +191,7 @@ public class PopConsumerServiceTest { @Test public void addGetMessageResultTest() { PopConsumerContext context = new PopConsumerContext( - clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId); + clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId); GetMessageResult result = new GetMessageResult(); result.setStatus(GetMessageStatus.FOUND); result.getMessageQueueOffset().add(100L); @@ -231,7 +232,7 @@ public class PopConsumerServiceTest { // fifo block PopConsumerContext context = new PopConsumerContext( - clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId); + clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId); consumerService.setFifoBlocked(context, groupId, topicId, queueId, Collections.singletonList(100L)); Mockito.when(brokerController.getConsumerOrderInfoManager() .checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong())).thenReturn(true); @@ -257,7 +258,7 @@ public class PopConsumerServiceTest { // fifo block test context = new PopConsumerContext( - clientHost, System.currentTimeMillis(), 20000, groupId, true, attemptId); + clientHost, System.currentTimeMillis(), 20000, groupId, true, ConsumeInitMode.MIN, attemptId); future = CompletableFuture.completedFuture(context); Assert.assertEquals(0L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId, 10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount()); @@ -306,7 +307,7 @@ public class PopConsumerServiceTest { // pop broker consumerServiceSpy.popAsync(clientHost, System.currentTimeMillis(), - 20000, groupId, topicId, -1, 10, false, attemptId, null).join(); + 20000, groupId, topicId, -1, 10, false, attemptId, ConsumeInitMode.MIN, null).join(); } @Test