This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ed96c248 [INLONG-8212][DataProxy] Improve HTTP related message 
handling (#8213)
55ed96c248 is described below

commit 55ed96c248f09f10de095750ec3f6a72ac3fc0a5
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Jun 12 09:47:08 2023 +0800

    [INLONG-8212][DataProxy] Improve HTTP related message handling (#8213)
---
 .../inlong/common/enums/DataProxyErrCode.java      |   9 +-
 .../inlong/dataproxy/config/ConfigManager.java     |  27 +-
 .../inlong/dataproxy/consts/HttpAttrConst.java     |  38 +++
 .../inlong/dataproxy/consts/StatConstants.java     |  45 +--
 .../dataproxy/heartbeat/HeartbeatManager.java      |   2 +-
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |   3 +-
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java |  10 +-
 .../inlong/dataproxy/source2/BaseSource.java       |  34 ++
 .../dataproxy/source2/InLongMessageHandler.java    |  30 +-
 .../inlong/dataproxy/source2/SimpleHttpSource.java |   1 +
 .../inlong/dataproxy/source2/SimpleTcpSource.java  |  34 +-
 .../source2/httpMsg/InLongHttpMsgHandler.java      | 379 +++++++++++++--------
 .../dataproxy/source2/v0msg/CodecBinMsg.java       |   6 +-
 .../dataproxy/source2/v0msg/CodecTextMsg.java      |   2 +-
 14 files changed, 378 insertions(+), 242 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 263af3678e..5caeb2d875 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -26,11 +26,18 @@ public enum DataProxyErrCode {
 
     SUCCESS(0, "Ok"),
 
-    SINK_SERVICE_UNREADY(1, "Service not ready"),
+    SINK_SERVICE_UNREADY(1, "Service sink not ready"),
     SERVICE_CLOSED(2, "Service closed"),
     CONF_SERVICE_UNREADY(3, "Configure Service not ready"),
     ILLEGAL_VISIT_IP(10, "Illegal visit ip"),
 
+    HTTP_DECODE_REQ_FAILURE(31, "Decode request failure"),
+    HTTP_UNSUPPORTED_METHOD(32, "Un-supported method"),
+    HTTP_REQ_URI_BLANK(33, "Request uri is blank"),
+    HTTP_DECODE_REQ_URI_FAILURE(34, "Decode uri failure"),
+    HTTP_UNSUPPORTED_SERVICE_URI(35, "Un-supported service uri"),
+    HTTP_UNSUPPORTED_CONTENT_TYPE(36, "Un-supported content type"),
+
     FIELD_VALUE_NOT_EQUAL(95, "Field value not equal"),
     UNCOMPRESS_DATA_ERROR(96, "Uncompress data error"),
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 2507419686..af626660ba 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -317,7 +317,7 @@ public class ConfigManager {
             }
             List<String> managerIpList = 
CommonConfigHolder.getInstance().getManagerHosts();
             if (managerIpList == null || managerIpList.size() == 0) {
-                LOG.error("Found remote manager ip list are empty, can't quest 
remote configure!");
+                LOG.error("Found manager ip list are empty, can't quest remote 
configure!");
                 return;
             }
             int managerIpSize = managerIpList.size();
@@ -333,10 +333,11 @@ public class ConfigManager {
          * reloadDataProxyConfig
          */
         private boolean reloadDataProxyConfig(String clusterName, String 
clusterTag, String host) {
+            String url = null;
             HttpPost httpPost = null;
             try {
-                String url =
-                        "http://"; + host + ConfigConstants.MANAGER_PATH + 
ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH;
+                url = "http://"; + host + ConfigConstants.MANAGER_PATH
+                        + ConfigConstants.MANAGER_GET_ALL_CONFIG_PATH;
                 httpPost = new HttpPost(url);
                 httpPost.addHeader(HttpHeaders.CONNECTION, "close");
                 httpPost.addHeader(HttpHeaders.AUTHORIZATION, 
AuthUtils.genBasicAuth());
@@ -349,11 +350,12 @@ public class ConfigManager {
                 }
                 httpPost.setEntity(HttpUtils.getEntity(request));
                 // request with post
-                LOG.info("Start to request {} to get config info with params 
{}", url, request);
+                LOG.info("Start to request {} to get config info, with params 
{}", url, request);
                 CloseableHttpResponse response = httpClient.execute(httpPost);
                 String returnStr = EntityUtils.toString(response.getEntity());
                 if (response.getStatusLine().getStatusCode() != 200) {
-                    LOG.info("Failed to request {}, the response is {}", url, 
returnStr);
+                    LOG.warn("Failed to request {}, with params {}, the 
response is {}",
+                            url, request, returnStr);
                     return false;
                 }
                 LOG.info("End to request {} to get config info:{}", url, 
returnStr);
@@ -361,28 +363,33 @@ public class ConfigManager {
                 DataProxyConfigResponse proxyResponse =
                         gson.fromJson(returnStr, 
DataProxyConfigResponse.class);
                 if (!proxyResponse.isResult()) {
-                    LOG.info("Fail to get config info from url:{}, error code 
is {}", url, proxyResponse.getErrCode());
+                    LOG.warn("Fail to get config from url {}, with params {}, 
error code is {}",
+                            url, request, proxyResponse.getErrCode());
                     return false;
                 }
                 if (proxyResponse.getErrCode() != 
DataProxyConfigResponse.SUCC) {
-                    LOG.info("Get config info from url:{}, error code is {}", 
url, proxyResponse.getErrCode());
+                    if (proxyResponse.getErrCode() != 
DataProxyConfigResponse.NOUPDATE) {
+                        LOG.warn("Get config failure from url:{}, with params 
{}, error code is {}",
+                                url, request, proxyResponse.getErrCode());
+                    }
                     return true;
                 }
                 DataProxyCluster dataProxyCluster = proxyResponse.getData();
                 if (dataProxyCluster == null
                         || dataProxyCluster.getCacheClusterSet() == null
                         || 
dataProxyCluster.getCacheClusterSet().getCacheClusters().isEmpty()) {
-                    LOG.info("Get config info from url:{}, found cluster set 
is empty!", url);
+                    LOG.warn("Get config empty from url:{}, with params {}, 
return:{}, cluster is empty!",
+                            url, request, returnStr);
                     return true;
                 }
                 // update meta configure
                 if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(), 
returnStr)) {
                     ConfigManager.handshakeManagerOk.set(true);
-                    LOG.info("Get meta config info and set handshake status is 
ok!");
+                    LOG.info("Get config success from manager and updated, set 
handshake status is ok!");
                 }
                 return true;
             } catch (Throwable ex) {
-                LOG.error("Request remote manager failure", ex);
+                LOG.error("Request manager {} failure, throw exception", url, 
ex);
                 return false;
             } finally {
                 if (httpPost != null) {
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java
new file mode 100644
index 0000000000..045696100b
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/HttpAttrConst.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.consts;
+
+/**
+ * Http Attribute constants
+ */
+public class HttpAttrConst {
+
+    public static final String KEY_SRV_URL_HEARTBEAT = "/dataproxy/heartbeat";
+    public static final String KEY_SRV_URL_REPORT_MSG = "/dataproxy/message";
+    public static final String KEY_URL_FAVICON_ICON = "/favicon.ico";
+
+    public static final String KEY_GROUP_ID = "groupId";
+    public static final String KEY_STREAM_ID = "streamId";
+    public static final String KEY_BODY = "body";
+    public static final String KEY_DATA_TIME = "dt";
+    public static final String KEY_MESSAGE_COUNT = "cnt";
+    public static final String KEY_CHARSET = "charset";
+    public static final String VAL_DEF_CHARSET = "UTF-8";
+    public static final String RET_CNT_TYPE = "application/json;charset=utf-8";
+
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index c445cadd31..1d262a8112 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -19,15 +19,30 @@ package org.apache.inlong.dataproxy.consts;
 
 public class StatConstants {
 
-    public static final java.lang.String EVENT_SERVICE_CLOSED = 
"source.srvclosed";
-    public static final java.lang.String EVENT_SERVICE_UNREADY = 
"sink.unready";
-    public static final java.lang.String EVENT_VISITIP_ILLEGAL = 
"links.illegal";
-    public static final java.lang.String EVENT_NOTOPIC = "config.notopic";
+    public static final java.lang.String EVENT_SERVICE_CLOSED = 
"service.closed";
+    public static final java.lang.String EVENT_SERVICE_SINK_UNREADY = 
"service.sink.unready";
+    // visit
+    public static final java.lang.String EVENT_VISIT_ILLEGAL = "visit.illegal";
+    public static final java.lang.String EVENT_VISIT_OVERMAX = "visit.overmax";
+    public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin";
+    public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout";
+    public static final java.lang.String EVENT_VISIT_EXCEPTION = 
"visit.exception";
+    // configure
+    public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING = 
"config.topic.missing";
     // source
-    public static final java.lang.String EVENT_LINKS_OVERMAX = "links.overmax";
-    public static final java.lang.String EVENT_LINKS_IN = "links.linkin";
-    public static final java.lang.String EVENT_LINKS_OUT = "links.linkout";
-    public static final java.lang.String EVENT_LINKS_EXCEPTION = 
"links.exception";
+    public static final java.lang.String EVENT_MSG_DECODE_FAIL = 
"msg.decode.failure";
+    public static final java.lang.String EVENT_MSG_METHOD_INVALID = 
"msg.method.invalid";
+    public static final java.lang.String EVENT_MSG_PATH_INVALID = 
"msg.path.invalid";
+    public static final java.lang.String EVENT_MSG_CONTYPE_INVALID = 
"msg.content.invalid";
+    public static final java.lang.String EVENT_MSG_GROUPID_MISSING = 
"msg.groupid.missing";
+    public static final java.lang.String EVENT_MSG_STREAMID_MISSING = 
"msg.streamid.missing";
+    public static final java.lang.String EVENT_MSG_BODY_MISSING = 
"msg.body.missing";
+    public static final java.lang.String EVENT_MSG_BODY_BLANK = 
"msg.body.blank";
+    public static final java.lang.String EVENT_MSG_BODY_OVERMAX = 
"msg.body.overmax";
+    public static final java.lang.String EVENT_MSG_HB_SUCCESS = 
"msg.hb.success";
+    public static final java.lang.String EVENT_MSG_POST_SUCCESS = 
"msg.post.success";
+    public static final java.lang.String EVENT_MSG_POST_FAILURE = 
"msg.post.failure";
+
     public static final java.lang.String EVENT_EMPTY = "socketmsg.empty";
     public static final java.lang.String EVENT_OVERMAXLEN = 
"socketmsg.overmaxlen";
     public static final java.lang.String EVENT_NOTEQUALLEN = 
"socketmsg.notequallen";
@@ -46,20 +61,6 @@ public class StatConstants {
     public static final java.lang.String EVENT_POST_SUCCESS = 
"socketmsg.success";
     public static final java.lang.String EVENT_POST_DROPPED = 
"socketmsg.dropped";
     // http
-    public static final java.lang.String EVENT_HTTP_DECFAIL = 
"httpmsg.decfailure";
-    public static final java.lang.String EVENT_HTTP_INVALIDMETHOD = 
"httpmsg.invmethod";
-    public static final java.lang.String EVENT_HTTP_BLANKURI = 
"httpmsg.blankuri";
-    public static final java.lang.String EVENT_HTTP_URIDECFAIL = 
"httpmsg.decurifail";
-    public static final java.lang.String EVENT_HTTP_INVALIDURI = 
"httpmsg.invuri";
-    public static final java.lang.String EVENT_HTTP_ILLEGAL_VISIT = 
"httpmsg.illegal";
-    public static final java.lang.String EVENT_HTTP_HB_SUCCESS = 
"httphb.success";
-    public static final java.lang.String EVENT_HTTP_WITHOUTGROUPID = 
"httpmsg.wogroupid";
-    public static final java.lang.String EVENT_HTTP_WITHOUTSTREAMID = 
"httpmsg.wostreamid";
-    public static final java.lang.String EVENT_HTTP_NOBODY = "httpmsg.nobody";
-    public static final java.lang.String EVENT_HTTP_EMPTYBODY = 
"httpmsg.emptybody";
-    public static final java.lang.String EVENT_HTTP_BODYOVERMAXLEN = 
"httpmsg.bodyovermax";
-    public static final java.lang.String EVENT_HTTP_POST_SUCCESS = 
"httpmsg.success";
-    public static final java.lang.String EVENT_HTTP_POST_DROPPED = 
"httpmsg.dropped";
 
     public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
     public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 26407038ee..80ef43e508 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -117,7 +117,7 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
                 return true;
             }
         } catch (Exception ex) {
-            log.error("reportHeartbeat failed for url {}", url, ex);
+            log.error("reportHeartbeat failed for url {}, exception message is 
{}", url, ex.getMessage());
         }
         return false;
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index c4eddac7c5..5fb13fb7a3 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -107,8 +107,7 @@ public class AuditUtils {
      */
     public static long getLogTime(Event event) {
         if (event != null) {
-            Map<String, String> headers = event.getHeaders();
-            return getLogTime(headers);
+            return getLogTime(event.getHeaders());
         }
         return System.currentTimeMillis();
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 54da8c283b..0f6b4fcce1 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -106,16 +106,16 @@ public class TubeHandler implements MessageQueueHandler {
     }
 
     @Override
-    public void publishTopic(Set<String> topicSet) {
-        if (this.producer == null || topicSet == null || topicSet.isEmpty()) {
+    public void publishTopic(Set<String> newTopicSet) {
+        if (this.producer == null || newTopicSet == null || 
newTopicSet.isEmpty()) {
             return;
         }
         Set<String> published;
         try {
-            published = producer.publish(topicSet);
-            topicSet.addAll(published);
+            published = producer.publish(newTopicSet);
+            this.topicSet.addAll(newTopicSet);
             LOG.info("Publish topics to {}, need publish are {}, published are 
{}",
-                    this.clusterName, topicSet, published);
+                    this.clusterName, newTopicSet, published);
         } catch (Throwable e) {
             LOG.warn("Publish topics to {} failure", this.clusterName, e);
         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index 356561ff20..fd9ae39caf 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -23,16 +23,20 @@ import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source2.httpMsg.InLongHttpMsgHandler;
+import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
 import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
 
 import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
@@ -52,6 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -61,6 +66,7 @@ public abstract class BaseSource
         extends
             AbstractSource
         implements
+            ConfigUpdateCallback,
             ProxyServiceMBean,
             EventDrivenSource,
             Configurable {
@@ -282,6 +288,34 @@ public abstract class BaseSource
         logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), 
this.getName());
     }
 
+    @Override
+    public void update() {
+        // check current all links
+        if (ConfigManager.getInstance().needChkIllegalIP()) {
+            int cnt = 0;
+            Channel channel;
+            String strRemoteIP;
+            long startTime = System.currentTimeMillis();
+            Iterator<Channel> iterator = allChannels.iterator();
+            while (iterator.hasNext()) {
+                channel = iterator.next();
+                strRemoteIP = AddressUtils.getChannelRemoteIP(channel);
+                if (strRemoteIP == null) {
+                    continue;
+                }
+                if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) {
+                    channel.disconnect();
+                    channel.close();
+                    allChannels.remove(channel);
+                    cnt++;
+                    logger.error(strRemoteIP + " is Illegal IP, so disconnect 
it !");
+                }
+            }
+            logger.info("Source {} channel check, disconnects {} Illegal 
channels, waist {} ms",
+                    getName(), cnt, (System.currentTimeMillis() - startTime));
+        }
+    }
+
     /**
      * get metricItemSet
      *
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index d030eeb1a8..0594a8682b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -193,31 +193,31 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        // check max allowed connection count
-        if (source.getAllChannels().size() >= source.getMaxConnections()) {
-            source.fileMetricEventInc(StatConstants.EVENT_LINKS_OVERMAX);
-            ctx.channel().disconnect();
-            ctx.channel().close();
-            logger.warn("{} refuse to connect = {} , connections = {}, 
maxConnections = {}",
-                    source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
-            return;
-        }
         // check illegal ip
         if (ConfigManager.getInstance().needChkIllegalIP()) {
             String strRemoteIp = 
AddressUtils.getChannelRemoteIP(ctx.channel());
             if (strRemoteIp != null
                     && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
-                source.fileMetricEventInc(StatConstants.EVENT_VISITIP_ILLEGAL);
+                source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
                 ctx.channel().disconnect();
                 ctx.channel().close();
                 logger.error(strRemoteIp + " is Illegal IP, so refuse it !");
                 return;
             }
         }
+        // check max allowed connection count
+        if (source.getAllChannels().size() >= source.getMaxConnections()) {
+            source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+            ctx.channel().disconnect();
+            ctx.channel().close();
+            logger.warn("{} refuse to connect = {} , connections = {}, 
maxConnections = {}",
+                    source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
+            return;
+        }
         // add legal channel
         source.getAllChannels().add(ctx.channel());
         ctx.fireChannelActive();
-        source.fileMetricEventInc(StatConstants.EVENT_LINKS_IN);
+        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
         logger.info("{} added new channel {}, current connections = {}, 
maxConnections = {}",
                 source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
     }
@@ -227,11 +227,12 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
         logger.error("{} channel {} inactive", source.getName(), 
ctx.channel());
         ctx.fireChannelInactive();
         source.getAllChannels().remove(ctx.channel());
-        source.fileMetricEventInc(StatConstants.EVENT_LINKS_OUT);
+        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+        source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
         logger.error("{} channel {} throw exception", source.getName(), 
ctx.channel(), cause);
         ctx.fireExceptionCaught(cause);
         if (ctx.channel() != null) {
@@ -242,7 +243,6 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
                 //
             }
             source.getAllChannels().remove(ctx.channel());
-            source.fileMetricEventInc(StatConstants.EVENT_LINKS_EXCEPTION);
         }
         ctx.close();
     }
@@ -263,7 +263,7 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
         }
         // check if the node is linked to the Manager.
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+            
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
             msgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY);
             responseV0Msg(channel, msgCodec, strBuff);
             return;
@@ -418,7 +418,7 @@ public class InLongMessageHandler extends 
ChannelInboundHandlerAdapter {
                 if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                     topic = source.getDefTopic();
                 } else {
-                    source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
+                    
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
                     source.addMetric(false, event.getBody().length, event);
                     this.responsePackage(ctx, 
ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
                     return;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
index 73c96dd7b7..74cc4435c0 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
@@ -47,6 +47,7 @@ public class SimpleHttpSource extends BaseSource implements 
Configurable {
 
     public SimpleHttpSource() {
         super();
+        ConfigManager.getInstance().regIPVisitConfigChgCallback(this);
     }
 
     @Override
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
index 6526bba84e..acda3cf4f7 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
@@ -18,15 +18,12 @@
 package org.apache.inlong.dataproxy.source2;
 
 import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
-import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
 import org.apache.inlong.dataproxy.utils.EventLoopUtil;
 
 import com.google.common.base.Preconditions;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.flume.Context;
@@ -35,12 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.Iterator;
 
 /**
  * Simple tcp source
  */
-public class SimpleTcpSource extends BaseSource implements Configurable, 
ConfigUpdateCallback {
+public class SimpleTcpSource extends BaseSource implements Configurable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SimpleTcpSource.class);
 
@@ -123,32 +119,4 @@ public class SimpleTcpSource extends BaseSource implements 
Configurable, ConfigU
         return SourceConstants.SRC_PROTOCOL_TYPE_TCP;
     }
 
-    @Override
-    public void update() {
-        // check current all links
-        if (ConfigManager.getInstance().needChkIllegalIP()) {
-            int cnt = 0;
-            Channel channel;
-            String strRemoteIP;
-            long startTime = System.currentTimeMillis();
-            Iterator<Channel> iterator = allChannels.iterator();
-            while (iterator.hasNext()) {
-                channel = iterator.next();
-                strRemoteIP = AddressUtils.getChannelRemoteIP(channel);
-                if (strRemoteIP == null) {
-                    continue;
-                }
-                if (ConfigManager.getInstance().isIllegalIP(strRemoteIP)) {
-                    channel.disconnect();
-                    channel.close();
-                    allChannels.remove(channel);
-                    cnt++;
-                    logger.error(strRemoteIP + " is Illegal IP, so disconnect 
it !");
-                }
-            }
-            logger.info("Source {} channel check, disconnects {} Illegal 
channels, waist {} ms",
-                    getName(), cnt, (System.currentTimeMillis() - startTime));
-        }
-    }
-
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index ce33c75d45..4f636f8173 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -21,11 +21,10 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
-import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.consts.HttpAttrConst;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.source2.BaseSource;
 import org.apache.inlong.dataproxy.utils.AddressUtils;
@@ -42,23 +41,25 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.QueryStringDecoder;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.CharEncoding;
+import org.apache.commons.codec.Charsets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
+import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static io.netty.handler.codec.http.HttpUtil.is100ContinueExpected;
@@ -68,13 +69,9 @@ import static 
io.netty.handler.codec.http.HttpUtil.is100ContinueExpected;
  */
 public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRequest> {
 
-    private static final String hbSrvUrl = "/dataproxy/heartbeat";
-    private static final String msgSrvUrl = "/dataproxy/message";
-
     private static final Logger logger = 
LoggerFactory.getLogger(InLongHttpMsgHandler.class);
     // log print count
     private static final LogCounter logCounter = new LogCounter(10, 100000, 30 
* 1000);
-
     private final BaseSource source;
 
     /**
@@ -88,120 +85,203 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest 
req) throws Exception {
-        // check request decode result
-        if (!req.decoderResult().isSuccess()) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_DECFAIL);
-            sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Decode message 
failure!");
-            return;
-        }
-        // check request method
-        if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST) 
{
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_INVALIDMETHOD);
-            sendErrorMsg(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, "Only 
support Get and Post methods");
-            return;
-        }
         // process 100-continue request
         if (is100ContinueExpected(req)) {
             ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.CONTINUE));
         }
-        // get requested service
-        String reqUri = req.uri();
-        if (StringUtils.isBlank(reqUri)) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_BLANKURI);
-            sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Uri is blank!");
-            return;
-        }
-        try {
-            reqUri = URLDecoder.decode(reqUri, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            try {
-                reqUri = URLDecoder.decode(reqUri, "ISO-8859-1");
-            } catch (UnsupportedEncodingException e1) {
-                source.fileMetricEventInc(StatConstants.EVENT_HTTP_URIDECFAIL);
-                sendErrorMsg(ctx, HttpResponseStatus.BAD_REQUEST, "Decode uri 
failure!");
-                return;
-            }
-        }
-        // check requested service url
-        if (!reqUri.startsWith(hbSrvUrl) || !reqUri.startsWith(msgSrvUrl)) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_INVALIDURI);
-            sendErrorMsg(ctx, HttpResponseStatus.NOT_IMPLEMENTED, "Not 
supported uri!");
-            return;
-        }
         // get current time and clientIP
-        long msgRcvTime = System.currentTimeMillis();
-        String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
-        // check illegal ip
-        if (ConfigManager.getInstance().needChkIllegalIP()
-                && ConfigManager.getInstance().isIllegalIP(clientIp)) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_ILLEGAL_VISIT);
-            sendResponse(ctx, DataProxyErrCode.ILLEGAL_VISIT_IP, true);
+        final long msgRcvTime = System.currentTimeMillis();
+        final String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
+        // check request decode result
+        if (!req.decoderResult().isSuccess()) {
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_DECODE_FAIL);
+            sendErrorMsg(ctx, DataProxyErrCode.HTTP_DECODE_REQ_FAILURE);
             return;
         }
         // check service status.
         if (source.isRejectService()) {
             source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
-            sendResponse(ctx, DataProxyErrCode.SERVICE_CLOSED, true);
+            sendErrorMsg(ctx, DataProxyErrCode.SERVICE_CLOSED);
             return;
         }
         // check sink service status
         if (!ConfigManager.getInstance().isMqClusterReady()) {
-            source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
-            sendResponse(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY, true);
+            
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+            sendErrorMsg(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY);
             return;
         }
+        // check request method
+        if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST) 
{
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_METHOD_INVALID);
+            sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_METHOD,
+                    "Only support [" + HttpMethod.GET.name() + ", "
+                            + HttpMethod.POST.name() + "] methods");
+            return;
+        }
+        // parse request uri
+        QueryStringDecoder uriDecoder =
+                new QueryStringDecoder(req.uri(), 
Charsets.toCharset(CharEncoding.UTF_8));
+        // check requested service url
+        if (!HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())
+                && 
!HttpAttrConst.KEY_SRV_URL_REPORT_MSG.equals(uriDecoder.path())) {
+            if (!HttpAttrConst.KEY_URL_FAVICON_ICON.equals(uriDecoder.path())) 
{
+                
source.fileMetricEventInc(StatConstants.EVENT_MSG_PATH_INVALID);
+                sendErrorMsg(ctx, 
DataProxyErrCode.HTTP_UNSUPPORTED_SERVICE_URI,
+                        "Only support [" + HttpAttrConst.KEY_SRV_URL_HEARTBEAT 
+ ", "
+                                + HttpAttrConst.KEY_SRV_URL_REPORT_MSG + "] 
paths!");
+            }
+            return;
+        }
+        // get connection status
+        boolean closeConnection = isCloseConnection(req);
         // process hb service
-        if (reqUri.startsWith(hbSrvUrl)) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_HB_SUCCESS);
-            sendResponse(ctx, DataProxyErrCode.SUCCESS, checkClose(req));
+        if (HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())) {
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_HB_SUCCESS);
+            sendResponse(ctx, closeConnection);
             return;
         }
+        // get request attributes
+        final Map<String, String> reqAttrs = new HashMap<>();
+        getAttrsFromDecoder(uriDecoder, reqAttrs);
+        if (req.method() == HttpMethod.POST) {
+            // check and get content value
+            String cntLengthStr = 
req.headers().get(HttpHeaderNames.CONTENT_LENGTH);
+            if (StringUtils.isNotBlank(cntLengthStr) && 
NumberUtils.toInt(cntLengthStr, 0) > 0) {
+                String cntType = 
req.headers().get(HttpHeaderNames.CONTENT_TYPE);
+                if (StringUtils.isNotBlank(cntType)) {
+                    cntType = cntType.trim();
+                    if (!cntType.equalsIgnoreCase(
+                            
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString())) {
+                        
source.fileMetricEventInc(StatConstants.EVENT_MSG_CONTYPE_INVALID);
+                        sendErrorMsg(ctx, 
DataProxyErrCode.HTTP_UNSUPPORTED_CONTENT_TYPE,
+                                "Only support [" + 
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED
+                                        + "] content type!");
+                        return;
+                    }
+                    String cntStr = 
req.content().toString(Charsets.toCharset(CharEncoding.UTF_8));
+                    QueryStringDecoder cntDecoder = new 
QueryStringDecoder(cntStr, false);
+                    getAttrsFromDecoder(cntDecoder, reqAttrs);
+                }
+            }
+        }
         // process message request
-        processMessage(ctx, req, msgRcvTime, clientIp);
+        processMessage(ctx, reqAttrs, msgRcvTime, clientIp, closeConnection);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+        ctx.flush();
     }
 
-    private boolean processMessage(ChannelHandlerContext ctx, FullHttpRequest 
req,
-            long msgRcvTime, String clientIp) throws Exception {
-        // get and check groupId
-        HttpHeaders headers = req.headers();
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
+        // check illegal ip
+        if (ConfigManager.getInstance().needChkIllegalIP()) {
+            String strRemoteIp = 
AddressUtils.getChannelRemoteIP(ctx.channel());
+            if (strRemoteIp != null
+                    && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
+                source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
+                ctx.channel().disconnect();
+                ctx.channel().close();
+                if (logCounter.shouldPrint()) {
+                    logger.error(strRemoteIp + " is Illegal IP, so refuse it 
!");
+                }
+                return;
+            }
+        }
+        // check max allowed connection count
+        if (source.getAllChannels().size() >= source.getMaxConnections()) {
+            source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+            ctx.channel().disconnect();
+            ctx.channel().close();
+            if (logCounter.shouldPrint()) {
+                logger.warn("{} refuse to connect = {} , connections = {}, 
maxConnections = {}",
+                        source.getName(), ctx.channel(), 
source.getAllChannels().size(), source.getMaxConnections());
+            }
+            return;
+        }
+        // add legal channel
+        source.getAllChannels().add(ctx.channel());
+        ctx.fireChannelActive();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
+        ctx.fireChannelInactive();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
+        if (cause instanceof IOException) {
+            if (logCounter.shouldPrint()) {
+                logger.warn("{} received an IOException from channel {}",
+                        source.getName(), ctx.channel(), cause);
+            }
+            ctx.close();
+        } else {
+            sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
+                    "Process message failure: " + cause.getMessage());
+        }
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
+        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
+            ctx.close();
+        }
+    }
+
+    /**
+     * Process http report message
+     *
+     * @param ctx the handler context
+     * @param reqAttrs the attributes
+     * @param msgRcvTime  the message received time
+     * @param clientIp  the report ip
+     * @param isCloseCon  whether close connection
+     *
+     * @return whether process success
+     */
+    private boolean processMessage(ChannelHandlerContext ctx, Map<String, 
String> reqAttrs,
+            long msgRcvTime, String clientIp, boolean isCloseCon) throws 
Exception {
         StringBuilder strBuff = new StringBuilder(512);
-        String groupId = headers.get(AttributeConstants.GROUP_ID);
-        if (StringUtils.isEmpty(groupId)) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_WITHOUTGROUPID);
+        String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
+        if (StringUtils.isBlank(groupId)) {
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_GROUPID_MISSING);
             sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
-                    strBuff.append("Field 
").append(AttributeConstants.GROUP_ID)
+                    strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID)
                             .append(" must exist and not blank!").toString(),
-                    checkClose(req));
+                    isCloseCon);
             return false;
         }
         // get and check streamId
-        String streamId = headers.get(AttributeConstants.STREAM_ID);
-        if (StringUtils.isEmpty(streamId)) {
-            
source.fileMetricEventInc(StatConstants.EVENT_HTTP_WITHOUTSTREAMID);
+        String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
+        if (StringUtils.isBlank(streamId)) {
+            
source.fileMetricEventInc(StatConstants.EVENT_MSG_STREAMID_MISSING);
             sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
-                    strBuff.append("Field 
").append(AttributeConstants.STREAM_ID)
+                    strBuff.append("Field 
").append(HttpAttrConst.KEY_STREAM_ID)
                             .append(" must exist and not blank!").toString(),
-                    checkClose(req));
+                    isCloseCon);
             return false;
         }
         // get and check topicName
         String topicName = ConfigManager.getInstance().getTopicName(groupId, 
streamId);
         if (StringUtils.isBlank(topicName)) {
-            if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
-                source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
-                sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
-                        strBuff.append("Topic is null for 
").append(AttributeConstants.GROUP_ID)
-                                .append("(").append(groupId).append("),")
-                                .append(AttributeConstants.STREAM_ID)
-                                
.append("(,").append(streamId).append(")").toString(),
-                        checkClose(req));
-                return false;
-            }
-            topicName = source.getDefTopic();
+            
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+            sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
+                    strBuff.append("Topic not configured for 
").append(HttpAttrConst.KEY_STREAM_ID)
+                            .append("(").append(groupId).append("),")
+                            .append(HttpAttrConst.KEY_STREAM_ID)
+                            
.append("(,").append(streamId).append(")").toString(),
+                    isCloseCon);
+            return false;
         }
         // get and check dt
         long dataTime = msgRcvTime;
-        String dt = headers.get(AttributeConstants.DATA_TIME);
+        String dt = reqAttrs.get(HttpAttrConst.KEY_DATA_TIME);
         if (StringUtils.isNotEmpty(dt)) {
             try {
                 dataTime = Long.parseLong(dt);
@@ -209,45 +289,40 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
                 //
             }
         }
-        // get char set
-        String charset = headers.get(AttrConstants.CHARSET);
-        if (StringUtils.isBlank(charset)) {
-            charset = AttrConstants.CHARSET;
-        }
         // get and check body
-        String body = headers.get(AttrConstants.BODY);
+        String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
         if (StringUtils.isBlank(body)) {
             if (body == null) {
-                source.fileMetricEventInc(StatConstants.EVENT_HTTP_NOBODY);
+                
source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_MISSING);
                 sendResponse(ctx, 
DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
-                        strBuff.append("Field ").append(AttrConstants.BODY)
+                        strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                 .append(" is not exist!").toString(),
-                        checkClose(req));
+                        isCloseCon);
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_HTTP_EMPTYBODY);
+                source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_BLANK);
                 sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
-                        strBuff.append("Field ").append(AttrConstants.BODY)
+                        strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
                                 .append(" is Blank!").toString(),
-                        checkClose(req));
+                        isCloseCon);
             }
             return false;
         }
         if (body.length() > source.getMaxMsgLength()) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_BODYOVERMAXLEN);
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_OVERMAX);
             sendResponse(ctx, 
DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
-                    strBuff.append("Error msg, the body 
length(").append(body.length())
+                    strBuff.append("Error msg, the 
").append(HttpAttrConst.KEY_BODY)
+                            .append(" length(").append(body.length())
                             .append(") is bigger than allowed length(")
                             
.append(source.getMaxMsgLength()).append(")").toString(),
-                    checkClose(req));
+                    isCloseCon);
             return false;
         }
         // get message count
-        String strMsgCount = headers.get(AttributeConstants.MESSAGE_COUNT);
-        int intMsgCnt = NumberUtils.toInt(strMsgCount, 1);
-        strMsgCount = String.valueOf(intMsgCnt);
+        int intMsgCnt = 
NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
+        String strMsgCount = String.valueOf(intMsgCnt);
         // build message attributes
         InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
-        strBuff.append("&groupId=").append(groupId)
+        strBuff.append("groupId=").append(groupId)
                 .append("&streamId=").append(streamId)
                 .append("&dt=").append(dataTime)
                 .append("&NodeIP=").append(clientIp)
@@ -255,7 +330,7 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
                 .append("&rt=").append(msgRcvTime)
                 
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
                 
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
-        inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
+        inLongMsg.addMsg(strBuff.toString(), 
body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
         byte[] inlongMsgData = inLongMsg.buildArray();
         inLongMsg.reset();
         strBuff.delete(0, strBuff.length());
@@ -270,7 +345,6 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
         eventHeaders.put(ConfigConstants.MSG_ENCODE_VER, 
InLongMsgVer.INLONG_V0.getName());
         eventHeaders.put(AttributeConstants.RCV_TIME, 
String.valueOf(msgRcvTime));
         Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
-        String msgProcType = "b2b";
         // build metric data item
         dataTime = dataTime / 1000 / 60 / 10;
         dataTime = dataTime * 1000 * 60 * 10;
@@ -278,77 +352,84 @@ public class InLongHttpMsgHandler extends 
SimpleChannelInboundHandler<FullHttpRe
                 .append(AttrConstants.SEP_HASHTAG).append(topicName)
                 .append(AttrConstants.SEP_HASHTAG).append(streamId)
                 .append(AttrConstants.SEP_HASHTAG).append(clientIp)
-                
.append(AttrConstants.SEP_HASHTAG).append(NetworkUtils.getLocalIp())
-                .append(AttrConstants.SEP_HASHTAG).append(msgProcType)
+                .append(AttrConstants.SEP_HASHTAG).append(source.getSrcHost())
+                .append(AttrConstants.SEP_HASHTAG).append("b2b")
                 
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime))
                 
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
         try {
             source.getChannelProcessor().processEvent(event);
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_POST_SUCCESS);
-            source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1, 
body.length(), 0);
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_SUCCESS);
+            source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1, 
event.getBody().length, 0);
             strBuff.delete(0, strBuff.length());
             source.addMetric(true, event.getBody().length, event);
-            sendResponse(ctx, DataProxyErrCode.SUCCESS, false);
+            sendResponse(ctx, isCloseCon);
             return true;
-        } catch (ChannelException ex) {
-            source.fileMetricEventInc(StatConstants.EVENT_HTTP_POST_DROPPED);
+        } catch (Throwable ex) {
+            source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_FAILURE);
             source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0, intMsgCnt);
             source.addMetric(false, event.getBody().length, event);
             strBuff.delete(0, strBuff.length());
-            sendResponse(ctx, DataProxyErrCode.UNKNOWN_ERROR.getErrCode(),
-                    strBuff.append("Put event to channel failure: 
").append(ex.getMessage())
-                            .toString(),
-                    false);
+            sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
+                    strBuff.append("Put event to channel failure: 
").append(ex.getMessage()).toString());
             if (logCounter.shouldPrint()) {
-                logger.error("Error write event to channel, data will 
discard.", ex);
+                logger.error("Error writing HTTP event to channel failure.", 
ex);
             }
             return false;
         }
     }
 
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) {
-        ctx.flush();
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
-        logger.error("Http process client={} error, cause:{}, msg:{}",
-                cause, clientIp, cause.getMessage());
-        sendErrorMsg(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR,
-                "Process message failure: " + cause.getMessage());
-        ctx.close();
+    /**
+     * Get attributes from decoder
+     *
+     * @param decoder the decode object
+     * @param reqAttrs the attributes
+     */
+    private void getAttrsFromDecoder(QueryStringDecoder decoder, Map<String, 
String> reqAttrs) {
+        for (Map.Entry<String, List<String>> attr : 
decoder.parameters().entrySet()) {
+            if (attr == null
+                    || attr.getKey() == null
+                    || attr.getValue() == null
+                    || attr.getValue().isEmpty()) {
+                continue;
+            }
+            reqAttrs.put(attr.getKey(), attr.getValue().get(0));
+        }
     }
 
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
-        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
-            ctx.close();
+    private boolean isCloseConnection(FullHttpRequest req) {
+        String connStatus = req.headers().get(HttpHeaderNames.CONNECTION);
+        if (connStatus == null) {
+            return false;
         }
+        connStatus = connStatus.trim();
+        return connStatus.equalsIgnoreCase(HttpHeaderValues.CLOSE.toString());
     }
 
-    private boolean checkClose(FullHttpRequest req) {
-        String connStatus = req.headers().get("Connection");
-        return !StringUtils.isBlank(connStatus) && 
"close".equalsIgnoreCase(connStatus);
+    private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode 
errCodeObj) {
+        sendResponse(ctx, errCodeObj.getErrCode(), errCodeObj.getErrMsg(), 
true);
     }
 
-    private void sendErrorMsg(ChannelHandlerContext ctx, HttpResponseStatus 
status, String errMsg) {
-        FullHttpResponse response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
-                Unpooled.copiedBuffer("Failure: " + status + ", "
-                        + errMsg + "\r\n", CharsetUtil.UTF_8));
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; 
charset=UTF-8");
-        ctx.writeAndFlush(response).addListener(new SendResultListener(true));
+    private void sendErrorMsg(ChannelHandlerContext ctx, DataProxyErrCode 
errCodeObj, String errMsg) {
+        sendResponse(ctx, errCodeObj.getErrCode(), errMsg, true);
     }
 
-    private void sendResponse(ChannelHandlerContext ctx, DataProxyErrCode 
errCodeObj, boolean isClose) {
-        sendResponse(ctx, errCodeObj.getErrCode(), errCodeObj.getErrMsg(), 
isClose);
-
+    private void sendResponse(ChannelHandlerContext ctx, boolean isClose) {
+        sendResponse(ctx, DataProxyErrCode.SUCCESS.getErrCode(), 
DataProxyErrCode.SUCCESS.getErrMsg(), isClose);
     }
 
     private void sendResponse(ChannelHandlerContext ctx, int errCode, String 
errMsg, boolean isClose) {
+        if (ctx == null || ctx.channel() == null) {
+            return;
+        }
+        if (!ctx.channel().isWritable()) {
+            
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+            if (logCounter.shouldPrint()) {
+                logger.warn("Send msg but channel full, channel={}", 
ctx.channel());
+            }
+            return;
+        }
         FullHttpResponse response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
"application/json;charset=utf-8");
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
HttpAttrConst.RET_CNT_TYPE);
         StringBuilder builder =
                 new StringBuilder().append("{\"code\":\"").append(errCode)
                         .append("\",\"msg\":\"").append(errMsg).append("\"}");
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index b455817554..027e33e74f 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -268,7 +268,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
             if (StringUtils.isBlank(confGroupId)) {
                 if (configManager.isGroupIdNumConfigEmpty()) {
-                    
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+                    
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
                     this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
                     this.errMsg = "GroupId-Mapping configuration is null";
                 } else {
@@ -300,7 +300,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                 confStreamId = 
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
                 if (StringUtils.isBlank(confStreamId)) {
                     if (configManager.isStreamIdNumConfigEmpty()) {
-                        
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_UNREADY);
+                        
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
                         this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
                         this.errMsg = "StreamId-Mapping configuration is null";
                     } else {
@@ -338,7 +338,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
             if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                 this.topicName = source.getDefTopic();
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
+                
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
                 this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
                 this.errMsg = String.format("Topic is null for 
inlongGroupId=(%s), inlongStreamId=(%s)",
                         this.groupId, this.streamId);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
index ddda70fde1..f02752ca10 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
@@ -148,7 +148,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
             if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                 tmpTopicName = source.getDefTopic();
             } else {
-                source.fileMetricEventInc(StatConstants.EVENT_NOTOPIC);
+                
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
                 this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
                 this.errMsg = String.format(
                         "Topic is null for inlongGroupId=(%s), 
inlongStreamId=(%s)", tmpGroupId, tmpStreamId);

Reply via email to