This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 55ed96c248 [INLONG-8212][DataProxy] Improve HTTP related message handling (#8213) 55ed96c248 is described below commit 55ed96c248f09f10de095750ec3f6a72ac3fc0a5 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Jun 12 09:47:08 2023 +0800 [INLONG-8212][DataProxy] Improve HTTP related message handling (#8213) --- .../inlong/common/enums/DataProxyErrCode.java | 9 +- .../inlong/dataproxy/config/ConfigManager.java | 27 +- .../inlong/dataproxy/consts/HttpAttrConst.java | 38 +++ .../inlong/dataproxy/consts/StatConstants.java | 45 +-- .../dataproxy/heartbeat/HeartbeatManager.java | 2 +- .../inlong/dataproxy/metrics/audit/AuditUtils.java | 3 +- .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 10 +- .../inlong/dataproxy/source2/BaseSource.java | 34 ++ .../dataproxy/source2/InLongMessageHandler.java | 30 +- .../inlong/dataproxy/source2/SimpleHttpSource.java | 1 + .../inlong/dataproxy/source2/SimpleTcpSource.java | 34 +- .../source2/httpMsg/InLongHttpMsgHandler.java | 379 +++++++++++++-------- .../dataproxy/source2/v0msg/CodecBinMsg.java | 6 +- .../dataproxy/source2/v0msg/CodecTextMsg.java | 2 +- 14 files changed, 378 insertions(+), 242 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java index 263af3678e..5caeb2d875 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java @@ -26,11 +26,18 @@ public enum DataProxyErrCode { SUCCESS(0, "Ok"), - SINK_SERVICE_UNREADY(1, "Service not ready"), + SINK_SERVICE_UNREADY(1, "Service sink not ready"), SERVICE_CLOSED(2, "Service closed"), CONF_SERVICE_UNREADY(3, "Configure Service not ready"), ILLEGAL_VISIT_IP(10, "Illegal visit ip"), + HTTP_DECODE_REQ_FAILURE(31, "Decode request failure"), + HTTP_UNSUPPORTED_METHOD(32, "Un-supported method"), + HTTP_REQ_URI_BLANK(33, "Request uri is blank"), + HTTP_DECODE_REQ_URI_FAILURE(34, "Decode uri failure"), + HTTP_UNSUPPORTED_SERVICE_URI(35, "Un-supported service uri"), + HTTP_UNSUPPORTED_CONTENT_TYPE(36, "Un-supported content type"), + FIELD_VALUE_NOT_EQUAL(95, "Field value not equal"), UNCOMPRESS_DATA_ERROR(96, "Uncompress data error"), diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java index 2507419686..af626660ba 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java @@ -317,7 +317,7 @@ public class ConfigManager { } List<String> managerIpList = CommonConfigHolder.getInstance().getManagerHosts(); if (managerIpList == null || managerIpList.size() == 0) { - LOG.error("Found remote manager ip list are empty, can't quest remote configure!"); + LOG.error("Found manager ip list are empty, can't quest remote configure!"); return; } int managerIpSize = managerIpList.size(); @@ -333,10 +333,11 @@ public class ConfigManager { * reloadDataProxyConfig */ private boolean reloadDataProxyConfig(String clusterName, String clusterTag, String host) { + String url = null; HttpPost httpPost = null; try { - String url = - "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH; + url = "http://" + host + ConfigConstants.MANAGER_PATH + + ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH; httpPost = new HttpPost(url); httpPost.addHeader(HttpHeaders.CONNECTION, "close"); httpPost.addHeader(HttpHeaders.AUTHORIZATION, AuthUtils.genBasicAuth()); @@ -349,11 +350,12 @@ public class ConfigManager { } httpPost.setEntity(HttpUtils.getEntity(request)); // request with post - LOG.info("Start to request {} to get config info with params {}", url, request); + LOG.info("Start to request {} to get config info, with params {}", url, request); CloseableHttpResponse response = httpClient.execute(httpPost); String returnStr = EntityUtils.toString(response.getEntity()); if (response.getStatusLine().getStatusCode() != 200) { - LOG.info("Failed to request {}, the response is {}", url, returnStr); + LOG.warn("Failed to request {}, with params {}, the response is {}", + url, request, returnStr); return false; } LOG.info("End to request {} to get config info:{}", url, returnStr); @@ -361,28 +363,33 @@ public class ConfigManager { DataProxyConfigResponse proxyResponse = gson.fromJson(returnStr, DataProxyConfigResponse.class); if (!proxyResponse.isResult()) { - LOG.info("Fail to get config info from url:{}, error code is {}", url, proxyResponse.getErrCode()); + LOG.warn("Fail to get config from url {}, with params {}, error code is {}", + url, request, proxyResponse.getErrCode()); return false; } if (proxyResponse.getErrCode() != DataProxyConfigResponse.SUCC) { - LOG.info("Get config info from url:{}, error code is {}", url, proxyResponse.getErrCode()); + if (proxyResponse.getErrCode() != DataProxyConfigResponse.NOUPDATE) { + LOG.warn("Get config failure from url:{}, with params {}, error code is {}", + url, request, proxyResponse.getErrCode()); + } return true; } DataProxyCluster dataProxyCluster = proxyResponse.getData(); if (dataProxyCluster == null || dataProxyCluster.getCacheClusterSet() == null || dataProxyCluster.getCacheClusterSet().getCacheClusters().isEmpty()) { - LOG.info("Get config info from url:{}, found cluster set is empty!", url); + LOG.warn("Get config empty from url:{}, with params {}, return:{}, cluster is empty!", + url, request, returnStr); return true; } // update meta configure if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(), returnStr)) { ConfigManager.handshakeManagerOk.set(true); - LOG.info("Get meta config info and set handshake status is ok!"); + LOG.info("Get config success from manager and updated, set handshake status is ok!"); } return true; } catch (Throwable ex) { - LOG.error("Request remote manager failure", ex); + LOG.error("Request manager {} failure, throw exception", url, ex); return false; } finally { if (httpPost != null) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java new file mode 100644 index 0000000000..045696100b --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.consts; + +/** + * Http Attribute constants + */ +public class HttpAttrConst { + + public static final String KEY_SRV_URL_HEARTBEAT = "/dataproxy/heartbeat"; + public static final String KEY_SRV_URL_REPORT_MSG = "/dataproxy/message"; + public static final String KEY_URL_FAVICON_ICON = "/favicon.ico"; + + public static final String KEY_GROUP_ID = "groupId"; + public static final String KEY_STREAM_ID = "streamId"; + public static final String KEY_BODY = "body"; + public static final String KEY_DATA_TIME = "dt"; + public static final String KEY_MESSAGE_COUNT = "cnt"; + public static final String KEY_CHARSET = "charset"; + public static final String VAL_DEF_CHARSET = "UTF-8"; + public static final String RET_CNT_TYPE = "application/json;charset=utf-8"; + +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java index c445cadd31..1d262a8112 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java @@ -19,15 +19,30 @@ package org.apache.inlong.dataproxy.consts; public class StatConstants { - public static final java.lang.String EVENT_SERVICE_CLOSED = "source.srvclosed"; - public static final java.lang.String EVENT_SERVICE_UNREADY = "sink.unready"; - public static final java.lang.String EVENT_VISITIP_ILLEGAL = "links.illegal"; - public static final java.lang.String EVENT_NOTOPIC = "config.notopic"; + public static final java.lang.String EVENT_SERVICE_CLOSED = "service.closed"; + public static final java.lang.String EVENT_SERVICE_SINK_UNREADY = "service.sink.unready"; + // visit + public static final java.lang.String EVENT_VISIT_ILLEGAL = "visit.illegal"; + public static final java.lang.String EVENT_VISIT_OVERMAX = "visit.overmax"; + public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin"; + public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout"; + public static final java.lang.String EVENT_VISIT_EXCEPTION = "visit.exception"; + // configure + public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING = "config.topic.missing"; // source - public static final java.lang.String EVENT_LINKS_OVERMAX = "links.overmax"; - public static final java.lang.String EVENT_LINKS_IN = "links.linkin"; - public static final java.lang.String EVENT_LINKS_OUT = "links.linkout"; - public static final java.lang.String EVENT_LINKS_EXCEPTION = "links.exception"; + public static final java.lang.String EVENT_MSG_DECODE_FAIL = "msg.decode.failure"; + public static final java.lang.String EVENT_MSG_METHOD_INVALID = "msg.method.invalid"; + public static final java.lang.String EVENT_MSG_PATH_INVALID = "msg.path.invalid"; + public static final java.lang.String EVENT_MSG_CONTYPE_INVALID = "msg.content.invalid"; + public static final java.lang.String EVENT_MSG_GROUPID_MISSING = "msg.groupid.missing"; + public static final java.lang.String EVENT_MSG_STREAMID_MISSING = "msg.streamid.missing"; + public static final java.lang.String EVENT_MSG_BODY_MISSING = "msg.body.missing"; + public static final java.lang.String EVENT_MSG_BODY_BLANK = "msg.body.blank"; + public static final java.lang.String EVENT_MSG_BODY_OVERMAX = "msg.body.overmax"; + public static final java.lang.String EVENT_MSG_HB_SUCCESS = "msg.hb.success"; + public static final java.lang.String EVENT_MSG_POST_SUCCESS = "msg.post.success"; + public static final java.lang.String EVENT_MSG_POST_FAILURE = "msg.post.failure"; + public static final java.lang.String EVENT_EMPTY = "socketmsg.empty"; public static final java.lang.String EVENT_OVERMAXLEN = "socketmsg.overmaxlen"; public static final java.lang.String EVENT_NOTEQUALLEN = "socketmsg.notequallen"; @@ -46,20 +61,6 @@ public class StatConstants { public static final java.lang.String EVENT_POST_SUCCESS = "socketmsg.success"; public static final java.lang.String EVENT_POST_DROPPED = "socketmsg.dropped"; // http - public static final java.lang.String EVENT_HTTP_DECFAIL = "httpmsg.decfailure"; - public static final java.lang.String EVENT_HTTP_INVALIDMETHOD = "httpmsg.invmethod"; - public static final java.lang.String EVENT_HTTP_BLANKURI = "httpmsg.blankuri"; - public static final java.lang.String EVENT_HTTP_URIDECFAIL = "httpmsg.decurifail"; - public static final java.lang.String EVENT_HTTP_INVALIDURI = "httpmsg.invuri"; - public static final java.lang.String EVENT_HTTP_ILLEGAL_VISIT = "httpmsg.illegal"; - public static final java.lang.String EVENT_HTTP_HB_SUCCESS = "httphb.success"; - public static final java.lang.String EVENT_HTTP_WITHOUTGROUPID = "httpmsg.wogroupid"; - public static final java.lang.String EVENT_HTTP_WITHOUTSTREAMID = "httpmsg.wostreamid"; - public static final java.lang.String EVENT_HTTP_NOBODY = "httpmsg.nobody"; - public static final java.lang.String EVENT_HTTP_EMPTYBODY = "httpmsg.emptybody"; - public static final java.lang.String EVENT_HTTP_BODYOVERMAXLEN = "httpmsg.bodyovermax"; - public static final java.lang.String EVENT_HTTP_POST_SUCCESS = "httpmsg.success"; - public static final java.lang.String EVENT_HTTP_POST_DROPPED = "httpmsg.dropped"; public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid"; public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java index 26407038ee..80ef43e508 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java @@ -117,7 +117,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { return true; } } catch (Exception ex) { - log.error("reportHeartbeat failed for url {}", url, ex); + log.error("reportHeartbeat failed for url {}, exception message is {}", url, ex.getMessage()); } return false; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java index c4eddac7c5..5fb13fb7a3 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java @@ -107,8 +107,7 @@ public class AuditUtils { */ public static long getLogTime(Event event) { if (event != null) { - Map<String, String> headers = event.getHeaders(); - return getLogTime(headers); + return getLogTime(event.getHeaders()); } return System.currentTimeMillis(); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java index 54da8c283b..0f6b4fcce1 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java @@ -106,16 +106,16 @@ public class TubeHandler implements MessageQueueHandler { } @Override - public void publishTopic(Set<String> topicSet) { - if (this.producer == null || topicSet == null || topicSet.isEmpty()) { + public void publishTopic(Set<String> newTopicSet) { + if (this.producer == null || newTopicSet == null || newTopicSet.isEmpty()) { return; } Set<String> published; try { - published = producer.publish(topicSet); - topicSet.addAll(published); + published = producer.publish(newTopicSet); + this.topicSet.addAll(newTopicSet); LOG.info("Publish topics to {}, need publish are {}, published are {}", - this.clusterName, topicSet, published); + this.clusterName, newTopicSet, published); } catch (Throwable e) { LOG.warn("Publish topics to {} failure", this.clusterName, e); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java index 356561ff20..fd9ae39caf 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java @@ -23,16 +23,20 @@ import org.apache.inlong.common.monitor.MonitorIndexExt; import org.apache.inlong.dataproxy.admin.ProxyServiceMBean; import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor; import org.apache.inlong.dataproxy.config.CommonConfigHolder; +import org.apache.inlong.dataproxy.config.ConfigManager; +import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; import org.apache.inlong.dataproxy.consts.AttrConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source2.httpMsg.InLongHttpMsgHandler; +import org.apache.inlong.dataproxy.utils.AddressUtils; import org.apache.inlong.dataproxy.utils.ConfStringUtils; import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder; import org.apache.inlong.sdk.commons.admin.AdminServiceRegister; import com.google.common.base.Preconditions; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; @@ -52,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; /** @@ -61,6 +66,7 @@ public abstract class BaseSource extends AbstractSource implements + ConfigUpdateCallback, ProxyServiceMBean, EventDrivenSource, Configurable { @@ -282,6 +288,34 @@ public abstract class BaseSource logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.getName()); } + @Override + public void update() { + // check current all links + if (ConfigManager.getInstance().needChkIllegalIP()) { + int cnt = 0; + Channel channel; + String strRemoteIP; + long startTime = System.currentTimeMillis(); + Iterator<Channel> iterator = allChannels.iterator(); + while (iterator.hasNext()) { + channel = iterator.next(); + strRemoteIP = AddressUtils.getChannelRemoteIP(channel); + if (strRemoteIP == null) { + continue; + } + if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) { + channel.disconnect(); + channel.close(); + allChannels.remove(channel); + cnt++; + logger.error(strRemoteIP + " is Illegal IP, so disconnect it !"); + } + } + logger.info("Source {} channel check, disconnects {} Illegal channels, waist {} ms", + getName(), cnt, (System.currentTimeMillis() - startTime)); + } + } + /** * get metricItemSet * diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java index d030eeb1a8..0594a8682b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java @@ -193,31 +193,31 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - // check max allowed connection count - if (source.getAllChannels().size() >= source.getMaxConnections()) { - source.fileMetricEventInc(StatConstants.EVENT_LINKS_OVERMAX); - ctx.channel().disconnect(); - ctx.channel().close(); - logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}", - source.getName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections()); - return; - } // check illegal ip if (ConfigManager.getInstance().needChkIllegalIP()) { String strRemoteIp = AddressUtils.getChannelRemoteIP(ctx.channel()); if (strRemoteIp != null && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) { - source.fileMetricEventInc(StatConstants.EVENT_VISITIP_ILLEGAL); + source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL); ctx.channel().disconnect(); ctx.channel().close(); logger.error(strRemoteIp + " is Illegal IP, so refuse it !"); return; } } + // check max allowed connection count + if (source.getAllChannels().size() >= source.getMaxConnections()) { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX); + ctx.channel().disconnect(); + ctx.channel().close(); + logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}", + source.getName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections()); + return; + } // add legal channel source.getAllChannels().add(ctx.channel()); ctx.fireChannelActive(); - source.fileMetricEventInc(StatConstants.EVENT_LINKS_IN); + source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN); logger.info("{} added new channel {}, current connections = {}, maxConnections = {}", source.getName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections()); } @@ -227,11 +227,12 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter { logger.error("{} channel {} inactive", source.getName(), ctx.channel()); ctx.fireChannelInactive(); source.getAllChannels().remove(ctx.channel()); - source.fileMetricEventInc(StatConstants.EVENT_LINKS_OUT); + source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION); logger.error("{} channel {} throw exception", source.getName(), ctx.channel(), cause); ctx.fireExceptionCaught(cause); if (ctx.channel() != null) { @@ -242,7 +243,6 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter { // } source.getAllChannels().remove(ctx.channel()); - source.fileMetricEventInc(StatConstants.EVENT_LINKS_EXCEPTION); } ctx.close(); } @@ -263,7 +263,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter { } // check if the node is linked to the Manager. if (!ConfigManager.getInstance().isMqClusterReady()) { - source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY); + source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY); msgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY); responseV0Msg(channel, msgCodec, strBuff); return; @@ -418,7 +418,7 @@ public class InLongMessageHandler extends ChannelInboundHandlerAdapter { if (CommonConfigHolder.getInstance().isNoTopicAccept()) { topic = source.getDefTopic(); } else { - source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC); + source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING); source.addMetric(false, event.getBody().length, event); this.responsePackage(ctx, ProxySdk.ResultCode.ERR_ID_ERROR, packObject); return; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java index 73c96dd7b7..74cc4435c0 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java @@ -47,6 +47,7 @@ public class SimpleHttpSource extends BaseSource implements Configurable { public SimpleHttpSource() { super(); + ConfigManager.getInstance().regIPVisitConfigChgCallback(this); } @Override diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java index 6526bba84e..acda3cf4f7 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java @@ -18,15 +18,12 @@ package org.apache.inlong.dataproxy.source2; import org.apache.inlong.dataproxy.config.ConfigManager; -import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; -import org.apache.inlong.dataproxy.utils.AddressUtils; import org.apache.inlong.dataproxy.utils.ConfStringUtils; import org.apache.inlong.dataproxy.utils.EventLoopUtil; import com.google.common.base.Preconditions; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.flume.Context; @@ -35,12 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.util.Iterator; /** * Simple tcp source */ -public class SimpleTcpSource extends BaseSource implements Configurable, ConfigUpdateCallback { +public class SimpleTcpSource extends BaseSource implements Configurable { private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class); @@ -123,32 +119,4 @@ public class SimpleTcpSource extends BaseSource implements Configurable, ConfigU return SourceConstants.SRC_PROTOCOL_TYPE_TCP; } - @Override - public void update() { - // check current all links - if (ConfigManager.getInstance().needChkIllegalIP()) { - int cnt = 0; - Channel channel; - String strRemoteIP; - long startTime = System.currentTimeMillis(); - Iterator<Channel> iterator = allChannels.iterator(); - while (iterator.hasNext()) { - channel = iterator.next(); - strRemoteIP = AddressUtils.getChannelRemoteIP(channel); - if (strRemoteIP == null) { - continue; - } - if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) { - channel.disconnect(); - channel.close(); - allChannels.remove(channel); - cnt++; - logger.error(strRemoteIP + " is Illegal IP, so disconnect it !"); - } - } - logger.info("Source {} channel check, disconnects {} Illegal channels, waist {} ms", - getName(), cnt, (System.currentTimeMillis() - startTime)); - } - } - } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java index ce33c75d45..4f636f8173 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java @@ -21,11 +21,10 @@ import org.apache.inlong.common.enums.DataProxyErrCode; import org.apache.inlong.common.monitor.LogCounter; import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.common.msg.InLongMsg; -import org.apache.inlong.common.util.NetworkUtils; -import org.apache.inlong.dataproxy.config.CommonConfigHolder; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttrConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; +import org.apache.inlong.dataproxy.consts.HttpAttrConst; import org.apache.inlong.dataproxy.consts.StatConstants; import org.apache.inlong.dataproxy.source2.BaseSource; import org.apache.inlong.dataproxy.utils.AddressUtils; @@ -42,23 +41,25 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; +import org.apache.commons.codec.CharEncoding; +import org.apache.commons.codec.Charsets; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; -import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import static io.netty.handler.codec.http.HttpUtil.is100ContinueExpected; @@ -68,13 +69,9 @@ import static io.netty.handler.codec.http.HttpUtil.is100ContinueExpected; */ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRequest> { - private static final String hbSrvUrl = "/dataproxy/heartbeat"; - private static final String msgSrvUrl = "/dataproxy/message"; - private static final Logger logger = LoggerFactory.getLogger(InLongHttpMsgHandler.class); // log print count private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000); - private final BaseSource source; /** @@ -88,120 +85,203 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { - // check request decode result - if (!req.decoderResult().isSuccess()) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_DECFAIL); - sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Decode message failure!"); - return; - } - // check request method - if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_INVALIDMETHOD); - sendErrorMsg(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, "Only support Get and Post methods"); - return; - } // process 100-continue request if (is100ContinueExpected(req)) { ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); } - // get requested service - String reqUri = req.uri(); - if (StringUtils.isBlank(reqUri)) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_BLANKURI); - sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Uri is blank!"); - return; - } - try { - reqUri = URLDecoder.decode(reqUri, "UTF-8"); - } catch (UnsupportedEncodingException e) { - try { - reqUri = URLDecoder.decode(reqUri, "ISO-8859-1"); - } catch (UnsupportedEncodingException e1) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_URIDECFAIL); - sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Decode uri failure!"); - return; - } - } - // check requested service url - if (!reqUri.startsWith(hbSrvUrl) || !reqUri.startsWith(msgSrvUrl)) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_INVALIDURI); - sendErrorMsg(ctx, HttpResponseStatus.NOT_IMPLEMENTED, "Not supported uri!"); - return; - } // get current time and clientIP - long msgRcvTime = System.currentTimeMillis(); - String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel()); - // check illegal ip - if (ConfigManager.getInstance().needChkIllegalIP() - && ConfigManager.getInstance().isIllegalIP(clientIp)) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_ILLEGAL_VISIT); - sendResponse(ctx, DataProxyErrCode.ILLEGAL_VISIT_IP, true); + final long msgRcvTime = System.currentTimeMillis(); + final String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel()); + // check request decode result + if (!req.decoderResult().isSuccess()) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_DECODE_FAIL); + sendErrorMsg(ctx, DataProxyErrCode.HTTP_DECODE_REQ_FAILURE); return; } // check service status. if (source.isRejectService()) { source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED); - sendResponse(ctx, DataProxyErrCode.SERVICE_CLOSED, true); + sendErrorMsg(ctx, DataProxyErrCode.SERVICE_CLOSED); return; } // check sink service status if (!ConfigManager.getInstance().isMqClusterReady()) { - source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY); - sendResponse(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY, true); + source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY); + sendErrorMsg(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY); return; } + // check request method + if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_METHOD_INVALID); + sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_METHOD, + "Only support [" + HttpMethod.GET.name() + ", " + + HttpMethod.POST.name() + "] methods"); + return; + } + // parse request uri + QueryStringDecoder uriDecoder = + new QueryStringDecoder(req.uri(), Charsets.toCharset(CharEncoding.UTF_8)); + // check requested service url + if (!HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path()) + && !HttpAttrConst.KEY_SRV_URL_REPORT_MSG.equals(uriDecoder.path())) { + if (!HttpAttrConst.KEY_URL_FAVICON_ICON.equals(uriDecoder.path())) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_PATH_INVALID); + sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_SERVICE_URI, + "Only support [" + HttpAttrConst.KEY_SRV_URL_HEARTBEAT + ", " + + HttpAttrConst.KEY_SRV_URL_REPORT_MSG + "] paths!"); + } + return; + } + // get connection status + boolean closeConnection = isCloseConnection(req); // process hb service - if (reqUri.startsWith(hbSrvUrl)) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_HB_SUCCESS); - sendResponse(ctx, DataProxyErrCode.SUCCESS, checkClose(req)); + if (HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_HB_SUCCESS); + sendResponse(ctx, closeConnection); return; } + // get request attributes + final Map<String, String> reqAttrs = new HashMap<>(); + getAttrsFromDecoder(uriDecoder, reqAttrs); + if (req.method() == HttpMethod.POST) { + // check and get content value + String cntLengthStr = req.headers().get(HttpHeaderNames.CONTENT_LENGTH); + if (StringUtils.isNotBlank(cntLengthStr) && NumberUtils.toInt(cntLengthStr, 0) > 0) { + String cntType = req.headers().get(HttpHeaderNames.CONTENT_TYPE); + if (StringUtils.isNotBlank(cntType)) { + cntType = cntType.trim(); + if (!cntType.equalsIgnoreCase( + HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString())) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_CONTYPE_INVALID); + sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_CONTENT_TYPE, + "Only support [" + HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED + + "] content type!"); + return; + } + String cntStr = req.content().toString(Charsets.toCharset(CharEncoding.UTF_8)); + QueryStringDecoder cntDecoder = new QueryStringDecoder(cntStr, false); + getAttrsFromDecoder(cntDecoder, reqAttrs); + } + } + } // process message request - processMessage(ctx, req, msgRcvTime, clientIp); + processMessage(ctx, reqAttrs, msgRcvTime, clientIp, closeConnection); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); } - private boolean processMessage(ChannelHandlerContext ctx, FullHttpRequest req, - long msgRcvTime, String clientIp) throws Exception { - // get and check groupId - HttpHeaders headers = req.headers(); + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN); + // check illegal ip + if (ConfigManager.getInstance().needChkIllegalIP()) { + String strRemoteIp = AddressUtils.getChannelRemoteIP(ctx.channel()); + if (strRemoteIp != null + && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL); + ctx.channel().disconnect(); + ctx.channel().close(); + if (logCounter.shouldPrint()) { + logger.error(strRemoteIp + " is Illegal IP, so refuse it !"); + } + return; + } + } + // check max allowed connection count + if (source.getAllChannels().size() >= source.getMaxConnections()) { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX); + ctx.channel().disconnect(); + ctx.channel().close(); + if (logCounter.shouldPrint()) { + logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}", + source.getName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections()); + } + return; + } + // add legal channel + source.getAllChannels().add(ctx.channel()); + ctx.fireChannelActive(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT); + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION); + if (cause instanceof IOException) { + if (logCounter.shouldPrint()) { + logger.warn("{} received an IOException from channel {}", + source.getName(), ctx.channel(), cause); + } + ctx.close(); + } else { + sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR, + "Process message failure: " + cause.getMessage()); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { + ctx.close(); + } + } + + /** + * Process http report message + * + * @param ctx the handler context + * @param reqAttrs the attributes + * @param msgRcvTime the message received time + * @param clientIp the report ip + * @param isCloseCon whether close connection + * + * @return whether process success + */ + private boolean processMessage(ChannelHandlerContext ctx, Map<String, String> reqAttrs, + long msgRcvTime, String clientIp, boolean isCloseCon) throws Exception { StringBuilder strBuff = new StringBuilder(512); - String groupId = headers.get(AttributeConstants.GROUP_ID); - if (StringUtils.isEmpty(groupId)) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_WITHOUTGROUPID); + String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID); + if (StringUtils.isBlank(groupId)) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_GROUPID_MISSING); sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(), - strBuff.append("Field ").append(AttributeConstants.GROUP_ID) + strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID) .append(" must exist and not blank!").toString(), - checkClose(req)); + isCloseCon); return false; } // get and check streamId - String streamId = headers.get(AttributeConstants.STREAM_ID); - if (StringUtils.isEmpty(streamId)) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_WITHOUTSTREAMID); + String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID); + if (StringUtils.isBlank(streamId)) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_STREAMID_MISSING); sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(), - strBuff.append("Field ").append(AttributeConstants.STREAM_ID) + strBuff.append("Field ").append(HttpAttrConst.KEY_STREAM_ID) .append(" must exist and not blank!").toString(), - checkClose(req)); + isCloseCon); return false; } // get and check topicName String topicName = ConfigManager.getInstance().getTopicName(groupId, streamId); if (StringUtils.isBlank(topicName)) { - if (CommonConfigHolder.getInstance().isNoTopicAccept()) { - source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC); - sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(), - strBuff.append("Topic is null for ").append(AttributeConstants.GROUP_ID) - .append("(").append(groupId).append("),") - .append(AttributeConstants.STREAM_ID) - .append("(,").append(streamId).append(")").toString(), - checkClose(req)); - return false; - } - topicName = source.getDefTopic(); + source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING); + sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(), + strBuff.append("Topic not configured for ").append(HttpAttrConst.KEY_STREAM_ID) + .append("(").append(groupId).append("),") + .append(HttpAttrConst.KEY_STREAM_ID) + .append("(,").append(streamId).append(")").toString(), + isCloseCon); + return false; } // get and check dt long dataTime = msgRcvTime; - String dt = headers.get(AttributeConstants.DATA_TIME); + String dt = reqAttrs.get(HttpAttrConst.KEY_DATA_TIME); if (StringUtils.isNotEmpty(dt)) { try { dataTime = Long.parseLong(dt); @@ -209,45 +289,40 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe // } } - // get char set - String charset = headers.get(AttrConstants.CHARSET); - if (StringUtils.isBlank(charset)) { - charset = AttrConstants.CHARSET; - } // get and check body - String body = headers.get(AttrConstants.BODY); + String body = reqAttrs.get(HttpAttrConst.KEY_BODY); if (StringUtils.isBlank(body)) { if (body == null) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_NOBODY); + source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_MISSING); sendResponse(ctx, DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(), - strBuff.append("Field ").append(AttrConstants.BODY) + strBuff.append("Field ").append(HttpAttrConst.KEY_BODY) .append(" is not exist!").toString(), - checkClose(req)); + isCloseCon); } else { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_EMPTYBODY); + source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_BLANK); sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(), - strBuff.append("Field ").append(AttrConstants.BODY) + strBuff.append("Field ").append(HttpAttrConst.KEY_BODY) .append(" is Blank!").toString(), - checkClose(req)); + isCloseCon); } return false; } if (body.length() > source.getMaxMsgLength()) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_BODYOVERMAXLEN); + source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_OVERMAX); sendResponse(ctx, DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(), - strBuff.append("Error msg, the body length(").append(body.length()) + strBuff.append("Error msg, the ").append(HttpAttrConst.KEY_BODY) + .append(" length(").append(body.length()) .append(") is bigger than allowed length(") .append(source.getMaxMsgLength()).append(")").toString(), - checkClose(req)); + isCloseCon); return false; } // get message count - String strMsgCount = headers.get(AttributeConstants.MESSAGE_COUNT); - int intMsgCnt = NumberUtils.toInt(strMsgCount, 1); - strMsgCount = String.valueOf(intMsgCnt); + int intMsgCnt = NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1); + String strMsgCount = String.valueOf(intMsgCnt); // build message attributes InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed()); - strBuff.append("&groupId=").append(groupId) + strBuff.append("groupId=").append(groupId) .append("&streamId=").append(streamId) .append("&dt=").append(dataTime) .append("&NodeIP=").append(clientIp) @@ -255,7 +330,7 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe .append("&rt=").append(msgRcvTime) .append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME) .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime); - inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset)); + inLongMsg.addMsg(strBuff.toString(), body.getBytes(HttpAttrConst.VAL_DEF_CHARSET)); byte[] inlongMsgData = inLongMsg.buildArray(); inLongMsg.reset(); strBuff.delete(0, strBuff.length()); @@ -270,7 +345,6 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe eventHeaders.put(ConfigConstants.MSG_ENCODE_VER, InLongMsgVer.INLONG_V0.getName()); eventHeaders.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); Event event = EventBuilder.withBody(inlongMsgData, eventHeaders); - String msgProcType = "b2b"; // build metric data item dataTime = dataTime / 1000 / 60 / 10; dataTime = dataTime * 1000 * 60 * 10; @@ -278,77 +352,84 @@ public class InLongHttpMsgHandler extends SimpleChannelInboundHandler<FullHttpRe .append(AttrConstants.SEP_HASHTAG).append(topicName) .append(AttrConstants.SEP_HASHTAG).append(streamId) .append(AttrConstants.SEP_HASHTAG).append(clientIp) - .append(AttrConstants.SEP_HASHTAG).append(NetworkUtils.getLocalIp()) - .append(AttrConstants.SEP_HASHTAG).append(msgProcType) + .append(AttrConstants.SEP_HASHTAG).append(source.getSrcHost()) + .append(AttrConstants.SEP_HASHTAG).append("b2b") .append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime)) .append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime)); try { source.getChannelProcessor().processEvent(event); - source.fileMetricEventInc(StatConstants.EVENT_HTTP_POST_SUCCESS); - source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1, body.length(), 0); + source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_SUCCESS); + source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1, event.getBody().length, 0); strBuff.delete(0, strBuff.length()); source.addMetric(true, event.getBody().length, event); - sendResponse(ctx, DataProxyErrCode.SUCCESS, false); + sendResponse(ctx, isCloseCon); return true; - } catch (ChannelException ex) { - source.fileMetricEventInc(StatConstants.EVENT_HTTP_POST_DROPPED); + } catch (Throwable ex) { + source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_FAILURE); source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0, intMsgCnt); source.addMetric(false, event.getBody().length, event); strBuff.delete(0, strBuff.length()); - sendResponse(ctx, DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), - strBuff.append("Put event to channel failure: ").append(ex.getMessage()) - .toString(), - false); + sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR, + strBuff.append("Put event to channel failure: ").append(ex.getMessage()).toString()); if (logCounter.shouldPrint()) { - logger.error("Error write event to channel, data will discard.", ex); + logger.error("Error writing HTTP event to channel failure.", ex); } return false; } } - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel()); - logger.error("Http process client={} error, cause:{}, msg:{}", - cause, clientIp, cause.getMessage()); - sendErrorMsg(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, - "Process message failure: " + cause.getMessage()); - ctx.close(); + /** + * Get attributes from decoder + * + * @param decoder the decode object + * @param reqAttrs the attributes + */ + private void getAttrsFromDecoder(QueryStringDecoder decoder, Map<String, String> reqAttrs) { + for (Map.Entry<String, List<String>> attr : decoder.parameters().entrySet()) { + if (attr == null + || attr.getKey() == null + || attr.getValue() == null + || attr.getValue().isEmpty()) { + continue; + } + reqAttrs.put(attr.getKey(), attr.getValue().get(0)); + } } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { - ctx.close(); + private boolean isCloseConnection(FullHttpRequest req) { + String connStatus = req.headers().get(HttpHeaderNames.CONNECTION); + if (connStatus == null) { + return false; } + connStatus = connStatus.trim(); + return connStatus.equalsIgnoreCase(HttpHeaderValues.CLOSE.toString()); } - private boolean checkClose(FullHttpRequest req) { - String connStatus = req.headers().get("Connection"); - return !StringUtils.isBlank(connStatus) && "close".equalsIgnoreCase(connStatus); + private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode errCodeObj) { + sendResponse(ctx, errCodeObj.getErrCode(), errCodeObj.getErrMsg(), true); } - private void sendErrorMsg(ChannelHandlerContext ctx, HttpResponseStatus status, String errMsg) { - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, - Unpooled.copiedBuffer("Failure: " + status + ", " - + errMsg + "\r\n", CharsetUtil.UTF_8)); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); - ctx.writeAndFlush(response).addListener(new SendResultListener(true)); + private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode errCodeObj, String errMsg) { + sendResponse(ctx, errCodeObj.getErrCode(), errMsg, true); } - private void sendResponse(ChannelHandlerContext ctx, DataProxyErrCode errCodeObj, boolean isClose) { - sendResponse(ctx, errCodeObj.getErrCode(), errCodeObj.getErrMsg(), isClose); - + private void sendResponse(ChannelHandlerContext ctx, boolean isClose) { + sendResponse(ctx, DataProxyErrCode.SUCCESS.getErrCode(), DataProxyErrCode.SUCCESS.getErrMsg(), isClose); } private void sendResponse(ChannelHandlerContext ctx, int errCode, String errMsg, boolean isClose) { + if (ctx == null || ctx.channel() == null) { + return; + } + if (!ctx.channel().isWritable()) { + source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE); + if (logCounter.shouldPrint()) { + logger.warn("Send msg but channel full, channel={}", ctx.channel()); + } + return; + } FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json;charset=utf-8"); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpAttrConst.RET_CNT_TYPE); StringBuilder builder = new StringBuilder().append("{\"code\":\"").append(errCode) .append("\",\"msg\":\"").append(errMsg).append("\"}"); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java index b455817554..027e33e74f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java @@ -268,7 +268,7 @@ public class CodecBinMsg extends AbsV0MsgCodec { confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum); if (StringUtils.isBlank(confGroupId)) { if (configManager.isGroupIdNumConfigEmpty()) { - source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY); + source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY); this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY; this.errMsg = "GroupId-Mapping configuration is null"; } else { @@ -300,7 +300,7 @@ public class CodecBinMsg extends AbsV0MsgCodec { confStreamId = configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum); if (StringUtils.isBlank(confStreamId)) { if (configManager.isStreamIdNumConfigEmpty()) { - source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY); + source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY); this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY; this.errMsg = "StreamId-Mapping configuration is null"; } else { @@ -338,7 +338,7 @@ public class CodecBinMsg extends AbsV0MsgCodec { if (CommonConfigHolder.getInstance().isNoTopicAccept()) { this.topicName = source.getDefTopic(); } else { - source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC); + source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING); this.errCode = DataProxyErrCode.TOPIC_IS_BLANK; this.errMsg = String.format("Topic is null for inlongGroupId=(%s), inlongStreamId=(%s)", this.groupId, this.streamId); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java index ddda70fde1..f02752ca10 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java @@ -148,7 +148,7 @@ public class CodecTextMsg extends AbsV0MsgCodec { if (CommonConfigHolder.getInstance().isNoTopicAccept()) { tmpTopicName = source.getDefTopic(); } else { - source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC); + source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING); this.errCode = DataProxyErrCode.TOPIC_IS_BLANK; this.errMsg = String.format( "Topic is null for inlongGroupId=(%s), inlongStreamId=(%s)", tmpGroupId, tmpStreamId);