Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6171#discussion_r198473858 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -330,77 +328,85 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta } } - discardComputationStatesAccordingToStrategy( - sharedBuffer, computationStates, result, afterMatchSkipStrategy); + if (!potentialMatches.isEmpty()) { + nfaState.setStateChanged(); + } + + List<Map<String, List<T>>> result = new ArrayList<>(); + if (afterMatchSkipStrategy.isSkipStrategy()) { + processMatchesAccordingToSkipStrategy(sharedBuffer, + nfaState, + afterMatchSkipStrategy, + potentialMatches, + result); + } else { + for (ComputationState match : potentialMatches) { + result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(), + match.getVersion()).get(0))); + sharedBuffer.releaseNode(match.getPreviousBufferEntry()); + } + } return result; } - private void discardComputationStatesAccordingToStrategy( - final SharedBuffer<T> sharedBuffer, - final Queue<ComputationState> computationStates, - final Collection<Map<String, List<T>>> matchedResult, - final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception { + private void processMatchesAccordingToSkipStrategy( + SharedBuffer<T> sharedBuffer, + NFAState nfaState, + AfterMatchSkipStrategy afterMatchSkipStrategy, + PriorityQueue<ComputationState> potentialMatches, + List<Map<String, List<T>>> result) throws Exception { - Set<T> discardEvents = new HashSet<>(); - switch(afterMatchSkipStrategy.getStrategy()) { - case SKIP_TO_LAST: - for (Map<String, List<T>> resultMap: matchedResult) { - for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) { - if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { - discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1)); - break; - } else { - 
discardEvents.addAll(keyMatches.getValue()); - } - } - } - break; - case SKIP_TO_FIRST: - for (Map<String, List<T>> resultMap: matchedResult) { - for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) { - if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { - break; - } else { - discardEvents.addAll(keyMatches.getValue()); - } - } - } - break; - case SKIP_PAST_LAST_EVENT: - for (Map<String, List<T>> resultMap: matchedResult) { - for (List<T> eventList: resultMap.values()) { - discardEvents.addAll(eventList); - } - } - break; - } - if (!discardEvents.isEmpty()) { - List<ComputationState> discardStates = new ArrayList<>(); - for (ComputationState computationState : computationStates) { - boolean discard = false; - Map<String, List<T>> partialMatch = extractCurrentMatches(sharedBuffer, computationState); - for (List<T> list: partialMatch.values()) { - for (T e: list) { - if (discardEvents.contains(e)) { - // discard the computation state. - discard = true; - break; - } - } - if (discard) { - break; - } - } - if (discard) { - sharedBuffer.releaseNode(computationState.getPreviousBufferEntry()); - discardStates.add(computationState); - } + nfaState.getCompletedMatches().addAll(potentialMatches); + + ComputationState earliestMatch = nfaState.getCompletedMatches().peek(); + + if (earliestMatch != null) { + Queue<ComputationState> sortedPartialMatches = sortByStartTime(nfaState.getPartialMatches()); --- End diff -- Instead of sorting on every call, why not keep the partial matches in a priority queue ordered by start time, so they are always available in that order?
---