Chirag Dewan created FLINK-37629:
------------------------------------

Summary: Use Checkpointed Offset while migrating clusters in DynamicKafkaSource
Key: FLINK-37629
URL: https://issues.apache.org/jira/browse/FLINK-37629
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.20.1
Reporter: Chirag Dewan
In my use case, I have a two-cluster Kafka deployment. One cluster is the primary; the other is a replica populated by MM2 (MirrorMaker 2). Producers can't write directly to the replicated cluster; it is used only for consuming records.

I want my KafkaSource to fail over to the replicated cluster when the primary cluster fails, and to resume reading records from where it left off on the primary. If the checkpointed offset has not yet been replicated to that cluster, KafkaSource can use the latest offset (meaning it will sit idle, since no new data is produced on that cluster). Failing back to the primary can also rely on the checkpointed offset, because I am sure the replicated cluster will always trail the primary.

I thought of using DynamicKafkaSource for this purpose. However, DynamicKafkaSource currently relies on the consumer group offset committed in Kafka (startingOffset = -3) to start reading data from the secondary cluster after failover.

I understand that using the checkpointed offset when falling back to the primary cluster would generally be problematic, but it works well in my use case.

So the ask is: can DynamicKafkaSource use the checkpointed offset, perhaps as a configurable option?
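To make the request concrete, here is a minimal Java sketch of the starting-offset policy described above. Everything in it is hypothetical: `CheckpointedOffsetPolicy`, `StartingOffsetMode`, and `resolveStartingOffset` are not DynamicKafkaSource APIs, and the local `COMMITTED_OFFSET = -3` constant only mirrors the sentinel that flink-connector-kafka's `KafkaPartitionSplit` uses for "start from the committed consumer group offset". The sketch assumes, as in the reporter's setup, that a checkpointed offset can be applied directly on the other cluster, and that the caller already knows the partition's current end offset there.

```java
import java.util.Optional;

/** Hypothetical sketch of the starting-offset policy requested in FLINK-37629. */
final class CheckpointedOffsetPolicy {

    // Mirrors KafkaPartitionSplit.COMMITTED_OFFSET; redefined so the sketch compiles without Flink.
    static final long COMMITTED_OFFSET = -3L;

    /** Hypothetical configurable option: which offset source to use after a cluster switch. */
    enum StartingOffsetMode { COMMITTED, CHECKPOINTED }

    /**
     * Chooses where a partition starts reading after switching clusters.
     *
     * @param mode               hypothetical configuration option
     * @param checkpointed       offset restored from Flink state for this partition, if any
     * @param latestOnNewCluster current end offset of the partition on the target cluster
     * @return an offset to seek to, or COMMITTED_OFFSET to keep the current behavior
     */
    static long resolveStartingOffset(
            StartingOffsetMode mode, Optional<Long> checkpointed, long latestOnNewCluster) {
        if (mode == StartingOffsetMode.COMMITTED || !checkpointed.isPresent()) {
            // Current behavior: defer to the consumer group offset committed in Kafka.
            return COMMITTED_OFFSET;
        }
        long offset = checkpointed.get();
        if (offset <= latestOnNewCluster) {
            // The checkpointed position already exists on this cluster: resume exactly there.
            return offset;
        }
        // Not replicated yet: start at the latest offset and sit idle until new data arrives.
        return latestOnNewCluster;
    }

    public static void main(String[] args) {
        // Checkpointed offset 100 is already present on a cluster whose end offset is 150.
        System.out.println(resolveStartingOffset(StartingOffsetMode.CHECKPOINTED, Optional.of(100L), 150L)); // 100
        // Checkpointed offset 200 has not been replicated yet, so fall back to the end offset.
        System.out.println(resolveStartingOffset(StartingOffsetMode.CHECKPOINTED, Optional.of(200L), 150L)); // 150
        // The default mode keeps relying on the committed consumer group offset.
        System.out.println(resolveStartingOffset(StartingOffsetMode.COMMITTED, Optional.of(100L), 150L)); // -3
    }
}
```

Leaving the option at `COMMITTED` reproduces today's behavior, which is one way such a flag could stay backward compatible while enabling the checkpoint-based failover and failback described above.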