Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6059#discussion_r194469640 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -296,42 +292,31 @@ public void resetNFAChanged() { if (shouldDiscardPath) { // a stop state was reached in this branch. release branch which results in removing previous event from // the buffer - for (final ComputationState<T> state : statesToRetain) { - eventSharedBuffer.release( - NFAStateNameHandler.getOriginalNameFromInternal( - state.getPreviousState().getName()), - state.getEvent(), - state.getTimestamp(), - state.getCounter()); + for (final ComputationState state : statesToRetain) { + sharedBuffer.releaseNode(state.getPreviousBufferEntry()); } } else { computationStates.addAll(statesToRetain); } } - discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy); - - // prune shared buffer based on window length - if (windowTime > 0L) { - long pruningTimestamp = timestamp - windowTime; - - if (pruningTimestamp < timestamp) { - // the check is to guard against underflows + discardComputationStatesAccordingToStrategy( + sharedBuffer, computationStates, result, afterMatchSkipStrategy); - // remove all elements which are expired - // with respect to the window length - if (eventSharedBuffer.prune(pruningTimestamp)) { - nfaChanged = true; - } - } + if (event.getEvent() == null) { --- End diff -- This is an unfortunate way in which we currently push the Watermark into the NFA.
---