This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 08a12a64e [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039) 08a12a64e is described below commit 08a12a64e133fbed5ac6583b4f2d6da871651e45 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Sep 27 18:51:32 2022 +0800 [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039) --- .../tubemq/client/config/ConsumerConfig.java | 2 +- .../client/consumer/BaseMessageConsumer.java | 19 +- .../tubemq/client/consumer/RmtDataCache.java | 209 +++++++++++---------- .../consumer/SimpleClientBalanceConsumer.java | 4 +- .../corebase/policies/FlowCtrlRuleHandler.java | 34 ++-- .../corebase/policies/TestFlowCtrlRuleHandler.java | 3 +- .../inlong/tubemq/server/broker/TubeBroker.java | 48 ++--- 7 files changed, 166 insertions(+), 153 deletions(-) diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java index 6dd19b8d3..590032d92 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java @@ -245,7 +245,7 @@ public class ConsumerConfig extends TubeClientConfig { } public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) { - this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes; + this.maxSubInfoReportIntvlTimes = Math.max(maxSubInfoReportIntvlTimes, 3); } private void validConsumerGroupParameter(String consumerGroup) { diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java index 177fcbb82..5840c28e4 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java @@ -111,7 +111,7 @@ public class BaseMessageConsumer implements MessageConsumer { // -1: Unsubscribed // 0: In Process // 1: Subscribed - private AtomicInteger subStatus = new AtomicInteger(-1); + private final AtomicInteger subStatus = new AtomicInteger(-1); // rebalance private int reportIntervalTimes = 0; private int rebalanceRetryTimes = 0; @@ -610,7 +610,7 @@ public class BaseMessageConsumer implements MessageConsumer { masterService.consumerRegisterC2M(createMasterRegisterRequest(), AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable()); if (response != null && response.getSuccess()) { - processRegisterAllocAndRspFlowRules(response); + processRegisterAllocAndRspFlowRules(response, strBuffer); processRegAuthorizedToken(response); break; } @@ -1094,11 +1094,12 @@ public class BaseMessageConsumer implements MessageConsumer { return builder.build(); } - private void processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response) { + private void processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response, + StringBuilder strBuffer) { if (response.hasNotAllocated() && !response.getNotAllocated()) { consumeSubInfo.compareAndSetIsNotAllocated(true, false); } - rmtDataCache.updFlowCtrlInfoInfo(response); + rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer); } private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C response) { @@ -1107,11 +1108,12 @@ public class BaseMessageConsumer implements MessageConsumer { } } - private void procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response) { + private void procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response, + StringBuilder strBuffer) { if (response.hasNotAllocated() && !response.getNotAllocated()) { consumeSubInfo.compareAndSetIsNotAllocated(true, false); } - rmtDataCache.updFlowCtrlInfoInfo(response); + rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer); } private ClientMaster.MasterCertificateInfo genMasterCertificateInfo(boolean force) { @@ -1478,7 +1480,7 @@ public class BaseMessageConsumer implements MessageConsumer { } else { // Process the successful response. Record the response information, // including control rules and latest auth token. - processRegisterAllocAndRspFlowRules(regResponse); + processRegisterAllocAndRspFlowRules(regResponse, strBuffer); processRegAuthorizedToken(regResponse); logger.info(strBuffer.append("[Re-register] ") .append(consumerId).toString()); @@ -1505,7 +1507,7 @@ public class BaseMessageConsumer implements MessageConsumer { // Process the heartbeat success response heartbeatRetryTimes = 0; // Get the authorization rules and update the local rules - procHeartBeatRspAllocAndFlowRules(response); + procHeartBeatRspAllocAndFlowRules(response, strBuffer); // Get the latest authorized token processHeartBeatAuthorizedToken(response); // Check if master requires to check authorization next time. If so, set the flag @@ -1695,5 +1697,4 @@ public class BaseMessageConsumer implements MessageConsumer { } } } - } diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java index d3dd0762f..3170f7db2 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java @@ -73,7 +73,7 @@ public class RmtDataCache implements Closeable { private long lastEmptyBrokerPrintTime = 0; private long lastEmptyTopicPrintTime = 0; private long lastBrokerUpdatedTime = System.currentTimeMillis(); - private AtomicLong lstBrokerConfigId = + private final AtomicLong lstBrokerConfigId = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); private Map<Integer, BrokerInfo> brokersMap = new ConcurrentHashMap<>(); @@ -96,12 +96,15 @@ public class RmtDataCache implements Closeable { new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); private boolean isFirstReport = true; private long reportIntCount = 0; + private final long maxReportTimes; // partition cache private final AtomicInteger waitCont = new AtomicInteger(0); private final ConcurrentHashMap<String, Timeout> timeouts = new ConcurrentHashMap<>(); private final ConcurrentLinkedQueue<String> indexPartition = new ConcurrentLinkedQueue<String>(); + private volatile long lstReportTime = 0; + private final AtomicLong partMapChgTime = new AtomicLong(0); private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap = @@ -116,7 +119,7 @@ public class RmtDataCache implements Closeable { new ConcurrentHashMap<>(); private final ConcurrentHashMap<String/* partitionKey */, Integer> partRegisterBookMap = new ConcurrentHashMap<>(); - private AtomicBoolean isClosed = new AtomicBoolean(false); + private final AtomicBoolean isClosed = new AtomicBoolean(false); private CountDownLatch dataProcessSync = new CountDownLatch(0); /** @@ -130,6 +133,7 @@ public class RmtDataCache implements Closeable { if (refCont.incrementAndGet() == 1) { timer = new HashedWheelTimer(); } + this.maxReportTimes = consumerConfig.getMaxSubInfoReportIntvlTimes() * 10L; Map<Partition, ConsumeOffsetInfo> tmpPartOffsetMap = new HashMap<>(); if (partitionList != null) { for (Partition partition : partitionList) { @@ -163,41 +167,39 @@ public class RmtDataCache implements Closeable { * update ops task in cache * * @param opsTaskInfo ops task info + * @param strBuff the string buffer * */ - public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo) { + public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo, StringBuilder strBuff) { if (opsTaskInfo == null) { return; } - // update flowctrl info - if (opsTaskInfo.hasGroupFlowCheckId()) { - if (opsTaskInfo.getGroupFlowCheckId() >= 0 - && opsTaskInfo.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) { - try { - groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED, - opsTaskInfo.getGroupFlowCheckId(), opsTaskInfo.getGroupFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1); - } - } - } - if (opsTaskInfo.hasDefFlowCheckId()) { - if (opsTaskInfo.getDefFlowCheckId() >= 0 - && opsTaskInfo.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) { - try { - defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED, - opsTaskInfo.getDefFlowCheckId(), opsTaskInfo.getDefFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1); - } + // update group flowctrl info + if (opsTaskInfo.hasGroupFlowCheckId() + && opsTaskInfo.getGroupFlowCheckId() >= 0 + && opsTaskInfo.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) { + try { + groupFlowCtrlRuleHandler.updateFlowCtrlInfo( + opsTaskInfo.getQryPriorityId(), + opsTaskInfo.getGroupFlowCheckId(), + opsTaskInfo.getGroupFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1); + } + } + // update default flowctrl info + if (opsTaskInfo.hasDefFlowCheckId() + && opsTaskInfo.getDefFlowCheckId() >= 0 + && opsTaskInfo.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) { + try { + defFlowCtrlRuleHandler.updateFlowCtrlInfo( + TBaseConstants.META_VALUE_UNDEFINED, + opsTaskInfo.getDefFlowCheckId(), + opsTaskInfo.getDefFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1); } } - // update priority id - int qryPriorityId = opsTaskInfo.hasQryPriorityId() - ? opsTaskInfo.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId(); - if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) { - groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId); - } // update consume control info if (opsTaskInfo.hasCsmFrmMaxOffsetCtrlId() && opsTaskInfo.getCsmFrmMaxOffsetCtrlId() >= 0) { @@ -217,83 +219,81 @@ public class RmtDataCache implements Closeable { /** * update ops task in cache * - * @param response master register response + * @param response master register response + * @param strBuff the string buffer * */ - public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response) { + public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response, + StringBuilder strBuff) { if (response == null) { return; } - // update flowctrl info - if (response.hasGroupFlowCheckId()) { - if (response.getGroupFlowCheckId() >= 0 - && response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) { - try { - groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED, - response.getGroupFlowCheckId(), response.getGroupFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1); - } - } - } - if (response.hasDefFlowCheckId()) { - if (response.getDefFlowCheckId() >= 0 - && response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) { - try { - defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED, - response.getDefFlowCheckId(), response.getDefFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1); - } + // update group flowctrl info + if (response.hasGroupFlowCheckId() + && response.getGroupFlowCheckId() >= 0 + && response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) { + try { + groupFlowCtrlRuleHandler.updateFlowCtrlInfo( + response.getQryPriorityId(), + response.getGroupFlowCheckId(), + response.getGroupFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1); + } + } + // update default flowctrl info + if (response.hasDefFlowCheckId() + && response.getDefFlowCheckId() >= 0 + && response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) { + try { + defFlowCtrlRuleHandler.updateFlowCtrlInfo( + TBaseConstants.META_VALUE_UNDEFINED, + response.getDefFlowCheckId(), + response.getDefFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1); } } - // update priority id - int qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId(); - if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) { - groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId); - } } /** * update ops task in cache * - * @param response master register response + * @param response master register response + * @param strBuff the string buffer * */ - public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response) { + public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response, + StringBuilder strBuff) { if (response == null) { return; } - // update flowctrl info - if (response.hasGroupFlowCheckId()) { - if (response.getGroupFlowCheckId() >= 0 - && response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) { - try { - groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED, - response.getGroupFlowCheckId(), response.getGroupFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1); - } - } - } - if (response.hasDefFlowCheckId()) { - if (response.getDefFlowCheckId() >= 0 - && response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) { - try { - defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED, - response.getDefFlowCheckId(), response.getDefFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1); - } + // update group flowctrl info + if (response.hasGroupFlowCheckId() + && response.getGroupFlowCheckId() >= 0 + && response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) { + try { + groupFlowCtrlRuleHandler.updateFlowCtrlInfo( + response.getQryPriorityId(), + response.getGroupFlowCheckId(), + response.getGroupFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1); + } + } + // update default flowctrl info + if (response.hasDefFlowCheckId() + && response.getDefFlowCheckId() >= 0 + && response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) { + try { + defFlowCtrlRuleHandler.updateFlowCtrlInfo( + TBaseConstants.META_VALUE_UNDEFINED, + response.getDefFlowCheckId(), + response.getDefFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1); } } - // update priority id - int qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId(); - if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) { - groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId); - } } public boolean isCsmFromMaxOffset() { @@ -566,12 +566,16 @@ public class RmtDataCache implements Closeable { if (!this.partitionMap.isEmpty()) { isFirstReport = false; builder.setReportSubInfo(true); + lstReportTime = partMapChgTime.get(); + builder.addAllPartSubInfo(getSubscribedPartitionInfo()); + } + } else { + if (lstReportTime != partMapChgTime.get() + || ((++this.reportIntCount) % this.maxReportTimes == 0)) { + builder.setReportSubInfo(true); + lstReportTime = partMapChgTime.get(); builder.addAllPartSubInfo(getSubscribedPartitionInfo()); } - } else if ((++this.reportIntCount) - % consumerConfig.getMaxSubInfoReportIntvlTimes() == 0) { - builder.setReportSubInfo(true); - builder.addAllPartSubInfo(getSubscribedPartitionInfo()); } return builder.build(); } @@ -1068,7 +1072,7 @@ public class RmtDataCache implements Closeable { for (Map.Entry<BrokerInfo, List<Partition>> entry : unRegisterInfoMap.entrySet()) { for (Partition partition : entry.getValue()) { PartitionExt partitionExt = - partitionMap.remove(partition.getPartitionKey()); + rmvPartitionFromMap(partition.getPartitionKey()); if (partitionExt != null) { lastPackConsumed = partitionExt.isLastPackConsumed(); if (!cancelTimeTask(partition.getPartitionKey()) @@ -1137,7 +1141,7 @@ public class RmtDataCache implements Closeable { try { waitPartitions(partitionKeys, inUseWaitPeriodMs); PartitionExt partitionExt = - partitionMap.remove(partitionKey); + rmvPartitionFromMap(partitionKey); if (partitionExt == null) { result.setSuccResult(null); return result.isSuccess(); @@ -1187,7 +1191,7 @@ public class RmtDataCache implements Closeable { * @param partition partition to be removed */ public void removePartition(Partition partition) { - partitionMap.remove(partition.getPartitionKey()); + rmvPartitionFromMap(partition.getPartitionKey()); cancelTimeTask(partition.getPartitionKey()); indexPartition.remove(partition.getPartitionKey()); partitionUsedMap.remove(partition.getPartitionKey()); @@ -1505,7 +1509,7 @@ public class RmtDataCache implements Closeable { } updateOffsetCache(partition.getPartitionKey(), entry.getValue().getCurrOffset(), entry.getValue().getMaxOffset()); - partitionMap.put(partition.getPartitionKey(), + addPartitionToMap(partition.getPartitionKey(), new PartitionExt(this.groupFlowCtrlRuleHandler, this.defFlowCtrlRuleHandler, partition.getBroker(), partition.getTopic(), partition.getPartitionId())); @@ -1530,6 +1534,19 @@ public class RmtDataCache implements Closeable { && this.dataProcessSync.getCount() != 0); } + private void addPartitionToMap(String partKey, PartitionExt partitionExt) { + partitionMap.put(partKey, partitionExt); + partMapChgTime.set(System.currentTimeMillis()); + } + + private PartitionExt rmvPartitionFromMap(String partKey) { + PartitionExt tmpPartExt = partitionMap.remove(partKey); + if (tmpPartExt != null) { + partMapChgTime.set(System.currentTimeMillis()); + } + return tmpPartExt; + } + private void pauseProcess() { this.dataProcessSync = new CountDownLatch(1); } diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java index d1a1a68aa..330ab1d6b 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java @@ -1035,7 +1035,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer { lstMetaQueryTime.set(System.currentTimeMillis()); } // Get the authorization rules and update the local rules - clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo()); + clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo(), strBuffer); // Get the latest authorized token processHeartBeatAuthorizedToken(response); // Warning if heartbeat interval is too long @@ -1171,7 +1171,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer { clientRmtDataCache.updateReg2MasterTime(); clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(), response.getBrokerConfigListList(), sBuffer); - clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo()); + clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo(), sBuffer); processRegAuthorizedToken(response); result.setSuccResult(); return result.isSuccess(); diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java index 31ecf1180..93cab8faa 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java @@ -88,30 +88,27 @@ public class FlowCtrlRuleHandler { /** * Parse flow control information and update stored cached old content * - * @param qyrPriorityId query priority id + * @param qryPriorityId the query priority id * @param flowCtrlId flow control information id * @param flowCtrlInfo flow control information content - * @throws Exception Exception thrown + * @param strBuff the string buffer + * @throws Exception the exception thrown */ - public void updateFlowCtrlInfo(final int qyrPriorityId, - final long flowCtrlId, - final String flowCtrlInfo) throws Exception { + public void updateFlowCtrlInfo(int qryPriorityId, long flowCtrlId, + String flowCtrlInfo, StringBuilder strBuff) throws Exception { if (flowCtrlId == this.flowCtrlId.get()) { return; } + long befFlowCtrlId; + int befQryPriorityId = TBaseConstants.META_VALUE_UNDEFINED; Map<Integer, List<FlowCtrlItem>> flowCtrlItemsMap = null; if (TStringUtils.isNotBlank(flowCtrlInfo)) { flowCtrlItemsMap = parseFlowCtrlInfo(flowCtrlInfo); } writeLock.lock(); try { - this.flowCtrlId.set(flowCtrlId); + befFlowCtrlId = this.flowCtrlId.getAndSet(flowCtrlId); this.strFlowCtrlInfo = flowCtrlInfo; - logger.info(new StringBuilder(512) - .append("[Flow Ctrl] Updated ").append(flowCtrlName) - .append(" to flowId=").append(flowCtrlId) - .append(",qyrPriorityId=").append(qyrPriorityId).toString()); - this.qryPriorityId.set(qyrPriorityId); clearStatisData(); if (flowCtrlItemsMap == null || flowCtrlItemsMap.isEmpty()) { @@ -120,10 +117,24 @@ public class FlowCtrlRuleHandler { flowCtrlRuleSet = flowCtrlItemsMap; initialStatisData(); } + if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED + && qryPriorityId != this.qryPriorityId.get()) { + befQryPriorityId = this.qryPriorityId.getAndSet(qryPriorityId); + } this.lastUpdateTime = System.currentTimeMillis(); } finally { writeLock.unlock(); } + strBuff.append("[Flow Ctrl] Update ").append(flowCtrlName) + .append(", flowId from ").append(befFlowCtrlId) + .append(" to ").append(flowCtrlId); + if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED + && qryPriorityId != befQryPriorityId) { + strBuff.append(", qryPriorityId from ").append(befQryPriorityId) + .append(" to ").append(qryPriorityId); + } + logger.info(strBuff.toString()); + strBuff.delete(0, strBuff.length()); } /** @@ -782,5 +793,4 @@ public class FlowCtrlRuleHandler { } return timeHour * 100 + timeMin; } - } diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java index 09fc784fa..0ea34a238 100644 --- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java +++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java @@ -39,8 +39,9 @@ public class TestFlowCtrlRuleHandler { @Test public void testFlowCtrlRuleHandler() { try { + StringBuilder strBuff = new StringBuilder(512); FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true); - handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo()); + handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo(), strBuff); TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00"); Calendar rightNow = Calendar.getInstance(timeZone); int hour = rightNow.get(Calendar.HOUR_OF_DAY); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java index 397ccfd1f..437b51f48 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java @@ -329,24 +329,14 @@ public class TubeBroker implements Stoppable { // update default flow controller rules FlowCtrlRuleHandler defFlowCtrlHandler = metadataManager.getFlowCtrlRuleHandler(); - long flowCheckId = defFlowCtrlHandler.getFlowCtrlId(); - int qryPriorityId = defFlowCtrlHandler.getQryPriorityId(); - if (response.hasFlowCheckId()) { - qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : qryPriorityId; - if (response.getFlowCheckId() != flowCheckId) { - flowCheckId = response.getFlowCheckId(); - try { - defFlowCtrlHandler - .updateFlowCtrlInfo(qryPriorityId, - flowCheckId, response.getFlowControlInfo()); - } catch (Exception e1) { - logger.warn( - "[HeartBeat response] found parse flowCtrl rules failure", e1); - } - } - if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) { - defFlowCtrlHandler.setQryPriorityId(qryPriorityId); + if (response.hasFlowCheckId() + && response.getFlowCheckId() >= 0 + && response.getFlowCheckId() != defFlowCtrlHandler.getFlowCtrlId()) { + try { + defFlowCtrlHandler.updateFlowCtrlInfo(response.getQryPriorityId(), + response.getFlowCheckId(), response.getFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[HeartBeat response] found update flowCtrl rules failure", e1); } } // update configure report requirement @@ -450,20 +440,14 @@ public class TubeBroker implements Stoppable { // process default flow controller rules FlowCtrlRuleHandler defFlowCtrlHandler = metadataManager.getFlowCtrlRuleHandler(); - if (response.hasFlowCheckId()) { - int qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : defFlowCtrlHandler.getQryPriorityId(); - if (response.getFlowCheckId() != defFlowCtrlHandler.getFlowCtrlId()) { - try { - defFlowCtrlHandler - .updateFlowCtrlInfo(response.getQryPriorityId(), - response.getFlowCheckId(), response.getFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Register response] found parse flowCtrl rules failure", e1); - } - } - if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) { - defFlowCtrlHandler.setQryPriorityId(qryPriorityId); + if (response.hasFlowCheckId() + && response.getFlowCheckId() >= 0 + && response.getFlowCheckId() != defFlowCtrlHandler.getFlowCtrlId()) { + try { + defFlowCtrlHandler.updateFlowCtrlInfo(response.getQryPriorityId(), + response.getFlowCheckId(), response.getFlowControlInfo(), strBuff); + } catch (Exception e1) { + logger.warn("[Register response] update default flowCtrl rules failure", e1); } } // update auth info