This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 63a014b81 [ISSUE#6342] Local SyncStatSet sync to remote value when
changeToMaster (#6352)
63a014b81 is described below
commit 63a014b810387d11c9eaddd66fdbd80d158076ab
Author: Ji Juntao <[email protected]>
AuthorDate: Thu Mar 16 11:48:04 2023 +0800
[ISSUE#6342] Local SyncStatSet sync to remote value when changeToMaster
(#6352)
* fix ISSUE#6342
* refactor
* correct the syncStateSet in elect process.
* move syncStateSet into request/response's body.
* optimize the broker electing switch's branch.
---
.../broker/controller/ReplicasManager.java | 25 ++++---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 16 ++--
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../controller/ReplicasManagerRegisterTest.java | 31 ++++----
.../broker/controller/ReplicasManagerTest.java | 22 ++++--
.../rocketmq/controller/ControllerManager.java | 2 +
.../impl/manager/ReplicasInfoManager.java | 13 +++-
.../protocol/body/ElectMasterResponseBody.java | 86 ++++++++++++++++++++++
.../protocol/body/RoleChangeNotifyEntry.java | 20 ++++-
.../controller/ElectMasterResponseHeader.java | 1 +
.../store/ha/autoswitch/AutoSwitchHAService.java | 10 +++
11 files changed, 191 insertions(+), 39 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index d3a1c1fb8..9938a2aac 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -227,17 +227,17 @@ public class ReplicasManager {
}
public synchronized void changeBrokerRole(final Long newMasterBrokerId,
final String newMasterAddress, final Integer newMasterEpoch,
- final Integer syncStateSetEpoch)
{
+ final Integer syncStateSetEpoch,
final Set<Long> syncStateSet) {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
- changeToMaster(newMasterEpoch, syncStateSetEpoch);
+ changeToMaster(newMasterEpoch, syncStateSetEpoch,
syncStateSet);
} else {
changeToSlave(newMasterAddress, newMasterEpoch,
newMasterBrokerId);
}
}
}
- public void changeToMaster(final int newMasterEpoch, final int
syncStateSetEpoch) {
+ public void changeToMaster(final int newMasterEpoch, final int
syncStateSetEpoch, final Set<Long> syncStateSet) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{},
replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(),
this.brokerAddress, newMasterEpoch);
@@ -245,8 +245,7 @@ public class ReplicasManager {
this.masterEpoch = newMasterEpoch;
// Change SyncStateSet
- final HashSet<Long> newSyncStateSet = new HashSet<>();
- newSyncStateSet.add(this.brokerControllerId);
+ final HashSet<Long> newSyncStateSet = new
HashSet<>(syncStateSet);
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
// Change record
@@ -365,8 +364,10 @@ public class ReplicasManager {
private boolean brokerElect() {
// Broker try to elect itself as a master in broker set.
try {
- ElectMasterResponseHeader tryElectResponse =
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(),
+ Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair =
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.getBrokerName(),
this.brokerControllerId);
+ ElectMasterResponseHeader tryElectResponse =
tryElectResponsePair.getObject1();
+ Set<Long> syncStateSet = tryElectResponsePair.getObject2();
final String masterAddress = tryElectResponse.getMasterAddress();
final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
@@ -375,7 +376,7 @@ public class ReplicasManager {
}
if (masterBrokerId.equals(this.brokerControllerId)) {
- changeToMaster(tryElectResponse.getMasterEpoch(),
tryElectResponse.getSyncStateSetEpoch());
+ changeToMaster(tryElectResponse.getMasterEpoch(),
tryElectResponse.getSyncStateSetEpoch(), syncStateSet);
} else {
changeToSlave(masterAddress,
tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
}
@@ -544,15 +545,17 @@ public class ReplicasManager {
*/
private boolean registerBrokerToController() {
try {
- RegisterBrokerToControllerResponseHeader response =
this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(), brokerControllerId, brokerAddress,
controllerLeaderAddress);
- if (response == null) return false;
+ Pair<RegisterBrokerToControllerResponseHeader, Set<Long>>
responsePair =
this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(), brokerControllerId, brokerAddress,
controllerLeaderAddress);
+ if (responsePair == null) return false;
+ RegisterBrokerToControllerResponseHeader response =
responsePair.getObject1();
+ Set<Long> syncStateSet = responsePair.getObject2();
final Long masterBrokerId = response.getMasterBrokerId();
final String masterAddress = response.getMasterAddress();
if (masterBrokerId == null) {
return true;
}
if (this.brokerControllerId.equals(masterBrokerId)) {
- changeToMaster(response.getMasterEpoch(),
response.getSyncStateSetEpoch());
+ changeToMaster(response.getMasterEpoch(),
response.getSyncStateSetEpoch(), syncStateSet);
} else {
changeToSlave(masterAddress, response.getMasterEpoch(),
masterBrokerId);
}
@@ -635,7 +638,7 @@ public class ReplicasManager {
if (StringUtils.isNoneEmpty(newMasterAddress) &&
masterBrokerId != null) {
if
(masterBrokerId.equals(this.brokerControllerId)) {
// If this broker is now the master
- changeToMaster(newMasterEpoch,
syncStateSet.getSyncStateSetEpoch());
+ changeToMaster(newMasterEpoch,
syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet());
} else {
// If this broker is now the slave, and master
has been changed
changeToSlave(newMasterAddress,
newMasterEpoch, masterBrokerId);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4f298e1aa..144f05016 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -80,6 +80,7 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import
org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
import
org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
@@ -1172,7 +1173,7 @@ public class BrokerOuterAPI {
/**
* Broker try to elect itself as a master in broker set
*/
- public ElectMasterResponseHeader brokerElect(String controllerAddress,
String clusterName, String brokerName,
+ public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String
controllerAddress, String clusterName, String brokerName,
Long brokerId) throws
Exception {
final ElectMasterRequestHeader requestHeader =
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
@@ -1184,12 +1185,15 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), "Controller
leader was changed");
}
case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
- throw new MQBrokerException(response.getCode(),
response.getRemark());
case CONTROLLER_ELECT_MASTER_FAILED:
+ throw new MQBrokerException(response.getCode(),
response.getRemark());
case CONTROLLER_MASTER_STILL_EXIST:
case SUCCESS:
- return (ElectMasterResponseHeader)
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+ final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader)
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+ final ElectMasterResponseBody responseBody =
RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class);
+ return new Pair<>(responseHeader,
responseBody.getSyncStateSet());
}
+
throw new MQBrokerException(response.getCode(), response.getRemark());
}
@@ -1215,13 +1219,15 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public RegisterBrokerToControllerResponseHeader
registerBrokerToController(final String clusterName, final String brokerName,
final Long brokerId, final String brokerAddress, final String
controllerAddress) throws Exception {
+ public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>>
registerBrokerToController(final String clusterName, final String brokerName,
final Long brokerId, final String brokerAddress, final String
controllerAddress) throws Exception {
final RegisterBrokerToControllerRequestHeader requestHeader = new
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId,
brokerAddress);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
if (response.getCode() == SUCCESS) {
- return (RegisterBrokerToControllerResponseHeader)
response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
+ RegisterBrokerToControllerResponseHeader responseHeader =
(RegisterBrokerToControllerResponseHeader)
response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
+ Set<Long> syncStateSet =
RemotingSerializable.decode(response.getBody(),
SyncStateSet.class).getSyncStateSet();
+ return new Pair<>(responseHeader, syncStateSet);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f74ab330b..65e45e817 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -111,6 +111,7 @@ import
org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
@@ -2628,6 +2629,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
NotifyBrokerRoleChangedRequestHeader requestHeader =
(NotifyBrokerRoleChangedRequestHeader)
request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
+ SyncStateSet syncStateSetInfo =
RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -2635,7 +2637,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
final ReplicasManager replicasManager =
this.brokerController.getReplicasManager();
if (replicasManager != null) {
-
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(),
requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(),
requestHeader.getSyncStateSetEpoch());
+
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(),
requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(),
requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
index f0e82b638..7fb9d9aeb 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.UtilAll;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
@@ -45,6 +46,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.UUID;
import static org.awaitility.Awaitility.await;
@@ -70,6 +73,7 @@ public class ReplicasManagerRegisterTest {
public static final String CONTROLLER_ADDR = "127.0.0.1:8888";
public static final BrokerConfig BROKER_CONFIG;
+ private final HashSet<Long> syncStateSet = new
HashSet<>(Arrays.asList(1L));
static {
BROKER_CONFIG = new BrokerConfig();
@@ -122,10 +126,11 @@ public class ReplicasManagerRegisterTest {
@Test
public void testBrokerRegisterSuccess() throws Exception {
+
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
1L));
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(),
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
- when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new
RegisterBrokerToControllerResponseHeader());
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), syncStateSet));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
ReplicasManager replicasManager0 = new
ReplicasManager(mockedBrokerController);
replicasManager0.start();
@@ -144,8 +149,8 @@ public class ReplicasManagerRegisterTest {
public void testBrokerRegisterSuccessAndRestartWithChangedBrokerConfig()
throws Exception {
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
1L));
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(),
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
- when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new
RegisterBrokerToControllerResponseHeader());
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), syncStateSet));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
ReplicasManager replicasManager0 = new
ReplicasManager(mockedBrokerController);
replicasManager0.start();
@@ -196,8 +201,8 @@ public class ReplicasManagerRegisterTest {
ReplicasManager replicasManager = new
ReplicasManager(mockedBrokerController);
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
1L));
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(),
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
- when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new
RegisterBrokerToControllerResponseHeader());
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), syncStateSet));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager);
PowerMockito.doReturn(false).when(spyReplicasManager,
"createTempMetadataFile", anyLong());
@@ -216,8 +221,8 @@ public class ReplicasManagerRegisterTest {
ReplicasManager replicasManager = new
ReplicasManager(mockedBrokerController);
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
1L));
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(),
any(), any())).thenThrow(new RuntimeException());
- when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new
RegisterBrokerToControllerResponseHeader());
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), syncStateSet));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
replicasManager.start();
@@ -236,8 +241,8 @@ public class ReplicasManagerRegisterTest {
ReplicasManager replicasManager = new
ReplicasManager(mockedBrokerController);
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
1L));
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(),
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
- when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new
RegisterBrokerToControllerResponseHeader());
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), syncStateSet));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager);
PowerMockito.doReturn(false).when(spyReplicasManager,
"createMetadataFileAndDeleteTemp");
@@ -280,7 +285,7 @@ public class ReplicasManagerRegisterTest {
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
1L));
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(),
any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenThrow(new RuntimeException());
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
replicasManager.start();
@@ -299,7 +304,7 @@ public class ReplicasManagerRegisterTest {
replicasManager.shutdown();
Mockito.reset(mockedBrokerOuterAPI);
- when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1,
1));
+ when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(),
anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L,
"127.0.0.1:13131", 1, 1), syncStateSet));
when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn(
new GetMetaDataResponseHeader("default-group", "dledger-a",
CONTROLLER_ADDR, true, CONTROLLER_ADDR));
when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
@@ -310,7 +315,7 @@ public class ReplicasManagerRegisterTest {
when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME,
2L));
// because apply brokerId: 1 has succeeded, so next request which try
to apply brokerId: 1 will be failed
when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), eq(1L), any(),
any())).thenThrow(new RuntimeException());
- when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new
RegisterBrokerToControllerResponseHeader());
+ when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), syncStateSet));
replicasManagerNew.start();
Assert.assertEquals(ReplicasManager.State.RUNNING,
replicasManagerNew.getState());
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index a5cf63d37..e03828cff 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.broker.controller;
import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -116,6 +118,10 @@ public class ReplicasManagerTest {
private static final Long SYNC_STATE = 1L;
+ private static final HashSet<Long> SYNC_STATE_SET_1 = new
HashSet<Long>(Arrays.asList(BROKER_ID_1));
+
+ private static final HashSet<Long> SYNC_STATE_SET_2 = new
HashSet<Long>(Arrays.asList(BROKER_ID_2));
+
@Before
public void before() throws Exception {
UtilAll.deleteFile(new File(STORE_BASE_PATH));
@@ -154,9 +160,9 @@ public class ReplicasManagerTest {
when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
when(brokerOuterAPI.getNextBrokerId(any(), any(),
any())).thenReturn(getNextBrokerIdResponseHeader);
when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(),
any())).thenReturn(applyBrokerIdResponseHeader);
- when(brokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(registerBrokerToControllerResponseHeader);
+ when(brokerOuterAPI.registerBrokerToController(any(), any(),
anyLong(), any(), any())).thenReturn(new Pair<>(new
RegisterBrokerToControllerResponseHeader(), SYNC_STATE_SET_1));
when(brokerOuterAPI.getReplicaInfo(any(), any())).thenReturn(result);
- when(brokerOuterAPI.brokerElect(any(), any(), any(),
any())).thenReturn(brokerTryElectResponseHeader);
+ when(brokerOuterAPI.brokerElect(any(), any(), any(),
any())).thenReturn(new Pair<>(brokerTryElectResponseHeader, SYNC_STATE_SET_1));
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
replicasManager.start();
@@ -173,18 +179,24 @@ public class ReplicasManagerTest {
@Test
public void changeBrokerRoleTest() {
+ HashSet<Long> syncStateSetA = new HashSet<>();
+ syncStateSetA.add(BROKER_ID_1);
+ HashSet<Long> syncStateSetB = new HashSet<>();
+ syncStateSetB.add(BROKER_ID_2);
// not equal to localAddress
- Assertions.assertThatCode(() ->
replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS,
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH))
+ Assertions.assertThatCode(() ->
replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS,
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetB))
.doesNotThrowAnyException();
// equal to localAddress
- Assertions.assertThatCode(() ->
replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS,
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH))
+ Assertions.assertThatCode(() ->
replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS,
NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetA))
.doesNotThrowAnyException();
}
@Test
public void changeToMasterTest() {
- Assertions.assertThatCode(() ->
replicasManager.changeToMaster(NEW_MASTER_EPOCH,
OLD_MASTER_EPOCH)).doesNotThrowAnyException();
+ HashSet<Long> syncStateSet = new HashSet<>();
+ syncStateSet.add(BROKER_ID_1);
+ Assertions.assertThatCode(() ->
replicasManager.changeToMaster(NEW_MASTER_EPOCH, OLD_MASTER_EPOCH,
syncStateSet)).doesNotThrowAnyException();
}
@Test
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index c4fbf0c8d..a9949bde0 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -53,6 +53,7 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import
org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
@@ -188,6 +189,7 @@ public class ControllerManager {
final NotifyBrokerRoleChangedRequestHeader requestHeader = new
NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(),
entry.getMasterBrokerId(),
entry.getMasterEpoch(), entry.getSyncStateSetEpoch());
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED,
requestHeader);
+ request.setBody(new SyncStateSet(entry.getSyncStateSet(),
entry.getSyncStateSetEpoch()).encode());
try {
this.remotingClient.invokeOneway(brokerAddr, request, 3000);
} catch (final Exception e) {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 728cf87e2..103cb68e2 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
+import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -201,6 +202,8 @@ public class ReplicasInfoManager {
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
response.setMasterBrokerId(oldMaster);
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster));
+
+ result.setBody(new ElectMasterResponseBody(syncStateSet).encode());
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
return result;
}
@@ -209,14 +212,21 @@ public class ReplicasInfoManager {
if (newMaster != null) {
final int masterEpoch = syncStateInfo.getMasterEpoch();
final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
+ final HashSet<Long> newSyncStateSet = new HashSet<>();
+ newSyncStateSet.add(newMaster);
+
response.setMasterBrokerId(newMaster);
response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster));
response.setMasterEpoch(masterEpoch + 1);
response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
+ ElectMasterResponseBody responseBody = new
ElectMasterResponseBody(newSyncStateSet);
+
BrokerMemberGroup brokerMemberGroup =
buildBrokerMemberGroup(brokerName);
if (null != brokerMemberGroup) {
- result.setBody(brokerMemberGroup.encode());
+ responseBody.setBrokerMemberGroup(brokerMemberGroup);
}
+
+ result.setBody(responseBody.encode());
final ElectMasterEvent event = new ElectMasterEvent(brokerName,
newMaster);
result.addEvent(event);
return result;
@@ -315,6 +325,7 @@ public class ReplicasInfoManager {
response.setMasterEpoch(syncStateInfo.getMasterEpoch());
response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
}
+ result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(),
syncStateInfo.getSyncStateSetEpoch()).encode());
// if this broker's address has been changed, we need to update it
if
(!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) {
final UpdateBrokerAddressEvent event = new
UpdateBrokerAddressEvent(clusterName, brokerName, brokerAddress, brokerId);
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java
new file mode 100644
index 000000000..8aef636fa
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import com.google.common.base.Objects;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Body of an elect-master response. It carries the broker member group, which may be
+ * {@code null}, together with the sync state set.
+ */
+public class ElectMasterResponseBody extends RemotingSerializable {
+    private BrokerMemberGroup brokerMemberGroup;
+    private Set<Long> syncStateSet;
+
+    /**
+     * Default constructor for the serializer: starts with an empty sync state set and no
+     * broker member group.
+     */
+    public ElectMasterResponseBody() {
+        this.syncStateSet = new HashSet<Long>();
+        this.brokerMemberGroup = null;
+    }
+
+    /**
+     * Creates a body that carries only a sync state set; the broker member group is left
+     * {@code null}.
+     *
+     * @param syncStateSet the sync state set, stored by reference (not copied)
+     */
+    public ElectMasterResponseBody(final Set<Long> syncStateSet) {
+        this.syncStateSet = syncStateSet;
+        this.brokerMemberGroup = null;
+    }
+
+    /**
+     * Creates a body that carries both a broker member group and a sync state set.
+     *
+     * @param brokerMemberGroup the broker member group, may be {@code null}
+     * @param syncStateSet the sync state set, stored by reference (not copied)
+     */
+    public ElectMasterResponseBody(final BrokerMemberGroup brokerMemberGroup, final Set<Long> syncStateSet) {
+        this.brokerMemberGroup = brokerMemberGroup;
+        this.syncStateSet = syncStateSet;
+    }
+
+    /**
+     * Two bodies are equal when they are of the same class and both their broker member
+     * group and their sync state set are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ElectMasterResponseBody that = (ElectMasterResponseBody) o;
+        return Objects.equal(brokerMemberGroup, that.brokerMemberGroup) &&
+            Objects.equal(syncStateSet, that.syncStateSet);
+    }
+
+    /** Hash code derived from the same two fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(brokerMemberGroup, syncStateSet);
+    }
+
+    /**
+     * Returns a description of this body that is safe to call when either field is
+     * {@code null}.
+     */
+    @Override
+    public String toString() {
+        // Plain concatenation renders a null field as "null". Calling toString() on the
+        // fields would throw, and brokerMemberGroup is null after two of the constructors.
+        return "ElectMasterResponseBody{" +
+            "brokerMemberGroup=" + brokerMemberGroup +
+            ", syncStateSet=" + syncStateSet +
+            '}';
+    }
+
+    public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) {
+        this.brokerMemberGroup = brokerMemberGroup;
+    }
+
+    public BrokerMemberGroup getBrokerMemberGroup() {
+        return brokerMemberGroup;
+    }
+
+    public void setSyncStateSet(Set<Long> syncStateSet) {
+        this.syncStateSet = syncStateSet;
+    }
+
+    public Set<Long> getSyncStateSet() {
+        return syncStateSet;
+    }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
index 4f3f31218..ab25df0d1 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import java.util.Set;
+
public class RoleChangeNotifyEntry {
private final BrokerMemberGroup brokerMemberGroup;
@@ -34,21 +36,29 @@ public class RoleChangeNotifyEntry {
private final int syncStateSetEpoch;
- public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String
masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch) {
+ private final Set<Long> syncStateSet;
+
+ public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String
masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch,
Set<Long> syncStateSet) {
this.brokerMemberGroup = brokerMemberGroup;
this.masterAddress = masterAddress;
this.masterEpoch = masterEpoch;
this.syncStateSetEpoch = syncStateSetEpoch;
this.masterBrokerId = masterBrokerId;
+ this.syncStateSet = syncStateSet;
}
public static RoleChangeNotifyEntry convert(RemotingCommand
electMasterResponse) {
final ElectMasterResponseHeader header = (ElectMasterResponseHeader)
electMasterResponse.readCustomHeader();
BrokerMemberGroup brokerMemberGroup = null;
+ Set<Long> syncStateSet = null;
+
if (electMasterResponse.getBody() != null &&
electMasterResponse.getBody().length > 0) {
- brokerMemberGroup =
RemotingSerializable.decode(electMasterResponse.getBody(),
BrokerMemberGroup.class);
+ ElectMasterResponseBody body =
RemotingSerializable.decode(electMasterResponse.getBody(),
ElectMasterResponseBody.class);
+ brokerMemberGroup = body.getBrokerMemberGroup();
+ syncStateSet = body.getSyncStateSet();
}
- return new RoleChangeNotifyEntry(brokerMemberGroup,
header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(),
header.getSyncStateSetEpoch());
+
+ return new RoleChangeNotifyEntry(brokerMemberGroup,
header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(),
header.getSyncStateSetEpoch(), syncStateSet);
}
@@ -71,4 +81,8 @@ public class RoleChangeNotifyEntry {
public Long getMasterBrokerId() {
return masterBrokerId;
}
+
+ public Set<Long> getSyncStateSet() {
+ return syncStateSet;
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index d3c897538..aaf3b10b8 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -19,6 +19,7 @@ package
org.apache.rocketmq.remoting.protocol.header.controller;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
public class ElectMasterResponseHeader implements CommandCustomHeader {
private Long masterBrokerId;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 425861455..683018b67 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -246,6 +246,16 @@ public class AutoSwitchHAService extends DefaultHAService {
}
}
}
+
+        // If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable,
+        // it means that the broker has not connected, so it is dropped from the new set.
+        // Removal goes through removeIf: calling remove() on the set from inside a for-each
+        // over that same set makes the fail-fast java.util iterators throw
+        // ConcurrentModificationException on the next step.
+        // NOTE(review): the declaration of newSyncStateSet is outside this hunk; this assumes
+        // it is a mutable java.util collection - confirm.
+        if (newSyncStateSet.removeIf(slaveBrokerId -> !this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId))) {
+            isSyncStateSetChanged = true;
+        }
+
if (isSyncStateSetChanged) {
markSynchronizingSyncStateSet(newSyncStateSet);
}