ambition created FLINK-10674: -------------------------------- Summary: DistinctAccumulator.remove lead to NPE Key: FLINK-10674 URL: https://issues.apache.org/jira/browse/FLINK-10674 Project: Flink Issue Type: Bug Components: flink-contrib Affects Versions: 1.6.1 Environment: Flink 1.6.0 Reporter: ambition Attachments: image-2018-10-25-14-46-03-373.png
Our online Flink Job run about a week,job contain sql : {code:java} select `time`, lower(trim(os_type)) as os_type, count(distinct feed_id) as feed_total_view from my_table group by `time`, lower(trim(os_type)){code} then occur NPE: {code:java} java.lang.NullPointerException at scala.Predef$.Long2long(Predef.scala:363) at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109) at NonWindowedAggregationHelper$894.retract(Unknown Source) at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124) at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39) at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) {code} View DistinctAccumulator.remove !image-2018-10-25-14-46-03-373.png! this NPE should currentCnt = null lead to, so we simple handle like : {code:java} def remove(params: Row): Boolean = { if(!distinctValueMap.contains(params)){ true }else{ val currentCnt = distinctValueMap.get(params) // if (currentCnt == null || currentCnt == 1) { distinctValueMap.remove(params) true } else { var value = currentCnt - 1L if(value < 0){ value = 1 } distinctValueMap.put(params, value) false } } }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)