This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d2fd068be7 fix: registerProducer should not be affected by concurrent 
scanNotAct… (#8847)
d2fd068be7 is described below

commit d2fd068be77d06495d810b799d29c2d1f222e4dc
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Wed Oct 23 09:37:17 2024 +0800

    fix: registerProducer should not be affected by concurrent scanNotAct… 
(#8847)
    
    * fix: registerProducer should not be affected by concurrent 
scanNotActiveChannel
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * chore: fix code format and make CI pass
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../rocketmq/broker/client/ProducerManager.java    | 36 ++++++++++++----------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 011c9e4be3..2c3acb6ba9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -71,15 +71,15 @@ public class ProducerManager {
     public ProducerTableInfo getProducerTable() {
         Map<String, List<ProducerInfo>> map = new HashMap<>();
         for (String group : this.groupChannelTable.keySet()) {
-            for (Entry<Channel, ClientChannelInfo> entry: 
this.groupChannelTable.get(group).entrySet()) {
+            for (Entry<Channel, ClientChannelInfo> entry : 
this.groupChannelTable.get(group).entrySet()) {
                 ClientChannelInfo clientChannelInfo = entry.getValue();
                 if (map.containsKey(group)) {
                     map.get(group).add(new ProducerInfo(
-                            clientChannelInfo.getClientId(),
-                            
clientChannelInfo.getChannel().remoteAddress().toString(),
-                            clientChannelInfo.getLanguage(),
-                            clientChannelInfo.getVersion(),
-                            clientChannelInfo.getLastUpdateTimestamp()
+                        clientChannelInfo.getClientId(),
+                        
clientChannelInfo.getChannel().remoteAddress().toString(),
+                        clientChannelInfo.getLanguage(),
+                        clientChannelInfo.getVersion(),
+                        clientChannelInfo.getLastUpdateTimestamp()
                     ));
                 } else {
                     map.put(group, new ArrayList<>(Collections.singleton(new 
ProducerInfo(
@@ -118,8 +118,8 @@ public class ProducerManager {
                         clientChannelTable.remove(info.getClientId());
                     }
                     log.warn(
-                            "ProducerManager#scanNotActiveChannel: remove 
expired channel[{}] from ProducerManager groupChannelTable, producer group 
name: {}",
-                            
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+                        "ProducerManager#scanNotActiveChannel: remove expired 
channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+                        
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
                     
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, info);
                     RemotingHelper.closeChannel(info.getChannel());
                 }
@@ -144,8 +144,8 @@ public class ProducerManager {
                     clientChannelTable.remove(clientChannelInfo.getClientId());
                     removed = true;
                     log.info(
-                            "NETTY EVENT: remove channel[{}][{}] from 
ProducerManager groupChannelTable, producer group: {}",
-                            clientChannelInfo.toString(), remoteAddr, group);
+                        "NETTY EVENT: remove channel[{}][{}] from 
ProducerManager groupChannelTable, producer group: {}",
+                        clientChannelInfo.toString(), remoteAddr, group);
                     
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, 
clientChannelInfo);
                     if (clientChannelInfoTable.isEmpty()) {
                         ConcurrentMap<Channel, ClientChannelInfo> 
oldGroupTable = this.groupChannelTable.remove(group);
@@ -167,21 +167,26 @@ public class ProducerManager {
         ConcurrentMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
         if (null == channelTable) {
             channelTable = new ConcurrentHashMap<>();
+            // Make sure channelTable will NOT be cleaned by 
#scanNotActiveChannel
+            channelTable.put(clientChannelInfo.getChannel(), 
clientChannelInfo);
             ConcurrentMap<Channel, ClientChannelInfo> prev = 
this.groupChannelTable.putIfAbsent(group, channelTable);
-            if (null != prev) {
+            if (null == prev) {
+                // Add client-id to channel mapping for new producer group
+                clientChannelTable.put(clientChannelInfo.getClientId(), 
clientChannelInfo.getChannel());
+            } else {
                 channelTable = prev;
             }
         }
 
         clientChannelInfoFound = 
channelTable.get(clientChannelInfo.getChannel());
+        // Add client-channel info to existing producer group
         if (null == clientChannelInfoFound) {
             channelTable.put(clientChannelInfo.getChannel(), 
clientChannelInfo);
             clientChannelTable.put(clientChannelInfo.getClientId(), 
clientChannelInfo.getChannel());
-            log.info("new producer connected, group: {} channel: {}", group,
-                    clientChannelInfo.toString());
+            log.info("new producer connected, group: {} channel: {}", group, 
clientChannelInfo.toString());
         }
 
-
+        // Refresh existing client-channel-info update-timestamp
         if (clientChannelInfoFound != null) {
             
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
         }
@@ -193,8 +198,7 @@ public class ProducerManager {
             ClientChannelInfo old = 
channelTable.remove(clientChannelInfo.getChannel());
             clientChannelTable.remove(clientChannelInfo.getClientId());
             if (old != null) {
-                log.info("unregister a producer[{}] from groupChannelTable 
{}", group,
-                        clientChannelInfo.toString());
+                log.info("unregister a producer[{}] from groupChannelTable 
{}", group, clientChannelInfo.toString());
                 
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, 
clientChannelInfo);
             }
 

Reply via email to