This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new ac9c9de90 [INLONG-5880][DataProxy] Add data reporting time process logic (#5882) ac9c9de90 is described below commit ac9c9de90fca19b4091a6def4803527cf8ad1936 Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Sep 14 11:26:21 2022 +0800 [INLONG-5880][DataProxy] Add data reporting time process logic (#5882) --- .../dataproxy/http/SimpleMessageHandler.java | 9 +++- .../inlong/dataproxy/metrics/audit/AuditUtils.java | 8 +++- .../dataproxy/source/DefaultServiceDecoder.java | 6 +-- .../dataproxy/source/ServerMessageHandler.java | 43 +++++++++--------- .../dataproxy/source/SimpleMessageHandler.java | 51 +++++++++------------- 5 files changed, 60 insertions(+), 57 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index 0eeb614da..1969f7cea 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -28,6 +28,7 @@ import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; @@ -44,6 +45,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.ServiceDecoder; +import org.apache.inlong.dataproxy.utils.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,9 +138,14 @@ public class SimpleMessageHandler implements MessageHandler { headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount); byte[] data = inLongMsg.buildArray(); headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length)); + // add msgTime if not existed + long currTIme = System.currentTimeMillis(); + String strMsgTime = request.getParameter(Constants.HEADER_KEY_MSG_TIME); + long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme); LocalDateTime localDateTime = - LocalDateTime.ofInstant(Instant.ofEpochMilli(inLongMsg.getCreatetime()), defZoneId); + LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId); String pkgTime = DATE_FORMATTER.format(localDateTime); + headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis)); headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime); Event event = EventBuilder.withBody(data, headers); inLongMsg.reset(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java index f2bbbe52e..f7e9ae6b6 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java @@ -25,6 +25,7 @@ import org.apache.inlong.audit.AuditImp; import org.apache.inlong.audit.util.AuditConfig; import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder; import org.apache.inlong.dataproxy.consts.AttributeConstants; +import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.utils.Constants; @@ -88,7 +89,12 @@ public class AuditUtils { String inlongGroupId = DataProxyMetricItem.getInlongGroupId(headers); String inlongStreamId = DataProxyMetricItem.getInlongStreamId(headers); long logTime = getLogTime(headers); - AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length); + long msgCount = 1L; + if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) { + msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); + } + AuditImp.getInstance().add(auditID, inlongGroupId, + inlongStreamId, logTime, msgCount, event.getBody().length); } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java index cb4497cbe..355dc7a2e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java @@ -26,6 +26,7 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.exception.ErrorCode; import org.apache.inlong.dataproxy.exception.MessageIDException; +import org.apache.inlong.dataproxy.utils.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; @@ -112,9 +113,8 @@ public class DefaultServiceDecoder implements ServiceDecoder { long uniq, long dataTime, int msgCount) { commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq)); String time = ""; - if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) { - time = commonAttrMap - .get(ConfigConstants.PKG_TIME_KEY); + if (commonAttrMap.containsKey(Constants.HEADER_KEY_MSG_TIME)) { + time = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME); } else { time = String.valueOf(dataTime); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index d79143260..9d3b32c4e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; @@ -44,6 +45,7 @@ import org.apache.inlong.dataproxy.exception.MessageIDException; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; +import org.apache.inlong.dataproxy.utils.Constants; import org.apache.inlong.dataproxy.utils.MessageUtils; import org.apache.inlong.dataproxy.utils.NetworkUtils; import org.slf4j.Logger; @@ -54,6 +56,10 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -85,8 +91,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { .on(AttributeConstants.SEPARATOR) .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - private static final ThreadLocal<SimpleDateFormat> dateFormator = - ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmm")); + private static final DateTimeFormatter DATE_FORMATTER + = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); + private static final ZoneId defZoneId = ZoneId.systemDefault(); private AbstractSource source; @@ -416,23 +423,19 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData()); } } - - long pkgTimeInMillis = inLongMsg.getCreatetime(); - String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis); - - if (inLongMsgVer == 4) { - if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) { - pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY); - } else { - pkgTimeStr = dateFormator.get().format(System.currentTimeMillis()); - } - } - - if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) { - headers.put(AttributeConstants.DATA_TIME, commonAttrMap.get(AttributeConstants.DATA_TIME)); - } else { - headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis())); - } + // get msgTime + long currTIme = System.currentTimeMillis(); + String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME); + long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme); + LocalDateTime localDateTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId); + String pkgTimeStr = DATE_FORMATTER.format(localDateTime); + headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis)); + headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); + // get data time + long dtTime = NumberUtils.toLong( + commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme); + headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime)); if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) { headers.put(AttributeConstants.MESSAGE_IS_ACK, "false"); @@ -474,8 +477,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { .append(SEPARATOR).append(sequenceId); headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString()); } - - headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); Event event = EventBuilder.withBody(data, headers); String orderType = "non-order"; if (MessageUtils.isSyncSendForOrder(event)) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java index 3cc105aa7..38ca9ee32 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java @@ -32,7 +32,10 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -79,20 +82,10 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { .on(AttributeConstants.SEPARATOR) .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() { + private static final DateTimeFormatter DATE_FORMATTER + = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); + private static final ZoneId defZoneId = ZoneId.systemDefault(); - @Override - protected SimpleDateFormat initialValue() { - return new SimpleDateFormat("yyyyMMddHHmm"); - } - }; - private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() { - - @Override - protected SimpleDateFormat initialValue() { - return new SimpleDateFormat("yyyyMMddHHmmss"); - } - }; private AbstractSource source; private final ChannelGroup allChannels; private int maxConnections = Integer.MAX_VALUE; @@ -392,20 +385,18 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData()); } } - - long pkgTimeInMillis = inLongMsg.getCreatetime(); - String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis); - - if (inLongMsgVer == 4) { - if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) { - pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY); - } else { - pkgTimeStr = dateFormator.get().format(System.currentTimeMillis()); - } - } - - long dtTime = NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME), - System.currentTimeMillis()); + // get msgTime + long currTIme = System.currentTimeMillis(); + String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME); + long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme); + LocalDateTime localDateTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId); + String pkgTimeStr = DATE_FORMATTER.format(localDateTime); + headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis)); + headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); + // get data time + long dtTime = NumberUtils.toLong( + commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme); headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime)); headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey()); @@ -428,8 +419,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString()); } - headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); - // process proxy message list this.processProxyMessageList(headers, streamIdEntry.getValue()); } @@ -474,7 +463,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId()); headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId()); headers.put(Constants.TOPIC, proxyMessage.getTopic()); - headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME)); + //headers.put(Constants.HEADER_KEY_MSG_TIME, commonHeaders.get(AttributeConstants.DATA_TIME)); headers.put(Constants.HEADER_KEY_SOURCE_IP, commonHeaders.get(AttributeConstants.NODE_IP)); Event event = EventBuilder.withBody(proxyMessage.getData(), headers); return event;