This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9f10d38a23 [ISSUE #9313] Add scheduled clean task. (#9314)
9f10d38a23 is described below

commit 9f10d38a231099cf10cc445acf5ace22b2c7007b
Author: Ji Juntao <[email protected]>
AuthorDate: Fri Apr 25 17:46:30 2025 +0800

    [ISSUE #9313] Add scheduled clean task. (#9314)
    
    * add scheduled clean task.
    
    * make code compatible to lmq.
    
    * make code compatible to lmq.
    
    * make code compatible to lmq.
    
    * make code compatible to lmq.
---
 .../java/org/apache/rocketmq/broker/BrokerController.java  | 12 ++++++++++++
 .../rocketmq/broker/processor/AdminBrokerProcessor.java    |  1 +
 .../broker/processor/AdminBrokerProcessorTest.java         |  2 ++
 .../java/org/apache/rocketmq/store/timer/TimerMetrics.java | 14 +++++++++++++-
 4 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2616e039e8..2083b769eb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -691,6 +691,18 @@ public class BrokerController {
             }
         }, 10, 1, TimeUnit.SECONDS);
 
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    BrokerController.this.messageStore.getTimerMessageStore().getTimerMetrics()
+                            .cleanMetrics(BrokerController.this.topicConfigManager.getTopicConfigTable().keySet());
+                } catch (Throwable e) {
+                    LOG.error("BrokerController: failed to clean unused timer metrics.", e);
+                }
+            }
+        }, 3, 3, TimeUnit.MINUTES);
+
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c747fa15af..7064485e29 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -825,6 +825,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
         this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
         this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
+        this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic);
     }
 
     private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, RemotingCommand request) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 90c333b770..8418781b6b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -259,6 +259,8 @@ public class AdminBrokerProcessorTest {
 
         brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig(topic));
         brokerController.getMessageStoreConfig().setTimerWheelEnable(false);
+        when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(timerMessageStore);
+        when(this.timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
     }
 
     @After
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index 7f8fedd8a5..0d80dae3e3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -184,7 +185,8 @@ public class TimerMetrics extends ConfigManager {
         while (iterator.hasNext()) {
             Map.Entry<String, Metric> entry = iterator.next();
             final String topic = entry.getKey();
-            if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) {
+            if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)
+                    || topic.startsWith(MixAll.LMQ_PREFIX)) {
                 continue;
             }
             if (topics.contains(topic)) {
@@ -196,6 +198,16 @@ public class TimerMetrics extends ConfigManager {
         }
     }
 
+    public boolean removeTimingCount(String topic) {
+        try {
+            timingCount.remove(topic);
+        } catch (Exception e) {
+            log.error("removeTimingCount error", e);
+            return false;
+        }
+        return true;
+    }
+
     public static class TimerMetricsSerializeWrapper extends RemotingSerializable {
         private ConcurrentMap<String, Metric> timingCount =
                 new ConcurrentHashMap<>(1024);

Reply via email to