Hi all, this is probably related to the problem that I reported in December. In case it helps, you can find a self-contained example below. I haven't looked deeply into the problem, but it seems that the correct file splits are determined but somehow not processed. Nested files are skipped when reading from HDFS as well, which is a real problem for me at the moment.
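One way to confirm which splits actually get enumerated is to ask the input format directly, before handing it to the environment. A minimal sketch (createInputSplits comes from Flink's FileInputFormat; the "tmp" path and the minNumSplits value of 1 are just placeholders matching the example below):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class PrintSplits {
    public static void main(String[] args) throws Exception {
        TextInputFormat format = new TextInputFormat(new Path("tmp"));
        format.setNestedFileEnumeration(true);
        format.configure(new Configuration());
        // Print every split the format would create; files in nested
        // directories should appear here if enumeration works.
        for (FileInputSplit split : format.createInputSplits(1)) {
            System.out.println(split.getPath());
        }
    }
}

If the nested files show up in this enumeration but their contents never reach the print sink, the problem is downstream of split enumeration.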
Cheers,
Lukas

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create the given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir : dirs) {
            // create the input file
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
            w.write("content of " + dir + "/file.csv");
            w.close();
            tempFile.deleteOnExit();
        }

        File root = new File("tmp");
        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);
        inputFormat.configure(new Configuration());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}

>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>
>> Hi,
>>
>> Any updates on this issue? Thank you.
>>
>> Best,
>> Yassine
>>
>>
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>> +Kostas, who probably has the most experience with this by now. Do you
>> have an idea what might be going on?
>>
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>> Looks like this is not specific to the continuous file monitoring; I'm
>> having the same issue (files in nested directories are not read) when using:
>>
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>     FileProcessingMode.PROCESS_ONCE, -1L)
>>
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>> Hi all,
>>
>> I'm using the following code to continuously process files from a directory "mydir":
>>
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>> fileInputFormat.setNestedFileEnumeration(true);
>>
>> env.readFile(fileInputFormat,
>>     "hdfs:///shared/mydir",
>>     FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>     .print();
>>
>> env.execute();
>>
>> If I add a directory under "mydir", say "2016-12-16", and then add a file
>> "2016-12-16/file.txt", its contents are not printed. If I add the same file
>> directly under "mydir", its contents are correctly printed.
>> After that, the logs will show the following:
>>
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>> - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587
>> and global mod time= 1481882126122
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>> - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704
>> and global mod time= 1481882126122
>>
>> It looks like the ContinuousFileMonitoringFunction treated the directory
>> "2016-12-16" itself as an already-read file and excluded it, but its
>> contents were never processed. Any idea why this happens?
>> Thank you.
>>
>> Best,
>> Yassine
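For what it's worth, the DEBUG lines above are consistent with a monitor that keeps a single global modification timestamp and skips every path, directory or file, whose mod time is not newer. A rough sketch of that rule (a hypothetical simplification for illustration, not Flink's actual implementation):

import org.apache.flink.core.fs.FileStatus;

class ModTimeFilter {
    // Hypothetical simplification of the filtering the DEBUG lines suggest:
    // anything with mod time <= global mod time is ignored, and a directory
    // is compared by its own mod time, not by its children's.
    static boolean shouldIgnore(FileStatus status, long globalModTime) {
        return status.getModificationTime() <= globalModTime;
    }
}

Under such a rule, a file added inside an older directory would be skipped whenever the directory is filtered out before its children are ever listed, which would match the behavior Yassine describes.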