This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2f0a575dc3 [INLONG-11135][Agent] Support filtering capability when supplementing data (#11137) 2f0a575dc3 is described below commit 2f0a575dc3147ab287797ea92a253c2ead788933 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Sep 19 16:06:05 2024 +0800 [INLONG-11135][Agent] Support filtering capability when supplementing data (#11137) * [INLONG-11135][Agent] Support filtering capability when supplementing data * [INLONG-11135][Agent] Support filtering capability when supplementing data * [INLONG-11135][Agent] Support filtering capability when supplementing data --- .../inlong/agent/conf/AbstractConfiguration.java | 6 +-- .../inlong/agent/constant/TaskConstants.java | 3 +- .../org/apache/inlong/agent/pojo/FileTask.java | 10 ++--- .../apache/inlong/agent/pojo/TaskProfileDto.java | 3 ++ .../agent/plugin/sources/file/AbstractSource.java | 43 ++++++++++++++++------ ...dedHandler.java => DefaultExtendedHandler.java} | 11 ++++-- .../sources/file/extend/ExtendedHandler.java | 9 +++-- .../inlong/agent/plugin/AgentBaseTestsHelper.java | 18 ++++++--- .../agent/plugin/instance/TestInstanceManager.java | 4 +- .../inlong/agent/plugin/sinks/KafkaSinkTest.java | 4 +- .../inlong/agent/plugin/sinks/PulsarSinkTest.java | 4 +- .../sinks/filecollect/TestSenderManager.java | 4 +- .../agent/plugin/sources/TestLogFileSource.java | 5 ++- .../agent/plugin/sources/TestSQLServerSource.java | 8 ++-- .../inlong/agent/plugin/task/TestLogFileTask.java | 5 ++- .../inlong/agent/plugin/task/TestTaskManager.java | 12 +++--- .../src/test/resources/test/mix_20230928_1.txt | 3 ++ 17 files changed, 97 insertions(+), 55 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java index f323386bfd..e8842d7d3a 100644 --- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java @@ -45,8 +45,6 @@ import java.util.Properties; public abstract class AbstractConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfiguration.class); - private static final JsonParser JSON_PARSER = new JsonParser(); - private final Map<String, JsonPrimitive> configStorage = new HashMap<>(); /** @@ -81,7 +79,7 @@ public abstract class AbstractConfiguration { if (inputStream != null) { reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); if (isJson) { - JsonElement tmpElement = JSON_PARSER.parse(reader).getAsJsonObject(); + JsonElement tmpElement = JsonParser.parseReader(reader).getAsJsonObject(); updateConfig(new HashMap<>(10), 0, tmpElement); } else { Properties properties = new Properties(); @@ -103,7 +101,7 @@ public abstract class AbstractConfiguration { * @param jsonStr json string */ public void loadJsonStrResource(String jsonStr) { - JsonElement tmpElement = JSON_PARSER.parse(jsonStr); + JsonElement tmpElement = JsonParser.parseString(jsonStr); updateConfig(new HashMap<>(10), 0, tmpElement); } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index d2642efb03..9398d70640 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -61,6 +61,7 @@ public class TaskConstants extends CommonConstants { public static final String TASK_FILE_CONTENT_COLLECT_TYPE = "task.fileTask.contentCollectType"; public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle"; public static final String 
SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator"; + public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams"; public static final String TASK_RETRY = "task.fileTask.retry"; public static final String TASK_START_TIME = "task.fileTask.startTime"; public static final String TASK_END_TIME = "task.fileTask.endTime"; @@ -68,7 +69,7 @@ public class TaskConstants extends CommonConstants { public static final String PREDEFINE_FIELDS = "task.predefinedFields"; public static final String FILE_SOURCE_EXTEND_CLASS = "task.fileTask.extendedClass"; public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS = - "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler"; + "org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler"; public static final String TASK_AUDIT_VERSION = "task.auditVersion"; // Kafka task diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java index 6397ecf5db..57c294f7d4 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java @@ -19,7 +19,7 @@ package org.apache.inlong.agent.pojo; import lombok.Data; -import java.util.Map; +import java.util.List; @Data public class FileTask { @@ -46,8 +46,8 @@ public class FileTask { private String dataSeparator; - // JSON string, the content format is Map<String,Object> - private String properties; + // The streamIds to be filtered out + private String filterStreams; // Monitor interval for file private Long monitorInterval; @@ -121,8 +121,8 @@ public class FileTask { // Column separator of data source private String dataSeparator; - // Properties for file - private Map<String, Object> properties; + // The streamIds to be filtered out + private List<String> filterStreams; // Monitor interval for file private Long 
monitorInterval; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index e340f88d8b..bb602e2c61 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -167,6 +167,9 @@ public class TaskProfileDto { fileTask.setCycleUnit(taskConfig.getCycleUnit()); fileTask.setStartTime(taskConfig.getStartTime()); fileTask.setEndTime(taskConfig.getEndTime()); + if (taskConfig.getFilterStreams() != null) { + fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams())); + } if (taskConfig.getTimeOffset() != null) { fileTask.setTimeOffset(taskConfig.getTimeOffset()); } else { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 6ee2950d3c..f1fb8b5570 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -309,6 +309,37 @@ public abstract class AbstractSource implements Source { @Override public Message read() { + SourceData sourceData = readFromQueue(); + while (sourceData != null) { + Message msg = createMessage(sourceData); + if (filterSourceData(msg)) { + long auditTime = 0; + if (isRealTime) { + auditTime = AgentUtils.getCurrentTime(); + } else { + auditTime = profile.getSinkDataTime(); + } + Map<String, String> header = msg.getHeader(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), + auditTime, 1, sourceData.getData().length, auditVersion); + 
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, + header.get(PROXY_KEY_STREAM_ID), + AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion); + return msg; + } + sourceData = readFromQueue(); + } + return null; + } + + private boolean filterSourceData(Message msg) { + if (extendedHandler != null) { + return extendedHandler.filterMessage(msg); + } + return true; + } + + private SourceData readFromQueue() { SourceData sourceData = null; try { sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -321,7 +352,7 @@ public abstract class AbstractSource implements Source { } LOGGER.debug("Read from source queue {} {}", new String(sourceData.getData()), inlongGroupId); MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); - return createMessage(sourceData); + return sourceData; } private Message createMessage(SourceData sourceData) { @@ -333,16 +364,6 @@ public abstract class AbstractSource implements Source { if (extendedHandler != null) { extendedHandler.dealWithHeader(header, sourceData.getData()); } - long auditTime = 0; - if (isRealTime) { - auditTime = AgentUtils.getCurrentTime(); - } else { - auditTime = profile.getSinkDataTime(); - } - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - auditTime, 1, sourceData.getData().length, auditVersion); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion); Message finalMsg = new DefaultMessage(sourceData.getData(), header); // if the message size is greater than max pack size,should drop it. 
if (finalMsg.getBody().length > maxPackSize) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java similarity index 80% copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java index 8cd2e76f48..18df71bf31 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/DefaultExtendedHandler.java @@ -18,19 +18,22 @@ package org.apache.inlong.agent.plugin.sources.file.extend; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.plugin.Message; import java.util.Map; -// For some private, customized extension processing -public abstract class ExtendedHandler { - - public ExtendedHandler(InstanceProfile profile) { +public class DefaultExtendedHandler extends ExtendedHandler { + public DefaultExtendedHandler(InstanceProfile profile) { + super(profile); } // Modify the header by the body public void dealWithHeader(Map<String, String> header, byte[] body) { + } + public boolean filterMessage(Message msg) { + return true; } public static class Constants { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java index 8cd2e76f48..2412a3055c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java +++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java @@ -18,20 +18,23 @@ package org.apache.inlong.agent.plugin.sources.file.extend; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.plugin.Message; import java.util.Map; // For some private, customized extension processing public abstract class ExtendedHandler { - public ExtendedHandler(InstanceProfile profile) { + protected InstanceProfile profile; + public ExtendedHandler(InstanceProfile profile) { + this.profile = profile; } // Modify the header by the body - public void dealWithHeader(Map<String, String> header, byte[] body) { + abstract public void dealWithHeader(Map<String, String> header, byte[] body); - } + abstract public boolean filterMessage(Message msg); public static class Constants { diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 10e4532be2..a7693f7da4 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; /** * common environment setting up for test cases. 
@@ -80,15 +81,19 @@ public class AgentBaseTestsHelper { } } - public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state, String cycleUnit, String timeZone) { - DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit, timeZone); + public TaskProfile getTaskProfile(int taskId, String pattern, String dataContentStyle, boolean retry, + Long startTime, Long endTime, + TaskStateEnum state, String cycleUnit, String timeZone, List<String> filterStreams) { + DataConfig dataConfig = getDataConfig(taskId, pattern, dataContentStyle, retry, startTime, endTime, + state, cycleUnit, timeZone, + filterStreams); TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); return profile; } - private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state, String cycleUnit, String timeZone) { + private DataConfig getDataConfig(int taskId, String pattern, String dataContentStyle, boolean retry, Long startTime, + Long endTime, + TaskStateEnum state, String cycleUnit, String timeZone, List<String> filterStreams) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); dataConfig.setInlongStreamId("testStreamId"); @@ -107,8 +112,9 @@ public class AgentBaseTestsHelper { fileTaskConfig.setStartTime(startTime); fileTaskConfig.setEndTime(endTime); // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf - fileTaskConfig.setDataContentStyle("mix"); + fileTaskConfig.setDataContentStyle(dataContentStyle); fileTaskConfig.setDataSeparator("|"); + fileTaskConfig.setFilterStreams(filterStreams); dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); return dataConfig; } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java index ce9a77acf6..f2c5f25ee3 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java @@ -59,8 +59,8 @@ public class TestInstanceManager { helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt"; Store basicInstanceStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); - taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR, - "GMT+6:00"); + taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR, + "GMT+6:00", null); Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); TaskStore taskStore = new TaskStore(taskBasicStore); taskStore.storeTask(taskProfile); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java index 26e06c1326..5524c5e96e 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java @@ -47,8 +47,8 @@ public class KafkaSinkTest { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", - "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(1, 
pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); kafkaSink = new MockSink(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java index 39f6ec8e71..93702fad16 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java @@ -47,8 +47,8 @@ public class PulsarSinkTest { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", - "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); pulsarSink = new MockSink(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 91b3c6c10a..afeb3565e2 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -70,8 +70,8 @@ public class TestSenderManager { String fileName = 
LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", - "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 1049bebb1a..df70039459 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -77,8 +77,9 @@ public class TestLogFileSource { private LogFileSource getSource(int taskId, long offset) { try { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D", - "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, 0L, 0L, + TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); InstanceProfile instanceProfile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java index 0dc8b71be4..2a90bdc37a 100644 
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java @@ -49,7 +49,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.powermock.api.mockito.PowerMockito.mockStatic; @@ -134,8 +136,8 @@ public class TestSQLServerSource { final String tableName = "test_source"; final String serverName = "server-01"; - TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L, TaskStateEnum.RUNNING, "D", - "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); instanceProfile = taskProfile.createInstanceProfile("", "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java index 30abede6fc..1ef3b5db1e 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java @@ -103,8 +103,9 @@ public class TestLogFileTask { for (int i = 0; i < resources.size(); i++) { resourceName.add(LOADER.getResource(resources.get(i)).getPath()); } - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, 
cycle, - "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", true, 0L, 0L, TaskStateEnum.RUNNING, + cycle, + "GMT+8:00", null); LogFileTask dayTask = null; final List<String> fileName = new ArrayList(); final List<String> dataTime = new ArrayList(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java index c5370fc69d..608d3adec6 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java @@ -58,8 +58,8 @@ public class TestTaskManager { manager = new TaskManager(); TaskStore taskStore = manager.getTaskStore(); for (int i = 1; i <= 10; i++) { - TaskProfile taskProfile = helper.getTaskProfile(i, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, - "D", "GMT+8:00"); + TaskProfile taskProfile = helper.getTaskProfile(i, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, + "D", "GMT+8:00", null); taskProfile.setTaskClass(MockTask.class.getCanonicalName()); taskStore.storeTask(taskProfile); } @@ -74,8 +74,8 @@ public class TestTaskManager { Assert.assertTrue("manager start error", false); } - TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, - "D", "GMT+8:00"); + TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, + "D", "GMT+8:00", null); String taskId1 = taskProfile1.getTaskId(); taskProfile1.setTaskClass(MockTask.class.getCanonicalName()); List<TaskProfile> taskProfiles1 = new ArrayList<>(); @@ -99,8 +99,8 @@ public class TestTaskManager { Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING); // test delete - TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING, - "D", "GMT+8:00"); + TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, + "D", "GMT+8:00", null); taskProfile2.setTaskClass(MockTask.class.getCanonicalName()); List<TaskProfile> taskProfiles2 = new ArrayList<>(); taskProfiles2.add(taskProfile2); diff --git a/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt b/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt new file mode 100644 index 0000000000..0d136aa28e --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/resources/test/mix_20230928_1.txt @@ -0,0 +1,3 @@ +ok|hello line-end-symbol aa +no|world line-end-symbol +ok|agent line-end-symbol