This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f0a575dc3 [INLONG-11135][Agent] Support filtering capability when supplementing data (#11137)
2f0a575dc3 is described below

commit 2f0a575dc3147ab287797ea92a253c2ead788933
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Sep 19 16:06:05 2024 +0800

    [INLONG-11135][Agent] Support filtering capability when supplementing data (#11137)
    
    * [INLONG-11135][Agent] Support filtering capability when supplementing data
    
    * [INLONG-11135][Agent] Support filtering capability when supplementing data
    
    * [INLONG-11135][Agent] Support filtering capability when supplementing data
---
 .../inlong/agent/conf/AbstractConfiguration.java   |  6 +--
 .../inlong/agent/constant/TaskConstants.java       |  3 +-
 .../org/apache/inlong/agent/pojo/FileTask.java     | 10 ++---
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  3 ++
 .../agent/plugin/sources/file/AbstractSource.java  | 43 ++++++++++++++++------
 ...dedHandler.java => DefaultExtendedHandler.java} | 11 ++++--
 .../sources/file/extend/ExtendedHandler.java       |  9 +++--
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  | 18 ++++++---
 .../agent/plugin/instance/TestInstanceManager.java |  4 +-
 .../inlong/agent/plugin/sinks/KafkaSinkTest.java   |  4 +-
 .../inlong/agent/plugin/sinks/PulsarSinkTest.java  |  4 +-
 .../sinks/filecollect/TestSenderManager.java       |  4 +-
 .../agent/plugin/sources/TestLogFileSource.java    |  5 ++-
 .../agent/plugin/sources/TestSQLServerSource.java  |  8 ++--
 .../inlong/agent/plugin/task/TestLogFileTask.java  |  5 ++-
 .../inlong/agent/plugin/task/TestTaskManager.java  | 12 +++---
 .../src/test/resources/test/mix_20230928_1.txt     |  3 ++
 17 files changed, 97 insertions(+), 55 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
index f323386bfd..e8842d7d3a 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
@@ -45,8 +45,6 @@ import java.util.Properties;
 public abstract class AbstractConfiguration {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractConfiguration.class);
-    private static final JsonParser JSON_PARSER = new JsonParser();
-
     private final Map<String, JsonPrimitive> configStorage = new HashMap<>();
 
     /**
@@ -81,7 +79,7 @@ public abstract class AbstractConfiguration {
             if (inputStream != null) {
                 reader = new InputStreamReader(inputStream, 
StandardCharsets.UTF_8);
                 if (isJson) {
-                    JsonElement tmpElement = 
JSON_PARSER.parse(reader).getAsJsonObject();
+                    JsonElement tmpElement = 
JsonParser.parseReader(reader).getAsJsonObject();
                     updateConfig(new HashMap<>(10), 0, tmpElement);
                 } else {
                     Properties properties = new Properties();
@@ -103,7 +101,7 @@ public abstract class AbstractConfiguration {
      * @param jsonStr json string
      */
     public void loadJsonStrResource(String jsonStr) {
-        JsonElement tmpElement = JSON_PARSER.parse(jsonStr);
+        JsonElement tmpElement = JsonParser.parseString(jsonStr);
         updateConfig(new HashMap<>(10), 0, tmpElement);
     }
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index d2642efb03..9398d70640 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -61,6 +61,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_FILE_CONTENT_COLLECT_TYPE = 
"task.fileTask.contentCollectType";
     public static final String SOURCE_DATA_CONTENT_STYLE = 
"task.fileTask.dataContentStyle";
     public static final String SOURCE_DATA_SEPARATOR = 
"task.fileTask.dataSeparator";
+    public static final String SOURCE_FILTER_STREAMS = 
"task.fileTask.filterStreams";
     public static final String TASK_RETRY = "task.fileTask.retry";
     public static final String TASK_START_TIME = "task.fileTask.startTime";
     public static final String TASK_END_TIME = "task.fileTask.endTime";
@@ -68,7 +69,7 @@ public class TaskConstants extends CommonConstants {
     public static final String PREDEFINE_FIELDS = "task.predefinedFields";
     public static final String FILE_SOURCE_EXTEND_CLASS = 
"task.fileTask.extendedClass";
     public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
-            
"org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
+            
"org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler";
     public static final String TASK_AUDIT_VERSION = "task.auditVersion";
 
     // Kafka task
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index 6397ecf5db..57c294f7d4 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.pojo;
 
 import lombok.Data;
 
-import java.util.Map;
+import java.util.List;
 
 @Data
 public class FileTask {
@@ -46,8 +46,8 @@ public class FileTask {
 
     private String dataSeparator;
 
-    // JSON string, the content format is Map<String,Object>
-    private String properties;
+    // The streamIds to be filtered out
+    private String filterStreams;
 
     // Monitor interval for file
     private Long monitorInterval;
@@ -121,8 +121,8 @@ public class FileTask {
         // Column separator of data source
         private String dataSeparator;
 
-        // Properties for file
-        private Map<String, Object> properties;
+        // The streamIds to be filtered out
+        private List<String> filterStreams;
 
         // Monitor interval for file
         private Long monitorInterval;
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index e340f88d8b..bb602e2c61 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -167,6 +167,9 @@ public class TaskProfileDto {
         fileTask.setCycleUnit(taskConfig.getCycleUnit());
         fileTask.setStartTime(taskConfig.getStartTime());
         fileTask.setEndTime(taskConfig.getEndTime());
+        if (taskConfig.getFilterStreams() != null) {
+            
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
+        }
         if (taskConfig.getTimeOffset() != null) {
             fileTask.setTimeOffset(taskConfig.getTimeOffset());
         } else {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 6ee2950d3c..f1fb8b5570 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -309,6 +309,37 @@ public abstract class AbstractSource implements Source {
 
     @Override
     public Message read() {
+        SourceData sourceData = readFromQueue();
+        while (sourceData != null) {
+            Message msg = createMessage(sourceData);
+            if (filterSourceData(msg)) {
+                long auditTime = 0;
+                if (isRealTime) {
+                    auditTime = AgentUtils.getCurrentTime();
+                } else {
+                    auditTime = profile.getSinkDataTime();
+                }
+                Map<String, String> header = msg.getHeader();
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+                        auditTime, 1, sourceData.getData().length, 
auditVersion);
+                
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
+                        header.get(PROXY_KEY_STREAM_ID),
+                        AgentUtils.getCurrentTime(), 1, 
sourceData.getData().length, auditVersion);
+                return msg;
+            }
+            sourceData = readFromQueue();
+        }
+        return null;
+    }
+
+    private boolean filterSourceData(Message msg) {
+        if (extendedHandler != null) {
+            return extendedHandler.filterMessage(msg);
+        }
+        return true;
+    }
+
+    private SourceData readFromQueue() {
         SourceData sourceData = null;
         try {
             sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
@@ -321,7 +352,7 @@ public abstract class AbstractSource implements Source {
         }
         LOGGER.debug("Read from source queue {} {}", new 
String(sourceData.getData()), inlongGroupId);
         MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.getData().length);
-        return createMessage(sourceData);
+        return sourceData;
     }
 
     private Message createMessage(SourceData sourceData) {
@@ -333,16 +364,6 @@ public abstract class AbstractSource implements Source {
         if (extendedHandler != null) {
             extendedHandler.dealWithHeader(header, sourceData.getData());
         }
-        long auditTime = 0;
-        if (isRealTime) {
-            auditTime = AgentUtils.getCurrentTime();
-        } else {
-            auditTime = profile.getSinkDataTime();
-        }
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                auditTime, 1, sourceData.getData().length, auditVersion);
-        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, 
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
-                AgentUtils.getCurrentTime(), 1, sourceData.getData().length, 
auditVersion);
         Message finalMsg = new DefaultMessage(sourceData.getData(), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java
similarity index 80%
copy from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
copy to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java
index 8cd2e76f48..18df71bf31 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java
@@ -18,19 +18,22 @@
 package org.apache.inlong.agent.plugin.sources.file.extend;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.plugin.Message;
 
 import java.util.Map;
 
-// For some private, customized extension processing
-public abstract class ExtendedHandler {
-
-    public ExtendedHandler(InstanceProfile profile) {
+public class DefaultExtendedHandler extends ExtendedHandler {
 
+    public DefaultExtendedHandler(InstanceProfile profile) {
+        super(profile);
     }
 
     // Modify the header by the body
     public void dealWithHeader(Map<String, String> header, byte[] body) {
+    }
 
+    public boolean filterMessage(Message msg) {
+        return true;
     }
 
     public static class Constants {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
index 8cd2e76f48..2412a3055c 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
@@ -18,20 +18,23 @@
 package org.apache.inlong.agent.plugin.sources.file.extend;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.plugin.Message;
 
 import java.util.Map;
 
 // For some private, customized extension processing
 public abstract class ExtendedHandler {
 
-    public ExtendedHandler(InstanceProfile profile) {
+    protected InstanceProfile profile;
 
+    public ExtendedHandler(InstanceProfile profile) {
+        this.profile = profile;
     }
 
     // Modify the header by the body
-    public void dealWithHeader(Map<String, String> header, byte[] body) {
+    abstract public void dealWithHeader(Map<String, String> header, byte[] 
body);
 
-    }
+    abstract public boolean filterMessage(Message msg);
 
     public static class Constants {
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 10e4532be2..a7693f7da4 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
 
 /**
  * common environment setting up for test cases.
@@ -80,15 +81,19 @@ public class AgentBaseTestsHelper {
         }
     }
 
-    public TaskProfile getTaskProfile(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
-            TaskStateEnum state, String cycleUnit, String timeZone) {
-        DataConfig dataConfig = getDataConfig(taskId, pattern, retry, 
startTime, endTime, state, cycleUnit, timeZone);
+    public TaskProfile getTaskProfile(int taskId, String pattern, String 
dataContentStyle, boolean retry,
+            Long startTime, Long endTime,
+            TaskStateEnum state, String cycleUnit, String timeZone, 
List<String> filterStreams) {
+        DataConfig dataConfig = getDataConfig(taskId, pattern, 
dataContentStyle, retry, startTime, endTime,
+                state, cycleUnit, timeZone,
+                filterStreams);
         TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
         return profile;
     }
 
-    private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
-            TaskStateEnum state, String cycleUnit, String timeZone) {
+    private DataConfig getDataConfig(int taskId, String pattern, String 
dataContentStyle, boolean retry, Long startTime,
+            Long endTime,
+            TaskStateEnum state, String cycleUnit, String timeZone, 
List<String> filterStreams) {
         DataConfig dataConfig = new DataConfig();
         dataConfig.setInlongGroupId("testGroupId");
         dataConfig.setInlongStreamId("testStreamId");
@@ -107,8 +112,9 @@ public class AgentBaseTestsHelper {
         fileTaskConfig.setStartTime(startTime);
         fileTaskConfig.setEndTime(endTime);
         // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
-        fileTaskConfig.setDataContentStyle("mix");
+        fileTaskConfig.setDataContentStyle(dataContentStyle);
         fileTaskConfig.setDataSeparator("|");
+        fileTaskConfig.setFilterStreams(filterStreams);
         dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
         return dataConfig;
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index ce9a77acf6..f2c5f25ee3 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -59,8 +59,8 @@ public class TestInstanceManager {
         helper = new 
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
         Store basicInstanceStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
-        taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
-                "GMT+6:00");
+        taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, 
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
+                "GMT+6:00", null);
         Store taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
         TaskStore taskStore = new TaskStore(taskBasicStore);
         taskStore.storeTask(taskProfile);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 26e06c1326..5524c5e96e 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -47,8 +47,8 @@ public class KafkaSinkTest {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING, "D",
-                "GMT+8:00");
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+                "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
         kafkaSink = new MockSink();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index 39f6ec8e71..93702fad16 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -47,8 +47,8 @@ public class PulsarSinkTest {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING, "D",
-                "GMT+8:00");
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+                "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
         pulsarSink = new MockSink();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 91b3c6c10a..afeb3565e2 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -70,8 +70,8 @@ public class TestSenderManager {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING, "D",
-                "GMT+8:00");
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+                "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 1049bebb1a..df70039459 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -77,8 +77,9 @@ public class TestLogFileSource {
     private LogFileSource getSource(int taskId, long offset) {
         try {
             String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
-                    "GMT+8:00");
+            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", false, 0L, 0L,
+                    TaskStateEnum.RUNNING, "D",
+                    "GMT+8:00", null);
             String fileName = 
LOADER.getResource("test/20230928_1.txt").getPath();
             InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile("",
                     fileName, taskProfile.getCycleUnit(), "20230928", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 0dc8b71be4..2a90bdc37a 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -49,7 +49,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -134,8 +136,8 @@ public class TestSQLServerSource {
         final String tableName = "test_source";
         final String serverName = "server-01";
 
-        TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L, 
TaskStateEnum.RUNNING, "D",
-                "GMT+8:00");
+        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
0L, 0L, TaskStateEnum.RUNNING, "D",
+                "GMT+8:00", null);
         instanceProfile = taskProfile.createInstanceProfile("",
                 "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
         instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 30abede6fc..1ef3b5db1e 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -103,8 +103,9 @@ public class TestLogFileTask {
         for (int i = 0; i < resources.size(); i++) {
             resourceName.add(LOADER.getResource(resources.get(i)).getPath());
         }
-        TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, true, 
0L, 0L, TaskStateEnum.RUNNING, cycle,
-                "GMT+8:00");
+        TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", true, 0L, 0L, TaskStateEnum.RUNNING,
+                cycle,
+                "GMT+8:00", null);
         LogFileTask dayTask = null;
         final List<String> fileName = new ArrayList();
         final List<String> dataTime = new ArrayList();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index c5370fc69d..608d3adec6 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -58,8 +58,8 @@ public class TestTaskManager {
             manager = new TaskManager();
             TaskStore taskStore = manager.getTaskStore();
             for (int i = 1; i <= 10; i++) {
-                TaskProfile taskProfile = helper.getTaskProfile(i, pattern, 
false, 0L, 0L, TaskStateEnum.RUNNING,
-                        "D", "GMT+8:00");
+                TaskProfile taskProfile = helper.getTaskProfile(i, pattern, 
"csv", false, 0L, 0L, TaskStateEnum.RUNNING,
+                        "D", "GMT+8:00", null);
                 taskProfile.setTaskClass(MockTask.class.getCanonicalName());
                 taskStore.storeTask(taskProfile);
             }
@@ -74,8 +74,8 @@ public class TestTaskManager {
             Assert.assertTrue("manager start error", false);
         }
 
-        TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false, 
0L, 0L, TaskStateEnum.RUNNING,
-                "D", "GMT+8:00");
+        TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING,
+                "D", "GMT+8:00", null);
         String taskId1 = taskProfile1.getTaskId();
         taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles1 = new ArrayList<>();
@@ -99,8 +99,8 @@ public class TestTaskManager {
         Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == 
TaskStateEnum.RUNNING);
 
         // test delete
-        TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false, 
0L, 0L, TaskStateEnum.RUNNING,
-                "D", "GMT+8:00");
+        TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING,
+                "D", "GMT+8:00", null);
         taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles2 = new ArrayList<>();
         taskProfiles2.add(taskProfile2);
diff --git 
a/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt 
b/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt
new file mode 100644
index 0000000000..0d136aa28e
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt
@@ -0,0 +1,3 @@
+ok|hello line-end-symbol aa
+no|world line-end-symbol
+ok|agent line-end-symbol

Reply via email to