This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 287644e7c8db46771621ca735467624a966c0058
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Nov 9 18:40:10 2022 +0800

    [INLONG-6406][DataProxy] Should support creating sink dynamically after started (addendum) (#6488)
---
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java |  5 +----
 .../org/apache/inlong/dataproxy/sink/TubeSink.java   | 20 +++++++-------------
 .../dataproxy/sink/common/TubeProducerHolder.java    |  6 +++++-
 3 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 6fd938030..0e0330231 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -306,7 +306,6 @@ public class PulsarSink extends AbstractSink implements 
Configurable, SendMessag
 
         pulsarCluster = configManager.getMqClusterUrl2Token();
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
             ConfigManager.getInstance().updMqClusterStatus(true);
             logger.info("[{}] MQ Cluster service status ready!", getName());
         }
@@ -350,9 +349,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable, SendMessag
                         ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
         this.metricItemSet = new DataProxyMetricItemSet(clusterId, 
this.getName());
         MetricRegister.register(metricItemSet);
-        if (ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
-        }
+        this.canTake = true;
         logger.info("[{}] Pulsar sink started", getName());
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index dee798214..e1dbf90a3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -76,7 +76,6 @@ public class TubeSink extends AbstractSink implements 
Configurable {
     private static final Logger logger = 
LoggerFactory.getLogger(TubeSink.class);
     private static final MsgDedupHandler MSG_DEDUP_HANDLER = new 
MsgDedupHandler();
     private TubeProducerHolder producerHolder = null;
-    private volatile boolean canTake = false;
     private volatile boolean canSend = false;
     private volatile boolean isOverFlow = false;
     private ConfigManager configManager;
@@ -212,9 +211,6 @@ public class TubeSink extends AbstractSink implements 
Configurable {
         // start the cleaner thread
         super.start();
         this.canSend = true;
-        if (ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
-        }
         for (int i = 0; i < sinkThreadPool.length; i++) {
             sinkThreadPool[i] = new Thread(new TubeSinkTask(),
                     getName() + "_tube_sink_sender-" + i);
@@ -229,7 +225,6 @@ public class TubeSink extends AbstractSink implements 
Configurable {
             logger.info("Duplicated call, " + getName() + " has stopped!");
             return;
         }
-        this.canTake = false;
         // waiting inflight message processed
         int waitCount = 0;
         while (takenMsgCnt.get() > 0 && waitCount++ < 10) {
@@ -269,7 +264,7 @@ public class TubeSink extends AbstractSink implements 
Configurable {
 
     @Override
     public Status process() throws EventDeliveryException {
-        if (!this.canTake) {
+        if (!this.started.get()) {
             return Status.BACKOFF;
         }
         Status status = Status.READY;
@@ -319,6 +314,10 @@ public class TubeSink extends AbstractSink implements 
Configurable {
             logger.info("sink task {} started.", 
Thread.currentThread().getName());
             while (canSend) {
                 try {
+                    if (!started.get() && cachedMsgCnt.get() <= 0) {
+                        logger.info("Found started is false and taken message 
count is zero, braek!");
+                        break;
+                    }
                     if (isOverFlow) {
                         isOverFlow = false;
                         Thread.sleep(30);
@@ -327,10 +326,6 @@ public class TubeSink extends AbstractSink implements 
Configurable {
                     if (resendQueue.isEmpty()) {
                         event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
                         if (event == null) {
-                            if (!canTake && takenMsgCnt.get() <= 0) {
-                                logger.info("Found canTake is false and taken 
message count is zero, braek!");
-                                break;
-                            }
                             continue;
                         }
                         cachedMsgCnt.decrementAndGet();
@@ -534,7 +529,7 @@ public class TubeSink extends AbstractSink implements 
Configurable {
 
         @Override
         public void run() {
-            if (!canTake && takenMsgCnt.get() <= 0) {
+            if (!started.get()) {
                 return;
             }
             logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + 
cachedMsgCnt.get()
@@ -621,7 +616,7 @@ public class TubeSink extends AbstractSink implements 
Configurable {
             if (producerHolder != null) {
                 try {
                     producerHolder.createProducersByTopicSet(addedTopics);
-                } catch (Exception e) {
+                } catch (Throwable e) {
                     logger.info(getName() + "'s publish new topic set fail.", 
e);
                 }
             }
@@ -675,7 +670,6 @@ public class TubeSink extends AbstractSink implements 
Configurable {
             tmpProducerHolder.stop();
         }
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            this.canTake = true;
             ConfigManager.getInstance().updMqClusterStatus(true);
             logger.info("[{}] MQ Cluster service status ready!", getName());
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
index 04b43c87f..37f828d24 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
@@ -263,7 +263,11 @@ public class TubeProducerHolder {
                 lastProducer = sessionFactory.createProducer();
                 lastPubTopicCnt.set(0);
             }
-            lastProducer.publish(subTopicSet);
+            try {
+                lastProducer.publish(subTopicSet);
+            } catch (Throwable e) {
+                logger.info(sinkName + " meta sink publish fail.", e);
+            }
             lastPubTopicCnt.addAndGet(subTopicSet.size());
             for (String topicItem : subTopicSet) {
                 producerMap.put(topicItem, lastProducer);

Reply via email to