Hi Lukas,

Are you sure that tempFile.deleteOnExit() does not remove the files before the test completes? I am just asking to be sure.
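One way to rule that out is to not use deleteOnExit() at all and clean up explicitly once the job has finished. A rough sketch against the example below (only the relevant lines; the list name is illustrative):

    // keep references to the generated inputs instead of calling deleteOnExit()
    List<File> createdFiles = new ArrayList<>();
    // ... inside the loop: createdFiles.add(tempFile); and drop deleteOnExit() ...

    env.execute();

    // the job is done at this point, so deleting is safe now
    for (File f : createdFiles) {
        if (!f.delete()) {
            System.err.println("could not delete " + f);
        }
    }

If the nested files are still skipped with this variant, premature deletion can be excluded as the cause.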
Also, from the code I suppose that you run it locally. I suspect that the problem is in the way the input format scans nested files, but could you check whether, in the code that is executed by the tasks, the nestedFileEnumeration parameter is still true? I am asking in order to pin down whether the problem is in the way we ship the code to the tasks or in reading the nested files (see the sketch further down in this mail).

Thanks,
Kostas

> On Jan 9, 2017, at 2:24 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Lukas,
>
> have you tried to set the parameter "recursive.file.enumeration" to true?
>
> // create a configuration object
> Configuration parameters = new Configuration();
>
> // set the recursive enumeration parameter
> parameters.setBoolean("recursive.file.enumeration", true);
>
> If this also does not work, I think this could be a bug. You can open an
> issue for it and attach your sample code.
>
> Timo
>
>
> On 09/01/17 at 13:47, Lukas Kircher wrote:
>> Hi all,
>>
>> this is probably related to the problem that I reported in December. In
>> case it helps, you can find a self-contained example below. I haven't
>> looked deeply into the problem, but it seems like the correct file splits
>> are determined but somehow not processed. If I read from HDFS, nested
>> files are skipped as well, which is a real problem for me at the moment.
>>
>> Cheers,
>> Lukas
>>
>> import org.apache.flink.api.java.io.TextInputFormat;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> import java.io.BufferedWriter;
>> import java.io.File;
>> import java.io.FileWriter;
>>
>> public class ReadDirectorySSCCE {
>>     public static void main(String[] args) throws Exception {
>>         // create the given dirs and add a .csv file to each one
>>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>>         for (String dir : dirs) {
>>             // create an input file in this directory
>>             File tmpDir = new File(dir);
>>             if (!tmpDir.exists()) {
>>                 tmpDir.mkdirs();
>>             }
>>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>>             w.write("content of " + dir + "/file.csv");
>>             w.close();
>>             tempFile.deleteOnExit();
>>         }
>>         File root = new File("tmp");
>>
>>         TextInputFormat inputFormat =
>>                 new TextInputFormat(new Path(root.toURI().toString()));
>>         inputFormat.setNestedFileEnumeration(true);
>>         inputFormat.configure(new Configuration());
>>
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.createInput(inputFormat).print();
>>         env.execute();
>>     }
>> }
>>
>>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>
>>> Hi,
>>>
>>> Any updates on this issue? Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>>> +kostas, who probably has the most experience with this by now. Do you
>>>> have an idea what might be going on?
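Coming back to the task-side check mentioned at the top of this mail: one way to do it is to subclass TextInputFormat and print the flag at the point where configure() runs inside the tasks, then use the subclass in Lukas' example above. A rough sketch; the class name is made up, and it relies on the getNestedFileEnumeration() getter that FileInputFormat exposes:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;

    // Debug-only subclass: prints whether nested file enumeration is still
    // enabled when the format is configured on a task.
    public class LoggingTextInputFormat extends TextInputFormat {

        public LoggingTextInputFormat(Path filePath) {
            super(filePath);
        }

        @Override
        public void configure(Configuration parameters) {
            super.configure(parameters);
            System.out.println("nestedFileEnumeration on task = "
                    + getNestedFileEnumeration());
        }
    }

If the taskmanager stdout shows false, the flag got lost on the way to the tasks; if it shows true, the nested-file scan itself is the more likely culprit.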
>>>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>> Looks like this is not specific to the continuous file monitoring; I'm
>>>> having the same issue (files in nested directories are not read) when
>>>> using:
>>>>
>>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>>>         FileProcessingMode.PROCESS_ONCE, -1L)
>>>>
>>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>> Hi all,
>>>>
>>>> I'm using the following code to continuously process files from a
>>>> directory "mydir".
>>>>
>>>> final StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> FileInputFormat<String> fileInputFormat =
>>>>         new TextInputFormat(new Path("hdfs:///shared/mydir"));
>>>> fileInputFormat.setNestedFileEnumeration(true);
>>>>
>>>> env.readFile(fileInputFormat,
>>>>         "hdfs:///shared/mydir",
>>>>         FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>>    .print();
>>>>
>>>> env.execute();
>>>>
>>>> If I add a directory under "mydir", say "2016-12-16", and then add a file
>>>> "2016-12-16/file.txt", its contents are not printed. If I add the same
>>>> file directly under "mydir", its contents are correctly printed. After
>>>> that, the logs show the following:
>>>>
>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>>>>
>>>> It looks like the ContinuousFileMonitoringFunction considered "2016-12-16"
>>>> an already-read file and then excluded it, but its contents were never
>>>> processed. Any idea why this happens?
>>>> Thank you.
>>>>
>>>> Best,
>>>> Yassine
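For what it's worth, those two "Ignoring ..." lines are consistent with the monitor comparing every path, directories included, against a single global modification-time watermark. Roughly, logic like the following would produce exactly that output. This is a simplified paraphrase for illustration, not the actual Flink source; the class and method names are made up, and the timestamps are copied from the log above:

    // Simplified paraphrase of the suspected filtering behaviour -- not
    // actual Flink code.
    public class ModTimeFilterSketch {

        // global watermark: the max modification time seen in a previous scan
        static final long GLOBAL_MOD_TIME = 1481882126122L;

        static boolean shouldProcess(String path, long modTime) {
            // anything not strictly newer than the watermark is skipped --
            // including a directory whose own mod time is old, even though
            // it may now contain brand-new files
            if (modTime <= GLOBAL_MOD_TIME) {
                System.out.println("Ignoring " + path + ", with mod time= "
                        + modTime + " and global mod time= " + GLOBAL_MOD_TIME);
                return false;
            }
            return true;
        }

        public static void main(String[] args) {
            shouldProcess("hdfs://mlxbackoffice/shared/mydir/2016-12-16", 1481882041587L);
            shouldProcess("hdfs://mlxbackoffice/shared/mydir/file.txt", 1481881788704L);
        }
    }

If that is really what happens, any file landing in a directory whose own modification time already lags behind the watermark would be masked, which matches what Yassine describes.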