This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c5b9319a8 [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900) c5b9319a8 is described below commit c5b9319a8fc49ebddad92a4f8193073db7e3f5fc Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Sep 16 10:07:32 2022 +0800 [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900) --- .../dataproxy/http/SimpleMessageHandler.java | 135 +++++++++++---------- .../inlong/dataproxy/utils/DateTimeUtils.java | 41 +++++++ 2 files changed, 109 insertions(+), 67 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index 1969f7cea..3f1bf425c 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -20,11 +20,6 @@ package org.apache.inlong.dataproxy.http; import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG; import javax.servlet.http.HttpServletRequest; import java.io.UnsupportedEncodingException; -import java.text.SimpleDateFormat; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -45,7 +40,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.ServiceDecoder; -import org.apache.inlong.dataproxy.utils.Constants; +import org.apache.inlong.dataproxy.utils.DateTimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,9 +48,6 @@ public class SimpleMessageHandler implements MessageHandler { private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class); private static final ConfigManager configManager = ConfigManager.getInstance(); - private static final DateTimeFormatter DATE_FORMATTER - = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); - private static final ZoneId defZoneId = ZoneId.systemDefault(); private static final String DEFAULT_REMOTE_IDC_VALUE = "0"; private final MonitorIndex monitorIndex; @@ -88,96 +80,105 @@ public class SimpleMessageHandler implements MessageHandler { @Override public void processMessage(Context context) throws MessageProcessException { - String topicValue = "test"; - String attr = "m=0"; - StringBuilder newAttrBuffer = new StringBuilder(attr); - + StringBuilder strBuff = new StringBuilder(512); + // get groupId and streamId String groupId = (String) context.get(AttributeConstants.GROUP_ID); String streamId = (String) context.get(AttributeConstants.STREAM_ID); - String dt = (String) context.get(AttributeConstants.DATA_TIME); - - String value = getTopic(groupId, streamId); - if (null != value && !"".equals(value)) { - topicValue = value.trim(); + if (StringUtils.isBlank(groupId) || StringUtils.isBlank(streamId)) { + throw new MessageProcessException(strBuff.append("Field ") + .append(AttributeConstants.GROUP_ID).append(" or ") + .append(AttributeConstants.STREAM_ID) + .append(" must exist and not blank!").toString()); } - - String mxValue = configManager.getMxProperties().get(groupId); - if (null != mxValue) { - newAttrBuffer = new StringBuilder(mxValue.trim()); + groupId = groupId.trim(); + streamId = streamId.trim(); + // get topicName + String topicName = "test"; + String configedTopicName = getTopic(groupId, streamId); + if (StringUtils.isNotBlank(configedTopicName)) { + topicName = configedTopicName.trim(); } - - newAttrBuffer.append("&groupId=").append(groupId).append("&streamId=").append(streamId) - .append("&dt=").append(dt); + // get message data time + final long msgRcvTime = System.currentTimeMillis(); + String strDataTime = (String) context.get(AttributeConstants.DATA_TIME); + long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime); + strDataTime = String.valueOf(longDataTime); + // get char set + String charset = (String) context.get(AttributeConstants.CHARSET); + if (StringUtils.isBlank(charset)) { + charset = AttributeConstants.CHARSET; + } + String body = (String) context.get(AttributeConstants.BODY); + if (StringUtils.isEmpty(body)) { + throw new MessageProcessException(strBuff.append("Field ") + .append(AttributeConstants.BODY) + .append(" must exist and not empty!").toString()); + } + // get m attribute + String mxValue = "m=0"; + String configedMxAttr = configManager.getMxProperties().get(groupId); + if (StringUtils.isNotEmpty(configedMxAttr)) { + mxValue = configedMxAttr.trim(); + } + // convert context to http request HttpServletRequest request = (HttpServletRequest) context.get(AttributeConstants.HTTP_REQUEST); + // get report node ip String strRemoteIP = request.getRemoteAddr(); - newAttrBuffer.append("&NodeIP=").append(strRemoteIP); - String msgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT); - if (msgCount == null || "".equals(msgCount)) { - msgCount = "1"; - } - + // get message count + String strMsgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT); + int intMsgCnt = NumberUtils.toInt(strMsgCount, 1); + strMsgCount = String.valueOf(intMsgCnt); + // build message attributes InLongMsg inLongMsg = InLongMsg.newInLongMsg(true); - String charset = (String) context.get(AttributeConstants.CHARSET); - if (charset == null || "".equals(charset)) { - charset = "UTF-8"; - } - String body = (String) context.get(AttributeConstants.BODY); + strBuff.append(mxValue).append("&groupId=").append(groupId) + .append("&streamId=").append(streamId) + .append("&dt=").append(strDataTime) + .append("&NodeIP=").append(strRemoteIP) + .append("&cnt=").append(strMsgCount) + .append("&rt=").append(msgRcvTime); try { - inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset)); + inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset)); + strBuff.delete(0, strBuff.length()); } catch (UnsupportedEncodingException e) { throw new MessageProcessException(e); } - + // build flume event Map<String, String> headers = new HashMap<>(); - headers.put(AttributeConstants.DATA_TIME, dt); - headers.put(ConfigConstants.TOPIC_KEY, topicValue); + headers.put(AttributeConstants.GROUP_ID, groupId); headers.put(AttributeConstants.STREAM_ID, streamId); + headers.put(ConfigConstants.TOPIC_KEY, topicName); + headers.put(AttributeConstants.DATA_TIME, strDataTime); headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP); headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE); - headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount); + headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount); byte[] data = inLongMsg.buildArray(); headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length)); - // add msgTime if not existed - long currTIme = System.currentTimeMillis(); - String strMsgTime = request.getParameter(Constants.HEADER_KEY_MSG_TIME); - long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme); - LocalDateTime localDateTime = - LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId); - String pkgTime = DATE_FORMATTER.format(localDateTime); - headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis)); - headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime); + headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); Event event = EventBuilder.withBody(data, headers); inLongMsg.reset(); - long dtten; - try { - dtten = Long.parseLong(dt); - } catch (NumberFormatException e1) { - throw new MessageProcessException(new Throwable( - "attribute dt=" + dt + " has error," + " detail is: " + newAttrBuffer)); - } - dtten = dtten / 1000 / 60 / 10; - dtten = dtten * 1000 * 60 * 10; - StringBuilder newBase = new StringBuilder(); - newBase.append("http").append(SEP_HASHTAG).append(topicValue).append(SEP_HASHTAG) + // build metric data item + longDataTime = longDataTime / 1000 / 60 / 10; + longDataTime = longDataTime * 1000 * 60 * 10; + strBuff.append("http").append(SEP_HASHTAG).append(topicName).append(SEP_HASHTAG) .append(streamId).append(SEP_HASHTAG).append(strRemoteIP).append(SEP_HASHTAG) .append(NetworkUtils.getLocalIp()).append(SEP_HASHTAG) .append("non-order").append(SEP_HASHTAG) - .append(new SimpleDateFormat("yyyyMMddHHmm").format(dtten)).append(SEP_HASHTAG) - .append(pkgTime); + .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEP_HASHTAG) + .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime)); long beginTime = System.currentTimeMillis(); try { processor.processEvent(event); if (monitorIndex != null) { - monitorIndex.addAndGet(new String(newBase), - Integer.parseInt(msgCount), 1, data.length, 0); + monitorIndex.addAndGet(strBuff.toString(), + intMsgCnt, 1, data.length, 0); monitorIndexExt.incrementAndGet("EVENT_SUCCESS"); } addMetric(true, data.length, event); } catch (ChannelException ex) { if (monitorIndex != null) { - monitorIndex.addAndGet(new String(newBase), - 0, 0, 0, Integer.parseInt(msgCount)); + monitorIndex.addAndGet(strBuff.toString(), + 0, 0, 0, intMsgCnt); monitorIndexExt.incrementAndGet("EVENT_DROPPED"); } addMetric(false, data.length, event); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java new file mode 100644 index 000000000..222e343c8 --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.utils; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public class DateTimeUtils { + private static final DateTimeFormatter DATE_FORMATTER + = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); + private static final ZoneId defZoneId = ZoneId.systemDefault(); + + /** + * convert ms value to 'yyyyMMddHHmm' string + * + * @param timestamp The millisecond value of the specified time + * @return the time string in yyyyMMddHHmm format + */ + public static String ms2yyyyMMddHHmm(long timestamp) { + LocalDateTime localDateTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), defZoneId); + return DATE_FORMATTER.format(localDateTime); + } +}