> do you recall the purpose of [...] renameTopicPartition [?]

A's topic1 and B's a.topic1 should be the same data (minus replication
lag). You can't consume a record in a.topic1 that hasn't been replicated
yet -- a remote topic by definition does not have any records that MM2
didn't put there. So an offset for a consumer consuming from B's a.topic1
can be translated back to an offset in A's topic1, where the data came
from.
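The translation described above can be sketched roughly as follows. This is a
simplified Python model, not MM2's exact OffsetSyncStore implementation; the
function name and the list-of-pairs representation are illustrative, and the
conservative "+1" step mirrors how MM2 avoids skipping records that a sync
hasn't covered yet:

```python
from bisect import bisect_right

def translate_downstream(syncs, consumer_offset):
    """Translate a consumer offset on the remote topic (B's a.topic1)
    back to an offset on the upstream topic (A's topic1).

    `syncs` is a list of (upstream_offset, downstream_offset) pairs,
    sorted by downstream offset, as recorded in the offset-syncs topic.
    Returns None when no sync at or below the consumer offset exists
    (MM2 logs "Skipped (offset sync not found)" in that case).
    """
    downstreams = [d for _, d in syncs]
    i = bisect_right(downstreams, consumer_offset) - 1
    if i < 0:
        return None
    upstream, downstream = syncs[i]
    # Conservative: resume at most one record past the last sync point,
    # so a failed-back consumer may replay records but never skips any.
    return upstream + min(consumer_offset - downstream, 1)
```

For example, with syncs `[(28, 28), (29, 29)]`, a consumer offset of 29 on the
remote topic translates to 29 upstream, while an offset of 31 translates to
only 30, because no sync past 29 has been observed yet.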
Ryanne

On Wed, Jan 10, 2024, 6:07 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:

> Hi Jeroen,
>
> I'm glad you're experimenting with MM2, and I hope we can give you some
> more context to explain what you're seeing.
>
> > I wrote a small program to produce these offset syncs for the prefixed
> > topic, and this successfully triggers the Checkpoint connector to start
> > replicating the consumer offsets back to the primary cluster.
>
> This is interesting, and I wouldn't have expected it to work.
>
> To rewind: each Source->Target flow has a MirrorSourceConnector, an
> offset-syncs topic, and a MirrorCheckpointConnector. With both directions
> enabled, there are two separate flows, each with its own source
> connector, syncs topic, and checkpoint connector.
> With offset-syncs.topic.location=source, mm2-offset-syncs.b.internal on
> the A cluster is used for the A -> B replication flow. It contains topic
> names from cluster A and the corresponding offsets those records were
> written to on the B cluster. When translation is performed, the consumer
> groups from A are replicated to the B cluster, and the replication
> mapping (cluster prefix) is added.
> Using your syncs topic as an example,
> OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> downstreamOffset=28} will be used to write offsets for "a.replicate-me-0"
> for the equivalent group on the B cluster.
>
> When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> upstreamOffset=29, downstreamOffset=29} is processed, it should be used
> to write offsets for "a.a.replicate-me-0", but it actually writes offsets
> to "replicate-me-0" due to a function I've never encountered before: [1].
> I think you could get those sorts of syncs into the syncs topic if you
> had A -> B configured with offset-syncs.topic.location=source, B -> A
> with offset-syncs.topic.location=target, and the topic filter configured
> to do A -> B -> A round-trip replication.
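The rename behavior described above can be paraphrased roughly like this. This
is a Python sketch of one reading of MirrorCheckpointTask's
renameTopicPartition, assuming DefaultReplicationPolicy (remote topics named
"<cluster-alias>.<topic>"); the linked Java source [1] is authoritative:

```python
def rename_topic_partition(topic, source_alias, target_alias):
    """Sketch of how a checkpoint's topic name is rewritten for one
    Source->Target flow, assuming DefaultReplicationPolicy naming."""
    prefix, dot, remainder = topic.partition(".")
    if dot and prefix == target_alias:
        # The topic originally came from the target cluster, so strip
        # the prefix: a.topic1 on B is checkpointed as topic1 on A.
        return remainder
    # Otherwise prepend the source cluster's alias: topic1 -> a.topic1.
    return f"{source_alias}.{topic}"

# In the B -> A checkpoint flow (source "b", target "a"), a sync for
# "a.replicate-me" is stripped back to "replicate-me" rather than
# gaining a second prefix:
assert rename_topic_partition("a.replicate-me", "b", "a") == "replicate-me"
```

Note that DefaultReplicationPolicy treats everything before the first dot as
a potential cluster alias, which is why a topic that merely contains dots can
take the strip branch unexpectedly.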
> This appears to work as expected if there are no failures or restarts,
> but as soon as a record is re-delivered in either flow, I think the
> offsets should end up constantly advancing in an infinite loop. Maybe
> you can try that: before starting the replication, insert a few records
> into `a.replicate-me` to force replicate-me-0's offset n to replicate to
> a.replicate-me-0's offset n+k.
>
> Ryanne, do you recall the purpose of the renameTopicPartition function?
> To me it looks like it could only be harmful, as it renames checkpoints
> to target topics that MirrorMaker2 isn't writing. It also looks like it
> isn't active in a typical MM2 setup.
>
> Thanks!
> Greg
>
> [1]:
> https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
>
> On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> <jer...@cloudflare.com.invalid> wrote:
> >
> > Thank you both for your swift responses!
> >
> > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > replication in cases where the producer migrates to the secondary
> > cluster as well and starts feeding messages into the non-prefixed
> > topic, which are subsequently consumed by the consumer on the
> > secondary cluster. After the fallback, it asserts that the consumer
> > offsets on the non-prefixed topic in the secondary cluster are
> > translated and replicated to the consumer offsets of the prefixed
> > topic in the primary cluster.
> > In my example, the producer keeps producing on the primary cluster,
> > whereas only the consumer fails over to the secondary cluster and,
> > after some time, fails back to the primary cluster. This consumer
> > will then consume messages from the prefixed topic in the secondary
> > cluster, and I'd like to have those offsets replicated back to the
> > non-prefixed topic in the primary cluster.
> > If you like, I can provide an illustration if that helps to clarify
> > this use case.
> >
> > To add some context on why I'd like to have this: it's to retain
> > loose coupling between producers and consumers, so we're able to test
> > failovers for individual applications without the need for all
> > producers/consumers to fail over and fail back at once.
> >
> > Digging through the Connect debug logs, I found that the offset syncs
> > of the prefixed topic not being pushed to mm2-offset-syncs.b.internal
> > is likely the reason the checkpoint connector doesn't replicate
> > consumer offsets:
> > DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped
> > (offset sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
> >
> > I wrote a small program to produce these offset syncs for the
> > prefixed topic, and this successfully triggers the Checkpoint
> > connector to start replicating the consumer offsets back to the
> > primary cluster.
> > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > downstreamOffset=28}
> > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29,
> > downstreamOffset=29}
> > OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29,
> > downstreamOffset=29} <-- the artificially generated offset sync
> >
> > At this point it goes a bit beyond my understanding of the MM2
> > internals whether this is a wise thing to do and whether it would
> > have any negative side effects. I'd need to spend some more time in
> > the MM2 source, though I welcome any feedback on this hack :-)
> >
> > On the two complications you're mentioning, Greg: the second one is
> > something we should figure out regardless, as any given consumer
> > group may not be active on both the primary and secondary cluster;
> > that would already block MM2 from replicating its offsets from the
> > primary to the cluster-prefixed topic on the secondary cluster.
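A small program like the one described above could serialize the sync records
as follows. This sketch reflects one reading of the KEY_SCHEMA and
VALUE_SCHEMA in org.apache.kafka.connect.mirror.OffsetSync (a Kafka-protocol
int16-length-prefixed string plus int32 partition for the key, and two int64
offsets for the value, all big-endian); verify against that class before
relying on it:

```python
import struct

def offset_sync_record(topic, partition, upstream_offset, downstream_offset):
    """Build the key and value bytes of one OffsetSync record, per the
    assumed schemas: key = {topic: string, partition: int32},
    value = {upstreamOffset: int64, downstreamOffset: int64}."""
    name = topic.encode("utf-8")
    key = struct.pack(f">h{len(name)}si", len(name), name, partition)
    value = struct.pack(">qq", upstream_offset, downstream_offset)
    return key, value

# Publish the key/value pair to mm2-offset-syncs.b.internal with any
# Kafka producer to emit the artificial sync from the example above.
key, value = offset_sync_record("a.replicate-me", 0, 29, 29)
```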
> > On the first point, I think it would be good practice to only allow
> > MM2 to produce to any cluster-prefixed topic by using topic ACLs. In
> > other words, the only application producing to a cluster-prefixed (or
> > downstream) topic would be MirrorMaker, and I think that prevents
> > this kind of message 'drift'. In case a producer has to fail over, it
> > starts producing to the non-prefixed topic on the secondary cluster,
> > whose offsets are subject to a different Source/Checkpoint connector
> > replication stream.
> >
> > On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> >
> > > Jeroen, MirrorClient will correctly translate offsets for both
> > > failover and failback, exactly as you describe. It's possible to
> > > automate failover and failback using that logic. The integration
> > > tests automatically fail over and fail back, for example. I've seen
> > > it done two ways: during startup within the consumer itself, or in
> > > an external tool which writes offsets directly. In either case,
> > > MirrorClient will give you the correct offsets to resume from.
> > >
> > > MirrorCheckpointConnector will automatically write offsets, but
> > > only under certain conditions, to avoid accidentally overwriting
> > > offsets. I'm not sure whether you can fail over and fail back using
> > > just the automatic behavior. My guess is it works, but you are
> > > tripping over one of the safety checks. You might try deleting the
> > > consumer group on the source cluster prior to failback.
> > >
> > > Ryanne
> > >
> > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup
> > > <jer...@cloudflare.com.invalid> wrote:
> > >
> > > > Hi all,
> > > > I'm exploring using the MirrorSourceConnector and
> > > > MirrorCheckpointConnector on Kafka Connect to set up
> > > > active/active replication between two Kafka clusters.
> > > > Using the DefaultReplicationPolicy replication policy class,
> > > > messages originating from the source cluster get replicated, as
> > > > expected, to the cluster-prefixed topic in the target cluster.
> > > > Consumer group offsets are likewise replicated from the source to
> > > > the target cluster. However, once a consumer group migrates from
> > > > the source to the target cluster, its offsets are not replicated
> > > > from the target back to the source cluster.
> > > > For an active/active setup, I'd want consumer group offsets for
> > > > topic <source-cluster-alias>.<topic-name> in the target cluster
> > > > to be replicated back to <topic-name> in the source cluster. This
> > > > would allow consumers to fail over & fail back between clusters
> > > > with minimal duplicate message consumption.
> > > >
> > > > To clarify my setup a bit: I'm running two single-broker Kafka
> > > > clusters in Docker (clusters A & B), along with a single Connect
> > > > instance on which I've provisioned four source connectors:
> > > > - A MirrorSourceConnector replicating topics from cluster A to
> > > >   cluster B
> > > > - A MirrorSourceConnector replicating topics from cluster B to
> > > >   cluster A
> > > > - A MirrorCheckpointConnector translating & replicating offsets
> > > >   from cluster A to cluster B
> > > > - A MirrorCheckpointConnector translating & replicating offsets
> > > >   from cluster B to cluster A
> > > >
> > > > I'm not sure whether this is by design, or maybe I'm missing
> > > > something. I've seen a similar question posted to KAFKA-9076 [1]
> > > > without a resolution.
> > > >
> > > > Regards,
> > > > Jeroen
> > > >
> > > > [1]
> > > > https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908
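For anyone reproducing the four-connector setup described above, one of the
connectors might look like this when submitted to the Connect REST API. This
is a sketch: the connector name, bootstrap addresses, and interval are
placeholders, and the other three connectors are analogous (swap the aliases,
and use org.apache.kafka.connect.mirror.MirrorSourceConnector with a `topics`
filter for the source connectors):

```json
{
  "name": "mirror-checkpoint-b-to-a",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "source.cluster.alias": "b",
    "target.cluster.alias": "a",
    "source.cluster.bootstrap.servers": "kafka-b:9092",
    "target.cluster.bootstrap.servers": "kafka-a:9092",
    "groups": ".*",
    "emit.checkpoints.interval.seconds": "5",
    "sync.group.offsets.enabled": "true",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```

The ByteArrayConverter settings are the usual recommendation when running the
MM2 connectors on a Connect cluster, since the mirror connectors deal in raw
bytes.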