Carlo Bongiovanni created KAFKA-10681:
-----------------------------------------
Summary: MM2 translateOffsets returns wrong offsets
Key: KAFKA-10681
URL: https://issues.apache.org/jira/browse/KAFKA-10681
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.5.0
Environment: GKE, strimzi release
Reporter: Carlo Bongiovanni

Hi all,

we'd like to use MM2's ability to mirror consumer offset checkpoints so that we can fail over gracefully from an active cluster to a standby one. For this purpose we have created the following setup (all done with Strimzi on Kubernetes):
* an active Kafka 2.5.0 cluster used by a few producers/consumers
* a standby Kafka 2.5.0 cluster
* MM2, set up in one direction only, mirroring from the active cluster to the standby one

We let MM2 run for some time and verified that messages are effectively mirrored.

We then started developing tooling that creates consumer groups in the consumer offsets topic of the standby cluster by reading the internal checkpoints topic. The following is an extract of our code that reads the translated offsets:
{code:java}
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;
Map<String, Object> mm2Props = new HashMap<>();
mm2Props.put(BOOTSTRAP_SERVERS_CONFIG, "bootstrap_servers");
mm2Props.put("source.cluster.alias", "euwe");
mm2Props.put(SASL_MECHANISM, "SCRAM-SHA-512");
mm2Props.put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";");
mm2Props.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
mm2Props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/usr/local/lib/jdk/lib/security/cacerts");
mm2Props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "some-password");
// cgi holds the consumer group id being translated (defined elsewhere)
Map<TopicPartition, OffsetAndMetadata> translatedOffsets = RemoteClusterUtils
    .translateOffsets(mm2Props, (String) mm2Props.get("source.cluster.alias"), cgi, Duration.ofSeconds(60L));
{code}

Before persisting the translated offsets with
{code:java}
// kafkaClient is an Admin client for the standby cluster; offsets is the filtered subset of translatedOffsets
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = kafkaClient
    .alterConsumerGroupOffsets(cgi, offsets);
{code}
we filter them, because we don't want to create consumer groups for all retrieved offsets. During the filtering we compare, for each topic partition, the translated offset (as read from the checkpoints topic) with the partition's current offset on the standby cluster (as mirrored by MM2).

While running this check we found that for some topics the two values differ by a large margin, while for other topics the translated offsets look plausible. For example, a given target partition has an offset of 100 after mirroring by MM2. From the checkpoints topic, for the same consumer group id, we receive offset 200 and later 150. The issues are:
* both consumer group offsets exceed the actual offset of the partition
* the consumer group offsets from the checkpoints go down over time instead of up

We haven't been able to explain this. The wrong numbers come from *RemoteClusterUtils.translateOffsets()*, and we're wondering whether this is a misconfiguration on our side or a bug in MM2.

Thanks,
best
C.
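The filtering check the report describes (translated offset versus the partition's current offset, plus watching how the translated offset moves between reads) can be sketched in plain Java. This is a minimal, hypothetical helper, not part of Kafka or MM2: the class name CheckpointSanityCheck and the string partition keys such as "topic-0" are assumptions made to keep the sketch free of dependencies. In real tooling the translated values would come from RemoteClusterUtils.translateOffsets() and the end offsets from the standby cluster, for example via the Admin client's listOffsets call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper that flags the two symptoms from the report:
 * a translated (checkpointed) offset above the partition's current end offset,
 * and a translated offset that moves backwards between two reads.
 * Partitions are keyed by plain strings such as "topic-0" (an assumption).
 */
public class CheckpointSanityCheck {

    /** Last translated offset seen per partition, used to detect regressions. */
    private final Map<String, Long> lastSeen = new HashMap<>();

    /**
     * Compares one round of translated offsets with the current end offsets
     * of the same partitions and returns human-readable warnings.
     */
    public List<String> check(Map<String, Long> translated, Map<String, Long> endOffsets) {
        List<String> warnings = new ArrayList<>();
        for (Map.Entry<String, Long> e : translated.entrySet()) {
            String partition = e.getKey();
            long offset = e.getValue();

            // Symptom 1: the group would start beyond the end of the mirrored partition.
            Long end = endOffsets.get(partition);
            if (end != null && offset > end) {
                warnings.add(partition + ": translated offset " + offset
                        + " exceeds end offset " + end);
            }

            // Symptom 2: the translated offset went down since the previous read.
            Long previous = lastSeen.put(partition, offset);
            if (previous != null && offset < previous) {
                warnings.add(partition + ": translated offset went down from "
                        + previous + " to " + offset);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        CheckpointSanityCheck check = new CheckpointSanityCheck();
        Map<String, Long> end = Map.of("topic-0", 100L);
        System.out.println(check.check(Map.of("topic-0", 200L), end));
        System.out.println(check.check(Map.of("topic-0", 150L), end));
    }
}
```

Feeding it the numbers from the report (end offset 100, then translated offsets 200 and 150) produces one warning on the first read and two on the second: both values exceed the end offset, and the second read goes down.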
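The report does not describe how translateOffsets() derives its numbers. As an aid for reasoning about values like 200 and 150, the sketch below models the translation under an assumption that is not verified here against the 2.5.0 source: MM2 periodically emits offset syncs pairing an upstream offset with the downstream offset it was mirrored to, and a checkpoint adds the consumer's upstream distance from the latest sync to that sync's downstream offset. OffsetTranslationModel, OffsetSync and modelTranslate are hypothetical names, not MM2 APIs.

```java
/**
 * Simplified, assumed model of checkpoint offset translation.
 * Not MM2's actual implementation; all names are hypothetical.
 */
public class OffsetTranslationModel {

    /** Offset-sync pair: an upstream offset and the downstream offset it was mirrored to. */
    static final class OffsetSync {
        final long upstreamOffset;
        final long downstreamOffset;

        OffsetSync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    /**
     * Adds the consumer's distance from the sync point, counted in upstream
     * offsets, to the sync's downstream offset. Returns -1 when the committed
     * offset is older than the sync and cannot be translated under this model.
     */
    static long modelTranslate(OffsetSync sync, long committedUpstreamOffset) {
        if (committedUpstreamOffset < sync.upstreamOffset) {
            return -1L;
        }
        long upstreamStep = committedUpstreamOffset - sync.upstreamOffset;
        return sync.downstreamOffset + upstreamStep;
    }

    public static void main(String[] args) {
        OffsetSync sync = new OffsetSync(1_000L, 40L);
        System.out.println(modelTranslate(sync, 1_060L)); // prints 100
        System.out.println(modelTranslate(sync, 900L));   // prints -1
    }
}
```

Under this model the result equals the committed upstream offset plus the latest sync's downstream-minus-upstream delta. It can therefore exceed the partition's downstream end offset when the consumer's upstream step is larger than the downstream distance from the sync point to the end of the partition, and it can only decrease between reads if a newer sync shrinks that delta by more than the committed offset advanced. Whether either case applies to this report is not established here.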