Carlo Bongiovanni created KAFKA-10681:
-----------------------------------------

             Summary: MM2 translateOffsets returns wrong offsets
                 Key: KAFKA-10681
                 URL: https://issues.apache.org/jira/browse/KAFKA-10681
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.5.0
         Environment: GKE, strimzi release
            Reporter: Carlo Bongiovanni


Hi all,

we'd like to make use of the ability of MM2 to mirror checkpoints of consumer 
offsets, in order to have a graceful failover from an active cluster to a 
standby one.

For this reason we have created the following setup (FYI all done with strimzi 
on k8s):
 * an active kafka cluster 2.5.0 used by a few producers/consumers
 * a standby kafka cluster 2.5.0
 * MM2 is set up in one direction only, to mirror from active to standby

We have let MM2 run for some time and we could verify that messages are 
actually mirrored.

At this point we have started developing the tooling to create consumer groups 
in the __consumer_offsets topic of the standby cluster, by reading the internal 
checkpoints topic.

The following is an extract of our code to read the translated offsets:


{code:java}
// Client properties used to read the checkpoints topic
Map<String, Object> mm2Props = new HashMap<>();
mm2Props.put(BOOTSTRAP_SERVERS_CONFIG, "bootstrap_servers");
mm2Props.put("source.cluster.alias", "euwe");
mm2Props.put(SASL_MECHANISM, "SCRAM-SHA-512");
// The JAAS config must be a single string; it is concatenated here to fit the line width
mm2Props.put(SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
        + "username=\"user\" password=\"password\";");
mm2Props.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
mm2Props.put(SSL_TRUSTSTORE_LOCATION_CONFIG,
    "/usr/local/lib/jdk/lib/security/cacerts");
mm2Props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "some-password");

// cgi is the consumer group id whose offsets are translated
Map<TopicPartition, OffsetAndMetadata> translatedOffsets = RemoteClusterUtils
    .translateOffsets(mm2Props, (String) mm2Props.get("source.cluster.alias"), cgi,
        Duration.ofSeconds(60L));
{code}
 

Before persisting the translated offsets with 
{code:java}
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = kafkaClient
 .alterConsumerGroupOffsets(cgi, offsets);{code}
we filter them because we don't want to create consumer groups for all 
retrieved offsets.

During the filtering, we compare the translated offset for each topic 
partition (as coming from the checkpoint topic) with the respective current 
offset of that topic partition (as mirrored by MM2).

While running this check we have verified that for some topics we get a big 
difference between those values, while for other topics the update seems 
realistic.

For example, looking at a given target partition we see it has an offset of 100 
(after mirroring by MM2). From the checkpoint topic, for the same consumer 
group id, we receive offset 200, and later 150.

The issues are that:
 * both consumer group offsets exceed the real offset of the partition
 * the consumer group offsets from the checkpoints go down over time, not up
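
To make the two symptoms above concrete, here is a minimal sketch of the kind of sanity check involved. It is not our actual tooling: the class name, method names and the partition key "some-topic-0" are hypothetical, plain maps stand in for the Kafka types, and it assumes the "offset of 100" is the log end offset of the target partition. In practice the end offsets would come from the standby cluster (for example via KafkaConsumer.endOffsets()).

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointSanityCheck {

    /**
     * Returns, per partition, by how much the translated offset exceeds
     * the log end offset of the mirrored partition (only if it does).
     */
    public static Map<String, Long> beyondLogEnd(Map<String, Long> translated,
                                                 Map<String, Long> logEnd) {
        Map<String, Long> suspicious = new TreeMap<>();
        for (Map.Entry<String, Long> e : translated.entrySet()) {
            Long end = logEnd.get(e.getKey());
            if (end != null && e.getValue() > end) {
                suspicious.put(e.getKey(), e.getValue() - end);
            }
        }
        return suspicious;
    }

    /**
     * Returns, per partition, by how much the translated offset went
     * backwards between two successive reads of the checkpoints.
     */
    public static Map<String, Long> regressed(Map<String, Long> previous,
                                              Map<String, Long> current) {
        Map<String, Long> suspicious = new TreeMap<>();
        for (Map.Entry<String, Long> e : current.entrySet()) {
            Long before = previous.get(e.getKey());
            if (before != null && e.getValue() < before) {
                suspicious.put(e.getKey(), before - e.getValue());
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        // Numbers from the example in this report; the partition key is hypothetical.
        Map<String, Long> logEnd = Collections.singletonMap("some-topic-0", 100L);
        Map<String, Long> firstRead = Collections.singletonMap("some-topic-0", 200L);
        Map<String, Long> secondRead = Collections.singletonMap("some-topic-0", 150L);

        System.out.println("beyond log end (1st read): " + beyondLogEnd(firstRead, logEnd));
        System.out.println("beyond log end (2nd read): " + beyondLogEnd(secondRead, logEnd));
        System.out.println("regressed between reads:   " + regressed(firstRead, secondRead));
    }
}
```

With the numbers from the example, both reads are flagged by the first check and the second read is flagged by the regression check.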

We haven't been able to explain it. The wrong numbers are coming from 
*RemoteClusterUtils.translateOffsets()* and we're wondering if this could be a 
misconfiguration on our side or a bug in MM2.

Thanks, best
 C.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
