This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 4a15256f37 [ISSUE #9300] Periodic cleanup of inactive items in StatsItemSet (#9301) 4a15256f37 is described below commit 4a15256f378a88253bb1e77b27fa02eb85d57ebd Author: ymwneu <ymw...@126.com> AuthorDate: Tue Apr 1 13:43:56 2025 +0800 [ISSUE #9300] Periodic cleanup of inactive items in StatsItemSet (#9301) --- .../org/apache/rocketmq/common/BrokerConfig.java | 10 +++++ .../rocketmq/common/stats/MomentStatsItem.java | 9 +++++ .../rocketmq/common/stats/MomentStatsItemSet.java | 23 ++++++++++++ .../apache/rocketmq/common/stats/StatsItem.java | 9 +++++ .../apache/rocketmq/common/stats/StatsItemSet.java | 19 ++++++++++ .../rocketmq/store/stats/BrokerStatsManager.java | 43 +++++++++++++++++++++- 6 files changed, 111 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index dd34544935..b7ec944505 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -130,6 +130,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean accountStatsEnable = true; private boolean accountStatsPrintZeroValues = true; + private int maxStatsIdleTimeInMinutes = -1; + private boolean transferMsgByHeap = true; private String regionId = MixAll.DEFAULT_TRACE_REGION_ID; @@ -1535,6 +1537,14 @@ public class BrokerConfig extends BrokerIdentity { this.accountStatsPrintZeroValues = accountStatsPrintZeroValues; } + public int getMaxStatsIdleTimeInMinutes() { + return maxStatsIdleTimeInMinutes; + } + + public void setMaxStatsIdleTimeInMinutes(int maxStatsIdleTimeInMinutes) { + this.maxStatsIdleTimeInMinutes = maxStatsIdleTimeInMinutes; + } + public boolean isLockInStrictMode() { return lockInStrictMode; } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java index 71c796b283..559bb77953 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java @@ -31,6 +31,7 @@ public class MomentStatsItem { private final String statsKey; private final ScheduledExecutorService scheduledExecutorService; private final Logger log; + private long lastUpdateTimestamp = System.currentTimeMillis(); public MomentStatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, Logger log) { @@ -72,4 +73,12 @@ public class MomentStatsItem { public String getStatsName() { return statsName; } + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index a4571d7b8a..fd65351a54 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -24,9 +24,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class MomentStatsItemSet { + private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME); private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable = new ConcurrentHashMap<>(128); private final String statsName; @@ -72,6 +75,13 @@ public class MomentStatsItemSet { public void setValue(final String statsKey, final int value) { MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey); statsItem.getValue().set(value); + statsItem.setLastUpdateTimestamp(System.currentTimeMillis()); + } + + public void setValue(final String statsKey, final long value) { + MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey); + statsItem.getValue().set(value); + statsItem.setLastUpdateTimestamp(System.currentTimeMillis()); } public void delValueByInfixKey(final String statsKey, String separator) { @@ -109,4 +119,17 @@ public class MomentStatsItemSet { return statsItem; } + + public void cleanResource(int maxStatsIdleTimeInMinutes) { + COMMERCIAL_LOG.info("CleanStatisticItem: kind:{}, size:{}", statsName, this.statsItemTable.size()); + Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, MomentStatsItem> next = it.next(); + MomentStatsItem statsItem = next.getValue(); + if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxStatsIdleTimeInMinutes * 60 * 1000L) { + it.remove(); + COMMERCIAL_LOG.info("CleanStatisticItem: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey()); + } + } + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index 8307c20aa6..cc5de16095 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -38,6 +38,7 @@ public class StatsItem { private final String statsName; private final String statsKey; + private long lastUpdateTimestamp = System.currentTimeMillis(); private final ScheduledExecutorService scheduledExecutorService; private final Logger logger; @@ -229,6 +230,14 @@ public class StatsItem { public LongAdder getTimes() { return times; } + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } } class CallSnapshot { diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index c5b140b5cc..8ed1486e9f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -24,9 +24,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class StatsItemSet { + private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME); private final ConcurrentMap<String/* key */, StatsItem> statsItemTable = new ConcurrentHashMap<>(128); @@ -157,12 +160,14 @@ public class StatsItemSet { StatsItem statsItem = this.getAndCreateStatsItem(statsKey); statsItem.getValue().add(incValue); statsItem.getTimes().add(incTimes); + statsItem.setLastUpdateTimestamp(System.currentTimeMillis()); } public void addRTValue(final String statsKey, final int incValue, final int incTimes) { StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey); statsItem.getValue().add(incValue); statsItem.getTimes().add(incTimes); + statsItem.setLastUpdateTimestamp(System.currentTimeMillis()); } public void delValue(final String statsKey) { @@ -256,4 +261,18 @@ public class StatsItemSet { public StatsItem getStatsItem(final String statsKey) { return this.statsItemTable.get(statsKey); } + + + public void cleanResource(int maxStatsIdleTimeInMinutes) { + COMMERCIAL_LOG.info("CleanStatisticItemOld: kind:{}, size:{}", statsName, this.statsItemTable.size()); + Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, StatsItem> next = it.next(); + StatsItem statsItem = next.getValue(); + if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxStatsIdleTimeInMinutes * 60 * 1000L) { + it.remove(); + COMMERCIAL_LOG.info("CleanStatisticItemOld: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey()); + } + } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index a6c33f6131..530339c23b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.store.stats; import java.util.HashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -121,6 +123,8 @@ public class BrokerStatsManager { public static final String CHANNEL_ACTIVITY_IDLE = "IDLE"; public static final String CHANNEL_ACTIVITY_EXCEPTION = "EXCEPTION"; public static final String CHANNEL_ACTIVITY_CLOSE = "CLOSE"; + private static final String[] NEED_CLEAN_STATS_SET = + new String[] {TOPIC_PUT_NUMS, TOPIC_PUT_SIZE, GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS, GROUP_GET_LATENCY}; /** * read disk follow stats @@ -134,6 +138,7 @@ public class BrokerStatsManager { private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService commercialExecutor; private ScheduledExecutorService accountExecutor; + private ScheduledExecutorService cleanResourceExecutor; private final HashMap<String, StatsItemSet> statsTable = new HashMap<>(); private final String clusterName; @@ -277,6 +282,12 @@ public class BrokerStatsManager { return false; } }); + cleanResourceExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + cleanAllResource(); + } + }, 10, 10, TimeUnit.MINUTES); } private void initScheduleService() { @@ -286,6 +297,8 @@ public class BrokerStatsManager { ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig)); this.accountExecutor = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig)); + this.cleanResourceExecutor = + ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanStatsResourceThread", true, brokerConfig)); } public MomentStatsItemSet getMomentStatsItemSetFallSize() { @@ -318,6 +331,7 @@ public class BrokerStatsManager { public void shutdown() { this.scheduledExecutorService.shutdown(); this.commercialExecutor.shutdown(); + this.cleanResourceExecutor.shutdown(); } public StatsItem getStatsItem(final String statsName, final String statsKey) { @@ -589,13 +603,13 @@ public class BrokerStatsManager { public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) { final String statsKey = buildStatsKey(queueId, topic, group); - this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); + this.momentStatsItemSetFallTime.setValue(statsKey, fallBehind); } public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) { final String statsKey = buildStatsKey(queueId, topic, group); - this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); + this.momentStatsItemSetFallSize.setValue(statsKey, fallBehind); } public void incDLQStatValue(final String key, final String owner, final String group, @@ -764,6 +778,31 @@ public class BrokerStatsManager { boolean online(String instanceId, String group, String topic); } + + private void cleanAllResource() { + try { + int maxStatsIdleTimeInMinutes = brokerConfig != null ? brokerConfig.getMaxStatsIdleTimeInMinutes() : -1; + if (maxStatsIdleTimeInMinutes < 0) { + COMMERCIAL_LOG.info("[BrokerStatsManager#cleanAllResource] maxStatsIdleTimeInMinutes={}, no need to clean resource", maxStatsIdleTimeInMinutes); + return; + } + if (maxStatsIdleTimeInMinutes <= 10 && maxStatsIdleTimeInMinutes >= 0) { + maxStatsIdleTimeInMinutes = 30; + } + for (String statsKind : NEED_CLEAN_STATS_SET) { + StatsItemSet statsItemSet = this.statsTable.get(statsKind); + if (null == statsItemSet) { + continue; + } + statsItemSet.cleanResource(maxStatsIdleTimeInMinutes); + } + momentStatsItemSetFallSize.cleanResource(maxStatsIdleTimeInMinutes); + momentStatsItemSetFallTime.cleanResource(maxStatsIdleTimeInMinutes); + } catch (Throwable throwable) { + COMMERCIAL_LOG.error("[BrokerStatsManager#cleanAllResource] clean resource error", throwable); + } + } + public enum StatsType { SEND_SUCCESS, SEND_FAILURE,