This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 90c1476899d035ad2ef7717cb7455177a2ef2edd
Author: RongtongJin <[email protected]>
AuthorDate: Wed Feb 1 16:27:40 2023 +0800

    Resolve the conflict
---
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java  | 16 ++++++++++------
 .../controller/impl/manager/ReplicasInfoManager.java     |  4 ++--
 .../controller/impl/manager/ReplicasInfoManagerTest.java |  6 +++---
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index a5568d832..76a736ed6 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -99,9 +99,13 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
      */
     private long pullTimeDelayMillsWhenException = 3000;
     /**
-     * Flow control interval
+     * Flow control interval when cache is full
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 
50;
+    /**
+     * Flow control interval when the broker throws a flow control exception
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 
50;
     /**
      * Delay some time when suspend pull service
      */
@@ -264,7 +268,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 
* 1024);
 
         if (cachedMessageCount > 
this.defaultMQPushConsumer.getPullThresholdForQueue()) {
-            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
                     "the cached message count exceeds the threshold {}, so do 
flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
@@ -274,7 +278,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
 
         if (cachedMessageSizeInMiB > 
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
-            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
                     "the cached message size exceeds the threshold {} MiB, so 
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
@@ -285,7 +289,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
 
         if (!this.consumeOrderly) {
             if (processQueue.getMaxSpan() > 
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
-                this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+                this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
                 if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                     log.warn(
                         "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, 
flowControlTimes={}",
@@ -505,7 +509,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
 
         if (processQueue.getWaiAckMsgCount() > 
this.defaultMQPushConsumer.getPopThresholdForQueue()) {
-            this.executePopPullRequestLater(popRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            this.executePopPullRequestLater(popRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn("the messages waiting to ack exceeds the threshold 
{}, so do flow control, popRequest={}, flowControlTimes={}, wait count={}",
                     this.defaultMQPushConsumer.getPopThresholdForQueue(), 
popRequest, queueFlowControlTimes, processQueue.getWaiAckMsgCount());
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index c9c5e0426..dc0339d0c 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -321,9 +321,9 @@ public class ReplicasInfoManager {
                 final ArrayList<BrokerReplicasInfo.ReplicaIdentity> 
inSyncReplicas = new ArrayList<>();
                 final ArrayList<BrokerReplicasInfo.ReplicaIdentity> 
notInSyncReplicas = new ArrayList<>();
 
-                brokerInfo.getBrokerIdTable().forEach((brokerAddress, 
brokerId) -> {
+                brokerReplicaInfo.getBrokerIdTable().forEach((brokerAddress, 
brokerId) -> {
                     if (syncStateSet.contains(brokerAddress)) {
-                        long id = StringUtils.equals(master, brokerAddress) ? 
MixAll.MASTER_ID : brokerInfo.getBrokerId(brokerAddress);
+                        long id = StringUtils.equals(master, brokerAddress) ? 
MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(brokerAddress);
                         inSyncReplicas.add(new 
BrokerReplicasInfo.ReplicaIdentity(brokerAddress, id));
                     } else {
                         notInSyncReplicas.add(new 
BrokerReplicasInfo.ReplicaIdentity(brokerAddress, brokerId));
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 57d372349..8dc637842 100644
--- 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -32,7 +32,7 @@ import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
-import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -101,7 +101,7 @@ public class ReplicasInfoManagerTest {
 
         final GetReplicaInfoResponseHeader replicaInfoBefore = 
this.replicasInfoManager.getReplicaInfo(new 
GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
         byte[] body = 
this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)).getBody();
-        InSyncStateData syncStateDataBefore = 
RemotingSerializable.decode(body, InSyncStateData.class);
+        BrokerReplicasInfo syncStateDataBefore = 
RemotingSerializable.decode(body, BrokerReplicasInfo.class);
         // Try elect itself as a master
         ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, 
brokerAddress);
         final ControllerResult<ElectMasterResponseHeader> result = 
this.replicasInfoManager.electMaster(requestHeader, this.electPolicy);
@@ -131,7 +131,7 @@ public class ReplicasInfoManagerTest {
                 assertEquals(brokerId, replicaInfoAfter.getBrokerId());
                 return;
             }
-            if 
(syncStateDataBefore.getInSyncStateTable().containsKey(brokerAddress) || 
this.config.isEnableElectUncleanMaster()) {
+            if 
(syncStateDataBefore.getReplicasInfoTable().containsKey(brokerAddress) || 
this.config.isEnableElectUncleanMaster()) {
                 // can be elected successfully
                 assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
                 assertEquals(MixAll.MASTER_ID, replicaInfoAfter.getBrokerId());

Reply via email to