Hi everyone,
Bumping this - I think this gap affects the lives of many MM2 users, would
be great to fill it. Any comments are welcome.
TIA
Daniel

Dániel Urbán <urb.dani...@gmail.com> wrote (on Fri, Nov 15, 2024,
13:05):

> Hi Vidor,
>
> KV2 and KV3. I think it is a common setup, but it is usually described as
> separating the produce and the consume workload. Instead of "prod" and
> "backup" think of "produce cluster" and "consume cluster". Currently, even
> if a group does not exist in the target cluster, both checkpointing and
> group offset syncing still happen. As for the setup you mentioned, if the
> topic replication is unidirectional, there will be no checkpointing
> happening inside B->A. With the current proposal, reverse checkpointing
> could still occur if enabled. My issue with this example is that it sounds
> like the topic replication is unidirectional - if that is the case,
> checkpointing in general doesn't make sense in B->A, the groups filter will
> only apply to reverse checkpointing, and have no effect on checkpointing
> (as there are no topics replicated in B->A). I'm not strongly against
> adding a separate reverse groups filter, but I don't really see the
> use-case where it would make sense.
>
> Thanks,
> Daniel
>
> Vidor Kanalas <vidor.kana...@gmail.com> wrote (on Fri, Nov 15, 2024,
> 10:36):
>
>> Hi Daniel,
>>
>> KV1. Thanks for the rundown. Good to know that the impact is not
>> concerning; I agree that the optimization is not worth the effort.
>>
>> KV2 and KV3. Here’s the setup I was thinking about. Suppose we have a
>> topic on a prod cluster A that is replicated to a backup cluster B. There
>> is a CG that is working through the messages on the backup cluster, before
>> it’s promoted to the prod cluster. In this case that CG does not exist on
>> cluster A, and it won’t be checkpointed (obviously), but it’s not clear to
>> me if it will be reverse checkpointed.
>> I’m not certain if the above setup is an actual real-world use case, but
>> if it is, we need to make sure that CGs can get reverse checkpointed even
>> if they initially don’t exist on the cluster (it’s not a traditional
>> failover + failback scenario). This is why I was thinking that a reverse
>> checkpointing group filter could be useful, but I agree that the same can
>> be achieved with the existing filter.
>>
>> Best,
>> Vidor
>>
>>
>> From: Dániel Urbán <urb.dani...@gmail.com>
>> Date: Friday, 15 November 2024 at 09:35
>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> Subject: Re: [DISCUSS] KIP-1098: Reverse Checkpointing in MirrorMaker
>> Hi Viktor,
>>
>> SVV3. In the current proposal, if the TopicFilter is not provided, it
>> enables a different logic for reverse checkpointing - the task relies on
>> the ReplicationPolicy to detect if a topic in the source cluster is a
>> replica originating from the target cluster, e.g. in the A->B flow, the
>> topic "B.T" in cluster A originates from B, and thus can be reverse
>> checkpointed. This should work fine as long as the ReplicationPolicy can
>> reliably tell the source cluster of a topic. For other ReplicationPolicy
>> implementations (e.g. Identity), we cannot rely on the policy, and in that
>> case, the TopicFilter takes over. If we have a default TopicFilter, then we
>> need another flag to tell the task if it should rely on the policy or the
>> filter for identifying the reverse checkpointable topics. That seems
>> redundant, I think a null filter is cleaner, and also means fewer configs.
>> Or do we have a general preference between a null value and a default plus
>> an enable flag when it comes to configs?
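
To make the selection rule described in SVV3 concrete, here is a minimal Python sketch. It is not MirrorMaker code: `topic_source` and `is_reverse_checkpointable` are hypothetical names, and the "." separator is an assumption taken from the "B.T" example. It only illustrates the idea that a missing (null) filter means "trust the replication policy", while a provided filter takes over.

```python
from typing import Callable, Optional

SEPARATOR = "."  # assumption: prefix separator, as in the "B.T" example


def topic_source(topic: str) -> Optional[str]:
    """Hypothetical stand-in for a prefix-based policy: 'B.T' -> 'B', 'T' -> None."""
    prefix, sep, _ = topic.partition(SEPARATOR)
    return prefix if sep else None


def is_reverse_checkpointable(
    topic: str,
    target_alias: str,
    topic_filter: Optional[Callable[[str], bool]] = None,
) -> bool:
    """Decide whether a topic in the source cluster can be reverse checkpointed."""
    if topic_filter is None:
        # No filter configured: rely on the policy to tell where the topic came from.
        return topic_source(topic) == target_alias
    # A filter is configured (e.g. for an identity-style policy): the filter decides.
    return topic_filter(topic)


# A->B flow: "B.T" in cluster A originates from B, plain "T" does not.
print(is_reverse_checkpointable("B.T", "B"))
print(is_reverse_checkpointable("T", "B"))
# With an explicit filter, the topic name prefix no longer matters.
print(is_reverse_checkpointable("T", "B", topic_filter=lambda t: t == "T"))
```

With this shape, no separate enable flag is needed to choose between the two modes, which is the argument for the null filter made above.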
>>
>> SVV4. I think the current TopicFilter is a minimal interface, which works
>> well for choosing topics. Adding more methods would bloat the interface,
>> and require new configs for the existing implementations. It would also be
>> confusing in some cases to figure out if we need to configure the new
>> properties - e.g. in MirrorSourceConnector, the new
>> "reverse.checkpoint.topics" wouldn't make sense, but could still be
>> configurable. In short, I think it introduces a few fuzzy situations, and
>> would rather just reuse the existing implementations with prefixed config
>> overrides.
>>
>> Thanks,
>> Daniel
>>
>> Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote
>> (on Thu, Nov 14, 2024, 16:56):
>>
>> > Hi Daniel,
>> >
>> > SVV3. Kind of an implementation detail. I think using TopicFilter is
>> > good, however I was wondering if we should provide a default
>> > implementation instead of null? We have to implement the pass-through
>> > behavior anyway, and it makes sense to me to do it in a filter.
>> > SVV4. Also, as an alternative to the previous one, instead of introducing
>> > another topic filter, we could extend the current TopicFilter with a
>> > shouldReverseCheckpointTopic() method with reverse.checkpoint.topics and
>> > reverse.checkpoints.topics.exclude configs in the DefaultTopicFilter,
>> > where we configure it for pass-through. We wouldn't really have fewer
>> > configs with this option, but we'd use an existing filter that has similar
>> > functionality (filtering topics) and users could be more familiar with it.
>> >
>> > What do you think?
>> >
>> > Viktor
>> >
>> > On Thu, Nov 7, 2024 at 9:21 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
>> >
>> > > Gentle bump - any comments are welcome.
>> > > This could fill an important gap in MM2, and would be nice to fix.
>> > > TIA
>> > > Daniel
>> > >
>> > > Dániel Urbán <urb.dani...@gmail.com> wrote (on Mon, Nov 4, 2024,
>> > > 11:00):
>> > >
>> > > > Hi Vidor,
>> > > >
>> > > > Thank you for your comments!
>> > > >
>> > > > 1. I think the optimization sounds nice, but it would not work well
>> > > > with TopicFilter implementations which can be dynamically updated in
>> > > > the background (without touching the Connector configuration). If we
>> > > > dropped the offset sync records for a topic when the task started,
>> > > > and the topic was later added to the filter (without triggering a
>> > > > task restart), we would not have the history of offset syncs for the
>> > > > "new" topic. I'm not saying that this makes the optimization you are
>> > > > suggesting impossible, but it's a larger chunk of work - we would
>> > > > need to start monitoring the topics in MirrorCheckpointConnector the
>> > > > same way we do in MirrorSourceConnector, and send config update
>> > > > requests to the Connect framework on topic changes.
>> > > > Just to clarify the "memory intensive" nature of the offset sync
>> > > > store, a full offset sync history of a single partition takes less
>> > > > than 2 KB (1 offset sync contains the topic partition and 2 longs,
>> > > > and the history has a max size of 64). We need to multiply this by
>> > > > the number of replicated partitions and the number of checkpoint
>> > > > tasks to get the full memory usage increase when the feature is
>> > > > enabled. Even with thousands of partitions and hundreds of tasks,
>> > > > this will be below 1 GB, which is distributed across the Connect
>> > > > worker nodes. So I don't think that this would be an alarming
>> > > > increase in memory usage, and the optimization is not worth the
>> > > > extra complexity of the dynamic config updates.
>> > > > 2. I don't really see the use-case for the reverse checkpointing
>> > > > group filter. If a group is already checkpointed in the opposite
>> > > > flow, it suggests that the intent is to be able to fail over. Why
>> > > > wouldn't the user want to also perform a failback for that same
>> > > > group?
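
The memory estimate in point 1 can be worked through as a small calculation. This is only a sketch of the arithmetic, not a measurement: it takes the "less than 2 KB per partition history" figure from the text as an upper bound, and the partition and task counts (2000 and 200) are illustrative assumptions standing in for "thousands of partitions and hundreds of tasks".

```python
# Upper bound from the thread: one partition's full offset-sync history
# (at most 64 entries) takes less than 2 KB.
BYTES_PER_PARTITION_HISTORY = 2 * 1024


def offset_sync_store_upper_bound(partitions: int, checkpoint_tasks: int) -> int:
    """Upper bound, in bytes, of the extra memory across all checkpoint tasks."""
    return BYTES_PER_PARTITION_HISTORY * partitions * checkpoint_tasks


# Illustrative (assumed) cluster size: 2000 partitions, 200 checkpoint tasks.
total = offset_sync_store_upper_bound(partitions=2000, checkpoint_tasks=200)
print(f"{total} bytes, about {total / 1024**2:.0f} MiB in total")
print("below 1 GiB:", total < 1024**3)
```

Note that this total is spread across the Connect worker nodes, so the per-worker increase is a fraction of it.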
>> > > >
>> > > > Not sure what you mean by replication being bidirectional, but topic
>> > > > replication not being bidirectional. With DefaultReplicationPolicy,
>> > > > we usually have the same topic on both clusters to be used by
>> > > > producers - the name is the same, but those are logically 2
>> > > > different topics. With IdentityReplicationPolicy, we require users
>> > > > to avoid loops (i.e. replicating the same messages back and forth
>> > > > infinitely).
>> > > >
>> > > > As for being more flexible with failovers and failbacks, yes, I
>> > > > agree: since re-processing is minimized, it might enable more
>> > > > use-cases, or allow more frequent failovers and failbacks. I'd say
>> > > > that in most use-cases, users will still want to avoid doing this
>> > > > procedure frequently, since it requires a client
>> > > > restart/reconfiguration, which is not risk-free.
>> > > >
>> > > > Thanks,
>> > > > Daniel
>> > > >
>> > > > Vidor Kanalas <vidor.kana...@gmail.com> wrote (on Wed, Oct 30, 2024,
>> > > > 22:21):
>> > > >
>> > > >> Hi Daniel,
>> > > >>
>> > > >>
>> > > >>
>> > > >> This would indeed greatly reduce the duplicate processing on
>> > failbacks.
>> > > >>
>> > > >>
>> > > >>
>> > > >> Few questions:
>> > > >>
>> > > >>    1. Since having a second offset-sync store can be memory intensive,
>> > > >>    would it make sense to filter the topics in it based on the
>> > > >>    reverseCheckpointingTopicFilter?
>> > > >>    2. Would it make sense to add a reverseCheckpointingGroupFilter as
>> > > >>    well, so that one can control not just the topics for reverse
>> > > >>    checkpointing but also the groups?
>> > > >>
>> > > >>
>> > > >>
>> > > >> Do I understand this correctly, that the replication flow itself
>> > > >> must be bidirectional, but the topic replication doesn’t have to
>> > > >> be? If so, this seems to unlock another use case. With this change,
>> > > >> one can more confidently fail over the consumer group to the
>> > > >> passive cluster and back (in the context of the topic itself),
>> > > >> without much reprocessing; I see this being useful when a cluster
>> > > >> gets busy at times. Or even have a new consumer group consume
>> > > >> messages from the passive cluster for a while, before “failing it
>> > > >> over” to the active cluster. Is this something that you would
>> > > >> recommend using the feature for?
>> > > >>
>> > > >>
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Vidor
>> > > >>
>> > > >> On Mon, Oct 28, 2024 at 7:25 PM Dániel Urbán <urb.dani...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >> > Hi Viktor,
>> > > >> >
>> > > >> > SVV1. Not easy to provide a number, but yes, it does scale with
>> > > >> > the number of replicated topic partitions. Enabling this feature
>> > > >> > will add the overhead of an extra consumer, and allocate memory
>> > > >> > for an offset-sync index for each partition. The index is limited
>> > > >> > to 64 entries. I could give an upper bound of the memory usage as
>> > > >> > a function of the number of replicated topic-partitions, but I'm
>> > > >> > not sure if it would be useful for users, or where exactly to
>> > > >> > document it. Wdyt?
>> > > >> >
>> > > >> > No worries, thanks for looking at the KIP!
>> > > >> > Daniel
>> > > >> >
>> > > >> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote
>> > > >> > (on Mon, Oct 28, 2024, 17:07):
>> > > >> >
>> > > >> > > Hi Daniel,
>> > > >> > >
>> > > >> > > SVV1. Fair points about the performance impact. The next
>> > > >> > > question is whether we can quantify it somehow, i.e. does it
>> > > >> > > scale with the number of topics to reverse checkpoint, the
>> > > >> > > offsets emitted, etc.?
>> > > >> > >
>> > > >> > > I'll do one more pass on the KIP in the following days but I
>> > > >> > > wanted to reply to you with what I have so far to keep this
>> > > >> > > going.
>> > > >> > >
>> > > >> > > Best,
>> > > >> > > Viktor
>> > > >> > >
>> > > >> > > On Fri, Oct 25, 2024 at 5:32 PM Dániel Urbán <urb.dani...@gmail.com>
>> > > >> > > wrote:
>> > > >> > >
>> > > >> > > > Hi,
>> > > >> > > >
>> > > >> > > > One more update. As I was working on the PR, I realized that
>> > > >> > > > the only way to support IdentityReplicationPolicy is to add an
>> > > >> > > > extra topic filter to the checkpointing. I updated the KIP
>> > > >> > > > accordingly.
>> > > >> > > > I also opened a draft PR to demonstrate the proposed changes:
>> > > >> > > > https://github.com/apache/kafka/pull/17593
>> > > >> > > >
>> > > >> > > > Daniel
>> > > >> > > >
>> > > >> > > > Dániel Urbán <urb.dani...@gmail.com> wrote (on Thu, Oct 24,
>> > > >> > > > 2024, 15:22):
>> > > >> > > >
>> > > >> > > > > Hi all,
>> > > >> > > > > Just a bump/minor update here:
>> > > >> > > > > As I've started working on a POC of the proposed solution,
>> > > >> > > > > I've realised that the hard requirement related to the
>> > > >> > > > > ReplicationPolicy implementation can be eliminated, and
>> > > >> > > > > updated the KIP accordingly.
>> > > >> > > > > Daniel
>> > > >> > > > >
>> > > >> > > > > Dániel Urbán <urb.dani...@gmail.com> wrote (on Mon, Oct 21,
>> > > >> > > > > 2024, 16:18):
>> > > >> > > > >
>> > > >> > > > >> Hi Mickael,
>> > > >> > > > >> Good point, I renamed the KIP and this thread:
>> > > >> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing+in+MirrorMaker
>> > > >> > > > >> Thank you,
>> > > >> > > > >> Daniel
>> > > >> > > > >>
>> > > >> > > > >> Mickael Maison <mickael.mai...@gmail.com> wrote (on Mon,
>> > > >> > > > >> Oct 21, 2024, 15:22):
>> > > >> > > > >>
>> > > >> > > > >>> Hi Daniel,
>> > > >> > > > >>>
>> > > >> > > > >>> I've not had time to take a close look at the KIP but my
>> > > >> > > > >>> initial feedback would be to adjust the name to make it
>> > > >> > > > >>> clear it's about MirrorMaker.
>> > > >> > > > >>> The word "checkpoint" has several meanings in Kafka and
>> > > >> > > > >>> from the current KIP name it's not clear if it's about
>> > > >> > > > >>> KRaft, Streams or Connect.
>> > > >> > > > >>>
>> > > >> > > > >>> Thanks,
>> > > >> > > > >>> Mickael
>> > > >> > > > >>>
>> > > >> > > > >>> On Mon, Oct 21, 2024 at 2:55 PM Dániel Urbán <urb.dani...@gmail.com>
>> > > >> > > > >>> wrote:
>> > > >> > > > >>> >
>> > > >> > > > >>> > Hi Viktor,
>> > > >> > > > >>> >
>> > > >> > > > >>> > Thank you for the comments!
>> > > >> > > > >>> >
>> > > >> > > > >>> > SVV1: I think the feature has some performance
>> > > >> > > > >>> > implications. If reverse checkpointing is enabled, task
>> > > >> > > > >>> > startup will possibly be slower, since it will need to
>> > > >> > > > >>> > consume from a second offset-syncs topic; and it will
>> > > >> > > > >>> > also use more memory, to keep the second offset-sync
>> > > >> > > > >>> > history. Additionally, it is also possible to have an
>> > > >> > > > >>> > offset-syncs topic present without an actual, opposite
>> > > >> > > > >>> > flow being active - I think only users can tell if the
>> > > >> > > > >>> > reverse checkpointing should be active, and they should
>> > > >> > > > >>> > be the ones opting in for the higher resource usage.
>> > > >> > > > >>> >
>> > > >> > > > >>> > SVV2: I mention the DefaultReplicationPolicy to provide
>> > > >> > > > >>> > examples. I don't think it is required. The actual
>> > > >> > > > >>> > requirement related to the ReplicationPolicy is that the
>> > > >> > > > >>> > policy should be able to correctly tell which topic was
>> > > >> > > > >>> > replicated from which cluster. Because of this,
>> > > >> > > > >>> > IdentityReplicationPolicy would not work, but
>> > > >> > > > >>> > DefaultReplicationPolicy, or any other ReplicationPolicy
>> > > >> > > > >>> > implementation with a correctly implemented
>> > > >> > > > >>> > "topicSource" method should work. I will make an
>> > > >> > > > >>> > explicit note of this in the KIP.
>> > > >> > > > >>> >
>> > > >> > > > >>> > Thank you,
>> > > >> > > > >>> > Daniel
>> > > >> > > > >>> >
>> > > >> > > > >>> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid>
>> > > >> > > > >>> > wrote (on Fri, Oct 18, 2024, 17:28):
>> > > >> > > > >>> >
>> > > >> > > > >>> > > Hey Dan,
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > I think this is a very useful idea. Two questions:
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > SVV1: Do you think we need the feature flag at all? I
>> > > >> > > > >>> > > know that not having this flag may technically render
>> > > >> > > > >>> > > the KIP unnecessary (however it may still be useful to
>> > > >> > > > >>> > > discuss this topic and create a consensus). As you
>> > > >> > > > >>> > > wrote in the KIP, we may be able to look up the target
>> > > >> > > > >>> > > and source topics and if we can do this, we can
>> > > >> > > > >>> > > probably detect if the replication is one-way or
>> > > >> > > > >>> > > prefixless (identity). So that means we don't need
>> > > >> > > > >>> > > this flag to control when we want to use this. Then it
>> > > >> > > > >>> > > is really just there to act as something that can turn
>> > > >> > > > >>> > > the feature on and off if needed, but I'm not really
>> > > >> > > > >>> > > sure if there is a great risk in just enabling this by
>> > > >> > > > >>> > > default. If we really just turn back the B -> A
>> > > >> > > > >>> > > checkpoints and save them in the A -> B, then maybe
>> > > >> > > > >>> > > it's not too risky and users would get this
>> > > >> > > > >>> > > immediately by just upgrading.
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > SVV2: You write that we need DefaultReplicationPolicy
>> > > >> > > > >>> > > to use this feature, but most of the functionality is
>> > > >> > > > >>> > > there on interface level in ReplicationPolicy. Is
>> > > >> > > > >>> > > there anything that is missing from there and if so,
>> > > >> > > > >>> > > what do you think about pulling it into the interface?
>> > > >> > > > >>> > > If this improvement only works with the default
>> > > >> > > > >>> > > replication policy, then it's somewhat limiting as
>> > > >> > > > >>> > > users may have their own policy for various reasons,
>> > > >> > > > >>> > > but if we can make it work on the interface level,
>> > > >> > > > >>> > > then we could provide this feature to everyone. Of
>> > > >> > > > >>> > > course there can be replication policies like the
>> > > >> > > > >>> > > identity one that by design disallow this feature, but
>> > > >> > > > >>> > > for that, see my previous point.
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > Best,
>> > > >> > > > >>> > > Viktor
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > On Fri, Oct 18, 2024 at 3:30 PM Dániel Urbán <urb.dani...@gmail.com>
>> > > >> > > > >>> > > wrote:
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > > Hi everyone,
>> > > >> > > > >>> > > >
>> > > >> > > > >>> > > > I'd like to start the discussion on KIP-1098: Reverse
>> > > >> > > > >>> > > > Checkpointing
>> > > >> > > > >>> > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing),
>> > > >> > > > >>> > > > which aims to minimize message reprocessing for
>> > > >> > > > >>> > > > consumers in failbacks.
>> > > >> > > > >>> > > >
>> > > >> > > > >>> > > > TIA,
>> > > >> > > > >>> > > > Daniel
>> > > >> > > > >>> > > >
>> > > >> > > > >>> > >
>> > > >> > > > >>>
>> > > >> > > > >>
>> > > >> > > >
>> > > >> > >
>> > > >> >
>> > > >>
>> > > >
>> > >
>> >
>>
>
