Hi Vidor,

KV2 and KV3. I think it is a common setup, but it is usually described as
separating the produce and the consume workloads. Instead of "prod" and
"backup", think of "produce cluster" and "consume cluster". Currently, even
if a group does not exist in the target cluster, both checkpointing and
group offset syncing still happen. As for the setup you mentioned, if the
topic replication is unidirectional (A->B only), no regular checkpointing
happens in the B->A flow. With the current proposal, reverse checkpointing
could still occur there if enabled. My issue with this example is that it
sounds like the topic replication is unidirectional. If that is the case,
regular checkpointing doesn't make sense in B->A, so the groups filter of
that flow would only apply to reverse checkpointing and have no effect on
regular checkpointing (as there are no topics replicated in B->A). I'm not
strongly against adding a separate reverse groups filter, but I don't
really see a use case where it would be needed.

Thanks,
Daniel

Vidor Kanalas <vidor.kana...@gmail.com> wrote (on Fri, Nov 15, 2024,
10:36):

> Hi Daniel,
>
> KV1. Thanks for the rundown. Good to know that the impact is not
> concerning; I agree that the optimization is not worth the effort.
>
> KV2 and KV3. Here’s the setup I was thinking about. Suppose we have a
> topic on a prod cluster A that is replicated to a backup cluster B. There
> is a CG that is working through the messages on the backup cluster, before
> it’s promoted to the prod cluster. In this case that CG does not exist on
> cluster A, and it won’t be checkpointed (obviously), but it’s not clear to
> me if it will be reverse checkpointed.
> I’m not certain if the above setup is an actual real-world use case, but
> if it is, we need to make sure that CGs can get reverse checkpointed even
> if they initially don’t exist on the cluster. (it’s not a traditional
> failover + failback scenario). This is why I was thinking that a reverse
> checkpointing group filter could be useful, but I agree that the same can
> be achieved with the existing filter.
>
> Best,
> Vidor
>
>
> From: Dániel Urbán <urb.dani...@gmail.com>
> Date: Friday, 15 November 2024 at 09:35
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-1098: Reverse Checkpointing in MirrorMaker
> Hi Viktor,
>
> SVV3. In the current proposal, if the TopicFilter is not provided, it
> enables a different logic for reverse checkpointing - the task relies on
> the ReplicationPolicy to detect if a topic in the source cluster is a
> replica originating from the target cluster, e.g. in the A->B flow, the
> topic "B.T" in cluster A originates from B, and thus can be reverse
> checkpointed. This should work fine as long as the ReplicationPolicy can
> reliably tell the source cluster of a topic. For other ReplicationPolicy
> implementations (e.g. Identity), we cannot rely on the policy, and in that
> case, the TopicFilter takes over. If we have a default TopicFilter, then we
> need another flag to tell the task if it should rely on the policy or the
> filter for identifying the reverse checkpointable topics. That seems
> redundant, I think a null filter is cleaner, and also means fewer configs.
> Or do we have a generic preference in null vs default+enable flag in terms
> of configs?
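The selection logic described in SVV3 can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, `topicSource` here is a simplified stand-in for `ReplicationPolicy.topicSource()` that assumes the "<sourceAlias>.<topic>" naming convention, and a plain `Predicate<String>` stands in for the TopicFilter. It is not the code from the KIP or the draft PR.

```java
import java.util.function.Predicate;

// Hypothetical sketch; none of these names are MirrorMaker classes.
final class ReverseCheckpointTopics {

    // Simplified stand-in for ReplicationPolicy.topicSource(), assuming
    // replicated topics are named "<sourceAlias>.<topic>".
    // Returns null when no source cluster can be derived from the name.
    static String topicSource(String topic) {
        int idx = topic.indexOf('.');
        return idx > 0 ? topic.substring(0, idx) : null;
    }

    // Decides whether a topic in the source cluster of a flow can be
    // reverse checkpointed. With a null filter the naming policy decides;
    // otherwise (e.g. identity-style naming) the filter takes over.
    static boolean isReverseCheckpointable(String topic,
                                           String targetAlias,
                                           Predicate<String> reverseTopicFilter) {
        if (reverseTopicFilter != null) {
            return reverseTopicFilter.test(topic);
        }
        return targetAlias.equals(topicSource(topic));
    }

    public static void main(String[] args) {
        // A->B flow: "B.T" in cluster A originates from the target cluster B.
        System.out.println(isReverseCheckpointable("B.T", "B", null));
        // "T" carries no source prefix, so the policy cannot match it.
        System.out.println(isReverseCheckpointable("T", "B", null));
        // With an explicit filter, the filter alone decides.
        System.out.println(isReverseCheckpointable("T", "B", t -> t.equals("T")));
    }
}
```

The null check mirrors the "null filter instead of default filter plus enable flag" choice argued for above: the absence of a filter is itself the signal to rely on the policy.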
>
> SVV4. I think the current TopicFilter is a minimal interface, which works
> well for choosing topics. Adding more methods would bloat the interface,
> and require new configs for the existing implementations. It would also be
> confusing in some cases to figure out if we need to configure the new
> properties - e.g. in MirrorSourceConnector, the new
> "reverse.checkpoint.topics" wouldn't make sense, but could still be
> configurable. In short, I think it introduces a few fuzzy situations, and
> would rather just reuse the existing implementations with prefixed config
> overrides.
>
> Thanks,
> Daniel
>
> Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote
> (on Thu, Nov 14, 2024, 16:56):
>
> > Hi Daniel,
> >
> > SVV3. Kind of an implementation detail. So I think using TopicFilter is
> > good, however I was wondering if we should provide a default
> > implementation instead of null? We have to implement the pass-through
> > behavior anyways, and it makes sense to me to do it in a filter.
> > SVV4. Also, an alternative to the previous one, instead of introducing
> > another topic filter, we could extend the current TopicFilter with a
> > shouldReverseCheckpointTopic() method with reverse.checkpoint.topics and
> > reverse.checkpoints.topics.exclude configs in the DefaultTopicFilter
> > where we configure it for pass-through. We wouldn't really have fewer
> > configs with this option, but we'd use an existing filter that has
> > similar functionality (filtering topics) and users could be more
> > familiar with it.
> >
> > What do you think?
> >
> > Viktor
> >
> > On Thu, Nov 7, 2024 at 9:21 AM Dániel Urbán <urb.dani...@gmail.com>
> > wrote:
> >
> > > Gentle bump - any comments are welcome.
> > > This could fill an important gap in MM2, and would be nice to fix.
> > > TIA
> > > Daniel
> > >
> > > Dániel Urbán <urb.dani...@gmail.com> wrote (on Mon, Nov 4, 2024,
> > > 11:00):
> > >
> > > > Hi Vidor,
> > > >
> > > > Thank you for your comments!
> > > >
> > > > 1. I think the optimization sounds nice, but it would not work well
> > > > with TopicFilter implementations which can be dynamically updated in
> > > > the background (without touching the Connector configuration). If we
> > > > actually dropped the offset sync records for a topic when the task is
> > > > started, and the topic was later added to the filter (without
> > > > triggering a task restart), we would not have the history of offset
> > > > syncs for the "new" topic. I'm not saying that this makes the
> > > > optimization you are suggesting impossible, but it's a larger chunk
> > > > of work - we would need to start monitoring the topics in
> > > > MirrorCheckpointConnector the same way we do in
> > > > MirrorSourceConnector, and send config update requests to the Connect
> > > > framework on topic changes.
> > > > Just to clarify the "memory intensive" nature of the offset sync
> > > > store, a full offset sync history of a single partition takes less
> > > > than 2 KB (1 offset sync contains the topic partition and 2 longs,
> > > > and the history has a max size of 64). We need to multiply this by
> > > > the number of replicated partitions and the number of checkpoint
> > > > tasks to get the full memory usage increase when the feature is
> > > > enabled. Even with thousands of partitions and hundreds of tasks,
> > > > this will be below 1 GB, which is distributed across the Connect
> > > > worker nodes. So I don't think that this would be an alarming
> > > > increase in memory usage, and the optimization is not worth the
> > > > extra complexity of the dynamic config updates.
> > > > 2. I don't really see the use-case for the reverse checkpointing
> > > > group filter. If a group is already checkpointed in the opposite
> > > > flow, it suggests that the intent is to be able to fail over. Why
> > > > wouldn't the user want to also perform a failback for that same
> > > > group?
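The memory estimate in point 1 can be worked through with a small calculation. This is a rough sketch only: the class and method names are hypothetical, the 2 KB figure is the per-partition upper bound quoted above, and the partition and task counts in `main` are example values chosen for illustration, not measurements.

```java
// Hypothetical back-of-the-envelope helper; not part of MirrorMaker.
final class OffsetSyncMemoryEstimate {

    // Upper bound from the thread: a full offset sync history of a single
    // partition (at most 64 entries) takes less than 2 KB.
    static final long MAX_BYTES_PER_PARTITION_HISTORY = 2L * 1024;

    // Worst case, assuming every checkpoint task keeps the full history
    // of every replicated partition.
    static long upperBoundBytes(long replicatedPartitions, long checkpointTasks) {
        return MAX_BYTES_PER_PARTITION_HISTORY * replicatedPartitions * checkpointTasks;
    }

    public static void main(String[] args) {
        // Example values only: 2,000 partitions and 200 checkpoint tasks.
        long bytes = upperBoundBytes(2_000, 200);
        double gib = bytes / (1024.0 * 1024 * 1024);
        System.out.println(bytes + " bytes, roughly " + gib + " GiB in total");
    }
}
```

For these example values the bound is 819,200,000 bytes, about 0.76 GiB, and that total is spread across the Connect workers rather than held by a single node.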
> > > >
> > > > Not sure what you mean by the replication being bidirectional while
> > > > the topic replication is not. With DefaultReplicationPolicy, we
> > > > usually have the same topic on both clusters to be used by producers
> > > > - the name is the same, but those are logically 2 different topics.
> > > > With IdentityReplicationPolicy, we require users to avoid loops
> > > > (i.e. replicating the same messages back and forth infinitely).
> > > >
> > > > As for being more flexible with failovers and failbacks: yes, I
> > > > agree. Since re-processing is minimized, it might enable more
> > > > use-cases, or allow more frequent failovers and failbacks. I'd say
> > > > that in most use-cases, users will still want to avoid doing this
> > > > procedure frequently, since it requires a client
> > > > restart/reconfiguration, which is not risk-free.
> > > >
> > > > Thanks,
> > > > Daniel
> > > >
> > > > Vidor Kanalas <vidor.kana...@gmail.com> wrote (on Wed, Oct 30, 2024,
> > > > 22:21):
> > > >
> > > >> Hi Daniel,
> > > >>
> > > >>
> > > >>
> > > >> This would indeed greatly reduce the duplicate processing on
> > failbacks.
> > > >>
> > > >>
> > > >>
> > > >> Few questions:
> > > >>
> > > >>    1. Since having a second offset-sync store can be memory intensive,
> > > >>    would it make sense to filter the topics in it based on the
> > > >>    reverseCheckpointingTopicFilter?
> > > >>    2. Would it make sense to add a reverseCheckpointingGroupFilter as
> > > >>    well, so that one can control not just the topics for reverse
> > > >>    checkpointing but also the groups?
> > > >>
> > > >> Do I understand this correctly, that the replication flow itself must
> > > >> be bidirectional, but the topic replication doesn’t have to be? If so,
> > > >> this seems to unlock another use case. With this change, one can more
> > > >> confidently fail over the consumer group to the passive cluster and
> > > >> back (in the context of the topic itself), without much reprocessing;
> > > >> I see this being useful when a cluster gets busy at times. Or even
> > > >> have a new consumer group consume messages from the passive cluster
> > > >> for a while, before “failing it over” to the active cluster. Is this
> > > >> something that you would recommend using the feature for?
> > > >>
> > > >>
> > > >>
> > > >> Best,
> > > >>
> > > >> Vidor
> > > >>
> > > >> On Mon, Oct 28, 2024 at 7:25 PM Dániel Urbán <urb.dani...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Viktor,
> > > >> >
> > > >> > SVV1. Not easy to provide a number, but yes, it does scale with the
> > > >> > number of replicated topic partitions. Enabling this feature adds
> > > >> > the overhead of an extra consumer, and allocates memory for an
> > > >> > offset-sync index for each partition. The index is limited to 64
> > > >> > entries. I could give an upper bound of the memory usage as a
> > > >> > function of the number of replicated topic-partitions, but I'm not
> > > >> > sure if it would be useful for users, or where exactly to document
> > > >> > this. Wdyt?
> > > >> >
> > > >> > No worries, thanks for looking at the KIP!
> > > >> > Daniel
> > > >> >
> > > >> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote
> > > >> > (on Mon, Oct 28, 2024, 17:07):
> > > >> >
> > > >> > > Hi Daniel,
> > > >> > >
> > > >> > > SVV1. Fair points about the performance impact. The next question
> > > >> > > is whether we can quantify it somehow, i.e. does it scale with the
> > > >> > > number of topics to reverse checkpoint, the offsets emitted, etc.?
> > > >> > >
> > > >> > > I'll do one more pass on the KIP in the following days but I
> > > >> > > wanted to reply to you with what I have so far to keep this going.
> > > >> > >
> > > >> > > Best,
> > > >> > > Viktor
> > > >> > >
> > > >> > > On Fri, Oct 25, 2024 at 5:32 PM Dániel Urbán
> > > >> > > <urb.dani...@gmail.com> wrote:
> > > >> > >
> > > >> > > > Hi,
> > > >> > > >
> > > >> > > > One more update. As I was working on the PR, I realized that the
> > > >> > > > only way to support IdentityReplicationPolicy is to add an extra
> > > >> > > > topic filter to the checkpointing. I updated the KIP accordingly.
> > > >> > > > I also opened a draft PR to demonstrate the proposed changes:
> > > >> > > > https://github.com/apache/kafka/pull/17593
> > > >> > > >
> > > >> > > > Daniel
> > > >> > > >
> > > >> > > > Dániel Urbán <urb.dani...@gmail.com> wrote (on Thu, Oct 24,
> > > >> > > > 2024, 15:22):
> > > >> > > >
> > > >> > > > > Hi all,
> > > >> > > > > Just a bump/minor update here:
> > > >> > > > > As I've started working on a POC of the proposed solution, I've
> > > >> > > > > realised that the hard requirement related to the
> > > >> > > > > ReplicationPolicy implementation can be eliminated, and updated
> > > >> > > > > the KIP accordingly.
> > > >> > > > > Daniel
> > > >> > > > >
> > > >> > > > > Dániel Urbán <urb.dani...@gmail.com> wrote (on Mon, Oct 21,
> > > >> > > > > 2024, 16:18):
> > > >> > > > >
> > > >> > > > >> Hi Mickael,
> > > >> > > > >> Good point, I renamed the KIP and this thread:
> > > >> > > > >>
> > > >> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing+in+MirrorMaker
> > > >> > > > >> Thank you,
> > > >> > > > >> Daniel
> > > >> > > > >>
> > > >> > > > >> Mickael Maison <mickael.mai...@gmail.com> wrote (on Mon,
> > > >> > > > >> Oct 21, 2024, 15:22):
> > > >> > > > >>
> > > >> > > > >>> Hi Daniel,
> > > >> > > > >>>
> > > >> > > > >>> I've not had time to take a close look at the KIP but my initial
> > > >> > > > >>> feedback would be to adjust the name to make it clear it's about
> > > >> > > > >>> MirrorMaker.
> > > >> > > > >>> The word "checkpoint" has several meanings in Kafka and from the
> > > >> > > > >>> current KIP name it's not clear if it's about KRaft, Streams or
> > > >> > > > >>> Connect.
> > > >> > > > >>>
> > > >> > > > >>> Thanks,
> > > >> > > > >>> Mickael
> > > >> > > > >>>
> > > >> > > > >>> On Mon, Oct 21, 2024 at 2:55 PM Dániel Urbán
> > > >> > > > >>> <urb.dani...@gmail.com> wrote:
> > > >> > > > >>> >
> > > >> > > > >>> > Hi Viktor,
> > > >> > > > >>> >
> > > >> > > > >>> > Thank you for the comments!
> > > >> > > > >>> >
> > > >> > > > >>> > SVV1: I think the feature has some performance implications.
> > > >> > > > >>> > If the reverse checkpointing is enabled, task startup will
> > > >> > > > >>> > possibly be slower, since it will need to consume from a
> > > >> > > > >>> > second offset-syncs topic; and it will also use more memory,
> > > >> > > > >>> > to keep the second offset-sync history. Additionally, it is
> > > >> > > > >>> > also possible to have an offset-syncs topic present without
> > > >> > > > >>> > an actual, opposite flow being active - I think only users
> > > >> > > > >>> > can tell if the reverse checkpointing should be active, and
> > > >> > > > >>> > they should be the ones opting in for the higher resource
> > > >> > > > >>> > usage.
> > > >> > > > >>> >
> > > >> > > > >>> > SVV2: I mention the DefaultReplicationPolicy to provide
> > > >> > > > >>> > examples. I don't think it is required. The actual
> > > >> > > > >>> > requirement related to the ReplicationPolicy is that the
> > > >> > > > >>> > policy should be able to correctly tell which topic was
> > > >> > > > >>> > replicated from which cluster. Because of this,
> > > >> > > > >>> > IdentityReplicationPolicy would not work, but
> > > >> > > > >>> > DefaultReplicationPolicy, or any other ReplicationPolicy
> > > >> > > > >>> > implementation with a correctly implemented "topicSource"
> > > >> > > > >>> > method should work. I will make an explicit note of this in
> > > >> > > > >>> > the KIP.
> > > >> > > > >>> >
> > > >> > > > >>> > Thank you,
> > > >> > > > >>> > Daniel
> > > >> > > > >>> >
> > > >> > > > >>> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid>
> > > >> > > > >>> > wrote (on Fri, Oct 18, 2024, 17:28):
> > > >> > > > >>> >
> > > >> > > > >>> > > Hey Dan,
> > > >> > > > >>> > >
> > > >> > > > >>> > > I think this is a very useful idea. Two questions:
> > > >> > > > >>> > >
> > > >> > > > >>> > > SVV1: Do you think we need the feature flag at all? I know
> > > >> > > > >>> > > that not having this flag may technically render the KIP
> > > >> > > > >>> > > unnecessary (however it may still be useful to discuss this
> > > >> > > > >>> > > topic and create a consensus). As you wrote in the KIP, we
> > > >> > > > >>> > > may be able to look up the target and source topics and if
> > > >> > > > >>> > > we can do this, we can probably detect if the replication
> > > >> > > > >>> > > is one-way or prefixless (identity). So that means we don't
> > > >> > > > >>> > > need this flag to control when we want to use this. Then it
> > > >> > > > >>> > > is really just there to act as something that can turn the
> > > >> > > > >>> > > feature on and off if needed, but I'm not really sure if
> > > >> > > > >>> > > there is a great risk in just enabling this by default. If
> > > >> > > > >>> > > we really just turn back the B -> A checkpoints and save
> > > >> > > > >>> > > them in the A -> B, then maybe it's not too risky and users
> > > >> > > > >>> > > would get this immediately by just upgrading.
> > > >> > > > >>> > >
> > > >> > > > >>> > > SVV2: You write that we need DefaultReplicationPolicy to
> > > >> > > > >>> > > use this feature, but most of the functionality is there on
> > > >> > > > >>> > > interface level in ReplicationPolicy. Is there anything
> > > >> > > > >>> > > that is missing from there and if so, what do you think
> > > >> > > > >>> > > about pulling it into the interface? If this improvement
> > > >> > > > >>> > > only works with the default replication policy, then it's
> > > >> > > > >>> > > somewhat limiting as users may have their own policy for
> > > >> > > > >>> > > various reasons, but if we can make it work on the
> > > >> > > > >>> > > interface level, then we could provide this feature to
> > > >> > > > >>> > > everyone. Of course there can be replication policies like
> > > >> > > > >>> > > the identity one that by design disallow this feature, but
> > > >> > > > >>> > > for that, see my previous point.
> > > >> > > > >>> > >
> > > >> > > > >>> > > Best,
> > > >> > > > >>> > > Viktor
> > > >> > > > >>> > >
> > > >> > > > >>> > > On Fri, Oct 18, 2024 at 3:30 PM Dániel Urbán
> > > >> > > > >>> > > <urb.dani...@gmail.com> wrote:
> > > >> > > > >>> > >
> > > >> > > > >>> > > > Hi everyone,
> > > >> > > > >>> > > >
> > > >> > > > >>> > > > I'd like to start the discussion on KIP-1098: Reverse
> > > >> > > > >>> > > > Checkpointing (
> > > >> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing
> > > >> > > > >>> > > > ) which aims to minimize message reprocessing for
> > > >> > > > >>> > > > consumers in failbacks.
> > > >> > > > >>> > > >
> > > >> > > > >>> > > > TIA,
> > > >> > > > >>> > > > Daniel
> > > >> > > > >>> > > >
> > > >> > > > >>> > >
> > > >> > > > >>>
> > > >> > > > >>
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>
