This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4a15256f37 [ISSUE #9300] Periodic cleanup of inactive items in StatsItemSet (#9301)
4a15256f37 is described below

commit 4a15256f378a88253bb1e77b27fa02eb85d57ebd
Author: ymwneu <ymw...@126.com>
AuthorDate: Tue Apr 1 13:43:56 2025 +0800

    [ISSUE #9300] Periodic cleanup of inactive items in StatsItemSet (#9301)
---
 .../org/apache/rocketmq/common/BrokerConfig.java   | 10 +++++
 .../rocketmq/common/stats/MomentStatsItem.java     |  9 +++++
 .../rocketmq/common/stats/MomentStatsItemSet.java  | 23 ++++++++++++
 .../apache/rocketmq/common/stats/StatsItem.java    |  9 +++++
 .../apache/rocketmq/common/stats/StatsItemSet.java | 19 ++++++++++
 .../rocketmq/store/stats/BrokerStatsManager.java   | 43 +++++++++++++++++++++-
 6 files changed, 111 insertions(+), 2 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index dd34544935..b7ec944505 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -130,6 +130,8 @@ public class BrokerConfig extends BrokerIdentity {
     private boolean accountStatsEnable = true;
     private boolean accountStatsPrintZeroValues = true;
 
+    private int maxStatsIdleTimeInMinutes = -1;
+
     private boolean transferMsgByHeap = true;
 
     private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
@@ -1535,6 +1537,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.accountStatsPrintZeroValues = accountStatsPrintZeroValues;
     }
 
+    public int getMaxStatsIdleTimeInMinutes() {
+        return maxStatsIdleTimeInMinutes;
+    }
+
+    public void setMaxStatsIdleTimeInMinutes(int maxStatsIdleTimeInMinutes) {
+        this.maxStatsIdleTimeInMinutes = maxStatsIdleTimeInMinutes;
+    }
+
     public boolean isLockInStrictMode() {
         return lockInStrictMode;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
index 71c796b283..559bb77953 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
@@ -31,6 +31,7 @@ public class MomentStatsItem {
     private final String statsKey;
     private final ScheduledExecutorService scheduledExecutorService;
     private final Logger log;
+    private long lastUpdateTimestamp = System.currentTimeMillis();
 
     public MomentStatsItem(String statsName, String statsKey,
         ScheduledExecutorService scheduledExecutorService, Logger log) {
@@ -72,4 +73,12 @@ public class MomentStatsItem {
     public String getStatsName() {
         return statsName;
     }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
index a4571d7b8a..fd65351a54 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
@@ -24,9 +24,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 public class MomentStatsItemSet {
+    private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
     private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable =
         new ConcurrentHashMap<>(128);
     private final String statsName;
@@ -72,6 +75,13 @@ public class MomentStatsItemSet {
     public void setValue(final String statsKey, final int value) {
         MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
         statsItem.getValue().set(value);
+        statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
+    }
+
+    public void setValue(final String statsKey, final long value) {
+        MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+        statsItem.getValue().set(value);
+        statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
     }
 
     public void delValueByInfixKey(final String statsKey, String separator) {
@@ -109,4 +119,17 @@ public class MomentStatsItemSet {
 
         return statsItem;
     }
+
+    public void cleanResource(int maxStatsIdleTimeInMinutes) {
+        COMMERCIAL_LOG.info("CleanStatisticItem: kind:{}, size:{}", statsName, this.statsItemTable.size());
+        Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, MomentStatsItem> next = it.next();
+            MomentStatsItem statsItem = next.getValue();
+            if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxStatsIdleTimeInMinutes * 60 * 1000L) {
+                it.remove();
+                COMMERCIAL_LOG.info("CleanStatisticItem: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey());
+            }
+        }
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index 8307c20aa6..cc5de16095 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -38,6 +38,7 @@ public class StatsItem {
 
     private final String statsName;
     private final String statsKey;
+    private long lastUpdateTimestamp = System.currentTimeMillis();
     private final ScheduledExecutorService scheduledExecutorService;
 
     private final Logger logger;
@@ -229,6 +230,14 @@ public class StatsItem {
     public LongAdder getTimes() {
         return times;
     }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
 }
 
 class CallSnapshot {
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index c5b140b5cc..8ed1486e9f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -24,9 +24,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 public class StatsItemSet {
+    private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
     private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
         new ConcurrentHashMap<>(128);
 
@@ -157,12 +160,14 @@ public class StatsItemSet {
         StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
         statsItem.getValue().add(incValue);
         statsItem.getTimes().add(incTimes);
+        statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
     }
 
     public void addRTValue(final String statsKey, final int incValue, final int incTimes) {
         StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
         statsItem.getValue().add(incValue);
         statsItem.getTimes().add(incTimes);
+        statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
     }
 
     public void delValue(final String statsKey) {
@@ -256,4 +261,18 @@ public class StatsItemSet {
     public StatsItem getStatsItem(final String statsKey) {
         return this.statsItemTable.get(statsKey);
     }
+
+
+    public void cleanResource(int maxStatsIdleTimeInMinutes) {
+        COMMERCIAL_LOG.info("CleanStatisticItemOld: kind:{}, size:{}", statsName, this.statsItemTable.size());
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            StatsItem statsItem = next.getValue();
+            if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxStatsIdleTimeInMinutes * 60 * 1000L) {
+                it.remove();
+                COMMERCIAL_LOG.info("CleanStatisticItemOld: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey());
+            }
+        }
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index a6c33f6131..530339c23b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.store.stats;
 
 import java.util.HashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -121,6 +123,8 @@ public class BrokerStatsManager {
     public static final String CHANNEL_ACTIVITY_IDLE = "IDLE";
     public static final String CHANNEL_ACTIVITY_EXCEPTION = "EXCEPTION";
     public static final String CHANNEL_ACTIVITY_CLOSE = "CLOSE";
+    private static final String[] NEED_CLEAN_STATS_SET =
+            new String[] {TOPIC_PUT_NUMS, TOPIC_PUT_SIZE, GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS, GROUP_GET_LATENCY};
 
     /**
      * read disk follow stats
@@ -134,6 +138,7 @@ public class BrokerStatsManager {
     private ScheduledExecutorService scheduledExecutorService;
     private ScheduledExecutorService commercialExecutor;
     private ScheduledExecutorService accountExecutor;
+    private ScheduledExecutorService cleanResourceExecutor;
 
     private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
     private final String clusterName;
@@ -277,6 +282,12 @@ public class BrokerStatsManager {
                 return false;
             }
         });
+        cleanResourceExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                cleanAllResource();
+            }
+        }, 10, 10, TimeUnit.MINUTES);
     }
 
     private void initScheduleService() {
@@ -286,6 +297,8 @@ public class BrokerStatsManager {
             ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
         this.accountExecutor =
             ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
+        this.cleanResourceExecutor =
+                ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanStatsResourceThread", true, brokerConfig));
     }
 
     public MomentStatsItemSet getMomentStatsItemSetFallSize() {
@@ -318,6 +331,7 @@ public class BrokerStatsManager {
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
         this.commercialExecutor.shutdown();
+        this.cleanResourceExecutor.shutdown();
     }
 
     public StatsItem getStatsItem(final String statsName, final String statsKey) {
@@ -589,13 +603,13 @@ public class BrokerStatsManager {
     public void recordDiskFallBehindTime(final String group, final String topic, final int queueId,
         final long fallBehind) {
         final String statsKey = buildStatsKey(queueId, topic, group);
-        this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+        this.momentStatsItemSetFallTime.setValue(statsKey, fallBehind);
     }
 
     public void recordDiskFallBehindSize(final String group, final String topic, final int queueId,
         final long fallBehind) {
         final String statsKey = buildStatsKey(queueId, topic, group);
-        this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+        this.momentStatsItemSetFallSize.setValue(statsKey, fallBehind);
     }
 
     public void incDLQStatValue(final String key, final String owner, final String group,
@@ -764,6 +778,31 @@ public class BrokerStatsManager {
         boolean online(String instanceId, String group, String topic);
     }
 
+
+    private void cleanAllResource() {
+        try {
+            int maxStatsIdleTimeInMinutes = brokerConfig != null ? brokerConfig.getMaxStatsIdleTimeInMinutes() : -1;
+            if (maxStatsIdleTimeInMinutes < 0) {
+                COMMERCIAL_LOG.info("[BrokerStatsManager#cleanAllResource] maxStatsIdleTimeInMinutes={}, no need to clean resource", maxStatsIdleTimeInMinutes);
+                return;
+            }
+            if (maxStatsIdleTimeInMinutes <= 10 && maxStatsIdleTimeInMinutes >= 0) {
+                maxStatsIdleTimeInMinutes = 30;
+            }
+            for (String statsKind : NEED_CLEAN_STATS_SET) {
+                StatsItemSet statsItemSet = this.statsTable.get(statsKind);
+                if (null == statsItemSet) {
+                    continue;
+                }
+                statsItemSet.cleanResource(maxStatsIdleTimeInMinutes);
+            }
+            momentStatsItemSetFallSize.cleanResource(maxStatsIdleTimeInMinutes);
+            momentStatsItemSetFallTime.cleanResource(maxStatsIdleTimeInMinutes);
+        } catch (Throwable throwable) {
+            COMMERCIAL_LOG.error("[BrokerStatsManager#cleanAllResource] clean resource error", throwable);
+        }
+    }
+
     public enum StatsType {
         SEND_SUCCESS,
         SEND_FAILURE,

Reply via email to