Hi, I am hitting a strange NPE when I aggregate over a fixed window. If I set a small parallelism so that the whole job runs on a single TaskManager, the NPE does not happen. But when the job scales out to two TaskManagers, the TaskManager crashes at the Create stage. I am using Flink 1.11.1.
The NPE stack trace is:

2021-04-13 14:23:19,575 WARN  org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    ... 13 more

My aggregate code is:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {

    @Override
    public Map<DataKey, DataIndex> createAccumulator() {
        return new HashMap<>();
    }

    // Combine the incoming DataIndex with any value already stored under the same DataKey.
    @Override
    public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
        accumulator.merge(value.f0, value.f1, DataIndex::add);
        return accumulator;
    }

    @Override
    public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
        return accumulator;
    }

    // Fold every entry of a into b and return b.
    @Override
    public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
        a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
        return b;
    }
}

Does anyone know what might cause this NPE? Thanks!

--
Best regards
Sili Liu
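
For anyone trying to reproduce this: the accumulator logic can be exercised on its own, without Flink, which may help separate the user code from the state backend (the deepest frame in the trace is StateTable.transform, not AggregateDataEntry). The following is only a minimal sketch under assumptions. DataKey and DataIndex are not shown in this message, so a String key and a counter-style DataIndex whose add sums two values are hypothetical stand-ins, and AccumulatorSketch is an illustrative name.

```java
import java.util.HashMap;
import java.util.Map;

public class AccumulatorSketch {

    // Hypothetical stand-in for the real DataIndex class, which is not shown in the message.
    static final class DataIndex {
        final long count;

        DataIndex(long count) {
            this.count = count;
        }

        // Assumed semantics of DataIndex::add: combine two values by summing them.
        DataIndex add(DataIndex other) {
            return new DataIndex(this.count + other.count);
        }
    }

    // Same logic as AggregateDataEntry.add, with a String standing in for DataKey.
    static Map<String, DataIndex> add(String key, DataIndex value, Map<String, DataIndex> accumulator) {
        accumulator.merge(key, value, DataIndex::add);
        return accumulator;
    }

    // Same logic as AggregateDataEntry.merge: fold a into b and return b.
    static Map<String, DataIndex> merge(Map<String, DataIndex> a, Map<String, DataIndex> b) {
        a.forEach((k, v) -> b.merge(k, v, DataIndex::add));
        return b;
    }

    public static void main(String[] args) {
        Map<String, DataIndex> left = new HashMap<>();
        add("k1", new DataIndex(1), left);
        add("k1", new DataIndex(2), left);

        Map<String, DataIndex> right = new HashMap<>();
        add("k1", new DataIndex(10), right);
        add("k2", new DataIndex(5), right);

        Map<String, DataIndex> merged = merge(left, right);
        System.out.println("k1=" + merged.get("k1").count + ", k2=" + merged.get("k2").count);
    }
}
```

With these stand-ins the accumulator code runs without throwing, as long as neither the value nor the result of add is null (Map.merge rejects a null value). Whether the real DataKey and DataIndex behave the same way depends on their actual implementations.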