[ 
https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924386#comment-15924386
 ] 

ASF GitHub Bot commented on FLINK-5713:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3535#discussion_r105931848
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
    @@ -354,22 +354,27 @@ public void merge(W mergeResult,
                                        }
                                });
     
    -                           // drop if the window is already late
    -                           if (isLate(actualWindow)) {
    -                                   
mergingWindows.retireWindow(actualWindow);
    -                                   continue;
    -                           }
    +                           context.key = key;
    +                           context.window = actualWindow;
     
                                W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
                                if (stateWindow == null) {
                                        throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
                                }
     
                                windowState.setCurrentNamespace(stateWindow);
    -                           windowState.add(element.getValue());
     
    -                           context.key = key;
    -                           context.window = actualWindow;
    +                           // Drop if the window is already late. In rare 
cases (with a misbehaving
    +                           // WindowAssigner) it can happen that a window 
becomes late that already has
    +                           // state (contents, state and timers). That's 
why we first get the window state
    +                           // above and then drop everything.
    +                           if (isLate(actualWindow)) {
    +                                   clearAllState(actualWindow, 
windowState, mergingWindows);
    +                                   mergingWindows.persist();
    --- End diff --
    
    Why  move `mergingWindows.persist()` from  `clearAllState`  to here, And we 
need not do the null check? How about 
    ```
    if (mergingWindows != null) {
                        mergingWindows.persist();
    }
    ```


> Protect against NPE in WindowOperator window cleanup
> ----------------------------------------------------
>
>                 Key: FLINK-5713
>                 URL: https://issues.apache.org/jira/browse/FLINK-5713
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the 
> merging window set while a cleanup timer is still active. This will trigger a 
> NullPointerException when that timer fires.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to