Hey Hemanth! Thank you for asking about Mirror Maker 2! Offset translation is not so simple, so I'll summarize the main functionality and leave some pointers into the code for you to examine yourself.
1. After MirrorSourceTask writes a record, it receives a commitRecord callback [1] with information about what offset the record has in the destination. 2. The MirrorSourceTask sends this information (an OffsetSync) [2] to a persistent topic (the Offset Syncs topic.) 3. The MirrorCheckpointTask uses an OffsetSyncStore to read the Offset Syncs from the Offset Syncs topic [3] and store them in memory. 4. The OffsetSyncStore provides a translateDownstream method that reads from memory and translates the offset [4]. 5. The translation picks an offset sync before the offset being translated, to obtain a downstream offset which must be earlier than where the requested offset could be written. This changed recently in KAFKA-12468. 6. The MirrorCheckpointTask uses the translateDownstream method when computing the checkpoint records [5] which it then emits to the checkpoint topic. 7. The MirrorCheckpointTask stores the last checkpoint record it emitted and writes [6] the checkpoint downstream offset to the consumer group offset periodically. [1]: https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L184 [2]: https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L251 [3]: https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L180 [4]: https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L131 [5]: https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L169 [6]: https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L302 I hope the above can get you started in this area, please let me know if you have any more questions about offset translation. Thanks, Greg On Mon, Aug 28, 2023 at 9:14 PM Hemanth Savasere <hemanth.savas...@gmail.com> wrote: > > Hi, > We're using Mirror Maker 2 for replicating messages from our primary to > secondary cluster, it's one way replication. We're also replicating by > the consumer group offsets by adding the below properties > > sync.group.offsets.enabled=true > sync.group.offsets.interval.seconds=5 > emit.checkpoints.interval.seconds=5 > > Wanted to know how the translation of consumer group happens from source to > destination cluster, especially need to know "how current offset and log > end offset translation happens from source to destination kafka cluster" > > Thanks in advance. > > -- > Regards, > Hemanth Savasere