This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 054f910957 [ISSUE #9246] Support init offset mode in 
PopConsumerService (#9247)
054f910957 is described below

commit 054f910957fa92e0c78f59b014a22107ce8b92ca
Author: lizhimins <707364...@qq.com>
AuthorDate: Sat Mar 15 15:02:06 2025 +0800

    [ISSUE #9246] Support init offset mode in PopConsumerService (#9247)
---
 .../rocketmq/broker/pop/PopConsumerContext.java    |  9 +++++++-
 .../rocketmq/broker/pop/PopConsumerService.java    | 27 ++++++++++++++--------
 .../broker/processor/PopMessageProcessor.java      |  5 ++--
 .../broker/pop/PopConsumerContextTest.java         |  3 ++-
 .../broker/pop/PopConsumerServiceTest.java         |  9 ++++----
 5 files changed, 36 insertions(+), 17 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java
index 09bc4e6b47..0ad8bacab1 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java
@@ -35,6 +35,8 @@ public class PopConsumerContext {
 
     private final boolean fifo;
 
+    private final int initMode;
+
     private final String attemptId;
 
     private final AtomicLong restCount;
@@ -50,13 +52,14 @@ public class PopConsumerContext {
     private List<PopConsumerRecord> popConsumerRecordList;
 
     public PopConsumerContext(String clientHost,
-        long popTime, long invisibleTime, String groupId, boolean fifo, String 
attemptId) {
+        long popTime, long invisibleTime, String groupId, boolean fifo, int 
initMode, String attemptId) {
 
         this.clientHost = clientHost;
         this.popTime = popTime;
         this.invisibleTime = invisibleTime;
         this.groupId = groupId;
         this.fifo = fifo;
+        this.initMode = initMode;
         this.attemptId = attemptId;
         this.restCount = new AtomicLong(0);
         this.startOffsetInfo = new StringBuilder();
@@ -120,6 +123,10 @@ public class PopConsumerContext {
         return fifo;
     }
 
+    public int getInitMode() {
+        return initMode;
+    }
+
     public long getInvisibleTime() {
         return invisibleTime;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 1f0125412a..dde13a5ed7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -197,7 +198,18 @@ public class PopConsumerService extends ServiceThread {
         return context;
     }
 
-    public Long getPopOffset(String groupId, String topicId, int queueId) {
+    public long getPopOffset(String groupId, String topicId, int queueId, int 
initMode) {
+        long offset = 
this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, 
topicId, queueId);
+        if (offset < 0L) {
+            try {
+                offset = this.brokerController.getPopMessageProcessor()
+                    .getInitOffset(topicId, groupId, queueId, initMode, true);
+                log.info("PopConsumerService init offset, groupId={}, 
topicId={}, queueId={}, init={}, offset={}",
+                    groupId, topicId, queueId, ConsumeInitMode.MIN == initMode 
? "min" : "max", offset);
+            } catch (ConsumeQueueException e) {
+                throw new RuntimeException(e);
+            }
+        }
         Long resetOffset =
             
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId,
 groupId, queueId);
         if (resetOffset != null) {
@@ -206,7 +218,7 @@ public class PopConsumerService extends ServiceThread {
             this.brokerController.getConsumerOffsetManager()
                 .commitOffset("ResetPopOffset", groupId, topicId, queueId, 
resetOffset);
         }
-        return resetOffset;
+        return resetOffset != null ? resetOffset : offset;
     }
 
     public CompletableFuture<GetMessageResult> getMessageAsync(String 
clientHost,
@@ -215,9 +227,6 @@ public class PopConsumerService extends ServiceThread {
         log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, 
queueId={}, offset={}, batchSize={}, filter={}",
             groupId, topicId, offset, queueId, batchSize, filter != null);
 
-        Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
-        final long currentOffset = resetOffset != null ? resetOffset : offset;
-
         CompletableFuture<GetMessageResult> getMessageFuture =
             brokerController.getMessageStore().getMessageAsync(groupId, 
topicId, queueId, offset, batchSize, filter);
 
@@ -240,7 +249,7 @@ public class PopConsumerService extends ServiceThread {
 
                 log.warn("PopConsumerService getMessageAsync, initial offset 
because store is no correct, " +
                         "groupId={}, topicId={}, queueId={}, batchSize={}, 
offset={}->{}",
-                    groupId, topicId, queueId, batchSize, currentOffset, 
result.getNextBeginOffset());
+                    groupId, topicId, queueId, batchSize, offset, 
result.getNextBeginOffset());
 
                 return brokerController.getMessageStore().getMessageAsync(
                     groupId, topicId, queueId, result.getNextBeginOffset(), 
batchSize, filter);
@@ -299,7 +308,7 @@ public class PopConsumerService extends ServiceThread {
                 result.addRestCount(this.getPendingFilterCount(groupId, 
topicId, queueId));
                 return CompletableFuture.completedFuture(result);
             } else {
-                long consumeOffset = 
brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, 
queueId);
+                final long consumeOffset = this.getPopOffset(groupId, topicId, 
queueId, result.getInitMode());
                 return getMessageAsync(clientHost, groupId, topicId, queueId, 
consumeOffset, remain, filter)
                     .thenApply(getMessageResult -> addGetMessageResult(
                         result, getMessageResult, topicId, queueId, retryType, 
consumeOffset));
@@ -308,11 +317,11 @@ public class PopConsumerService extends ServiceThread {
     }
 
     public CompletableFuture<PopConsumerContext> popAsync(String clientHost, 
long popTime, long invisibleTime,
-        String groupId, String topicId, int queueId, int batchSize, boolean 
fifo, String attemptId,
+        String groupId, String topicId, int queueId, int batchSize, boolean 
fifo, String attemptId, int initMode,
         MessageFilter filter) {
 
         PopConsumerContext popConsumerContext =
-            new PopConsumerContext(clientHost, popTime, invisibleTime, 
groupId, fifo, attemptId);
+            new PopConsumerContext(clientHost, popTime, invisibleTime, 
groupId, fifo, initMode, attemptId);
 
         TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(topicId);
         if (topicConfig == null || !consumerLockService.tryLock(groupId, 
topicId)) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index b84afe2194..dd8314b7e0 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -383,7 +383,8 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
             CompletableFuture<PopConsumerContext> popAsyncFuture = 
brokerController.getPopConsumerService().popAsync(
                 RemotingHelper.parseChannelRemoteAddr(channel), 
beginTimeMills, requestHeader.getInvisibleTime(),
                 requestHeader.getConsumerGroup(), requestHeader.getTopic(), 
requestHeader.getQueueId(),
-                requestHeader.getMaxMsgNums(), requestHeader.isOrder(), 
requestHeader.getAttemptId(), messageFilter);
+                requestHeader.getMaxMsgNums(), requestHeader.isOrder(),
+                requestHeader.getAttemptId(), requestHeader.getInitMode(), 
messageFilter);
 
             popAsyncFuture.thenApply(result -> {
                 if (result.isFound()) {
@@ -888,7 +889,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
     }
 
-    private long getInitOffset(String topic, String group, int queueId, int 
initMode, boolean init)
+    public long getInitOffset(String topic, String group, int queueId, int 
initMode, boolean init)
         throws ConsumeQueueException {
         long offset;
         if (ConsumeInitMode.MIN == initMode || 
topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java
index 554933eabc..6f009423f9 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.pop;
 
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -29,7 +30,7 @@ public class PopConsumerContextTest {
     public void consumerContextTest() {
         long popTime = System.currentTimeMillis();
         PopConsumerContext context = new PopConsumerContext("127.0.0.1:6789",
-            popTime, 20_000, "GroupId", true, "attemptId");
+            popTime, 20_000, "GroupId", true, ConsumeInitMode.MIN, 
"attemptId");
 
         Assert.assertFalse(context.isFound());
         Assert.assertEquals("127.0.0.1:6789", context.getClientHost());
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 2b930d5852..7fb619f740 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -190,7 +191,7 @@ public class PopConsumerServiceTest {
     @Test
     public void addGetMessageResultTest() {
         PopConsumerContext context = new PopConsumerContext(
-            clientHost, System.currentTimeMillis(), 20000, groupId, false, 
attemptId);
+            clientHost, System.currentTimeMillis(), 20000, groupId, false, 
ConsumeInitMode.MIN, attemptId);
         GetMessageResult result = new GetMessageResult();
         result.setStatus(GetMessageStatus.FOUND);
         result.getMessageQueueOffset().add(100L);
@@ -231,7 +232,7 @@ public class PopConsumerServiceTest {
 
         // fifo block
         PopConsumerContext context = new PopConsumerContext(
-            clientHost, System.currentTimeMillis(), 20000, groupId, false, 
attemptId);
+            clientHost, System.currentTimeMillis(), 20000, groupId, false, 
ConsumeInitMode.MIN, attemptId);
         consumerService.setFifoBlocked(context, groupId, topicId, queueId, 
Collections.singletonList(100L));
         Mockito.when(brokerController.getConsumerOrderInfoManager()
             .checkBlock(anyString(), anyString(), anyString(), anyInt(), 
anyLong())).thenReturn(true);
@@ -257,7 +258,7 @@ public class PopConsumerServiceTest {
 
         // fifo block test
         context = new PopConsumerContext(
-            clientHost, System.currentTimeMillis(), 20000, groupId, true, 
attemptId);
+            clientHost, System.currentTimeMillis(), 20000, groupId, true, 
ConsumeInitMode.MIN, attemptId);
         future = CompletableFuture.completedFuture(context);
         Assert.assertEquals(0L, consumerService.getMessageAsync(future, 
clientHost, groupId, topicId, queueId,
             10, null, 
PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount());
@@ -306,7 +307,7 @@ public class PopConsumerServiceTest {
 
         // pop broker
         consumerServiceSpy.popAsync(clientHost, System.currentTimeMillis(),
-            20000, groupId, topicId, -1, 10, false, attemptId, null).join();
+            20000, groupId, topicId, -1, 10, false, attemptId, 
ConsumeInitMode.MIN, null).join();
     }
 
     @Test

Reply via email to