This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 59352497b [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925) 59352497b is described below commit 59352497b14fd0dd835a2038b699adc5266cf590 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Sep 19 11:07:46 2022 +0800 [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925) --- .../inlong/dataproxy/consts/ConfigConstants.java | 1 + .../inlong/dataproxy/http/HttpBaseSource.java | 9 +- .../dataproxy/http/SimpleMessageHandler.java | 35 ++--- .../dataproxy/metrics/DataProxyMetricItemSet.java | 128 +++++++++++++++ .../apache/inlong/dataproxy/sink/PulsarSink.java | 174 ++++++++++----------- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 121 +++++--------- .../dataproxy/sink/pulsar/PulsarClientService.java | 8 +- .../dataproxy/sink/pulsar/SendMessageCallBack.java | 2 +- .../apache/inlong/dataproxy/source/BaseSource.java | 24 ++- .../dataproxy/source/ServerMessageHandler.java | 65 +++----- .../dataproxy/source/SimpleMessageHandler.java | 11 +- .../inlong/dataproxy/source/SimpleTcpSource.java | 14 -- .../inlong/dataproxy/source/SimpleUdpSource.java | 1 + 13 files changed, 320 insertions(+), 273 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java index 1a6fcada1..ae6bc380f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java @@ -96,6 +96,7 @@ public class ConfigConstants { public static final String CLUSTER_ID_KEY = "clusterId"; public static final String MANAGER_HOST = "manager.hosts"; public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name"; + public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy"; public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag"; public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; public static final String PROXY_REPORT_IP = "proxy.report.ip"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java index 84e45e2fd..973bc128c 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java @@ -29,6 +29,7 @@ import org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.common.monitor.MonitorIndex; import org.apache.inlong.common.monitor.MonitorIndexExt; import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor; +import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.utils.ConfStringUtils; @@ -77,7 +78,13 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource, statIntervalSec, maxMonitorCnt); } // register metrics - this.metricItemSet = new DataProxyMetricItemSet(this.getName()); + ConfigManager configManager = ConfigManager.getInstance(); + String clusterId = + configManager.getCommonProperties().getOrDefault( + ConfigConstants.PROXY_CLUSTER_NAME, + ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); + this.metricItemSet = + new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port)); MetricRegister.register(metricItemSet); super.start(); logger.info("{} started!", this.getName()); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index 16bb5c917..c5ca7c1d7 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -36,7 +36,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.http.exception.MessageProcessException; -import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.ServiceDecoder; @@ -175,14 +174,14 @@ public class SimpleMessageHandler implements MessageHandler { intMsgCnt, 1, data.length, 0); monitorIndexExt.incrementAndGet("EVENT_SUCCESS"); } - addMetric(true, data.length, event); + addStatistics(true, data.length, event); } catch (ChannelException ex) { if (monitorIndex != null) { monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, intMsgCnt); monitorIndexExt.incrementAndGet("EVENT_DROPPED"); } - addMetric(false, data.length, event); + addStatistics(false, data.length, event); logCounter++; if (logCounter == 1 || logCounter % 1000 == 0) { LOG.error("Error writing to channel, and will retry after 1s, ex={}," @@ -223,31 +222,19 @@ public class SimpleMessageHandler implements MessageHandler { } /** - * add audit metric + * add statistics information * - * @param result success or failure + * @param isSuccess success or failure * @param size message size * @param event message event */ - private void addMetric(boolean result, long size, Event event) { - Map<String, String> dimensions = new HashMap<>(); - dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.metricItemSet.getName()); - dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, this.metricItemSet.getName()); - DataProxyMetricItem.fillInlongId(event, dimensions); - DataProxyMetricItem.fillAuditFormatTime(event, dimensions); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - if (result) { - metricItem.readSuccessCount.incrementAndGet(); - metricItem.readSuccessSize.addAndGet(size); - try { - AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event); - } catch (Exception e) { - LOG.error("add audit metric has exception e= {}", e); - } - } else { - metricItem.readFailCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(size); + private void addStatistics(boolean isSuccess, long size, Event event) { + if (event == null) { + return; + } + metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size); + if (isSuccess) { + AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event); } } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java index 811442428..e420e03a2 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java @@ -17,8 +17,15 @@ package org.apache.inlong.dataproxy.metrics; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.flume.Event; import org.apache.inlong.common.metric.MetricDomain; import org.apache.inlong.common.metric.MetricItemSet; +import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder; +import org.apache.inlong.dataproxy.consts.AttributeConstants; +import org.apache.inlong.dataproxy.consts.ConfigConstants; /** * @@ -26,6 +33,8 @@ import org.apache.inlong.common.metric.MetricItemSet; */ @MetricDomain(name = "DataProxy") public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> { + private String clusterId = null; + private String sourceDataId = null; /** * Constructor @@ -36,6 +45,125 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> { super(name); } + /** + * Constructor + * + * @param clusterId the cluster id + * @param name the module name + */ + public DataProxyMetricItemSet(String clusterId, String name) { + super(name); + this.clusterId = clusterId; + } + + /** + * Constructor + * + * @param clusterId the cluster id + * @param name the module name + * @param sourceDataId the source data id + */ + public DataProxyMetricItemSet(String clusterId, String name, String sourceDataId) { + super(name); + this.clusterId = clusterId; + this.sourceDataId = sourceDataId; + } + + /** + * Fill source metric items by event + * + * @param event the event object + * @param isSuccess whether success read + * @param size the message size + */ + public void fillSrcMetricItemsByEvent(Event event, boolean isSuccess, long size) { + fillMetricItemsByEvent(event, true, true, isSuccess, size, 0); + } + + /** + * Fill sink metric items by event + * + * @param event the event object + * @param isSuccess whether success read or send + * @param size the message size + */ + public void fillSinkReadMetricItemsByEvent(Event event, boolean isSuccess, long size) { + fillMetricItemsByEvent(event, false, true, isSuccess, size, 0); + } + + /** + * Fill sink send metric items by event + * + * @param event the event object + * @param sentTime the sent time + * @param isSuccess whether success read or send + * @param size the message size + */ + public void fillSinkSendMetricItemsByEvent(Event event, long sentTime, + boolean isSuccess, long size) { + fillMetricItemsByEvent(event, false, false, isSuccess, size, sentTime); + } + + /** + * Fill metric items by event + * + * @param event the event object + * @param isSource whether source part + * @param isReadOp whether read operation + * @param isSuccess whether success read or send + * @param size the message size + */ + private void fillMetricItemsByEvent(Event event, boolean isSource, + boolean isReadOp, boolean isSuccess, + long size, long sendTime) { + Map<String, String> dimensions = new HashMap<>(); + dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, clusterId); + dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, + event.getHeaders().get(AttributeConstants.GROUP_ID)); + dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, + event.getHeaders().get(AttributeConstants.STREAM_ID)); + long dataTime = NumberUtils.toLong( + event.getHeaders().get(AttributeConstants.DATA_TIME)); + long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval(); + dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime)); + if (isSource) { + dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, name); + dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, sourceDataId); + } else { + dimensions.put(DataProxyMetricItem.KEY_SINK_ID, name); + dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, + event.getHeaders().get(ConfigConstants.TOPIC_KEY)); + } + DataProxyMetricItem metricItem = findMetricItem(dimensions); + if (isReadOp) { + if (isSuccess) { + metricItem.readSuccessCount.incrementAndGet(); + metricItem.readSuccessSize.addAndGet(size); + } else { + metricItem.readFailCount.incrementAndGet(); + metricItem.readFailSize.addAndGet(size); + } + } else { + if (isSuccess) { + metricItem.sendSuccessCount.incrementAndGet(); + metricItem.sendSuccessSize.addAndGet(event.getBody().length); + if (sendTime > 0) { + long currentTime = System.currentTimeMillis(); + long msgDataTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.DATA_TIME)); + long msgRcvTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.RCV_TIME)); + metricItem.sinkDuration.addAndGet(currentTime - sendTime); + metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL); + metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL); + } + } else { + metricItem.sendFailCount.incrementAndGet(); + metricItem.sendFailSize.addAndGet(event.getBody().length); + } + } + } + /** * createItem * diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 69da396bc..3785ae993 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -17,6 +17,9 @@ package org.apache.inlong.dataproxy.sink; +import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG; +import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT; + import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -24,6 +27,16 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import io.netty.handler.codec.TooLongFrameException; import org.apache.flume.Channel; import org.apache.flume.Context; @@ -45,8 +58,8 @@ import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; -import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; +import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack; import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService; import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack; @@ -62,19 +75,6 @@ import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT; - /** * Use pulsarSink need adding such config, if these ara not config in dataproxy-pulsar.conf, * PulsarSink will use default value. @@ -172,7 +172,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag /* * metric */ - private Map<String, String> dimensions; private DataProxyMetricItemSet metricItemSet; private ConfigManager configManager; private Map<String, String> topicProperties; @@ -196,7 +195,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag topicProperties = configManager.getTopicProperties(); pulsarCluster = configManager.getMqClusterUrl2Token(); pulsarConfig = configManager.getMqClusterConfig(); //pulsar common config - Map<String, String> commonProperties = configManager.getCommonProperties(); sinkThreadPoolSize = pulsarConfig.getThreadNum(); if (sinkThreadPoolSize <= 0) { sinkThreadPoolSize = 1; @@ -294,11 +292,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag @Override public void start() { logger.info("[{}] pulsar sink starting...", getName()); - // register metrics - this.dimensions = new HashMap<>(); - this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName()); - sinkCounter.start(); pulsarClientService.initCreateConnection(this); @@ -326,8 +319,13 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i); sinkThreadPool[i].start(); } - - this.metricItemSet = new DataProxyMetricItemSet(this.getName()); + // register metricItemSet + ConfigManager configManager = ConfigManager.getInstance(); + String clusterId = + configManager.getCommonProperties().getOrDefault( + ConfigConstants.PROXY_CLUSTER_NAME, + ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); + this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName()); MetricRegister.register(metricItemSet); this.canTake = true; logger.info("[{}] Pulsar sink started", getName()); @@ -405,19 +403,13 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag + "last long time it will cause memoryChannel full and fileChannel write.)", getName()); tx.rollback(); // metric - dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, - event.getHeaders().get(ConfigConstants.TOPIC_KEY)); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readFailCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + this.metricItemSet.fillSinkReadMetricItemsByEvent( + event, false, event.getBody().length); } else { tx.commit(); // metric - dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, - event.getHeaders().get(ConfigConstants.TOPIC_KEY)); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readSuccessCount.incrementAndGet(); - metricItem.readSuccessSize.addAndGet(event.getBody().length); + this.metricItemSet.fillSinkReadMetricItemsByEvent( + event, true, event.getBody().length); } } else { status = Status.BACKOFF; @@ -437,37 +429,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag return status; } - private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) { - if (event == null - || pulsarConfig.getStatIntervalSec() <= 0) { - return; - } - // get statistic items - String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); - String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); - String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); - int intMsgCnt = Integer.parseInt( - event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); - long dataTimeL = Long.parseLong( - event.getHeaders().get(AttributeConstants.DATA_TIME)); - String orderType = isOrder ? "order" : "non-order"; - StringBuilder newBase = new StringBuilder(512) - .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR) - .append(streamId).append(SEPARATOR).append(nodeIp) - .append(SEPARATOR).append(NetworkUtils.getLocalIp()) - .append(SEPARATOR).append(orderType).append(SEPARATOR) - .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL)); - long messageSize = event.getBody().length; - if (isSuccess) { - monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0); - } else { - monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt); - if (logPrinterB.shouldPrint()) { - logger.warn("error cannot send event, {} event size is {}", topic, messageSize); - } - } - } - @Override public void handleCreateClientSuccess(String url) { logger.info("createConnection success for url = {}", url); @@ -481,7 +442,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag } @Override - public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) { + public void handleMessageSendSuccess(String topic, Object result, + EventStat eventStat, long startTime) { /* * Statistics pulsar performance */ @@ -505,20 +467,11 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag getName(), (nowCnt - oldCnt), (t2 - t1)); t1 = t2; } - Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.sendSuccessCount.incrementAndGet(); - metricItem.sendSuccessSize.addAndGet(eventStat.getEvent().getBody().length); - metricItem.sendCount.incrementAndGet(); - metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length); - monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS"); - editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage()); - + addStatistics(eventStat, true, startTime); } @Override public void handleMessageSendException(String topic, EventStat eventStat, Object e) { - monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP"); boolean needRetry = true; if (e instanceof NotFoundException) { logger.error("NotFoundException for topic " + topic + ", message will be discard!", e); @@ -533,26 +486,67 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag if (logPrinterB.shouldPrint()) { logger.error("send failed for " + getName(), e); } - if (eventStat.getRetryCnt() == 0) { - editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage()); - } } - Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.sendFailCount.incrementAndGet(); - metricItem.sendFailSize.addAndGet(eventStat.getEvent().getBody().length); + addStatistics(eventStat, false, 0); eventStat.incRetryCnt(); if (!eventStat.isOrderMessage() && needRetry) { processResendEvent(eventStat); } } - private Map<String, String> getNewDimension(String otherKey, String value) { - Map<String, String> dimensions = new HashMap<>(); - dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName()); - dimensions.put(otherKey, value); - return dimensions; + /** + * Add statistics information + * + * @param eventStat the statistic event + * @param isSuccess is processed successfully + * @param sendTime the send time when success processed + */ + private void addStatistics(EventStat eventStat, boolean isSuccess, long sendTime) { + if (eventStat == null || eventStat.getEvent() == null) { + return; + } + Event event = eventStat.getEvent(); + // add jmx metric items; + PulsarSink.this.metricItemSet.fillSinkSendMetricItemsByEvent( + event, sendTime, isSuccess, event.getBody().length); + if (isSuccess) { + AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event); + } + if (pulsarConfig.getStatIntervalSec() <= 0) { + return; + } + // add monitor items base file storage + String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); + String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); + String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); + int intMsgCnt = Integer.parseInt( + event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); + long dataTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.DATA_TIME)); + String orderType = eventStat.isOrderMessage() ? "order" : "non-order"; + // build statistic key + StringBuilder newBase = new StringBuilder(512) + .append(getName()).append(SEP_HASHTAG).append(topic) + .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG) + .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp()) + .append(SEP_HASHTAG).append(orderType).append(SEP_HASHTAG) + .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL)); + // count data + if (isSuccess) { + monitorIndex.addAndGet(newBase.toString(), + intMsgCnt, 1, event.getBody().length, 0); + monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS"); + } else { + monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP"); + if (eventStat.getRetryCnt() == 0) { + monitorIndex.addAndGet(newBase.toString(), + 0, 0, 0, intMsgCnt); + if (logPrinterB.shouldPrint()) { + logger.warn("error cannot send event, {} event size is {}", + topic, event.getBody().length); + } + } + } } private boolean processEvent(EventStat eventStat) { @@ -626,7 +620,9 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag } } } catch (Throwable throwable) { - monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED"); + if (pulsarConfig.getStatIntervalSec() > 0) { + monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED"); + } if (logPrinterC.shouldPrint()) { logger.error(getName() + " Discard msg because put events to both of " + "queue and fileChannel fail", throwable); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index f1f2b6bbb..86787136a 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -21,7 +21,6 @@ import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG; import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT; import com.google.common.base.Preconditions; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -53,7 +52,6 @@ import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; -import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler; @@ -94,7 +92,6 @@ public class TubeSink extends AbstractSink implements Configurable { // used for RoundRobin different cluster while send message private RateLimiter diskRateLimiter; private Thread[] sinkThreadPool; - private Map<String, String> dimensions; private DataProxyMetricItemSet metricItemSet; private final AtomicBoolean started = new AtomicBoolean(false); private static final LogCounter LOG_SINK_TASK_PRINTER = @@ -186,12 +183,13 @@ public class TubeSink extends AbstractSink implements Configurable { monitorIndexExt = new MonitorIndexExt("Tube_Sink_monitors#" + this.getName(), statIntervalSec, maxMonitorCnt); } - // initial dimensions - this.dimensions = new HashMap<>(); - this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName()); // register metrics - this.metricItemSet = new DataProxyMetricItemSet(this.getName()); + ConfigManager configManager = ConfigManager.getInstance(); + String clusterId = + configManager.getCommonProperties().getOrDefault( + ConfigConstants.PROXY_CLUSTER_NAME, + ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); + this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName()); MetricRegister.register(metricItemSet); // create tube connection try { @@ -272,27 +270,17 @@ public class TubeSink extends AbstractSink implements Configurable { if (diskRateLimiter != null) { diskRateLimiter.acquire(event.getBody().length); } - Map<String, String> dimensions; - dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, - event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, "")); if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) { tx.commit(); cachedMsgCnt.incrementAndGet(); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readSuccessCount.incrementAndGet(); - metricItem.readSuccessSize.addAndGet(event.getBody().length); + this.metricItemSet.fillSinkReadMetricItemsByEvent( + event, true, event.getBody().length); } else { tx.rollback(); - //logger.info("[{}] Channel --> Queue(has no enough space,current code point) " - // + "--> TubeMQ, check if TubeMQ server or network is ok.(if this situation last long time " - // + "it will cause memoryChannel full and fileChannel write.)", getName()); - // metric - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readFailCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + this.metricItemSet.fillSinkReadMetricItemsByEvent( + event, false, event.getBody().length); } } else { - // logger.info("[{}]No data to process in the channel.",getName()); status = Status.BACKOFF; tx.commit(); } @@ -436,17 +424,9 @@ public class TubeSink extends AbstractSink implements Configurable { successMsgCnt.incrementAndGet(); inflightMsgCnt.decrementAndGet(); takenMsgCnt.decrementAndGet(); - this.addMetric(myEventStat.getEvent(), true, sendTime); - if (statIntervalSec > 0) { - monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS); - } - this.editStatistic(myEventStat.getEvent(), true); + this.addStatistics(myEventStat.getEvent(), true, false, sendTime); } else { - this.addMetric(myEventStat.getEvent(), false, 0); - if (statIntervalSec > 0) { - monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE); - } - this.editStatistic(myEventStat.getEvent(), false); + this.addStatistics(myEventStat.getEvent(), false, false, 0); if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) { logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", result.getErrMsg(), resendQueue.size(), @@ -464,50 +444,34 @@ public class TubeSink extends AbstractSink implements Configurable { @Override public void onException(final Throwable e) { - if (statIntervalSec > 0) { - monitorIndexExt.incrementAndGet(KEY_SINK_EXP); - } - this.editStatistic(myEventStat.getEvent(), false); + addStatistics(myEventStat.getEvent(), false, true, 0); resendEvent(myEventStat, true); } /** - * addMetric + * Add statistics information + * + * @param event the statistic event + * @param isSuccess is processed successfully + * @param isException is exception when failure processed + * @param sendTime the send time when success processed */ - private void addMetric(Event event, boolean result, long sendTime) { - Map<String, String> dimensions = new HashMap<>(); - dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, TubeSink.this.getName()); - dimensions.put(DataProxyMetricItem.KEY_SINK_ID, TubeSink.this.getName()); - dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, - event.getHeaders().get(ConfigConstants.TOPIC_KEY)); - DataProxyMetricItem.fillInlongId(event, dimensions); - DataProxyMetricItem.fillAuditFormatTime(event, dimensions); - DataProxyMetricItem metricItem = TubeSink.this.metricItemSet.findMetricItem(dimensions); - if (result) { - metricItem.sendSuccessCount.incrementAndGet(); - metricItem.sendSuccessSize.addAndGet(event.getBody().length); + private void addStatistics(Event event, boolean isSuccess, + boolean isException, long sendTime) { + if (event == null) { + return; + } + // add jmx metric items; + TubeSink.this.metricItemSet.fillSinkSendMetricItemsByEvent( + event, sendTime, isSuccess, event.getBody().length); + // add audit items; + if (isSuccess) { AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event); - if (sendTime > 0) { - long currentTime = System.currentTimeMillis(); - long msgDataTimeL = Long.parseLong( - event.getHeaders().get(AttributeConstants.DATA_TIME)); - long msgRcvTimeL = Long.parseLong( - event.getHeaders().get(AttributeConstants.RCV_TIME)); - metricItem.sinkDuration.addAndGet(currentTime - sendTime); - metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL); - metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL); - } - } else { - metricItem.sendFailCount.incrementAndGet(); - metricItem.sendFailSize.addAndGet(event.getBody().length); } - } - - private void editStatistic(final Event event, boolean isSuccess) { - if (event == null || statIntervalSec <= 0) { + if (statIntervalSec <= 0) { return; } - // get statistic items + // add monitor items base file storage String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); @@ -526,9 +490,14 @@ public class TubeSink extends AbstractSink implements Configurable { if (isSuccess) { monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, event.getBody().length, 0); + monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS); } else { monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt); + monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE); + if (isException) { + monitorIndexExt.incrementAndGet(KEY_SINK_EXP); + } } } } @@ -590,14 +559,6 @@ public class TubeSink extends AbstractSink implements Configurable { } } - private Map<String, String> getNewDimension(String otherKey, String value) { - Map<String, String> dimensions = new HashMap<>(); - dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName()); - dimensions.put(otherKey, value); - return dimensions; - } - /** * Differentiate unpublished topic sets and publish them * attention: only append added topics @@ -695,14 +656,4 @@ public class TubeSink extends AbstractSink implements Configurable { } return tmpMasterAddr; } - - /** - * get metricItemSet - * - * @return the metricItemSet - */ - private DataProxyMetricItemSet getMetricItemSet() { - return metricItemSet; - } - } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java index 7ef6228f9..6510bce57 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java @@ -34,7 +34,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; -import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.EventStat; import org.apache.inlong.dataproxy.source.MsgType; import org.apache.inlong.dataproxy.utils.MessageUtils; @@ -175,6 +174,7 @@ public class PulsarClientService { // build and send message Map<String, String> proMap = MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion); + long startTime = System.currentTimeMillis(); if (es.isOrderMessage()) { String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY); try { @@ -183,8 +183,7 @@ public class PulsarClientService { .key(partitionKey) .value(event.getBody()) .send(); - sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es); - AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event); + sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es, startTime); forCallBackP.setCanUseSend(true); result = true; } catch (PulsarClientException ex) { @@ -204,9 +203,8 @@ public class PulsarClientService { .value(event.getBody()) .sendAsync() .thenAccept((msgId) -> { - AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event); forCallBackP.setCanUseSend(true); - sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es); + sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es, startTime); }) .exceptionally((e) -> { forCallBackP.setCanUseSend(false); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java index ce029759c..4d5b3a393 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java @@ -21,7 +21,7 @@ import org.apache.inlong.dataproxy.sink.EventStat; public interface SendMessageCallBack { - void handleMessageSendSuccess(String topic, Object msgId, EventStat es); + void handleMessageSendSuccess(String topic, Object msgId, EventStat es, long startTime); void handleMessageSendException(String topic, EventStat es, Object exception); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java index 12d68bf2a..8fed4bbc3 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java @@ -34,10 +34,13 @@ import org.apache.flume.FlumeException; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurables; import org.apache.flume.source.AbstractSource; +import org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor; +import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.common.monitor.MonitorIndex; import org.apache.inlong.common.monitor.MonitorIndexExt; +import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +86,8 @@ public abstract class BaseSource protected boolean customProcessor = false; + private DataProxyMetricItemSet metricItemSet; + /* * monitor */ @@ -157,6 +162,15 @@ public abstract class BaseSource FailoverChannelProcessorHolder.setChannelProcessor(newProcessor); } super.start(); + // initial metric item set + ConfigManager configManager = ConfigManager.getInstance(); + String clusterId = + configManager.getCommonProperties().getOrDefault( + ConfigConstants.PROXY_CLUSTER_NAME, + ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); + this.metricItemSet = + new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port)); + MetricRegister.register(metricItemSet); /* * init monitor logic */ @@ -298,7 +312,7 @@ public abstract class BaseSource ServiceDecoder serviceDecoder = (ServiceDecoder)Class.forName(serviceDecoderName).newInstance(); Class<? extends ChannelInitializer> clazz = (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName); - Constructor ctor = clazz.getConstructor(AbstractSource.class, ChannelGroup.class, + Constructor ctor = clazz.getConstructor(BaseSource.class, ChannelGroup.class, String.class, ServiceDecoder.class, String.class, Integer.class, String.class, String.class, Boolean.class, Integer.class, Boolean.class, MonitorIndex.class, @@ -319,6 +333,14 @@ public abstract class BaseSource return fac; } + /** + * get metricItemSet + * @return the metricItemSet + */ + public DataProxyMetricItemSet getMetricItemSet() { + return metricItemSet; + } + public Context getContext() { return context; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index 6e92cb6ba..e4a436448 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -42,7 +42,6 @@ import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.AbstractSource; import org.apache.inlong.common.monitor.MonitorIndex; import org.apache.inlong.common.monitor.MonitorIndexExt; import org.apache.inlong.common.msg.InLongMsg; @@ -53,7 +52,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.exception.MessageIDException; -import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.utils.DateTimeUtils; @@ -82,7 +80,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { .on(AttributeConstants.SEPARATOR) .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - private AbstractSource source; + private BaseSource source; private final ChannelGroup allChannels; @@ -124,7 +122,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { * @param monitorIndexExt MonitorIndexExt * @param protocolType protocolType */ - public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder, + public ServerMessageHandler(BaseSource source, ServiceDecoder serviceDecoder, ChannelGroup allChannels, String topic, String attr, Boolean filterEmptyMsg, Integer maxCons, Boolean isCompressed, MonitorIndex monitorIndex, @@ -137,16 +135,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { if (null != attr) { this.defaultMXAttr = attr; } - this.filterEmptyMsg = filterEmptyMsg; this.isCompressed = isCompressed; this.maxConnections = maxCons; this.protocolType = protocolType; - if (source instanceof SimpleTcpSource) { - this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet(); - } else { - this.metricItemSet = new DataProxyMetricItemSet(this.toString()); - } + this.metricItemSet = source.getMetricItemSet(); this.monitorIndex = monitorIndex; this.monitorIndexExt = monitorIndexExt; } @@ -461,7 +454,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { try { processor.processEvent(event); monitorIndexExt.incrementAndGet("EVENT_SUCCESS"); - this.addMetric(true, data.length, event); + this.addStatistics(true, data.length, event); monitorIndex.addAndGet(strBuff.toString(), streamMsgCnt, 1, data.length, 0); strBuff.delete(0, strBuff.length()); @@ -469,7 +462,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { logger.error("Error writting to channel,data will discard.", ex); monitorIndexExt.incrementAndGet("EVENT_DROPPED"); monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt); - this.addMetric(false, data.length, event); + this.addStatistics(false, data.length, event); strBuff.delete(0, strBuff.length()); throw new ChannelException("ProcessEvent error can't write event to channel."); } @@ -548,7 +541,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg == null) { logger.error("Get null msg, just skip!"); - this.addMetric(false, 0, null); return; } ByteBuf cb = (ByteBuf) msg; @@ -558,7 +550,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { int len = cb.readableBytes(); if (len == 0 && this.filterEmptyMsg) { logger.warn("Get empty msg from {}, just skip!", strRemoteIP); - this.addMetric(false, 0, null); return; } // parse message @@ -569,12 +560,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { strRemoteIP, msgRcvTime, remoteChannel); if (resultMap == null || resultMap.isEmpty()) { logger.info("Parse message result is null, from {}", strRemoteIP); - this.addMetric(false, 0, null); return; } } catch (MessageIDException ex) { logger.error("MessageIDException ex = {}", ex); - this.addMetric(false, 0, null); throw new IOException(ex.getCause()); } // process message by msgType @@ -583,12 +572,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5); heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1}); remoteChannel.writeAndFlush(heartbeatBuffer); - this.addMetric(false, 0, null); return; } // process heart beat 8 if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) { - this.addMetric(false, 0, null); return; } // process data message @@ -616,10 +603,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } try { processor.processEvent(event); - this.addMetric(true, body.length, event); + this.addStatistics(true, body.length, event); } catch (Throwable ex) { logger.error("Error writing to controller,data will discard.", ex); - this.addMetric(false, body.length, event); + this.addStatistics(false, body.length, event); throw new ChannelException( "Process Controller Event error can't write event to channel."); } @@ -640,10 +627,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } try { processor.processEvent(event); - this.addMetric(true, body.length, event); + this.addStatistics(true, body.length, event); } catch (Throwable ex) { logger.error("Error writing to controller,data will discard.", ex); - this.addMetric(false, body.length, event); + this.addStatistics(false, body.length, event); throw new ChannelException( "Process Controller Event error can't write event to channel."); } @@ -677,31 +664,19 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } /** - * addMetric + * add statistics information * - * @param result - * @param size - * @param event + * @param isSuccess success or failure + * @param size message size + * @param event message event */ - private void addMetric(boolean result, long size, Event event) { - Map<String, String> dimensions = new HashMap<>(); - dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName()); - dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName()); - DataProxyMetricItem.fillInlongId(event, dimensions); - DataProxyMetricItem.fillAuditFormatTime(event, dimensions); - DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - if (result) { - metricItem.readSuccessCount.incrementAndGet(); - metricItem.readSuccessSize.addAndGet(size); - try { - AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event); - } catch (Exception e) { - logger.error("add metric has exception e= {}", e); - } - } else { - metricItem.readFailCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(size); + private void addStatistics(boolean isSuccess, long size, Event event) { + if (event == null) { + return; + } + this.metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size); + if (isSuccess) { + AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event); } } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java index 1e2d54405..b101b9651 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java @@ -48,7 +48,6 @@ import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.AbstractSource; import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.dataproxy.base.ProxyMessage; import org.apache.inlong.dataproxy.config.ConfigManager; @@ -87,7 +86,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); private static final ZoneId defZoneId = ZoneId.systemDefault(); - private AbstractSource source; + private BaseSource source; private final ChannelGroup allChannels; private int maxConnections = Integer.MAX_VALUE; private boolean filterEmptyMsg = false; @@ -112,7 +111,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { * @param isCompressed * @param protocolType */ - public SimpleMessageHandler(AbstractSource source, ServiceDecoder serProcessor, + public SimpleMessageHandler(BaseSource source, ServiceDecoder serProcessor, ChannelGroup allChannels, String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength, Integer maxCons, @@ -130,11 +129,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { this.isCompressed = isCompressed; this.maxConnections = maxCons; this.protocolType = protocolType; - if (source instanceof SimpleTcpSource) { - this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet(); - } else { - this.metricItemSet = new DataProxyMetricItemSet(this.toString()); - } + this.metricItemSet = source.getMetricItemSet(); } private String getRemoteIp(Channel channel) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java index 1f1a86e66..734fd95dd 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java @@ -39,9 +39,7 @@ import org.apache.commons.io.IOUtils; import org.apache.flume.Context; import org.apache.flume.EventDrivenSource; import org.apache.flume.conf.Configurable; -import org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.dataproxy.consts.ConfigConstants; -import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.utils.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,8 +81,6 @@ public class SimpleTcpSource extends BaseSource private ServerBootstrap bootstrap; - private DataProxyMetricItemSet metricItemSet; - public SimpleTcpSource() { super(); @@ -182,8 +178,6 @@ public class SimpleTcpSource extends BaseSource @Override public synchronized void startSource() { logger.info("start " + this.getName()); - this.metricItemSet = new DataProxyMetricItemSet(this.getName()); - MetricRegister.register(metricItemSet); checkBlackListThread = new CheckBlackListThread(); checkBlackListThread.start(); // ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT); @@ -270,14 +264,6 @@ public class SimpleTcpSource extends BaseSource } } - /** - * get metricItemSet - * @return the metricItemSet - */ - public DataProxyMetricItemSet getMetricItemSet() { - return metricItemSet; - } - @Override public String getProtocolName() { return "tcp"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java index 5c17451d3..be42b4494 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java @@ -50,6 +50,7 @@ public class SimpleUdpSource @Override public void startSource() { // setup Netty server + logger.info("start " + this.getName()); bootstrap = new Bootstrap(); logger.info("Set max workers : {} ;",maxThreads); bootstrap.channel(NioDatagramChannel.class);