This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 04683ec058 [ISSUE 7117] check message is in memory or not when init consumer offset for pop (#7118)
04683ec058 is described below

commit 04683ec05808d63f742f8702a9bd3a2fb846c154
Author: lk <xdk...@outlook.com>
AuthorDate: Wed Aug 9 19:08:33 2023 +0800

    [ISSUE 7117] check message is in memory or not when init consumer offset for pop (#7118)
---
 .../broker/processor/AckMessageProcessor.java      |  1 -
 .../broker/processor/PopMessageProcessor.java      | 40 ++++++++++++++--------
 .../org/apache/rocketmq/common/BrokerConfig.java   |  9 +++++
 .../proxy/service/route/TopicRouteService.java     |  2 +-
 4 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 2140aa881c..687811409e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -308,7 +308,6 @@ public class AckMessageProcessor implements NettyRequestProcessor {
                 && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("put ack msg error:" + putMessageResult);
         }
-        System.out.printf("put ack to store %s", ackMsg);
         PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
         
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 53e1725614..441f7de08a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -639,20 +639,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
 
         long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, 
queueId);
         if (offset < 0) {
-            if (ConsumeInitMode.MIN == initMode) {
-                offset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
-            } else {
-                // pop last one,then commit offset.
-                offset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
-                // max & no consumer offset
-                if (offset < 0) {
-                    offset = 0;
-                }
-                if (init) {
-                    
this.brokerController.getConsumerOffsetManager().commitOffset(
-                        "getPopOffset", group, topic, queueId, offset);
-                }
-            }
+            offset = this.getInitOffset(topic, group, queueId, initMode, init);
         }
 
         if (checkResetOffset) {
@@ -670,6 +657,31 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         }
     }
 
+    private long getInitOffset(String topic, String group, int queueId, int 
initMode, boolean init) {
+        long offset;
+        if (ConsumeInitMode.MIN == initMode) {
+            return 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+        } else {
+            if 
(this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() &&
+                
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 
0 &&
+                
this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, 
queueId, 0, 1)) {
+                offset = 0;
+            } else {
+                // pop last one,then commit offset.
+                offset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
+                // max & no consumer offset
+                if (offset < 0) {
+                    offset = 0;
+                }
+            }
+            if (init) {
+                this.brokerController.getConsumerOffsetManager().commitOffset(
+                    "getPopOffset", group, topic, queueId, offset);
+            }
+        }
+        return offset;
+    }
+
     public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, 
final int reviveQid) {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 02c692e2b2..a815636b18 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -222,6 +222,7 @@ public class BrokerConfig extends BrokerIdentity {
     private int popCkOffsetMaxQueueSize = 20000;
     private boolean enablePopBatchAck = false;
     private boolean enableNotifyAfterPopOrderLockRelease = true;
+    private boolean initPopOffsetByCheckMsgInMem = true;
 
     private boolean realTimeNotifyConsumerChange = true;
 
@@ -1264,6 +1265,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.enableNotifyAfterPopOrderLockRelease = 
enableNotifyAfterPopOrderLockRelease;
     }
 
+    public boolean isInitPopOffsetByCheckMsgInMem() {
+        return initPopOffsetByCheckMsgInMem;
+    }
+
+    public void setInitPopOffsetByCheckMsgInMem(boolean 
initPopOffsetByCheckMsgInMem) {
+        this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
+    }
+
     public boolean isRealTimeNotifyConsumerChange() {
         return realTimeNotifyConsumerChange;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index b6b14faa49..e012a5465a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -133,7 +133,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
     protected MessageQueueView buildMessageQueueView(String topic, 
TopicRouteData topicRouteData) {
         if (isTopicRouteValid(topicRouteData)) {
             MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
-            log.info("load topic route from namesrv. topic: {}, queue: {}", 
topic, tmp);
+            log.debug("load topic route from namesrv. topic: {}, queue: {}", 
topic, tmp);
             return tmp;
         }
         return MessageQueueView.WRAPPED_EMPTY_QUEUE;

Reply via email to