Hi Felix,

Passing a JoinHint to your function should help.
See:
http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
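
In case it helps, here is a minimal sketch of what a join hint looks like in the Scala DataSet API. The data sets and object name are made up for illustration, and REPARTITION_SORT_MERGE is only one possible choice that avoids the hash table in your stack trace. How the hint reaches the join inside ALS is not shown here, so treat this as an assumption about the general mechanism rather than the exact change you need.

```scala
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala._

// Hypothetical stand-alone example; requires Flink on the classpath.
object JoinHintSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Toy inputs (assumed): (userId, rating) and (userId, factor).
    // Note the duplicate key 1 on the ratings side.
    val ratings = env.fromElements((1, 4.0), (1, 3.5), (2, 5.0))
    val factors = env.fromElements((1, 0.1), (2, 0.2))

    // The second argument to join() is the hint. REPARTITION_SORT_MERGE asks
    // the optimizer for a sort-merge join instead of a hash join.
    val joined = ratings
      .join(factors, JoinHint.REPARTITION_SORT_MERGE)
      .where(0)
      .equalTo(0)

    // In recent versions print() triggers execution itself; older versions
    // need a sink followed by env.execute() instead.
    joined.print()
  }
}
```

The other JoinHint values (for example BROADCAST_HASH_FIRST or REPARTITION_HASH_SECOND) are passed the same way, so it is easy to try a few.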

Cheers,
Andra

On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neut...@googlemail.com>
wrote:

> After the bug fix:
>
> For 100 blocks and standard JVM heap space:
>
> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
> recursions, without reducing partitions enough to be memory resident.
> Probably cause: Too many duplicate keys.
> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
>
> For 150 blocks and 5G JVM heap space:
>
> Caused by: java.lang.NullPointerException
> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> ...
>
> Best regards,
> Felix
>
> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
>
> > Yes, I will try it again with the newest update :)
> >
> > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
> >
> >> If the first error is not fixed by Chiwan's PR, then we should create a
> >> JIRA for it so that we don't forget it.
> >>
> >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with
> >> this version?
> >>
> >> Cheers,
> >> Till
> >>
> >> [1] https://github.com/apache/flink/pull/751
> >>
> >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanp...@icloud.com>
> >> wrote:
> >>
> >> > Hi. The second bug is fixed by the recent change in the PR.
> >> > However, there is no test case for the first bug yet.
> >> >
> >> > Regards,
> >> > Chiwan Park
> >> >
> >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <u...@apache.org> wrote:
> >> > >
> >> > > I think both are bugs. They are triggered by the different memory
> >> > > configurations.
> >> > >
> >> > > @chiwan: is the 2nd error fixed by your recent change?
> >> > >
> >> > > @felix: if yes, can you try the 2nd run again with the changes?
> >> > >
> >> > > On Thursday, June 4, 2015, Felix Neutatz <neut...@googlemail.com> wrote:
> >> > >
> >> > >> Hi,
> >> > >>
> >> > >> I played a bit with the ALS recommender algorithm. I used the
> >> > >> MovieLens dataset:
> >> > >>
> >> > >> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> >> > >>
> >> > >> The rating matrix has 21,063,128 entries (ratings).
> >> > >>
> >> > >> I ran the algorithm with 3 configurations:
> >> > >>
> >> > >> 1. Standard JVM heap space:
> >> > >>
> >> > >> val als = ALS()
> >> > >>   .setIterations(10)
> >> > >>   .setNumFactors(10)
> >> > >>   .setBlocks(100)
> >> > >>
> >> > >> throws:
> >> > >> java.lang.RuntimeException: Hash Join bug in memory management: Memory
> >> > >> buffers leaked.
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> >> > >> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> >> > >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> >> > >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> >> > >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> > >> at java.lang.Thread.run(Thread.java:745)
> >> > >>
> >> > >> 2. 5G JVM heap space:
> >> > >>
> >> > >> val als = ALS()
> >> > >>   .setIterations(10)
> >> > >>   .setNumFactors(10)
> >> > >>   .setBlocks(150)
> >> > >>
> >> > >> throws:
> >> > >>
> >> > >> java.lang.NullPointerException
> >> > >> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> >> > >> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> >> > >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> >> > >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> >> > >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> > >> at java.lang.Thread.run(Thread.java:745)
> >> > >>
> >> > >> 3. 14G JVM heap space:
> >> > >>
> >> > >> val als = ALS()
> >> > >>   .setIterations(10)
> >> > >>   .setNumFactors(10)
> >> > >>   .setBlocks(150)
> >> > >>   .setTemporaryPath("/tmp/tmpALS")
> >> > >>
> >> > >> -> works
> >> > >>
> >> > >> Is this a Flink problem or is it just my bad configuration?
> >> > >>
> >> > >> Best regards,
> >> > >> Felix
> >> > >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >
> >
>
