This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d3de48c80 [ISSUE #5605]Introduce tag estimation for lag calculation 
(#5606)
d3de48c80 is described below

commit d3de48c806a8b0c87b73fe7a6d49054892845487
Author: SSpirits <[email protected]>
AuthorDate: Wed Nov 30 14:52:59 2022 +0800

    [ISSUE #5605]Introduce tag estimation for lag calculation (#5606)
    
    * Introduce tag estimation for lag calculation
    
    Co-authored-by: Li Zhanhui <[email protected]>
    
    * fixed according to review comments
    
    Co-authored-by: Li Zhanhui <[email protected]>
---
 .../org/apache/rocketmq/common/BrokerConfig.java   |  13 ++
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  91 ++++++++-
 .../apache/rocketmq/store/DefaultMessageStore.java |  31 +++
 .../org/apache/rocketmq/store/MappedFileQueue.java |  25 ++-
 .../org/apache/rocketmq/store/MessageStore.java    |  11 ++
 .../rocketmq/store/config/MessageStoreConfig.java  |  27 ++-
 .../rocketmq/store/queue/BatchConsumeQueue.java    | 116 ++++++++++-
 .../store/queue/ConsumeQueueInterface.java         |  11 ++
 .../store/queue/BatchConsumeMessageTest.java       | 103 ++++++++++
 .../rocketmq/store/queue/ConsumeQueueTest.java     | 205 +++++++++++++++++++-
 .../listener/rmq/concurrent/RMQBlockListener.java  |  60 ++++++
 .../rocketmq/test/util/MQAdminTestUtils.java       |  25 ++-
 .../org/apache/rocketmq/test/base/BaseConf.java    |   6 +-
 .../rocketmq/test/base/IntegrationTestBase.java    |  12 ++
 .../rocketmq/test/offset/LagCalculationIT.java     | 212 +++++++++++++++++++++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   7 +
 .../tools/admin/DefaultMQAdminExtImpl.java         |  10 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   8 +-
 18 files changed, 950 insertions(+), 23 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index b9c0975b0..8e78320f1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -385,6 +385,11 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean metricsInDelta = false;
 
+    /**
+     * Estimate accumulation or not when subscription filter type is tag and 
is not SUB_ALL.
+     */
+    private boolean estimateAccumulation = true;
+
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
     }
@@ -1584,4 +1589,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setTransactionOpBatchInterval(int transactionOpBatchInterval) {
         this.transactionOpBatchInterval = transactionOpBatchInterval;
     }
+
+    public boolean isEstimateAccumulation() {
+        return estimateAccumulation;
+    }
+
+    public void setEstimateAccumulation(boolean estimateAccumulation) {
+        this.estimateAccumulation = estimateAccumulation;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 5bbf773e4..3530b1c39 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -34,7 +34,6 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
@@ -45,6 +44,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
     public static final int CQ_STORE_UNIT_SIZE = 20;
+    public static final int MSG_TAG_OFFSET_INDEX = 12;
     private static final Logger LOG_ERROR = 
LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
 
     private final MessageStore messageStore;
@@ -1001,4 +1001,91 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
     public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
         mappedFileQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
     }
+
+    @Override
+    public long estimateMessageCount(long from, long to, MessageFilter filter) 
{
+        long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+        long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+        List<MappedFile> mappedFiles = 
mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+        if (mappedFiles.isEmpty()) {
+            return -1;
+        }
+
+        boolean sample = false;
+        long match = 0;
+        long raw = 0;
+
+        for (MappedFile mappedFile : mappedFiles) {
+            int start = 0;
+            int len = mappedFile.getFileSize();
+
+            // calculate start and len for first segment and last segment to 
reduce scanning
+            // first file segment
+            if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+                start = (int) (physicalOffsetFrom - 
mappedFile.getFileFromOffset());
+                if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() 
>= physicalOffsetTo) {
+                    // current mapped file covers search range completely.
+                    len = (int) (physicalOffsetTo - physicalOffsetFrom);
+                } else {
+                    len = mappedFile.getFileSize() - start;
+                }
+            }
+
+            // last file segment
+            if (0 == start && mappedFile.getFileFromOffset() + 
mappedFile.getFileSize() > physicalOffsetTo) {
+                len = (int) (physicalOffsetTo - 
mappedFile.getFileFromOffset());
+            }
+
+            // select partial data to scan
+            SelectMappedBufferResult slice = 
mappedFile.selectMappedBuffer(start, len);
+            if (null != slice) {
+                try {
+                    ByteBuffer buffer = slice.getByteBuffer();
+                    int current = 0;
+                    while (current < len) {
+                        // skip physicalOffset and message length fields.
+                        buffer.position(current + MSG_TAG_OFFSET_INDEX);
+                        long tagCode = buffer.getLong();
+                        ConsumeQueueExt.CqExtUnit ext = null;
+                        if (isExtWriteEnable()) {
+                            ext = consumeQueueExt.get(tagCode);
+                            tagCode = ext.getTagsCode();
+                        }
+                        if (filter.isMatchedByConsumeQueue(tagCode, ext)) {
+                            match++;
+                        }
+                        raw++;
+                        current += CQ_STORE_UNIT_SIZE;
+
+                        if (raw >= 
messageStore.getMessageStoreConfig().getMaxConsumeQueueScan()) {
+                            sample = true;
+                            break;
+                        }
+
+                        if (match > 
messageStore.getMessageStoreConfig().getSampleCountThreshold()) {
+                            sample = true;
+                            break;
+                        }
+                    }
+                } finally {
+                    slice.release();
+                }
+            }
+            // we have scanned enough entries, now is the time to return an 
educated guess.
+            if (sample) {
+                break;
+            }
+        }
+
+        long result = match;
+        if (sample) {
+            if (0 == raw) {
+                log.error("[BUG]. Raw should NOT be 0");
+                return 0;
+            }
+            result = (long) (match * (to - from) * 1.0 / raw);
+        }
+        log.debug("Result={}, raw={}, match={}, sample={}", result, raw, 
match, sample);
+        return result;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 31c1a2cb4..9b0c38656 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2666,4 +2666,35 @@ public class DefaultMessageStore implements MessageStore 
{
     public boolean isShutdown() {
         return shutdown;
     }
+
+    @Override
+    public long estimateMessageCount(String topic, int queueId, long from, 
long to, MessageFilter filter) {
+        if (from < 0) {
+            from = 0;
+        }
+
+        if (from >= to) {
+            return 0;
+        }
+
+        if (null == filter) {
+            return to - from;
+        }
+
+        ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+        if (null == consumeQueue) {
+            return 0;
+        }
+
+        // correct the "from" argument to min offset in queue if it is too 
small
+        long minOffset = consumeQueue.getMinOffsetInQueue();
+        if (from < minOffset) {
+            long diff = to - from;
+            from = minOffset;
+            to = from + diff;
+        }
+
+        long msgCount = consumeQueue.estimateMessageCount(from, to, filter);
+        return msgCount == -1 ? to - from : msgCount;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 0fc28ac52..af300c337 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store;
 
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -26,8 +27,6 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Stream;
-
-import com.google.common.collect.Lists;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -780,4 +779,26 @@ public class MappedFileQueue implements Swappable {
     public String getStorePath() {
         return storePath;
     }
+
+    public List<MappedFile> range(final long from, final long to) {
+        Object[] mfs = copyMappedFiles(0);
+        if (null == mfs) {
+            return new ArrayList<>();
+        }
+
+        List<MappedFile> result = new ArrayList<>();
+        for (Object mf : mfs) {
+            MappedFile mappedFile = (MappedFile) mf;
+            if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() <= 
from) {
+                continue;
+            }
+
+            if (to <= mappedFile.getFileFromOffset()) {
+                break;
+            }
+            result.add(mappedFile);
+        }
+
+        return result;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 95de57cb3..df07a735b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -826,4 +826,15 @@ public interface MessageStore {
      */
     boolean isShutdown();
 
+    /**
+     * Estimate number of messages, within [from, to], which match given filter
+     *
+     * @param topic   Topic name
+     * @param queueId Queue ID
+     * @param from    Lower boundary of the range, inclusive.
+     * @param to      Upper boundary of the range, inclusive.
+     * @param filter  The message filter.
+     * @return Estimate number of messages matching given filter.
+     */
+    long estimateMessageCount(String topic, int queueId, long from, long to, 
MessageFilter filter);
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index c93d3eea5..91c80e940 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -16,12 +16,11 @@
  */
 package org.apache.rocketmq.store.config;
 
+import java.io.File;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.queue.BatchConsumeQueue;
 
-import java.io.File;
-
 public class MessageStoreConfig {
 
     public static final String MULTI_PATH_SPLITTER = 
System.getProperty("rocketmq.broker.multiPathSplitter", ",");
@@ -360,6 +359,16 @@ public class MessageStoreConfig {
 
     private boolean asyncLearner = false;
 
+    /**
+     * Number of records to scan before starting to estimate.
+     */
+    private int maxConsumeQueueScan = 20_000;
+
+    /**
+     * Number of matched records before starting to estimate.
+     */
+    private int sampleCountThreshold = 5000;
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -1565,5 +1574,19 @@ public class MessageStoreConfig {
         this.timerMaxDelaySec = timerMaxDelaySec;
     }
 
+    public int getMaxConsumeQueueScan() {
+        return maxConsumeQueueScan;
+    }
+
+    public void setMaxConsumeQueueScan(int maxConsumeQueueScan) {
+        this.maxConsumeQueueScan = maxConsumeQueueScan;
+    }
 
+    public int getSampleCountThreshold() {
+        return sampleCountThreshold;
+    }
+
+    public void setSampleCountThreshold(int sampleCountThreshold) {
+        this.sampleCountThreshold = sampleCountThreshold;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index ed7d1bd56..8a307b957 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -17,35 +17,36 @@
 
 package org.apache.rocketmq.store.queue;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.MappedFileQueue;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.logfile.MappedFile;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Function;
-
 public class BatchConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
     //position 8, size 4, tagscode 8, storetime 8, msgBaseOffset 8, batchSize 
2, compactedOffset 4, reserved 4
     public static final int CQ_STORE_UNIT_SIZE = 46;
+    public static final int MSG_TAG_OFFSET_INDEX = 12;
     public static final int MSG_STORE_TIME_OFFSET_INDEX = 20;
     public static final int MSG_BASE_OFFSET_INDEX = 28;
     public static final int MSG_BATCH_SIZE_INDEX = 36;
@@ -1005,4 +1006,103 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCy
     public MappedFileQueue getMappedFileQueue() {
         return mappedFileQueue;
     }
+
+    @Override
+    public long estimateMessageCount(long from, long to, MessageFilter filter) 
{
+        // transfer message offset to physical offset
+        SelectMappedBufferResult firstMappedFileBuffer = 
getBatchMsgIndexBuffer(from);
+        if (firstMappedFileBuffer == null) {
+            return -1;
+        }
+        long physicalOffsetFrom = firstMappedFileBuffer.getStartOffset();
+
+        SelectMappedBufferResult lastMappedFileBuffer = 
getBatchMsgIndexBuffer(to);
+        if (lastMappedFileBuffer == null) {
+            return -1;
+        }
+        long physicalOffsetTo = lastMappedFileBuffer.getStartOffset();
+
+        List<MappedFile> mappedFiles = 
mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+        if (mappedFiles.isEmpty()) {
+            return -1;
+        }
+
+        boolean sample = false;
+        long match = 0;
+        long matchCqUnitCount = 0;
+        long raw = 0;
+        long scanCqUnitCount = 0;
+
+        for (MappedFile mappedFile : mappedFiles) {
+            int start = 0;
+            int len = mappedFile.getFileSize();
+
+            // calculate start and len for first segment and last segment to 
reduce scanning
+            // first file segment
+            if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+                start = (int) (physicalOffsetFrom - 
mappedFile.getFileFromOffset());
+                if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() 
>= physicalOffsetTo) {
+                    // current mapped file covers search range completely.
+                    len = (int) (physicalOffsetTo - physicalOffsetFrom);
+                } else {
+                    len = mappedFile.getFileSize() - start;
+                }
+            }
+
+            // last file segment
+            if (0 == start && mappedFile.getFileFromOffset() + 
mappedFile.getFileSize() > physicalOffsetTo) {
+                len = (int) (physicalOffsetTo - 
mappedFile.getFileFromOffset());
+            }
+
+            // select partial data to scan
+            SelectMappedBufferResult slice = 
mappedFile.selectMappedBuffer(start, len);
+            if (null != slice) {
+                try {
+                    ByteBuffer buffer = slice.getByteBuffer();
+                    int current = 0;
+                    while (current < len) {
+                        // skip physicalOffset and message length fields.
+                        buffer.position(current + MSG_TAG_OFFSET_INDEX);
+                        long tagCode = buffer.getLong();
+                        buffer.position(current + MSG_BATCH_SIZE_INDEX);
+                        long batchSize = buffer.getShort();
+                        if (filter.isMatchedByConsumeQueue(tagCode, null)) {
+                            match += batchSize;
+                            matchCqUnitCount++;
+                        }
+                        raw += batchSize;
+                        scanCqUnitCount++;
+                        current += CQ_STORE_UNIT_SIZE;
+
+                        if (scanCqUnitCount >= 
messageStore.getMessageStoreConfig().getMaxConsumeQueueScan()) {
+                            sample = true;
+                            break;
+                        }
+
+                        if (matchCqUnitCount > 
messageStore.getMessageStoreConfig().getSampleCountThreshold()) {
+                            sample = true;
+                            break;
+                        }
+                    }
+                } finally {
+                    slice.release();
+                }
+            }
+            // we have scanned enough entries, now is the time to return an 
educated guess.
+            if (sample) {
+                break;
+            }
+        }
+
+        long result = match;
+        if (sample) {
+            if (0 == raw) {
+                log.error("[BUG]. Raw should NOT be 0");
+                return 0;
+            }
+            result = (long) (match * (to - from) * 1.0 / raw);
+        }
+        log.debug("Result={}, raw={}, match={}, sample={}", result, raw, 
match, sample);
+        return result;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index f36dda094..76242a5e3 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.store.queue;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageFilter;
 
 public interface ConsumeQueueInterface {
     /**
@@ -145,4 +146,14 @@ public interface ConsumeQueueInterface {
      * @param messageNum message number
      */
     void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, 
MessageExtBrokerInner msg, short messageNum);
+
+    /**
+     * Estimate number of records matching given filter.
+     *
+     * @param from Lower boundary, inclusive.
+     * @param to Upper boundary, inclusive.
+     * @param filter Specified filter criteria
+     * @return Number of matching records.
+     */
+    long estimateMessageCount(long from, long to, MessageFilter filter);
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 2485ec670..8e8fee278 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -18,9 +18,11 @@
 package org.apache.rocketmq.store.queue;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Random;
@@ -34,8 +36,10 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
@@ -49,6 +53,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
 
 public class BatchConsumeMessageTest extends QueueTestBase {
+    private static final int BATCH_NUM = 10;
+    private static final int TOTAL_MSGS = 200;
     private MessageStore messageStore;
 
     @Before
@@ -456,4 +462,101 @@ public class BatchConsumeMessageTest extends 
QueueTestBase {
         }
     }
 
+    protected void putMsg(String topic) {
+        createTopic(topic, CQType.BatchCQ, messageStore);
+
+        for (int i = 0; i < TOTAL_MSGS; i++) {
+            MessageExtBrokerInner message = buildMessage(topic, BATCH_NUM * (i 
% 2 + 1));
+            switch (i % 3) {
+                case 0:
+                    message.setTags("TagA");
+                    break;
+
+                case 1:
+                    message.setTags("TagB");
+                    break;
+            }
+            message.setTagsCode(message.getTags().hashCode());
+            
message.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
+            PutMessageResult putMessageResult = 
messageStore.putMessage(message);
+            Assert.assertEquals(PutMessageStatus.PUT_OK, 
putMessageResult.getPutMessageStatus());
+        }
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+    }
+
+    @Test
+    public void testEstimateMessageCountInEmptyConsumeQueue() {
+        String topic = UUID.randomUUID().toString();
+        ConsumeQueueInterface consumeQueue = 
messageStore.findConsumeQueue(topic, 0);
+        MessageFilter filter = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return tagsCode == "TagA".hashCode();
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                return false;
+            }
+        };
+        long estimation = consumeQueue.estimateMessageCount(0, 0, filter);
+        Assert.assertEquals(-1, estimation);
+
+        // test for illegal offset
+        estimation = consumeQueue.estimateMessageCount(0, 100, filter);
+        Assert.assertEquals(-1, estimation);
+        estimation = consumeQueue.estimateMessageCount(100, 1000, filter);
+        Assert.assertEquals(-1, estimation);
+    }
+
+    @Test
+    public void testEstimateMessageCount() {
+        String topic = UUID.randomUUID().toString();
+        putMsg(topic);
+        ConsumeQueueInterface cq = messageStore.findConsumeQueue(topic, 0);
+        MessageFilter filter = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return tagsCode == "TagA".hashCode();
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                return false;
+            }
+        };
+        long estimation = cq.estimateMessageCount(0, 2999, filter);
+        Assert.assertEquals(1000, estimation);
+
+        // test for illegal offset
+        estimation = cq.estimateMessageCount(0, Long.MAX_VALUE, filter);
+        Assert.assertEquals(-1, estimation);
+        estimation = cq.estimateMessageCount(100000, 1000000, filter);
+        Assert.assertEquals(-1, estimation);
+        estimation = cq.estimateMessageCount(100, 0, filter);
+        Assert.assertEquals(-1, estimation);
+    }
+
+    @Test
+    public void testEstimateMessageCountSample() {
+        String topic = UUID.randomUUID().toString();
+        putMsg(topic);
+        messageStore.getMessageStoreConfig().setSampleCountThreshold(10);
+        messageStore.getMessageStoreConfig().setMaxConsumeQueueScan(20);
+        ConsumeQueueInterface cq = messageStore.findConsumeQueue(topic, 0);
+        MessageFilter filter = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return tagsCode == "TagA".hashCode();
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                return false;
+            }
+        };
+        long estimation = cq.estimateMessageCount(1000, 2000, filter);
+        Assert.assertEquals(300, estimation);
+    }
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index f2742015c..6a8bfc5bc 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -16,17 +16,98 @@
  */
 package org.apache.rocketmq.store.queue;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.UUID;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
 
 public class ConsumeQueueTest extends QueueTestBase {
 
+    private static final String TOPIC = "StoreTest";
+    private static final int QUEUE_ID = 0;
+    private static final String STORE_PATH = "." + File.separator + 
"unit_test_store";
+    private static final int COMMIT_LOG_FILE_SIZE = 1024 * 8;
+    private static final int CQ_FILE_SIZE = 10 * 20;
+    private static final int CQ_EXT_FILE_SIZE = 10 * 
(ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
+
+    public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int 
cqFileSize,
+        boolean enableCqExt, int cqExtFileSize) {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueue(cqFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+        messageStoreConfig.setMessageIndexEnable(false);
+        messageStoreConfig.setEnableConsumeQueueExt(enableCqExt);
+
+        messageStoreConfig.setStorePathRootDir(STORE_PATH);
+        messageStoreConfig.setStorePathCommitLog(STORE_PATH + File.separator + 
"commitlog");
+
+        return messageStoreConfig;
+    }
+
+    protected DefaultMessageStore gen() throws Exception {
+        MessageStoreConfig messageStoreConfig = buildStoreConfig(
+            COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE
+        );
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+
+        DefaultMessageStore master = new DefaultMessageStore(
+            messageStoreConfig, new BrokerStatsManager(brokerConfig),
+            (topic, queueId, logicOffset, tagsCode, msgStoreTime, 
filterBitMap, properties) -> {
+            }, brokerConfig);
+
+        assertThat(master.load()).isTrue();
+
+        master.start();
+
+        return master;
+    }
+
+    protected void putMsg(DefaultMessageStore messageStore) throws Exception {
+        int totalMsgs = 200;
+        for (int i = 0; i < totalMsgs; i++) {
+            MessageExtBrokerInner message = buildMessage();
+            message.setQueueId(0);
+            switch (i % 3) {
+                case 0:
+                    message.setTags("TagA");
+                    break;
+
+                case 1:
+                    message.setTags("TagB");
+                    break;
+
+                case 2:
+                    message.setTags("TagC");
+                    break;
+            }
+            message.setTagsCode(message.getTags().hashCode());
+            
message.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
+            messageStore.putMessage(message);
+        }
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+    }
+
     @Test
     public void testIterator() throws Exception {
         final int msgNum = 100;
@@ -99,4 +180,126 @@ public class ConsumeQueueTest extends QueueTestBase {
         }
         messageStore.getQueueStore().destroy(consumeQueue);
     }
+
+    @Test
+    public void testEstimateMessageCountInEmptyConsumeQueue() {
+        DefaultMessageStore master = null;
+        try {
+            master = gen();
+            ConsumeQueueInterface consumeQueue = 
master.findConsumeQueue(TOPIC, QUEUE_ID);
+            MessageFilter filter = new MessageFilter() {
+                @Override
+                public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                    return tagsCode == "TagA".hashCode();
+                }
+
+                @Override
+                public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                    return false;
+                }
+            };
+            long estimation = consumeQueue.estimateMessageCount(0, 0, filter);
+            Assert.assertEquals(-1, estimation);
+
+            // test for illegal offset
+            estimation = consumeQueue.estimateMessageCount(0, 100, filter);
+            Assert.assertEquals(-1, estimation);
+            estimation = consumeQueue.estimateMessageCount(100, 1000, filter);
+            Assert.assertEquals(-1, estimation);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        } finally {
+            if (master != null) {
+                master.shutdown();
+                master.destroy();
+            }
+            UtilAll.deleteFile(new File(STORE_PATH));
+        }
+    }
+
+    @Test
+    public void testEstimateMessageCount() {
+        DefaultMessageStore messageStore = null;
+        try {
+            messageStore = gen();
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        try {
+            try {
+                putMsg(messageStore);
+            } catch (Exception e) {
+                fail("Failed to put message", e);
+            }
+
+            ConsumeQueueInterface cq = messageStore.findConsumeQueue(TOPIC, 
QUEUE_ID);
+            MessageFilter filter = new MessageFilter() {
+                @Override
+                public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                    return tagsCode == "TagA".hashCode();
+                }
+
+                @Override
+                public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                    return false;
+                }
+            };
+            long estimation = cq.estimateMessageCount(0, 199, filter);
+            Assert.assertEquals(67, estimation);
+
+            // test for illegal offset
+            estimation = cq.estimateMessageCount(0, 1000, filter);
+            Assert.assertEquals(67, estimation);
+            estimation = cq.estimateMessageCount(1000, 10000, filter);
+            Assert.assertEquals(-1, estimation);
+            estimation = cq.estimateMessageCount(100, 0, filter);
+            Assert.assertEquals(-1, estimation);
+        } finally {
+            messageStore.shutdown();
+            messageStore.destroy();
+            UtilAll.deleteFile(new File(STORE_PATH));
+        }
+    }
+
+    @Test
+    public void testEstimateMessageCountSample() {
+        DefaultMessageStore messageStore = null;
+        try {
+            messageStore = gen();
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        try {
+            try {
+                putMsg(messageStore);
+            } catch (Exception e) {
+                fail("Failed to put message", e);
+            }
+            messageStore.getMessageStoreConfig().setSampleCountThreshold(10);
+            messageStore.getMessageStoreConfig().setMaxConsumeQueueScan(20);
+            ConsumeQueueInterface cq = messageStore.findConsumeQueue(TOPIC, 
QUEUE_ID);
+            MessageFilter filter = new MessageFilter() {
+                @Override
+                public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                    return tagsCode == "TagA".hashCode();
+                }
+
+                @Override
+                public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                    return false;
+                }
+            };
+            long estimation = cq.estimateMessageCount(100, 150, filter);
+            Assert.assertEquals(15, estimation);
+        } finally {
+            messageStore.shutdown();
+            messageStore.destroy();
+            UtilAll.deleteFile(new File(STORE_PATH));
+        }
+    }
 }
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQBlockListener.java
 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQBlockListener.java
new file mode 100644
index 000000000..907612cce
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQBlockListener.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.listener.rmq.concurrent;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class RMQBlockListener extends RMQNormalListener {
+    private volatile boolean block = true;
+    private volatile boolean inBlock = true;
+
+    public RMQBlockListener() {
+        super();
+    }
+
+    public RMQBlockListener(boolean block) {
+        super();
+        this.block = block;
+    }
+
+    public boolean isBlocked() {
+        return inBlock;
+    }
+
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
ConsumeConcurrentlyContext context) {
+        ConsumeConcurrentlyStatus status = super.consumeMessage(msgs, context);
+
+        try {
+            while (block) {
+                inBlock = true;
+                Thread.sleep(100);
+            }
+        } catch (InterruptedException ignore) {
+        }
+
+        return status;
+    }
+}
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 129fe8f9a..554289d01 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -31,7 +31,10 @@ import 
org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
 import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -41,8 +44,6 @@ import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
 import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.rpc.ClientMetadata;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminUtils;
@@ -56,6 +57,18 @@ import static org.awaitility.Awaitility.await;
 public class MQAdminTestUtils {
     private static Logger log = 
LoggerFactory.getLogger(MQAdminTestUtils.class);
 
+    private static DefaultMQAdminExt mqAdminExt;
+
+    public static void startAdmin(String nameSrvAddr) throws MQClientException 
{
+        mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        mqAdminExt.start();
+    }
+
+    public static void shutdownAdmin() {
+        mqAdminExt.shutdown();
+    }
+
     public static boolean createTopic(String nameSrvAddr, String clusterName, 
String topic,
                                       int queueNum, Map<String, String> 
attributes) {
         int defaultWaitTime = 30;
@@ -298,4 +311,12 @@ public class MQAdminTestUtils {
         cmd.execute(commandLine, options, null);
     }
 
+    public static ConsumeStats examineConsumeStats(String brokerAddr, String 
topic, String group) {
+        ConsumeStats consumeStats = null;
+        try {
+            consumeStats = mqAdminExt.examineConsumeStats(brokerAddr, group, 
topic, Long.MAX_VALUE);
+        } catch (Exception ignored) {
+        }
+        return consumeStats;
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java 
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index d1b89e914..d27139195 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -35,11 +35,11 @@ import 
org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -55,6 +55,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminExt;
 import org.junit.Assert;
 
+import static org.apache.rocketmq.test.base.IntegrationTestBase.initMQAdmin;
 import static org.awaitility.Awaitility.await;
 
 public class BaseConf {
@@ -109,6 +110,7 @@ public class BaseConf {
         brokerControllerList = ImmutableList.of(brokerController1, 
brokerController2, brokerController3);
         brokerControllerMap = brokerControllerList.stream().collect(
             Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), 
Function.identity()));
+        initMQAdmin(NAMESRV_ADDR);
     }
 
     public BaseConf() {
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java 
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index a2b9b95ae..221793692 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.UtilAll;
@@ -84,6 +85,7 @@ public class IntegrationTestBase {
                     for (File file : TMPE_FILES) {
                         UtilAll.deleteFile(file);
                     }
+                    MQAdminTestUtils.shutdownAdmin();
                 } catch (Exception e) {
                     logger.error("Shutdown error", e);
                 }
@@ -133,6 +135,8 @@ public class IntegrationTestBase {
         brokerConfig.setBrokerIP1("127.0.0.1");
         brokerConfig.setNamesrvAddr(nsAddr);
         brokerConfig.setEnablePropertyFilter(true);
+        brokerConfig.setEnableCalcFilterBitMap(true);
+        storeConfig.setEnableConsumeQueueExt(true);
         brokerConfig.setLoadBalancePollNameServerInterval(500);
         storeConfig.setStorePathRootDir(baseDir);
         storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
@@ -195,4 +199,12 @@ public class IntegrationTestBase {
         UtilAll.deleteFile(file);
     }
 
+    public static void initMQAdmin(String nsAddr) {
+        try {
+            MQAdminTestUtils.startAdmin(nsAddr);
+        } catch (MQClientException e) {
+            logger.info("MQAdmin start failed");
+            System.exit(1);
+        }
+    }
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java 
b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
new file mode 100644
index 000000000..810118b3e
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.offset;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.DefaultMessageFilter;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQBlockListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class LagCalculationIT extends BaseConf {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LagCalculationIT.class);
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+    private RMQBlockListener blockListener = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        LOGGER.info(String.format("use topic: %s;", topic));
+        for (BrokerController controller : brokerControllerList) {
+            controller.getBrokerConfig().setLongPollingEnable(false);
+            controller.getBrokerConfig().setShortPollingTimeMills(500);
+            controller.getBrokerConfig().setEstimateAccumulation(true);
+        }
+        producer = getProducer(NAMESRV_ADDR, topic);
+        blockListener = new RMQBlockListener(false);
+        consumer = getConsumer(NAMESRV_ADDR, topic, "*", blockListener);
+    }
+
+    @After
+    public void tearDown() {
+        shutdown();
+    }
+
+    private Pair<Long, Long> getLag(List<MessageQueue> mqs) {
+        long lag = 0;
+        long pullLag = 0;
+        for (BrokerController controller : brokerControllerList) {
+            ConsumeStats consumeStats = 
MQAdminTestUtils.examineConsumeStats(controller.getBrokerAddr(), topic, 
consumer.getConsumerGroup());
+            Map<MessageQueue, OffsetWrapper> offsetTable = 
consumeStats.getOffsetTable();
+            for (MessageQueue mq : mqs) {
+                if 
(mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
+                    long brokerOffset = 
controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
+
+                    long consumerOffset = 
controller.getConsumerOffsetManager().queryOffset(consumer.getConsumerGroup(),
+                        topic, mq.getQueueId());
+                    long pullOffset =
+                        
controller.getConsumerOffsetManager().queryPullOffset(consumer.getConsumerGroup(),
+                            topic, mq.getQueueId());
+                    OffsetWrapper offsetWrapper = offsetTable.get(mq);
+                    assertEquals(brokerOffset, 
offsetWrapper.getBrokerOffset());
+                    assertEquals(consumerOffset, 
offsetWrapper.getConsumerOffset());
+                    assertEquals(pullOffset, offsetWrapper.getPullOffset());
+                    lag += brokerOffset - consumerOffset;
+                    pullLag += brokerOffset - pullOffset;
+                }
+            }
+        }
+        return new Pair<>(lag, pullLag);
+    }
+
+    @Test
+    public void testCalculateLag() throws InterruptedException {
+        int msgSize = 10;
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+
+        producer.send(mqMsgs.getMsgsWithMQ());
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 
CONSUME_TIME);
+        // wait for updating offset
+        Thread.sleep(5 * 1000);
+
+        Pair<Long, Long> pair = getLag(mqs);
+        assertEquals(0, (long) pair.getObject1());
+        assertEquals(0, (long) pair.getObject2());
+
+        blockListener.setBlock(true);
+        consumer.clearMsg();
+        producer.clearMsg();
+        producer.send(mqMsgs.getMsgsWithMQ());
+        // wait for updating offset
+        Thread.sleep(5 * 1000);
+
+        pair = getLag(mqs);
+        assertEquals(producer.getAllMsgBody().size(), (long) 
pair.getObject1());
+        assertEquals(0, (long) pair.getObject2());
+
+        blockListener.setBlock(false);
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 
CONSUME_TIME);
+        consumer.shutdown();
+        producer.clearMsg();
+        producer.send(mqMsgs.getMsgsWithMQ());
+        // wait for updating offset
+        Thread.sleep(5 * 1000);
+
+        pair = getLag(mqs);
+        assertEquals(producer.getAllMsgBody().size(), (long) 
pair.getObject1());
+        assertEquals(producer.getAllMsgBody().size(), (long) 
pair.getObject2());
+    }
+
+    @Test
+    public void testEstimateLag() throws Exception {
+        int msgNoTagSize = 80;
+        int msgWithTagSize = 20;
+        int repeat = 2;
+        String tag = "TAG_FOR_TEST_ESTIMATE";
+        String sql = "TAGS = 'TAG_FOR_TEST_ESTIMATE' And value < " + repeat / 
2;
+        MessageSelector selector = MessageSelector.bySql(sql);
+        RMQBlockListener sqlListener = new RMQBlockListener(true);
+        RMQSqlConsumer sqlConsumer = 
ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), topic, 
selector, sqlListener);
+        RMQBlockListener tagListener = new RMQBlockListener(true);
+        RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag, 
tagListener);
+        // wait for building filter data
+        await().atMost(5, TimeUnit.SECONDS).until(() -> 
sqlListener.isBlocked() && tagListener.isBlocked());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        for (int i = 0; i < repeat; i++) {
+            MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgNoTagSize);
+            Map<MessageQueue, List<Object>> msgMap = mqMsgs.getMsgsWithMQ();
+            mqMsgs = new MessageQueueMsg(mqs, msgWithTagSize, tag);
+            Map<MessageQueue, List<Object>> msgWithTagMap = 
mqMsgs.getMsgsWithMQ();
+            int finalI = i;
+            msgMap.forEach((mq, msgList) -> {
+                List<Object> msgWithTagList = msgWithTagMap.get(mq);
+                for (Object o : msgWithTagList) {
+                    ((Message) o).putUserProperty("value", 
String.valueOf(finalI));
+                }
+                msgList.addAll(msgWithTagList);
+                Collections.shuffle(msgList);
+            });
+            producer.send(msgMap);
+        }
+
+        // test lag estimation for tag consumer
+        for (BrokerController controller : brokerControllerList) {
+            for (MessageQueue mq : mqs) {
+                if 
(mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
+                    long brokerOffset = 
controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
+                    long estimateMessageCount = controller.getMessageStore()
+                        .estimateMessageCount(topic, mq.getQueueId(), 0, 
brokerOffset,
+                            new 
DefaultMessageFilter(FilterAPI.buildSubscriptionData(topic, tag)));
+                    assertEquals(repeat * msgWithTagSize, 
estimateMessageCount);
+                }
+            }
+        }
+
+        // test lag estimation for sql consumer
+        for (BrokerController controller : brokerControllerList) {
+            for (MessageQueue mq : mqs) {
+                if 
(mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
+                    long brokerOffset = 
controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
+                    SubscriptionData subscriptionData = 
controller.getConsumerManager().findSubscriptionData(sqlConsumer.getConsumerGroup(),
 topic);
+                    ConsumerFilterData consumerFilterData = 
controller.getConsumerFilterManager().get(topic, 
sqlConsumer.getConsumerGroup());
+                    long estimateMessageCount = controller.getMessageStore()
+                        .estimateMessageCount(topic, mq.getQueueId(), 0, 
brokerOffset,
+                            new ExpressionMessageFilter(subscriptionData, 
consumerFilterData, controller.getConsumerFilterManager()));
+                    assertEquals(repeat / 2 * msgWithTagSize, 
estimateMessageCount);
+                }
+            }
+        }
+
+        sqlConsumer.shutdown();
+        tagConsumer.shutdown();
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 9f15ccaff..7bc308036 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -296,6 +296,13 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
     }
 
+    @Override
+    public ConsumeStats examineConsumeStats(final String brokerAddr, final 
String consumerGroup,
+        final String topicName, final long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.examineConsumeStats(brokerAddr, 
consumerGroup, topicName, timeoutMillis);
+    }
+
     @Override
     public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String 
consumerGroup, String topic) {
         return 
defaultMQAdminExtImpl.examineConsumeStatsConcurrent(consumerGroup, topic);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 5f3bcbd38..0460ed95b 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -62,6 +62,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -106,8 +108,6 @@ import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapp
 import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.tools.admin.api.TrackType;
@@ -493,6 +493,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return staticResult;
     }
 
+    @Override
+    public ConsumeStats examineConsumeStats(String brokerAddr, String 
consumerGroup, String topicName,
+        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(brokerAddr, 
consumerGroup, topicName, timeoutMillis);
+    }
+
     @Override
     public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(final 
String consumerGroup, final String topic) {
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index efe8c0342..ebf878f32 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -145,6 +145,10 @@ public interface MQAdminExt extends MQAdmin {
         final String topic) throws RemotingException, MQClientException,
         InterruptedException, MQBrokerException;
 
+    ConsumeStats examineConsumeStats(final String brokerAddr, final String 
consumerGroup, final String topicName,
+        final long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException;
+
     AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String 
consumerGroup, String topic);
 
     ClusterInfo examineBrokerClusterInfo() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException,
@@ -462,9 +466,9 @@ public interface MQAdminExt extends MQAdmin {
 
     /**
      * clean controller broker meta data
-     *
      */
     void cleanControllerBrokerData(String controllerAddr, String clusterName, 
String brokerName,
-        String brokerAddr, boolean isCleanLivingBroker) throws 
RemotingException, InterruptedException, MQBrokerException;
+        String brokerAddr,
+        boolean isCleanLivingBroker) throws RemotingException, 
InterruptedException, MQBrokerException;
 
 }

Reply via email to