This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 07f13fd883 fix: remove unnecessary synchronized to improve concurrency (#8840)
07f13fd883 is described below

commit 07f13fd883e87e40d7de4827e28913e617fb9832
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Mon Oct 21 09:25:55 2024 +0800

    fix: remove unnecessary synchronized to improve concurrency (#8840)
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../rocketmq/broker/client/ProducerManager.java    | 45 +++++++++++-----------
 1 file changed, 23 insertions(+), 22 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index f9fe1193e2..011c9e4be3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -39,11 +40,11 @@ public class ProducerManager {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
-    private final ConcurrentHashMap<String /* group name */, 
ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
+    private final ConcurrentMap<String /* group name */, 
ConcurrentMap<Channel, ClientChannelInfo>> groupChannelTable =
         new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, Channel> clientChannelTable = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Channel> clientChannelTable = new 
ConcurrentHashMap<>();
     protected final BrokerStatsManager brokerStatsManager;
-    private PositiveAtomicCounter positiveAtomicCounter = new 
PositiveAtomicCounter();
+    private final PositiveAtomicCounter positiveAtomicCounter = new 
PositiveAtomicCounter();
     private final List<ProducerChangeListener> producerChangeListenerList = 
new CopyOnWriteArrayList<>();
 
     public ProducerManager() {
@@ -63,7 +64,7 @@ public class ProducerManager {
         return channels != null && !channels.isEmpty();
     }
 
-    public ConcurrentHashMap<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>> getGroupChannelTable() {
+    public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>> 
getGroupChannelTable() {
         return groupChannelTable;
     }
 
@@ -95,13 +96,13 @@ public class ProducerManager {
     }
 
     public void scanNotActiveChannel() {
-        Iterator<Map.Entry<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
+        Iterator<Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>>> 
iterator = this.groupChannelTable.entrySet().iterator();
 
         while (iterator.hasNext()) {
-            Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> 
entry = iterator.next();
+            Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry 
= iterator.next();
 
             final String group = entry.getKey();
-            final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = 
entry.getValue();
+            final ConcurrentMap<Channel, ClientChannelInfo> chlMap = 
entry.getValue();
 
             Iterator<Entry<Channel, ClientChannelInfo>> it = 
chlMap.entrySet().iterator();
             while (it.hasNext()) {
@@ -132,16 +133,13 @@ public class ProducerManager {
         }
     }
 
-    public synchronized boolean doChannelCloseEvent(final String remoteAddr, 
final Channel channel) {
+    public boolean doChannelCloseEvent(final String remoteAddr, final Channel 
channel) {
         boolean removed = false;
         if (channel != null) {
-            for (final Map.Entry<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable
-                    .entrySet()) {
+            for (final Map.Entry<String, ConcurrentMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
                 final String group = entry.getKey();
-                final ConcurrentHashMap<Channel, ClientChannelInfo> 
clientChannelInfoTable =
-                        entry.getValue();
-                final ClientChannelInfo clientChannelInfo =
-                        clientChannelInfoTable.remove(channel);
+                final ConcurrentMap<Channel, ClientChannelInfo> 
clientChannelInfoTable = entry.getValue();
+                final ClientChannelInfo clientChannelInfo = 
clientChannelInfoTable.remove(channel);
                 if (clientChannelInfo != null) {
                     clientChannelTable.remove(clientChannelInfo.getClientId());
                     removed = true;
@@ -150,7 +148,7 @@ public class ProducerManager {
                             clientChannelInfo.toString(), remoteAddr, group);
                     
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, 
clientChannelInfo);
                     if (clientChannelInfoTable.isEmpty()) {
-                        ConcurrentHashMap<Channel, ClientChannelInfo> 
oldGroupTable = this.groupChannelTable.remove(group);
+                        ConcurrentMap<Channel, ClientChannelInfo> 
oldGroupTable = this.groupChannelTable.remove(group);
                         if (oldGroupTable != null) {
                             log.info("unregister a producer group[{}] from 
groupChannelTable", group);
                             
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
@@ -163,13 +161,16 @@ public class ProducerManager {
         return removed;
     }
 
-    public synchronized void registerProducer(final String group, final 
ClientChannelInfo clientChannelInfo) {
-        ClientChannelInfo clientChannelInfoFound = null;
+    public void registerProducer(final String group, final ClientChannelInfo 
clientChannelInfo) {
+        ClientChannelInfo clientChannelInfoFound;
 
-        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
+        ConcurrentMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
         if (null == channelTable) {
             channelTable = new ConcurrentHashMap<>();
-            this.groupChannelTable.put(group, channelTable);
+            ConcurrentMap<Channel, ClientChannelInfo> prev = 
this.groupChannelTable.putIfAbsent(group, channelTable);
+            if (null != prev) {
+                channelTable = prev;
+            }
         }
 
         clientChannelInfoFound = 
channelTable.get(clientChannelInfo.getChannel());
@@ -186,8 +187,8 @@ public class ProducerManager {
         }
     }
 
-    public synchronized void unregisterProducer(final String group, final 
ClientChannelInfo clientChannelInfo) {
-        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
+    public void unregisterProducer(final String group, final ClientChannelInfo 
clientChannelInfo) {
+        ConcurrentMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
         if (null != channelTable && !channelTable.isEmpty()) {
             ClientChannelInfo old = 
channelTable.remove(clientChannelInfo.getChannel());
             clientChannelTable.remove(clientChannelInfo.getClientId());
@@ -210,7 +211,7 @@ public class ProducerManager {
             return null;
         }
         List<Channel> channelList;
-        ConcurrentHashMap<Channel, ClientChannelInfo> 
channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+        ConcurrentMap<Channel, ClientChannelInfo> 
channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
         if (channelClientChannelInfoHashMap != null) {
             channelList = new 
ArrayList<>(channelClientChannelInfoHashMap.keySet());
         } else {

Reply via email to