Hi everyone,
Bumping this - I think this gap affects many MM2 users, and it would be great to fill it. Any comments are welcome.
TIA
Daniel
Dániel Urbán <urb.dani...@gmail.com> wrote (on 15 Nov 2024, Fri, 13:05):

> Hi Vidor,
>
> KV2 and KV3. I think it is a common setup, but it is usually described as separating the produce and the consume workload. Instead of "prod" and "backup", think of "produce cluster" and "consume cluster". Currently, even if a group does not exist in the target cluster, both checkpointing and group offset syncing still happen. As for the setup you mentioned, if the topic replication is unidirectional, there will be no checkpointing happening inside B->A. With the current proposal, reverse checkpointing could still occur if enabled. My issue with this example is that it sounds like the topic replication is unidirectional - if that is the case, checkpointing in general doesn't make sense in B->A, and the groups filter will only apply to reverse checkpointing, with no effect on checkpointing (as there are no topics replicated in B->A). I'm not strongly against adding a separate reverse groups filter, but I don't really see the use-case where it would make sense.
>
> Thanks,
> Daniel
>
> Vidor Kanalas <vidor.kana...@gmail.com> wrote (on 15 Nov 2024, Fri, 10:36):
>
>> Hi Daniel,
>>
>> KV1. Thanks for the rundown, good to know that the impact is not concerning. I agree that the optimization is not worth the effort.
>>
>> KV2 and KV3. Here's the setup I was thinking about. Suppose we have a topic on a prod cluster A that is replicated to a backup cluster B. There is a CG that is working through the messages on the backup cluster, before it's promoted to the prod cluster. In this case that CG does not exist on cluster A, and it won't be checkpointed (obviously), but it's not clear to me if it will be reverse checkpointed.
>> I'm not certain if the above setup is an actual real-world use case, but if it is, we need to make sure that CGs can get reverse checkpointed even if they initially don't exist on the cluster.
>> (It's not a traditional failover + failback scenario.) This is why I was thinking that a reverse checkpointing group filter could be useful, but I agree that the same can be achieved with the existing filter.
>>
>> Best,
>> Vidor
>>
>> From: Dániel Urbán <urb.dani...@gmail.com>
>> Date: Friday, 15 November 2024 at 09:35
>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> Subject: Re: [DISCUSS] KIP-1098: Reverse Checkpointing in MirrorMaker
>>
>> Hi Viktor,
>>
>> SVV3. In the current proposal, if the TopicFilter is not provided, it enables a different logic for reverse checkpointing - the task relies on the ReplicationPolicy to detect if a topic in the source cluster is a replica originating from the target cluster, e.g. in the A->B flow, the topic "B.T" in cluster A originates from B, and thus can be reverse checkpointed. This should work fine as long as the ReplicationPolicy can reliably tell the source cluster of a topic. For other ReplicationPolicy implementations (e.g. Identity), we cannot rely on the policy, and in that case, the TopicFilter takes over. If we have a default TopicFilter, then we need another flag to tell the task whether it should rely on the policy or the filter for identifying the reverse-checkpointable topics. That seems redundant; I think a null filter is cleaner, and it also means fewer configs. Or do we have a generic preference for null vs. default+enable flag in terms of configs?
>>
>> SVV4. I think the current TopicFilter is a minimal interface, which works well for choosing topics. Adding more methods would bloat the interface and require new configs for the existing implementations. It would also be confusing in some cases to figure out if we need to configure the new properties - e.g. in MirrorSourceConnector, the new "reverse.checkpoint.topics" wouldn't make sense, but could still be configurable.
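For illustration, the policy-based detection described in SVV3 boils down to something like the sketch below. It assumes DefaultReplicationPolicy's "<source-alias>.<topic>" naming convention; the class and method names are illustrative, not MirrorMaker's actual code:

```java
// Sketch of the ReplicationPolicy-based detection: in the A->B flow, a topic
// in cluster A whose source (per the policy) is B is a replica of a B topic,
// and is therefore reverse-checkpointable. This mimics the
// DefaultReplicationPolicy "<source>.<topic>" naming; it is an illustration,
// not the actual MM2 implementation.
public class ReverseCheckpointDetection {

    static final String SEPARATOR = ".";

    // DefaultReplicationPolicy-style topicSource: the prefix before the
    // first separator, or null if the topic has no source prefix.
    static String topicSource(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(0, i);
    }

    // A topic in the source cluster of the A->B flow is reverse-checkpointable
    // if it originates from the target cluster B.
    static boolean reverseCheckpointable(String topicInSourceCluster, String targetAlias) {
        return targetAlias.equals(topicSource(topicInSourceCluster));
    }

    public static void main(String[] args) {
        // In the A->B flow, "B.T" in cluster A originates from B.
        System.out.println(reverseCheckpointable("B.T", "B")); // true
        System.out.println(reverseCheckpointable("T", "B"));   // false
    }
}
```

With an IdentityReplicationPolicy there is no prefix to inspect, which is exactly why the proposal falls back to a TopicFilter in that case.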
>> In short, I think it introduces a few fuzzy situations, and I would rather just reuse the existing implementations with prefixed config overrides.
>>
>> Thanks,
>> Daniel
>>
>> Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote (on 14 Nov 2024, Thu, 16:56):
>>
>> > Hi Daniel,
>> >
>> > SVV3. Kind of an implementation detail. So I think using TopicFilter is good; however, I was wondering if we should provide a default implementation instead of null? We have to implement the pass-through behavior anyway, and it makes sense to me to do it in a filter.
>> > SVV4. As an alternative to the previous one, instead of introducing another topic filter, we could extend the current TopicFilter with a shouldReverseCheckpointTopic() method, with reverse.checkpoint.topics and reverse.checkpoints.topics.exclude configs in the DefaultTopicFilter, where we configure it for pass-through. We wouldn't really have fewer configs with this option, but we'd use an existing filter that has similar functionality (filtering topics), and users could be more familiar with it.
>> >
>> > What do you think?
>> >
>> > Viktor
>> >
>> > On Thu, Nov 7, 2024 at 9:21 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
>> >
>> > > Gentle bump - any comments are welcome.
>> > > This could fill an important gap in MM2, and would be nice to fix.
>> > > TIA
>> > > Daniel
>> > >
>> > > Dániel Urbán <urb.dani...@gmail.com> wrote (on 4 Nov 2024, Mon, 11:00):
>> > >
>> > > > Hi Vidor,
>> > > >
>> > > > Thank you for your comments!
>> > > >
>> > > > 1. I think the optimization sounds nice, but it would not work well with TopicFilter implementations which can be dynamically updated in the background (without touching the Connector configuration).
>> > > > If we actually dropped the offset sync records for a topic when the task is started, and the topic is then added to the filter (without triggering a task restart), we would not have the history of offset syncs for the "new" topic. I'm not saying that this makes the optimization you are suggesting impossible, but it's a larger chunk of work - we would need to start monitoring the topics in MirrorCheckpointConnector the same way we do in MirrorSourceConnector, and send config update requests to the Connect framework on topic changes.
>> > > > Just to clarify the "memory intensive" nature of the offset sync store: a full offset sync history of a single partition takes less than 2 KB (1 offset sync contains the topic partition and 2 longs, and the history has a max size of 64). We need to multiply this by the number of replicated partitions and the number of checkpoint tasks to get the full memory usage increase when the feature is enabled. Even with thousands of partitions and hundreds of tasks, this will be below 1 GB, distributed across the Connect worker nodes. So I don't think that this would be an alarming increase in memory usage, and the optimization is not worth the extra complexity of the dynamic config updates.
>> > > > 2. I don't really see the use-case for the reverse checkpointing group filter. If a group is already checkpointed in the opposite flow, it suggests that the intent is to be able to fail over. Why wouldn't the user want to also perform a failback for that same group?
>> > > >
>> > > > I'm not sure what you mean by the replication being bidirectional, but the topic replication not being bidirectional.
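Daniel's back-of-envelope memory estimate can be reproduced with a quick calculation. The per-partition size is the approximation quoted in the thread (64 syncs, each a topic-partition plus two longs, under 2 KB in total), and the partition/task counts are assumed worst-case inputs, not measured values:

```java
// Rough upper-bound estimate of the extra memory used by the second
// offset-sync store, following the numbers in the thread.
public class OffsetSyncMemoryEstimate {

    // < 2 KB per partition: a 64-entry history, each sync holding a
    // topic-partition and two longs (approximation from the thread).
    static final long BYTES_PER_PARTITION_HISTORY = 2 * 1024;

    static long estimateBytes(long replicatedPartitions, long checkpointTasks) {
        return BYTES_PER_PARTITION_HISTORY * replicatedPartitions * checkpointTasks;
    }

    public static void main(String[] args) {
        // Assumed worst case: thousands of partitions, hundreds of tasks.
        long total = estimateBytes(5_000, 100);
        // ~0.95 GiB in this case, spread across the Connect worker nodes.
        System.out.printf("~%.2f GiB%n", total / (1024.0 * 1024 * 1024));
    }
}
```

Even this deliberately pessimistic input stays under 1 GiB, which matches the "not alarming" conclusion above.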
>> > > > With DefaultReplicationPolicy, we usually have the same topic on both clusters to be used by producers - the name is the same, but those are logically 2 different topics. With IdentityReplicationPolicy, we require users to avoid loops (i.e. replicating the same messages back and forth infinitely).
>> > > >
>> > > > As for being more flexible with failovers and failbacks, yes, I agree: since re-processing is minimized, it might enable more use-cases, or allow more frequent failovers and failbacks. I'd say that in most use-cases, users will still want to avoid doing this procedure frequently, since it requires a client restart/reconfiguration, which is not risk-free.
>> > > >
>> > > > Thanks,
>> > > > Daniel
>> > > >
>> > > > Vidor Kanalas <vidor.kana...@gmail.com> wrote (on 30 Oct 2024, Wed, 22:21):
>> > > >
>> > > >> Hi Daniel,
>> > > >>
>> > > >> This would indeed greatly reduce the duplicate processing on failbacks.
>> > > >>
>> > > >> A few questions:
>> > > >>
>> > > >> 1. Since having a second offset-sync store can be memory intensive, would it make sense to filter the topics in it based on the reverseCheckpointingTopicFilter?
>> > > >> 2. Would it make sense to add a reverseCheckpointingGroupFilter as well, so that one can control not just the topics for reverse checkpointing but also the groups?
>> > > >>
>> > > >> Do I understand this correctly, that the replication flow itself must be bidirectional, but the topic replication doesn't have to be? If so, this seems to unlock another use case.
>> > > >> With this change, one can more confidently fail the consumer group over to the passive cluster and back (in the context of the topic itself) without much reprocessing; I see this as useful when a cluster gets busy at times. Or one could even have a new consumer group consume messages from the passive cluster for a while, before "failing it over" to the active cluster. Is this something that you would recommend using the feature for?
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Vidor
>> > > >>
>> > > >> On Mon, Oct 28, 2024 at 7:25 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>> > > >>
>> > > >> > Hi Viktor,
>> > > >> >
>> > > >> > SVV1. Not easy to provide a number, but yes, it does scale with the number of replicated topic partitions. Enabling this feature will add the overhead of an extra consumer, and it allocates memory for an offset-sync index for each partition. The index is limited to 64 entries. I could give an upper bound of the memory usage as a function of the number of replicated topic-partitions, but I'm not sure if it would be useful for users, or where exactly to document this. Wdyt?
>> > > >> >
>> > > >> > No worries, thanks for looking at the KIP!
>> > > >> > Daniel
>> > > >> >
>> > > >> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote (on 28 Oct 2024, Mon, 17:07):
>> > > >> >
>> > > >> > > Hi Daniel,
>> > > >> > >
>> > > >> > > SVV1. Fair points about the performance impact. The next question is whether we can quantify it somehow, i.e. does it scale with the number of topics to reverse checkpoint, the offsets emitted, etc.?
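For readers new to the mechanism being reversed here, the per-partition offset-sync index both messages refer to (capped at 64 entries) is used roughly like the sketch below. This is a simplified illustration of the floor-lookup idea, not MM2's actual OffsetSyncStore, whose translation rules are more involved:

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of offset translation with an offset-sync history, as
// used by checkpointing (and, with this KIP, by reverse checkpointing on the
// opposite flow's offset-syncs topic). Illustrative only; MM2's real store
// applies more conservative translation rules.
public class OffsetTranslationSketch {

    // upstream offset -> downstream offset, at most 64 entries per partition
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void recordSync(long upstream, long downstream) {
        syncs.put(upstream, downstream);
        while (syncs.size() > 64) {
            syncs.pollFirstEntry(); // evict the oldest sync to respect the cap
        }
    }

    // Translate a consumer offset conservatively: use the closest sync at or
    // below it, so the consumer may re-read a little but never skips records.
    long translate(long upstreamConsumerOffset) {
        Map.Entry<Long, Long> floor = syncs.floorEntry(upstreamConsumerOffset);
        return floor == null ? -1L : floor.getValue();
    }
}
```

The memory cost discussed above comes from keeping one such bounded index per replicated partition, per checkpoint task.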
>> > > >> > > I'll do one more pass on the KIP in the following days, but I wanted to reply to you with what I have so far to keep this going.
>> > > >> > >
>> > > >> > > Best,
>> > > >> > > Viktor
>> > > >> > >
>> > > >> > > On Fri, Oct 25, 2024 at 5:32 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>> > > >> > >
>> > > >> > > > Hi,
>> > > >> > > >
>> > > >> > > > One more update. As I was working on the PR, I realized that the only way to support IdentityReplicationPolicy is to add an extra topic filter to the checkpointing. I updated the KIP accordingly.
>> > > >> > > > I also opened a draft PR to demonstrate the proposed changes: https://github.com/apache/kafka/pull/17593
>> > > >> > > >
>> > > >> > > > Daniel
>> > > >> > > >
>> > > >> > > > Dániel Urbán <urb.dani...@gmail.com> wrote (on 24 Oct 2024, Thu, 15:22):
>> > > >> > > >
>> > > >> > > > > Hi all,
>> > > >> > > > > Just a bump/minor update here:
>> > > >> > > > > As I've started working on a POC of the proposed solution, I've realised that the hard requirement related to the ReplicationPolicy implementation can be eliminated; I updated the KIP accordingly.
>> > > >> > > > > Daniel
>> > > >> > > > >
>> > > >> > > > > Dániel Urbán <urb.dani...@gmail.com> wrote (on 21 Oct 2024, Mon, 16:18):
>> > > >> > > > >
>> > > >> > > > >> Hi Mickael,
>> > > >> > > > >> Good point, I renamed the KIP and this thread:
>> > > >> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing+in+MirrorMaker
>> > > >> > > > >> Thank you,
>> > > >> > > > >> Daniel
>> > > >> > > > >>
>> > > >> > > > >> Mickael Maison <mickael.mai...@gmail.com> wrote (on 21 Oct 2024, Mon, 15:22):
>> > > >> > > > >>
>> > > >> > > > >>> Hi Daniel,
>> > > >> > > > >>>
>> > > >> > > > >>> I've not had time to take a close look at the KIP, but my initial feedback would be to adjust the name to make it clear it's about MirrorMaker.
>> > > >> > > > >>> The word "checkpoint" has several meanings in Kafka, and from the current KIP name it's not clear if it's about KRaft, Streams or Connect.
>> > > >> > > > >>>
>> > > >> > > > >>> Thanks,
>> > > >> > > > >>> Mickael
>> > > >> > > > >>>
>> > > >> > > > >>> On Mon, Oct 21, 2024 at 2:55 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>> > > >> > > > >>> >
>> > > >> > > > >>> > Hi Viktor,
>> > > >> > > > >>> >
>> > > >> > > > >>> > Thank you for the comments!
>> > > >> > > > >>> >
>> > > >> > > > >>> > SVV1: I think the feature has some performance implications. If the reverse checkpointing is enabled, task startup will possibly be slower, since it will need to consume from a second offset-syncs topic; and it will also use more memory, to keep the second offset-sync history.
>> > > >> > > > >>> > Additionally, it is also possible to have an offset-syncs topic present without an actual, opposite flow being active - I think only users can tell if the reverse checkpointing should be active, and they should be the ones opting in for the higher resource usage.
>> > > >> > > > >>> >
>> > > >> > > > >>> > SVV2: I mention the DefaultReplicationPolicy to provide examples. I don't think it is required. The actual requirement related to the ReplicationPolicy is that the policy should be able to correctly tell which topic was replicated from which cluster. Because of this, IdentityReplicationPolicy would not work, but DefaultReplicationPolicy, or any other ReplicationPolicy implementation with a correctly implemented "topicSource" method, should work. I will make an explicit note of this in the KIP.
>> > > >> > > > >>> >
>> > > >> > > > >>> > Thank you,
>> > > >> > > > >>> > Daniel
>> > > >> > > > >>> >
>> > > >> > > > >>> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote (on 18 Oct 2024, Fri, 17:28):
>> > > >> > > > >>> >
>> > > >> > > > >>> > > Hey Dan,
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > I think this is a very useful idea. Two questions:
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > SVV1: Do you think we need the feature flag at all?
>> > > >> > > > >>> > > I know that not having this flag may technically render the KIP unnecessary (however, it may still be useful to discuss this topic and build a consensus). As you wrote in the KIP, we may be able to look up the target and source topics, and if we can do this, we can probably detect if the replication is one-way or prefixless (identity). So that means we don't need this flag to control when we want to use this. Then it is really just there to act as something that can turn the feature on and off if needed, but I'm not really sure if there is a great risk in just enabling this by default. If we really just turn back the B -> A checkpoints and save them in the A -> B flow, then maybe it's not too risky, and users would get this immediately by just upgrading.
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > SVV2: You write that we need DefaultReplicationPolicy to use this feature, but most of the functionality is there on the interface level in ReplicationPolicy. Is there anything missing from there, and if so, what do you think about pulling it into the interface?
>> > > >> > > > >>> > > If this improvement only works with the default replication policy, then it's somewhat limiting, as users may have their own policy for various reasons; but if we can make it work on the interface level, then we could provide this feature to everyone. Of course, there can be replication policies, like the identity one, that by design disallow this feature, but for that, see my previous point.
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > Best,
>> > > >> > > > >>> > > Viktor
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > On Fri, Oct 18, 2024 at 3:30 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>> > > >> > > > >>> > >
>> > > >> > > > >>> > > > Hi everyone,
>> > > >> > > > >>> > > >
>> > > >> > > > >>> > > > I'd like to start the discussion on KIP-1098: Reverse Checkpointing (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing), which aims to minimize message reprocessing for consumers in failbacks.
>> > > >> > > > >>> > > >
>> > > >> > > > >>> > > > TIA,
>> > > >> > > > >>> > > > Daniel