[ https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann reopened FLINK-9587:
----------------------------------

> ContinuousFileMonitoringFunction crashes on short living files
> --------------------------------------------------------------
>
>                 Key: FLINK-9587
>                 URL: https://issues.apache.org/jira/browse/FLINK-9587
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming, Streaming Connectors
>    Affects Versions: 1.5.0
>         Environment: Flink 1.5 running as a standalone cluster.
>            Reporter: Andrei Shumanski
>            Priority: Critical
>             Fix For: 1.6.3, 1.7.0
>
>
> Hi,
>
> We use Flink to monitor a directory for new files. The filesystem is a MapR
> FUSE mount that appears as a local file system.
> The files are copied into the directory by another process that uses the rsync
> command. While a file is not yet completely written, rsync writes it to a
> temporary file with a name like ".file.txt.uM6MfZ", where the last extension
> is a random string.
> When the copy is complete, the file is renamed to its final name, "file.txt".
>
> The bug is that Flink does not handle this behavior correctly: it does not
> take into account that files in the directory might be deleted (here, the
> temporary file disappears when it is renamed).
>
> We are getting error traces like this one:
> {code:java}
> java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
>     at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>     at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
>     at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
>     at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
>     at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
>     at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> LocalFileSystem.listStatus(final Path f) first reads the list of file names in
> a directory and then creates a LocalFileStatus object for each of them (via
> getFileStatus(), which is where the exception above is thrown). A file can be
> removed in the interval between these two operations.
> Because the exception is thrown inside the source itself
> (ContinuousFileMonitoringFunction.run()), I do not see any way to handle it in
> our code.
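The list-then-stat race described in the report (names are read first, each entry is stat'ed afterwards, and rsync renames its temporary file in between) can be reproduced outside Flink. The following is a minimal sketch under stated assumptions, not Flink's code and not necessarily the upstream fix: it uses plain `java.nio.file` instead of Flink's `FileSystem` API, so a vanished file surfaces as `NoSuchFileException` rather than the `FileNotFoundException` in the trace, and the names `TolerantListing`, `listNames` and `listTolerant` are hypothetical. It shows a naive two-step listing failing, and a tolerant variant that skips entries which no longer exist while still propagating other I/O errors such as permission problems.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of the list-then-stat race; not Flink code.
class TolerantListing {

    // Step 1: read the entry names of a directory.
    static List<Path> listNames(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.sorted().collect(Collectors.toList());
        }
    }

    // Step 2, tolerant version: read metadata for each listed entry and skip
    // entries that were renamed or deleted after step 1. Any other IOException
    // (e.g. insufficient permissions) still propagates to the caller.
    static List<Path> listTolerant(List<Path> names) throws IOException {
        List<Path> existing = new ArrayList<>(names.size());
        for (Path p : names) {
            try {
                // A real implementation would keep these attributes
                // (size, modification time) for building input splits.
                Files.readAttributes(p, BasicFileAttributes.class);
                existing.add(p);
            } catch (NoSuchFileException e) {
                // The entry vanished between step 1 and step 2, e.g. rsync
                // renamed ".file.txt.uM6MfZ" to "file.txt"; ignore it.
            }
        }
        return existing;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("landingarea");
        Files.write(dir.resolve("file_a.txt"), "a".getBytes(StandardCharsets.UTF_8));
        // An in-progress rsync-style transfer: hidden name with a random suffix.
        Path tmp = Files.write(dir.resolve(".file_b.txt.uM6MfZ"),
                "b".getBytes(StandardCharsets.UTF_8));

        List<Path> names = listNames(dir); // step 1 sees both entries

        // The transfer finishes and the temp file is renamed before step 2.
        Files.move(tmp, dir.resolve("file_b.txt"));

        // Naive step 2: a single vanished entry aborts the whole listing.
        try {
            for (Path p : names) {
                Files.readAttributes(p, BasicFileAttributes.class);
            }
            System.out.println("naive listing succeeded");
        } catch (NoSuchFileException e) {
            System.out.println("naive listing failed on " + e.getFile());
        }

        System.out.println("tolerant listing: " + listTolerant(names));
    }
}
```

Skipping a vanished entry is reasonable in the rsync case because the data reappears under its final name and would presumably be picked up by the next periodic scan of the monitored directory; whether that holds for other writers that delete files outright is an assumption worth checking.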
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)