Have you tried a pattern like this?

    Pattern.begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(...)
      .followedBy("c").where(...)
      .followedBy("e").where(...)

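Written out in full, that suggestion might look like the sketch below. It is only a sketch under assumptions: the Event case class is hypothetical (only its state field appears in your mail), and whether the result is exactly "b1 c1 e1" should be checked against your Flink version.

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type; only the `state` field is taken from the original mail.
final case class Event(id: String, state: String)

object FirstOccurrencePattern {
  // The skip strategy is given once, in begin() of the top-level pattern.
  // Each later stage is added by name with followedBy(String), so no
  // nested group pattern is created.
  val pattern: Pattern[Event, _] =
    Pattern
      .begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent())
      .where((value, _) => value.state == "b")
      .followedBy("c")
      .where((value, _) => value.state == "c")
      .followedBy("e")
      .where((value, _) => value.state == "e")
}
```

Since no stage uses oneOrMore() here, each name maps to a single event in the match, so taking .head in processMatch as you already do should still work.
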
The method followedBy(Pattern) builds a group pattern from its argument, so a skip strategy set on that inner pattern (as on your "e" pattern) has no effect.

Best,
Dawid

On 25/07/2019 16:50, Federico D'Ambrosio wrote:
> Hello everyone,
>
> I need a bit of help concerning a correct formulation for a Complex
> Event Pattern, using CEP.
>
> I have a stream of events which once keyed for ids, they may look like
> this:
>
> a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1
>
> what I want to achieve is to get, from a formulation similar to this:
>
> [1] b c e
>
> this:
>
> b1 c1 e1
>
> that is, for each input stream, have an output composed of only the
> first appearance of events of class b, c and e.
>
> I realize that a pattern formulated like [1] would also match:
>
> b1 c2 e1, b1 c2 e2 and so on, so that I would need to refine it.
>
> So, I tried using oneOrMore(), consecutive() and
> AfterMatchSkipStrategy.skipToFirst, like this:
>
> val b = Pattern
>   .begin[Event]("b")
>   .where((value, _) => value.state == "b")
>   .oneOrMore().consecutive()
>
> val c = Pattern
>   .begin[Event]("c")
>   .where((value, _) => value.state == "c")
>   .oneOrMore().consecutive()
>
> val e = Pattern
>   .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
>   .where((value, _) => value.state == "e")
>   .oneOrMore().consecutive()
>
> val pattern: Pattern[Event, _] =
>   b.followedBy(c).followedBy(e)
>
> In the process function I would do something like this:
>
> override def processMatch(matches: util.Map[String, util.List[Event]],
>                           ctx: PatternProcessFunction.Context,
>                           out: Collector[OutputEvent]): Unit = {
>
>   val bEvent = matches.get("b").asScala.head
>   val cEvent = matches.get("c").asScala.head
>   val eEvent = matches.get("e").asScala.head
>
>   out.collect(OutputEvent(bEvent, cEvent, eEvent))
> }
>
> But unfortunately it doesn't work like I want, which makes me think
> I'm missing something within the functionalities of Flink CEP.
>
> What's the best way to achieve what I want? Is it possible?
> Should I even use any AfterMatchSkipStrategy?
>
> Thank you,
> Federico D'Ambrosio