This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d2fd068be7 fix: registerProducer should not be affected by concurrent scanNotAct… (#8847) d2fd068be7 is described below commit d2fd068be77d06495d810b799d29c2d1f222e4dc Author: Zhanhui Li <lizhan...@apache.org> AuthorDate: Wed Oct 23 09:37:17 2024 +0800 fix: registerProducer should not be affected by concurrent scanNotAct… (#8847) * fix: registerProducer should not be affected by concurrent scanNotActiveChannel Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * chore: fix code format and make CI pass Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --------- Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- .../rocketmq/broker/client/ProducerManager.java | 36 ++++++++++++---------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 011c9e4be3..2c3acb6ba9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -71,15 +71,15 @@ public class ProducerManager { public ProducerTableInfo getProducerTable() { Map<String, List<ProducerInfo>> map = new HashMap<>(); for (String group : this.groupChannelTable.keySet()) { - for (Entry<Channel, ClientChannelInfo> entry: this.groupChannelTable.get(group).entrySet()) { + for (Entry<Channel, ClientChannelInfo> entry : this.groupChannelTable.get(group).entrySet()) { ClientChannelInfo clientChannelInfo = entry.getValue(); if (map.containsKey(group)) { map.get(group).add(new ProducerInfo( - clientChannelInfo.getClientId(), - clientChannelInfo.getChannel().remoteAddress().toString(), - clientChannelInfo.getLanguage(), - clientChannelInfo.getVersion(), - clientChannelInfo.getLastUpdateTimestamp() + clientChannelInfo.getClientId(), + clientChannelInfo.getChannel().remoteAddress().toString(), + clientChannelInfo.getLanguage(), + clientChannelInfo.getVersion(), + clientChannelInfo.getLastUpdateTimestamp() )); } else { map.put(group, new ArrayList<>(Collections.singleton(new ProducerInfo( @@ -118,8 +118,8 @@ public class ProducerManager { clientChannelTable.remove(info.getClientId()); } log.warn( - "ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", - RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); + "ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", + RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, info); RemotingHelper.closeChannel(info.getChannel()); } @@ -144,8 +144,8 @@ public class ProducerManager { clientChannelTable.remove(clientChannelInfo.getClientId()); removed = true; log.info( - "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", - clientChannelInfo.toString(), remoteAddr, group); + "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", + clientChannelInfo.toString(), remoteAddr, group); callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo); if (clientChannelInfoTable.isEmpty()) { ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group); @@ -167,21 +167,26 @@ public class ProducerManager { ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); if (null == channelTable) { channelTable = new ConcurrentHashMap<>(); + // Make sure channelTable will NOT be cleaned by #scanNotActiveChannel + channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable); - if (null != prev) { + if (null == prev) { + // Add client-id to channel mapping for new producer group + clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel()); + } else { channelTable = prev; } } clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); + // Add client-channel info to existing producer group if (null == clientChannelInfoFound) { channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel()); - log.info("new producer connected, group: {} channel: {}", group, - clientChannelInfo.toString()); + log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString()); } - + // Refresh existing client-channel-info update-timestamp if (clientChannelInfoFound != null) { clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); } @@ -193,8 +198,7 @@ public class ProducerManager { ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); clientChannelTable.remove(clientChannelInfo.getClientId()); if (old != null) { - log.info("unregister a producer[{}] from groupChannelTable {}", group, - clientChannelInfo.toString()); + log.info("unregister a producer[{}] from groupChannelTable {}", group, clientChannelInfo.toString()); callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo); }