Pushed a fix to master and will open a PR to fix this properly.
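For readers following along, here is a minimal, self-contained sketch of the bug pattern discussed below. This is a hypothetical simplification, not Flink's actual FileInputFormat source; the method name and signature are reduced from the thread's version.

    import java.io.File;

    // Sketch of the enumeration bug: a recursive helper that returns the
    // combined byte length of all files found under 'dir'. If a caller
    // discards the return value of the recursive call, nested files
    // contribute 0 bytes to the total, mirroring the missing "length +="
    // pointed out in the thread.
    public class NestedLengthSketch {

        static long addNestedFiles(File dir) {
            long length = 0;
            File[] entries = dir.listFiles();
            if (entries == null) {
                return 0; // not a directory, or an I/O error
            }
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    // Buggy variant: addNestedFiles(entry); with the result
                    // dropped. Fixed variant: accumulate what the recursion
                    // found.
                    length += addNestedFiles(entry);
                } else {
                    length += entry.length();
                }
            }
            return length;
        }

        public static void main(String[] args) {
            // With the accumulation in place, deeply nested files are
            // counted; without it, the reported total for files under
            // /tmp/myDir/A/B would be 0.
            System.out.println(addNestedFiles(new File("/tmp/myDir")));
        }
    }

A reported total length of 0 is what then drives the split-creation loop described below to keep adding InputSplits until the heap is exhausted.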
On Tue, May 26, 2015 at 4:22 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Yep, that definitely solves the problem! Could you make a PR to fix that?
>
> Thank you in advance,
> Flavio
>
> On Tue, May 26, 2015 at 3:20 PM, Maximilian Michels <m...@apache.org> wrote:
>
>> Yes, there is a loop that recursively searches for files in the
>> directory, but that should be fine. The code fails when adding a new
>> InputSplit to an ArrayList, which is a standard operation.
>>
>> Oh, I think I found a bug in `addNestedFiles`. It does not pick up the
>> length of the recursively found files in line 546. That can result in a
>> returned size of 0, which causes an unbounded number of InputSplits to
>> be created and added to the aforementioned ArrayList. Can you change
>>
>> addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
>>
>> to
>>
>> length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
>>
>> ?
>>
>> On Tue, May 26, 2015 at 2:21 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> I have 10 files. I debugged the code and it seems that there's a loop
>>> in the FileInputFormat when files are nested deep below the root
>>> directory of the scan.
>>>
>>> On Tue, May 26, 2015 at 2:14 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> How many files are in the directory? You can count them with
>>>> "find /tmp/myDir | wc -l".
>>>>
>>>> Flink running out of memory while creating input splits indicates to
>>>> me that there are a lot of files in there.
>>>>
>>>> On Tue, May 26, 2015 at 2:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>>
>>>>> I'm trying to recursively read a directory, but it seems that the
>>>>> totalLength value in FileInputFormat.createInputSplits() is not
>>>>> computed correctly.
>>>>>
>>>>> I have files organized as:
>>>>>
>>>>> /tmp/myDir/A/B/chunk-1.txt
>>>>> /tmp/myDir/A/B/chunk-2.txt
>>>>> ..
>>>>>
>>>>> If I try to do the following:
>>>>>
>>>>> Configuration parameters = new Configuration();
>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>> env.readTextFile("file:///tmp/myDir").withParameters(parameters).print();
>>>>>
>>>>> I get:
>>>>>
>>>>> Caused by: org.apache.flink.runtime.JobException: Creating the input
>>>>> splits caused an error: Java heap space
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:515)
>>>>> ... 19 more
>>>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:503)
>>>>> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
>>>>>
>>>>> Am I doing something wrong or is it a bug?
>>>>>
>>>>> Best,
>>>>> Flavio
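For completeness, here is Flavio's snippet from above as a compilable job, a sketch assuming a 2015-era DataSet API setup; the class name is hypothetical and the input path is the example path from the thread.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class RecursiveReadJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Enable recursive enumeration so files nested below
            // /tmp/myDir/A/B are picked up, not just files directly
            // under the input path.
            Configuration parameters = new Configuration();
            parameters.setBoolean("recursive.file.enumeration", true);

            env.readTextFile("file:///tmp/myDir")
               .withParameters(parameters)
               .print();
        }
    }

Note that print() triggers execution on Flink 0.9 and later; on older versions a separate env.execute() call may be needed.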