Hi,

I found the root cause of the problem: the listEligibleFiles method in
ContinuousFileMonitoringFunction scans only the top-level files and ignores
files in nested directories. After fixing that, I got the expected output. I
created a Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
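
To illustrate the idea, here is a rough sketch of a recursive listing. It is
not the actual Flink patch: it uses plain java.nio.file instead of Flink's
FileSystem API, and the class name, the method signature, and the
temp-directory layout in main are assumptions made up for the example.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class NestedFileLister {

    /**
     * Collects the regular files under dir, keyed by path, with their
     * modification time in milliseconds. Sub-directories are descended
     * into only when nested is true; otherwise they are skipped.
     * (Hypothetical signature, not the one used in Flink.)
     */
    static Map<Path, Long> listEligibleFiles(Path dir, boolean nested) throws IOException {
        Map<Path, Long> files = new HashMap<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    if (nested) {
                        // Recurse so that files in nested folders are registered too.
                        files.putAll(listEligibleFiles(entry, true));
                    }
                } else {
                    files.put(entry, Files.getLastModifiedTime(entry).toMillis());
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        // Recreate the layout from the report: mydir/file.txt and mydir/2016-12-16/file.txt
        Path root = Files.createTempDirectory("mydir");
        Files.createFile(root.resolve("file.txt"));
        Path sub = Files.createDirectory(root.resolve("2016-12-16"));
        Files.createFile(sub.resolve("file.txt"));

        System.out.println(listEligibleFiles(root, false).size()); // 1: top-level file only
        System.out.println(listEligibleFiles(root, true).size());  // 2: nested file included
    }
}
```

With the non-recursive listing, a lookup by the nested file's path finds no
entry in the map, which matches the null fileStatus I saw for nested splits.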

@Kostas, if you haven't already started working on a fix for this, I would
happily contribute one.

Best,
Yassine

2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Kostas,
>
> I debugged the code, and the nestedFileEnumeration parameter was always
> true during execution. However, I noticed that in the following loop in
> ContinuousFileMonitoringFunction, the fileStatus was null for files in
> nested folders and non-null for files directly under the parent path, so
> no splits were forwarded for the nested folders.
>
> for(int var5 = 0; var5 < var4; ++var5) {
>     FileInputSplit split = var3[var5];
>     FileStatus fileStatus = (FileStatus)eligibleFiles.get(split.getPath());
>     if(fileStatus != null) {
>         Long modTime = Long.valueOf(fileStatus.getModificationTime());
>         Object splitsToForward = (List)splitsByModTime.get(modTime);
>         if(splitsToForward == null) {
>             splitsToForward = new ArrayList();
>             splitsByModTime.put(modTime, splitsToForward);
>         }
>
>         ((List)splitsToForward).add(new TimestampedFileInputSplit(
>             modTime.longValue(), split.getSplitNumber(), split.getPath(),
>             split.getStart(), split.getLength(), split.getHostnames()));
>     }
> }
>
> Thanks,
> Yassine
>
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
>> Hi Yassine,
>>
>> I suspect that the problem is in the way the input format (and not the
>> reader) scans nested files,
>> but could you see if in the code that is executed by the tasks, the
>> nestedFileEnumeration parameter is still true?
>>
>> I am asking in order to pin down if the problem is in the way we ship the
>> code to the tasks or in reading the
>> nested files.
>>
>> Thanks,
>> Kostas
>>
>> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>> Hi,
>>
>> Any updates on this issue? Thank you.
>>
>> Best,
>> Yassine
>>
>>
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>
>> +kostas, who probably has the most experience with this by now. Do you
>> have an idea what might be going on?
>>
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>>> It looks like this is not specific to continuous file monitoring. I'm
>>> having the same issue (files in nested directories are not read) when using:
>>>
>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>>         FileProcessingMode.PROCESS_ONCE, -1L)
>>>
>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>
>>> Hi all,
>>>
>>> I'm using the following code to continuously process files from a
>>> directory "mydir".
>>>
>>> final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> FileInputFormat fileInputFormat =
>>>         new TextInputFormat(new Path("hdfs:///shared/mydir"));
>>> fileInputFormat.setNestedFileEnumeration(true);
>>>
>>> env.readFile(fileInputFormat,
>>>                 "hdfs:///shared/mydir",
>>>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>                 .print();
>>>
>>> env.execute();
>>>
>>> If I add a directory under mydir, say "2016-12-16", and then add a file
>>> "2016-12-16/file.txt", its contents are not printed. If I add the same
>>> file directly under "mydir", its contents are correctly printed.
>>> After that, the logs show the following:
>>>
>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>>>
>>> It looks like ContinuousFileMonitoringFunction treated 2016-12-16 as a
>>> file it had already read and excluded it, even though its contents were
>>> never processed. Any idea why this happens?
>>> Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>>
>>
>>
>
