---------- Forwarded message ---------
From: Isidoros Ioannou <[email protected]>
Date: Thu, Nov 4, 2021 at 10:01 PM
Subject: Re: IterativeCondition instead of SimpleCondition not matching
pattern
To: Austin Cawley-Edwards <[email protected]>
Hi Austin,
Thank you for your answer; I really appreciate your willingness to help.
The desired output is the one below:
{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B',
symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
fourth=[Model{id=8, text='D', symbol='AU'}]}

{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B',
symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
fourth=[Model{id=12, text='D', symbol='GB'}]}
I would like to generate only sequences of Models that share the same
symbol. I noticed that if an event is missing from the input, like the one
I have commented out below, the whole pattern match breaks and the desired
output is never produced:
DataStream<Model> inputStream = env.fromElements(
        Model.of(1, "A", "US"),
        Model.of(2, "B", "US"),
        Model.of(3, "C", "US"),
        Model.of(4, "A", "AU"),
        Model.of(5, "B", "AU"),
        Model.of(6, "C", "AU"),
        //Model.of(7, "D", "US"),
        Model.of(8, "D", "AU"),
        Model.of(9, "A", "GB"),
        Model.of(10, "B", "GB"),
        Model.of(13, "D", "GB"),
        Model.of(11, "C", "GB"),
        Model.of(12, "D", "GB"));
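To make the intended result concrete, here is a minimal plain-Java sketch of the matching I have in mind. It does not use the Flink CEP API at all: the Model class, the STAGES list, and the sampleEvents() and match() helpers are hypothetical stand-ins written only for this illustration. It keeps one partial sequence per symbol and ignores any event that is not the next expected stage, which is a simplification of relaxed contiguity and not how the CEP engine works internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerSymbolSequences {

    // Hypothetical stand-in for the Model class used in this thread.
    static final class Model {
        final int id;
        final String text;
        final String symbol;

        Model(int id, String text, String symbol) {
            this.id = id;
            this.text = text;
            this.symbol = symbol;
        }

        @Override
        public String toString() {
            return "Model{id=" + id + ", text='" + text + "', symbol='" + symbol + "'}";
        }
    }

    // The four pattern stages, in the order they must occur.
    static final List<String> STAGES = List.of("A", "B", "C", "D");

    // The input from this message, with the event id=7 left out.
    static List<Model> sampleEvents() {
        return List.of(
                new Model(1, "A", "US"), new Model(2, "B", "US"),
                new Model(3, "C", "US"), new Model(4, "A", "AU"),
                new Model(5, "B", "AU"), new Model(6, "C", "AU"),
                new Model(8, "D", "AU"), new Model(9, "A", "GB"),
                new Model(10, "B", "GB"), new Model(13, "D", "GB"),
                new Model(11, "C", "GB"), new Model(12, "D", "GB"));
    }

    // Returns every completed A, B, C, D sequence whose events share a symbol.
    static List<List<Model>> match(List<Model> events) {
        Map<String, List<Model>> partialBySymbol = new LinkedHashMap<>();
        List<List<Model>> matches = new ArrayList<>();
        for (Model event : events) {
            List<Model> partial =
                    partialBySymbol.computeIfAbsent(event.symbol, k -> new ArrayList<>());
            String expected = STAGES.get(partial.size());
            if (event.text.equalsIgnoreCase(expected)) {
                partial.add(event);
                if (partial.size() == STAGES.size()) {
                    matches.add(List.copyOf(partial));
                    partial.clear(); // start over for this symbol
                }
            }
            // Any other event is skipped; the partial sequence is kept.
        }
        return matches;
    }

    public static void main(String[] args) {
        match(sampleEvents()).forEach(System.out::println);
    }
}
```

With the sample input this yields two sequences, ids 4, 5, 6, 8 for AU and ids 9, 10, 11, 12 for GB; the US events never complete because no "D" with symbol US arrives. In Flink itself, a comparable per-symbol separation is usually obtained by applying the pattern to a stream keyed by the symbol (keyBy), although whether that fits this job is an assumption on my side.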
Kind Regards,
Isidoros
On Thu, Nov 4, 2021 at 8:40 PM, Austin Cawley-Edwards <
[email protected]> wrote:
> Hi Isidoros,
>
> Thanks for reaching out to the mailing list. I haven't worked with the CEP
> library in a long time but can try to help. I'm having a little trouble
> understanding the desired output + rules. Can you mock up the desired
> output like you have for the fulfilled pattern sequence?
>
> Best,
> Austin
>
> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <[email protected]>
> wrote:
>
>>
>> I am facing an issue when trying to match some elements in a Pattern
>> sequence, using Flink 1.11.1. Here is my case:
>>
>> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
>> DataStream<Model> inputStream = env.fromElements(
>>         Model.of(1, "A", "US"),
>>         Model.of(2, "B", "US"),
>>         Model.of(3, "C", "US"),
>>         Model.of(4, "A", "AU"),
>>         Model.of(5, "B", "AU"),
>>         Model.of(6, "C", "AU"),
>>         //Model.of(7, "D"),
>>         Model.of(8, "D", "AU"),
>>         Model.of(9, "A", "GB"),
>>         Model.of(10, "B", "GB"),
>>         Model.of(13, "D", "GB"),
>>         Model.of(11, "C", "GB"),
>>         Model.of(12, "D", "GB"))
>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>     .forceNonParallel();
>>
>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start",
>>         AfterMatchSkipStrategy.skipToNext())
>>     .where(new IterativeCondition<Model>() {
>>         @Override
>>         public boolean filter(Model value, Context<Model> ctx)
>>                 throws Exception {
>>             return value.getText().equalsIgnoreCase("A");
>>         }
>>     }).followedBy("second")
>>     .where(new IterativeCondition<Model>() {
>>         @Override
>>         public boolean filter(Model value, Context<Model> ctx)
>>                 throws Exception {
>>             return value.getText().equalsIgnoreCase("B");
>>         }
>>     }).followedBy("third")
>>     .where(new IterativeCondition<Model>() {
>>         @Override
>>         public boolean filter(Model value, Context<Model> ctx)
>>                 throws Exception {
>>             return value.getText().equalsIgnoreCase("C");
>>         }
>>     }).followedBy("fourth")
>>     .where(new IterativeCondition<Model>() {
>>         @Override
>>         public boolean filter(Model value, Context<Model> ctx)
>>                 throws Exception {
>>             var list = StreamSupport
>>                 .stream(ctx.getEventsForPattern("third").spliterator(), false)
>>                 .collect(Collectors.toList());
>>             var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>             return value.getText().equalsIgnoreCase("D") && val;
>>         }
>>     });
>>
>>
>> PatternStream<Model> marketOpenPatternStream =
>>     CEP.<Model>pattern(inputStream, pattern);
>>
>> SingleOutputStreamOperator<List<Model>> marketOpenOutput =
>>     marketOpenPatternStream
>>         .process(new PatternProcessFunction<Model, List<Model>>() {
>>             @Override
>>             public void processMatch(Map<String, List<Model>> match,
>>                     Context ctx, Collector<List<Model>> out) throws Exception {
>>                 System.out.println(match);
>>                 // Flatten the per-stage lists so the emitted value is a List<Model>.
>>                 List<Model> matched = new ArrayList<>();
>>                 match.values().forEach(matched::addAll);
>>                 out.collect(matched);
>>             }
>>         });
>>
>> What I am trying to achieve is to match only sequences whose events share
>> the same symbol. If I use SimpleCondition and check only the text of the
>> Model (A, B, C, ...), without the symbol check in the last pattern, the
>> pattern sequence is fulfilled and I get the following output:
>>
>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}],
>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>
>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B',
>> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>
>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B',
>> symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>
>> However, I want to avoid matching the elements with id = 1 (A), 2 (B),
>> 3 (C) with the element with id = 8 (D). For this reason I added, in the
>> last condition, a symbol check against the event matched by the previous
>> pattern, so that events with different symbols do not match. But after
>> applying this condition I no longer get any output: none of the elements
>> match the pattern. What am I missing? Could someone help?
>>
>>