This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 07f13fd883 fix: remove unnecessary synchronized to improve concurrency (#8840) 07f13fd883 is described below commit 07f13fd883e87e40d7de4827e28913e617fb9832 Author: Zhanhui Li <lizhan...@apache.org> AuthorDate: Mon Oct 21 09:25:55 2024 +0800 fix: remove unnecessary synchronized to improve concurrency (#8840) Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- .../rocketmq/broker/client/ProducerManager.java | 45 +++++++++++----------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index f9fe1193e2..011c9e4be3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.common.constant.LoggerName; @@ -39,11 +40,11 @@ public class ProducerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3; - private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable = + private final ConcurrentMap<String /* group name */, ConcurrentMap<Channel, ClientChannelInfo>> groupChannelTable = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>(); protected final BrokerStatsManager brokerStatsManager; - private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); + private final PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); private final List<ProducerChangeListener> producerChangeListenerList = new CopyOnWriteArrayList<>(); public ProducerManager() { @@ -63,7 +64,7 @@ public class ProducerManager { return channels != null && !channels.isEmpty(); } - public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>> getGroupChannelTable() { + public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>> getGroupChannelTable() { return groupChannelTable; } @@ -95,13 +96,13 @@ public class ProducerManager { } public void scanNotActiveChannel() { - Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator(); + Iterator<Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry = iterator.next(); + Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry = iterator.next(); final String group = entry.getKey(); - final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue(); + final ConcurrentMap<Channel, ClientChannelInfo> chlMap = entry.getValue(); Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator(); while (it.hasNext()) { @@ -132,16 +133,13 @@ public class ProducerManager { } } - public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { + public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { boolean removed = false; if (channel != null) { - for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable - .entrySet()) { + for (final Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) { final String group = entry.getKey(); - final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable = - entry.getValue(); - final ClientChannelInfo clientChannelInfo = - clientChannelInfoTable.remove(channel); + final ConcurrentMap<Channel, ClientChannelInfo> clientChannelInfoTable = entry.getValue(); + final ClientChannelInfo clientChannelInfo = clientChannelInfoTable.remove(channel); if (clientChannelInfo != null) { clientChannelTable.remove(clientChannelInfo.getClientId()); removed = true; @@ -150,7 +148,7 @@ public class ProducerManager { clientChannelInfo.toString(), remoteAddr, group); callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo); if (clientChannelInfoTable.isEmpty()) { - ConcurrentHashMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group); + ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group); if (oldGroupTable != null) { log.info("unregister a producer group[{}] from groupChannelTable", group); callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null); @@ -163,13 +161,16 @@ public class ProducerManager { return removed; } - public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { - ClientChannelInfo clientChannelInfoFound = null; + public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { + ClientChannelInfo clientChannelInfoFound; - ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); + ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); if (null == channelTable) { channelTable = new ConcurrentHashMap<>(); - this.groupChannelTable.put(group, channelTable); + ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable); + if (null != prev) { + channelTable = prev; + } } clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); @@ -186,8 +187,8 @@ public class ProducerManager { } } - public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { - ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); + public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { + ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); if (null != channelTable && !channelTable.isEmpty()) { ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); clientChannelTable.remove(clientChannelInfo.getClientId()); @@ -210,7 +211,7 @@ public class ProducerManager { return null; } List<Channel> channelList; - ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId); + ConcurrentMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId); if (channelClientChannelInfoHashMap != null) { channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet()); } else {