To return to this old thing: this was basically user error. The second
transformation was keying by a field that was sometimes null after the
first transformation. (This was never supposed to happen, but then it did
happen in production.)
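
For illustration, here's a minimal sketch of the problematic shape. The
names (events, Event, getUserId, getGroupKey, EnrichFunction, count) are
made up for this example, not the actual job:

    // Assuming a DataStream<Event> named "events" already exists.
    DataStream<Event> enriched = events
            .keyBy(e -> e.getUserId())       // first transformation
            .map(new EnrichFunction());      // may leave groupKey null

    enriched
            .keyBy(e -> e.getGroupKey())     // second transformation: Flink
            .sum("count");                   // hashes the key to assign a key
                                             // group, so a null key throws an
                                             // NPE in KeyGroupRangeAssignment

    // One simple guard: drop (or reroute) records with a null key
    // before the second keyBy.
    enriched
            .filter(e -> e.getGroupKey() != null)
            .keyBy(e -> e.getGroupKey())
            .sum("count");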

The confusing part is where the exception appears to occur. The
NullPointerException is caused by, and thrown in, the second
transformation, but in my example the stack trace points to the first
transformation. Of course Flink doesn't execute the program line by line as
written (there is optimisation going on), so the true location of the error
is obscured.

I tried to create a small reproducible example of this but only managed to
get a situation where the NullPointerException instead very clearly points
to the second transformation. I'm not sure how to reproduce the weirder
variant of the error since it seems to depend on the query optimiser, which
in turn might depend on data volumes, POJO structures etc.

In any case, errors like this can of course be detected and fixed easily
with proper unit tests; I simply didn't originally have full coverage for
unexpected partially-null data.
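
Something like this would have caught it (again with hypothetical names,
and assuming the enrichment logic is factored out so it can be called
directly):

    // JUnit 4 test for the case where the "never null" field is null.
    @Test
    public void enrichmentHandlesMissingGroupKey() throws Exception {
        Event input = new Event("user-1", null);  // groupKey missing
        Event output = new EnrichFunction().map(input);
        // The second keyBy requires a non-null key.
        assertNotNull(output.getGroupKey());
    }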

Cheers,



On Mon, 30 Jul 2018 at 10:21, Taneli Saastamoinen <
taneli.saastamoi...@gmail.com> wrote:

> On 27 July 2018 at 19:21, Chesnay Schepler <ches...@apache.org> wrote:
> > At first glance this looks like a bug. Is there nothing in the stack
> > trace after the NullPointerException?
>
> Hmm, there is actually, sorry about that:
>
> Caused by: java.lang.NullPointerException
> at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
> at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ... 22 more
>
> > How reliably can you reproduce this?
>
> 100%, though I've only run this job a handful of times. What's frustrating
> is that I cannot reproduce this easily in a unit test (all my unit tests
> work fine). On production data it happens every time and pretty much
> instantly, but our data volumes are big enough that it's difficult to dig
> into it further.
>
> For now I think I'll split the job into two: have the first aggregation
> write to Kafka, and have the second aggregation as a separate job that
> reads its input from Kafka. When I run only the first aggregation it works
> fine with no errors, so the issue seems to be the combination of
> aggregations.
>
> Cheers,
>
>
