Andrei Shumanski created FLINK-9587:
---------------------------------------

             Summary: ContinuousFileMonitoringFunction crashes on short living files
                 Key: FLINK-9587
                 URL: https://issues.apache.org/jira/browse/FLINK-9587
             Project: Flink
          Issue Type: Bug
          Components: FileSystem, Streaming, Streaming Connectors
    Affects Versions: 1.5.0
         Environment: Flink 1.5 running as a standalone cluster.
            Reporter: Andrei Shumanski


Hi,

We use Flink to monitor a directory for new files. The filesystem is a MapR
FUSE mount that looks like a local FS.

The files are copied into the directory by another process that uses the rsync
command. While a file is still being written, rsync creates a temporary file
with a name like ".file.txt.uM6MfZ", where the last extension is a random
string.

When the copy is complete, the file is renamed to its final name, "file.txt".

The bug is that Flink does not handle this behavior correctly: it does not take
into account that files in the directory might be deleted or renamed while the
directory is being scanned.

We are getting the following stack trace:
{code:java}
java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
    at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
{code}
In LocalFileSystem.listStatus(final Path f), Flink first reads the list of
files in a directory and then creates a LocalFileStatus object for each of
them. A file might be removed in the interval between these two operations, in
which case getFileStatus throws the FileNotFoundException shown above.

I do not see any way to handle this exception in our own code.
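
To make the race concrete, here is a minimal sketch of the list-then-stat
pattern and one tolerant way to handle it. It uses plain java.nio rather than
Flink's LocalFileSystem, and the class and method names (TolerantListing,
listNames, statSurvivors) are hypothetical, chosen only for illustration. It
is not the actual Flink code or a proposed patch, just the general idea of
skipping entries that vanish between the two steps.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink.
public class TolerantListing {

    // Step 1: take a snapshot of the directory entries (paths only, no metadata).
    public static List<Path> listNames(Path dir) throws IOException {
        List<Path> names = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                names.add(entry);
            }
        }
        return names;
    }

    // Step 2: read the attributes of each entry from the snapshot.
    // An entry may have been deleted or renamed since step 1 (for example an
    // rsync temporary file), so a missing file is skipped instead of failing
    // the whole listing.
    public static Map<Path, BasicFileAttributes> statSurvivors(List<Path> names)
            throws IOException {
        Map<Path, BasicFileAttributes> statuses = new LinkedHashMap<>();
        for (Path name : names) {
            try {
                statuses.put(name, Files.readAttributes(name, BasicFileAttributes.class));
            } catch (NoSuchFileException e) {
                // The file disappeared between listing and stat: ignore it.
            }
        }
        return statuses;
    }
}
```

Calling statSurvivors(listNames(dir)) returns only the entries that still
exist at stat time. Whether silently skipping vanished files is the right
behavior for Flink is a design question for the maintainers.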

 



