Aljoscha Pörtner created KAFKA-13611:
----------------------------------------

             Summary: Failed reconfiguration of tasks can cause missing offset replications in MirrorCheckpointConnector
                 Key: KAFKA-13611
                 URL: https://issues.apache.org/jira/browse/KAFKA-13611
             Project: Kafka
          Issue Type: Improvement
          Components: mirrormaker
    Affects Versions: 3.0.0, 2.8.1, 2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.4.0
            Reporter: Aljoscha Pörtner


Because _knownConsumerGroups_ is stored in a variable and not queried every time _refreshConsumerGroups_ is executed, errors during task reconfiguration go unrecognized, and the reconfiguration is not retried until a new consumer group is added. This can lead to missing offset updates in the target cluster, because the affected consumer group is not picked up by a task until a completely new consumer group is added and the task reconfiguration succeeds.
{code:java}
private void refreshConsumerGroups()
        throws InterruptedException, ExecutionException {
    List<String> consumerGroups = findConsumerGroups();
    Set<String> newConsumerGroups = new HashSet<>();
    newConsumerGroups.addAll(consumerGroups);
    newConsumerGroups.removeAll(knownConsumerGroups);
    Set<String> deadConsumerGroups = new HashSet<>();
    deadConsumerGroups.addAll(knownConsumerGroups);
    deadConsumerGroups.removeAll(consumerGroups);
    if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
        log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
                consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
                knownConsumerGroups.size());
        log.debug("Found new consumer groups: {}", newConsumerGroups);
        knownConsumerGroups = consumerGroups;
        context.requestTaskReconfiguration();
    }
}
{code}
[Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]

For an example of how the problem can be triggered, take a look at the following [issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].
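
To illustrate the failure mode described in this report, here is a minimal, self-contained sketch of one possible mitigation: the cached group list is committed only after a reconfiguration request is known to have succeeded, so a failed attempt is retried on the next refresh. This is not the actual Kafka code or a proposed patch. The class `GroupRefreshSketch`, the `Reconfigurer` interface, and its boolean return value are all hypothetical. The real `ConnectorContext.requestTaskReconfiguration()` returns `void`, so detecting a failure in the connector would need some other signal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GroupRefreshSketch {

    /** Hypothetical stand-in for the reconfiguration request; returns false on failure. */
    public interface Reconfigurer {
        boolean requestTaskReconfiguration();
    }

    private List<String> knownConsumerGroups = new ArrayList<>();
    private final Reconfigurer reconfigurer;
    private int attempts = 0;

    public GroupRefreshSketch(Reconfigurer reconfigurer) {
        this.reconfigurer = reconfigurer;
    }

    /** Takes the freshly discovered groups as a parameter instead of querying a cluster. */
    public void refreshConsumerGroups(List<String> consumerGroups) {
        Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
        newConsumerGroups.removeAll(knownConsumerGroups);
        Set<String> deadConsumerGroups = new HashSet<>(knownConsumerGroups);
        deadConsumerGroups.removeAll(consumerGroups);
        if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
            attempts++;
            // Assumption: success is observable here. Only then is the cache
            // updated, so a failed attempt leaves the difference in place and
            // the next refresh requests reconfiguration again.
            if (reconfigurer.requestTaskReconfiguration()) {
                knownConsumerGroups = new ArrayList<>(consumerGroups);
            }
        }
    }

    public int attempts() {
        return attempts;
    }

    public List<String> knownConsumerGroups() {
        return knownConsumerGroups;
    }

    public static void main(String[] args) {
        // First request fails, every later request succeeds.
        boolean[] outcomes = {false, true};
        int[] call = {0};
        GroupRefreshSketch sketch = new GroupRefreshSketch(
                () -> outcomes[Math.min(call[0]++, outcomes.length - 1)]);

        List<String> groups = Arrays.asList("group-a");
        sketch.refreshConsumerGroups(groups); // request fails, cache unchanged
        sketch.refreshConsumerGroups(groups); // difference still visible, retried
        sketch.refreshConsumerGroups(groups); // no difference, no request
        System.out.println(sketch.attempts() + " " + sketch.knownConsumerGroups());
        // prints "2 [group-a]"
    }
}
```

With the ordering quoted in the report (assign `knownConsumerGroups` first, then request reconfiguration), the second call in `main` would see no difference and would not retry, which matches the behaviour the reporter describes.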