Hi all,

I'm using the following code to continuously process files from a directory "mydir":

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
    // Also pick up files in subdirectories of mydir
    fileInputFormat.setNestedFileEnumeration(true);
    // Re-scan the directory every 10 seconds
    env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
       .print();
    env.execute();

If I add a directory under "mydir", say "2016-12-16", and then add a file "2016-12-16/file.txt", its contents are not printed. If I add the same file directly under "mydir", its contents are correctly printed. After that, the logs show the following:

    10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
    10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122

It looks like the ContinuousFileMonitoringFunction treats "2016-12-16" as a file it has already read and therefore excludes it, even though the directory's contents were never processed.

Any idea why this happens?

Thank you.

Best,
Yassine
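
P.S. To make the suspicion above concrete, here is a minimal, self-contained sketch of the behaviour I think I am seeing. It is not Flink's actual code: the class `ModTimeFilterSketch`, the `Entry` type and both `eligible...` methods are hypothetical, and the `<=` comparison is my assumption about how the mod-time check works. The directory, top-level file and global mod times are taken from the log lines; the nested file's mod time is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ModTimeFilterSketch {

    /** Hypothetical stand-in for a file system entry; not a Flink class. */
    static final class Entry {
        final String path;
        final boolean isDir;
        final long modTime;
        final List<Entry> children = new ArrayList<>();

        Entry(String path, boolean isDir, long modTime) {
            this.path = path;
            this.isDir = isDir;
            this.modTime = modTime;
        }
    }

    /** Assumed check: skip anything not newer than the last processed mod time. */
    static boolean shouldIgnore(long modTime, long globalModTime) {
        return modTime <= globalModTime;
    }

    /** Suspected behaviour: the filter is applied to top-level entries only, directories included. */
    static List<String> eligibleFlat(Entry dir, long globalModTime) {
        List<String> result = new ArrayList<>();
        for (Entry e : dir.children) {
            if (!shouldIgnore(e.modTime, globalModTime)) {
                result.add(e.path);
            }
        }
        return result;
    }

    /** Expected behaviour: descend into directories and filter only regular files. */
    static List<String> eligibleRecursive(Entry dir, long globalModTime) {
        List<String> result = new ArrayList<>();
        for (Entry e : dir.children) {
            if (e.isDir) {
                result.addAll(eligibleRecursive(e, globalModTime));
            } else if (!shouldIgnore(e.modTime, globalModTime)) {
                result.add(e.path);
            }
        }
        return result;
    }

    /** Builds a small tree mirroring the layout described in this mail. */
    static Entry demoTree() {
        Entry root = new Entry("hdfs:///shared/mydir", true, 0L);
        // Mod times of the directory and the top-level file come from the log lines.
        Entry sub = new Entry("hdfs:///shared/mydir/2016-12-16", true, 1481882041587L);
        // Made-up mod time for the nested file, chosen to be newer than the global mod time.
        sub.children.add(new Entry("hdfs:///shared/mydir/2016-12-16/file.txt", false, 1481882200000L));
        root.children.add(sub);
        root.children.add(new Entry("hdfs:///shared/mydir/file.txt", false, 1481881788704L));
        return root;
    }

    public static void main(String[] args) {
        long globalModTime = 1481882126122L; // from the log lines
        Entry root = demoTree();
        System.out.println("flat:      " + eligibleFlat(root, globalModTime));
        System.out.println("recursive: " + eligibleRecursive(root, globalModTime));
    }
}
```

With these values the flat listing finds nothing to process, because the directory's own mod time is older than the global mod time, while the recursive listing still picks up the nested file. That matches what I observe, but I have not confirmed that this is what ContinuousFileMonitoringFunction does internally.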