Yes, please go ahead with the fix! :-) (If I'm not mistaken, Kostas is working on other stuff right now.)
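The root cause reported in the quoted thread is that listEligibleFiles in ContinuousFileMonitoringFunction only scans the top-level entries of the monitored directory. Here is a minimal sketch of the idea behind the fix. It assumes a local file system and uses java.nio.file instead of Flink's own FileSystem API, and EligibleFileLister and collectEligibleFiles are hypothetical names, so this is an illustration rather than the FLINK-5432 patch. When nested enumeration is on, the sketch descends into directories and never reports a directory itself as an eligible file. The modification-time rule keeps only entries newer than the global modification time.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of Flink): local-filesystem sketch of
// eligible-file discovery with optional recursion into subdirectories.
final class EligibleFileLister {

    private EligibleFileLister() {}

    // Returns path -> modification time (ms) for every regular file under root
    // whose modification time is strictly greater than globalModTime.
    // Directories are never returned as entries; with nested == true they are
    // descended into, otherwise they are skipped.
    static Map<Path, Long> collectEligibleFiles(Path root, long globalModTime, boolean nested)
            throws IOException {
        Map<Path, Long> eligible = new HashMap<>();
        collect(root, globalModTime, nested, eligible);
        return eligible;
    }

    private static void collect(Path dir, long globalModTime, boolean nested,
                                Map<Path, Long> out) throws IOException {
        List<Path> children;
        try (Stream<Path> listing = Files.list(dir)) {
            children = listing.collect(Collectors.toList());
        }
        for (Path child : children) {
            if (Files.isDirectory(child)) {
                if (nested) {
                    collect(child, globalModTime, true, out); // recurse into the subdirectory
                }
                continue; // a directory itself is never an eligible "file"
            }
            long modTime = Files.getLastModifiedTime(child).toMillis();
            if (modTime > globalModTime) { // i.e. ignore anything with modTime <= global mod time
                out.put(child, modTime);
            }
        }
    }
}
```

Because every nested file is keyed by its own path, a lookup by a split's path (as in the loop quoted below) can find it, whereas a top-level-only listing never contains those paths.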
On Mon, 9 Jan 2017 at 23:19 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
> Hi,
>
> I found the root cause of the problem: the listEligibleFiles method in
> ContinuousFileMonitoringFunction scans only the topmost files and ignores
> the nested files. By fixing that, I was able to get the expected output. I
> created a Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
>
> @Kostas, if you haven't already started working on a fix for this, I would
> happily contribute one, if you like.
>
> Best,
> Yassine
>
> 2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi Kostas,
>
> I debugged the code, and the nestedFileEnumeration parameter was always
> true during the execution. I noticed, however, that in the following loop
> in ContinuousFileMonitoringFunction, for some reason, the fileStatus was
> null for files in nested folders and non-null for files directly under the
> parent path, so no splits were forwarded in the case of nested folders:
>
>     for (int var5 = 0; var5 < var4; ++var5) {
>         FileInputSplit split = var3[var5];
>         FileStatus fileStatus = (FileStatus) eligibleFiles.get(split.getPath());
>         if (fileStatus != null) {
>             Long modTime = Long.valueOf(fileStatus.getModificationTime());
>             Object splitsToForward = (List) splitsByModTime.get(modTime);
>             if (splitsToForward == null) {
>                 splitsToForward = new ArrayList();
>                 splitsByModTime.put(modTime, splitsToForward);
>             }
>
>             ((List) splitsToForward).add(new TimestampedFileInputSplit(
>                 modTime.longValue(), split.getSplitNumber(), split.getPath(),
>                 split.getStart(), split.getLength(), split.getHostnames()));
>         }
>     }
>
> Thanks,
> Yassine
>
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
> Hi Yassine,
>
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files, but could you check whether, in the code that
> is executed by the tasks, the nestedFileEnumeration parameter is still true?
>
> I am asking in order to pin down whether the problem is in the way we ship
> the code to the tasks or in reading the nested files.
>
> Thanks,
> Kostas
>
> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>
> Hi,
>
> Any updates on this issue? Thank you.
>
> Best,
> Yassine
>
>
> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
> +kostas, who probably has the most experience with this by now. Do you
> have an idea what might be going on?
>
> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>
> It looks like this is not specific to continuous file monitoring: I'm
> having the same issue (files in nested directories are not read) when using
>
>     env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>         FileProcessingMode.PROCESS_ONCE, -1L)
>
> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi all,
>
> I'm using the following code to continuously process files from a
> directory "mydir":
>
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     FileInputFormat<String> fileInputFormat =
>         new TextInputFormat(new Path("hdfs:///shared/mydir"));
>     fileInputFormat.setNestedFileEnumeration(true);
>
>     env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>             FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>         .print();
>
>     env.execute();
>
> If I add a directory under "mydir", say "2016-12-16", and then add a file
> "2016-12-16/file.txt", its contents are not printed. If I add the same
> file directly under "mydir", its contents are correctly printed.
> After that, the logs show the following:
>
>     10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>       - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>     10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>       - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>
> It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as a
> file it had already read and therefore excludes it, even though its
> contents were never processed. Any idea why this happens?
> Thank you.
>
> Best,
> Yassine
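Taken together with the 9 January findings quoted above, one plausible reading of these DEBUG lines is as follows. The 2016-12-16 directory was listed as if it were a file, so its path was the only eligible key, while the splits created by the input format point at the nested files inside it. The per-split lookup then returns nothing and those splits are dropped. Because the directory's mod time (1481882041587) is not newer than the global mod time (1481882126122), later scans only log "Ignoring". The sketch below reproduces that interaction with plain JDK types. SimpleSplit, SplitGrouping and their methods are hypothetical stand-ins, not Flink classes, and shouldIgnore is only assumed to mirror the comparison implied by the log message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a file input split: just a path and a start offset.
final class SimpleSplit {
    final String path;
    final long start;

    SimpleSplit(String path, long start) {
        this.path = path;
        this.start = start;
    }
}

// Hypothetical sketch of the filtering and grouping steps discussed in the thread.
final class SplitGrouping {

    private SplitGrouping() {}

    // Assumed to mirror the check behind "Ignoring ..., with mod time= ... and global mod time= ...":
    // an entry is skipped unless it is strictly newer than the global mod time.
    static boolean shouldIgnore(long modTime, long globalModTime) {
        return modTime <= globalModTime;
    }

    // Groups splits by the mod time of the eligible entry with the same path.
    // A split whose path is not a key of eligibleFiles is silently skipped,
    // which is what happens to nested files when only the directory was listed.
    static Map<Long, List<SimpleSplit>> groupByModTime(
            List<SimpleSplit> splits, Map<String, Long> eligibleFiles) {
        Map<Long, List<SimpleSplit>> byModTime = new TreeMap<>(); // ordered by mod time
        for (SimpleSplit split : splits) {
            Long modTime = eligibleFiles.get(split.path);
            if (modTime == null) {
                continue; // no matching eligible entry: the split is dropped
            }
            byModTime.computeIfAbsent(modTime, k -> new ArrayList<>()).add(split);
        }
        return byModTime;
    }
}
```

With only "mydir/2016-12-16" as an eligible key, groupByModTime returns an empty map for a split of "mydir/2016-12-16/file.txt". With the nested file itself as the key, the split is kept under its mod time.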