Hi Dylan,

I took a quick look at your job program and the job's DAG. It seems that the optimizer chose an incorrect execution plan.
cc Till.

Thanks,
vino.

Dylan Adams <dylan.ad...@gmail.com> wrote on Wed, Aug 8, 2018 at 2:26 AM:

> I'm trying to use the Flink DataSet API to validate some records and have
> run into an issue. My program uses joins to validate inputs against
> reference data. One of the attributes I'm validating is optional, and only
> needs to be validated when non-NULL. So I added a filter to prevent the
> null-keyed records from being used in the validation join, and was
> surprised to receive this exception:
>
> java.lang.RuntimeException: A NullPointerException occured while accessing
> a key field in a POJO. Most likely, the value grouped/joined on is null.
> Field name: optionalKey
> at
> org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
>
> It looks like the problem is that Flink has pushed the hash-partitioning
> aspect of the join ahead of the filter for the null-keyed records and is
> trying to hash the null keys. The issue can be seen in the plan
> visualization:
> https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
>
> I was able to reproduce the problem in v1.4.2 and 1.5.2 with this small
> project: https://github.com/dkadams/flink-plan-issue/
>
> Is this expected behavior or a bug? FLINK-1915 seems to have the same root
> problem, but with a negative performance impact instead of a
> RuntimeException.
>
> Regards,
> Dylan
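For reference, the pattern Dylan describes looks roughly like the sketch below. The Record/Reference POJOs and field names are hypothetical, modeled on the report rather than taken from the linked repository. The filter is meant to keep records with a null optionalKey away from the join, but if the optimizer places the hash partitioning for the join below the filter, PojoComparator.hash() can still see the nulls and throw:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class OptionalKeyJoin {

    // POJO with a nullable join key (hypothetical, modeled on the report).
    public static class Record {
        public String id;
        public String optionalKey; // may be null; only validated when non-null
        public Record() {}
        public Record(String id, String optionalKey) {
            this.id = id;
            this.optionalKey = optionalKey;
        }
    }

    // Reference data to validate against (hypothetical).
    public static class Reference {
        public String key;
        public Reference() {}
        public Reference(String key) {
            this.key = key;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Record> records = env.fromElements(
                new Record("a", "k1"),
                new Record("b", null)); // should be excluded by the filter
        DataSet<Reference> reference = env.fromElements(new Reference("k1"));

        records
            // Intended to keep null keys out of the validation join ...
            .filter(r -> r.optionalKey != null)
            // ... but the optimizer may partition on "optionalKey" before
            // the filter runs, and hashing the null key then throws.
            .join(reference)
            .where("optionalKey")
            .equalTo("key")
            .print(); // print() triggers execution of the plan
    }
}

Whether this small program reproduces the operator reordering depends on the plan the optimizer actually picks; the linked repository remains the authoritative reproduction. If the reordering does occur, one possible workaround (untested here) is to map the nullable key to a non-null sentinel value before the join and discard those records afterwards, so the hash partitioner never sees a null.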