Looks like this is not specific to the continuous file monitoring, I'm
having the same issue (files in nested directories are not read) when using:

env.readFile(fileInputFormat, "hdfs:///shared/mydir",
FileProcessingMode.PROCESS_ONCE,
-1L)

2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi all,
>
> I'm using the following code to continuously process files from a
> directory "mydir".
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> FileInputFormat fileInputFormat = new TextInputFormat(new
> Path("hdfs:///shared/mydir"));
> fileInputFormat.setNestedFileEnumeration(true);
>
> env.readFile(fileInputFormat,
>                 "hdfs:///shared/mydir",
>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>                 .print();
>
> env.execute();
>
> If I add directory under mydir, say "2016-12-16", and then add a file "
> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
> file directly under "*mydir"*,  its contents are correctly printed. After
> that the logs will show the following :
>
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.
> ContinuousFileMonitoringFunction  - Ignoring 
> hdfs://mlxbackoffice/shared/mydir/2016-12-16,
> with mod time= 1481882041587 and global mod time= 1481882126122
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.
> ContinuousFileMonitoringFunction  - Ignoring 
> hdfs://mlxbackoffice/shared/mydir/file.txt,
> with mod time= 1481881788704 and global mod time= 1481882126122
>
> Looks like the ContinuousFileMonitoringFunction  considered it already
> read 2016-12-16 as a file and then excludes it, but its contents were not
> processed. Any Idea why this happens?
> Thank you.
>
> Best,
> Yassine
>

Reply via email to