This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2043dd5034 [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE (#7494)
2043dd5034 is described below

commit 2043dd50341e0a4a2f254d72aa3109f4dfc97aac
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Tue Oct 24 10:29:43 2023 +0800

    [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE (#7494)
    
    * [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE for ChannelEventListener
    
    * Introduce a new event, NettyEventType.ACTIVE.
    
    * Implement the channelActive callback in NettyRemotingClient#NettyConnectManageHandler.
    
    * Add onChannelActive to the ChannelEventListener interface.
    
    * Move heartbeat sending from onChannelConnect to onChannelActive.
---
 .../broker/client/ClientHousekeepingService.java     |  5 +++++
 .../client/impl/factory/MQClientInstance.java        | 20 ++++++++++++--------
 .../ContainerClientHouseKeepingService.java          | 11 ++++++++++-
 .../controller/BrokerHousekeepingService.java        |  5 +++++
 .../namesrv/routeinfo/BrokerHousekeepingService.java |  5 +++++
 .../proxy/remoting/ClientHousekeepingService.java    |  4 ++++
 .../rocketmq/remoting/ChannelEventListener.java      |  2 ++
 .../rocketmq/remoting/netty/NettyEventType.java      |  3 ++-
 .../remoting/netty/NettyRemotingAbstract.java        |  3 +++
 .../rocketmq/remoting/netty/NettyRemotingClient.java | 11 +++++++++++
 10 files changed, 59 insertions(+), 10 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index cbb81f632b..7878d0eec5 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -87,4 +87,9 @@ public class ClientHousekeepingService implements 
ChannelEventListener {
         
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, 
channel);
         this.brokerController.getBrokerStatsManager().incChannelIdleNum();
     }
+
+    @Override
+    public void onChannelActive(String remoteAddr, Channel channel) {
+
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 09534a1768..ba72a6dce7 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -159,14 +159,6 @@ public class MQClientInstance {
                 private final ConcurrentMap<String, HashMap<Long, String>> 
brokerAddrTable = MQClientInstance.this.brokerAddrTable;
                 @Override
                 public void onChannelConnect(String remoteAddr, Channel 
channel) {
-                    for (Map.Entry<String, HashMap<Long, String>> addressEntry 
: brokerAddrTable.entrySet()) {
-                        for (String address : 
addressEntry.getValue().values()) {
-                            if (address.equals(remoteAddr)) {
-                                sendHeartbeatToAllBrokerWithLockV2(false);
-                                break;
-                            }
-                        }
-                    }
                 }
 
                 @Override
@@ -180,6 +172,18 @@ public class MQClientInstance {
                 @Override
                 public void onChannelIdle(String remoteAddr, Channel channel) {
                 }
+
+                @Override
+                public void onChannelActive(String remoteAddr, Channel 
channel) {
+                    for (Map.Entry<String, HashMap<Long, String>> addressEntry 
: brokerAddrTable.entrySet()) {
+                        for (String address : 
addressEntry.getValue().values()) {
+                            if (address.equals(remoteAddr)) {
+                                sendHeartbeatToAllBrokerWithLockV2(false);
+                                break;
+                            }
+                        }
+                    }
+                }
             };
         } else {
             channelEventListener = null;
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
 
b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
index 8bf4b4a33d..90c912247e 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
@@ -49,6 +49,11 @@ public class ContainerClientHouseKeepingService implements 
ChannelEventListener
         onChannelOperation(CallbackCode.IDLE, remoteAddr, channel);
     }
 
+    @Override
+    public void onChannelActive(String remoteAddr, Channel channel) {
+        onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel);
+    }
+
     private void onChannelOperation(CallbackCode callbackCode, String 
remoteAddr, Channel channel) {
         Collection<InnerBrokerController> masterBrokers = 
this.brokerContainer.getMasterBrokers();
         Collection<InnerSalveBrokerController> slaveBrokers = 
this.brokerContainer.getSlaveBrokers();
@@ -103,6 +108,10 @@ public class ContainerClientHouseKeepingService implements 
ChannelEventListener
         /**
          * onChannelIdle
          */
-        IDLE
+        IDLE,
+        /**
+         * onChannelActive
+         */
+        ACTIVE
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
index 652a9eeb0d..d22d0b6069 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
@@ -48,4 +48,9 @@ public class BrokerHousekeepingService implements 
ChannelEventListener {
     public void onChannelIdle(String remoteAddr, Channel channel) {
         
this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
     }
+
+    @Override
+    public void onChannelActive(String remoteAddr, Channel channel) {
+
+    }
 }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
index 80d9939923..b527429f77 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
@@ -46,4 +46,9 @@ public class BrokerHousekeepingService implements 
ChannelEventListener {
     public void onChannelIdle(String remoteAddr, Channel channel) {
         this.namesrvController.getRouteInfoManager().onChannelDestroy(channel);
     }
+
+    @Override
+    public void onChannelActive(String remoteAddr, Channel channel) {
+
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
index e213ae8554..74eb6f2db2 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java
@@ -49,5 +49,9 @@ public class ClientHousekeepingService implements 
ChannelEventListener {
         this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel);
     }
 
+    @Override
+    public void onChannelActive(String remoteAddr, Channel channel) {
+
+    }
 }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
index c99133b3a2..6802e69b90 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
@@ -26,4 +26,6 @@ public interface ChannelEventListener {
     void onChannelException(final String remoteAddr, final Channel channel);
 
     void onChannelIdle(final String remoteAddr, final Channel channel);
+
+    void onChannelActive(final String remoteAddr, final Channel channel);
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
index 9ac944aad3..4bc9d57dda 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
@@ -20,5 +20,6 @@ public enum NettyEventType {
     CONNECT,
     CLOSE,
     IDLE,
-    EXCEPTION
+    EXCEPTION,
+    ACTIVE
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 07ace28ea5..62a8a72901 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -701,6 +701,9 @@ public abstract class NettyRemotingAbstract {
                             case EXCEPTION:
                                 
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                                 break;
+                            case ACTIVE:
+                                
listener.onChannelActive(event.getRemoteAddr(), event.getChannel());
+                                break;
                             default:
                                 break;
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 340daee67e..9f15191306 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -1106,6 +1106,17 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             }
         }
 
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
+            super.channelActive(ctx);
+
+            if (NettyRemotingClient.this.channelEventListener != null) {
+                NettyRemotingClient.this.putNettyEvent(new 
NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel()));
+            }
+        }
+
         @Override
         public void disconnect(ChannelHandlerContext ctx, ChannelPromise 
promise) throws Exception {
             final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());

Reply via email to