[ https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116442#comment-16116442 ]
ASF GitHub Bot commented on FLINK-7147: --------------------------------------- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131629357 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java --- @@ -0,0 +1,1068 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link Pattern#greedy()}. + */ +public class GreedyITCase extends TestLogger { + + @Test + public void testGreedyZeroOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyZeroOrMoreConsecutiveEndWithOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).optional(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c), + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyZeroOrMoreInBetween() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2)); + inputEvents.add(new StreamRecord<>(a1, 3)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4)); + inputEvents.add(new StreamRecord<>(a2, 5)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6)); + inputEvents.add(new StreamRecord<>(a3, 7)); + inputEvents.add(new StreamRecord<>(d, 8)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, d) + )); + } + + @Test + public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, d) + )); + } + + @Test + public void testGreedyZeroOrMoreBeforeOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event e = new Event(44, "e", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(e, 5)); + + // c a* d e + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).optional().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, e) + )); + } + + @Test + public void testGreedyZeroOrMoreBeforeOptional2() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event d = new Event(43, "d", 3.0); + Event e = new Event(44, "e", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(d, 4)); + inputEvents.add(new StreamRecord<>(e, 5)); + + // c a* d e + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).optional().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, e), + Lists.newArrayList(c, a1, a2, d, e) + )); + } + + @Test + public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() { --- End diff -- misspell: `testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier ` -> `testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier ` > Support greedy quantifier in CEP > -------------------------------- > > Key: FLINK-7147 > URL: https://issues.apache.org/jira/browse/FLINK-7147 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL > Reporter: Dian Fu > Assignee: Dian Fu > > Greedy quantifier will try to match the token as many times as possible. For > example, for pattern {{a b* c}} (skip till next is used) and inputs {{a b1 b2 > c}}, if the quantifier for {{b}} is greedy, it will only output {{a b1 b2 c}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)