Thank you both for your swift responses! Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset replication in cases where the producer migrated to the secondary cluster as well, starts feeding messages into the non-prefixed topic which are subsequently consumed by the consumer on the secondary cluster. After the fallback, it asserts the consumer offsets on the non-prefixed topic in the secondary cluster are translated and replicated to the consumer offsets of the prefixed topic in the primary cluster. In my example, the producer keeps producing in the primary cluster whereas only the consumer fails over to the secondary cluster and, after some time fails back to the primary cluster. This consumer will then consume messages from the prefixed topic in the secondary cluster, and I'd like to have those offsets replicated back to the non-prefixed topic in the primary cluster. If you like I can provide an illustration if that helps to clarify this use case.
To add some context on why I'd like to have this is to retain loose coupling between producers and consumers so we're able to test failovers for individual applications without the need for all producers/consumers to failover and failback at once. Digging through the Connect debug logs I found the offset-syncs of the prefixed topic not being pushed to mm2-offset-syncs.b.internal is likely the reason the checkpoint connector doesn't replicate consumer offsets: DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped (offset sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore) I wrote a small program to produce these offset syncs for the prefixed topic, and this successfully triggers the Checkpoint connector to start replicating the consumer offsets back to the primary cluster. OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28, downstreamOffset=28} OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29, downstreamOffset=29} OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29, downstreamOffset=29} <-- the artificially generated offset-sync At this point it goes a bit beyond my understanding of the MM2 internals of whether this is a wise thing to do and if it would have any negative side effects. I'd need to spend some more time in the MM2 source, though I welcome any feedback on this hack :-) On the two complications you're mentioning Greg, the second one is something we should figure out regardless, as any given consumer group may not be active on both the primary and secondary cluster as it would block MM2 from replicating its offsets from primary to the cluster-prefixed topic on the secondary cluster already. On the first point, I think it would be a good practice to only allow MM2 to produce to any cluster-prefixed topic by using topic ACLs. In other words, the only application producing to a cluster-prefixed (or downstream) topic would be mirror maker and I think that prevents this kind of message 'drift'. In case a producer has to failover, it starts producing to the non-prefixed topic on the secondary cluster whose offsets are subject to a different Source/Checkpoint connector replication stream. On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <> wrote: > Jeroen, MirrorClient will correctly translate offsets for both failover and > failback, exactly as you describe. It's possible to automate failover and > failback using that logic. The integration tests automatically fail over > and fail back, for example. I've seen it done two ways: during startup > within the consumer itself, or in an external tool which writes offsets > directly. In either case MirrorClient will give you the correct offsets to > resume from. > > MirrorCheckpointConnector will automatically write offsets, but only under > certain conditions, to avoid accidentally overwriting offsets. I'm not sure > whether you can failover and failback using just the automatic behavior. My > guess is it works, but you are tripping over one of the safety checks. You > might try deleting the consumer group on the source cluster prior to > failback. > > Ryanne > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup < > > > wrote: > > > Hi all, > > I'm exploring using the MirrorSourceConnector and > MirrorCheckpointConnector > > on Kafka Connect to setup active/active replication between two Kafka > > clusters. Using the DefaultReplicationPolicy replication policy class, > > messages originating from the source cluster get replicated as expected > to > > the cluster-prefixed topic in the target cluster. Consumergroup offsets > > from the source to target cluster are replicated likewise. However, once > > the consumer group migrates from the source to the target cluster, its > > offsets are not replicated from the target back to the source cluster. > > For an active/active setup I'd want consumer group offsets for topic > > <source-cluster-alias>.<topic-name> in the target cluster to be > replicated > > back to <topic-name> in the source cluster. This would allow consumers to > > failover & failback between clusters with minimal duplicate message > > consumption. > > > > To clarify my setup a bit; I'm running two single-broker Kafka clusters > in > > Docker (cluster A & B), along with a single Connect instance on which > I've > > provisioned four source connectors: > > - A MirrorSourceConnector replicating topics from cluster A to cluster B > > - A MirrorSourceConnector replicating topics from cluster B to cluster A > > - A MirrorCheckpointConnector translating & replicating offsets from > > cluster A to cluster B > > - A MirrorCheckpointConnector translating & replicating offsets from > > cluster B to cluster A > > > > I'm not sure whether this is by design, or maybe I'm missing something. > > I've seen a similar question posted to KAFKA-9076 [1] without a > resolution. > > > > Regards, > > Jeroen > > > > [1] > > > > > > > >