This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new bd30f354f [INLONG-5917][DataProxy] Optimize TubeSink class (#5920) bd30f354f is described below commit bd30f354fa2d69e45a7038057680d8dc774aa26c Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Sep 17 15:10:37 2022 +0800 [INLONG-5917][DataProxy] Optimize TubeSink class (#5920) --- .../inlong/dataproxy/consts/ConfigConstants.java | 2 + .../dataproxy/http/SimpleMessageHandler.java | 2 + .../inlong/dataproxy/metrics/audit/AuditUtils.java | 23 +++- .../dataproxy/sink/SimpleMessageTubeSink.java | 3 +- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 136 +++++++-------------- .../inlong/dataproxy/sink/common/TubeUtils.java | 36 +++--- .../dataproxy/source/ServerMessageHandler.java | 2 + .../dataproxy/source/SimpleMessageHandler.java | 4 +- .../inlong/dataproxy/utils/InLongMsgVer.java | 54 ++++++++ .../inlong/dataproxy/utils/MessageUtils.java | 34 ++++++ 10 files changed, 178 insertions(+), 118 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java index ae91944d5..f89bc8fe1 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java @@ -75,6 +75,8 @@ public class ConfigConstants { public static final String TOPIC_KEY = "topic"; public static final String REMOTE_IP_KEY = "srcIp"; + public static final String DATAPROXY_IP_KEY = "dpIp"; + public static final String MSG_ENCODE_VER = "msgEnType"; public static final String REMOTE_IDC_KEY = "idc"; public static final String MSG_COUNTER_KEY = "msgcnt"; public static final String PKG_COUNTER_KEY = "pkgcnt"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index 0689039e1..16bb5c917 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -41,6 +41,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.ServiceDecoder; import org.apache.inlong.dataproxy.utils.DateTimeUtils; +import org.apache.inlong.dataproxy.utils.InLongMsgVer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,6 +153,7 @@ public class SimpleMessageHandler implements MessageHandler { headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP); headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE); headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount); + headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName()); byte[] data = inLongMsg.buildArray(); headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); Event event = EventBuilder.withBody(data, headers); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java index f7e9ae6b6..24c251626 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.dataproxy.metrics.audit; +import java.util.HashSet; +import java.util.Map; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.StringUtils; @@ -28,9 +30,7 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.utils.Constants; - -import java.util.HashSet; -import java.util.Map; +import org.apache.inlong.dataproxy.utils.InLongMsgVer; /** * @@ -79,13 +79,17 @@ public class AuditUtils { /** * add - * + * * @param auditID * @param event */ public static void add(int auditID, Event event) { - if (IS_AUDIT && event != null) { - Map<String, String> headers = event.getHeaders(); + if (!IS_AUDIT || event == null) { + return; + } + Map<String, String> headers = event.getHeaders(); + String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER); + if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) { String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers); String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers); long logTime = getLogTime(headers); @@ -95,6 +99,13 @@ public class AuditUtils { } AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, msgCount, event.getBody().length); + } else { + String groupId = headers.get(AttributeConstants.GROUP_ID); + String streamId = headers.get(AttributeConstants.STREAM_ID); + long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME)); + long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY)); + AuditImp.getInstance().add(auditID, groupId, + streamId, dataTime, msgCount, event.getBody().length); } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java index fd4788b09..c8c670e70 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java @@ -353,8 +353,7 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable logger.info("{} agent package {} existed,just discard.", getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID)); } else { - producer.sendMessage(TubeUtils.buildMessage( - topic, event, true), new MyCallback(es)); + producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es)); flag.set(true); } illegalTopicMap.remove(topic); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 2d1374d37..713cdfd72 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -29,10 +29,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Preconditions; import org.apache.commons.collections.SetUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -58,7 +56,7 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler; import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder; import org.apache.inlong.dataproxy.sink.common.TubeUtils; -import org.apache.inlong.dataproxy.utils.Constants; +import org.apache.inlong.dataproxy.utils.DateTimeUtils; import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder; import org.apache.inlong.dataproxy.utils.NetworkUtils; import org.apache.inlong.tubemq.client.exception.TubeClientException; @@ -74,7 +72,6 @@ public class TubeSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(TubeSink.class); private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler(); private TubeProducerHolder producerHolder = null; - private static final String TOPIC = "topic"; private volatile boolean canTake = false; private volatile boolean canSend = false; private volatile boolean isOverFlow = false; @@ -274,12 +271,8 @@ public class TubeSink extends AbstractSink implements Configurable { diskRateLimiter.acquire(event.getBody().length); } Map<String, String> dimensions; - if (event.getHeaders().containsKey(TOPIC)) { - dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, - event.getHeaders().get(TOPIC)); - } else { - dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, ""); - } + dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, + event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, "")); if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) { tx.commit(); cachedMsgCnt.incrementAndGet(); @@ -333,20 +326,8 @@ public class TubeSink extends AbstractSink implements Configurable { isOverFlow = false; Thread.sleep(30); } - event = null; - topic = null; // get event from queues - if (!resendQueue.isEmpty()) { - es = resendQueue.poll(); - if (es == null) { - continue; - } - resendMsgCnt.decrementAndGet(); - event = es.getEvent(); - if (event.getHeaders().containsKey(TOPIC)) { - topic = event.getHeaders().get(TOPIC); - } - } else { + if (resendQueue.isEmpty()) { event = eventQueue.poll(2000, TimeUnit.MILLISECONDS); if (event == null) { if (!canTake && takenMsgCnt.get() <= 0) { @@ -358,10 +339,15 @@ public class TubeSink extends AbstractSink implements Configurable { cachedMsgCnt.decrementAndGet(); takenMsgCnt.incrementAndGet(); es = new EventStat(event); - if (event.getHeaders().containsKey(TOPIC)) { - topic = event.getHeaders().get(TOPIC); + } else { + es = resendQueue.poll(); + if (es == null) { + continue; } + resendMsgCnt.decrementAndGet(); + event = es.getEvent(); } + topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); // valid event status if (StringUtils.isBlank(topic)) { blankTopicDiscardMsgCnt.incrementAndGet(); @@ -425,8 +411,7 @@ public class TubeSink extends AbstractSink implements Configurable { event.getHeaders().get(ConfigConstants.SEQUENCE_ID)); return false; } else { - producer.sendMessage(TubeUtils.buildMessage( - topic, event, false), new MyCallback(es)); + producer.sendMessage(TubeUtils.buildMessage(topic, event), new MyCallback(es)); inflightMsgCnt.incrementAndGet(); return true; } @@ -491,7 +476,8 @@ public class TubeSink extends AbstractSink implements Configurable { Map<String, String> dimensions = new HashMap<>(); dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, TubeSink.this.getName()); dimensions.put(DataProxyMetricItem.KEY_SINK_ID, TubeSink.this.getName()); - dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, "")); + dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, + event.getHeaders().get(ConfigConstants.TOPIC_KEY)); DataProxyMetricItem.fillInlongId(event, dimensions); DataProxyMetricItem.fillAuditFormatTime(event, dimensions); DataProxyMetricItem metricItem = TubeSink.this.metricItemSet.findMetricItem(dimensions); @@ -501,14 +487,13 @@ public class TubeSink extends AbstractSink implements Configurable { AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event); if (sendTime > 0) { long currentTime = System.currentTimeMillis(); - long msgTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME), - sendTime); - long sinkDuration = currentTime - sendTime; - long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime); - long wholeDuration = currentTime - msgTime; - metricItem.sinkDuration.addAndGet(sinkDuration); - metricItem.nodeDuration.addAndGet(nodeDuration); - metricItem.wholeDuration.addAndGet(wholeDuration); + long msgDataTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.DATA_TIME)); + long msgRcvTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.RCV_TIME)); + metricItem.sinkDuration.addAndGet(currentTime - sendTime); + metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL); + metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL); } } else { metricItem.sendFailCount.incrementAndGet(); @@ -517,60 +502,31 @@ public class TubeSink extends AbstractSink implements Configurable { } private void editStatistic(final Event event, boolean isSuccess) { - String topic = ""; - String streamId = ""; - String nodeIp; - if (event != null) { - if (event.getHeaders().containsKey(TOPIC)) { - topic = event.getHeaders().get(TOPIC); - } - if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) { - streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); - } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) { - streamId = event.getHeaders().get(AttributeConstants.INAME); - } - // Compatible agent - if (event.getHeaders().containsKey("ip")) { - event.getHeaders().put(ConfigConstants.REMOTE_IP_KEY, event.getHeaders().get("ip")); - event.getHeaders().remove("ip"); - } - // Compatible agent - if (event.getHeaders().containsKey("time")) { - event.getHeaders().put(AttributeConstants.DATA_TIME, event.getHeaders().get("time")); - event.getHeaders().remove("time"); - } - if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IP_KEY)) { - nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); - if (event.getHeaders().containsKey(ConfigConstants.REMOTE_IDC_KEY)) { - if (nodeIp != null) { - nodeIp = nodeIp.split(":")[0]; - } - long msgCounterL = 1L; - // msg counter - if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) { - msgCounterL = Integer.parseInt(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); - } - StringBuilder newBase = new StringBuilder(); - newBase.append(getName()).append(SEP_HASHTAG).append(topic) - .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG) - .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp()) - .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG) - .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); - long messageSize = event.getBody().length; - if (event.getHeaders().get(ConfigConstants.TOTAL_LEN) != null) { - messageSize = Long.parseLong(event.getHeaders().get(ConfigConstants.TOTAL_LEN)); - } - if (statIntervalSec > 0) { - if (isSuccess) { - monitorIndex.addAndGet(new String(newBase), - (int) msgCounterL, 1, messageSize, 0); - } else { - monitorIndex.addAndGet(new String(newBase), - 0, 0, 0, (int) msgCounterL); - } - } - } - } + if (event == null || statIntervalSec <= 0) { + return; + } + // get statistic items + String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY); + String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); + String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY); + int intMsgCnt = Integer.parseInt( + event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); + long dataTimeL = Long.parseLong( + event.getHeaders().get(AttributeConstants.DATA_TIME)); + // build statistic key + StringBuilder newBase = new StringBuilder(512); + newBase.append(getName()).append(SEP_HASHTAG).append(topic) + .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG) + .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp()) + .append(SEP_HASHTAG).append("non-order").append(SEP_HASHTAG) + .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL)); + // count data + if (isSuccess) { + monitorIndex.addAndGet(newBase.toString(), + intMsgCnt, 1, event.getBody().length, 0); + } else { + monitorIndex.addAndGet(newBase.toString(), + 0, 0, 0, intMsgCnt); } } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java index 6a67978f8..16c10e384 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java @@ -23,7 +23,9 @@ import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.utils.Constants; -import org.apache.inlong.dataproxy.utils.NetworkUtils; +import org.apache.inlong.dataproxy.utils.DateTimeUtils; +import org.apache.inlong.dataproxy.utils.InLongMsgVer; +import org.apache.inlong.dataproxy.utils.MessageUtils; import org.apache.inlong.tubemq.client.config.TubeClientConfig; import org.apache.inlong.tubemq.corebase.Message; @@ -52,28 +54,24 @@ public class TubeUtils { * * @param topicName the topic name of message * @param event the DataProxy event - * @param addExtraAttrs whether to add extra attributes * @return the message object */ - public static Message buildMessage(String topicName, - Event event, boolean addExtraAttrs) { + public static Message buildMessage(String topicName, Event event) { + Map<String, String> headers = event.getHeaders(); Message message = new Message(topicName, event.getBody()); - message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp()); - String streamId = ""; - if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) { - streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); - } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) { - streamId = event.getHeaders().get(AttributeConstants.INAME); + String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER); + if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) { + String streamId = headers.get(Constants.INLONG_STREAM_ID); + message.putSystemHeader(streamId, + headers.get(ConfigConstants.PKG_TIME_KEY)); + } else { + long dataTimeL = Long.parseLong(headers.get(AttributeConstants.DATA_TIME)); + message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID), + DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL)); } - message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); - if (addExtraAttrs) { - // common attributes - Map<String, String> headers = event.getHeaders(); - message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID)); - message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID)); - message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC)); - message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME)); - message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP)); + Map<String, String> extraAttrMap = MessageUtils.getXfsAttrs(headers, pkgVersion); + for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) { + message.setAttrKeyVal(entry.getKey(), entry.getValue()); } return message; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index ab9055ad6..cccead3e3 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -56,6 +56,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.utils.DateTimeUtils; +import org.apache.inlong.dataproxy.utils.InLongMsgVer; import org.apache.inlong.dataproxy.utils.MessageUtils; import org.apache.inlong.dataproxy.utils.NetworkUtils; import org.slf4j.Logger; @@ -412,6 +413,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE); headers.put(ConfigConstants.MSG_COUNTER_KEY, commonAttrMap.get(AttributeConstants.MESSAGE_COUNT)); + headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName()); headers.put(AttributeConstants.RCV_TIME, commonAttrMap.get(AttributeConstants.RCV_TIME)); // add extra key-value information diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java index 71372da18..1e2d54405 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java @@ -59,6 +59,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.utils.Constants; +import org.apache.inlong.dataproxy.utils.InLongMsgVer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -463,8 +464,9 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId()); headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId()); headers.put(Constants.TOPIC, proxyMessage.getTopic()); - //headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME)); + headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME)); headers.put(Constants.HEADER_KEY_SOURCE_IP, commonHeaders.get(AttributeConstants.NODE_IP)); + headers.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V1.getName()); Event event = EventBuilder.withBody(proxyMessage.getData(), headers); return event; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java new file mode 100644 index 000000000..626398df6 --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.utils; + +public enum InLongMsgVer { + INLONG_V0(0, "V0", "The inlong-msg V0 format"), + INLONG_V1(1, "V1", "The inlong-msg V1 format"); + + InLongMsgVer(int id, String name, String desc) { + this.id = id; + this.name = name; + this.desc = desc; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getDesc() { + return desc; + } + + public static InLongMsgVer valueOf(int value) { + for (InLongMsgVer inLongMsgVer : InLongMsgVer.values()) { + if (inLongMsgVer.getId() == value) { + return inLongMsgVer; + } + } + return INLONG_V0; + } + + private final int id; + private final String name; + private final String desc; +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java index 4e9fd2395..d8835658d 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java @@ -18,10 +18,13 @@ package org.apache.inlong.dataproxy.utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.flume.Event; +import org.apache.inlong.common.util.NetworkUtils; import org.apache.inlong.dataproxy.consts.AttributeConstants; +import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.source.MsgType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,4 +110,35 @@ public class MessageUtils { return topic; } + public static Map<String, String> getXfsAttrs(Map<String, String> headers, String pkgVersion) { + // common attributes + Map<String, String> attrs = new HashMap<>(); + attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion); + if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) { + attrs.put("dataproxyip", NetworkUtils.getLocalIp()); + attrs.put(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID)); + attrs.put(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID)); + attrs.put(Constants.TOPIC, headers.get(Constants.TOPIC)); + attrs.put(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME)); + attrs.put(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP)); + } else { + // + attrs.put(Constants.INLONG_GROUP_ID, + headers.get(AttributeConstants.GROUP_ID)); + attrs.put(Constants.INLONG_STREAM_ID, + headers.get(AttributeConstants.STREAM_ID)); + attrs.put(Constants.TOPIC, + headers.get(ConfigConstants.TOPIC_KEY)); + attrs.put(Constants.HEADER_KEY_MSG_TIME, + headers.get(AttributeConstants.DATA_TIME)); + attrs.put(Constants.HEADER_KEY_SOURCE_IP, + headers.get(ConfigConstants.REMOTE_IP_KEY)); + attrs.put(Constants.HEADER_KEY_SOURCE_TIME, + headers.get(AttributeConstants.RCV_TIME)); + attrs.put(ConfigConstants.DATAPROXY_IP_KEY, + NetworkUtils.getLocalIp()); + } + return attrs; + } + }