This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new b1c0beb955 [INLONG-9237][Agent] Move addictive fields to package attributes (#9238)
b1c0beb955 is described below

commit b1c0beb955763dd9e3b9fdefafa9d1d36d51486a
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 8 15:25:52 2023 +0800

    [INLONG-9237][Agent] Move addictive fields to package attributes (#9238)
---
 .../inlong/agent/constant/TaskConstants.java       |  2 +-
 .../message/filecollect/ProxyMessageCache.java     |  1 +
 .../org/apache/inlong/agent/utils/AgentUtils.java  | 25 ++++++++++++++++++++++
 .../inlong/agent/plugin/sources/LogFileSource.java |  2 --
 .../inlong/agent/plugin/utils/MetaDataUtils.java   | 25 ----------------------
 5 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index facefd011c..3285936951 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -80,7 +80,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_START_TIME = "task.fileTask.startTime";
     public static final String TASK_END_TIME = "task.fileTask.endTime";
     public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
-    public static final String PREDEFINE_FIELDS = "task.predefineFields";
+    public static final String PREDEFINE_FIELDS = "task.predefinedFields";
 
     // Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 5426c2eb54..7ca74fb603 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -79,6 +79,7 @@ public class ProxyMessageCache {
         this.streamId = streamId;
         this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
         extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
+        extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
     }
 
     public void generateExtraMap(String dataKey) {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index fd87faca40..01c1567726 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -40,6 +40,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.StringTokenizer;
 import java.util.TimeZone;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -251,6 +252,30 @@ public class AgentUtils {
         return Pair.of(mValue, attr);
     }
 
+    public static Map<String, String> parseAddAttrToMap(String addictiveAttr) {
+        StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
+        Map<String, String> attr = new HashMap<String, String>();
+        while (token.hasMoreTokens()) {
+            String value = token.nextToken().trim();
+            if (value.contains("=")) {
+                String[] pairs = value.split("=");
+
+                if (pairs[0].equalsIgnoreCase("m")) {
+                    continue;
+                }
+
+                // when addictiveattr like "m=10&__addcol1__worldid="
+                if (value.endsWith("=") && pairs.length == 1) {
+                    attr.put(pairs[0], "");
+                } else {
+                    attr.put(pairs[0], pairs[1]);
+                }
+
+            }
+        }
+        return attr;
+    }
+
     /**
      * Get the attrs in pairs can be complicated in online env
      */
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 5cbec1efda..ab96b6a8e8 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -32,7 +32,6 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 
@@ -349,7 +348,6 @@ public class LogFileSource extends AbstractSource {
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.offset.toString());
-        header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields()));
         Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index d2028bfc52..aa215bb539 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -137,28 +136,4 @@ public class MetaDataUtils {
         }).filter(Objects::nonNull).collect(Collectors.toList());
         return podName.isEmpty() ? null : podName.get(0);
     }
-
-    public static Map<String, String> parseAddAttr(String addictiveAttr) {
-        StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
-        Map<String, String> attr = new HashMap<String, String>();
-        while (token.hasMoreTokens()) {
-            String value = token.nextToken().trim();
-            if (value.contains("=")) {
-                String[] pairs = value.split("=");
-
-                if (pairs[0].equalsIgnoreCase("m")) {
-                    continue;
-                }
-
-                // when addictiveattr like "m=10&__addcol1__worldid="
-                if (value.endsWith("=") && pairs.length == 1) {
-                    attr.put(pairs[0], "");
-                } else {
-                    attr.put(pairs[0], pairs[1]);
-                }
-
-            }
-        }
-        return attr;
-    }
 }