This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 9cf1a8fdc [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929) 9cf1a8fdc is described below commit 9cf1a8fdcaa6296a84b0f56164c90f2f18346610 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Sep 19 19:26:38 2022 +0800 [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items(addendum) (#5929) --- .../inlong/dataproxy/sink/common/TubeUtils.java | 6 +- .../dataproxy/source/ServerMessageFactory.java | 7 +-- .../dataproxy/source/SimpleMessageHandler.java | 67 ++++++++++++---------- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java index 16c10e384..c1ed4b44d 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java @@ -61,9 +61,9 @@ public class TubeUtils { Message message = new Message(topicName, event.getBody()); String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER); if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) { - String streamId = headers.get(Constants.INLONG_STREAM_ID); - message.putSystemHeader(streamId, - headers.get(ConfigConstants.PKG_TIME_KEY)); + long dataTimeL = Long.parseLong(headers.get(ConfigConstants.PKG_TIME_KEY)); + message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID), + DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL)); } else { long dataTimeL = Long.parseLong(headers.get(AttributeConstants.DATA_TIME)); message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID), diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java index 69e891396..93a2a8c95 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java @@ -26,7 +26,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import java.lang.reflect.Constructor; import java.util.concurrent.TimeUnit; import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.source.AbstractSource; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.common.monitor.MonitorIndex; import org.apache.inlong.common.monitor.MonitorIndexExt; @@ -46,7 +45,7 @@ public class ServerMessageFactory private static int MSG_LENGTH_LEN = 4; - private AbstractSource source; + private BaseSource source; private ChannelProcessor processor; @@ -93,7 +92,7 @@ public class ServerMessageFactory * @param monitorIndexExt * @param name */ - public ServerMessageFactory(AbstractSource source, ChannelGroup allChannels, String protocol, + public ServerMessageFactory(BaseSource source, ChannelGroup allChannels, String protocol, ServiceDecoder serviceDecoder, String messageHandlerName, Integer maxMsgLength, String topic, String attr, Boolean filterEmptyMsg, Integer maxCons, Boolean isCompressed, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt, @@ -135,7 +134,7 @@ public class ServerMessageFactory .forName(messageHandlerName); Constructor<?> ctor = clazz.getConstructor( - AbstractSource.class, ServiceDecoder.class, ChannelGroup.class, + BaseSource.class, ServiceDecoder.class, ChannelGroup.class, String.class, String.class, Boolean.class, Integer.class, Boolean.class, MonitorIndex.class, MonitorIndexExt.class, String.class); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java index b101b9651..ea2691980 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java @@ -22,26 +22,24 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID; import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.group.ChannelGroup; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.group.ChannelGroup; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.ChannelException; @@ -62,9 +60,6 @@ import org.apache.inlong.dataproxy.utils.InLongMsgVer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; - /** * Server message handler * @@ -82,10 +77,20 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { .on(AttributeConstants.SEPARATOR) .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - private static final DateTimeFormatter DATE_FORMATTER - = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); - private static final ZoneId defZoneId = ZoneId.systemDefault(); + private static final ThreadLocal<SimpleDateFormat> dateFormator = new ThreadLocal<SimpleDateFormat>() { + + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyyMMddHHmm"); + } + }; + private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyyMMddHHmmss"); + } + }; private BaseSource source; private final ChannelGroup allChannels; private int maxConnections = Integer.MAX_VALUE; @@ -381,18 +386,20 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData()); } } - // get msgTime - long currTIme = System.currentTimeMillis(); - String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME); - long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme); - LocalDateTime localDateTime = - LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId); - String pkgTimeStr = DATE_FORMATTER.format(localDateTime); - headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis)); - headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); - // get data time - long dtTime = NumberUtils.toLong( - commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme); + + long pkgTimeInMillis = inLongMsg.getCreatetime(); + String pkgTimeStr = dateFormator.get().format(pkgTimeInMillis); + + if (inLongMsgVer == 4) { + if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) { + pkgTimeStr = commonAttrMap.get(ConfigConstants.PKG_TIME_KEY); + } else { + pkgTimeStr = dateFormator.get().format(System.currentTimeMillis()); + } + } + + long dtTime = NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME), + System.currentTimeMillis()); headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime)); headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey()); @@ -415,6 +422,8 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString()); } + headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); + // process proxy message list this.processProxyMessageList(headers, streamIdEntry.getValue()); }