I'll look into it to find the responsible join operation.

On Jun 5, 2015 10:50 AM, "Stephan Ewen" <se...@apache.org> wrote:
> There are two different issues here:
>
> 1) Flink does figure out how much memory a join gets, but that memory may
> be too little for the join to accept it. Flink plans very conservatively
> right now - often too conservatively - which is something we have on the
> immediate roadmap to fix.
>
> 2) The "Hash Join exceeded recursions" problem is made worse by little
> memory, but is usually an indicator that the join is running the wrong way
> anyway. The side with many duplicates should rarely be the build side, but
> in most cases the probe side.
>
> Stephan
>
> On Fri, Jun 5, 2015 at 9:13 AM, Felix Neutatz <neut...@googlemail.com> wrote:
>
> > Shouldn't Flink figure out on its own how much memory there is for the
> > join?
> >
> > The detailed trace for the NullPointerException can be found here:
> > https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
> >
> > Best regards,
> > Felix
> >
> > 2015-06-04 19:41 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
> >
> > > I think it is not a problem of join hints, but rather of too little
> > > memory for the join operator. If you set the temporary directory, then
> > > the job will be split into smaller parts and thus each operator gets
> > > more memory. Alternatively, you can increase the memory you give to
> > > the Task Managers.
> > >
> > > The problem with the NullPointerException won't be solved by this,
> > > though. Could you send the full stack trace for that?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Jun 4, 2015 7:10 PM, "Andra Lungu" <lungu.an...@gmail.com> wrote:
> > >
> > > > Hi Felix,
> > > >
> > > > Passing a JoinHint to your function should help.
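For reference, passing a join strategy hint in the Flink Scala DataSet API looks roughly like the sketch below. It illustrates Stephan's advice by building the hash table on the duplicate-free side and probing with the duplicate-heavy side. The `users` and `ratings` data sets are made up for illustration, and this assumes a Flink version whose `join` accepts a `JoinHint` argument (older versions only expose `joinWithTiny`/`joinWithHuge`):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

object JoinHintSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical inputs: ratings has many duplicate user ids,
    // users has exactly one entry per id.
    val ratings: DataSet[(Int, Double)] =
      env.fromElements((1, 4.0), (1, 3.5), (2, 5.0))
    val users: DataSet[(Int, String)] =
      env.fromElements((1, "a"), (2, "b"))

    // REPARTITION_HASH_FIRST builds the hash table on the first input
    // (users, no duplicate keys) and probes it with the second input
    // (ratings, many duplicates) - the direction Stephan recommends.
    val joined = users
      .join(ratings, JoinHint.REPARTITION_HASH_FIRST)
      .where(0)
      .equalTo(0)

    joined.print()
  }
}
```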
> > > > see:
> > > > http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
> > > >
> > > > Cheers,
> > > > Andra
> > > >
> > > > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com> wrote:
> > > >
> > > > > after the bug fix:
> > > > >
> > > > > for 100 blocks and standard JVM heap space:
> > > > >
> > > > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
> > > > > recursions, without reducing partitions enough to be memory resident.
> > > > > Probably cause: Too many duplicate keys.
> > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > > > >     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > >     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > >     at java.lang.Thread.run(Thread.java:745)
> > > > >
> > > > > for 150 blocks and 5G JVM heap space:
> > > > >
> > > > > Caused by: java.lang.NullPointerException
> > > > >     at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > >     ...
> > > > >
> > > > > Best regards,
> > > > > Felix
> > > > >
> > > > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
> > > > >
> > > > > > Yes, I will try it again with the newest update :)
> > > > > >
> > > > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
> > > > > >
> > > > > > > If the first error is not fixed by Chiwan's PR, then we should
> > > > > > > create a JIRA for it so we don't forget it.
> > > > > > >
> > > > > > > @Felix: Chiwan's PR is here [1]. Could you try to run ALS again
> > > > > > > with this version?
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > [1] https://github.com/apache/flink/pull/751
> > > > > > >
> > > > > > > On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com> wrote:
> > > > > > >
> > > > > > > > Hi. The second bug is fixed by the recent change in the PR.
> > > > > > > > But there is just no test case for the first bug.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Chiwan Park
> > > > > > > >
> > > > > > > > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > I think both are bugs. They are triggered by the different
> > > > > > > > > memory configurations.
> > > > > > > > >
> > > > > > > > > @chiwan: is the 2nd error fixed by your recent change?
> > > > > > > > >
> > > > > > > > > @felix: if yes, can you try the 2nd run again with the changes?
> > > > > > > > >
> > > > > > > > > On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > I played a bit with the ALS recommender algorithm.
> > > > > > > > > > I used the movielens dataset:
> > > > > > > > > > http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> > > > > > > > > >
> > > > > > > > > > The rating matrix has 21,063,128 entries (ratings).
> > > > > > > > > >
> > > > > > > > > > I ran the algorithm with 3 configurations:
> > > > > > > > > >
> > > > > > > > > > 1. standard JVM heap space:
> > > > > > > > > >
> > > > > > > > > > val als = ALS()
> > > > > > > > > >   .setIterations(10)
> > > > > > > > > >   .setNumFactors(10)
> > > > > > > > > >   .setBlocks(100)
> > > > > > > > > >
> > > > > > > > > > throws:
> > > > > > > > > >
> > > > > > > > > > java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > > > > > > >     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > > > > > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > > > > > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > > > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > > > > > > >     at java.lang.Thread.run(Thread.java:745)
> > > > > > > > > >
> > > > > > > > > > 2. 5G JVM heap space:
> > > > > > > > > >
> > > > > > > > > > val als = ALS()
> > > > > > > > > >   .setIterations(10)
> > > > > > > > > >   .setNumFactors(10)
> > > > > > > > > >   .setBlocks(150)
> > > > > > > > > >
> > > > > > > > > > throws:
> > > > > > > > > >
> > > > > > > > > > java.lang.NullPointerException
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > > > > > > >     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > > > > > > >     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > > > > > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > > > > > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > > > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > > > > > > >     at java.lang.Thread.run(Thread.java:745)
> > > > > > > > > >
> > > > > > > > > > 3. 14G JVM heap space:
> > > > > > > > > >
> > > > > > > > > > val als = ALS()
> > > > > > > > > >   .setIterations(10)
> > > > > > > > > >   .setNumFactors(10)
> > > > > > > > > >   .setBlocks(150)
> > > > > > > > > >   .setTemporaryPath("/tmp/tmpALS")
> > > > > > > > > >
> > > > > > > > > > -> works
> > > > > > > > > >
> > > > > > > > > > Is this a Flink problem or is it just my bad configuration?
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Felix
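Till's two remedies in this thread - giving the Task Managers more memory and setting a temporary path so intermediate results are spilled to disk - correspond to configuration along these lines. This is a sketch against the Flink 0.9-era configuration keys; the values are illustrative, not recommendations:

```yaml
# conf/flink-conf.yaml (illustrative values)
taskmanager.heap.mb: 5120         # heap size of each Task Manager JVM
taskmanager.memory.fraction: 0.7  # share of heap managed by Flink (joins, sorts, hash tables)
taskmanager.tmp.dirs: /tmp/flink  # directories used for spilling to disk
```

The per-job alternative, shown in Felix's third configuration above, is `setTemporaryPath(...)` on the ALS instance, which persists intermediate results and splits the plan into smaller pieces so each operator gets a larger share of the managed memory.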