This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 5da292fba4f3c676964847a2bc653442685b472d
Author: TheR1sing3un <[email protected]>
AuthorDate: Sun Feb 5 22:18:18 2023 +0800

    feat(controller): Improved logic and adaptation testing for persistent 
broker id versions
    
    1. Improved logic and adaptation testing for persistent broker id versions
---
 .../broker/controller/ReplicasManager.java         |  89 +++--
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |   3 +-
 .../controller/BrokerHeartbeatManager.java         |   3 +-
 .../rocketmq/controller/ControllerManager.java     |  55 ++-
 .../impl/DefaultBrokerHeartbeatManager.java        |  15 +-
 .../impl/manager/ReplicasInfoManager.java          |  55 +--
 .../processor/ControllerRequestProcessor.java      |  24 --
 .../impl/controller/ControllerManagerTest.java     | 108 +++---
 .../impl/controller/ControllerTestBase.java        |  27 ++
 .../controller/impl/DLedgerControllerTest.java     | 194 ++++++-----
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |   2 +-
 .../impl/manager/ReplicasInfoManagerTest.java      | 374 +++++++++++----------
 .../remoting/protocol/body/BrokerReplicasInfo.java |  32 +-
 .../protocol/body/RoleChangeNotifyEntry.java       |  11 +-
 .../NotifyBrokerRoleChangedRequestHeader.java      |  37 +-
 .../controller/AlterSyncStateSetRequestHeader.java |   8 +-
 .../controller/ElectMasterResponseHeader.java      |  12 +-
 .../controller/GetReplicaInfoRequestHeader.java    |  18 +-
 .../controller/GetReplicaInfoResponseHeader.java   |   6 +-
 .../CleanControllerBrokerDataRequestHeader.java    |   4 +-
 .../register/ApplyBrokerIdRequestHeader.java       |  20 ++
 .../register/ApplyBrokerIdResponseHeader.java      |  17 +
 .../register/GetNextBrokerIdRequestHeader.java     |  12 +
 .../register/GetNextBrokerIdResponseHeader.java    |  19 ++
 .../register/RegisterSuccessRequestHeader.java     |  19 ++
 .../register/RegisterSuccessResponseHeader.java    |  31 ++
 26 files changed, 679 insertions(+), 516 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index b2b4a9163..7bbe43e1e 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -49,7 +49,6 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataRespon
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
@@ -178,16 +177,21 @@ public class ReplicasManager {
         }
 
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
-            if (registerBrokerToController()) {
-                LOGGER.info("First time register broker success");
-                this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE;
-            } else {
+            for (int retryTimes = 0; retryTimes < 5; retryTimes++) {
+                if (registerBrokerToController()) {
+                    LOGGER.info("First time register broker success");
+                    this.state = State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE;
+                    break;
+                }
+            }
+            // register 5 times but still unsuccessful
+            if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
                 return false;
             }
         }
 
         if (this.state == State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) {
-            if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) {
+            if (this.masterBrokerId != null || brokerElect()) {
                 LOGGER.info("Master in this broker set is elected");
                 this.state = State.RUNNING;
             } else {
@@ -365,42 +369,42 @@ public class ReplicasManager {
         }
     }
 
-    private boolean registerBrokerToController() {
-        // Register this broker to controller to get a stable and credible 
broker id, and persist metadata to local file.
-        try {
-            final RegisterBrokerToControllerResponseHeader registerResponse = 
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-                this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, 
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
-                this.haService.getLastEpoch(), 
this.brokerController.getMessageStore().getMaxPhyOffset(), 
this.brokerConfig.getBrokerElectionPriority());
-            final String newMasterAddress = 
registerResponse.getMasterAddress();
-            if (StringUtils.isNoneEmpty(newMasterAddress)) {
-                if (StringUtils.equals(newMasterAddress, this.localAddress)) {
-                    changeToMaster(registerResponse.getMasterEpoch(), 
registerResponse.getSyncStateSetEpoch());
-                } else {
-                    changeToSlave(newMasterAddress, 
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
-                }
-                // Set isolated to false, make broker can register to namesrv 
regularly
-                brokerController.setIsolated(false);
-            } else {
-                // if master address is empty, just apply the brokerId
-                if (registerResponse.getBrokerId() <= 0) {
-                    // wrong broker id
-                    LOGGER.error("Register to controller but receive a invalid 
broker id = {}", registerResponse.getBrokerId());
-                    return false;
-                }
-                this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
-            }
-            return true;
-        } catch (final Exception e) {
-            LOGGER.error("Failed to register broker to controller", e);
-            return false;
-        }
-    }
+//    private boolean registerBrokerToController() {
+//        // Register this broker to controller to get a stable and credible 
broker id, and persist metadata to local file.
+//        try {
+//            final RegisterBrokerToControllerResponseHeader registerResponse 
= this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
+//                this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, 
this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+//                this.haService.getLastEpoch(), 
this.brokerController.getMessageStore().getMaxPhyOffset(), 
this.brokerConfig.getBrokerElectionPriority());
+//            final String newMasterAddress = 
registerResponse.getMasterAddress();
+//            if (StringUtils.isNoneEmpty(newMasterAddress)) {
+//                if (StringUtils.equals(newMasterAddress, this.localAddress)) 
{
+//                    changeToMaster(registerResponse.getMasterEpoch(), 
registerResponse.getSyncStateSetEpoch());
+//                } else {
+//                    changeToSlave(newMasterAddress, 
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
+//                }
+//                // Set isolated to false, make broker can register to 
namesrv regularly
+//                brokerController.setIsolated(false);
+//            } else {
+//                // if master address is empty, just apply the brokerId
+//                if (registerResponse.getBrokerId() <= 0) {
+//                    // wrong broker id
+//                    LOGGER.error("Register to controller but receive a 
invalid broker id = {}", registerResponse.getBrokerId());
+//                    return false;
+//                }
+//                
this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
+//            }
+//            return true;
+//        } catch (final Exception e) {
+//            LOGGER.error("Failed to register broker to controller", e);
+//            return false;
+//        }
+//    }
 
     /**
      * Register broker to controller, and persist the metadata to file
      * @return whether registering process succeeded
      */
-    private boolean registerBrokerToController2() {
+    private boolean registerBrokerToController() {
         try {
             // 1. confirm now registering state
             confirmNowRegisteringState();
@@ -514,6 +518,17 @@ public class ReplicasManager {
     private boolean registerSuccess() {
         try {
             RegisterSuccessResponseHeader response = 
this.brokerOuterAPI.registerSuccess(brokerConfig.getBrokerClusterName(), 
brokerConfig.getBrokerName(), brokerId, localAddress, controllerLeaderAddress);
+            final Long masterBrokerId = response.getMasterBrokerId();
+            final String masterAddress = response.getMasterAddress();
+            if (masterBrokerId == null) {
+                return true;
+            }
+            if (this.brokerId.equals(masterBrokerId)) {
+                changeToMaster(response.getMasterEpoch(), 
response.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, response.getMasterEpoch(), 
masterBrokerId);
+            }
+            brokerController.setIsolated(false);
             return true;
         } catch (Exception e) {
             LOGGER.error("fail to send registerSuccess request to controller", 
e);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 054f1edaa..5f8c670a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -136,7 +136,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMetrics;
 
 import static 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
-import static 
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_ID_INVALID;
 import static 
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
 import static 
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
 import static 
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
@@ -1258,7 +1257,7 @@ public class BrokerOuterAPI {
      */
     public Pair<GetReplicaInfoResponseHeader, SyncStateSet> 
getReplicaInfo(final String controllerAddress,
         final String brokerName, final String brokerAddress) throws Exception {
-        final GetReplicaInfoRequestHeader requestHeader = new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress);
+        final GetReplicaInfoRequestHeader requestHeader = new 
GetReplicaInfoRequestHeader(brokerName);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 81e3cf31c..ed021bb88 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -61,7 +61,6 @@ public interface BrokerHeartbeatManager {
         /**
          * Trigger when broker inactive.
          */
-        void onBrokerInactive(final String clusterName, final String 
brokerName, final String brokerAddress,
-            final long brokerId);
+        void onBrokerInactive(final String clusterName, final String 
brokerName, final Long brokerId);
     }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 116607cc1..0f565ec81 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -116,18 +115,17 @@ public class ControllerManager {
      *
      * @param clusterName The cluster name of this inactive broker
      * @param brokerName The inactive broker name
-     * @param brokerAddress The inactive broker address(ip)
      * @param brokerId The inactive broker id
      */
-    private void onBrokerInactive(String clusterName, String brokerName, 
String brokerAddress, long brokerId) {
+    private void onBrokerInactive(String clusterName, String brokerName, Long 
brokerId) {
         if (controller.isLeaderState()) {
             try {
-                final CompletableFuture<RemotingCommand> replicaInfoFuture = 
controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, 
brokerAddress));
+                final CompletableFuture<RemotingCommand> replicaInfoFuture = 
controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
                 final RemotingCommand replicaInfoResponse = 
replicaInfoFuture.get(5, TimeUnit.SECONDS);
                 final GetReplicaInfoResponseHeader replicaInfoResponseHeader = 
(GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader();
                 // Not master broker offline
-                if 
(!replicaInfoResponseHeader.getMasterAddress().equals(brokerAddress)) {
-                    log.warn("The {} broker with IP address {} shutdown", 
brokerName, brokerAddress);
+                if 
(!brokerId.equals(replicaInfoResponseHeader.getMasterBrokerId())) {
+                    log.warn("The broker with brokerId: {} in broker-set: {} 
shutdown", brokerId, brokerName);
                     return;
                 }
 
@@ -135,7 +133,7 @@ public class ControllerManager {
                 final RemotingCommand electMasterResponse = 
electMasterFuture.get(5, TimeUnit.SECONDS);
                 final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
                 if (responseHeader != null) {
-                    log.info("Broker {}'s master {} shutdown, elect a new 
master done, result:{}", brokerName, brokerAddress, responseHeader);
+                    log.info("The broker with brokerId: {} in broker-set: {} 
shutdown, elect a new master done, result: {}", brokerId, brokerName, 
responseHeader);
                     if (controllerConfig.isNotifyBrokerRoleChanged()) {
                         
notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
                     }
@@ -144,7 +142,7 @@ public class ControllerManager {
                 log.error("", e);
             }
         } else {
-            log.info("The {} broker with IP address {} shutdown", brokerName, 
brokerAddress);
+            log.warn("The broker with brokerId: {} in broker-set: {} 
shutdown", brokerId, brokerName);
         }
     }
 
@@ -154,38 +152,35 @@ public class ControllerManager {
     public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) {
         final BrokerMemberGroup memberGroup = entry.getBrokerMemberGroup();
         if (memberGroup != null) {
-            final String master = entry.getMasterAddress();
-            if (StringUtils.isEmpty(master)) {
-                log.warn("Notify broker role change failed, because member 
group is not null but the new master address is empty, entry:{}", entry);
+            final Long masterBrokerId = entry.getMasterBrokerId();
+            String clusterName = memberGroup.getCluster();
+            String brokerName = memberGroup.getBrokerName();
+            if (masterBrokerId == null) {
+                log.warn("Notify broker role change failed, because member 
group is not null but the new master brokerId is empty, entry:{}", entry);
                 return;
             }
-            // First, inform the master
-            if (this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), 
master)) {
-                doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, entry);
-            }
-
-            // Then, inform all slaves
-            final Map<Long, String> brokerIdAddrs = 
memberGroup.getBrokerAddrs();
-            for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
-                if (!master.equals(broker.getValue()) && 
this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), 
broker.getValue())) {
-                    doNotifyBrokerRoleChanged(broker.getValue(), 
broker.getKey(), entry);
-                }
-            }
-
+            // Inform all active brokers
+            final Map<Long, String> brokerAddrs = memberGroup.getBrokerAddrs();
+            brokerAddrs.entrySet().stream().filter(x -> 
this.heartbeatManager.isBrokerActive(clusterName, brokerName, x.getKey()))
+                    .forEach(x -> doNotifyBrokerRoleChanged(x.getValue(), 
entry));
         }
     }
 
-    public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long 
brokerId,
-                                          final RoleChangeNotifyEntry entry) {
+    /**
+     * Notify broker that there are roles-changing in controller
+     * @param brokerAddr target broker's address to notify
+     * @param entry role change entry
+     */
+    public void doNotifyBrokerRoleChanged(final String brokerAddr, final 
RoleChangeNotifyEntry entry) {
         if (StringUtils.isNoneEmpty(brokerAddr)) {
-            log.info("Try notify broker {} with id {} that role changed, 
RoleChangeNotifyEntry:{}", brokerAddr, brokerId, entry);
-            final NotifyBrokerRoleChangedRequestHeader requestHeader = new 
NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(),
-                    entry.getMasterEpoch(), entry.getSyncStateSetEpoch(), 
brokerId);
+            log.info("Try notify broker {} that role changed, 
RoleChangeNotifyEntry:{}", brokerAddr, entry);
+            final NotifyBrokerRoleChangedRequestHeader requestHeader = new 
NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), 
entry.getMasterBrokerId(),
+                    entry.getMasterEpoch(), entry.getSyncStateSetEpoch());
             final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, 
requestHeader);
             try {
                 this.remotingClient.invokeOneway(brokerAddr, request, 3000);
             } catch (final Exception e) {
-                log.error("Failed to notify broker {} with id {} that role 
changed", brokerAddr, brokerId, e);
+                log.error("Failed to notify broker {} that role changed", 
brokerAddr, e);
             }
         }
     }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 3045da85e..39edce507 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -72,14 +72,14 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
                 final Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = 
iterator.next();
                 long last = next.getValue().getLastUpdateTimestamp();
                 long timeoutMillis = 
next.getValue().getHeartbeatTimeoutMillis();
-                if ((last + timeoutMillis) < System.currentTimeMillis()) {
+                if (System.currentTimeMillis() - last > timeoutMillis) {
                     final Channel channel = next.getValue().getChannel();
                     iterator.remove();
                     if (channel != null) {
                         RemotingHelper.closeChannel(channel);
                     }
                     this.executor.submit(() ->
-                        notifyBrokerInActive(next.getKey().getClusterName(), 
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), 
next.getValue().getBrokerId()));
+                        notifyBrokerInActive(next.getKey().getClusterName(), 
next.getValue().getBrokerName(), next.getValue().getBrokerId()));
                     log.warn("The broker channel {} expired, brokerInfo {}, 
expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis);
                 }
             }
@@ -88,9 +88,9 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
         }
     }
 
-    private void notifyBrokerInActive(String clusterName, String brokerName, 
String brokerAddr, Long brokerId) {
+    private void notifyBrokerInActive(String clusterName, String brokerName, 
Long brokerId) {
         for (BrokerLifecycleListener listener : this.brokerLifecycleListeners) 
{
-            listener.onBrokerInactive(clusterName, brokerName, brokerAddr, 
brokerId);
+            listener.onBrokerInactive(clusterName, brokerName, brokerId);
         }
     }
 
@@ -126,9 +126,6 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
             prev.setLastUpdateTimestamp(System.currentTimeMillis());
             prev.setHeartbeatTimeoutMillis(realTimeoutMillis);
             prev.setElectionPriority(realElectionPriority);
-            prev.setBrokerId(realBrokerId);
-            prev.setBrokerAddr(brokerAddr);
-            prev.setChannel(channel);
             if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && 
realMaxOffset > prev.getMaxOffset()) {
                 prev.setEpoch(realEpoch);
                 prev.setMaxOffset(realMaxOffset);
@@ -143,10 +140,10 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
         BrokerAddrInfo addrInfo = null;
         for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : 
this.brokerLiveTable.entrySet()) {
             if (entry.getValue().getChannel() == channel) {
-                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", 
entry.getValue().getChannel(), entry.getValue().getBrokerName(), 
entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId());
+                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", 
entry.getValue().getChannel(), entry.getValue().getBrokerName(), 
entry.getValue().getBrokerAddr(), entry.getValue().getBrokerId());
                 addrInfo = entry.getKey();
                 this.executor.submit(() ->
-                    notifyBrokerInActive(entry.getKey().getClusterName(), 
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), 
entry.getValue().getBrokerId()));
+                    notifyBrokerInActive(entry.getKey().getClusterName(), 
entry.getValue().getBrokerName(), entry.getValue().getBrokerId()));
                 break;
             }
         }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 1400932e0..d2061bb24 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -168,7 +168,6 @@ public class ReplicasInfoManager {
         final ElectMasterResponseHeader response = result.getResponse();
         if (!isContainsBroker(brokerName)) {
             // this broker set hasn't been registered
-            response.setMasterAddress("");
             
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, 
"Broker hasn't been registered");
             return result;
         }
@@ -233,7 +232,6 @@ public class ReplicasInfoManager {
         } else {
             
result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to 
elect a new master");
         }
-        response.setMasterAddress("");
         return result;
     }
 
@@ -250,58 +248,6 @@ public class ReplicasInfoManager {
         return null;
     }
 
-//    public ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerBroker(
-//            final RegisterBrokerToControllerRequestHeader request, final 
BrokerValidPredicate alivePredicate) {
-//        String brokerAddress = request.getBrokerAddress();
-//        final String brokerName = request.getBrokerName();
-//        final String clusterName = request.getClusterName();
-//        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
-//        final RegisterBrokerToControllerResponseHeader response = 
result.getResponse();
-//        if (!isContainsBroker(brokerName)) {
-//            
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, 
"Broker-set hasn't been registered in controller");
-//            return result;
-//        }
-//        final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
-//        final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
-//        if (brokerReplicaInfo.isBrokerExist())
-//
-//        // If the broker's metadata does not exist in the state machine, we 
can assign the broker a brokerId valued 1
-//        // By default, we set this variable to a value of 1
-//        long brokerId = MixAll.FIRST_SLAVE_ID;
-//        boolean shouldApplyBrokerId = true;
-//        if (isContainsBroker(brokerName)) {
-//            final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
-//            final BrokerReplicaInfo brokerReplicaInfo = 
this.replicaInfoTable.get(brokerName);
-//
-//            if (brokerReplicaInfo.isBrokerExist(brokerAddress)) {
-//                // this broker have registered
-//                brokerId = brokerReplicaInfo.getBrokerId(brokerAddress);
-//                shouldApplyBrokerId = false;
-//            } else {
-//                // If this broker replicas is first time come online, we 
need to apply a new id for this replicas.
-//                brokerId = brokerReplicaInfo.newBrokerId();
-//            }
-//
-//            if (syncStateInfo.isMasterExist() && 
brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
-//                // If the master is alive, just return master info.
-//                final String masterAddress = 
syncStateInfo.getMasterAddress();
-//                response.setMasterAddress(masterAddress);
-//                response.setMasterEpoch(syncStateInfo.getMasterEpoch());
-//                
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
-//            }
-//        }
-//
-//        response.setBrokerId(brokerId);
-//        if (response.getMasterAddress() == null) {
-//            response.setMasterAddress("");
-//        }
-//        if (shouldApplyBrokerId) {
-//            final ApplyBrokerIdEvent applyIdEvent = new 
ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, 
brokerId);
-//            result.addEvent(applyIdEvent);
-//        }
-//        return result;
-//    }
-
     public ControllerResult<GetNextBrokerIdResponseHeader> 
getNextBrokerId(final GetNextBrokerIdRequestHeader request) {
         final String clusterName = request.getClusterName();
         final String brokerName = request.getBrokerName();
@@ -339,6 +285,7 @@ public class ReplicasInfoManager {
         // broker-set registered
         if (!brokerReplicaInfo.isBrokerExist(brokerId) || 
registerCheckCode.equals(brokerReplicaInfo.getBrokerRegisterCheckCode(brokerId)))
 {
             // if brokerId hasn't been assigned or brokerId was assigned to 
this broker
+            result.addEvent(event);
             return result;
         }
         result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_ID_INVALID, 
String.format("Fail to apply brokerId: %d in broker-set: %s", brokerId, 
brokerName));
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index da1de6ef1..9e300b738 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.controller.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import java.io.UnsupportedEncodingException;
-import java.sql.Time;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -46,8 +45,6 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanContro
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.APPLY_BROKER_ID;
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT;
@@ -57,7 +54,6 @@ import static 
org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_ELECT
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA;
-import static 
org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONTROLLER_CONFIG;
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.GET_NEXT_BROKER_ID;
 import static 
org.apache.rocketmq.remoting.protocol.RequestCode.REGISTER_SUCCESS;
@@ -90,8 +86,6 @@ public class ControllerRequestProcessor implements 
NettyRequestProcessor {
                 return this.handleAlterSyncStateSet(ctx, request);
             case CONTROLLER_ELECT_MASTER:
                 return this.handleControllerElectMaster(ctx, request);
-            case CONTROLLER_REGISTER_BROKER:
-                return this.handleControllerRegisterBroker(ctx, request);
             case CONTROLLER_GET_REPLICA_INFO:
                 return this.handleControllerGetReplicaInfo(ctx, request);
             case CONTROLLER_GET_METADATA_INFO:
@@ -148,24 +142,6 @@ public class ControllerRequestProcessor implements 
NettyRequestProcessor {
         return RemotingCommand.createResponseCommand(null);
     }
 
-
-    private RemotingCommand 
handleControllerRegisterBroker(ChannelHandlerContext ctx,
-                                                           RemotingCommand 
request) throws Exception {
-        final RegisterBrokerToControllerRequestHeader controllerRequest = 
(RegisterBrokerToControllerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-        final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().registerBroker(controllerRequest);
-        if (future != null) {
-            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, 
TimeUnit.SECONDS);
-            final RegisterBrokerToControllerResponseHeader responseHeader = 
(RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
-            if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
-                
this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), 
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                        responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                        controllerRequest.getEpoch(), 
controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), 
controllerRequest.getElectionPriority());
-            }
-            return response;
-        }
-        return RemotingCommand.createResponseCommand(null);
-    }
-
     private RemotingCommand 
handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
                                                            RemotingCommand 
request) throws Exception {
         final GetReplicaInfoRequestHeader controllerRequest = 
(GetReplicaInfoRequestHeader) 
request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 6a54a15fc..49c56f06b 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -25,9 +25,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.ControllerConfig;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.controller.ControllerManager;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -36,21 +34,23 @@ import 
org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
-import static 
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
-import static 
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -114,6 +114,7 @@ public class ControllerManagerTest {
             }
             return null;
         }, item -> item != null);
+        System.out.println("leader born!!!!!");
         return manager;
     }
 
@@ -128,35 +129,56 @@ public class ControllerManagerTest {
     /**
      * Register broker to controller
      */
-    public RegisterBrokerToControllerResponseHeader registerBroker(
+    public void registerBroker(
         final String controllerAddress, final String clusterName,
-        final String brokerName, final String address, final RemotingClient 
client,
-        final long heartbeatTimeoutMillis) throws Exception {
+        final String brokerName, final Long brokerId,  final String 
brokerAddress, final Long expectMasterBrokerId, final RemotingClient client) 
throws Exception {
+        // Get next brokerId;
+        final GetNextBrokerIdRequestHeader getNextBrokerIdRequestHeader = new 
GetNextBrokerIdRequestHeader(clusterName, brokerName);
+        final RemotingCommand getNextBrokerIdRequest = 
RemotingCommand.createRequestCommand(RequestCode.GET_NEXT_BROKER_ID, 
getNextBrokerIdRequestHeader);
+        final RemotingCommand getNextBrokerIdResponse = 
client.invokeSync(controllerAddress, getNextBrokerIdRequest, 3000);
+        final GetNextBrokerIdResponseHeader getNextBrokerIdResponseHeader = 
(GetNextBrokerIdResponseHeader) 
getNextBrokerIdResponse.decodeCommandCustomHeader(GetNextBrokerIdResponseHeader.class);
+        String registerCheckCode = brokerAddress + ";" + 
System.currentTimeMillis();
+        assertEquals(ResponseCode.SUCCESS, getNextBrokerIdResponse.getCode());
+        assertEquals(brokerId, 
getNextBrokerIdResponseHeader.getNextBrokerId());
 
-        final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, 
heartbeatTimeoutMillis, 1, 1000L, 0);
-        final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, 
requestHeader);
-        final RemotingCommand response = client.invokeSync(controllerAddress, 
request, 3000);
-        assertNotNull(response);
-        switch (response.getCode()) {
-            case SUCCESS: {
-                return (RegisterBrokerToControllerResponseHeader) 
response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
-            }
-            case CONTROLLER_NOT_LEADER: {
-                throw new MQBrokerException(response.getCode(), "Controller 
leader was changed");
-            }
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
+        // Apply brokerId
+        final ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader = new 
ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, 
registerCheckCode);
+        final RemotingCommand applyBrokerIdRequest = 
RemotingCommand.createRequestCommand(RequestCode.APPLY_BROKER_ID, 
applyBrokerIdRequestHeader);
+        final RemotingCommand applyBrokerIdResponse = 
client.invokeSync(controllerAddress, applyBrokerIdRequest, 3000);
+        final ApplyBrokerIdResponseHeader applyBrokerIdResponseHeader = 
(ApplyBrokerIdResponseHeader) 
applyBrokerIdResponse.decodeCommandCustomHeader(ApplyBrokerIdResponseHeader.class);
+        assertEquals(ResponseCode.SUCCESS, applyBrokerIdResponse.getCode());
+
+        // Register success
+        final RegisterSuccessRequestHeader registerSuccessRequestHeader = new 
RegisterSuccessRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
+        final RemotingCommand registerSuccessRequest = 
RemotingCommand.createRequestCommand(RequestCode.REGISTER_SUCCESS, 
registerSuccessRequestHeader);
+        final RemotingCommand registerSuccessResponse = 
client.invokeSync(controllerAddress, registerSuccessRequest, 3000);
+        final RegisterSuccessResponseHeader registerSuccessResponseHeader = 
(RegisterSuccessResponseHeader) 
registerSuccessResponse.decodeCommandCustomHeader(RegisterSuccessResponseHeader.class);
+        assertEquals(ResponseCode.SUCCESS, registerSuccessResponse.getCode());
+        assertEquals(expectMasterBrokerId, 
registerSuccessResponseHeader.getMasterBrokerId());
     }
 
     public RemotingCommand brokerTryElect(final String controllerAddress, 
final String clusterName,
-        final String brokerName, final String brokerAddress, final 
RemotingClient client) throws Exception {
-        final ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, 
brokerAddress);
+        final String brokerName, final Long brokerId, final RemotingClient 
client) throws Exception {
+        final ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, 
requestHeader);
         RemotingCommand response = client.invokeSync(controllerAddress, 
request, 3000);
         assertNotNull(response);
         return response;
     }
 
+    public void sendHeartbeat(final String controllerAddress, final String 
clusterName, final String brokerName, final Long brokerId,
+                              final String brokerAddress, final Long timeout, 
final RemotingClient client) throws Exception {
+        final BrokerHeartbeatRequestHeader heartbeatRequestHeader0 = new 
BrokerHeartbeatRequestHeader();
+        heartbeatRequestHeader0.setBrokerId(brokerId);
+        heartbeatRequestHeader0.setClusterName(clusterName);
+        heartbeatRequestHeader0.setBrokerName(brokerName);
+        heartbeatRequestHeader0.setBrokerAddr(brokerAddress);
+        heartbeatRequestHeader0.setHeartbeatTimeoutMills(timeout);
+        final RemotingCommand heartbeatRequest = 
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, 
heartbeatRequestHeader0);
+        RemotingCommand remotingCommand = client.invokeSync(controllerAddress, 
heartbeatRequest, 3000);
+        assertEquals(ResponseCode.SUCCESS, remotingCommand.getCode());
+    }
+
     @Test
     public void testSomeApi() throws Exception {
         mockData();
@@ -164,37 +186,41 @@ public class ControllerManagerTest {
         String leaderAddr = "localhost" + ":" + 
leader.getController().getRemotingServer().localListenPort();
 
         // Register two broker
-        final RegisterBrokerToControllerResponseHeader responseHeader1 = 
registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", 
this.remotingClient, 1000L);
-        assert responseHeader1 != null;
-        assertEquals(responseHeader1.getBrokerId(), MixAll.FIRST_SLAVE_ID);
+        registerBroker(leaderAddr, "cluster1", "broker1", 1L, 
"127.0.0.1:8000", null, this.remotingClient);
+
+        registerBroker(leaderAddr, "cluster1", "broker1", 2L, 
"127.0.0.1:8001", null, this.remotingClient1);
 
-        final RegisterBrokerToControllerResponseHeader responseHeader2 = 
registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", 
this.remotingClient1, 4000L);
-        assert responseHeader2 != null;
-        assertEquals(responseHeader2.getBrokerId(), MixAll.FIRST_SLAVE_ID + 1);
+        // Send heartbeat
+        sendHeartbeat(leaderAddr, "cluster1", "broker1", 1L, "127.0.0.1:8000", 
3000L, remotingClient);
+        sendHeartbeat(leaderAddr, "cluster1", "broker1", 2L, "127.0.0.1:8001", 
3000L, remotingClient1);
 
         // Two all try elect itself as master, but only the first can be the 
master
-        RemotingCommand tryElectCommand1 = brokerTryElect(leaderAddr, 
"cluster1", "broker1", "127.0.0.1:8000", this.remotingClient);
+        RemotingCommand tryElectCommand1 = brokerTryElect(leaderAddr, 
"cluster1", "broker1", 1L, this.remotingClient);
         ElectMasterResponseHeader brokerTryElectResponseHeader1 = 
(ElectMasterResponseHeader) 
tryElectCommand1.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
-        RemotingCommand tryElectCommand2 = brokerTryElect(leaderAddr, 
"cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1);
+        RemotingCommand tryElectCommand2 = brokerTryElect(leaderAddr, 
"cluster1", "broker1", 2L, this.remotingClient1);
         ElectMasterResponseHeader brokerTryElectResponseHeader2 = 
(ElectMasterResponseHeader) 
tryElectCommand2.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
 
-        assertEquals(SUCCESS, tryElectCommand1.getCode());
+        assertEquals(ResponseCode.SUCCESS, tryElectCommand1.getCode());
+        assertEquals(1L, 
brokerTryElectResponseHeader1.getMasterBrokerId().longValue());
         assertEquals("127.0.0.1:8000", 
brokerTryElectResponseHeader1.getMasterAddress());
-        assertEquals(1L, brokerTryElectResponseHeader1.getMasterEpoch());
-        assertEquals(1L, brokerTryElectResponseHeader1.getSyncStateSetEpoch());
+        assertEquals(1, 
brokerTryElectResponseHeader1.getMasterEpoch().intValue());
+        assertEquals(1, 
brokerTryElectResponseHeader1.getSyncStateSetEpoch().intValue());
 
-        assertEquals(CONTROLLER_MASTER_STILL_EXIST, 
tryElectCommand2.getCode());
+        assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, 
tryElectCommand2.getCode());
+        assertEquals(1L, 
brokerTryElectResponseHeader2.getMasterBrokerId().longValue());
         assertEquals("127.0.0.1:8000", 
brokerTryElectResponseHeader2.getMasterAddress());
-        assertEquals(1L, brokerTryElectResponseHeader2.getMasterEpoch());
-        assertEquals(1L, brokerTryElectResponseHeader2.getSyncStateSetEpoch());
+        assertEquals(1, 
brokerTryElectResponseHeader2.getMasterEpoch().intValue());
+        assertEquals(1, 
brokerTryElectResponseHeader2.getSyncStateSetEpoch().intValue());
 
-        // Send heartbeat for broker2
+        // Send heartbeat for broker2 every one second
         ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
         executor.scheduleAtFixedRate(() -> {
             final BrokerHeartbeatRequestHeader heartbeatRequestHeader = new 
BrokerHeartbeatRequestHeader();
             heartbeatRequestHeader.setClusterName("cluster1");
             heartbeatRequestHeader.setBrokerName("broker1");
             heartbeatRequestHeader.setBrokerAddr("127.0.0.1:8001");
+            heartbeatRequestHeader.setBrokerId(2L);
+            heartbeatRequestHeader.setHeartbeatTimeoutMills(3000L);
             final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, 
heartbeatRequestHeader);
             try {
                 final RemotingCommand remotingCommand = 
this.remotingClient1.invokeSync(leaderAddr, request, 3000);
@@ -207,7 +233,7 @@ public class ControllerManagerTest {
             final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, 
requestHeader);
             final RemotingCommand response = 
this.remotingClient1.invokeSync(leaderAddr, request, 3000);
             final GetReplicaInfoResponseHeader responseHeader = 
(GetReplicaInfoResponseHeader) 
response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class);
-            return StringUtils.equals(responseHeader.getMasterAddress(), 
"127.0.0.1:8001");
+            return responseHeader.getMasterBrokerId().equals(2L);
         }, item -> item);
 
         // The new master should be broker2.
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
new file mode 100644
index 000000000..9b8fa757c
--- /dev/null
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerTestBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.impl.controller;
+
+public class ControllerTestBase {
+
+    public final static String DEFAULT_CLUSTER_NAME = "cluster-a";
+
+    public final static String DEFAULT_BROKER_NAME = "broker-set-a";
+
+    public final static String[] DEFAULT_IP = {"127.0.0.1:9000", 
"127.0.0.1:9001", "127.0.0.1:9002"};
+}
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 9cfd1146e..7b3953508 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -22,11 +22,11 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.controller.Controller;
@@ -41,16 +41,20 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterReques
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static 
org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
+import static 
org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
+import static 
org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -96,42 +100,44 @@ public class DLedgerControllerTest {
     }
 
     public void registerNewBroker(Controller leader, String clusterName, 
String brokerName, String brokerAddress,
-        long expectBrokerId) throws Exception {
-        // Register new broker
-        final RegisterBrokerToControllerRequestHeader registerRequest = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress);
-        RemotingCommand response = 
await().atMost(Duration.ofSeconds(20)).until(() -> {
-            try {
-                final RemotingCommand responseInner = 
leader.registerBroker(registerRequest).get(2, TimeUnit.SECONDS);
-                if (responseInner == null || responseInner.getCode() != 
ResponseCode.SUCCESS) {
-                    return null;
-                }
-                return responseInner;
-            } catch (Exception e) {
-                e.printStackTrace();
-                return null;
-            }
-        }, Objects::nonNull);
+        Long expectBrokerId) throws Exception {
+        // Get next brokerId
+        final GetNextBrokerIdRequestHeader getNextBrokerIdRequest = new 
GetNextBrokerIdRequestHeader(clusterName, brokerName);
+        RemotingCommand remotingCommand = 
leader.getNextBrokerId(getNextBrokerIdRequest).get(2, TimeUnit.SECONDS);
+        GetNextBrokerIdResponseHeader getNextBrokerIdResp = 
(GetNextBrokerIdResponseHeader) remotingCommand.readCustomHeader();
+        Long nextBrokerId = getNextBrokerIdResp.getNextBrokerId();
+        String registerCheckCode = brokerAddress + ";" + 
System.currentTimeMillis();
 
-        final RegisterBrokerToControllerResponseHeader registerResult = 
(RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
+        // Check response
+        assertEquals(expectBrokerId, nextBrokerId);
 
-        assertEquals(expectBrokerId, registerResult.getBrokerId());
-    }
+        // Apply brokerId
+        final ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader = new 
ApplyBrokerIdRequestHeader(clusterName, brokerName, nextBrokerId, 
registerCheckCode);
+        RemotingCommand remotingCommand1 = 
leader.applyBrokerId(applyBrokerIdRequestHeader).get(2, TimeUnit.SECONDS);
 
-    public void brokerTryElectMaster(Controller leader, String clusterName, 
String brokerName, String brokerAddress,
-        boolean exceptSuccess) {
-        final ElectMasterRequestHeader electMasterRequestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, 
brokerAddress);
-        RemotingCommand command = 
await().atMost(Duration.ofSeconds(20)).until(() -> {
-            return leader.electMaster(electMasterRequestHeader).get(2, 
TimeUnit.SECONDS);
-        }, Objects::nonNull);
+        // Check response
+        assertEquals(ResponseCode.SUCCESS, remotingCommand1.getCode());
+
+        // Register success
+        final RegisterSuccessRequestHeader registerSuccessRequestHeader = new 
RegisterSuccessRequestHeader(clusterName, brokerName, nextBrokerId, 
brokerAddress);
+        RemotingCommand remotingCommand2 = 
leader.registerSuccess(registerSuccessRequestHeader).get(2, TimeUnit.SECONDS);
+
+
+        assertEquals(ResponseCode.SUCCESS, remotingCommand2.getCode());
+    }
 
+    public void brokerTryElectMaster(Controller leader, String clusterName, 
String brokerName, String brokerAddress, Long brokerId,
+        boolean exceptSuccess) throws Exception {
+        final ElectMasterRequestHeader electMasterRequestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
+        RemotingCommand command = 
leader.electMaster(electMasterRequestHeader).get(2, TimeUnit.SECONDS);
         ElectMasterResponseHeader header = (ElectMasterResponseHeader) 
command.readCustomHeader();
         assertEquals(exceptSuccess, ResponseCode.SUCCESS == command.getCode());
     }
 
-    private boolean alterNewInSyncSet(Controller leader, String brokerName, 
String masterAddress, int masterEpoch,
-        Set<String> newSyncStateSet, int syncStateSetEpoch) throws Exception {
+    private boolean alterNewInSyncSet(Controller leader, String brokerName, 
Long masterBrokerId, Integer masterEpoch,
+        Set<Long> newSyncStateSet, Integer syncStateSetEpoch) throws Exception 
{
         final AlterSyncStateSetRequestHeader alterRequest =
-            new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
+            new AlterSyncStateSetRequestHeader(brokerName, masterBrokerId, 
masterEpoch);
         final RemotingCommand response = 
leader.alterSyncStateSet(alterRequest, new SyncStateSet(newSyncStateSet, 
syncStateSetEpoch)).get(10, TimeUnit.SECONDS);
         if (null == response || response.getCode() != ResponseCode.SUCCESS) {
             return false;
@@ -177,30 +183,30 @@ public class DLedgerControllerTest {
         DLedgerController leader = waitLeader(controllers);
 
         // register
-        registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", 1L);
-        registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", 2L);
-        registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", 3L);
+        registerNewBroker(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[0], 1L);
+        registerNewBroker(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[1], 2L);
+        registerNewBroker(leader, DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[2], 3L);
         // try elect
-        brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9000", 
true);
-        brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9001", 
false);
-        brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9002", 
false);
-        final RemotingCommand getInfoResponse = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        brokerTryElectMaster(leader, DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L,true);
+        brokerTryElectMaster(leader, DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L,  false);
+        brokerTryElectMaster(leader, DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L,false);
+        final RemotingCommand getInfoResponse = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).get(10, TimeUnit.SECONDS);
         final GetReplicaInfoResponseHeader replicaInfo = 
(GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
-        assertEquals(1, replicaInfo.getMasterEpoch());
-        assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
+        assertEquals(1, replicaInfo.getMasterEpoch().intValue());
+        assertEquals(DEFAULT_IP[0], replicaInfo.getMasterAddress());
         // Try alter sync state set
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add("127.0.0.1:9000");
-        newSyncStateSet.add("127.0.0.1:9001");
-        newSyncStateSet.add("127.0.0.1:9002");
-        assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, 
newSyncStateSet, 1));
+        final HashSet<Long> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add(1L);
+        newSyncStateSet.add(2L);
+        newSyncStateSet.add(3L);
+        assertTrue(alterNewInSyncSet(leader, DEFAULT_BROKER_NAME, 1L, 1, 
newSyncStateSet, 1));
         return leader;
     }
 
-    public void setBrokerAlivePredicate(DLedgerController controller, 
String... deathBroker) {
-        controller.setBrokerAlivePredicate((clusterName, brokerName, 
brokerAddress) -> {
-            for (String broker : deathBroker) {
-                if (broker.equals(brokerAddress)) {
+    public void setBrokerAlivePredicate(DLedgerController controller, Long... 
deathBroker) {
+        controller.setBrokerAlivePredicate((clusterName, brokerName, brokerId) 
-> {
+            for (Long broker : deathBroker) {
+                if (broker.equals(brokerId)) {
                     return false;
                 }
             }
@@ -208,10 +214,10 @@ public class DLedgerControllerTest {
         });
     }
 
-    public void setBrokerElectPolicy(DLedgerController controller, String... 
deathBroker) {
-        controller.setElectPolicy(new DefaultElectPolicy((clusterName, 
brokerName, brokerAddress) -> {
-            for (String broker : deathBroker) {
-                if (broker.equals(brokerAddress)) {
+    public void setBrokerElectPolicy(DLedgerController controller, Long... 
deathBroker) {
+        controller.setElectPolicy(new DefaultElectPolicy((clusterName, 
brokerName, brokerId) -> {
+            for (Long broker : deathBroker) {
+                if (broker.equals(brokerId)) {
                     return false;
                 }
             }
@@ -222,78 +228,80 @@ public class DLedgerControllerTest {
     @Test
     public void testElectMaster() throws Exception {
         final DLedgerController leader = mockMetaData(false);
-        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
-        setBrokerElectPolicy(leader, "127.0.0.1:9000");
+        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
+        setBrokerElectPolicy(leader, 1L);
         final RemotingCommand resp = leader.electMaster(request).get(10, 
TimeUnit.SECONDS);
         final ElectMasterResponseHeader response = (ElectMasterResponseHeader) 
resp.readCustomHeader();
-        assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getMasterAddress().isEmpty());
-        assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000");
+        assertEquals(2, response.getMasterEpoch().intValue());
+        assertNotEquals(1L, response.getMasterBrokerId().longValue());
+        assertNotEquals(DEFAULT_IP[0], response.getMasterAddress());
     }
 
     @Test
     public void 
testAllReplicasShutdownAndRestartWithUnEnableElectUnCleanMaster() throws 
Exception {
         final DLedgerController leader = mockMetaData(false);
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add("127.0.0.1:9000");
+        final HashSet<Long> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add(1L);
 
-        assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, 
newSyncStateSet, 2));
+        assertTrue(alterNewInSyncSet(leader, DEFAULT_BROKER_NAME, 1L, 1, 
newSyncStateSet, 2));
 
         // Now we trigger electMaster api, which means the old master is 
shutdown and want to elect a new master.
-        // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, 
not more replicas can be elected as master, it will be failed.
-        final ElectMasterRequestHeader electRequest = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
-        setBrokerElectPolicy(leader, "127.0.0.1:9000");
+        // However, the syncStateSet in statemachine is {1}, not more replicas 
can be elected as master, it will be failed.
+        final ElectMasterRequestHeader electRequest = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
+        setBrokerElectPolicy(leader, 1L);
         leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
 
-        final RemotingCommand resp = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader("broker1")).
+        final RemotingCommand resp = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).
             get(10, TimeUnit.SECONDS);
         final GetReplicaInfoResponseHeader replicaInfo = 
(GetReplicaInfoResponseHeader) resp.readCustomHeader();
         final SyncStateSet syncStateSet = 
RemotingSerializable.decode(resp.getBody(), SyncStateSet.class);
         assertEquals(syncStateSet.getSyncStateSet(), newSyncStateSet);
-        assertEquals(replicaInfo.getMasterAddress(), "");
-        assertEquals(replicaInfo.getMasterEpoch(), 2);
+        assertEquals(null, replicaInfo.getMasterAddress());
+        assertEquals(2, replicaInfo.getMasterEpoch().intValue());
 
-        // Now, we start broker1 - 127.0.0.1:9001 to try elect, but it was not 
in syncStateSet, so it will not be elected as master.
+        // Now, we start broker - id[2]address[127.0.0.1:9001] to try elect, 
but it was not in syncStateSet, so it will not be elected as master.
         final ElectMasterRequestHeader request1 =
-            ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", 
"127.0.0.1:9001");
+            ElectMasterRequestHeader.ofBrokerTrigger(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, 2L);
         final ElectMasterResponseHeader r1 = (ElectMasterResponseHeader) 
leader.electMaster(request1).get(10, TimeUnit.SECONDS).readCustomHeader();
-        assertEquals(r1.getMasterAddress(), "");
+        assertEquals(null, r1.getMasterBrokerId());
+        assertEquals(null, r1.getMasterAddress());
 
-        // Now, we start broker1 - 127.0.0.1:9000 to try elect, it will be 
elected as master
+        // Now, we start broker - id[1]address[127.0.0.1:9000] to try elect, 
it will be elected as master
         setBrokerElectPolicy(leader);
         final ElectMasterRequestHeader request2 =
-            ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", 
"127.0.0.1:9000");
+            ElectMasterRequestHeader.ofBrokerTrigger(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, 1L);
         final ElectMasterResponseHeader r2 = (ElectMasterResponseHeader) 
leader.electMaster(request2).get(10, TimeUnit.SECONDS).readCustomHeader();
-        assertEquals(r2.getMasterAddress(), "127.0.0.1:9000");
-        assertEquals(r2.getMasterEpoch(), 3);
+        assertEquals(1L, r2.getMasterBrokerId().longValue());
+        assertEquals(DEFAULT_IP[0], r2.getMasterAddress());
+        assertEquals(3, r2.getMasterEpoch().intValue());
     }
 
     @Test
     public void testEnableElectUnCleanMaster() throws Exception {
         final DLedgerController leader = mockMetaData(true);
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add("127.0.0.1:9000");
+        final HashSet<Long> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add(1L);
 
-        assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, 
newSyncStateSet, 2));
+        assertTrue(alterNewInSyncSet(leader, DEFAULT_BROKER_NAME, 1L, 1, 
newSyncStateSet, 2));
 
         // Now we trigger electMaster api, which means the old master is 
shutdown and want to elect a new master.
-        // However, event if the syncStateSet in statemachine is 
{"127.0.0.1:9000"}
+        // However, event if the syncStateSet in statemachine is 
{DEFAULT_IP[0]}
         // the option {enableElectUncleanMaster = true}, so the controller 
sill can elect a new master
-        final ElectMasterRequestHeader electRequest = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
-        setBrokerElectPolicy(leader, "127.0.0.1:9000");
+        final ElectMasterRequestHeader electRequest = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
+        setBrokerElectPolicy(leader, 1L);
         final CompletableFuture<RemotingCommand> future = 
leader.electMaster(electRequest);
         future.get(10, TimeUnit.SECONDS);
 
-        final RemotingCommand resp = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        final RemotingCommand resp = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).get(10, TimeUnit.SECONDS);
         final GetReplicaInfoResponseHeader replicaInfo = 
(GetReplicaInfoResponseHeader) resp.readCustomHeader();
         final SyncStateSet syncStateSet = 
RemotingSerializable.decode(resp.getBody(), SyncStateSet.class);
 
-        final HashSet<String> newSyncStateSet2 = new HashSet<>();
-        newSyncStateSet2.add(replicaInfo.getMasterAddress());
+        final HashSet<Long> newSyncStateSet2 = new HashSet<>();
+        newSyncStateSet2.add(replicaInfo.getMasterBrokerId());
         assertEquals(syncStateSet.getSyncStateSet(), newSyncStateSet2);
-        assertNotEquals(replicaInfo.getMasterAddress(), "");
-        assertNotEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
-        assertEquals(replicaInfo.getMasterEpoch(), 2);
+        assertNotEquals(1L, replicaInfo.getMasterBrokerId().longValue());
+        assertNotEquals(DEFAULT_IP[0], replicaInfo.getMasterAddress());
+        assertEquals(2, replicaInfo.getMasterEpoch().intValue());
     }
 
     @Test
@@ -306,7 +314,7 @@ public class DLedgerControllerTest {
         assertNotNull(newLeader);
 
         RemotingCommand response = 
await().atMost(Duration.ofSeconds(10)).until(() -> {
-            final RemotingCommand resp = newLeader.getReplicaInfo(new 
GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+            final RemotingCommand resp = newLeader.getReplicaInfo(new 
GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).get(10, TimeUnit.SECONDS);
             if (resp.getCode() == ResponseCode.SUCCESS) {
 
                 return resp;
@@ -316,13 +324,13 @@ public class DLedgerControllerTest {
         }, item -> item != null);
         final GetReplicaInfoResponseHeader replicaInfo = 
(GetReplicaInfoResponseHeader) response.readCustomHeader();
         final SyncStateSet syncStateSetResult = 
RemotingSerializable.decode(response.getBody(), SyncStateSet.class);
-        assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
-        assertEquals(replicaInfo.getMasterEpoch(), 1);
+        assertEquals(replicaInfo.getMasterAddress(), DEFAULT_IP[0]);
+        assertEquals(1, replicaInfo.getMasterEpoch().intValue());
 
-        final HashSet<String> syncStateSet = new HashSet<>();
-        syncStateSet.add("127.0.0.1:9000");
-        syncStateSet.add("127.0.0.1:9001");
-        syncStateSet.add("127.0.0.1:9002");
+        final HashSet<Long> syncStateSet = new HashSet<>();
+        syncStateSet.add(1L);
+        syncStateSet.add(2L);
+        syncStateSet.add(3L);
         assertEquals(syncStateSetResult.getSyncStateSet(), syncStateSet);
     }
 }
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index 306acf5b6..7b1e086e3 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -40,7 +40,7 @@ public class DefaultBrokerHeartbeatManagerTest {
     @Test
     public void testDetectBrokerAlive() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        this.heartbeatManager.addBrokerLifecycleListener((clusterName, 
brokerName, brokerAddress, brokerId) -> {
+        this.heartbeatManager.addBrokerLifecycleListener((clusterName, 
brokerName, brokerId) -> {
             latch.countDown();
         });
         this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:7000", 1L,3000L, null,
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index d5cad8188..3b93b6740 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -20,11 +20,11 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
+
 import org.apache.rocketmq.common.ControllerConfig;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
+import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
 import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
@@ -41,16 +41,24 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterReques
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static 
org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_BROKER_NAME;
+import static 
org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_CLUSTER_NAME;
+import static 
org.apache.rocketmq.controller.impl.controller.ControllerTestBase.DEFAULT_IP;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class ReplicasInfoManagerTest {
@@ -60,11 +68,9 @@ public class ReplicasInfoManagerTest {
 
     private ControllerConfig config;
 
-    private ElectPolicy electPolicy;
 
     @Before
     public void init() {
-        this.electPolicy = new DefaultElectPolicy((clusterName, brokerAddr) -> 
true, null);
         this.config = new ControllerConfig();
         this.config.setEnableElectUncleanMaster(false);
         this.config.setScanNotActiveBrokerInterval(300000000);
@@ -80,62 +86,102 @@ public class ReplicasInfoManagerTest {
         this.heartbeatManager = null;
     }
 
+    private BrokerReplicasInfo.ReplicasInfo getReplicasInfo(String brokerName) 
{
+        ControllerResult<Void> syncStateData = 
this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName));
+        BrokerReplicasInfo replicasInfo = 
RemotingSerializable.decode(syncStateData.getBody(), BrokerReplicasInfo.class);
+        return replicasInfo.getReplicasInfoTable().get(brokerName);
+    }
+
     public void registerNewBroker(String clusterName, String brokerName, 
String brokerAddress,
-        long exceptBrokerId, String exceptMasterAddress) {
-        // Register new broker
-        final RegisterBrokerToControllerRequestHeader registerRequest =
-            new RegisterBrokerToControllerRequestHeader(clusterName, 
brokerName, brokerAddress);
-        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerResult = this.replicasInfoManager.registerBroker(registerRequest, (s, 
v) -> true);
-        apply(registerResult.getEvents());
+                                  Long exceptBrokerId, Long 
exceptMasterBrokerId) {
+
+        // Get next brokerId
+        final GetNextBrokerIdRequestHeader getNextBrokerIdRequestHeader = new 
GetNextBrokerIdRequestHeader(clusterName, brokerName);
+        final ControllerResult<GetNextBrokerIdResponseHeader> 
nextBrokerIdResult = 
this.replicasInfoManager.getNextBrokerId(getNextBrokerIdRequestHeader);
+        Long nextBrokerId = nextBrokerIdResult.getResponse().getNextBrokerId();
+        String registerCheckCode = brokerAddress + ";" + 
System.currentTimeMillis();
+
         // check response
-        assertEquals(ResponseCode.SUCCESS, registerResult.getResponseCode());
-        assertEquals(exceptBrokerId, 
registerResult.getResponse().getBrokerId());
-        assertEquals(exceptMasterAddress, 
registerResult.getResponse().getMasterAddress());
+        assertEquals(ResponseCode.SUCCESS, 
nextBrokerIdResult.getResponseCode());
+        assertEquals(exceptBrokerId, nextBrokerId);
+
+        // Apply brokerId
+        final ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader = new 
ApplyBrokerIdRequestHeader(clusterName, brokerName, nextBrokerId, 
registerCheckCode);
+        final ControllerResult<ApplyBrokerIdResponseHeader> 
applyBrokerIdResult = 
this.replicasInfoManager.applyBrokerId(applyBrokerIdRequestHeader);
+        apply(applyBrokerIdResult.getEvents());
+
+        // check response
+        assertEquals(ResponseCode.SUCCESS, 
applyBrokerIdResult.getResponseCode());
+
         // check it in state machine
-        final GetReplicaInfoResponseHeader replicaInfo = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
-        assertEquals(exceptBrokerId, replicaInfo.getBrokerId());
-    }
+        BrokerReplicasInfo.ReplicasInfo replicasInfo = 
getReplicasInfo(brokerName);
+        BrokerReplicasInfo.ReplicaIdentity replicaIdentity = 
replicasInfo.getNotInSyncReplicas().stream().filter(x -> 
x.getBrokerId().equals(nextBrokerId)).findFirst().get();
+        assertNotNull(replicaIdentity);
+        assertEquals(brokerName, replicaIdentity.getBrokerName());
+        assertEquals(exceptBrokerId, replicaIdentity.getBrokerId());
+        assertEquals(brokerAddress, replicaIdentity.getBrokerAddress());
+
+        // register success
+        final RegisterSuccessRequestHeader registerSuccessRequestHeader = new 
RegisterSuccessRequestHeader(clusterName, brokerName, exceptBrokerId, 
brokerAddress);
+        ControllerResult<RegisterSuccessResponseHeader> registerSuccessResult 
= this.replicasInfoManager.registerSuccess(registerSuccessRequestHeader, (a, b, 
c) -> true);
+        apply(registerSuccessResult.getEvents());
 
-    public void brokerElectMaster(String clusterName, long brokerId, String 
brokerName, String brokerAddress,
-        boolean isFirstTryElect) {
+        // check response
+        assertEquals(ResponseCode.SUCCESS, 
registerSuccessResult.getResponseCode());
+        assertEquals(exceptMasterBrokerId, 
registerSuccessResult.getResponse().getMasterBrokerId());
 
-        final GetReplicaInfoResponseHeader replicaInfoBefore = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
-        byte[] body = 
this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)).getBody();
-        BrokerReplicasInfo syncStateDataBefore = 
RemotingSerializable.decode(body, BrokerReplicasInfo.class);
+    }
+    public void brokerElectMaster(String clusterName, Long brokerId, String 
brokerName, String brokerAddress, boolean isFirstTryElect, boolean 
expectToBeElected) {
+        this.brokerElectMaster(clusterName, brokerId, brokerName, 
brokerAddress, isFirstTryElect,expectToBeElected, (a, b, c) -> true);
+    }
+    
+    public void brokerElectMaster(String clusterName, Long brokerId, String 
brokerName, String brokerAddress, boolean isFirstTryElect, boolean 
expectToBeElected, BrokerValidPredicate validPredicate) {
+
+        final GetReplicaInfoResponseHeader replicaInfoBefore = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName)).getResponse();
+        BrokerReplicasInfo.ReplicasInfo syncStateSetInfo = 
getReplicasInfo(brokerName);
         // Try elect itself as a master
-        ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, 
brokerAddress);
-        final ControllerResult<ElectMasterResponseHeader> result = 
this.replicasInfoManager.electMaster(requestHeader, this.electPolicy);
+        ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
+        final ControllerResult<ElectMasterResponseHeader> result = 
this.replicasInfoManager.electMaster(requestHeader, new 
DefaultElectPolicy(validPredicate, null));
         apply(result.getEvents());
 
-        final GetReplicaInfoResponseHeader replicaInfoAfter = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+        final GetReplicaInfoResponseHeader replicaInfoAfter = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName)).getResponse();
         final ElectMasterResponseHeader response = result.getResponse();
 
         if (isFirstTryElect) {
             // it should be elected
             // check response
             assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
-            assertEquals(1, response.getMasterEpoch());
-            assertEquals(1, response.getSyncStateSetEpoch());
+            assertEquals(1, response.getMasterEpoch().intValue());
+            assertEquals(1, response.getSyncStateSetEpoch().intValue());
             assertEquals(brokerAddress, response.getMasterAddress());
+            assertEquals(brokerId, response.getMasterBrokerId());
             // check it in state machine
             assertEquals(brokerAddress, replicaInfoAfter.getMasterAddress());
-            assertEquals(1, replicaInfoAfter.getMasterEpoch());
-            assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+            assertEquals(1, replicaInfoAfter.getMasterEpoch().intValue());
+            assertEquals(brokerId, replicaInfoAfter.getMasterBrokerId());
         } else {
 
             // failed because now master still exist
-            if (StringUtils.isNotEmpty(replicaInfoBefore.getMasterAddress())) {
+            if (replicaInfoBefore.getMasterBrokerId() != null && 
validPredicate.check(clusterName, brokerName, 
replicaInfoBefore.getMasterBrokerId())) {
                 assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, 
result.getResponseCode());
                 assertEquals(replicaInfoBefore.getMasterAddress(), 
response.getMasterAddress());
                 assertEquals(replicaInfoBefore.getMasterEpoch(), 
response.getMasterEpoch());
-                assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+                assertEquals(replicaInfoBefore.getMasterBrokerId(), 
response.getMasterBrokerId());
+                assertEquals(replicaInfoBefore.getMasterBrokerId(), 
replicaInfoAfter.getMasterBrokerId());
                 return;
             }
-            if 
(syncStateDataBefore.getReplicasInfoTable().containsKey(brokerAddress) || 
this.config.isEnableElectUncleanMaster()) {
-                // can be elected successfully
+            if (syncStateSetInfo.isExistInSync(brokerName, brokerId, 
brokerAddress) || this.config.isEnableElectUncleanMaster()) {
+                // a new master can be elected successfully
                 assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
-                assertEquals(MixAll.MASTER_ID, replicaInfoAfter.getBrokerId());
-                assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+                assertEquals(replicaInfoBefore.getMasterEpoch() + 1, 
replicaInfoAfter.getMasterEpoch().intValue());
+                
+                if (expectToBeElected) {
+                    assertEquals(brokerAddress, response.getMasterAddress());
+                    assertEquals(brokerId, response.getMasterBrokerId());
+                    assertEquals(brokerAddress, 
replicaInfoAfter.getMasterAddress());
+                    assertEquals(brokerId, 
replicaInfoAfter.getMasterBrokerId());
+                }
+
             } else {
                 // failed because elect nothing
                 assertEquals(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, 
result.getResponseCode());
@@ -143,49 +189,12 @@ public class ReplicasInfoManagerTest {
         }
     }
 
-    @Test
-    public void testRegisterNewBroker() {
-        final RegisterBrokerToControllerRequestHeader registerRequest =
-            new RegisterBrokerToControllerRequestHeader("default", 
"brokerName-a", "127.0.0.1:9000");
-        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerResult = this.replicasInfoManager.registerBroker(registerRequest, (s, 
v) -> true);
-        apply(registerResult.getEvents());
-        final RegisterBrokerToControllerRequestHeader registerRequest0 =
-            new RegisterBrokerToControllerRequestHeader("default", 
"brokerName-a", "127.0.0.1:9001");
-        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerResult0 = this.replicasInfoManager.registerBroker(registerRequest0, (s, 
v) -> true);
-        apply(registerResult0.getEvents());
-        final ElectMasterRequestHeader electMasterRequest = 
ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", 
"127.0.0.1:9000");
-        ControllerResult<ElectMasterResponseHeader> 
electMasterResponseHeaderControllerResult = 
this.replicasInfoManager.electMaster(electMasterRequest, new 
DefaultElectPolicy());
-        apply(electMasterResponseHeaderControllerResult.getEvents());
-        final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader("brokerName-a"));
-        final GetReplicaInfoResponseHeader replicaInfo = 
getInfoResult.getResponse();
-        assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
-        assertEquals(1, replicaInfo.getMasterEpoch());
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add("127.0.0.1:9000");
-        newSyncStateSet.add("127.0.0.1:9001");
-        alterNewInSyncSet("brokerName-a", "127.0.0.1:9000", 1, 
newSyncStateSet, 1);
-        final RegisterBrokerToControllerRequestHeader registerRequest1 =
-            new RegisterBrokerToControllerRequestHeader("default", 
"brokerName-a", "127.0.0.1:9002");
-        final ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerResult1 = this.replicasInfoManager.registerBroker(registerRequest1, (s, 
v) -> StringUtils.equals(v, "127.0.0.1:9001"));
-        apply(registerResult1.getEvents());
-        assertEquals(3, registerResult1.getResponse().getBrokerId());
-        assertEquals("", registerResult1.getResponse().getMasterAddress());
-        ElectPolicy electPolicy1 = new DefaultElectPolicy((clusterName, 
brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"),null);
-        final ElectMasterRequestHeader electMasterRequest1 = 
ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", 
"127.0.0.1:9002");
-        ControllerResult<ElectMasterResponseHeader> 
electMasterResponseHeaderControllerResult1 = 
this.replicasInfoManager.electMaster(electMasterRequest1, electPolicy1);
-        apply(electMasterResponseHeaderControllerResult1.getEvents());
-        final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult0 = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader("brokerName-a"));
-        final GetReplicaInfoResponseHeader replicaInfo0 = 
getInfoResult0.getResponse();
-        assertEquals(replicaInfo0.getMasterAddress(), "127.0.0.1:9001");
-        assertTrue(replicaInfo0.getMasterAddress().equals("127.0.0.1:9001") || 
replicaInfo0.getMasterAddress().equals("127.0.0.1:9002"));
-        assertEquals(replicaInfo0.getMasterEpoch(), 2);
-    }
-
-    private boolean alterNewInSyncSet(String brokerName, String masterAddress, 
int masterEpoch,
-        Set<String> newSyncStateSet, int syncStateSetEpoch) {
+    private boolean alterNewInSyncSet(String brokerName, Long brokerId, 
Integer masterEpoch,
+        Set<Long> newSyncStateSet, Integer syncStateSetEpoch) {
         final AlterSyncStateSetRequestHeader alterRequest =
-            new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
-        final ControllerResult<AlterSyncStateSetResponseHeader> result = 
this.replicasInfoManager.alterSyncStateSet(alterRequest, new 
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
+            new AlterSyncStateSetRequestHeader(brokerName, brokerId, 
masterEpoch);
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = 
this.replicasInfoManager.alterSyncStateSet(alterRequest,
+                new SyncStateSet(newSyncStateSet, syncStateSetEpoch), 
(cluster, brokerName1, brokerId1) -> true);
         apply(result.getEvents());
 
         final ControllerResult<GetReplicaInfoResponseHeader> resp = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName));
@@ -204,79 +213,106 @@ public class ReplicasInfoManagerTest {
     }
 
     public void mockMetaData() {
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "");
-        brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
-        brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
-        brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add("127.0.0.1:9000");
-        newSyncStateSet.add("127.0.0.1:9001");
-        newSyncStateSet.add("127.0.0.1:9002");
-        assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, 
newSyncStateSet, 1));
+        registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[0], 1L, null);
+        registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[1], 2L, null);
+        registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[2], 3L, null);
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 1L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[0], true, true);
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 2L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[1], false, false);
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 3L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[2], false, false);
+        final HashSet<Long> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add(1L);
+        newSyncStateSet.add(2L);
+        newSyncStateSet.add(3L);
+        assertTrue(alterNewInSyncSet(DEFAULT_BROKER_NAME, 1L, 1, 
newSyncStateSet, 1));
     }
 
     public void mockHeartbeatDataMasterStillAlive() {
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9000", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, 10000000000L, null,
             1, 1L, -1L, 0);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null,
             1, 2L, -1L, 0);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null,
             1, 3L, -1L, 0);
     }
 
     public void mockHeartbeatDataHigherEpoch() {
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, -10000L, null,
             1, 3L, -1L, 0);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null,
             1, 2L, -1L, 0);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null,
             0, 3L, -1L, 0);
     }
 
     public void mockHeartbeatDataHigherOffset() {
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, -10000L, null,
             1, 3L, -1L, 0);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null,
             1, 2L, -1L, 0);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null,
             1, 3L, -1L, 0);
     }
 
     public void mockHeartbeatDataHigherPriority() {
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[0], 1L, -10000L, null,
             1, 3L, -1L, 3);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[1], 2L, 10000000000L, null,
             1, 3L, -1L, 2);
-        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+        this.heartbeatManager.onBrokerHeartbeat(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, DEFAULT_IP[2], 3L, 10000000000L, null,
             1, 3L, -1L, 1);
     }
 
     @Test
     public void testRegisterBrokerSuccess() {
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "");
-        brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
-        brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
-        brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
+        mockMetaData();
+
+        BrokerReplicasInfo.ReplicasInfo replicasInfo = 
getReplicasInfo(DEFAULT_BROKER_NAME);
+        assertEquals(1L, replicasInfo.getMasterBrokerId().longValue());
+        assertEquals(DEFAULT_IP[0], replicasInfo.getMasterAddress());
+        assertEquals(1, replicasInfo.getMasterEpoch());
+        assertEquals(2, replicasInfo.getSyncStateSetEpoch());
+        assertEquals(3, replicasInfo.getInSyncReplicas().size());
+        assertEquals(0, replicasInfo.getNotInSyncReplicas().size());
     }
 
     @Test
     public void testRegisterWithMasterExistResp() {
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
-        brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
-        brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, 
"127.0.0.1:9000");
-        brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
+        registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[0], 1L, null);
+        registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[1], 2L, null);
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 1L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[0], true, true);
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 2L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[1], false, false);
+        registerNewBroker(DEFAULT_CLUSTER_NAME, DEFAULT_BROKER_NAME, 
DEFAULT_IP[2], 3L, 1L);
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 3L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[2], false, false);
+
+        BrokerReplicasInfo.ReplicasInfo replicasInfo = 
getReplicasInfo(DEFAULT_BROKER_NAME);
+        assertEquals(1L, replicasInfo.getMasterBrokerId().longValue());
+        assertEquals(DEFAULT_IP[0], replicasInfo.getMasterAddress());
+        assertEquals(1, replicasInfo.getMasterEpoch());
+        assertEquals(1, replicasInfo.getSyncStateSetEpoch());
+        assertEquals(1, replicasInfo.getInSyncReplicas().size());
+        assertEquals(2, replicasInfo.getNotInSyncReplicas().size());
+    }
+
+    @Test
+    public void testRegisterWithOldMasterInactive() {
+        mockMetaData();
+        // If now only broker-3 alive, it will be elected to be a new master
+        brokerElectMaster(DEFAULT_CLUSTER_NAME, 3L, DEFAULT_BROKER_NAME, 
DEFAULT_IP[2], false, true, (a, b, c) -> c.equals(3L));
+
+        // Check in statemachine
+        BrokerReplicasInfo.ReplicasInfo replicasInfo = 
getReplicasInfo(DEFAULT_BROKER_NAME);
+        assertEquals(3L, replicasInfo.getMasterBrokerId().longValue());
+        assertEquals(DEFAULT_IP[2], replicasInfo.getMasterAddress());
+        assertEquals(2, replicasInfo.getMasterEpoch());
+        assertEquals(3, replicasInfo.getSyncStateSetEpoch());
+        assertEquals(1, replicasInfo.getInSyncReplicas().size());
+        assertEquals(2, replicasInfo.getNotInSyncReplicas().size());
     }
 
     @Test
     public void testElectMasterOldMasterStillAlive() {
         mockMetaData();
-        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
+        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataMasterStillAlive();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
@@ -287,100 +323,100 @@ public class ReplicasInfoManagerTest {
     @Test
     public void testElectMasterPreferHigherEpoch() {
         mockMetaData();
-        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
+        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherEpoch();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
             electPolicy);
         final ElectMasterResponseHeader response = cResult.getResponse();
-        assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getMasterAddress().isEmpty());
-        assertEquals("127.0.0.1:9001", response.getMasterAddress());
+        assertEquals(DEFAULT_IP[1], response.getMasterAddress());
+        assertEquals(2L, response.getMasterBrokerId().longValue());
+        assertEquals(2, response.getMasterEpoch().intValue());
     }
 
     @Test
     public void testElectMasterPreferHigherOffsetWhenEpochEquals() {
         mockMetaData();
-        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
+        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherOffset();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
             electPolicy);
         final ElectMasterResponseHeader response = cResult.getResponse();
-        assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getMasterAddress().isEmpty());
-        assertEquals("127.0.0.1:9002", response.getMasterAddress());
+        assertEquals(DEFAULT_IP[2], response.getMasterAddress());
+        assertEquals(3L, response.getMasterBrokerId().longValue());
+        assertEquals(2, response.getMasterEpoch().intValue());
     }
 
     @Test
     public void testElectMasterPreferHigherPriorityWhenEpochAndOffsetEquals() {
         mockMetaData();
-        final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader(DEFAULT_BROKER_NAME);
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherPriority();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
             electPolicy);
         final ElectMasterResponseHeader response = cResult.getResponse();
-        assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getMasterAddress().isEmpty());
-        assertEquals("127.0.0.1:9002", response.getMasterAddress());
+        assertEquals(DEFAULT_IP[2], response.getMasterAddress());
+        assertEquals(3L, response.getMasterBrokerId().longValue());
+        assertEquals(2, response.getMasterEpoch().intValue());
     }
 
     @Test
     public void testElectMaster() {
         mockMetaData();
-        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
+        final ElectMasterRequestHeader request = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((cluster, brokerName, brokerId)  -> 
!brokerId.equals(1L), null));
         final ElectMasterResponseHeader response = cResult.getResponse();
-        assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getMasterAddress().isEmpty());
-        assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000");
-
+        assertEquals(2, response.getMasterEpoch().intValue());
+        assertNotEquals(1L, response.getMasterBrokerId().longValue());
+        assertNotEquals(DEFAULT_IP[0], response.getMasterAddress());
         apply(cResult.getEvents());
 
-        final Set<String> brokerSet = new HashSet<>();
-        brokerSet.add("127.0.0.1:9000");
-        brokerSet.add("127.0.0.1:9001");
-        brokerSet.add("127.0.0.1:9002");
-        assertTrue(alterNewInSyncSet("broker1", response.getMasterAddress(), 
response.getMasterEpoch(), brokerSet, response.getSyncStateSetEpoch()));
+        final Set<Long> brokerSet = new HashSet<>();
+        brokerSet.add(1L);
+        brokerSet.add(2L);
+        brokerSet.add(3L);
+        assertTrue(alterNewInSyncSet(DEFAULT_BROKER_NAME, 
response.getMasterBrokerId(), response.getMasterEpoch(), brokerSet, 
response.getSyncStateSetEpoch()));
 
         // test admin try to elect a assignedMaster, but it isn't alive
-        final ElectMasterRequestHeader assignRequest = 
ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", 
"127.0.0.1:9000");
+        final ElectMasterRequestHeader assignRequest = 
ElectMasterRequestHeader.ofAdminTrigger(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, 1L);
         final ControllerResult<ElectMasterResponseHeader> cResult1 = 
this.replicasInfoManager.electMaster(assignRequest,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((cluster, brokerName, brokerId)  -> 
!brokerId.equals(1L), null));
 
         assertEquals(cResult1.getResponseCode(), 
ResponseCode.CONTROLLER_ELECT_MASTER_FAILED);
 
         // test admin try to elect a assignedMaster but old master still 
alive, and the old master is equals to assignedMaster
-        final ElectMasterRequestHeader assignRequest1 = 
ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", 
response.getMasterAddress());
+        final ElectMasterRequestHeader assignRequest1 = 
ElectMasterRequestHeader.ofAdminTrigger(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, response.getMasterBrokerId());
         final ControllerResult<ElectMasterResponseHeader> cResult2 = 
this.replicasInfoManager.electMaster(assignRequest1,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> true, 
null));
+            new DefaultElectPolicy((cluster, brokerName, brokerId)  -> true, 
null));
         assertEquals(cResult2.getResponseCode(), 
ResponseCode.CONTROLLER_MASTER_STILL_EXIST);
 
         // admin successful elect a assignedMaster.
-        final ElectMasterRequestHeader assignRequest2 = 
ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", 
"127.0.0.1:9000");
+        final ElectMasterRequestHeader assignRequest2 = 
ElectMasterRequestHeader.ofAdminTrigger(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, 1L);
         final ControllerResult<ElectMasterResponseHeader> cResult3 = 
this.replicasInfoManager.electMaster(assignRequest2,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals(response.getMasterAddress()), null));
+            new DefaultElectPolicy((cluster, brokerName, brokerId)  -> 
!brokerId.equals(response.getMasterBrokerId()), null));
         assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
 
         final ElectMasterResponseHeader response3 = cResult3.getResponse();
-        assertEquals(response3.getMasterAddress(), "127.0.0.1:9000");
-        assertEquals(response3.getMasterEpoch(), 3);
+        assertEquals(1L, response3.getMasterBrokerId().longValue());
+        assertEquals(DEFAULT_IP[0], response3.getMasterAddress());
+        assertEquals(3, response3.getMasterEpoch().intValue());
     }
 
     @Test
     public void testAllReplicasShutdownAndRestart() {
         mockMetaData();
-        final HashSet<String> newSyncStateSet = new HashSet<>();
-        newSyncStateSet.add("127.0.0.1:9000");
-        assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, 
newSyncStateSet, 2));
+        final HashSet<Long> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add(1L);
+        assertTrue(alterNewInSyncSet(DEFAULT_BROKER_NAME, 1L, 1, 
newSyncStateSet, 2));
 
         // Now we trigger electMaster api, which means the old master is 
shutdown and want to elect a new master.
-        // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, 
not more replicas can be elected as master, it will be failed.
-        final ElectMasterRequestHeader electRequest = 
ElectMasterRequestHeader.ofControllerTrigger("broker1");
+        // However, the syncStateSet in statemachine is {DEFAULT_IP[0]}, not 
more replicas can be elected as master, it will be failed.
+        final ElectMasterRequestHeader electRequest = 
ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(electRequest,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((cluster, brokerName, brokerId)  -> 
!brokerId.equals(1L), null));
         final List<EventMessage> events = cResult.getEvents();
         assertEquals(events.size(), 1);
         final ElectMasterEvent event = (ElectMasterEvent) events.get(0);
@@ -388,42 +424,42 @@ public class ReplicasInfoManagerTest {
 
         apply(cResult.getEvents());
 
-        final GetReplicaInfoResponseHeader replicaInfo = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader("broker1")).getResponse();
-        assertEquals(replicaInfo.getMasterAddress(), "");
-        assertEquals(replicaInfo.getMasterEpoch(), 2);
+        final GetReplicaInfoResponseHeader replicaInfo = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).getResponse();
+        assertEquals(replicaInfo.getMasterAddress(), null);
+        assertEquals(2, replicaInfo.getMasterEpoch().intValue());
     }
 
     @Test
     public void testCleanBrokerData() {
         mockMetaData();
-        CleanControllerBrokerDataRequestHeader header1 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
-        ControllerResult<Void> result1 = 
this.replicasInfoManager.cleanBrokerData(header1, (cluster, brokerAddr) -> 
true);
+        CleanControllerBrokerDataRequestHeader header1 = new 
CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, "1");
+        ControllerResult<Void> result1 = 
this.replicasInfoManager.cleanBrokerData(header1, (cluster, brokerName, 
brokerId) -> true);
         assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result1.getResponseCode());
 
-        CleanControllerBrokerDataRequestHeader header2 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", null);
-        ControllerResult<Void> result2 = 
this.replicasInfoManager.cleanBrokerData(header2, (cluster, brokerAddr) -> 
true);
+        CleanControllerBrokerDataRequestHeader header2 = new 
CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, null);
+        ControllerResult<Void> result2 = 
this.replicasInfoManager.cleanBrokerData(header2, (cluster, brokerName, 
brokerId) -> true);
         assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result2.getResponseCode());
-        assertEquals("Broker broker1 is still alive, clean up failure", 
result2.getRemark());
+        assertEquals("Broker broker-set-a is still alive, clean up failure", 
result2.getRemark());
 
-        CleanControllerBrokerDataRequestHeader header3 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
-        ControllerResult<Void> result3 = 
this.replicasInfoManager.cleanBrokerData(header3, (cluster, brokerAddr) -> 
false);
+        CleanControllerBrokerDataRequestHeader header3 = new 
CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, "1");
+        ControllerResult<Void> result3 = 
this.replicasInfoManager.cleanBrokerData(header3, (cluster, brokerName, 
brokerId) -> false);
         assertEquals(ResponseCode.SUCCESS, result3.getResponseCode());
 
-        CleanControllerBrokerDataRequestHeader header4 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002");
-        ControllerResult<Void> result4 = 
this.replicasInfoManager.cleanBrokerData(header4, (cluster, brokerAddr) -> 
false);
+        CleanControllerBrokerDataRequestHeader header4 = new 
CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, 
DEFAULT_BROKER_NAME, "1;2;3");
+        ControllerResult<Void> result4 = 
this.replicasInfoManager.cleanBrokerData(header4, (cluster, brokerName, 
brokerId) -> false);
         assertEquals(ResponseCode.SUCCESS, result4.getResponseCode());
 
-        CleanControllerBrokerDataRequestHeader header5 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker12", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
-        ControllerResult<Void> result5 = 
this.replicasInfoManager.cleanBrokerData(header5, (cluster, brokerAddr) -> 
false);
+        CleanControllerBrokerDataRequestHeader header5 = new 
CleanControllerBrokerDataRequestHeader(DEFAULT_CLUSTER_NAME, "broker12", 
"1;2;3", true);
+        ControllerResult<Void> result5 = 
this.replicasInfoManager.cleanBrokerData(header5, (cluster, brokerName, 
brokerId) -> false);
         assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result5.getResponseCode());
         assertEquals("Broker broker12 is not existed,clean broker data 
failure.", result5.getRemark());
 
-        CleanControllerBrokerDataRequestHeader header6 = new 
CleanControllerBrokerDataRequestHeader(null, "broker12", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
-        ControllerResult<Void> result6 = 
this.replicasInfoManager.cleanBrokerData(header6, (cluster, brokerAddr) -> 
cluster != null);
+        CleanControllerBrokerDataRequestHeader header6 = new 
CleanControllerBrokerDataRequestHeader(null, "broker12", "1;2;3", true);
+        ControllerResult<Void> result6 = 
this.replicasInfoManager.cleanBrokerData(header6, (cluster, brokerName, 
brokerId) -> cluster != null);
         assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result6.getResponseCode());
 
-        CleanControllerBrokerDataRequestHeader header7 = new 
CleanControllerBrokerDataRequestHeader(null, "broker1", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
-        ControllerResult<Void> result7 = 
this.replicasInfoManager.cleanBrokerData(header7, (cluster, brokerAddr) -> 
false);
+        CleanControllerBrokerDataRequestHeader header7 = new 
CleanControllerBrokerDataRequestHeader(null, DEFAULT_BROKER_NAME, "1;2;3", 
true);
+        ControllerResult<Void> result7 = 
this.replicasInfoManager.cleanBrokerData(header7, (cluster, brokerName, 
brokerId) -> false);
         assertEquals(ResponseCode.SUCCESS, result7.getResponseCode());
 
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
index c7e410b80..f7ceb82d7 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.remoting.protocol.body;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class BrokerReplicasInfo extends RemotingSerializable  {
@@ -46,8 +48,8 @@ public class BrokerReplicasInfo extends RemotingSerializable  
{
         private Long masterBrokerId;
 
         private String masterAddress;
-        private int masterEpoch;
-        private int syncStateSetEpoch;
+        private Integer masterEpoch;
+        private Integer syncStateSetEpoch;
         private List<ReplicaIdentity> inSyncReplicas;
         private List<ReplicaIdentity> notInSyncReplicas;
 
@@ -111,6 +113,19 @@ public class BrokerReplicasInfo extends 
RemotingSerializable  {
         public Long getMasterBrokerId() {
             return masterBrokerId;
         }
+
+        public boolean isExistInSync(String brokerName, Long brokerId, String 
brokerAddress) {
+            return this.getInSyncReplicas().contains(new 
ReplicaIdentity(brokerName, brokerId, brokerAddress));
+        }
+
+        public boolean isExistInNotSync(String brokerName, Long brokerId, 
String brokerAddress) {
+            return this.getNotInSyncReplicas().contains(new 
ReplicaIdentity(brokerName, brokerId, brokerAddress));
+        }
+
+        public boolean isExistInAllReplicas(String brokerName, Long brokerId, 
String brokerAddress) {
+            return this.isExistInSync(brokerName, brokerId, brokerAddress) || 
this.isExistInNotSync(brokerName, brokerId, brokerAddress);
+        }
+
     }
 
     public static class ReplicaIdentity extends RemotingSerializable {
@@ -157,5 +172,18 @@ public class BrokerReplicasInfo extends 
RemotingSerializable  {
                     ", brokerAddress='" + brokerAddress + '\'' +
                     '}';
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            ReplicaIdentity that = (ReplicaIdentity) o;
+            return brokerName.equals(that.brokerName) && 
brokerId.equals(that.brokerId) && brokerAddress.equals(that.brokerAddress);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(brokerName, brokerId, brokerAddress);
+        }
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
index 2016b2968..91f9e1e8d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
@@ -26,19 +26,22 @@ public class RoleChangeNotifyEntry {
 
     private final String masterAddress;
 
+    private final Long masterBrokerId;
+
     private final int masterEpoch;
 
     private final int syncStateSetEpoch;
 
-    public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String 
masterAddress, int masterEpoch, int syncStateSetEpoch) {
+    public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String 
masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch) {
         this.brokerMemberGroup = brokerMemberGroup;
         this.masterAddress = masterAddress;
         this.masterEpoch = masterEpoch;
         this.syncStateSetEpoch = syncStateSetEpoch;
+        this.masterBrokerId = masterBrokerId;
     }
 
     public static RoleChangeNotifyEntry convert(ElectMasterResponseHeader 
header) {
-        return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), 
header.getMasterAddress(), header.getMasterEpoch(), 
header.getSyncStateSetEpoch());
+        return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), 
header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), 
header.getSyncStateSetEpoch());
     }
 
 
@@ -57,4 +60,8 @@ public class RoleChangeNotifyEntry {
     public int getSyncStateSetEpoch() {
         return syncStateSetEpoch;
     }
+
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
index b32ab7238..3a112a578 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
@@ -21,19 +21,18 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class NotifyBrokerRoleChangedRequestHeader implements 
CommandCustomHeader {
     private String masterAddress;
-    private int masterEpoch;
-    private int syncStateSetEpoch;
-    // The id of this broker.
-    private long brokerId;
+    private Integer masterEpoch;
+    private Integer syncStateSetEpoch;
+    private Long masterBrokerId;
 
     public NotifyBrokerRoleChangedRequestHeader() {
     }
 
-    public NotifyBrokerRoleChangedRequestHeader(String masterAddress, int 
masterEpoch, int syncStateSetEpoch, long brokerId) {
+    public NotifyBrokerRoleChangedRequestHeader(String masterAddress, Long 
masterBrokerId, Integer masterEpoch, Integer syncStateSetEpoch) {
         this.masterAddress = masterAddress;
         this.masterEpoch = masterEpoch;
         this.syncStateSetEpoch = syncStateSetEpoch;
-        this.brokerId = brokerId;
+        this.masterBrokerId = masterBrokerId;
     }
 
     public String getMasterAddress() {
@@ -44,38 +43,38 @@ public class NotifyBrokerRoleChangedRequestHeader 
implements CommandCustomHeader
         this.masterAddress = masterAddress;
     }
 
-    public int getMasterEpoch() {
+    public Integer getMasterEpoch() {
         return masterEpoch;
     }
 
-    public void setMasterEpoch(int masterEpoch) {
+    public void setMasterEpoch(Integer masterEpoch) {
         this.masterEpoch = masterEpoch;
     }
 
-    public int getSyncStateSetEpoch() {
+    public Integer getSyncStateSetEpoch() {
         return syncStateSetEpoch;
     }
 
-    public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+    public void setSyncStateSetEpoch(Integer syncStateSetEpoch) {
         this.syncStateSetEpoch = syncStateSetEpoch;
     }
 
-    public long getBrokerId() {
-        return brokerId;
+    public Long getMasterBrokerId() {
+        return masterBrokerId;
     }
 
-    public void setBrokerId(long brokerId) {
-        this.brokerId = brokerId;
+    public void setMasterBrokerId(Long masterBrokerId) {
+        this.masterBrokerId = masterBrokerId;
     }
 
     @Override
     public String toString() {
         return "NotifyBrokerRoleChangedRequestHeader{" +
-            "masterAddress='" + masterAddress + '\'' +
-            ", masterEpoch=" + masterEpoch +
-            ", syncStateSetEpoch=" + syncStateSetEpoch +
-            ", brokerId=" + brokerId +
-            '}';
+                "masterAddress='" + masterAddress + '\'' +
+                ", masterEpoch=" + masterEpoch +
+                ", syncStateSetEpoch=" + syncStateSetEpoch +
+                ", masterBrokerId=" + masterBrokerId +
+                '}';
     }
 
     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
index 9fbf74e1f..5161d74dc 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java
@@ -22,12 +22,12 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 public class AlterSyncStateSetRequestHeader implements CommandCustomHeader {
     private String brokerName;
     private Long masterBrokerId;
-    private int masterEpoch;
+    private Integer masterEpoch;
 
     public AlterSyncStateSetRequestHeader() {
     }
 
-    public AlterSyncStateSetRequestHeader(String brokerName, Long 
masterBrokerId, int masterEpoch) {
+    public AlterSyncStateSetRequestHeader(String brokerName, Long 
masterBrokerId, Integer masterEpoch) {
         this.brokerName = brokerName;
         this.masterBrokerId = masterBrokerId;
         this.masterEpoch = masterEpoch;
@@ -49,11 +49,11 @@ public class AlterSyncStateSetRequestHeader implements 
CommandCustomHeader {
         this.masterBrokerId = masterBrokerId;
     }
 
-    public int getMasterEpoch() {
+    public Integer getMasterEpoch() {
         return masterEpoch;
     }
 
-    public void setMasterEpoch(int masterEpoch) {
+    public void setMasterEpoch(Integer masterEpoch) {
         this.masterEpoch = masterEpoch;
     }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index 1544b37db..658e2b592 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -24,8 +24,8 @@ public class ElectMasterResponseHeader implements 
CommandCustomHeader {
 
     private Long masterBrokerId;
     private String masterAddress;
-    private int masterEpoch;
-    private int syncStateSetEpoch;
+    private Integer masterEpoch;
+    private Integer syncStateSetEpoch;
     private BrokerMemberGroup brokerMemberGroup;
 
     public ElectMasterResponseHeader() {
@@ -39,19 +39,19 @@ public class ElectMasterResponseHeader implements 
CommandCustomHeader {
         this.masterAddress = masterAddress;
     }
 
-    public int getMasterEpoch() {
+    public Integer getMasterEpoch() {
         return masterEpoch;
     }
 
-    public void setMasterEpoch(int masterEpoch) {
+    public void setMasterEpoch(Integer masterEpoch) {
         this.masterEpoch = masterEpoch;
     }
 
-    public int getSyncStateSetEpoch() {
+    public Integer getSyncStateSetEpoch() {
         return syncStateSetEpoch;
     }
 
-    public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+    public void setSyncStateSetEpoch(Integer syncStateSetEpoch) {
         this.syncStateSetEpoch = syncStateSetEpoch;
     }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java
index 9fe0e5316..efc7afc84 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoRequestHeader.java
@@ -21,7 +21,6 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class GetReplicaInfoRequestHeader implements CommandCustomHeader {
     private String brokerName;
-    private String brokerAddress;
 
     public GetReplicaInfoRequestHeader() {
     }
@@ -30,10 +29,6 @@ public class GetReplicaInfoRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
-    public GetReplicaInfoRequestHeader(String brokerName, String 
brokerAddress) {
-        this.brokerName = brokerName;
-        this.brokerAddress = brokerAddress;
-    }
 
     public String getBrokerName() {
         return brokerName;
@@ -43,20 +38,11 @@ public class GetReplicaInfoRequestHeader implements 
CommandCustomHeader {
         this.brokerName = brokerName;
     }
 
-    public String getBrokerAddress() {
-        return brokerAddress;
-    }
-
-    public void setBrokerAddress(String brokerAddress) {
-        this.brokerAddress = brokerAddress;
-    }
-
     @Override
     public String toString() {
         return "GetReplicaInfoRequestHeader{" +
-            "brokerName='" + brokerName + '\'' +
-            ", brokerAddress='" + brokerAddress + '\'' +
-            '}';
+                "brokerName='" + brokerName + '\'' +
+                '}';
     }
 
     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
index a7b6bbefa..f7aa49e69 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java
@@ -23,7 +23,7 @@ public class GetReplicaInfoResponseHeader implements 
CommandCustomHeader {
 
     private Long masterBrokerId;
     private String masterAddress;
-    private int masterEpoch;
+    private Integer masterEpoch;
 
     public GetReplicaInfoResponseHeader() {
     }
@@ -36,11 +36,11 @@ public class GetReplicaInfoResponseHeader implements 
CommandCustomHeader {
         this.masterAddress = masterAddress;
     }
 
-    public int getMasterEpoch() {
+    public Integer getMasterEpoch() {
         return masterEpoch;
     }
 
-    public void setMasterEpoch(int masterEpoch) {
+    public void setMasterEpoch(Integer masterEpoch) {
         this.masterEpoch = masterEpoch;
     }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java
index 9bc84d195..795afeed8 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/admin/CleanControllerBrokerDataRequestHeader.java
@@ -46,8 +46,8 @@ public class CleanControllerBrokerDataRequestHeader 
implements CommandCustomHead
         this.isCleanLivingBroker = isCleanLivingBroker;
     }
 
-    public CleanControllerBrokerDataRequestHeader(String clusterName, String 
brokerName, String brokerAddress) {
-        this(clusterName, brokerName, brokerAddress, false);
+    public CleanControllerBrokerDataRequestHeader(String clusterName, String 
brokerName, String brokerIdSetToClean) {
+        this(clusterName, brokerName, brokerIdSetToClean, false);
     }
 
     @Override
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
index bfc103eb3..c577d6a73 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdRequestHeader.java
@@ -30,6 +30,10 @@ public class ApplyBrokerIdRequestHeader implements 
CommandCustomHeader {
 
     private String registerCheckCode;
 
+    public ApplyBrokerIdRequestHeader() {
+
+    }
+
     public ApplyBrokerIdRequestHeader(String clusterName, String brokerName, 
Long appliedBrokerId, String registerCheckCode) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
@@ -57,4 +61,20 @@ public class ApplyBrokerIdRequestHeader implements 
CommandCustomHeader {
     public String getRegisterCheckCode() {
         return registerCheckCode;
     }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public void setAppliedBrokerId(Long appliedBrokerId) {
+        this.appliedBrokerId = appliedBrokerId;
+    }
+
+    public void setRegisterCheckCode(String registerCheckCode) {
+        this.registerCheckCode = registerCheckCode;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
index 1221c206d..a7f100f77 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/ApplyBrokerIdResponseHeader.java
@@ -26,6 +26,8 @@ public class ApplyBrokerIdResponseHeader implements 
CommandCustomHeader {
 
     private String brokerName;
 
+    public ApplyBrokerIdResponseHeader() {
+    }
 
     public ApplyBrokerIdResponseHeader(String clusterName, String brokerName) {
         this.clusterName = clusterName;
@@ -46,4 +48,19 @@ public class ApplyBrokerIdResponseHeader implements 
CommandCustomHeader {
 
     }
 
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
index 90361ff74..aeb222955 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdRequestHeader.java
@@ -26,6 +26,10 @@ public class GetNextBrokerIdRequestHeader implements 
CommandCustomHeader {
 
     private String brokerName;
 
+    public GetNextBrokerIdRequestHeader() {
+
+    }
+
     public GetNextBrokerIdRequestHeader(String clusterName, String brokerName) 
{
         this.clusterName = clusterName;
         this.brokerName = brokerName;
@@ -51,4 +55,12 @@ public class GetNextBrokerIdRequestHeader implements 
CommandCustomHeader {
     public String getBrokerName() {
         return brokerName;
     }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
index 3fece1768..7d62722d4 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/GetNextBrokerIdResponseHeader.java
@@ -28,6 +28,9 @@ public class GetNextBrokerIdResponseHeader implements 
CommandCustomHeader {
 
     private Long nextBrokerId;
 
+    public GetNextBrokerIdResponseHeader() {
+    }
+
     public GetNextBrokerIdResponseHeader(String clusterName, String 
brokerName) {
         this(clusterName, brokerName, null);
     }
@@ -59,4 +62,20 @@ public class GetNextBrokerIdResponseHeader implements 
CommandCustomHeader {
     public Long getNextBrokerId() {
         return nextBrokerId;
     }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
index db5808d6d..cdddcfcd6 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessRequestHeader.java
@@ -30,6 +30,9 @@ public class RegisterSuccessRequestHeader implements 
CommandCustomHeader {
 
     private String brokerAddress;
 
+    public RegisterSuccessRequestHeader() {
+    }
+
     public RegisterSuccessRequestHeader(String clusterName, String brokerName, 
Long brokerId, String brokerAddress) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
@@ -57,4 +60,20 @@ public class RegisterSuccessRequestHeader implements 
CommandCustomHeader {
     public String getBrokerAddress() {
         return brokerAddress;
     }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    public void setBrokerAddress(String brokerAddress) {
+        this.brokerAddress = brokerAddress;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
index 61e5d8ea1..7bedc95f5 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/register/RegisterSuccessResponseHeader.java
@@ -30,11 +30,18 @@ public class RegisterSuccessResponseHeader implements 
CommandCustomHeader {
 
     private String masterAddress;
 
+    private Integer masterEpoch;
+
+    private Integer syncStateSetEpoch;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
     }
 
+    public RegisterSuccessResponseHeader() {
+    }
+
     public RegisterSuccessResponseHeader(String clusterName, String 
brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
@@ -48,6 +55,22 @@ public class RegisterSuccessResponseHeader implements 
CommandCustomHeader {
         this.masterAddress = masterAddress;
     }
 
+    public void setMasterEpoch(Integer masterEpoch) {
+        this.masterEpoch = masterEpoch;
+    }
+
+    public void setSyncStateSetEpoch(Integer syncStateSetEpoch) {
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
+    public Integer getMasterEpoch() {
+        return masterEpoch;
+    }
+
+    public Integer getSyncStateSetEpoch() {
+        return syncStateSetEpoch;
+    }
+
     public String getClusterName() {
         return clusterName;
     }
@@ -63,4 +86,12 @@ public class RegisterSuccessResponseHeader implements 
CommandCustomHeader {
     public String getMasterAddress() {
         return masterAddress;
     }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
 }


Reply via email to