This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3596862db0 [INLONG-9215][Agent] Add predefine fields (#9217) 3596862db0 is described below commit 3596862db00db0a9ce11e406896a4f9d362ab3ee Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Nov 3 17:39:01 2023 +0800 [INLONG-9215][Agent] Add predefine fields (#9217) * [INLONG-9215][Agent] Add predefine fields * [INLONG-9215][Agent] Add predefine fields * [INLONG-9215][Agent] Add predefine fields --- .../apache/inlong/agent/conf/InstanceProfile.java | 4 ++++ .../inlong/agent/constant/TaskConstants.java | 1 + .../apache/inlong/agent/pojo/TaskProfileDto.java | 4 +++- .../inlong/agent/plugin/sources/LogFileSource.java | 4 +++- .../inlong/agent/plugin/utils/MetaDataUtils.java | 25 ++++++++++++++++++++++ 5 files changed, 36 insertions(+), 2 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java index 5592008085..5c3e74fe86 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -101,6 +101,10 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime); } + public String getPredefineFields() { + return get(TaskConstants.PREDEFINE_FIELDS, ""); + } + @Override public boolean allRequiredKeyExist() { return hasKey(TaskConstants.FILE_UPDATE_TIME); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index fa2ac856fd..facefd011c 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -80,6 +80,7 @@ public class TaskConstants extends CommonConstants { public static final String TASK_START_TIME = "task.fileTask.startTime"; public static final String TASK_END_TIME = "task.fileTask.endTime"; public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount"; + public static final String PREDEFINE_FIELDS = "task.predefineFields"; // Binlog job public static final String JOB_DATABASE_USER = "job.binlogJob.user"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 964ac1cf02..0fcf7a95f4 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -154,7 +154,7 @@ public class TaskProfileDto { } if (null != taskConfig.getLineEndPattern()) { - FileTask.Line line = new Line(); + Line line = new Line(); line.setEndPattern(taskConfig.getLineEndPattern()); fileTask.setLine(line); } @@ -410,6 +410,7 @@ public class TaskProfileDto { task.setUuid(dataConfig.getUuid()); task.setVersion(dataConfig.getVersion()); task.setState(dataConfig.getState()); + task.setPredefinedFields(dataConfig.getPredefinedFields()); // set sink type if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) { @@ -516,6 +517,7 @@ public class TaskProfileDto { private String mqClusters; private String topicInfo; private String taskClass; + private String predefinedFields; private Integer state; private FileTask fileTask; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 1e187e2bf4..c056b3aae1 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -32,6 +32,7 @@ import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider; +import org.apache.inlong.agent.plugin.utils.MetaDataUtils; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; @@ -137,7 +138,7 @@ public class LogFileSource extends AbstractSource { @Override public void init(InstanceProfile profile) { try { - LOGGER.info("FileReaderOperator init: {}", profile.toJsonStr()); + LOGGER.info("LogFileSource init: {}", profile.toJsonStr()); this.profile = profile; super.init(profile); taskId = profile.getTaskId(); @@ -348,6 +349,7 @@ public class LogFileSource extends AbstractSource { Map<String, String> header = new HashMap<>(); header.put(PROXY_KEY_DATA, proxyPartitionKey); header.put(OFFSET, sourceData.offset.toString()); + header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields())); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); // if the message size is greater than max pack size,should drop it. if (finalMsg.getBody().length > maxPackSize) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java index aa215bb539..d2028bfc52 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -136,4 +137,28 @@ public class MetaDataUtils { }).filter(Objects::nonNull).collect(Collectors.toList()); return podName.isEmpty() ? null : podName.get(0); } + + public static Map<String, String> parseAddAttr(String addictiveAttr) { + StringTokenizer token = new StringTokenizer(addictiveAttr, "&"); + Map<String, String> attr = new HashMap<String, String>(); + while (token.hasMoreTokens()) { + String value = token.nextToken().trim(); + if (value.contains("=")) { + String[] pairs = value.split("="); + + if (pairs[0].equalsIgnoreCase("m")) { + continue; + } + + // when addictiveattr like "m=10&__addcol1__worldid=" + if (value.endsWith("=") && pairs.length == 1) { + attr.put(pairs[0], ""); + } else { + attr.put(pairs[0], pairs[1]); + } + + } + } + return attr; + } }