Yes, please go ahead with the fix! :-)

(If I'm not mistaken Kostas is working on other stuff right now.)

On Mon, 9 Jan 2017 at 23:19 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
wrote:

> Hi,
>
> I found the root cause of the problem: the listEligibleFiles method in
> ContinuousFileMonitoringFunction scans only the top-level files and ignores
> nested files. After fixing that, I was able to get the expected output. I
> created a Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
>
> @Kostas, if you haven't already started working on a fix for this, I would
> be happy to contribute one.
>
> Best,
> Yassine
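The fix described in this message (making the listing descend into subdirectories) can be sketched on a local filesystem with java.nio.file. This is not the Flink implementation: the class NestedListing, this signature of listEligibleFiles, and the strict "newer than globalModTime" filter are assumptions for illustration. Directories are traversed rather than recorded as entries, so each nested file gets its own key in the result map.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class NestedListing {

    /**
     * Hypothetical sketch: collects regular files under dir whose modification
     * time is newer than globalModTime. When nested is true, subdirectories are
     * traversed recursively; directories themselves are never returned.
     */
    static Map<Path, Long> listEligibleFiles(Path dir, long globalModTime, boolean nested)
            throws IOException {
        Map<Path, Long> eligible = new HashMap<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    if (nested) {
                        // Descend and merge the subdirectory's eligible files
                        eligible.putAll(listEligibleFiles(entry, globalModTime, true));
                    }
                    // Skip the directory entry itself in both modes
                    continue;
                }
                long modTime = Files.getLastModifiedTime(entry).toMillis();
                if (modTime > globalModTime) {
                    eligible.put(entry, modTime);
                }
            }
        }
        return eligible;
    }
}
```

With nested set to false, the sketch behaves like the buggy listing Yassine describes: only top-level files appear in the map.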
>
> 2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi Kostas,
>
> I debugged the code, and the nestedFileEnumeration parameter was always
> true during execution. However, I noticed that in the following loop in
> ContinuousFileMonitoringFunction, fileStatus was, for some reason, null for
> files in nested folders and non-null for files directly under the parent
> path, so no splits were forwarded for nested folders.
>
> // var3: the array of input splits (decompiled variable name)
> for (FileInputSplit split : var3) {
>     FileStatus fileStatus = eligibleFiles.get(split.getPath());
>     if (fileStatus != null) {
>         Long modTime = fileStatus.getModificationTime();
>         List<TimestampedFileInputSplit> splitsToForward = splitsByModTime.get(modTime);
>         if (splitsToForward == null) {
>             splitsToForward = new ArrayList<>();
>             splitsByModTime.put(modTime, splitsToForward);
>         }
>         splitsToForward.add(new TimestampedFileInputSplit(
>             modTime, split.getSplitNumber(), split.getPath(),
>             split.getStart(), split.getLength(), split.getHostnames()));
>     }
> }
>
> Thanks,
> Yassine
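The null check in the loop quoted in this message explains the symptom: splits exist for nested files, but a split is only forwarded if its path is a key in eligibleFiles. The sketch below reproduces that lookup with plain strings standing in for paths and splits; SplitGrouping and groupByModTime are hypothetical names, and the TreeMap is an assumed choice for keeping groups ordered by modification time (the quoted snippet does not show which map type is used).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SplitGrouping {

    /**
     * Hypothetical sketch of the loop: keep only splits whose path is in
     * eligibleFiles (path -> modification time) and group them by mod time.
     */
    static TreeMap<Long, List<String>> groupByModTime(List<String> splitPaths,
                                                      Map<String, Long> eligibleFiles) {
        TreeMap<Long, List<String>> splitsByModTime = new TreeMap<>();
        for (String path : splitPaths) {
            Long modTime = eligibleFiles.get(path);
            if (modTime == null) {
                // Path was never listed as eligible: the split is dropped silently
                continue;
            }
            splitsByModTime.computeIfAbsent(modTime, k -> new ArrayList<>()).add(path);
        }
        return splitsByModTime;
    }
}
```

If eligibleFiles is built from top-level entries only, a split for "mydir/2016-12-16/file.txt" finds no key and disappears without any log output, which matches what Yassine observed.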
>
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
> Hi Yassine,
>
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files, but could you check whether the
> nestedFileEnumeration parameter is still true in the code that is executed
> by the tasks?
>
> I am asking in order to pin down whether the problem is in the way we ship
> the code to the tasks or in reading the nested files.
>
> Thanks,
> Kostas
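Kostas's question separates two hypotheses. One way to test the first (the flag is lost when the code is shipped) in isolation is a serialization round trip, assuming the format reaches the tasks in Java-serialized form. FormatConfig below is a hypothetical stand-in, not Flink's FileInputFormat, and roundTrip is an illustrative helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ShippingCheck {

    /** Hypothetical stand-in for a serializable input format carrying the flag. */
    static class FormatConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        boolean nestedFileEnumeration;
    }

    /** Serializes and deserializes obj, roughly mimicking shipping it to a task. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

If the field were declared transient, the copy would come back with nestedFileEnumeration == false; that is the kind of shipping bug the check rules out. Yassine's debugging reply found the flag still true on the tasks, which points at the second hypothesis instead.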
>
> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
> Hi,
>
> Any updates on this issue? Thank you.
>
> Best,
> Yassine
>
>
> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
> +Kostas, who probably has the most experience with this by now. Do you
> have an idea what might be going on?
>
> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
> It looks like this is not specific to continuous file monitoring: I'm
> having the same issue (files in nested directories are not read) when using:
>
> env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>         FileProcessingMode.PROCESS_ONCE, -1L)
>
> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi all,
>
> I'm using the following code to continuously process files from a
> directory "mydir".
>
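> import org.apache.flink.api.common.io.FileInputFormat;
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
> FileInputFormat<String> fileInputFormat =
>         new TextInputFormat(new Path("hdfs:///shared/mydir"));
> // Also enumerate files inside subdirectories of mydir
> fileInputFormat.setNestedFileEnumeration(true);
>
> // Re-scan the directory every 10000 ms (10 s)
> env.readFile(fileInputFormat,
>         "hdfs:///shared/mydir",
>         FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>         .print();
>
> env.execute();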
>
> If I add a directory under mydir, say "2016-12-16", and then add a file
> "2016-12-16/file.txt", its contents are not printed. If I add the same
> file directly under "mydir", its contents are printed correctly. After
> that, the logs show the following:
>
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>
> It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as an
> already-read file and then excluded it, but its contents were never
> processed. Any idea why this happens?
> Thank you.
>
> Best,
> Yassine
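The two log lines can be checked by hand. Assuming the monitoring function skips any entry whose modification time is less than or equal to the global modification time (a rule consistent with both lines, not confirmed from the source here), the directory is skipped because its mod time is 84,535 ms (about 85 s) older than the global one, and file.txt because it is 337,418 ms older. The directory is compared as a single entry, matching the observation that it was treated like a file. ModTimeCheck and shouldIgnore are hypothetical names.

```java
public class ModTimeCheck {

    /**
     * Assumed rule, consistent with the "Ignoring ..." log lines: an entry is
     * skipped when its modification time is not newer than the global one.
     */
    static boolean shouldIgnore(long modTime, long globalModTime) {
        return modTime <= globalModTime;
    }

    public static void main(String[] args) {
        long global = 1481882126122L;
        long dirModTime = 1481882041587L;   // mydir/2016-12-16
        long fileModTime = 1481881788704L;  // mydir/file.txt
        System.out.println(shouldIgnore(dirModTime, global));   // true
        System.out.println(global - dirModTime);                // 84535
        System.out.println(shouldIgnore(fileModTime, global));  // true
        System.out.println(global - fileModTime);               // 337418
    }
}
```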
>