Hi,

What is exactly the problem? Is it that no patterns are being generated?

Usually the problem is in idle parallel instances[1]. You need to have
data flowing in each of the parallel instances for a watermark to
progress. You can also read about it in the aspect of Kafka's partitions[2].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector

On 26/02/2021 13:21, Люльченко Юрий Николаевич wrote:
> Hello,
>  
> I’ve already asked the question today and got the
> solve: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html>,
>  and
> it’s clean for me how PatternStream works with ProcessTime.
>  
> But I need help again, I can’t write proper code to execute
> PatternStream with EventTime regime.
> I think the problem is how I assign the watermark strategy.
>  
> My code is below, version of Flink is 1.12:
>  
>
> public class Main {
>
>  
>
>     public static void main(String[] args) throws Exception {
>
>  
>
>         Properties properties = new Properties();
>
>         properties.put("group.id", "Flink");
>
>         properties.put("bootstrap.servers", "broker:9092");
>
>  
>
>  
>
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>  
>
>         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
>
>                 "test",
>
>                 new SimpleStringSchema(),
>
>                 properties);
>
>  
>
>         DataStream<String> stream = env
>
>                 .addSource(consumer)
>
>                 .map((MapFunction<String, String>) s -> {
>
>                     // /Just getting an object model/
>
>                     return model.toString();
>
>                
> }).*assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))*
>
> *                        .withTimestampAssigner((event, timestamp) -> {*
>
> *                            Model model = new Gson().fromJson(event,
> Model.class);*
>
> *                            return model.getServerTime();*
>
>                       *  }));*
>
>  
>
>         stream.print("Stream");
>
>  
>
>  
>
>  
>
>         Pattern<String, String> firstPattern = Pattern
>
>                 .<String>begin("first")
>
>                 .where(new IterativeCondition<String>() {
>
>                     @Override
>
>                     public boolean filter(String s, Context<String>
> context) throws Exception {
>
>                         return s.contains("Start");
>
>                     }
>
>                 });
>
>  
>
>         DataStream<String> result = CEP
>
>                 .pattern(stream, firstPattern)
>
>                 *.inEventTime() // default TimeCharacteristic for 1.12*
>
>                 .process(new PatternProcessFunction<String, String>() {
>
>                     @Override
>
>                     public void processMatch(Map<String, List<String>>
> map, Context context, Collector<String> collector) throws Exception {
>
>                         collector.collect(map.get("first").get(0));
>
>                     }
>
>                 });
>
>  
>
>         result.print("Result");
>
>  
>
>         env.execute();
>
>     }
>
>  
>
> }
>
>  
> Please, help me to correct the code )
>  
> Thanks,Yuri L.
>  Ответить
>  Переслать
>  Предложить звонок
>  Создать событие
> ПринятоХорошоВсе понятно, спасибо за информацию
>
>  
>  
>  

Attachment: OpenPGP_signature
Description: OpenPGP digital signature

Reply via email to