> do you recall the purpose of [...] renameTopicPartition [?]

A's topic1 and B's a.topic1 should be the same data (minus replication
lag). You can't consume a record in a.topic1 that hasn't been replicated
yet -- a remote topic by definition does not have any records that MM2
didn't put there. So an offset for a consumer consuming from B's a.topic1
can be translated back to an offset in A's topic1, where the data came from.
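
Concretely, that translation is exposed through the RemoteClusterUtils
helper. Here's a minimal, hypothetical sketch -- the alias, group name, and
bootstrap address are made up, and failback is the same call with the
clusters and alias swapped:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class FindResumeOffsets {
        public static void main(String[] args) throws Exception {
            // Connect to cluster B and ask where a group that last committed
            // on cluster A ("a" is the remote cluster alias) should resume.
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", "cluster-b:9092");
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    RemoteClusterUtils.translateOffsets(props, "a", "my-group",
                            Duration.ofSeconds(30));
            offsets.forEach((tp, oam) ->
                    System.out.println(tp + " -> " + oam.offset()));
        }
    }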

Ryanne

On Wed, Jan 10, 2024, 6:07 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hi Jeroen,
>
> I'm glad you're experimenting with MM2, and I hope we can give you
> some more context to explain what you're seeing.
>
> > I wrote a small program to produce these offset syncs for the prefixed
> > topic, and this successfully triggers the Checkpoint connector to start
> > replicating the consumer offsets back to the primary cluster.
>
> This is interesting, and I wouldn't have expected it to work.
>
> To rewind, each flow Source->Target has a MirrorSourceConnector, an
> Offset Syncs Topic, and a MirrorCheckpointConnector. With both
> directions enabled, there are two separate flows each with Source,
> Syncs topic, and Checkpoint.
> With offset-syncs.topic.location=source, the
> mm2-offset-syncs.b.internal topic on the A cluster is used for the A -> B
> replication flow. It contains topic names from cluster A, and the
> corresponding offsets those records were written to on the B cluster.
> When translation is performed, the consumer groups from A are
> replicated to the B cluster, and the replication mapping (cluster
> prefix) is added.
> Using your syncs topic as an example,
> OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> downstreamOffset=28} will be used to write offsets for
> "a.replicate-me-0" for the equivalent group on the B cluster.
>
> When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> upstreamOffset=29, downstreamOffset=29} is processed, it should be
> used to write offsets for "a.a.replicate-me-0" but it actually writes
> offsets to "replicate-me-0" due to this function that I've never
> encountered before: [1].
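>
> For readers following along, [1] boils down to roughly the following.
> This is a paraphrase from memory, not the verbatim source; the helper
> methods are from ReplicationPolicy:
>
>     // If the upstream topic "came from" the target cluster, strip the
>     // prefix instead of adding one, e.g. a.replicate-me-0 -> replicate-me-0
>     // when the target alias is "a".
>     TopicPartition renameTopicPartition(TopicPartition upstream) {
>         if (targetClusterAlias.equals(replicationPolicy.topicSource(upstream.topic()))) {
>             // topic originated on the target cluster: drop the prefix
>             return new TopicPartition(replicationPolicy.originalTopic(upstream.topic()),
>                     upstream.partition());
>         } else {
>             // normal case: add the source cluster prefix
>             return new TopicPartition(replicationPolicy.formatRemoteTopic(
>                     sourceClusterAlias, upstream.topic()), upstream.partition());
>         }
>     }
>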
> I think you could get those sorts of syncs into the syncs-topic if you
> had A->B configured with offset-syncs.topic.location=source, and B->A
> with offset-syncs.topic.location=target, and configured the topic
> filter to do A -> B -> A round trip replication.
>
> This appears to work as expected if there are no failures or restarts,
> but as soon as a record is re-delivered in either flow, I think the
> offsets should end up constantly advancing in an infinite loop. Maybe
> you can try that: Before starting the replication, insert a few
> records into `a.replicate-me` to force replicate-me-0's offset n to
> replicate to a.replicate-me-0's offset n+k.
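>
> The seeding step could be a throwaway producer along these lines (broker
> address made up; imports omitted):
>
>     // Write a few pad records straight into the remote topic on cluster B
>     // so that MM2's replicated records land at offset n+k instead of n.
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "cluster-b:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>         for (int i = 0; i < 3; i++) {
>             producer.send(new ProducerRecord<>("a.replicate-me", 0, null, "pad-" + i));
>         }
>     }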
>
> Ryanne, do you recall the purpose of the renameTopicPartition
> function? To me it looks like it could only be harmful, as it renames
> checkpoints to target topics that MirrorMaker2 isn't writing. It also
> looks like it isn't active in a typical MM2 setup.
>
> Thanks!
> Greg
>
> [1]:
> https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
>
> On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> <jer...@cloudflare.com.invalid> wrote:
> >
> > Thank you both for your swift responses!
> >
> > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > replication in cases where the producer has migrated to the secondary
> > cluster as well and starts feeding messages into the non-prefixed topic,
> > which are subsequently consumed by the consumer on the secondary cluster.
> > After the failback, it asserts that the consumer offsets on the
> > non-prefixed topic in the secondary cluster are translated and replicated
> > to the consumer offsets of the prefixed topic in the primary cluster.
> >
> > In my example, the producer keeps producing in the primary cluster whereas
> > only the consumer fails over to the secondary cluster and, after some
> > time, fails back to the primary cluster. This consumer will then consume
> > messages from the prefixed topic in the secondary cluster, and I'd like to
> > have those offsets replicated back to the non-prefixed topic in the
> > primary cluster. I can provide an illustration if that helps to clarify
> > this use case.
> >
> > To add some context: the reason I'd like to have this is to retain loose
> > coupling between producers and consumers, so we're able to test failovers
> > for individual applications without needing all producers and consumers to
> > fail over and fail back at once.
> >
> > Digging through the Connect debug logs, I found that the offset syncs for
> > the prefixed topic not being pushed to mm2-offset-syncs.b.internal is
> > likely the reason the checkpoint connector doesn't replicate consumer
> > offsets:
> > DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped (offset sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
> >
> > I wrote a small program to produce these offset syncs for the prefixed
> > topic, and this successfully triggers the Checkpoint connector to start
> > replicating the consumer offsets back to the primary cluster.
> > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > downstreamOffset=28}
> > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29,
> > downstreamOffset=29}
> > OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29,
> > downstreamOffset=29} <-- the artificially generated offset-sync
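> >
> > Producing such a sync looks roughly like this. It leans on MM2's internal
> > OffsetSync class, which is not public API, so the serialization details
> > may differ between versions (imports omitted):
> >
> >     // Forge an offset sync for the prefixed topic and append it to the
> >     // A->B syncs topic on cluster A, partition 0, like MM2 itself does.
> >     OffsetSync sync = new OffsetSync(new TopicPartition("a.replicate-me", 0), 29L, 29L);
> >     Properties props = new Properties();
> >     props.put("bootstrap.servers", "cluster-a:9092");
> >     props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> >     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> >     try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
> >         producer.send(new ProducerRecord<>("mm2-offset-syncs.b.internal", 0,
> >                 sync.recordKey(), sync.recordValue()));
> >     }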
> >
> > At this point it goes a bit beyond my understanding of the MM2 internals
> > whether this is a wise thing to do and whether it would have any negative
> > side effects. I'd need to spend some more time in the MM2 source, though I
> > welcome any feedback on this hack :-)
> >
> > On the two complications you mention, Greg: the second one is something we
> > should figure out regardless, as any given consumer group cannot be active
> > on both the primary and secondary cluster anyway; that would already block
> > MM2 from replicating its offsets from the primary to the cluster-prefixed
> > topic on the secondary cluster. On the first point, I think it would be
> > good practice to only allow MM2 to produce to any cluster-prefixed topic,
> > using topic ACLs. In other words, the only application producing to a
> > cluster-prefixed (or downstream) topic would be MirrorMaker, and I think
> > that prevents this kind of message 'drift'. In case a producer has to fail
> > over, it starts producing to the non-prefixed topic on the secondary
> > cluster, whose offsets are subject to a different Source/Checkpoint
> > connector replication stream.
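> >
> > For the ACL part, one way to express "only MM2 may write to
> > cluster-prefixed topics" is a prefixed ACL via the Admin API. A sketch,
> > with the principal name and admin configs made up (imports from
> > org.apache.kafka.clients.admin and org.apache.kafka.common omitted):
> >
> >     // With an authorizer enabled (default deny), allowing only the MM2
> >     // principal to write to topics starting with "a." prevents anything
> >     // else from making the remote topics drift.
> >     try (Admin admin = Admin.create(adminProps)) {
> >         AclBinding allowMm2 = new AclBinding(
> >                 new ResourcePattern(ResourceType.TOPIC, "a.", PatternType.PREFIXED),
> >                 new AccessControlEntry("User:mm2", "*",
> >                         AclOperation.WRITE, AclPermissionType.ALLOW));
> >         admin.createAcls(List.of(allowMm2)).all().get();
> >     }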
> >
> > On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> >
> > > Jeroen, MirrorClient will correctly translate offsets for both failover
> > > and failback, exactly as you describe. It's possible to automate
> > > failover and failback using that logic. The integration tests
> > > automatically fail over and fail back, for example. I've seen it done
> > > two ways: during startup within the consumer itself, or in an external
> > > tool which writes offsets directly. In either case MirrorClient will
> > > give you the correct offsets to resume from.
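> > >
> > > The external-tool variant looks roughly like this (a sketch; the group,
> > > alias, and bootstrap address are placeholders, and imports are omitted):
> > >
> > >     // Fetch translated offsets via MirrorClient, then seed the group on
> > >     // the new cluster before any consumer starts there.
> > >     MirrorClient client = new MirrorClient(
> > >             Map.<String, Object>of("bootstrap.servers", "cluster-b:9092"));
> > >     Map<TopicPartition, OffsetAndMetadata> offsets =
> > >             client.remoteConsumerOffsets("my-group", "a", Duration.ofSeconds(30));
> > >     try (Admin admin = Admin.create(
> > >             Map.<String, Object>of("bootstrap.servers", "cluster-b:9092"))) {
> > >         admin.alterConsumerGroupOffsets("my-group", offsets).all().get();
> > >     }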
> > >
> > > MirrorCheckpointConnector will automatically write offsets, but only
> > > under certain conditions, to avoid accidentally overwriting offsets. I'm
> > > not sure whether you can fail over and fail back using just the
> > > automatic behavior. My guess is it works, but you are tripping over one
> > > of the safety checks. You might try deleting the consumer group on the
> > > source cluster prior to failback.
> > >
> > > Ryanne
> > >
> > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup
> > > <jer...@cloudflare.com.invalid> wrote:
> > >
> > > > Hi all,
> > > > I'm exploring using the MirrorSourceConnector and
> > > > MirrorCheckpointConnector on Kafka Connect to set up active/active
> > > > replication between two Kafka clusters. Using the
> > > > DefaultReplicationPolicy class, messages originating from the source
> > > > cluster get replicated as expected to the cluster-prefixed topic in
> > > > the target cluster. Consumer group offsets from the source to the
> > > > target cluster are replicated likewise. However, once the consumer
> > > > group migrates from the source to the target cluster, its offsets are
> > > > not replicated from the target back to the source cluster.
> > > > For an active/active setup I'd want consumer group offsets for topic
> > > > <source-cluster-alias>.<topic-name> in the target cluster to be
> > > > replicated back to <topic-name> in the source cluster. This would
> > > > allow consumers to fail over & fail back between clusters with minimal
> > > > duplicate message consumption.
> > > >
> > > > To clarify my setup a bit: I'm running two single-broker Kafka
> > > > clusters in Docker (clusters A & B), along with a single Connect
> > > > instance on which I've provisioned four source connectors:
> > > > - A MirrorSourceConnector replicating topics from cluster A to cluster B
> > > > - A MirrorSourceConnector replicating topics from cluster B to cluster A
> > > > - A MirrorCheckpointConnector translating & replicating offsets from
> > > > cluster A to cluster B
> > > > - A MirrorCheckpointConnector translating & replicating offsets from
> > > > cluster B to cluster A
> > > >
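> > > > For concreteness, the A -> B pair is configured along these lines (the
> > > > addresses are placeholders; the B -> A pair is identical with the
> > > > aliases and bootstrap servers swapped):
> > > >
> > > >     # MirrorSourceConnector, A -> B
> > > >     connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
> > > >     source.cluster.alias = a
> > > >     target.cluster.alias = b
> > > >     source.cluster.bootstrap.servers = cluster-a:9092
> > > >     target.cluster.bootstrap.servers = cluster-b:9092
> > > >     topics = replicate-me
> > > >
> > > >     # MirrorCheckpointConnector, A -> B
> > > >     connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
> > > >     source.cluster.alias = a
> > > >     target.cluster.alias = b
> > > >     source.cluster.bootstrap.servers = cluster-a:9092
> > > >     target.cluster.bootstrap.servers = cluster-b:9092
> > > >     groups = .*
> > > >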
> > > > I'm not sure whether this is by design, or maybe I'm missing
> > > > something. I've seen a similar question posted to KAFKA-9076 [1]
> > > > without a resolution.
> > > >
> > > > Regards,
> > > > Jeroen
> > > >
> > > > [1]
> > > > https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908
