Hi,

I am using the broadcast pattern for publishing rules and aggregating data, as described in https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html. My use case and code are similar. One thing I want to do is detect any late events and send them to a sink. However, when there are already unconsumed events on the Kafka topic and I start the app a couple of hours later, the timestamps in the output look wrong:
timestamp: 2021-12-02T04:48:20.324+0000, watermark: 292269055-12-02T16:47:04.192+0000, timeService.watermark: 292269055-12-02T16:47:04.192+0000

I have the watermark strategy set on the KafkaSource as:

    WatermarkStrategy<CDRRecord> wmStrategy =
        WatermarkStrategy
            .<CDRRecord>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(OUT_OF_ORDERNESS)))
            .withTimestampAssigner((cdrRecord, timestamp) -> cdrRecord.getEventTime());

    return env.addSource(recordSource.assignTimestampsAndWatermarks(wmStrategy))
        .name("records Source")
        .setParallelism(config.get(SOURCE_PARALLELISM));

Please let me know if you need any more information.

Thanks,
Sweta