Hi Joshua,

With the unmanaged solution set, the records are not serialized, but they
still need to be copied to prevent them from being mutated by the user-code
JoinFunction.
The stack trace hints that the NPE is caused by copying a null record. This
would happen if the solution set did not contain the key.
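To illustrate that failure mode with plain Java, independent of Flink: probing a hash table with an absent key yields null, and a copy routine that dereferences the record then fails. The class below is a made-up simplification, not Flink's actual serializer or driver code.

```java
import java.util.HashMap;
import java.util.Map;

public class CopyNullSketch {

    // Stand-in for a copying serializer: it must read the record's fields,
    // so a null record fails with an NPE on the first dereference.
    static int[] copyRecord(int[] record) {
        int[] copy = new int[record.length]; // NPE here if record == null
        System.arraycopy(record, 0, copy, 0, record.length);
        return copy;
    }

    public static void main(String[] args) {
        Map<Long, int[]> solutionSet = new HashMap<>();
        solutionSet.put(1L, new int[]{1, 10});

        // Key present in the solution set: the copy succeeds.
        int[] ok = copyRecord(solutionSet.get(1L));

        // Key missing: the probe returns null and the copy throws.
        boolean npe = false;
        try {
            copyRecord(solutionSet.get(2L));
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println("copied=" + ok.length + " npeOnMissingKey=" + npe);
    }
}
```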

I was not sure whether the delta iteration requires all keys to be present
in the initial solution set. I tried to find this in the documentation but
didn't see any information on it.
So I checked and was able to reproduce the problem:
it is only possible to join the solution set with keys that are actually
contained in the solution set.

It's a bit surprising that this limitation is neither documented nor
reported with a proper exception. In fact, the exception could be avoided
by either:
- not calling the join function (inner-join semantics), or
- calling the join function with a null value (similar to an outer join).
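A rough sketch of those two options, using a plain HashMap in place of the solution set hash table (the names and shape are made up for illustration; this is not Flink's driver code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class JoinSemanticsSketch {

    // Option 1: inner-join semantics -- skip the join call for missing keys.
    static List<String> joinInner(Map<Long, String> solutionSet,
                                  Map<Long, String> workset,
                                  BiFunction<String, String, String> joinFn) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : workset.entrySet()) {
            String match = solutionSet.get(e.getKey());
            if (match != null) {            // absent key: join is not called
                out.add(joinFn.apply(e.getValue(), match));
            }
        }
        return out;
    }

    // Option 2: outer-join semantics -- pass null for missing keys.
    static List<String> joinOuter(Map<Long, String> solutionSet,
                                  Map<Long, String> workset,
                                  BiFunction<String, String, String> joinFn) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : workset.entrySet()) {
            out.add(joinFn.apply(e.getValue(), solutionSet.get(e.getKey())));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> solutionSet = new HashMap<>();
        solutionSet.put(1L, "s1");
        Map<Long, String> workset = new HashMap<>();
        workset.put(1L, "w1");
        workset.put(2L, "w2"); // key 2 is not in the solution set

        System.out.println(joinInner(solutionSet, workset, (w, s) -> w + "/" + s));
        System.out.println(joinOuter(solutionSet, workset, (w, s) -> w + "/" + s));
    }
}
```

Either variant would avoid the NPE; which one is right depends on the semantics the delta iteration should guarantee.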

I've created a JIRA issue [1] to track the problem.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7919

2017-10-25 16:58 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:

> Hello Fabian,
>
> Thank you for your response. I tried setting the solution set to unmanaged
> and got a different error:
>
> 2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR org.apache.flink.runtime.operators.BatchTask  - Error in task code:  Join (join solution trees) (1/8)
> java.lang.NullPointerException: null
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
> at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:207)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>
> I initially thought this was due to a null being present in the solution
> set tuple, so I added assertions to ensure that tuple values were never
> null. However, I’m still getting the above error. Did changing it to
> unmanaged cause the tuples to be serialized? Is there another reason,
> aside from null values, that this error might be thrown?
>
> Thank you,
>
> Joshua
>
> On Oct 25, 2017, at 3:12 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Joshua,
>
> that is correct. Delta iterations cannot spill to disk. The solution set
> is managed in an in-memory hash table.
> Spilling that hash table to disk would have a significant impact on the
> performance.
>
> By default the hash table is organized in Flink's managed memory.
> You can try to increase the managed memory size (tweaking managed memory
> vs. heap memory, increasing heap memory, ...) or add more resources and
> increase the parallelism.
> Alternatively, it is possible to store the solution set in a Java HashMap
> on the heap by setting the solution set to unmanaged
> (DeltaIteration.setSolutionSetUnManaged(true)).
>
> Best, Fabian
>
>
> 2017-10-24 21:09 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
>
>> I’m currently using a delta iteration within a batch job and received the
>> following error:
>>
>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 11 maxPartition: 24 number of overflow segments: 0 bucketSize: 125 Overall memory: 23232512 Partition memory: 18350080 Message: null
>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
>> at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> It looks like the job ran out of Flink managed memory. Can delta
>> iterations not spill to disk?
>>
>> Thanks,
>>
>> Joshua
>>
>
>
>
