Alex Rovner created KAFKA-19107:
-----------------------------------

Summary: MirrorMaker2 repeatedly tries to sync group offsets for partitions that no longer exist
Key: KAFKA-19107
URL: https://issues.apache.org/jira/browse/KAFKA-19107
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.9.0
Reporter: Alex Rovner
h3. Summary

MM2's Checkpoint connector is currently unaware of topic deletions. It repeatedly attempts to sync group offsets for deleted topics, so an `UnknownTopicOrPartitionException` is logged on every sync.

The checkpoints used for this sync are persisted in an MM2-internal checkpoints topic. Because that topic is compacted, and compaction keeps the latest record for each key rather than removing it, the checkpoints that reference deleted topic-partitions are never cleaned up. As a result, the `UnknownTopicOrPartitionException` recurs on every sync indefinitely until someone intervenes manually.

Our current workaround is to change the cleanup policy (`cleanup.policy`) of the checkpoints topic to `compact,delete` or even `delete`, so that old checkpoints do not remain in the topic forever. However, this is not ideal because (correct me if I'm wrong) any cleanup would only take effect after the connector is restarted, since that is when the contents of the checkpoints topic are loaded into the connector's internal state.

h3. How to reproduce

# Create two Kafka clusters (my-cluster and my-backup) with `auto.create.topics.enable=false`.
# Create an MM2 instance and configure it to sync consumer group offsets.
# Spin up a dummy synthetic client that continuously produces to some topic (e.g. `synth-client-topic`).
# Consume from `synth-client-topic` using `kafka-console-consumer` with `--group foo`.
# Consume from the checkpoints topic to make sure that some checkpoints have been created: {{kafka-console-consumer.sh --bootstrap-server my-backup-kafka-bootstrap:9092 --topic source.checkpoints.internal --formatter "org.apache.kafka.connect.mirror.formatters.CheckpointFormatter" --from-beginning}}
# Shut down MM2 and consume some more from `synth-client-topic`. This is probably not strictly necessary, but we want to create a scenario where the groups in the two clusters are out of sync.
# Stop the dummy client. Delete the source and mirrored topics.
# Launch MM2 again.
Observe that it is now logging errors like this one every minute:

{quote}
{{2025-04-07 12:25:34,203 ERROR [source->backup.MirrorCheckpointConnector|task-0] Unable to sync offsets for consumer group foo. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) [kafka-admin-client-thread | source->backup|source->backup.MirrorCheckpointConnector-0|checkpoint-target-admin]}}
{{java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Failed altering consumer group offsets for the following partitions: [source.synth-client-topic-0]}}
{{ at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)}}
{{ at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)}}
{{ at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)}}
{{ at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)}}
{{ at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)}}
{{ at org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)}}
{{ at org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)}}
{{ at org.apache.kafka.clients.admin.internals.AdminApiFuture$SimpleAdminApiFuture.complete(AdminApiFuture.java:100)}}
{{ at java.base/java.util.Collections$SingletonMap.forEach(Collections.java:5061)}}
{{ at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)}}
{{ at org.apache.kafka.clients.admin.internals.AdminApiFuture$SimpleAdminApiFuture.complete(AdminApiFuture.java:96)}}
{{ at org.apache.kafka.clients.admin.internals.AdminApiDriver.complete(AdminApiDriver.java:186)}}
{{ at org.apache.kafka.clients.admin.internals.AdminApiDriver.onResponse(AdminApiDriver.java:230)}}
{{ at org.apache.kafka.clients.admin.KafkaAdminClient$39.handleResponse(KafkaAdminClient.java:4843)}}
{{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1397)}}
{{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1550)}}
{{ at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)}}
{{ at java.base/java.lang.Thread.run(Thread.java:840)}}
{{Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Failed altering consumer group offsets for the following partitions: [source.synth-client-topic-0]}}
{quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
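A minimal Java sketch of why the checkpoint for a deleted partition never goes away, under simplifying assumptions. The compacted checkpoints topic is modelled as an in-memory list of key/value records. Compaction keeps only the latest value per key and drops a key only when a tombstone (null value) is written for it. The `Checkpoint` record, the `group:topic-partition` key format, and the `compact`/`syncable` helpers are hypothetical stand-ins, not MM2's actual classes or internal format. `syncable` illustrates one possible guard, which is not an existing MM2 feature: skip checkpoints whose target partition no longer exists before attempting an offset sync.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StaleCheckpointSketch {

    // Hypothetical, simplified checkpoint: consumer group, target topic-partition, translated offset.
    record Checkpoint(String group, String topicPartition, long downstreamOffset) {}

    // Models reading a compacted topic from the beginning: only the latest value per key
    // survives, and a key disappears only when a tombstone (null value) is written for it.
    static Map<String, Checkpoint> compact(List<Map.Entry<String, Checkpoint>> log) {
        Map<String, Checkpoint> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Checkpoint> entry : log) {
            if (entry.getValue() == null) {
                latest.remove(entry.getKey()); // tombstone deletes the key
            } else {
                latest.put(entry.getKey(), entry.getValue()); // newer value replaces the older one
            }
        }
        return latest;
    }

    // Hypothetical guard: keep only checkpoints whose target partition still exists.
    static List<Checkpoint> syncable(Collection<Checkpoint> checkpoints, Set<String> existingPartitions) {
        List<Checkpoint> result = new ArrayList<>();
        for (Checkpoint checkpoint : checkpoints) {
            if (existingPartitions.contains(checkpoint.topicPartition())) {
                result.add(checkpoint);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String key = "foo:source.synth-client-topic-0"; // hypothetical key format
        List<Map.Entry<String, Checkpoint>> log = new ArrayList<>();
        log.add(new AbstractMap.SimpleEntry<>(key, new Checkpoint("foo", "source.synth-client-topic-0", 10L)));
        log.add(new AbstractMap.SimpleEntry<>(key, new Checkpoint("foo", "source.synth-client-topic-0", 42L)));
        // The topic is then deleted, but nothing writes a tombstone for its checkpoint key.

        Map<String, Checkpoint> loaded = compact(log); // roughly what a (re)started task reads back
        Set<String> existingPartitions = Set.of(); // the mirrored partition no longer exists

        System.out.println("checkpoints loaded after restart: " + loaded.size());
        System.out.println("latest downstream offset: " + loaded.get(key).downstreamOffset());
        System.out.println("checkpoints safe to sync: " + syncable(loaded.values(), existingPartitions).size());
    }
}
```

Running `main` reports one loaded checkpoint (offset 42) but zero syncable ones. In this model, the deleted partition's checkpoint survives compaction. Only a tombstone or an existence check keeps it out of the offset sync.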