In general I would strongly encourage you to find a way to `key` your
stream; it will make everything much simpler.
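
To illustrate why keying helps, here is a minimal plain-Java sketch (no
Flink dependency) that partitions the sample events from this thread by
symbol and tracks the A -> B -> C -> D sequence per key. The class
`KeyedSequenceSketch`, its nested `Model` stand-in, and the methods
`sampleEvents` and `matchPerSymbol` are hypothetical names made up for this
example. It keeps only one partial match per symbol and is not how Flink's
CEP operator is implemented. In Flink itself the analogous step would be to
call `keyBy` on the stream (for example `inputStream.keyBy(Model::getSymbol)`)
before passing it to `CEP.pattern`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of per-key sequence matching; not Flink code. */
class KeyedSequenceSketch {

    /** Hypothetical stand-in for the Model class used in the thread. */
    static final class Model {
        final int id;
        final String text;
        final String symbol;

        Model(int id, String text, String symbol) {
            this.id = id;
            this.text = text;
            this.symbol = symbol;
        }

        @Override
        public String toString() {
            return "Model{id=" + id + ", text='" + text + "', symbol='" + symbol + "'}";
        }
    }

    /** The expected order of events within one symbol. */
    private static final String[] STEPS = {"A", "B", "C", "D"};

    /** The sample input from the thread, with Model.of(7, "D", "US") left out. */
    static List<Model> sampleEvents() {
        return List.of(
                new Model(1, "A", "US"), new Model(2, "B", "US"), new Model(3, "C", "US"),
                new Model(4, "A", "AU"), new Model(5, "B", "AU"), new Model(6, "C", "AU"),
                new Model(8, "D", "AU"),
                new Model(9, "A", "GB"), new Model(10, "B", "GB"), new Model(13, "D", "GB"),
                new Model(11, "C", "GB"), new Model(12, "D", "GB"));
    }

    /**
     * Tracks, separately for each symbol, how far the A -> B -> C -> D sequence
     * has progressed. An event that is not the next expected step for its symbol
     * is skipped, loosely mirroring the relaxed contiguity of followedBy.
     */
    static List<List<Model>> matchPerSymbol(List<Model> events) {
        Map<String, List<Model>> partialBySymbol = new LinkedHashMap<>();
        List<List<Model>> matches = new ArrayList<>();
        for (Model event : events) {
            List<Model> progress =
                    partialBySymbol.computeIfAbsent(event.symbol, k -> new ArrayList<>());
            if (event.text.equalsIgnoreCase(STEPS[progress.size()])) {
                progress.add(event);
                if (progress.size() == STEPS.length) {
                    // Sequence complete for this symbol: emit it and start over.
                    matches.add(new ArrayList<>(progress));
                    progress.clear();
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        matchPerSymbol(sampleEvents()).forEach(System.out::println);
    }
}
```

With the thread's sample input this yields one match for AU (ids 4, 5, 6, 8)
and one for GB (ids 9, 10, 11, 12). The US events never complete because
their D event is missing, and they no longer interfere with the other
symbols.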

On Thu, Nov 4, 2021 at 6:05 PM Seth Wiesman <sjwies...@gmail.com> wrote:

> Why not?
>
> All those classes have a Symbol attribute; why can't you use that to key
> the stream?
>
> On Thu, Nov 4, 2021 at 5:51 PM Isidoros Ioannou <akis3...@gmail.com>
> wrote:
>
>> Hi Seth,
>>
>> thank you for your answer.
>> In this case you are right, and it would solve my problem. However, my
>> actual case is a bit more complex; my mistake, I only wanted to provide a
>> simple example.
>>
>> The actual case is:
>>
>> I have a DataStream<ServerAwareMessage> inputStream as a source;
>> Message is just an interface. The events can be of certain subtypes:
>>
>> 1) class OrderRequest implements ServerAwareMessage {
>>        String symbol;
>>        String orderType;
>> }
>>
>> 2) class OrderActivated implements ServerAwareMessage {
>>        String symbol;
>>        String orderType;
>>        long orderId;
>> }
>>
>> 3) class DealPerformed implements ServerAwareMessage {
>>        String symbol;
>>        String orderType;
>> }
>>
>> 4) class OrderFilled implements ServerAwareMessage {
>>        String symbol;
>>        String orderType;
>>        long orderId;
>> }
>>
>> And here is the pattern sequence.
>>
>> Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext())
>>     .where(new SimpleCondition<ServerAwareMessage>() {
>>         @Override
>>         public boolean filter(ServerAwareMessage value) throws Exception {
>>             return value instanceof OrderRequest;
>>         }
>>     })
>>     .followedBy("OrderActivated")
>>     .where(new IterativeCondition<ServerAwareMessage>() {
>>         @Override
>>         public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
>>             if (value.getMessage() instanceof OrderActivated) {
>>                 var msg = (OrderActivated) value.getMessage();
>>                 // Collect the OrderRequest events already matched by the first pattern.
>>                 var list = StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
>>                     .filter(i -> i.getMessage() instanceof OrderRequest)
>>                     .collect(Collectors.toList());
>>                 return list.stream().allMatch(i ->
>>                     ((OrderRequest) i.getMessage()).getSymbol().equals(msg.getSymbol())
>>                         && ((OrderRequest) i.getMessage()).getOrderType().equals(msg.getOrderType()));
>>             }
>>             return false;
>>         }
>>     })
>>     .followedBy("DealPerformed")
>>     .where(new IterativeCondition<ServerAwareMessage>() {
>>         @Override
>>         public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
>>             if (value.getMessage() instanceof DealPerformed) {
>>                 var order = (DealPerformed) value.getMessage();
>>                 // Compare against the events matched by the "OrderActivated" pattern.
>>                 var list = StreamSupport.stream(ctx.getEventsForPattern("OrderActivated").spliterator(), false)
>>                     .filter(i -> i.getMessage() instanceof OrderActivated)
>>                     .collect(Collectors.toList());
>>                 return list.stream().allMatch(i ->
>>                     ((OrderActivated) i.getMessage()).getSymbol().equals(order.getSymbol())
>>                         && ((OrderActivated) i.getMessage()).getOrderType().equals(order.getOrderType()));
>>             }
>>             return false;
>>         }
>>     })
>>     .followedBy("OrderFilled")
>>     .where(new IterativeCondition<ServerAwareMessage>() {
>>         @Override
>>         public boolean filter(ServerAwareMessage value, Context<ServerAwareMessage> ctx) throws Exception {
>>             if (value.getMessage() instanceof OrderFilled) {
>>                 var order = (OrderFilled) value.getMessage();
>>                 // Compare against the events matched by the "DealPerformed" pattern.
>>                 var list = StreamSupport.stream(ctx.getEventsForPattern("DealPerformed").spliterator(), false)
>>                     .filter(i -> i.getMessage() instanceof DealPerformed)
>>                     .collect(Collectors.toList());
>>                 return list.stream().allMatch(i ->
>>                     ((DealPerformed) i.getMessage()).getSymbol().equals(order.getSymbol())
>>                         && ((DealPerformed) i.getMessage()).getOrderType().equals(order.getOrderType()));
>>             }
>>             return false;
>>         }
>>     })
>>
>> Unfortunately, I cannot group by in this case, so I may receive a
>> packet such as { OrderRequest(1), OrderActivated(1), OrderRequest(2),
>> DealPerformed(1), OrderActivated(2), OrderRequest(3), DealPerformed(2),
>> OrderFilled(1), OrderFilled(2), OrderActivated(3) }, and so on.
>> For me it is crucial to match every event sequence, (1), (2), etc. There
>> are also cases where the sequence of messages is incomplete, meaning that
>> an expected event is never supplied to the pattern.
>> The above pattern sequence unfortunately does not work properly. Any
>> suggestions?
>>
>> BR,
>> Isidoros
>>
>> On Thu, Nov 4, 2021 at 11:27 PM, Isidoros Ioannou <
>> akis3...@gmail.com> wrote:
>>
>>> I am using the processing time characteristic.
>>>
>>> DataStream<Model> inputStream = env.fromElements(
>>>             Model.of(1, "A", "US"),
>>>             Model.of(2, "B", "US"),
>>>             Model.of(3, "C", "US"),
>>>             Model.of(4, "A", "AU"),
>>>             Model.of(5, "B", "AU"),
>>>             Model.of(6, "C", "AU"),
>>>           //Model.of(7, "D", "US"),
>>>             Model.of(8, "D", "AU"),
>>>             Model.of(9, "A", "GB"),
>>>             Model.of(10, "B", "GB"),
>>>             Model.of(13, "D", "GB"),
>>>             Model.of(11, "C", "GB"),
>>>             Model.of(12, "D", "GB")
>>>
>>> In the above inputStream, Model.of(7, "D", "US") is not supplied to the
>>> pattern sequence. There is nothing special about it; I just wanted to
>>> simulate the case where an event that would fulfill the pattern
>>> sequence is missing, which happens in my real use case.
>>>
>>> BR,
>>> Isidoros
>>>
>>>
>>>
>>>
>>> On Thu, Nov 4, 2021 at 11:01 PM, Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Thanks for the update, the requirements make sense.
>>>>
>>>> Some follow-up questions:
>>>> * What time characteristic are you using? Processing or Event?
>>>> * Can you describe a bit more what you mean by "input like the one I
>>>> have commented below"? What is special about the one you have commented?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <akis3...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Isidoros Ioannou <akis3...@gmail.com>
>>>>> Date: Thu, Nov 4, 2021 at 10:01 PM
>>>>> Subject: Re: IterativeCondition instead of SimpleCondition not
>>>>> matching pattern
>>>>> To: Austin Cawley-Edwards <austin.caw...@gmail.com>
>>>>>
>>>>>
>>>>> Hi Austin,
>>>>> thank you for your answer and I really appreciate your willingness to
>>>>> help.
>>>>>
>>>>> Actually, the desired output is the one below:
>>>>>
>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>
>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10,
>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>>
>>>>> I would like to generate only sequences of Models that have the same
>>>>> symbol. I noticed that if an event does not come as input,
>>>>> like the one I have commented below, it breaks the whole pattern match
>>>>> and the desired output is never produced.
>>>>>
>>>>> DataStream<Model> inputStream = env.fromElements(
>>>>>             Model.of(1, "A", "US"),
>>>>>             Model.of(2, "B", "US"),
>>>>>             Model.of(3, "C", "US"),
>>>>>             Model.of(4, "A", "AU"),
>>>>>             Model.of(5, "B", "AU"),
>>>>>             Model.of(6, "C", "AU"),
>>>>>           //Model.of(7, "D", "US"),
>>>>>             Model.of(8, "D", "AU"),
>>>>>             Model.of(9, "A", "GB"),
>>>>>             Model.of(10, "B", "GB"),
>>>>>             Model.of(13, "D", "GB"),
>>>>>             Model.of(11, "C", "GB"),
>>>>>             Model.of(12, "D", "GB")
>>>>>
>>>>> Kind Regards,
>>>>> Isidoros
>>>>>
>>>>>
>>>>> On Thu, Nov 4, 2021 at 8:40 PM, Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hi Isidoros,
>>>>>>
>>>>>> Thanks for reaching out to the mailing list. I haven't worked with
>>>>>> the CEP library in a long time but can try to help. I'm having a little
>>>>>> trouble understanding the desired output + rules. Can you mock up the
>>>>>> desired output like you have for the fulfilled pattern sequence?
>>>>>>
>>>>>> Best,
>>>>>> Austin
>>>>>>
>>>>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> I am facing an issue when trying to match some elements in a Pattern
>>>>>>> sequence, using Flink version 1.11.1. Here is my case:
>>>>>>>
>>>>>>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>>>>>>> DataStream<Model> inputStream = env.fromElements(
>>>>>>>             Model.of(1, "A", "US"),
>>>>>>>             Model.of(2, "B", "US"),
>>>>>>>             Model.of(3, "C", "US"),
>>>>>>>             Model.of(4, "A", "AU"),
>>>>>>>             Model.of(5, "B", "AU"),
>>>>>>>             Model.of(6, "C", "AU"),
>>>>>>>           //Model.of(7, "D"),
>>>>>>>             Model.of(8, "D", "AU"),
>>>>>>>             Model.of(9, "A", "GB"),
>>>>>>>             Model.of(10, "B", "GB"),
>>>>>>>             Model.of(13, "D", "GB"),
>>>>>>>             Model.of(11, "C", "GB"),
>>>>>>>             Model.of(12, "D", "GB")
>>>>>>>     ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>>>>>>             .forceNonParallel();
>>>>>>>
>>>>>>>     Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
>>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>>                 @Override
>>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>>                     return value.getText().equalsIgnoreCase("A");
>>>>>>>                 }
>>>>>>>             }).followedBy("second")
>>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>>                 @Override
>>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>>                     return value.getText().equalsIgnoreCase("B");
>>>>>>>                 }
>>>>>>>             }).followedBy("third")
>>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>>                 @Override
>>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>>                     return value.getText().equalsIgnoreCase("C");
>>>>>>>                 }
>>>>>>>             }).followedBy("fourth")
>>>>>>>             .where(new IterativeCondition<Model>() {
>>>>>>>                 @Override
>>>>>>>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>>>>>>>                     var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>>>>>>>                             .collect(Collectors.toList());
>>>>>>>                     var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>>>>>>                     return value.getText().equalsIgnoreCase("D") && val;
>>>>>>>                 }
>>>>>>>             });
>>>>>>>
>>>>>>>
>>>>>>>     PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
>>>>>>>
>>>>>>>     SingleOutputStreamOperator<List<Model>> marketOpenOutput = marketOpenPatternStream
>>>>>>>             .process(new PatternProcessFunction<Model, List<Model>>() {
>>>>>>>                 @Override
>>>>>>>                 public void processMatch(Map<String, List<Model>> match, Context ctx,
>>>>>>>                         Collector<List<Model>> out) throws Exception {
>>>>>>>                     System.out.println(match);
>>>>>>>                     out.collect(new ArrayList(match.values()));
>>>>>>>                 }
>>>>>>>             });
>>>>>>>
>>>>>>> What I am trying to achieve is to match only sequences whose events
>>>>>>> share the same symbol. If I use SimpleCondition with checks only on the
>>>>>>> text of the Model (A, B, C, ...) and without the symbol check in the last
>>>>>>> pattern, the pattern sequence is fulfilled and I get the following output:
>>>>>>>
>>>>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2,
>>>>>>> text='B', symbol='US'}], third=[Model{id=3, text='C', symbol='US'}],
>>>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>>>
>>>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, 
>>>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], 
>>>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>>>
>>>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, 
>>>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], 
>>>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>>>>
>>>>>>> However, I want to avoid matching the elements with id = 1 (A), 2 (B),
>>>>>>> 3 (C) with the element with id = 8 (D). For this reason I added, in the
>>>>>>> last condition, a symbol check against the event matched by the previous
>>>>>>> pattern, so that I do not get a match when the symbols differ. But
>>>>>>> after applying the condition I no longer get any output: none of the
>>>>>>> elements match the pattern. What am I missing? Could someone help?
>>>>>>>
>>>>>>>
