Hey Hemanth!

Thank you for asking about Mirror Maker 2! Offset translation is not
so simple, so I'll summarize the main functionality and leave some
pointers into the code for you to examine yourself.

1. After MirrorSourceTask writes a record, it receives a commitRecord
callback [1] with information about what offset the record has in the
destination.
2. The MirrorSourceTask sends this information (an OffsetSync) [2] to
a persistent topic (the Offset Syncs topic.)
3. The MirrorCheckpointTask uses an OffsetSyncStore to read the Offset
Syncs from the Offset Syncs topic [3] and store them in memory.
4. The OffsetSyncStore provides a translateDownstream method that
reads from memory and translates the offset [4].
5. The translation picks an offset sync before the offset being
translated, to obtain a downstream offset which must be earlier than
where the requested offset could be written. This changed recently in
KAFKA-12468.
6. The MirrorCheckpointTask uses the translateDownstream method when
computing the checkpoint records [5] which it then emits to the
checkpoint topic.
7. The MirrorCheckpointTask stores the last checkpoint record it
emitted and writes [6] the checkpoint downstream offset to the
consumer group offset periodically.

[1]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L184
[2]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L251
[3]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L180
[4]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L131
[5]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L169
[6]: 
https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L302

I hope the above can get you started in this area, please let me know
if you have any more questions about offset translation.

Thanks,
Greg

On Mon, Aug 28, 2023 at 9:14 PM Hemanth Savasere
<hemanth.savas...@gmail.com> wrote:
>
> Hi,
> We're using Mirror Maker 2 for replicating messages from our primary to
> secondary cluster, it's one way replication. We're also replicating by
> the consumer group offsets by adding the below properties
>
> sync.group.offsets.enabled=true
> sync.group.offsets.interval.seconds=5
> emit.checkpoints.interval.seconds=5
>
> Wanted to know how the translation of consumer group happens from source to
> destination cluster, especially need to know "how current offset and log
> end offset translation happens from source to destination kafka cluster"
>
> Thanks in advance.
>
> --
> Regards,
> Hemanth Savasere

Reply via email to