This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0133149cd [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130) 0133149cd is described below commit 0133149cdddab4478be5f45477c066f3d5c72a55 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Oct 11 09:57:50 2022 +0800 [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130) --- .../{AllowedSetting.java => MaxMsgSizeHolder.java} | 26 +- .../tubemq/client/producer/ProducerManager.java | 120 +++---- .../client/producer/SimpleMessageProducer.java | 4 +- .../tubemq/corebase/utils/DataConverterUtil.java | 60 ++-- .../inlong/tubemq/corebase/utils/Tuple3.java | 9 + .../inlong/tubemq/corerpc/RemoteConErrStats.java | 4 +- .../inlong/tubemq/corerpc/client/CallFuture.java | 5 +- .../corerpc/codec/DataConverterUtilTest.java | 6 +- .../inlong/tubemq/server/master/TMaster.java | 392 +++++++++++---------- .../nodemanage/nodebroker/BrokerAbnHolder.java | 6 +- .../nodemanage/nodebroker/BrokerPSInfoHolder.java | 30 +- .../nodemanage/nodebroker/BrokerRunManager.java | 7 +- .../nodemanage/nodebroker/BrokerSyncData.java | 1 - .../nodemanage/nodebroker/BrokerTopicInfoView.java | 44 ++- .../nodemanage/nodebroker/DefBrokerRunManager.java | 15 +- .../master/web/handler/WebMasterInfoHandler.java | 37 +- .../master/web/handler/WebTopicDeployHandler.java | 139 ++++---- 17 files changed, 485 insertions(+), 420 deletions(-) diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java similarity index 66% rename from inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java rename to inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java index 22d02dc6b..80c84049b 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java @@ -17,6 +17,8 @@ package org.apache.inlong.tubemq.client.producer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.inlong.tubemq.corebase.TBaseConstants; @@ -24,16 +26,17 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster; import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils; /** - * The class class caches the dynamic settings + * The class caches the max msg size settings * returned from the server. */ -public class AllowedSetting { - private AtomicLong configId = +public class MaxMsgSizeHolder { + private final AtomicLong configId = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); - private AtomicInteger maxMsgSize = + private final AtomicInteger defMaxMsgSize = new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE); + private Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>(); - public AllowedSetting() { + public MaxMsgSizeHolder() { } @@ -44,18 +47,23 @@ public class AllowedSetting { configId.set(allowedConfig.getConfigId()); } if (allowedConfig.hasMaxMsgSize() - && allowedConfig.getMaxMsgSize() != maxMsgSize.get()) { - maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB( + && allowedConfig.getMaxMsgSize() != defMaxMsgSize.get()) { + defMaxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB( allowedConfig.getMaxMsgSize())); } } } + public void updTopicMaxSizeInB(Map<String, Integer> topicMaxSizeInBMap) { + this.topicMaxSizeInBMap = topicMaxSizeInBMap; + } + public long getConfigId() { return configId.get(); } - public int getMaxMsgSize() { - return maxMsgSize.get(); + public int getDefMaxMsgSize(String topicName) { + Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName); + return maxMsgSizeInB == null ? defMaxMsgSize.get() : maxMsgSizeInB; } } diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java index afe929f77..6ce1bfa06 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java @@ -52,6 +52,7 @@ import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil; import org.apache.inlong.tubemq.corebase.utils.MixedUtils; import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.corebase.utils.ThreadUtils; +import org.apache.inlong.tubemq.corebase.utils.Tuple2; import org.apache.inlong.tubemq.corerpc.RpcConfig; import org.apache.inlong.tubemq.corerpc.RpcConstants; import org.apache.inlong.tubemq.corerpc.RpcServiceFactory; @@ -81,8 +82,8 @@ public class ProducerManager { private final ScheduledExecutorService heartbeatService; private final AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); - private final AllowedSetting allowedSetting = - new AllowedSetting(); + private final MaxMsgSizeHolder msgSizeHolder = + new MaxMsgSizeHolder(); private final AtomicReference<String> authAuthorizedTokenRef = new AtomicReference<>(""); private final ClientAuthenticateHandler authenticateHandler = @@ -341,10 +342,11 @@ public class ProducerManager { /** * Get allowed message size. * + * @param topicName the topic name * @return max allowed message size */ - public int getMaxMsgSize() { - return allowedSetting.getMaxMsgSize(); + public int getMaxMsgSize(String topicName) { + return msgSizeHolder.getDefMaxMsgSize(topicName); } /** @@ -516,27 +518,32 @@ public class ProducerManager { return builder.build(); } - private void updateTopicPartitions(List<TopicInfo> topicInfoList) { + private void updateTopicConfigure( + Tuple2<Map<String, Integer>, List<TopicInfo>> topicInfoTuple) { + int baseValue; + Partition tmpPart; + List<Partition> partList; + Map<Integer, List<Partition>> brokerPartList; + // update topic max msg size + msgSizeHolder.updTopicMaxSizeInB(topicInfoTuple.getF0()); + // update topic Partition info Map<String, Map<Integer, List<Partition>>> partitionListMap = new ConcurrentHashMap<>(); - for (TopicInfo topicInfo : topicInfoList) { - Map<Integer, List<Partition>> brokerPartList = + for (TopicInfo topicInfo : topicInfoTuple.getF1()) { + brokerPartList = partitionListMap.get(topicInfo.getTopic()); if (brokerPartList == null) { brokerPartList = new ConcurrentHashMap<>(); partitionListMap.put(topicInfo.getTopic(), brokerPartList); } for (int j = 0; j < topicInfo.getTopicStoreNum(); j++) { - int baseValue = j * TBaseConstants.META_STORE_INS_BASE; + baseValue = j * TBaseConstants.META_STORE_INS_BASE; for (int i = 0; i < topicInfo.getPartitionNum(); i++) { - Partition part = - new Partition(topicInfo.getBroker(), topicInfo.getTopic(), baseValue + i); - List<Partition> partList = brokerPartList.get(part.getBrokerId()); - if (partList == null) { - partList = new ArrayList<>(); - brokerPartList.put(part.getBrokerId(), partList); - } - partList.add(part); + tmpPart = new Partition(topicInfo.getBroker(), + topicInfo.getTopic(), baseValue + i); + partList = brokerPartList.computeIfAbsent( + tmpPart.getBrokerId(), k -> new ArrayList<>()); + partList.add(tmpPart); } } } @@ -595,20 +602,36 @@ public class ProducerManager { processAuthorizedToken(response.getAuthorizedInfo()); } if (response.hasAppdConfig()) { - procAllowedConfig4P(response.getAppdConfig()); + msgSizeHolder.updAllowedSetting(response.getAppdConfig()); } } - private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response) { + private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response, + StringBuilder strBuff) { if (response.hasRequireAuth()) { nextWithAuthInfo2M.set(response.getRequireAuth()); } if (response.hasAppdConfig()) { - procAllowedConfig4P(response.getAppdConfig()); + msgSizeHolder.updAllowedSetting(response.getAppdConfig()); } if (response.hasAuthorizedInfo()) { processAuthorizedToken(response.getAuthorizedInfo()); } + if (response.getErrCode() == TErrCodeConstants.NOT_READY) { + lastHeartbeatTime = System.currentTimeMillis(); + return; + } + if (response.getBrokerCheckSum() != brokerInfoCheckSum) { + updateBrokerInfoList(false, response.getBrokerInfosList(), + response.getBrokerCheckSum(), strBuff); + } + if (response.getTopicInfosList().isEmpty() + && System.currentTimeMillis() - lastEmptyTopicPrintTime > 60000) { + logger.warn("[Heartbeat Update] found empty topicList update!"); + lastEmptyTopicPrintTime = System.currentTimeMillis(); + } + updateTopicConfigure(DataConverterUtil + .convertTopicInfo(brokersMap, response.getTopicInfosList())); } private void processAuthorizedToken(ClientMaster.MasterAuthorizedInfo inAuthorizedTokenInfo) { @@ -654,27 +677,20 @@ public class ProducerManager { private ClientMaster.ApprovedClientConfig.Builder buildAllowedConfig4P() { ClientMaster.ApprovedClientConfig.Builder appdConfig = ClientMaster.ApprovedClientConfig.newBuilder(); - appdConfig.setConfigId(allowedSetting.getConfigId()); + appdConfig.setConfigId(msgSizeHolder.getConfigId()); return appdConfig; } - // set allowed configure info - private void procAllowedConfig4P(ClientMaster.ApprovedClientConfig allowedConfig) { - if (allowedConfig != null) { - allowedSetting.updAllowedSetting(allowedConfig); - } - } - // #lizard forgives private class ProducerHeartbeatTask implements Runnable { @Override public void run() { - StringBuilder sBuilder = new StringBuilder(512); + StringBuilder strBuff = new StringBuilder(512); while (!heartBeatStatus.compareAndSet(0, 1)) { ThreadUtils.sleep(100); } // print metrics information - clientStatsInfo.selfPrintStatsInfo(false, true, sBuilder); + clientStatsInfo.selfPrintStatsInfo(false, true, strBuff); // check whether public topics if (publishTopics.isEmpty()) { return; @@ -689,71 +705,49 @@ public class ProducerManager { clientStatsInfo.bookHB2MasterException(); logger.error("[Heartbeat Failed] receive null HeartResponseM2P response!"); } else { - logger.error(sBuilder.append("[Heartbeat Failed] ") + logger.error(strBuff.append("[Heartbeat Failed] ") .append(response.getErrMsg()).toString()); - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) { clientStatsInfo.bookHB2MasterTimeout(); try { register2Master(); } catch (Throwable ee) { - logger.error(sBuilder + logger.error(strBuff .append("[Heartbeat Failed] re-register failure, error is ") .append(ee.getMessage()).toString()); - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); } } else { clientStatsInfo.bookHB2MasterException(); if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) { - adjustHeartBeatPeriod("certificate failure", sBuilder); + adjustHeartBeatPeriod("certificate failure", strBuff); } } } return; } - processHeartBeatSyncInfo(response); - if (response.getErrCode() == TErrCodeConstants.NOT_READY) { - lastHeartbeatTime = System.currentTimeMillis(); - return; - } - if (response.getBrokerCheckSum() != brokerInfoCheckSum) { - updateBrokerInfoList(false, response.getBrokerInfosList(), - response.getBrokerCheckSum(), sBuilder); - } - if (response.getTopicInfosList() != null) { - if (response.getTopicInfosList().isEmpty() - && System.currentTimeMillis() - lastEmptyTopicPrintTime > 60000) { - logger.warn("[Heartbeat Update] found empty topicList update!"); - lastEmptyTopicPrintTime = System.currentTimeMillis(); - } - updateTopicPartitions(DataConverterUtil - .convertTopicInfo(brokersMap, response.getTopicInfosList())); - } else { - logger.error(sBuilder - .append("[Heartbeat Failed] Found brokerList or topicList is null, brokerList is ") - .append(response.getBrokerInfosList() != null).toString()); - sBuilder.delete(0, sBuilder.length()); - } + processHeartBeatSyncInfo(response, strBuff); heartbeatRetryTimes = 0; long currentTime = System.currentTimeMillis(); if ((currentTime - lastHeartbeatTime) > (tubeClientConfig.getHeartbeatPeriodMs() * 4)) { - logger.warn(sBuilder.append(producerId) + logger.warn(strBuff.append(producerId) .append(" heartbeat interval is too long, please check! Total time : ") .append(currentTime - lastHeartbeatTime).toString()); - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); } lastHeartbeatTime = currentTime; } catch (Throwable e) { - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); if (!(e.getCause() != null && e.getCause() instanceof ClientClosedException)) { logger.error("Heartbeat failed,retry later.Reason:{}", - sBuilder.append(e.getClass().getSimpleName()) + strBuff.append(e.getClass().getSimpleName()) .append("#").append(e.getMessage()).toString()); - sBuilder.delete(0, sBuilder.length()); + strBuff.delete(0, strBuff.length()); } - adjustHeartBeatPeriod("heartbeat exception", sBuilder); + adjustHeartBeatPeriod("heartbeat exception", strBuff); } finally { heartBeatStatus.compareAndSet(1, 0); } diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java index 291c56e71..e049e9804 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java @@ -316,11 +316,11 @@ public class SimpleMessageProducer implements MessageProducer { } int msgSize = TStringUtils.isBlank(message.getAttribute()) ? message.getData().length : (message.getData().length + message.getAttribute().length()); - if (msgSize > producerManager.getMaxMsgSize()) { + if (msgSize > producerManager.getMaxMsgSize(message.getTopic())) { throw new TubeClientException(new StringBuilder(512) .append("Illegal parameter: over max message length for the total size of") .append(" message data and attribute, allowed size is ") - .append(producerManager.getMaxMsgSize()) + .append(producerManager.getMaxMsgSize(message.getTopic())) .append(", message's real size is ").append(msgSize).toString()); } if (isShutDown.get()) { diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java index 39b2c5b06..3d63c8e19 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java @@ -96,31 +96,46 @@ public class DataConverterUtil { /** * convert string info with a brokerInfo list to @link TopicInfo * - * @param brokerInfoMap + * @param brokerInfoMap broker configure map * @param strTopicInfos return a list of TopicInfo */ - public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerInfoMap, - List<String> strTopicInfos) { + public static Tuple2<Map<String, Integer>, List<TopicInfo>> convertTopicInfo( + Map<Integer, BrokerInfo> brokerInfoMap, List<String> strTopicInfos) { List<TopicInfo> topicList = new ArrayList<>(); - if (strTopicInfos != null) { - for (String info : strTopicInfos) { - if (info != null) { - info = info.trim(); - String[] strInfo = info.split(TokenConstants.SEGMENT_SEP); - String[] strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); - for (String s : strTopicInfoSet) { - String[] strTopicInfo = s.split(TokenConstants.ATTR_SEP); - BrokerInfo brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0])); - if (brokerInfo != null) { - topicList.add(new TopicInfo(brokerInfo, - strInfo[0], Integer.parseInt(strTopicInfo[1]), - Integer.parseInt(strTopicInfo[2]), true, true)); - } - } + Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>(); + if (strTopicInfos == null || strTopicInfos.isEmpty()) { + return new Tuple2<>(topicMaxSizeInBMap, topicList); + } + String[] strInfo; + String[] strTopicInfoSet; + String[] strTopicInfo; + BrokerInfo brokerInfo; + for (String info : strTopicInfos) { + if (info == null || info.isEmpty()) { + continue; + } + info = info.trim(); + strInfo = info.split(TokenConstants.SEGMENT_SEP, -1); + strTopicInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP); + for (String s : strTopicInfoSet) { + strTopicInfo = s.split(TokenConstants.ATTR_SEP); + brokerInfo = brokerInfoMap.get(Integer.parseInt(strTopicInfo[0])); + if (brokerInfo != null) { + topicList.add(new TopicInfo(brokerInfo, + strInfo[0], Integer.parseInt(strTopicInfo[1]), + Integer.parseInt(strTopicInfo[2]), true, true)); } } + if (strInfo.length == 2 || TStringUtils.isEmpty(strInfo[2])) { + continue; + } + try { + topicMaxSizeInBMap.put(strInfo[0], Integer.parseInt(strInfo[2])); + } catch (Throwable e) { + // + } } - return topicList; + return new Tuple2<>(topicMaxSizeInBMap, topicList); } /** @@ -166,11 +181,8 @@ public class DataConverterUtil { } String topicName = strInfo[0].trim(); String[] strCondInfo = strInfo[1].split(TokenConstants.ARRAY_SEP); - TreeSet<String> conditionSet = topicConditions.get(topicName); - if (conditionSet == null) { - conditionSet = new TreeSet<>(); - topicConditions.put(topicName, conditionSet); - } + TreeSet<String> conditionSet = + topicConditions.computeIfAbsent(topicName, k -> new TreeSet<>()); for (String cond : strCondInfo) { if (TStringUtils.isNotBlank(cond)) { conditionSet.add(cond.trim()); diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java index 74e628625..2e857f4b6 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/Tuple3.java @@ -56,6 +56,15 @@ public class Tuple3<T0, T1, T2> { return f2; } + public void setF0AndF1(T0 value0, T1 value1) { + this.f0 = value0; + this.f1 = value1; + } + + public void setF2(T2 value2) { + this.f2 = value2; + } + public void setFieldsValue(T0 value0, T1 value1, T2 value2) { this.f0 = value0; this.f1 = value1; diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java index deb16d692..35728d5c9 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RemoteConErrStats.java @@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicLong; public class RemoteConErrStats { private long statisticDuration = 60000; private int maxConnAllowedFailCount = 5; - private AtomicLong errCounter = new AtomicLong(0); - private AtomicLong lastTimeStamp = new AtomicLong(0); + private final AtomicLong errCounter = new AtomicLong(0); + private final AtomicLong lastTimeStamp = new AtomicLong(0); public RemoteConErrStats(final long statisticDuration, final int maxConnAllowedFailCount) { diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java index 492017e7e..77837a28d 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/client/CallFuture.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; * A Future implementation for RPCs. */ public class CallFuture<T> implements Future<T>, Callback<T> { + private static final String errMsgInfo = "Wait response message timeout!"; private final CountDownLatch latch = new CountDownLatch(1); private final Callback<T> chainedCallback; private T result = null; @@ -131,7 +132,7 @@ public class CallFuture<T> implements Future<T>, Callback<T> { } return result; } else { - throw new TimeoutException(); + throw new TimeoutException(errMsgInfo); } } @@ -155,7 +156,7 @@ public class CallFuture<T> implements Future<T>, Callback<T> { public void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { if (!latch.await(timeout, unit)) { - throw new TimeoutException(); + throw new TimeoutException(errMsgInfo); } } diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java index bc5e26d93..f7ef9576e 100644 --- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java +++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java @@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.corebase.cluster.Partition; import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo; import org.apache.inlong.tubemq.corebase.cluster.TopicInfo; import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil; +import org.apache.inlong.tubemq.corebase.utils.Tuple2; import org.junit.Test; public class DataConverterUtilTest { @@ -65,8 +66,9 @@ public class DataConverterUtilTest { strInfoList.clear(); // topic#brokerId:partitionNum:topicStoreNum strInfoList.add("tube#0:10:5"); - List<TopicInfo> topicList = DataConverterUtil.convertTopicInfo(brokerMap, strInfoList); - assertEquals("topic should be equal", topic, topicList.get(0)); + Tuple2<Map<String, Integer>, List<TopicInfo>> topicConfTuple = + DataConverterUtil.convertTopicInfo(brokerMap, strInfoList); + assertEquals("topic should be equal", topic, topicConfTuple.getF1().get(0)); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index c18c54b22..2b802de98 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -82,6 +82,7 @@ import org.apache.inlong.tubemq.corebase.utils.OpsSyncInfo; import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.corebase.utils.ThreadUtils; import org.apache.inlong.tubemq.corebase.utils.Tuple2; +import org.apache.inlong.tubemq.corebase.utils.Tuple3; import org.apache.inlong.tubemq.corerpc.RpcConfig; import org.apache.inlong.tubemq.corerpc.RpcConstants; import org.apache.inlong.tubemq.corerpc.RpcServiceFactory; @@ -352,13 +353,17 @@ public class TMaster extends HasThread implements MasterService, Stoppable { heartbeatManager.regProducerNode(producerId); producerHolder.setProducerInfo(producerId, new HashSet<>(transTopicSet), hostName, overtls); + // get current configure information Tuple2<Long, Map<Integer, String>> brokerStaticInfo = brokerRunManager.getBrokerStaticInfo(overtls); + Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple = + getTopicConfigureInfos(producerId, true); builder.setBrokerCheckSum(brokerStaticInfo.getF0()); builder.addAllBrokerInfos(brokerStaticInfo.getF1().values()); - builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build()); + builder.setAuthorizedInfo(genAuthorizedInfo( + certResult.authorizedToken, false).build()); ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder = - buildApprovedClientConfig(request.getAppdConfig()); + buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple); if (clientConfigBuilder != null) { builder.setAppdConfig(clientConfigBuilder); } @@ -443,24 +448,30 @@ public class TMaster extends HasThread implements MasterService, Stoppable { topicPSInfoManager.addProducerTopicPubInfo(producerId, transTopicSet); producerHolder.updateProducerInfo(producerId, transTopicSet, hostName, overtls); - Map<String, String> availTopicPartitions = getProducerTopicPartitionInfo(producerId); - builder.addAllTopicInfos(availTopicPartitions.values()); - builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build()); + // get current configure information and set Tuple2<Long, Map<Integer, String>> brokerStaticInfo = brokerRunManager.getBrokerStaticInfo(overtls); + final Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple = + getTopicConfigureInfos(producerId, false); + builder.setAuthorizedInfo(genAuthorizedInfo( + certResult.authorizedToken, false).build()); builder.setBrokerCheckSum(brokerStaticInfo.getF0()); if (brokerStaticInfo.getF0() != inBrokerCheckSum) { builder.addAllBrokerInfos(brokerStaticInfo.getF1().values()); } + if (prodTopicConfigTuple.getF2() != null) { + builder.addAllTopicInfos(prodTopicConfigTuple.getF2().values()); + } ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder = - buildApprovedClientConfig(request.getAppdConfig()); + buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple); if (clientConfigBuilder != null) { builder.setAppdConfig(clientConfigBuilder); } if (logger.isDebugEnabled()) { logger.debug(strBuffer.append("[Push Producer's available topic count:]") .append(producerId).append(TokenConstants.LOG_SEG_SEP) - .append(availTopicPartitions.size()).toString()); + .append((prodTopicConfigTuple.getF2() == null) + ? 0 : prodTopicConfigTuple.getF2().size()).toString()); } builder.setSuccess(true); builder.setErrCode(TErrCodeConstants.SUCCESS); @@ -564,6 +575,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } final Set<String> reqTopicSet = (Set<String>) result.getRetData(); + final Map<String, TreeSet<String>> reqTopicConditions = + DataConverterUtil.convertTopicConditions(request.getTopicConditionList()); String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : ""; ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound()) ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL; @@ -576,8 +589,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } Map<String, Long> requiredPartMap = (Map<String, Long>) paramCheckResult.checkData; - Map<String, TreeSet<String>> reqTopicConditions = - DataConverterUtil.convertTopicConditions(request.getTopicConditionList()); String sessionKey = request.hasSessionKey() ? request.getSessionKey() : ""; long sessionTime = request.hasSessionTime() ? request.getSessionTime() : System.currentTimeMillis(); @@ -642,20 +653,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable { consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData; topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet); if (CollectionUtils.isNotEmpty(subscribeList)) { - Map<String, Map<String, Partition>> topicPartSubMap = - new HashMap<>(); + int reportCnt = 0; + Map<String, Partition> partMap; + Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>(); currentSubInfo.put(consumerId, topicPartSubMap); + strBuffer.append("[SubInfo Report] client=").append(consumerId) + .append(", subscribed partitions=["); for (SubscribeInfo info : subscribeList) { - Map<String, Partition> partMap = topicPartSubMap.get(info.getTopic()); - if (partMap == null) { - partMap = new HashMap<>(); - topicPartSubMap.put(info.getTopic(), partMap); - } + partMap = topicPartSubMap.computeIfAbsent( + info.getTopic(), k -> new HashMap<>()); partMap.put(info.getPartition().getPartitionKey(), info.getPartition()); - logger.info(strBuffer.append("[SubInfo Report]") - .append(info.toString()).toString()); - strBuffer.delete(0, strBuffer.length()); + if (reportCnt++ > 0) { + strBuffer.append(","); + } + strBuffer.append(info.getPartitionStr()); } + strBuffer.append("]"); + logger.info(strBuffer.toString()); + strBuffer.delete(0, strBuffer.length()); } heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId)); } catch (IOException e) { @@ -1601,19 +1616,29 @@ public class TMaster extends HasThread implements MasterService, Stoppable { * @param producerId * @return */ - private Map<String, String> getProducerTopicPartitionInfo(String producerId) { - ProducerInfo producerInfo = - producerHolder.getProducerInfo(producerId); + private Tuple3<Long, Integer, Map<String, String>> getTopicConfigureInfos(String producerId, + boolean onlyMsgSize) { + Tuple3<Long, Integer, Map<String, String>> result = new Tuple3<>(); + ClusterSettingEntity clusterEntity = + defMetaDataService.getClusterDefSetting(false); + result.setF0AndF1(clusterEntity.getSerialId(), + clusterEntity.getMaxMsgSizeInB()); + if (onlyMsgSize) { + return result; + } + ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId); if (producerInfo == null) { - return new HashMap<>(); + return result; } - Set<String> producerInfoTopicSet = - producerInfo.getTopicSet(); - if ((producerInfoTopicSet == null) - || (producerInfoTopicSet.isEmpty())) { - return new HashMap<>(); + Set<String> publishedTopicSet = producerInfo.getTopicSet(); + if ((publishedTopicSet == null) + || (publishedTopicSet.isEmpty())) { + return result; } - return brokerRunManager.getPubBrokerAcceptPubPartInfo(producerInfoTopicSet); + Map<String, Integer> topicAndSizeMap = + defMetaDataService.getMaxMsgSizeInBByTopics(result.getF1(), publishedTopicSet); + result.setF2(brokerRunManager.getPubBrokerAcceptPubPartInfo(topicAndSizeMap)); + return result; } @Override @@ -1642,12 +1667,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final StringBuilder strBuffer = new StringBuilder(512); final long balanceId = idGenerator.incrementAndGet(); if (defMetaDataService != null) { - logger.info(strBuffer.append("[Balance Start] ").append(balanceId) + logger.info(strBuffer.append("[Balance Status] ").append(balanceId) .append(", isMaster=").append(defMetaDataService.isSelfMaster()) .append(", isPrimaryNodeActive=") .append(defMetaDataService.isPrimaryNodeActive()).toString()); } else { - logger.info(strBuffer.append("[Balance Start] ").append(balanceId) + logger.info(strBuffer.append("[Balance Status] ").append(balanceId) .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString()); } strBuffer.delete(0, strBuffer.length()); @@ -1655,7 +1680,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { processClientBalanceMetaInfo(balanceId, strBuffer); // process server-balance processServerBalance(tMaster, balanceId, strBuffer); - logger.info(strBuffer.append("[Balance End] ").append(balanceId).toString()); } private void processServerBalance(TMaster tMaster, @@ -1663,7 +1687,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { StringBuilder sBuffer) { int curDoingTasks = this.curSvrBalanceParal.get(); if (curDoingTasks > 0) { - logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId) + logger.info(sBuffer.append("[Svr-Balance Status] ").append(balanceId) .append(" the Server-Balance has ").append(curDoingTasks) .append(" task(s) in progress!").toString()); sBuffer.delete(0, sBuffer.length()); @@ -1704,13 +1728,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (subGroups.isEmpty()) { return; } + final StringBuilder strBuffer = new StringBuilder(512); // first process reset rebalance task; try { tMaster.processResetbalance(balanceId, - isStartBalance, subGroups); + isStartBalance, subGroups, strBuffer); } catch (Throwable e) { logger.warn(new StringBuilder(1024) - .append("[Svr-Balance processor] Error during reset-reb,") + .append("[Svr-Balance Status] Error during reset-reb,") .append("the groups that may be affected are ") .append(subGroups).append(",error is ") .append(e).toString()); @@ -1721,16 +1746,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable { // second process normal balance task; try { tMaster.processRebalance(balanceId, - isStartBalance, subGroups); + isStartBalance, subGroups, strBuffer); } catch (Throwable e) { logger.warn(new StringBuilder(1024) - .append("[Svr-Balance processor] Error during normal-reb,") + .append("[Svr-Balance Status] Error during normal-reb,") .append("the groups that may be affected are ") .append(subGroups).append(",error is ") .append(e).toString()); } } catch (Throwable e) { - logger.warn("[Svr-Balance processor] Error during process", e); + logger.warn("[Svr-Balance Status] Error during process", e); } finally { if (curSvrBalanceParal.decrementAndGet() == 0) { MasterSrvStatsHolder.updSvrBalanceDurations( @@ -1742,14 +1767,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } startupBalance = false; - logger.info(sBuffer.append("[Svr-Balance End] ").append(balanceId).toString()); - sBuffer.delete(0, sBuffer.length()); } private void processClientBalanceMetaInfo(long balanceId, StringBuilder sBuffer) { int curDoingTasks = this.curCltBalanceParal.get(); if (curDoingTasks > 0) { - logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId) + logger.info(sBuffer.append("[Clt-Balance Status] ").append(balanceId) .append(" the Client-Balance has ").append(curDoingTasks) .append(" task(s) in progress!").toString()); sBuffer.delete(0, sBuffer.length()); @@ -1794,7 +1817,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { freshTopicMetaInfo(consumeGroupInfo, sBuffer2); } } catch (Throwable e) { - logger.warn("[Clt-Balance processor] Error during process", e); + logger.warn("[Clt-Balance Status] Error during process", e); } finally { curCltBalanceParal.decrementAndGet(); } @@ -1802,8 +1825,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { }); } } - logger.info(sBuffer.append("[Clt-Balance End] ").append(balanceId).toString()); - sBuffer.delete(0, sBuffer.length()); } public void freshTopicMetaInfo(ConsumeGroupInfo consumeGroupInfo, StringBuilder sBuffer) { @@ -1826,7 +1847,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } int count = 0; int statusId = 0; - TopicInfo topicInfo; + Tuple2<Boolean, TopicInfo> topicSubInfo = new Tuple2<>(); Set<String> fbdTopicSet = defMetaDataService.getDisableTopicByGroupName(groupInfo.getGroupName()); for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployInfoMap.entrySet()) { @@ -1834,14 +1855,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { continue; } count = 0; - statusId = 0; for (TopicDeployEntity deployInfo : entry.getValue()) { - topicInfo = - brokerRunManager.getPubBrokerTopicInfo( - deployInfo.getBrokerId(), deployInfo.getTopicName()); - if (topicInfo != null - && topicInfo.isAcceptSubscribe() - && !fbdTopicSet.contains(topicInfo.getTopic())) { + statusId = 0; + brokerRunManager.getSubBrokerTopicInfo(deployInfo.getBrokerId(), + deployInfo.getTopicName(), topicSubInfo); + if (topicSubInfo.getF0() + && topicSubInfo.getF1() != null + && topicSubInfo.getF1().isAcceptSubscribe() + && !fbdTopicSet.contains(deployInfo.getTopicName())) { statusId = 1; } if (count++ == 0) { @@ -1868,11 +1889,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { * @param rebalanceId the re-balance id * @param isFirstReb whether is first re-balance * @param groups the need re-balance group set + * @param strBuffer string buffer */ - public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) { + public void processRebalance(long rebalanceId, boolean isFirstReb, + List<String> groups, StringBuilder strBuffer) { // #lizard forgives - Map<String, Map<String, List<Partition>>> finalSubInfoMap = null; - final StringBuilder strBuffer = new StringBuilder(512); + Map<String, Map<String, List<Partition>>> finalSubInfoMap; // choose different load balance strategy if (isFirstReb) { finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder, @@ -1881,122 +1903,91 @@ public class TMaster extends HasThread implements MasterService, Stoppable { finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo, consumerHolder, brokerRunManager, groups, defMetaDataService, strBuffer); } + boolean included; + String consumerId; + boolean isDelEmpty; + boolean isAddEmtpy; + ConsumerInfo consumerInfo; + Set<String> blackTopicSet; + List<SubscribeInfo> deletedSubInfoList; + List<SubscribeInfo> addedSubInfoList; + Map<String, Partition> currentPartMap; + Map<String, Map<String, Partition>> curTopicSubInfoMap; // allocate partitions to consumers for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) { if (entry == null) { continue; } - String consumerId = entry.getKey(); + consumerId = entry.getKey(); if (consumerId == null) { continue; } - ConsumerInfo consumerInfo = - consumerHolder.getConsumerInfo(consumerId); + consumerInfo = consumerHolder.getConsumerInfo(consumerId); if (consumerInfo == null) { continue; } - Set<String> blackTopicSet = + addedSubInfoList = new ArrayList<>(); + deletedSubInfoList = new ArrayList<>(); + blackTopicSet = defMetaDataService.getDisableTopicByGroupName(consumerInfo.getGroupName()); - Map<String, List<Partition>> topicSubPartMap = entry.getValue(); - List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); - List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); - for (Map.Entry<String, List<Partition>> topicEntry : topicSubPartMap.entrySet()) { + for (Map.Entry<String, List<Partition>> topicEntry : entry.getValue().entrySet()) { if (topicEntry == null) { continue; } - String topic = topicEntry.getKey(); - List<Partition> finalPartList = topicEntry.getValue(); - Map<String, Partition> currentPartMap = null; - Map<String, Map<String, Partition>> curTopicSubInfoMap = - currentSubInfo.get(consumerId); - if (curTopicSubInfoMap == null || curTopicSubInfoMap.get(topic) == null) { + curTopicSubInfoMap = currentSubInfo.get(consumerId); + if (curTopicSubInfoMap == null + || curTopicSubInfoMap.get(topicEntry.getKey()) == null) { currentPartMap = new HashMap<>(); } else { - currentPartMap = curTopicSubInfoMap.get(topic); + currentPartMap = curTopicSubInfoMap.get(topicEntry.getKey()); if (currentPartMap == null) { currentPartMap = new HashMap<>(); } } - if (consumerInfo.isOverTLS()) { - for (Partition currentPart : currentPartMap.values()) { - if (!blackTopicSet.contains(currentPart.getTopic())) { - boolean found = false; - for (Partition newPart : finalPartList) { - if (newPart.getPartitionFullStr(true) - .equals(currentPart.getPartitionFullStr(true))) { - found = true; - break; - } - } - if (found) { + for (Partition currentPart : currentPartMap.values()) { + included = false; + if (!blackTopicSet.contains(currentPart.getTopic())) { + for (Partition newPart : topicEntry.getValue()) { + if (newPart == null) { continue; } - } - deletedSubInfoList - .add(new SubscribeInfo(consumerId, consumerInfo.getGroupName(), - consumerInfo.isOverTLS(), currentPart)); - } - for (Partition finalPart : finalPartList) { - if (!blackTopicSet.contains(finalPart.getTopic())) { - boolean found = false; - for (Partition curPart : currentPartMap.values()) { - if (finalPart.getPartitionFullStr(true) - .equals(curPart.getPartitionFullStr(true))) { - found = true; - break; - } - } - if (found) { - continue; + if (newPart.getPartitionKey().equals( + currentPart.getPartitionKey())) { + included = true; + break; } - addedSubInfoList.add(new SubscribeInfo(consumerId, - consumerInfo.getGroupName(), true, finalPart)); } } - } else { - for (Partition currentPart : currentPartMap.values()) { - if ((blackTopicSet.contains(currentPart.getTopic())) - || (!finalPartList.contains(currentPart))) { - deletedSubInfoList.add(new SubscribeInfo(consumerId, - consumerInfo.getGroupName(), false, currentPart)); - } + if (!included) { + deletedSubInfoList.add(new SubscribeInfo(consumerId, + consumerInfo.getGroupName(), consumerInfo.isOverTLS(), currentPart)); } - for (Partition finalPart : finalPartList) { - if ((currentPartMap.get(finalPart.getPartitionKey()) == null) - && (!blackTopicSet.contains(finalPart.getTopic()))) { - addedSubInfoList.add(new SubscribeInfo(consumerId, - consumerInfo.getGroupName(), false, finalPart)); - } + } + for (Partition finalPart : topicEntry.getValue()) { + if ((currentPartMap.get(finalPart.getPartitionKey()) == null) + && (!blackTopicSet.contains(finalPart.getTopic()))) { + addedSubInfoList.add(new SubscribeInfo(consumerId, + consumerInfo.getGroupName(), consumerInfo.isOverTLS(), finalPart)); } } } - boolean isDelEmpty = deletedSubInfoList.isEmpty(); - boolean isAddEmtpy = addedSubInfoList.isEmpty(); + isDelEmpty = deletedSubInfoList.isEmpty(); + isAddEmtpy = addedSubInfoList.isEmpty(); if (!isDelEmpty) { - EventType opType = - (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT; - consumerEventManager - .addDisconnectEvent(consumerId, - new ConsumerEvent(rebalanceId, opType, - deletedSubInfoList, EventStatus.TODO)); - for (SubscribeInfo info : deletedSubInfoList) { - logger.info(strBuffer.append("[Disconnect]") - .append(info.toString()).toString()); - strBuffer.delete(0, strBuffer.length()); - } + consumerEventManager.addDisconnectEvent(consumerId, + new ConsumerEvent(rebalanceId, + (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT, + deletedSubInfoList, EventStatus.TODO)); + printTODOContent(rebalanceId, consumerId, + "Disconnect", deletedSubInfoList, strBuffer); } if (!isAddEmtpy) { - EventType opType = - (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT; - consumerEventManager - .addConnectEvent(consumerId, - new ConsumerEvent(rebalanceId, opType, - addedSubInfoList, EventStatus.TODO)); - for (SubscribeInfo info : addedSubInfoList) { - logger.info(strBuffer.append("[Connect]") - .append(info.toString()).toString()); - strBuffer.delete(0, strBuffer.length()); - } + consumerEventManager.addConnectEvent(consumerId, + new ConsumerEvent(rebalanceId, + (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT, + addedSubInfoList, EventStatus.TODO)); + printTODOContent(rebalanceId, consumerId, + "Connect", addedSubInfoList, strBuffer); } } } @@ -2004,10 +1995,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { /** * process Reset balance */ - public void processResetbalance(long rebalanceId, boolean isFirstReb, List<String> groups) { + public void processResetbalance(long rebalanceId, boolean isFirstReb, + List<String> groups, StringBuilder strBuffer) { // #lizard forgives - final StringBuilder strBuffer = new StringBuilder(512); - Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap = null; + Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap; // choose different load balance strategy if (isFirstReb) { finalSubInfoMap = this.loadBalancer.resetBukAssign(consumerHolder, @@ -2016,46 +2007,51 @@ public class TMaster extends HasThread implements MasterService, Stoppable { finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo, consumerHolder, brokerRunManager, groups, this.defMetaDataService, strBuffer); } + String consumerId; + boolean isAddEmtpy; + boolean isDelEmpty; + ConsumerInfo consumerInfo; + Set<String> blackTopicSet; + List<SubscribeInfo> addedSubInfoList; + List<SubscribeInfo> deletedSubInfoList; + Map<String, Partition> finalPartMap; + Map<String, Partition> currentPartMap; + Map<String, Map<String, Partition>> curTopicSubInfoMap; // filter for (Map.Entry<String, Map<String, Map<String, Partition>>> entry : finalSubInfoMap.entrySet()) { if (entry == null) { continue; } - String consumerId = entry.getKey(); + consumerId = entry.getKey(); if (consumerId == null) { continue; } - ConsumerInfo consumerInfo = - consumerHolder.getConsumerInfo(consumerId); + consumerInfo = consumerHolder.getConsumerInfo(consumerId); if (consumerInfo == null) { continue; } // allocate partitions to consumers - Set<String> blackTopicSet = + addedSubInfoList = new ArrayList<>(); + deletedSubInfoList = new ArrayList<>(); + blackTopicSet = defMetaDataService.getDisableTopicByGroupName(consumerInfo.getGroupName()); - Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue(); - List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); - List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); - for (Map.Entry<String, Map<String, Partition>> topicEntry : topicSubPartMap.entrySet()) { + for (Map.Entry<String, Map<String, Partition>> topicEntry : entry.getValue().entrySet()) { if (topicEntry == null) { continue; } - String topic = topicEntry.getKey(); - Map<String, Partition> finalPartMap = topicEntry.getValue(); - Map<String, Partition> currentPartMap = null; - Map<String, Map<String, Partition>> curTopicSubInfoMap = - currentSubInfo.get(consumerId); + curTopicSubInfoMap = currentSubInfo.get(consumerId); if (curTopicSubInfoMap == null - || curTopicSubInfoMap.get(topic) == null) { + || curTopicSubInfoMap.get(topicEntry.getKey()) == null) { currentPartMap = new HashMap<>(); } else { - currentPartMap = curTopicSubInfoMap.get(topic); + currentPartMap = curTopicSubInfoMap.get(topicEntry.getKey()); if (currentPartMap == null) { currentPartMap = new HashMap<>(); } } // filter + finalPartMap = topicEntry.getValue(); for (Partition currentPart : currentPartMap.values()) { if ((blackTopicSet.contains(currentPart.getTopic())) || (finalPartMap.get(currentPart.getPartitionKey()) == null)) { @@ -2073,35 +2069,55 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } // generate consumer event - boolean isDelEmpty = deletedSubInfoList.isEmpty(); - boolean isAddEmtpy = addedSubInfoList.isEmpty(); + isDelEmpty = deletedSubInfoList.isEmpty(); + isAddEmtpy = addedSubInfoList.isEmpty(); if (!isDelEmpty) { - EventType opType = - (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT; consumerEventManager.addDisconnectEvent(consumerId, - new ConsumerEvent(rebalanceId, opType, + new ConsumerEvent(rebalanceId, + (!isAddEmtpy) ? EventType.DISCONNECT : EventType.ONLY_DISCONNECT, deletedSubInfoList, EventStatus.TODO)); - for (SubscribeInfo info : deletedSubInfoList) { - logger.info(strBuffer.append("[ResetDisconnect]") - .append(info.toString()).toString()); - strBuffer.delete(0, strBuffer.length()); - } + printTODOContent(rebalanceId, consumerId, + "ResetDisconnect", deletedSubInfoList, strBuffer); } if (!isAddEmtpy) { - EventType opType = - (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT; consumerEventManager.addConnectEvent(consumerId, - new ConsumerEvent(rebalanceId, opType, + new ConsumerEvent(rebalanceId, + (!isDelEmpty) ? EventType.CONNECT : EventType.ONLY_CONNECT, addedSubInfoList, EventStatus.TODO)); - for (SubscribeInfo info : addedSubInfoList) { - logger.info(strBuffer.append("[ResetConnect]") - .append(info.toString()).toString()); - strBuffer.delete(0, strBuffer.length()); - } + printTODOContent(rebalanceId, consumerId, + "ResetConnect", addedSubInfoList, strBuffer); } } } + /** + * Print the TODO subscribe info + * + * @param rebalanceId the rebalance id + * @param consumerId the consumer id + * @param opType the operation type + * @param subInfoList the subscribe set + * @param strBuffer the string buffer + */ + private void printTODOContent(long rebalanceId, String consumerId, String opType, + List<SubscribeInfo> subInfoList, StringBuilder strBuffer) { + int recordCnt = 0; + strBuffer.append("[").append(opType).append("] TODO, rebalanceId=").append(rebalanceId) + .append(", client=").append(consumerId).append(", partitions=["); + for (SubscribeInfo info : subInfoList) { + if (info == null) { + continue; + } + if (recordCnt++ > 0) { + strBuffer.append(","); + } + strBuffer.append(info.getPartitionStr()); + } + strBuffer.append("]"); + logger.info(strBuffer.toString()); + strBuffer.delete(0, strBuffer.length()); + } + /** * check if master subscribe info consist consumer subscribe info * @@ -2177,6 +2193,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { List<String> groupsNeedToBalance = new ArrayList<>(); Set<String> groupHasUnfinishedEvent = new HashSet<>(); if (consumerEventManager.hasEvent()) { + String group; Set<String> consumerIdSet = consumerEventManager.getUnProcessedIdSet(); Map<String, TimeoutInfo> heartbeatMap = @@ -2185,7 +2202,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (consumerId == null) { continue; } - String group = consumerHolder.getGroupName(consumerId); + group = consumerHolder.getGroupName(consumerId); if (group == null) { continue; } @@ -2238,23 +2255,20 @@ public class TMaster extends HasThread implements MasterService, Stoppable { * build approved client configure * * @param inClientConfig client reported Configure info + * @param prodConfigTuple published max topic size tuple * @return ApprovedClientConfig */ private ClientMaster.ApprovedClientConfig.Builder buildApprovedClientConfig( - ClientMaster.ApprovedClientConfig inClientConfig) { + ClientMaster.ApprovedClientConfig inClientConfig, + Tuple3<Long, Integer, Map<String, String>> prodConfigTuple) { ClientMaster.ApprovedClientConfig.Builder outClientConfig = null; - if (inClientConfig != null) { - outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder(); - ClusterSettingEntity settingEntity = - this.defMetaDataService.getClusterDefSetting(false); - if (settingEntity == null) { - outClientConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED); - } else { - outClientConfig.setConfigId(settingEntity.getSerialId()); - if (settingEntity.getSerialId() != inClientConfig.getConfigId()) { - outClientConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB()); - } - } + if (inClientConfig == null) { + return outClientConfig; + } + outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder(); + outClientConfig.setConfigId(prodConfigTuple.getF0()); + if (inClientConfig.getConfigId() != prodConfigTuple.getF0()) { + outClientConfig.setMaxMsgSize(prodConfigTuple.getF1()); } return outClientConfig; } @@ -2407,7 +2421,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { .append(consumerId).append(" report the info: ["); printHeader = false; } - sBuffer.append(type).append(info.toString()).append(", "); + sBuffer.append(type).append(info).append(", "); return printHeader; } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java index 8b066bab9..7a90fab08 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java @@ -165,8 +165,8 @@ public class BrokerAbnHolder { if (brokerFbdInfo == null) { return retTuple; } - retTuple.setF0AndF1(brokerFbdInfo.newStatus.isAcceptPublish(), - brokerFbdInfo.newStatus.isAcceptSubscribe()); + retTuple.setF0AndF1(!brokerFbdInfo.newStatus.isAcceptPublish(), + !brokerFbdInfo.newStatus.isAcceptSubscribe()); return retTuple; } @@ -191,7 +191,6 @@ public class BrokerAbnHolder { brokerForbiddenCount.set(0); brokerAbnormalMap.clear(); brokerForbiddenMap.clear(); - } public int getCurrentBrokerCount() { @@ -355,5 +354,4 @@ public class BrokerAbnHolder { .toString(); } } - } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java index d3cf14c48..d8433c84f 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java @@ -166,13 +166,26 @@ public class BrokerPSInfoHolder { return subTopicInfoView.getAcceptSubParts(topic, enableSubBrokerIdSet); } + /** + * Get the subscribed TopicInfo information of topic in broker + * + * @param brokerId need query broker + * @param topic need query topic + * @param result query result(broker accept subscribe, null or topicInfo configure) + */ + public void getBrokerSubPushedTopicInfo(int brokerId, String topic, + Tuple2<Boolean, TopicInfo> result) { + result.setF0AndF1(enableSubBrokerIdSet.contains(brokerId), + subTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic)); + } + /** * Gets the string map of topic partitions whose publish status is enabled * - * @param topicSet need query topic set + * @param topicSizeMap need query topic set and topic's max message size */ - public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet) { - return pubTopicInfoView.getAcceptPubPartInfo(topicSet, enablePubBrokerIdSet); + public Map<String, String> getAcceptPubPartInfo(Map<String, Integer> topicSizeMap) { + return pubTopicInfoView.getAcceptPubPartInfo(topicSizeMap, enablePubBrokerIdSet); } /** @@ -180,11 +193,14 @@ public class BrokerPSInfoHolder { * * @param brokerId need query broker * @param topic need query topic - * - * @return null or topicInfo configure + * @param result query result(broker accept publish, + * broker accept subscribe, null or topicInfo configure) */ - public TopicInfo getBrokerPubPushedTopicInfo(int brokerId, String topic) { - return pubTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic); + public void getBrokerPubPushedTopicInfo(int brokerId, String topic, + Tuple3<Boolean, Boolean, TopicInfo> result) { + result.setFieldsValue(enablePubBrokerIdSet.contains(brokerId), + enableSubBrokerIdSet.contains(brokerId), + pubTopicInfoView.getBrokerPushedTopicInfo(brokerId, topic)); } /** diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java index 73d8e5a1d..1d768a5d3 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java @@ -92,7 +92,7 @@ public interface BrokerRunManager { BrokerAbnHolder getBrokerAbnHolder(); - Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet); + Map<String, String> getPubBrokerAcceptPubPartInfo(Map<String, Integer> topicSizeMap); int getSubTopicMaxBrokerCount(Set<String> topicSet); @@ -100,7 +100,10 @@ public interface BrokerRunManager { List<Partition> getSubBrokerAcceptSubParts(String topic); - TopicInfo getPubBrokerTopicInfo(int brokerId, String topic); + void getSubBrokerTopicInfo(int brokerId, String topic, Tuple2<Boolean, TopicInfo> result); + + void getPubBrokerTopicInfo(int brokerId, String topic, + Tuple3<Boolean, Boolean, TopicInfo> result); void getPubBrokerPushedTopicInfo(int brokerId, Tuple3<Boolean, Boolean, List<TopicInfo>> result); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java index 77cd64faf..a9a50b764 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java @@ -338,5 +338,4 @@ public class BrokerSyncData { } return CheckSum.crc32(buffer.array()); } - } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java index 6ac080b97..b63251867 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java @@ -195,23 +195,23 @@ public class BrokerTopicInfoView { /** * Gets the string map of topic partitions whose publish status is enabled * - * @param topicSet need query topic set + * @param topicSizeMap need query topic and maxsize map * @param enablePubBrokerIdSet need filtered broker id set * @return query result */ - public Map<String, String> getAcceptPubPartInfo(Set<String> topicSet, + public Map<String, String> getAcceptPubPartInfo(Map<String, Integer> topicSizeMap, Set<Integer> enablePubBrokerIdSet) { TopicInfo topicInfo; ConcurrentHashMap<Integer, TopicInfo> topicInfoView; Map<String, String> topicPartStrMap = new HashMap<>(); Map<String, StringBuilder> topicPartBufferMap = new HashMap<>(); - if (topicSet == null || topicSet.isEmpty()) { + if (topicSizeMap == null || topicSizeMap.isEmpty()) { return topicPartStrMap; } - for (String topic : topicSet) { - if (topic == null) { - continue; - } + // build topic-partition information + StringBuilder tmpValue; + StringBuilder confValue; + for (String topic : topicSizeMap.keySet()) { topicInfoView = topicConfInfoMap.get(topic); if (topicInfoView == null || topicInfoView.isEmpty()) { @@ -225,23 +225,33 @@ public class BrokerTopicInfoView { } topicInfo = entry.getValue(); if (topicInfo.isAcceptPublish()) { - StringBuilder tmpValue = topicPartBufferMap.get(topic); - if (tmpValue == null) { - StringBuilder strBuffer = - new StringBuilder(512).append(topic) - .append(TokenConstants.SEGMENT_SEP) - .append(topicInfo.getSimpleValue()); - topicPartBufferMap.put(topic, strBuffer); + confValue = topicPartBufferMap.get(topic); + if (confValue == null) { + tmpValue = new StringBuilder(512).append(topic) + .append(TokenConstants.SEGMENT_SEP) + .append(topicInfo.getSimpleValue()); + topicPartBufferMap.put(topic, tmpValue); } else { - tmpValue.append(TokenConstants.ARRAY_SEP) + confValue.append(TokenConstants.ARRAY_SEP) .append(topicInfo.getSimpleValue()); } } } } + // append max message size information + Integer maxMsgSize; for (Map.Entry<String, StringBuilder> entry : topicPartBufferMap.entrySet()) { - if (entry.getValue() != null) { - topicPartStrMap.put(entry.getKey(), entry.getValue().toString()); + if (entry.getValue() == null) { + continue; + } + confValue = topicPartBufferMap.get(entry.getKey()); + maxMsgSize = topicSizeMap.get(entry.getKey()); + if (maxMsgSize == null) { + topicPartStrMap.put(entry.getKey(), + confValue.append(TokenConstants.SEGMENT_SEP).toString()); + } else { + topicPartStrMap.put(entry.getKey(), + confValue.append(TokenConstants.SEGMENT_SEP).append(maxMsgSize).toString()); } } topicPartBufferMap.clear(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java index c274f67a5..26678c2b1 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java @@ -509,8 +509,8 @@ public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver { } @Override - public Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> topicSet) { - return brokerPubSubInfo.getAcceptPubPartInfo(topicSet); + public Map<String, String> getPubBrokerAcceptPubPartInfo(Map<String, Integer> topicSizeMap) { + return brokerPubSubInfo.getAcceptPubPartInfo(topicSizeMap); } @Override @@ -529,8 +529,15 @@ public class DefBrokerRunManager implements BrokerRunManager, ConfigObserver { } @Override - public TopicInfo getPubBrokerTopicInfo(int brokerId, String topic) { - return brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic); + public void getSubBrokerTopicInfo(int brokerId, String topic, + Tuple2<Boolean, TopicInfo> result) { + brokerPubSubInfo.getBrokerSubPushedTopicInfo(brokerId, topic, result); + } + + @Override + public void getPubBrokerTopicInfo(int brokerId, String topic, + Tuple3<Boolean, Boolean, TopicInfo> result) { + brokerPubSubInfo.getBrokerPubPushedTopicInfo(brokerId, topic, result); } @Override diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java index 9543ae632..0dc4dbb97 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java @@ -27,7 +27,7 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants; import org.apache.inlong.tubemq.corebase.cluster.TopicInfo; import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet; -import org.apache.inlong.tubemq.corebase.utils.Tuple2; +import org.apache.inlong.tubemq.corebase.utils.Tuple3; import org.apache.inlong.tubemq.server.common.TServerConstants; import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef; import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils; @@ -68,6 +68,7 @@ public class WebMasterInfoHandler extends AbstractWebHandler { // register modify method registerModifyWebMethod("admin_transfer_current_master", "transferCurrentMaster"); + // register modify method registerModifyWebMethod("admin_set_cluster_default_setting", "adminSetClusterDefSetting"); registerModifyWebMethod("admin_update_cluster_default_setting", @@ -274,9 +275,11 @@ public class WebMasterInfoHandler extends AbstractWebHandler { int totalRunTopicStoreCount = 0; boolean isSrvAcceptPublish = false; boolean isSrvAcceptSubscribe = false; - boolean isAcceptPublish = false; - boolean isAcceptSubscribe = false; boolean enableAuthControl = false; + TopicPropGroup topicProps; + TopicCtrlEntity authEntity; + BrokerConfEntity brokerConfEntity; + Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>(); WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer); for (Map.Entry<String, List<TopicDeployEntity>> entry : topicConfMap.entrySet()) { if (totalCount++ > 0) { @@ -290,40 +293,32 @@ public class WebMasterInfoHandler extends AbstractWebHandler { isSrvAcceptPublish = false; isSrvAcceptSubscribe = false; enableAuthControl = false; - isAcceptPublish = false; - isAcceptSubscribe = false; for (TopicDeployEntity entity : entry.getValue()) { - BrokerConfEntity brokerConfEntity = + brokerConfEntity = defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId()); if (brokerConfEntity == null) { continue; } brokerCount++; - Tuple2<Boolean, Boolean> pubSubStatus = - WebParameterUtils.getPubSubStatusByManageStatus( - brokerConfEntity.getManageStatus().getCode()); - isAcceptPublish = pubSubStatus.getF0(); - isAcceptSubscribe = pubSubStatus.getF1(); - TopicPropGroup topicProps = entity.getTopicProps(); + topicProps = entity.getTopicProps(); totalCfgTopicStoreCount += topicProps.getNumTopicStores(); totalCfgNumPartCount += topicProps.getNumPartitions() * topicProps.getNumTopicStores(); - TopicInfo topicInfo = - brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName()); - if (topicInfo != null) { - if (isAcceptPublish && topicInfo.isAcceptPublish()) { + brokerRunManager.getPubBrokerTopicInfo( + entity.getBrokerId(), entity.getTopicName(), topicInfoTuple); + if (topicInfoTuple.getF2() != null) { + if (topicInfoTuple.getF0() && topicInfoTuple.getF2().isAcceptPublish()) { isSrvAcceptPublish = true; } - if (isAcceptSubscribe && topicInfo.isAcceptSubscribe()) { + if (topicInfoTuple.getF1() && topicInfoTuple.getF2().isAcceptSubscribe()) { isSrvAcceptSubscribe = true; } - totalRunTopicStoreCount += topicInfo.getTopicStoreNum(); + totalRunTopicStoreCount += topicInfoTuple.getF2().getTopicStoreNum(); totalRunNumPartCount += - topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum(); + topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum(); } } - TopicCtrlEntity authEntity = - defMetaDataService.getTopicCtrlByTopicName(entry.getKey()); + authEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey()); if (authEntity != null) { enableAuthControl = authEntity.isAuthCtrlEnable(); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java index 344cb0f71..8d991bbb2 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java @@ -28,8 +28,8 @@ import org.apache.inlong.tubemq.corebase.TBaseConstants; import org.apache.inlong.tubemq.corebase.cluster.TopicInfo; import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.Tuple2; +import org.apache.inlong.tubemq.corebase.utils.Tuple3; import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef; -import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus; import org.apache.inlong.tubemq.server.common.statusdef.TopicStatus; import org.apache.inlong.tubemq.server.common.statusdef.TopicStsChgType; import org.apache.inlong.tubemq.server.common.utils.WebParameterUtils; @@ -607,16 +607,19 @@ public class WebTopicDeployHandler extends AbstractWebHandler { Map<String, List<TopicDeployEntity>> topicDeployInfoMap) { // build query result int totalCnt = 0; + int itemCount = 0; + int condStatusId = 1; int maxMsgSizeInMB = 0; int totalCfgNumPartCount = 0; int totalRunNumPartCount = 0; boolean enableAuthCtrl; boolean isSrvAcceptPublish = false; boolean isSrvAcceptSubscribe = false; - boolean isAcceptPublish = false; - boolean isAcceptSubscribe = false; - ManageStatus manageStatus; - Tuple2<Boolean, Boolean> pubSubStatus; + String strManageStatus; + TopicCtrlEntity ctrlEntity; + BrokerConfEntity brokerConfEntity; + List<GroupConsumeCtrlEntity> groupCtrlInfoLst; + Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>(); BrokerRunManager brokerRunManager = master.getBrokerRunManager(); ClusterSettingEntity defSetting = defMetaDataService.getClusterDefSetting(false); WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer); @@ -625,11 +628,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler { totalRunNumPartCount = 0; isSrvAcceptPublish = false; isSrvAcceptSubscribe = false; - isAcceptPublish = false; - isAcceptSubscribe = false; - enableAuthCtrl = false; - TopicCtrlEntity ctrlEntity = - defMetaDataService.getTopicCtrlByTopicName(entry.getKey()); + ctrlEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey()); if (ctrlEntity == null) { continue; } @@ -644,54 +643,55 @@ public class WebTopicDeployHandler extends AbstractWebHandler { sBuffer.append("{\"topicName\":\"").append(entry.getKey()) .append("\",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB) .append(",\"topicInfo\":["); - int brokerCount = 0; + itemCount = 0; for (TopicDeployEntity entity : entry.getValue()) { - if (brokerCount++ > 0) { + if (itemCount++ > 0) { sBuffer.append(","); } totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores(); entity.toWebJsonStr(sBuffer, true, false); sBuffer.append(",\"runInfo\":{"); - BrokerConfEntity brokerConfEntity = + strManageStatus = "-"; + brokerConfEntity = defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId()); - String strManageStatus = "-"; if (brokerConfEntity != null) { - manageStatus = brokerConfEntity.getManageStatus(); - strManageStatus = manageStatus.getDescription(); - pubSubStatus = manageStatus.getPubSubStatus(); - isAcceptPublish = pubSubStatus.getF0(); - isAcceptSubscribe = pubSubStatus.getF1(); + strManageStatus = brokerConfEntity.getManageStatus().getDescription(); } - TopicInfo topicInfo = - brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName()); - if (topicInfo == null) { + brokerRunManager.getPubBrokerTopicInfo( + entity.getBrokerId(), entity.getTopicName(), topicInfoTuple); + if (topicInfoTuple.getF2() == null) { sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"") .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\""); } else { - if (isAcceptPublish) { - sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish()); - if (topicInfo.isAcceptPublish()) { + if (topicInfoTuple.getF0()) { + sBuffer.append("\"acceptPublish\":") + .append(topicInfoTuple.getF2().isAcceptPublish()); + if (topicInfoTuple.getF2().isAcceptPublish()) { isSrvAcceptPublish = true; } } else { sBuffer.append("\"acceptPublish\":false"); } - if (isAcceptSubscribe) { - sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe()); - if (topicInfo.isAcceptSubscribe()) { + if (topicInfoTuple.getF1()) { + sBuffer.append(",\"acceptSubscribe\":") + .append(topicInfoTuple.getF2().isAcceptSubscribe()); + if (topicInfoTuple.getF2().isAcceptSubscribe()) { isSrvAcceptSubscribe = true; } } else { sBuffer.append(",\"acceptSubscribe\":false"); } - totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum(); - sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum()) - .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum()) + totalRunNumPartCount += + topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum(); + sBuffer.append(",\"numPartitions\":") + .append(topicInfoTuple.getF2().getPartitionNum()) + .append(",\"numTopicStores\":") + .append(topicInfoTuple.getF2().getTopicStoreNum()) .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\""); } sBuffer.append("}}"); } - sBuffer.append("],\"infoCount\":").append(brokerCount) + sBuffer.append("],\"infoCount\":").append(itemCount) .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount) .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish) .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe) @@ -702,9 +702,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler { .append(",\"createUser\":\"").append(ctrlEntity.getModifyUser()) .append("\",\"createDate\":\"").append(ctrlEntity.getModifyDateStr()) .append("\",\"authConsumeGroup\":["); - List<GroupConsumeCtrlEntity> groupCtrlInfoLst = - defMetaDataService.getConsumeCtrlByTopic(entry.getKey()); - int itemCount = 0; + itemCount = 0; + groupCtrlInfoLst = defMetaDataService.getConsumeCtrlByTopic(entry.getKey()); for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) { if (itemCount++ > 0) { sBuffer.append(","); @@ -718,7 +717,6 @@ public class WebTopicDeployHandler extends AbstractWebHandler { } sBuffer.append("],\"groupCount\":").append(itemCount).append(",\"authFilterCondSet\":["); itemCount = 0; - int condStatusId = 1; for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) { if (itemCount++ > 0) { sBuffer.append(","); @@ -753,14 +751,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler { Map<String, List<TopicDeployEntity>> topicDeployMap) { // build query result int totalCnt = 0; + int itemCount = 0; int totalCfgNumPartCount = 0; int totalRunNumPartCount = 0; boolean isSrvAcceptPublish = false; boolean isSrvAcceptSubscribe = false; - boolean isAcceptPublish = false; - boolean isAcceptSubscribe = false; - ManageStatus manageStatus; - Tuple2<Boolean, Boolean> pubSubStatus; + String strManageStatus; + TopicCtrlEntity ctrlEntity; + BrokerConfEntity brokerConfEntity; + List<GroupConsumeCtrlEntity> groupCtrlInfoLst; + Tuple3<Boolean, Boolean, TopicInfo> topicInfoTuple = new Tuple3<>(); BrokerRunManager brokerRunManager = master.getBrokerRunManager(); WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer); for (Map.Entry<String, List<TopicDeployEntity>> entry : topicDeployMap.entrySet()) { @@ -768,9 +768,7 @@ public class WebTopicDeployHandler extends AbstractWebHandler { totalRunNumPartCount = 0; isSrvAcceptPublish = false; isSrvAcceptSubscribe = false; - isAcceptPublish = false; - isAcceptSubscribe = false; - TopicCtrlEntity ctrlEntity = + ctrlEntity = defMetaDataService.getTopicCtrlByTopicName(entry.getKey()); if (ctrlEntity == null) { continue; @@ -780,71 +778,71 @@ public class WebTopicDeployHandler extends AbstractWebHandler { } ctrlEntity.toWebJsonStr(sBuffer, true, false); sBuffer.append(",\"deployInfo\":["); - int brokerCount = 0; + itemCount = 0; for (TopicDeployEntity entity : entry.getValue()) { - if (brokerCount++ > 0) { + if (itemCount++ > 0) { sBuffer.append(","); } totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores(); entity.toWebJsonStr(sBuffer, true, false); sBuffer.append(",\"runInfo\":{"); - BrokerConfEntity brokerConfEntity = + brokerConfEntity = defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId()); - - String strManageStatus = "-"; + strManageStatus = "-"; if (brokerConfEntity != null) { - manageStatus = brokerConfEntity.getManageStatus(); - strManageStatus = manageStatus.getDescription(); - pubSubStatus = manageStatus.getPubSubStatus(); - isAcceptPublish = pubSubStatus.getF0(); - isAcceptSubscribe = pubSubStatus.getF1(); + strManageStatus = brokerConfEntity.getManageStatus().getDescription(); } - TopicInfo topicInfo = - brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), entity.getTopicName()); - if (topicInfo == null) { + brokerRunManager.getPubBrokerTopicInfo( + entity.getBrokerId(), entity.getTopicName(), topicInfoTuple); + if (topicInfoTuple.getF2() == null) { sBuffer.append("\"acceptPublish\":\"-\"").append(",\"acceptSubscribe\":\"-\"") .append(",\"numPartitions\":\"-\"").append(",\"brokerManageStatus\":\"-\""); } else { - if (isAcceptPublish) { - sBuffer.append("\"acceptPublish\":").append(topicInfo.isAcceptPublish()); - if (topicInfo.isAcceptPublish()) { + if (topicInfoTuple.getF0()) { + sBuffer.append("\"acceptPublish\":") + .append(topicInfoTuple.getF2().isAcceptPublish()); + if (topicInfoTuple.getF2().isAcceptPublish()) { isSrvAcceptPublish = true; } } else { sBuffer.append("\"acceptPublish\":false"); } - if (isAcceptSubscribe) { - sBuffer.append(",\"acceptSubscribe\":").append(topicInfo.isAcceptSubscribe()); - if (topicInfo.isAcceptSubscribe()) { + if (topicInfoTuple.getF1()) { + sBuffer.append(",\"acceptSubscribe\":") + .append(topicInfoTuple.getF2().isAcceptSubscribe()); + if (topicInfoTuple.getF2().isAcceptSubscribe()) { isSrvAcceptSubscribe = true; } } else { sBuffer.append(",\"acceptSubscribe\":false"); } - totalRunNumPartCount += topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum(); - sBuffer.append(",\"numPartitions\":").append(topicInfo.getPartitionNum()) - .append(",\"numTopicStores\":").append(topicInfo.getTopicStoreNum()) + totalRunNumPartCount += + topicInfoTuple.getF2().getPartitionNum() * topicInfoTuple.getF2().getTopicStoreNum(); + sBuffer.append(",\"numPartitions\":") + .append(topicInfoTuple.getF2().getPartitionNum()) + .append(",\"numTopicStores\":") + .append(topicInfoTuple.getF2().getTopicStoreNum()) .append(",\"brokerManageStatus\":\"").append(strManageStatus).append("\""); } sBuffer.append("}}"); } - sBuffer.append("],\"infoCount\":").append(brokerCount) + sBuffer.append("],\"infoCount\":").append(itemCount) .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount) .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish) .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe) .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount); if (withAuthInfo) { sBuffer.append(",\"groupAuthInfo\":["); - List<GroupConsumeCtrlEntity> groupCtrlInfoLst = + groupCtrlInfoLst = defMetaDataService.getConsumeCtrlByTopic(entry.getKey()); - int countJ = 0; + itemCount = 0; for (GroupConsumeCtrlEntity groupEntity : groupCtrlInfoLst) { - if (countJ++ > 0) { + if (itemCount++ > 0) { sBuffer.append(","); } groupEntity.toWebJsonStr(sBuffer, true, true); } - sBuffer.append("],\"groupAuthCount\":").append(countJ); + sBuffer.append("],\"groupAuthCount\":").append(itemCount); } sBuffer.append("}"); } @@ -1073,5 +1071,4 @@ public class WebTopicDeployHandler extends AbstractWebHandler { } return buildRetInfo(retInfo, sBuffer); } - }