I would like to understand how FlinkKafkaConsumer treats "unbalanced"
topics.

We're using FlinkKafkaConsumer010 with 2 topics, say "small_topic" &
"big_topic".

After restoring from an old savepoint (taken 4 hours earlier), I checked
the consumer offsets in Kafka (Flink commits offsets to Kafka only for
reference; since checkpointing is enabled, Flink really manages the
offsets internally). The offsets for "small_topic" catch up quickly and
are committed at the latest offset, while catching up on "big_topic"
takes much longer.

Is it possible that the watermark is effectively determined by
"small_topic", so that records read from "big_topic" are discarded as
late (i.e. excluded from the triggered windows) because their event-time
timestamps are too old?
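
For context, I assume our extractor behaves like Flink's
BoundedOutOfOrdernessTimestampExtractor: the watermark trails the maximum
timestamp seen so far by a fixed bound, no matter which topic a record
came from, so a quickly caught-up "small_topic" could push the watermark
far ahead of "big_topic". A simplified sketch of what I mean (the
Map<String, Object> element type and the field access are my paraphrase,
not our exact code):

        import java.util.Map;
        import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
        import org.apache.flink.streaming.api.windowing.time.Time;

        public class OutOfOrderTimestampExtractor
                extends BoundedOutOfOrdernessTimestampExtractor<Map<String, Object>> {

            private final String field;

            public OutOfOrderTimestampExtractor(String field, Time maxOutOfOrderness) {
                super(maxOutOfOrderness);
                this.field = field;
            }

            @Override
            public long extractTimestamp(Map<String, Object> event) {
                // The assigner's watermark is (max timestamp seen so far
                // minus maxOutOfOrderness), regardless of which topic or
                // partition the record came from.
                return (long) event.get(field);
            }
        }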

Or how does FlinkKafkaConsumer handle this? Does it somehow synchronize
reading based on the extracted timestamps:
- across partitions of a single topic?
- across topics?
(A per-partition variant I'm wondering about is sketched after our code
below.)

Our code is basically:

        env
                .addSource(new FlinkKafkaConsumer010<>(
                        Arrays.asList("big_topic", "small_topic"),
                        new EventMapSchema(),
                        props))
                .assignTimestampsAndWatermarks(
                        new OutOfOrderTimestampExtractor("timestamp_field",
                                Time.seconds(cfg.getMaxOutOfOrdernessInSeconds())));
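
One variant I'm wondering about, related to the per-partition question
above (an untested sketch; it assumes our extractor implements
AssignerWithPeriodicWatermarks, which BoundedOutOfOrdernessTimestampExtractor
does, and I'm writing Map<String, Object> for the element type
EventMapSchema produces): setting the assigner on the consumer itself. As
far as I understand, the Kafka source then tracks one watermark per Kafka
partition, across both topics alike, and emits the minimum over all
partitions it reads:

        FlinkKafkaConsumer010<Map<String, Object>> consumer =
                new FlinkKafkaConsumer010<>(
                        Arrays.asList("big_topic", "small_topic"),
                        new EventMapSchema(),
                        props);

        // Per-partition watermarking: the source keeps a watermark for
        // each Kafka partition and emits the minimum across all of them.
        consumer.assignTimestampsAndWatermarks(
                new OutOfOrderTimestampExtractor("timestamp_field",
                        Time.seconds(cfg.getMaxOutOfOrdernessInSeconds())));

        env.addSource(consumer);

That still wouldn't throttle the fast partitions, but if my understanding
is right, the watermark at least couldn't run ahead of the slowest
partition.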

Finally, I don't think this problem is limited to restoring state and
having to catch up; that just makes it more prominent. Couldn't the same
thing happen during normal streaming, whenever consumption from the
bigger topic lags far enough behind?
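
To verify whether "big_topic" records are really being dropped as late, I
was thinking of routing late records to a side output, roughly like this
(just a sketch; lateTag, the "key" field, and the trivial reduce are
placeholders for our real pipeline):

        import java.util.Map;
        import org.apache.flink.api.java.functions.KeySelector;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.util.OutputTag;

        DataStream<Map<String, Object>> events = ...; // the timestamped stream from above

        // Records arriving behind the watermark go here instead of being
        // silently dropped (anonymous subclass keeps the type information).
        final OutputTag<Map<String, Object>> lateTag =
                new OutputTag<Map<String, Object>>("late-events") {};

        SingleOutputStreamOperator<Map<String, Object>> windowed = events
                .keyBy(new KeySelector<Map<String, Object>, String>() {
                    @Override
                    public String getKey(Map<String, Object> e) {
                        return (String) e.get("key"); // placeholder key field
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag)
                .reduce((a, b) -> a); // stand-in for our real window logic

        // If this side output fills up with "big_topic" records, they are
        // indeed being excluded from the triggered windows.
        windowed.getSideOutput(lateTag).print();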

Thanks!
