Hi Dawid,

Thanks for your help. I use com.google.common.base.Objects.hashCode, pass all fields to it to generate the hashcode, and the equals method also compares all the fields.
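
For reference, the key class looks roughly like this (just a minimal sketch; the field names below are placeholders, not the real fields of DataKey):

import com.google.common.base.Objects;

public class DataKey {

    // Placeholder fields for illustration; the real DataKey has different ones.
    private final String name;
    private final long id;

    public DataKey(String name, long id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public int hashCode() {
        // Guava combines the hash codes of all fields deterministically,
        // so this stays stable as long as the fields themselves are
        // immutable and have stable hash codes (String, primitives, etc.).
        return Objects.hashCode(name, id);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DataKey)) {
            return false;
        }
        DataKey other = (DataKey) o;
        return Objects.equal(name, other.name) && id == other.id;
    }
}

All fields are immutable and all of them are compared in equals, so as far as I can tell the hashcode should not change after the record is keyed.
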
On Tue, Apr 13, 2021 at 8:10 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi,
>
> Could you check that your grouping key has a stable hashcode and equals?
> It is very likely caused by an unstable hashcode, so that a record with
> an incorrect key ends up on the wrong task manager.
>
> Best,
>
> Dawid
>
> On 13/04/2021 08:47, Si-li Liu wrote:
>
> Hi,
>
> I encounter a weird NPE when trying to do an aggregate on a fixed window.
> If I set a small parallelism number so that the whole job uses only one
> TaskManager, this NPE does not happen. But when the job scales to two
> TaskManagers, a TaskManager crashes at the Create stage. The Flink
> version I use is 1.11.1.
>
> The NPE exception stack is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task []
> - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING
> to FAILED.
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     ... 13 more
>
> My aggregate code is:
>
> public class AggregateDataEntry implements
>         AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>
>     @Override
>     public Map<DataKey, DataIndex> createAccumulator() {
>         return new HashMap<>();
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>         return b;
>     }
> }
>
> Does anyone know anything about this NPE? Thanks!
>
> --
> Best regards
>
> Sili Liu

--
Best regards

Sili Liu