Hi Team,
We are using the Dynamic Kafka Source to read data from Kafka topics across one
or more Kafka clusters. We have a setup with one primary and one secondary Kafka
cluster, where topic data is replicated from the primary to the secondary
cluster using MirrorMaker. The auto offset reset strategy used in our config is
Grouped, DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS is set
to 30 seconds, and we are using the default Kafka consumer properties.
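
For reference, the source is configured roughly like the sketch below
(simplified; imports and our KafkaMetadataService implementation are omitted,
we are assuming the Grouped strategy maps to OffsetsInitializer.committedOffsets(),
and the stream id "oct1", the group id and createMetadataService() are
placeholders):

// Metadata service that knows about primarycluster and secondarycluster0
// (our own implementation, omitted here)
KafkaMetadataService metadataService = createMetadataService();

Properties props = new Properties();
// default consumer properties apart from the group id
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "our-consumer-group");
// stream metadata discovery every 30 seconds
props.setProperty(
    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "30000");

DynamicKafkaSource<String> source =
    DynamicKafkaSource.<String>builder()
        .setKafkaMetadataService(metadataService)
        .setStreamIds(Collections.singleton("oct1"))
        // "Grouped" offset reset -> start from the committed group offsets
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setDeserializer(
            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setProperties(props)
        .build();
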
Data from the primary cluster is fetched based on the grouped strategy. However,
when we bring the primary cluster down, the source switches to the secondary
cluster, but the same data is read 4 to 10 times from the secondary cluster.
When we analyzed the jobmanager logs, we found that the splits' StartingOffset
is set to -2 after initially being set to -3 (if we read KafkaPartitionSplit
correctly, -3 is the committed-offset sentinel and -2 the earliest-offset
sentinel):
Line 5139: 2025-10-29 06:11:20,529 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -3,
StoppingOffset: -9223372036854775808]]}
Line 5140: 2025-10-29 06:11:20,530 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster primarycluster: {0=[[Partition: oct1-0,
StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12565: 2025-10-29 06:24:48,819 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -3,
StoppingOffset: -9223372036854775808]]}
Line 12566: 2025-10-29 06:24:48,819 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0,
StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12572: 2025-10-29 06:24:48,822 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2,
StoppingOffset: -9223372036854775808]]}
Line 12573: 2025-10-29 06:24:48,822 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0,
StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12575: 2025-10-29 06:24:48,823 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2,
StoppingOffset: -9223372036854775808]]}
Line 12576: 2025-10-29 06:24:48,823 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0,
StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12578: 2025-10-29 06:24:48,823 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2,
StoppingOffset: -9223372036854775808]]}
Line 12579: 2025-10-29 06:24:48,823 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0,
StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12723: 2025-10-29 06:24:58,852 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2,
StoppingOffset: -9223372036854775808]]}
Line 12724: 2025-10-29 06:24:58,852 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0,
StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12726: 2025-10-29 06:24:58,853 INFO
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] -
Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2,
StoppingOffset: -9223372036854775808]]}
Line 12729: 2025-10-29 06:24:58,853 INFO
org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy
[] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0,
StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
However, when we increase
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS to approximately
3 minutes, this behaviour is not observed: data from both the primary and the
secondary cluster is read based on the grouped strategy, without duplicates.
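The only change relative to the sketch above is the discovery interval:

// ~3 minutes instead of 30 seconds
props.setProperty(
    DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "180000");
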
Can you please suggest a reason for this behaviour?
Best Regards
Swati