This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c0beb955 [INLONG-9237][Agent] Move addictive fields to package 
attributes (#9238)
b1c0beb955 is described below

commit b1c0beb955763dd9e3b9fdefafa9d1d36d51486a
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 8 15:25:52 2023 +0800

    [INLONG-9237][Agent] Move addictive fields to package attributes (#9238)
---
 .../inlong/agent/constant/TaskConstants.java       |  2 +-
 .../message/filecollect/ProxyMessageCache.java     |  1 +
 .../org/apache/inlong/agent/utils/AgentUtils.java  | 25 ++++++++++++++++++++++
 .../inlong/agent/plugin/sources/LogFileSource.java |  2 --
 .../inlong/agent/plugin/utils/MetaDataUtils.java   | 25 ----------------------
 5 files changed, 27 insertions(+), 28 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index facefd011c..3285936951 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -80,7 +80,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_START_TIME = "task.fileTask.startTime";
     public static final String TASK_END_TIME = "task.fileTask.endTime";
     public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
-    public static final String PREDEFINE_FIELDS = "task.predefineFields";
+    public static final String PREDEFINE_FIELDS = "task.predefinedFields";
 
     // Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 5426c2eb54..7ca74fb603 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -79,6 +79,7 @@ public class ProxyMessageCache {
         this.streamId = streamId;
         this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
         extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
+        
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
     }
 
     public void generateExtraMap(String dataKey) {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index fd87faca40..01c1567726 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -40,6 +40,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.StringTokenizer;
 import java.util.TimeZone;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -251,6 +252,30 @@ public class AgentUtils {
         return Pair.of(mValue, attr);
     }
 
+    public static Map<String, String> parseAddAttrToMap(String addictiveAttr) {
+        StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
+        Map<String, String> attr = new HashMap<String, String>();
+        while (token.hasMoreTokens()) {
+            String value = token.nextToken().trim();
+            if (value.contains("=")) {
+                String[] pairs = value.split("=");
+
+                if (pairs[0].equalsIgnoreCase("m")) {
+                    continue;
+                }
+
+                // when addictiveattr like "m=10&__addcol1__worldid="
+                if (value.endsWith("=") && pairs.length == 1) {
+                    attr.put(pairs[0], "");
+                } else {
+                    attr.put(pairs[0], pairs[1]);
+                }
+
+            }
+        }
+        return attr;
+    }
+
     /**
      * Get the attrs in pairs can be complicated in online env
      */
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 5cbec1efda..ab96b6a8e8 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -32,7 +32,6 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import 
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 
@@ -349,7 +348,6 @@ public class LogFileSource extends AbstractSource {
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.offset.toString());
-        
header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields()));
         Message finalMsg = new 
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index d2028bfc52..aa215bb539 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -137,28 +136,4 @@ public class MetaDataUtils {
         }).filter(Objects::nonNull).collect(Collectors.toList());
         return podName.isEmpty() ? null : podName.get(0);
     }
-
-    public static Map<String, String> parseAddAttr(String addictiveAttr) {
-        StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
-        Map<String, String> attr = new HashMap<String, String>();
-        while (token.hasMoreTokens()) {
-            String value = token.nextToken().trim();
-            if (value.contains("=")) {
-                String[] pairs = value.split("=");
-
-                if (pairs[0].equalsIgnoreCase("m")) {
-                    continue;
-                }
-
-                // when addictiveattr like "m=10&__addcol1__worldid="
-                if (value.endsWith("=") && pairs.length == 1) {
-                    attr.put(pairs[0], "");
-                } else {
-                    attr.put(pairs[0], pairs[1]);
-                }
-
-            }
-        }
-        return attr;
-    }
 }

Reply via email to