This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3696be0632 [ISSUE #8463] Some statistical items should also be deleted to prevent memory leakage when a topic or group is deleted (#8464) 3696be0632 is described below commit 3696be06321c24b4c534a5a6299fd587710b5de4 Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Thu Aug 1 10:04:11 2024 +0800 [ISSUE #8463] Some statistical items should also be deleted to prevent memory leakage when a topic or group is deleted (#8464) * Some important statistical items should also be deleted to prevent memory leakage when a topic or group is deleted * Add UTs --- .../org/apache/rocketmq/common/stats/Stats.java | 3 +++ .../rocketmq/store/stats/BrokerStatsManager.java | 24 +++++++++++++--------- .../store/stats/BrokerStatsManagerTest.java | 13 ++++++++++++ 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java b/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java index b70f96e412..f67ccf9ae9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java @@ -44,4 +44,7 @@ public class Stats { public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE"; public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME"; public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY"; + public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY"; + public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS"; + public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS"; } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index c165d333fd..a6c33f6131 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -69,9 +69,9 @@ public class BrokerStatsManager { @Deprecated public static final String COMMERCIAL_PERM_FAILURES = Stats.COMMERCIAL_PERM_FAILURES; // Send message latency - public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY"; - public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS"; - public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS"; + @Deprecated public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY"; + @Deprecated public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS"; + @Deprecated public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS"; public static final String DLQ_PUT_NUMS = "DLQ_PUT_NUMS"; public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS"; public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS"; @@ -179,10 +179,10 @@ public class BrokerStatsManager { this.statsTable.put(Stats.TOPIC_PUT_SIZE, new StatsItemSet(Stats.TOPIC_PUT_SIZE, this.scheduledExecutorService, log)); this.statsTable.put(Stats.GROUP_GET_NUMS, new StatsItemSet(Stats.GROUP_GET_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(Stats.GROUP_GET_SIZE, new StatsItemSet(Stats.GROUP_GET_SIZE, this.scheduledExecutorService, log)); - this.statsTable.put(GROUP_ACK_NUMS, new StatsItemSet(GROUP_ACK_NUMS, this.scheduledExecutorService, log)); - this.statsTable.put(GROUP_CK_NUMS, new StatsItemSet(GROUP_CK_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(Stats.GROUP_ACK_NUMS, new StatsItemSet(Stats.GROUP_ACK_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(Stats.GROUP_CK_NUMS, new StatsItemSet(Stats.GROUP_CK_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(Stats.GROUP_GET_LATENCY, new StatsItemSet(Stats.GROUP_GET_LATENCY, this.scheduledExecutorService, log)); - this.statsTable.put(TOPIC_PUT_LATENCY, new StatsItemSet(TOPIC_PUT_LATENCY, this.scheduledExecutorService, log)); + this.statsTable.put(Stats.TOPIC_PUT_LATENCY, new StatsItemSet(Stats.TOPIC_PUT_LATENCY, this.scheduledExecutorService, log)); this.statsTable.put(Stats.SNDBCK_PUT_NUMS, new StatsItemSet(Stats.SNDBCK_PUT_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(DLQ_PUT_NUMS, new StatsItemSet(DLQ_PUT_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(Stats.BROKER_PUT_NUMS, new StatsItemSet(Stats.BROKER_PUT_NUMS, this.scheduledExecutorService, log)); @@ -338,10 +338,13 @@ public class BrokerStatsManager { } this.statsTable.get(Stats.GROUP_GET_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(Stats.GROUP_GET_SIZE).delValueByPrefixKey(topic, "@"); + this.statsTable.get(Stats.GROUP_CK_NUMS).delValueByPrefixKey(topic, "@"); + this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@"); this.statsTable.get(Stats.SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(Stats.GROUP_GET_LATENCY).delValueByInfixKey(topic, "@"); + this.statsTable.get(Stats.TOPIC_PUT_LATENCY).delValueBySuffixKey(topic, "@"); this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@"); this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@"); } @@ -349,6 +352,8 @@ public class BrokerStatsManager { public void onGroupDeleted(final String group) { this.statsTable.get(Stats.GROUP_GET_NUMS).delValueBySuffixKey(group, "@"); this.statsTable.get(Stats.GROUP_GET_SIZE).delValueBySuffixKey(group, "@"); + this.statsTable.get(Stats.GROUP_CK_NUMS).delValueBySuffixKey(group, "@"); + this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueBySuffixKey(group, "@"); if (enableQueueStat) { this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueBySuffixKey(group, "@"); this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueBySuffixKey(group, "@"); @@ -434,12 +439,12 @@ public class BrokerStatsManager { public void incGroupCkNums(final String group, final String topic, final int incValue) { final String statsKey = buildStatsKey(topic, group); - this.statsTable.get(GROUP_CK_NUMS).addValue(statsKey, incValue, 1); + this.statsTable.get(Stats.GROUP_CK_NUMS).addValue(statsKey, incValue, 1); } public void incGroupAckNums(final String group, final String topic, final int incValue) { final String statsKey = buildStatsKey(topic, group); - this.statsTable.get(GROUP_ACK_NUMS).addValue(statsKey, incValue, 1); + this.statsTable.get(Stats.GROUP_ACK_NUMS).addValue(statsKey, incValue, 1); } public String buildStatsKey(String topic, String group) { @@ -509,9 +514,8 @@ public class BrokerStatsManager { statsKey = new StringBuilder(6); } statsKey.append(queueId).append("@").append(topic); - this.statsTable.get(TOPIC_PUT_LATENCY).addValue(statsKey.toString(), incValue, 1); + this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey.toString(), incValue, 1); } - public void incBrokerPutNums() { this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1); } diff --git a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java index a602da0939..058ad0b020 100644 --- a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java @@ -24,6 +24,8 @@ import org.junit.Before; import org.junit.Test; import static org.apache.rocketmq.common.stats.Stats.BROKER_PUT_NUMS; +import static org.apache.rocketmq.common.stats.Stats.GROUP_ACK_NUMS; +import static org.apache.rocketmq.common.stats.Stats.GROUP_CK_NUMS; import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_SIZE; import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_TIME; import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_LATENCY; @@ -34,6 +36,7 @@ import static org.apache.rocketmq.common.stats.Stats.QUEUE_GET_SIZE; import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_NUMS; import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_SIZE; import static org.apache.rocketmq.common.stats.Stats.SNDBCK_PUT_NUMS; +import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_LATENCY; import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_NUMS; import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_SIZE; import static org.assertj.core.api.Assertions.assertThat; @@ -139,8 +142,11 @@ public class BrokerStatsManagerTest { brokerStatsManager.incTopicPutSize(TOPIC, 100); brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID); brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100); + brokerStatsManager.incTopicPutLatency(TOPIC, QUEUE_ID, 10); brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1); brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100); + brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1); + brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1); brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1); brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100); brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC); @@ -162,6 +168,9 @@ public class BrokerStatsManagerTest { Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_LATENCY, QUEUE_ID + "@" + TOPIC)); } @Test @@ -174,6 +183,8 @@ public class BrokerStatsManagerTest { brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1); brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L); brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L); + brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1); + brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1); brokerStatsManager.onGroupDeleted(GROUP_NAME); @@ -185,6 +196,8 @@ public class BrokerStatsManagerTest { Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, TOPIC + "@" + GROUP_NAME)); } @Test