[ https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924507#comment-15924507 ]
ASF GitHub Bot commented on FLINK-5713:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3535#discussion_r105956185

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -354,22 +354,27 @@ public void merge(W mergeResult,
     			}
     		});
    -		// drop if the window is already late
    -		if (isLate(actualWindow)) {
    -			mergingWindows.retireWindow(actualWindow);
    -			continue;
    -		}
    +		context.key = key;
    +		context.window = actualWindow;
     		W stateWindow = mergingWindows.getStateWindow(actualWindow);
     		if (stateWindow == null) {
     			throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
     		}
     		windowState.setCurrentNamespace(stateWindow);
    -		windowState.add(element.getValue());
    -		context.key = key;
    -		context.window = actualWindow;
    +		// Drop if the window is already late. In rare cases (with a misbehaving
    +		// WindowAssigner) it can happen that a window becomes late that already has
    +		// state (contents, state and timers). That's why we first get the window state
    +		// above and then drop everything.
    +		if (isLate(actualWindow)) {
    +			clearAllState(actualWindow, windowState, mergingWindows);
    +			mergingWindows.persist();
    --- End diff --

    The null check is not needed here, since we know from the if block we're in (`if (windowAssigner instanceof MergingWindowAssigner)`) that we do in fact have merging windows.

    Now that I look at it, though, I realise that the `mergingWindows.persist()` call is not necessary because we already call it at the end of the if block. So thanks for making me notice! 😃

    I moved the call out of `clearAllState()` in the first place because every place where `clearAllState()` is called already persists afterwards. See, for example, `onEventTime()` and `onProcessingTime()`.
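The "persist once at the end of the block" argument can be illustrated with a small standalone sketch. It is not Flink code: `ToyMergingWindows`, `processElement` and the `long` window-end timestamps are hypothetical stand-ins, and only the names `clearAllState`, `retireWindow` and `persist` are borrowed from the diff above. It assumes a window is late when its end is at or before the watermark.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PersistOnceSketch {

    /** Hypothetical stand-in for the merging window set; NOT Flink's class. */
    static class ToyMergingWindows {
        final Set<Long> inFlight = new HashSet<>();
        int persistCalls = 0;

        void retireWindow(long windowEnd) {
            inFlight.remove(windowEnd);
        }

        void persist() {
            persistCalls++;
        }
    }

    /** Drops contents and retires the window, but deliberately does not persist. */
    static void clearAllState(long windowEnd,
                              Map<Long, Integer> windowState,
                              ToyMergingWindows mergingWindows) {
        windowState.remove(windowEnd);
        mergingWindows.retireWindow(windowEnd);
    }

    /** Handles one element for several windows; persists exactly once at the end. */
    static void processElement(long[] windowEnds,
                               long watermark,
                               Map<Long, Integer> windowState,
                               ToyMergingWindows mergingWindows) {
        for (long windowEnd : windowEnds) {
            mergingWindows.inFlight.add(windowEnd);
            // Assumed lateness rule for this sketch only.
            if (windowEnd <= watermark) {
                // Late window: drop everything, including state it already had.
                clearAllState(windowEnd, windowState, mergingWindows);
                continue;
            }
            windowState.merge(windowEnd, 1, Integer::sum);
        }
        // Single persist for the whole block, so callers of clearAllState()
        // do not need their own persist() call.
        mergingWindows.persist();
    }

    public static void main(String[] args) {
        ToyMergingWindows mergingWindows = new ToyMergingWindows();
        Map<Long, Integer> windowState = new HashMap<>();
        windowState.put(5L, 3); // a window that already has contents and then becomes late
        processElement(new long[] {5L, 20L}, 10L, windowState, mergingWindows);
        System.out.println("persist calls: " + mergingWindows.persistCalls);
        System.out.println("remaining state: " + windowState);
    }
}
```

In this sketch the late window's existing contents are removed even though it had state, and `persist()` runs once regardless of how many windows were cleared.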
> Protect against NPE in WindowOperator window cleanup
> ----------------------------------------------------
>
>                 Key: FLINK-5713
>                 URL: https://issues.apache.org/jira/browse/FLINK-5713
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the
> merging window set while a cleanup timer is still active. This will trigger a
> NullPointerException when that timer fires.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)