To return to this old thing: this was basically user error. The second transformation was keying by a field that was sometimes null after the first transformation. (This was never supposed to happen, but then it did happen in production.)
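For the record, the job was roughly shaped like this (a minimal sketch only; all class and field names here are made up, not the actual code):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NullKeyExample {

    // Simple POJO; "category" can legitimately be null in some records.
    public static class Event {
        public String userId;
        public String category;
        public long count;

        public Event() {}
        public Event(String userId, String category, long count) {
            this.userId = userId;
            this.category = category;
            this.count = count;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(
                new Event("user-1", "a", 1),
                new Event("user-2", null, 1)); // the problematic record

        // First aggregation: keyed by userId, which is never null, so this is fine.
        DataStream<Event> perUser = events
                .keyBy((KeySelector<Event, String>) e -> e.userId)
                .sum("count");

        // Second aggregation: keyed by category. When category is null,
        // KeyGroupRangeAssignment.assignToKeyGroup() throws the NPE while
        // hashing the key, as in the stack trace quoted below.
        perUser
                .keyBy((KeySelector<Event, String>) e -> e.category)
                .sum("count")
                .print();

        env.execute("null key example");
    }
}

Filtering out (or defaulting) records with a null key before the second keyBy would avoid the NPE.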
The confusing part is where the exception occurs. The NullPointerException happens because of, and in, the second transformation, but in my example here the stack trace points to the first transformation. Of course Flink doesn't execute the lines literally as written (there is optimisation going on), so the true location of the error is obscured. I tried to create a small reproducible example of this but only managed to get a situation where the NullPointerException instead very clearly points to the second transformation. I'm not sure how to reproduce the weirder version of the error, since it seems to depend on the query optimiser, which in turn might depend on data volumes, POJO structures etc. In any case, errors like this can of course be easily detected and fixed with proper unit tests; I just didn't originally have quite full coverage for unexpected partially-null data.

Cheers,

On Mon, 30 Jul 2018 at 10:21, Taneli Saastamoinen <taneli.saastamoi...@gmail.com> wrote:

> On 27 July 2018 at 19:21, Chesnay Schepler <ches...@apache.org> wrote:
> > At first glance this looks like a bug. Is there nothing in the stack trace
> > after the NullPointerException?
>
> Hmm, there is actually, sorry about that:
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     ... 22 more
>
> > How reliably can you reproduce this?
>
> 100%, though I've only run this job a handful of times. What's frustrating
> is that I cannot reproduce this easily in a unit test (all my unit tests
> work fine). On production data it happens every time and pretty much
> instantly, but our data volumes are big enough that it's difficult to try
> to dig into it further.
>
> For now I think I'll split the job into two: have the first aggregation
> write to Kafka and have the second aggregation as a separate job that reads
> its input from Kafka. When I run the first aggregation only, that is fine
> and no errors occur, so the issue seems to be the combination of
> aggregations.
>
> Cheers,