[ https://issues.apache.org/jira/browse/FLINK-5947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891584#comment-15891584 ]
Xiaojun Jin commented on FLINK-5947:
------------------------------------

Yes. I maintain a separate version of Flink at my company and am developing a streaming SQL product based on it. I rebased the code from the release-1.2 branch, fixed bugs, and added the retraction extension to Flink.

The exception occurs when using ProcessingTimeSessionWindows together with ContinuousProcessingTimeTrigger. The window merge may happen before Trigger.onElement() has run for the window, so the fireTimestamp state may still be null when clear() tries to delete the processing-time timer.

> NullPointerException in ContinuousProcessingTimeTrigger.clear()
> ---------------------------------------------------------------
>
>                 Key: FLINK-5947
>                 URL: https://issues.apache.org/jira/browse/FLINK-5947
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Xiaojun Jin
>            Priority: Critical
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> The fireTimestamp may be null when deleting the processing-time timer in
> ContinuousProcessingTimeTrigger.
> Exception stack is as follows:
> {quote}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger.clear(ContinuousProcessingTimeTrigger.java:89)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:761)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:348)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:336)
>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:210)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:336)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:70)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
>     at java.lang.Thread.run(Thread.java:745)
> {quote}
> The patch is as follows:
> {code}
> @@ -86,9 +86,10 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
>      @Override
>      public void clear(W window, TriggerContext ctx) throws Exception {
>          ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
> -        long timestamp = fireTimestamp.get();
> -        ctx.deleteProcessingTimeTimer(timestamp);
> -        fireTimestamp.clear();
> +        if (fireTimestamp.get() != null) {
> +            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
> +            fireTimestamp.clear();
> +        }
>      }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
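
For readers who want to reproduce the failure mode without a Flink cluster, here is a minimal, self-contained sketch of why the original clear() throws and why the null check avoids it. It does not use Flink classes: FakeState, unsafeClear and safeClear are hypothetical stand-ins invented for illustration, and the only assumption carried over from the report is that the state's get() returns null when nothing has been added yet.

```java
public class NullSafeTimerClear {

    /** Hypothetical stand-in for the trigger's state: get() returns null until a value is added. */
    static final class FakeState {
        private Long value;

        Long get() {
            return value;
        }

        // Keeps the smallest timestamp seen so far (an assumption made for this sketch).
        void add(long v) {
            value = (value == null) ? v : Math.min(value, v);
        }

        void clear() {
            value = null;
        }
    }

    /** Mirrors the pre-patch logic: auto-unboxing a null Long throws NullPointerException. */
    static void unsafeClear(FakeState state) {
        long timestamp = state.get(); // throws here if no timestamp was ever stored
        state.clear();
    }

    /** Mirrors the patched logic: returns true only if a stored timestamp was found and cleared. */
    static boolean safeClear(FakeState state) {
        Long timestamp = state.get(); // read once, keep it boxed so null is representable
        if (timestamp == null) {
            return false; // nothing registered, so there is nothing to delete
        }
        state.clear();
        return true;
    }

    public static void main(String[] args) {
        FakeState empty = new FakeState();
        try {
            unsafeClear(empty);
            System.out.println("unsafeClear: no exception");
        } catch (NullPointerException e) {
            System.out.println("unsafeClear: NullPointerException");
        }
        System.out.println("safeClear on empty state: " + safeClear(empty));

        FakeState populated = new FakeState();
        populated.add(1000L);
        System.out.println("safeClear on populated state: " + safeClear(populated));
    }
}
```

The sketch reads the state once into a boxed Long instead of calling get() twice as the patch does. That is a stylistic choice, not something the patch requires.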