Hey Greg,
There are no offset collisions: although both offset-syncs topics are
stored on the same cluster, offsets from A->B are stored in
mm2-offset-syncs.b.internal whereas offsets from B->A are stored in
mm2-offset-syncs.a.internal.
What's curious though is that the B->A checkpoint connector (which has
offset-syncs.topic.location: target) actually uses the offsets stored in
mm2-offset-syncs.b.internal (which contains the downstream offsets),
whereas I expected it to only read the offsets stored in
mm2-offset-syncs.a.internal, as cluster A is its target.
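
For what it's worth, here's a minimal sketch of the kind of consumer I've
been using to tail both syncs topics; this isn't the actual prototype code,
the bootstrap address and group id are placeholders, and it assumes the
OffsetSync class (and its deserializeRecord helper) from the connect-mirror
module is available on the classpath:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.mirror.OffsetSync;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class DumpOffsetSyncs {
    public static void main(String[] args) {
        // Both syncs topics live on the same cluster in my setup, so one
        // consumer can tail them together (bootstrap address is a placeholder).
        Map<String, Object> props = Map.of(
                "bootstrap.servers", "kafka-a:9092",
                "group.id", "offset-sync-dumper",
                "auto.offset.reset", "earliest");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(List.of("mm2-offset-syncs.a.internal",
                                       "mm2-offset-syncs.b.internal"));
            // Poll a few times and print each sync, e.g.
            // "mm2-offset-syncs.b.internal: OffsetSync{topicPartition=replicate-me-0,
            //  upstreamOffset=28, downstreamOffset=28}"
            for (int i = 0; i < 10; i++) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.topic() + ": " + OffsetSync.deserializeRecord(record));
                }
            }
        }
    }
}

Tailing both topics like this makes it easy to see which flow's syncs end up
where, and correlating that with the checkpoint connector's DEBUG
translateDownstream logging shows which syncs it actually consumes.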

I'm keen on driving a KIP for this feature to see whether we can get it
implemented. I'm hoping to submit a draft in the upcoming weeks, though I'd
need a bit of time to get a better grasp on the mirror connector codebase.

Thank you both for your valuable insights so far!

Jeroen

On Thu, Jan 11, 2024 at 8:06 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hey Jeroen,
>
> Thanks for sharing your prototype! It is very interesting!
>
> > I couldn't reproduce your hypothesis.
>
> I think my hypothesis was for another setup which didn't involve code
> changes, and instead relied on A->B->A round trip replication to
> produce the "backwards" offset syncs.
> I believe this would replicate data from "replicate-me-0" to
> "b.a.replicate-me-0", and then possibly take the offsets intended for
> "b.a.replicate-me-0" and apply them to "replicate-me-0" creating the
> infinite cycle.
> I would not expect your implementation to suffer from this failure
> mode, because it's using the offset in "replicate-me-0" as the
> downstream offset, not the offset of "b.a.replicate-me-0".
>
> With your prototype, do you experience "collisions" in the
> offset-syncs topic? Since you're sharing a single offset-syncs topic
> between both replication flows, I would expect offsets for topics with
> the same names on both clusters to conflict, and cause the translation
> to happen using the opposite topic's offsets.
> It would also be visible in the state of the OffsetSyncStore here:
> [1], you can compare the normal A->B behavior before and after
> starting the B -> A source connector to see if the concurrent flows
> cause more syncs to be cleared, or the wrong syncs to be present.
>
> I think it is normal for every MM2 connector to have the same
> offset-syncs.topic.location to avoid these sorts of conflicts, so that
> each syncs topic is only used by one of the MM2 replication flows.
> I think that turning on bidirectional offset syncs will probably
> require a second producer in the MirrorSourceTask to contact the
> opposite cluster, or a second admin client in the
> MirrorCheckpointTask.
>
> > Do you think it'd be worthwhile proceeding with this?
>
> This is certainly a capability that MM2 is missing right now, and
> seems like it would be a natural component of failing consumers back
> and forth. If you see value in it, and are interested in driving the
> feature, you can open a KIP [2] to discuss the interface and design
> with the rest of the community.
>
> Thanks!
> Greg
>
> [1]
> https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L194
> [2]
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> On Thu, Jan 11, 2024 at 9:27 AM Jeroen Schutrup
> <jer...@cloudflare.com.invalid> wrote:
> >
> > I see, makes complete sense to me. I built a custom version [1] based on
> > Kafka 3.5.1 with bidirectional offset replication enabled so I could do
> > some more testing. Offset translation back upstream works well; I think
> > because of the reason Ryanne pointed out, both topics contain identical
> > data. I tested this by truncating the upstream topic before starting
> > replication (so the downstream/upstream topics have different offsets).
> > Truncating the upstream topic while replication is running doesn't
> > result in any weirdness either.
> >
> > > Before starting the replication, insert a few records into
> > > `a.replicate-me` to force replicate-me-0's offset n to replicate to
> > > a.replicate-me-0's offset n+k.
> > I couldn't reproduce your hypothesis. After doing the above and then
> > starting replication I didn't see any offset replication loops. Once I
> > started producing data into the upstream topic and subscribing a
> > console consumer to the downstream topic, offsets were translated and
> > replicated correctly back upstream. My guess is the CheckpointConnector
> > can account for this surplus of messages, as the actual log offsets of
> > the downstream topic are written to the offset-sync topic.
> >
> > As this kind of active/active replication would be very beneficial to us
> > for the reasons stated in my previous message, we'd love to help build
> > this kind of offset replication into the Mirror connectors. I understand
> > this is not something that should be enabled by default, but having it
> > behind a configuration toggle could help users who desire a similar kind
> > of active/active setup and who understand the restrictions. Do you think
> > it'd be worthwhile proceeding with this?
> >
> > [1]
> > https://github.com/jeroen92/kafka/commit/1a27696ec6777c230f100cf9887368c431ebe0f8
> >
> > On Thu, Jan 11, 2024 at 1:06 AM Greg Harris <greg.har...@aiven.io.invalid> wrote:
> >
> > > Hi Jeroen,
> > >
> > > I'm glad you're experimenting with MM2, and I hope we can give you
> > > some more context to explain what you're seeing.
> > >
> > > > I wrote a small program to produce these offset syncs for the
> > > > prefixed topic, and this successfully triggers the Checkpoint
> > > > connector to start replicating the consumer offsets back to the
> > > > primary cluster.
> > >
> > > This is interesting, and I wouldn't have expected it to work.
> > >
> > > To rewind, each flow Source->Target has a MirrorSourceConnector, an
> > > Offset Syncs Topic, and a MirrorCheckpointConnector. With both
> > > directions enabled, there are two separate flows each with Source,
> > > Syncs topic, and Checkpoint.
> > > With offset-syncs.topic.location=source, the
> > > mm2-offset-syncs.b.internal on the A cluster is used for the A -> B
> > > replication flow. It contains topic names from cluster A, and the
> > > corresponding offsets those records were written to on the B cluster.
> > > When translation is performed, the consumer groups from A are
> > > replicated to the B cluster, and the replication mapping (cluster
> > > prefix) is added.
> > > Using your syncs topic as an example,
> > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > > downstreamOffset=28} will be used to write offsets for
> > > "a.replicate-me-0" for the equivalent group on the B cluster.
> > >
> > > When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> > > upstreamOffset=29, downstreamOffset=29} is processed, it should be
> > > used to write offsets for "a.a.replicate-me-0" but it actually writes
> > > offsets to "replicate-me-0" due to this function that I've never
> > > encountered before: [1].
> > > I think you could get those sorts of syncs into the syncs-topic if you
> > > had A->B configured with offset-syncs.topic.location=source, and B->A
> > > with offset-syncs.topic.location=target, and configured the topic
> > > filter to do A -> B -> A round trip replication.
> > >
> > > This appears to work as expected if there are no failures or restarts,
> > > but as soon as a record is re-delivered in either flow, I think the
> > > offsets should end up constantly advancing in an infinite loop. Maybe
> > > you can try that: Before starting the replication, insert a few
> > > records into `a.replicate-me` to force replicate-me-0's offset n to
> > > replicate to a.replicate-me-0's offset n+k.
> > >
> > > Ryanne, do you recall the purpose of the renameTopicPartition
> > > function? To me it looks like it could only be harmful, as it renames
> > > checkpoints to target topics that MirrorMaker2 isn't writing. It also
> > > looks like it isn't active in a typical MM2 setup.
> > >
> > > Thanks!
> > > Greg
> > >
> > > [1]:
> > > https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
> > >
> > > On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> > > <jer...@cloudflare.com.invalid> wrote:
> > > >
> > > > Thank you both for your swift responses!
> > > >
> > > > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > > > replication in cases where the producer has migrated to the secondary
> > > > cluster as well and starts feeding messages into the non-prefixed
> > > > topic, which are subsequently consumed by the consumer on the
> > > > secondary cluster. After the fallback, it asserts that the consumer
> > > > offsets on the non-prefixed topic in the secondary cluster are
> > > > translated and replicated to the consumer offsets of the prefixed
> > > > topic in the primary cluster.
> > > > In my example, the producer keeps producing in the primary cluster
> > > > whereas only the consumer fails over to the secondary cluster and,
> > > > after some time, fails back to the primary cluster. This consumer
> > > > will then consume messages from the prefixed topic in the secondary
> > > > cluster, and I'd like to have those offsets replicated back to the
> > > > non-prefixed topic in the primary cluster. I can provide an
> > > > illustration if that helps to clarify this use case.
> > > >
> > > > To add some context on why I'd like to have this: it's to retain
> > > > loose coupling between producers and consumers so we're able to test
> > > > failovers for individual applications without the need for all
> > > > producers/consumers to fail over and fail back at once.
> > > >
> > > > Digging through the Connect debug logs, I found that the offset-syncs
> > > > for the prefixed topic not being pushed to mm2-offset-syncs.b.internal
> > > > is likely the reason the checkpoint connector doesn't replicate
> > > > consumer offsets:
> > > > DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped
> > > > (offset sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
> > > >
> > > > I wrote a small program to produce these offset syncs for the
> > > > prefixed topic, and this successfully triggers the Checkpoint
> > > > connector to start
> > > > replicating the consumer offsets back to the primary cluster.
> > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > > > downstreamOffset=28}
> > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29,
> > > > downstreamOffset=29}
> > > > OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29,
> > > > downstreamOffset=29} <-- the artificially generated offset-sync
> > > >
> > > > At this point it goes a bit beyond my understanding of the MM2
> > > > internals whether this is a wise thing to do and whether it would
> > > > have any negative side effects. I'd need to spend some more time in
> > > > the MM2 source, though I welcome any feedback on this hack :-)
> > > >
> > > > On the two complications you're mentioning, Greg: the second one is
> > > > something we should figure out regardless, as any given consumer
> > > > group may not be active on both the primary and secondary cluster
> > > > anyway; that would already block MM2 from replicating its offsets
> > > > from the primary to the cluster-prefixed topic on the secondary
> > > > cluster. On the first point, I think it would be good practice to
> > > > only allow MM2 to produce to any cluster-prefixed topic by using
> > > > topic ACLs. In other words, the only application producing to a
> > > > cluster-prefixed (or downstream) topic would be MirrorMaker, and I
> > > > think that prevents this kind of message 'drift'. In case a producer
> > > > has to fail over, it starts producing to the non-prefixed topic on
> > > > the secondary cluster, whose offsets are subject to a different
> > > > Source/Checkpoint connector replication stream.
> > > >
> > > > On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > >
> > > > > Jeroen, MirrorClient will correctly translate offsets for both
> > > > > failover and failback, exactly as you describe. It's possible to
> > > > > automate failover and failback using that logic. The integration
> > > > > tests automatically fail over and fail back, for example. I've seen
> > > > > it done two ways: during startup within the consumer itself, or in
> > > > > an external tool which writes offsets directly. In either case
> > > > > MirrorClient will give you the correct offsets to resume from.
> > > > >
> > > > > MirrorCheckpointConnector will automatically write offsets, but
> > > > > only under certain conditions, to avoid accidentally overwriting
> > > > > offsets. I'm not sure whether you can fail over and fail back using
> > > > > just the automatic behavior. My guess is it works, but you are
> > > > > tripping over one of the safety checks. You might try deleting the
> > > > > consumer group on the source cluster prior to failback.
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup <jer...@cloudflare.com.invalid> wrote:
> > > > >
> > > > > > Hi all,
> > > > > > I'm exploring using the MirrorSourceConnector and
> > > > > > MirrorCheckpointConnector on Kafka Connect to set up active/active
> > > > > > replication between two Kafka clusters. Using the
> > > > > > DefaultReplicationPolicy replication policy class, messages
> > > > > > originating from the source cluster get replicated as expected to
> > > > > > the cluster-prefixed topic in the target cluster. Consumer group
> > > > > > offsets from the source to target cluster are replicated likewise.
> > > > > > However, once the consumer group migrates from the source to the
> > > > > > target cluster, its offsets are not replicated from the target
> > > > > > back to the source cluster.
> > > > > > For an active/active setup I'd want consumer group offsets for
> > > > > > topic <source-cluster-alias>.<topic-name> in the target cluster to
> > > > > > be replicated back to <topic-name> in the source cluster. This
> > > > > > would allow consumers to fail over & fail back between clusters
> > > > > > with minimal duplicate message consumption.
> > > > > >
> > > > > > To clarify my setup a bit: I'm running two single-broker Kafka
> > > > > > clusters in Docker (clusters A & B), along with a single Connect
> > > > > > instance on which I've provisioned four source connectors:
> > > > > > - A MirrorSourceConnector replicating topics from cluster A to
> > > > > > cluster B
> > > > > > - A MirrorSourceConnector replicating topics from cluster B to
> > > > > > cluster A
> > > > > > - A MirrorCheckpointConnector translating & replicating offsets
> > > > > > from cluster A to cluster B
> > > > > > - A MirrorCheckpointConnector translating & replicating offsets
> > > > > > from cluster B to cluster A
> > > > > >
> > > > > > I'm not sure whether this is by design, or maybe I'm missing
> > > > > > something. I've seen a similar question posted to KAFKA-9076 [1]
> > > > > > without a resolution.
> > > > > >
> > > > > > Regards,
> > > > > > Jeroen
> > > > > >
> > > > > > [1]
> > > > > > https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908
