This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f000b2dd4 [INLONG-8019][DataProxy] Optimize the function of getTopic() (#8020) f000b2dd4 is described below commit f000b2dd4d0be0b4d934943f8cd1b8428f765e34 Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri May 12 18:46:23 2023 +0800 [INLONG-8019][DataProxy] Optimize the function of getTopic() (#8020) --- .../inlong/dataproxy/config/ConfigManager.java | 21 ++++++++++++++-- .../inlong/dataproxy/http/MessageFilter.java | 16 +++++++++++++ .../dataproxy/http/SimpleMessageHandler.java | 16 +------------ .../dataproxy/source/ServerMessageHandler.java | 6 ++--- .../dataproxy/source/SimpleMessageHandler.java | 28 ++-------------------- .../inlong/dataproxy/utils/MessageUtils.java | 20 ---------------- 6 files changed, 40 insertions(+), 67 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java index c2fac48ce..e655ac083 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java @@ -104,8 +104,25 @@ public class ConfigManager { return weightHolder.getHolder(); } - public Map<String, String> getTopicProperties() { - return topicConfig.getHolder(); + /** + * get topic by groupId and streamId + */ + public String getTopicName(String groupId, String streamId) { + String topic = null; + Map<String, String> topicsMap = topicConfig.getHolder(); + if (topicsMap != null && StringUtils.isNotEmpty(groupId)) { + if (StringUtils.isNotEmpty(streamId)) { + topic = topicsMap.get(groupId + "/" + streamId); + } + if (StringUtils.isEmpty(topic)) { + topic = topicsMap.get(groupId); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Get topic by groupId = {}, streamId = {}, topic = {}", + groupId, streamId, topic); + } + return topic; } public boolean addTopicProperties(Map<String, String> result) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java index 430085403..e18f95ca5 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flume.ChannelException; import org.apache.inlong.common.enums.DataProxyErrCode; import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.dataproxy.config.CommonConfigHolder; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttrConstants; import org.slf4j.Logger; @@ -100,6 +101,15 @@ public class MessageFilter implements Filter { DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg()); return; } + // get and check topicName + String topicName = ConfigManager.getInstance().getTopicName(groupId, streamId); + if (StringUtils.isBlank(topicName) + && !CommonConfigHolder.getInstance().isNoTopicAccept()) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(), + DataProxyErrCode.TOPIC_IS_BLANK.getErrMsg()); + return; + } // get and check dt String dt = req.getParameter(AttributeConstants.DATA_TIME); if (StringUtils.isEmpty(dt)) { @@ -117,6 +127,12 @@ public class MessageFilter implements Filter { return; } // check body length + if (body.length() <= 0) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.EMPTY_MSG.getErrCode(), + "Bad request, body length <= 0"); + return; + } if (body.length() > maxMsgLength) { returnRspPackage(resp, req.getCharacterEncoding(), DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(), diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index dae6a4c1b..8ad27654b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -97,7 +97,7 @@ public class SimpleMessageHandler implements MessageHandler { groupId = groupId.trim(); streamId = streamId.trim(); // get topicName - String topicName = getTopic(groupId, streamId); + String topicName = configManager.getTopicName(groupId, streamId); if (StringUtils.isBlank(topicName)) { if (CommonConfigHolder.getInstance().isNoTopicAccept()) { topicName = "test"; @@ -220,20 +220,6 @@ public class SimpleMessageHandler implements MessageHandler { public void configure(org.apache.flume.Context context) { } - private String getTopic(String groupId, String streamId) { - String topic = null; - if (StringUtils.isNotEmpty(groupId)) { - if (StringUtils.isNotEmpty(streamId)) { - topic = configManager.getTopicProperties().get(groupId + "/" + streamId); - } - if (StringUtils.isEmpty(topic)) { - topic = configManager.getTopicProperties().get(groupId); - } - } - LOG.debug("Get topic by groupId/streamId = {}, topic = {}", groupId + "/" + streamId, topic); - return topic; - } - /** * add statistics information * diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index a846b3df7..085dd6258 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -377,8 +377,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { message.setGroupId(groupId); message.setStreamId(streamId); // get configured topic name - configTopic = MessageUtils.getTopic( - configManager.getTopicProperties(), groupId, streamId); + configTopic = configManager.getTopicName(groupId, streamId); } } } else { @@ -391,8 +390,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr)); } // get configured topic name - configTopic = MessageUtils.getTopic( - configManager.getTopicProperties(), groupId, streamId); + configTopic = configManager.getTopicName(groupId, streamId); } // check topic configure if (StringUtils.isEmpty(configTopic)) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java index 44595fd95..41742fc37 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java @@ -217,7 +217,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { String streamId = message.getStreamId(); if (null != groupId) { - String value = getTopic(groupId, streamId); + String value = configManager.getTopicName(groupId, streamId); if (StringUtils.isNotEmpty(value)) { topicInfo.set(value.trim()); } @@ -254,7 +254,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { message.setGroupId(groupId); message.setStreamId(streamId); - String value = getTopic(groupId, streamId); + String value = configManager.getTopicName(groupId, streamId); if (StringUtils.isNotEmpty(value)) { topicInfo.set(value.trim()); } @@ -655,30 +655,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter { ctx.fireChannelInactive(); } - /** - * get topic - */ - private String getTopic(String groupId) { - return getTopic(groupId, null); - } - - /** - * get topic - */ - private String getTopic(String groupId, String streamId) { - String topic = null; - if (StringUtils.isNotEmpty(groupId)) { - if (StringUtils.isNotEmpty(streamId)) { - topic = configManager.getTopicProperties().get(groupId + "/" + streamId); - } - if (StringUtils.isEmpty(topic)) { - topic = configManager.getTopicProperties().get(groupId); - } - } - logger.debug("Get topic by groupId = {} , streamId = {}", groupId, streamId); - return topic; - } - /** * addMetric * diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java index 1c0c442d0..fdb3f8d9f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java @@ -383,26 +383,6 @@ public class MessageUtils { return binBuffer; } - /** - * get topic - */ - public static String getTopic(Map<String, String> topicsMap, String groupId, String streamId) { - String topic = null; - if (topicsMap != null && StringUtils.isNotEmpty(groupId)) { - if (StringUtils.isNotEmpty(streamId)) { - topic = topicsMap.get(groupId + "/" + streamId); - } - if (StringUtils.isEmpty(topic)) { - topic = topicsMap.get(groupId); - } - } - if (logger.isDebugEnabled()) { - logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}", - groupId, streamId, topic); - } - return topic; - } - public static Map<String, String> getXfsAttrs(Map<String, String> headers, String pkgVersion) { // common attributes Map<String, String> attrs = new HashMap<>();