Xiaojun Jin created FLINK-5947:
----------------------------------

             Summary: NullPointerException in ContinuousProcessingTimeTrigger.clear()
                 Key: FLINK-5947
                 URL: https://issues.apache.org/jira/browse/FLINK-5947
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.2.0
            Reporter: Xiaojun Jin
            Priority: Critical


fireTimestamp.get() may return null when ContinuousProcessingTimeTrigger.clear() 
deletes the processing-time timer; unboxing that null into a long throws the 
NullPointerException. The stack trace is as follows:
{quote}
Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger.clear(ContinuousProcessingTimeTrigger.java:89)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:761)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:348)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:336)
        at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:210)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:336)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:70)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
        at java.lang.Thread.run(Thread.java:745)

{quote}

The patch is as follows:
{code}
@@ -86,9 +86,10 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
                ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-               long timestamp = fireTimestamp.get();
-               ctx.deleteProcessingTimeTimer(timestamp);
-               fireTimestamp.clear();
+               if (fireTimestamp.get() != null) {
+                       ctx.deleteProcessingTimeTimer(fireTimestamp.get());
+                       fireTimestamp.clear();
+               }
        }
{code}
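
For illustration only, here is a minimal standalone sketch of the failure and of the guard, with no Flink dependency. FireTimestampState and TimerService are hypothetical stand-ins for the trigger's ReducingState<Long> and the TriggerContext timer calls; they are not Flink classes. The sketch reads the state once into a boxed Long before the null check, which is a small variation on the patch above rather than the patch itself.

```java
import java.util.HashSet;
import java.util.Set;

public class NullSafeClearSketch {

    /** Hypothetical stand-in for the per-window state that stores the next fire time. */
    static final class FireTimestampState {
        private Long value; // stays null until a timer has been registered

        Long get() { return value; }
        void set(Long v) { value = v; }
        void clear() { value = null; }
    }

    /** Hypothetical stand-in for the timer registration calls on the trigger context. */
    static final class TimerService {
        final Set<Long> timers = new HashSet<>();

        void register(long time) { timers.add(time); }
        void delete(long time) { timers.remove(time); }
    }

    /** Mirrors the pre-patch logic: a null Long is unboxed into a primitive long. */
    static void clearUnsafe(FireTimestampState state, TimerService timers) {
        long timestamp = state.get(); // throws NullPointerException when the state is empty
        timers.delete(timestamp);
        state.clear();
    }

    /** Guarded variant: read the state once, keep it boxed, and skip cleanup when empty. */
    static void clearSafe(FireTimestampState state, TimerService timers) {
        Long timestamp = state.get();
        if (timestamp != null) {
            timers.delete(timestamp);
            state.clear();
        }
    }

    public static void main(String[] args) {
        TimerService timers = new TimerService();

        // A window whose state was never populated.
        FireTimestampState empty = new FireTimestampState();
        try {
            clearUnsafe(empty, timers);
        } catch (NullPointerException e) {
            System.out.println("unguarded clear failed on empty state");
        }
        clearSafe(empty, timers); // no exception, nothing to delete

        // A window with a registered timer is still cleaned up.
        FireTimestampState populated = new FireTimestampState();
        populated.set(1000L);
        timers.register(1000L);
        clearSafe(populated, timers);
        System.out.println("remaining timers: " + timers.timers.size());
    }
}
```

Reading the value once avoids a second state lookup and makes the null check and the delete operate on the same value.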




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
