This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 5cf0b4311 [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
5cf0b4311 is described below

commit 5cf0b4311bade4e203f8c2153d724376144de997
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Oct 12 18:15:31 2022 +0800

    [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
---
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java    |  2 +-
 .../apache/inlong/dataproxy/sink/pulsar/SinkTask.java   | 17 ++++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index c2014965d..b73223b11 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -498,7 +498,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
             }
             addStatistics(eventStat, false, 0);
             eventStat.incRetryCnt();
-            if (!eventStat.isOrderMessage() && needRetry) {
+            if (needRetry) {
                 processResendEvent(eventStat);
             }
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index efbaedfb6..a0a488fa6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -32,6 +32,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
 import org.apache.inlong.dataproxy.sink.PulsarSink;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,6 +154,12 @@ public class SinkTask extends Thread {
                     logger.warn("Event is null!");
                     continue;
                 }
+                // check whether discard or send event
+                if (eventStat.getRetryCnt() > maxRetrySendCnt) {
+                    logger.warn("Message will be discard! send times reach to max retry cnt."
+                            + " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt);
+                    continue;
+                }
                 // get topic
                 topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
                 if (StringUtils.isEmpty(topic)) {
@@ -162,21 +169,13 @@ public class SinkTask extends Thread {
                 }
                 if (topic == null || topic.equals("")) {
                     pulsarSink.handleMessageSendException(topic, eventStat,
-                            new Exception(ConfigConstants.TOPIC_KEY + " info is null"));
-                    processToReTrySend(eventStat);
-                    logger.warn("no topic specified, so will retry send!");
+                            new NotFoundException(ConfigConstants.TOPIC_KEY + " info is null"));
                     continue;
                 }
                 // check whether order-type message
                 if (eventStat.isOrderMessage()) {
                     sleep(1000);
                 }
-                // check whether discard or send event
-                if (eventStat.getRetryCnt() > maxRetrySendCnt) {
-                    logger.warn("Message will be discard! send times reach to max retry cnt."
-                            + " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt);
-                    continue;
-                }
                 // check whether duplicated event
                 String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
                 if (pulsarConfig.getClientIdCache() && clientSeqId != null) {