Hello,
I’ve already asked the question today and got the solve:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html
, and it’s clean for me how PatternStream works with ProcessTime.
But I need help again, I can’t write proper code to execute PatternStream with
EventTime regime.
I think the problem is how I assign the watermark strategy.
My code is below, version of Flink is 1.12:
public class Main {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("group.id", "Flink");
properties.put("bootstrap.servers", "broker:9092");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"test",
new SimpleStringSchema(),
properties);
DataStream<String> stream = env
.addSource(consumer)
.map((MapFunction<String, String>) s -> {
// Just getting an object model
return model.toString();
}).
assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> {
Model model = new Gson().fromJson(event,
Model.class);
return model.getServerTime();
}));
stream.print("Stream");
Pattern<String, String> firstPattern = Pattern
.<String>begin("first")
.where(new IterativeCondition<String>() {
@Override
public boolean filter(String s, Context<String> context)
throws Exception {
return s.contains("Start");
}
});
DataStream<String> result = CEP
.pattern(stream, firstPattern)
.inEventTime() // default TimeCharacteristic for 1.12
.process(new PatternProcessFunction<String, String>() {
@Override
public void processMatch(Map<String, List<String>> map,
Context context, Collector<String> collector) throws Exception {
collector.collect(map.get("first").get(0));
}
});
result.print("Result");
env.execute();
}
}
Please, help me to correct the code )
Thanks, Yuri L.
Ответить
Переслать
Предложить звонок
Создать событие
Принято Хорошо Все понятно, спасибо за информацию