Hi,

I played around a bit with the ALS recommender algorithm, using the MovieLens
dataset: http://files.grouplens.org/datasets/movielens/ml-latest-README.html

The rating matrix has 21,063,128 entries (ratings).

I ran the algorithm with 3 configurations:

1. Standard JVM heap space:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(100)

throws:
java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

2. 5G JVM heap space:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(150)

throws:

java.lang.NullPointerException
at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

3. 14G JVM heap space:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(150)
   .setTemporaryPath("/tmp/tmpALS")

-> works

Is this a Flink problem, or is it just a bad configuration on my side?

Best regards,
Felix
