Great, let me know if that helped ;)

Best,
D.
On Mon, Aug 16, 2021 at 4:36 PM László Ciople <ciople.las...@gmail.com> wrote:
> The events are JSON dictionaries, and the key is a field that represents a
> device id; if that field doesn't exist, the *hashCode* of the device object
> in the dictionary is used instead. So this could be the problem, then.
>
> On Mon, Aug 16, 2021 at 5:33 PM David Morávek <d...@apache.org> wrote:
>
>> Hi László,
>>
>> My intuition is that you have a non-deterministic shuffle key. If you
>> perform any "per-key" operation, you need to make sure that the same key
>> always ends up in the same partition. Put simply, the key needs a
>> consistent *hashCode* and *equals* across different JVMs.
>>
>> A common mistake is a hash code that is not deterministic because it
>> defaults to `System.identityHashCode(..)` (this applies to any custom
>> object that doesn't override hashCode, to arrays, to enums, ...).
>>
>> What are you using as the key for this operation?
>>
>> Best,
>> D.
>>
>> On Mon, Aug 16, 2021 at 4:00 PM László Ciople <ciople.las...@gmail.com>
>> wrote:
>>
>>> Hello,
>>> I am trying to write a Flink application that receives data from Kafka,
>>> does processing on keyed windowed streams, and sends the results to a
>>> different topic.
>>> Shortly after the job starts, it fails with a NullPointerException in
>>> StateTable.put(). I am really confused by this error, because I am not
>>> explicitly working with state, and I cannot find a reference to my own
>>> code in the exception stack. I'd really appreciate it if anyone could
>>> help me figure out what's going on.
>>> Here's the exception stack:
>>>
>>> 2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(1800000), EventTimeTrigger, CredentialStuffingAlerter) -> Map -> Sink: xdr.azure.analytics.login.credential_stuffing (2/2) (4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on senso-api-lciople-credential-stuffing-taskmanager-1-2 @ ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
>>> 2021-08-16 16:28:54 java.lang.NullPointerException: null
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
>>> 2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming Job (80895c82f109899577d166b4388c157d) switched from state RUNNING to RESTARTING.
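David's point about deterministic keys can be illustrated with a small, self-contained Java sketch. It assumes a hypothetical `Event` type (a device id that may be null, plus the nested device dictionary flattened to string values), since the thread does not show the real event classes; `IdentityKey`, `keyFor` and `operatorIndex` are likewise illustrative names. `keyFor` derives a key whose `hashCode` depends only on content: the device id when present, otherwise a canonical string built from the device entries sorted by field name. `operatorIndex` is a simplified model of routing a key to a parallel subtask; Flink's own assignment additionally runs `hashCode()` through a murmur hash before taking the modulo of the max parallelism, so real indices differ, but the dependency on a stable `hashCode()` is the same.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class DeterministicKeyDemo {

    // Hypothetical event shape (assumption): the thread only says events are JSON
    // dictionaries with an optional device id and a nested device object.
    static final class Event {
        final String deviceId;             // may be null when the field is missing
        final Map<String, String> device;  // nested device dictionary, values flattened to strings

        Event(String deviceId, Map<String, String> device) {
            this.deviceId = deviceId;
            this.device = device;
        }
    }

    // Hypothetical class that does NOT override equals/hashCode: it inherits
    // identity-based behaviour from Object, so two instances with the same content
    // are not equal and their hash codes are not derived from the content.
    static final class IdentityKey {
        final String value;

        IdentityKey(String value) {
            this.value = value;
        }
    }

    // Content-based key: the device id if present, otherwise a canonical string of
    // the device entries sorted by field name (TreeMap), so the result depends
    // neither on map iteration order nor on object identity.
    static String keyFor(Event e) {
        if (e.deviceId != null) {
            return e.deviceId;
        }
        return "device:" + new TreeMap<>(e.device);
    }

    // Simplified routing model (assumption): Flink also murmur-hashes hashCode()
    // before the modulo, but it relies on hashCode() being stable in the same way.
    static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        Event withId = new Event("dev-42", Map.of("os", "linux"));
        Event withoutId = new Event(null, Map.of("os", "linux", "model", "x1"));

        System.out.println(keyFor(withId));     // dev-42
        System.out.println(keyFor(withoutId));  // device:{model=x1, os=linux}

        // Identity-based objects: same content, but not equal.
        System.out.println(new IdentityKey("a").equals(new IdentityKey("a"))); // false

        // Arrays also use identity equals/hashCode; Arrays.hashCode is content-based.
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};
        System.out.println(a.equals(b));                              // false
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true

        // A content-based key is routed to the same subtask every time it is computed.
        int first = operatorIndex(keyFor(withoutId), 128, 2);
        int second = operatorIndex(keyFor(withoutId), 128, 2);
        System.out.println(first == second); // true
    }
}
```

In the job itself, the value returned by `keyFor` is what the `KeySelector` passed to `keyBy` would return. A `String` key is a safe choice because `String.hashCode()` is specified by the JDK and therefore identical on every JVM, whereas a key falling back to identity hashing can be routed to a subtask that does not own its key group.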