I think it is not a problem of join hints, but rather of too little memory for the join operator. If you set the temporary directory, then the job will be split in smaller parts and thus each operator gets more memory. Alternatively, you can increase the memory you give to the Task Managers.
The problem with the NullPointerException won't be solved by this, though. Could you send the full stack trace for that? Cheers, Till On Jun 4, 2015 7:10 PM, "Andra Lungu" <lungu.an...@gmail.com> wrote: > Hi Felix, > > Passing a JoinHint to your function should help. > see: > > http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E > > Cheers, > Andra > > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com> > wrote: > > > after bug fix: > > > > for 100 blocks and standard jvm heap space > > > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number > of > > recursions, without reducing partitions enough to be memory resident. > > Probably cause: Too many duplicate keys. > > at > > > > > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718) > > at > > > > > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506) > > at > > > > > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543) > > at > > > > > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) > > at > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) > > at > > > > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > > at > > > > > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > > at java.lang.Thread.run(Thread.java:745) > > > > > > for 150 blocks and 5G jvm heap space > > > > Caused by: java.lang.NullPointerException > > at > > > > > org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310) > > ... > > > > Best regards, > > Felix > > > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>: > > > > > Yes, I will try it again with the newest update :) > > > > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>: > > > > > >> If the first error is not fixed by Chiwans PR, then we should create a > > >> JIRA > > >> for it to not forget it. > > >> > > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with > > this > > >> version? > > >> > > >> Cheers, > > >> Till > > >> > > >> [1] https://github.com/apache/flink/pull/751 > > >> > > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com> > > >> wrote: > > >> > > >> > Hi. The second bug is fixed by the recent change in PR. > > >> > But there is just no test case for first bug. > > >> > > > >> > Regards, > > >> > Chiwan Park > > >> > > > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote: > > >> > > > > >> > > I think both are bugs. They are triggered by the different memory > > >> > > configurations. > > >> > > > > >> > > @chiwan: is the 2nd error fixed by your recent change? > > >> > > > > >> > > @felix: if yes, can you try the 2nd run again with the changes? > > >> > > > > >> > > On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com> > > >> wrote: > > >> > > > > >> > >> Hi, > > >> > >> > > >> > >> I played a bit with the ALS recommender algorithm. I used the > > >> movielens > > >> > >> dataset: > > >> > >> > > http://files.grouplens.org/datasets/movielens/ml-latest-README.html > > >> > >> > > >> > >> The rating matrix has 21.063.128 entries (ratings). > > >> > >> > > >> > >> I run the algorithm with 3 configurations: > > >> > >> > > >> > >> 1. standard jvm heap space: > > >> > >> > > >> > >> val als = ALS() > > >> > >> .setIterations(10) > > >> > >> .setNumFactors(10) > > >> > >> .setBlocks(100) > > >> > >> > > >> > >> throws: > > >> > >> java.lang.RuntimeException: Hash Join bug in memory management: > > >> Memory > > >> > >> buffers leaked. > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) > > >> > >> at > > >> > > > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > > >> > >> at java.lang.Thread.run(Thread.java:745) > > >> > >> > > >> > >> 2. 5G jvm heap space > > >> > >> > > >> > >> val als = ALS() > > >> > >> .setIterations(10) > > >> > >> .setNumFactors(10) > > >> > >> .setBlocks(150) > > >> > >> > > >> > >> throws: > > >> > >> > > >> > >> java.lang.NullPointerException > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) > > >> > >> at > > >> > > > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > > >> > >> at > > >> > >> > > >> > >> > > >> > > > >> > > > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > > >> > >> at java.lang.Thread.run(Thread.java:745) > > >> > >> > > >> > >> 3. 14G jvm heap space > > >> > >> > > >> > >> val als = ALS() > > >> > >> .setIterations(10) > > >> > >> .setNumFactors(10) > > >> > >> .setBlocks(150) > > >> > >> .setTemporaryPath("/tmp/tmpALS") > > >> > >> > > >> > >> -> works > > >> > >> > > >> > >> Is this a Flink problem or is it just my bad configuration? > > >> > >> > > >> > >> Best regards, > > >> > >> Felix > > >> > >> > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > > > > > > >