This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 668ff36c5ff6e65a1dbdcc58060dee6c9d9a8370 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Sep 17 19:35:15 2022 +0800 [INLONG-5918][DataProxy] Optimize PulsarSink class (#5923) --- .../apache/inlong/dataproxy/sink/PulsarSink.java | 106 +++++++-------------- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 4 +- .../dataproxy/sink/pulsar/PulsarClientService.java | 37 ++++--- .../inlong/dataproxy/sink/pulsar/SinkTask.java | 68 +++++-------- 4 files changed, 79 insertions(+), 136 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 8127a4305..94b5ed2cc 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -51,6 +51,7 @@ import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack; import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService; import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack; import org.apache.inlong.dataproxy.sink.pulsar.SinkTask; +import org.apache.inlong.dataproxy.utils.DateTimeUtils; import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder; import org.apache.inlong.dataproxy.utils.MessageUtils; import org.apache.inlong.dataproxy.utils.NetworkUtils; @@ -122,10 +123,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag return System.currentTimeMillis(); } }); - /* - * properties for header info - */ - private static final String TOPIC = "topic"; /* * for stat */ @@ -407,14 +404,16 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag + "last long time it will cause memoryChannel full and fileChannel write.)", getName()); tx.rollback(); // metric - dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, "")); + dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, + event.getHeaders().get(ConfigConstants.TOPIC_KEY)); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); metricItem.readFailCount.incrementAndGet(); metricItem.readFailSize.addAndGet(event.getBody().length); } else { tx.commit(); // metric - dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, "")); + dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, + event.getHeaders().get(ConfigConstants.TOPIC_KEY)); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); metricItem.readSuccessCount.incrementAndGet(); metricItem.readSuccessSize.addAndGet(event.getBody().length); @@ -437,70 +436,33 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag return status; } - private void editStatistic(final Event event, String keyPostfix, boolean isOrder) { - String topic = ""; - String streamId = ""; - String nodeIp; - if (event != null) { - if (event.getHeaders().containsKey(TOPIC)) { - topic = event.getHeaders().get(TOPIC); - } - if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) { - streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); - } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) { - streamId = event.getHeaders().get(AttributeConstants.INAME); - } - - // Compatible agent - if (event.getHeaders().containsKey("ip")) { - event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip")); - event.getHeaders().remove("ip"); - } - - // Compatible agent - if (event.getHeaders().containsKey("time")) { - event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time")); - event.getHeaders().remove("time"); - } - - if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) { - nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); - if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) { - if (nodeIp != null) { - nodeIp = nodeIp.split(":")[0]; - } - - long msgCounterL = 1L; - // msg counter - if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) { - msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); - } - - String orderType = "non-order"; - if (isOrder) { - orderType = "order"; - } - StringBuilder newBase = new StringBuilder(); - newBase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR) - .append(streamId).append(SEPARATOR).append(nodeIp) - .append(SEPARATOR).append(NetworkUtils.getLocalIp()) - .append(SEPARATOR).append(orderType).append(SEPARATOR) - .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); - - long messageSize = event.getBody().length; - if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) { - messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN)); - } - - if (keyPostfix != null && !keyPostfix.equals("")) { - monitorIndex.addAndGet(new String(newBase), 0, 0, 0, (int) msgCounterL); - if (logPrinterB.shouldPrint()) { - logger.warn("error cannot send event, {} event size is {}", topic, messageSize); - } - } else { - monitorIndex.addAndGet(new String(newBase), (int) msgCounterL, 1, messageSize, 0); - } - } + private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) { + if (event == null + || pulsarConfig.getStatIntervalSec() <= 0) { + return; + } + // get statistic items + String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); + String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); + String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); + int intMsgCnt = Integer.parseInt( + event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); + long dataTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.DATA_TIME)); + String orderType = isOrder ? "order" : "non-order"; + StringBuilder newBase = new StringBuilder(512) + .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR) + .append(streamId).append(SEPARATOR).append(nodeIp) + .append(SEPARATOR).append(NetworkUtils.getLocalIp()) + .append(SEPARATOR).append(orderType).append(SEPARATOR) + .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL)); + long messageSize = event.getBody().length; + if (isSuccess) { + monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0); + } else { + monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt); + if (logPrinterB.shouldPrint()) { + logger.warn("error cannot send event, {} event size is {}", topic, messageSize); } } } @@ -549,7 +511,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag metricItem.sendCount.incrementAndGet(); metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length); monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS"); - editStatistic(eventStat.getEvent(), null, eventStat.isOrderMessage()); + editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage()); } @@ -571,7 +533,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag logger.error("send failed for " + getName(), e); } if (eventStat.getRetryCnt() == 0) { - editStatistic(eventStat.getEvent(), "failure", eventStat.isOrderMessage()); + editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage()); } } Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index a27a00758..e3fcc1155 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -516,8 +516,8 @@ public class TubeSink extends AbstractSink implements Configurable { long dataTimeL = Long.parseLong( event.getHeaders().get(AttributeConstants.DATA_TIME)); // build statistic key - StringBuilder newBase = new StringBuilder(512); - newBase.append(getName()).append(SEP_HASHTAG).append(topic) + StringBuilder newBase = new StringBuilder(512) + .append(getName()).append(SEP_HASHTAG).append(topic) .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG) .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp()) .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java index 28af3fa1c..47cdff3fa 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java @@ -18,6 +18,13 @@ package org.apache.inlong.dataproxy.sink.pulsar; import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import io.netty.buffer.ByteBuf; import org.apache.commons.lang3.StringUtils; import org.apache.flume.Event; @@ -43,15 +50,6 @@ import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - public class PulsarClientService { private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class); @@ -82,8 +80,6 @@ public class PulsarClientService { private int maxBatchingMessages = 1000; private long maxBatchingPublishDelayMillis = 1; private long retryIntervalWhenSendMsgError = 30 * 1000L; - private String localIp = "127.0.0.1"; - private int sinkThreadPoolSize; /** @@ -120,7 +116,6 @@ public class PulsarClientService { maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis(); producerInfoMap = new ConcurrentHashMap<>(); topicSendIndexMap = new ConcurrentHashMap<>(); - localIp = NetworkUtils.getLocalIp(); } public void initCreateConnection(CreatePulsarClientCallBack callBack) { @@ -141,11 +136,15 @@ public class PulsarClientService { * send message */ public boolean sendMessage(int poolIndex, String topic, Event event, - SendMessageCallBack sendMessageCallBack, EventStat es) { + SendMessageCallBack sendMessageCallBack, EventStat es) { TopicProducerInfo producerInfo = null; boolean result; - final String inlongStreamId = getInlongStreamId(event); - final String inlongGroupId = getInlongGroupId(event); + final String pkgVersion = + event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER); + final String inlongStreamId = + event.getHeaders().get(AttributeConstants.GROUP_ID); + final String inlongGroupId = + event.getHeaders().get(AttributeConstants.STREAM_ID); try { producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId); } catch (Exception e) { @@ -166,11 +165,6 @@ public class PulsarClientService { sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer info is null")); return true; } - - Map<String, String> proMap = new HashMap<>(); - proMap.put("data_proxy_ip", localIp); - proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); - TopicProducerInfo forCallBackP = producerInfo; Producer producer = producerInfo.getProducer(poolIndex); if (producer == null) { @@ -179,6 +173,9 @@ public class PulsarClientService { sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is null")); return true; } + // build and send message + Map<String, String> proMap = + MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion); if (es.isOrderMessage()) { String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY); try { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java index 2b44a2d88..efbaedfb6 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java @@ -41,8 +41,6 @@ public class SinkTask extends Thread { private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60 * 1000); - private static String TOPIC = "topic"; - /* * default value */ @@ -150,70 +148,56 @@ public class SinkTask extends Thread { sinkCounter.incrementEventDrainAttemptCount(); event = eventStat.getEvent(); } - - /* - * get topic - */ - if (event.getHeaders().containsKey(TOPIC)) { - topic = event.getHeaders().get(TOPIC); + // check event status + if (event == null) { + logger.warn("Event is null!"); + continue; } + // get topic + topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); if (StringUtils.isEmpty(topic)) { String groupId = event.getHeaders().get(AttributeConstants.GROUP_ID); String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); topic = MessageUtils.getTopic(pulsarSink.getTopicsProperties(), groupId, streamId); } - - if (event == null) { - logger.warn("Event is null!"); - continue; - } - if (topic == null || topic.equals("")) { - pulsarSink.handleMessageSendException(topic, eventStat, new Exception("topic" - + " info is null")); + pulsarSink.handleMessageSendException(topic, eventStat, + new Exception(ConfigConstants.TOPIC_KEY + " info is null")); processToReTrySend(eventStat); logger.warn("no topic specified, so will retry send!"); continue; } - + // check whether order-type message if (eventStat.isOrderMessage()) { sleep(1000); } - + // check whether discard or send event if (eventStat.getRetryCnt() > maxRetrySendCnt) { logger.warn("Message will be discard! send times reach to max retry cnt." + " topic = {}, max retry cnt = {}", topic, maxRetrySendCnt); continue; } - + // check whether duplicated event String clientSeqId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID); - - boolean hasSend = false; if (pulsarConfig.getClientIdCache() && clientSeqId != null) { - hasSend = agentIdCache.asMap().containsKey(clientSeqId); - } - - if (pulsarConfig.getClientIdCache() && clientSeqId != null && hasSend) { + boolean hasSend = agentIdCache.asMap().containsKey(clientSeqId); agentIdCache.put(clientSeqId, System.currentTimeMillis()); - if (logPrinterA.shouldPrint()) { - logger.info("{} agent package {} existed,just discard.", - getName(), clientSeqId); - } - } else { - if (pulsarConfig.getClientIdCache() && clientSeqId != null) { - agentIdCache.put(clientSeqId, System.currentTimeMillis()); - } - boolean sendResult = pulsarClientService.sendMessage(poolIndex, topic, - event, pulsarSink, eventStat); - if (!sendResult) { - /* - * only for order message - */ - processToReTrySend(eventStat); + if (hasSend) { + if (logPrinterA.shouldPrint()) { + logger.info("{} agent package {} existed,just discard.", + getName(), clientSeqId); + } + continue; } - currentInFlightCount.incrementAndGet(); - decrementFlag = true; } + // send message + if (!pulsarClientService.sendMessage( + poolIndex, topic, event, pulsarSink, eventStat)) { + // only for order message + processToReTrySend(eventStat); + } + currentInFlightCount.incrementAndGet(); + decrementFlag = true; } catch (InterruptedException e) { logger.error("Thread {} has been interrupted!", Thread.currentThread().getName());