Hi all,

I'm using the following code to continuously process files from a directory
"mydir".

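import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// Text input over the monitored root; nested enumeration should make the
// source also read files in subdirectories such as mydir/2016-12-16.
FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
fileInputFormat.setNestedFileEnumeration(true);

// Re-scan the directory every 10 seconds (10000 ms) and print new lines.
env.readFile(fileInputFormat,
                "hdfs:///shared/mydir",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
                .print();

env.execute();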

If I add a directory under mydir, say "2016-12-16", and then add a file
"2016-12-16/file.txt", its contents are not printed. If I add the same
file directly under "mydir", its contents are printed correctly. After
that, the logs show the following:

10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
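Going only by the text of those two DEBUG lines, my reading is that a path is skipped when its own modification time is not newer than the "global mod time". Below is a tiny model of that rule checked against the logged timestamps. The class and method names and the `<=` comparison are my own assumptions reconstructed from the log, not Flink's actual source.

```java
// Hypothetical model of the check implied by the DEBUG log lines.
// Assumption: a path is ignored when its modification time is not newer
// than the global (maximum) modification time seen so far.
// This is NOT Flink's code, only a reconstruction from the log text.
public class ModTimeCheck {

    static boolean shouldIgnore(long modTime, long globalModTime) {
        return modTime <= globalModTime;
    }

    public static void main(String[] args) {
        long globalModTime = 1481882126122L; // "global mod time" from the log
        long dirModTime = 1481882041587L;    // mydir/2016-12-16
        long fileModTime = 1481881788704L;   // mydir/file.txt

        System.out.println("2016-12-16 ignored: " + shouldIgnore(dirModTime, globalModTime));
        System.out.println("file.txt ignored:   " + shouldIgnore(fileModTime, globalModTime));
    }
}
```

Under this assumed rule both entries come out as ignored, which matches the two "Ignoring" lines.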

It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as a
file it had already read and then excluded it, even though its contents
were never processed. Any idea why this happens?
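To test that hypothesis outside Flink, I wrote a local-filesystem sketch using plain `java.nio.file` (nothing here is Flink API). It contrasts two hypothetical listing strategies. In the first, the mod-time watermark is applied to directories as well, so a directory older than the watermark is skipped along with the new file inside it. In the second, the listing always descends into directories and applies the watermark only to files. The class, method names, and timestamps are all made up for illustration. The directory's timestamp is set explicitly, and I am assuming, not asserting, how HDFS updates directory modification times.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, NOT Flink's ContinuousFileMonitoringFunction.
public class NestedListing {

    // Assumed watermark, standing in for the "global mod time" in the log.
    static final long WATERMARK = 2_000_000L;

    // Variant A: every entry (directories included) is compared to the
    // watermark, so an old directory is skipped with everything inside it.
    static List<Path> filterDirsToo(Path dir, long globalModTime) throws IOException {
        List<Path> out = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path p : entries) {
                if (Files.getLastModifiedTime(p).toMillis() <= globalModTime) {
                    continue; // the "Ignoring ..." case in the log's terms
                }
                if (Files.isDirectory(p)) {
                    out.addAll(filterDirsToo(p, globalModTime));
                } else {
                    out.add(p);
                }
            }
        }
        return out;
    }

    // Variant B: always descend into directories; only files are compared.
    static List<Path> filterFilesOnly(Path dir, long globalModTime) throws IOException {
        List<Path> out = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path p : entries) {
                if (Files.isDirectory(p)) {
                    out.addAll(filterFilesOnly(p, globalModTime));
                } else if (Files.getLastModifiedTime(p).toMillis() > globalModTime) {
                    out.add(p);
                }
            }
        }
        return out;
    }

    // Builds <tmp>/2016-12-16/file.txt with the directory older than the
    // watermark and the file newer than it.
    static Path setUpScenario() throws IOException {
        Path root = Files.createTempDirectory("mydir");
        Path sub = Files.createDirectory(root.resolve("2016-12-16"));
        Path file = Files.write(sub.resolve("file.txt"), "hello\n".getBytes(StandardCharsets.UTF_8));
        Files.setLastModifiedTime(file, FileTime.fromMillis(3_000_000L));
        // Set the directory's time last, because creating the file updated it.
        Files.setLastModifiedTime(sub, FileTime.fromMillis(1_000_000L));
        return root;
    }

    public static void main(String[] args) throws IOException {
        Path root = setUpScenario();
        System.out.println("dirs filtered too: " + filterDirsToo(root, WATERMARK));
        System.out.println("files only:        " + filterFilesOnly(root, WATERMARK));
    }
}
```

Variant A reproduces the symptom I see (the nested file is never listed), while variant B finds file.txt. Whether Flink actually behaves like variant A is exactly what I am asking.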
Thank you.

Best,
Yassine
