This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 9e7943edb [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066) 9e7943edb is described below commit 9e7943edb35eda6d05471c054c3ac888cc56a95d Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Sep 30 10:27:54 2022 +0800 [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066) Co-authored-by: healchow <healc...@gmail.com> --- .../apache/inlong/dataproxy/sink/PulsarSink.java | 25 +++++++++++++++++----- .../dataproxy/sink/pulsar/PulsarClientService.java | 14 ++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index cac638ce5..c2014965d 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -246,21 +246,36 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag /** * When topic.properties is re-enabled, the producer update is triggered */ - public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet, Set<String> endSet) { + public void diffSetPublish(PulsarClientService pulsarClientService, + Set<String> curTopicSet, Set<String> newTopicSet) { boolean changed = false; - for (String s : endSet) { - if (!originalSet.contains(s)) { + // create producers for new topics + for (String newTopic : newTopicSet) { + if (!curTopicSet.contains(newTopic)) { changed = true; try { - pulsarClientService.initTopicProducer(s); + pulsarClientService.initTopicProducer(newTopic); } catch (Exception e) { logger.error("get producer failed: ", e); } } } + // remove producers for deleted topics + for (String oldTopic : curTopicSet) { + if (!newTopicSet.contains(oldTopic)) { + changed = true; + try { + pulsarClientService.destroyProducerByTopic(oldTopic); + } catch (Exception e) { + logger.error("remove producer failed: ", e); + } + } + } if (changed) { - logger.info("topics.properties has changed, trigger diff publish for {}", getName()); topicProperties = configManager.getTopicProperties(); + logger.info("topics.properties has changed, trigger diff publish for {}," + + " old topic set = {}, new topic set = {}, current topicProperties = {}", + getName(), curTopicSet, newTopicSet, topicProperties); } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java index 66b06e93d..dc0a87b07 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java @@ -323,6 +323,20 @@ public class PulsarClientService { return initTopicProducer(topic, null, null); } + public boolean destroyProducerByTopic(String topic) { + List<TopicProducerInfo> producerInfoList = producerInfoMap.remove(topic); + if (producerInfoList == null || producerInfoList.isEmpty()) { + return true; + } + for (TopicProducerInfo producerInfo : producerInfoList) { + if (producerInfo != null) { + producerInfo.close(); + logger.info("destroy producer for topic={}", topic); + } + } + return true; + } + private TopicProducerInfo getProducerInfo(int poolIndex, String topic, String inlongGroupId, String inlongStreamId) { List<TopicProducerInfo> producerList = initTopicProducer(topic, inlongGroupId, inlongStreamId);