Hi Team,

We are using the Dynamic Kafka Source to read data from Kafka topics on one
or more Kafka clusters. Our setup has one primary and one secondary Kafka
cluster, and topic data is replicated from the primary to the secondary cluster
using MirrorMaker. The auto offset reset strategy in our config is Grouped, and
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS is set to 30
seconds. We are using the default Kafka consumer properties.
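For reference, the interval setting described above can be captured as plain
java.util.Properties. This is a minimal sketch under stated assumptions: the key
string "stream-metadata-discovery-interval-ms" is assumed to be the key behind
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS (check it against
the connector version in use), DynamicSourceConfig and sourceProperties are
hypothetical names, and the Grouped offset strategy is configured separately on
the source builder, so it is not modelled here.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper collecting the Dynamic Kafka Source settings described above. */
public class DynamicSourceConfig {
    // ASSUMPTION: key string of DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.
    static final String DISCOVERY_INTERVAL_KEY = "stream-metadata-discovery-interval-ms";

    static Properties sourceProperties(Duration discoveryInterval) {
        Properties props = new Properties();
        // Only the discovery interval is set; all Kafka consumer properties stay at their defaults.
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(discoveryInterval.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // 30 s: the setting under which the duplicate reads were observed.
        System.out.println(sourceProperties(Duration.ofSeconds(30)));
        // About 3 min: the setting under which no duplicates were observed.
        System.out.println(sourceProperties(Duration.ofMinutes(3)));
    }
}
```

In a real job these properties would presumably be passed to the Dynamic Kafka
Source builder together with the offsets initializer; main simply prints the
30-second and 3-minute variants side by side.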

Data from the primary cluster is fetched according to the grouped strategy.
However, when we take the primary cluster down, the configuration switches to
the secondary cluster, but the same data is read 4 to 10 times from the
secondary cluster.
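To quantify the "4 to 10 times", one option is to count how often each record
reaches the job. A minimal sketch, assuming the job can collect the topic,
partition and offset of every record consumed from the secondary cluster;
DuplicateCounter and Consumed are hypothetical names. Offsets are only
meaningful within one cluster, since MirrorMaker replication does not in general
preserve offsets, so comparing primary and secondary reads would need a business
key instead.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check: how many times each (topic, partition, offset) was read. */
public class DuplicateCounter {
    // Minimal stand-in for whatever the job records per consumed message (same cluster only).
    record Consumed(String topic, int partition, long offset) {}

    static Map<Consumed, Integer> readCounts(List<Consumed> records) {
        Map<Consumed, Integer> counts = new HashMap<>();
        for (Consumed r : records) {
            counts.merge(r, 1, Integer::sum); // increment the read count for this position
        }
        return counts;
    }

    /** Highest number of times any single message was read; 0 for an empty input. */
    static int maxReads(List<Consumed> records) {
        return readCounts(records).values().stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    public static void main(String[] args) {
        List<Consumed> sample = List.of(
                new Consumed("oct1", 0, 100),
                new Consumed("oct1", 0, 100),
                new Consumed("oct1", 0, 101));
        System.out.println("max reads per message: " + maxReads(sample));
    }
}
```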

When we analyzed the JobManager logs, we found that the StartingOffset of the
splits was set to -2 after initially being set to -3:

Line 5139: 2025-10-29 06:11:20,529 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 5140: 2025-10-29 06:11:20,530 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster primarycluster: {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12565: 2025-10-29 06:24:48,819 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12566: 2025-10-29 06:24:48,819 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12572: 2025-10-29 06:24:48,822 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12573: 2025-10-29 06:24:48,822 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12575: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12576: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12578: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12579: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12723: 2025-10-29 06:24:58,852 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12724: 2025-10-29 06:24:58,852 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12726: 2025-10-29 06:24:58,853 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12729: 2025-10-29 06:24:58,853 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
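The negative values in these log lines are sentinels rather than real offsets.
In flink-connector-kafka's KafkaPartitionSplit, -3 is COMMITTED_OFFSET, -2 is
EARLIEST_OFFSET, -1 is LATEST_OFFSET, and Long.MIN_VALUE
(-9223372036854775808) is NO_STOPPING_OFFSET. Read that way, the split for
oct1-0 was first assigned to start from the committed group offset and then
reassigned several times to start from the earliest offset. The hypothetical
SplitOffsetDecoder below is a minimal sketch that decodes such lines; its
constants copy the KafkaPartitionSplit values rather than importing the class,
so it runs without Flink on the classpath.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical decoder for the "Assigning splits" log lines of the Kafka enumerators. */
public class SplitOffsetDecoder {
    // Values copied from KafkaPartitionSplit in flink-connector-kafka.
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE; // -9223372036854775808
    static final long LATEST_OFFSET = -1L;
    static final long EARLIEST_OFFSET = -2L;
    static final long COMMITTED_OFFSET = -3L;

    private static final Pattern SPLIT = Pattern.compile(
            "Partition: ([^,]+), StartingOffset: (-?\\d+), StoppingOffset: (-?\\d+)");

    /** Maps a raw offset from the log to its sentinel name, or reports it as explicit. */
    static String describe(long offset) {
        if (offset == NO_STOPPING_OFFSET) return "NO_STOPPING_OFFSET (unbounded)";
        if (offset == LATEST_OFFSET) return "LATEST_OFFSET";
        if (offset == EARLIEST_OFFSET) return "EARLIEST_OFFSET";
        if (offset == COMMITTED_OFFSET) return "COMMITTED_OFFSET";
        return offset >= 0 ? "explicit offset " + offset : "unknown sentinel " + offset;
    }

    /** Decodes the first split found in a log line. */
    static String decode(String logLine) {
        Matcher m = SPLIT.matcher(logLine);
        if (!m.find()) return "no split found";
        return m.group(1) + ": start=" + describe(Long.parseLong(m.group(2)))
                + ", stop=" + describe(Long.parseLong(m.group(3)));
    }

    public static void main(String[] args) {
        String line = "Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, "
                + "StartingOffset: -2, StoppingOffset: -9223372036854775808]]}";
        System.out.println(decode(line));
    }
}
```

Running main prints
"oct1-0: start=EARLIEST_OFFSET, stop=NO_STOPPING_OFFSET (unbounded)".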

However, when we increase
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS to approximately
3 minutes, this behaviour does not occur: data from both the primary and
secondary clusters is read according to the grouped strategy, without
duplicates.

Could you please suggest a reason for this behaviour?

Best Regards
Swati
