This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 14156b4f0a [ISSUE #9191] Provide the ability to replace the remoting layer implementation for Proxy and Broker (#9192) 14156b4f0a is described below commit 14156b4f0aacfa9d2a11a69dc4d58686a666d07c Author: Quan <zsjper...@foxmail.com> AuthorDate: Mon Feb 24 12:27:30 2025 +0800 [ISSUE #9191] Provide the ability to replace the remoting layer implementation for Proxy and Broker (#9192) * remoting replacement for proxy and broker * add unit test * fix code style --- .../apache/rocketmq/broker/BrokerController.java | 202 ++++++++++++--------- .../rocketmq/broker/BrokerControllerTest.java | 46 +++++ .../rocketmq/client/impl/MQClientAPIImpl.java | 35 +++- .../client/impl/mqclient/MQClientAPIExt.java | 14 +- .../client/impl/mqclient/MQClientAPIFactory.java | 35 +++- .../rocketmq/client/impl/MQClientAPIImplTest.java | 44 +++++ .../org/apache/rocketmq/common/ObjectCreator.java | 23 +-- .../rocketmq/container/InnerBrokerController.java | 12 +- .../proxy/service/ClusterServiceManager.java | 19 +- .../proxy/service/ServiceManagerFactory.java | 10 +- .../remoting/netty/NettyRemotingAbstract.java | 4 + .../remoting/netty/NettyRemotingClient.java | 24 +-- .../remoting/netty/NettyRemotingServer.java | 88 +++++---- 13 files changed, 373 insertions(+), 183 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 006695c6bc..4031dce8d6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -244,10 +244,10 @@ public class BrokerController { protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>(); protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>(); protected MessageStore messageStore; - protected RemotingServer remotingServer; + protected static final String TCP_REMOTING_SERVER = "TCP_REMOTING_SERVER"; + protected static final String FAST_REMOTING_SERVER = "FAST_REMOTING_SERVER"; + protected final Map<String, RemotingServer> remotingServerMap = new ConcurrentHashMap<>(); protected CountDownLatch remotingServerStartLatch; - protected RemotingServer fastRemotingServer; - /** * If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries are stored in RocksDB. */ @@ -494,7 +494,7 @@ public class BrokerController { } protected void initializeRemotingServer() throws CloneNotSupportedException { - this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); + RemotingServer tcpRemotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); int listeningPort = nettyServerConfig.getListenPort() - 2; @@ -503,7 +503,10 @@ public class BrokerController { } fastConfig.setListenPort(listeningPort); - this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); + RemotingServer fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); + + remotingServerMap.put(TCP_REMOTING_SERVER, tcpRemotingServer); + remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer); } /** @@ -939,8 +942,12 @@ public class BrokerController { } private void reloadServerSslContext() { - ((NettyRemotingServer) remotingServer).loadSslContext(); - ((NettyRemotingServer) fastRemotingServer).loadSslContext(); + for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) { + RemotingServer remotingServer = entry.getValue(); + if (remotingServer instanceof NettyRemotingServer) { + ((NettyRemotingServer) remotingServer).loadSslContext(); + } + } } }); } catch (Exception e) { @@ -1092,59 +1099,62 @@ public class BrokerController { } public void registerProcessor() { + RemotingServer remotingServer = remotingServerMap.get(TCP_REMOTING_SERVER); + RemotingServer fastRemotingServer = remotingServerMap.get(FAST_REMOTING_SERVER); + /* * SendMessageProcessor */ sendMessageProcessor.registerSendMessageHook(sendMessageHookList); sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); - this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, recallMessageProcessor, this.sendMessageExecutor); /** * PullMessageProcessor */ - this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor); + remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); /** * PeekMessageProcessor */ - this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor); /** * PopMessageProcessor */ - this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor); /** * AckMessageProcessor */ - this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); + remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); + remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor); /** * ChangeInvisibleTimeProcessor */ - this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor); + remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor); /** * notificationProcessor */ - this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor); /** * pollingInfoProcessor */ - this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor); /** * ReplyMessageProcessor @@ -1152,64 +1162,64 @@ public class BrokerController { replyMessageProcessor.registerSendMessageHook(sendMessageHookList); - this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); /** * QueryMessageProcessor */ NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); + remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); + remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); + fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); /** * ClientManageProcessor */ - this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); - this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); - this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); + remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); + remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); + remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); + fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); + fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); + fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); /** * ConsumerManageProcessor */ ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); + remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); + fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); /** * QueryAssignmentProcessor */ - this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor); - this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor); + remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor); + fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor); + remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor); + fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor); /** * EndTransactionProcessor */ - this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor); + remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor); + fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor); /* * Default */ AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); - this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); - this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); + remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); + fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); /* * Initialize the mapping of request codes to request headers. @@ -1342,14 +1352,6 @@ public class BrokerController { return producerManager; } - public void setFastRemotingServer(RemotingServer fastRemotingServer) { - this.fastRemotingServer = fastRemotingServer; - } - - public RemotingServer getFastRemotingServer() { - return fastRemotingServer; - } - public PullMessageProcessor getPullMessageProcessor() { return pullMessageProcessor; } @@ -1400,12 +1402,11 @@ public class BrokerController { this.shutdownHook.beforeShutdown(this); } - if (this.remotingServer != null) { - this.remotingServer.shutdown(); - } - - if (this.fastRemotingServer != null) { - this.fastRemotingServer.shutdown(); + for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) { + RemotingServer remotingServer = entry.getValue(); + if (remotingServer != null) { + remotingServer.shutdown(); + } } if (this.brokerMetricsManager != null) { @@ -1658,19 +1659,20 @@ public class BrokerController { remotingServerStartLatch.await(); } - if (this.remotingServer != null) { - this.remotingServer.start(); + for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) { + RemotingServer remotingServer = entry.getValue(); + if (remotingServer != null) { + remotingServer.start(); - // In test scenarios where it is up to OS to pick up an available port, set the listening port back to config - if (null != nettyServerConfig && 0 == nettyServerConfig.getListenPort()) { - nettyServerConfig.setListenPort(remotingServer.localListenPort()); + if (TCP_REMOTING_SERVER.equals(entry.getKey())) { + // In test scenarios where it is up to OS to pick up an available port, set the listening port back to config + if (null != nettyServerConfig && 0 == nettyServerConfig.getListenPort()) { + nettyServerConfig.setListenPort(remotingServer.localListenPort()); + } + } } } - if (this.fastRemotingServer != null) { - this.fastRemotingServer.start(); - } - this.storeHost = new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()); for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) { @@ -2353,21 +2355,49 @@ public class BrokerController { } public void registerServerRPCHook(RPCHook rpcHook) { - getRemotingServer().registerRPCHook(rpcHook); - this.fastRemotingServer.registerRPCHook(rpcHook); + for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) { + RemotingServer remotingServer = entry.getValue(); + if (remotingServer != null) { + remotingServer.registerRPCHook(rpcHook); + } + } } public void setRequestPipeline(RequestPipeline pipeline) { - this.getRemotingServer().setRequestPipeline(pipeline); - this.fastRemotingServer.setRequestPipeline(pipeline); + for (Map.Entry<String, RemotingServer> entry : remotingServerMap.entrySet()) { + RemotingServer remotingServer = entry.getValue(); + if (remotingServer != null) { + remotingServer.setRequestPipeline(pipeline); + } + } } public RemotingServer getRemotingServer() { - return remotingServer; + return remotingServerMap.get(TCP_REMOTING_SERVER); } public void setRemotingServer(RemotingServer remotingServer) { - this.remotingServer = remotingServer; + remotingServerMap.put(TCP_REMOTING_SERVER, remotingServer); + } + + public RemotingServer getFastRemotingServer() { + return remotingServerMap.get(FAST_REMOTING_SERVER); + } + + public void setFastRemotingServer(RemotingServer fastRemotingServer) { + remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer); + } + + public RemotingServer getRemotingServerByName(String name) { + return remotingServerMap.get(name); + } + + public void setRemotingServerByName(String name, RemotingServer remotingServer) { + remotingServerMap.put(name, remotingServer); + } + + public ClientHousekeepingService getClientHousekeepingService() { + return clientHousekeepingService; } public CountDownLatch getRemotingServerStartLatch() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 6035a20acb..3ce1fe3dbd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -26,11 +26,18 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.future.FutureTaskExt; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.RequestTask; +import org.apache.rocketmq.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -94,4 +101,43 @@ public class BrokerControllerTest { TimeUnit.MILLISECONDS.sleep(headSlowTimeMills); assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills); } + + @Test + public void testCustomRemotingServer() throws CloneNotSupportedException { + final RemotingServer mockRemotingServer = new NettyRemotingServer(nettyServerConfig); + final String mockRemotingServerName = "MOCK_REMOTING_SERVER"; + + BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), messageStoreConfig); + brokerController.setRemotingServerByName(mockRemotingServerName, mockRemotingServer); + brokerController.initializeRemotingServer(); + + final RPCHook rpcHook = new RPCHook() { + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + + } + }; + brokerController.registerServerRPCHook(rpcHook); + + // setRequestPipelineTest + final RequestPipeline requestPipeline = (ctx, request) -> { + + }; + brokerController.setRequestPipeline(requestPipeline); + + NettyRemotingAbstract tcpRemotingServer = (NettyRemotingAbstract) brokerController.getRemotingServer(); + Assert.assertTrue(tcpRemotingServer.getRPCHook().contains(rpcHook)); + + NettyRemotingAbstract fastRemotingServer = (NettyRemotingAbstract) brokerController.getFastRemotingServer(); + Assert.assertTrue(fastRemotingServer.getRPCHook().contains(rpcHook)); + + NettyRemotingAbstract mockRemotingServer1 = (NettyRemotingAbstract) brokerController.getRemotingServerByName(mockRemotingServerName); + Assert.assertTrue(mockRemotingServer1.getRPCHook().contains(rpcHook)); + Assert.assertSame(mockRemotingServer, mockRemotingServer1); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bed6c1c476..30d7b0a1d5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -59,6 +59,7 @@ import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ObjectCreator; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -268,19 +269,43 @@ public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdo private String nameSrvAddr = null; private ClientConfig clientConfig; - public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, + public MQClientAPIImpl( + final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, - RPCHook rpcHook, final ClientConfig clientConfig) { + final RPCHook rpcHook, + final ClientConfig clientConfig + ) { this(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null); } - public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, + public MQClientAPIImpl( + final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, - RPCHook rpcHook, final ClientConfig clientConfig, final ChannelEventListener channelEventListener) { + final RPCHook rpcHook, + final ClientConfig clientConfig, + final ChannelEventListener channelEventListener + ) { + this( + nettyClientConfig, + clientRemotingProcessor, + rpcHook, + clientConfig, + channelEventListener, + null + ); + } + + public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, + final ClientRemotingProcessor clientRemotingProcessor, + RPCHook rpcHook, final ClientConfig clientConfig, + final ChannelEventListener channelEventListener, + final ObjectCreator<RemotingClient> remotingClientCreator) { this.clientConfig = clientConfig; topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); topAddressing.registerChangeCallBack(this); - this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener); + this.remotingClient = remotingClientCreator != null + ? remotingClientCreator.create(nettyClientConfig, channelEventListener) + : new NettyRemotingClient(nettyClientConfig, channelEventListener); this.clientRemotingProcessor = clientRemotingProcessor; this.remotingClient.registerRPCHook(new NamespaceRpcHook(clientConfig)); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index 6624b3100d..c22f453477 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.ObjectCreator; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; @@ -48,6 +49,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.ResponseFuture; @@ -97,7 +99,17 @@ public class MQClientAPIExt extends MQClientAPIImpl { ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook ) { - super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig); + this(clientConfig, nettyClientConfig, clientRemotingProcessor, rpcHook, null); + } + + public MQClientAPIExt( + ClientConfig clientConfig, + NettyClientConfig nettyClientConfig, + ClientRemotingProcessor clientRemotingProcessor, + RPCHook rpcHook, + ObjectCreator<RemotingClient> remotingClientCreator + ) { + super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null, remotingClientCreator); this.clientConfig = clientConfig; this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java index 0fa31b6640..d85dcc70a5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java @@ -17,18 +17,22 @@ package org.apache.rocketmq.client.impl.mqclient; import com.google.common.base.Strings; + import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.common.NameserverAccessConfig; import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.utils.AsyncShutdownHelper; +import org.apache.rocketmq.common.ObjectCreator; import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.netty.NettyClientConfig; public class MQClientAPIFactory implements StartAndShutdown { @@ -40,16 +44,35 @@ public class MQClientAPIFactory implements StartAndShutdown { private final RPCHook rpcHook; private final ScheduledExecutorService scheduledExecutorService; private final NameserverAccessConfig nameserverAccessConfig; + private final ObjectCreator<RemotingClient> remotingClientCreator; + + public MQClientAPIFactory( + NameserverAccessConfig nameserverAccessConfig, + String namePrefix, + int clientNum, + ClientRemotingProcessor clientRemotingProcessor, + RPCHook rpcHook, + ScheduledExecutorService scheduledExecutorService + ) { + this(nameserverAccessConfig, namePrefix, clientNum, clientRemotingProcessor, rpcHook, scheduledExecutorService, null); + } - public MQClientAPIFactory(NameserverAccessConfig nameserverAccessConfig, String namePrefix, int clientNum, + public MQClientAPIFactory( + NameserverAccessConfig nameserverAccessConfig, + String namePrefix, + int clientNum, ClientRemotingProcessor clientRemotingProcessor, - RPCHook rpcHook, ScheduledExecutorService scheduledExecutorService) { + RPCHook rpcHook, + ScheduledExecutorService scheduledExecutorService, + ObjectCreator<RemotingClient> remotingClientCreator + ) { this.nameserverAccessConfig = nameserverAccessConfig; this.namePrefix = namePrefix; this.clientNum = clientNum; this.clientRemotingProcessor = clientRemotingProcessor; this.rpcHook = rpcHook; this.scheduledExecutorService = scheduledExecutorService; + this.remotingClientCreator = remotingClientCreator; this.init(); } @@ -102,9 +125,13 @@ public class MQClientAPIFactory implements StartAndShutdown { NettyClientConfig nettyClientConfig = new NettyClientConfig(); nettyClientConfig.setDisableCallbackExecutor(true); - MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(clientConfig, nettyClientConfig, + MQClientAPIExt mqClientAPIExt = new MQClientAPIExt( + clientConfig, + nettyClientConfig, clientRemotingProcessor, - rpcHook); + rpcHook, + remotingClientCreator + ); if (!mqClientAPIExt.updateNameServerAddressList()) { mqClientAPIExt.fetchNameServerAddr(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index c76d0c734a..6cb96df05f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ObjectCreator; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; @@ -59,6 +60,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; @@ -2104,6 +2106,48 @@ public class MQClientAPIImplTest { done.await(); } + @Test + public void testMQClientAPIImplWithoutObjectCreator() { + MQClientAPIImpl clientAPI = new MQClientAPIImpl( + new NettyClientConfig(), + null, + null, + new ClientConfig(), + null, + null + ); + RemotingClient remotingClient1 = clientAPI.getRemotingClient(); + Assert.assertTrue(remotingClient1 instanceof NettyRemotingClient); + } + + @Test + public void testMQClientAPIImplWithObjectCreator() { + ObjectCreator<RemotingClient> clientObjectCreator = args -> new MockRemotingClientTest((NettyClientConfig) args[0]); + final NettyClientConfig nettyClientConfig = new NettyClientConfig(); + MQClientAPIImpl clientAPI = new MQClientAPIImpl( + nettyClientConfig, + null, + null, + new ClientConfig(), + null, + clientObjectCreator + ); + RemotingClient remotingClient1 = clientAPI.getRemotingClient(); + Assert.assertTrue(remotingClient1 instanceof MockRemotingClientTest); + MockRemotingClientTest remotingClientTest = (MockRemotingClientTest) remotingClient1; + Assert.assertSame(remotingClientTest.getNettyClientConfig(), nettyClientConfig); + } + + private static class MockRemotingClientTest extends NettyRemotingClient { + public MockRemotingClientTest(NettyClientConfig nettyClientConfig) { + super(nettyClientConfig); + } + + public NettyClientConfig getNettyClientConfig() { + return nettyClientConfig; + } + } + private Properties createProperties() { Properties result = new Properties(); result.put("key", "value"); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java b/common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java similarity index 51% copy from proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java copy to common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java index c186752788..14c645424f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java +++ b/common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java @@ -14,25 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.proxy.service; +package org.apache.rocketmq.common; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.remoting.RPCHook; - -public class ServiceManagerFactory { - public static ServiceManager createForLocalMode(BrokerController brokerController) { - return createForLocalMode(brokerController, null); - } - - public static ServiceManager createForLocalMode(BrokerController brokerController, RPCHook rpcHook) { - return new LocalServiceManager(brokerController, rpcHook); - } - - public static ServiceManager createForClusterMode() { - return createForClusterMode(null); - } - - public static ServiceManager createForClusterMode(RPCHook rpcHook) { - return new ClusterServiceManager(rpcHook); - } +public interface ObjectCreator<T> { + T create(Object... args); } diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java index a1c1eecf59..616188e52d 100644 --- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java +++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.MessageStore; @@ -43,8 +44,11 @@ public class InnerBrokerController extends BrokerController { @Override protected void initializeRemotingServer() { - this.remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort()); - this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2); + RemotingServer remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort()); + RemotingServer fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2); + + setRemotingServer(remotingServer); + setFastRemotingServer(fastRemotingServer); } @Override @@ -119,11 +123,11 @@ public class InnerBrokerController extends BrokerController { scheduledFuture.cancel(true); } - if (this.remotingServer != null) { + if (getRemotingServer() != null) { this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort()); } - if (this.fastRemotingServer != null) { + if (getFastRemotingServer() != null) { this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java index 9786cec557..33b65d2550 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.client.common.NameserverAccessConfig; import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; +import org.apache.rocketmq.common.ObjectCreator; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.common.utils.ThreadUtils; @@ -51,6 +52,7 @@ import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.proxy.service.transaction.ClusterTransactionService; import org.apache.rocketmq.proxy.service.transaction.TransactionService; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; public class ClusterServiceManager extends AbstractStartAndShutdown implements ServiceManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -70,6 +72,10 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S protected MQClientAPIFactory transactionClientAPIFactory; public ClusterServiceManager(RPCHook rpcHook) { + this(rpcHook, null); + } + + public ClusterServiceManager(RPCHook rpcHook, ObjectCreator<RemotingClient> remotingClientCreator) { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); NameserverAccessConfig nameserverAccessConfig = new NameserverAccessConfig(proxyConfig.getNamesrvAddr(), proxyConfig.getNamesrvDomain(), proxyConfig.getNamesrvDomainSubgroup()); @@ -81,14 +87,18 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S proxyConfig.getRocketmqMQClientNum(), new DoNothingClientRemotingProcessor(null), rpcHook, - scheduledExecutorService); + scheduledExecutorService, + remotingClientCreator + ); + this.operationClientAPIFactory = new MQClientAPIFactory( nameserverAccessConfig, "OperationClient_", 1, new DoNothingClientRemotingProcessor(null), rpcHook, - this.scheduledExecutorService + this.scheduledExecutorService, + remotingClientCreator ); this.topicRouteService = new ClusterTopicRouteService(operationClientAPIFactory); @@ -105,7 +115,10 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S 1, new ProxyClientRemotingProcessor(producerManager), rpcHook, - scheduledExecutorService); + scheduledExecutorService, + remotingClientCreator + ); + this.clusterTransactionService = new ClusterTransactionService(this.topicRouteService, this.producerManager, this.transactionClientAPIFactory); this.proxyRelayService = new ClusterProxyRelayService(this.clusterTransactionService); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java index c186752788..e1252fe31f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java @@ -17,7 +17,9 @@ package org.apache.rocketmq.proxy.service; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.ObjectCreator; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; public class ServiceManagerFactory { public static ServiceManager createForLocalMode(BrokerController brokerController) { @@ -29,10 +31,14 @@ public class ServiceManagerFactory { } public static ServiceManager createForClusterMode() { - return createForClusterMode(null); + return createForClusterMode(null, null); } public static ServiceManager createForClusterMode(RPCHook rpcHook) { - return new ClusterServiceManager(rpcHook); + return createForClusterMode(rpcHook, null); + } + + public static ServiceManager createForClusterMode(RPCHook rpcHook, ObjectCreator<RemotingClient> remotingClientCreator) { + return new ClusterServiceManager(rpcHook, remotingClientCreator); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index d3f5a88cf2..a4f23f181a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -673,6 +673,10 @@ public abstract class NettyRemotingAbstract { } } + public HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>> getProcessorTable() { + return processorTable; + } + class NettyEventExecutor extends ServiceThread { private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<>(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 6ac54aed6d..e92809ccdf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -98,7 +98,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100; - private final NettyClientConfig nettyClientConfig; + protected final NettyClientConfig nettyClientConfig; private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); @@ -288,6 +288,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return null; } + protected ChannelFuture doConnect(String addr) { + String[] hostAndPort = getHostAndPort(addr); + String host = hostAndPort[0]; + int port = Integer.parseInt(hostAndPort[1]); + return fetchBootstrap(addr).connect(host, port); + } + private Bootstrap fetchBootstrap(String addr) { Map.Entry<String, SocksProxyConfig> proxyEntry = getProxy(addr); if (proxyEntry == null) { @@ -359,7 +366,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } // Do not use RemotingHelper.string2SocketAddress(), it will directly resolve the domain - private String[] getHostAndPort(String address) { + protected String[] getHostAndPort(String address) { int split = address.lastIndexOf(":"); return split < 0 ? new String[]{address} : new String[]{address.substring(0, split), address.substring(split + 1)}; } @@ -712,9 +719,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } private ChannelWrapper createChannel(String addr) { - String[] hostAndPort = getHostAndPort(addr); - ChannelFuture channelFuture = fetchBootstrap(addr) - .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + ChannelFuture channelFuture = doConnect(addr); LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); ChannelWrapper cw = new ChannelWrapper(addr, channelFuture); this.channelTables.put(addr, cw); @@ -1047,9 +1052,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti try { if (isWrapperOf(channel)) { channelToClose = channelFuture; - String[] hostAndPort = getHostAndPort(channelAddress); - channelFuture = fetchBootstrap(channelAddress) - .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + channelFuture = doConnect(channelAddress); return true; } else { LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id()); @@ -1119,15 +1122,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { - + public class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } - class NettyConnectManageHandler extends ChannelDuplexHandler { + public class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index cbf25c23c6..7ed804483b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -54,19 +54,6 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.cert.CertificateException; -import java.time.Duration; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.Pair; @@ -88,15 +75,28 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -@SuppressWarnings("NullableProblems") +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.time.Duration; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); private static final Logger TRAFFIC_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_TRAFFIC_NAME); private final ServerBootstrap serverBootstrap; - private final EventLoopGroup eventLoopGroupSelector; - private final EventLoopGroup eventLoopGroupBoss; - private final NettyServerConfig nettyServerConfig; + protected final EventLoopGroup eventLoopGroupSelector; + protected final EventLoopGroup eventLoopGroupBoss; + protected final NettyServerConfig nettyServerConfig; private final ExecutorService publicExecutor; private final ScheduledExecutorService scheduledExecutorService; @@ -120,18 +120,18 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; // sharable handlers - private TlsModeHandler tlsModeHandler; - private NettyEncoder encoder; - private NettyConnectManageHandler connectionManageHandler; - private NettyServerHandler serverHandler; - private RemotingCodeDistributionHandler distributionHandler; + protected final TlsModeHandler tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); + protected final NettyEncoder encoder = new NettyEncoder(); + protected final NettyConnectManageHandler connectionManageHandler = new NettyConnectManageHandler(); + protected final NettyServerHandler serverHandler = new NettyServerHandler(); + protected final RemotingCodeDistributionHandler distributionHandler = new RemotingCodeDistributionHandler(); public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } public NettyRemotingServer(final NettyServerConfig nettyServerConfig, - final ChannelEventListener channelEventListener) { + final ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; @@ -140,13 +140,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this.publicExecutor = buildPublicExecutor(nettyServerConfig); this.scheduledExecutorService = buildScheduleExecutor(); - this.eventLoopGroupBoss = buildBossEventLoopGroup(); + this.eventLoopGroupBoss = buildEventLoopGroupBoss(); this.eventLoopGroupSelector = buildEventLoopGroupSelector(); loadSslContext(); } - private EventLoopGroup buildEventLoopGroupSelector() { + protected EventLoopGroup buildEventLoopGroupSelector() { if (useEpoll()) { return new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactoryImpl("NettyServerEPOLLSelector_")); } else { @@ -154,7 +154,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } - private EventLoopGroup buildBossEventLoopGroup() { + protected EventLoopGroup buildEventLoopGroupBoss() { if (useEpoll()) { return new EpollEventLoopGroup(1, new ThreadFactoryImpl("NettyEPOLLBoss_")); } else { @@ -197,13 +197,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti && Epoll.isAvailable(); } - @Override - public void start() { - this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(), - new ThreadFactoryImpl("NettyServerCodecThread_")); - - prepareSharableHandlers(); - + protected void initServerBootstrap(ServerBootstrap serverBootstrap) { serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) @@ -220,6 +214,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti }); addCustomConfig(serverBootstrap); + } + + @Override + public void start() { + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(), + new ThreadFactoryImpl("NettyServerCodecThread_")); + + initServerBootstrap(serverBootstrap); try { ChannelFuture sync = serverBootstrap.bind().sync(); @@ -411,14 +413,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return this.publicExecutor; } - private void prepareSharableHandlers() { - tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); - encoder = new NettyEncoder(); - connectionManageHandler = new NettyConnectManageHandler(); - serverHandler = new NettyServerHandler(); - distributionHandler = new RemotingCodeDistributionHandler(); - } - private void printRemotingCodeDistribution() { if (distributionHandler != null) { String inBoundSnapshotString = distributionHandler.getInBoundSnapshotString(); @@ -469,8 +463,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } if (detectionResult.state() == ProtocolDetectionState.DETECTED) { ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) - .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) - .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler); + .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) + .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler); } else { ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler); } @@ -664,7 +658,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void registerProcessor(final int requestCode, final NettyRequestProcessor processor, - final ExecutorService executor) { + final ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = NettyRemotingServer.this.publicExecutor; @@ -708,19 +702,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, - final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { + final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { return this.invokeSyncImpl(channel, request, timeoutMillis); } @Override public void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, - final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); } @Override public void invokeOneway(final Channel channel, final RemotingCommand request, - final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { this.invokeOnewayImpl(channel, request, timeoutMillis); }