Alex Rovner created KAFKA-19107:
-----------------------------------

             Summary: MirrorMaker2 repeatedly tries to sync group offsets for 
partitions that no longer exist
                 Key: KAFKA-19107
                 URL: https://issues.apache.org/jira/browse/KAFKA-19107
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 3.9.0
            Reporter: Alex Rovner


h3. Summary

MM2's Checkpoint connector is currently unaware of topic deletions and 
keeps attempting to sync group offsets for deleted topics, which results in 
`UnknownTopicOrPartitionException` being logged at each sync. The checkpoints 
used for this sync are persisted in an MM2-internal checkpoints topic. Because 
that topic is compacted, checkpoints that reference deleted topic-partitions 
are never cleaned up. As a result, `UnknownTopicOrPartitionException` recurs 
on every sync until someone intervenes manually.
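
To illustrate why compaction alone never removes these entries, here is a minimal Python sketch of compaction semantics. It is not MM2 code. It assumes checkpoints are keyed by (group, topic, partition), and `source.other-topic` is a made-up second topic for contrast.

```python
from typing import Optional

# Assumed key shape for a checkpoint record: (consumer group, topic, partition).
Key = tuple[str, str, int]


def compact(log: list[tuple[Key, Optional[int]]]) -> dict[Key, int]:
    """Return the latest value per key, as log compaction eventually would.

    A value of None stands for a tombstone, which removes the key.
    """
    latest: dict[Key, int] = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone: the key is dropped
        else:
            latest[key] = value    # newer record replaces the older one
    return latest


log: list[tuple[Key, Optional[int]]] = [
    (("foo", "source.synth-client-topic", 0), 100),
    (("foo", "source.other-topic", 0), 7),  # hypothetical second topic
    (("foo", "source.synth-client-topic", 0), 250),
]

# Deleting the mirrored topic writes nothing to the checkpoints topic,
# so the stale key is still present after compaction.
state = compact(log)
assert ("foo", "source.synth-client-topic", 0) in state

# Only an explicit tombstone for that key would remove it.
log.append((("foo", "source.synth-client-topic", 0), None))
assert ("foo", "source.synth-client-topic", 0) not in compact(log)
```

In this model the stale checkpoint disappears only when something writes a tombstone for its key, and nothing in the scenario described here does.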

Our current workaround is to change the cleanup policy (`cleanup.policy`) of 
the checkpoints topic to `compact,delete` or even `delete` so that old 
checkpoints do not remain in the topic forever. However, this is not ideal 
because (correct me if I'm wrong) any cleanup would only take effect when the 
connector is restarted, since that is when the contents of the checkpoints 
topic are loaded into the connector's internal state.
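
The concern with the workaround can be pictured with a small Python model. It rests on the unverified assumption stated above, namely that the connector reads the checkpoints topic into memory once at startup. All names here (`apply_time_retention`, `load_checkpoints`, `source.live-topic`) are hypothetical, and real brokers apply retention per log segment rather than per record.

```python
def apply_time_retention(records, now_ms, retention_ms):
    """Simplified time-based cleanup: keep records younger than retention_ms."""
    return [r for r in records if now_ms - r["timestamp_ms"] <= retention_ms]


def load_checkpoints(records):
    """Hypothetical startup step: read the whole topic into a dict once."""
    return {r["key"]: r["offset"] for r in records}


stale_key = ("foo", "source.synth-client-topic", 0)  # topic was deleted
live_key = ("foo", "source.live-topic", 0)           # made-up surviving topic

topic = [
    {"key": stale_key, "offset": 250, "timestamp_ms": 0},
    {"key": live_key, "offset": 42, "timestamp_ms": 9_000},
]

# The connector starts and takes its snapshot of the topic.
in_memory = load_checkpoints(topic)

# Later, the broker's delete policy removes the old record from the topic.
topic = apply_time_retention(topic, now_ms=10_000, retention_ms=5_000)

# The running connector still holds the stale checkpoint ...
assert stale_key in in_memory
# ... and only a reload (i.e. a restart) would see the cleaned-up topic.
assert stale_key not in load_checkpoints(topic)
```

If the assumption holds, the errors would stop only after the old records have expired and the connector has been restarted.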

h3. How to reproduce
 # Create 2 Kafka clusters (my-cluster and my-backup) with 
`auto.create.topics.enable=false`
 # Create an MM2 instance and configure it to sync consumer group offsets
 # Spin up a dummy synthetic client that continuously produces to some topic 
(e.g. `synth-client-topic`)
 # Consume from `synth-client-topic` using `kafka-console-consumer` with 
`--group foo`
 # Consume from the checkpoints topic to make sure that some checkpoints have 
been created:
{{kafka-console-consumer.sh --bootstrap-server my-backup-kafka-bootstrap:9092 
--topic source.checkpoints.internal --formatter 
"org.apache.kafka.connect.mirror.formatters.CheckpointFormatter" 
--from-beginning}}
 # Shut down MM2, then consume some more from `synth-client-topic`. This is 
probably not strictly necessary, but it creates a scenario where the 
groups in the two clusters are out of sync.
 # Stop the dummy client. Delete the source and mirrored topics.
 # Launch MM2 again. Observe that it is now logging errors like this one every 
minute:
{noformat}
2025-04-07 12:25:34,203 ERROR [source->backup.MirrorCheckpointConnector|task-0] Unable to sync offsets for consumer group foo. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) [kafka-admin-client-thread | source->backup|source->backup.MirrorCheckpointConnector-0|checkpoint-target-admin]
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Failed altering consumer group offsets for the following partitions: [source.synth-client-topic-0]
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
    at org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
    at org.apache.kafka.clients.admin.internals.AdminApiFuture$SimpleAdminApiFuture.complete(AdminApiFuture.java:100)
    at java.base/java.util.Collections$SingletonMap.forEach(Collections.java:5061)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
    at org.apache.kafka.clients.admin.internals.AdminApiFuture$SimpleAdminApiFuture.complete(AdminApiFuture.java:96)
    at org.apache.kafka.clients.admin.internals.AdminApiDriver.complete(AdminApiDriver.java:186)
    at org.apache.kafka.clients.admin.internals.AdminApiDriver.onResponse(AdminApiDriver.java:230)
    at org.apache.kafka.clients.admin.KafkaAdminClient$39.handleResponse(KafkaAdminClient.java:4843)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1397)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1550)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Failed altering consumer group offsets for the following partitions: [source.synth-client-topic-0]
{noformat}
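
When checking which stale checkpoints are involved, it can help to pull the affected topic-partitions out of the exception message. The Python sketch below is a hypothetical helper, not part of Kafka or MM2. It assumes the message format shown in the log above, with multiple partitions separated by commas and the partition number following the last hyphen.

```python
import re

# Matches the bracketed list at the end of the exception message.
PARTITIONS_RE = re.compile(r"for the following partitions: \[([^\]]*)\]")


def failed_partitions(message):
    """Return (topic, partition) pairs named in the exception message."""
    match = PARTITIONS_RE.search(message)
    if not match:
        return []
    result = []
    for item in match.group(1).split(","):
        item = item.strip()
        if not item:
            continue
        # Topic names may contain hyphens, so split on the last one only.
        topic, _, partition = item.rpartition("-")
        result.append((topic, int(partition)))
    return result


message = (
    "org.apache.kafka.common.errors.UnknownTopicOrPartitionException: "
    "Failed altering consumer group offsets for the following partitions: "
    "[source.synth-client-topic-0]"
)
print(failed_partitions(message))  # [('source.synth-client-topic', 0)]
```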

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
