This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 5cf0b4311 [INLONG-6156][DataProxy] Twice event write when topic is 
empty (#6157)
5cf0b4311 is described below

commit 5cf0b4311bade4e203f8c2153d724376144de997
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Oct 12 18:15:31 2022 +0800

    [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
---
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java    |  2 +-
 .../apache/inlong/dataproxy/sink/pulsar/SinkTask.java   | 17 ++++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index c2014965d..b73223b11 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -498,7 +498,7 @@ public class PulsarSink extends AbstractSink implements 
Configurable, SendMessag
         }
         addStatistics(eventStat, false, 0);
         eventStat.incRetryCnt();
-        if (!eventStat.isOrderMessage() && needRetry) {
+        if (needRetry) {
             processResendEvent(eventStat);
         }
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index efbaedfb6..a0a488fa6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -32,6 +32,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
 import org.apache.inlong.dataproxy.sink.PulsarSink;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,6 +154,12 @@ public class SinkTask extends Thread {
                     logger.warn("Event is null!");
                     continue;
                 }
+                // check whether discard or send event
+                if (eventStat.getRetryCnt() > maxRetrySendCnt) {
+                    logger.warn("Message will be discard! send times reach to 
max retry cnt."
+                            + " topic = {}, max retry cnt = {}", topic, 
maxRetrySendCnt);
+                    continue;
+                }
                 // get topic
                 topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
                 if (StringUtils.isEmpty(topic)) {
@@ -162,21 +169,13 @@ public class SinkTask extends Thread {
                 }
                 if (topic == null || topic.equals("")) {
                     pulsarSink.handleMessageSendException(topic, eventStat,
-                            new Exception(ConfigConstants.TOPIC_KEY + " info 
is null"));
-                    processToReTrySend(eventStat);
-                    logger.warn("no topic specified, so will retry send!");
+                            new NotFoundException(ConfigConstants.TOPIC_KEY + 
" info is null"));
                     continue;
                 }
                 // check whether order-type message
                 if (eventStat.isOrderMessage()) {
                     sleep(1000);
                 }
-                // check whether discard or send event
-                if (eventStat.getRetryCnt() > maxRetrySendCnt) {
-                    logger.warn("Message will be discard! send times reach to 
max retry cnt."
-                            + " topic = {}, max retry cnt = {}", topic, 
maxRetrySendCnt);
-                    continue;
-                }
                 // check whether duplicated event
                 String clientSeqId = 
event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
                 if (pulsarConfig.getClientIdCache() && clientSeqId != null) {

Reply via email to