This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 14156b4f0a [ISSUE #9191] Provide the ability to replace the remoting 
layer implementation for Proxy and Broker (#9192)
14156b4f0a is described below

commit 14156b4f0aacfa9d2a11a69dc4d58686a666d07c
Author: Quan <zsjper...@foxmail.com>
AuthorDate: Mon Feb 24 12:27:30 2025 +0800

    [ISSUE #9191] Provide the ability to replace the remoting layer 
implementation for Proxy and Broker (#9192)
    
    * remoting replacement for proxy and broker
    
    * add unit test
    
    * fix code style
---
 .../apache/rocketmq/broker/BrokerController.java   | 202 ++++++++++++---------
 .../rocketmq/broker/BrokerControllerTest.java      |  46 +++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  35 +++-
 .../client/impl/mqclient/MQClientAPIExt.java       |  14 +-
 .../client/impl/mqclient/MQClientAPIFactory.java   |  35 +++-
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  44 +++++
 .../org/apache/rocketmq/common/ObjectCreator.java  |  23 +--
 .../rocketmq/container/InnerBrokerController.java  |  12 +-
 .../proxy/service/ClusterServiceManager.java       |  19 +-
 .../proxy/service/ServiceManagerFactory.java       |  10 +-
 .../remoting/netty/NettyRemotingAbstract.java      |   4 +
 .../remoting/netty/NettyRemotingClient.java        |  24 +--
 .../remoting/netty/NettyRemotingServer.java        |  88 +++++----
 13 files changed, 373 insertions(+), 183 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 006695c6bc..4031dce8d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -244,10 +244,10 @@ public class BrokerController {
     protected final List<SendMessageHook> sendMessageHookList = new 
ArrayList<>();
     protected final List<ConsumeMessageHook> consumeMessageHookList = new 
ArrayList<>();
     protected MessageStore messageStore;
-    protected RemotingServer remotingServer;
+    protected static final String TCP_REMOTING_SERVER = "TCP_REMOTING_SERVER";
+    protected static final String FAST_REMOTING_SERVER = 
"FAST_REMOTING_SERVER";
+    protected final Map<String, RemotingServer> remotingServerMap = new 
ConcurrentHashMap<>();
     protected CountDownLatch remotingServerStartLatch;
-    protected RemotingServer fastRemotingServer;
-
     /**
      * If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries 
are stored in RocksDB.
      */
@@ -494,7 +494,7 @@ public class BrokerController {
     }
 
     protected void initializeRemotingServer() throws 
CloneNotSupportedException {
-        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, 
this.clientHousekeepingService);
+        RemotingServer tcpRemotingServer = new 
NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
         NettyServerConfig fastConfig = (NettyServerConfig) 
this.nettyServerConfig.clone();
 
         int listeningPort = nettyServerConfig.getListenPort() - 2;
@@ -503,7 +503,10 @@ public class BrokerController {
         }
         fastConfig.setListenPort(listeningPort);
 
-        this.fastRemotingServer = new NettyRemotingServer(fastConfig, 
this.clientHousekeepingService);
+        RemotingServer fastRemotingServer = new 
NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+
+        remotingServerMap.put(TCP_REMOTING_SERVER, tcpRemotingServer);
+        remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer);
     }
 
     /**
@@ -939,8 +942,12 @@ public class BrokerController {
                             }
 
                             private void reloadServerSslContext() {
-                                ((NettyRemotingServer) 
remotingServer).loadSslContext();
-                                ((NettyRemotingServer) 
fastRemotingServer).loadSslContext();
+                                for (Map.Entry<String, RemotingServer> entry : 
remotingServerMap.entrySet()) {
+                                    RemotingServer remotingServer = 
entry.getValue();
+                                    if (remotingServer instanceof 
NettyRemotingServer) {
+                                        ((NettyRemotingServer) 
remotingServer).loadSslContext();
+                                    }
+                                }
                             }
                         });
                 } catch (Exception e) {
@@ -1092,59 +1099,62 @@ public class BrokerController {
     }
 
     public void registerProcessor() {
+        RemotingServer remotingServer = 
remotingServerMap.get(TCP_REMOTING_SERVER);
+        RemotingServer fastRemotingServer = 
remotingServerMap.get(FAST_REMOTING_SERVER);
+
         /*
          * SendMessageProcessor
          */
         sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
         
sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
 
-        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendMessageProcessor, this.sendMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, 
sendMessageProcessor, this.sendMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, 
recallMessageProcessor, this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendMessageProcessor, this.sendMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, 
sendMessageProcessor, this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, 
recallMessageProcessor, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendMessageProcessor, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, 
sendMessageProcessor, this.sendMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, 
recallMessageProcessor, this.sendMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendMessageProcessor, this.sendMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, 
sendMessageProcessor, this.sendMessageExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, 
sendMessageProcessor, this.sendMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE, 
recallMessageProcessor, this.sendMessageExecutor);
         /**
          * PullMessageProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, 
this.pullMessageProcessor, this.pullMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, 
this.pullMessageProcessor, this.litePullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, 
this.pullMessageProcessor, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, 
this.pullMessageProcessor, this.litePullMessageExecutor);
         
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
         /**
          * PeekMessageProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, 
this.peekMessageProcessor, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, 
this.peekMessageProcessor, this.pullMessageExecutor);
         /**
          * PopMessageProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, 
this.popMessageProcessor, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.POP_MESSAGE, 
this.popMessageProcessor, this.pullMessageExecutor);
 
         /**
          * AckMessageProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
 
-        this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
         /**
          * ChangeInvisibleTimeProcessor
          */
-        
this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
 this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
+        
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
         /**
          * notificationProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, 
this.notificationProcessor, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.NOTIFICATION, 
this.notificationProcessor, this.pullMessageExecutor);
 
         /**
          * pollingInfoProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, 
this.pollingInfoProcessor, this.pullMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.POLLING_INFO, 
this.pollingInfoProcessor, this.pullMessageExecutor);
 
         /**
          * ReplyMessageProcessor
@@ -1152,64 +1162,64 @@ public class BrokerController {
 
         replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
 
-        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, 
replyMessageProcessor, replyMessageExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, 
replyMessageProcessor, replyMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, 
replyMessageProcessor, replyMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, 
replyMessageProcessor, replyMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, 
replyMessageProcessor, replyMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, 
replyMessageProcessor, replyMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, 
replyMessageProcessor, replyMessageExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, 
replyMessageProcessor, replyMessageExecutor);
 
         /**
          * QueryMessageProcessor
          */
         NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
-        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, 
queryProcessor, this.queryMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, 
queryProcessor, this.queryMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, 
queryProcessor, this.queryMessageExecutor);
+        remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, 
queryProcessor, this.queryMessageExecutor);
 
-        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, 
queryProcessor, this.queryMessageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, 
queryProcessor, this.queryMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, 
queryProcessor, this.queryMessageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, 
queryProcessor, this.queryMessageExecutor);
 
         /**
          * ClientManageProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientManageProcessor, this.heartbeatExecutor);
-        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientManageProcessor, this.clientManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientManageProcessor, this.clientManageExecutor);
+        remotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientManageProcessor, this.heartbeatExecutor);
+        remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientManageProcessor, this.clientManageExecutor);
+        remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientManageProcessor, this.clientManageExecutor);
 
-        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientManageProcessor, this.heartbeatExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientManageProcessor, this.clientManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientManageProcessor, this.clientManageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientManageProcessor, this.heartbeatExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientManageProcessor, this.clientManageExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientManageProcessor, this.clientManageExecutor);
 
         /**
          * ConsumerManageProcessor
          */
         ConsumerManageProcessor consumerManageProcessor = new 
ConsumerManageProcessor(this);
-        
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
consumerManageProcessor, this.consumerManageExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
+        
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
consumerManageProcessor, this.consumerManageExecutor);
+        remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
+        remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
 
-        
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
 consumerManageProcessor, this.consumerManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
consumerManageProcessor, this.consumerManageExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
 
         /**
          * QueryAssignmentProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, 
queryAssignmentProcessor, loadBalanceExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, 
queryAssignmentProcessor, loadBalanceExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, 
queryAssignmentProcessor, loadBalanceExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, 
queryAssignmentProcessor, loadBalanceExecutor);
+        remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, 
queryAssignmentProcessor, loadBalanceExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, 
queryAssignmentProcessor, loadBalanceExecutor);
+        remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, 
queryAssignmentProcessor, loadBalanceExecutor);
+        
fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, 
queryAssignmentProcessor, loadBalanceExecutor);
 
         /**
          * EndTransactionProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, 
endTransactionProcessor, this.endTransactionExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, 
endTransactionProcessor, this.endTransactionExecutor);
+        remotingServer.registerProcessor(RequestCode.END_TRANSACTION, 
endTransactionProcessor, this.endTransactionExecutor);
+        fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, 
endTransactionProcessor, this.endTransactionExecutor);
 
         /*
          * Default
          */
         AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
-        this.remotingServer.registerDefaultProcessor(adminProcessor, 
this.adminBrokerExecutor);
-        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, 
this.adminBrokerExecutor);
+        remotingServer.registerDefaultProcessor(adminProcessor, 
this.adminBrokerExecutor);
+        fastRemotingServer.registerDefaultProcessor(adminProcessor, 
this.adminBrokerExecutor);
 
         /*
          * Initialize the mapping of request codes to request headers.
@@ -1342,14 +1352,6 @@ public class BrokerController {
         return producerManager;
     }
 
-    public void setFastRemotingServer(RemotingServer fastRemotingServer) {
-        this.fastRemotingServer = fastRemotingServer;
-    }
-
-    public RemotingServer getFastRemotingServer() {
-        return fastRemotingServer;
-    }
-
     public PullMessageProcessor getPullMessageProcessor() {
         return pullMessageProcessor;
     }
@@ -1400,12 +1402,11 @@ public class BrokerController {
             this.shutdownHook.beforeShutdown(this);
         }
 
-        if (this.remotingServer != null) {
-            this.remotingServer.shutdown();
-        }
-
-        if (this.fastRemotingServer != null) {
-            this.fastRemotingServer.shutdown();
+        for (Map.Entry<String, RemotingServer> entry : 
remotingServerMap.entrySet()) {
+            RemotingServer remotingServer = entry.getValue();
+            if (remotingServer != null) {
+                remotingServer.shutdown();
+            }
         }
 
         if (this.brokerMetricsManager != null) {
@@ -1658,19 +1659,20 @@ public class BrokerController {
             remotingServerStartLatch.await();
         }
 
-        if (this.remotingServer != null) {
-            this.remotingServer.start();
+        for (Map.Entry<String, RemotingServer> entry : 
remotingServerMap.entrySet()) {
+            RemotingServer remotingServer = entry.getValue();
+            if (remotingServer != null) {
+                remotingServer.start();
 
-            // In test scenarios where it is up to OS to pick up an available 
port, set the listening port back to config
-            if (null != nettyServerConfig && 0 == 
nettyServerConfig.getListenPort()) {
-                
nettyServerConfig.setListenPort(remotingServer.localListenPort());
+                if (TCP_REMOTING_SERVER.equals(entry.getKey())) {
+                    // In test scenarios where it is up to OS to pick up an 
available port, set the listening port back to config
+                    if (null != nettyServerConfig && 0 == 
nettyServerConfig.getListenPort()) {
+                        
nettyServerConfig.setListenPort(remotingServer.localListenPort());
+                    }
+                }
             }
         }
 
-        if (this.fastRemotingServer != null) {
-            this.fastRemotingServer.start();
-        }
-
         this.storeHost = new 
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), 
this.getNettyServerConfig().getListenPort());
 
         for (BrokerAttachedPlugin brokerAttachedPlugin : 
brokerAttachedPlugins) {
@@ -2353,21 +2355,49 @@ public class BrokerController {
     }
 
     public void registerServerRPCHook(RPCHook rpcHook) {
-        getRemotingServer().registerRPCHook(rpcHook);
-        this.fastRemotingServer.registerRPCHook(rpcHook);
+        for (Map.Entry<String, RemotingServer> entry : 
remotingServerMap.entrySet()) {
+            RemotingServer remotingServer = entry.getValue();
+            if (remotingServer != null) {
+                remotingServer.registerRPCHook(rpcHook);
+            }
+        }
     }
 
     public void setRequestPipeline(RequestPipeline pipeline) {
-        this.getRemotingServer().setRequestPipeline(pipeline);
-        this.fastRemotingServer.setRequestPipeline(pipeline);
+        for (Map.Entry<String, RemotingServer> entry : 
remotingServerMap.entrySet()) {
+            RemotingServer remotingServer = entry.getValue();
+            if (remotingServer != null) {
+                remotingServer.setRequestPipeline(pipeline);
+            }
+        }
     }
 
     public RemotingServer getRemotingServer() {
-        return remotingServer;
+        return remotingServerMap.get(TCP_REMOTING_SERVER);
     }
 
     public void setRemotingServer(RemotingServer remotingServer) {
-        this.remotingServer = remotingServer;
+        remotingServerMap.put(TCP_REMOTING_SERVER, remotingServer);
+    }
+
+    public RemotingServer getFastRemotingServer() {
+        return remotingServerMap.get(FAST_REMOTING_SERVER);
+    }
+
+    public void setFastRemotingServer(RemotingServer fastRemotingServer) {
+        remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer);
+    }
+
+    public RemotingServer getRemotingServerByName(String name) {
+        return remotingServerMap.get(name);
+    }
+
+    public void setRemotingServerByName(String name, RemotingServer 
remotingServer) {
+        remotingServerMap.put(name, remotingServer);
+    }
+
+    public ClientHousekeepingService getClientHousekeepingService() {
+        return clientHousekeepingService;
     }
 
     public CountDownLatch getRemotingServerStartLatch() {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 6035a20acb..3ce1fe3dbd 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -26,11 +26,18 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -94,4 +101,43 @@ public class BrokerControllerTest {
         TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
         
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
     }
+
+    @Test
+    public void testCustomRemotingServer() throws CloneNotSupportedException {
+        final RemotingServer mockRemotingServer = new 
NettyRemotingServer(nettyServerConfig);
+        final String mockRemotingServerName = "MOCK_REMOTING_SERVER";
+
+        BrokerController brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, new NettyClientConfig(), messageStoreConfig);
+        brokerController.setRemotingServerByName(mockRemotingServerName, 
mockRemotingServer);
+        brokerController.initializeRemotingServer();
+
+        final RPCHook rpcHook = new RPCHook() {
+            @Override
+            public void doBeforeRequest(String remoteAddr, RemotingCommand 
request) {
+
+            }
+
+            @Override
+            public void doAfterResponse(String remoteAddr, RemotingCommand 
request, RemotingCommand response) {
+
+            }
+        };
+        brokerController.registerServerRPCHook(rpcHook);
+
+        // setRequestPipelineTest
+        final RequestPipeline requestPipeline = (ctx, request) -> {
+
+        };
+        brokerController.setRequestPipeline(requestPipeline);
+
+        NettyRemotingAbstract tcpRemotingServer = (NettyRemotingAbstract) 
brokerController.getRemotingServer();
+        Assert.assertTrue(tcpRemotingServer.getRPCHook().contains(rpcHook));
+
+        NettyRemotingAbstract fastRemotingServer = (NettyRemotingAbstract) 
brokerController.getFastRemotingServer();
+        Assert.assertTrue(fastRemotingServer.getRPCHook().contains(rpcHook));
+
+        NettyRemotingAbstract mockRemotingServer1 = (NettyRemotingAbstract) 
brokerController.getRemotingServerByName(mockRemotingServerName);
+        Assert.assertTrue(mockRemotingServer1.getRPCHook().contains(rpcHook));
+        Assert.assertSame(mockRemotingServer, mockRemotingServer1);
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bed6c1c476..30d7b0a1d5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ObjectCreator;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
@@ -268,19 +269,43 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
     private String nameSrvAddr = null;
     private ClientConfig clientConfig;
 
-    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+    public MQClientAPIImpl(
+        final NettyClientConfig nettyClientConfig,
         final ClientRemotingProcessor clientRemotingProcessor,
-        RPCHook rpcHook, final ClientConfig clientConfig) {
+        final RPCHook rpcHook,
+        final ClientConfig clientConfig
+    ) {
         this(nettyClientConfig, clientRemotingProcessor, rpcHook, 
clientConfig, null);
     }
 
-    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+    public MQClientAPIImpl(
+        final NettyClientConfig nettyClientConfig,
         final ClientRemotingProcessor clientRemotingProcessor,
-        RPCHook rpcHook, final ClientConfig clientConfig, final 
ChannelEventListener channelEventListener) {
+        final RPCHook rpcHook,
+        final ClientConfig clientConfig,
+        final ChannelEventListener channelEventListener
+    ) {
+        this(
+            nettyClientConfig,
+            clientRemotingProcessor,
+            rpcHook,
+            clientConfig,
+            channelEventListener,
+            null
+        );
+    }
+
+    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+                           final ClientRemotingProcessor 
clientRemotingProcessor,
+                           RPCHook rpcHook, final ClientConfig clientConfig,
+                           final ChannelEventListener channelEventListener,
+                           final ObjectCreator<RemotingClient> 
remotingClientCreator) {
         this.clientConfig = clientConfig;
         topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), 
clientConfig.getUnitName());
         topAddressing.registerChangeCallBack(this);
-        this.remotingClient = new NettyRemotingClient(nettyClientConfig, 
channelEventListener);
+        this.remotingClient = remotingClientCreator != null
+            ? remotingClientCreator.create(nettyClientConfig, 
channelEventListener)
+            : new NettyRemotingClient(nettyClientConfig, channelEventListener);
         this.clientRemotingProcessor = clientRemotingProcessor;
 
         this.remotingClient.registerRPCHook(new 
NamespaceRpcHook(clientConfig));
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index 6624b3100d..c22f453477 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.ObjectCreator;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
@@ -48,6 +49,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
@@ -97,7 +99,17 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         ClientRemotingProcessor clientRemotingProcessor,
         RPCHook rpcHook
     ) {
-        super(nettyClientConfig, clientRemotingProcessor, rpcHook, 
clientConfig);
+        this(clientConfig, nettyClientConfig, clientRemotingProcessor, 
rpcHook, null);
+    }
+
+    public MQClientAPIExt(
+        ClientConfig clientConfig,
+        NettyClientConfig nettyClientConfig,
+        ClientRemotingProcessor clientRemotingProcessor,
+        RPCHook rpcHook,
+        ObjectCreator<RemotingClient> remotingClientCreator
+    ) {
+        super(nettyClientConfig, clientRemotingProcessor, rpcHook, 
clientConfig, null, remotingClientCreator);
         this.clientConfig = clientConfig;
         this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient());
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
index 0fa31b6640..d85dcc70a5 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
@@ -17,18 +17,22 @@
 package org.apache.rocketmq.client.impl.mqclient;
 
 import com.google.common.base.Strings;
+
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.common.NameserverAccessConfig;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
+import org.apache.rocketmq.common.ObjectCreator;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 
 public class MQClientAPIFactory implements StartAndShutdown {
@@ -40,16 +44,35 @@ public class MQClientAPIFactory implements StartAndShutdown 
{
     private final RPCHook rpcHook;
     private final ScheduledExecutorService scheduledExecutorService;
     private final NameserverAccessConfig nameserverAccessConfig;
+    private final ObjectCreator<RemotingClient> remotingClientCreator;
+
+    public MQClientAPIFactory(
+        NameserverAccessConfig nameserverAccessConfig,
+        String namePrefix,
+        int clientNum,
+        ClientRemotingProcessor clientRemotingProcessor,
+        RPCHook rpcHook,
+        ScheduledExecutorService scheduledExecutorService
+    ) {
+        this(nameserverAccessConfig, namePrefix, clientNum, 
clientRemotingProcessor, rpcHook, scheduledExecutorService, null);
+    }
 
-    public MQClientAPIFactory(NameserverAccessConfig nameserverAccessConfig, 
String namePrefix, int clientNum,
+    public MQClientAPIFactory(
+        NameserverAccessConfig nameserverAccessConfig,
+        String namePrefix,
+        int clientNum,
         ClientRemotingProcessor clientRemotingProcessor,
-        RPCHook rpcHook, ScheduledExecutorService scheduledExecutorService) {
+        RPCHook rpcHook,
+        ScheduledExecutorService scheduledExecutorService,
+        ObjectCreator<RemotingClient> remotingClientCreator
+    ) {
         this.nameserverAccessConfig = nameserverAccessConfig;
         this.namePrefix = namePrefix;
         this.clientNum = clientNum;
         this.clientRemotingProcessor = clientRemotingProcessor;
         this.rpcHook = rpcHook;
         this.scheduledExecutorService = scheduledExecutorService;
+        this.remotingClientCreator = remotingClientCreator;
 
         this.init();
     }
@@ -102,9 +125,13 @@ public class MQClientAPIFactory implements 
StartAndShutdown {
         NettyClientConfig nettyClientConfig = new NettyClientConfig();
         nettyClientConfig.setDisableCallbackExecutor(true);
 
-        MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(clientConfig, 
nettyClientConfig,
+        MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(
+            clientConfig,
+            nettyClientConfig,
             clientRemotingProcessor,
-            rpcHook);
+            rpcHook,
+            remotingClientCreator
+        );
 
         if (!mqClientAPIExt.updateNameServerAddressList()) {
             mqClientAPIExt.fetchNameServerAddr();
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c76d0c734a..6cb96df05f 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ObjectCreator;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
@@ -59,6 +60,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -2104,6 +2106,48 @@ public class MQClientAPIImplTest {
         done.await();
     }
 
+    @Test
+    public void testMQClientAPIImplWithoutObjectCreator() {
+        MQClientAPIImpl clientAPI = new MQClientAPIImpl(
+            new NettyClientConfig(),
+            null,
+            null,
+            new ClientConfig(),
+            null,
+            null
+        );
+        RemotingClient remotingClient1 = clientAPI.getRemotingClient();
+        Assert.assertTrue(remotingClient1 instanceof NettyRemotingClient);
+    }
+
+    @Test
+    public void testMQClientAPIImplWithObjectCreator() {
+        ObjectCreator<RemotingClient> clientObjectCreator = args -> new 
MockRemotingClientTest((NettyClientConfig) args[0]);
+        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MQClientAPIImpl clientAPI = new MQClientAPIImpl(
+            nettyClientConfig,
+            null,
+            null,
+            new ClientConfig(),
+            null,
+            clientObjectCreator
+        );
+        RemotingClient remotingClient1 = clientAPI.getRemotingClient();
+        Assert.assertTrue(remotingClient1 instanceof MockRemotingClientTest);
+        MockRemotingClientTest remotingClientTest = (MockRemotingClientTest) 
remotingClient1;
+        Assert.assertSame(remotingClientTest.getNettyClientConfig(), 
nettyClientConfig);
+    }
+
+    private static class MockRemotingClientTest extends NettyRemotingClient {
+        public MockRemotingClientTest(NettyClientConfig nettyClientConfig) {
+            super(nettyClientConfig);
+        }
+
+        public NettyClientConfig getNettyClientConfig() {
+            return nettyClientConfig;
+        }
+    }
+
     private Properties createProperties() {
         Properties result = new Properties();
         result.put("key", "value");
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
 b/common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
similarity index 51%
copy from 
proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
copy to common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
index c186752788..14c645424f 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
@@ -14,25 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.proxy.service;
+package org.apache.rocketmq.common;
 
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.remoting.RPCHook;
-
-public class ServiceManagerFactory {
-    public static ServiceManager createForLocalMode(BrokerController 
brokerController) {
-        return createForLocalMode(brokerController, null);
-    }
-
-    public static ServiceManager createForLocalMode(BrokerController 
brokerController, RPCHook rpcHook) {
-        return new LocalServiceManager(brokerController, rpcHook);
-    }
-
-    public static ServiceManager createForClusterMode() {
-        return createForClusterMode(null);
-    }
-
-    public static ServiceManager createForClusterMode(RPCHook rpcHook) {
-        return new ClusterServiceManager(rpcHook);
-    }
+public interface ObjectCreator<T> {
+    T create(Object... args);
 }
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index a1c1eecf59..616188e52d 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.MessageStore;
@@ -43,8 +44,11 @@ public class InnerBrokerController extends BrokerController {
 
     @Override
     protected void initializeRemotingServer() {
-        this.remotingServer = 
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
-        this.fastRemotingServer = 
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort()
 - 2);
+        RemotingServer remotingServer = 
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
+        RemotingServer fastRemotingServer = 
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort()
 - 2);
+
+        setRemotingServer(remotingServer);
+        setFastRemotingServer(fastRemotingServer);
     }
 
     @Override
@@ -119,11 +123,11 @@ public class InnerBrokerController extends 
BrokerController {
             scheduledFuture.cancel(true);
         }
 
-        if (this.remotingServer != null) {
+        if (getRemotingServer() != null) {
             
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
         }
 
-        if (this.fastRemotingServer != null) {
+        if (getFastRemotingServer() != null) {
             
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort()
 - 2);
         }
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 9786cec557..33b65d2550 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.client.common.NameserverAccessConfig;
 import 
org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.common.ObjectCreator;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -51,6 +52,7 @@ import 
org.apache.rocketmq.proxy.service.route.TopicRouteService;
 import org.apache.rocketmq.proxy.service.transaction.ClusterTransactionService;
 import org.apache.rocketmq.proxy.service.transaction.TransactionService;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
 
 public class ClusterServiceManager extends AbstractStartAndShutdown implements 
ServiceManager {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -70,6 +72,10 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
     protected MQClientAPIFactory transactionClientAPIFactory;
 
     public ClusterServiceManager(RPCHook rpcHook) {
+        this(rpcHook, null);
+    }
+
+    public ClusterServiceManager(RPCHook rpcHook, 
ObjectCreator<RemotingClient> remotingClientCreator) {
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         NameserverAccessConfig nameserverAccessConfig = new 
NameserverAccessConfig(proxyConfig.getNamesrvAddr(),
             proxyConfig.getNamesrvDomain(), 
proxyConfig.getNamesrvDomainSubgroup());
@@ -81,14 +87,18 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
             proxyConfig.getRocketmqMQClientNum(),
             new DoNothingClientRemotingProcessor(null),
             rpcHook,
-            scheduledExecutorService);
+            scheduledExecutorService,
+            remotingClientCreator
+        );
+
         this.operationClientAPIFactory = new MQClientAPIFactory(
             nameserverAccessConfig,
             "OperationClient_",
             1,
             new DoNothingClientRemotingProcessor(null),
             rpcHook,
-            this.scheduledExecutorService
+            this.scheduledExecutorService,
+            remotingClientCreator
         );
 
         this.topicRouteService = new 
ClusterTopicRouteService(operationClientAPIFactory);
@@ -105,7 +115,10 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
             1,
             new ProxyClientRemotingProcessor(producerManager),
             rpcHook,
-            scheduledExecutorService);
+            scheduledExecutorService,
+            remotingClientCreator
+        );
+
         this.clusterTransactionService = new 
ClusterTransactionService(this.topicRouteService, this.producerManager,
             this.transactionClientAPIFactory);
         this.proxyRelayService = new 
ClusterProxyRelayService(this.clusterTransactionService);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
index c186752788..e1252fe31f 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
@@ -17,7 +17,9 @@
 package org.apache.rocketmq.proxy.service;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ObjectCreator;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
 
 public class ServiceManagerFactory {
     public static ServiceManager createForLocalMode(BrokerController 
brokerController) {
@@ -29,10 +31,14 @@ public class ServiceManagerFactory {
     }
 
     public static ServiceManager createForClusterMode() {
-        return createForClusterMode(null);
+        return createForClusterMode(null, null);
     }
 
     public static ServiceManager createForClusterMode(RPCHook rpcHook) {
-        return new ClusterServiceManager(rpcHook);
+        return createForClusterMode(rpcHook, null);
+    }
+
+    public static ServiceManager createForClusterMode(RPCHook rpcHook, 
ObjectCreator<RemotingClient> remotingClientCreator) {
+        return new ClusterServiceManager(rpcHook, remotingClientCreator);
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index d3f5a88cf2..a4f23f181a 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -673,6 +673,10 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    public HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>> 
getProcessorTable() {
+        return processorTable;
+    }
+
     class NettyEventExecutor extends ServiceThread {
         private final LinkedBlockingQueue<NettyEvent> eventQueue = new 
LinkedBlockingQueue<>();
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 6ac54aed6d..e92809ccdf 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -98,7 +98,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
 
-    private final NettyClientConfig nettyClientConfig;
+    protected final NettyClientConfig nettyClientConfig;
     private final Bootstrap bootstrap = new Bootstrap();
     private final EventLoopGroup eventLoopGroupWorker;
     private final Lock lockChannelTables = new ReentrantLock();
@@ -288,6 +288,13 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         return null;
     }
 
+    protected ChannelFuture doConnect(String addr) {
+        String[] hostAndPort = getHostAndPort(addr);
+        String host = hostAndPort[0];
+        int port = Integer.parseInt(hostAndPort[1]);
+        return fetchBootstrap(addr).connect(host, port);
+    }
+
     private Bootstrap fetchBootstrap(String addr) {
         Map.Entry<String, SocksProxyConfig> proxyEntry = getProxy(addr);
         if (proxyEntry == null) {
@@ -359,7 +366,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
     }
 
     // Do not use RemotingHelper.string2SocketAddress(), it will directly 
resolve the domain
-    private String[] getHostAndPort(String address) {
+    protected String[] getHostAndPort(String address) {
         int split = address.lastIndexOf(":");
         return split < 0 ? new String[]{address} : new 
String[]{address.substring(0, split), address.substring(split + 1)};
     }
@@ -712,9 +719,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
     }
 
     private ChannelWrapper createChannel(String addr) {
-        String[] hostAndPort = getHostAndPort(addr);
-        ChannelFuture channelFuture = fetchBootstrap(addr)
-            .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+        ChannelFuture channelFuture = doConnect(addr);
         LOGGER.info("createChannel: begin to connect remote host[{}] 
asynchronously", addr);
         ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
         this.channelTables.put(addr, cw);
@@ -1047,9 +1052,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                 try {
                     if (isWrapperOf(channel)) {
                         channelToClose = channelFuture;
-                        String[] hostAndPort = getHostAndPort(channelAddress);
-                        channelFuture = fetchBootstrap(channelAddress)
-                            .connect(hostAndPort[0], 
Integer.parseInt(hostAndPort[1]));
+                        channelFuture = doConnect(channelAddress);
                         return true;
                     } else {
                         LOGGER.warn("channelWrapper has reconnect, so do 
nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
@@ -1119,15 +1122,14 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         }
     }
 
-    class NettyClientHandler extends 
SimpleChannelInboundHandler<RemotingCommand> {
-
+    public class NettyClientHandler extends 
SimpleChannelInboundHandler<RemotingCommand> {
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand 
msg) throws Exception {
             processMessageReceived(ctx, msg);
         }
     }
 
-    class NettyConnectManageHandler extends ChannelDuplexHandler {
+    public class NettyConnectManageHandler extends ChannelDuplexHandler {
         @Override
         public void connect(ChannelHandlerContext ctx, SocketAddress 
remoteAddress, SocketAddress localAddress,
             ChannelPromise promise) throws Exception {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index cbf25c23c6..7ed804483b 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -54,19 +54,6 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
-import java.time.Duration;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Pair;
@@ -88,15 +75,28 @@ import 
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-@SuppressWarnings("NullableProblems")
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.time.Duration;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 public class NettyRemotingServer extends NettyRemotingAbstract implements 
RemotingServer {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
     private static final Logger TRAFFIC_LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_TRAFFIC_NAME);
 
     private final ServerBootstrap serverBootstrap;
-    private final EventLoopGroup eventLoopGroupSelector;
-    private final EventLoopGroup eventLoopGroupBoss;
-    private final NettyServerConfig nettyServerConfig;
+    protected final EventLoopGroup eventLoopGroupSelector;
+    protected final EventLoopGroup eventLoopGroupBoss;
+    protected final NettyServerConfig nettyServerConfig;
 
     private final ExecutorService publicExecutor;
     private final ScheduledExecutorService scheduledExecutorService;
@@ -120,18 +120,18 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
     // sharable handlers
-    private TlsModeHandler tlsModeHandler;
-    private NettyEncoder encoder;
-    private NettyConnectManageHandler connectionManageHandler;
-    private NettyServerHandler serverHandler;
-    private RemotingCodeDistributionHandler distributionHandler;
+    protected final TlsModeHandler tlsModeHandler = new 
TlsModeHandler(TlsSystemConfig.tlsMode);
+    protected final NettyEncoder encoder = new NettyEncoder();
+    protected final NettyConnectManageHandler connectionManageHandler = new 
NettyConnectManageHandler();
+    protected final NettyServerHandler serverHandler = new 
NettyServerHandler();
+    protected final RemotingCodeDistributionHandler distributionHandler = new 
RemotingCodeDistributionHandler();
 
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
         this(nettyServerConfig, null);
     }
 
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
-        final ChannelEventListener channelEventListener) {
+                               final ChannelEventListener 
channelEventListener) {
         super(nettyServerConfig.getServerOnewaySemaphoreValue(), 
nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
         this.nettyServerConfig = nettyServerConfig;
@@ -140,13 +140,13 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         this.publicExecutor = buildPublicExecutor(nettyServerConfig);
         this.scheduledExecutorService = buildScheduleExecutor();
 
-        this.eventLoopGroupBoss = buildBossEventLoopGroup();
+        this.eventLoopGroupBoss = buildEventLoopGroupBoss();
         this.eventLoopGroupSelector = buildEventLoopGroupSelector();
 
         loadSslContext();
     }
 
-    private EventLoopGroup buildEventLoopGroupSelector() {
+    protected EventLoopGroup buildEventLoopGroupSelector() {
         if (useEpoll()) {
             return new 
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactoryImpl("NettyServerEPOLLSelector_"));
         } else {
@@ -154,7 +154,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
     }
 
-    private EventLoopGroup buildBossEventLoopGroup() {
+    protected EventLoopGroup buildEventLoopGroupBoss() {
         if (useEpoll()) {
             return new EpollEventLoopGroup(1, new 
ThreadFactoryImpl("NettyEPOLLBoss_"));
         } else {
@@ -197,13 +197,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             && Epoll.isAvailable();
     }
 
-    @Override
-    public void start() {
-        this.defaultEventExecutorGroup = new 
DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
-            new ThreadFactoryImpl("NettyServerCodecThread_"));
-
-        prepareSharableHandlers();
-
+    protected void initServerBootstrap(ServerBootstrap serverBootstrap) {
         serverBootstrap.group(this.eventLoopGroupBoss, 
this.eventLoopGroupSelector)
             .channel(useEpoll() ? EpollServerSocketChannel.class : 
NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
@@ -220,6 +214,14 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             });
 
         addCustomConfig(serverBootstrap);
+    }
+
+    @Override
+    public void start() {
+        this.defaultEventExecutorGroup = new 
DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
+            new ThreadFactoryImpl("NettyServerCodecThread_"));
+
+        initServerBootstrap(serverBootstrap);
 
         try {
             ChannelFuture sync = serverBootstrap.bind().sync();
@@ -411,14 +413,6 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         return this.publicExecutor;
     }
 
-    private void prepareSharableHandlers() {
-        tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
-        encoder = new NettyEncoder();
-        connectionManageHandler = new NettyConnectManageHandler();
-        serverHandler = new NettyServerHandler();
-        distributionHandler = new RemotingCodeDistributionHandler();
-    }
-
     private void printRemotingCodeDistribution() {
         if (distributionHandler != null) {
             String inBoundSnapshotString = 
distributionHandler.getInBoundSnapshotString();
@@ -469,8 +463,8 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 }
                 if (detectionResult.state() == 
ProtocolDetectionState.DETECTED) {
                     ctx.pipeline().addAfter(defaultEventExecutorGroup, 
ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
-                            .addAfter(defaultEventExecutorGroup, 
HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
-                            .addAfter(defaultEventExecutorGroup, 
HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
+                        .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, 
HA_PROXY_HANDLER, new HAProxyMessageHandler())
+                        .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, 
TLS_MODE_HANDLER, tlsModeHandler);
                 } else {
                     ctx.pipeline().addAfter(defaultEventExecutorGroup, 
ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
                 }
@@ -664,7 +658,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
         @Override
         public void registerProcessor(final int requestCode, final 
NettyRequestProcessor processor,
-            final ExecutorService executor) {
+                                      final ExecutorService executor) {
             ExecutorService executorThis = executor;
             if (null == executor) {
                 executorThis = NettyRemotingServer.this.publicExecutor;
@@ -708,19 +702,19 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
         @Override
         public RemotingCommand invokeSync(final Channel channel, final 
RemotingCommand request,
-            final long timeoutMillis) throws InterruptedException, 
RemotingSendRequestException, RemotingTimeoutException {
+                                          final long timeoutMillis) throws 
InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
             return this.invokeSyncImpl(channel, request, timeoutMillis);
         }
 
         @Override
         public void invokeAsync(final Channel channel, final RemotingCommand 
request, final long timeoutMillis,
-            final InvokeCallback invokeCallback) throws InterruptedException, 
RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
+                                final InvokeCallback invokeCallback) throws 
InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
             this.invokeAsyncImpl(channel, request, timeoutMillis, 
invokeCallback);
         }
 
         @Override
         public void invokeOneway(final Channel channel, final RemotingCommand 
request,
-            final long timeoutMillis) throws InterruptedException, 
RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
+                                 final long timeoutMillis) throws 
InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
             this.invokeOnewayImpl(channel, request, timeoutMillis);
         }
 

Reply via email to