Thanks for your help.

After I replaced com.google.common.base.Objects.hashCode with
toString().hashCode(), the NPE problem was solved.
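
For anyone who hits the same issue, the fixed key class now looks roughly
like this (a minimal sketch; the field names are invented, not the real
DataKey):

public class DataKey {
    // Both fields are final, so toString() -- and therefore hashCode() --
    // returns the same value for the lifetime of the object.
    private final String name;
    private final long id;

    public DataKey(String name, long id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public String toString() {
        return name + ":" + id;
    }

    @Override
    public int hashCode() {
        // Derive the hash from the stable string form of the key.
        return toString().hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataKey)) return false;
        DataKey other = (DataKey) o;
        return id == other.id && java.util.Objects.equals(name, other.name);
    }
}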

Arvid Heise <ar...@apache.org> wrote on Tue, Apr 13, 2021 at 11:40 PM:

> To second Dawid's question: are all fields final, or is it possible that
> their values are changing?
>
> On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Hi Dawid,
>>
>> Thanks for your help. I use com.google.common.base.Objects.hashCode,
>> passing all fields to it to generate the hashcode, and the equals method
>> also compares all the fields.
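>>
>> Roughly, the hashCode looked like this (a sketch with invented field
>> names, just to show the pattern):
>>
>> @Override
>> public int hashCode() {
>>     // Guava combines the hashCodes of the passed fields; if any of these
>>     // fields is mutated after keyBy, the key's hash changes and the
>>     // record no longer maps to the same state entry.
>>     return com.google.common.base.Objects.hashCode(name, id);
>> }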
>>
>> Dawid Wysakowicz <dwysakow...@apache.org> wrote on Tue, Apr 13, 2021 at 8:10 PM:
>>
>>> Hi,
>>>
>>> Could you check that your grouping key has a stable hashcode and equals?
>>> The problem is very likely caused by an unstable hashcode, such that a
>>> record with an incorrect key ends up on the wrong task manager.
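>>>
>>> A quick way to check (a minimal sketch of a hypothetical helper; pass
>>> your actual key and the code path that touches it):
>>>
>>> static <K> void assertStableHash(K key, Runnable codeThatTouchesKey) {
>>>     int before = key.hashCode();
>>>     codeThatTouchesKey.run();  // whatever runs between keyBy and the window
>>>     if (before != key.hashCode()) {
>>>         throw new IllegalStateException("unstable hashCode on grouping key");
>>>     }
>>> }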
>>>
>>> Best,
>>>
>>> Dawid
>>> On 13/04/2021 08:47, Si-li Liu wrote:
>>>
>>> Hi,
>>>
>>> I encountered a weird NPE when trying to do an aggregation on a fixed
>>> window. If I set a small parallelism so that the whole job uses only one
>>> TaskManager, this NPE does not happen. But when the job scales to two
>>> TaskManagers, a TaskManager crashes at the Create stage. The Flink
>>> version I use is 1.11.1.
>>>
>>> The NPE exception stack is:
>>>
>>> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
>>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>>     ... 13 more
>>> My aggregate code is:
>>>
>>> public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> createAccumulator() {
>>>         return new HashMap<>();
>>>     }
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>>>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>>>         return accumulator;
>>>     }
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>>>         return accumulator;
>>>     }
>>>
>>>     @Override
>>>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>>>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>>>         return b;
>>>     }
>>> }
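>>>
>>> (For context, the window pipeline is wired along these lines -- a sketch
>>> reconstructed from the operator name in the log above, not the exact job
>>> code; the source, flat map, and sink are elided:)
>>>
>>> DataStream<Tuple2<DataKey, DataIndex>> input = ...;  // source elided
>>> input
>>>     .keyBy(t -> t.f0)  // DataKey needs a stable hashCode/equals here
>>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>>>     .aggregate(new AggregateDataEntry())  // default PassThroughWindowFunction
>>>     .flatMap(...)   // elided, matches "Flat Map" in the log
>>>     .addSink(...);  // elided, matches "Sink: Unnamed" in the log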
>>>
>>> Does anyone know anything about this NPE? Thanks!
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu
