This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 287644e7c8db46771621ca735467624a966c0058 Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Nov 9 18:40:10 2022 +0800 [INLONG-6406][DataProxy] Should support creating sink dynamically after started (addendum) (#6488) --- .../org/apache/inlong/dataproxy/sink/PulsarSink.java | 5 +---- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 20 +++++++------------- .../dataproxy/sink/common/TubeProducerHolder.java | 6 +++++- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 6fd938030..0e0330231 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -306,7 +306,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag pulsarCluster = configManager.getMqClusterUrl2Token(); if (!ConfigManager.getInstance().isMqClusterReady()) { - this.canTake = true; ConfigManager.getInstance().updMqClusterStatus(true); logger.info("[{}] MQ Cluster service status ready!", getName()); } @@ -350,9 +349,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName()); MetricRegister.register(metricItemSet); - if (ConfigManager.getInstance().isMqClusterReady()) { - this.canTake = true; - } + this.canTake = true; logger.info("[{}] Pulsar sink started", getName()); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index dee798214..e1dbf90a3 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -76,7 +76,6 @@ public class TubeSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(TubeSink.class); private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler(); private TubeProducerHolder producerHolder = null; - private volatile boolean canTake = false; private volatile boolean canSend = false; private volatile boolean isOverFlow = false; private ConfigManager configManager; @@ -212,9 +211,6 @@ public class TubeSink extends AbstractSink implements Configurable { // start the cleaner thread super.start(); this.canSend = true; - if (ConfigManager.getInstance().isMqClusterReady()) { - this.canTake = true; - } for (int i = 0; i < sinkThreadPool.length; i++) { sinkThreadPool[i] = new Thread(new TubeSinkTask(), getName() + "_tube_sink_sender-" + i); @@ -229,7 +225,6 @@ public class TubeSink extends AbstractSink implements Configurable { logger.info("Duplicated call, " + getName() + " has stopped!"); return; } - this.canTake = false; // waiting inflight message processed int waitCount = 0; while (takenMsgCnt.get() > 0 && waitCount++ < 10) { @@ -269,7 +264,7 @@ public class TubeSink extends AbstractSink implements Configurable { @Override public Status process() throws EventDeliveryException { - if (!this.canTake) { + if (!this.started.get()) { return Status.BACKOFF; } Status status = Status.READY; @@ -319,6 +314,10 @@ public class TubeSink extends AbstractSink implements Configurable { logger.info("sink task {} started.", Thread.currentThread().getName()); while (canSend) { try { + if (!started.get() && cachedMsgCnt.get() <= 0) { + logger.info("Found started is false and taken message count is zero, braek!"); + break; + } if (isOverFlow) { isOverFlow = false; Thread.sleep(30); @@ -327,10 +326,6 @@ public class TubeSink extends AbstractSink implements Configurable { if (resendQueue.isEmpty()) { event = eventQueue.poll(2000, TimeUnit.MILLISECONDS); if (event == null) { - if (!canTake && takenMsgCnt.get() <= 0) { - logger.info("Found canTake is false and taken message count is zero, braek!"); - break; - } continue; } cachedMsgCnt.decrementAndGet(); @@ -534,7 +529,7 @@ public class TubeSink extends AbstractSink implements Configurable { @Override public void run() { - if (!canTake && takenMsgCnt.get() <= 0) { + if (!started.get()) { return; } logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get() @@ -621,7 +616,7 @@ public class TubeSink extends AbstractSink implements Configurable { if (producerHolder != null) { try { producerHolder.createProducersByTopicSet(addedTopics); - } catch (Exception e) { + } catch (Throwable e) { logger.info(getName() + "'s publish new topic set fail.", e); } } @@ -675,7 +670,6 @@ public class TubeSink extends AbstractSink implements Configurable { tmpProducerHolder.stop(); } if (!ConfigManager.getInstance().isMqClusterReady()) { - this.canTake = true; ConfigManager.getInstance().updMqClusterStatus(true); logger.info("[{}] MQ Cluster service status ready!", getName()); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java index 04b43c87f..37f828d24 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java @@ -263,7 +263,11 @@ public class TubeProducerHolder { lastProducer = sessionFactory.createProducer(); lastPubTopicCnt.set(0); } - lastProducer.publish(subTopicSet); + try { + lastProducer.publish(subTopicSet); + } catch (Throwable e) { + logger.info(sinkName + " meta sink publish fail.", e); + } lastPubTopicCnt.addAndGet(subTopicSet.size()); for (String topicItem : subTopicSet) { producerMap.put(topicItem, lastProducer);