Yes, thanks for the effort. I will look into it.

Kostas
> On Jan 9, 2017, at 4:24 PM, Lukas Kircher <lukas.kirc...@uni-konstanz.de> wrote:
>
> Thanks for your suggestions:
>
> @Timo
> 1) Regarding the recursive.file.enumeration parameter: I think what counts
> here is the enumerateNestedFiles parameter in FileInputFormat.java. Calling
> the setter for enumerateNestedFiles is expected to overwrite
> recursive.file.enumeration. Not literally overwritten - I think
> recursive.file.enumeration is simply ignored here. This is tested in
> TextInputFormatTest.testNestedFileRead().
>
> @Kostas
> 2) tempFile.deleteOnExit(): If I remove that line I get the same result. Only
> the content of the file in the top-level tmp directory is printed. I derived
> the SSCCE from my real use case, where I encountered the problem originally. I
> don't touch the input files there in any way.
>
> 3) The given example is run locally. In TextInputFormat.readRecord(String,
> byte[], int, int) the nestedFileEnumeration parameter is true during
> execution. Is this what you meant?
>
> Cheers,
> Lukas
>
>> On 9 Jan 2017, at 14:56, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>> Hi Lukas,
>>
>> Are you sure that tempFile.deleteOnExit() does not remove the files
>> before the test completes? I am just asking to be sure.
>>
>> Also, from the code I suppose that you run it locally. I suspect that the
>> problem is in the way the input format scans nested files, but could you
>> check whether, in the code that is executed by the tasks, the
>> nestedFileEnumeration parameter is still true?
>>
>> I am asking in order to pin down whether the problem is in the way we ship
>> the code to the tasks or in reading the nested files.
>>
>> Thanks,
>> Kostas
>>
>>> On Jan 9, 2017, at 2:24 PM, Timo Walther <twal...@apache.org> wrote:
>>>
>>> Hi Lukas,
>>>
>>> have you tried to set the parameter "recursive.file.enumeration" to true?
>>>
>>> // create a configuration object
>>> Configuration parameters = new Configuration();
>>>
>>> // set the recursive enumeration parameter
>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>
>>> If this also does not work, I think this could be a bug. You can open an
>>> issue for it and attach your sample code.
>>>
>>> Timo
>>>
>>>
>>> On 09/01/17 at 13:47, Lukas Kircher wrote:
>>>> Hi all,
>>>>
>>>> this is probably related to the problem that I reported in December. In
>>>> case it helps, you can find a self-contained example below. I haven't
>>>> looked deeply into the problem, but it seems like the correct file splits
>>>> are determined yet somehow not processed. If I read from HDFS, nested
>>>> files are skipped as well, which is a real problem for me at the moment.
>>>>
>>>> Cheers,
>>>> Lukas
>>>>
>>>> import org.apache.flink.api.java.io.TextInputFormat;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.core.fs.Path;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>> import java.io.BufferedWriter;
>>>> import java.io.File;
>>>> import java.io.FileWriter;
>>>>
>>>> public class ReadDirectorySSCCE {
>>>>     public static void main(String[] args) throws Exception {
>>>>         // create the given dirs and add a .csv file to each one
>>>>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>>>>         for (String dir : dirs) {
>>>>             // create input file
>>>>             File tmpDir = new File(dir);
>>>>             if (!tmpDir.exists()) {
>>>>                 tmpDir.mkdirs();
>>>>             }
>>>>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>>>>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>>>>             w.write("content of " + dir + "/file.csv");
>>>>             w.close();
>>>>             tempFile.deleteOnExit();
>>>>         }
>>>>         File root = new File("tmp");
>>>>
>>>>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>>>>         inputFormat.setNestedFileEnumeration(true);
>>>>         inputFormat.configure(new Configuration());
>>>>
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         env.createInput(inputFormat).print();
>>>>         env.execute();
>>>>     }
>>>> }
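For reference, the two mechanisms discussed above can be put side by side in a minimal sketch. It assumes the Flink 1.x FileInputFormat API mentioned in the thread (setNestedFileEnumeration, getNestedFileEnumeration, and the "recursive.file.enumeration" key); the class name and input path are placeholders, and the setter-vs-key behavior stated in the comments is Lukas' reading of FileInputFormat.java, not something verified here.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

public class NestedEnumerationSketch {

    public static void main(String[] args) {
        // Placeholder input path; substitute the directory to scan.
        TextInputFormat format = new TextInputFormat(new Path("file:///tmp"));

        // Mechanism 1: the typed setter on FileInputFormat. Per the
        // discussion above, this is the parameter that actually drives
        // nested enumeration at runtime.
        format.setNestedFileEnumeration(true);

        // Mechanism 2: the configuration key Timo suggested. Per Lukas'
        // reading of FileInputFormat.java, this key is consulted when
        // configure() runs but does not override a value already set
        // through the setter.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        format.configure(parameters);

        // The effective value can be checked before submitting the job.
        System.out.println("nested enumeration: " + format.getNestedFileEnumeration());
    }
}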
>>>>>
>>>>>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Any updates on this issue? Thank you.
>>>>>>
>>>>>> Best,
>>>>>> Yassine
>>>>>>
>>>>>>
>>>>>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>>>>> +kostas, who probably has the most experience with this by now. Do you
>>>>>> have an idea what might be going on?
>>>>>>
>>>>>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>>> Looks like this is not specific to the continuous file monitoring; I'm
>>>>>> having the same issue (files in nested directories are not read) when
>>>>>> using:
>>>>>>
>>>>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)
>>>>>>
>>>>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using the following code to continuously process files from a
>>>>>> directory "mydir".
>>>>>>
>>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>>>>>> fileInputFormat.setNestedFileEnumeration(true);
>>>>>>
>>>>>> env.readFile(fileInputFormat,
>>>>>>         "hdfs:///shared/mydir",
>>>>>>         FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>>>>    .print();
>>>>>>
>>>>>> env.execute();
>>>>>>
>>>>>> If I add a directory under mydir, say "2016-12-16", and then add a file
>>>>>> "2016-12-16/file.txt", its contents are not printed. If I add the same
>>>>>> file directly under "mydir", its contents are correctly printed.
>>>>>> After that, the logs show the following:
>>>>>>
>>>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>>>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>>>>>>
>>>>>> It looks like the ContinuousFileMonitoringFunction considers 2016-12-16 an
>>>>>> already-read file and excludes it, but its contents were never processed.
>>>>>> Any idea why this happens?
>>>>>> Thank you.
>>>>>>
>>>>>> Best,
>>>>>> Yassine
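The DEBUG lines above suggest that the monitor compares each path's modification time, directories included, against a single global modification time and skips anything older, before ever descending into it. A small probe along the following lines makes those timestamps visible for a given directory. This is a sketch that assumes Flink's core FileSystem API (Path.getFileSystem, listStatus, FileStatus); the class name is made up and the path is a placeholder, and running it against HDFS requires the usual Hadoop configuration on the classpath.

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ModTimeProbe {

    public static void main(String[] args) throws Exception {
        // Placeholder path; substitute the directory being monitored.
        Path dir = new Path("hdfs:///shared/mydir");
        FileSystem fs = dir.getFileSystem();

        // Print the modification time of each entry, directories included.
        // If a nested directory's mod time is older than the monitor's
        // "global mod time" from the logs, that would explain why the
        // directory is ignored before its files are ever enumerated.
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath()
                    + " dir=" + status.isDir()
                    + " modTime=" + status.getModificationTime());
        }
    }
}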