Thanks for your suggestions!

@Timo
1) Regarding the recursive.file.enumeration parameter: I think what counts here is the enumerateNestedFiles parameter in FileInputFormat.java. Calling the setter for enumerateNestedFiles is expected to take precedence over recursive.file.enumeration. Not by literally overwriting it; I think recursive.file.enumeration is simply ignored in that case. This is tested in TextInputFormatTest.testNestedFileRead().
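[Editor's note] For readers unfamiliar with the setting being discussed: "nested file enumeration" just means that the input format descends into subdirectories when listing input files, instead of reading only the top level. A minimal JDK-only sketch of that behaviour (this is an illustration, not Flink's actual implementation; the class and method names are made up):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestedEnumerationSketch {

    // List regular files under root. Descend into subdirectories only
    // when enumerateNested is true (analogous in spirit to Flink's
    // setNestedFileEnumeration(true)).
    static List<Path> listFiles(Path root, boolean enumerateNested) throws IOException {
        int maxDepth = enumerateNested ? Integer.MAX_VALUE : 1;
        try (Stream<Path> s = Files.walk(root, maxDepth)) {
            return s.filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny directory tree: root/top.csv and root/first/nested.csv
        Path root = Files.createTempDirectory("tmp");
        Files.createFile(root.resolve("top.csv"));
        Path nested = Files.createDirectories(root.resolve("first"));
        Files.createFile(nested.resolve("nested.csv"));

        System.out.println(listFiles(root, false).size()); // 1: only top.csv
        System.out.println(listFiles(root, true).size());  // 2: top.csv and nested.csv
    }
}
```

The bug discussed in this thread is precisely that the second case (recursive listing) appears to find the nested files but their contents are never emitted downstream.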
@Kostas
2) tempFile.deleteOnExit(): If I remove that line I get the same result; only the content of the file in the top-level tmp directory is printed. I derived the SSCCE from my real use case, where I encountered the problem originally, and I don't touch the input files there in any way.
3) The given example runs locally. In TextInputFormat.readRecord(String, byte[], int, int) the nestedFileEnumeration parameter is true during execution. Is this what you meant?

Cheers,
Lukas

> On 9 Jan 2017, at 14:56, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Lukas,
>
> Are you sure that the tempFile.deleteOnExit() does not remove the files before the test completes?
> I am just asking to be sure.
>
> Also, from the code I suppose that you run it locally. I suspect that the problem is in the way the input
> format scans nested files, but could you check whether, in the code that is executed by the tasks, the
> nestedFileEnumeration parameter is still true?
>
> I am asking in order to pin down whether the problem is in the way we ship the code to the tasks or in
> reading the nested files.
>
> Thanks,
> Kostas
>
>> On Jan 9, 2017, at 2:24 PM, Timo Walther <twal...@apache.org> wrote:
>>
>> Hi Lukas,
>>
>> have you tried to set the parameter "recursive.file.enumeration" to true?
>>
>> // create a configuration object
>> Configuration parameters = new Configuration();
>>
>> // set the recursive enumeration parameter
>> parameters.setBoolean("recursive.file.enumeration", true);
>>
>> If this also does not work, I think this could be a bug. You can open an issue for it and attach your
>> sample code.
>>
>> Timo
>>
>>
>> On 09/01/17 at 13:47, Lukas Kircher wrote:
>>> Hi all,
>>>
>>> this is probably related to the problem that I reported in December. In case it helps you can find a
>>> self-contained example below.
>>> I haven't looked deeply into the problem, but it seems like the correct file splits are determined and
>>> yet somehow not processed. If I read from HDFS, nested files are skipped as well, which is a real
>>> problem for me at the moment.
>>>
>>> Cheers,
>>> Lukas
>>>
>>> import org.apache.flink.api.java.io.TextInputFormat;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> import java.io.BufferedWriter;
>>> import java.io.File;
>>> import java.io.FileWriter;
>>>
>>> public class ReadDirectorySSCCE {
>>>     public static void main(String[] args) throws Exception {
>>>         // create the given dirs and add a .csv file to each one
>>>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>>>         for (String dir : dirs) {
>>>             // create input file
>>>             File tmpDir = new File(dir);
>>>             if (!tmpDir.exists()) {
>>>                 tmpDir.mkdirs();
>>>             }
>>>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>>>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>>>             w.write("content of " + dir + "/file.csv");
>>>             w.close();
>>>             tempFile.deleteOnExit();
>>>         }
>>>         File root = new File("tmp");
>>>
>>>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>>>         inputFormat.setNestedFileEnumeration(true);
>>>         inputFormat.configure(new Configuration());
>>>
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.createInput(inputFormat).print();
>>>         env.execute();
>>>     }
>>> }
>>>>
>>>>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Any updates on this issue? Thank you.
>>>>>
>>>>> Best,
>>>>> Yassine
>>>>>
>>>>>
>>>>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>>>> +kostas, who probably has the most experience with this by now. Do you have an idea what might
>>>>> be going on?
>>>>>
>>>>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>> Looks like this is not specific to the continuous file monitoring; I'm having the same issue (files
>>>>> in nested directories are not read) when using:
>>>>>
>>>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)
>>>>>
>>>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>> Hi all,
>>>>>
>>>>> I'm using the following code to continuously process files from a directory "mydir":
>>>>>
>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>>>>> fileInputFormat.setNestedFileEnumeration(true);
>>>>>
>>>>> env.readFile(fileInputFormat,
>>>>>         "hdfs:///shared/mydir",
>>>>>         FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>>>     .print();
>>>>>
>>>>> env.execute();
>>>>>
>>>>> If I add a directory under mydir, say "2016-12-16", and then add a file "2016-12-16/file.txt", its
>>>>> contents are not printed. If I add the same file directly under "mydir", its contents are correctly
>>>>> printed.
>>>>> After that, the logs show the following:
>>>>>
>>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587
>>>>>   and global mod time= 1481882126122
>>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704
>>>>>   and global mod time= 1481882126122
>>>>>
>>>>> Looks like the ContinuousFileMonitoringFunction considers "2016-12-16" a file it has already read
>>>>> and therefore excludes it, but its contents were never processed. Any idea why this happens?
>>>>> Thank you.
>>>>>
>>>>> Best,
>>>>> Yassine
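[Editor's note] The DEBUG lines above suggest a filter of the form "skip any path whose modification time is not newer than the last global modification time seen". A simplified JDK-only sketch of that suspected logic (an illustration of the behaviour the logs imply, not Flink's actual code; the class, method, and the third entry are made up; the timestamps are taken from the logs above):

```java
import java.util.ArrayList;
import java.util.List;

public class ModTimeFilterSketch {

    // Simulated directory entry: path plus modification timestamp.
    static final class Entry {
        final String path;
        final long modTime;
        Entry(String path, long modTime) { this.path = path; this.modTime = modTime; }
    }

    // Keep only entries strictly newer than the global watermark, mirroring
    // the "Ignoring <path>, with mod time=... and global mod time=..." lines.
    static List<String> newEntries(List<Entry> listing, long globalModTime) {
        List<String> out = new ArrayList<>();
        for (Entry e : listing) {
            if (e.modTime > globalModTime) {
                out.add(e.path);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long globalModTime = 1481882126122L; // global mod time from the logs
        List<Entry> listing = List.of(
            new Entry("mydir/2016-12-16", 1481882041587L), // nested dir, older than watermark
            new Entry("mydir/file.txt", 1481881788704L),   // already-read file, older
            new Entry("mydir/new.txt", 1481882130000L));   // hypothetical newer file

        // Both older paths are skipped, including the directory whose contents
        // were never actually read: that is the problem reported in this thread.
        System.out.println(newEntries(listing, globalModTime));
    }
}
```

If this is indeed what happens, a directory entry is treated like any other path: once the watermark passes its modification time, it is ignored wholesale, even if the files inside it were never emitted.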