This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch dledger-controller-brokerId in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 88e00547f5254a6b7fd3031a00fd2d640b84228f Author: TheR1sing3un <[email protected]> AuthorDate: Fri Feb 3 23:18:47 2023 +0800 feat(controller): refactor broker's information recording core from ip address to broker id 1. refactor broker's information recording core from ip address to broker id --- .../org/apache/rocketmq/common/BrokerAddrInfo.java | 43 ++-- .../controller/BrokerHeartbeatManager.java | 4 +- .../apache/rocketmq/controller/BrokerLiveInfo.java | 11 +- .../rocketmq/controller/elect/ElectPolicy.java | 9 +- .../controller/elect/impl/DefaultElectPolicy.java | 56 ++--- .../BrokerLiveInfoGetter.java} | 18 +- .../BrokerValidPredicate.java} | 20 +- .../impl/DefaultBrokerHeartbeatManager.java | 15 +- .../impl/event/AlterSyncStateSetEvent.java | 6 +- .../controller/impl/event/ApplyBrokerIdEvent.java | 21 +- .../impl/event/CleanBrokerDataEvent.java | 20 +- .../controller/impl/event/ElectMasterEvent.java | 24 +-- .../controller/impl/manager/BrokerReplicaInfo.java | 40 ++-- .../impl/manager/ReplicasInfoManager.java | 225 ++++++++++----------- .../controller/impl/manager/SyncStateInfo.java | 30 +-- .../namesrv/routeinfo/RouteInfoManager.java | 65 +++++- .../remoting/protocol/body/BrokerReplicasInfo.java | 48 +++-- .../remoting/protocol/body/SyncStateSet.java | 8 +- .../controller/AlterSyncStateSetRequestHeader.java | 22 +- .../CleanControllerBrokerDataRequestHeader.java | 14 +- .../controller/ElectMasterRequestHeader.java | 34 ++-- .../controller/ElectMasterResponseHeader.java | 24 +-- .../controller/GetReplicaInfoResponseHeader.java | 20 +- .../namesrv/BrokerHeartbeatRequestHeader.java | 2 +- 24 files changed, 436 insertions(+), 343 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java index cd122c83a..09dd36d0c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java @@ -16,27 +16,35 @@ */ package org.apache.rocketmq.common; +import java.util.Objects; + public class BrokerAddrInfo { private final String clusterName; - private final String brokerAddr; - private int hash; + private final String brokerName; + + private final Long brokerId; - public BrokerAddrInfo(String clusterName, String brokerAddr) { + public BrokerAddrInfo(String clusterName, String brokerName, Long brokerId) { this.clusterName = clusterName; - this.brokerAddr = brokerAddr; + this.brokerName = brokerName; + this.brokerId = brokerId; } public String getClusterName() { return clusterName; } - public String getBrokerAddr() { - return brokerAddr; + public Long getBrokerId() { + return brokerId; + } + + public String getBrokerName() { + return brokerName; } public boolean isEmpty() { - return clusterName.isEmpty() && brokerAddr.isEmpty(); + return clusterName.isEmpty() && brokerName.isEmpty() && brokerId == null; } @Override @@ -50,29 +58,22 @@ public class BrokerAddrInfo { if (obj instanceof BrokerAddrInfo) { BrokerAddrInfo addr = (BrokerAddrInfo) obj; - return clusterName.equals(addr.clusterName) && brokerAddr.equals(addr.brokerAddr); + return clusterName.equals(addr.clusterName) && brokerName.equals(addr.brokerName) && brokerId == addr.brokerId; } return false; } @Override public int hashCode() { - int h = hash; - if (h == 0 && clusterName.length() + brokerAddr.length() > 0) { - for (int i = 0; i < clusterName.length(); i++) { - h = 31 * h + clusterName.charAt(i); - } - h = 31 * h + '_'; - for (int i = 0; i < brokerAddr.length(); i++) { - h = 31 * h + brokerAddr.charAt(i); - } - hash = h; - } - return h; + return Objects.hash(this.clusterName, this.brokerName, this.brokerId); } @Override public String toString() { - return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" + brokerAddr + "]"; + return "BrokerAddrInfo{" + + "clusterName='" + clusterName + '\'' + + ", brokerName='" + brokerName + '\'' + + ", brokerId=" + brokerId + + '}'; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java index 7d9b78e8c..81e3cf31c 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java @@ -50,12 +50,12 @@ public interface BrokerHeartbeatManager { /** * Get broker live information by clusterName and brokerAddr */ - BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerAddr); + BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerName, Long brokerId); /** * Check whether broker active */ - boolean isBrokerActive(final String clusterName, final String brokerAddr); + boolean isBrokerActive(final String clusterName, final String brokerName, final Long brokerId); interface BrokerLifecycleListener { /** diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java index eb33b98a6..9fdd6c937 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java @@ -21,9 +21,9 @@ import io.netty.channel.Channel; public class BrokerLiveInfo { private final String brokerName; - private final String brokerAddr; + private String brokerAddr; private long heartbeatTimeoutMillis; - private final Channel channel; + private Channel channel; private long brokerId; private long lastUpdateTimestamp; private int epoch; @@ -141,4 +141,11 @@ public class BrokerLiveInfo { return confirmOffset; } + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java index aba8f5538..8e4e75a2c 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java @@ -24,13 +24,14 @@ public interface ElectPolicy { /** * elect a master * - * @param clusterName the brokerGroup belongs + * @param clusterName the broker group belongs to + * @param brokerName the broker group name * @param syncStateBrokers all broker replicas in syncStateSet * @param allReplicaBrokers all broker replicas * @param oldMaster old master - * @param brokerAddr broker address(can be used as prefer or assigned in some elect policy) - * @return new master's brokerAddr + * @param brokerId broker id(can be used as prefer or assigned in some elect policy) + * @return new master's broker id */ - String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String brokerAddr); + Long elect(String clusterName, String brokerName, Set<Long> syncStateBrokers, Set<Long> allReplicaBrokers, Long oldMaster, Long brokerId); } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java index 00cac1627..e7423675a 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.controller.elect.impl; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.controller.elect.ElectPolicy; import org.apache.rocketmq.controller.BrokerLiveInfo; +import org.apache.rocketmq.controller.helper.BrokerLiveInfoGetter; +import org.apache.rocketmq.controller.helper.BrokerValidPredicate; import java.util.Comparator; import java.util.Set; @@ -29,12 +31,12 @@ import java.util.stream.Collectors; public class DefaultElectPolicy implements ElectPolicy { - // <clusterName, brokerAddr>, Used to judge whether a broker + // <clusterName, brokerName, brokerAddr>, Used to judge whether a broker // has preliminary qualification to be selected as master - private BiPredicate<String, String> validPredicate; + private BrokerValidPredicate validPredicate; - // <clusterName, brokerAddr, BrokerLiveInfo>, Used to obtain the BrokerLiveInfo information of a broker - private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter; + // <clusterName, brokerName, brokerAddr, BrokerLiveInfo>, Used to obtain the BrokerLiveInfo information of a broker + private BrokerLiveInfoGetter brokerLiveInfoGetter; // Sort in descending order according to<epoch, offset>, and sort in ascending order according to priority private final Comparator<BrokerLiveInfo> comparator = (o1, o2) -> { @@ -46,9 +48,9 @@ public class DefaultElectPolicy implements ElectPolicy { } }; - public DefaultElectPolicy(BiPredicate<String, String> validPredicate, BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) { + public DefaultElectPolicy(BrokerValidPredicate validPredicate, BrokerLiveInfoGetter brokerLiveInfoGetter) { this.validPredicate = validPredicate; - this.additionalInfoGetter = additionalInfoGetter; + this.brokerLiveInfoGetter = brokerLiveInfoGetter; } public DefaultElectPolicy() { @@ -66,50 +68,50 @@ public class DefaultElectPolicy implements ElectPolicy { * @param clusterName the brokerGroup belongs * @param syncStateBrokers all broker replicas in syncStateSet * @param allReplicaBrokers all broker replicas - * @param oldMaster old master - * @param preferBrokerAddr the broker prefer to be elected + * @param oldMaster old master's broker id + * @param preferBrokerId the broker id prefer to be elected * @return master elected by our own policy */ @Override - public String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String preferBrokerAddr) { - String newMaster = null; + public Long elect(String clusterName, String brokerName, Set<Long> syncStateBrokers, Set<Long> allReplicaBrokers, Long oldMaster, Long preferBrokerId) { + Long newMaster = null; // try to elect in syncStateBrokers if (syncStateBrokers != null) { - newMaster = tryElect(clusterName, syncStateBrokers, oldMaster, preferBrokerAddr); + newMaster = tryElect(clusterName, brokerName, syncStateBrokers, oldMaster, preferBrokerId); } - if (StringUtils.isNotEmpty(newMaster)) { + if (newMaster != null) { return newMaster; } // try to elect in all allReplicaBrokers if (allReplicaBrokers != null) { - newMaster = tryElect(clusterName, allReplicaBrokers, oldMaster, preferBrokerAddr); + newMaster = tryElect(clusterName, brokerName, allReplicaBrokers, oldMaster, preferBrokerId); } return newMaster; } - private String tryElect(String clusterName, Set<String> brokers, String oldMaster, String preferBrokerAddr) { + private Long tryElect(String clusterName, String brokerName, Set<Long> brokers, Long oldMaster, Long preferBrokerId) { if (this.validPredicate != null) { - brokers = brokers.stream().filter(brokerAddr -> this.validPredicate.test(clusterName, brokerAddr)).collect(Collectors.toSet()); + brokers = brokers.stream().filter(brokerAddr -> this.validPredicate.check(clusterName, brokerName, brokerAddr)).collect(Collectors.toSet()); } if (!brokers.isEmpty()) { // if old master is still valid, and preferBrokerAddr is blank or is equals to oldMaster - if (brokers.contains(oldMaster) && (StringUtils.isBlank(preferBrokerAddr) || preferBrokerAddr.equals(oldMaster))) { + if (brokers.contains(oldMaster) && (preferBrokerId == null || preferBrokerId == oldMaster)) { return oldMaster; } // if preferBrokerAddr is valid, we choose it, otherwise we choose nothing - if (StringUtils.isNotBlank(preferBrokerAddr)) { - return brokers.contains(preferBrokerAddr) ? preferBrokerAddr : null; + if (preferBrokerId != null) { + return brokers.contains(preferBrokerId) ? preferBrokerId : null; } - if (this.additionalInfoGetter != null) { + if (this.brokerLiveInfoGetter != null) { // sort brokerLiveInfos by (epoch,maxOffset) TreeSet<BrokerLiveInfo> brokerLiveInfos = new TreeSet<>(this.comparator); - brokers.forEach(brokerAddr -> brokerLiveInfos.add(this.additionalInfoGetter.apply(clusterName, brokerAddr))); + brokers.forEach(brokerAddr -> brokerLiveInfos.add(this.brokerLiveInfoGetter.get(clusterName, brokerName, brokerAddr))); if (brokerLiveInfos.size() >= 1) { - return brokerLiveInfos.first().getBrokerAddr(); + return brokerLiveInfos.first().getBrokerId(); } } // elect random @@ -123,15 +125,15 @@ public class DefaultElectPolicy implements ElectPolicy { return additionalInfoGetter; } - public void setAdditionalInfoGetter(BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) { - this.additionalInfoGetter = additionalInfoGetter; + public void setBrokerLiveInfoGetter(BrokerLiveInfoGetter brokerLiveInfoGetter) { + this.brokerLiveInfoGetter = brokerLiveInfoGetter; } - public BiPredicate<String, String> getValidPredicate() { - return validPredicate; + public void setValidPredicate(BrokerValidPredicate validPredicate) { + this.validPredicate = validPredicate; } - public void setValidPredicate(BiPredicate<String, String> validPredicate) { - this.validPredicate = validPredicate; + public BrokerLiveInfoGetter getBrokerLiveInfoGetter() { + return brokerLiveInfoGetter; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLiveInfoGetter.java similarity index 56% copy from controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java copy to controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLiveInfoGetter.java index aba8f5538..4a302cdbd 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLiveInfoGetter.java @@ -14,23 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.controller.elect; +package org.apache.rocketmq.controller.helper; -import java.util.Set; +import org.apache.rocketmq.controller.BrokerLiveInfo; -public interface ElectPolicy { +public interface BrokerLiveInfoGetter { - /** - * elect a master - * - * @param clusterName the brokerGroup belongs - * @param syncStateBrokers all broker replicas in syncStateSet - * @param allReplicaBrokers all broker replicas - * @param oldMaster old master - * @param brokerAddr broker address(can be used as prefer or assigned in some elect policy) - * @return new master's brokerAddr - */ - String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String brokerAddr); + BrokerLiveInfo get(String clusterName, String brokerName, Long brokerId); } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerValidPredicate.java similarity index 55% copy from controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java copy to controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerValidPredicate.java index aba8f5538..d8c6a2f65 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerValidPredicate.java @@ -14,23 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.controller.elect; +package org.apache.rocketmq.controller.helper; +public interface BrokerValidPredicate { -import java.util.Set; - -public interface ElectPolicy { - - /** - * elect a master - * - * @param clusterName the brokerGroup belongs - * @param syncStateBrokers all broker replicas in syncStateSet - * @param allReplicaBrokers all broker replicas - * @param oldMaster old master - * @param brokerAddr broker address(can be used as prefer or assigned in some elect policy) - * @return new master's brokerAddr - */ - String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String brokerAddr); - + boolean check(String clusterName, String brokerName, Long brokerId); } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java index 2a5610c56..3045da85e 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java @@ -99,9 +99,10 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { this.brokerLifecycleListeners.add(listener); } - @Override public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, + @Override + public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { - BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr); + BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo); int realEpoch = Optional.ofNullable(epoch).orElse(-1); long realBrokerId = Optional.ofNullable(brokerId).orElse(-1L); @@ -126,6 +127,8 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { prev.setHeartbeatTimeoutMillis(realTimeoutMillis); prev.setElectionPriority(realElectionPriority); prev.setBrokerId(realBrokerId); + prev.setBrokerAddr(brokerAddr); + prev.setChannel(channel); if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset()) { prev.setEpoch(realEpoch); prev.setMaxOffset(realMaxOffset); @@ -153,13 +156,13 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager { } @Override - public BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerAddr) { - return this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerAddr)); + public BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerName, Long brokerId) { + return this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerName, brokerId)); } @Override - public boolean isBrokerActive(String clusterName, String brokerAddr) { - final BrokerLiveInfo info = this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerAddr)); + public boolean isBrokerActive(String clusterName, String brokerName, Long brokerId) { + final BrokerLiveInfo info = this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerName, brokerId)); if (info != null) { long last = info.getLastUpdateTimestamp(); long timeoutMillis = info.getHeartbeatTimeoutMillis(); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java index 2342e0e99..6af44b722 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java @@ -26,9 +26,9 @@ import java.util.Set; public class AlterSyncStateSetEvent implements EventMessage { private final String brokerName; - private final Set<String/*Address*/> newSyncStateSet; + private final Set<Long/*BrokerId*/> newSyncStateSet; - public AlterSyncStateSetEvent(String brokerName, Set<String> newSyncStateSet) { + public AlterSyncStateSetEvent(String brokerName, Set<Long> newSyncStateSet) { this.brokerName = brokerName; this.newSyncStateSet = new HashSet<>(newSyncStateSet); } @@ -42,7 +42,7 @@ public class AlterSyncStateSetEvent implements EventMessage { return brokerName; } - public Set<String> getNewSyncStateSet() { + public Set<Long> getNewSyncStateSet() { return new HashSet<>(newSyncStateSet); } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java index c4934d7c0..a0bf001c4 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java @@ -24,13 +24,17 @@ public class ApplyBrokerIdEvent implements EventMessage { private final String clusterName; private final String brokerName; private final String brokerAddress; + + private final String registerCheckCode; + private final long newBrokerId; - public ApplyBrokerIdEvent(String clusterName, String brokerName, String brokerAddress, long newBrokerId) { + public ApplyBrokerIdEvent(String clusterName, String brokerName, String brokerAddress, long newBrokerId, String registerCheckCode) { this.clusterName = clusterName; this.brokerName = brokerName; this.brokerAddress = brokerAddress; this.newBrokerId = newBrokerId; + this.registerCheckCode = registerCheckCode; } @Override @@ -54,13 +58,18 @@ public class ApplyBrokerIdEvent implements EventMessage { return clusterName; } + public String getRegisterCheckCode() { + return registerCheckCode; + } + @Override public String toString() { return "ApplyBrokerIdEvent{" + - "clusterName='" + clusterName + '\'' + - ", brokerName='" + brokerName + '\'' + - ", brokerAddress='" + brokerAddress + '\'' + - ", newBrokerId=" + newBrokerId + - '}'; + "clusterName='" + clusterName + '\'' + + ", brokerName='" + brokerName + '\'' + + ", brokerAddress='" + brokerAddress + '\'' + + ", registerCheckCode='" + registerCheckCode + '\'' + + ", newBrokerId=" + newBrokerId + + '}'; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java index 4678f90c4..e639e27e4 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java @@ -23,11 +23,11 @@ public class CleanBrokerDataEvent implements EventMessage { private String brokerName; - private Set<String> brokerAddressSet; + private Set<Long> brokerIdSetToClean; - public CleanBrokerDataEvent(String brokerName, Set<String> brokerAddressSet) { + public CleanBrokerDataEvent(String brokerName, Set<Long> brokerIdSetToClean) { this.brokerName = brokerName; - this.brokerAddressSet = brokerAddressSet; + this.brokerIdSetToClean = brokerIdSetToClean; } public String getBrokerName() { @@ -38,12 +38,12 @@ public class CleanBrokerDataEvent implements EventMessage { this.brokerName = brokerName; } - public Set<String> getBrokerAddressSet() { - return brokerAddressSet; + public void setBrokerIdSetToClean(Set<Long> brokerIdSetToClean) { + this.brokerIdSetToClean = brokerIdSetToClean; } - public void setBrokerAddressSet(Set<String> brokerAddressSet) { - this.brokerAddressSet = brokerAddressSet; + public Set<Long> getBrokerIdSetToClean() { + return brokerIdSetToClean; } /** @@ -57,8 +57,8 @@ public class CleanBrokerDataEvent implements EventMessage { @Override public String toString() { return "CleanBrokerDataEvent{" + - "brokerName='" + brokerName + '\'' + - ", brokerAddressSet=" + brokerAddressSet + - '}'; + "brokerName='" + brokerName + '\'' + + ", brokerIdSetToClean=" + brokerIdSetToClean + + '}'; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java index 970b5d8cd..71a56bdce 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java @@ -24,20 +24,20 @@ public class ElectMasterEvent implements EventMessage { // Mark whether a new master was elected. private final boolean newMasterElected; private final String brokerName; - private final String newMasterAddress; + private final Long newMasterBrokerId; public ElectMasterEvent(boolean newMasterElected, String brokerName) { - this(newMasterElected, brokerName, ""); + this(newMasterElected, brokerName, null); } - public ElectMasterEvent(String brokerName, String newMasterAddress) { - this(true, brokerName, newMasterAddress); + public ElectMasterEvent(String brokerName, Long newMasterBrokerId) { + this(true, brokerName, newMasterBrokerId); } - public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress) { + public ElectMasterEvent(boolean newMasterElected, String brokerName, Long newMasterBrokerId) { this.newMasterElected = newMasterElected; this.brokerName = brokerName; - this.newMasterAddress = newMasterAddress; + this.newMasterBrokerId = newMasterBrokerId; } @Override @@ -53,16 +53,16 @@ public class ElectMasterEvent implements EventMessage { return brokerName; } - public String getNewMasterAddress() { - return newMasterAddress; + public Long getNewMasterBrokerId() { + return newMasterBrokerId; } @Override public String toString() { return "ElectMasterEvent{" + - "newMasterElected=" + newMasterElected + - ", brokerName='" + brokerName + '\'' + - ", newMasterAddress='" + newMasterAddress + '\'' + - '}'; + "newMasterElected=" + newMasterElected + + ", brokerName='" + brokerName + '\'' + + ", newMasterBrokerId=" + newMasterBrokerId + + '}'; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java index e2a68a544..bc60c8b54 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java @@ -21,26 +21,30 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; /** * Broker replicas info, mapping from brokerAddress to {brokerId, brokerHaAddress}. */ public class BrokerReplicaInfo { private final String clusterName; + private final String brokerName; + // Start from 1 private final AtomicLong nextAssignBrokerId; - private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable; + + private final HashMap<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo; public BrokerReplicaInfo(String clusterName, String brokerName) { this.clusterName = clusterName; this.brokerName = brokerName; - this.brokerIdTable = new HashMap<>(); this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_SLAVE_ID); + this.brokerIdInfo = new HashMap<>(); } - public void removeBrokerAddress(final String address) { - this.brokerIdTable.remove(address); + public void removeBrokerId(final Long brokerId) { + this.brokerIdInfo.remove(brokerId); } public long newBrokerId() { @@ -55,26 +59,30 @@ public class BrokerReplicaInfo { return brokerName; } - public void addBroker(final String address, final Long brokerId) { - this.brokerIdTable.put(address, brokerId); + public void addBroker(final Long brokerId, final String ipAddress, final String registerCheckCode) { + this.brokerIdInfo.put(brokerId, new Pair<>(ipAddress, registerCheckCode)); } - public boolean isBrokerExist(final String address) { - return this.brokerIdTable.containsKey(address); + public boolean isBrokerExist(final Long brokerId) { + return this.brokerIdInfo.containsKey(brokerId); } - public Set<String> getAllBroker() { - return new HashSet<>(this.brokerIdTable.keySet()); + public Set<Long> getAllBroker() { + return new HashSet<>(this.brokerIdInfo.keySet()); } - public HashMap<String, Long> getBrokerIdTable() { - return new HashMap<>(this.brokerIdTable); + public HashMap<Long, String> getBrokerIdTable() { + HashMap<Long/*brokerId*/, String/*address*/> map = new HashMap<>(this.brokerIdInfo.size()); + this.brokerIdInfo.forEach((id, pair) -> { + map.put(id, pair.getObject1()); + }); + return map; } - public Long getBrokerId(final String address) { - if (this.brokerIdTable.containsKey(address)) { - return this.brokerIdTable.get(address); + public String getBrokerAddress(final Long brokerId) { + if (this.brokerIdInfo.containsKey(brokerId)) { + return this.brokerIdInfo.get(brokerId).getObject1(); } - return -1L; + return null; } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index dc0339d0c..c915131bf 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -25,11 +25,13 @@ import java.util.Set; import java.util.function.BiPredicate; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.ControllerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.controller.elect.ElectPolicy; +import org.apache.rocketmq.controller.helper.BrokerValidPredicate; import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent; import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent; import org.apache.rocketmq.controller.impl.event.CleanBrokerDataEvent; @@ -71,92 +73,92 @@ public class ReplicasInfoManager { } public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet( - final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, - final BiPredicate<String, String> brokerAlivePredicate) { + final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, + final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader()); final AlterSyncStateSetResponseHeader response = result.getResponse(); - if (isContainsBroker(brokerName)) { - final Set<String> newSyncStateSet = syncStateSet.getSyncStateSet(); - final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); - final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); + if (!isContainsBroker(brokerName)) { + result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, "Broker metadata is not existed"); + return result; + } + final Set<Long> newSyncStateSet = syncStateSet.getSyncStateSet(); + final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); + final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); - // Check whether the oldSyncStateSet is equal with newSyncStateSet - final Set<String> oldSyncStateSet = syncStateInfo.getSyncStateSet(); - if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { - String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; - LOGGER.warn("{}", err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, err); - return result; - } + // Check whether the oldSyncStateSet is equal with newSyncStateSet + final Set<Long> oldSyncStateSet = syncStateInfo.getSyncStateSet(); + if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { + String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; + LOGGER.warn("{}", err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, err); + return result; + } - // Check master - if (!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) { - String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", - syncStateInfo.getMasterAddress(), request.getMasterAddress()); - LOGGER.error("{}", err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err); - return result; - } + // Check master + if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { + String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", + syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); + LOGGER.error("{}", err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err); + return result; + } - // Check master epoch - if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { - String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", + // Check master epoch + if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { + String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); - LOGGER.error("{}", err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err); - return result; - } + LOGGER.error("{}", err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err); + return result; + } - // Check syncStateSet epoch - if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { - String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", + // Check syncStateSet epoch + if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { + String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); + LOGGER.error("{}", err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH, err); + return result; + } + + // Check newSyncStateSet correctness + for (Long replica : newSyncStateSet) { + if (!brokerReplicaInfo.isBrokerExist(replica)) { + String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); LOGGER.error("{}", err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH, err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REPLICAS, err); return result; } - - // Check newSyncStateSet correctness - for (String replicas : newSyncStateSet) { - if (!brokerReplicaInfo.isBrokerExist(replicas)) { - String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replicas); - LOGGER.error("{}", err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REPLICAS, err); - return result; - } - if (!brokerAlivePredicate.test(brokerReplicaInfo.getClusterName(), replicas)) { - String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replicas); - LOGGER.error(err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE, err); - return result; - } - } - - if (!newSyncStateSet.contains(syncStateInfo.getMasterAddress())) { - String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterAddress()); + if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { + String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); LOGGER.error(err); - result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE, err); return result; } + } - // Generate event - int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; - response.setNewSyncStateSetEpoch(epoch); - result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode()); - final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); - result.addEvent(event); + if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { + String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); + LOGGER.error(err); + result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, err); return result; } - result.setCodeAndRemark(ResponseCode.CONTROLLER_ALTER_SYNC_STATE_SET_FAILED, "Broker metadata is not existed"); + + // Generate event + int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; + response.setNewSyncStateSetEpoch(epoch); + result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode()); + final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); + result.addEvent(event); return result; } public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request, - final ElectPolicy electPolicy) { + final ElectPolicy electPolicy) { final String brokerName = request.getBrokerName(); - final String brokerAddress = request.getBrokerAddress(); + final Long brokerId = request.getBrokerId(); final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader()); final ElectMasterResponseHeader response = result.getResponse(); if (!isContainsBroker(brokerName)) { @@ -168,45 +170,45 @@ public class ReplicasInfoManager { final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); - final Set<String> syncStateSet = syncStateInfo.getSyncStateSet(); - final String oldMaster = syncStateInfo.getMasterAddress(); - Set<String> allReplicaBrokers = controllerConfig.isEnableElectUncleanMaster() ? brokerReplicaInfo.getAllBroker() : null; - String newMaster = null; + final Set<Long> syncStateSet = syncStateInfo.getSyncStateSet(); + final Long oldMaster = syncStateInfo.getMasterBrokerId(); + Set<Long> allReplicaBrokers = controllerConfig.isEnableElectUncleanMaster() ? brokerReplicaInfo.getAllBroker() : null; + Long newMaster = null; if (syncStateInfo.isFirstTimeForElect()) { // If never have a master in this broker set, in other words, it is the first time to elect a master // elect it as the first master - newMaster = brokerAddress; + newMaster = brokerId; } // elect by policy if (newMaster == null) { // we should assign this assignedBrokerAddr when the brokerAddress need to be elected by force - String assignedBrokerAddr = request.isForceElect() ? brokerAddress : null; - newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerAddr); + Long assignedBrokerId = request.isForceElect() ? brokerId : null; + newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } - if (StringUtils.isNotEmpty(newMaster) && newMaster.equals(oldMaster)) { + if (newMaster != null && newMaster.equals(oldMaster)) { // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setMasterEpoch(syncStateInfo.getMasterEpoch()); response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); - response.setMasterAddress(syncStateInfo.getMasterAddress()); - response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress())); + response.setMasterBrokerId(oldMaster); + response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster)); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected - if (StringUtils.isNotEmpty(newMaster)) { + if (newMaster != null) { final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); - response.setMasterAddress(newMaster); + response.setMasterBrokerId(newMaster); + response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster)); response.setMasterEpoch(masterEpoch + 1); response.setSyncStateSetEpoch(syncStateSetEpoch + 1); - response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress())); BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName); if (null != brokerMemberGroup) { response.setBrokerMemberGroup(brokerMemberGroup); @@ -219,7 +221,7 @@ public class ReplicasInfoManager { // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. - if (request.getBrokerAddress() == null) { + if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); @@ -234,9 +236,9 @@ public class ReplicasInfoManager { if (isContainsBroker(brokerName)) { final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerName); - final HashMap<String, Long> brokerIdTable = brokerReplicaInfo.getBrokerIdTable(); + final HashMap<Long, String> brokerIdTable = brokerReplicaInfo.getBrokerIdTable(); final HashMap<Long, String> memberGroup = new HashMap<>(); - brokerIdTable.forEach((addr, id) -> memberGroup.put(id, addr)); + brokerIdTable.forEach((id, addr) -> memberGroup.put(id, addr)); group.setBrokerAddrs(memberGroup); return group; } @@ -244,7 +246,7 @@ public class ReplicasInfoManager { } public ControllerResult<RegisterBrokerToControllerResponseHeader> registerBroker( - final RegisterBrokerToControllerRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) { + final RegisterBrokerToControllerRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) { String brokerAddress = request.getBrokerAddress(); final String brokerName = request.getBrokerName(); final String clusterName = request.getClusterName(); @@ -295,12 +297,10 @@ public class ReplicasInfoManager { // If exist broker metadata, just return metadata final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); - final String masterAddress = syncStateInfo.getMasterAddress(); - response.setMasterAddress(masterAddress); + final Long masterBrokerId = syncStateInfo.getMasterBrokerId(); + response.setMasterBrokerId(masterBrokerId); + response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(masterBrokerId)); response.setMasterEpoch(syncStateInfo.getMasterEpoch()); - if (StringUtils.isNotEmpty(request.getBrokerAddress())) { - response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress())); - } result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode()); return result; } @@ -316,21 +316,20 @@ public class ReplicasInfoManager { // If exist broker metadata, just return metadata final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); - final Set<String> syncStateSet = syncStateInfo.getSyncStateSet(); - final String master = syncStateInfo.getMasterAddress(); + final Set<Long> syncStateSet = syncStateInfo.getSyncStateSet(); + final Long masterBrokerId = syncStateInfo.getMasterBrokerId(); final ArrayList<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas = new ArrayList<>(); final ArrayList<BrokerReplicasInfo.ReplicaIdentity> notInSyncReplicas = new ArrayList<>(); - brokerReplicaInfo.getBrokerIdTable().forEach((brokerAddress, brokerId) -> { - if (syncStateSet.contains(brokerAddress)) { - long id = StringUtils.equals(master, brokerAddress) ? MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(brokerAddress); - inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id)); + brokerReplicaInfo.getBrokerIdTable().forEach((brokerId, brokerAddress) -> { + if (syncStateSet.contains(brokerId)) { + inSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress)); } else { - notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId)); + notInSyncReplicas.add(new BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress)); } }); - final BrokerReplicasInfo.ReplicasInfo inSyncState = new BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas); + final BrokerReplicasInfo.ReplicasInfo inSyncState = new BrokerReplicasInfo.ReplicasInfo(masterBrokerId, brokerReplicaInfo.getBrokerAddress(masterBrokerId), syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas); brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState); } } @@ -339,27 +338,27 @@ public class ReplicasInfoManager { } public ControllerResult<Void> cleanBrokerData(final CleanControllerBrokerDataRequestHeader requestHeader, - final BiPredicate<String, String> brokerAlivePredicate) { + final BrokerValidPredicate validPredicate) { final ControllerResult<Void> result = new ControllerResult<>(); final String clusterName = requestHeader.getClusterName(); final String brokerName = requestHeader.getBrokerName(); - final String brokerAddrs = requestHeader.getBrokerAddress(); + final String brokerIdSetToClean = requestHeader.getBrokerIdSetToClean(); - Set<String> brokerAddressSet = null; + Set<Long> brokerIdSet = null; if (!requestHeader.isCleanLivingBroker()) { //if SyncStateInfo.masterAddress is not empty, at least one broker with the same BrokerName is alive SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); - if (StringUtils.isBlank(brokerAddrs) && null != syncStateInfo && StringUtils.isNotEmpty(syncStateInfo.getMasterAddress())) { + if (StringUtils.isBlank(brokerIdSetToClean) && null != syncStateInfo && syncStateInfo.getMasterBrokerId() != null) { String remark = String.format("Broker %s is still alive, clean up failure", requestHeader.getBrokerName()); result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, remark); return result; } - if (StringUtils.isNotBlank(brokerAddrs)) { - brokerAddressSet = Stream.of(brokerAddrs.split(";")).collect(Collectors.toSet()); - for (String brokerAddr : brokerAddressSet) { - if (brokerAlivePredicate.test(clusterName, brokerAddr)) { - String remark = String.format("Broker [%s, %s] is still alive, clean up failure", requestHeader.getBrokerName(), brokerAddr); + if (StringUtils.isNotBlank(brokerIdSetToClean)) { + brokerIdSet = Stream.of(brokerIdSetToClean.split(";")).map(idStr -> Long.valueOf(idStr)).collect(Collectors.toSet()); + for (Long brokerId : brokerIdSet) { + if (validPredicate.check(clusterName, brokerName, brokerId)) { + String remark = String.format("Broker [%s, %s] is still alive, clean up failure", requestHeader.getBrokerName(), brokerId); result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, remark); return result; } @@ -367,7 +366,7 @@ public class ReplicasInfoManager { } } if (isContainsBroker(brokerName)) { - final CleanBrokerDataEvent event = new CleanBrokerDataEvent(brokerName, brokerAddressSet); + final CleanBrokerDataEvent event = new CleanBrokerDataEvent(brokerName, brokerIdSet); result.addEvent(event); return result; } @@ -412,8 +411,8 @@ public class ReplicasInfoManager { final String brokerName = event.getBrokerName(); if (isContainsBroker(brokerName)) { final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); - if (!brokerReplicaInfo.isBrokerExist(event.getBrokerAddress())) { - brokerReplicaInfo.addBroker(event.getBrokerAddress(), event.getNewBrokerId()); + if (!brokerReplicaInfo.isBrokerExist(event.getNewBrokerId())) { + brokerReplicaInfo.addBroker(event.getNewBrokerId(), event.getBrokerAddress(), event.getRegisterCheckCode()); } } else { // First time to register in this broker set @@ -421,7 +420,7 @@ public class ReplicasInfoManager { final String clusterName = event.getClusterName(); final BrokerReplicaInfo brokerReplicaInfo = new BrokerReplicaInfo(clusterName, brokerName); long brokerId = brokerReplicaInfo.newBrokerId(); - brokerReplicaInfo.addBroker(event.getBrokerAddress(), brokerId); + brokerReplicaInfo.addBroker(brokerId, event.getBrokerAddress(), event.getRegisterCheckCode()); this.replicaInfoTable.put(brokerName, brokerReplicaInfo); final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName); // Initialize an empty syncStateInfo for this broker set @@ -431,7 +430,7 @@ public class ReplicasInfoManager { private void handleElectMaster(final ElectMasterEvent event) { final String brokerName = event.getBrokerName(); - final String newMaster = event.getNewMasterAddress(); + final Long newMaster = event.getNewMasterBrokerId(); if (isContainsBroker(brokerName)) { final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); @@ -440,13 +439,13 @@ public class ReplicasInfoManager { syncStateInfo.updateMasterInfo(newMaster); // Record new newSyncStateSet list - final HashSet<String> newSyncStateSet = new HashSet<>(); + final HashSet<Long> newSyncStateSet = new HashSet<>(); newSyncStateSet.add(newMaster); syncStateInfo.updateSyncStateSetInfo(newSyncStateSet); } else { // If new master was not elected, which means old master was shutdown and the newSyncStateSet list had no more replicas // So we should delete old master, but retain newSyncStateSet list. - syncStateInfo.updateMasterInfo(""); + syncStateInfo.updateMasterInfo(null); } return; } @@ -456,9 +455,9 @@ public class ReplicasInfoManager { private void handleCleanBrokerDataEvent(final CleanBrokerDataEvent event) { final String brokerName = event.getBrokerName(); - final Set<String> brokerAddressSet = event.getBrokerAddressSet(); + final Set<Long> brokerIdSetToClean = event.getBrokerIdSetToClean(); - if (null == brokerAddressSet || brokerAddressSet.isEmpty()) { + if (null == brokerIdSetToClean || brokerIdSetToClean.isEmpty()) { this.replicaInfoTable.remove(brokerName); this.syncStateSetInfoTable.remove(brokerName); return; @@ -468,9 +467,9 @@ public class ReplicasInfoManager { } final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); - for (String brokerAddress : brokerAddressSet) { - brokerReplicaInfo.removeBrokerAddress(brokerAddress); - syncStateInfo.removeSyncState(brokerAddress); + for (Long brokerId : brokerIdSetToClean) { + brokerReplicaInfo.removeBrokerId(brokerId); + syncStateInfo.removeFromSyncState(brokerId); } if (brokerReplicaInfo.getBrokerIdTable().isEmpty()) { this.replicaInfoTable.remove(brokerName); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java index 29570b5ea..0951df93a 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.controller.impl.manager; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import org.apache.commons.lang3.StringUtils; /** * Manages the syncStateSet of broker replicas. @@ -27,10 +26,11 @@ import org.apache.commons.lang3.StringUtils; public class SyncStateInfo { private final String clusterName; private final String brokerName; - private Set<String/*Address*/> syncStateSet; + + private Set<Long/*brokerId*/> syncStateSet; private int syncStateSetEpoch; - private String masterAddress; + private Long masterBrokerId; private int masterEpoch; public SyncStateInfo(String clusterName, String brokerName) { @@ -42,23 +42,23 @@ public class SyncStateInfo { } - public SyncStateInfo(String clusterName, String brokerName, String masterAddress) { + public SyncStateInfo(String clusterName, String brokerName, Long masterBrokerId) { this.clusterName = clusterName; this.brokerName = brokerName; - this.masterAddress = masterAddress; + this.masterBrokerId = masterBrokerId; this.masterEpoch = 1; this.syncStateSet = new HashSet<>(); - this.syncStateSet.add(masterAddress); + this.syncStateSet.add(masterBrokerId); this.syncStateSetEpoch = 1; } - public void updateMasterInfo(String masterAddress) { - this.masterAddress = masterAddress; + public void updateMasterInfo(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; this.masterEpoch++; } - public void updateSyncStateSetInfo(Set<String> newSyncStateSet) { + public void updateSyncStateSetInfo(Set<Long> newSyncStateSet) { this.syncStateSet = new HashSet<>(newSyncStateSet); this.syncStateSetEpoch++; } @@ -68,7 +68,7 @@ public class SyncStateInfo { } public boolean isMasterExist() { - return StringUtils.isNotEmpty(this.masterAddress); + return masterBrokerId != null; } public String getClusterName() { @@ -79,7 +79,7 @@ public class SyncStateInfo { return brokerName; } - public Set<String> getSyncStateSet() { + public Set<Long> getSyncStateSet() { return new HashSet<>(syncStateSet); } @@ -87,15 +87,15 @@ public class SyncStateInfo { return syncStateSetEpoch; } - public String getMasterAddress() { - return masterAddress; + public Long getMasterBrokerId() { + return masterBrokerId; } public int getMasterEpoch() { return masterEpoch; } - public void removeSyncState(final String address) { - syncStateSet.remove(address); + public void removeFromSyncState(final Long brokerId) { + syncStateSet.remove(brokerId); } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index fc36a8c8e..772cb7094 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.BrokerAddrInfo; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -1073,6 +1072,70 @@ public class RouteInfoManager { } } +/** + * broker address information + */ +class BrokerAddrInfo { + private String clusterName; + private String brokerAddr; + + private int hash; + + public BrokerAddrInfo(String clusterName, String brokerAddr) { + this.clusterName = clusterName; + this.brokerAddr = brokerAddr; + } + + public String getClusterName() { + return clusterName; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public boolean isEmpty() { + return clusterName.isEmpty() && brokerAddr.isEmpty(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + + if (obj instanceof BrokerAddrInfo) { + BrokerAddrInfo addr = (BrokerAddrInfo) obj; + return clusterName.equals(addr.clusterName) && brokerAddr.equals(addr.brokerAddr); + } + return false; + } + + @Override + public int hashCode() { + int h = hash; + if (h == 0 && clusterName.length() + brokerAddr.length() > 0) { + for (int i = 0; i < clusterName.length(); i++) { + h = 31 * h + clusterName.charAt(i); + } + h = 31 * h + '_'; + for (int i = 0; i < brokerAddr.length(); i++) { + h = 31 * h + brokerAddr.charAt(i); + } + hash = h; + } + return h; + } + + @Override + public String toString() { + return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" + brokerAddr + "]"; + } +} + class BrokerLiveInfo { private long lastUpdateTimestamp; private long heartbeatTimeoutMillis; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java index fece50d2e..c7e410b80 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java @@ -42,15 +42,19 @@ public class BrokerReplicasInfo extends RemotingSerializable { } public static class ReplicasInfo extends RemotingSerializable { + + private Long masterBrokerId; + private String masterAddress; private int masterEpoch; private int syncStateSetEpoch; private List<ReplicaIdentity> inSyncReplicas; private List<ReplicaIdentity> notInSyncReplicas; - public ReplicasInfo(String masterAddress, int masterEpoch, int syncStateSetEpoch, + public ReplicasInfo(Long masterBrokerId, String masterAddress, int masterEpoch, int syncStateSetEpoch, List<ReplicaIdentity> inSyncReplicas, List<ReplicaIdentity> notInSyncReplicas) { + this.masterBrokerId = masterBrokerId; this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; @@ -99,23 +103,42 @@ public class BrokerReplicasInfo extends RemotingSerializable { List<ReplicaIdentity> notInSyncReplicas) { this.notInSyncReplicas = notInSyncReplicas; } + + public void setMasterBrokerId(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; + } + + public Long getMasterBrokerId() { + return masterBrokerId; + } } public static class ReplicaIdentity extends RemotingSerializable { - private String address; + private String brokerName; private Long brokerId; - public ReplicaIdentity(String address, Long brokerId) { - this.address = address; + private String brokerAddress; + + public ReplicaIdentity(String brokerName, Long brokerId, String brokerAddress) { + this.brokerName = brokerName; this.brokerId = brokerId; + this.brokerAddress = brokerAddress; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; } - public String getAddress() { - return address; + public String getBrokerAddress() { + return brokerAddress; } - public void setAddress(String address) { - this.address = address; + public void setBrokerAddress(String brokerAddress) { + this.brokerAddress = brokerAddress; } public Long getBrokerId() { @@ -128,10 +151,11 @@ public class BrokerReplicasInfo extends RemotingSerializable { @Override public String toString() { - return "{" + - "address='" + address + '\'' + - ", brokerId=" + brokerId + - '}'; + return "ReplicaIdentity{" + + "brokerName='" + brokerName + '\'' + + ", brokerId=" + brokerId + + ", brokerAddress='" + brokerAddress + '\'' + + '}'; } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java index ced216d85..f0a71f8a9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SyncStateSet.java @@ -22,19 +22,19 @@ import java.util.Set; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class SyncStateSet extends RemotingSerializable { - private Set<String> syncStateSet; + private Set<Long> syncStateSet; private int syncStateSetEpoch; - public SyncStateSet(Set<String> syncStateSet, int syncStateSetEpoch) { + public SyncStateSet(Set<Long> syncStateSet, int syncStateSetEpoch) { this.syncStateSet = new HashSet<>(syncStateSet); this.syncStateSetEpoch = syncStateSetEpoch; } - public Set<String> getSyncStateSet() { + public Set<Long> getSyncStateSet() { return new HashSet<>(syncStateSet); } - public void setSyncStateSet(Set<String> syncStateSet) { + public void setSyncStateSet(Set<Long> syncStateSet) { this.syncStateSet = new HashSet<>(syncStateSet); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java index 3e01379f8..9fbf74e1f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/AlterSyncStateSetRequestHeader.java @@ -21,15 +21,15 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class AlterSyncStateSetRequestHeader implements CommandCustomHeader { private String brokerName; - private String masterAddress; + private Long masterBrokerId; private int masterEpoch; public AlterSyncStateSetRequestHeader() { } - public AlterSyncStateSetRequestHeader(String brokerName, String masterAddress, int masterEpoch) { + public AlterSyncStateSetRequestHeader(String brokerName, Long masterBrokerId, int masterEpoch) { this.brokerName = brokerName; - this.masterAddress = masterAddress; + this.masterBrokerId = masterBrokerId; this.masterEpoch = masterEpoch; } @@ -41,12 +41,12 @@ public class AlterSyncStateSetRequestHeader implements CommandCustomHeader { this.brokerName = brokerName; } - public String getMasterAddress() { - return masterAddress; + public Long getMasterBrokerId() { + return masterBrokerId; } - public void setMasterAddress(String masterAddress) { - this.masterAddress = masterAddress; + public void setMasterBrokerId(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; } public int getMasterEpoch() { @@ -60,10 +60,10 @@ public class AlterSyncStateSetRequestHeader implements CommandCustomHeader { @Override public String toString() { return "AlterSyncStateSetRequestHeader{" + - "brokerName='" + brokerName + '\'' + - ", masterAddress='" + masterAddress + '\'' + - ", masterEpoch=" + masterEpoch + - '}'; + "brokerName='" + brokerName + '\'' + + ", masterBrokerId=" + masterBrokerId + + ", masterEpoch=" + masterEpoch + + '}'; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java index 2c49c437c..b2d6640b7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/CleanControllerBrokerDataRequestHeader.java @@ -31,18 +31,18 @@ public class CleanControllerBrokerDataRequestHeader implements CommandCustomHead private String brokerName; @CFNullable - private String brokerAddress; + private String brokerIdSetToClean; private boolean isCleanLivingBroker = false; public CleanControllerBrokerDataRequestHeader() { } - public CleanControllerBrokerDataRequestHeader(String clusterName, String brokerName, String brokerAddress, + public CleanControllerBrokerDataRequestHeader(String clusterName, String brokerName, String brokerIdSetToClean, boolean isCleanLivingBroker) { this.clusterName = clusterName; this.brokerName = brokerName; - this.brokerAddress = brokerAddress; + this.brokerIdSetToClean = brokerIdSetToClean; this.isCleanLivingBroker = isCleanLivingBroker; } @@ -71,12 +71,12 @@ public class CleanControllerBrokerDataRequestHeader implements CommandCustomHead this.brokerName = brokerName; } - public String getBrokerAddress() { - return brokerAddress; + public String getBrokerIdSetToClean() { + return brokerIdSetToClean; } - public void setBrokerAddress(String brokerAddress) { - this.brokerAddress = brokerAddress; + public void setBrokerIdSetToClean(String brokerIdSetToClean) { + this.brokerIdSetToClean = brokerIdSetToClean; } public boolean isCleanLivingBroker() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java index bb9cfca3e..5db4f4c93 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java @@ -29,14 +29,14 @@ public class ElectMasterRequestHeader implements CommandCustomHeader { private String brokerName; /** - * brokerAddress - * for brokerTrigger electMaster: this brokerAddress will be elected as a master when it is the first time to elect + * brokerId + * for brokerTrigger electMaster: this brokerId will be elected as a master when it is the first time to elect * in this broker-set - * for adminTrigger electMaster: this brokerAddress is also named assignedBrokerAddress, which means we must prefer to elect + * for adminTrigger electMaster: this brokerAddress is also named assignedBrokerId, which means we must prefer to elect * it as a new master when this broker is valid. */ @CFNotNull - private String brokerAddress; + private Long brokerId; @CFNotNull private Boolean forceElect = false; @@ -48,30 +48,30 @@ public class ElectMasterRequestHeader implements CommandCustomHeader { this.brokerName = brokerName; } - public ElectMasterRequestHeader(String clusterName, String brokerName, String brokerAddress) { + public ElectMasterRequestHeader(String clusterName, String brokerName, Long brokerId) { this.clusterName = clusterName; this.brokerName = brokerName; - this.brokerAddress = brokerAddress; + this.brokerId = brokerId; } - public ElectMasterRequestHeader(String clusterName, String brokerName, String brokerAddress, boolean forceElect) { + public ElectMasterRequestHeader(String clusterName, String brokerName, Long brokerId, boolean forceElect) { this.clusterName = clusterName; this.brokerName = brokerName; - this.brokerAddress = brokerAddress; + this.brokerId = brokerId; this.forceElect = forceElect; } public static ElectMasterRequestHeader ofBrokerTrigger(String clusterName, String brokerName, - String brokerAddress) { - return new ElectMasterRequestHeader(clusterName, brokerName, brokerAddress); + Long brokerId) { + return new ElectMasterRequestHeader(clusterName, brokerName, brokerId); } public static ElectMasterRequestHeader ofControllerTrigger(String brokerName) { return new ElectMasterRequestHeader(brokerName); } - public static ElectMasterRequestHeader ofAdminTrigger(String clusterName, String brokerName, String brokerAddress) { - return new ElectMasterRequestHeader(clusterName, brokerName, brokerAddress, true); + public static ElectMasterRequestHeader ofAdminTrigger(String clusterName, String brokerName, Long brokerId) { + return new ElectMasterRequestHeader(clusterName, brokerName, brokerId, true); } public String getBrokerName() { @@ -82,12 +82,12 @@ public class ElectMasterRequestHeader implements CommandCustomHeader { this.brokerName = brokerName; } - public String getBrokerAddress() { - return brokerAddress; + public Long getBrokerId() { + return brokerId; } - public void setBrokerAddress(String brokerAddress) { - this.brokerAddress = brokerAddress; + public void setBrokerId(Long brokerId) { + this.brokerId = brokerId; } public String getClusterName() { @@ -107,7 +107,7 @@ public class ElectMasterRequestHeader implements CommandCustomHeader { return "ElectMasterRequestHeader{" + "clusterName='" + clusterName + '\'' + ", brokerName='" + brokerName + '\'' + - ", brokerAddress='" + brokerAddress + '\'' + + ", brokerId=" + brokerId + ", forceElect=" + forceElect + '}'; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index 811d64150..1544b37db 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -21,13 +21,13 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; public class ElectMasterResponseHeader implements CommandCustomHeader { + + private Long masterBrokerId; private String masterAddress; private int masterEpoch; private int syncStateSetEpoch; private BrokerMemberGroup brokerMemberGroup; - private long brokerId = -1; - public ElectMasterResponseHeader() { } @@ -63,23 +63,23 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { this.brokerMemberGroup = brokerMemberGroup; } - public long getBrokerId() { - return brokerId; + public void setMasterBrokerId(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; } - public void setBrokerId(long brokerId) { - this.brokerId = brokerId; + public Long getMasterBrokerId() { + return masterBrokerId; } @Override public String toString() { return "ElectMasterResponseHeader{" + - "masterAddress='" + masterAddress + '\'' + - ", masterEpoch=" + masterEpoch + - ", syncStateSetEpoch=" + syncStateSetEpoch + - ", brokerMemberGroup=" + brokerMemberGroup + - ", brokerId=" + brokerId + - '}'; + "masterBrokerId=" + masterBrokerId + + ", masterAddress='" + masterAddress + '\'' + + ", masterEpoch=" + masterEpoch + + ", syncStateSetEpoch=" + syncStateSetEpoch + + ", brokerMemberGroup=" + brokerMemberGroup + + '}'; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java index fc6d4ce2e..a7b6bbefa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/GetReplicaInfoResponseHeader.java @@ -20,10 +20,10 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; public class GetReplicaInfoResponseHeader implements CommandCustomHeader { + + private Long masterBrokerId; private String masterAddress; private int masterEpoch; - // BrokerId for current replicas. - private long brokerId = -1L; public GetReplicaInfoResponseHeader() { } @@ -44,21 +44,21 @@ public class GetReplicaInfoResponseHeader implements CommandCustomHeader { this.masterEpoch = masterEpoch; } - public long getBrokerId() { - return brokerId; + public Long getMasterBrokerId() { + return masterBrokerId; } - public void setBrokerId(long brokerId) { - this.brokerId = brokerId; + public void setMasterBrokerId(Long masterBrokerId) { + this.masterBrokerId = masterBrokerId; } @Override public String toString() { return "GetReplicaInfoResponseHeader{" + - "masterAddress='" + masterAddress + '\'' + - ", masterEpoch=" + masterEpoch + - ", brokerId=" + brokerId + - '}'; + "masterBrokerId=" + masterBrokerId + + ", masterAddress='" + masterAddress + '\'' + + ", masterEpoch=" + masterEpoch + + '}'; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java index eb7332fdf..8469defe2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java @@ -29,7 +29,7 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader { private String brokerAddr; @CFNotNull private String brokerName; - @CFNullable + @CFNotNull private Long brokerId; @CFNullable private Integer epoch;
