Hi Team,

We are using the Dynamic Kafka Source to read data from Kafka topics across
one or more Kafka clusters. We have one primary and one secondary Kafka
cluster, with topic data replicated from the primary to the secondary
cluster using MirrorMaker. The offset reset strategy used in our config is
group offsets, DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS
is set to 30 seconds, and we are otherwise using the default Kafka consumer
properties.
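
For context, the source is built roughly like the sketch below (simplified:
"input-stream", the deserializer, and the KafkaMetadataService implementation
that resolves the stream to primarycluster / secondarycluster0 are
placeholders, and OffsetsInitializer.committedOffsets() is how we read the
group offsets strategy):

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DynamicSourceSetupSketch {

    // "input-stream" and kafkaMetadataService are placeholders for our actual
    // stream id and the KafkaMetadataService implementation that maps the
    // stream to primarycluster / secondarycluster0.
    static void attachSource(StreamExecutionEnvironment env,
                             KafkaMetadataService kafkaMetadataService) {
        Properties props = new Properties();
        // 30 second stream metadata discovery interval (the value we see the issue with)
        props.setProperty(
                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(),
                "30000");
        // All other Kafka consumer properties are left at their defaults.

        DynamicKafkaSource<String> source =
                DynamicKafkaSource.<String>builder()
                        .setStreamIds(Collections.singleton("input-stream"))
                        .setKafkaMetadataService(kafkaMetadataService)
                        .setDeserializer(
                                KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                        // group offsets strategy: start from the committed group offsets
                        .setStartingOffsets(OffsetsInitializer.committedOffsets())
                        .setProperties(props)
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "dynamic-kafka-source");
    }
}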

Data from the primary cluster is fetched according to the group offsets
strategy. However, when we bring the primary cluster down, the source
switches over to the secondary cluster, and the same data is read 4 to 10
times from the secondary cluster.

When we analyzed the JobManager logs, we found that the splits' StartingOffset
is set to -2 after initially being set to -3:

Line  5139: 2025-10-29 06:11:20,529 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line  5140: 2025-10-29 06:11:20,530 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster primarycluster: {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12565: 2025-10-29 06:24:48,819 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12566: 2025-10-29 06:24:48,819 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
Line 12572: 2025-10-29 06:24:48,822 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12573: 2025-10-29 06:24:48,822 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12575: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12576: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12578: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12579: 2025-10-29 06:24:48,823 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12723: 2025-10-29 06:24:58,852 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12724: 2025-10-29 06:24:58,852 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12726: 2025-10-29 06:24:58,853 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
Line 12729: 2025-10-29 06:24:58,853 INFO  org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy [] - Assigning 1 splits for cluster secondarycluster0: {0=[[Partition: oct1-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]}
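
If we read these sentinel values correctly against the constants in
KafkaPartitionSplit, -3 means "start from the committed group offset", -2
means "start from the earliest offset", and the StoppingOffset is simply
NO_STOPPING_OFFSET (Long.MIN_VALUE):

import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

// Prints the sentinel offsets that appear in the log lines above.
public class SplitOffsetSentinels {
    public static void main(String[] args) {
        System.out.println(KafkaPartitionSplit.COMMITTED_OFFSET);   // -3: start from committed group offset
        System.out.println(KafkaPartitionSplit.EARLIEST_OFFSET);    // -2: start from earliest offset
        System.out.println(KafkaPartitionSplit.LATEST_OFFSET);      // -1: start from latest offset
        System.out.println(KafkaPartitionSplit.NO_STOPPING_OFFSET); // Long.MIN_VALUE = -9223372036854775808
    }
}

So on secondarycluster0 the split seems to fall back from the committed group
offset (-3) to the earliest offset (-2), which would explain why the same
data is read again.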

However, when we increase
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS to
approximately 3 minutes, this behaviour is not observed: data from both the
primary and the secondary cluster is read according to the group offsets
strategy without duplicates.

Could you please suggest a reason for this behaviour?

Best Regards
Mahima Agarwal
