[ https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924386#comment-15924386 ]
ASF GitHub Bot commented on FLINK-5713:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3535#discussion_r105931848

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -354,22 +354,27 @@ public void merge(W mergeResult,
                     }
                 });
    -            // drop if the window is already late
    -            if (isLate(actualWindow)) {
    -                mergingWindows.retireWindow(actualWindow);
    -                continue;
    -            }
    +            context.key = key;
    +            context.window = actualWindow;
                 W stateWindow = mergingWindows.getStateWindow(actualWindow);
                 if (stateWindow == null) {
                     throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                 }
                 windowState.setCurrentNamespace(stateWindow);
    -            windowState.add(element.getValue());
    -            context.key = key;
    -            context.window = actualWindow;
    +            // Drop if the window is already late. In rare cases (with a misbehaving
    +            // WindowAssigner) it can happen that a window becomes late that already has
    +            // state (contents, state and timers). That's why we first get the window state
    +            // above and then drop everything.
    +            if (isLate(actualWindow)) {
    +                clearAllState(actualWindow, windowState, mergingWindows);
    +                mergingWindows.persist();
    --- End diff --

    Why move `mergingWindows.persist()` from `clearAllState` to here? And do we no longer need the null check? How about:
    ```
    if (mergingWindows != null) {
        mergingWindows.persist();
    }
    ```

> Protect against NPE in WindowOperator window cleanup
> ----------------------------------------------------
>
>                 Key: FLINK-5713
>                 URL: https://issues.apache.org/jira/browse/FLINK-5713
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the
> merging window set while a cleanup timer is still active. This will trigger a
> NullPointerException when that timer fires.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)