Hi Joshua, with the unmanaged solution set, the records are not serialized, but they need to be copied to prevent them from being mutated by the user-code JoinFunction. The stack trace hints that the NPE is caused by copying a null record. This would happen if the solution set does not contain the key.
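To illustrate the failure mode, here is a minimal plain-Java sketch (not Flink's actual code; `Tuple2` and the map-backed solution set are simplified stand-ins): the probe lookup returns null for a missing key, and the defensive copy made before the record reaches user code then throws the NPE.

```java
import java.util.HashMap;
import java.util.Map;

public class SolutionSetJoinSketch {

    // Stand-in for a solution-set record: a (key, value) tuple.
    static final class Tuple2 {
        final int f0; final String f1;
        Tuple2(int f0, String f1) { this.f0 = f0; this.f1 = f1; }

        // Defensive copy before handing the record to user code,
        // loosely mirroring what a serializer copy would do.
        static Tuple2 copy(Tuple2 t) {
            return new Tuple2(t.f0, t.f1);   // throws NPE if t is null
        }
    }

    static String joinWithSolutionSet(Map<Integer, Tuple2> solutionSet, int probeKey) {
        Tuple2 match = solutionSet.get(probeKey); // null if the key is absent
        Tuple2 copy = Tuple2.copy(match);         // NPE for a missing key
        return copy.f1;
    }

    public static void main(String[] args) {
        Map<Integer, Tuple2> solutionSet = new HashMap<>();
        solutionSet.put(1, new Tuple2(1, "a"));

        System.out.println(joinWithSolutionSet(solutionSet, 1)); // prints "a"
        try {
            joinWithSolutionSet(solutionSet, 2); // key 2 not in solution set
        } catch (NullPointerException e) {
            System.out.println("NPE: key not in solution set");
        }
    }
}
```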
I was not sure whether the delta iteration requires that all keys be present in the initial solution set. I tried to find this in the documentation but didn't see any information on it. So I checked and was able to reproduce the problem: it is only possible to join the solution set with keys that are actually contained in it. It is a bit surprising that this limitation is neither documented nor reported with a proper exception. In fact, the exception could be avoided by either:

- not calling the join function (inner join semantics), or
- calling the join function with a null value (similar to an outer join).

I created a JIRA issue [1] to track the problem.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7919

2017-10-25 16:58 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:

> Hello Fabian,
>
> Thank you for your response. I tried setting the solution set to unmanaged
> and got a different error:
>
> 2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR
> org.apache.flink.runtime.operators.BatchTask - Error in task code: Join (join solution trees) (1/8)
> java.lang.NullPointerException: null
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>   at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:207)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>   at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
>
> I initially thought this was due to a null being present in the solution set tuple, so I added assertions to ensure that tuple values were never null. However, I'm still getting the above error. Did changing it to unmanaged cause the tuples to be serialized? Is there another reason aside from null values that this error might be thrown?
>
> Thank you,
>
> Joshua
>
> On Oct 25, 2017, at 3:12 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Joshua,
>
> that is correct. Delta iterations cannot spill to disk. The solution set is managed in an in-memory hash table.
> Spilling that hash table to disk would have a significant impact on performance.
>
> By default the hash table is organized in Flink's managed memory.
> You can try to increase the managed memory size (tweaking managed memory vs. heap memory, increasing heap memory, ...) or add more resources and increase the parallelism.
> Alternatively, it is possible to store the solution set in a Java HashMap on the heap by setting the solution set to unmanaged (DeltaIteration.setSolutionSetUnManaged(true)).
>
> Best, Fabian
>
> 2017-10-24 21:09 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
>
>> I'm currently using a delta iteration within a batch job and received the following error:
>>
>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 11 maxPartition: 24 number of overflow segments: 0 bucketSize: 125 Overall memory: 23232512 Partition memory: 18350080 Message: null
>>   at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
>>   at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
>>   at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
>>   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>   at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>>   at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>   at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> It looks like the job ran out of Flink managed memory. Can delta iterations not spill to disk?
>>
>> Thanks,
>>
>> Joshua
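The two possible fixes Fabian mentions for probe keys missing from the solution set (skipping the JoinFunction entirely, or invoking it with a null match) can be sketched in plain Java; this is an illustration of the semantics, not Flink code, and the map-backed solution set and `joinFn` signature are simplified stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class MissingKeySemantics {

    // Inner-join semantics: drop the probe record when the key is absent,
    // i.e. never call the user JoinFunction for missing keys.
    static String innerJoin(Map<Integer, String> solutionSet, int probeKey,
                            BiFunction<Integer, String, String> joinFn) {
        String match = solutionSet.get(probeKey);
        if (match == null) {
            return null; // no output for this probe record
        }
        return joinFn.apply(probeKey, match);
    }

    // Outer-join semantics: always call the JoinFunction and pass null
    // for the missing match, letting user code decide what to do.
    static String outerJoin(Map<Integer, String> solutionSet, int probeKey,
                            BiFunction<Integer, String, String> joinFn) {
        return joinFn.apply(probeKey, solutionSet.get(probeKey));
    }

    public static void main(String[] args) {
        Map<Integer, String> solutionSet = new HashMap<>();
        solutionSet.put(1, "a");
        BiFunction<Integer, String, String> joinFn =
            (k, v) -> k + ":" + (v == null ? "<none>" : v);

        System.out.println(innerJoin(solutionSet, 1, joinFn)); // prints "1:a"
        System.out.println(innerJoin(solutionSet, 2, joinFn)); // prints "null" (record dropped)
        System.out.println(outerJoin(solutionSet, 2, joinFn)); // prints "2:<none>"
    }
}
```

Either behavior would avoid the NPE; the JIRA issue above tracks which one (if either) Flink should adopt.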