[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119852#comment-16119852 ]
ASF GitHub Bot commented on FLINK-7169: --------------------------------------- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132139074 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -322,6 +336,64 @@ public void resetNFAChanged() { } + Set<T> discardEvents = new HashSet<>(); + switch(afterMatchSkipStrategy.getStrategy()) { + case SKIP_TO_LAST: + for (Map<String, List<T>> resultMap: result) { + boolean matched = false; --- End diff -- How about: for (Map<String, List<T>> resultMap: result) { for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) { if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1)); break; } else { discardEvents.addAll(keyMatches.getValue()); } } } This way we will stop whenever we reach the matching pattern. > Support AFTER MATCH SKIP function in CEP library API > ---------------------------------------------------- > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Yueting Chen > Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support AFTER MATCH SKIP function in CEP API. > There're four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function to `CEP` class, which takes a new > parameter as AfterMatchSKipStrategy. > The new API may looks like this > {code} > public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> > pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` as the default option, because that's > what CEP library behaves currently. -- This message was sent by Atlassian JIRA (v6.4.14#64029)