This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2fa7513a2b [ISSUE #8764] Implement consume lag estimation in cq rocksdb store (#8800) 2fa7513a2b is described below commit 2fa7513a2bcdddd1583b7e293b2f06bd350691e0 Author: LetLetMe <43874697+letle...@users.noreply.github.com> AuthorDate: Tue Oct 15 14:14:31 2024 +0800 [ISSUE #8764] Implement consume lag estimation in cq rocksdb store (#8800) --- .../apache/rocketmq/store/RocksDBMessageStore.java | 6 -- .../rocketmq/store/queue/RocksDBConsumeQueue.java | 41 +++++++++- .../rocketmq/store/queue/ConsumeQueueTest.java | 92 +++++++++++++++++++++- 3 files changed, 128 insertions(+), 11 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 90df7aed59..21f8d45c9d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -168,12 +168,6 @@ public class RocksDBMessageStore extends DefaultMessageStore { } } - @Override - public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) { - // todo - return 0; - } - @Override public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) { DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 2363c2896e..83ba7bebad 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -222,10 +222,47 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface { @Override public long estimateMessageCount(long from, long to, MessageFilter filter) { - // todo - return 0; + // Check from and to offset validity + Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from); + if (fromUnit == null) { + return -1; + } + + if (from >= to) { + return -1; + } + + if (to > getMaxOffsetInQueue()) { + to = getMaxOffsetInQueue(); + } + + int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan(); + int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from); + + int matchThreshold = messageStore.getMessageStoreConfig().getSampleCountThreshold(); + int matchSize = 0; + + for (int i = 0; i < sampleSize; i++) { + long index = from + i; + Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index); + if (pair == null) { + continue; + } + CqUnit cqUnit = pair.getObject1(); + if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) { + matchSize++; + // if matchSize is plenty, early exit estimate + if (matchSize > matchThreshold) { + sampleSize = i; + break; + } + } + } + // Make sure the second half is a floating point number, otherwise it will be truncated to 0 + return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / (sampleSize * 1.0))); } + @Override public long getMinOffsetInQueue() { return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java index c3c8be52dd..bf3b1eeca8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.message.MessageDecoder; @@ -31,6 +32,7 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.RocksDBMessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Assert; @@ -84,7 +86,26 @@ public class ConsumeQueueTest extends QueueTestBase { return master; } - protected void putMsg(DefaultMessageStore messageStore) throws Exception { + protected RocksDBMessageStore genRocksdbMessageStore() throws Exception { + MessageStoreConfig messageStoreConfig = buildStoreConfig( + COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE + ); + + BrokerConfig brokerConfig = new BrokerConfig(); + + RocksDBMessageStore master = new RocksDBMessageStore( + messageStoreConfig, new BrokerStatsManager(brokerConfig), + (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + }, brokerConfig, new ConcurrentHashMap<>()); + + assertThat(master.load()).isTrue(); + + master.start(); + + return master; + } + + protected void putMsg(MessageStore messageStore) { int totalMsgs = 200; for (int i = 0; i < totalMsgs; i++) { MessageExtBrokerInner message = buildMessage(); @@ -184,9 +205,33 @@ public class ConsumeQueueTest extends QueueTestBase { @Test public void testEstimateMessageCountInEmptyConsumeQueue() { - DefaultMessageStore master = null; + DefaultMessageStore messageStore = null; + try { + messageStore = gen(); + doTestEstimateMessageCountInEmptyConsumeQueue(messageStore); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } + + @Test + public void testEstimateRocksdbMessageCountInEmptyConsumeQueue() { + if (notExecuted()) { + return; + } + DefaultMessageStore messageStore = null; + try { + messageStore = genRocksdbMessageStore(); + doTestEstimateMessageCountInEmptyConsumeQueue(messageStore); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } + + public void doTestEstimateMessageCountInEmptyConsumeQueue(MessageStore master) { try { - master = gen(); ConsumeQueueInterface consumeQueue = master.findConsumeQueue(TOPIC, QUEUE_ID); MessageFilter filter = new MessageFilter() { @Override @@ -219,16 +264,34 @@ public class ConsumeQueueTest extends QueueTestBase { } } + @Test + public void testEstimateRocksdbMessageCount() { + if (notExecuted()) { + return; + } + DefaultMessageStore messageStore = null; + try { + messageStore = genRocksdbMessageStore(); + doTestEstimateMessageCount(messageStore); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } + @Test public void testEstimateMessageCount() { DefaultMessageStore messageStore = null; try { messageStore = gen(); + doTestEstimateMessageCount(messageStore); } catch (Exception e) { e.printStackTrace(); assertThat(Boolean.FALSE).isTrue(); } + } + public void doTestEstimateMessageCount(MessageStore messageStore) { try { try { putMsg(messageStore); @@ -265,15 +328,34 @@ public class ConsumeQueueTest extends QueueTestBase { } } + @Test + public void testEstimateRocksdbMessageCountSample() { + if (notExecuted()) { + return; + } + DefaultMessageStore messageStore = null; + try { + messageStore = genRocksdbMessageStore(); + doTestEstimateMessageCountSample(messageStore); + } catch (Exception e) { + e.printStackTrace(); + assertThat(Boolean.FALSE).isTrue(); + } + } + @Test public void testEstimateMessageCountSample() { DefaultMessageStore messageStore = null; try { messageStore = gen(); + doTestEstimateMessageCountSample(messageStore); } catch (Exception e) { e.printStackTrace(); assertThat(Boolean.FALSE).isTrue(); } + } + + public void doTestEstimateMessageCountSample(MessageStore messageStore) { try { try { @@ -303,4 +385,8 @@ public class ConsumeQueueTest extends QueueTestBase { UtilAll.deleteFile(new File(STORE_PATH)); } } + + private boolean notExecuted() { + return MixAll.isMac(); + } }