This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new a78b25523 [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041) a78b25523 is described below commit a78b255236f1f50aed89ed92a8e0956e0ecf9e21 Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Sep 28 10:02:23 2022 +0800 [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041) --- .../inlong/dataproxy/config/ConfigManager.java | 100 +++++++++++++++------ 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java index f66b46209..554881448 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -101,6 +102,72 @@ public class ConfigManager { return topicConfig.getHolder(); } + public boolean addTopicProperties(Map<String, String> result) { + return updatePropertiesHolder(result, topicConfig, true); + } + + public boolean deleteTopicProperties(Map<String, String> result) { + return updatePropertiesHolder(result, topicConfig, false); + } + + public Map<String, String> getMxProperties() { + return mxConfig.getHolder(); + } + + public boolean addMxProperties(Map<String, String> result) { + return updatePropertiesHolder(result, mxConfig, true); + } + + public boolean deleteMxProperties(Map<String, String> result) { + return updatePropertiesHolder(result, mxConfig, false); + } + + public boolean updateTopicProperties(Map<String, String> result) { + return updatePropertiesHolder(result, topicConfig); + } + + public boolean updateMQClusterProperties(Map<String, String> result) { + return updatePropertiesHolder(result, mqClusterConfigHolder); + } + + public boolean updateMxProperties(Map<String, String> result) { + return updatePropertiesHolder(result, mxConfig); + } + + /** + * update old maps, reload local files if changed. + * + * @param result - map pending to be added + * @param holder - property holder + * @return true if changed else false. + */ + private boolean updatePropertiesHolder(Map<String, String> result, + PropertiesConfigHolder holder) { + boolean changed = false; + Map<String, String> tmpHolder = holder.forkHolder(); + // Delete non-existent configuration records + Iterator<Map.Entry<String, String>> it = tmpHolder.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, String> entry = it.next(); + if (!result.containsKey(entry.getKey())) { + it.remove(); + changed = true; + } + } + // add new configure records + for (Map.Entry<String, String> entry : result.entrySet()) { + String oldValue = tmpHolder.put(entry.getKey(), entry.getValue()); + if (!ObjectUtils.equals(oldValue, entry.getValue())) { + changed = true; + } + } + if (changed) { + return holder.loadFromHolderToFile(tmpHolder); + } else { + return false; + } + } + /** * update old maps, reload local files if changed. * @@ -109,8 +176,9 @@ public class ConfigManager { * @param addElseRemove - if add(true) else remove(false) * @return true if changed else false. */ - private boolean updatePropertiesHolder(Map<String, String> result, PropertiesConfigHolder holder, - boolean addElseRemove) { + private boolean updatePropertiesHolder(Map<String, String> result, + PropertiesConfigHolder holder, + boolean addElseRemove) { Map<String, String> tmpHolder = holder.forkHolder(); boolean changed = false; @@ -135,30 +203,6 @@ public class ConfigManager { } } - public boolean addTopicProperties(Map<String, String> result) { - return updatePropertiesHolder(result, topicConfig, true); - } - - public boolean deleteTopicProperties(Map<String, String> result) { - return updatePropertiesHolder(result, topicConfig, false); - } - - public boolean updateMQClusterProperties(Map<String, String> result) { - return updatePropertiesHolder(result, mqClusterConfigHolder, true); - } - - public Map<String, String> getMxProperties() { - return mxConfig.getHolder(); - } - - public boolean addMxProperties(Map<String, String> result) { - return updatePropertiesHolder(result, mxConfig, true); - } - - public boolean deleteMxProperties(Map<String, String> result) { - return updatePropertiesHolder(result, mxConfig, false); - } - public Map<String, String> getDcMappingProperties() { return dcConfig.getHolder(); } @@ -328,8 +372,8 @@ public class ConfigManager { groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic()); } } - configManager.addMxProperties(groupIdToMValue); - configManager.addTopicProperties(groupIdToTopic); + configManager.updateMxProperties(groupIdToMValue); + configManager.updateTopicProperties(groupIdToTopic); // other params for mq mqConfig.putAll(clusterSet.get(0).getParams()); configManager.updateMQClusterProperties(mqConfig);