[ https://issues.apache.org/jira/browse/FLINK-9164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436512#comment-16436512 ]
Kostas Kloudas commented on FLINK-9164: --------------------------------------- Hi Romain, What if you remove the skipPastLastEvent()? I suspect that the reason is that you match the first 'a' and then you skip till the next 'a'. And this goes on, so you never match a pair. Kostas > times(#,#) quantifier does not seem to work > ------------------------------------------- > > Key: FLINK-9164 > URL: https://issues.apache.org/jira/browse/FLINK-9164 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.4.2 > Environment: Windows 10 Pro 64-bit > Core i7-6820HQ @ 2.7 GHz > 16GB RAM > Flink 1.4.2 > Scala client > Scala 2.11.12 > Reporter: Romain Revol > Priority: Major > > Assuming the following piece of code : > {code:java} > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > val inputs = env.fromElements((1L, 'a'), (20L, 'a'), (22L, 'a'), (30L, 'a'), > (40L, 'a')) > .assignAscendingTimestamps(_._1) > val pattern = Pattern.begin[(Long, Char)]("Start", > AfterMatchSkipStrategy.skipPastLastEvent()) > .where(_._2 == 'a').times(1,2) > CEP.pattern(inputs, pattern).select(_("Start")).addSink(println(_)) > env.execute("Test"{code} > > This results in > {code:java} > Buffer((1,a)) > Buffer((20,a)) > Buffer((22,a)) > Buffer((30,a)) > Buffer((40,a)){code} > While I would expect > {code:java} > Buffer((1,a), (20,a)) > Buffer((22,a), (30,a)) > Buffer((40,a){code} > My purpose is to match events by pair if possible, or alone if not. Note that > adding greedy does nothing mode but this may be due to > https://issues.apache.org/jira/browse/FLINK-8914. -- This message was sent by Atlassian JIRA (v7.6.3#76005)