Hi,

I was experimenting with DynamicKafkaSource with 2 clusters. My use case is failover: when the active site fails, I want the Kafka source to start reading data from the standby site.

I observed that DynamicKafkaSource resets the starting offset on cluster-2 to -3, even though the checkpointed offset on cluster-1 was already 30:

org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader - Skipping outdated split due to metadata changes: DynamicKafkaSourceSplit{kafkaClusterId=cluster1, kafkaPartitionSplit=[Partition: topic1-0, StartingOffset: 30, StoppingOffset: -9223372036854775808]}
org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader - Adding splits to reader 6: [DynamicKafkaSourceSplit{kafkaClusterId=cluster2, kafkaPartitionSplit=[Partition: topic1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]}]

My question is: shouldn't DynamicKafkaSource use the checkpointed offset (30 in this case), so that it can resume reading the replicated data from that offset? I know it should start reading from the committed offset on cluster-2 anyway (that does not apply in my case, because my PoC uses separate, non-replicated Kafka clusters). But is there no way for DynamicKafkaSource to use the checkpointed offset when switching to cluster-2?

Thanks in advance!
Chirag
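
For reference, the behaviour in the two log lines can be reproduced with a small toy model. This is only a sketch of what the logs suggest, not the connector's actual code: `Split`, `restore_splits` and `discover_splits` are hypothetical names, and reading -3 as "start from the committed offset" and -9223372036854775808 as "no stopping offset" is an assumption based on the Kafka connector's split sentinels as I understand them. The point it illustrates is that a checkpointed split appears to be identified by cluster id plus topic-partition, so an offset stored for cluster1 is never matched against the same topic-partition on cluster2.

```python
from dataclasses import dataclass

# Sentinel values as they appear in the logs above. Their meaning here is an
# assumption, not something confirmed by the logs themselves.
COMMITTED_OFFSET = -3            # assumed: start from the group's committed offset
NO_STOPPING_OFFSET = -(2 ** 63)  # Java Long.MIN_VALUE, assumed: unbounded read


@dataclass(frozen=True)
class Split:
    """Hypothetical stand-in for a per-cluster Kafka partition split."""
    cluster_id: str
    topic: str
    partition: int
    starting_offset: int
    stopping_offset: int = NO_STOPPING_OFFSET


def restore_splits(checkpointed, active_clusters):
    """Toy model: checkpointed splits for clusters that are no longer active are skipped."""
    kept = [s for s in checkpointed if s.cluster_id in active_clusters]
    skipped = [s for s in checkpointed if s.cluster_id not in active_clusters]
    return kept, skipped


def discover_splits(active_partitions, already_assigned):
    """Toy model: unseen (cluster, topic, partition) keys start from the sentinel offset."""
    known = {(s.cluster_id, s.topic, s.partition) for s in already_assigned}
    return [
        Split(cluster, topic, partition, COMMITTED_OFFSET)
        for (cluster, topic, partition) in active_partitions
        if (cluster, topic, partition) not in known
    ]


# State restored from the checkpoint: topic1-0 on cluster1 at offset 30.
checkpointed = [Split("cluster1", "topic1", 0, 30)]

# After failover only cluster2 is active.
kept, skipped = restore_splits(checkpointed, {"cluster2"})
new_splits = discover_splits([("cluster2", "topic1", 0)], kept)

print("skipped:", skipped)
print("added:", new_splits)
```

In this model, carrying offset 30 over to cluster2 would need an explicit mapping from the skipped cluster1 split to the new cluster2 split, which is the behaviour the question is asking about.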