Hi Dylan,

I took a quick look at your job program and the job's DAG. It seems that
the optimizer chose an incorrect execution plan.
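
Until the plan issue is resolved, a possible workaround (an untested
sketch; the sentinel value below is hypothetical) might be to map null
keys to a sentinel before the join. The optimizer should not be able to
push the hash partitioning past a map whose field forwarding it does not
know, so the hash would never see a null key:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;

    // Untested sketch: Record stands for your POJO; the sentinel must
    // never occur as a real key in the reference data.
    DataSet<Record> safeKeyed = records.map(new MapFunction<Record, Record>() {
        @Override
        public Record map(Record r) {
            if (r.getOptionalKey() == null) {
                r.setOptionalKey("__NO_MATCH__");
            }
            return r;
        }
    });

    // With an inner join, sentinel-keyed records simply find no partner
    // in the reference data and are dropped, as with your filter.
    safeKeyed.join(reference)
             .where("optionalKey")
             .equalTo("key");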

cc Till.

Thanks, vino.

Dylan Adams <dylan.ad...@gmail.com> wrote on Wed, Aug 8, 2018 at 2:26 AM:

> I'm trying to use the Flink DataSet API to validate some records and have
> run into an issue. My program uses joins to validate inputs against
> reference data. One of the attributes I'm validating is optional, and only
> needs to be validated when non-NULL. So I added a filter to prevent the
> null-keyed records from being used in the validation join, and was
> surprised to receive this exception:
>
> java.lang.RuntimeException: A NullPointerException occured while accessing
> a key field in a POJO. Most likely, the value grouped/joined on is null.
> Field name: optionalKey
> at
> org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
>
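> For reference, here is a simplified sketch of the shape of my pipeline
> (placeholder type and field names; the real code is in the project
> linked below):
>
>     import org.apache.flink.api.java.DataSet;
>     import org.apache.flink.api.java.ExecutionEnvironment;
>     import org.apache.flink.api.java.tuple.Tuple2;
>
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     // Record is a POJO whose "optionalKey" field may be null; RefEntry
>     // is the reference POJO with a non-null "key" field.
>     DataSet<Record> records = env.fromCollection(inputRecords);
>     DataSet<RefEntry> reference = env.fromCollection(referenceData);
>
>     // Drop null-keyed records before the validation join.
>     DataSet<Record> nonNullKeyed =
>         records.filter(r -> r.getOptionalKey() != null);
>
>     // The join's hash partitioning calls PojoComparator.hash() on
>     // "optionalKey"; if it runs before the filter, it sees null keys.
>     DataSet<Tuple2<Record, RefEntry>> validated = nonNullKeyed
>         .join(reference)
>         .where("optionalKey")
>         .equalTo("key");
>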
> It looks like the problem is that Flink has pushed the hash-partitioning
> step of the join ahead of the filter that removes the null-keyed records,
> so it ends up trying to hash the null keys. The issue can be seen in the
> plan visualization:
> https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
>
> I was able to reproduce the problem in v1.4.2 and v1.5.2 with this small
> project: https://github.com/dkadams/flink-plan-issue/
>
> Is this expected behavior or a bug? FLINK-1915 seems to have the same root
> problem, but with a negative performance impact instead of a
> RuntimeException.
>
> Regards,
> Dylan
>
