This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e11e29419f [ISSUE #7308] Adding topic blacklist and filter in tiered storage module (#7310)
e11e29419f is described below

commit e11e29419f6e2d1d9673d0329e57b824ebf3da47
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Sep 6 20:42:24 2023 +0800

    [ISSUE #7308] Adding topic blacklist and filter in tiered storage module (#7310)
---
 .../rocketmq/tieredstore/TieredDispatcher.java     | 21 ++++++++--
 .../rocketmq/tieredstore/TieredMessageStore.java   |  1 +
 .../tieredstore/file/TieredFlatFileManager.java    | 17 +++++---
 .../provider/TieredStoreTopicBlackListFilter.java  | 45 ++++++++++++++++++++++
 .../provider/TieredStoreTopicFilter.java           | 25 ++++++++++++
 .../TieredStoreTopicBlackListFilterTest.java       | 36 +++++++++++++++++
 6 files changed, 136 insertions(+), 9 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 430c2b62eb..766c559e9c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -48,6 +48,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
 import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter;
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter;
 import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
 
     private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
 
+    private TieredStoreTopicFilter topicFilter;
     private final String brokerName;
     private final MessageStore defaultStore;
     private final TieredMessageStoreConfig storeConfig;
@@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
         this.defaultStore = defaultStore;
         this.storeConfig = storeConfig;
         this.brokerName = storeConfig.getBrokerName();
+        this.topicFilter = new TieredStoreTopicBlackListFilter();
         this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig);
         this.dispatchRequestReadMap = new ConcurrentHashMap<>();
         this.dispatchRequestWriteMap = new ConcurrentHashMap<>();
         this.dispatchTaskLock = new ReentrantLock();
         this.dispatchWriteLock = new ReentrantLock();
-        this.initScheduleTask();
     }
 
-    private void initScheduleTask() {
+    protected void initScheduleTask() {
         TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
             tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
                 if (!flatFile.getCompositeFlatFileLock().isLocked()) {
@@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
             }), 30, 10, TimeUnit.SECONDS);
     }
 
+    public TieredStoreTopicFilter getTopicFilter() {
+        return topicFilter;
+    }
+
+    public void setTopicFilter(TieredStoreTopicFilter topicFilter) {
+        this.topicFilter = topicFilter;
+    }
+
     @Override
     public void dispatch(DispatchRequest request) {
         if (stopped) {
@@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
         }
 
         String topic = request.getTopic();
-        if (TieredStoreUtil.isSystemTopic(topic)) {
+        if (topicFilter != null && topicFilter.filterTopic(topic)) {
             return;
         }
 
@@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
             return;
         }
 
+        if (topicFilter != null && topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) {
+            return;
+        }
+
         if (flatFile.getDispatchOffset() == -1L) {
             return;
         }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 78e855f365..9fb1b2f01c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -90,6 +90,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
         boolean loadNextStore = next.load();
         boolean result = loadFlatFile && loadNextStore;
         if (result) {
+            dispatcher.initScheduleTask();
             dispatcher.start();
         }
         return result;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index e9ae4a5a52..7c744af3b9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -134,21 +134,21 @@ public class TieredFlatFileManager {
     public void doCleanExpiredFile() {
         long expiredTimeStamp = System.currentTimeMillis() -
             TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
-        Random random = new Random();
         for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
-            int delay = random.nextInt(storeConfig.getMaxCommitJitter());
-            TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> {
+            TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
                 flatFile.getCompositeFlatFileLock().lock();
                 try {
                     flatFile.cleanExpiredFile(expiredTimeStamp);
                     flatFile.destroyExpiredFile();
                     if (flatFile.getConsumeQueueBaseOffset() == -1) {
+                        logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
+                            flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
                         destroyCompositeFile(flatFile.getMessageQueue());
                     }
                 } finally {
                     flatFile.getCompositeFlatFileLock().unlock();
                 }
-            }, delay, TimeUnit.MILLISECONDS);
+            });
         }
         if (indexFile != null) {
             indexFile.cleanExpiredFile(expiredTimeStamp);
@@ -218,8 +218,13 @@ public class TieredFlatFileManager {
                                 storeConfig.getBrokerName(), queueMetadata.getQueue().getQueueId()));
                             queueCount.incrementAndGet();
                         });
-                        logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
-                            topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
+
+                        if (queueCount.get() == 0L) {
+                            metadataStore.deleteTopic(topicMetadata.getTopic());
+                        } else {
+                            logger.info("Recover TopicFlatFile, topic: {}, queueCount: {}, cost: {}ms",
+                                topicMetadata.getTopic(), queueCount.get(), subWatch.elapsed(TimeUnit.MILLISECONDS));
+                        }
                     } catch (Exception e) {
                         logger.error("Recover TopicFlatFile error, topic: {}", topicMetadata.getTopic(), e);
                     } finally {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
new file mode 100644
index 0000000000..50adbb7136
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter {
+
+    private final Set<String> topicBlackSet;
+
+    public TieredStoreTopicBlackListFilter() {
+        this.topicBlackSet = new HashSet<>();
+    }
+
+    @Override
+    public boolean filterTopic(String topicName) {
+        if (StringUtils.isBlank(topicName)) {
+            return true;
+        }
+        return TieredStoreUtil.isSystemTopic(topicName) || topicBlackSet.contains(topicName);
+    }
+
+    @Override
+    public void addTopicToWhiteList(String topicName) {
+        this.topicBlackSet.add(topicName);
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
new file mode 100644
index 0000000000..3f26b8b026
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+public interface TieredStoreTopicFilter {
+
+    boolean filterTopic(String topicName);
+
+    void addTopicToWhiteList(String topicName);
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
new file mode 100644
index 0000000000..2bf48173c4
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.provider;
+
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TieredStoreTopicBlackListFilterTest {
+
+    @Test
+    public void filterTopicTest() {
+        TieredStoreTopicFilter topicFilter = new TieredStoreTopicBlackListFilter();
+        Assert.assertTrue(topicFilter.filterTopic(""));
+        Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX + "_Topic"));
+
+        String topicName = "WhiteTopic";
+        Assert.assertFalse(topicFilter.filterTopic(topicName));
+        topicFilter.addTopicToWhiteList(topicName);
+        Assert.assertTrue(topicFilter.filterTopic(topicName));
+    }
+}
\ No newline at end of file

Reply via email to