[ https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925371#comment-15925371 ]
ASF GitHub Bot commented on FLINK-5713:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3535#discussion_r106065855

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -354,22 +354,27 @@ public void merge(W mergeResult,
     							}
     						});
    -				// drop if the window is already late
    -				if (isLate(actualWindow)) {
    -					mergingWindows.retireWindow(actualWindow);
    -					continue;
    -				}
    +				context.key = key;
    +				context.window = actualWindow;
     				W stateWindow = mergingWindows.getStateWindow(actualWindow);
     				if (stateWindow == null) {
     					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
     				}
     				windowState.setCurrentNamespace(stateWindow);
    -				windowState.add(element.getValue());
    -				context.key = key;
    -				context.window = actualWindow;
    +				// Drop if the window is already late. In rare cases (with a misbehaving
    +				// WindowAssigner) it can happen that a window becomes late that already has
    +				// state (contents, state and timers). That's why we first get the window state
    +				// above and then drop everything.
    +				if (isLate(actualWindow)) {
    +					clearAllState(actualWindow, windowState, mergingWindows);
    +					mergingWindows.persist();
    --- End diff --

    Yes, in fact I had seen that, but I did not realize that it could be deleted. Haha, you are very quick-witted. :)
    Anyway, thanks for the explanation.

    Best,
    SunJincheng

> Protect against NPE in WindowOperator window cleanup
> ----------------------------------------------------
>
>                 Key: FLINK-5713
>                 URL: https://issues.apache.org/jira/browse/FLINK-5713
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the
> merging window set while a cleanup timer is still active. This will trigger a
> NullPointerException when that timer fires.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
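
The failure mode in the issue description (a cleanup timer firing for a window that is no longer in the merging window set) can be illustrated with a toy model. The sketch below is not Flink code and not the change made in the pull request. The class CleanupTimerSketch, its StateWindow type, and all of its methods are hypothetical names invented for illustration, and the maps only stand in for the merging window set and the window state.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: this is NOT Flink's WindowOperator. Every name is hypothetical.
final class CleanupTimerSketch {

    // Stand-in for the state window that backs an in-flight window.
    static final class StateWindow {
        final String namespace;

        StateWindow(String namespace) {
            this.namespace = namespace;
        }
    }

    // Stand-in for the merging window set: in-flight window -> state window.
    private final Map<String, StateWindow> mergingSet = new HashMap<>();
    // Stand-in for window contents, keyed by state namespace.
    private final Map<String, Integer> contents = new HashMap<>();

    // Adds a value to a window, registering the window in the set if needed.
    void add(String window, int value) {
        StateWindow stateWindow = mergingSet.computeIfAbsent(window, StateWindow::new);
        contents.merge(stateWindow.namespace, value, Integer::sum);
    }

    // Simulates a misbehaving assigner: the window leaves the set while its
    // cleanup timer (not modeled here) is assumed to stay registered.
    void dropFromSet(String window) {
        mergingSet.remove(window);
    }

    // Cleanup without a null check: dereferences the lookup result directly,
    // so it throws NullPointerException if the window is no longer in the set.
    void unguardedCleanup(String window) {
        StateWindow stateWindow = mergingSet.get(window);
        contents.remove(stateWindow.namespace);
        mergingSet.remove(window);
    }

    // Cleanup with a null check: returns false when there is nothing to clean.
    boolean guardedCleanup(String window) {
        StateWindow stateWindow = mergingSet.get(window);
        if (stateWindow == null) {
            return false;
        }
        contents.remove(stateWindow.namespace);
        mergingSet.remove(window);
        return true;
    }
}
```

In this model, calling dropFromSet("w1") and then unguardedCleanup("w1") throws a NullPointerException, while guardedCleanup("w1") simply reports that no cleanup was needed. Whether a guard of this shape matches the actual fix is an assumption; the diff quoted above takes the related approach of looking up the state window first and then clearing all state for a late window.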