dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r904446329


##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() {
         }
     }
 
+    private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) {
+        if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType)
+                && windowTimes.containsKey(WithinType.FIRST_AND_LAST)
+                && windowTime.toMilliseconds()
+                        > windowTimes.get(WithinType.FIRST_AND_LAST).toMilliseconds()) {
+            throw new MalformedPatternException(
+                    "Window length between the previous and current event cannot be larger than which between the first and last event for pattern.");

Review Comment:
   ```suggestion
                       "Window length between the previous and current event cannot be larger than the window length between the first and last event for pattern.");
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -348,10 +370,28 @@ public Pattern<T, F> optional() {
      * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
      */
     public Pattern<T, F> oneOrMore() {
+        return oneOrMore(null);
+    }
+
+    /**
+     * Specifies that this pattern can occur {@code one or more} times and time interval corresponds
+     * to the maximum time gap between previous and current event for each times. This means at
+     * least one and at most infinite number of events can be matched to this pattern.
+     *
+     * <p>If this quantifier is enabled for a pattern {@code A.oneOrMore().followedBy(B)} and a
+     * sequence of events {@code A1 A2 B} appears, this will generate patterns: {@code A1 B} and
+     * {@code A1 A2 B}. See also {@link #allowCombinations()}.
+     *
+     * @param windowTimes mapping between times and time of the matching window.
+     * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier
+     *     applied.
+     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+     */
+    public Pattern<T, F> oneOrMore(Map<Integer, Time> windowTimes) {

Review Comment:
   I guess *oneOrMore(Time windowTime)* is enough.
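
A minimal sketch of the narrower signature suggested here, assuming a single gap window applies to every repetition. `LoopingPatternSketch` and `getGapWindow` are hypothetical names, and `java.time.Duration` stands in for Flink's `Time`; this is not the PR's code.

```java
import java.time.Duration;

/** Hypothetical, simplified builder used only for illustration; not Flink's Pattern class. */
final class LoopingPatternSketch {
    // Maximum gap between the previous and current event; null means "no limit".
    private Duration gapWindow;

    /** Existing no-argument variant delegates to the windowed one without a limit. */
    LoopingPatternSketch oneOrMore() {
        return oneOrMore(null);
    }

    /** One window for all repetitions, instead of a Map keyed by repetition count. */
    LoopingPatternSketch oneOrMore(Duration windowTime) {
        this.gapWindow = windowTime;
        return this;
    }

    Duration getGapWindow() {
        return gapWindow;
    }
}
```

With this shape a caller would write something like `new LoopingPatternSketch().oneOrMore(Duration.ofSeconds(5))` and never has to build a map.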



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -394,14 +447,28 @@ public Pattern<T, F> times(int times) {
      * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
      */
     public Pattern<T, F> times(int from, int to) {
+        return times(from, to, null);
+    }
+
+    /**
+     * Specifies that the pattern can occur between from and to times with time interval corresponds
+     * to the maximum time gap between previous and current event for each times.
+     *
+     * @param from number of times matching event must appear at least
+     * @param to number of times matching event must appear at most
+     * @param windowTimes mapping between times and time of the matching window.
+     * @return The same pattern with the number of times range applied
+     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+     */
+    public Pattern<T, F> times(int from, int to, Map<Integer, Time> windowTimes) {

Review Comment:
   Ditto: I guess *times(int from, int to, Time windowTime)* is enough.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -174,6 +182,7 @@ void compileFactory() {
 
             if (lastPattern.getQuantifier().getConsumingStrategy()
                             == Quantifier.ConsumingStrategy.NOT_FOLLOW
+                    && !windowTimes.containsKey(lastPattern.getName())

Review Comment:
   && (!windowTimes.containsKey(lastPattern.getName()) || 
windowTimes.get(lastPattern.getName()) <= 0)
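
The suggested condition treats a missing entry and a non-positive entry the same way. A small sketch of that check as a helper, assuming the map holds window lengths in milliseconds keyed by pattern name; `WindowTimeChecks` and `hasPositiveWindow` are hypothetical names, not part of Flink.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink's NFACompiler. */
final class WindowTimeChecks {
    /** Returns true only when a strictly positive window is registered for the pattern. */
    static boolean hasPositiveWindow(Map<String, Long> windowTimes, String patternName) {
        Long window = windowTimes.get(patternName);
        // A missing key and a zero or negative value both count as "no window".
        return window != null && window > 0;
    }
}
```

The condition in the comment above would then read `&& !WindowTimeChecks.hasPositiveWindow(windowTimes, lastPattern.getName())`.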



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##########
@@ -613,6 +643,7 @@ private Collection<ComputationState> computeNextStates(
         int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
         int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
 
+        final long stateTimestamp = event.getTimestamp();

Review Comment:
   Move it into `case TAKE`?



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java:
##########
@@ -41,6 +41,9 @@ public class ComputationState {
     // Timestamp of the first element in the pattern
     private final long startTimestamp;
 
+    // Timestamp of the previous element in the state
+    private final long stateTimestamp;

Review Comment:
   ```suggestion
       private final long previousTimestamp;
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##########
@@ -91,6 +92,13 @@
      */
     private final Map<String, State<T>> states;
 
+    /**
+     * The lengths of a windowed pattern, as specified using the {@link
+     * org.apache.flink.cep.pattern.Pattern#within(Time, Pattern.WithinType)} Pattern.within(Time,
+     * WithinType)} method with {@code WithinType.PREVIOUS_AND_CURRENT}.
+     */
+    private final Map<String, Long> windowTimes;

Review Comment:
   Do you think it makes sense to move the window time between the previous and
   current event into *State*?



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() {
         }
     }
 
+    private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) {
+        if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType)

Review Comment:
   It may happen that PREVIOUS_AND_CURRENT is set before FIRST_AND_LAST. In
   that case, this validation is skipped. For example:
   `
   Pattern.begin("a").followedBy("b").within(Time.seconds(20), WithinType.PREVIOUS_AND_CURRENT).within(Time.seconds(10))
   `
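
One way to make the check independent of call order is to compare the two windows every time either of them is set. The following is only a sketch under stated assumptions: `WindowTimes` is a hypothetical stand-in for the pattern's window bookkeeping, `java.time.Duration` replaces Flink's `Time`, and `IllegalArgumentException` replaces `MalformedPatternException`.

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical stand-in for the per-pattern window bookkeeping; not Flink's actual class. */
final class WindowTimes {
    enum WithinType { FIRST_AND_LAST, PREVIOUS_AND_CURRENT }

    private final Map<WithinType, Duration> windowTimes = new EnumMap<>(WithinType.class);

    /** Records a window and validates it against the other one, whichever was set first. */
    WindowTimes within(Duration windowTime, WithinType withinType) {
        // Resolve both windows, taking the new value for the type being set.
        Duration gap = withinType == WithinType.PREVIOUS_AND_CURRENT
                ? windowTime
                : windowTimes.get(WithinType.PREVIOUS_AND_CURRENT);
        Duration total = withinType == WithinType.FIRST_AND_LAST
                ? windowTime
                : windowTimes.get(WithinType.FIRST_AND_LAST);
        // Compare only once both are known; this runs on every call,
        // so the order of the two within(...) calls no longer matters.
        if (gap != null && total != null && gap.compareTo(total) > 0) {
            throw new IllegalArgumentException(
                    "Window length between the previous and current event cannot be larger "
                            + "than the window length between the first and last event.");
        }
        windowTimes.put(withinType, windowTime);
        return this;
    }
}
```

With this shape, setting a 20 second PREVIOUS_AND_CURRENT window and then a 10 second FIRST_AND_LAST window is rejected on the second call, just as it would be in the opposite order.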



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -601,4 +694,12 @@ public String toString() {
                 + afterMatchSkipStrategy
                 + '}';
     }
+
+    /** Type enum of time interval corresponds to the maximum time gap between events. */
+    public enum WithinType {

Review Comment:
   Move it to a separate file and make it a standalone class. You can put it
   in package *org.apache.flink.cep.pattern*.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -308,9 +321,10 @@ private State<T> createMiddleStates(final State<T> sinkState) {
                 if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                     // skip notFollow patterns, they are converted into edge conditions
-                    if (getWindowTime() > 0 && lastSink.isFinal()) {

Review Comment:
   (currentPattern.getWindowTime(Pattern.WithinType.PREVIOUS_AND_CURRENT)
                                       != null
                               || getWindowTime() > 0) && lastSink.isFinal()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
