Hi, the problem behind the "maximum number of recursions" exception is the
distribution of join keys.

If a partition does not fit into memory, the HybridHashJoin tries to solve
this by recursively re-partitioning that partition using a different hash
function.
If the join keys are heavily skewed, this strategy can fail: hashing the
same values with a different hash function won't spread them across
different partitions, so the partition remains too large to fit into memory
and another recursion is started. At some point, the HybridHashJoin gives up
and throws the "too many recursions" exception.
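To make this concrete, here is a small, self-contained illustration (plain Java, not Flink code; the hash function, seeds, and partition count are made up for the demo): no matter which seed the hash function uses, the 1000 duplicates of the hot key always land in the same partition, so recursive re-partitioning can never shrink it.

```java
import java.util.*;

public class SkewDemo {
    // Partition a list of keys into numPartitions buckets using a seeded hash.
    static Map<Integer, List<String>> partition(List<String> keys, int seed, int numPartitions) {
        Map<Integer, List<String>> parts = new HashMap<>();
        for (String k : keys) {
            int bucket = Math.abs((k.hashCode() ^ seed) % numPartitions);
            parts.computeIfAbsent(bucket, b -> new ArrayList<>()).add(k);
        }
        return parts;
    }

    public static void main(String[] args) {
        // Heavily skewed input: one "hot" join key dominates.
        List<String> keys = new ArrayList<>(Collections.nCopies(1000, "hotKey"));
        keys.addAll(Arrays.asList("a", "b", "c"));

        // Try several "different hash functions" (different seeds), as the
        // recursive re-partitioning step of a hybrid hash join would.
        for (int seed : new int[]{17, 42, 1337}) {
            Map<Integer, List<String>> parts = partition(keys, seed, 8);
            int largest = parts.values().stream().mapToInt(List::size).max().getAsInt();
            // All 1000 duplicates of "hotKey" hash identically under every
            // seed, so the largest partition never shrinks below 1000.
            System.out.println("seed " + seed + " -> largest partition holds " + largest + " rows");
        }
    }
}
```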

There are three solutions to this problem:
1. use more memory
2. use a sort-merge join
3. switch the inputs of the hash-join
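For option 3, the intuition is that a hash join only needs its build side in memory; the probe side is streamed. A toy sketch (plain Java, not Flink's actual implementation) of why building the hash table on the small, duplicate-free input avoids the problem:

```java
import java.util.*;

public class BuildSideDemo {
    // Toy in-memory hash join: build a hash table on `build`, stream `probe`.
    // Only the build side must fit in memory, which is why switching inputs
    // (building on the smaller, less skewed side) can avoid spilling.
    static List<String> hashJoin(List<String[]> build, List<String[]> probe) {
        Map<String, List<String>> table = new HashMap<>();
        for (String[] row : build) {   // row = {key, value}
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] row : probe) {   // probe rows never need to be held in memory
            for (String v : table.getOrDefault(row[0], Collections.emptyList())) {
                out.add(row[0] + ":" + v + "," + row[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Small, duplicate-free side: ideal build input.
        List<String[]> small = Arrays.asList(new String[]{"k1", "a"}, new String[]{"k2", "b"});
        // Skewed side: every row carries the hot key k1 -- keep it on the probe side.
        List<String[]> large = new ArrayList<>();
        for (int i = 0; i < 5; i++) large.add(new String[]{"k1", "x" + i});
        System.out.println(hashJoin(small, large).size()); // prints 5
    }
}
```

In the DataSet API this corresponds to picking a join hint, e.g. JoinHint.REPARTITION_HASH_SECOND to build on the second input, or JoinHint.REPARTITION_SORT_MERGE for option 2; please double-check the hint names against your Flink version.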

Cheers, Fabian

2015-06-05 9:13 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:

> Shouldn't Flink figure it out on its own, how much memory there is for the
> join?
>
> The detailed trace for the Nullpointer exception can be found here:
>
> https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
>
> Best regards,
> Felix
>
> 2015-06-04 19:41 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>
> > I think it is not a problem of join hints, but rather of too little memory
> > for the join operator. If you set the temporary directory, then the job
> > will be split into smaller parts and thus each operator gets more memory.
> > Alternatively, you can increase the memory you give to the Task Managers.
> >
> > The problem with the NullPointerException won't be solved by this, though.
> > Could you send the full stack trace for that?
> >
> > Cheers,
> > Till
> > On Jun 4, 2015 7:10 PM, "Andra Lungu" <lungu.an...@gmail.com> wrote:
> >
> > > Hi Felix,
> > >
> > > Passing a JoinHint to your function should help.
> > > see:
> > > http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
> > >
> > > Cheers,
> > > Andra
> > >
> > > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com>
> > > wrote:
> > >
> > > > after bug fix:
> > > >
> > > > for 100 blocks and standard jvm heap space
> > > >
> > > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.
> > > >   at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > > >   at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > > >   at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > > >   at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > >   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > >   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > >   at java.lang.Thread.run(Thread.java:745)
> > > >
> > > >
> > > > for 150 blocks and 5G jvm heap space
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > >   at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > >   ...
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
> > > >
> > > > > Yes, I will try it again with the newest update :)
> > > > >
> > > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
> > > > >
> > > > >> If the first error is not fixed by Chiwan's PR, then we should create a
> > > > >> JIRA for it so we don't forget it.
> > > > >>
> > > > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with
> > > > >> this version?
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> [1] https://github.com/apache/flink/pull/751
> > > > >>
> > > > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com> wrote:
> > > > >>
> > > > >> > Hi. The second bug is fixed by the recent change in the PR,
> > > > >> > but there is no test case for the first bug yet.
> > > > >> >
> > > > >> > Regards,
> > > > >> > Chiwan Park
> > > > >> >
> > > > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote:
> > > > >> > >
> > > > >> > > I think both are bugs. They are triggered by the different memory
> > > > >> > > configurations.
> > > > >> > >
> > > > >> > > @chiwan: is the 2nd error fixed by your recent change?
> > > > >> > >
> > > > >> > > @felix: if yes, can you try the 2nd run again with the changes?
> > > > >> > >
> > > > >> > > On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com> wrote:
> > > > >> > >
> > > > >> > >> Hi,
> > > > >> > >>
> > > > >> > >> I played a bit with the ALS recommender algorithm. I used the
> > > > >> > >> movielens dataset:
> > > > >> > >> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> > > > >> > >>
> > > > >> > >> The rating matrix has 21,063,128 entries (ratings).
> > > > >> > >>
> > > > >> > >> I ran the algorithm with 3 configurations:
> > > > >> > >>
> > > > >> > >> 1. standard jvm heap space:
> > > > >> > >>
> > > > >> > >> val als = ALS()
> > > > >> > >>   .setIterations(10)
> > > > >> > >>   .setNumFactors(10)
> > > > >> > >>   .setBlocks(100)
> > > > >> > >>
> > > > >> > >> throws:
> > > > >> > >> java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > >> > >>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > >> > >>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > >> > >>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >> > >>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > >> > >>   at java.lang.Thread.run(Thread.java:745)
> > > > >> > >>
> > > > >> > >> 2. 5G jvm heap space
> > > > >> > >>
> > > > >> > >> val als = ALS()
> > > > >> > >>   .setIterations(10)
> > > > >> > >>   .setNumFactors(10)
> > > > >> > >>   .setBlocks(150)
> > > > >> > >>
> > > > >> > >> throws:
> > > > >> > >>
> > > > >> > >> java.lang.NullPointerException
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > >> > >>   at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > >> > >>   at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > >> > >>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > >> > >>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >> > >>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > >> > >>   at java.lang.Thread.run(Thread.java:745)
> > > > >> > >>
> > > > >> > >> 3. 14G jvm heap space
> > > > >> > >>
> > > > >> > >> val als = ALS()
> > > > >> > >>   .setIterations(10)
> > > > >> > >>   .setNumFactors(10)
> > > > >> > >>   .setBlocks(150)
> > > > >> > >>   .setTemporaryPath("/tmp/tmpALS")
> > > > >> > >>
> > > > >> > >> -> works
> > > > >> > >>
> > > > >> > >> Is this a Flink problem or is it just my bad configuration?
> > > > >> > >>
> > > > >> > >> Best regards,
> > > > >> > >> Felix
> > > > >> > >>
