Hi Greg, Thank you for providing the resources to understand offset translation. I'm currently going through them and am gaining a better understanding of the process.
Before delving into these resources, I was examining the source code of RemoteClusterUtils.translateOffsets(). I'm trying to understand the specific differences between syncing offsets using RemoteClusterUtils and the inbuilt MM2 consumer offset sync feature. Although I read through KIP 545 regarding Mirror Maker 2, I didn't find much context to explain why this approach is preferable. Additionally, would it still be advisable to use RemoteClusterUtils.translateOffsets() given that we are on Kafka version 2.8.2, which has some known bugs related to negative consumer offsets? For reference, here is the related bug report: https://issues.apache.org/jira/browse/KAFKA-12635.

On Sun, Sep 3, 2023 at 6:17 PM Hemanth Savasere <hemanth.savas...@gmail.com> wrote:

> Hi,
> Thanks a lot Greg for the resources to understand the offset translation,
> I'm going through them and understanding the process.
>
> But before that I was also going through the source code of
> RemoteClusterUtils.translateOffsets() and wanted to know that I did not get
> much context on what's the difference when we sync the offsets using the
> RemoteClusterUtils instead of using the inbuilt MM2 consumer offset sync
> feature.
>
> I read through the KIP 545 regarding the Mirror Maker 2 but could not get
> the context more on why this was being done, and can we still use the
> RemoteClusterUtils.translateOffsets() as we are using the Kafka 2.8.2
> version which has some bugs related to negative consumer offsets
> https://issues.apache.org/jira/browse/KAFKA-12635
>
> Thanks in advance,
>
> On Tue, Aug 29, 2023 at 9:29 PM Greg Harris <greg.har...@aiven.io.invalid>
> wrote:
>
>> Hey Hemanth!
>>
>> Thank you for asking about Mirror Maker 2! Offset translation is not
>> so simple, so I'll summarize the main functionality and leave some
>> pointers into the code for you to examine yourself.
>>
>> 1. After MirrorSourceTask writes a record, it receives a commitRecord
>> callback [1] with information about what offset the record has in the
>> destination.
>> 2. The MirrorSourceTask sends this information (an OffsetSync) [2] to
>> a persistent topic (the Offset Syncs topic.)
>> 3. The MirrorCheckpointTask uses an OffsetSyncStore to read the Offset
>> Syncs from the Offset Syncs topic [3] and store them in memory.
>> 4. The OffsetSyncStore provides a translateDownstream method that
>> reads from memory and translates the offset [4].
>> 5. The translation picks an offset sync before the offset being
>> translated, to obtain a downstream offset which must be earlier than
>> where the requested offset could be written. This changed recently in
>> KAFKA-12468.
>> 6. The MirrorCheckpointTask uses the translateDownstream method when
>> computing the checkpoint records [5] which it then emits to the
>> checkpoint topic.
>> 7. The MirrorCheckpointTask stores the last checkpoint record it
>> emitted and writes [6] the checkpoint downstream offset to the
>> consumer group offset periodically.
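To make steps 3 to 5 concrete, here is a minimal, self-contained sketch of the idea in plain Java. It is not the MirrorMaker 2 implementation: the class name OffsetTranslationSketch and the recordSync method are hypothetical, it models a single topic-partition, and the "sync offset plus at most one" rule is my reading of the conservative translation described in step 5, so treat it as an assumption and check the linked OffsetSyncStore code for the real logic.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical, simplified model of an in-memory offset sync store
// for ONE topic-partition. Not the real OffsetSyncStore.
final class OffsetTranslationSketch {
    // upstream offset -> downstream offset, one entry per offset sync
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    // Steps 2 and 3: remember an offset sync read from the syncs topic.
    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Steps 4 and 5: translate using the latest sync at or before the
    // requested upstream offset. The result never points past where the
    // requested record could have been written downstream.
    OptionalLong translateDownstream(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return OptionalLong.empty(); // no sync old enough to translate
        }
        // Assumption: an exact match maps to the synced downstream offset;
        // anything later maps to just one past it, which is a safe lower bound.
        long step = (upstreamOffset == sync.getKey()) ? 0L : 1L;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        OffsetTranslationSketch store = new OffsetTranslationSketch();
        store.recordSync(100L, 40L);
        store.recordSync(200L, 120L);

        System.out.println(store.translateDownstream(100L).getAsLong()); // 40
        System.out.println(store.translateDownstream(150L).getAsLong()); // 41
        System.out.println(store.translateDownstream(250L).getAsLong()); // 121
        System.out.println(store.translateDownstream(50L).isPresent());  // false
    }
}
```

In this model a consumer committed at upstream offset 150 is translated to downstream offset 41 rather than an interpolated value, so after failover it may re-read some records but should not skip any. That trade-off is the reason a translated offset can lag well behind the downstream log end offset.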
>>
>> [1]:
>> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L184
>> [2]:
>> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L251
>> [3]:
>> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L180
>> [4]:
>> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L131
>> [5]:
>> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L169
>> [6]:
>> https://github.com/apache/kafka/blob/0912ca27e2a229d2ebe02f4d1dabc40ed5fab0bb/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L302
>>
>> I hope the above can get you started in this area, please let me know
>> if you have any more questions about offset translation.
>>
>> Thanks,
>> Greg
>>
>> On Mon, Aug 28, 2023 at 9:14 PM Hemanth Savasere
>> <hemanth.savas...@gmail.com> wrote:
>> >
>> > Hi,
>> > We're using Mirror Maker 2 for replicating messages from our primary to
>> > secondary cluster; it's one-way replication. We're also replicating
>> > the consumer group offsets by adding the properties below:
>> >
>> > sync.group.offsets.enabled=true
>> > sync.group.offsets.interval.seconds=5
>> > emit.checkpoints.interval.seconds=5
>> >
>> > Wanted to know how the translation of consumer group offsets happens
>> > from source to destination cluster, especially need to know "how
>> > current offset and log end offset translation happens from source to
>> > destination kafka cluster"
>> >
>> > Thanks in advance.
>> >
>> > --
>> > Regards,
>> > Hemanth Savasere
>> >
>
> --
> Thanks & Regards,
> Hemanth Savasere
>

--
Thanks & Regards,
Hemanth Savasere