This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c5f80713b [ISSUE #4813] Add elect policy for controller (#4809)
c5f80713b is described below

commit c5f80713be085e44c575e6ca25baba497b7d078c
Author: TheR1sing3un <[email protected]>
AuthorDate: Tue Aug 16 08:41:51 2022 +0800

    [ISSUE #4813] Add elect policy for controller (#4809)
    
    * feat(controller): add elect policy
    
    1. add epoch and maxOffset in heartbeat.
    2. refactor elect logic: a new master is now
    elected by an elect policy (which can be extended).
    3. add some unit tests
    
    * refactor(controller): refactor some code and format some code
    
    1. refactor some code and format some code
    
    * fix typo in ReplicasInfoManager
    
    1. fix typo in ReplicasInfoManager
    
    * fix wrong method call in ReplicasManagerTest#before
    
    1. fix wrong method call in ReplicasManagerTest#before
    
    * fix wrong call in ReplicasManagerTest#before
    
    1. fix wrong call in ReplicasManagerTest#before
    
    * fix(controller): fix the bug about ReElectMaster
    
    1. fix the bug about ReElectMaster
    2. fix some typos
    
    * style(controller): fix typo by checkstyle
    
    1. fix typo by checkstyle
    
    * test(broker): fix invalid usage of "any()" in DLedgerControllerTest#before
    
    1. fix invalid usage of "any()" in DLedgerControllerTest#before
    
    * fix(controller): fix some wrong usage in test
    
    1. fix some wrong usage in test
    2. refactor some field types to avoid NPE
---
 .../apache/rocketmq/broker/BrokerController.java   |   4 +-
 .../broker/controller/ReplicasManager.java         |  13 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 282 +++++++++++----------
 .../broker/controller/ReplicasManagerTest.java     |   4 +-
 .../namesrv/BrokerHeartbeatRequestHeader.java      |  31 +++
 .../RegisterBrokerToControllerRequestHeader.java   |  41 ++-
 .../controller/BrokerHeartbeatManager.java         |   9 +-
 .../apache/rocketmq/controller/BrokerLiveInfo.java | 129 ++++++++++
 .../rocketmq/controller/ControllerManager.java     |  86 ++++---
 .../rocketmq/controller/elect/ElectPolicy.java     |  36 +++
 .../controller/elect/impl/DefaultElectPolicy.java  | 126 +++++++++
 .../controller/impl/DLedgerController.java         |  32 ++-
 .../impl/DefaultBrokerHeartbeatManager.java        |  87 +++----
 .../impl/manager/ReplicasInfoManager.java          | 121 +++------
 .../processor/ControllerRequestProcessor.java      |   5 +-
 .../impl/controller/ControllerManagerTest.java     |   2 +-
 .../controller/impl/DLedgerControllerTest.java     |  20 +-
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |   2 +-
 .../impl/manager/ReplicasInfoManagerTest.java      | 125 +++++++--
 .../store/ha/autoswitch/AutoSwitchHAService.java   |   4 +
 20 files changed, 802 insertions(+), 357 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d5e93b388..84e1689ce 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1685,7 +1685,9 @@ public class BrokerController {
                         this.brokerConfig.getBrokerName(),
                         this.brokerConfig.getBrokerId(),
                         this.brokerConfig.getSendHeartbeatTimeoutMillis(),
-                        this.brokerConfig.isInBrokerContainer()
+                        this.brokerConfig.isInBrokerContainer(), 
this.replicasManager.getLastEpoch(),
+                        this.messageStore.getMaxPhyOffset(),
+                        this.replicasManager.getConfirmOffset()
                     );
                 }
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 679585074..b1b4ebd4b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -91,6 +91,11 @@ public class ReplicasManager {
         this.haService.setLocalAddress(this.localAddress);
     }
 
+    public long getConfirmOffset() {
+        return this.haService.getConfirmOffset();
+    }
+
+
     enum State {
         INITIAL,
         FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
@@ -280,7 +285,9 @@ public class ReplicasManager {
     private boolean registerBrokerToController() {
         // Register this broker to controller, get brokerId and masterAddress.
         try {
-            final RegisterBrokerToControllerResponseHeader registerResponse = 
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), 
this.localAddress);
+            final RegisterBrokerToControllerResponseHeader registerResponse = 
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
+                    this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.getBrokerName(), this.localAddress,
+                    this.haService.getLastEpoch(), 
this.brokerController.getMessageStore().getMaxPhyOffset());
             final String newMasterAddress = 
registerResponse.getMasterAddress();
             if (StringUtils.isNoneEmpty(newMasterAddress)) {
                 if (StringUtils.equals(newMasterAddress, this.localAddress)) {
@@ -421,6 +428,10 @@ public class ReplicasManager {
         }
     }
 
+    public int getLastEpoch() {
+        return this.haService.getLastEpoch();
+    }
+
     public BrokerRole getBrokerRole() {
         return this.brokerController.getMessageStoreConfig().getBrokerRole();
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6f89d4a7d..202fcbfdb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -125,7 +126,7 @@ public class BrokerOuterAPI {
     private final TopAddressing topAddressing = new 
DefaultTopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
     private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
-        new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
+            new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
 
     private ClientMetadata clientMetadata;
     private RpcClient rpcClient;
@@ -178,13 +179,13 @@ public class BrokerOuterAPI {
     }
 
     public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String 
brokerName)
-        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
+            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
         return syncBrokerMemberGroup(clusterName, brokerName, false);
     }
 
     public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String 
brokerName,
-        boolean isCompatibleWithOldNameSrv)
-        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
+                                                   boolean 
isCompatibleWithOldNameSrv)
+            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
         if (isCompatibleWithOldNameSrv) {
             return getBrokerMemberGroupCompatible(clusterName, brokerName);
         } else {
@@ -193,7 +194,7 @@ public class BrokerOuterAPI {
     }
 
     public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String 
brokerName)
-        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
+            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
         BrokerMemberGroup brokerMemberGroup = new 
BrokerMemberGroup(clusterName, brokerName);
 
         GetBrokerMemberGroupRequestHeader requestHeader = new 
GetBrokerMemberGroupRequestHeader();
@@ -211,7 +212,7 @@ public class BrokerOuterAPI {
                 byte[] body = response.getBody();
                 if (body != null) {
                     GetBrokerMemberGroupResponseBody 
brokerMemberGroupResponseBody =
-                        GetBrokerMemberGroupResponseBody.decode(body, 
GetBrokerMemberGroupResponseBody.class);
+                            GetBrokerMemberGroupResponseBody.decode(body, 
GetBrokerMemberGroupResponseBody.class);
 
                     return 
brokerMemberGroupResponseBody.getBrokerMemberGroup();
                 }
@@ -224,7 +225,7 @@ public class BrokerOuterAPI {
     }
 
     public BrokerMemberGroup getBrokerMemberGroupCompatible(String 
clusterName, String brokerName)
-        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
+            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
         BrokerMemberGroup brokerMemberGroup = new 
BrokerMemberGroup(clusterName, brokerName);
 
         GetRouteInfoRequestHeader requestHeader = new 
GetRouteInfoRequestHeader();
@@ -243,8 +244,8 @@ public class BrokerOuterAPI {
                     TopicRouteData topicRouteData = 
TopicRouteData.decode(body, TopicRouteData.class);
                     for (BrokerData brokerData : 
topicRouteData.getBrokerDatas()) {
                         if (brokerData != null
-                            && brokerData.getBrokerName().equals(brokerName)
-                            && brokerData.getCluster().equals(clusterName)) {
+                                && 
brokerData.getBrokerName().equals(brokerName)
+                                && 
brokerData.getCluster().equals(clusterName)) {
                             
brokerMemberGroup.getBrokerAddrs().putAll(brokerData.getBrokerAddrs());
                             break;
                         }
@@ -260,13 +261,13 @@ public class BrokerOuterAPI {
     }
 
     public void sendHeartbeatViaDataVersion(
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final Long brokerId,
-        final int timeoutMillis,
-        final DataVersion dataVersion,
-        final boolean isInBrokerContainer) {
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final Long brokerId,
+            final int timeoutMillis,
+            final DataVersion dataVersion,
+            final boolean isInBrokerContainer) {
         List<String> nameServerAddressList = 
this.remotingClient.getAvailableNameSrvList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
             final QueryDataVersionRequestHeader requestHeader = new 
QueryDataVersionRequestHeader();
@@ -295,11 +296,11 @@ public class BrokerOuterAPI {
     }
 
     public void sendHeartbeat(final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final Long brokerId,
-        final int timeoutMills,
-        final boolean isInBrokerContainer) {
+                              final String brokerAddr,
+                              final String brokerName,
+                              final Long brokerId,
+                              final int timeoutMills,
+                              final boolean isInBrokerContainer) {
         List<String> nameServerAddressList = 
this.remotingClient.getAvailableNameSrvList();
 
         final BrokerHeartbeatRequestHeader requestHeader = new 
BrokerHeartbeatRequestHeader();
@@ -326,8 +327,8 @@ public class BrokerOuterAPI {
     }
 
     public BrokerSyncInfo retrieveBrokerHaInfo(String masterBrokerAddr)
-        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
-        MQBrokerException, RemotingCommandException {
+            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
+            MQBrokerException, RemotingCommandException {
         ExchangeHAInfoRequestHeader requestHeader = new 
ExchangeHAInfoRequestHeader();
         requestHeader.setMasterHaAddress(null);
 
@@ -348,7 +349,7 @@ public class BrokerOuterAPI {
     }
 
     public void sendBrokerHaInfo(String brokerAddr, String masterHaAddr, long 
brokerInitMaxOffset, String masterAddr)
-        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         ExchangeHAInfoRequestHeader requestHeader = new 
ExchangeHAInfoRequestHeader();
         requestHeader.setMasterHaAddress(masterHaAddr);
         requestHeader.setMasterFlushOffset(brokerInitMaxOffset);
@@ -371,30 +372,30 @@ public class BrokerOuterAPI {
     }
 
     public List<RegisterBrokerResult> registerBrokerAll(
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final String haServerAddr,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final List<String> filterServerList,
-        final boolean oneway,
-        final int timeoutMills,
-        final boolean enableActingMaster,
-        final boolean compressed,
-        final BrokerIdentity brokerIdentity) {
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final boolean oneway,
+            final int timeoutMills,
+            final boolean enableActingMaster,
+            final boolean compressed,
+            final BrokerIdentity brokerIdentity) {
         return registerBrokerAll(clusterName,
-            brokerAddr,
-            brokerName,
-            brokerId,
-            haServerAddr,
-            topicConfigWrapper,
-            filterServerList,
-            oneway, timeoutMills,
-            enableActingMaster,
-            compressed,
-            null,
-            brokerIdentity);
+                brokerAddr,
+                brokerName,
+                brokerId,
+                haServerAddr,
+                topicConfigWrapper,
+                filterServerList,
+                oneway, timeoutMills,
+                enableActingMaster,
+                compressed,
+                null,
+                brokerIdentity);
     }
 
     /**
@@ -410,23 +411,23 @@ public class BrokerOuterAPI {
      * @param filterServerList
      * @param oneway
      * @param timeoutMills
-     * @param compressed default false
+     * @param compressed         default false
      * @return
      */
     public List<RegisterBrokerResult> registerBrokerAll(
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final String haServerAddr,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final List<String> filterServerList,
-        final boolean oneway,
-        final int timeoutMills,
-        final boolean enableActingMaster,
-        final boolean compressed,
-        final Long heartbeatTimeoutMillis,
-        final BrokerIdentity brokerIdentity) {
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final boolean oneway,
+            final int timeoutMills,
+            final boolean enableActingMaster,
+            final boolean compressed,
+            final Long heartbeatTimeoutMillis,
+            final BrokerIdentity brokerIdentity) {
 
         final List<RegisterBrokerResult> registerBrokerResultList = new 
CopyOnWriteArrayList<>();
         List<String> nameServerAddressList = 
this.remotingClient.getAvailableNameSrvList();
@@ -483,13 +484,13 @@ public class BrokerOuterAPI {
     }
 
     private RegisterBrokerResult registerBroker(
-        final String namesrvAddr,
-        final boolean oneway,
-        final int timeoutMills,
-        final RegisterBrokerRequestHeader requestHeader,
-        final byte[] body
+            final String namesrvAddr,
+            final boolean oneway,
+            final int timeoutMills,
+            final RegisterBrokerRequestHeader requestHeader,
+            final byte[] body
     ) throws RemotingCommandException, MQBrokerException, 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-        InterruptedException {
+            InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, 
requestHeader);
         request.setBody(body);
 
@@ -507,7 +508,7 @@ public class BrokerOuterAPI {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 RegisterBrokerResponseHeader responseHeader =
-                    (RegisterBrokerResponseHeader) 
response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+                        (RegisterBrokerResponseHeader) 
response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                 RegisterBrokerResult result = new RegisterBrokerResult();
                 result.setMasterAddr(responseHeader.getMasterAddr());
                 result.setHaServerAddr(responseHeader.getHaServerAddr());
@@ -524,10 +525,10 @@ public class BrokerOuterAPI {
     }
 
     public void unregisterBrokerAll(
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId
     ) {
         List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null) {
@@ -543,11 +544,11 @@ public class BrokerOuterAPI {
     }
 
     public void unregisterBroker(
-        final String namesrvAddr,
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId
+            final String namesrvAddr,
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId
     ) throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException {
         UnRegisterBrokerRequestHeader requestHeader = new 
UnRegisterBrokerRequestHeader();
         requestHeader.setBrokerAddr(brokerAddr);
@@ -573,13 +574,13 @@ public class BrokerOuterAPI {
     }
 
     public List<Boolean> needRegister(
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final int timeoutMills,
-        final boolean isInBrokerContainer) {
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final int timeoutMills,
+            final boolean isInBrokerContainer) {
         final List<Boolean> changedList = new CopyOnWriteArrayList<>();
         List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
@@ -602,7 +603,7 @@ public class BrokerOuterAPI {
                             switch (response.getCode()) {
                                 case ResponseCode.SUCCESS: {
                                     QueryDataVersionResponseHeader 
queryDataVersionResponseHeader =
-                                        (QueryDataVersionResponseHeader) 
response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+                                            (QueryDataVersionResponseHeader) 
response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                     changed = 
queryDataVersionResponseHeader.getChanged();
                                     byte[] body = response.getBody();
                                     if (body != null) {
@@ -639,8 +640,8 @@ public class BrokerOuterAPI {
     }
 
     public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
-        final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQBrokerException {
+            final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 
3000);
@@ -657,8 +658,8 @@ public class BrokerOuterAPI {
     }
 
     public TimerCheckpoint getTimerCheckPoint(
-        final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQBrokerException {
+            final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 
3000);
@@ -671,12 +672,12 @@ public class BrokerOuterAPI {
                 break;
         }
 
-        throw new MQBrokerException(response.getCode(), 
response.getRemark(),addr);
+        throw new MQBrokerException(response.getCode(), response.getRemark(), 
addr);
     }
 
     public TimerMetrics.TimerMetricsSerializeWrapper getTimerMetrics(
-        final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQBrokerException {
+            final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 
3000);
@@ -693,8 +694,8 @@ public class BrokerOuterAPI {
     }
 
     public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
-        final String addr) throws InterruptedException, 
RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
+            final String addr) throws InterruptedException, 
RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, 3000);
         assert response != null;
@@ -710,8 +711,8 @@ public class BrokerOuterAPI {
     }
 
     public String getAllDelayOffset(
-        final String addr) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException, 
UnsupportedEncodingException {
+            final String addr) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException, 
UnsupportedEncodingException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, 3000);
         assert response != null;
@@ -727,8 +728,8 @@ public class BrokerOuterAPI {
     }
 
     public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
-        final String addr) throws InterruptedException, 
RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
+            final String addr) throws InterruptedException, 
RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, 3000);
         assert response != null;
@@ -752,8 +753,8 @@ public class BrokerOuterAPI {
     }
 
     public long getMaxOffset(final String addr, final String topic, final int 
queueId, final boolean committed,
-        final boolean isOnlyThisBroker)
-        throws RemotingException, MQBrokerException, InterruptedException {
+                             final boolean isOnlyThisBroker)
+            throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new 
GetMaxOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
@@ -776,7 +777,7 @@ public class BrokerOuterAPI {
     }
 
     public long getMinOffset(final String addr, final String topic, final int 
queueId, final boolean isOnlyThisBroker)
-        throws RemotingException, MQBrokerException, InterruptedException {
+            throws RemotingException, MQBrokerException, InterruptedException {
         GetMinOffsetRequestHeader requestHeader = new 
GetMinOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
@@ -798,10 +799,10 @@ public class BrokerOuterAPI {
     }
 
     public void lockBatchMQAsync(
-        final String addr,
-        final LockBatchRequestBody requestBody,
-        final long timeoutMillis,
-        final LockCallback callback) throws RemotingException, 
InterruptedException {
+            final String addr,
+            final LockBatchRequestBody requestBody,
+            final long timeoutMillis,
+            final LockCallback callback) throws RemotingException, 
InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
@@ -815,7 +816,7 @@ public class BrokerOuterAPI {
                 if (response != null) {
                     if (response.getCode() == ResponseCode.SUCCESS) {
                         LockBatchResponseBody responseBody = 
LockBatchResponseBody.decode(response.getBody(),
-                            LockBatchResponseBody.class);
+                                LockBatchResponseBody.class);
                         Set<MessageQueue> messageQueues = 
responseBody.getLockOKMQSet();
                         callback.onSuccess(messageQueues);
                     } else {
@@ -829,10 +830,10 @@ public class BrokerOuterAPI {
     }
 
     public void unlockBatchMQAsync(
-        final String addr,
-        final UnlockBatchRequestBody requestBody,
-        final long timeoutMillis,
-        final UnlockCallback callback) throws RemotingException, 
InterruptedException {
+            final String addr,
+            final UnlockBatchRequestBody requestBody,
+            final long timeoutMillis,
+            final UnlockCallback callback) throws RemotingException, 
InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
@@ -862,8 +863,8 @@ public class BrokerOuterAPI {
     }
 
     public SendResult sendMessageToSpecificBroker(String brokerAddr, final 
String brokerName,
-        final MessageExt msg, String group,
-        long timeoutMillis) throws RemotingException, MQBrokerException, 
InterruptedException {
+                                                  final MessageExt msg, String 
group,
+                                                  long timeoutMillis) throws 
RemotingException, MQBrokerException, InterruptedException {
 
         SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader();
         requestHeader.setProducerGroup(group);
@@ -889,9 +890,9 @@ public class BrokerOuterAPI {
     }
 
     private SendResult processSendResponse(
-        final String brokerName,
-        final Message msg,
-        final RemotingCommand response
+            final String brokerName,
+            final Message msg,
+            final RemotingCommand response
     ) throws MQBrokerException, RemotingCommandException {
         switch (response.getCode()) {
             case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -918,7 +919,7 @@ public class BrokerOuterAPI {
                 }
 
                 SendMessageResponseHeader responseHeader =
-                    (SendMessageResponseHeader) 
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                        (SendMessageResponseHeader) 
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
 
                 //If namespace not null , reset Topic without namespace.
                 String topic = msg.getTopic();
@@ -934,8 +935,8 @@ public class BrokerOuterAPI {
                     uniqMsgId = sb.toString();
                 }
                 SendResult sendResult = new SendResult(sendStatus,
-                    uniqMsgId,
-                    responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
+                        uniqMsgId,
+                        responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
                 sendResult.setTransactionId(responseHeader.getTransactionId());
                 String regionId = 
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
                 String traceOn = 
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -962,12 +963,12 @@ public class BrokerOuterAPI {
     }
 
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, 
final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
+            throws RemotingException, MQBrokerException, InterruptedException {
         return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
     }
 
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, 
final long timeoutMillis,
-        boolean allowTopicNotExist) throws MQBrokerException, 
InterruptedException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+                                                          boolean 
allowTopicNotExist) throws MQBrokerException, InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
         GetRouteInfoRequestHeader requestHeader = new 
GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1012,7 +1013,7 @@ public class BrokerOuterAPI {
     }
 
     public void forwardRequest(String brokerAddr, RemotingCommand request, 
long timeoutMillis,
-        InvokeCallback invokeCallback) throws InterruptedException, 
RemotingSendRequestException, RemotingTimeoutException, 
RemotingTooMuchRequestException, RemotingConnectException {
+                               InvokeCallback invokeCallback) throws 
InterruptedException, RemotingSendRequestException, RemotingTimeoutException, 
RemotingTooMuchRequestException, RemotingConnectException {
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, 
invokeCallback);
     }
 
@@ -1030,8 +1031,8 @@ public class BrokerOuterAPI {
     }
 
     public MessageRequestModeSerializeWrapper getAllMessageRequestMode(
-        final String addr) throws RemotingSendRequestException, 
RemotingConnectException,
-        MQBrokerException, RemotingTimeoutException, InterruptedException {
+            final String addr) throws RemotingSendRequestException, 
RemotingConnectException,
+            MQBrokerException, RemotingTimeoutException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE, 
null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, 3000);
         assert response != null;
@@ -1060,10 +1061,10 @@ public class BrokerOuterAPI {
      * Alter syncStateSet
      */
     public SyncStateSet alterSyncStateSet(
-        final String controllerAddress,
-        final String brokerName,
-        final String masterAddress, final int masterEpoch,
-        final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws 
Exception {
+            final String controllerAddress,
+            final String brokerName,
+            final String masterAddress, final int masterEpoch,
+            final Set<String> newSyncStateSet, final int syncStateSetEpoch) 
throws Exception {
 
         final AlterSyncStateSetRequestHeader requestHeader = new 
AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET,
 requestHeader);
@@ -1086,10 +1087,10 @@ public class BrokerOuterAPI {
      * Register broker to controller
      */
     public RegisterBrokerToControllerResponseHeader registerBrokerToController(
-        final String controllerAddress, final String clusterName,
-        final String brokerName, final String address) throws Exception {
+            final String controllerAddress, final String clusterName,
+            final String brokerName, final String address, final int epoch, 
final long maxOffset) throws Exception {
 
-        final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address);
+        final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, 
epoch, maxOffset);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
@@ -1108,7 +1109,7 @@ public class BrokerOuterAPI {
      * Get broker replica info
      */
     public Pair<GetReplicaInfoResponseHeader, SyncStateSet> 
getReplicaInfo(final String controllerAddress,
-        final String brokerName, final String brokerAddress) throws Exception {
+                                                                           
final String brokerName, final String brokerAddress) throws Exception {
         final GetReplicaInfoRequestHeader requestHeader = new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1131,12 +1132,15 @@ public class BrokerOuterAPI {
      * Send heartbeat to controller
      */
     public void sendHeartbeatToController(final String controllerAddress,
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final Long brokerId,
-        final int timeoutMills,
-        final boolean isInBrokerContainer) {
+                                          final String clusterName,
+                                          final String brokerAddr,
+                                          final String brokerName,
+                                          final Long brokerId,
+                                          final int timeoutMills,
+                                          final boolean isInBrokerContainer,
+                                          final int epoch,
+                                          final long maxOffset,
+                                          final long confirmOffset) {
         if (StringUtils.isEmpty(controllerAddress)) {
             return;
         }
@@ -1145,7 +1149,9 @@ public class BrokerOuterAPI {
         requestHeader.setClusterName(clusterName);
         requestHeader.setBrokerAddr(brokerAddr);
         requestHeader.setBrokerName(brokerName);
-
+        requestHeader.setEpoch(epoch);
+        requestHeader.setMaxOffset(maxOffset);
+        requestHeader.setConfirmOffset(confirmOffset);
         brokerOuterExecutor.execute(new AbstractBrokerRunnable(new 
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
             @Override
             public void run2() {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index bb0c83502..b7ab79eda 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -40,6 +40,8 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -121,7 +123,7 @@ public class ReplicasManagerTest {
         when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
         when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
         
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
-        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), 
any())).thenReturn(registerBrokerToControllerResponseHeader);
+        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), 
any(), anyInt(), 
anyLong())).thenReturn(registerBrokerToControllerResponseHeader);
         when(brokerOuterAPI.getReplicaInfo(any(), any(), 
any())).thenReturn(result);
         replicasManager = new ReplicasManager(brokerController);
         autoSwitchHAService.init(defaultMessageStore);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
index 52c3abef0..4c45e9b4a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.header.namesrv;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
@@ -28,6 +29,12 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
     private String brokerAddr;
     @CFNotNull
     private String brokerName;
+    @CFNullable
+    private Integer epoch;
+    @CFNullable
+    private Long maxOffset;
+    @CFNullable
+    private Long confirmOffset;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -57,4 +64,28 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
     public void setBrokerName(String brokerName) {
         this.brokerName = brokerName;
     }
+
+    public Integer getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(Integer epoch) {
+        this.epoch = epoch;
+    }
+
+    public Long getMaxOffset() {
+        return maxOffset;
+    }
+
+    public void setMaxOffset(Long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+    public Long getConfirmOffset() {
+        return confirmOffset;
+    }
+
+    public void setConfirmOffset(Long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
index 9a0332cef..1028ead6e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
@@ -25,6 +25,10 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
     private String brokerName;
     private String brokerAddress;
     @CFNullable
+    private Integer epoch;
+    @CFNullable
+    private Long maxOffset;
+    @CFNullable
     private Long heartbeatTimeoutMillis;
 
 
@@ -37,6 +41,14 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
         this.brokerAddress = brokerAddress;
     }
 
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String 
brokerName, String brokerAddress, int epoch, long maxOffset) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerAddress = brokerAddress;
+        this.epoch = epoch;
+        this.maxOffset = maxOffset;
+    }
+
     public String getClusterName() {
         return clusterName;
     }
@@ -71,11 +83,30 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
 
     @Override
     public String toString() {
-        return "RegisterBrokerRequestHeader{" +
-            "clusterName='" + clusterName + '\'' +
-            ", brokerName='" + brokerName + '\'' +
-            ", brokerAddress='" + brokerAddress + '\'' +
-            '}';
+        return "RegisterBrokerToControllerRequestHeader{" +
+                "clusterName='" + clusterName + '\'' +
+                ", brokerName='" + brokerName + '\'' +
+                ", brokerAddress='" + brokerAddress + '\'' +
+                ", epoch=" + epoch +
+                ", maxOffset=" + maxOffset +
+                ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+                '}';
+    }
+
+    public Integer getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(Integer epoch) {
+        this.epoch = epoch;
+    }
+
+    public Long getMaxOffset() {
+        return maxOffset;
+    }
+
+    public void setMaxOffset(Long maxOffset) {
+        this.maxOffset = maxOffset;
     }
 
     @Override
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index e2ff58301..364b32647 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -23,7 +23,7 @@ public interface BrokerHeartbeatManager {
     /**
      * Broker new heartbeat.
      */
-    void onBrokerHeartbeat(final String clusterName, final String brokerAddr);
+    void onBrokerHeartbeat(final String clusterName, final String brokerAddr, 
final Integer epoch, final Long maxOffset, final Long confirmOffset);
 
     /**
      * Change the metadata(brokerId ..) for a broker.
@@ -49,13 +49,18 @@ public interface BrokerHeartbeatManager {
      * Register new broker to heartManager.
      */
     void registerBroker(final String clusterName, final String brokerName, 
final String brokerAddr, final long brokerId,
-        final Long timeoutMillis, final Channel channel);
+                        final Long timeoutMillis, final Channel channel, final 
Integer epoch, final Long maxOffset);
 
     /**
      * Broker channel close
      */
     void onBrokerChannelClose(final Channel channel);
 
+    /**
+     * Get broker live information by clusterName and brokerAddr
+     */
+    BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerAddr);
+
     /**
      * Check whether broker active
      */
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
new file mode 100644
index 000000000..e88b26c39
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+
+import io.netty.channel.Channel;
+
+
+public class BrokerLiveInfo {
+    private final String brokerName;
+
+    private final String brokerAddr;
+    private final long heartbeatTimeoutMillis;
+    private final Channel channel;
+    private long brokerId;
+    private long lastUpdateTimestamp;
+    private int epoch;
+    private long maxOffset;
+    private long confirmOffset;
+
+    public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, 
long lastUpdateTimestamp, long heartbeatTimeoutMillis,
+                          Channel channel, int epoch, long maxOffset) {
+        this.brokerName = brokerName;
+        this.brokerAddr = brokerAddr;
+        this.brokerId = brokerId;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+        this.channel = channel;
+        this.epoch = epoch;
+        this.maxOffset = maxOffset;
+    }
+    public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, 
long lastUpdateTimestamp, long heartbeatTimeoutMillis,
+                          Channel channel, int epoch, long maxOffset, long 
confirmOffset) {
+        this.brokerName = brokerName;
+        this.brokerAddr = brokerAddr;
+        this.brokerId = brokerId;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+        this.channel = channel;
+        this.epoch = epoch;
+        this.maxOffset = maxOffset;
+        this.confirmOffset = confirmOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerLiveInfo{" +
+                "brokerName='" + brokerName + '\'' +
+                ", brokerAddr='" + brokerAddr + '\'' +
+                ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+                ", channel=" + channel +
+                ", brokerId=" + brokerId +
+                ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+                ", epoch=" + epoch +
+                ", maxOffset=" + maxOffset +
+                ", confirmOffset=" + confirmOffset +
+                '}';
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public long getHeartbeatTimeoutMillis() {
+        return heartbeatTimeoutMillis;
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public int getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public long getMaxOffset() {
+        return maxOffset;
+    }
+
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setConfirmOffset(long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
+
+    public long getConfirmOffset() {
+        return confirmOffset;
+    }
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 4f0e1e75d..cd4a60158 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.MixAll;
@@ -36,6 +37,7 @@ import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
 import 
org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
 import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
@@ -63,15 +65,12 @@ public class ControllerManager {
     private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
 
     public ControllerManager(ControllerConfig controllerConfig, 
NettyServerConfig nettyServerConfig,
-        NettyClientConfig nettyClientConfig) {
+                             NettyClientConfig nettyClientConfig) {
         this.controllerConfig = controllerConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.nettyClientConfig = nettyClientConfig;
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
-        this.configuration = new Configuration(
-            log,
-            this.controllerConfig, this.nettyServerConfig
-        );
+        this.configuration = new Configuration(log, this.controllerConfig, 
this.nettyServerConfig);
         this.configuration.setStorePathFromConfig(this.controllerConfig, 
"configStorePath");
         this.remotingClient = new NettyRemotingClient(nettyClientConfig);
     }
@@ -79,12 +78,12 @@ public class ControllerManager {
     public boolean initialize() {
         this.controllerRequestThreadPoolQueue = new 
LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
         this.controllerRequestExecutor = new ThreadPoolExecutor(
-            this.controllerConfig.getControllerThreadPoolNums(),
-            this.controllerConfig.getControllerThreadPoolNums(),
-            1000 * 60,
-            TimeUnit.MILLISECONDS,
-            this.controllerRequestThreadPoolQueue,
-            new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+                this.controllerConfig.getControllerThreadPoolNums(),
+                this.controllerConfig.getControllerThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.controllerRequestThreadPoolQueue,
+                new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
             @Override
             protected <T> RunnableFuture<T> newTaskFor(final Runnable 
runnable, final T value) {
                 return new FutureTaskExt<T>(runnable, value);
@@ -97,39 +96,46 @@ public class ControllerManager {
         if 
(StringUtils.isEmpty(this.controllerConfig.getControllerDLegerSelfId())) {
             throw new IllegalArgumentException("Attribute value 
controllerDLegerSelfId of ControllerConfig is null or empty");
         }
-        this.controller = new DLedgerController(this.controllerConfig, 
(cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, 
brokerAddr),
-            this.nettyServerConfig, this.nettyClientConfig, 
this.brokerHousekeepingService);
+        this.controller = new DLedgerController(this.controllerConfig, 
this.heartbeatManager::isBrokerActive,
+                this.nettyServerConfig, this.nettyClientConfig, 
this.brokerHousekeepingService,
+                new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo));
 
         // Register broker inactive listener
-        this.heartbeatManager.addBrokerLifecycleListener(new 
BrokerHeartbeatManager.BrokerLifecycleListener() {
-            @Override
-            public void onBrokerInactive(String clusterName, String 
brokerName, String brokerAddress, long brokerId) {
-                if (brokerId == MixAll.MASTER_ID) {
-                    if (controller.isLeaderState()) {
-                        final CompletableFuture<RemotingCommand> future = 
controller.electMaster(new ElectMasterRequestHeader(brokerName));
-                        try {
-                            final RemotingCommand response = future.get(5, 
TimeUnit.SECONDS);
-                            final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
-                            if (responseHeader != null) {
-                                log.info("Broker {}'s master {} shutdown, 
elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
-                                if 
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
-                                    
heartbeatManager.changeBrokerMetadata(clusterName, 
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
-                                }
-
-                                if 
(controllerConfig.isNotifyBrokerRoleChanged()) {
-                                    notifyBrokerRoleChanged(responseHeader, 
clusterName);
-                                }
-                            }
-                        } catch (Exception ignored) {
+        
this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
+        registerProcessor();
+        return true;
+    }
+
+    /**
+     * When the heartbeatManager detects the "Broker is not active",
+     * we call this method to elect a master and do something else.
+     * @param clusterName The cluster name of this inactive broker
+     * @param brokerName The inactive broker name
+     * @param brokerAddress The inactive broker address(ip)
+     * @param brokerId The inactive broker id
+     */
+    private void onBrokerInactive(String clusterName, String brokerName, 
String brokerAddress, long brokerId) {
+        if (brokerId == MixAll.MASTER_ID) {
+            if (controller.isLeaderState()) {
+                final CompletableFuture<RemotingCommand> future = 
controller.electMaster(new ElectMasterRequestHeader(brokerName));
+                try {
+                    final RemotingCommand response = future.get(5, 
TimeUnit.SECONDS);
+                    final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) response.readCustomHeader();
+                    if (responseHeader != null) {
+                        log.info("Broker {}'s master {} shutdown, elect a new 
master done, result:{}", brokerName, brokerAddress, responseHeader);
+                        if 
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
+                            heartbeatManager.changeBrokerMetadata(clusterName, 
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
+                        }
+                        if (controllerConfig.isNotifyBrokerRoleChanged()) {
+                            notifyBrokerRoleChanged(responseHeader, 
clusterName);
                         }
-                    } else {
-                        log.info("Broker{}' master shutdown", brokerName);
                     }
+                } catch (Exception ignored) {
                 }
+            } else {
+                log.info("Broker{}' master shutdown", brokerName);
             }
-        });
-        registerProcessor();
-        return true;
+        }
     }
 
     /**
@@ -156,11 +162,11 @@ public class ControllerManager {
     }
 
     public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long 
brokerId,
-        final ElectMasterResponseHeader responseHeader) {
+                                          final ElectMasterResponseHeader 
responseHeader) {
         if (StringUtils.isNoneEmpty(brokerAddr)) {
             log.info("Try notify broker {} with id {} that role changed, 
responseHeader:{}", brokerAddr, brokerId, responseHeader);
             final NotifyBrokerRoleChangedRequestHeader requestHeader = new 
NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
-                responseHeader.getMasterEpoch(), 
responseHeader.getSyncStateSetEpoch(), brokerId);
+                    responseHeader.getMasterEpoch(), 
responseHeader.getSyncStateSetEpoch(), brokerId);
             final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, 
requestHeader);
             try {
                 this.remotingClient.invokeOneway(brokerAddr, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
new file mode 100644
index 000000000..214012e51
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.elect;
+
+
+import java.util.Set;
+
+public interface ElectPolicy {
+
+    /**
+     * elect a master
+     *
+     * @param clusterName       the brokerGroup belongs
+     * @param syncStateBrokers  all broker replicas in syncStateSet
+     * @param allReplicaBrokers all broker replicas
+     * @param oldMaster         old master
+     * @param preferBrokerAddr  the broker prefer to be elected
+     * @return new master's brokerAddr
+     */
+    String elect(String clusterName, Set<String> syncStateBrokers, Set<String> 
allReplicaBrokers, String oldMaster, String preferBrokerAddr);
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
new file mode 100644
index 000000000..c1b2a50d5
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.elect.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.BrokerLiveInfo;
+
+import java.util.Comparator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link ElectPolicy} implementation.
+ * <p>
+ * Candidates are first filtered by the optional validity predicate. Among the valid ones the old
+ * master is kept when allowed, otherwise the preferred broker is chosen, otherwise the broker with
+ * the highest epoch and then the highest maxOffset, otherwise an arbitrary valid broker.
+ */
+public class DefaultElectPolicy implements ElectPolicy {
+
+    // <clusterName, brokerAddr> valid predicate
+    private BiPredicate<String, String> validPredicate;
+
+    // <clusterName, brokerAddr, info> getter to get more information
+    private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter;
+
+    // Orders live infos from best to worst candidate: higher epoch first, then higher maxOffset.
+    // Integer.compare/Long.compare are used instead of subtraction, because a subtracted long
+    // offset cast to int is truncated (wrong sign or 0) once offsets differ by 2^31 or more.
+    private final Comparator<BrokerLiveInfo> comparator = (x, y) -> {
+        int byEpoch = Integer.compare(y.getEpoch(), x.getEpoch());
+        return byEpoch != 0 ? byEpoch : Long.compare(y.getMaxOffset(), x.getMaxOffset());
+    };
+
+    public DefaultElectPolicy(BiPredicate<String, String> validPredicate, BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
+        this.validPredicate = validPredicate;
+        this.additionalInfoGetter = additionalInfoGetter;
+    }
+
+    public DefaultElectPolicy() {
+    }
+
+    /**
+     * Tries to elect a master among {@code syncStateBrokers} first and, only if nothing is elected
+     * there, among {@code allReplicaBrokers}. A {@code null} set is skipped.
+     * <p>
+     * Within one set: if the old master is still valid and {@code preferBrokerAddr} is blank or equal
+     * to it, the old master is returned; if {@code preferBrokerAddr} is not blank, it is returned when
+     * it is a valid member of the set, and nothing is elected from that set otherwise.
+     *
+     * @param clusterName       the brokerGroup belongs
+     * @param syncStateBrokers  all broker replicas in syncStateSet
+     * @param allReplicaBrokers all broker replicas
+     * @param oldMaster         old master
+     * @param preferBrokerAddr  the broker prefer to be elected
+     * @return master elected by our own policy, or {@code null} if no broker could be elected
+     */
+    @Override
+    public String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String preferBrokerAddr) {
+        String newMaster = null;
+        // try to elect in syncStateBrokers
+        if (syncStateBrokers != null) {
+            newMaster = tryElect(clusterName, syncStateBrokers, oldMaster, preferBrokerAddr);
+        }
+        if (StringUtils.isNotEmpty(newMaster)) {
+            return newMaster;
+        }
+        // try to elect in all replicas
+        if (allReplicaBrokers != null) {
+            newMaster = tryElect(clusterName, allReplicaBrokers, oldMaster, preferBrokerAddr);
+        }
+        return newMaster;
+    }
+
+    /**
+     * Applies the election rules to a single candidate set.
+     *
+     * @return the elected broker address, or {@code null} if nothing can be elected from this set
+     */
+    private String tryElect(String clusterName, Set<String> brokers, String oldMaster, String preferBrokerAddr) {
+        if (this.validPredicate != null) {
+            brokers = brokers.stream().filter(brokerAddr -> this.validPredicate.test(clusterName, brokerAddr)).collect(Collectors.toSet());
+        }
+        if (brokers.isEmpty()) {
+            return null;
+        }
+        if (brokers.contains(oldMaster) && (StringUtils.isBlank(preferBrokerAddr) || preferBrokerAddr.equals(oldMaster))) {
+            // old master still valid, and our preferBrokerAddr is blank or is equals to oldMaster
+            return oldMaster;
+        }
+        // if preferBrokerAddr is not blank, if preferBrokerAddr is valid, we choose it, else we choose nothing
+        if (StringUtils.isNotBlank(preferBrokerAddr)) {
+            return brokers.contains(preferBrokerAddr) ? preferBrokerAddr : null;
+        }
+        if (this.additionalInfoGetter != null) {
+            // sort brokerLiveInfos by epoch, maxOffset
+            TreeSet<BrokerLiveInfo> brokerLiveInfos = new TreeSet<>(this.comparator);
+            for (String brokerAddr : brokers) {
+                BrokerLiveInfo liveInfo = this.additionalInfoGetter.apply(clusterName, brokerAddr);
+                // the getter may have no info for this address; skip it instead of failing in the comparator
+                if (liveInfo != null) {
+                    brokerLiveInfos.add(liveInfo);
+                }
+            }
+            if (!brokerLiveInfos.isEmpty()) {
+                return brokerLiveInfos.first().getBrokerAddr();
+            }
+        }
+        // no live info available: fall back to an arbitrary valid broker
+        return brokers.iterator().next();
+    }
+
+    public BiFunction<String, String, BrokerLiveInfo> getAdditionalInfoGetter() {
+        return additionalInfoGetter;
+    }
+
+    public void setAdditionalInfoGetter(BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
+        this.additionalInfoGetter = additionalInfoGetter;
+    }
+
+    public BiPredicate<String, String> getValidPredicate() {
+        return validPredicate;
+    }
+
+    public void setValidPredicate(BiPredicate<String, String> validPredicate) {
+        this.validPredicate = validPredicate;
+    }
+}
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 3a5480140..c6a8b3345 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -24,6 +24,7 @@ import io.openmessaging.storage.dledger.MemberState;
 import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
 import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
 import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiPredicate;
 import java.util.function.Supplier;
+
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -48,6 +50,8 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.event.EventSerializer;
@@ -77,20 +81,25 @@ public class DLedgerController implements Controller {
     private final DLedgerControllerStateMachine statemachine;
     // Usr for checking whether the broker is alive
     private BiPredicate<String, String> brokerAlivePredicate;
+    // Used for electing a master
+    private ElectPolicy electPolicy;
+
+
     private AtomicBoolean isScheduling = new AtomicBoolean(false);
 
     public DLedgerController(final ControllerConfig config, final 
BiPredicate<String, String> brokerAlivePredicate) {
-        this(config, brokerAlivePredicate, null, null, null);
+        this(config, brokerAlivePredicate, null, null, null, null);
     }
 
     public DLedgerController(final ControllerConfig controllerConfig,
-        final BiPredicate<String, String> brokerAlivePredicate, final 
NettyServerConfig nettyServerConfig,
-        final NettyClientConfig nettyClientConfig, final ChannelEventListener 
channelEventListener) {
+                             final BiPredicate<String, String> 
brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
+                             final NettyClientConfig nettyClientConfig, final 
ChannelEventListener channelEventListener,
+                             final ElectPolicy electPolicy) {
         this.controllerConfig = controllerConfig;
         this.eventSerializer = new EventSerializer();
         this.scheduler = new EventScheduler();
         this.brokerAlivePredicate = brokerAlivePredicate;
-
+        this.electPolicy = electPolicy == null ? new DefaultElectPolicy() : 
electPolicy;
         this.dLedgerConfig = new DLedgerConfig();
         
this.dLedgerConfig.setGroup(controllerConfig.getControllerDLegerGroup());
         
this.dLedgerConfig.setPeers(controllerConfig.getControllerDLegerPeers());
@@ -145,7 +154,7 @@ public class DLedgerController implements Controller {
 
     @Override
     public CompletableFuture<RemotingCommand> 
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
-        final SyncStateSet syncStateSet) {
+                                                                final 
SyncStateSet syncStateSet) {
         return this.scheduler.appendEvent("alterSyncStateSet",
             () -> this.replicasInfoManager.alterSyncStateSet(request, 
syncStateSet, this.brokerAlivePredicate), true);
     }
@@ -153,7 +162,7 @@ public class DLedgerController implements Controller {
     @Override
     public CompletableFuture<RemotingCommand> electMaster(final 
ElectMasterRequestHeader request) {
         return this.scheduler.appendEvent("electMaster",
-            () -> this.replicasInfoManager.electMaster(request, 
this.brokerAlivePredicate), true);
+            () -> this.replicasInfoManager.electMaster(request, 
this.electPolicy), true);
     }
 
     @Override
@@ -170,7 +179,6 @@ public class DLedgerController implements Controller {
 
     @Override
     public CompletableFuture<RemotingCommand> getSyncStateData(List<String> 
brokerNames) {
-
         return this.scheduler.appendEvent("getSyncStateData",
             () -> this.replicasInfoManager.getSyncStateData(brokerNames), 
false);
     }
@@ -185,7 +193,7 @@ public class DLedgerController implements Controller {
             sb.append(peer).append(";");
         }
         return 
RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new 
GetMetaDataResponseHeader(
-            state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), 
state.isLeader(), sb.toString()));
+                state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), 
state.isLeader(), sb.toString()));
     }
 
     @Override
@@ -220,6 +228,10 @@ public class DLedgerController implements Controller {
         this.brokerAlivePredicate = brokerAlivePredicate;
     }
 
+    public void setElectPolicy(ElectPolicy electPolicy) {
+        this.electPolicy = electPolicy;
+    }
+
     /**
      * Event handler that handle event
      */
@@ -276,7 +288,7 @@ public class DLedgerController implements Controller {
         }
 
         public <T> CompletableFuture<RemotingCommand> appendEvent(final String 
name,
-            final Supplier<ControllerResult<T>> supplier, boolean 
isWriteEvent) {
+                                                                  final 
Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
             if (isStopped() || 
!DLedgerController.this.roleHandler.isLeaderState()) {
                 final RemotingCommand command = 
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The 
controller is not in leader state");
                 final CompletableFuture<RemotingCommand> future = new 
CompletableFuture<>();
@@ -313,7 +325,7 @@ public class DLedgerController implements Controller {
         private final boolean isWriteEvent;
 
         ControllerEventHandler(final String name, final 
Supplier<ControllerResult<T>> supplier,
-            final boolean isWriteEvent) {
+                               final boolean isWriteEvent) {
             this.name = name;
             this.supplier = supplier;
             this.future = new CompletableFuture<>();
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 21b3c89c9..95cc85197 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.controller.impl;
 
 import io.netty.channel.Channel;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -26,11 +27,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.common.BrokerAddrInfo;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.BrokerLiveInfo;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -68,17 +71,17 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
             final Iterator<Map.Entry<BrokerAddrInfo, BrokerLiveInfo>> iterator 
= this.brokerLiveTable.entrySet().iterator();
             while (iterator.hasNext()) {
                 final Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = 
iterator.next();
-                long last = next.getValue().lastUpdateTimestamp;
-                long timeoutMillis = next.getValue().heartbeatTimeoutMillis;
+                long last = next.getValue().getLastUpdateTimestamp();
+                long timeoutMillis = 
next.getValue().getHeartbeatTimeoutMillis();
                 if ((last + timeoutMillis) < System.currentTimeMillis()) {
-                    final Channel channel = next.getValue().channel;
+                    final Channel channel = next.getValue().getChannel();
                     iterator.remove();
                     if (channel != null) {
                         RemotingUtil.closeChannel(channel);
                     }
                     this.executor.submit(() ->
-                        notifyBrokerInActive(next.getKey().getClusterName(), 
next.getValue().brokerName, next.getKey().getBrokerAddr(), 
next.getValue().brokerId));
-                    log.warn("The broker channel {} expired, brokerInfo {}, 
expired {}ms", next.getValue().channel, next.getKey(), timeoutMillis);
+                            
notifyBrokerInActive(next.getKey().getClusterName(), 
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(), 
next.getValue().getBrokerId()));
+                    log.warn("The broker channel {} expired, brokerInfo {}, 
expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis);
                 }
             }
         } catch (Exception e) {
@@ -99,14 +102,15 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
 
     @Override
     public void registerBroker(String clusterName, String brokerName, String 
brokerAddr,
-        long brokerId, Long timeoutMillis, Channel channel) {
+                               long brokerId, Long timeoutMillis, Channel 
channel, Integer epoch, Long maxOffset) {
         final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, 
brokerAddr);
         final BrokerLiveInfo prevBrokerLiveInfo = 
this.brokerLiveTable.put(addrInfo,
-            new BrokerLiveInfo(brokerName,
-                brokerId,
-                System.currentTimeMillis(),
-                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : 
timeoutMillis,
-                channel));
+                new BrokerLiveInfo(brokerName,
+                        brokerAddr,
+                        brokerId,
+                        System.currentTimeMillis(),
+                        timeoutMillis == null ? 
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+                        channel, epoch == null ? -1 : epoch, maxOffset == null 
? -1 : maxOffset));
         if (prevBrokerLiveInfo == null) {
             log.info("new broker registered, {}, brokerId:{}", addrInfo, 
brokerId);
         }
@@ -117,17 +121,30 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
         BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
         BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
         if (prev != null) {
-            prev.brokerId = brokerId;
+            prev.setBrokerId(brokerId);
             log.info("Change broker {}'s brokerId to {}", brokerAddr, 
brokerId);
         }
     }
 
     @Override
-    public void onBrokerHeartbeat(String clusterName, String brokerAddr) {
+    public void onBrokerHeartbeat(String clusterName, String brokerAddr, 
Integer epoch, Long maxOffset, Long confirmOffset) {
         BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
         BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
+        int realEpoch = epoch == null ? -1 : epoch;
+        long realMaxOffset = maxOffset == null ? -1 : maxOffset;
+        long realConfirmOffset = confirmOffset == null ? -1 : confirmOffset;
         if (prev != null) {
-            prev.lastUpdateTimestamp = System.currentTimeMillis();
+            prev.setLastUpdateTimestamp(System.currentTimeMillis());
+            if (realEpoch > prev.getEpoch()) {
+                prev.setEpoch(realEpoch);
+                prev.setMaxOffset(realMaxOffset);
+                prev.setConfirmOffset(realConfirmOffset);
+            } else if (realEpoch == prev.getEpoch()) {
+                if (realMaxOffset > prev.getMaxOffset()) {
+                    prev.setMaxOffset(realMaxOffset);
+                    prev.setConfirmOffset(realConfirmOffset);
+                }
+            }
         }
     }
 
@@ -135,11 +152,11 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
     public void onBrokerChannelClose(Channel channel) {
         BrokerAddrInfo addrInfo = null;
         for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : 
this.brokerLiveTable.entrySet()) {
-            if (entry.getValue().channel == channel) {
-                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", 
entry.getValue().channel, entry.getValue().brokerName, 
entry.getKey().getBrokerAddr(), entry.getValue().brokerId);
+            if (entry.getValue().getChannel() == channel) {
+                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", 
entry.getValue().getChannel(), entry.getValue().getBrokerName(), 
entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId());
                 addrInfo = entry.getKey();
                 this.executor.submit(() ->
-                    notifyBrokerInActive(entry.getKey().getClusterName(), 
entry.getValue().brokerName, entry.getKey().getBrokerAddr(), 
entry.getValue().brokerId));
+                        notifyBrokerInActive(entry.getKey().getClusterName(), 
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(), 
entry.getValue().getBrokerId()));
                 break;
             }
         }
@@ -148,42 +165,20 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
         }
     }
 
+    @Override
+    public BrokerLiveInfo getBrokerLiveInfo(String clusterName, String 
brokerAddr) {
+        return this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, 
brokerAddr));
+    }
+
     @Override
     public boolean isBrokerActive(String clusterName, String brokerAddr) {
         final BrokerLiveInfo info = this.brokerLiveTable.get(new 
BrokerAddrInfo(clusterName, brokerAddr));
         if (info != null) {
-            long last = info.lastUpdateTimestamp;
-            long timeoutMillis = info.heartbeatTimeoutMillis;
+            long last = info.getLastUpdateTimestamp();
+            long timeoutMillis = info.getHeartbeatTimeoutMillis();
             return (last + timeoutMillis) >= System.currentTimeMillis();
         }
         return false;
     }
 
-    static class BrokerLiveInfo {
-        private final String brokerName;
-        private final long heartbeatTimeoutMillis;
-        private final Channel channel;
-        private long brokerId;
-        private long lastUpdateTimestamp;
-
-        public BrokerLiveInfo(String brokerName, long brokerId, long 
lastUpdateTimestamp, long heartbeatTimeoutMillis,
-            Channel channel) {
-            this.brokerName = brokerName;
-            this.brokerId = brokerId;
-            this.lastUpdateTimestamp = lastUpdateTimestamp;
-            this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
-            this.channel = channel;
-        }
-
-        @Override
-        public String toString() {
-            return "BrokerLiveInfo{" +
-                "brokerName='" + brokerName + '\'' +
-                ", brokerId=" + brokerId +
-                ", lastUpdateTimestamp=" + lastUpdateTimestamp +
-                ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
-                ", channel=" + channel +
-                '}';
-        }
-    }
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index c63ca0bf1..ed123aeb2 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiPredicate;
-import java.util.function.Predicate;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
@@ -41,6 +40,7 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
 import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
@@ -68,8 +68,8 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
-            final AlterSyncStateSetRequestHeader request, final SyncStateSet 
syncStateSet,
-            final BiPredicate<String, String> brokerAlivePredicate) {
+        final AlterSyncStateSetRequestHeader request, final SyncStateSet 
syncStateSet,
+        final BiPredicate<String, String> brokerAlivePredicate) {
         final String brokerName = request.getBrokerName();
         final ControllerResult<AlterSyncStateSetResponseHeader> result = new 
ControllerResult<>(new AlterSyncStateSetResponseHeader());
         final AlterSyncStateSetResponseHeader response = result.getResponse();
@@ -91,7 +91,7 @@ public class ReplicasInfoManager {
             // Check master
             if 
(!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
                 String err = String.format("Rejecting alter syncStateSet 
request because the current leader is:{%s}, not {%s}",
-                        syncStateInfo.getMasterAddress(), 
request.getMasterAddress());
+                    syncStateInfo.getMasterAddress(), 
request.getMasterAddress());
                 log.error("{}", err);
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err);
                 return result;
@@ -100,7 +100,7 @@ public class ReplicasInfoManager {
             // Check master epoch
             if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
                 String err = String.format("Rejecting alter syncStateSet 
request because the current master epoch is:{%d}, not {%d}",
-                        syncStateInfo.getMasterEpoch(), 
request.getMasterEpoch());
+                    syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
                 log.error("{}", err);
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err);
                 return result;
@@ -109,7 +109,7 @@ public class ReplicasInfoManager {
             // Check syncStateSet epoch
             if (syncStateSet.getSyncStateSetEpoch() != 
syncStateInfo.getSyncStateSetEpoch()) {
                 String err = String.format("Rejecting alter syncStateSet 
request because the current syncStateSet epoch is:{%d}, not {%d}",
-                        syncStateInfo.getSyncStateSetEpoch(), 
syncStateSet.getSyncStateSetEpoch());
+                    syncStateInfo.getSyncStateSetEpoch(), 
syncStateSet.getSyncStateSetEpoch());
                 log.error("{}", err);
                 
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH, 
err);
                 return result;
@@ -150,8 +150,7 @@ public class ReplicasInfoManager {
         return result;
     }
 
-    public ControllerResult<ElectMasterResponseHeader> electMaster(
-            final ElectMasterRequestHeader request, final BiPredicate<String, 
String> brokerAlivePredicate) {
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final 
ElectMasterRequestHeader request, final ElectPolicy electPolicy) {
         final String brokerName = request.getBrokerName();
         final String assignBrokerAddress = request.getBrokerAddress();
         final ControllerResult<ElectMasterResponseHeader> result = new 
ControllerResult<>(new ElectMasterResponseHeader());
@@ -159,87 +158,24 @@ public class ReplicasInfoManager {
             final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
             final BrokerInfo brokerInfo = 
this.replicaInfoTable.get(brokerName);
             final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
-            // First, check whether the master is still active
             final String oldMaster = syncStateInfo.getMasterAddress();
-            if (StringUtils.isNoneEmpty(oldMaster) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), oldMaster)) {
+            Set<String> allReplicaBrokers = 
controllerConfig.isEnableElectUncleanMaster() ? brokerInfo.getAllBroker() : 
null;
 
-                if (StringUtils.isBlank(assignBrokerAddress)) {
-                    String err = String.format("The old master %s is still 
alive, no need to elect new master for broker %s", oldMaster, 
brokerInfo.getBrokerName());
-                    log.warn("{}", err);
-                    
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
-                    return result;
-                }
-
-                if (StringUtils.equals(oldMaster, assignBrokerAddress)) {
-                    String err = String.format("The Re-elect master is the 
same as the old master %s which is still alive, no need to elect new master for 
broker %s", oldMaster, brokerInfo.getBrokerName());
-                    log.warn("{}", err);
-                    
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
-                    return result;
-                }
-            }
-
-            // Try elect a master in syncStateSet
-            if (syncStateSet.size() > 1) {
-                boolean electSuccess = tryElectMaster(result, brokerName, 
assignBrokerAddress, syncStateSet, candidate ->
-                        !candidate.equals(syncStateInfo.getMasterAddress()) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
-                if (electSuccess) {
-                    return result;
-                }
-            }
-
-            // Try elect a master in lagging replicas if 
enableElectUncleanMaster = true
-            if (controllerConfig.isEnableElectUncleanMaster()) {
-                boolean electSuccess = tryElectMaster(result, brokerName, 
assignBrokerAddress, brokerInfo.getAllBroker(), candidate ->
-                        !candidate.equals(syncStateInfo.getMasterAddress()) && 
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
-                if (electSuccess) {
-                    return result;
-                }
-            }
-
-            // If elect failed, we still need to apply an ElectMasterEvent to 
tell the statemachine
-            // that the master was shutdown and no new master was elected.
-            final ElectMasterEvent event = new ElectMasterEvent(false, 
brokerName);
-            result.addEvent(event);
-            
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Failed 
to elect a new broker master");
-            return result;
-        }
-        result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, 
"Broker metadata is not existed");
-        return result;
-    }
-
-    /**
-     * Try elect a new master in candidates
-     *
-     * @param filter return true if the candidate is available
-     * @return true if elect success
-     */
-    private boolean tryElectMaster(final 
ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
-                                   final String assignBrokerAddress, final 
Set<String> candidates, final Predicate<String> filter) {
-        final int masterEpoch = 
this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
-        final int syncStateSetEpoch = 
this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
-
-        //Re-elect the assigned broker as master
-        if (StringUtils.isNotBlank(assignBrokerAddress) && 
filter.test(assignBrokerAddress)) {
-            final ElectMasterResponseHeader response = result.getResponse();
-            response.setNewMasterAddress(assignBrokerAddress);
-            response.setMasterEpoch(masterEpoch + 1);
-            response.setSyncStateSetEpoch(syncStateSetEpoch);
-            BrokerMemberGroup brokerMemberGroup = 
buildBrokerMemberGroup(brokerName);
-            if (null != brokerMemberGroup) {
-                response.setBrokerMemberGroup(brokerMemberGroup);
-                result.setBody(brokerMemberGroup.encode());
+            // elect by policy
+            String newMaster = electPolicy.elect(brokerInfo.getClusterName(), 
syncStateSet, allReplicaBrokers, oldMaster, assignBrokerAddress);
+            if (StringUtils.isNotEmpty(newMaster) && 
newMaster.equals(oldMaster)) {
+                // old master still valid, change nothing
+                String err = String.format("The old master %s is still alive, 
not need to elect new master for broker %s", oldMaster, 
brokerInfo.getBrokerName());
+                log.warn("{}", err);
+                
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+                return result;
             }
-            final ElectMasterEvent event = new ElectMasterEvent(brokerName, 
assignBrokerAddress);
-            result.addEvent(event);
-            return true;
-        } else if (StringUtils.isNotBlank(assignBrokerAddress) && 
!filter.test(assignBrokerAddress)) {
-            return false;
-        }
-
-        for (final String candidate : candidates) {
-            if (filter.test(candidate)) {
+            // a new master is elected
+            if (StringUtils.isNotEmpty(newMaster)) {
+                final int masterEpoch = 
this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
+                final int syncStateSetEpoch = 
this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
                 final ElectMasterResponseHeader response = 
result.getResponse();
-                response.setNewMasterAddress(candidate);
+                response.setNewMasterAddress(newMaster);
                 response.setMasterEpoch(masterEpoch + 1);
                 response.setSyncStateSetEpoch(syncStateSetEpoch);
                 BrokerMemberGroup brokerMemberGroup = 
buildBrokerMemberGroup(brokerName);
@@ -247,12 +183,19 @@ public class ReplicasInfoManager {
                     response.setBrokerMemberGroup(brokerMemberGroup);
                     result.setBody(brokerMemberGroup.encode());
                 }
-                final ElectMasterEvent event = new 
ElectMasterEvent(brokerName, candidate);
+                final ElectMasterEvent event = new 
ElectMasterEvent(brokerName, newMaster);
                 result.addEvent(event);
-                return true;
+                return result;
             }
+            // If elect failed, we still need to apply an ElectMasterEvent to 
tell the statemachine
+            // that the master was shutdown and no new master was elected.
+            final ElectMasterEvent event = new ElectMasterEvent(false, 
brokerName);
+            result.addEvent(event);
+            
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Failed 
to elect a new broker master");
+            return result;
         }
-        return false;
+        result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, 
"Broker metadata is not existed");
+        return result;
     }
 
     private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
@@ -269,7 +212,7 @@ public class ReplicasInfoManager {
     }
 
     public ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerBroker(
-            final RegisterBrokerToControllerRequestHeader request) {
+        final RegisterBrokerToControllerRequestHeader request) {
         final String brokerName = request.getBrokerName();
         final String brokerAddress = request.getBrokerAddress();
         final ControllerResult<RegisterBrokerToControllerResponseHeader> 
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 5b583b493..95d4c2b10 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -114,7 +114,7 @@ public class ControllerRequestProcessor implements 
NettyRequestProcessor {
                     final RegisterBrokerToControllerResponseHeader 
responseHeader = (RegisterBrokerToControllerResponseHeader) 
response.readCustomHeader();
                     if (responseHeader != null && responseHeader.getBrokerId() 
>= 0) {
                         
this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), 
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                                responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
+                            responseHeader.getBrokerId(), 
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(), 
controllerRequest.getEpoch(), controllerRequest.getMaxOffset());
                     }
                     return response;
                 }
@@ -133,7 +133,8 @@ public class ControllerRequestProcessor implements 
NettyRequestProcessor {
             }
             case BROKER_HEARTBEAT: {
                 final BrokerHeartbeatRequestHeader requestHeader = 
(BrokerHeartbeatRequestHeader) 
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), 
requestHeader.getBrokerAddr());
+                
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), 
requestHeader.getBrokerAddr(),
+                        requestHeader.getEpoch(), 
requestHeader.getMaxOffset(), requestHeader.getConfirmOffset());
                 return 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat 
success");
             }
             case CONTROLLER_GET_SYNC_STATE_DATA: {
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 83a936c49..2ea427854 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -153,7 +153,7 @@ public class ControllerManagerTest {
 
         // Send heartbeat for broker2
         ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
-        executor.scheduleAtFixedRate(()-> {
+        executor.scheduleAtFixedRate(() -> {
             final BrokerHeartbeatRequestHeader heartbeatRequestHeader = new 
BrokerHeartbeatRequestHeader();
             heartbeatRequestHeader.setClusterName("cluster1");
             heartbeatRequestHeader.setBrokerName("broker1");
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 146fd9352..dce3167ef 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -35,6 +35,7 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
 import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -175,11 +176,22 @@ public class DLedgerControllerTest {
         });
     }
 
+    public void setBrokerElectPolicy(DLedgerController controller, String... 
deathBroker) {
+        controller.setElectPolicy(new DefaultElectPolicy((clusterName, 
brokerAddress) -> {
+            for (String broker : deathBroker) {
+                if (broker.equals(brokerAddress)) {
+                    return false;
+                }
+            }
+            return true;
+        }, null));
+    }
+
     @Test
     public void testElectMaster() throws Exception {
         final DLedgerController leader = mockMetaData(false);
         final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
-        setBrokerAlivePredicate(leader, "127.0.0.1:9000");
+        setBrokerElectPolicy(leader, "127.0.0.1:9000");
         final RemotingCommand resp = leader.electMaster(request).get(10, 
TimeUnit.SECONDS);
         final ElectMasterResponseHeader response = (ElectMasterResponseHeader) 
resp.readCustomHeader();
         assertEquals(response.getMasterEpoch(), 2);
@@ -198,7 +210,7 @@ public class DLedgerControllerTest {
         // Now we trigger electMaster api, which means the old master is 
shutdown and want to elect a new master.
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, 
not more replicas can be elected as master, it will be failed.
         final ElectMasterRequestHeader electRequest = new 
ElectMasterRequestHeader("broker1");
-        setBrokerAlivePredicate(leader, "127.0.0.1:9000");
+        setBrokerElectPolicy(leader, "127.0.0.1:9000");
         leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
 
         final RemotingCommand resp = leader.getReplicaInfo(new 
GetReplicaInfoRequestHeader("broker1")).
@@ -238,7 +250,7 @@ public class DLedgerControllerTest {
         // However, event if the syncStateSet in statemachine is 
{"127.0.0.1:9000"}
         // the option {enableElectUncleanMaster = true}, so the controller 
sill can elect a new master
         final ElectMasterRequestHeader electRequest = new 
ElectMasterRequestHeader("broker1");
-        setBrokerAlivePredicate(leader, "127.0.0.1:9000");
+        setBrokerElectPolicy(leader, "127.0.0.1:9000");
         final CompletableFuture<RemotingCommand> future = 
leader.electMaster(electRequest);
         future.get(10, TimeUnit.SECONDS);
 
@@ -246,7 +258,7 @@ public class DLedgerControllerTest {
         final GetReplicaInfoResponseHeader replicaInfo = 
(GetReplicaInfoResponseHeader) resp.readCustomHeader();
         final SyncStateSet syncStateSet = 
RemotingSerializable.decode(resp.getBody(), SyncStateSet.class);
 
-         final HashSet<String> newSyncStateSet2 = new HashSet<>();
+        final HashSet<String> newSyncStateSet2 = new HashSet<>();
         newSyncStateSet2.add(replicaInfo.getMasterAddress());
         assertEquals(syncStateSet.getSyncStateSet(), newSyncStateSet2);
         assertNotEquals(replicaInfo.getMasterAddress(), "");
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index 5565e59bd..0f106cd71 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -44,7 +44,7 @@ public class DefaultBrokerHeartbeatManagerTest {
             System.out.println("Broker shutdown:" + brokerAddress);
             latch.countDown();
         });
-        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:7000", 1L, 3000L, null);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:7000", 1L, 3000L, null, 1, 1L);
         assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
         this.heartbeatManager.shutdown();
     }
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index ed47be2df..915e383b4 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -19,6 +19,7 @@ package 
org.apache.rocketmq.controller.impl.controller.impl.manager;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
@@ -30,11 +31,15 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
+import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
 import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,18 +52,30 @@ import static org.junit.Assert.assertTrue;
 public class ReplicasInfoManagerTest {
     private ReplicasInfoManager replicasInfoManager;
 
+    private DefaultBrokerHeartbeatManager heartbeatManager;
+
     @Before
     public void init() {
         final ControllerConfig config = new ControllerConfig();
         config.setEnableElectUncleanMaster(false);
+        config.setScanNotActiveBrokerInterval(300000000);
         this.replicasInfoManager = new ReplicasInfoManager(config);
+        this.heartbeatManager = new DefaultBrokerHeartbeatManager(config);
+        this.heartbeatManager.start();
+    }
+
+    @After
+    public void destroy() {
+        this.replicasInfoManager = null;
+        this.heartbeatManager.shutdown();
+        this.heartbeatManager = null;
     }
 
     public boolean registerNewBroker(String clusterName, String brokerName, 
String brokerAddress,
         boolean isFirstRegisteredBroker) {
         // Register new broker
         final RegisterBrokerToControllerRequestHeader registerRequest =
-            new RegisterBrokerToControllerRequestHeader(clusterName, 
brokerName, brokerAddress);
+                new RegisterBrokerToControllerRequestHeader(clusterName, 
brokerName, brokerAddress);
         final ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerResult = this.replicasInfoManager.registerBroker(registerRequest);
         apply(registerResult.getEvents());
 
@@ -77,7 +94,7 @@ public class ReplicasInfoManagerTest {
     private boolean alterNewInSyncSet(String brokerName, String masterAddress, 
int masterEpoch,
         Set<String> newSyncStateSet, int syncStateSetEpoch) {
         final AlterSyncStateSetRequestHeader alterRequest =
-            new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
+                new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
         final ControllerResult<AlterSyncStateSetResponseHeader> result = 
this.replicasInfoManager.alterSyncStateSet(alterRequest, new 
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
         apply(result.getEvents());
 
@@ -107,11 +124,84 @@ public class ReplicasInfoManagerTest {
         assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, 
newSyncStateSet, 1));
     }
 
+    public void mockHeartbeatDataMasterStillAlive() {
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9000", 1L, 10000000000L, null,
+                1, 3L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+                1, 2L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+                1, 3L);
+    }
+
+    public void mockHeartbeatDataHigherEpoch() {
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
+                1, 3L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+                1, 2L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+                0, 3L);
+    }
+
+
+    public void mockHeartbeatDataHigherOffset() {
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
+                1, 3L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
+                1, 2L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
+                1, 3L);
+    }
+
+    @Test
+    public void testElectMasterOldMasterStillAlive() {
+        mockMetaData();
+        final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
+        ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
+        mockHeartbeatDataMasterStillAlive();
+        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
+                electPolicy);
+        assertEquals(ResponseCode.CONTROLLER_INVALID_REQUEST, 
cResult.getResponseCode());
+    }
+
+    @Test
+    public void testElectMasterPreferHigherEpoch() {
+        mockMetaData();
+        final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
+        ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
+        mockHeartbeatDataHigherEpoch();
+        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
+                electPolicy);
+        System.out.println(cResult.getResponseCode());
+        final ElectMasterResponseHeader response = cResult.getResponse();
+        System.out.println(response);
+        assertEquals(response.getMasterEpoch(), 2);
+        assertFalse(response.getNewMasterAddress().isEmpty());
+        assertEquals("127.0.0.1:9001", response.getNewMasterAddress());
+    }
+
+    @Test
+    public void testElectMasterPreferHigherOffsetWhenEpochEquals() {
+        mockMetaData();
+        final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
+        ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
+        mockHeartbeatDataHigherOffset();
+        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
+                electPolicy);
+        System.out.println(cResult.getResponseCode());
+        final ElectMasterResponseHeader response = cResult.getResponse();
+        System.out.println(response);
+        assertEquals(response.getMasterEpoch(), 2);
+        assertFalse(response.getNewMasterAddress().isEmpty());
+        assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+    }
+
+
     @Test
     public void testElectMaster() {
         mockMetaData();
         final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
-        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request, (clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"));
+        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
+                new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
         assertFalse(response.getNewMasterAddress().isEmpty());
@@ -121,20 +211,22 @@ public class ReplicasInfoManagerTest {
         brokerSet.add("127.0.0.1:9000");
         brokerSet.add("127.0.0.1:9001");
         brokerSet.add("127.0.0.1:9002");
-        final ElectMasterRequestHeader assignRequest = new 
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9000");
-        final ControllerResult<ElectMasterResponseHeader> cResult1 = 
this.replicasInfoManager.electMaster(assignRequest, (clusterName, 
brokerAddress) -> brokerAddress.contains("127.0.0.1:9000"));
-        assertEquals( cResult1.getResponseCode(), 
ResponseCode.CONTROLLER_INVALID_REQUEST);
-
+        final ElectMasterRequestHeader assignRequest = new 
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+        final ControllerResult<ElectMasterResponseHeader> cResult1 = 
this.replicasInfoManager.electMaster(assignRequest,
+                new DefaultElectPolicy((clusterName, brokerAddress) -> 
brokerAddress.contains("127.0.0.1:9000"), null));
+        assertEquals(cResult1.getResponseCode(), 
ResponseCode.CONTROLLER_INVALID_REQUEST);
 
-        final ElectMasterRequestHeader assignRequest1 = new 
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
-        final ControllerResult<ElectMasterResponseHeader> cResult2 = 
this.replicasInfoManager.electMaster(assignRequest1, (clusterName, 
brokerAddress) -> brokerAddress.equals("127.0.0.1:9000"));
-        assertEquals( cResult2.getResponseCode(), 
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+        final ElectMasterRequestHeader assignRequest1 = new 
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+        final ControllerResult<ElectMasterResponseHeader> cResult2 = 
this.replicasInfoManager.electMaster(assignRequest1,
+                new DefaultElectPolicy((clusterName, brokerAddress) -> 
brokerAddress.equals("127.0.0.1:9000"), null));
+        assertEquals(cResult2.getResponseCode(), 
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
 
-        final ElectMasterRequestHeader assignRequest2 = new 
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
-        final ControllerResult<ElectMasterResponseHeader> cResult3 = 
this.replicasInfoManager.electMaster(assignRequest2, (clusterName, 
brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
-        assertEquals( cResult3.getResponseCode(), ResponseCode.SUCCESS);
+        final ElectMasterRequestHeader assignRequest2 = new 
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+        final ControllerResult<ElectMasterResponseHeader> cResult3 = 
this.replicasInfoManager.electMaster(assignRequest2,
+                new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+        assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
         final ElectMasterResponseHeader response3 = cResult3.getResponse();
-        assertEquals(response3.getNewMasterAddress(),"127.0.0.1:9001");
+        assertEquals(response3.getNewMasterAddress(), "127.0.0.1:9001");
         assertEquals(response.getMasterEpoch(), 2);
         assertFalse(response.getNewMasterAddress().isEmpty());
         assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
@@ -151,7 +243,8 @@ public class ReplicasInfoManagerTest {
         // Now we trigger electMaster api, which means the old master is 
shutdown and want to elect a new master.
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, 
not more replicas can be elected as master, it will be failed.
         final ElectMasterRequestHeader electRequest = new 
ElectMasterRequestHeader("broker1");
-        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(electRequest, (clusterName, brokerAddress) 
-> !brokerAddress.equals("127.0.0.1:9000"));
+        final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(electRequest,
+                new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
         final List<EventMessage> events = cResult.getEvents();
         assertEquals(events.size(), 1);
         final ElectMasterEvent event = (ElectMasterEvent) events.get(0);
@@ -164,4 +257,4 @@ public class ReplicasInfoManagerTest {
         assertEquals(replicaInfo.getMasterEpoch(), 2);
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 2ea22dc6d..0814e24ef 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -392,6 +392,10 @@ public class AutoSwitchHAService extends DefaultHAService {
         return reputFromOffset;
     }
 
+    public int getLastEpoch() {
+        return this.epochCache.lastEpoch();
+    }
+
     public List<EpochEntry> getEpochEntries() {
         return this.epochCache.getAllEntries();
     }

Reply via email to