[ 
https://issues.apache.org/jira/browse/FLINK-5947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891584#comment-15891584
 ] 

Xiaojun Jin commented on FLINK-5947:
------------------------------------

Yes. I maintain a separate version of Flink at my company and have developed a 
streaming SQL product based on it.
I rebased the code from the release-1.2 branch, fixed bugs, and added the 
retraction extension to Flink.
The exception occurs when using ProcessingTimeSessionWindows with 
ContinuousProcessingTimeTrigger. The window merge may happen before 
Trigger.onElement is called, so fireTimestamp may still be null when clear() 
tries to delete the processing-time timer.
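
The failure mode described above can be reproduced outside Flink with a 
minimal sketch. Everything here is a hypothetical stand-in, not Flink code: 
FakeState imitates per-window trigger state whose get() returns null when 
nothing has been stored, unguardedClear imitates the pre-patch clear(), and 
guardedClear imitates the null check from the patch.

```java
import java.util.HashMap;
import java.util.Map;

public class ClearGuardSketch {

    /** Hypothetical stand-in for per-window trigger state; get() returns null when empty. */
    static final class FakeState {
        private final Map<String, Long> values = new HashMap<>();

        Long get(String window) {
            return values.get(window);
        }

        void put(String window, long timestamp) {
            values.put(window, timestamp);
        }

        void clear(String window) {
            values.remove(window);
        }
    }

    /** Imitates the pre-patch logic: unboxing a null Long throws NullPointerException. */
    static void unguardedClear(FakeState state, String window) {
        long timestamp = state.get(window); // throws here when no timestamp was stored
        state.clear(window);
    }

    /**
     * Imitates the patched logic: skip the cleanup when no timestamp was stored.
     * Returns true if a timer would have been deleted.
     */
    static boolean guardedClear(FakeState state, String window) {
        Long timestamp = state.get(window);
        if (timestamp == null) {
            return false; // window was merged before any element registered a timer
        }
        // A real trigger would delete the processing-time timer for `timestamp` here.
        state.clear(window);
        return true;
    }

    public static void main(String[] args) {
        FakeState state = new FakeState();
        try {
            unguardedClear(state, "merged-window");
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE from unguarded clear");
        }
        System.out.println(guardedClear(state, "merged-window"));
        state.put("active-window", 1000L);
        System.out.println(guardedClear(state, "active-window"));
    }
}
```

Reading the state once into a boxed Long and checking it for null also avoids 
calling get() twice.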


> NullPointerException in ContinuousProcessingTimeTrigger.clear()
> ---------------------------------------------------------------
>
>                 Key: FLINK-5947
>                 URL: https://issues.apache.org/jira/browse/FLINK-5947
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Xiaojun Jin
>            Priority: Critical
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> The fireTimestamp may be null when deleting the processing-time timer in 
> ContinuousProcessingTimeTrigger. The exception stack trace is as follows:
> {quote}
> Caused by: java.lang.NullPointerException
>       at org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger.clear(ContinuousProcessingTimeTrigger.java:89)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:761)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:348)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:336)
>       at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:210)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:336)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:70)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
>       at java.lang.Thread.run(Thread.java:745)
> {quote}
> The patch is as follows:
> {code}
> @@ -86,9 +86,10 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
>         @Override
>         public void clear(W window, TriggerContext ctx) throws Exception {
>                 ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
> -               long timestamp = fireTimestamp.get();
> -               ctx.deleteProcessingTimeTimer(timestamp);
> -               fireTimestamp.clear();
> +               if (fireTimestamp.get() != null) {
> +                       ctx.deleteProcessingTimeTimer(fireTimestamp.get());
> +                       fireTimestamp.clear();
> +               }
>         }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
