This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b7f0162ff fix thread-safety problem of admin tools (#4843)
b7f0162ff is described below

commit b7f0162ffade63de1beec8e9271ae013401a666b
Author: HuiTong <[email protected]>
AuthorDate: Fri Aug 19 12:26:47 2022 +0800

    fix thread-safety problem of admin tools (#4843)
---
 .../java/org/apache/rocketmq/common/admin/ConsumeStats.java    | 10 ++++++----
 .../java/org/apache/rocketmq/common/admin/TopicStatsTable.java | 10 ++++++----
 .../org/apache/rocketmq/common/protocol/body/TopicList.java    |  5 +++--
 .../org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java |  4 ++--
 4 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
index 6b1c49290..ae7e18dd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
@@ -16,14 +16,16 @@
  */
 package org.apache.rocketmq.common.admin;
 
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class ConsumeStats extends RemotingSerializable {
-    private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
+    private Map<MessageQueue, OffsetWrapper> offsetTable = new ConcurrentHashMap<MessageQueue, OffsetWrapper>();
     private double consumeTps = 0;
 
     public long computeTotalDiff() {
@@ -39,11 +41,11 @@ public class ConsumeStats extends RemotingSerializable {
         return diffTotal;
     }
 
-    public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() {
+    public Map<MessageQueue, OffsetWrapper> getOffsetTable() {
         return offsetTable;
     }
 
-    public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) {
+    public void setOffsetTable(Map<MessageQueue, OffsetWrapper> offsetTable) {
         this.offsetTable = offsetTable;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
index 729075c06..42a8872dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
@@ -16,18 +16,20 @@
  */
 package org.apache.rocketmq.common.admin;
 
-import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicStatsTable extends RemotingSerializable {
-    private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
+    private Map<MessageQueue, TopicOffset> offsetTable = new ConcurrentHashMap<MessageQueue, TopicOffset>();
 
-    public HashMap<MessageQueue, TopicOffset> getOffsetTable() {
+    public Map<MessageQueue, TopicOffset> getOffsetTable() {
         return offsetTable;
     }
 
-    public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) {
+    public void setOffsetTable(Map<MessageQueue, TopicOffset> offsetTable) {
         this.offsetTable = offsetTable;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
index baf831247..9b9144e2c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
@@ -16,12 +16,13 @@
  */
 package org.apache.rocketmq.common.protocol.body;
 
-import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicList extends RemotingSerializable {
-    private Set<String> topicList = new HashSet<String>();
+    private Set<String> topicList = new CopyOnWriteArraySet<>();
     private String brokerAddr;
 
     public Set<String> getTopicList() {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 3cea455a2..bb08e0119 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -797,7 +797,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         }
 
         if (!hasConsumed) {
-            HashMap<MessageQueue, TopicOffset> topicStatus = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis).getOffsetTable();
+            Map<MessageQueue, TopicOffset> topicStatus = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis).getOffsetTable();
             for (int i = 0; i < queueData.getReadQueueNums(); i++) {
                 MessageQueue queue = new MessageQueue(topic, queueData.getBrokerName(), i);
                 OffsetWrapper offsetWrapper = new OffsetWrapper();
@@ -1107,7 +1107,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return adminToolExecute(new AdminToolHandler() {
             @Override
             public AdminToolResult doExecute() throws Exception {
-                final List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
+                final List<QueueTimeSpan> spanSet = new CopyOnWriteArrayList<>();
                 TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
 
                 if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {

Reply via email to