Hi, I played around a bit with the ALS recommender algorithm, using the MovieLens dataset: http://files.grouplens.org/datasets/movielens/ml-latest-README.html
The rating matrix has 21,063,128 entries (ratings). I ran the algorithm with three configurations:

1. Default JVM heap space:

   val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(100)

   This throws:

   java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
       at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
       at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
       at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
       at java.lang.Thread.run(Thread.java:745)

2. 5 GB JVM heap space:

   val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)

   This throws:

   java.lang.NullPointerException
       at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
       at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
       at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
       at java.lang.Thread.run(Thread.java:745)

3. 14 GB JVM heap space:

   val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

   This works.

Is this a Flink problem, or is it just a bad configuration on my side?

Best regards,
Felix
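For anyone trying to reproduce this, a minimal FlinkML job along the following lines should exercise the same ALS code path. It is a sketch under stated assumptions, not the original job: the `ALS()` builder calls (`setIterations`, `setNumFactors`, `setBlocks`, `setTemporaryPath`) come from the post, while the CSV layout (MovieLens `ratings.csv` with a header and `userId,movieId,rating,timestamp` columns), the command-line arguments, the object name `MovieLensALS`, and the final `predict` call used to trigger execution are assumptions. The JVM heap size is configured outside the program and is not shown.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

// Hypothetical reproduction job; names and argument handling are assumptions.
object MovieLensALS {
  def main(args: Array[String]): Unit = {
    // Assumption: args(0) is the path to MovieLens ratings.csv
    val ratingsPath = args(0)
    // Assumption: optional args(1) is a scratch directory (cf. "/tmp/tmpALS")
    val tmpPath: Option[String] = if (args.length > 1) Some(args(1)) else None

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Keep (userId, movieId, rating); skip the header line and the timestamp column
    val ratings: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](
      ratingsPath,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2))

    // Same parameters as configurations 2 and 3 in the post
    val als = ALS()
      .setIterations(10)
      .setNumFactors(10)
      .setBlocks(150)
    // Only configuration 3 sets a temporary path
    tmpPath.foreach(p => als.setTemporaryPath(p))

    // Training is lazy; the program below triggers execution
    als.fit(ratings)

    // Predict a handful of known (user, item) pairs; print() runs the job
    val sample = ratings.first(10).map(r => (r._1, r._2))
    als.predict(sample).print()
  }
}
```

Passing the scratch directory as an optional argument makes it easy to switch between the runs with and without `setTemporaryPath` without editing the code.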