Aljoscha is right! Any contribution is more than welcome.
Kostas

> On Jan 10, 2017, at 3:48 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Yes, please go ahead with the fix! :-)
>
> (If I'm not mistaken Kostas is working on other stuff right now.)
>
> On Mon, 9 Jan 2017 at 23:19 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
> Hi,
>
> I found the root cause of the problem: the listEligibleFiles method in
> ContinuousFileMonitoringFunction scans only the topmost files and ignores the
> nested files. By fixing that I was able to get the expected output. I created
> Jira issue: https://issues.apache.org/jira/browse/FLINK-5432
>
> @Kostas, If you haven't already started working on a fix for this, I would
> happily contribute a fix for it if you like.
>
> Best,
> Yassine
>
> 2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
> Hi Kostas,
>
> I debugged the code and the nestedFileEnumeration parameter was always true
> during the execution. I noticed however that in the following loop in
> ContinuousFileMonitoringFunction, for some reason, the fileStatus was null
> for files in nested folders, and non null for files directly under the parent
> path, so no splits were forwarded in the case of nested folders.
>
> for(int var5 = 0; var5 < var4; ++var5) {
>     FileInputSplit split = var3[var5];
>     FileStatus fileStatus = (FileStatus)eligibleFiles.get(split.getPath());
>     if(fileStatus != null) {
>         Long modTime = Long.valueOf(fileStatus.getModificationTime());
>         Object splitsToForward = (List)splitsByModTime.get(modTime);
>         if(splitsToForward == null) {
>             splitsToForward = new ArrayList();
>             splitsByModTime.put(modTime, splitsToForward);
>         }
>
>         ((List)splitsToForward).add(new TimestampedFileInputSplit(
>             modTime.longValue(), split.getSplitNumber(), split.getPath(),
>             split.getStart(), split.getLength(), split.getHostnames()));
>     }
> }
>
> Thanks,
> Yassine
>
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
> Hi Yassine,
>
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files, but could you see if in the code that is
> executed by the tasks, the nestedFileEnumeration parameter is still true?
>
> I am asking in order to pin down if the problem is in the way we ship the
> code to the tasks or in reading the nested files.
>
> Thanks,
> Kostas
>
>> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>>
>> Hi,
>>
>> Any updates on this issue? Thank you.
>>
>> Best,
>> Yassine
>>
>>
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>> +kostas, who probably has the most experience with this by now. Do you have
>> an idea what might be going on?
>>
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>> Looks like this is not specific to the continuous file monitoring, I'm
>> having the same issue (files in nested directories are not read) when using:
>>
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>     FileProcessingMode.PROCESS_ONCE, -1L)
>>
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>> Hi all,
>>
>> I'm using the following code to continuously process files from a directory
>> "mydir".
>>
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> FileInputFormat fileInputFormat = new TextInputFormat(new
>>     Path("hdfs:///shared/mydir"));
>> fileInputFormat.setNestedFileEnumeration(true);
>>
>> env.readFile(fileInputFormat,
>>         "hdfs:///shared/mydir",
>>         FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>     .print();
>>
>> env.execute();
>>
>> If I add a directory under mydir, say "2016-12-16", and then add a file
>> "2016-12-16/file.txt", its contents are not printed. If I add the same file
>> directly under "mydir", its contents are correctly printed. After that the
>> logs will show the following:
>>
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>     - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>     - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>>
>> Looks like the ContinuousFileMonitoringFunction considered it already read
>> 2016-12-16 as a file and then excludes it, but its contents were not
>> processed. Any idea why this happens?
>> Thank you.
>>
>> Best,
>> Yassine
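
For readers following the diagnosis in this thread, here is a minimal sketch of the difference between a flat directory listing and a nested one. It is not Flink's code: it uses java.nio.file rather than Flink's FileSystem API, and the class name NestedFileLister and its boolean parameter are hypothetical, chosen only to mirror the listEligibleFiles method and the nestedFileEnumeration flag mentioned above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): contrasts flat and nested listing. */
final class NestedFileLister {

    /**
     * Collects the regular files under dir.
     * With nested == false only direct children are returned, which is the
     * behaviour reported as the root cause in the thread.
     * With nested == true subdirectories are descended into recursively.
     */
    static List<Path> listEligibleFiles(Path dir, boolean nested) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
            for (Path child : children) {
                if (Files.isDirectory(child)) {
                    if (nested) {
                        // Recurse so that files such as 2016-12-16/file.txt are found.
                        files.addAll(listEligibleFiles(child, true));
                    }
                    // A directory itself is never reported as an eligible file.
                } else {
                    files.add(child);
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // Build the layout described in the thread inside a temporary directory.
        Path root = Files.createTempDirectory("mydir");
        Files.createFile(root.resolve("file.txt"));
        Path sub = Files.createDirectory(root.resolve("2016-12-16"));
        Files.createFile(sub.resolve("file.txt"));

        System.out.println("flat:   " + listEligibleFiles(root, false).size());
        System.out.println("nested: " + listEligibleFiles(root, true).size());
    }
}
```

With the layout built in main (file.txt plus 2016-12-16/file.txt), the flat call finds one file and the nested call finds two, which matches the symptom described above: only the file directly under mydir was picked up.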