dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r904446329
########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ########## @@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() { } } + private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) { + if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType) + && windowTimes.containsKey(WithinType.FIRST_AND_LAST) + && windowTime.toMilliseconds() + > windowTimes.get(WithinType.FIRST_AND_LAST).toMilliseconds()) { + throw new MalformedPatternException( + "Window length between the previous and current event cannot be larger than which between the first and last event for pattern."); Review Comment: ```suggestion "Window length between the previous and current event cannot be larger than the window length between the first and last event for pattern."); ``` ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ########## @@ -348,10 +370,28 @@ public Pattern<T, F> optional() { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> oneOrMore() { + return oneOrMore(null); + } + + /** + * Specifies that this pattern can occur {@code one or more} times and time interval corresponds + * to the maximum time gap between previous and current event for each times. This means at + * least one and at most infinite number of events can be matched to this pattern. + * + * <p>If this quantifier is enabled for a pattern {@code A.oneOrMore().followedBy(B)} and a + * sequence of events {@code A1 A2 B} appears, this will generate patterns: {@code A1 B} and + * {@code A1 A2 B}. See also {@link #allowCombinations()}. + * + * @param windowTimes mapping between times and time of the matching window. + * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier + * applied. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> oneOrMore(Map<Integer, Time> windowTimes) { Review Comment: I guess *oneOrMore(Time windowTime)* is enough. ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ########## @@ -394,14 +447,28 @@ public Pattern<T, F> times(int times) { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> times(int from, int to) { + return times(from, to, null); + } + + /** + * Specifies that the pattern can occur between from and to times with time interval corresponds + * to the maximum time gap between previous and current event for each times. + * + * @param from number of times matching event must appear at least + * @param to number of times matching event must appear at most + * @param windowTimes mapping between times and time of the matching window. + * @return The same pattern with the number of times range applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> times(int from, int to, Map<Integer, Time> windowTimes) { Review Comment: ditto ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -174,6 +182,7 @@ void compileFactory() { if (lastPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW + && !windowTimes.containsKey(lastPattern.getName()) Review Comment: && (!windowTimes.containsKey(lastPattern.getName()) || windowTimes.get(lastPattern.getName()) <= 0) ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ########## @@ -613,6 +643,7 @@ private Collection<ComputationState> computeNextStates( int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); + final long stateTimestamp = event.getTimestamp(); Review Comment: Move it into `case TAKE`? ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java: ########## @@ -41,6 +41,9 @@ public class ComputationState { // Timestamp of the first element in the pattern private final long startTimestamp; + // Timestamp of the previous element in the state + private final long stateTimestamp; Review Comment: ```suggestion private final long previousTimestamp; ``` ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ########## @@ -91,6 +92,13 @@ */ private final Map<String, State<T>> states; + /** + * The lengths of a windowed pattern, as specified using the {@link + * org.apache.flink.cep.pattern.Pattern#within(Time, Pattern.WithinType)} Pattern.within(Time, + * WithinType)} method with {@code WithinType.PREVIOUS_AND_CURRENT}. + */ + private final Map<String, Long> windowTimes; Review Comment: Do you think it makes sense to move the windowTime between previous and current event to *State*? ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ########## @@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() { } } + private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) { + if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType) Review Comment: If may happen that PREVIOUS_AND_CURRENT is set before FIRST_AND_LAST. In this case, this validation will be skipped. For example: ` Pattern.begin("a").followedBy("b").withIn(Time.seconds(20), WithinType.PREVIOUS_AND_CURRENT).withIn(Time.seconds(10)) ` ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ########## @@ -601,4 +694,12 @@ public String toString() { + afterMatchSkipStrategy + '}'; } + + /** Type enum of time interval corresponds to the maximum time gap between events. */ + public enum WithinType { Review Comment: Move it to a separate file and make it as a standalone class. You can move it to package *org.apache.flink.cep.pattern* ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -308,9 +321,10 @@ private State<T> createMiddleStates(final State<T> sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions - if (getWindowTime() > 0 && lastSink.isFinal()) { Review Comment: (currentPattern.getWindowTime(Pattern.WithinType.PREVIOUS_AND_CURRENT) != null || getWindowTime() > 0) && lastSink.isFinal() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org