This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 07e429e8f [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916) 07e429e8f is described below commit 07e429e8f0d5c99d3779eedf67afc779a58bbb2a Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Sep 16 17:13:21 2022 +0800 [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916) --- .../dataproxy/http/SimpleMessageHandler.java | 1 - .../dataproxy/source/DefaultServiceDecoder.java | 253 ++++++------ .../dataproxy/source/ServerMessageHandler.java | 436 +++++++++------------ .../inlong/dataproxy/source/ServiceDecoder.java | 11 +- .../dataproxy/source/SimpleMessageHandler.java | 6 +- 5 files changed, 313 insertions(+), 394 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index 3f1bf425c..0689039e1 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -153,7 +153,6 @@ public class SimpleMessageHandler implements MessageHandler { headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE); headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount); byte[] data = inLongMsg.buildArray(); - headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length)); headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); Event event = EventBuilder.withBody(data, headers); inLongMsg.reset(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java index 355dc7a2e..11ac95e4e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java @@ -18,28 +18,27 @@ package org.apache.inlong.dataproxy.source; import com.google.common.base.Splitter; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.inlong.dataproxy.base.ProxyMessage; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.exception.ErrorCode; import org.apache.inlong.dataproxy.exception.MessageIDException; -import org.apache.inlong.dataproxy.utils.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class DefaultServiceDecoder implements ServiceDecoder { private static final int BIN_MSG_TOTALLEN_OFFSET = 0; @@ -109,33 +108,19 @@ public class DefaultServiceDecoder implements ServiceDecoder { return resultMap; } - private void handleDateTime(Map<String, String> commonAttrMap, Channel channel, - long uniq, long dataTime, int msgCount) { + private void handleDateTime(Map<String, String> commonAttrMap, long uniq, + long dataTime, int msgCount, String strRemoteIP, + long msgRcvTime) { commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq)); - String time = ""; - if (commonAttrMap.containsKey(Constants.HEADER_KEY_MSG_TIME)) { - time = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME); - } else { - time = String.valueOf(dataTime); - } - StringBuilder sidBuilder = new StringBuilder(); - /* - * udp need use msgEvent get remote address - */ - String remoteAddress = ""; - if (channel != null && channel.remoteAddress() != null) { - remoteAddress = channel.remoteAddress().toString(); - } - sidBuilder.append(remoteAddress).append("#").append(time) - .append("#").append(uniq); - commonAttrMap.put(AttributeConstants.SEQUENCE_ID, new String(sidBuilder)); - - // datetime from sdk + String time = String.valueOf(dataTime); + commonAttrMap.put(AttributeConstants.SEQUENCE_ID, + new StringBuilder(256).append(strRemoteIP) + .append("#").append(time).append("#").append(uniq).toString()); + // dt from sdk commonAttrMap.put(AttributeConstants.DATA_TIME, String.valueOf(dataTime)); commonAttrMap - .put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis())); - commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, - String.valueOf(msgCount != 0 ? msgCount : 1)); + .put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); + commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(msgCount)); } private boolean handleExtMap(Map<String, String> commonAttrMap, ByteBuf cb, @@ -165,18 +150,18 @@ public class DefaultServiceDecoder implements ServiceDecoder { } private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int extendField, - int msgHeadPos, int totalDataLen, int attrLen, String strAttr, int bodyLen) { + int msgHeadPos, int totalDataLen, int attrLen, + String strAttr, int bodyLen, long msgRcvTime) { // whether enable trace - boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1); ByteBuffer dataBuf; + boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1); if (!enableTrace) { dataBuf = ByteBuffer.allocate(totalDataLen + BIN_MSG_TOTALLEN_SIZE); cb.getBytes(msgHeadPos, dataBuf.array(), 0, totalDataLen + BIN_MSG_TOTALLEN_SIZE); } else { - String traceInfo; + // get local address String strNode2Ip = null; - SocketAddress loacalSockAddr = channel.localAddress(); if (null != loacalSockAddr) { strNode2Ip = loacalSockAddr.toString(); @@ -187,11 +172,9 @@ public class DefaultServiceDecoder implements ServiceDecoder { strNode2Ip, loacalSockAddr); } } - - traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + System.currentTimeMillis(); - + // build trace information int newTotalLen = 0; - + String traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + msgRcvTime; if (attrLen != 0) { newTotalLen = totalDataLen + traceInfo.length() + "&".length(); strAttr = strAttr + "&" + traceInfo; @@ -199,7 +182,7 @@ public class DefaultServiceDecoder implements ServiceDecoder { newTotalLen = totalDataLen + traceInfo.length(); strAttr = traceInfo; } - + // build trace information bytes dataBuf = ByteBuffer.allocate(newTotalLen + BIN_MSG_TOTALLEN_SIZE); cb.getBytes(msgHeadPos, dataBuf.array(), 0, bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE @@ -207,11 +190,9 @@ public class DefaultServiceDecoder implements ServiceDecoder { dataBuf.putShort( bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE - BIN_MSG_MAGIC_SIZE), (short) strAttr.length()); - System.arraycopy(strAttr.getBytes(StandardCharsets.UTF_8), 0, dataBuf.array(), bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_MAGIC_SIZE), strAttr.length()); - dataBuf.putInt(0, newTotalLen); dataBuf.putShort(newTotalLen + BIN_MSG_TOTALLEN_SIZE - BIN_MSG_MAGIC_SIZE, (short) 0xee01); @@ -223,57 +204,51 @@ public class DefaultServiceDecoder implements ServiceDecoder { * extract bin data, message type is 7 */ private Map<String, Object> extractNewBinData(Map<String, Object> resultMap, - ByteBuf cb, Channel channel, - int totalDataLen, MsgType msgType) throws Exception { + ByteBuf cb, Channel channel, + int totalDataLen, MsgType msgType, + String strRemoteIP, + long msgRcvTime) throws Exception { int msgHeadPos = cb.readerIndex() - 5; - + // get body length int bodyLen = cb.getInt(msgHeadPos + BIN_MSG_BODYLEN_OFFSET); + if (bodyLen == 0) { + throw new Exception( + "Error msg, bodyLen is empty; connection info:" + strRemoteIP); + } + // get attribute length int attrLen = cb.getShort(msgHeadPos + BIN_MSG_BODY_OFFSET + bodyLen); + // get msg magic int msgMagic = cb.getUnsignedShort(msgHeadPos + BIN_MSG_BODY_OFFSET + bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen); - - if (bodyLen == 0) { - throw new Exception(new Throwable("err msg, bodyLen is empty" - + ";Connection info:" + channel.toString())); - } - - if ((totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE)) - || (msgMagic != BIN_MSG_MAGIC)) { - throw new Exception(new Throwable( - "err msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen=" - + bodyLen + ",totalDataLen=" + totalDataLen + ",attrLen=" + attrLen + if ((msgMagic != BIN_MSG_MAGIC) + || (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE))) { + throw new Exception( + "Error msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen=" + + bodyLen + ",attrLen=" + attrLen + ",totalDataLen=" + totalDataLen + ";magic=" + Integer.toHexString(msgMagic) - + ";Connection info:" + channel.toString())); + + "; connection info:" + strRemoteIP); } - + // read data from ByteBuf int groupIdNum = cb.readUnsignedShort(); int streamIdNum = cb.readUnsignedShort(); final int extendField = cb.readUnsignedShort(); long dataTime = cb.readUnsignedInt(); + dataTime = dataTime * 1000; int msgCount = cb.readUnsignedShort(); + msgCount = (msgCount != 0) ? msgCount : 1; long uniq = cb.readUnsignedInt(); - - dataTime = dataTime * 1000; - Map<String, String> commonAttrMap = new HashMap<String, String>(); cb.skipBytes(BIN_MSG_BODYLEN_SIZE + bodyLen + BIN_MSG_ATTRLEN_SIZE); - resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap); - - resultMap.put(ConfigConstants.EXTRA_ATTR, ((extendField & 0x1) == 0x1) ? "true" : "false"); - // read body data byte[] bodyData = new byte[bodyLen]; cb.getBytes(msgHeadPos + BIN_MSG_BODY_OFFSET, bodyData, 0, bodyLen); - resultMap.put(ConfigConstants.DECODER_BODY, bodyData); - // read attr and write to map. String strAttr = null; + Map<String, String> commonAttrMap = new HashMap<>(); if (attrLen != 0) { byte[] attrData = new byte[attrLen]; cb.readBytes(attrData, 0, attrLen); strAttr = new String(attrData, StandardCharsets.UTF_8); - LOG.debug("strAttr = {}, length = {}", strAttr, strAttr.length()); resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr); - try { commonAttrMap.putAll(mapSplitter.split(strAttr)); } catch (Exception e) { @@ -281,26 +256,23 @@ public class DefaultServiceDecoder implements ServiceDecoder { throw new MessageIDException(uniq, ErrorCode.ATTR_ERROR, new Throwable("[Parse Error]new six segment protocol ,attr is " - + strAttr + " , channel info:" + channel.toString())); + + strAttr + " , channel info:" + strRemoteIP)); } } - + // build attributes + resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap); + resultMap.put(ConfigConstants.EXTRA_ATTR, ((extendField & 0x1) == 0x1) ? "true" : "false"); + resultMap.put(ConfigConstants.DECODER_BODY, bodyData); try { - handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount); - final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos); + // handle common attribute information + handleDateTime(commonAttrMap, uniq, dataTime, msgCount, strRemoteIP, msgRcvTime); + final boolean isIndexMsg = + handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos); ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos, - totalDataLen, attrLen, strAttr, bodyLen); - - String groupId = null; - String streamId = null; - - if (commonAttrMap.containsKey(AttributeConstants.GROUP_ID)) { - groupId = commonAttrMap.get(AttributeConstants.GROUP_ID); - } - if (commonAttrMap.containsKey(AttributeConstants.STREAM_ID)) { - streamId = commonAttrMap.get(AttributeConstants.STREAM_ID); - } - + totalDataLen, attrLen, strAttr, bodyLen, msgRcvTime); + // Check if groupId and streamId are number-to-name + String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID); + String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID); if ((groupId != null) && (streamId != null)) { commonAttrMap.put(AttributeConstants.NUM2NAME, "FALSE"); dataBuf.putShort(BIN_MSG_EXTEND_OFFSET, (short) (extendField | 0x4)); @@ -312,15 +284,16 @@ public class DefaultServiceDecoder implements ServiceDecoder { commonAttrMap.put(AttributeConstants.STREAMID_NUM, String.valueOf(streamIdNum)); } } - - if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType) && !index) { - List<ProxyMessage> msgList = new ArrayList<>(1); - msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, dataBuf.array())); - resultMap.put(ConfigConstants.MSG_LIST, msgList); - } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { + // build ProxyMessage + if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { List<ProxyMessage> msgList = new ArrayList<>(1); - msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, - (byte[]) resultMap.get(ConfigConstants.FILE_BODY))); + if (isIndexMsg) { + msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, + (byte[]) resultMap.get(ConfigConstants.FILE_BODY))); + } else { + msgList.add(new ProxyMessage(groupId, + streamId, commonAttrMap, dataBuf.array())); + } resultMap.put(ConfigConstants.MSG_LIST, msgList); } } catch (Exception ex) { @@ -328,7 +301,6 @@ public class DefaultServiceDecoder implements ServiceDecoder { cb.clear(); throw new MessageIDException(uniq, ErrorCode.OTHER_ERROR, ex.getCause()); } - return resultMap; } @@ -336,97 +308,97 @@ public class DefaultServiceDecoder implements ServiceDecoder { * extract bin data, message type less than 7 */ private Map<String, Object> extractDefaultData(Map<String, Object> resultMap, - ByteBuf cb, Channel channel, - int totalDataLen, MsgType msgType) throws Exception { + ByteBuf cb, int totalDataLen, + MsgType msgType, String strRemoteIP, + long msgRcvTime) throws Exception { int bodyLen = cb.readInt(); if (bodyLen == 0) { - throw new Exception(new Throwable("err msg, bodyLen is empty" + ";" - + "Connection info:" + channel.toString())); + throw new Exception("Error msg: bodyLen is empty, connection info:" + strRemoteIP); } // if body len is bigger than totalDataLen - 5(bodyLen bytes + message type bytes), // that means an invalid message, reject it. if (bodyLen > totalDataLen - 5) { - throw new Exception(new Throwable("err msg, firstLen > totalDataLen, and bodyLen=" + throw new Exception("Error msg, firstLen > totalDataLen, and bodyLen=" + bodyLen + ",totalDataLen=" + totalDataLen - + ";Connection info:" + channel.toString())); + + ", connection info:" + strRemoteIP); } - // extract body bytes byte[] bodyData = new byte[bodyLen]; cb.readBytes(bodyData, 0, bodyLen); resultMap.put(ConfigConstants.DECODER_BODY, bodyData); - + // extract attribute int attrLen = cb.readInt(); // 9 means bodyLen bytes(4) + message type bytes(1) + attrLen bytes(4) if (totalDataLen != 9 + attrLen + bodyLen) { - throw new Exception(new Throwable( - "err msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + bodyLen + throw new Exception( + "Error msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + bodyLen + ",totalDataLen=" + totalDataLen + ",attrDataLen=" + attrLen - + ";Connection info:" + channel.toString())); + + ", connection info:" + strRemoteIP); } - // extract attr bytes byte[] attrData = new byte[attrLen]; cb.readBytes(attrData, 0, attrLen); - String strAttr = new String(attrData, StandardCharsets.UTF_8); - resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr); - // convert attr bytes to map Map<String, String> commonAttrMap; + String strAttr = new String(attrData, StandardCharsets.UTF_8); try { commonAttrMap = new HashMap<>(mapSplitter.split(strAttr)); } catch (Exception e) { - throw new Exception(new Throwable("Parse commonAttrMap error.commonAttrString is: " - + strAttr + " ,channel is :" + channel.toString())); + throw new Exception("Parse commonAttrMap error.commonAttrString is: " + + strAttr + " , connection info:" + strRemoteIP); } + resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr); resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap); - // decompress body data if compress type exists. String compressType = commonAttrMap.get(AttributeConstants.COMPRESS_TYPE); - resultMap.put(ConfigConstants.COMPRESS_TYPE, compressType); if (StringUtils.isNotBlank(compressType)) { + resultMap.put(ConfigConstants.COMPRESS_TYPE, compressType); byte[] unCompressedData = processUnCompress(bodyData, compressType); if (unCompressedData == null || unCompressedData.length == 0) { - throw new Exception(new Throwable("Uncompressed data error!compress type:" - + compressType + ";data:" + new String(bodyData, StandardCharsets.UTF_8) - + ";attr:" + strAttr + ";channel:" + channel.toString())); + throw new Exception("Uncompressed data error! compress type:" + + compressType + ";attr:" + strAttr + + " , connection info:" + strRemoteIP); } bodyData = unCompressedData; } - // fill up attr map with some keys. - commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis())); String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID); String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID); - - // add message count attr - String cntStr = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT); - int msgCnt = cntStr != null ? Integer.parseInt(cntStr) : 1; - commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(msgCnt)); - + String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME); + long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime); + commonAttrMap.put(AttributeConstants.DATA_TIME, String.valueOf(longDataTime)); + commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); + // check message count attr + String strMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT); + int intMsgCnt = NumberUtils.toInt(strMsgCnt, 1); + commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(intMsgCnt)); // extract data from bodyData and if message type is 5, convert data into list. + int calCnt = 0; List<ProxyMessage> msgList = null; ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData); if (MsgType.MSG_MULTI_BODY.equals(msgType)) { - msgList = new ArrayList<>(msgCnt); + msgList = new ArrayList<>(intMsgCnt); while (bodyBuffer.remaining() > 0) { int singleMsgLen = bodyBuffer.getInt(); if (singleMsgLen <= 0 || singleMsgLen > bodyBuffer.remaining()) { - throw new Exception(new Throwable("[Malformed Data]Invalid data len!channel is " - + channel.toString())); + throw new Exception( + "[Malformed Data]Invalid data len! channel is " + strRemoteIP); } byte[] record = new byte[singleMsgLen]; bodyBuffer.get(record); - ProxyMessage message = new ProxyMessage(groupId, streamId, commonAttrMap, record); msgList.add(message); + calCnt++; } } else { msgList = new ArrayList<>(1); msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, bodyData)); + calCnt++; + } + if (calCnt != intMsgCnt) { + commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(calCnt)); } resultMap.put(ConfigConstants.MSG_LIST, msgList); - return resultMap; } @@ -452,7 +424,8 @@ public class DefaultServiceDecoder implements ServiceDecoder { * +--------+--------+--------+----------------+--------+----------------+------------------------+ */ @Override - public Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception { + public Map<String, Object> extractData(ByteBuf cb, String strRemoteIP, + long msgRcvTime, Channel channel) throws Exception { Map<String, Object> resultMap = new HashMap<>(); if (null == cb) { LOG.error("cb == null"); @@ -460,8 +433,8 @@ public class DefaultServiceDecoder implements ServiceDecoder { } int totalLen = cb.readableBytes(); if (ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen) { - throw new Exception(new Throwable("err msg, ConfigConstants.MSG_MAX_LENGTH_BYTES " - + "< totalLen, and totalLen=" + totalLen)); + throw new Exception("Error msg, ConfigConstants.MSG_MAX_LENGTH_BYTES " + + "< totalLen, and totalLen=" + totalLen); } // save index, reset it if buffer is not satisfied. cb.markReaderIndex(); @@ -481,14 +454,16 @@ public class DefaultServiceDecoder implements ServiceDecoder { if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) { return extractNewBinHB(resultMap, cb, channel, totalDataLen); } - + // process data message if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) { resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : ""); - return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType); + return extractNewBinData(resultMap, cb, + channel, totalDataLen, msgType, + strRemoteIP, msgRcvTime); } else { - return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType); + return extractDefaultData(resultMap, cb, + totalDataLen, msgType, strRemoteIP, msgRcvTime); } - } else { // reset index. cb.resetReaderIndex(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index 9d3b32c4e..ab9055ad6 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -17,8 +17,20 @@ package org.apache.inlong.dataproxy.source; +import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR; +import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist; + import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -26,7 +38,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; @@ -40,37 +51,16 @@ import org.apache.inlong.dataproxy.base.ProxyMessage; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; -import org.apache.inlong.dataproxy.exception.ErrorCode; import org.apache.inlong.dataproxy.exception.MessageIDException; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; -import org.apache.inlong.dataproxy.utils.Constants; +import org.apache.inlong.dataproxy.utils.DateTimeUtils; import org.apache.inlong.dataproxy.utils.MessageUtils; import org.apache.inlong.dataproxy.utils.NetworkUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR; -import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA; -import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID; -import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist; - /** * Server message handler * @@ -91,10 +81,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { .on(AttributeConstants.SEPARATOR) .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - private static final DateTimeFormatter DATE_FORMATTER - = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); - private static final ZoneId defZoneId = ZoneId.systemDefault(); - private AbstractSource source; private final ChannelGroup allChannels; @@ -180,8 +166,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { strRemoteIp = strRemoteIp.substring(1, strRemoteIp.indexOf(':')); } catch (Exception ee) { logger.warn("fail to get the remote IP, and strIP={},remoteSocketAddress={}", - strRemoteIp, - remoteSocketAddress); + strRemoteIp, remoteSocketAddress); } } return strRemoteIp; @@ -269,10 +254,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap, - Map<String, String> attrMap, AtomicReference<String> topicInfo) { + Map<String, String> attrMap, AtomicReference<String> topicInfo) { String groupId = message.getGroupId(); String streamId = message.getStreamId(); if (null != groupId) { + // get configured group Id String from = commonAttrMap.get(AttributeConstants.FROM); if ("dc".equals(from)) { String dcInterfaceId = message.getStreamId(); @@ -284,14 +270,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { message.setGroupId(groupId); } } - - String value = MessageUtils.getTopic(configManager.getTopicProperties(), groupId, - streamId); - if (StringUtils.isNotEmpty(value)) { - topicInfo.set(value.trim()); + // get configured topic name + String configTopic = MessageUtils.getTopic( + configManager.getTopicProperties(), groupId, streamId); + if (StringUtils.isNotEmpty(configTopic)) { + topicInfo.set(configTopic.trim()); } - - Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId); + // get configured m value + Map<String, String> mxValue = + configManager.getMxPropertiesMaps().get(groupId); if (mxValue != null && mxValue.size() != 0) { message.getAttributeMap().putAll(mxValue); } else { @@ -301,7 +288,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME); String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM); String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM); - + // get configured groupId and steamId by numbers if (configManager.getGroupIdMappingProperties() != null && configManager.getStreamIdMappingProperties() != null) { groupId = configManager.getGroupIdMappingProperties().get(groupIdNum); @@ -311,21 +298,21 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { String enableTrans = (configManager.getGroupIdEnableMappingProperties() == null) ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum); - if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE" - .equalsIgnoreCase(num2name))) { + if (("TRUE".equalsIgnoreCase(enableTrans) + && "TRUE".equalsIgnoreCase(num2name))) { String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId; message.setData(newBinMsg(message.getData(), extraAttr)); } - + // reset groupId and streamId to message and attrMap attrMap.put(AttributeConstants.GROUP_ID, groupId); attrMap.put(AttributeConstants.STREAM_ID, streamId); message.setGroupId(groupId); message.setStreamId(streamId); - - String value = MessageUtils.getTopic(configManager.getTopicProperties(), - groupId, streamId); - if (StringUtils.isNotEmpty(value)) { - topicInfo.set(value.trim()); + // get configured topic name + String configTopic = MessageUtils.getTopic( + configManager.getTopicProperties(), groupId, streamId); + if (StringUtils.isNotEmpty(configTopic)) { + topicInfo.set(configTopic.trim()); } } } @@ -333,60 +320,28 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } private boolean updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap, - Map<String, HashMap<String, List<ProxyMessage>>> messageMap, - String strRemoteIP, MsgType msgType) { + Map<String, HashMap<String, List<ProxyMessage>>> messageMap, + String strRemoteIP) { for (ProxyMessage message : msgList) { - Map<String, String> attrMap = message.getAttributeMap(); - String topic = this.defaultTopic; - + Map<String, String> attrMap = message.getAttributeMap(); AtomicReference<String> topicInfo = new AtomicReference<>(topic); checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo); String groupId = message.getGroupId(); String streamId = message.getStreamId(); + if (streamId == null) { + streamId = ""; + message.setStreamId(streamId); + } topic = topicInfo.get(); if (StringUtils.isEmpty(topic)) { logger.warn("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}", groupId, streamId); } - // if(groupId==null)groupId="b_test";//default groupId - + // append topic message.setTopic(topic); commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP); - - // whether sla - if (SLA_METRIC_GROUPID.equals(groupId)) { - commonAttrMap.put(SLA_METRIC_DATA, "true"); - message.setTopic(SLA_METRIC_DATA); - } - - if (groupId != null && streamId != null) { - String tubeSwtichKey = groupId + SEPARATOR + streamId; - if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null - && "false".equals(configManager.getTubeSwitchProperties() - .get(tubeSwtichKey).trim())) { - continue; - } - } - - if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE)) - && !MsgType.MSG_MULTI_BODY.equals(msgType) - && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) { - byte[] data = message.getData(); - if (data[data.length - 1] == '\n') { - int tripDataLen = data.length - 1; - if (data[data.length - 2] == '\r') { - tripDataLen = data.length - 2; - } - byte[] tripData = new byte[tripDataLen]; - System.arraycopy(data, 0, tripData, 0, tripDataLen); - message.setData(tripData); - } - } - - if (streamId == null) { - streamId = ""; - } + // add ProxyMessage HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap .computeIfAbsent(topic, k -> new HashMap<>()); List<ProxyMessage> streamIdMsgList = streamIdMsgMap @@ -397,8 +352,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap, - Map<String, HashMap<String, List<ProxyMessage>>> messageMap, - String strRemoteIP, MsgType msgType) throws MessageIDException { + Map<String, HashMap<String, List<ProxyMessage>>> messageMap, + String strRemoteIP, MsgType msgType, long msgRcvTime) throws MessageIDException { int inLongMsgVer = 1; if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) { @@ -406,129 +361,124 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { inLongMsgVer = 4; } + StringBuilder strBuff = new StringBuilder(512); int recordMsgCnt = Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT)); - + // process each ProxyMessage for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) { for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) { - + // build InLongMsg + String groupId = null; + int streamMsgCnt = 0; InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer); - Map<String, String> headers = new HashMap<String, String>(); - for (ProxyMessage message : streamIdEntry.getValue()) { - if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) { + if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) { + for (ProxyMessage message : streamIdEntry.getValue()) { + if (StringUtils.isEmpty(groupId)) { + groupId = message.getGroupId(); + } + streamMsgCnt++; message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1)); inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData()); - } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { + } + } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { + for (ProxyMessage message : streamIdEntry.getValue()) { + if (StringUtils.isEmpty(groupId)) { + groupId = message.getGroupId(); + } + streamMsgCnt++; inLongMsg.addMsg(message.getData()); - } else { + } + } else { + for (ProxyMessage message : streamIdEntry.getValue()) { + if (StringUtils.isEmpty(groupId)) { + groupId = message.getGroupId(); + } + streamMsgCnt++; inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData()); } } - // get msgTime - long currTIme = System.currentTimeMillis(); - String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME); - long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme); - LocalDateTime localDateTime = - LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId); - String pkgTimeStr = DATE_FORMATTER.format(localDateTime); - headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis)); - headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr); - // get data time - long dtTime = NumberUtils.toLong( - commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme); - headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime)); - + if (recordMsgCnt != streamMsgCnt) { + logger.debug("Found message count not equal, record={}, calculate value = {}", + recordMsgCnt, streamMsgCnt); + } + commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(streamMsgCnt)); + // build headers + Map<String, String> headers = new HashMap<>(); + headers.put(AttributeConstants.GROUP_ID, groupId); + headers.put(AttributeConstants.STREAM_ID, streamIdEntry.getKey()); + headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey()); + String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME); + headers.put(AttributeConstants.DATA_TIME, strDataTime); + headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP); + headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE); + headers.put(ConfigConstants.MSG_COUNTER_KEY, + commonAttrMap.get(AttributeConstants.MESSAGE_COUNT)); + headers.put(AttributeConstants.RCV_TIME, + commonAttrMap.get(AttributeConstants.RCV_TIME)); + // add extra key-value information + headers.put(AttributeConstants.UNIQ_ID, + commonAttrMap.get(AttributeConstants.UNIQ_ID)); if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) { headers.put(AttributeConstants.MESSAGE_IS_ACK, "false"); } - String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND); if (StringUtils.isNotEmpty(syncSend)) { headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend); } - String partitionKey = commonAttrMap.get(AttributeConstants.MESSAGE_PARTITION_KEY); if (StringUtils.isNotEmpty(partitionKey)) { headers.put(AttributeConstants.MESSAGE_PARTITION_KEY, partitionKey); } - - headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey()); - headers.put(AttributeConstants.GROUP_ID, - streamIdEntry.getValue().get(0).getGroupId()); - headers.put(AttributeConstants.STREAM_ID, streamIdEntry.getKey()); - headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP); - headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE); - // every message share the same msg cnt? what if msgType = 5 - String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT); - if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) { - commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt)); - proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT); - } - headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt); - - byte[] data = inLongMsg.buildArray(); - headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length)); - - headers.put(AttributeConstants.UNIQ_ID, - commonAttrMap.get(AttributeConstants.UNIQ_ID)); String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID); if (StringUtils.isNotEmpty(sequenceId)) { - StringBuilder sidBuilder = new StringBuilder(); - sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey()) + strBuff.append(topicEntry.getKey()).append(SEPARATOR) + .append(streamIdEntry.getKey()) .append(SEPARATOR).append(sequenceId); - headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString()); + headers.put(ConfigConstants.SEQUENCE_ID, strBuff.toString()); + strBuff.delete(0, strBuff.length()); } + final byte[] data = inLongMsg.buildArray(); Event event = EventBuilder.withBody(data, headers); + inLongMsg.reset(); + // build metric data item String orderType = "non-order"; if (MessageUtils.isSyncSendForOrder(event)) { event = new OrderEvent(ctx, event); orderType = "order"; } - long dtten = 0; - try { - dtten = Long.parseLong(headers.get(AttributeConstants.DATA_TIME)); - } catch (Exception e1) { - long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID)); - throw new MessageIDException(uniqVal, - ErrorCode.DT_ERROR, - new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME - + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId=" - + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1)); - } - - dtten = dtten / 1000 / 60 / 10; - dtten = dtten * 1000 * 60 * 10; - StringBuilder newbase = new StringBuilder(); - newbase.append(protocolType).append(SEPARATOR) + long longDataTime = Long.parseLong(strDataTime); + longDataTime = longDataTime / 1000 / 60 / 10; + longDataTime = longDataTime * 1000 * 60 * 10; + strBuff.append(protocolType).append(SEPARATOR) .append(topicEntry.getKey()).append(SEPARATOR) .append(streamIdEntry.getKey()).append(SEPARATOR) .append(strRemoteIP).append(SEPARATOR) .append(NetworkUtils.getLocalIp()).append(SEPARATOR) .append(orderType).append(SEPARATOR) - .append(new SimpleDateFormat("yyyyMMddHHmm") - .format(dtten)).append(SEPARATOR).append(pkgTimeStr); + .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEPARATOR) + .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime)); try { processor.processEvent(event); monitorIndexExt.incrementAndGet("EVENT_SUCCESS"); this.addMetric(true, data.length, event); - monitorIndex.addAndGet(new String(newbase), - Integer.parseInt(proxyMetricMsgCnt), 1, data.length, 0); + monitorIndex.addAndGet(strBuff.toString(), + streamMsgCnt, 1, data.length, 0); + strBuff.delete(0, strBuff.length()); } catch (Throwable ex) { logger.error("Error writting to channel,data will discard.", ex); monitorIndexExt.incrementAndGet("EVENT_DROPPED"); - monitorIndex.addAndGet(new String(newbase), 0, 0, 0, - Integer.parseInt(proxyMetricMsgCnt)); + monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt); this.addMetric(false, data.length, event); + strBuff.delete(0, strBuff.length()); throw new ChannelException("ProcessEvent error can't write event to channel."); } } } } - private void responsePackage(ChannelHandlerContext ctx, Map<String, String> commonAttrMap, - Map<String, Object> resultMap, - Channel remoteChannel, - SocketAddress remoteSocketAddress, - MsgType msgType) throws Exception { + private void responsePackage(Map<String, String> commonAttrMap, + Map<String, Object> resultMap, + Channel remoteChannel, + MsgType msgType) throws Exception { String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK); if (isAck == null || "true".equals(isAck)) { if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN @@ -571,25 +521,22 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } } } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { - String backattrs = null; - if (resultMap.containsKey(ConfigConstants.DECODER_ATTRS)) { - backattrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS); - } + String backAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS); String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID); - ByteBuf binBuffer = MessageUtils.getResponsePackage(backattrs, msgType, uniqVal); + ByteBuf binBuffer = MessageUtils.getResponsePackage(backAttrs, msgType, uniqVal); if (remoteChannel.isWritable()) { remoteChannel.writeAndFlush(binBuffer); logger.debug("Connection info: {} ; attr is {} ; uniqVal {}", - remoteChannel, backattrs, uniqVal); + remoteChannel, backAttrs, uniqVal); } else { binBuffer.release(); logger.warn( "the send buffer2 is full, so disconnect it!please check remote client" + "; Connection info:" + remoteChannel + ";attr is " - + backattrs); + + backAttrs); throw new Exception(new Throwable( "the send buffer2 is full,so disconnect it!please check remote client, Connection info:" - + remoteChannel + ";attr is " + backattrs)); + + remoteChannel + ";attr is " + backAttrs)); } } } @@ -597,9 +544,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - logger.debug("message received"); if (msg == null) { - logger.error("get null msg, just skip"); + logger.error("Get null msg, just skip!"); this.addMetric(false, 0, null); return; } @@ -609,26 +555,27 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { String strRemoteIP = getRemoteIp(remoteChannel); int len = cb.readableBytes(); if (len == 0 && this.filterEmptyMsg) { - logger.warn("skip empty msg."); + logger.warn("Get empty msg from {}, just skip!", strRemoteIP); this.addMetric(false, 0, null); return; } - + // parse message Map<String, Object> resultMap = null; + final long msgRcvTime = System.currentTimeMillis(); try { - resultMap = serviceDecoder.extractData(cb, remoteChannel); + resultMap = serviceDecoder.extractData(cb, + strRemoteIP, msgRcvTime, remoteChannel); + if (resultMap == null || resultMap.isEmpty()) { + logger.info("Parse message result is null, from {}", strRemoteIP); + this.addMetric(false, 0, null); + return; + } } catch (MessageIDException ex) { logger.error("MessageIDException ex = {}", ex); this.addMetric(false, 0, null); throw new IOException(ex.getCause()); } - - if (resultMap == null) { - logger.info("result is null"); - this.addMetric(false, 0, null); - return; - } - + // process message by msgType MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE); if (MsgType.MSG_HEARTBEAT.equals(msgType)) { ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5); @@ -637,90 +584,83 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { this.addMetric(false, 0, null); return; } - + // process heart beat 8 if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) { this.addMetric(false, 0, null); return; } - + // process data message Map<String, String> commonAttrMap = (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP); if (commonAttrMap == null) { commonAttrMap = new HashMap<String, String>(); } - List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST); - boolean checkMessageTopic = true; - if (msgList != null - && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA) - && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) { - Map<String, HashMap<String, List<ProxyMessage>>> messageMap = - new HashMap<String, HashMap<String, List<ProxyMessage>>>( - msgList.size()); - - checkMessageTopic = updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, - msgType); - if (checkMessageTopic) { - formatMessagesAndSend(ctx, commonAttrMap, messageMap, - strRemoteIP, msgType); - } - } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) { - Map<String, String> headers = new HashMap<String, String>(); - headers.put("msgtype", "filestatus"); - headers.put(ConfigConstants.FILE_CHECK_DATA, - "true"); - headers.put(AttributeConstants.UNIQ_ID, - commonAttrMap.get(AttributeConstants.UNIQ_ID)); - for (ProxyMessage message : msgList) { - byte[] body = message.getData(); - Event event = EventBuilder.withBody(body, headers); - if (MessageUtils.isSyncSendForOrder(commonAttrMap - .get(AttributeConstants.MESSAGE_SYNC_SEND))) { - event = new OrderEvent(ctx, event); - } - try { - processor.processEvent(event); - this.addMetric(true, body.length, event); - } catch (Throwable ex) { - logger.error("Error writing to controller,data will discard.", ex); - this.addMetric(false, body.length, event); - throw new ChannelException( - "Process Controller Event error can't write event to channel."); + if (msgList != null) { + if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) { + // process file check data + Map<String, String> headers = new HashMap<String, String>(); + headers.put("msgtype", "filestatus"); + headers.put(ConfigConstants.FILE_CHECK_DATA, "true"); + headers.put(AttributeConstants.UNIQ_ID, + commonAttrMap.get(AttributeConstants.UNIQ_ID)); + for (ProxyMessage message : msgList) { + byte[] body = message.getData(); + Event event = EventBuilder.withBody(body, headers); + if (MessageUtils.isSyncSendForOrder(commonAttrMap + .get(AttributeConstants.MESSAGE_SYNC_SEND))) { + event = new OrderEvent(ctx, event); + } + try { + processor.processEvent(event); + this.addMetric(true, body.length, event); + } catch (Throwable ex) { + logger.error("Error writing to controller,data will discard.", ex); + this.addMetric(false, body.length, event); + throw new ChannelException( + "Process Controller Event error can't write event to channel."); + } } - } - } else if (msgList != null && commonAttrMap - .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) { - logger.info("i am in MINUTE_CHECK_DATA"); - Map<String, String> headers = new HashMap<String, String>(); - headers.put("msgtype", "measure"); - headers.put(ConfigConstants.FILE_CHECK_DATA, - "true"); - headers.put(AttributeConstants.UNIQ_ID, - commonAttrMap.get(AttributeConstants.UNIQ_ID)); - for (ProxyMessage message : msgList) { - byte[] body = message.getData(); - Event event = EventBuilder.withBody(body, headers); - if (MessageUtils.isSyncSendForOrder(commonAttrMap - .get(AttributeConstants.MESSAGE_SYNC_SEND))) { - event = new OrderEvent(ctx, event); + } else if (commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) { + // process minute check data + Map<String, String> headers = new HashMap<String, String>(); + headers.put("msgtype", "measure"); + headers.put(ConfigConstants.FILE_CHECK_DATA, "true"); + headers.put(AttributeConstants.UNIQ_ID, + commonAttrMap.get(AttributeConstants.UNIQ_ID)); + for (ProxyMessage message : msgList) { + byte[] body = message.getData(); + Event event = EventBuilder.withBody(body, headers); + if (MessageUtils.isSyncSendForOrder(commonAttrMap + .get(AttributeConstants.MESSAGE_SYNC_SEND))) { + event = new OrderEvent(ctx, event); + } + try { + processor.processEvent(event); + this.addMetric(true, body.length, event); + } catch (Throwable ex) { + logger.error("Error writing to controller,data will discard.", ex); + this.addMetric(false, body.length, event); + throw new ChannelException( + "Process Controller Event error can't write event to channel."); + } } - try { - processor.processEvent(event); - this.addMetric(true, body.length, event); - } catch (Throwable ex) { - logger.error("Error writing to controller,data will discard.", ex); - this.addMetric(false, body.length, event); - throw new ChannelException( - "Process Controller Event error can't write event to channel."); + } else { + // process message data + Map<String, HashMap<String, List<ProxyMessage>>> messageMap = + new HashMap<>(msgList.size()); + checkMessageTopic = updateMsgList(msgList, + commonAttrMap, messageMap, strRemoteIP); + if (checkMessageTopic) { + formatMessagesAndSend(ctx, commonAttrMap, + messageMap, strRemoteIP, msgType, msgRcvTime); } } } - SocketAddress remoteSocketAddress = remoteChannel.remoteAddress(); if (!checkMessageTopic || !MessageUtils.isSyncSendForOrder(commonAttrMap .get(AttributeConstants.MESSAGE_SYNC_SEND))) { - responsePackage(ctx, commonAttrMap, resultMap, remoteChannel, - remoteSocketAddress, msgType); + responsePackage(commonAttrMap, resultMap, remoteChannel, msgType); } } finally { cb.release(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java index 7f4852b5c..3f13f7e1d 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java @@ -28,10 +28,13 @@ public interface ServiceDecoder { /** * extract data from buffer and convert it into map. * - * @param cb - * @param channel - * @return Map + * @param cb the message Byte buffer + * @param strRemoteIP the remote ip message sent + * @param msgRcvTime the received message time + * @param channel the channel + * @return Map the message map * @throws Exception */ - Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception; + Map<String, Object> extractData(ByteBuf cb, String strRemoteIP, + long msgRcvTime, Channel channel) throws Exception; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java index 38ca9ee32..71372da18 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java @@ -570,6 +570,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { return; } Channel remoteChannel = ctx.channel(); + String strRemoteIP = getRemoteIp(remoteChannel); ByteBuf cb = (ByteBuf) msg; try { int len = cb.readableBytes(); @@ -580,8 +581,10 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { return; } Map<String, Object> resultMap = null; + final long msgRcvTime = System.currentTimeMillis(); try { - resultMap = serviceProcessor.extractData(cb, remoteChannel); + resultMap = serviceProcessor.extractData(cb, + strRemoteIP, msgRcvTime, remoteChannel); } catch (MessageIDException ex) { this.addMetric(false, 0, null); throw new IOException(ex.getCause()); @@ -617,7 +620,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA) && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) { Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size()); - String strRemoteIP = getRemoteIp(remoteChannel); updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType); formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);