This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6c3d72b0ff [INLONG-8192][DataProxy] The topic name generated by dataproxy is incorrect (#8194) 6c3d72b0ff is described below commit 6c3d72b0ffb0064a0147b41b0d774f3570bd6a63 Author: Goson Zhang <4675...@qq.com> AuthorDate: Thu Jun 8 21:30:54 2023 +0800 [INLONG-8192][DataProxy] The topic name generated by dataproxy is incorrect (#8194) --- .../dataproxy/config/holder/MetaConfigHolder.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java index 9c7078ee48..70a07bde2c 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java @@ -145,12 +145,9 @@ public class MetaConfigHolder extends ConfigHolder { } public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) { - if (StringUtils.isBlank(inDataMd5) || StringUtils.isBlank(inDataJsonStr)) { - return false; - } - if (inDataMd5.equalsIgnoreCase(dataMd5)) { - LOG.info("Update json {}, but the stored md5sum {} is equals to changed md5sum {}", - getFileName(), dataMd5, inDataMd5); + if (StringUtils.isBlank(inDataMd5) + || StringUtils.isBlank(inDataJsonStr) + || inDataMd5.equalsIgnoreCase(dataMd5)) { return false; } if (storeConfigToFile(inDataJsonStr)) { @@ -368,12 +365,15 @@ public class MetaConfigHolder extends ConfigHolder { } tenant = idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, ""); nameSpace = idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, ""); + if (StringUtils.isBlank(idObject.getTopic())) { + // namespace field must exist and value not be empty, + // otherwise it is an illegal configuration item. + continue; + } if (mqType.equals(CacheType.TUBE)) { - if (StringUtils.isNotBlank(nameSpace)) { - topicName = nameSpace; - } + topicName = nameSpace; } else if (mqType.equals(CacheType.KAFKA)) { - if (StringUtils.isNotBlank(nameSpace)) { + if (topicName.equals(streamId)) { topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName); } }