This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2fa7513a2b [ISSUE #8764] Implement consume lag estimation in cq 
rocksdb store (#8800)
2fa7513a2b is described below

commit 2fa7513a2bcdddd1583b7e293b2f06bd350691e0
Author: LetLetMe <43874697+letle...@users.noreply.github.com>
AuthorDate: Tue Oct 15 14:14:31 2024 +0800

    [ISSUE #8764] Implement consume lag estimation in cq rocksdb store (#8800)
---
 .../apache/rocketmq/store/RocksDBMessageStore.java |  6 --
 .../rocketmq/store/queue/RocksDBConsumeQueue.java  | 41 +++++++++-
 .../rocketmq/store/queue/ConsumeQueueTest.java     | 92 +++++++++++++++++++++-
 3 files changed, 128 insertions(+), 11 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 90df7aed59..21f8d45c9d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -168,12 +168,6 @@ public class RocksDBMessageStore extends 
DefaultMessageStore {
         }
     }
 
-    @Override
-    public long estimateMessageCount(String topic, int queueId, long from, 
long to, MessageFilter filter) {
-        // todo
-        return 0;
-    }
-
     @Override
     public void initMetrics(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier) {
         DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 2363c2896e..83ba7bebad 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -222,10 +222,47 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
 
     @Override
     public long estimateMessageCount(long from, long to, MessageFilter filter) 
{
-        // todo
-        return 0;
+        // Check from and to offset validity
+        Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
+        if (fromUnit == null) {
+            return -1;
+        }
+
+        if (from >= to) {
+            return -1;
+        }
+
+        if (to > getMaxOffsetInQueue()) {
+            to = getMaxOffsetInQueue();
+        }
+
+        int maxSampleSize = 
messageStore.getMessageStoreConfig().getMaxConsumeQueueScan();
+        int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to 
- from);
+
+        int matchThreshold = 
messageStore.getMessageStoreConfig().getSampleCountThreshold();
+        int matchSize = 0;
+
+        for (int i = 0; i < sampleSize; i++) {
+            long index = from + i;
+            Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
+            if (pair == null) {
+                continue;
+            }
+            CqUnit cqUnit = pair.getObject1();
+            if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), 
cqUnit.getCqExtUnit())) {
+                matchSize++;
+                // if matchSize is plenty, early exit estimate
+                if (matchSize > matchThreshold) {
+                    sampleSize = i;
+                    break;
+                }
+            }
+        }
+        // Make sure the second half is a floating point number, otherwise it 
will be truncated to 0
+        return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / 
(sampleSize * 1.0)));
     }
 
+
     @Override
     public long getMinOffsetInQueue() {
         return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId);
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index c3c8be52dd..bf3b1eeca8 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.RocksDBMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Assert;
@@ -84,7 +86,26 @@ public class ConsumeQueueTest extends QueueTestBase {
         return master;
     }
 
-    protected void putMsg(DefaultMessageStore messageStore) throws Exception {
+    protected RocksDBMessageStore genRocksdbMessageStore() throws Exception {
+        MessageStoreConfig messageStoreConfig = buildStoreConfig(
+            COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE
+        );
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+
+        RocksDBMessageStore master = new RocksDBMessageStore(
+            messageStoreConfig, new BrokerStatsManager(brokerConfig),
+            (topic, queueId, logicOffset, tagsCode, msgStoreTime, 
filterBitMap, properties) -> {
+            }, brokerConfig, new ConcurrentHashMap<>());
+
+        assertThat(master.load()).isTrue();
+
+        master.start();
+
+        return master;
+    }
+
+    protected void putMsg(MessageStore messageStore) {
         int totalMsgs = 200;
         for (int i = 0; i < totalMsgs; i++) {
             MessageExtBrokerInner message = buildMessage();
@@ -184,9 +205,33 @@ public class ConsumeQueueTest extends QueueTestBase {
 
     @Test
     public void testEstimateMessageCountInEmptyConsumeQueue() {
-        DefaultMessageStore master = null;
+        DefaultMessageStore messageStore = null;
+        try {
+            messageStore = gen();
+            doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    @Test
+    public void testEstimateRocksdbMessageCountInEmptyConsumeQueue() {
+        if (notExecuted()) {
+            return;
+        }
+        DefaultMessageStore messageStore = null;
+        try {
+            messageStore = genRocksdbMessageStore();
+            doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    public void doTestEstimateMessageCountInEmptyConsumeQueue(MessageStore 
master) {
         try {
-            master = gen();
             ConsumeQueueInterface consumeQueue = 
master.findConsumeQueue(TOPIC, QUEUE_ID);
             MessageFilter filter = new MessageFilter() {
                 @Override
@@ -219,16 +264,34 @@ public class ConsumeQueueTest extends QueueTestBase {
         }
     }
 
+    @Test
+    public void testEstimateRocksdbMessageCount() {
+        if (notExecuted()) {
+            return;
+        }
+        DefaultMessageStore messageStore = null;
+        try {
+            messageStore = genRocksdbMessageStore();
+            doTestEstimateMessageCount(messageStore);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
     @Test
     public void testEstimateMessageCount() {
         DefaultMessageStore messageStore = null;
         try {
             messageStore = gen();
+            doTestEstimateMessageCount(messageStore);
         } catch (Exception e) {
             e.printStackTrace();
             assertThat(Boolean.FALSE).isTrue();
         }
+    }
 
+    public void doTestEstimateMessageCount(MessageStore messageStore) {
         try {
             try {
                 putMsg(messageStore);
@@ -265,15 +328,34 @@ public class ConsumeQueueTest extends QueueTestBase {
         }
     }
 
+    @Test
+    public void testEstimateRocksdbMessageCountSample() {
+        if (notExecuted()) {
+            return;
+        }
+        DefaultMessageStore messageStore = null;
+        try {
+            messageStore = genRocksdbMessageStore();
+            doTestEstimateMessageCountSample(messageStore);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
     @Test
     public void testEstimateMessageCountSample() {
         DefaultMessageStore messageStore = null;
         try {
             messageStore = gen();
+            doTestEstimateMessageCountSample(messageStore);
         } catch (Exception e) {
             e.printStackTrace();
             assertThat(Boolean.FALSE).isTrue();
         }
+    }
+
+    public void doTestEstimateMessageCountSample(MessageStore messageStore) {
 
         try {
             try {
@@ -303,4 +385,8 @@ public class ConsumeQueueTest extends QueueTestBase {
             UtilAll.deleteFile(new File(STORE_PATH));
         }
     }
+
+    private boolean notExecuted() {
+        return MixAll.isMac();
+    }
 }

Reply via email to