This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e5d33722d9 [ISSUE #10223] Not query the index of system topics in
tiered storage (#10224)
e5d33722d9 is described below
commit e5d33722d9ccd5992951f761264af02a805ef67c
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 30 13:34:53 2026 +0800
[ISSUE #10223] Not query the index of system topics in tiered storage
(#10224)
---
.../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 3 ++-
.../rocketmq/tieredstore/core/MessageStoreFetcherImpl.java | 6 ++++++
.../rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java | 9 +++++++++
3 files changed, 17 insertions(+), 1 deletion(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 4409bb599b..aee767dae2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1501,7 +1501,8 @@ public class DefaultMessageStore implements MessageStore {
return queryMessageResult;
}
- @Override public CompletableFuture<QueryMessageResult>
queryMessageAsync(String topic, String key,
+ @Override
+ public CompletableFuture<QueryMessageResult> queryMessageAsync(String
topic, String key,
int maxNum, long begin, long end) {
return CompletableFuture.completedFuture(queryMessage(topic, key,
maxNum, begin, end));
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index f669f8940a..2a5dc2dd8a 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -61,6 +61,7 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
private final TieredMessageStore messageStore;
private final IndexService indexService;
private final FlatFileStore flatFileStore;
+ private final MessageStoreFilter topicFilter;
private final long memoryMaxSize;
private final Cache<String /* topic@queueId@offset */, SelectBufferResult>
fetcherCache;
@@ -78,6 +79,7 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
this.messageStore = messageStore;
this.indexService = indexService;
this.metadataStore = flatFileStore.getMetadataStore();
+ this.topicFilter = messageStore.getTopicFilter();
this.memoryMaxSize =
(long) (Runtime.getRuntime().maxMemory() *
storeConfig.getReadAheadCacheSizeThresholdRate());
this.fetcherCache = this.initCache(storeConfig);
@@ -437,6 +439,10 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
public CompletableFuture<QueryMessageResult> queryMessageAsync(
String topic, String key, int maxCount, long begin, long end) {
+ if (topicFilter.filterTopic(topic)) {
+ return CompletableFuture.completedFuture(new QueryMessageResult());
+ }
+
long topicId;
try {
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
index fdcdec066f..fd681f27b7 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.store.GetMessageResult;
@@ -81,6 +83,9 @@ public class MessageStoreFetcherImplTest {
groupName, mq.getTopic(), 0, 0, 32, null).join();
Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE,
getMessageResult.getStatus());
+ FieldUtils.writeField(fetcher,
+ "topicFilter", new MessageStoreTopicFilter(storeConfig), true);
+
getMessageResult = fetcher.getMessageAsync(
groupName, mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
Assert.assertEquals(GetMessageStatus.OFFSET_TOO_SMALL,
getMessageResult.getStatus());
@@ -325,5 +330,9 @@ public class MessageStoreFetcherImplTest {
queryMessageResult = fetcher.queryMessageAsync(
mq.getTopic(), "uk", 120, 0L, System.currentTimeMillis()).join();
Assert.assertEquals(100,
queryMessageResult.getMessageBufferList().size());
+
+ queryMessageResult =
fetcher.queryMessageAsync(TopicValidator.SYSTEM_TOPIC_PREFIX + mq.getTopic(),
+ "uk", 120, 0L, System.currentTimeMillis()).join();
+ Assert.assertEquals(0,
queryMessageResult.getMessageBufferList().size());
}
}