My intuition is that you have a non-deterministic shuffle key. If you
perform any "per-key" operation, you need to make sure that the same key
always ends up in the same partition. In practice, this means that the
key needs to have consistent *hashCode* and *equals* implementations
across different JVMs.

A common mistake is a hash code that is not deterministic because it
defaults to `System.identityHashCode(..)` (this applies to any custom
object that doesn't override hashCode, to arrays, to enums, ...).
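
To illustrate, here is a minimal sketch of a key type whose hash code
depends only on its field values, so it comes out the same on every JVM.
The class name `LoginKey` and its fields are hypothetical, since I don't
know what your job actually keys by.

```java
import java.util.Objects;

// Hypothetical key type, for illustration only (not taken from the job above).
final class LoginKey {
    private final String tenantId;
    private final String userName;

    LoginKey(String tenantId, String userName) {
        this.tenantId = tenantId;
        this.userName = userName;
    }

    // Value-based equality: two keys with the same field values are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LoginKey)) {
            return false;
        }
        LoginKey other = (LoginKey) o;
        return Objects.equals(tenantId, other.tenantId)
                && Objects.equals(userName, other.userName);
    }

    // Derived only from String fields, whose hashCode is specified by the
    // JDK, so the result does not depend on object identity or on the JVM.
    @Override
    public int hashCode() {
        return Objects.hash(tenantId, userName);
    }
}
```

If the key contains an enum or an array, one option is to hash the
enum's `name()` or use `Arrays.hashCode(..)` instead of calling
`hashCode()` on the enum or array itself.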

What are you using as key for this operation?

Best,
D.

On Mon, Aug 16, 2021 at 4:00 PM László Ciople <ciople.las...@gmail.com>
wrote:

> Hello,
> I am trying to write a Flink application which receives data from Kafka,
> does processing on keyed windowed streams and sends results on a
> different topic.
> Shortly after the job is started it fails with a NullPointerException in
> StateTable.put(). I am really confused by this error, because I am not
> explicitly working with state and in the exception stack, I cannot find a
> reference to my own code. I'd really appreciate it if anyone would help me
> figure out what's going on. Here's the exception stack:
>
> 2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> Window(TumblingEventTimeWindows(1800000), EventTimeTrigger,
> CredentialStuffingAlerter) -> Map -> Sink:
> xdr.azure.analytics.login.credential_stuffing (2/2)
> (4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on
> senso-api-lciople-credential-stuffing-taskmanager-1-2 @
> ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
> 2021-08-16 16:28:54 java.lang.NullPointerException: null
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
> 2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink
> Streaming Job (80895c82f109899577d166b4388c157d) switched from state
> RUNNING to RESTARTING.
>
