I would like to understand how FlinkKafkaConsumer treats "unbalanced" topics.
We're using FlinkKafkaConsumer010 with two topics, say "small_topic" and "big_topic". After restoring from an old savepoint (4 hours old), I checked the consumer offsets on Kafka (Flink commits offsets to Kafka for reference only; since checkpointing is enabled, Flink actually manages the offsets internally). The offsets for "small_topic" are quickly caught up and committed at the latest offset, while catching up on "big_topic" takes much longer.

Is it possible that the watermarks are driven by "small_topic", so that records read from "big_topic" are discarded (i.e. excluded from the triggered windows) because their event-time timestamps are too old? Or how does FlinkKafkaConsumer handle this? Does it somehow synchronize reading based on the extracted timestamps:

- across partitions of a single topic?
- across topics?

Our code is basically:

```java
env
    .addSource(new FlinkKafkaConsumer010<>(
        Arrays.asList("big_topic", "small_topic"),
        new EventMapSchema(),
        props))
    .assignTimestampsAndWatermarks(
        new OufOfOrderTimestampExtractor(
            "timestamp_field",
            Time.seconds(cfg.getMaxOutOfOrdernessInSeconds())));
```

Finally, I don't think this problem is limited to restoring state and having to catch up; that just makes it more prominent. Couldn't the same thing happen during normal streaming, if consuming from the bigger topic is slow enough?

Thanks!
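To make the concern concrete, here is a small self-contained sketch (not actual Flink code; the class and method names are my own) of what can happen when a single timestamp/watermark assigner instance sees records from both topics interleaved. It mimics the logic of a bounded-out-of-orderness extractor, where the watermark trails the largest timestamp seen so far by a fixed bound; the caught-up "small_topic" pushes the watermark forward, so still-lagging "big_topic" records arrive behind it and would be treated as late by a window operator with no allowed lateness. All timestamps are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000;

    // Initialized so that the initial watermark is Long.MIN_VALUE
    // without underflowing (as Flink's extractor also does).
    static long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    // Watermark trails the largest timestamp seen so far by the bound.
    static long currentWatermark() {
        return maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Returns how many records arrived behind the watermark ("late").
    static int simulate() {
        long now = 1_000_000_000L;          // small_topic is near "now"
        long fourHoursMs = 4 * 60 * 60 * 1000L; // big_topic is 4 h behind
        Object[][] arrivals = {              // (topic, eventTimestampMs)
            {"small_topic", now},
            {"big_topic",   now - fourHoursMs},
            {"small_topic", now + 5_000},
            {"big_topic",   now - fourHoursMs + 1_000},
        };

        List<String> late = new ArrayList<>();
        for (Object[] rec : arrivals) {
            long ts = (Long) rec[1];
            if (ts <= currentWatermark()) {
                // A window operator would consider this record late
                // and (with zero allowed lateness) drop it.
                late.add(rec[0] + "@" + ts);
            }
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return late.size(); // here: both big_topic records
    }

    public static void main(String[] args) {
        System.out.println("late records: " + simulate());
    }
}
```

If this is indeed the failure mode, one thing I'm aware of is that the assigner can be given to the consumer itself (`FlinkKafkaConsumerBase#assignTimestampsAndWatermarks`) instead of being applied on the resulting stream, so that watermarks are generated per Kafka partition and the consumer emits the minimum across its partitions, which would hold the watermark back until "big_topic" catches up. Please correct me if that's not how it works.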