This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bfd980ac3 [INLONG-6419][TubeMQ] Correct some misuse of META_DEFAULT_BROKER_PORT (#6420)
bfd980ac3 is described below

commit bfd980ac357143c5b470915056b8a681a7d79c41
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Nov 7 15:27:52 2022 +0800

    [INLONG-6419][TubeMQ] Correct some misuse of META_DEFAULT_BROKER_PORT (#6420)
---
 .../org/apache/inlong/tubemq/client/consumer/RmtDataCache.java    | 4 ++--
 .../org/apache/inlong/tubemq/client/producer/ProducerManager.java | 4 ++--
 .../apache/inlong/tubemq/corebase/utils/DataConverterUtil.java    | 8 ++++++--
 .../apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java | 3 ++-
 .../server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java    | 2 +-
 .../master/metamanage/metastore/dao/entity/BrokerConfEntity.java  | 2 +-
 6 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
index 3170f7db2..7cd18cf4c 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
@@ -443,8 +443,8 @@ public class RmtDataCache implements Closeable {
             StringBuilder sBuilder) {
         if (pkgCheckSum != lstBrokerConfigId.get()) {
             if (pkgBrokerInfos != null) {
-                brokersMap =
-                        DataConverterUtil.convertBrokerInfo(pkgBrokerInfos);
+                brokersMap = DataConverterUtil.convertBrokerInfo(
+                        pkgBrokerInfos, consumerConfig.isTlsEnable());
                 lstBrokerConfigId.set(pkgCheckSum);
                 lastBrokerUpdatedTime = System.currentTimeMillis();
                 if (pkgBrokerInfos.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 6ce1bfa06..fa5eb7fce 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -567,8 +567,8 @@ public class ProducerManager {
             long pkgCheckSum, StringBuilder sBuilder) {
         if (pkgCheckSum != brokerInfoCheckSum) {
             if (pkgBrokerInfos != null) {
-                brokersMap =
-                        DataConverterUtil.convertBrokerInfo(pkgBrokerInfos);
+                brokersMap = DataConverterUtil.convertBrokerInfo(
+                        pkgBrokerInfos, tubeClientConfig.isTlsEnable());
                 brokerInfoCheckSum = pkgCheckSum;
                 lastBrokerUpdatedTime = System.currentTimeMillis();
                 if (pkgBrokerInfos.isEmpty()) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
index 3d63c8e19..8d3736b21 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/DataConverterUtil.java
@@ -142,15 +142,19 @@ public class DataConverterUtil {
      * convert string info to @link BrokerInfo
      *
      * @param strBrokerInfos return a BrokerInfo Map
+     * @param enableTLS Whether to enable TLS
      */
-    public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String> strBrokerInfos) {
+    public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String> strBrokerInfos,
+            boolean enableTLS) {
         Map<Integer, BrokerInfo> brokerInfoMap = new ConcurrentHashMap<>();
         if (strBrokerInfos != null) {
+            int brokerPort = enableTLS
+                    ? TBaseConstants.META_DEFAULT_BROKER_TLS_PORT : TBaseConstants.META_DEFAULT_BROKER_PORT;
             for (String info : strBrokerInfos) {
                 if (info != null) {
                     BrokerInfo brokerInfo =
-                            new BrokerInfo(info, TBaseConstants.META_DEFAULT_BROKER_PORT);
+                            new BrokerInfo(info, brokerPort);
                     brokerInfoMap.put(brokerInfo.getBrokerId(), brokerInfo);
                 }
             }
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
index f7ef9576e..4721e5d6a 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/codec/DataConverterUtilTest.java
@@ -44,7 +44,8 @@ public class DataConverterUtilTest {
         BrokerInfo broker = new BrokerInfo(0, "localhost", 1200);
         List<String> strInfoList = new ArrayList<>();
         strInfoList.add("0:localhost:1200");
-        Map<Integer, BrokerInfo> brokerMap = DataConverterUtil.convertBrokerInfo(strInfoList);
+        Map<Integer, BrokerInfo> brokerMap =
+                DataConverterUtil.convertBrokerInfo(strInfoList, false);
         assertEquals("broker should be equal", broker, brokerMap.get(broker.getBrokerId()));
         // partition convert
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 33a648f09..f00be53a9 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -220,7 +220,7 @@ public class BdbBrokerConfEntity implements Serializable {
     }
     public String getSimpleTLSBrokerInfo() {
-        if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_PORT) {
+        if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_TLS_PORT) {
             return this.brokerTLSSimpleInfo;
         } else {
             return this.brokerTLSFullInfo;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 55691bef8..4a414b1bc 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -188,7 +188,7 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable {
     }
     public String getSimpleTLSBrokerInfo() {
-        if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_PORT) {
+        if (getBrokerTLSPort() == TBaseConstants.META_DEFAULT_BROKER_TLS_PORT) {
             return this.brokerTLSSimpleInfo;
         } else {
             return this.brokerTLSFullInfo;