I think the NPE in the second configuration is a bug in the hash table. I just found that ConnectedComponents with small memory segments causes the same error. (I thought I had fixed the bug, but it is still alive.)
Regards,
Chiwan Park

> On Jun 5, 2015, at 2:35 AM, Felix Neutatz <neut...@googlemail.com> wrote:
>
> Now the question is: which join in the ALS implementation is the problem? :)
>
> 2015-06-04 19:09 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
>
>> Hi Felix,
>>
>> Passing a JoinHint to your function should help. See:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
>>
>> Cheers,
>> Andra
>>
>> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com> wrote:
>>
>>> After the bug fix:
>>>
>>> For 100 blocks and standard JVM heap space:
>>>
>>> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
>>> recursions, without reducing partitions enough to be memory resident.
>>> Probably cause: Too many duplicate keys.
>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> For 150 blocks and 5G JVM heap space:
>>>
>>> Caused by: java.lang.NullPointerException
>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>> ...
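Andra's JoinHint suggestion can be sketched roughly as below. This is a minimal sketch, not code from the thread: the data sets `users` and `ratings` are hypothetical placeholders, and depending on the Flink version the hint may have to be passed through a different join variant (see the linked mail for details).

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical inputs, both keyed by user id.
val users: DataSet[(Int, String)] = env.fromElements((1, "a"), (2, "b"))
val ratings: DataSet[(Int, Double)] = env.fromElements((1, 4.0), (1, 5.0))

// A sort-merge strategy sidesteps the recursive hash-table build that
// fails above with "Too many duplicate keys".
val joined = users.join(ratings, JoinHint.REPARTITION_SORT_MERGE)
  .where(0)
  .equalTo(0)
```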
>>>
>>> Best regards,
>>> Felix
>>>
>>> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
>>>
>>>> Yes, I will try it again with the newest update :)
>>>>
>>>> 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>>
>>>>> If the first error is not fixed by Chiwan's PR, then we should create a
>>>>> JIRA for it so we do not forget it.
>>>>>
>>>>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
>>>>> version?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/751
>>>>>
>>>>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com> wrote:
>>>>>
>>>>>> Hi. The second bug is fixed by the recent change in the PR,
>>>>>> but there is no test case for the first bug yet.
>>>>>>
>>>>>> Regards,
>>>>>> Chiwan Park
>>>>>>
>>>>>>> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>
>>>>>>> I think both are bugs. They are triggered by the different memory
>>>>>>> configurations.
>>>>>>>
>>>>>>> @Chiwan: is the 2nd error fixed by your recent change?
>>>>>>>
>>>>>>> @Felix: if yes, can you try the 2nd run again with the changes?
>>>>>>>
>>>>>>> On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I played a bit with the ALS recommender algorithm. I used the MovieLens
>>>>>>>> dataset:
>>>>>>>> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
>>>>>>>>
>>>>>>>> The rating matrix has 21,063,128 entries (ratings).
>>>>>>>>
>>>>>>>> I ran the algorithm with 3 configurations:
>>>>>>>>
>>>>>>>> 1. standard JVM heap space:
>>>>>>>>
>>>>>>>> val als = ALS()
>>>>>>>>   .setIterations(10)
>>>>>>>>   .setNumFactors(10)
>>>>>>>>   .setBlocks(100)
>>>>>>>>
>>>>>>>> throws:
>>>>>>>>
>>>>>>>> java.lang.RuntimeException: Hash Join bug in memory management:
>>>>>>>> Memory buffers leaked.
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> 2.
>>>>>>>> 5G JVM heap space:
>>>>>>>>
>>>>>>>> val als = ALS()
>>>>>>>>   .setIterations(10)
>>>>>>>>   .setNumFactors(10)
>>>>>>>>   .setBlocks(150)
>>>>>>>>
>>>>>>>> throws:
>>>>>>>>
>>>>>>>> java.lang.NullPointerException
>>>>>>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> 3.
>>>>>>>> 14G JVM heap space:
>>>>>>>>
>>>>>>>> val als = ALS()
>>>>>>>>   .setIterations(10)
>>>>>>>>   .setNumFactors(10)
>>>>>>>>   .setBlocks(150)
>>>>>>>>   .setTemporaryPath("/tmp/tmpALS")
>>>>>>>>
>>>>>>>> -> works
>>>>>>>>
>>>>>>>> Is this a Flink problem or is it just my bad configuration?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Felix
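For reference, the configuration that worked above can be fleshed out into a minimal end-to-end job. This is only a sketch under assumptions: the CSV path and its (userId, movieId, rating) schema are placeholders, and the `fit`/`predict` calls assume the FlinkML Scala API of that era.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// MovieLens-style ratings: (userId, movieId, rating). The path is a placeholder.
val ratings: DataSet[(Int, Int, Double)] =
  env.readCsvFile[(Int, Int, Double)]("/path/to/ratings.csv", ignoreFirstLine = true)

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  // Spilling intermediate results to disk is what let this configuration finish.
  .setTemporaryPath("/tmp/tmpALS")

als.fit(ratings)

// Score hypothetical (user, movie) pairs with the trained model.
val toScore: DataSet[(Int, Int)] = env.fromElements((1, 2), (2, 3))
val predictions = als.predict(toScore)
```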