In general I would strongly encourage you to find a way to `key` your stream, it will make everything much simpler.
On Thu, Nov 4, 2021 at 6:05 PM Seth Wiesman <sjwies...@gmail.com> wrote: > Why not? > > All those classes have a Symbol attribute, why can't you use that to key > the stream? > > On Thu, Nov 4, 2021 at 5:51 PM Isidoros Ioannou <akis3...@gmail.com> > wrote: > >> Hi Seth, >> >> thank you for your answer. >> In this case you are right and it would solve my problem. but actually my >> case is a bit more complex and my mistake I wanted to provide a simple >> example. >> >> The actual case is, >> >> I have DataStream< ServerAwareMessage > inputStream as a source , >> Message is just an interface. The events can be of certain subtypes >> >> 1) class OrderRequest implements ServerAwareMessage { >> String symbol >> String orderType >> } >> >> 2) class OrderActivated implements ServerAwareMessage { >> String symbol >> String orderType >> long orderId >> } >> >> 3) class DealPerformed implements ServerAwareMessage { >> String symbol >> String orderType >> } >> >> 4) class OrderFilled implements ServerAwareMessage { >> String symbol >> String orderType >> long orderId >> } >> >> And here is the pattern sequence. >> >> Pattern.<ServerAwareMessage>begin(OPEN, >> AfterMatchSkipStrategy.skipToNext()) >> .where(new SimpleCondition <ServerAwareMessage>() { >> @Override >> public boolean filter(ServerAwareMessage value) >> throws Exception { >> return value instanceof OrderRequest >> } ) >> .followedBy("OrderActivated ") >> .where(new IterativeCondition<ServerAwareMessage>() { >> @Override >> public boolean filter(ServerAwareMessage value, >> Context<ServerAwareMessage> ctx) throws Exception { >> if(value.getMessage() instanceof OrderActivated >> ) { >> var msg = ( OrderActivated ) >> value.getMessage(); >> var list = >> StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false) >> .filter(i -> i.getMessage() instanceof >> OrderRequest ) >> .collect(Collectors.toList()); >> return list.stream().allMatch(i -> (( >> OrderRequest )i.getMessage()).getSymbol().equals(msg.getSymbol()) && >> (( >> OrderRequest)i.getMessage()).getOrderType().equals(msg.getOrderType())); >> >> } >> return false; >> } >> }) >> .followedBy("DealPerformed") >> .where(new IterativeCondition<ServerAwareMessage>() { >> @Override >> public boolean filter(ServerAwareMessage value, >> Context<ServerAwareMessage> ctx) throws Exception { >> if (value.getMessage() instanceof DealPerformed >> ) { >> var order = ( DealPerformed ) >> value.getMessage(); >> var list = >> StreamSupport.stream(ctx.getEventsForPattern(" OrderActivated >> ").spliterator(), false) >> .filter(i -> i.getMessage() instanceof >> OrderPerformed) >> .collect(Collectors.toList()); >> >> return list.stream().allMatch(i -> (( >> OrderActivated )i.getMessage()).getSymbol().equals(msg.getSymbol()) && >> (( OrderActivated >> )i.getMessage()).getOrderType().equals(msg.getOrderType())); >> >> } >> return false; >> } >> }) >> .followedBy("OrdeFilled") >> .where(new IterativeCondition<ServerAwareMessage>() { >> @Override >> public boolean filter(ServerAwareMessage value, >> Context<ServerAwareMessage> ctx) throws Exception { >> if (value.getMessage() instanceof OrderFilled ) >> { >> var order = ( OrderFilled ) >> value.getMessage(); >> var list = >> StreamSupport.stream(ctx.getEventsForPattern( "DealPerformed" >> ).spliterator(), false) >> .filter(i -> i.getMessage() instanceof >> OrderActivationRequestNewDue) >> .collect(Collectors.toList()); >> >> return list.stream().allMatch(i -> (( >> DealPerformed )i.getMessage()).getSymbol().equals(order.getSymbol()) && >> (( DealPerformed >> )i.getMessage()).getOrderType().equals(order.getOrderType()); >> >> } >> return false; >> } >> }) >> >> In this case I can not group by unfortunately. so I may a receive a >> packet { OrderRequest(1), OrderActivated (1) , OrderRequest (2), >> DealPerformed(1) , OrderActivated(2), OrderRequest(3), DealPerformed(2), >> OrderFilled(1), OrderFilled(2), OrderActivated(3)} and etc. >> For me it is crucial to match all the event sequence (1) (2), etc. and >> there is a case where the sequence of the Messages is incomplete , that >> means that an event does not get inserted into the pattern. >> The above pattern sequence unfortunately does not work properly. Any >> suggestions? >> >> BR, >> Isidoros >> >> Στις Πέμ, 4 Νοε 2021 στις 11:27 μ.μ., ο/η Isidoros Ioannou < >> akis3...@gmail.com> έγραψε: >> >>> I am using Processing time characteristic. >>> >>> DataStream<Model> inputStream = env.fromElements( >>> Model.of(1, "A", "US"), >>> Model.of(2, "B", "US"), >>> Model.of(3, "C", "US"), >>> Model.of(4, "A", "AU"), >>> Model.of(5, "B", "AU"), >>> Model.of(6, "C", "AU"), >>> //Model.of(7, "D", "US"), >>> Model.of(8, "D", "AU"), >>> Model.of(9, "A", "GB"), >>> Model.of(10, "B", "GB"), >>> Model.of(13, "D", "GB"), >>> Model.of(11, "C", "GB"), >>> Model.of(12, "D", "GB") >>> >>> in the above inputStream the Model.of(7, "D", "US") is not supplied to the >>> pattern sequence. Nothing special at all >>> with this but I wanted to simulate the case that an event that fulfills the >>> pattern sequence might be missing >>> which it happens in my case. >>> >>> BR, >>> Isidoros >>> >>> >>> >>> >>> Στις Πέμ, 4 Νοε 2021 στις 11:01 μ.μ., ο/η Austin Cawley-Edwards < >>> austin.caw...@gmail.com> έγραψε: >>> >>>> Thanks for the update, the requirements make sense. >>>> >>>> Some follow up questions: >>>> * What time characteristic are you using? Processing or Event? >>>> * Can you describe a bit more what you mean by "input like the one I >>>> have commented bellow"? What is special about the one you have commented? >>>> >>>> Best, >>>> Austin >>>> >>>> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com> >>>> wrote: >>>> >>>>> >>>>> >>>>> ---------- Forwarded message --------- >>>>> Από: Isidoros Ioannou <akis3...@gmail.com> >>>>> Date: Πέμ, 4 Νοε 2021 στις 10:01 μ.μ. >>>>> Subject: Re: IterativeCondition instead of SimpleCondition not >>>>> matching pattern >>>>> To: Austin Cawley-Edwards <austin.caw...@gmail.com> >>>>> >>>>> >>>>> Hi Austin, >>>>> thank you for your answer and I really appreciate your willingness to >>>>> help. >>>>> >>>>> Actually the desired output is the one below >>>>> >>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>>>> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A', >>>>> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], >>>>> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, >>>>> text='D', >>>>> symbol='GB'}]} >>>>> I would like only to generate sequences of Models that have the same >>>>> symbol. I noticed that if an event does not come as input >>>>> like the one I have commented bellow, it breaks all the pattern match >>>>> and the desired output is never produced >>>>> >>>>> DataStream<Model> inputStream = env.fromElements( >>>>> Model.of(1, "A", "US"), >>>>> Model.of(2, "B", "US"), >>>>> Model.of(3, "C", "US"), >>>>> Model.of(4, "A", "AU"), >>>>> Model.of(5, "B", "AU"), >>>>> Model.of(6, "C", "AU"), >>>>> //Model.of(7, "D", "US"), >>>>> Model.of(8, "D", "AU"), >>>>> Model.of(9, "A", "GB"), >>>>> Model.of(10, "B", "GB"), >>>>> Model.of(13, "D", "GB"), >>>>> Model.of(11, "C", "GB"), >>>>> Model.of(12, "D", "GB") >>>>> >>>>> Kind Regards, >>>>> Isidoros >>>>> >>>>> >>>>> Στις Πέμ, 4 Νοε 2021 στις 8:40 μ.μ., ο/η Austin Cawley-Edwards < >>>>> austin.caw...@gmail.com> έγραψε: >>>>> >>>>>> Hi Isidoros, >>>>>> >>>>>> Thanks for reaching out to the mailing list. I haven't worked with >>>>>> the CEP library in a long time but can try to help. I'm having a little >>>>>> trouble understanding the desired output + rules. Can you mock up the >>>>>> desired output like you have for the fulfilled pattern sequence? >>>>>> >>>>>> Best, >>>>>> Austin >>>>>> >>>>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> >>>>>>> I face an issue when try to match some elements in a Pattern >>>>>>> sequence. Flink 1.11.1 version. Here is my case: >>>>>>> >>>>>>> final StreamExecutionEnvironment env = >>>>>>> EnvironmentProvider.getEnvironment(); >>>>>>> DataStream<Model> inputStream = env.fromElements( >>>>>>> Model.of(1, "A", "US"), >>>>>>> Model.of(2, "B", "US"), >>>>>>> Model.of(3, "C", "US"), >>>>>>> Model.of(4, "A", "AU"), >>>>>>> Model.of(5, "B", "AU"), >>>>>>> Model.of(6, "C", "AU"), >>>>>>> //Model.of(7, "D"), >>>>>>> Model.of(8, "D", "AU"), >>>>>>> Model.of(9, "A", "GB"), >>>>>>> Model.of(10, "B", "GB"), >>>>>>> Model.of(13, "D", "GB"), >>>>>>> Model.of(11, "C", "GB"), >>>>>>> Model.of(12, "D", "GB") >>>>>>> >>>>>>> >>>>>>> >>>>>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()) >>>>>>> .forceNonParallel(); >>>>>>> >>>>>>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start", >>>>>>> AfterMatchSkipStrategy.skipToNext()) >>>>>>> .where(new IterativeCondition<Model>() { >>>>>>> @Override >>>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>>> throws Exception { >>>>>>> return value.getText().equalsIgnoreCase("A"); >>>>>>> } >>>>>>> }).followedBy("second") >>>>>>> .where(new IterativeCondition<Model>() { >>>>>>> @Override >>>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>>> throws Exception { >>>>>>> >>>>>>> return value.getText().equalsIgnoreCase("B"); >>>>>>> } >>>>>>> }).followedBy("third") >>>>>>> .where(new IterativeCondition<Model>() { >>>>>>> @Override >>>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>>> throws Exception { >>>>>>> >>>>>>> return value.getText().equalsIgnoreCase("C"); >>>>>>> } >>>>>>> }).followedBy("fourth") >>>>>>> .where(new IterativeCondition<Model>() { >>>>>>> @Override >>>>>>> public boolean filter(Model value, Context<Model> ctx) >>>>>>> throws Exception { >>>>>>> var list = >>>>>>> StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), >>>>>>> false).collect(Collectors.toList()); >>>>>>> var val = >>>>>>> list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol()); >>>>>>> return value.getText().equalsIgnoreCase("D") && val; >>>>>>> } >>>>>>> }); >>>>>>> >>>>>>> >>>>>>> PatternStream<Model> marketOpenPatternStream = >>>>>>> CEP.<Model>pattern(inputStream, pattern); >>>>>>> >>>>>>> SingleOutputStreamOperator<List<Model>> marketOpenOutput = >>>>>>> marketOpenPatternStream >>>>>>> .process(new PatternProcessFunction<Model, >>>>>>> List<Model>>() { >>>>>>> @Override >>>>>>> public void processMatch(Map<String, >>>>>>> List<Model>> match, Context ctx, Collector<List<Model>> out) throws >>>>>>> Exception { >>>>>>> System.out.println(match); >>>>>>> out.collect(new ArrayList(match.values())); >>>>>>> } >>>>>>> }) >>>>>>> >>>>>>> What I am trying to succeed is to match only patterns that have the >>>>>>> same symbol. If I use SimpleCondition with checks only about the text >>>>>>> of the Model(A, B,C..) without the symbol check in the last pattern, >>>>>>> the pattern sequence is fulfilled and I get the following output: >>>>>>> >>>>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, >>>>>>> text='B', >>>>>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], >>>>>>> fourth=[Model{id=8,text='D',symbol='AU'}]} >>>>>>> >>>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, >>>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], >>>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]} >>>>>>> >>>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, >>>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], >>>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]} >>>>>>> >>>>>>> However I want to avoid the match of elements with id= >>>>>>> 1(A),2(B),3(C) with the element with id = 8(D). For this reason I put >>>>>>> the >>>>>>> symbol check with the event matched in the previous pattern in the last >>>>>>> condition so I dont get match since they do not have the same symbol. >>>>>>> But >>>>>>> after applying the condition, now I do not get any output. none of the >>>>>>> elements match the pattern. What I am missing? Could someone help? >>>>>>> >>>>>>>