Hi Kostas,

I debugged the code and the nestedFileEnumeration parameter was always true
during the execution. I noticed however that in the following loop
in ContinuousFileMonitoringFunction, for some reason, the fileStatus was
null for files in nested folders, and non null for files directly under the
parent path, so no splits were forwarded in the case of nested folders.

for(int var5 = 0; var5 < var4; ++var5) {
                FileInputSplit split = var3[var5];
                FileStatus fileStatus =
(FileStatus)eligibleFiles.get(split.getPath());
                if(fileStatus != null) {
                    Long modTime =
Long.valueOf(fileStatus.getModificationTime());
                    Object splitsToForward =
(List)splitsByModTime.get(modTime);
                    if(splitsToForward == null) {
                        splitsToForward = new ArrayList();
                        splitsByModTime.put(modTime, splitsToForward);
                    }

                    ((List)splitsToForward).add(new
TimestampedFileInputSplit(modTime.longValue(), split.getSplitNumber(),
split.getPath(), split.getStart(), split.getLength(),
split.getHostnames()));
                }
            }

Thanks,
Yassine


2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:

> Hi Yassine,
>
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files,
> but could you see if in the code that is executed by the tasks, the
> nestedFileEnumeration parameter is still true?
>
> I am asking in order to pin down if the problem is in the way we ship the
> code to the tasks or in reading the
> nested files.
>
> Thanks,
> Kostas
>
> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
> Hi,
>
> Any updates on this issue? Thank you.
>
> Best,
> Yassine
>
>
> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
> +kostas, who probably has the most experience with this by now. Do you
> have an idea what might be going on?
>
> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
>> Looks like this is not specific to the continuous file monitoring, I'm
>> having the same issue (files in nested directories are not read) when using:
>>
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", 
>> FileProcessingMode.PROCESS_ONCE,
>> -1L)
>>
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>> Hi all,
>>
>> I'm using the following code to continuously process files from a
>> directory "mydir".
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>>
>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("
>> hdfs:///shared/mydir"));
>> fileInputFormat.setNestedFileEnumeration(true);
>>
>> env.readFile(fileInputFormat,
>>                 "hdfs:///shared/mydir",
>>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>                 .print();
>>
>> env.execute();
>>
>> If I add directory under mydir, say "2016-12-16", and then add a file "
>> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
>> file directly under "*mydir"*,  its contents are correctly printed.
>> After that the logs will show the following :
>>
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api
>> .functions.source.ContinuousFileMonitoringFunction  - Ignoring
>> hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time=
>> 1481882041587 and global mod time= 1481882126122
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api
>> .functions.source.ContinuousFileMonitoringFunction  - Ignoring
>> hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704
>> and global mod time= 1481882126122
>>
>> Looks like the ContinuousFileMonitoringFunction  considered it already
>> read 2016-12-16 as a file and then excludes it, but its contents were not
>> processed. Any Idea why this happens?
>> Thank you.
>>
>> Best,
>> Yassine
>>
>>
>>
>
>

Reply via email to