This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c3d72b0ff [INLONG-8192][DataProxy] The topic name generated by dataproxy is incorrect (#8194)
6c3d72b0ff is described below

commit 6c3d72b0ffb0064a0147b41b0d774f3570bd6a63
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Thu Jun 8 21:30:54 2023 +0800

    [INLONG-8192][DataProxy] The topic name generated by dataproxy is incorrect (#8194)
---
 .../dataproxy/config/holder/MetaConfigHolder.java    | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 9c7078ee48..70a07bde2c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -145,12 +145,9 @@ public class MetaConfigHolder extends ConfigHolder {
     }
 
     public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
-        if (StringUtils.isBlank(inDataMd5) || 
StringUtils.isBlank(inDataJsonStr)) {
-            return false;
-        }
-        if (inDataMd5.equalsIgnoreCase(dataMd5)) {
-            LOG.info("Update json {}, but the stored md5sum {} is equals to 
changed md5sum {}",
-                    getFileName(), dataMd5, inDataMd5);
+        if (StringUtils.isBlank(inDataMd5)
+                || StringUtils.isBlank(inDataJsonStr)
+                || inDataMd5.equalsIgnoreCase(dataMd5)) {
             return false;
         }
         if (storeConfigToFile(inDataJsonStr)) {
@@ -368,12 +365,15 @@ public class MetaConfigHolder extends ConfigHolder {
             }
             tenant = 
idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, "");
             nameSpace = 
idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, "");
+            if (StringUtils.isBlank(idObject.getTopic())) {
+                // namespace field must exist and value not be empty,
+                // otherwise it is an illegal configuration item.
+                continue;
+            }
             if (mqType.equals(CacheType.TUBE)) {
-                if (StringUtils.isNotBlank(nameSpace)) {
-                    topicName = nameSpace;
-                }
+                topicName = nameSpace;
             } else if (mqType.equals(CacheType.KAFKA)) {
-                if (StringUtils.isNotBlank(nameSpace)) {
+                if (topicName.equals(streamId)) {
                     topicName = 
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName);
                 }
             }

Reply via email to