This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new bce8b5432 [INLONG-5861][Agent] Support symbolic links for file (#5862) bce8b5432 is described below commit bce8b543208351b557fb384c11d34d1ad0d3a505 Author: ganfengtan <ganfeng...@users.noreply.github.com> AuthorDate: Tue Sep 13 11:01:29 2022 +0800 [INLONG-5861][Agent] Support symbolic links for file (#5862) --- .../sources/reader/file/MonitorTextFile.java | 28 ++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java index dfad65450..0c6f6a72e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java @@ -82,6 +82,7 @@ public final class MonitorTextFile { private final TextFileReader textFileReader; private final Long interval; private final long startTime = System.currentTimeMillis(); + private String path; /** * the last modify time of the file */ @@ -95,6 +96,7 @@ public final class MonitorTextFile { try { this.attributesBefore = Files .readAttributes(fileReaderOperator.file.toPath(), BasicFileAttributes.class); + this.path = this.fileReaderOperator.file.getCanonicalPath(); } catch (IOException e) { LOGGER.error("get {} last modify time error:", fileReaderOperator.file.getName(), e); } @@ -121,17 +123,29 @@ public final class MonitorTextFile { } private void listen() throws IOException { - BasicFileAttributes attributesAfter = Files - .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class); + BasicFileAttributes attributesAfter; + String currentPath; + try { + attributesAfter = Files + .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class); + currentPath = this.fileReaderOperator.file.getCanonicalPath(); + } catch (Exception e) { + // Set position 0 when split file + this.fileReaderOperator.position = 0; + LOGGER.error("monitor {} error, reset position is 0:", this.fileReaderOperator.file.getName(), e); + return; + } + // If change symbolic links + if (attributesAfter.isSymbolicLink() && !path.equals(currentPath)) { + this.fileReaderOperator.position = 0; + path = currentPath; + } if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0) { // Not triggered during data sending - if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator.hasNext()) { + if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator + .hasNext()) { return; } - // Set position 0 when split file - if (attributesBefore.creationTime().compareTo(attributesAfter.creationTime()) < 0) { - this.fileReaderOperator.position = 0; - } this.textFileReader.getData(); this.textFileReader.mergeData(this.fileReaderOperator); this.attributesBefore = attributesAfter;