[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869925#comment-16869925 ]

Ryanne Dolan commented on KAFKA-7500:
-------------------------------------

[~abellemare] Thanks for trying it out.

> 1) Are the offset translation topics included... KAFKA-7915

Yes, I've included the required changes to SourceTask in PR-6295.

> 2) ...switching a consumer from one cluster to another...

So glad you asked :)

The key is the RemoteClusterUtils.translateOffsets() method. This consumes from 
the checkpoint topic (not the offset-sync topic directly), which has both 
upstream and downstream offsets for each consumer group. The downstream offsets 
are calculated based on the offset-sync stream, of course, but 
MirrorCheckpointConnector does the translation for you. This makes the 
translateOffsets() method rather straightforward -- it just finds the most 
recent checkpoint for a given group.
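To make the "most recent checkpoint for a given group" lookup concrete, here is a minimal Python sketch. It does not use MM2 or any Kafka client. The `Checkpoint` record and the `latest_checkpoint_offsets()` helper are hypothetical stand-ins, and the sketch assumes checkpoints are supplied in the order they were read from the checkpoint topic, so a later record for the same partition supersedes an earlier one.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical in-memory view of one checkpoint record."""
    group: str
    topic: str              # topic name as it exists on the target cluster
    partition: int
    upstream_offset: int    # offset the group committed on the source cluster
    downstream_offset: int  # equivalent offset on the target cluster


def latest_checkpoint_offsets(
    checkpoints: Iterable[Checkpoint], group: str
) -> Dict[Tuple[str, int], int]:
    """Return {(topic, partition): downstream_offset} from the most recent
    checkpoint per partition for `group`. Records must arrive in log order."""
    latest: Dict[Tuple[str, int], int] = {}
    for cp in checkpoints:
        if cp.group == group:
            # A later checkpoint for the same partition overwrites an earlier one.
            latest[(cp.topic, cp.partition)] = cp.downstream_offset
    return latest


checkpoints = [
    Checkpoint("g1", "us-east.topic1", 0, 100, 95),
    Checkpoint("g2", "us-east.topic1", 0, 50, 48),
    Checkpoint("g1", "us-east.topic1", 0, 200, 190),
    Checkpoint("g1", "us-east.topic1", 1, 30, 30),
]
print(latest_checkpoint_offsets(checkpoints, "g1"))
# {('us-east.topic1', 0): 190, ('us-east.topic1', 1): 30}
```

Because each checkpoint already carries the downstream offset, the lookup needs no arithmetic of its own; the translation work has been done upstream by the checkpoint connector.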

The translateOffsets() method works both ways: you can translate from a source 
topic ("topic1") to a remote topic ("us-east.topic1") and vice versa, which 
means your failover and failback logic is identical. In both cases you just 
migrate all your consumer groups from one cluster to another.
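The symmetry between failover and failback comes from the topic naming. The sketch below assumes the `<source-alias>.<topic>` convention from the example above ("us-east.topic1"). `target_topic()` is a hypothetical helper, not MM2's replication-policy API, and its naive prefix check would misfire on a local topic that happens to start with the target cluster's alias followed by a dot.

```python
SEPARATOR = "."  # assumed from the "us-east.topic1" example


def target_topic(topic: str, source_alias: str, target_alias: str) -> str:
    """Name on the target cluster of `topic` as seen on the source cluster.

    Hypothetical helper illustrating the naming convention only.
    """
    prefix = target_alias + SEPARATOR
    if topic.startswith(prefix):
        # Failback: the topic is itself a replica of the target's own topic,
        # so map it back to the original name.
        return topic[len(prefix):]
    # Failover: the target holds the source's topic under a prefixed name.
    return source_alias + SEPARATOR + topic


# Failover: a group reading "topic1" on us-east moves to us-west.
print(target_topic("topic1", "us-east", "us-west"))          # us-east.topic1
# Failback: the same group, now reading "us-east.topic1" on us-west, moves back.
print(target_topic("us-east.topic1", "us-west", "us-east"))  # topic1
```

The same function handles both directions, which is the point made above: failover and failback are the same operation with the cluster roles swapped.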

Also note that migration only requires information that is already stored on 
the target cluster (the checkpoints), so you do not need to connect to a failed 
cluster in order to failover from it. Obviously that would defeat the purpose!

Based on translateOffsets(), you can do several nifty things with respect to
failover/failback, e.g. build scripts that bulk-migrate consumer groups from
one cluster to another, or add consumer client logic that automatically
fails over to a secondary cluster as needed.

In the former case, you can use kafka-consumer-groups.sh to "reset offsets" to 
those returned by translateOffsets(). This will cause consumer state to be 
transferred to the target cluster, in effect. In the latter, you can use 
translateOffsets() with KafkaConsumer.seek().
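Both approaches start from the same map of translated offsets. The sketch below is hypothetical and dependency-free: `to_reset_csv()` renders offsets in the `topic,partition,offset` CSV layout assumed for `kafka-consumer-groups.sh --reset-offsets --from-file`, and `seek_all()` replays them through any object with a `seek(partition, offset)` method. A real client such as the Java KafkaConsumer takes a `TopicPartition` rather than the plain tuple used here, and the CLI only resets offsets for a group with no active members.

```python
import csv
import io
from typing import Dict, Tuple

# (topic, partition) -> translated offset on the target cluster
Offsets = Dict[Tuple[str, int], int]


def to_reset_csv(offsets: Offsets) -> str:
    """Render offsets as topic,partition,offset lines (assumed --from-file layout)."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for (topic, partition), offset in sorted(offsets.items()):
        writer.writerow([topic, partition, offset])
    return buf.getvalue()


def seek_all(consumer, offsets: Offsets) -> None:
    """Move an already-assigned consumer to each translated offset.

    A real client expects a TopicPartition object; a plain tuple stands in here.
    """
    for (topic, partition), offset in offsets.items():
        consumer.seek((topic, partition), offset)


class RecordingConsumer:
    """Hypothetical stand-in that only records seek() calls."""

    def __init__(self) -> None:
        self.positions: Dict[Tuple[str, int], int] = {}

    def seek(self, partition: Tuple[str, int], offset: int) -> None:
        self.positions[partition] = offset


translated = {("us-east.topic1", 1): 42, ("us-east.topic1", 0): 190}

# Bulk migration: produce the contents of a file for kafka-consumer-groups.sh.
print(to_reset_csv(translated), end="")
# us-east.topic1,0,190
# us-east.topic1,1,42

# Client-side failover: position a consumer directly.
consumer = RecordingConsumer()
seek_all(consumer, translated)
print(consumer.positions[("us-east.topic1", 0)])  # 190
```

The CSV could then be applied on the target cluster with something like `kafka-consumer-groups.sh --bootstrap-server <target-broker> --group <group> --reset-offsets --from-file offsets.csv --execute`, ideally after checking the plan with `--dry-run` first.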

There are more advanced operations and architectures you can build using MM2 as 
well. Some are outlined in the following talk (by me): 
https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

> 3) Do you use timestamps at all for failing over from one cluster to another?

MM2 preserves the timestamps of replicated records, but otherwise does not care 
about timestamps. Nor does failover need to involve any timestamps.

Ryanne

> MirrorMaker 2.0 (KIP-382)
> -------------------------
>
>                 Key: KAFKA-7500
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7500
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect, mirrormaker
>            Reporter: Ryanne Dolan
>            Priority: Minor
>             Fix For: 2.4.0
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
