Yes, thanks for the effort. I will look into it.

Kostas

> On Jan 9, 2017, at 4:24 PM, Lukas Kircher <lukas.kirc...@uni-konstanz.de> wrote:
> 
> Thanks for your suggestions:
> 
> @Timo
> 1) Regarding the recursive.file.enumeration parameter: I think what counts 
> here is the enumerateNestedFiles field in FileInputFormat.java. Calling 
> the setter for enumerateNestedFiles is expected to override 
> recursive.file.enumeration. Not literally - I think 
> recursive.file.enumeration is simply ignored in that case. This is tested in 
> TextInputFormatTest.testNestedFileRead().
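> 
> To make the precedence concrete, here is an illustrative sketch (the path 
> is made up, and this reflects my understanding rather than tested behavior):
> 
> TextInputFormat format = new TextInputFormat(new Path("file:///tmp/data"));
> 
> // the explicit setter writes the enumerateNestedFiles field directly ...
> format.setNestedFileEnumeration(true);
> 
> // ... while the "recursive.file.enumeration" key read in configure()
> // targets the same field, so setting it here should add nothing once
> // the setter was called
> Configuration conf = new Configuration();
> conf.setBoolean("recursive.file.enumeration", true);
> format.configure(conf);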
> 
> @Kostas
> 2) tempFile.deleteOnExit(): If I remove the line I get the same result. Only 
> the content of the file in the top-level tmp directory is printed. I derived 
> the SSCCE from my real use case, where I encountered the problem originally. 
> I don't mess with the input files there in any way.
> 
> 3) The given example is run locally. In TextInputFormat.readRecord(String, 
> byte[], int, int) the nestedFileEnumeration parameter is true during 
> execution. Is this what you meant?
> 
> Cheers,
> Lukas
> 
>> On 9 Jan 2017, at 14:56, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>> Hi Lukas,
>> 
>> Are you sure that tempFile.deleteOnExit() does not remove the files 
>> before the test completes?
>> I am just asking to be sure.
>> 
>> Also, from the code I suppose that you run it locally. I suspect that the 
>> problem is in the way the input format scans nested files, but could you 
>> check whether, in the code that is executed by the tasks, the 
>> nestedFileEnumeration parameter is still true?
>> 
>> I am asking in order to pin down whether the problem is in the way we ship 
>> the code to the tasks or in the reading of the nested files.
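>> 
>> For example, something like the untested sketch below could print the value 
>> of the flag on the task side (LoggingTextInputFormat is just an 
>> illustrative name, and I am assuming the getNestedFileEnumeration() getter 
>> here):
>> 
>> import org.apache.flink.api.java.io.TextInputFormat;
>> import org.apache.flink.core.fs.FileInputSplit;
>> import org.apache.flink.core.fs.Path;
>> import java.io.IOException;
>> 
>> public class LoggingTextInputFormat extends TextInputFormat {
>>     public LoggingTextInputFormat(Path filePath) {
>>         super(filePath);
>>     }
>> 
>>     @Override
>>     public void open(FileInputSplit split) throws IOException {
>>         // runs on the task side: if this prints 'false', the flag was
>>         // lost when the format was serialized and shipped to the task
>>         System.out.println("nestedFileEnumeration = " + getNestedFileEnumeration());
>>         super.open(split);
>>     }
>> }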
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 9, 2017, at 2:24 PM, Timo Walther <twal...@apache.org> wrote:
>>> 
>>> Hi Lukas,
>>> 
>>> Have you tried setting the parameter "recursive.file.enumeration" to true?
>>> 
>>> // create a configuration object
>>> Configuration parameters = new Configuration();
>>> 
>>> // set the recursive enumeration parameter
>>> parameters.setBoolean("recursive.file.enumeration", true);
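>>> 
>>> The configuration would then be passed on to the data source, e.g. via 
>>> withParameters() in the DataSet API (the path below is made up):
>>> 
>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> 
>>> // pass the configuration to the data source
>>> DataSet<String> lines = env.readTextFile("file:///path/with/nested/dirs")
>>>     .withParameters(parameters);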
>>> If this does not work either, I think this could be a bug. You can open an 
>>> issue for it and attach your sample code.
>>> 
>>> Timo
>>> 
>>> 
>>> Am 09/01/17 um 13:47 schrieb Lukas Kircher:
>>>> Hi all,
>>>> 
>>>> this is probably related to the problem that I reported in December. In 
>>>> case it helps, you can find a self-contained example below. I haven't 
>>>> looked deeply into the problem, but it seems like the correct file splits 
>>>> are determined but somehow not processed. If I read from HDFS, nested 
>>>> files are skipped as well, which is a real problem for me at the moment.
>>>> 
>>>> Cheers,
>>>> Lukas
>>>> 
>>>> import org.apache.flink.api.java.io.TextInputFormat;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.core.fs.Path;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> 
>>>> import java.io.BufferedWriter;
>>>> import java.io.File;
>>>> import java.io.FileWriter;
>>>> 
>>>> public class ReadDirectorySSCCE {
>>>>     public static void main(String[] args) throws Exception {
>>>>         // create given dirs and add a .csv file to each one
>>>>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>>>>         for (String dir : dirs) {
>>>>             // create input file
>>>>             File tmpDir = new File(dir);
>>>>             if (!tmpDir.exists()) {
>>>>                 tmpDir.mkdirs();
>>>>             }
>>>>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>>>>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>>>>             w.write("content of " + dir + "/file.csv");
>>>>             w.close();
>>>>             tempFile.deleteOnExit();
>>>>         }
>>>>         File root = new File("tmp");
>>>> 
>>>>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>>>>         inputFormat.setNestedFileEnumeration(true);
>>>> 
>>>>         inputFormat.configure(new Configuration());
>>>> 
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         env.createInput(inputFormat).print();
>>>>         env.execute();
>>>>     }
>>>> }
>>>>> 
>>>>>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Any updates on this issue? Thank you.
>>>>>> 
>>>>>> Best,
>>>>>> Yassine
>>>>>> 
>>>>>> 
>>>>>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>>>>> +kostas, who probably has the most experience with this by now. Do you 
>>>>>> have an idea what might be going on?
>>>>>> 
>>>>>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>>> Looks like this is not specific to the continuous file monitoring; I'm 
>>>>>> having the same issue (files in nested directories are not read) when 
>>>>>> using:
>>>>>> 
>>>>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)
>>>>>> 
>>>>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>>> Hi all,
>>>>>> 
>>>>>> I'm using the following code to continuously process files from a 
>>>>>> directory "mydir".
>>>>>> 
>>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> 
>>>>>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>>>>>> fileInputFormat.setNestedFileEnumeration(true);
>>>>>> 
>>>>>> env.readFile(fileInputFormat,
>>>>>>                 "hdfs:///shared/mydir",
>>>>>>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>>>>                 .print();
>>>>>> 
>>>>>> env.execute();
>>>>>> 
>>>>>> If I add a directory under mydir, say "2016-12-16", and then add a file 
>>>>>> "2016-12-16/file.txt", its contents are not printed. If I add the same 
>>>>>> file directly under "mydir", its contents are correctly printed. After 
>>>>>> that, the logs show the following:
>>>>>> 
>>>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>>>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>>>>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>>>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>>>>>> 
>>>>>> It looks like the ContinuousFileMonitoringFunction considered the 
>>>>>> directory 2016-12-16 as an already-read file and excluded it, but its 
>>>>>> contents were never processed. Any idea why this happens?
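>>>>>> 
>>>>>> To spell out my reading of the log lines above as an illustrative 
>>>>>> paraphrase (this is a guess at the behavior, not the actual Flink 
>>>>>> source):
>>>>>> 
>>>>>> // a path (file or directory alike) seems to be skipped when its mod
>>>>>> // time is not newer than the global mod time, so new files inside an
>>>>>> // "old" directory are never visited
>>>>>> boolean shouldIgnore(org.apache.flink.core.fs.FileStatus status, long globalModTime) {
>>>>>>     return status.getModificationTime() <= globalModTime;
>>>>>> }
>>>>>> 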
>>>>>> Thank you.
>>>>>> 
>>>>>> Best,
>>>>>> Yassine
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
