My intuition is that you have a non-deterministic shuffle key. If you perform any "per-key" operation, you need to make sure that the same key always ends up in the same partition. Put simply, this means that the key needs to have consistent *hashCode* and *equals* implementations across different JVMs.
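As a rough illustration of what "consistent" means here, below is a minimal sketch in plain Java with no Flink dependency. `LoginKey`, `LoginResult`, and the field names are hypothetical and not taken from your job; the point is only that the hash is derived from the key's values rather than from object identity.

```java
import java.util.Arrays;
import java.util.Objects;

public class DeterministicKeyDemo {

    // Hypothetical enum, for illustration only.
    enum LoginResult { SUCCESS, FAILURE }

    // Hypothetical composite key with value-based equals/hashCode.
    static final class LoginKey {
        final String tenantId;
        final LoginResult result;

        LoginKey(String tenantId, LoginResult result) {
            this.tenantId = Objects.requireNonNull(tenantId);
            this.result = Objects.requireNonNull(result);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LoginKey)) return false;
            LoginKey other = (LoginKey) o;
            return tenantId.equals(other.tenantId) && result == other.result;
        }

        @Override
        public int hashCode() {
            // String.hashCode() is specified by the JDK, so it is the same in every JVM.
            // Enum.hashCode() is identity-based, so hash the constant's name instead.
            return Objects.hash(tenantId, result.name());
        }
    }

    public static void main(String[] args) {
        LoginKey a = new LoginKey("tenant-1", LoginResult.FAILURE);
        LoginKey b = new LoginKey("tenant-1", LoginResult.FAILURE);
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true

        // Arrays inherit Object's identity-based equals/hashCode,
        // so two arrays with equal contents are not equal keys.
        byte[] x = {1, 2, 3};
        byte[] y = {1, 2, 3};
        System.out.println(x.equals(y));                  // false
        System.out.println(Arrays.equals(x, y));          // true
    }
}
```

If the key is naturally an array or an enum, one option is to key by a derived value instead (for example a String or the enum's name), which hashes the same way on every task manager.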
The usual mistake is that the hash code is not deterministic because it defaults to `System.identityHashCode(..)` (this applies to any custom object that doesn't override hashCode, to arrays, to enums, ...). What are you using as the key for this operation?

Best,
D.

On Mon, Aug 16, 2021 at 4:00 PM László Ciople <ciople.las...@gmail.com> wrote:

> Hello,
> I am trying to write a Flink application which receives data from Kafka,
> does processing on keyed windowed streams and sends results on a
> different topic.
> Shortly after the job is started it fails with a NullPointerException in
> StateTable.put(). I am really confused by this error, because I am not
> explicitly working with state and in the exception stack, I cannot find a
> reference to my own code. I'd really appreciate it if anyone would help me
> figure out what's going on. Here's the exception stack:
>
> 2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(1800000), EventTimeTrigger, CredentialStuffingAlerter) -> Map -> Sink: xdr.azure.analytics.login.credential_stuffing (2/2) (4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on senso-api-lciople-credential-stuffing-taskmanager-1-2 @ ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
> 2021-08-16 16:28:54 java.lang.NullPointerException: null
> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
> 2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
> 2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming Job (80895c82f109899577d166b4388c157d) switched from state RUNNING to RESTARTING.