Thanks for your suggestions:

@Timo
1) Regarding the recursive.file.enumeration parameter: I think what counts here 
is the enumerateNestedFiles parameter in FileInputFormat.java. Calling the 
setter for enumerateNestedFiles is expected to override 
recursive.file.enumeration - not literally; I think recursive.file.enumeration 
is simply ignored in that case. This is tested in 
TextInputFormatTest.testNestedFileRead().
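
To make the intended precedence concrete, here is a toy, stdlib-only sketch of a format whose typed setter wins over the string config key. This is a hypothetical model of the behavior described above, not Flink's actual FileInputFormat code:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: the typed setter takes precedence over the config key
// "recursive.file.enumeration" (hypothetical sketch, not Flink code).
public class PrecedenceSketch {
    private Boolean nestedFileEnumeration; // null until the setter is called

    public void setNestedFileEnumeration(boolean enabled) {
        this.nestedFileEnumeration = enabled;
    }

    public void configure(Map<String, String> parameters) {
        // The config key is consulted only when the setter was never used.
        if (nestedFileEnumeration == null) {
            nestedFileEnumeration = Boolean.parseBoolean(
                parameters.getOrDefault("recursive.file.enumeration", "false"));
        }
    }

    public boolean isNestedFileEnumerationEnabled() {
        return nestedFileEnumeration != null && nestedFileEnumeration;
    }

    public static void main(String[] args) {
        PrecedenceSketch format = new PrecedenceSketch();
        format.setNestedFileEnumeration(true);

        Map<String, String> conf = new HashMap<>();
        conf.put("recursive.file.enumeration", "false");
        format.configure(conf);

        // The setter wins; the conflicting config key is ignored.
        System.out.println(format.isNestedFileEnumerationEnabled()); // prints true
    }
}
```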

@Kostas
2) tempFile.deleteOnExit(): If I remove that line I get the same result - only 
the content of the file in the top-level tmp directory is printed. I derived 
the SSCCE from my real use case, where I originally encountered the problem; 
I don't touch the input files there in any way.
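
For what it's worth, deleteOnExit() only registers the file for deletion at JVM shutdown, so the files remain readable for the entire run either way. A minimal stdlib demo:

```java
import java.io.File;
import java.io.IOException;

public class DeleteOnExitDemo {
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("demo", ".csv");
        // deleteOnExit() merely schedules deletion for JVM shutdown;
        // the file stays on disk and readable for the whole run.
        f.deleteOnExit();
        System.out.println(f.exists()); // prints true
    }
}
```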

3) The given example runs locally. In TextInputFormat.readRecord(String, 
byte[], int, int) the nestedFileEnumeration parameter is true during execution. 
Is this what you meant?
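
As a sanity check independent of Flink, a plain-Java recursive walk over the same directory layout as in the SSCCE quoted below does find all three files, which is what the job should print when nested enumeration works (stdlib-only sketch):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class WalkSketch {
    public static void main(String[] args) throws IOException {
        // Recreate the SSCCE layout: a root dir with two nested dirs,
        // each containing one .csv file.
        Path root = Files.createTempDirectory("tmp");
        Files.createDirectories(root.resolve("first"));
        Files.createDirectories(root.resolve("second"));
        Files.write(root.resolve("file.csv"), "a".getBytes());
        Files.write(root.resolve("first").resolve("file.csv"), "b".getBytes());
        Files.write(root.resolve("second").resolve("file.csv"), "c".getBytes());

        // A recursive walk sees the nested files too, not just the top level.
        try (Stream<Path> paths = Files.walk(root)) {
            long csvFiles = paths.filter(p -> p.toString().endsWith(".csv")).count();
            System.out.println(csvFiles); // prints 3
        }
    }
}
```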

Cheers,
Lukas

> On 9 Jan 2017, at 14:56, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> 
> Hi Lukas,
> 
> Are you sure that tempFile.deleteOnExit() does not remove the files 
> before the test completes? I am just asking to be sure.
> 
> Also from the code, I suppose that you run it locally. I suspect that the 
> problem is in the way the input
> format scans nested files, but could you see if in the code that is executed 
> by the tasks, the 
> nestedFileEnumeration parameter is still true?
> 
> I am asking in order to pin down if the problem is in the way we ship the 
> code to the tasks or in reading the 
> nested files.
> 
> Thanks,
> Kostas
> 
>> On Jan 9, 2017, at 2:24 PM, Timo Walther <twal...@apache.org> wrote:
>> 
>> Hi Lukas,
>> 
>> have you tried to set the parameter "recursive.file.enumeration" to true?
>> 
>> // create a configuration object
>> Configuration parameters = new Configuration();
>> 
>> // set the recursive enumeration parameter
>> parameters.setBoolean("recursive.file.enumeration", true);
>> 
>> If this does not work either, I think this could be a bug. You can open an 
>> issue for it and attach your sample code.
>> 
>> Timo
>> 
>> 
>> Am 09/01/17 um 13:47 schrieb Lukas Kircher:
>>> Hi all,
>>> 
>>> this is probably related to the problem I reported in December. In case it 
>>> helps, you can find a self-contained example below. I haven't looked deeply 
>>> into the problem, but it seems the correct file splits are determined yet 
>>> somehow not processed. Nested files are skipped when reading from HDFS as 
>>> well, which is a real problem for me at the moment.
>>> 
>>> Cheers,
>>> Lukas
>>> 
>>> import org.apache.flink.api.java.io.TextInputFormat;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.Path;
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> 
>>> import java.io.BufferedWriter;
>>> import java.io.File;
>>> import java.io.FileWriter;
>>> 
>>> public class ReadDirectorySSCCE {
>>>     public static void main(String[] args) throws Exception {
>>>         // create given dirs and add a .csv file to each one
>>>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>>>         for (String dir: dirs) {
>>>             // create input file
>>>             File tmpDir = new File(dir);
>>>             if (!tmpDir.exists()) {
>>>                 tmpDir.mkdirs();
>>>             }
>>>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>>>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>>>             w.write("content of " + dir + "/file.csv");
>>>             w.close();
>>>             tempFile.deleteOnExit();
>>>         }
>>>         File root = new File("tmp");
>>> 
>>>         TextInputFormat inputFormat = new TextInputFormat(new 
>>> Path(root.toURI().toString()));
>>>         inputFormat.setNestedFileEnumeration(true);
>>> 
>>>         inputFormat.configure(new Configuration());
>>> 
>>>         StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.createInput(inputFormat).print();
>>>         env.execute();
>>>     }
>>> }
>>>> 
>>>>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Any updates on this issue? Thank you.
>>>>> 
>>>>> Best,
>>>>> Yassine
>>>>> 
>>>>> 
>>>>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>>>> +kostas, who probably has the most experience with this by now. Do you 
>>>>> have an idea what might be going on?
>>>>> 
>>>>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>>>> Looks like this is not specific to the continuous file monitoring, I'm 
>>>>> having the same issue (files in nested directories are not read) when 
>>>>> using:
>>>>> 
>>>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)
>>>>> 
>>>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>> Hi all,
>>>>> 
>>>>> I'm using the following code to continuously process files from a 
>>>>> directory "mydir".
>>>>> 
>>>>> final StreamExecutionEnvironment env = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> 
>>>>> FileInputFormat fileInputFormat = new TextInputFormat(new 
>>>>> Path("hdfs:///shared/mydir"));
>>>>> fileInputFormat.setNestedFileEnumeration(true);
>>>>> 
>>>>> env.readFile(fileInputFormat,
>>>>>                 "hdfs:///shared/mydir <hdfs:///shared/mydir>",
>>>>>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>>>                 .print();
>>>>> 
>>>>> env.execute();
>>>>> 
>>>>> If I add a directory under mydir, say "2016-12-16", and then add a file 
>>>>> "2016-12-16/file.txt", its contents are not printed. If I add the same 
>>>>> file directly under "mydir", its contents are correctly printed. After 
>>>>> that, the logs show the following:
>>>>> 
>>>>> 10:55:44,928 DEBUG 
>>>>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 
>>>>> 1481882041587 and global mod time= 1481882126122
>>>>> 10:55:44,928 DEBUG 
>>>>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>>>   - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 
>>>>> 1481881788704 and global mod time= 1481882126122
>>>>> 
>>>>> It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as 
>>>>> an already-read file and excluded it, even though its contents were never 
>>>>> processed. Any idea why this happens?
>>>>> Thank you.
>>>>> 
>>>>> Best,
>>>>> Yassine
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
