I am running into an issue when trying to match some elements in a Pattern sequence.
I am using Flink 1.11.1. Here is my case:
final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();

DataStream<Model> inputStream = env.fromElements(
        Model.of(1, "A", "US"),
        Model.of(2, "B", "US"),
        Model.of(3, "C", "US"),
        Model.of(4, "A", "AU"),
        Model.of(5, "B", "AU"),
        Model.of(6, "C", "AU"),
        //Model.of(7, "D"),
        Model.of(8, "D", "AU"),
        Model.of(9, "A", "GB"),
        Model.of(10, "B", "GB"),
        Model.of(13, "D", "GB"),
        Model.of(11, "C", "GB"),
        Model.of(12, "D", "GB")
    ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
    .forceNonParallel();

Pattern<Model, Model> pattern = Pattern.<Model>begin("start",
        AfterMatchSkipStrategy.skipToNext())
    // "start": any event whose text is A
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx)
                throws Exception {
            return value.getText().equalsIgnoreCase("A");
        }
    }).followedBy("second")
    // "second": any later event whose text is B
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx)
                throws Exception {
            return value.getText().equalsIgnoreCase("B");
        }
    }).followedBy("third")
    // "third": any later event whose text is C
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx)
                throws Exception {
            return value.getText().equalsIgnoreCase("C");
        }
    }).followedBy("fourth")
    // "fourth": a later D whose symbol equals the symbol of the event matched as "third"
    .where(new IterativeCondition<Model>() {
        @Override
        public boolean filter(Model value, Context<Model> ctx)
                throws Exception {
            var list = StreamSupport
                .stream(ctx.getEventsForPattern("third").spliterator(), false)
                .collect(Collectors.toList());
            var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
            return value.getText().equalsIgnoreCase("D") && val;
        }
    });

PatternStream<Model> marketOpenPatternStream =
    CEP.<Model>pattern(inputStream, pattern);

SingleOutputStreamOperator<List<Model>> marketOpenOutput =
    marketOpenPatternStream
        .process(new PatternProcessFunction<Model, List<Model>>() {
            @Override
            public void processMatch(Map<String, List<Model>> match,
                    Context ctx, Collector<List<Model>> out) throws Exception {
                System.out.println(match);
                // Flatten the per-pattern event lists into a single List<Model>.
                List<Model> events = new ArrayList<>();
                match.values().forEach(events::addAll);
                out.collect(events);
            }
        });

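For reference, the Model class itself is not shown above. A minimal sketch that is consistent with the calls used in the snippet (Model.of, getText, getSymbol) and with the printed output further down might look like the following. The field types, getId, equals and hashCode are assumptions made for illustration, not the actual class.

```java
import java.util.Objects;

// Hypothetical reconstruction: the real Model class is not part of the post.
// Only of(...), getText(), getSymbol() and the toString() format are taken from it.
final class Model {
    private final int id;
    private final String text;
    private final String symbol;

    private Model(int id, String text, String symbol) {
        this.id = id;
        this.text = text;
        this.symbol = symbol;
    }

    // Static factory matching the Model.of(id, text, symbol) calls in the snippet.
    public static Model of(int id, String text, String symbol) {
        return new Model(id, text, symbol);
    }

    public int getId() { return id; }          // assumed accessor
    public String getText() { return text; }
    public String getSymbol() { return symbol; }

    // Value-based equality (assumed; the original class may differ).
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Model)) return false;
        Model other = (Model) o;
        return id == other.id
            && Objects.equals(text, other.text)
            && Objects.equals(symbol, other.symbol);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, text, symbol);
    }

    // Same format as the printed matches, e.g. Model{id=1, text='A', symbol='US'}
    @Override
    public String toString() {
        return "Model{id=" + id + ", text='" + text + "', symbol='" + symbol + "'}";
    }

    public static void main(String[] args) {
        System.out.println(Model.of(1, "A", "US"));
    }
}
```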
What I am trying to achieve is to match only sequences whose events have the
same symbol. If I use a SimpleCondition in every step that checks only the text
of the Model (A, B, C, ...), without the symbol check in the last pattern,
the pattern sequence is fulfilled and I get the following output:
{start=[Model{id=1, text='A', symbol='US'}],
 second=[Model{id=2, text='B', symbol='US'}],
 third=[Model{id=3, text='C', symbol='US'}],
 fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=4, text='A', symbol='AU'}],
 second=[Model{id=5, text='B', symbol='AU'}],
 third=[Model{id=6, text='C', symbol='AU'}],
 fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=9, text='A', symbol='GB'}],
 second=[Model{id=10, text='B', symbol='GB'}],
 third=[Model{id=11, text='C', symbol='GB'}],
 fourth=[Model{id=12, text='D', symbol='GB'}]}
However, I want to avoid matching the elements with id=1 (A), 2 (B), 3 (C) with
the element with id=8 (D). For this reason I added a symbol check to the last
condition, comparing against the event matched by the previous pattern ("third"),
so that I don't get a match when the symbols differ. But after applying the
condition, I no longer get any output: none of the elements match the
pattern. What am I missing? Could someone help?
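
To make the intended result concrete, here is a plain-Java sketch (no Flink involved) of the matching described above: for each A, take the next B, then the next C, then the next D whose symbol equals that of the C. The Event class, the helper names, and the assumption that events are processed in the listed order are all illustrative. It does not model event time or the skip strategy, so it only states the result being aimed for, not how CEP evaluates the pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a hand-rolled stand-in for the desired matching, not Flink CEP.
final class ExpectedMatches {

    // Hypothetical minimal event type mirroring Model's id, text and symbol.
    static final class Event {
        final int id;
        final String text;
        final String symbol;

        Event(int id, String text, String symbol) {
            this.id = id;
            this.text = text;
            this.symbol = symbol;
        }
    }

    // Index of the first event at or after 'from' with the given text and,
    // if 'symbol' is non-null, the given symbol; -1 if there is none.
    static int next(List<Event> events, int from, String text, String symbol) {
        for (int i = from; i < events.size(); i++) {
            Event e = events.get(i);
            boolean textOk = e.text.equalsIgnoreCase(text);
            boolean symbolOk = symbol == null || e.symbol.equalsIgnoreCase(symbol);
            if (textOk && symbolOk) {
                return i;
            }
        }
        return -1;
    }

    // For every A: next B, then next C, then next D with the same symbol as that C.
    static List<List<Integer>> expectedMatches(List<Event> events) {
        List<List<Integer>> result = new ArrayList<>();
        for (int a = 0; a < events.size(); a++) {
            if (!events.get(a).text.equalsIgnoreCase("A")) continue;
            int b = next(events, a + 1, "B", null);
            if (b < 0) continue;
            int c = next(events, b + 1, "C", null);
            if (c < 0) continue;
            int d = next(events, c + 1, "D", events.get(c).symbol);
            if (d < 0) continue;
            result.add(Arrays.asList(events.get(a).id, events.get(b).id,
                                     events.get(c).id, events.get(d).id));
        }
        return result;
    }

    // Same elements, in the same order, as the fromElements(...) call in the post.
    static List<Event> sampleInput() {
        return Arrays.asList(
            new Event(1, "A", "US"), new Event(2, "B", "US"), new Event(3, "C", "US"),
            new Event(4, "A", "AU"), new Event(5, "B", "AU"), new Event(6, "C", "AU"),
            new Event(8, "D", "AU"),
            new Event(9, "A", "GB"), new Event(10, "B", "GB"), new Event(13, "D", "GB"),
            new Event(11, "C", "GB"), new Event(12, "D", "GB"));
    }

    public static void main(String[] args) {
        // prints [[4, 5, 6, 8], [9, 10, 11, 12]]
        System.out.println(expectedMatches(sampleInput()));
    }
}
```

In other words, the AU and GB sequences should still match, and only the US sequence (ids 1, 2, 3) should be left without a D.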