This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 3951411ef7a33e0018568242ea3bdcb73801b035 Author: TheR1sing3un <[email protected]> AuthorDate: Tue Feb 7 00:57:31 2023 +0800 fix(controller): fix some bug to pass AutoSwitchRoleIntegrationTest 1. fix some bug to pass AutoSwitchRoleIntegrationTest --- .../java/org/apache/rocketmq/broker/BrokerController.java | 2 +- .../apache/rocketmq/broker/controller/ReplicasManager.java | 4 ++++ .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 10 ++++------ .../org/apache/rocketmq/controller/ControllerManager.java | 9 ++++----- .../rocketmq/controller/impl/manager/ReplicasInfoManager.java | 3 ++- .../controller/processor/ControllerRequestProcessor.java | 5 ++--- .../remoting/protocol/body/RoleChangeNotifyEntry.java | 11 +++++++++-- .../protocol/header/controller/ElectMasterResponseHeader.java | 10 ---------- .../test/autoswitchrole/AutoSwitchRoleIntegrationTest.java | 2 -- .../org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java | 6 ++++-- .../apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 6 ++++-- .../main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java | 6 ++++-- .../tools/command/controller/ReElectMasterSubCommand.java | 7 +++++-- 13 files changed, 43 insertions(+), 38 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a96aa0405..bf9565770 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1747,7 +1747,7 @@ public class BrokerController { this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), + this.replicasManager.getBrokerId(), this.brokerConfig.getSendHeartbeatTimeoutMillis(), this.brokerConfig.isInBrokerContainer(), this.replicasManager.getLastEpoch(), this.messageStore.getMaxPhyOffset(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 4d2ed8e80..78f40495d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -772,4 +772,8 @@ public class ReplicasManager { public List<String> getAvailableControllerAddresses() { return new ArrayList<>(availableControllerAddresses.keySet()); } + + public Long getBrokerId() { + return brokerId; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index ab8880625..2c9790b29 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -3075,8 +3076,8 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { } } - public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName, - Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException { + public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, String brokerName, + Long brokerId) throws MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException { //get controller leader address final GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr); @@ -3092,10 +3093,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { case ResponseCode.SUCCESS: { BrokerMemberGroup brokerMemberGroup = RemotingSerializable.decode(response.getBody(), BrokerMemberGroup.class); ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class); - if (null != responseHeader) { - responseHeader.setBrokerMemberGroup(brokerMemberGroup); - } - return responseHeader; + return new Pair<>(responseHeader, brokerMemberGroup); } default: break; diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 0f565ec81..96dcc6bc8 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -45,11 +45,11 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry; import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; @@ -131,11 +131,10 @@ public class ControllerManager { final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName)); final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS); - final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader(); - if (responseHeader != null) { - log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, responseHeader); + if (electMasterResponse.getCode() == ResponseCode.SUCCESS) { + log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, electMasterResponse); if (controllerConfig.isNotifyBrokerRoleChanged()) { - notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader)); + notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(electMasterResponse)); } } } catch (Exception e) { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index d2061bb24..5cf703c65 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -215,7 +215,6 @@ public class ReplicasInfoManager { response.setSyncStateSetEpoch(syncStateSetEpoch + 1); BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName); if (null != brokerMemberGroup) { - response.setBrokerMemberGroup(brokerMemberGroup); result.setBody(brokerMemberGroup.encode()); } final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); @@ -313,6 +312,8 @@ public class ReplicasInfoManager { // if master still exist response.setMasterBrokerId(syncStateInfo.getMasterBrokerId()); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(response.getMasterBrokerId())); + response.setMasterEpoch(syncStateInfo.getMasterEpoch()); + response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); } // if this broker's address has been changed, we need to update it if (!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java index 9e300b738..e5cdd926f 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java @@ -130,11 +130,10 @@ public class ControllerRequestProcessor implements NettyRequestProcessor { final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest); if (future != null) { final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); - final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader(); - if (response.getCode() == ResponseCode.SUCCESS && responseHeader != null) { + if (response.getCode() == ResponseCode.SUCCESS) { if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) { - this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader)); + this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(response)); } } return response; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java index 91f9e1e8d..4f3f31218 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.remoting.protocol.body; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; public class RoleChangeNotifyEntry { @@ -40,8 +42,13 @@ public class RoleChangeNotifyEntry { this.masterBrokerId = masterBrokerId; } - public static RoleChangeNotifyEntry convert(ElectMasterResponseHeader header) { - return new RoleChangeNotifyEntry(header.getBrokerMemberGroup(), header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch()); + public static RoleChangeNotifyEntry convert(RemotingCommand electMasterResponse) { + final ElectMasterResponseHeader header = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader(); + BrokerMemberGroup brokerMemberGroup = null; + if (electMasterResponse.getBody() != null && electMasterResponse.getBody().length > 0) { + brokerMemberGroup = RemotingSerializable.decode(electMasterResponse.getBody(), BrokerMemberGroup.class); + } + return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch()); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index 658e2b592..b8a4c58cf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -26,7 +26,6 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { private String masterAddress; private Integer masterEpoch; private Integer syncStateSetEpoch; - private BrokerMemberGroup brokerMemberGroup; public ElectMasterResponseHeader() { } @@ -55,14 +54,6 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { this.syncStateSetEpoch = syncStateSetEpoch; } - public BrokerMemberGroup getBrokerMemberGroup() { - return brokerMemberGroup; - } - - public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) { - this.brokerMemberGroup = brokerMemberGroup; - } - public void setMasterBrokerId(Long masterBrokerId) { this.masterBrokerId = masterBrokerId; } @@ -78,7 +69,6 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { ", masterAddress='" + masterAddress + '\'' + ", masterEpoch=" + masterEpoch + ", syncStateSetEpoch=" + syncStateSetEpoch + - ", brokerMemberGroup=" + brokerMemberGroup + '}'; } diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java index d145fc516..71855a837 100644 --- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java +++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java @@ -179,9 +179,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase { BrokerController broker3 = startBroker(nameserverAddress, controllerAddress, brokerName, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE); waitSlaveReady(broker3.getMessageStore()); - checkMessage(broker3.getMessageStore(), topic, 10, 0); - putMessage(this.brokerController1.getMessageStore(), topic); checkMessage(broker3.getMessageStore(), topic, 20, 0); shutdownAndClearBroker(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index ce0c7a8a5..38fe063b2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageExt; @@ -41,6 +42,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; @@ -832,8 +834,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, - String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { + public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, + String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.electMaster(controllerAddr, clusterName, brokerName, brokerId); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index ac4b51c59..3c55e3f24 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -76,6 +77,7 @@ import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.TopicOffset; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; @@ -1843,8 +1845,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, - String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { + public Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, + String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr, clusterName, brokerName, brokerId); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index e8cb3e1f8..889974de8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageExt; @@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; @@ -461,8 +463,8 @@ public interface MQAdminExt extends MQAdmin { * @throws InterruptedException * @throws MQBrokerException */ - ElectMasterResponseHeader electMaster(String controllerAddr, String clusterName, String brokerName, - Long brokerId) throws RemotingException, InterruptedException, MQBrokerException; + Pair<ElectMasterResponseHeader, BrokerMemberGroup> electMaster(String controllerAddr, String clusterName, String brokerName, + Long brokerId) throws RemotingException, InterruptedException, MQBrokerException; /** * clean controller broker meta data diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java index 1861754c5..e45f307fa 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java @@ -20,8 +20,10 @@ package org.apache.rocketmq.tools.command.controller; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; @@ -72,13 +74,14 @@ public class ReElectMasterSubCommand implements SubCommand { try { defaultMQAdminExt.start(); - final ElectMasterResponseHeader metaData = defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, brokerId); + final Pair<ElectMasterResponseHeader, BrokerMemberGroup> pair = defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, brokerId); + final ElectMasterResponseHeader metaData = pair.getObject1(); + final BrokerMemberGroup brokerMemberGroup = pair.getObject2(); System.out.printf("\n#ClusterName\t%s", clusterName); System.out.printf("\n#BrokerName\t%s", brokerName); System.out.printf("\n#BrokerMasterAddr\t%s", metaData.getMasterAddress()); System.out.printf("\n#MasterEpoch\t%s", metaData.getMasterEpoch()); System.out.printf("\n#SyncStateSetEpoch\t%s\n", metaData.getSyncStateSetEpoch()); - BrokerMemberGroup brokerMemberGroup = metaData.getBrokerMemberGroup(); if (null != brokerMemberGroup && null != brokerMemberGroup.getBrokerAddrs()) { brokerMemberGroup.getBrokerAddrs().forEach((key, value) -> System.out.printf("\t#Broker\t%d\t%s\n", key, value)); }
