This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2043dd5034 [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE (#7494) 2043dd5034 is described below commit 2043dd50341e0a4a2f254d72aa3109f4dfc97aac Author: Zhouxiang Zhan <zhouxz...@apache.org> AuthorDate: Tue Oct 24 10:29:43 2023 +0800 [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE (#7494) * [ISSUE #7493] Introduce a new event NettyEventType.ACTIVE for ChannelEventListener * introduce a new event NettyEventType.ACTIVE, * implement channelActive interface for NettyRemotingClient#NettyConnectManageHandler * add onChannelActive for ChannelEventListener interface. * Move send heartbeat to onChannelActive --- .../broker/client/ClientHousekeepingService.java | 5 +++++ .../client/impl/factory/MQClientInstance.java | 20 ++++++++++++-------- .../ContainerClientHouseKeepingService.java | 11 ++++++++++- .../controller/BrokerHousekeepingService.java | 5 +++++ .../namesrv/routeinfo/BrokerHousekeepingService.java | 5 +++++ .../proxy/remoting/ClientHousekeepingService.java | 4 ++++ .../rocketmq/remoting/ChannelEventListener.java | 2 ++ .../rocketmq/remoting/netty/NettyEventType.java | 3 ++- .../remoting/netty/NettyRemotingAbstract.java | 3 +++ .../rocketmq/remoting/netty/NettyRemotingClient.java | 11 +++++++++++ 10 files changed, 59 insertions(+), 10 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index cbb81f632b..7878d0eec5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -87,4 +87,9 @@ public class ClientHousekeepingService implements ChannelEventListener { this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getBrokerStatsManager().incChannelIdleNum(); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 09534a1768..ba72a6dce7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -159,14 +159,6 @@ public class MQClientInstance { private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = MQClientInstance.this.brokerAddrTable; @Override public void onChannelConnect(String remoteAddr, Channel channel) { - for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) { - for (String address : addressEntry.getValue().values()) { - if (address.equals(remoteAddr)) { - sendHeartbeatToAllBrokerWithLockV2(false); - break; - } - } - } } @Override @@ -180,6 +172,18 @@ public class MQClientInstance { @Override public void onChannelIdle(String remoteAddr, Channel channel) { } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + for (Map.Entry<String, HashMap<Long, String>> addressEntry : brokerAddrTable.entrySet()) { + for (String address : addressEntry.getValue().values()) { + if (address.equals(remoteAddr)) { + sendHeartbeatToAllBrokerWithLockV2(false); + break; + } + } + } + } }; } else { channelEventListener = null; diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java index 8bf4b4a33d..90c912247e 100644 --- a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java +++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java @@ -49,6 +49,11 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener onChannelOperation(CallbackCode.IDLE, remoteAddr, channel); } + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + onChannelOperation(CallbackCode.ACTIVE, remoteAddr, channel); + } + private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) { Collection<InnerBrokerController> masterBrokers = this.brokerContainer.getMasterBrokers(); Collection<InnerSalveBrokerController> slaveBrokers = this.brokerContainer.getSlaveBrokers(); @@ -103,6 +108,10 @@ public class ContainerClientHouseKeepingService implements ChannelEventListener /** * onChannelIdle */ - IDLE + IDLE, + /** + * onChannelActive + */ + ACTIVE } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java index 652a9eeb0d..d22d0b6069 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java @@ -48,4 +48,9 @@ public class BrokerHousekeepingService implements ChannelEventListener { public void onChannelIdle(String remoteAddr, Channel channel) { this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java index 80d9939923..b527429f77 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java @@ -46,4 +46,9 @@ public class BrokerHousekeepingService implements ChannelEventListener { public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(channel); } + + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java index e213ae8554..74eb6f2db2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/ClientHousekeepingService.java @@ -49,5 +49,9 @@ public class ClientHousekeepingService implements ChannelEventListener { this.clientManagerActivity.doChannelCloseEvent(remoteAddr, channel); } + @Override + public void onChannelActive(String remoteAddr, Channel channel) { + + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java index c99133b3a2..6802e69b90 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java @@ -26,4 +26,6 @@ public interface ChannelEventListener { void onChannelException(final String remoteAddr, final Channel channel); void onChannelIdle(final String remoteAddr, final Channel channel); + + void onChannelActive(final String remoteAddr, final Channel channel); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java index 9ac944aad3..4bc9d57dda 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java @@ -20,5 +20,6 @@ public enum NettyEventType { CONNECT, CLOSE, IDLE, - EXCEPTION + EXCEPTION, + ACTIVE } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 07ace28ea5..62a8a72901 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -701,6 +701,9 @@ public abstract class NettyRemotingAbstract { case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; + case ACTIVE: + listener.onChannelActive(event.getRemoteAddr(), event.getChannel()); + break; default: break; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 340daee67e..9f15191306 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -1106,6 +1106,17 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress); + super.channelActive(ctx); + + if (NettyRemotingClient.this.channelEventListener != null) { + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.ACTIVE, remoteAddress, ctx.channel())); + } + } + @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());