To second Dawid's question: are all the fields final, or is it possible that
their values change?
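
If any field can change after the record has been keyed, the hashcode changes
with it and the key can no longer be found. A minimal, self-contained
illustration with a plain HashMap (MutableKey is a made-up class, purely for
illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MutableKeyDemo {

    // Made-up key class with a non-final field, for illustration only.
    static class MutableKey {
        String id; // not final, so hashCode can change after insertion

        MutableKey(String id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableKey && Objects.equals(id, ((MutableKey) o).id);
        }

        @Override
        public int hashCode() { return Objects.hash(id); }
    }

    public static void main(String[] args) {
        Map<MutableKey, Integer> map = new HashMap<>();
        MutableKey key = new MutableKey("a");
        map.put(key, 1);

        key.id = "b"; // mutate after the key has been hashed into a bucket

        System.out.println(map.get(key));                 // null: wrong bucket now
        System.out.println(map.get(new MutableKey("a"))); // null: equals fails
    }
}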

On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu <unix...@gmail.com> wrote:

> Hi Dawid,
>
> Thanks for your help. I use com.google.common.base.Objects.hashCode, passing
> all the fields to it to generate the hash code, and the equals method also
> compares all the fields.
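>
> Roughly this pattern (a sketch; field1 and field2 are placeholders, since
> the real fields of DataKey aren't shown here):
>
> import com.google.common.base.Objects;
>
> public class DataKey {
>     // Placeholder fields; the real DataKey has its own set of fields.
>     private final String field1;
>     private final long field2;
>
>     public DataKey(String field1, long field2) {
>         this.field1 = field1;
>         this.field2 = field2;
>     }
>
>     @Override
>     public int hashCode() {
>         return Objects.hashCode(field1, field2);
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (!(o instanceof DataKey)) return false;
>         DataKey other = (DataKey) o;
>         return Objects.equal(field1, other.field1) && field2 == other.field2;
>     }
> }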
>
> Dawid Wysakowicz <dwysakow...@apache.org> wrote on Tue, Apr 13, 2021 at 8:10 PM:
>
>> Hi,
>>
>> Could you check that your grouping key has a stable hashcode and equals?
>> This is very likely caused by an unstable hashcode, which makes a record
>> end up on the wrong task manager for its key.
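>>
>> Flink picks the target key group and subtask from the key's hashCode,
>> roughly like this (a simplified sketch, not the exact runtime code; see
>> org.apache.flink.runtime.state.KeyGroupRangeAssignment):
>>
>> import org.apache.flink.util.MathUtils;
>>
>> public class KeyGroupSketch {
>>     static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
>>         int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
>>         return keyGroup * parallelism / maxParallelism;
>>     }
>>
>>     public static void main(String[] args) {
>>         // With parallelism 10 (as in your job) and the default maxParallelism
>>         // of 128, different hash codes usually map to different subtasks.
>>         System.out.println(operatorIndexForKey("a", 128, 10));
>>         System.out.println(operatorIndexForKey("b", 128, 10));
>>     }
>> }
>>
>> If the hashCode changes between partitioning on the sender and the state
>> lookup on the receiver, a record can be processed by a subtask that does
>> not own the corresponding key group, so the heap state table has no state
>> map for it, which would explain the NPE in StateTable.transform below.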
>>
>> Best,
>>
>> Dawid
>> On 13/04/2021 08:47, Si-li Liu wrote:
>>
>> Hi,
>>
>> I encounter a weird NPE when trying to aggregate over a fixed window. If I
>> set a small parallelism, the whole job runs in a single TaskManager and the
>> NPE does not happen. But when the job scales out to two TaskManagers, a
>> TaskManager crashes during the create stage. The Flink version I use is 1.11.1.
>>
>> The NPE exception stack is:
>>
>> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task
>> [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
>> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
>> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>> Caused by: java.lang.NullPointerException
>>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>     ... 13 more
>> My aggregate function code is:
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import org.apache.flink.api.common.functions.AggregateFunction;
>> import org.apache.flink.api.java.tuple.Tuple2;
>>
>> public class AggregateDataEntry implements
>>         AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>>
>>     @Override
>>     public Map<DataKey, DataIndex> createAccumulator() {
>>         return new HashMap<>();
>>     }
>>
>>     @Override
>>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>>         return accumulator;
>>     }
>>
>>     @Override
>>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>>         return accumulator;
>>     }
>>
>>     @Override
>>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>>         return b;
>>     }
>> }
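>>
>> For reference, the function is wired into the job roughly like this (a
>> sketch reconstructed from the operator name in the log above; the keyBy
>> key is an assumption, and the flat map and sink are elided):
>>
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> // `source` stands for the upstream DataStream<Tuple2<DataKey, DataIndex>>.
>> source
>>     .keyBy(value -> value.f0) // assumes the job keys by DataKey
>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>>     .aggregate(new AggregateDataEntry());
>>     // ... followed by the Flat Map and Sink from the log, elided here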
>>
>> Does anyone know anything about this NPE? Thanks!
>> --
>> Best regards
>>
>> Sili Liu
>>
>>
>
> --
> Best regards
>
> Sili Liu
>
