Thanks for the update, Taneli. Glad that you solved the problem. If you find out more about the more obscure case, let us know. Maybe there is something we can still improve to prevent misleading exceptions in the future.
Cheers,
Till

On Tue, Jan 1, 2019 at 3:01 PM Taneli Saastamoinen <taneli.saastamoi...@gmail.com> wrote:

> To return to this old thing, this was basically user error. The second of
> the transformations was keying by a field that was sometimes null after the
> first transformation. (This was supposed to never happen, but then it did
> happen in production.)
>
> The confusing part is where the exception occurs. The NullPointerException
> happens because of, and in, the second transformation, but in my example
> here the stack trace points to the first transformation. Of course Flink
> doesn't execute the lines literally like that (i.e. there is optimisation
> going on), so the true location of the error is obscured.
>
> I tried to create a small reproducible example of this but only managed to
> get a situation where the NullPointerException instead very clearly points
> to the second transformation. I'm not sure how to reproduce the weirder
> version of the error since it seems to depend on the query optimiser, which
> in turn might depend on data volumes, POJO structures etc.
>
> In any case, errors like this can of course be easily detected and fixed
> with proper unit tests, but I originally didn't have full coverage for
> unexpected partially-null data.
>
> Cheers,
>
> On Mon, 30 Jul 2018 at 10:21, Taneli Saastamoinen <taneli.saastamoi...@gmail.com> wrote:
>
>> On 27 July 2018 at 19:21, Chesnay Schepler <ches...@apache.org> wrote:
>> > At first glance this looks like a bug. Is there nothing in the stack
>> > trace after the NullPointerException?
>>
>> Hmm, there is actually, sorry about that:
>>
>> Caused by: java.lang.NullPointerException
>>     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>>     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>     ... 22 more
>>
>> > How reliably can you reproduce this?
>>
>> 100%, though I've only run this job a handful of times. What's
>> frustrating is that I cannot reproduce this easily in a unit test (all my
>> unit tests work fine). On production data it happens every time and pretty
>> much instantly, but our data volumes are big enough that it's difficult to
>> try to dig into it further.
>>
>> For now I think I'll split the job into two, have the first aggregation
>> write to Kafka and have the second aggregation as a separate job that reads
>> its input from Kafka. When I run the first aggregation only that is fine
>> and no errors occur, so the issue seems to be the combination of
>> aggregations.
>>
>> Cheers,
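A note on why the trace points at the first transformation: in the stack trace quoted in this thread, the NullPointerException is thrown inside `KeyGroupStreamPartitioner.selectChannels`, called from `RecordWriter.emit`. That is, the task emitting the first aggregation's output computes the key for the downstream keyBy while choosing an output channel, so a null key surfaces in the upstream task. Below is a minimal plain-Java sketch of that behaviour and of the kind of partially-null unit test Taneli mentions. It does not use Flink: the `Event` type, its `groupId` field, `route` and `dropNullKeys` are hypothetical, and `assignToKeyGroup` is a simplified stand-in for Flink's method (it calls `hashCode()` on the key the same way, but skips Flink's murmur hashing).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class NullKeyDemo {

    // Hypothetical record type standing in for the POJO emitted by the first aggregation.
    static final class Event {
        final String groupId; // hypothetical key field; may unexpectedly be null
        final long count;

        Event(String groupId, long count) {
            this.groupId = groupId;
            this.count = count;
        }
    }

    // Simplified stand-in for key-group assignment (NOT Flink's code): it calls
    // key.hashCode(), so a null key fails with a NullPointerException.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Mimics the upstream side: the key is extracted and hashed while
    // deciding where each emitted record should go.
    static List<Integer> route(List<Event> events, Function<Event, String> keySelector, int maxParallelism) {
        List<Integer> groups = new ArrayList<>();
        for (Event e : events) {
            groups.add(assignToKeyGroup(keySelector.apply(e), maxParallelism));
        }
        return groups;
    }

    // Guard corresponding to a filter placed before keyBy: keep only records with a non-null key.
    static List<Event> dropNullKeys(List<Event> events, Function<Event, String> keySelector) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if (keySelector.apply(e) != null) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Function<Event, String> byGroup = e -> e.groupId;
        List<Event> events = Arrays.asList(new Event("a", 1), new Event(null, 2), new Event("b", 3));

        // Unit-test style check with partially-null data: routing fails on the null key.
        try {
            route(events, byGroup, 128);
            System.out.println("no exception");
        } catch (NullPointerException npe) {
            System.out.println("NullPointerException while routing");
        }

        // With the guard applied, only the two records with non-null keys are routed.
        List<Integer> groups = route(dropNullKeys(events, byGroup), byGroup, 128);
        System.out.println("routed " + groups.size() + " records");
    }
}
```

In the actual job, the equivalent guard would be a `filter` on the key field before the second `keyBy` (or mapping null to an explicit placeholder key, if those records should be kept). Which of the two is right depends on whether null-keyed records are expected data or a bug upstream.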