This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f000b2dd4 [INLONG-8019][DataProxy] Optimize the function of getTopic() (#8020)
f000b2dd4 is described below

commit f000b2dd4d0be0b4d934943f8cd1b8428f765e34
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Fri May 12 18:46:23 2023 +0800

    [INLONG-8019][DataProxy] Optimize the function of getTopic() (#8020)
---
 .../inlong/dataproxy/config/ConfigManager.java     | 21 ++++++++++++++--
 .../inlong/dataproxy/http/MessageFilter.java       | 16 +++++++++++++
 .../dataproxy/http/SimpleMessageHandler.java       | 16 +------------
 .../dataproxy/source/ServerMessageHandler.java     |  6 ++---
 .../dataproxy/source/SimpleMessageHandler.java     | 28 ++--------------------
 .../inlong/dataproxy/utils/MessageUtils.java       | 20 ----------------
 6 files changed, 40 insertions(+), 67 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index c2fac48ce..e655ac083 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -104,8 +104,25 @@ public class ConfigManager {
         return weightHolder.getHolder();
     }
 
-    public Map<String, String> getTopicProperties() {
-        return topicConfig.getHolder();
+    /**
+     * get topic by groupId and streamId
+     */
+    public String getTopicName(String groupId, String streamId) {
+        String topic = null;
+        Map<String, String> topicsMap = topicConfig.getHolder();
+        if (topicsMap != null && StringUtils.isNotEmpty(groupId)) {
+            if (StringUtils.isNotEmpty(streamId)) {
+                topic = topicsMap.get(groupId + "/" + streamId);
+            }
+            if (StringUtils.isEmpty(topic)) {
+                topic = topicsMap.get(groupId);
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get topic by groupId = {}, streamId = {}, topic = {}",
+                    groupId, streamId, topic);
+        }
+        return topic;
     }
 
     public boolean addTopicProperties(Map<String, String> result) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 430085403..e18f95ca5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.slf4j.Logger;
@@ -100,6 +101,15 @@ public class MessageFilter implements Filter {
                     DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
             return;
         }
+        // get and check topicName
+        String topicName = ConfigManager.getInstance().getTopicName(groupId, streamId);
+        if (StringUtils.isBlank(topicName)
+                && !CommonConfigHolder.getInstance().isNoTopicAccept()) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
+                    DataProxyErrCode.TOPIC_IS_BLANK.getErrMsg());
+            return;
+        }
         // get and check dt
         String dt = req.getParameter(AttributeConstants.DATA_TIME);
         if (StringUtils.isEmpty(dt)) {
@@ -117,6 +127,12 @@ public class MessageFilter implements Filter {
             return;
         }
         // check body length
+        if (body.length() <= 0) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.EMPTY_MSG.getErrCode(),
+                    "Bad request, body length <= 0");
+            return;
+        }
         if (body.length() > maxMsgLength) {
             returnRspPackage(resp, req.getCharacterEncoding(),
                     DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index dae6a4c1b..8ad27654b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -97,7 +97,7 @@ public class SimpleMessageHandler implements MessageHandler {
         groupId = groupId.trim();
         streamId = streamId.trim();
         // get topicName
-        String topicName = getTopic(groupId, streamId);
+        String topicName = configManager.getTopicName(groupId, streamId);
         if (StringUtils.isBlank(topicName)) {
             if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
                 topicName = "test";
@@ -220,20 +220,6 @@ public class SimpleMessageHandler implements MessageHandler {
     public void configure(org.apache.flume.Context context) {
     }
 
-    private String getTopic(String groupId, String streamId) {
-        String topic = null;
-        if (StringUtils.isNotEmpty(groupId)) {
-            if (StringUtils.isNotEmpty(streamId)) {
-                topic = configManager.getTopicProperties().get(groupId + "/" + streamId);
-            }
-            if (StringUtils.isEmpty(topic)) {
-                topic = configManager.getTopicProperties().get(groupId);
-            }
-        }
-        LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId, topic);
-        return topic;
-    }
-
     /**
      * add statistics information
      *
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index a846b3df7..085dd6258 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -377,8 +377,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         message.setGroupId(groupId);
                         message.setStreamId(streamId);
                         // get configured topic name
-                        configTopic = MessageUtils.getTopic(
-                                configManager.getTopicProperties(), groupId, streamId);
+                        configTopic = configManager.getTopicName(groupId, streamId);
                     }
                 }
             } else {
@@ -391,8 +390,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
                 }
                 // get configured topic name
-                configTopic = MessageUtils.getTopic(
-                        configManager.getTopicProperties(), groupId, streamId);
+                configTopic = configManager.getTopicName(groupId, streamId);
             }
             // check topic configure
             if (StringUtils.isEmpty(configTopic)) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 44595fd95..41742fc37 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -217,7 +217,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
         String streamId = message.getStreamId();
         if (null != groupId) {
 
-            String value = getTopic(groupId, streamId);
+            String value = configManager.getTopicName(groupId, streamId);
             if (StringUtils.isNotEmpty(value)) {
                 topicInfo.set(value.trim());
             }
@@ -254,7 +254,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
                     message.setGroupId(groupId);
                     message.setStreamId(streamId);
 
-                    String value = getTopic(groupId, streamId);
+                    String value = configManager.getTopicName(groupId, streamId);
                     if (StringUtils.isNotEmpty(value)) {
                         topicInfo.set(value.trim());
                     }
@@ -655,30 +655,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
         ctx.fireChannelInactive();
     }
 
-    /**
-     * get topic
-     */
-    private String getTopic(String groupId) {
-        return getTopic(groupId, null);
-    }
-
-    /**
-     * get topic
-     */
-    private String getTopic(String groupId, String streamId) {
-        String topic = null;
-        if (StringUtils.isNotEmpty(groupId)) {
-            if (StringUtils.isNotEmpty(streamId)) {
-                topic = configManager.getTopicProperties().get(groupId + "/" + streamId);
-            }
-            if (StringUtils.isEmpty(topic)) {
-                topic = configManager.getTopicProperties().get(groupId);
-            }
-        }
-        logger.debug("Get topic by groupId = {} , streamId = {}", groupId, streamId);
-        return topic;
-    }
-
     /**
      * addMetric
      * 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index 1c0c442d0..fdb3f8d9f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -383,26 +383,6 @@ public class MessageUtils {
         return binBuffer;
     }
 
-    /**
-     * get topic
-     */
-    public static String getTopic(Map<String, String> topicsMap, String groupId, String streamId) {
-        String topic = null;
-        if (topicsMap != null && StringUtils.isNotEmpty(groupId)) {
-            if (StringUtils.isNotEmpty(streamId)) {
-                topic = topicsMap.get(groupId + "/" + streamId);
-            }
-            if (StringUtils.isEmpty(topic)) {
-                topic = topicsMap.get(groupId);
-            }
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}",
-                    groupId, streamId, topic);
-        }
-        return topic;
-    }
-
     public static Map<String, String> getXfsAttrs(Map<String, String> headers, String pkgVersion) {
         // common attributes
         Map<String, String> attrs = new HashMap<>();

Reply via email to