This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 63a014b81 [ISSUE#6342] Local SyncStatSet sync to remote value when 
changeToMaster (#6352)
63a014b81 is described below

commit 63a014b810387d11c9eaddd66fdbd80d158076ab
Author: Ji Juntao <[email protected]>
AuthorDate: Thu Mar 16 11:48:04 2023 +0800

    [ISSUE#6342] Local SyncStatSet sync to remote value when changeToMaster 
(#6352)
    
    * fix ISSUE#6342
    
    * refactor
    
    * correct the syncStateSet in elect process.
    
    * move syncStateSet into request/response's body.
    
    * optimize the broker electing switch's branch.
---
 .../broker/controller/ReplicasManager.java         | 25 ++++---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 16 ++--
 .../broker/processor/AdminBrokerProcessor.java     |  4 +-
 .../controller/ReplicasManagerRegisterTest.java    | 31 ++++----
 .../broker/controller/ReplicasManagerTest.java     | 22 ++++--
 .../rocketmq/controller/ControllerManager.java     |  2 +
 .../impl/manager/ReplicasInfoManager.java          | 13 +++-
 .../protocol/body/ElectMasterResponseBody.java     | 86 ++++++++++++++++++++++
 .../protocol/body/RoleChangeNotifyEntry.java       | 20 ++++-
 .../controller/ElectMasterResponseHeader.java      |  1 +
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 10 +++
 11 files changed, 191 insertions(+), 39 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index d3a1c1fb8..9938a2aac 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -227,17 +227,17 @@ public class ReplicasManager {
     }
 
     public synchronized void changeBrokerRole(final Long newMasterBrokerId, 
final String newMasterAddress, final Integer newMasterEpoch,
-                                              final Integer syncStateSetEpoch) 
{
+                                              final Integer syncStateSetEpoch, 
final Set<Long> syncStateSet) {
         if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
             if (newMasterBrokerId.equals(this.brokerControllerId)) {
-                changeToMaster(newMasterEpoch, syncStateSetEpoch);
+                changeToMaster(newMasterEpoch, syncStateSetEpoch, 
syncStateSet);
             } else {
                 changeToSlave(newMasterAddress, newMasterEpoch, 
newMasterBrokerId);
             }
         }
     }
 
-    public void changeToMaster(final int newMasterEpoch, final int 
syncStateSetEpoch) {
+    public void changeToMaster(final int newMasterEpoch, final int 
syncStateSetEpoch, final Set<Long> syncStateSet) {
         synchronized (this) {
             if (newMasterEpoch > this.masterEpoch) {
                 LOGGER.info("Begin to change to master, brokerName:{}, 
replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), 
this.brokerAddress, newMasterEpoch);
@@ -245,8 +245,7 @@ public class ReplicasManager {
                 this.masterEpoch = newMasterEpoch;
 
                 // Change SyncStateSet
-                final HashSet<Long> newSyncStateSet = new HashSet<>();
-                newSyncStateSet.add(this.brokerControllerId);
+                final HashSet<Long> newSyncStateSet = new 
HashSet<>(syncStateSet);
                 changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
 
                 // Change record
@@ -365,8 +364,10 @@ public class ReplicasManager {
     private boolean brokerElect() {
         // Broker try to elect itself as a master in broker set.
         try {
-            ElectMasterResponseHeader tryElectResponse = 
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(),
+            Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair = 
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, 
this.brokerConfig.getBrokerClusterName(),
                     this.brokerConfig.getBrokerName(), 
this.brokerControllerId);
+            ElectMasterResponseHeader tryElectResponse = 
tryElectResponsePair.getObject1();
+            Set<Long> syncStateSet = tryElectResponsePair.getObject2();
             final String masterAddress = tryElectResponse.getMasterAddress();
             final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
             if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
@@ -375,7 +376,7 @@ public class ReplicasManager {
             }
 
             if (masterBrokerId.equals(this.brokerControllerId)) {
-                changeToMaster(tryElectResponse.getMasterEpoch(), 
tryElectResponse.getSyncStateSetEpoch());
+                changeToMaster(tryElectResponse.getMasterEpoch(), 
tryElectResponse.getSyncStateSetEpoch(), syncStateSet);
             } else {
                 changeToSlave(masterAddress, 
tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
             }
@@ -544,15 +545,17 @@ public class ReplicasManager {
      */
     private boolean registerBrokerToController() {
         try {
-            RegisterBrokerToControllerResponseHeader response = 
this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(),
 brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, 
controllerLeaderAddress);
-            if (response == null) return false;
+            Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> 
responsePair = 
this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(),
 brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, 
controllerLeaderAddress);
+            if (responsePair == null) return false;
+            RegisterBrokerToControllerResponseHeader response = 
responsePair.getObject1();
+            Set<Long> syncStateSet = responsePair.getObject2();
             final Long masterBrokerId = response.getMasterBrokerId();
             final String masterAddress = response.getMasterAddress();
             if (masterBrokerId == null) {
                 return true;
             }
             if (this.brokerControllerId.equals(masterBrokerId)) {
-                changeToMaster(response.getMasterEpoch(), 
response.getSyncStateSetEpoch());
+                changeToMaster(response.getMasterEpoch(), 
response.getSyncStateSetEpoch(), syncStateSet);
             } else {
                 changeToSlave(masterAddress, response.getMasterEpoch(), 
masterBrokerId);
             }
@@ -635,7 +638,7 @@ public class ReplicasManager {
                         if (StringUtils.isNoneEmpty(newMasterAddress) && 
masterBrokerId != null) {
                             if 
(masterBrokerId.equals(this.brokerControllerId)) {
                                 // If this broker is now the master
-                                changeToMaster(newMasterEpoch, 
syncStateSet.getSyncStateSetEpoch());
+                                changeToMaster(newMasterEpoch, 
syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet());
                             } else {
                                 // If this broker is now the slave, and master 
has been changed
                                 changeToSlave(newMasterAddress, 
newMasterEpoch, masterBrokerId);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4f298e1aa..144f05016 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -80,6 +80,7 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import 
org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
 import 
org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
@@ -1172,7 +1173,7 @@ public class BrokerOuterAPI {
     /**
      * Broker try to elect itself as a master in broker set
      */
-    public ElectMasterResponseHeader brokerElect(String controllerAddress, 
String clusterName, String brokerName,
+    public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String 
controllerAddress, String clusterName, String brokerName,
                                                  Long brokerId) throws 
Exception {
 
         final ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
@@ -1184,12 +1185,15 @@ public class BrokerOuterAPI {
                 throw new MQBrokerException(response.getCode(), "Controller 
leader was changed");
             }
             case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
-                throw new MQBrokerException(response.getCode(), 
response.getRemark());
             case CONTROLLER_ELECT_MASTER_FAILED:
+                throw new MQBrokerException(response.getCode(), 
response.getRemark());
             case CONTROLLER_MASTER_STILL_EXIST:
             case SUCCESS:
-                return (ElectMasterResponseHeader) 
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+                final ElectMasterResponseHeader responseHeader = 
(ElectMasterResponseHeader) 
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+                final ElectMasterResponseBody responseBody = 
RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class);
+                return new Pair<>(responseHeader, 
responseBody.getSyncStateSet());
         }
+
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
@@ -1215,13 +1219,15 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public RegisterBrokerToControllerResponseHeader 
registerBrokerToController(final String clusterName, final String brokerName, 
final Long brokerId, final String brokerAddress, final String 
controllerAddress) throws Exception {
+    public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> 
registerBrokerToController(final String clusterName, final String brokerName, 
final Long brokerId, final String brokerAddress, final String 
controllerAddress) throws Exception {
         final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, 
brokerAddress);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
         if (response.getCode() == SUCCESS) {
-            return (RegisterBrokerToControllerResponseHeader) 
response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
+            RegisterBrokerToControllerResponseHeader responseHeader = 
(RegisterBrokerToControllerResponseHeader) 
response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
+            Set<Long> syncStateSet = 
RemotingSerializable.decode(response.getBody(), 
SyncStateSet.class).getSyncStateSet();
+            return new Pair<>(responseHeader, syncStateSet);
         }
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f74ab330b..65e45e817 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -111,6 +111,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
 import 
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
@@ -2628,6 +2629,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         NotifyBrokerRoleChangedRequestHeader requestHeader = 
(NotifyBrokerRoleChangedRequestHeader) 
request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
+        SyncStateSet syncStateSetInfo = 
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
 
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
@@ -2635,7 +2637,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
         final ReplicasManager replicasManager = 
this.brokerController.getReplicasManager();
         if (replicasManager != null) {
-            
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), 
requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), 
requestHeader.getSyncStateSetEpoch());
+            
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), 
requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), 
requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
index f0e82b638..7fb9d9aeb 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.UtilAll;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
@@ -45,6 +46,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.UUID;
 
 import static org.awaitility.Awaitility.await;
@@ -70,6 +73,7 @@ public class ReplicasManagerRegisterTest {
     public static final String CONTROLLER_ADDR = "127.0.0.1:8888";
 
     public static final BrokerConfig BROKER_CONFIG;
+    private final HashSet<Long> syncStateSet = new 
HashSet<>(Arrays.asList(1L));
 
     static {
         BROKER_CONFIG = new BrokerConfig();
@@ -122,10 +126,11 @@ public class ReplicasManagerRegisterTest {
 
     @Test
     public void testBrokerRegisterSuccess() throws Exception {
+
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
1L));
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), 
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
-        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new 
RegisterBrokerToControllerResponseHeader());
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(),  syncStateSet));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
 
         ReplicasManager replicasManager0 = new 
ReplicasManager(mockedBrokerController);
         replicasManager0.start();
@@ -144,8 +149,8 @@ public class ReplicasManagerRegisterTest {
     public void testBrokerRegisterSuccessAndRestartWithChangedBrokerConfig() 
throws Exception {
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
1L));
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), 
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
-        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new 
RegisterBrokerToControllerResponseHeader());
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(),  syncStateSet));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
 
         ReplicasManager replicasManager0 = new 
ReplicasManager(mockedBrokerController);
         replicasManager0.start();
@@ -196,8 +201,8 @@ public class ReplicasManagerRegisterTest {
         ReplicasManager replicasManager = new 
ReplicasManager(mockedBrokerController);
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
1L));
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), 
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
-        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new 
RegisterBrokerToControllerResponseHeader());
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(),  syncStateSet));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
         ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager);
         PowerMockito.doReturn(false).when(spyReplicasManager, 
"createTempMetadataFile", anyLong());
 
@@ -216,8 +221,8 @@ public class ReplicasManagerRegisterTest {
         ReplicasManager replicasManager = new 
ReplicasManager(mockedBrokerController);
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
1L));
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), 
any(), any())).thenThrow(new RuntimeException());
-        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new 
RegisterBrokerToControllerResponseHeader());
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(),  syncStateSet));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
 
         replicasManager.start();
 
@@ -236,8 +241,8 @@ public class ReplicasManagerRegisterTest {
         ReplicasManager replicasManager = new 
ReplicasManager(mockedBrokerController);
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
1L));
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), 
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
-        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new 
RegisterBrokerToControllerResponseHeader());
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(),  syncStateSet));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
 
         ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager);
         PowerMockito.doReturn(false).when(spyReplicasManager, 
"createMetadataFileAndDeleteTemp");
@@ -280,7 +285,7 @@ public class ReplicasManagerRegisterTest {
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
1L));
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), 
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
         when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenThrow(new RuntimeException());
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
 
         replicasManager.start();
 
@@ -299,7 +304,7 @@ public class ReplicasManagerRegisterTest {
         replicasManager.shutdown();
 
         Mockito.reset(mockedBrokerOuterAPI);
-        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 
1));
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), 
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, 
"127.0.0.1:13131", 1, 1), syncStateSet));
         when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn(
                 new GetMetaDataResponseHeader("default-group", "dledger-a", 
CONTROLLER_ADDR, true, CONTROLLER_ADDR));
         
when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
@@ -310,7 +315,7 @@ public class ReplicasManagerRegisterTest {
         when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 
2L));
         // because apply brokerId: 1 has succeeded, so next request which try 
to apply brokerId: 1 will be failed
         when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), eq(1L), any(), 
any())).thenThrow(new RuntimeException());
-        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new 
RegisterBrokerToControllerResponseHeader());
+        when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(),  syncStateSet));
         replicasManagerNew.start();
 
         Assert.assertEquals(ReplicasManager.State.RUNNING, 
replicasManagerNew.getState());
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index a5cf63d37..e03828cff 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.broker.controller;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -116,6 +118,10 @@ public class ReplicasManagerTest {
 
     private static final Long SYNC_STATE = 1L;
 
+    private static final HashSet<Long> SYNC_STATE_SET_1 = new 
HashSet<Long>(Arrays.asList(BROKER_ID_1));
+
+    private static final HashSet<Long> SYNC_STATE_SET_2 = new 
HashSet<Long>(Arrays.asList(BROKER_ID_2));
+
     @Before
     public void before() throws Exception {
         UtilAll.deleteFile(new File(STORE_BASE_PATH));
@@ -154,9 +160,9 @@ public class ReplicasManagerTest {
         when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
         when(brokerOuterAPI.getNextBrokerId(any(), any(), 
any())).thenReturn(getNextBrokerIdResponseHeader);
         when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), 
any())).thenReturn(applyBrokerIdResponseHeader);
-        when(brokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(registerBrokerToControllerResponseHeader);
+        when(brokerOuterAPI.registerBrokerToController(any(), any(), 
anyLong(), any(), any())).thenReturn(new Pair<>(new 
RegisterBrokerToControllerResponseHeader(), SYNC_STATE_SET_1));
         when(brokerOuterAPI.getReplicaInfo(any(), any())).thenReturn(result);
-        when(brokerOuterAPI.brokerElect(any(), any(), any(), 
any())).thenReturn(brokerTryElectResponseHeader);
+        when(brokerOuterAPI.brokerElect(any(), any(), any(), 
any())).thenReturn(new Pair<>(brokerTryElectResponseHeader, SYNC_STATE_SET_1));
         replicasManager = new ReplicasManager(brokerController);
         autoSwitchHAService.init(defaultMessageStore);
         replicasManager.start();
@@ -173,18 +179,24 @@ public class ReplicasManagerTest {
 
     @Test
     public void changeBrokerRoleTest() {
+        HashSet<Long> syncStateSetA = new HashSet<>();
+        syncStateSetA.add(BROKER_ID_1);
+        HashSet<Long> syncStateSetB = new HashSet<>();
+        syncStateSetA.add(BROKER_ID_2);
         // not equal to localAddress
-        Assertions.assertThatCode(() -> 
replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, 
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH))
+        Assertions.assertThatCode(() -> 
replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, 
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetB))
                 .doesNotThrowAnyException();
 
         // equal to localAddress
-        Assertions.assertThatCode(() -> 
replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, 
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH))
+        Assertions.assertThatCode(() -> 
replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, 
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetA))
                 .doesNotThrowAnyException();
     }
 
     @Test
     public void changeToMasterTest() {
-        Assertions.assertThatCode(() -> 
replicasManager.changeToMaster(NEW_MASTER_EPOCH, 
OLD_MASTER_EPOCH)).doesNotThrowAnyException();
+        HashSet<Long> syncStateSet = new HashSet<>();
+        syncStateSet.add(BROKER_ID_1);
+        Assertions.assertThatCode(() -> 
replicasManager.changeToMaster(NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, 
syncStateSet)).doesNotThrowAnyException();
     }
 
     @Test
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index c4fbf0c8d..a9949bde0 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -53,6 +53,7 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
@@ -188,6 +189,7 @@ public class ControllerManager {
             final NotifyBrokerRoleChangedRequestHeader requestHeader = new 
NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), 
entry.getMasterBrokerId(),
                     entry.getMasterEpoch(), entry.getSyncStateSetEpoch());
             final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, 
requestHeader);
+            request.setBody(new SyncStateSet(entry.getSyncStateSet(), 
entry.getSyncStateSetEpoch()).encode());
             try {
                 this.remotingClient.invokeOneway(brokerAddr, request, 3000);
             } catch (final Exception e) {
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 728cf87e2..103cb68e2 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
+import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -201,6 +202,8 @@ public class ReplicasInfoManager {
             
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
             response.setMasterBrokerId(oldMaster);
             
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster));
+
+            result.setBody(new ElectMasterResponseBody(syncStateSet).encode());
             
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
             return result;
         }
@@ -209,14 +212,21 @@ public class ReplicasInfoManager {
         if (newMaster != null) {
             final int masterEpoch = syncStateInfo.getMasterEpoch();
             final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
+            final HashSet<Long> newSyncStateSet = new HashSet<>();
+            newSyncStateSet.add(newMaster);
+
             response.setMasterBrokerId(newMaster);
             
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster));
             response.setMasterEpoch(masterEpoch + 1);
             response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
+            ElectMasterResponseBody responseBody = new 
ElectMasterResponseBody(newSyncStateSet);
+
             BrokerMemberGroup brokerMemberGroup = 
buildBrokerMemberGroup(brokerName);
             if (null != brokerMemberGroup) {
-                result.setBody(brokerMemberGroup.encode());
+                responseBody.setBrokerMemberGroup(brokerMemberGroup);
             }
+
+            result.setBody(responseBody.encode());
             final ElectMasterEvent event = new ElectMasterEvent(brokerName, 
newMaster);
             result.addEvent(event);
             return result;
@@ -315,6 +325,7 @@ public class ReplicasInfoManager {
             response.setMasterEpoch(syncStateInfo.getMasterEpoch());
             
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
         }
+        result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), 
syncStateInfo.getSyncStateSetEpoch()).encode());
         // if this broker's address has been changed, we need to update it
         if 
(!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) {
             final UpdateBrokerAddressEvent event = new 
UpdateBrokerAddressEvent(clusterName, brokerName, brokerAddress, brokerId);
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java
new file mode 100644
index 000000000..8aef636fa
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import com.google.common.base.Objects;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ElectMasterResponseBody extends RemotingSerializable {
+    private BrokerMemberGroup brokerMemberGroup;
+    private Set<Long> syncStateSet;
+
+    // Provide default constructor for serializer
+    public ElectMasterResponseBody() {
+        this.syncStateSet = new HashSet<Long>();
+        this.brokerMemberGroup = null;
+    }
+
+    public ElectMasterResponseBody(final Set<Long> syncStateSet) {
+        this.syncStateSet = syncStateSet;
+        this.brokerMemberGroup = null;
+    }
+
+    public ElectMasterResponseBody(final BrokerMemberGroup brokerMemberGroup, 
final Set<Long> syncStateSet) {
+        this.brokerMemberGroup = brokerMemberGroup;
+        this.syncStateSet = syncStateSet;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ElectMasterResponseBody that = (ElectMasterResponseBody) o;
+        return Objects.equal(brokerMemberGroup, that.brokerMemberGroup) &&
+            Objects.equal(syncStateSet, that.syncStateSet);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(brokerMemberGroup, syncStateSet);
+    }
+
+    @Override
+    public String toString() {
+        // No explicit toString() calls: brokerMemberGroup may be null; concatenation is null-safe.
+        return "ElectMasterResponseBody{" +
+            "brokerMemberGroup='" + brokerMemberGroup + '\'' +
+            ", syncStateSet='" + syncStateSet + '\'' + '}';
+    }
+
+    public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) {
+        this.brokerMemberGroup = brokerMemberGroup;
+    }
+
+    public BrokerMemberGroup getBrokerMemberGroup() {
+        return brokerMemberGroup;
+    }
+
+    public void setSyncStateSet(Set<Long> syncStateSet) {
+        this.syncStateSet = syncStateSet;
+    }
+
+    public Set<Long> getSyncStateSet() {
+        return syncStateSet;
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
index 4f3f31218..ab25df0d1 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 
+import java.util.Set;
+
 public class RoleChangeNotifyEntry {
 
     private final BrokerMemberGroup brokerMemberGroup;
@@ -34,21 +36,29 @@ public class RoleChangeNotifyEntry {
 
     private final int syncStateSetEpoch;
 
-    public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String 
masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch) {
+    private final Set<Long> syncStateSet;
+
+    public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String 
masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch, 
Set<Long> syncStateSet) {
         this.brokerMemberGroup = brokerMemberGroup;
         this.masterAddress = masterAddress;
         this.masterEpoch = masterEpoch;
         this.syncStateSetEpoch = syncStateSetEpoch;
         this.masterBrokerId = masterBrokerId;
+        this.syncStateSet = syncStateSet;
     }
 
     public static RoleChangeNotifyEntry convert(RemotingCommand 
electMasterResponse) {
         final ElectMasterResponseHeader header = (ElectMasterResponseHeader) 
electMasterResponse.readCustomHeader();
         BrokerMemberGroup brokerMemberGroup = null;
+        Set<Long> syncStateSet = null;
+
         if (electMasterResponse.getBody() != null && 
electMasterResponse.getBody().length > 0) {
-            brokerMemberGroup = 
RemotingSerializable.decode(electMasterResponse.getBody(), 
BrokerMemberGroup.class);
+            ElectMasterResponseBody body = 
RemotingSerializable.decode(electMasterResponse.getBody(), 
ElectMasterResponseBody.class);
+            brokerMemberGroup = body.getBrokerMemberGroup();
+            syncStateSet = body.getSyncStateSet();
         }
-        return new RoleChangeNotifyEntry(brokerMemberGroup, 
header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), 
header.getSyncStateSetEpoch());
+
+        return new RoleChangeNotifyEntry(brokerMemberGroup, 
header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), 
header.getSyncStateSetEpoch(), syncStateSet);
     }
 
 
@@ -71,4 +81,8 @@ public class RoleChangeNotifyEntry {
     public Long getMasterBrokerId() {
         return masterBrokerId;
     }
+
+    public Set<Long> getSyncStateSet() {
+        return syncStateSet;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index d3c897538..aaf3b10b8 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -19,6 +19,7 @@ package 
org.apache.rocketmq.remoting.protocol.header.controller;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
+
 public class ElectMasterResponseHeader implements CommandCustomHeader {
 
     private Long masterBrokerId;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 425861455..683018b67 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -246,6 +246,16 @@ public class AutoSwitchHAService extends DefaultHAService {
                 }
             }
         }
+
+        // If the slaveBrokerId is in syncStateSet but not in 
connectionCaughtUpTimeTable,
+        // it means that the broker has not connected.
+        // Prune with removeIf: calling remove() inside a for-each over the same set
+        // can throw ConcurrentModificationException. removeIf returns true if anything was removed.
+        if (newSyncStateSet.removeIf(
+            slaveBrokerId -> !this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId))) {
+            isSyncStateSetChanged = true;
+        }
+
         if (isSyncStateSetChanged) {
             markSynchronizingSyncStateSet(newSyncStateSet);
         }


Reply via email to