This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ea930182 GetSyncStateSetSubCommand can also print that the broker is
not in syncStateSet (#5935)
0ea930182 is described below
commit 0ea930182c60f2dbb2bf036223d9bd941fcd2f3b
Author: rongtong <[email protected]>
AuthorDate: Sat Jan 28 10:08:51 2023 +0800
GetSyncStateSetSubCommand can also print that the broker is not in
syncStateSet (#5935)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 6 +--
.../impl/manager/ReplicasInfoManager.java | 26 ++++++----
...nSyncStateData.java => BrokerReplicasInfo.java} | 60 +++++++++++++---------
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 4 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 4 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 4 +-
.../command/ha/GetSyncStateSetSubCommand.java | 23 +++++----
7 files changed, 75 insertions(+), 52 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f251e2b00..8347f3653 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -109,7 +109,7 @@ import
org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -2982,7 +2982,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public InSyncStateData getInSyncStateData(final String controllerAddress,
+ public BrokerReplicasInfo getInSyncStateData(final String
controllerAddress,
final List<String> brokers) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException, RemotingCommandException {
// Get controller leader address.
final GetMetaDataResponseHeader controllerMetaData =
getControllerMetaData(controllerAddress);
@@ -2997,7 +2997,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
- return RemotingSerializable.decode(response.getBody(),
InSyncStateData.class);
+ return RemotingSerializable.decode(response.getBody(),
BrokerReplicasInfo.class);
}
default:
break;
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a820b069e..1c5a805b6 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -41,7 +41,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -264,7 +264,7 @@ public class ReplicasInfoManager {
canBeElectedAsMaster =
syncStateInfo.getSyncStateSet().contains(brokerAddress) ||
this.controllerConfig.isEnableElectUncleanMaster();
}
if (!canBeElectedAsMaster) {
- // still need to apply an ElectMasterEvent to tell the
statemachine
+ // still need to apply an ElectMasterEvent to tell the
statemachine
// that the master was shutdown and no new master was
elected. set SyncStateInfo.masterAddress empty
final ElectMasterEvent event = new ElectMasterEvent(false,
brokerName);
result.addEvent(event);
@@ -322,7 +322,7 @@ public class ReplicasInfoManager {
public ControllerResult<Void> getSyncStateData(final List<String>
brokerNames) {
final ControllerResult<Void> result = new ControllerResult<>();
- final InSyncStateData inSyncStateData = new InSyncStateData();
+ final BrokerReplicasInfo brokerReplicasInfo = new BrokerReplicasInfo();
for (String brokerName : brokerNames) {
if (isContainsBroker(brokerName)) {
// If exist broker metadata, just return metadata
@@ -330,17 +330,23 @@ public class ReplicasInfoManager {
final BrokerInfo brokerInfo =
this.replicaInfoTable.get(brokerName);
final Set<String> syncStateSet =
syncStateInfo.getSyncStateSet();
final String master = syncStateInfo.getMasterAddress();
- final ArrayList<InSyncStateData.InSyncMember> inSyncMembers =
new ArrayList<>();
- syncStateSet.forEach(replicas -> {
- long brokerId = StringUtils.equals(master, replicas) ?
MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas);
- inSyncMembers.add(new
InSyncStateData.InSyncMember(replicas, brokerId));
+ final ArrayList<BrokerReplicasInfo.ReplicaIdentity>
inSyncReplicas = new ArrayList<>();
+ final ArrayList<BrokerReplicasInfo.ReplicaIdentity>
notInSyncReplicas = new ArrayList<>();
+
+ brokerInfo.getBrokerIdTable().forEach((brokerAddress,
brokerId) -> {
+ if (syncStateSet.contains(brokerAddress)) {
+ long id = StringUtils.equals(master, brokerAddress) ?
MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress);
+ inSyncReplicas.add(new
BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id));
+ } else {
+ notInSyncReplicas.add(new
BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId));
+ }
});
- final InSyncStateData.InSyncStateSet inSyncState = new
InSyncStateData.InSyncStateSet(master, syncStateInfo.getMasterEpoch(),
syncStateInfo.getSyncStateSetEpoch(), inSyncMembers);
- inSyncStateData.addInSyncState(brokerName, inSyncState);
+ final BrokerReplicasInfo.ReplicasInfo inSyncState = new
BrokerReplicasInfo.ReplicasInfo(master, syncStateInfo.getMasterEpoch(),
syncStateInfo.getSyncStateSetEpoch(), inSyncReplicas, notInSyncReplicas);
+ brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState);
}
}
- result.setBody(inSyncStateData.encode());
+ result.setBody(brokerReplicasInfo.encode());
return result;
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
similarity index 59%
rename from
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
rename to
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
index 2496f260a..fece50d2e 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/InSyncStateData.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
@@ -21,38 +21,41 @@ import java.util.List;
import java.util.Map;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-public class InSyncStateData extends RemotingSerializable {
- private Map<String/*brokerName*/, InSyncStateSet> inSyncStateTable;
+public class BrokerReplicasInfo extends RemotingSerializable {
+ private Map<String/*brokerName*/, ReplicasInfo> replicasInfoTable;
- public InSyncStateData() {
- this.inSyncStateTable = new HashMap<>();
+ public BrokerReplicasInfo() {
+ this.replicasInfoTable = new HashMap<>();
}
- public void addInSyncState(final String brokerName, final InSyncStateSet
inSyncState) {
- this.inSyncStateTable.put(brokerName, inSyncState);
+ public void addReplicaInfo(final String brokerName, final ReplicasInfo
replicasInfo) {
+ this.replicasInfoTable.put(brokerName, replicasInfo);
}
- public Map<String, InSyncStateSet> getInSyncStateTable() {
- return inSyncStateTable;
+ public Map<String, ReplicasInfo> getReplicasInfoTable() {
+ return replicasInfoTable;
}
- public void setInSyncStateTable(
- Map<String, InSyncStateSet> inSyncStateTable) {
- this.inSyncStateTable = inSyncStateTable;
+ public void setReplicasInfoTable(
+ Map<String, ReplicasInfo> replicasInfoTable) {
+ this.replicasInfoTable = replicasInfoTable;
}
- public static class InSyncStateSet extends RemotingSerializable {
+ public static class ReplicasInfo extends RemotingSerializable {
private String masterAddress;
private int masterEpoch;
private int syncStateSetEpoch;
- private List<InSyncMember> inSyncMembers;
+ private List<ReplicaIdentity> inSyncReplicas;
+ private List<ReplicaIdentity> notInSyncReplicas;
- public InSyncStateSet(String masterAddress, int masterEpoch, int
syncStateSetEpoch,
- List<InSyncMember> inSyncMembers) {
+ public ReplicasInfo(String masterAddress, int masterEpoch, int
syncStateSetEpoch,
+ List<ReplicaIdentity> inSyncReplicas,
+ List<ReplicaIdentity> notInSyncReplicas) {
this.masterAddress = masterAddress;
this.masterEpoch = masterEpoch;
this.syncStateSetEpoch = syncStateSetEpoch;
- this.inSyncMembers = inSyncMembers;
+ this.inSyncReplicas = inSyncReplicas;
+ this.notInSyncReplicas = notInSyncReplicas;
}
public String getMasterAddress() {
@@ -79,21 +82,30 @@ public class InSyncStateData extends RemotingSerializable {
this.syncStateSetEpoch = syncStateSetEpoch;
}
- public List<InSyncMember> getInSyncMembers() {
- return inSyncMembers;
+ public List<ReplicaIdentity> getInSyncReplicas() {
+ return inSyncReplicas;
}
- public void setInSyncMembers(
- List<InSyncMember> inSyncMembers) {
- this.inSyncMembers = inSyncMembers;
+ public void setInSyncReplicas(
+ List<ReplicaIdentity> inSyncReplicas) {
+ this.inSyncReplicas = inSyncReplicas;
+ }
+
+ public List<ReplicaIdentity> getNotInSyncReplicas() {
+ return notInSyncReplicas;
+ }
+
+ public void setNotInSyncReplicas(
+ List<ReplicaIdentity> notInSyncReplicas) {
+ this.notInSyncReplicas = notInSyncReplicas;
}
}
- public static class InSyncMember extends RemotingSerializable {
+ public static class ReplicaIdentity extends RemotingSerializable {
private String address;
private Long brokerId;
- public InSyncMember(String address, Long brokerId) {
+ public ReplicaIdentity(String address, Long brokerId) {
this.address = address;
this.brokerId = brokerId;
}
@@ -116,7 +128,7 @@ public class InSyncStateData extends RemotingSerializable {
@Override
public String toString() {
- return "InSyncMember{" +
+ return "{" +
"address='" + address + '\'' +
", brokerId=" + brokerId +
'}';
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 7bc308036..f70580dc6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -51,7 +51,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -776,7 +776,7 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public InSyncStateData getInSyncStateData(String controllerAddress,
+ public BrokerReplicasInfo getInSyncStateData(String controllerAddress,
List<String> brokers) throws RemotingException, InterruptedException,
MQBrokerException {
return
this.defaultMQAdminExtImpl.getInSyncStateData(controllerAddress, brokers);
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 0460ed95b..fc3e079fe 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -86,7 +86,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -1819,7 +1819,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
@Override
- public InSyncStateData getInSyncStateData(String controllerAddress,
+ public BrokerReplicasInfo getInSyncStateData(String controllerAddress,
List<String> brokers) throws RemotingException, InterruptedException,
MQBrokerException {
return
this.mqClientInstance.getMQClientAPIImpl().getInSyncStateData(controllerAddress,
brokers);
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index ebf878f32..2d19af5f2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -48,7 +48,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
@@ -411,7 +411,7 @@ public interface MQAdminExt extends MQAdmin {
HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException;
- InSyncStateData getInSyncStateData(String controllerAddress,
+ BrokerReplicasInfo getInSyncStateData(String controllerAddress,
List<String> brokers) throws RemotingException, InterruptedException,
MQBrokerException;
EpochEntryCache getBrokerEpochCache(
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
index e9699e713..252dd99fb 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/GetSyncStateSetSubCommand.java
@@ -24,7 +24,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
@@ -114,15 +114,20 @@ public class GetSyncStateSetSubCommand implements
SubCommand {
private void printData(String controllerAddress, List<String> brokerNames,
DefaultMQAdminExt defaultMQAdminExt) throws Exception {
if (brokerNames.size() > 0) {
- final InSyncStateData syncStateData =
defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames);
- final Map<String, InSyncStateData.InSyncStateSet> syncTable =
syncStateData.getInSyncStateTable();
- for (Map.Entry<String, InSyncStateData.InSyncStateSet> next :
syncTable.entrySet()) {
- final List<InSyncStateData.InSyncMember> syncMembers =
next.getValue().getInSyncMembers();
-
System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetMemberNums\t%d\n",
+ final BrokerReplicasInfo brokerReplicasInfo =
defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames);
+ final Map<String, BrokerReplicasInfo.ReplicasInfo>
replicasInfoTable = brokerReplicasInfo.getReplicasInfoTable();
+ for (Map.Entry<String, BrokerReplicasInfo.ReplicasInfo> next :
replicasInfoTable.entrySet()) {
+ final List<BrokerReplicasInfo.ReplicaIdentity> inSyncReplicas
= next.getValue().getInSyncReplicas();
+ final List<BrokerReplicasInfo.ReplicaIdentity>
notInSyncReplicas = next.getValue().getNotInSyncReplicas();
+
System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetNums\t%d\n",
next.getKey(), next.getValue().getMasterAddress(),
next.getValue().getMasterEpoch(), next.getValue().getSyncStateSetEpoch(),
- syncMembers.size());
- for (InSyncStateData.InSyncMember member : syncMembers) {
- System.out.printf("\n member:\t%s\n", member.toString());
+ inSyncReplicas.size());
+ for (BrokerReplicasInfo.ReplicaIdentity member :
inSyncReplicas) {
+ System.out.printf("\n InSyncReplica:\t%s\n",
member.toString());
+ }
+
+ for (BrokerReplicasInfo.ReplicaIdentity member :
notInSyncReplicas) {
+ System.out.printf("\n NotInSyncReplica:\t%s\n",
member.toString());
}
}
}