Andrei Shumanski created FLINK-9587: ---------------------------------------
Summary: ContinuousFileMonitoringFunction crashes on short living files
Key: FLINK-9587
URL: https://issues.apache.org/jira/browse/FLINK-9587
Project: Flink
Issue Type: Bug
Components: FileSystem, Streaming, Streaming Connectors
Affects Versions: 1.5.0
Environment: Flink 1.5 running as a standalone cluster.
Reporter: Andrei Shumanski

Hi,

We use Flink to monitor a directory for new files. The filesystem is a MapR FUSE mount that appears as a local file system. Files are copied into the directory by another process using the rsync command. While a file is still being written, rsync creates a temporary file with a name like ".file.txt.uM6MfZ", where the last extension is a random string. When the copy is complete, the file is renamed to its final name, "file.txt".

The bug is that Flink does not handle this behavior correctly: it does not take into account that files in the directory might be deleted (or renamed) while it is scanning the directory. We are getting error traces like this:

{code:java}
java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
	at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
	at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
	at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
	at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
	at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
	at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
{code}

LocalFileSystem.listStatus(final Path f) first reads the list of files in the directory and then creates a LocalFileStatus object for each of them. A file can be removed in the interval between these two operations, and the resulting FileNotFoundException propagates up through ContinuousFileMonitoringFunction and crashes the source. I do not see any way to handle this exception in our code.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
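To illustrate the race described above, here is a minimal sketch of a two-step listing that tolerates entries vanishing between the "list names" and "stat each name" steps. It assumes plain java.nio.file rather than Flink's FileSystem API, so the missing file surfaces as NoSuchFileException instead of FileNotFoundException. The class TolerantListing, its methods listNames and statAll, and the Entry record are hypothetical; this is not Flink's LocalFileSystem code or an official fix.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: a two-step directory listing that
// tolerates entries disappearing between the two steps.
final class TolerantListing {

    /** Minimal file-status record (hypothetical stand-in for LocalFileStatus). */
    static final class Entry {
        final Path path;
        final long length;
        final long modificationTime;

        Entry(Path path, long length, long modificationTime) {
            this.path = path;
            this.length = length;
            this.modificationTime = modificationTime;
        }
    }

    /** Step 1: read the names in the directory; this snapshot can go stale immediately. */
    static List<Path> listNames(Path dir) throws IOException {
        List<Path> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                names.add(p);
            }
        }
        return names;
    }

    /** Step 2: stat each listed name, skipping entries that no longer exist. */
    static List<Entry> statAll(List<Path> names) throws IOException {
        List<Entry> result = new ArrayList<>();
        for (Path p : names) {
            try {
                BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
                result.add(new Entry(p, attrs.size(), attrs.lastModifiedTime().toMillis()));
            } catch (NoSuchFileException e) {
                // Deleted or renamed after step 1 (e.g. an rsync temp file):
                // skip it instead of failing the whole listing.
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("landingarea");
        Files.write(dir.resolve("file.txt"), new byte[] {1, 2, 3});
        Path temp = dir.resolve(".file.txt.uM6MfZ");
        Files.write(temp, new byte[] {4});

        List<Path> names = listNames(dir); // sees both files
        Files.delete(temp);                // simulate rsync's rename happening right now
        for (Entry e : statAll(names)) {   // only file.txt survives
            System.out.println(e.path.getFileName() + " " + e.length);
        }
    }
}
```

Skipping the vanished entry, rather than retrying the whole listing, is one possible design choice: for a continuously monitoring source, the renamed file presumably appears under its final name on a later scan, so dropping the stale temporary entry should not lose data.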