This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new f92f53513 [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
f92f53513 is described below

commit f92f535134268bd5730a37adea7cb038f38e4b78
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Thu Sep 29 15:09:33 2022 +0800

    [INLONG-6058][DataProxy] Add parameter checking and optimize client creation logic (#6059)
---
 .../inlong/dataproxy/config/ConfigManager.java     |  6 ++++--
 .../dataproxy/sink/pulsar/PulsarClientService.java | 25 ++++++++++++++++------
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 554881448..fa05427b9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
@@ -55,6 +56,7 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_IN
  * Config manager class.
  */
 public class ConfigManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
     public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
     private static volatile boolean isInit = false;
@@ -157,7 +159,8 @@ public class ConfigManager {
             // add new configure records
             for (Map.Entry<String, String> entry : result.entrySet()) {
                 String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
-                if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+                if ((entry.getValue() == null && !Objects.equals("null", oldValue))
+                        || (entry.getValue() != null && !Objects.equals(entry.getValue(), oldValue))) {
                     changed = true;
                 }
             }
@@ -256,7 +259,6 @@ public class ConfigManager {
      */
     public static class ReloadConfigWorker extends Thread {
 
-        private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
         private final ConfigManager configManager;
         private final CloseableHttpClient httpClient;
         private final Gson gson = new Gson();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index bf4558313..66b06e93d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.pulsar;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -363,10 +364,15 @@ public class PulsarClientService {
 
     private void removeProducers(PulsarClient pulsarClient) {
         for (List<TopicProducerInfo> producers : producerInfoMap.values()) {
-            for (TopicProducerInfo topicProducer : producers) {
-                if (topicProducer.getPulsarClient().equals(pulsarClient)) {
-                    topicProducer.close();
-                    producers.remove(topicProducer);
+            if (producers == null || producers.isEmpty()) {
+                continue;
+            }
+            Iterator<TopicProducerInfo> it = producers.iterator();
+            while (it.hasNext()) {
+                TopicProducerInfo entry = it.next();
+                if (entry.getPulsarClient().equals(pulsarClient)) {
+                    entry.close();
+                    it.remove();
                 }
             }
         }
@@ -412,10 +418,17 @@ public class PulsarClientService {
                             topic);
                     info.initProducer();
                     if (info.isCanUseToSendMessage()) {
-                        producerInfoMap.computeIfAbsent(topic, k -> new ArrayList<>()).add(info);
+                        List<TopicProducerInfo> producerInfos = producerInfoMap.get(topic);
+                        if (producerInfos == null) {
+                            List<TopicProducerInfo> tmpProdInfos = new ArrayList<>();
+                            producerInfos = producerInfoMap.putIfAbsent(topic, tmpProdInfos);
+                            if (producerInfos == null) {
+                                producerInfos = tmpProdInfos;
+                            }
+                        }
+                        producerInfos.add(info);
                     }
                 }
-
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(url);
                 logger.error("create connection error in pulsar sink, "