Hi Seth,
thank you for your answer.
In this case you are right, and it would solve my problem. However, my actual
case is a bit more complex; my mistake, I wanted to provide a simple example.
The actual case is as follows.
I have a DataStream<ServerAwareMessage> inputStream as a source.
Message is just an interface. The events can be of the following subtypes:
1) class OrderRequest implements ServerAwareMessage {
       String symbol;
       String orderType;
   }
2) class OrderActivated implements ServerAwareMessage {
       String symbol;
       String orderType;
       long orderId;
   }
3) class DealPerformed implements ServerAwareMessage {
       String symbol;
       String orderType;
   }
4) class OrderFilled implements ServerAwareMessage {
       String symbol;
       String orderType;
       long orderId;
   }
And here is the pattern sequence:
Pattern.<ServerAwareMessage>begin(OPEN, AfterMatchSkipStrategy.skipToNext())
    .where(new SimpleCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value) throws Exception {
            return value instanceof OrderRequest;
        }
    })
    .followedBy("OrderActivated")
    .where(new IterativeCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value,
                Context<ServerAwareMessage> ctx) throws Exception {
            if (value.getMessage() instanceof OrderActivated) {
                var msg = (OrderActivated) value.getMessage();
                // Events already accepted by the OPEN stage of this partial match.
                var list = StreamSupport
                    .stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
                    .filter(i -> i.getMessage() instanceof OrderRequest)
                    .collect(Collectors.toList());
                return list.stream().allMatch(i ->
                    ((OrderRequest) i.getMessage()).getSymbol().equals(msg.getSymbol())
                    && ((OrderRequest) i.getMessage()).getOrderType().equals(msg.getOrderType()));
            }
            return false;
        }
    })
    .followedBy("DealPerformed")
    .where(new IterativeCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value,
                Context<ServerAwareMessage> ctx) throws Exception {
            if (value.getMessage() instanceof DealPerformed) {
                var order = (DealPerformed) value.getMessage();
                // Events already accepted by the "OrderActivated" stage.
                var list = StreamSupport
                    .stream(ctx.getEventsForPattern("OrderActivated").spliterator(), false)
                    .filter(i -> i.getMessage() instanceof OrderActivated)
                    .collect(Collectors.toList());
                return list.stream().allMatch(i ->
                    ((OrderActivated) i.getMessage()).getSymbol().equals(order.getSymbol())
                    && ((OrderActivated) i.getMessage()).getOrderType().equals(order.getOrderType()));
            }
            return false;
        }
    })
    .followedBy("OrderFilled")
    .where(new IterativeCondition<ServerAwareMessage>() {
        @Override
        public boolean filter(ServerAwareMessage value,
                Context<ServerAwareMessage> ctx) throws Exception {
            if (value.getMessage() instanceof OrderFilled) {
                var order = (OrderFilled) value.getMessage();
                // Events already accepted by the "DealPerformed" stage.
                var list = StreamSupport
                    .stream(ctx.getEventsForPattern("DealPerformed").spliterator(), false)
                    .filter(i -> i.getMessage() instanceof DealPerformed)
                    .collect(Collectors.toList());
                return list.stream().allMatch(i ->
                    ((DealPerformed) i.getMessage()).getSymbol().equals(order.getSymbol())
                    && ((DealPerformed) i.getMessage()).getOrderType().equals(order.getOrderType()));
            }
            return false;
        }
    })
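
To make the intent of each condition concrete, here is a minimal sketch in plain Java, without Flink, of the check every stage is meant to perform: accept a candidate only if its symbol and order type equal those of all events kept by the previous stage. SameOrderCheck, Event and matchesAll are hypothetical names used only for illustration, and the sample values are made up.

```java
import java.util.Arrays;
import java.util.List;

// Flink-free sketch of the per-stage check; all names here are hypothetical.
final class SameOrderCheck {

    /** Minimal stand-in for the two fields shared by the four message types. */
    static final class Event {
        final String symbol;
        final String orderType;

        Event(String symbol, String orderType) {
            this.symbol = symbol;
            this.orderType = orderType;
        }
    }

    /** True if the candidate has the same symbol and order type as every earlier event. */
    static boolean matchesAll(List<Event> previous, Event candidate) {
        return previous.stream().allMatch(p ->
                p.symbol.equals(candidate.symbol)
                        && p.orderType.equals(candidate.orderType));
    }

    public static void main(String[] args) {
        List<Event> accepted = Arrays.asList(new Event("EURUSD", "LIMIT"));
        System.out.println(matchesAll(accepted, new Event("EURUSD", "LIMIT"))); // prints true
        System.out.println(matchesAll(accepted, new Event("GBPUSD", "LIMIT"))); // prints false
    }
}
```

One detail worth keeping in mind: allMatch returns true for an empty list, so if the instanceof filter on the previous stage's events removes everything, the condition accepts any candidate of the right type.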
In this case I unfortunately cannot group by, so I may receive a packet such as
{ OrderRequest(1), OrderActivated(1), OrderRequest(2), DealPerformed(1),
OrderActivated(2), OrderRequest(3), DealPerformed(2), OrderFilled(1),
OrderFilled(2), OrderActivated(3) }, and so on.
For me it is crucial to match every event sequence, (1), (2), etc. There are
also cases where a sequence of messages is incomplete, meaning that an event
never gets inserted into the pattern.
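
To make the expected result concrete, here is a small sketch in plain Java, without Flink, that replays the packet above and reports which sequences went through all four stages in order. The number in parentheses is treated as a hypothetical sequence id purely for this illustration; PacketReplay and completedSequences are made-up names, and this is not how the CEP pattern itself correlates events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Replays the example packet; the id is a hypothetical correlation key.
final class PacketReplay {

    static final List<String> STAGES = Arrays.asList(
            "OrderRequest", "OrderActivated", "DealPerformed", "OrderFilled");

    /** Returns the ids whose events arrived in full stage order, in completion order. */
    static List<Integer> completedSequences(String[][] packet) {
        Map<Integer, Integer> nextStage = new HashMap<>(); // id -> index of the stage expected next
        List<Integer> completed = new ArrayList<>();
        for (String[] event : packet) {
            String type = event[0];
            int id = Integer.parseInt(event[1]);
            int expected = nextStage.getOrDefault(id, 0);
            // Advance only when the event is exactly the stage this sequence is waiting for.
            if (expected < STAGES.size() && STAGES.get(expected).equals(type)) {
                nextStage.put(id, expected + 1);
                if (expected + 1 == STAGES.size()) {
                    completed.add(id);
                }
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        String[][] packet = {
            {"OrderRequest", "1"}, {"OrderActivated", "1"}, {"OrderRequest", "2"},
            {"DealPerformed", "1"}, {"OrderActivated", "2"}, {"OrderRequest", "3"},
            {"DealPerformed", "2"}, {"OrderFilled", "1"}, {"OrderFilled", "2"},
            {"OrderActivated", "3"}
        };
        System.out.println(completedSequences(packet)); // prints [1, 2]
    }
}
```

Sequences (1) and (2) complete, while (3) stays open after OrderActivated(3); that is the outcome I would like the pattern to produce.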
The above pattern sequence unfortunately does not work properly. Any
suggestions?
BR,
Isidoros
On Thu, 4 Nov 2021 at 11:27 PM, Isidoros Ioannou <
[email protected]> wrote:
> I am using the processing time characteristic.
>
> DataStream<Model> inputStream = env.fromElements(
> Model.of(1, "A", "US"),
> Model.of(2, "B", "US"),
> Model.of(3, "C", "US"),
> Model.of(4, "A", "AU"),
> Model.of(5, "B", "AU"),
> Model.of(6, "C", "AU"),
> //Model.of(7, "D", "US"),
> Model.of(8, "D", "AU"),
> Model.of(9, "A", "GB"),
> Model.of(10, "B", "GB"),
> Model.of(13, "D", "GB"),
> Model.of(11, "C", "GB"),
> Model.of(12, "D", "GB")
>
> In the above inputStream, Model.of(7, "D", "US") is not supplied to the
> pattern sequence. There is nothing special about it; I only wanted to
> simulate the case where an event that would fulfil the pattern sequence
> is missing, which happens in my case.
>
> BR,
> Isidoros
>
>
>
>
> On Thu, 4 Nov 2021 at 11:01 PM, Austin Cawley-Edwards <
> [email protected]> wrote:
>
>> Thanks for the update, the requirements make sense.
>>
>> Some follow up questions:
>> * What time characteristic are you using? Processing or Event?
>> * Can you describe a bit more what you mean by "input like the one I have
>> commented below"? What is special about the one you have commented?
>>
>> Best,
>> Austin
>>
>> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou <[email protected]>
>> wrote:
>>
>>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Isidoros Ioannou <[email protected]>
>>> Date: Thu, 4 Nov 2021 at 10:01 PM
>>> Subject: Re: IterativeCondition instead of SimpleCondition not matching
>>> pattern
>>> To: Austin Cawley-Edwards <[email protected]>
>>>
>>>
>>> Hi Austin,
>>> thank you for your answer and I really appreciate your willingness to
>>> help.
>>>
>>> Actually the desired output is the one below
>>>
>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>
>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10,
>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>
>>> I would only like to generate sequences of Models that have the same
>>> symbol. I noticed that if an event is missing from the input, like the
>>> one I have commented out below, it breaks the whole pattern match and
>>> the desired output is never produced.
>>>
>>> DataStream<Model> inputStream = env.fromElements(
>>> Model.of(1, "A", "US"),
>>> Model.of(2, "B", "US"),
>>> Model.of(3, "C", "US"),
>>> Model.of(4, "A", "AU"),
>>> Model.of(5, "B", "AU"),
>>> Model.of(6, "C", "AU"),
>>> //Model.of(7, "D", "US"),
>>> Model.of(8, "D", "AU"),
>>> Model.of(9, "A", "GB"),
>>> Model.of(10, "B", "GB"),
>>> Model.of(13, "D", "GB"),
>>> Model.of(11, "C", "GB"),
>>> Model.of(12, "D", "GB")
>>>
>>> Kind Regards,
>>> Isidoros
>>>
>>>
>>> On Thu, 4 Nov 2021 at 8:40 PM, Austin Cawley-Edwards <
>>> [email protected]> wrote:
>>>
>>>> Hi Isidoros,
>>>>
>>>> Thanks for reaching out to the mailing list. I haven't worked with the
>>>> CEP library in a long time but can try to help. I'm having a little trouble
>>>> understanding the desired output + rules. Can you mock up the desired
>>>> output like you have for the fulfilled pattern sequence?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> I am facing an issue when trying to match some elements in a Pattern
>>>>> sequence. I am using Flink version 1.11.1. Here is my case:
>>>>>
>>>>> final StreamExecutionEnvironment env =
>>>>> EnvironmentProvider.getEnvironment();
>>>>> DataStream<Model> inputStream = env.fromElements(
>>>>> Model.of(1, "A", "US"),
>>>>> Model.of(2, "B", "US"),
>>>>> Model.of(3, "C", "US"),
>>>>> Model.of(4, "A", "AU"),
>>>>> Model.of(5, "B", "AU"),
>>>>> Model.of(6, "C", "AU"),
>>>>> //Model.of(7, "D"),
>>>>> Model.of(8, "D", "AU"),
>>>>> Model.of(9, "A", "GB"),
>>>>> Model.of(10, "B", "GB"),
>>>>> Model.of(13, "D", "GB"),
>>>>> Model.of(11, "C", "GB"),
>>>>> Model.of(12, "D", "GB")
>>>>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>>>>> .forceNonParallel();
>>>>>
>>>>> Pattern<Model, Model> pattern = Pattern.<Model>begin("start",
>>>>> AfterMatchSkipStrategy.skipToNext())
>>>>> .where(new IterativeCondition<Model>() {
>>>>> @Override
>>>>> public boolean filter(Model value, Context<Model> ctx)
>>>>> throws Exception {
>>>>> return value.getText().equalsIgnoreCase("A");
>>>>> }
>>>>> }).followedBy("second")
>>>>> .where(new IterativeCondition<Model>() {
>>>>> @Override
>>>>> public boolean filter(Model value, Context<Model> ctx)
>>>>> throws Exception {
>>>>>
>>>>> return value.getText().equalsIgnoreCase("B");
>>>>> }
>>>>> }).followedBy("third")
>>>>> .where(new IterativeCondition<Model>() {
>>>>> @Override
>>>>> public boolean filter(Model value, Context<Model> ctx)
>>>>> throws Exception {
>>>>>
>>>>> return value.getText().equalsIgnoreCase("C");
>>>>> }
>>>>> }).followedBy("fourth")
>>>>> .where(new IterativeCondition<Model>() {
>>>>> @Override
>>>>> public boolean filter(Model value, Context<Model> ctx)
>>>>> throws Exception {
>>>>> var list =
>>>>> StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(),
>>>>> false).collect(Collectors.toList());
>>>>> var val =
>>>>> list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>>>>> return value.getText().equalsIgnoreCase("D") && val;
>>>>> }
>>>>> });
>>>>>
>>>>>
>>>>> PatternStream<Model> marketOpenPatternStream =
>>>>> CEP.<Model>pattern(inputStream, pattern);
>>>>>
>>>>> SingleOutputStreamOperator<List<Model>> marketOpenOutput =
>>>>> marketOpenPatternStream
>>>>> .process(new PatternProcessFunction<Model,
>>>>> List<Model>>() {
>>>>> @Override
>>>>> public void processMatch(Map<String, List<Model>>
>>>>> match, Context ctx, Collector<List<Model>> out) throws Exception {
>>>>> System.out.println(match);
>>>>> out.collect(new ArrayList(match.values()));
>>>>> }
>>>>> })
>>>>>
>>>>> What I am trying to achieve is to match only sequences whose events have
>>>>> the same symbol. If I use a SimpleCondition that checks only the text of
>>>>> the Model (A, B, C, ...), without the symbol check in the last pattern,
>>>>> the pattern sequence is fulfilled and I get the following output:
>>>>>
>>>>> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
>>>>> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}],
>>>>> fourth=[Model{id=8,text='D',symbol='AU'}]}
>>>>>
>>>>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>>>>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>>>>> fourth=[Model{id=8, text='D', symbol='AU'}]}
>>>>>
>>>>> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10,
>>>>> text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
>>>>> fourth=[Model{id=12, text='D', symbol='GB'}]}
>>>>>
>>>>> However, I want to avoid matching the elements with id = 1 (A), 2 (B),
>>>>> 3 (C) with the element with id = 8 (D). For this reason, in the last
>>>>> condition I added a symbol check against the event matched by the
>>>>> previous pattern, so that they do not match when their symbols differ.
>>>>> But after applying the condition I no longer get any output; none of the
>>>>> elements match the pattern. What am I missing? Could someone help?
>>>>>
>>>>>