Thanks for your help. After I replaced com.google.common.base.Objects.hashCode with toString().hashCode(), the NPE problem was solved.
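For anyone who hits this thread later: the fix works because the grouping key's hashCode must be stable, i.e. computed only from fields that never change after the record has been keyed, otherwise the same key can hash differently on different calls and land on the wrong task manager. A minimal sketch of what a stable key class might look like (DataKey's actual fields never appear in this thread, so the two fields below are purely illustrative):

import java.util.Objects;

// Illustrative sketch only; the real DataKey fields are not shown in this
// thread. All fields are final and immutable, so hashCode() returns the
// same value on every call and on every TaskManager, which keyed window
// state relies on.
public final class DataKey {
    private final String name;
    private final long id;

    public DataKey(String name, long id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DataKey)) {
            return false;
        }
        DataKey other = (DataKey) o;
        return id == other.id && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        // Stable because it reads only final, immutable fields.
        return Objects.hash(name, id);
    }
}

Note that toString().hashCode() only works as long as toString() itself is deterministic over an unchanging set of fields; if any field feeding the hash can still mutate after keyBy, the same wrong-task-manager problem can come back no matter which hash function is used.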
On Tue, Apr 13, 2021 at 11:40 PM, Arvid Heise <ar...@apache.org> wrote:

> To second Dawid's question: are all fields final, or is it possible that
> their values are changing?
>
> On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Hi, Dawid,
>>
>> Thanks for your help. I use com.google.common.base.Objects.hashCode,
>> passing all fields to it to generate a hashcode, and the equals method
>> also compares all the fields.
>>
>> On Tue, Apr 13, 2021 at 8:10 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Could you check that your grouping key has a stable hashCode and
>>> equals? This is very likely caused by an unstable hashcode, so that a
>>> record with an incorrect key ends up on the wrong task manager.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 13/04/2021 08:47, Si-li Liu wrote:
>>>
>>> Hi,
>>>
>>> I encounter a weird NPE when trying to do an aggregate on a fixed
>>> window. If I set a small parallelism the whole job uses only one
>>> TaskManager and this NPE does not happen, but when the job scales to
>>> two TaskManagers, the TaskManager crashes at the Create stage. The
>>> Flink version I use is 1.11.1.
>>>
>>> The NPE exception stack is:
>>>
>>> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] -
>>> Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
>>> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
>>> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
>>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     ... 13 more
>>>
>>> My aggregate code is:
>>>
>>> public class AggregateDataEntry implements
>>>         AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> createAccumulator() {
>>>         return new HashMap<>();
>>>     }
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>>>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>>>         return accumulator;
>>>     }
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>>>         return accumulator;
>>>     }
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>>>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>>>         return b;
>>>     }
>>> }
>>>
>>> Does anyone know anything about this NPE? Thanks!
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>

--
Best regards

Sili Liu