This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a12a64e [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
08a12a64e is described below

commit 08a12a64e133fbed5ac6583b4f2d6da871651e45
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Sep 27 18:51:32 2022 +0800

    [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
---
 .../tubemq/client/config/ConsumerConfig.java       |   2 +-
 .../client/consumer/BaseMessageConsumer.java       |  19 +-
 .../tubemq/client/consumer/RmtDataCache.java       | 209 +++++++++++----------
 .../consumer/SimpleClientBalanceConsumer.java      |   4 +-
 .../corebase/policies/FlowCtrlRuleHandler.java     |  34 ++--
 .../corebase/policies/TestFlowCtrlRuleHandler.java |   3 +-
 .../inlong/tubemq/server/broker/TubeBroker.java    |  48 ++---
 7 files changed, 166 insertions(+), 153 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
index 6dd19b8d3..590032d92 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
@@ -245,7 +245,7 @@ public class ConsumerConfig extends TubeClientConfig {
     }
 
     public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-        this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+        this.maxSubInfoReportIntvlTimes = Math.max(maxSubInfoReportIntvlTimes, 
3);
     }
 
     private void validConsumerGroupParameter(String consumerGroup) {
diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 177fcbb82..5840c28e4 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -111,7 +111,7 @@ public class BaseMessageConsumer implements MessageConsumer 
{
     // -1: Unsubscribed
     // 0: In Process
     // 1: Subscribed
-    private AtomicInteger subStatus = new AtomicInteger(-1);
+    private final AtomicInteger subStatus = new AtomicInteger(-1);
     // rebalance
     private int reportIntervalTimes = 0;
     private int rebalanceRetryTimes = 0;
@@ -610,7 +610,7 @@ public class BaseMessageConsumer implements MessageConsumer 
{
                         
masterService.consumerRegisterC2M(createMasterRegisterRequest(),
                                 AddressUtils.getLocalAddress(), 
consumerConfig.isTlsEnable());
                 if (response != null && response.getSuccess()) {
-                    processRegisterAllocAndRspFlowRules(response);
+                    processRegisterAllocAndRspFlowRules(response, strBuffer);
                     processRegAuthorizedToken(response);
                     break;
                 }
@@ -1094,11 +1094,12 @@ public class BaseMessageConsumer implements 
MessageConsumer {
         return builder.build();
     }
 
-    private void 
processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response) {
+    private void 
processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response,
+                                                     StringBuilder strBuffer) {
         if (response.hasNotAllocated() && !response.getNotAllocated()) {
             consumeSubInfo.compareAndSetIsNotAllocated(true, false);
         }
-        rmtDataCache.updFlowCtrlInfoInfo(response);
+        rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
     }
 
     private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C 
response) {
@@ -1107,11 +1108,12 @@ public class BaseMessageConsumer implements 
MessageConsumer {
         }
     }
 
-    private void 
procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response) {
+    private void 
procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response,
+                                                   StringBuilder strBuffer) {
         if (response.hasNotAllocated() && !response.getNotAllocated()) {
             consumeSubInfo.compareAndSetIsNotAllocated(true, false);
         }
-        rmtDataCache.updFlowCtrlInfoInfo(response);
+        rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
     }
 
     private ClientMaster.MasterCertificateInfo 
genMasterCertificateInfo(boolean force) {
@@ -1478,7 +1480,7 @@ public class BaseMessageConsumer implements 
MessageConsumer {
                             } else {
                                 // Process the successful response. Record the 
response information,
                                 // including control rules and latest auth 
token.
-                                
processRegisterAllocAndRspFlowRules(regResponse);
+                                
processRegisterAllocAndRspFlowRules(regResponse, strBuffer);
                                 processRegAuthorizedToken(regResponse);
                                 logger.info(strBuffer.append("[Re-register] ")
                                         .append(consumerId).toString());
@@ -1505,7 +1507,7 @@ public class BaseMessageConsumer implements 
MessageConsumer {
                 // Process the heartbeat success response
                 heartbeatRetryTimes = 0;
                 // Get the authorization rules and update the local rules
-                procHeartBeatRspAllocAndFlowRules(response);
+                procHeartBeatRspAllocAndFlowRules(response, strBuffer);
                 // Get the latest authorized token
                 processHeartBeatAuthorizedToken(response);
                 // Check if master requires to check authorization next time. 
If so, set the flag
@@ -1695,5 +1697,4 @@ public class BaseMessageConsumer implements 
MessageConsumer {
             }
         }
     }
-
 }
diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
index d3dd0762f..3170f7db2 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
@@ -73,7 +73,7 @@ public class RmtDataCache implements Closeable {
     private long lastEmptyBrokerPrintTime = 0;
     private long lastEmptyTopicPrintTime = 0;
     private long lastBrokerUpdatedTime = System.currentTimeMillis();
-    private AtomicLong lstBrokerConfigId =
+    private final AtomicLong lstBrokerConfigId =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
     private Map<Integer, BrokerInfo> brokersMap =
             new ConcurrentHashMap<>();
@@ -96,12 +96,15 @@ public class RmtDataCache implements Closeable {
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
     private boolean isFirstReport = true;
     private long reportIntCount = 0;
+    private final long maxReportTimes;
     // partition cache
     private final AtomicInteger waitCont = new AtomicInteger(0);
     private final ConcurrentHashMap<String, Timeout> timeouts =
             new ConcurrentHashMap<>();
     private final ConcurrentLinkedQueue<String> indexPartition =
             new ConcurrentLinkedQueue<String>();
+    private volatile long lstReportTime = 0;
+    private final AtomicLong partMapChgTime = new AtomicLong(0);
     private final ConcurrentHashMap<String /* index */, PartitionExt> 
partitionMap =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap 
=
@@ -116,7 +119,7 @@ public class RmtDataCache implements Closeable {
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* partitionKey */, Integer> 
partRegisterBookMap =
             new ConcurrentHashMap<>();
-    private AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
     private CountDownLatch dataProcessSync = new CountDownLatch(0);
 
     /**
@@ -130,6 +133,7 @@ public class RmtDataCache implements Closeable {
         if (refCont.incrementAndGet() == 1) {
             timer = new HashedWheelTimer();
         }
+        this.maxReportTimes = consumerConfig.getMaxSubInfoReportIntvlTimes() * 
10L;
         Map<Partition, ConsumeOffsetInfo> tmpPartOffsetMap = new HashMap<>();
         if (partitionList != null) {
             for (Partition partition : partitionList) {
@@ -163,41 +167,39 @@ public class RmtDataCache implements Closeable {
      * update ops task in cache
      *
      * @param opsTaskInfo ops task info
+     * @param strBuff   the string buffer
      *
      */
-    public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo) {
+    public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo, 
StringBuilder strBuff) {
         if (opsTaskInfo == null) {
             return;
         }
-        // update flowctrl info
-        if (opsTaskInfo.hasGroupFlowCheckId()) {
-            if (opsTaskInfo.getGroupFlowCheckId() >= 0
-                    && opsTaskInfo.getGroupFlowCheckId() != 
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
-                try {
-                    
groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
-                            opsTaskInfo.getGroupFlowCheckId(), 
opsTaskInfo.getGroupFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Remote Data Cache] found parse group 
flowCtrl rules failure", e1);
-                }
-            }
-        }
-        if (opsTaskInfo.hasDefFlowCheckId()) {
-            if (opsTaskInfo.getDefFlowCheckId() >= 0
-                    && opsTaskInfo.getDefFlowCheckId() != 
defFlowCtrlRuleHandler.getFlowCtrlId()) {
-                try {
-                    
defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
-                            opsTaskInfo.getDefFlowCheckId(), 
opsTaskInfo.getDefFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Remote Data Cache] found parse default 
flowCtrl rules failure", e1);
-                }
+        // update group flowctrl info
+        if (opsTaskInfo.hasGroupFlowCheckId()
+                && opsTaskInfo.getGroupFlowCheckId() >= 0
+                && opsTaskInfo.getGroupFlowCheckId() != 
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+            try {
+                groupFlowCtrlRuleHandler.updateFlowCtrlInfo(
+                        opsTaskInfo.getQryPriorityId(),
+                        opsTaskInfo.getGroupFlowCheckId(),
+                        opsTaskInfo.getGroupFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Remote Data Cache] found parse group flowCtrl 
rules failure", e1);
+            }
+        }
+        // update default flowctrl info
+        if (opsTaskInfo.hasDefFlowCheckId()
+                && opsTaskInfo.getDefFlowCheckId() >= 0
+                && opsTaskInfo.getDefFlowCheckId() != 
defFlowCtrlRuleHandler.getFlowCtrlId()) {
+            try {
+                defFlowCtrlRuleHandler.updateFlowCtrlInfo(
+                        TBaseConstants.META_VALUE_UNDEFINED,
+                        opsTaskInfo.getDefFlowCheckId(),
+                        opsTaskInfo.getDefFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Remote Data Cache] found parse default flowCtrl 
rules failure", e1);
             }
         }
-        // update priority id
-        int qryPriorityId = opsTaskInfo.hasQryPriorityId()
-                ? opsTaskInfo.getQryPriorityId() : 
groupFlowCtrlRuleHandler.getQryPriorityId();
-        if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
-            groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
-        }
         // update consume control info
         if (opsTaskInfo.hasCsmFrmMaxOffsetCtrlId()
                 && opsTaskInfo.getCsmFrmMaxOffsetCtrlId() >= 0) {
@@ -217,83 +219,81 @@ public class RmtDataCache implements Closeable {
     /**
      * update ops task in cache
      *
-     * @param response master register response
+     * @param response   master register response
+     * @param strBuff  the string buffer
      *
      */
-    public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response) 
{
+    public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response,
+                                    StringBuilder strBuff) {
         if (response == null) {
             return;
         }
-        // update flowctrl info
-        if (response.hasGroupFlowCheckId()) {
-            if (response.getGroupFlowCheckId() >= 0
-                    && response.getGroupFlowCheckId() != 
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
-                try {
-                    
groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
-                            response.getGroupFlowCheckId(), 
response.getGroupFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Remote Data Cache] found parse group 
flowCtrl rules failure", e1);
-                }
-            }
-        }
-        if (response.hasDefFlowCheckId()) {
-            if (response.getDefFlowCheckId() >= 0
-                    && response.getDefFlowCheckId() != 
defFlowCtrlRuleHandler.getFlowCtrlId()) {
-                try {
-                    
defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
-                            response.getDefFlowCheckId(), 
response.getDefFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Remote Data Cache] found parse default 
flowCtrl rules failure", e1);
-                }
+        // update group flowctrl info
+        if (response.hasGroupFlowCheckId()
+                && response.getGroupFlowCheckId() >= 0
+                && response.getGroupFlowCheckId() != 
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+            try {
+                groupFlowCtrlRuleHandler.updateFlowCtrlInfo(
+                        response.getQryPriorityId(),
+                        response.getGroupFlowCheckId(),
+                        response.getGroupFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Remote Data Cache] found parse group flowCtrl 
rules failure", e1);
+            }
+        }
+        // update default flowctrl info
+        if (response.hasDefFlowCheckId()
+                && response.getDefFlowCheckId() >= 0
+                && response.getDefFlowCheckId() != 
defFlowCtrlRuleHandler.getFlowCtrlId()) {
+            try {
+                defFlowCtrlRuleHandler.updateFlowCtrlInfo(
+                        TBaseConstants.META_VALUE_UNDEFINED,
+                        response.getDefFlowCheckId(),
+                        response.getDefFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Remote Data Cache] found parse default flowCtrl 
rules failure", e1);
             }
         }
-        // update priority id
-        int qryPriorityId = response.hasQryPriorityId()
-                ? response.getQryPriorityId() : 
groupFlowCtrlRuleHandler.getQryPriorityId();
-        if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
-            groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
-        }
     }
 
     /**
      * update ops task in cache
      *
-     * @param response master register response
+     * @param response   master register response
+     * @param strBuff  the string buffer
      *
      */
-    public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response) {
+    public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response,
+                                    StringBuilder strBuff) {
         if (response == null) {
             return;
         }
-        // update flowctrl info
-        if (response.hasGroupFlowCheckId()) {
-            if (response.getGroupFlowCheckId() >= 0
-                    && response.getGroupFlowCheckId() != 
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
-                try {
-                    
groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
-                            response.getGroupFlowCheckId(), 
response.getGroupFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Remote Data Cache] found parse group 
flowCtrl rules failure", e1);
-                }
-            }
-        }
-        if (response.hasDefFlowCheckId()) {
-            if (response.getDefFlowCheckId() >= 0
-                    && response.getDefFlowCheckId() != 
defFlowCtrlRuleHandler.getFlowCtrlId()) {
-                try {
-                    
defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
-                            response.getDefFlowCheckId(), 
response.getDefFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Remote Data Cache] found parse default 
flowCtrl rules failure", e1);
-                }
+        // update group flowctrl info
+        if (response.hasGroupFlowCheckId()
+                && response.getGroupFlowCheckId() >= 0
+                && response.getGroupFlowCheckId() != 
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+            try {
+                groupFlowCtrlRuleHandler.updateFlowCtrlInfo(
+                        response.getQryPriorityId(),
+                        response.getGroupFlowCheckId(),
+                        response.getGroupFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Remote Data Cache] found parse group flowCtrl 
rules failure", e1);
+            }
+        }
+        // update default flowctrl info
+        if (response.hasDefFlowCheckId()
+                && response.getDefFlowCheckId() >= 0
+                && response.getDefFlowCheckId() != 
defFlowCtrlRuleHandler.getFlowCtrlId()) {
+            try {
+                defFlowCtrlRuleHandler.updateFlowCtrlInfo(
+                        TBaseConstants.META_VALUE_UNDEFINED,
+                        response.getDefFlowCheckId(),
+                        response.getDefFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Remote Data Cache] found parse default flowCtrl 
rules failure", e1);
             }
         }
-        // update priority id
-        int qryPriorityId = response.hasQryPriorityId()
-                ? response.getQryPriorityId() : 
groupFlowCtrlRuleHandler.getQryPriorityId();
-        if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
-            groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
-        }
     }
 
     public boolean isCsmFromMaxOffset() {
@@ -566,12 +566,16 @@ public class RmtDataCache implements Closeable {
             if (!this.partitionMap.isEmpty()) {
                 isFirstReport = false;
                 builder.setReportSubInfo(true);
+                lstReportTime = partMapChgTime.get();
+                builder.addAllPartSubInfo(getSubscribedPartitionInfo());
+            }
+        } else {
+            if (lstReportTime != partMapChgTime.get()
+                    || ((++this.reportIntCount) % this.maxReportTimes == 0)) {
+                builder.setReportSubInfo(true);
+                lstReportTime = partMapChgTime.get();
                 builder.addAllPartSubInfo(getSubscribedPartitionInfo());
             }
-        } else if ((++this.reportIntCount)
-                % consumerConfig.getMaxSubInfoReportIntvlTimes() == 0) {
-            builder.setReportSubInfo(true);
-            builder.addAllPartSubInfo(getSubscribedPartitionInfo());
         }
         return builder.build();
     }
@@ -1068,7 +1072,7 @@ public class RmtDataCache implements Closeable {
             for (Map.Entry<BrokerInfo, List<Partition>> entry : 
unRegisterInfoMap.entrySet()) {
                 for (Partition partition : entry.getValue()) {
                     PartitionExt partitionExt =
-                            partitionMap.remove(partition.getPartitionKey());
+                            rmvPartitionFromMap(partition.getPartitionKey());
                     if (partitionExt != null) {
                         lastPackConsumed = partitionExt.isLastPackConsumed();
                         if (!cancelTimeTask(partition.getPartitionKey())
@@ -1137,7 +1141,7 @@ public class RmtDataCache implements Closeable {
         try {
             waitPartitions(partitionKeys, inUseWaitPeriodMs);
             PartitionExt partitionExt =
-                    partitionMap.remove(partitionKey);
+                    rmvPartitionFromMap(partitionKey);
             if (partitionExt == null) {
                 result.setSuccResult(null);
                 return result.isSuccess();
@@ -1187,7 +1191,7 @@ public class RmtDataCache implements Closeable {
      * @param partition partition to be removed
      */
     public void removePartition(Partition partition) {
-        partitionMap.remove(partition.getPartitionKey());
+        rmvPartitionFromMap(partition.getPartitionKey());
         cancelTimeTask(partition.getPartitionKey());
         indexPartition.remove(partition.getPartitionKey());
         partitionUsedMap.remove(partition.getPartitionKey());
@@ -1505,7 +1509,7 @@ public class RmtDataCache implements Closeable {
             }
             updateOffsetCache(partition.getPartitionKey(),
                     entry.getValue().getCurrOffset(), 
entry.getValue().getMaxOffset());
-            partitionMap.put(partition.getPartitionKey(),
+            addPartitionToMap(partition.getPartitionKey(),
                     new PartitionExt(this.groupFlowCtrlRuleHandler,
                             this.defFlowCtrlRuleHandler, partition.getBroker(),
                             partition.getTopic(), partition.getPartitionId()));
@@ -1530,6 +1534,19 @@ public class RmtDataCache implements Closeable {
                 && this.dataProcessSync.getCount() != 0);
     }
 
+    private void addPartitionToMap(String partKey, PartitionExt partitionExt) {
+        partitionMap.put(partKey, partitionExt);
+        partMapChgTime.set(System.currentTimeMillis());
+    }
+
+    private PartitionExt rmvPartitionFromMap(String partKey) {
+        PartitionExt tmpPartExt = partitionMap.remove(partKey);
+        if (tmpPartExt != null) {
+            partMapChgTime.set(System.currentTimeMillis());
+        }
+        return tmpPartExt;
+    }
+
     private void pauseProcess() {
         this.dataProcessSync = new CountDownLatch(1);
     }
diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index d1a1a68aa..330ab1d6b 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -1035,7 +1035,7 @@ public class SimpleClientBalanceConsumer implements 
ClientBalanceConsumer {
                     lstMetaQueryTime.set(System.currentTimeMillis());
                 }
                 // Get the authorization rules and update the local rules
-                clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo());
+                clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo(), 
strBuffer);
                 // Get the latest authorized token
                 processHeartBeatAuthorizedToken(response);
                 // Warning if heartbeat interval is too long
@@ -1171,7 +1171,7 @@ public class SimpleClientBalanceConsumer implements 
ClientBalanceConsumer {
             clientRmtDataCache.updateReg2MasterTime();
             
clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(),
                     response.getBrokerConfigListList(), sBuffer);
-            clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo());
+            clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo(), 
sBuffer);
             processRegAuthorizedToken(response);
             result.setSuccResult();
             return result.isSuccess();
diff --git 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index 31ecf1180..93cab8faa 100644
--- 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -88,30 +88,27 @@ public class FlowCtrlRuleHandler {
     /**
      * Parse flow control information and update stored cached old content
      *
-     * @param qyrPriorityId    query priority id
+     * @param qryPriorityId    the query priority id
      * @param flowCtrlId       flow control information id
      * @param flowCtrlInfo     flow control information content
-     * @throws Exception       Exception thrown
+     * @param strBuff          the string buffer
+     * @throws Exception       the exception thrown
      */
-    public void updateFlowCtrlInfo(final int qyrPriorityId,
-                                   final long flowCtrlId,
-                                   final String flowCtrlInfo) throws Exception 
{
+    public void updateFlowCtrlInfo(int qryPriorityId, long flowCtrlId,
+                                   String flowCtrlInfo, StringBuilder strBuff) 
throws Exception {
         if (flowCtrlId == this.flowCtrlId.get()) {
             return;
         }
+        long befFlowCtrlId;
+        int befQryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
         Map<Integer, List<FlowCtrlItem>> flowCtrlItemsMap = null;
         if (TStringUtils.isNotBlank(flowCtrlInfo)) {
             flowCtrlItemsMap = parseFlowCtrlInfo(flowCtrlInfo);
         }
         writeLock.lock();
         try {
-            this.flowCtrlId.set(flowCtrlId);
+            befFlowCtrlId = this.flowCtrlId.getAndSet(flowCtrlId);
             this.strFlowCtrlInfo = flowCtrlInfo;
-            logger.info(new StringBuilder(512)
-                .append("[Flow Ctrl] Updated ").append(flowCtrlName)
-                .append(" to flowId=").append(flowCtrlId)
-                .append(",qyrPriorityId=").append(qyrPriorityId).toString());
-            this.qryPriorityId.set(qyrPriorityId);
             clearStatisData();
             if (flowCtrlItemsMap == null
                     || flowCtrlItemsMap.isEmpty()) {
@@ -120,10 +117,24 @@ public class FlowCtrlRuleHandler {
                 flowCtrlRuleSet = flowCtrlItemsMap;
                 initialStatisData();
             }
+            if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+                    && qryPriorityId != this.qryPriorityId.get()) {
+                befQryPriorityId = this.qryPriorityId.getAndSet(qryPriorityId);
+            }
             this.lastUpdateTime = System.currentTimeMillis();
         } finally {
             writeLock.unlock();
         }
+        strBuff.append("[Flow Ctrl] Update ").append(flowCtrlName)
+                .append(", flowId from ").append(befFlowCtrlId)
+                .append(" to ").append(flowCtrlId);
+        if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+                && qryPriorityId != befQryPriorityId) {
+            strBuff.append(", qryPriorityId from ").append(befQryPriorityId)
+                    .append(" to ").append(qryPriorityId);
+        }
+        logger.info(strBuff.toString());
+        strBuff.delete(0, strBuff.length());
     }
 
     /**
@@ -782,5 +793,4 @@ public class FlowCtrlRuleHandler {
         }
         return timeHour * 100 + timeMin;
     }
-
 }
diff --git 
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
 
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
index 09fc784fa..0ea34a238 100644
--- 
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
+++ 
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
@@ -39,8 +39,9 @@ public class TestFlowCtrlRuleHandler {
     @Test
     public void testFlowCtrlRuleHandler() {
         try {
+            StringBuilder strBuff = new StringBuilder(512);
             FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true);
-            handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
+            handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo(), strBuff);
             TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00");
             Calendar rightNow = Calendar.getInstance(timeZone);
             int hour = rightNow.get(Calendar.HOUR_OF_DAY);
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index 397ccfd1f..437b51f48 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -329,24 +329,14 @@ public class TubeBroker implements Stoppable {
         // update default flow controller rules
         FlowCtrlRuleHandler defFlowCtrlHandler =
                 metadataManager.getFlowCtrlRuleHandler();
-        long flowCheckId = defFlowCtrlHandler.getFlowCtrlId();
-        int qryPriorityId = defFlowCtrlHandler.getQryPriorityId();
-        if (response.hasFlowCheckId()) {
-            qryPriorityId = response.hasQryPriorityId()
-                    ? response.getQryPriorityId() : qryPriorityId;
-            if (response.getFlowCheckId() != flowCheckId) {
-                flowCheckId = response.getFlowCheckId();
-                try {
-                    defFlowCtrlHandler
-                            .updateFlowCtrlInfo(qryPriorityId,
-                                    flowCheckId, 
response.getFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn(
-                            "[HeartBeat response] found parse flowCtrl rules 
failure", e1);
-                }
-            }
-            if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
-                defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
+        if (response.hasFlowCheckId()
+                && response.getFlowCheckId() >= 0
+                && response.getFlowCheckId() != 
defFlowCtrlHandler.getFlowCtrlId()) {
+            try {
+                
defFlowCtrlHandler.updateFlowCtrlInfo(response.getQryPriorityId(),
+                        response.getFlowCheckId(), 
response.getFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[HeartBeat response] found update flowCtrl rules 
failure", e1);
             }
         }
         // update configure report requirement
@@ -450,20 +440,14 @@ public class TubeBroker implements Stoppable {
         // process default flow controller rules
         FlowCtrlRuleHandler defFlowCtrlHandler =
                 metadataManager.getFlowCtrlRuleHandler();
-        if (response.hasFlowCheckId()) {
-            int qryPriorityId = response.hasQryPriorityId()
-                    ? response.getQryPriorityId() : 
defFlowCtrlHandler.getQryPriorityId();
-            if (response.getFlowCheckId() != 
defFlowCtrlHandler.getFlowCtrlId()) {
-                try {
-                    defFlowCtrlHandler
-                            .updateFlowCtrlInfo(response.getQryPriorityId(),
-                                    response.getFlowCheckId(), 
response.getFlowControlInfo());
-                } catch (Exception e1) {
-                    logger.warn("[Register response] found parse flowCtrl 
rules failure", e1);
-                }
-            }
-            if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
-                defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
+        if (response.hasFlowCheckId()
+                && response.getFlowCheckId() >= 0
+                && response.getFlowCheckId() != 
defFlowCtrlHandler.getFlowCtrlId()) {
+            try {
+                
defFlowCtrlHandler.updateFlowCtrlInfo(response.getQryPriorityId(),
+                        response.getFlowCheckId(), 
response.getFlowControlInfo(), strBuff);
+            } catch (Exception e1) {
+                logger.warn("[Register response] update default flowCtrl rules 
failure", e1);
             }
         }
         // update auth info

Reply via email to