This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3696be0632 [ISSUE #8463] Some statistical items should also be deleted to prevent memory leakage when a topic or group is deleted (#8464)
3696be0632 is described below

commit 3696be06321c24b4c534a5a6299fd587710b5de4
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Thu Aug 1 10:04:11 2024 +0800

    [ISSUE #8463] Some statistical items should also be deleted to prevent memory leakage when a topic or group is deleted (#8464)
    
    * Some important statistical items should also be deleted to prevent memory leakage when a topic or group is deleted
    
    * Add UTs
---
 .../org/apache/rocketmq/common/stats/Stats.java    |  3 +++
 .../rocketmq/store/stats/BrokerStatsManager.java   | 24 +++++++++++++---------
 .../store/stats/BrokerStatsManagerTest.java        | 13 ++++++++++++
 3 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java 
b/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
index b70f96e412..f67ccf9ae9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
@@ -44,4 +44,7 @@ public class Stats {
     public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
     public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
     public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";
+    public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
+    public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
+    public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index c165d333fd..a6c33f6131 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -69,9 +69,9 @@ public class BrokerStatsManager {
     @Deprecated public static final String COMMERCIAL_PERM_FAILURES = 
Stats.COMMERCIAL_PERM_FAILURES;
 
     // Send message latency
-    public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
-    public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
-    public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
+    @Deprecated public static final String TOPIC_PUT_LATENCY = 
"TOPIC_PUT_LATENCY";
+    @Deprecated public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
+    @Deprecated public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
     public static final String DLQ_PUT_NUMS = "DLQ_PUT_NUMS";
     public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
     public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
@@ -179,10 +179,10 @@ public class BrokerStatsManager {
         this.statsTable.put(Stats.TOPIC_PUT_SIZE, new 
StatsItemSet(Stats.TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_NUMS, new 
StatsItemSet(Stats.GROUP_GET_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_SIZE, new 
StatsItemSet(Stats.GROUP_GET_SIZE, this.scheduledExecutorService, log));
-        this.statsTable.put(GROUP_ACK_NUMS, new StatsItemSet(GROUP_ACK_NUMS, 
this.scheduledExecutorService, log));
-        this.statsTable.put(GROUP_CK_NUMS, new StatsItemSet(GROUP_CK_NUMS, 
this.scheduledExecutorService, log));
+        this.statsTable.put(Stats.GROUP_ACK_NUMS, new 
StatsItemSet(Stats.GROUP_ACK_NUMS, this.scheduledExecutorService, log));
+        this.statsTable.put(Stats.GROUP_CK_NUMS, new 
StatsItemSet(Stats.GROUP_CK_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_LATENCY, new 
StatsItemSet(Stats.GROUP_GET_LATENCY, this.scheduledExecutorService, log));
-        this.statsTable.put(TOPIC_PUT_LATENCY, new 
StatsItemSet(TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
+        this.statsTable.put(Stats.TOPIC_PUT_LATENCY, new 
StatsItemSet(Stats.TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
         this.statsTable.put(Stats.SNDBCK_PUT_NUMS, new 
StatsItemSet(Stats.SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(DLQ_PUT_NUMS, new StatsItemSet(DLQ_PUT_NUMS, 
this.scheduledExecutorService, log));
         this.statsTable.put(Stats.BROKER_PUT_NUMS, new 
StatsItemSet(Stats.BROKER_PUT_NUMS, this.scheduledExecutorService, log));
@@ -338,10 +338,13 @@ public class BrokerStatsManager {
         }
         this.statsTable.get(Stats.GROUP_GET_NUMS).delValueByPrefixKey(topic, 
"@");
         this.statsTable.get(Stats.GROUP_GET_SIZE).delValueByPrefixKey(topic, 
"@");
+        this.statsTable.get(Stats.GROUP_CK_NUMS).delValueByPrefixKey(topic, 
"@");
+        this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueByPrefixKey(topic, 
"@");
         this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueByPrefixKey(topic, 
"@");
         this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueByPrefixKey(topic, 
"@");
         this.statsTable.get(Stats.SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, 
"@");
         this.statsTable.get(Stats.GROUP_GET_LATENCY).delValueByInfixKey(topic, 
"@");
+        
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).delValueBySuffixKey(topic, "@");
         this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
         this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
     }
@@ -349,6 +352,8 @@ public class BrokerStatsManager {
     public void onGroupDeleted(final String group) {
         this.statsTable.get(Stats.GROUP_GET_NUMS).delValueBySuffixKey(group, 
"@");
         this.statsTable.get(Stats.GROUP_GET_SIZE).delValueBySuffixKey(group, 
"@");
+        this.statsTable.get(Stats.GROUP_CK_NUMS).delValueBySuffixKey(group, 
"@");
+        this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueBySuffixKey(group, 
"@");
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
             
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
@@ -434,12 +439,12 @@ public class BrokerStatsManager {
 
     public void incGroupCkNums(final String group, final String topic, final 
int incValue) {
         final String statsKey = buildStatsKey(topic, group);
-        this.statsTable.get(GROUP_CK_NUMS).addValue(statsKey, incValue, 1);
+        this.statsTable.get(Stats.GROUP_CK_NUMS).addValue(statsKey, incValue, 
1);
     }
 
     public void incGroupAckNums(final String group, final String topic, final 
int incValue) {
         final String statsKey = buildStatsKey(topic, group);
-        this.statsTable.get(GROUP_ACK_NUMS).addValue(statsKey, incValue, 1);
+        this.statsTable.get(Stats.GROUP_ACK_NUMS).addValue(statsKey, incValue, 
1);
     }
 
     public String buildStatsKey(String topic, String group) {
@@ -509,9 +514,8 @@ public class BrokerStatsManager {
             statsKey = new StringBuilder(6);
         }
         statsKey.append(queueId).append("@").append(topic);
-        this.statsTable.get(TOPIC_PUT_LATENCY).addValue(statsKey.toString(), 
incValue, 1);
+        
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey.toString(), 
incValue, 1);
     }
-
     public void incBrokerPutNums() {
         
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
index a602da0939..058ad0b020 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
@@ -24,6 +24,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.rocketmq.common.stats.Stats.BROKER_PUT_NUMS;
+import static org.apache.rocketmq.common.stats.Stats.GROUP_ACK_NUMS;
+import static org.apache.rocketmq.common.stats.Stats.GROUP_CK_NUMS;
 import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_SIZE;
 import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_TIME;
 import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_LATENCY;
@@ -34,6 +36,7 @@ import static 
org.apache.rocketmq.common.stats.Stats.QUEUE_GET_SIZE;
 import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_NUMS;
 import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_SIZE;
 import static org.apache.rocketmq.common.stats.Stats.SNDBCK_PUT_NUMS;
+import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_LATENCY;
 import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_NUMS;
 import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -139,8 +142,11 @@ public class BrokerStatsManagerTest {
         brokerStatsManager.incTopicPutSize(TOPIC, 100);
         brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
         brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
+        brokerStatsManager.incTopicPutLatency(TOPIC, QUEUE_ID, 10);
         brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
         brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+        brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
+        brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);
         brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
         brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
         brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
@@ -162,6 +168,9 @@ public class BrokerStatsManagerTest {
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, 
"1@" + TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, 
"1@" + TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, 
"1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC 
+ "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, 
TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_LATENCY, 
QUEUE_ID + "@" + TOPIC));
     }
 
     @Test
@@ -174,6 +183,8 @@ public class BrokerStatsManagerTest {
         brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
         brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
         brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
+        brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
+        brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);
 
         brokerStatsManager.onGroupDeleted(GROUP_NAME);
 
@@ -185,6 +196,8 @@ public class BrokerStatsManagerTest {
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, 
"1@" + TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, 
"1@" + TOPIC + "@" + GROUP_NAME));
         Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, 
"1@" + TOPIC + "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC 
+ "@" + GROUP_NAME));
+        Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, 
TOPIC + "@" + GROUP_NAME));
     }
 
     @Test

Reply via email to