Hi,

What exactly is the problem? Is it that no patterns are being generated?
Usually the problem is idle parallel instances [1]. The watermark can only progress if data is flowing through every parallel instance. You can also read about this in the context of Kafka partitions [2].

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector

On 26/02/2021 13:21, Люльченко Юрий Николаевич wrote:
> Hello,
>
> I've already asked a question today and got the solution:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html,
> and it's clear to me how PatternStream works with processing time.
>
> But I need help again: I can't write proper code to execute
> PatternStream in event-time mode.
> I think the problem is how I assign the watermark strategy.
>
> My code is below; the Flink version is 1.12:
>
>     public class Main {
>
>         public static void main(String[] args) throws Exception {
>
>             Properties properties = new Properties();
>             properties.put("group.id", "Flink");
>             properties.put("bootstrap.servers", "broker:9092");
>
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>             FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
>                     "test",
>                     new SimpleStringSchema(),
>                     properties);
>
>             DataStream<String> stream = env
>                     .addSource(consumer)
>                     .map((MapFunction<String, String>) s -> {
>                         // Just getting an object model
>                         return model.toString();
>                     })
>                     .assignTimestampsAndWatermarks(
>                             WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>                                     .withTimestampAssigner((event, timestamp) -> {
>                                         Model model = new Gson().fromJson(event, Model.class);
>                                         return model.getServerTime();
>                                     }));
>
>             stream.print("Stream");
>
>             Pattern<String, String> firstPattern = Pattern
>                     .<String>begin("first")
>                     .where(new IterativeCondition<String>() {
>                         @Override
>                         public boolean filter(String s, Context<String> context) throws Exception {
>                             return s.contains("Start");
>                         }
>                     });
>
>             DataStream<String> result = CEP
>                     .pattern(stream, firstPattern)
>                     .inEventTime() // default TimeCharacteristic for 1.12
>                     .process(new PatternProcessFunction<String, String>() {
>                         @Override
>                         public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
>                             collector.collect(map.get("first").get(0));
>                         }
>                     });
>
>             result.print("Result");
>
>             env.execute();
>         }
>
>     }
>
> Please help me correct the code :)
>
> Thanks,
> Yuri L.
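
To make the point about idle parallel instances a bit more concrete, here is a rough plain-Java sketch (not Flink code). It assumes the usual rule that a downstream operator's watermark is the minimum over the watermarks of its inputs, and that inputs marked idle are left out of that minimum. The names WatermarkSketch, Input and combinedWatermark are made up for this illustration, and returning Long.MIN_VALUE when every input is idle is a simplification of this sketch, not a statement about Flink's behaviour.

```java
import java.util.Arrays;
import java.util.List;

public class WatermarkSketch {

    /** Hypothetical view of one upstream parallel instance. */
    static final class Input {
        final long watermark; // last watermark emitted by this instance
        final boolean idle;   // true if the instance was marked idle

        Input(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over all non-idle inputs. */
    static long combinedWatermark(List<Input> inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Input in : inputs) {
            if (in.idle) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, in.watermark);
        }
        // Simplification: report "no progress" if nothing is active.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Instance 0 has seen events; instance 1 never received data,
        // so its watermark is still at the initial Long.MIN_VALUE.
        List<Input> stalled = Arrays.asList(
                new Input(50_000L, false),
                new Input(Long.MIN_VALUE, false));
        System.out.println(combinedWatermark(stalled));

        // Same situation, but instance 1 is marked idle and is skipped.
        List<Input> withIdleness = Arrays.asList(
                new Input(50_000L, false),
                new Input(Long.MIN_VALUE, true));
        System.out.println(combinedWatermark(withIdleness));
    }
}
```

In the first case the combined watermark never leaves its initial value, so event-time operators such as CEP would not emit anything; in the second it follows the active instance. In Flink itself, the section linked as [1] describes withIdleness(...) on the WatermarkStrategy for this purpose, for example appending .withIdleness(Duration.ofMinutes(1)) to the strategy in the quoted code (the one-minute timeout is only an illustrative value).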