This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e11e29419f [ISSUE #7308] Adding topic blacklist and filter in tiered storage module (#7310) e11e29419f is described below commit e11e29419f6e2d1d9673d0329e57b824ebf3da47 Author: lizhimins <707364...@qq.com> AuthorDate: Wed Sep 6 20:42:24 2023 +0800 [ISSUE #7308] Adding topic blacklist and filter in tiered storage module (#7310) --- .../rocketmq/tieredstore/TieredDispatcher.java | 21 ++++++++-- .../rocketmq/tieredstore/TieredMessageStore.java | 1 + .../tieredstore/file/TieredFlatFileManager.java | 17 +++++--- .../provider/TieredStoreTopicBlackListFilter.java | 45 ++++++++++++++++++++++ .../provider/TieredStoreTopicFilter.java | 25 ++++++++++++ .../TieredStoreTopicBlackListFilterTest.java | 36 +++++++++++++++++ 6 files changed, 136 insertions(+), 9 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 430c2b62eb..766c559e9c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -48,6 +48,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; +import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter; +import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter; import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; @@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); + private TieredStoreTopicFilter topicFilter; private final String brokerName; private final MessageStore defaultStore; private final TieredMessageStoreConfig storeConfig; @@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch this.defaultStore = defaultStore; this.storeConfig = storeConfig; this.brokerName = storeConfig.getBrokerName(); + this.topicFilter = new TieredStoreTopicBlackListFilter(); this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig); this.dispatchRequestReadMap = new ConcurrentHashMap<>(); this.dispatchRequestWriteMap = new ConcurrentHashMap<>(); this.dispatchTaskLock = new ReentrantLock(); this.dispatchWriteLock = new ReentrantLock(); - this.initScheduleTask(); } - private void initScheduleTask() { + protected void initScheduleTask() { TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> { if (!flatFile.getCompositeFlatFileLock().isLocked()) { @@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch }), 30, 10, TimeUnit.SECONDS); } + public TieredStoreTopicFilter getTopicFilter() { + return topicFilter; + } + + public void setTopicFilter(TieredStoreTopicFilter topicFilter) { + this.topicFilter = topicFilter; + } + @Override public void dispatch(DispatchRequest request) { if (stopped) { @@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch } String topic = request.getTopic(); - if (TieredStoreUtil.isSystemTopic(topic)) { + if (topicFilter != null && topicFilter.filterTopic(topic)) { return; } @@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch return; } + if (topicFilter != null && topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) { + return; + } + if (flatFile.getDispatchOffset() == -1L) { return; } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 78e855f365..9fb1b2f01c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -90,6 +90,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { boolean loadNextStore = next.load(); boolean result = loadFlatFile && loadNextStore; if (result) { + dispatcher.initScheduleTask(); dispatcher.start(); } return result; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java index e9ae4a5a52..7c744af3b9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java @@ -134,21 +134,21 @@ public class TieredFlatFileManager { public void doCleanExpiredFile() { long expiredTimeStamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); - Random random = new Random(); for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) { - int delay = random.nextInt(storeConfig.getMaxCommitJitter()); - TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> { + TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> { flatFile.getCompositeFlatFileLock().lock(); try { flatFile.cleanExpiredFile(expiredTimeStamp); flatFile.destroyExpiredFile(); if (flatFile.getConsumeQueueBaseOffset() == -1) { + logger.info("Clean flatFile because file not initialized, topic={}, queueId={}", + flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId()); destroyCompositeFile(flatFile.getMessageQueue()); } } finally { flatFile.getCompositeFlatFileLock().unlock(); } - }, delay, TimeUnit.MILLISECONDS); + }); } if (indexFile != null) { indexFile.cleanExpiredFile(expiredTimeStamp); @@ -218,8 +218,13 @@ public class TieredFlatFileManager { storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId())); queueCount.incrementAndGet(); }); - logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", - topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); + + if (queueCount.get() == 0L) { + metadataStore.deleteTopic(topicMetadata.getTopic()); + } else { + logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms", + topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS)); + } } catch (Exception e) { logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e); } finally { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java new file mode 100644 index 0000000000..50adbb7136 --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.provider; + +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + +public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter { + + private final Set<String> topicBlackSet; + + public TieredStoreTopicBlackListFilter() { + this.topicBlackSet = new HashSet<>(); + } + + @Override + public boolean filterTopic(String topicName) { + if (StringUtils.isBlank(topicName)) { + return true; + } + return TieredStoreUtil.isSystemTopic(topicName) || topicBlackSet.contains(topicName); + } + + @Override + public void addTopicToWhiteList(String topicName) { + this.topicBlackSet.add(topicName); + } +} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java new file mode 100644 index 0000000000..3f26b8b026 --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.provider; + +public interface TieredStoreTopicFilter { + + boolean filterTopic(String topicName); + + void addTopicToWhiteList(String topicName); +} diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java new file mode 100644 index 0000000000..2bf48173c4 --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore.provider; + +import org.apache.rocketmq.common.topic.TopicValidator; +import org.junit.Assert; +import org.junit.Test; + +public class TieredStoreTopicBlackListFilterTest { + + @Test + public void filterTopicTest() { + TieredStoreTopicFilter topicFilter = new TieredStoreTopicBlackListFilter(); + Assert.assertTrue(topicFilter.filterTopic("")); + Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX + "_Topic")); + + String topicName = "WhiteTopic"; + Assert.assertFalse(topicFilter.filterTopic(topicName)); + topicFilter.addTopicToWhiteList(topicName); + Assert.assertTrue(topicFilter.filterTopic(topicName)); + } +} \ No newline at end of file