This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9f10d38a23 [ISSUE #9313] Add scheduled clean task. (#9314)
9f10d38a23 is described below
commit 9f10d38a231099cf10cc445acf5ace22b2c7007b
Author: Ji Juntao <[email protected]>
AuthorDate: Fri Apr 25 17:46:30 2025 +0800
[ISSUE #9313] Add scheduled clean task. (#9314)
* add scheduled clean task.
* make code compatible to lmq.
* make code compatible to lmq.
* make code compatible to lmq.
* make code compatible to lmq.
---
.../java/org/apache/rocketmq/broker/BrokerController.java | 12 ++++++++++++
.../rocketmq/broker/processor/AdminBrokerProcessor.java | 1 +
.../broker/processor/AdminBrokerProcessorTest.java | 2 ++
.../java/org/apache/rocketmq/store/timer/TimerMetrics.java | 14 +++++++++++++-
4 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2616e039e8..2083b769eb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -691,6 +691,18 @@ public class BrokerController {
}
}, 10, 1, TimeUnit.SECONDS);
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ BrokerController.this.messageStore.getTimerMessageStore().getTimerMetrics()
+ .cleanMetrics(BrokerController.this.topicConfigManager.getTopicConfigTable().keySet());
+ } catch (Throwable e) {
+ LOG.error("BrokerController: failed to clean unused timer metrics.", e);
+ }
+ }
+ }, 3, 3, TimeUnit.MINUTES);
+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c747fa15af..7064485e29 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -825,6 +825,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
+ this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic);
}
private synchronized RemotingCommand
updateAndCreateAccessConfig(ChannelHandlerContext ctx, RemotingCommand request)
{
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 90c333b770..8418781b6b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -259,6 +259,8 @@ public class AdminBrokerProcessorTest {
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig(topic));
brokerController.getMessageStoreConfig().setTimerWheelEnable(false);
+ when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(timerMessageStore);
+ when(this.timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
}
@After
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index 7f8fedd8a5..0d80dae3e3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
@@ -184,7 +185,8 @@ public class TimerMetrics extends ConfigManager {
while (iterator.hasNext()) {
Map.Entry<String, Metric> entry = iterator.next();
final String topic = entry.getKey();
- if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) {
+ if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)
+ || topic.startsWith(MixAll.LMQ_PREFIX)) {
continue;
}
if (topics.contains(topic)) {
@@ -196,6 +198,16 @@ public class TimerMetrics extends ConfigManager {
}
}
+ public boolean removeTimingCount(String topic) {
+ try {
+ timingCount.remove(topic);
+ } catch (Exception e) {
+ log.error("removeTimingCount error", e);
+ return false;
+ }
+ return true;
+ }
+
public static class TimerMetricsSerializeWrapper extends
RemotingSerializable {
private ConcurrentMap<String, Metric> timingCount =
new ConcurrentHashMap<>(1024);