To second Dawid's question: are all the fields final, or is it possible that their values change?
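For illustration, a grouping key that satisfies both points (all fields final, and equals/hashCode derived from exactly those fields) could look like the sketch below. DataKey's actual fields are not shown in this thread, so the field names here are assumptions:

import com.google.common.base.Objects;

// Hypothetical sketch of a stable DataKey. The essential properties are that
// every field is final and that equals/hashCode are computed from exactly the
// same fields, so a key's hash can never change after construction.
public final class DataKey {

    private final String name; // assumed field
    private final long id;     // assumed field

    public DataKey(String name, long id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DataKey)) {
            return false;
        }
        DataKey that = (DataKey) o;
        return id == that.id && Objects.equal(name, that.name);
    }

    @Override
    public int hashCode() {
        // com.google.common.base.Objects.hashCode, as mentioned in the thread
        return Objects.hashCode(name, id);
    }
}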
On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu <unix...@gmail.com> wrote:

> Hi Dawid,
>
> Thanks for your help. I use com.google.common.base.Objects.hashCode, pass
> all fields to it to generate the hashcode, and the equals method also
> compares all the fields.
>
> On Tue, Apr 13, 2021 at 8:10 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi,
>>
>> Could you check that your grouping key has a stable hashcode and equals?
>> This is very likely caused by an unstable hashcode, so that a record ends
>> up on the wrong task manager for its key.
>>
>> Best,
>>
>> Dawid
>>
>> On 13/04/2021 08:47, Si-li Liu wrote:
>>
>> Hi,
>>
>> I encountered a weird NPE when trying to aggregate on a fixed window. If I
>> set a small parallelism, the whole job runs on a single TaskManager and the
>> NPE does not happen. But when the job scales out to two TaskManagers, a
>> TaskManager crashes at the Create stage. The Flink version I use is 1.11.1.
>>
>> The NPE stack trace is:
>>
>> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] -
>> Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
>> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
>> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>> Caused by: java.lang.NullPointerException
>>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     ... 13 more
>>
>> My aggregate code is:
>>
>> public class AggregateDataEntry implements
>>         AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>>
>>     @Override
>>     public Map<DataKey, DataIndex> createAccumulator() {
>>         return new HashMap<>();
>>     }
>>
>>     @Override
>>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>>         return accumulator;
>>     }
>>
>>     @Override
>>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>>         return accumulator;
>>     }
>>
>>     @Override
>>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>>         return b;
>>     }
>> }
>>
>> Does anyone know anything about this NPE? Thanks!
>>
>> --
>> Best regards
>>
>> Sili Liu
>
> --
> Best regards
>
> Sili Liu
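To see why the unstable-hashcode diagnosis above fits this symptom: the heap state backend resolves keyed state through hash tables, and the key's hashCode also determines which parallel subtask owns the key, so a key whose hash changes after it has been used can be routed to a task whose local state tables do not cover it. A minimal, self-contained sketch of the same failure mode with a plain HashMap (MutableKey is a made-up class for the demonstration):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Demonstration: a key whose hashCode changes after insertion becomes
// unreachable in any hash-based table, the same class of problem that
// corrupts hash-partitioned keyed state.
public class UnstableKeyDemo {

    static class MutableKey {
        int value; // not final, so the hash can change after insertion

        MutableKey(int value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableKey && ((MutableKey) o).value == value;
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }
    }

    public static void main(String[] args) {
        Map<MutableKey, String> state = new HashMap<>();
        MutableKey key = new MutableKey(1);
        state.put(key, "some state");

        key.value = 2; // mutating the key moves it to a different hash bucket

        // The entry is still in the map, but no lookup can find it anymore.
        System.out.println(state.get(key));               // null
        System.out.println(state.get(new MutableKey(1))); // null
        System.out.println(state.size());                 // 1
    }
}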
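For completeness, the operator name in the log suggests the function is attached to a 5-second processing-time tumbling window keyed by DataKey. The original pipeline code is not shown in the thread, so the following is only a guess at how it is presumably assembled:

import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PipelineSketch {

    // Sketch only: wiring AggregateDataEntry into the window seen in the log.
    public static DataStream<Map<DataKey, DataIndex>> build(DataStream<Tuple2<DataKey, DataIndex>> input) {
        return input
                // DataKey is the grouping key, so its hashCode/equals must be
                // stable for records to reach the subtask that owns their state.
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new AggregateDataEntry());
    }
}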