This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 23cc24cc6e [ISSUE #8947] Notify pop request before calculate consumer lag (#8949)
23cc24cc6e is described below

commit 23cc24cc6e2fa33b9be2c434c42dee3a54e726a4
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Nov 20 14:57:35 2024 +0800

    [ISSUE #8947] Notify pop request before calculate consumer lag (#8949)
---
 .../longpolling/NotifyMessageArrivingListener.java |  4 +-
 .../broker/longpolling/PopCommandCallback.java     | 49 +++++++++++++++++
 .../broker/longpolling/PopLongPollingService.java  | 49 +++++++++++++----
 .../broker/metrics/ConsumerLagCalculator.java      | 62 ++++++++++++++--------
 .../broker/processor/NotificationProcessor.java    |  4 +-
 .../broker/processor/PopMessageProcessor.java      | 18 +++++--
 .../longpolling/PopLongPollingServiceTest.java     | 11 ++--
 .../org/apache/rocketmq/common/BrokerConfig.java   | 19 +++++++
 .../apache/rocketmq/remoting/CommandCallback.java  | 22 ++++++++
 .../remoting/protocol/RemotingCommand.java         | 11 ++++
 10 files changed, 206 insertions(+), 43 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index 1ddb9f4f8e..9c0ee89e4d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -40,8 +40,8 @@ public class NotifyMessageArrivingListener implements 
MessageArrivingListener {
         this.pullRequestHoldService.notifyMessageArriving(
             topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, 
properties);
         this.popMessageProcessor.notifyMessageArriving(
-            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+            topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, 
properties);
         this.notificationProcessor.notifyMessageArriving(
-            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+            topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, 
properties);
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
new file mode 100644
index 0000000000..2e190e20f9
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.longpolling;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
+import org.apache.rocketmq.remoting.CommandCallback;
+
+public class PopCommandCallback implements CommandCallback {
+
+    private final BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
+        Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer;
+
+    private final ConsumerLagCalculator.ProcessGroupInfo info;
+    private final Consumer<ConsumerLagCalculator.CalculateLagResult> 
lagRecorder;
+
+
+    public PopCommandCallback(
+        BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
+                    Consumer<ConsumerLagCalculator.CalculateLagResult>> 
biConsumer,
+        ConsumerLagCalculator.ProcessGroupInfo info,
+        Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder) {
+
+        this.biConsumer = biConsumer;
+        this.info = info;
+        this.lagRecorder = lagRecorder;
+    }
+
+    @Override
+    public void accept() {
+        biConsumer.accept(info, lagRecorder);
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index b5179114f3..91185fbe94 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.longpolling;
 
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.CommandCallback;
 import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
@@ -45,6 +47,7 @@ import static 
org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC;
 import static 
org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT;
 
 public class PopLongPollingService extends ServiceThread {
+
     private static final Logger POP_LOGGER =
         LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -150,10 +153,10 @@ public class PopLongPollingService extends ServiceThread {
     }
 
     public void notifyMessageArrivingWithRetryTopic(final String topic, final 
int queueId) {
-        this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L, 
null, null);
+        this.notifyMessageArrivingWithRetryTopic(topic, queueId, -1L, null, 
0L, null, null);
     }
 
-    public void notifyMessageArrivingWithRetryTopic(final String topic, final 
int queueId,
+    public void notifyMessageArrivingWithRetryTopic(final String topic, final 
int queueId, long offset,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         String notifyTopic;
         if (KeyBuilder.isPopRetryTopicV2(topic)) {
@@ -161,25 +164,37 @@ public class PopLongPollingService extends ServiceThread {
         } else {
             notifyTopic = topic;
         }
-        notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime, 
filterBitMap, properties);
+        notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, 
msgStoreTime, filterBitMap, properties);
     }
 
-    public void notifyMessageArriving(final String topic, final int queueId,
+    public void notifyMessageArriving(final String topic, final int queueId, 
long offset,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
         if (cids == null) {
             return;
         }
+        long interval = 
brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval();
+        boolean force = interval > 0L && offset % interval == 0L;
         for (Map.Entry<String, Byte> cid : cids.entrySet()) {
             if (queueId >= 0) {
-                notifyMessageArriving(topic, -1, cid.getKey(), tagsCode, 
msgStoreTime, filterBitMap, properties);
+                notifyMessageArriving(topic, -1, cid.getKey(), force, 
tagsCode, msgStoreTime, filterBitMap, properties);
             }
-            notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode, 
msgStoreTime, filterBitMap, properties);
+            notifyMessageArriving(topic, queueId, cid.getKey(), force, 
tagsCode, msgStoreTime, filterBitMap, properties);
         }
     }
 
     public boolean notifyMessageArriving(final String topic, final int 
queueId, final String cid,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
+        return notifyMessageArriving(topic, queueId, cid, false, tagsCode, 
msgStoreTime, filterBitMap, properties, null);
+    }
+
+    public boolean notifyMessageArriving(final String topic, final int 
queueId, final String cid, boolean force,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
+        return notifyMessageArriving(topic, queueId, cid, force, tagsCode, 
msgStoreTime, filterBitMap, properties, null);
+    }
+
+    public boolean notifyMessageArriving(final String topic, final int 
queueId, final String cid, boolean force,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties, CommandCallback callback) {
         ConcurrentSkipListSet<PopRequest> remotingCommands = 
pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
         if (remotingCommands == null || remotingCommands.isEmpty()) {
             return false;
@@ -190,7 +205,7 @@ public class PopLongPollingService extends ServiceThread {
             return false;
         }
 
-        if (popRequest.getMessageFilter() != null && 
popRequest.getSubscriptionData() != null) {
+        if (!force && popRequest.getMessageFilter() != null && 
popRequest.getSubscriptionData() != null) {
             boolean match = 
popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                 new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, 
filterBitMap));
             if (match && properties != null) {
@@ -206,16 +221,30 @@ public class PopLongPollingService extends ServiceThread {
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", 
popRequest);
         }
-        return wakeUp(popRequest);
+
+        return wakeUp(popRequest, callback);
     }
 
     public boolean wakeUp(final PopRequest request) {
+        return wakeUp(request, null);
+    }
+
+    public boolean wakeUp(final PopRequest request, CommandCallback callback) {
         if (request == null || !request.complete()) {
             return false;
         }
+
+        if (callback != null && request.getRemotingCommand() != null) {
+            if (request.getRemotingCommand().getCallbackList() == null) {
+                request.getRemotingCommand().setCallbackList(new 
ArrayList<>());
+            }
+            request.getRemotingCommand().getCallbackList().add(callback);
+        }
+
         if (!request.getCtx().channel().isActive()) {
             return false;
         }
+
         Runnable run = () -> {
             try {
                 final RemotingCommand response = 
processor.processRequest(request.getCtx(), request.getRemotingCommand());
@@ -234,7 +263,9 @@ public class PopLongPollingService extends ServiceThread {
                 POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
             }
         };
-        this.brokerController.getPullMessageExecutor().submit(new 
RequestTask(run, request.getChannel(), request.getRemotingCommand()));
+
+        this.brokerController.getPullMessageExecutor().submit(
+            new RequestTask(run, request.getChannel(), 
request.getRemotingCommand()));
         return true;
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 3ac6528b2a..1b898f95de 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.longpolling.PopCommandCallback;
+import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.processor.PopBufferMergeService;
 import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
@@ -51,6 +53,7 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 
 public class ConsumerLagCalculator {
+
     private final BrokerConfig brokerConfig;
     private final TopicConfigManager topicConfigManager;
     private final ConsumerManager consumerManager;
@@ -59,6 +62,7 @@ public class ConsumerLagCalculator {
     private final SubscriptionGroupManager subscriptionGroupManager;
     private final MessageStore messageStore;
     private final PopBufferMergeService popBufferMergeService;
+    private final PopLongPollingService popLongPollingService;
     private final PopInflightMessageCounter popInflightMessageCounter;
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -72,10 +76,11 @@ public class ConsumerLagCalculator {
         this.subscriptionGroupManager = 
brokerController.getSubscriptionGroupManager();
         this.messageStore = brokerController.getMessageStore();
         this.popBufferMergeService = 
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+        this.popLongPollingService = 
brokerController.getPopMessageProcessor().getPopLongPollingService();
         this.popInflightMessageCounter = 
brokerController.getPopInflightMessageCounter();
     }
 
-    private static class ProcessGroupInfo {
+    public static class ProcessGroupInfo {
         public String group;
         public String topic;
         public boolean isPop;
@@ -211,34 +216,44 @@ public class ConsumerLagCalculator {
                 return;
             }
 
-            CalculateLagResult result = new CalculateLagResult(info.group, 
info.topic, false);
+            if (info.isPop && 
brokerConfig.isEnableNotifyBeforePopCalculateLag()) {
+                if (popLongPollingService.notifyMessageArriving(info.topic, 
-1, info.group,
+                    true, null, 0, null, null, new 
PopCommandCallback(this::calculate, info, lagRecorder))) {
+                    return;
+                }
+            }
+
+            calculate(info, lagRecorder);
+        });
+    }
 
+    public void calculate(ProcessGroupInfo info, Consumer<CalculateLagResult> 
lagRecorder) {
+        CalculateLagResult result = new CalculateLagResult(info.group, 
info.topic, false);
+        try {
+            Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic, 
info.isPop);
+            if (lag != null) {
+                result.lag = lag.getObject1();
+                result.earliestUnconsumedTimestamp = lag.getObject2();
+            }
+            lagRecorder.accept(result);
+        } catch (ConsumeQueueException e) {
+            LOGGER.error("Failed to get lag stats", e);
+        }
+
+        if (info.isPop) {
             try {
-                Pair<Long, Long> lag = getConsumerLagStats(info.group, 
info.topic, info.isPop);
-                if (lag != null) {
-                    result.lag = lag.getObject1();
-                    result.earliestUnconsumedTimestamp = lag.getObject2();
+                Pair<Long, Long> retryLag = getConsumerLagStats(info.group, 
info.retryTopic, true);
+
+                result = new CalculateLagResult(info.group, info.topic, true);
+                if (retryLag != null) {
+                    result.lag = retryLag.getObject1();
+                    result.earliestUnconsumedTimestamp = retryLag.getObject2();
                 }
                 lagRecorder.accept(result);
             } catch (ConsumeQueueException e) {
                 LOGGER.error("Failed to get lag stats", e);
             }
-
-            if (info.isPop) {
-                try {
-                    Pair<Long, Long> retryLag = 
getConsumerLagStats(info.group, info.retryTopic, true);
-
-                    result = new CalculateLagResult(info.group, info.topic, 
true);
-                    if (retryLag != null) {
-                        result.lag = retryLag.getObject1();
-                        result.earliestUnconsumedTimestamp = 
retryLag.getObject2();
-                    }
-                    lagRecorder.accept(result);
-                } catch (ConsumeQueueException e) {
-                    LOGGER.error("Failed to get lag stats", e);
-                }
-            }
-        });
+        }
     }
 
     public void calculateInflight(Consumer<CalculateInflightResult> 
inflightRecorder) {
@@ -320,6 +335,9 @@ public class ConsumerLagCalculator {
             earliestUnconsumedTimestamp = 0L;
         }
 
+        LOGGER.debug("GetConsumerLagStats, topic={}, group={}, lag={}, 
latency={}", topic, group, total,
+            earliestUnconsumedTimestamp > 0 ? System.currentTimeMillis() - 
earliestUnconsumedTimestamp : 0);
+
         return new Pair<>(total, earliestUnconsumedTimestamp);
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 75c77b6d79..b4ebd9c4a9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -62,10 +62,10 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
 
     // When a new message is written to CommitLog, this method would be called.
     // Suspended long polling will receive notification and be wakeup.
-    public void notifyMessageArriving(final String topic, final int queueId,
+    public void notifyMessageArriving(final String topic, final int queueId, 
long offset,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         this.popLongPollingService.notifyMessageArrivingWithRetryTopic(
-            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+            topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, 
properties);
     }
 
     public void notifyMessageArriving(final String topic, final int queueId) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index fe8ccb03dc..e0454afa3c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -65,6 +65,7 @@ import 
org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.CommandCallback;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
@@ -97,8 +98,10 @@ import static 
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL
 import static 
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
 
 public class PopMessageProcessor implements NettyRequestProcessor {
+
     private static final Logger POP_LOGGER =
         LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+
     private final BrokerController brokerController;
     private final Random random = new Random(System.currentTimeMillis());
     String reviveTopic;
@@ -196,15 +199,15 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
     }
 
-    public void notifyMessageArriving(final String topic, final int queueId,
+    public void notifyMessageArriving(final String topic, final int queueId, 
long offset,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         popLongPollingService.notifyMessageArrivingWithRetryTopic(
-            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+            topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, 
properties);
     }
 
     public void notifyMessageArriving(final String topic, final int queueId, 
final String cid) {
         popLongPollingService.notifyMessageArriving(
-            topic, queueId, cid, null, 0L, null, null);
+            topic, queueId, cid, false, null, 0L, null, null);
     }
 
     @Override
@@ -419,6 +422,15 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         final RemotingCommand finalResponse = response;
         SubscriptionData finalSubscriptionData = subscriptionData;
         getMessageFuture.thenApply(restNum -> {
+            try {
+                if (request.getCallbackList() != null) {
+                    request.getCallbackList().forEach(CommandCallback::accept);
+                    request.getCallbackList().clear();
+                }
+            } catch (Throwable t) {
+                POP_LOGGER.error("PopProcessor execute callback error", t);
+            }
+
             if (!getMessageResult.getMessageBufferList().isEmpty()) {
                 finalResponse.setCode(ResponseCode.SUCCESS);
                 getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 6527beeb68..1f064ec05d 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -80,21 +80,22 @@ public class PopLongPollingServiceTest {
     @Test
     public void testNotifyMessageArrivingWithRetryTopic() {
         int queueId = 0;
-        
doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic,
 queueId, null, 0L, null, null);
+        
doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic,
 queueId, -1L, null, 0L, null, null);
         
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId);
-        verify(popLongPollingService, 
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, null, 0L, 
null, null);
+        verify(popLongPollingService, 
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 
0L, null, null);
     }
     
     @Test
     public void testNotifyMessageArriving() {
         int queueId = 0;
         Long tagsCode = 123L;
+        long offset = 123L;
         long msgStoreTime = System.currentTimeMillis();
         byte[] filterBitMap = new byte[]{0x01};
         Map<String, String> properties = new ConcurrentHashMap<>();
-        
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
-        
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
-        verify(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+        
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
+        
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, 
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
+        verify(popLongPollingService).notifyMessageArriving(defaultTopic, 
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
     }
     
     @Test
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index c651047661..f459abf0db 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -227,6 +227,9 @@ public class BrokerConfig extends BrokerIdentity {
     private int popCkMaxBufferSize = 200000;
     private int popCkOffsetMaxQueueSize = 20000;
     private boolean enablePopBatchAck = false;
+    // set the interval to the maxFilterMessageSize in MessageStoreConfig divided by the cq unit size
+    private long popLongPollingForceNotifyInterval = 800;
+    private boolean enableNotifyBeforePopCalculateLag = true;
     private boolean enableNotifyAfterPopOrderLockRelease = true;
     private boolean initPopOffsetByCheckMsgInMem = true;
     // read message from pop retry topic v1, for the compatibility, will be removed in the future version
@@ -1326,6 +1329,22 @@ public class BrokerConfig extends BrokerIdentity {
         this.enableNetWorkFlowControl = enableNetWorkFlowControl;
     }
 
+    public long getPopLongPollingForceNotifyInterval() {
+        return popLongPollingForceNotifyInterval;
+    }
+
+    public void setPopLongPollingForceNotifyInterval(long 
popLongPollingForceNotifyInterval) {
+        this.popLongPollingForceNotifyInterval = 
popLongPollingForceNotifyInterval;
+    }
+
+    public boolean isEnableNotifyBeforePopCalculateLag() {
+        return enableNotifyBeforePopCalculateLag;
+    }
+
+    public void setEnableNotifyBeforePopCalculateLag(boolean 
enableNotifyBeforePopCalculateLag) {
+        this.enableNotifyBeforePopCalculateLag = 
enableNotifyBeforePopCalculateLag;
+    }
+
     public boolean isEnableNotifyAfterPopOrderLockRelease() {
         return enableNotifyAfterPopOrderLockRelease;
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java
new file mode 100644
index 0000000000..884f3d9e5d
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCallback.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public interface CommandCallback {
+
+    void accept();
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 5de48350cf..9b2b0f07b4 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.CommandCallback;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -96,6 +98,7 @@ public class RemotingCommand {
     private transient byte[] body;
     private boolean suspended;
     private transient Stopwatch processTimer;
+    private transient List<CommandCallback> callbackList;
 
     protected RemotingCommand() {
     }
@@ -639,4 +642,12 @@ public class RemotingCommand {
     public void setProcessTimer(Stopwatch processTimer) {
         this.processTimer = processTimer;
     }
+
+    public List<CommandCallback> getCallbackList() {
+        return callbackList;
+    }
+
+    public void setCallbackList(List<CommandCallback> callbackList) {
+        this.callbackList = callbackList;
+    }
 }

Reply via email to