Hi Dawid,

Thanks for your help. I use com.google.common.base.Objects.hashCode, passing
all the fields to it to generate the hash code, and the equals method also
compares all the fields.
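For reference, here is a minimal sketch of what a stable key class along those lines might look like. The field names are hypothetical (the actual DataKey is not shown in this thread), and it uses the JDK's java.util.Objects rather than Guava's com.google.common.base.Objects, which computes the hash the same way. The essential properties are that every field used in hashCode() is immutable and that equals() compares exactly the same fields.

```java
import java.util.Objects;

// Hypothetical key class: fields are final so the hash code cannot
// change after a record has been routed to a task manager or stored
// in keyed state.
public final class DataKey {
    private final String name;
    private final int id;

    public DataKey(String name, int id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataKey)) return false;
        DataKey other = (DataKey) o;
        return id == other.id && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        // Same fields as equals(); deterministic across JVMs for String/int.
        return Objects.hash(name, id);
    }
}
```

If any field were mutable and changed after keyBy, two lookups of the same logical key could land in different hash buckets, which matches the symptom Dawid describes.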

Dawid Wysakowicz <dwysakow...@apache.org> 于2021年4月13日周二 下午8:10写道:

> Hi,
>
> Could you check that your grouping key has a stable hashcode and equals?
> It is very likely caused by an unstable hashcode and that a record with an
> incorrect key ends up on a wrong task manager.
>
> Best,
>
> Dawid
> On 13/04/2021 08:47, Si-li Liu wrote:
>
> Hi,
>
> I encountered a weird NPE when trying to aggregate on a fixed window. If I
> set a small parallelism so that the whole job uses only one TaskManager,
> this NPE does not happen. But when the job scales to two TaskManagers, the
> TaskManager crashes at the create stage. The Flink version I use is 1.11.1.
>
> The NPE exception stack is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task []
> - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING
> to FAILED.
> java.io.IOException: Exception while applying AggregateFunction in
> aggregating state
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
> HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-
> 1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1
> .jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1
> .jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable
> .java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
> HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     ... 13 more
> My aggregate code is
>
> public class AggregateDataEntry implements
>         AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>
>     @Override
>     public Map<DataKey, DataIndex> createAccumulator() {
>         return new HashMap<>();
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value,
>             Map<DataKey, DataIndex> accumulator) {
>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>         return b;
>     }
> }
>
> Does anyone know anything about this NPE? Thanks!
> --
> Best regards
>
> Sili Liu
>
>

-- 
Best regards

Sili Liu
