Hi,

I found the root cause of the problem: the listEligibleFiles method in ContinuousFileMonitoringFunction scans only the top-level files and ignores files in nested directories. By fixing that I was able to get the expected output. I created a Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
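A minimal sketch of what recursive enumeration of eligible files could look like. This is not the actual Flink patch: it uses java.nio on a local directory rather than Flink's FileSystem API, and the class name NestedFileLister and the parameter names are hypothetical. Only the method name listEligibleFiles comes from the discussion above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only (not Flink code).
final class NestedFileLister {

    /**
     * Returns every regular file under dir whose modification time is newer
     * than globalModTime, mapped to that modification time (in milliseconds).
     * When nested is true, sub-directories are descended into instead of
     * being skipped.
     */
    static Map<Path, Long> listEligibleFiles(Path dir, long globalModTime, boolean nested)
            throws IOException {
        Map<Path, Long> eligible = new HashMap<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    // Recurse rather than treating the directory itself as a file.
                    if (nested) {
                        eligible.putAll(listEligibleFiles(entry, globalModTime, true));
                    }
                    continue;
                }
                long modTime = Files.getLastModifiedTime(entry).toMillis();
                if (modTime > globalModTime) {
                    eligible.put(entry, modTime);
                }
            }
        }
        return eligible;
    }
}
```

With nested set to false, a file such as 2016-12-16/file.txt never reaches the returned map, which matches the symptom described in this thread: the later lookup by split path finds no entry for it.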
@Kostas, if you haven't already started working on a fix for this, I would happily contribute one if you like.

Best,
Yassine

2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Kostas,
>
> I debugged the code and the nestedFileEnumeration parameter was always
> true during the execution. I noticed however that in the following loop in
> ContinuousFileMonitoringFunction, for some reason, the fileStatus was
> null for files in nested folders, and non-null for files directly under the
> parent path, so no splits were forwarded in the case of nested folders.
>
>     for(int var5 = 0; var5 < var4; ++var5) {
>         FileInputSplit split = var3[var5];
>         FileStatus fileStatus = (FileStatus)eligibleFiles.get(split.getPath());
>         if(fileStatus != null) {
>             Long modTime = Long.valueOf(fileStatus.getModificationTime());
>             Object splitsToForward = (List)splitsByModTime.get(modTime);
>             if(splitsToForward == null) {
>                 splitsToForward = new ArrayList();
>                 splitsByModTime.put(modTime, splitsToForward);
>             }
>
>             ((List)splitsToForward).add(new TimestampedFileInputSplit(
>                 modTime.longValue(), split.getSplitNumber(),
>                 split.getPath(), split.getStart(), split.getLength(),
>                 split.getHostnames()));
>         }
>     }
>
> Thanks,
> Yassine
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
>> Hi Yassine,
>>
>> I suspect that the problem is in the way the input format (and not the
>> reader) scans nested files, but could you see if in the code that is
>> executed by the tasks, the nestedFileEnumeration parameter is still true?
>>
>> I am asking in order to pin down if the problem is in the way we ship the
>> code to the tasks or in reading the nested files.
>>
>> Thanks,
>> Kostas
>>
>> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>> Hi,
>>
>> Any updates on this issue? Thank you.
>>
>> Best,
>> Yassine
>>
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>
>> +kostas, who probably has the most experience with this by now. Do you
>> have an idea what might be going on?
>>
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>>> Looks like this is not specific to the continuous file monitoring, I'm
>>> having the same issue (files in nested directories are not read) when using:
>>>
>>>     env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>>         FileProcessingMode.PROCESS_ONCE,
>>>         -1L)
>>>
>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>
>>> Hi all,
>>>
>>> I'm using the following code to continuously process files from a
>>> directory "mydir".
>>>
>>>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>     FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>>>     fileInputFormat.setNestedFileEnumeration(true);
>>>
>>>     env.readFile(fileInputFormat,
>>>             "hdfs:///shared/mydir",
>>>             FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>         .print();
>>>
>>>     env.execute();
>>>
>>> If I add a directory under mydir, say "2016-12-16", and then add a file
>>> "2016-12-16/file.txt", its contents are not printed. If I add the same
>>> file directly under "mydir", its contents are correctly printed.
>>> After that the logs will show the following:
>>>
>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>>>
>>> Looks like the ContinuousFileMonitoringFunction considered it had already
>>> read 2016-12-16 as a file and then excluded it, but its contents were not
>>> processed. Any idea why this happens?
>>> Thank you.
>>>
>>> Best,
>>> Yassine