This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit e26610ff1de43db5ecbe1eb548724630e9d3ee7e Author: xueyingzhang <86780714+poc...@users.noreply.github.com> AuthorDate: Thu Jan 5 16:32:24 2023 +0800 [INLONG-7156][Agent] Support directly sending raw file data (#7157) --- .../sources/reader/file/FileReaderOperator.java | 8 ++++- .../apache/inlong/agent/plugin/TestFileAgent.java | 2 +- .../agent/plugin/sources/TestTextFileReader.java | 34 +++++++++++++++++++++- .../inlong/agent/plugin/task/TestTextFileTask.java | 6 +++- .../agent-plugins/src/test/resources/test/3.txt | 5 ++++ 5 files changed, 51 insertions(+), 4 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java index 32ede5da4..583632877 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java @@ -21,11 +21,11 @@ import com.google.gson.Gson; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.Validator; -import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.plugin.sources.reader.AbstractReader; import org.apache.inlong.agent.plugin.utils.FileDataUtils; import org.apache.inlong.agent.plugin.validator.PatternValidator; @@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader { private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); private final StringBuffer sb = new StringBuffer(); + private boolean needMetadata = false; public FileReaderOperator(File file, int position) { this(file, position, ""); @@ -261,6 +262,9 @@ public class FileReaderOperator extends AbstractReader { } public String metadataMessage(String message) { + if (!needMetadata) { + return message; + } long timestamp = System.currentTimeMillis(); boolean isJson = FileDataUtils.isJSON(message); Map<String, String> mergeData = new HashMap<>(metadata); @@ -280,8 +284,10 @@ public class FileReaderOperator extends AbstractReader { String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA); Arrays.stream(env).forEach(data -> { if (data.equalsIgnoreCase(KUBERNETES)) { + needMetadata = true; new KubernetesMetadataProvider(this).getData(); } else if (data.equalsIgnoreCase(ENV_CVM)) { + needMetadata = true; metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost()); metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp()); metadata.put(METADATA_FILE_NAME, file.getName()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java index d0bf5d99d..011f70bf7 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java @@ -169,7 +169,7 @@ public class TestFileAgent { await().atMost(10, TimeUnit.SECONDS).until(() -> { Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs(); return jobs.size() == 1 - && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 4; + && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 5; }); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java index 6159aa219..f89bf0355 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; @@ -56,13 +57,15 @@ import java.util.stream.Stream; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS; -import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE; import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID; import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID; +import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES; @PowerMockIgnore({"javax.management.*", "javax.script.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "org.w3c.*"}) @@ -135,6 +138,33 @@ public class TestTextFileReader { } } + @Test + public void testFileRowDataRead() throws URISyntaxException { + URI uri = getClass().getClassLoader().getResource("test").toURI(); + JobProfile jobConfiguration = JobProfile.parseJsonStr("{}"); + String mainPath = Paths.get(uri).toString(); + jobConfiguration.set(JOB_DIR_FILTER_PATTERNS, Paths.get(mainPath, + "3.txt").toFile().getAbsolutePath()); + jobConfiguration.set(JOB_INSTANCE_ID, "test"); + jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid"); + jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid"); + jobConfiguration.set(JOB_GROUP_ID, "groupid"); + jobConfiguration.set(JOB_STREAM_ID, "streamid"); + TextFileSource fileSource = new TextFileSource(); + List<Reader> readerList = fileSource.split(jobConfiguration); + Assert.assertEquals(1, readerList.size()); + Reader reader = readerList.get(0); + reader.init(jobConfiguration); + while (!reader.isFinished()) { + Message message = reader.read(); + if (message == null) { + break; + } + Assert.assertEquals("agent text content test", message.toString()); + } + + } + /** * Custom line end character. */ @@ -152,6 +182,7 @@ public class TestTextFileReader { jobConfiguration.set(JOB_STREAM_ID, "streamid"); jobConfiguration.set(JOB_FILE_TRIGGER_TYPE, FileTriggerType.FULL); jobConfiguration.set(JOB_FILE_LINE_END_PATTERN, "line-end-symbol"); + jobConfiguration.set(JOB_FILE_META_ENV_LIST, KUBERNETES); TextFileSource fileSource = new TextFileSource(); List<Reader> readerList = fileSource.split(jobConfiguration); Assert.assertEquals(1, readerList.size()); @@ -224,6 +255,7 @@ public class TestTextFileReader { jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid"); jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid"); jobProfile.set(JOB_INSTANCE_ID, "1"); + jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES); fileReaderOperator.init(jobProfile); Assert.assertEquals("world", getContent( diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java index cd7c29aa5..e0245a83d 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java @@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST; +import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -180,7 +182,7 @@ public class TestTextFileTask { jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath()); jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0)); jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL); - + jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES); // mock data final MockSink sink = mockTextTask(jobProfile); await().atMost(10, TimeUnit.SECONDS).until(() -> sink.getResult().size() == 100); @@ -220,6 +222,7 @@ public class TestTextFileTask { jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath()); jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT); jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0)); + jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES); // mock data final MockSink sink = mockTextTask(jobProfile); @@ -252,6 +255,7 @@ public class TestTextFileTask { jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL); jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0)); jobProfile.set(JobConstants.JOB_FILE_LINE_END_PATTERN, "[0-9]"); + jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES); // mock data final MockSink sink = mockTextTask(jobProfile); diff --git a/inlong-agent/agent-plugins/src/test/resources/test/3.txt b/inlong-agent/agent-plugins/src/test/resources/test/3.txt new file mode 100644 index 000000000..45d4bba28 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/resources/test/3.txt @@ -0,0 +1,5 @@ +agent text content test +agent text content test +agent text content test +agent text content test +agent text content test \ No newline at end of file