[ https://issues.apache.org/jira/browse/FLINK-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049206#comment-16049206 ]
ASF GitHub Bot commented on FLINK-6904: --------------------------------------- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4121#discussion_r121953747 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java --- @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception { )); } + @Test + public void testTimesRangeNonStrictOptional1() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictOptional2() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictOptional3() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictWithNext() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNotStrictWithFollowedByEager() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNotStrictWithFollowedByNotEager() { --- End diff -- Agree, addressed in the new patch. > Support for quantifier range to CEP's pattern API > ------------------------------------------------- > > Key: FLINK-6904 > URL: https://issues.apache.org/jira/browse/FLINK-6904 > Project: Flink > Issue Type: Bug > Components: CEP > Reporter: Dian Fu > Assignee: Dian Fu > > Currently the quantifier has supported oneOrMore, times(int times), one(),we > should also support API such as times(int from, int to) to specify a > quantifier range. -- This message was sent by Atlassian JIRA (v6.4.14#64029)