This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new c61b3f69b [INLONG-5803][Agent] Fix NPE when collect file data (#5804) c61b3f69b is described below commit c61b3f69ba23df59508bc7cbf3a7f7b552783b6b Author: ganfengtan <ganfeng...@users.noreply.github.com> AuthorDate: Tue Sep 6 19:20:48 2022 +0800 [INLONG-5803][Agent] Fix NPE when collect file data (#5804) --- .../sources/reader/file/FileReaderOperator.java | 4 ++-- .../sources/reader/file/MonitorTextFile.java | 22 ++++++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java index 5c35f85d9..2ff0bf189 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java @@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.apache.inlong.agent.constant.CommonConstants.COMMA; -import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST; import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES; @@ -192,9 +191,10 @@ public class FileReaderOperator extends AbstractReader { } } + // default value is -1 and never stop task private void initReadTimeout(JobProfile jobConf) { int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT, - DEFAULT_JOB_FILE_MAX_WAIT); + NEVER_STOP_SIGN); if (waitTime == NEVER_STOP_SIGN) { timeout = NEVER_STOP_SIGN; } else { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java index e684b2ce7..4a2644dba 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Files; import java.nio.file.attribute.BasicFileAttributes; +import java.util.Objects; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -76,6 +77,7 @@ public final class MonitorTextFile { */ private class MonitorEventRunnable implements Runnable { + private static final int WAIT_TIME = 30; private final FileReaderOperator fileReaderOperator; private final TextFileReader textFileReader; private final Long interval; @@ -100,8 +102,9 @@ public final class MonitorTextFile { @Override public void run() { - while (!this.fileReaderOperator.finished) { - try { + try { + TimeUnit.SECONDS.sleep(WAIT_TIME); + while (!this.fileReaderOperator.finished) { long expireTime = Long.parseLong(fileReaderOperator.jobConf .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE)); long currentTime = System.currentTimeMillis(); @@ -111,17 +114,24 @@ public final class MonitorTextFile { } listen(); TimeUnit.MILLISECONDS.sleep(interval); - } catch (Exception e) { - LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e); } + } catch (Exception e) { + LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e); } } private void listen() throws IOException { BasicFileAttributes attributesAfter = Files .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class); - if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0 - && !this.fileReaderOperator.iterator.hasNext()) { + if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0) { + // Not triggered during data sending + if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator.hasNext()) { + return; + } + // Set position 0 when split file + if (attributesBefore.creationTime().compareTo(attributesAfter.creationTime()) < 0) { + this.fileReaderOperator.position = 0; + } this.textFileReader.getData(); this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator(); this.attributesBefore = attributesAfter;