[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101358#comment-16101358 ]
ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129515022

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     						nextVersion,
     						startTimestamp);
     				}
    +
    +				switch (skipStrategy.getStrategy()) {
    +					case SKIP_PAST_LAST_ROW:
    +						if (nextState.isFinal()) {
    +							resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +						}
    +						break;
    +					case SKIP_TO_FIRST:
    +						if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +							!nextState.getName().equals(currentState.getName())) {
    +							ComputationState<T> startComputationState = createStartComputationState(computationState, outgoingEdges);
    +							if (callLevel > 0) {
    +								throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +							}
    +							// feed current matched event to the state.
    +							Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    --- End diff --

    This is because SKIP_TO_FIRST and SKIP_TO_LAST need to start the next match at the first or last event matched by the specified pattern.

    For example, take the event stream `a1, b1, c1, a2` and the pattern `(A B C)`. If we set the SkipStrategy to SKIP_TO_FIRST with the pattern name `B`, we should create a new `startComputationState` after `b1` has been processed, and the next match should start at event `b1`. So we need to manually feed `b1` to the newly created `startComputationState`.

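
To make the example above concrete, here is a minimal sketch of where matching resumes under each skip strategy. It is not the NFA code from the pull request: the class `SkipStrategySketch`, the method `resumeIndex`, and the assumption that a match covers consecutive events are all hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only; this is not the Flink CEP implementation. */
public class SkipStrategySketch {

    enum Strategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the stream index at which the next match attempt starts.
     *
     * @param matchStart index of the first event of the current match
     * @param variables  pattern variable each matched event was mapped to, in order
     *                   (assumption: the match covers consecutive events)
     * @param rpv        row pattern variable, used only by SKIP_TO_FIRST / SKIP_TO_LAST
     */
    static int resumeIndex(Strategy strategy, int matchStart, List<String> variables, String rpv) {
        switch (strategy) {
            case SKIP_TO_NEXT_ROW:
                // Row after the first row of the current match.
                return matchStart + 1;
            case SKIP_PAST_LAST_ROW:
                // Row after the last row of the current match.
                return matchStart + variables.size();
            case SKIP_TO_FIRST: {
                int offset = variables.indexOf(rpv);
                if (offset < 0) {
                    throw new IllegalArgumentException("No event mapped to " + rpv);
                }
                return matchStart + offset;
            }
            case SKIP_TO_LAST: {
                int offset = variables.lastIndexOf(rpv);
                if (offset < 0) {
                    throw new IllegalArgumentException("No event mapped to " + rpv);
                }
                return matchStart + offset;
            }
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a1", "b1", "c1", "a2");
        // The match a1, b1, c1 maps to the variables A, B, C.
        List<String> variables = Arrays.asList("A", "B", "C");

        int next = resumeIndex(Strategy.SKIP_TO_FIRST, 0, variables, "B");
        System.out.println(events.get(next)); // prints "b1"
    }
}
```

With SKIP_TO_FIRST on `B`, the resume point is the event that was itself mapped to `B`, which is why that event has to be fed again to the freshly created start state rather than waiting for the next incoming event.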
> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
>                 Key: FLINK-7169
>                 URL: https://issues.apache.org/jira/browse/FLINK-7169
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Yueting Chen
>            Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class, which takes a new
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?>
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is
> how the CEP library currently behaves.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)