This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3596862db0 [INLONG-9215][Agent] Add predefine fields (#9217)
3596862db0 is described below

commit 3596862db00db0a9ce11e406896a4f9d362ab3ee
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Nov 3 17:39:01 2023 +0800

    [INLONG-9215][Agent] Add predefine fields (#9217)
    
    * [INLONG-9215][Agent] Add predefine fields
    
    * [INLONG-9215][Agent] Add predefine fields
    
    * [INLONG-9215][Agent] Add predefine fields
---
 .../apache/inlong/agent/conf/InstanceProfile.java  |  4 ++++
 .../inlong/agent/constant/TaskConstants.java       |  1 +
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  4 +++-
 .../inlong/agent/plugin/sources/LogFileSource.java |  4 +++-
 .../inlong/agent/plugin/utils/MetaDataUtils.java   | 25 ++++++++++++++++++++++
 5 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 5592008085..5c3e74fe86 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -101,6 +101,10 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
         setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime);
     }
 
+    public String getPredefineFields() {
+        return get(TaskConstants.PREDEFINE_FIELDS, "");
+    }
+
     @Override
     public boolean allRequiredKeyExist() {
         return hasKey(TaskConstants.FILE_UPDATE_TIME);
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index fa2ac856fd..facefd011c 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -80,6 +80,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_START_TIME = "task.fileTask.startTime";
     public static final String TASK_END_TIME = "task.fileTask.endTime";
     public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
+    public static final String PREDEFINE_FIELDS = "task.predefineFields";
 
     // Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 964ac1cf02..0fcf7a95f4 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -154,7 +154,7 @@ public class TaskProfileDto {
         }
 
         if (null != taskConfig.getLineEndPattern()) {
-            FileTask.Line line = new Line();
+            Line line = new Line();
             line.setEndPattern(taskConfig.getLineEndPattern());
             fileTask.setLine(line);
         }
@@ -410,6 +410,7 @@ public class TaskProfileDto {
         task.setUuid(dataConfig.getUuid());
         task.setVersion(dataConfig.getVersion());
         task.setState(dataConfig.getState());
+        task.setPredefinedFields(dataConfig.getPredefinedFields());
 
         // set sink type
         if (dataConfig.getDataReportType() == 
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
@@ -516,6 +517,7 @@ public class TaskProfileDto {
         private String mqClusters;
         private String topicInfo;
         private String taskClass;
+        private String predefinedFields;
         private Integer state;
 
         private FileTask fileTask;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 1e187e2bf4..c056b3aae1 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -32,6 +32,7 @@ import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import 
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
+import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 
@@ -137,7 +138,7 @@ public class LogFileSource extends AbstractSource {
     @Override
     public void init(InstanceProfile profile) {
         try {
-            LOGGER.info("FileReaderOperator init: {}", profile.toJsonStr());
+            LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
             this.profile = profile;
             super.init(profile);
             taskId = profile.getTaskId();
@@ -348,6 +349,7 @@ public class LogFileSource extends AbstractSource {
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.offset.toString());
+        
header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields()));
         Message finalMsg = new 
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index aa215bb539..d2028bfc52 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -136,4 +137,28 @@ public class MetaDataUtils {
         }).filter(Objects::nonNull).collect(Collectors.toList());
         return podName.isEmpty() ? null : podName.get(0);
     }
+
+    public static Map<String, String> parseAddAttr(String addictiveAttr) {
+        StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
+        Map<String, String> attr = new HashMap<String, String>();
+        while (token.hasMoreTokens()) {
+            String value = token.nextToken().trim();
+            if (value.contains("=")) {
+                String[] pairs = value.split("=");
+
+                if (pairs[0].equalsIgnoreCase("m")) {
+                    continue;
+                }
+
+                // when addictiveattr like "m=10&__addcol1__worldid="
+                if (value.endsWith("=") && pairs.length == 1) {
+                    attr.put(pairs[0], "");
+                } else {
+                    attr.put(pairs[0], pairs[1]);
+                }
+
+            }
+        }
+        return attr;
+    }
 }

Reply via email to