Hi Vidor, Thank you for your comments!
1. I think the optimization sounds nice, but would not work well with TopicFilter implementations which can be dynamically updated in the background (without touching the Connector configuration). If we actually dropped the offset sync records for a topic when the task is started, then the topic is added to the filter (without triggering a task restart), then we would not have the history of offset syncs for the "new" topic. I'm not saying that this makes the optimization you are suggesting impossible, but it's a larger chunk of work - we would need to start monitoring the topics in MirrorCheckpointConnector the same way we do in MirrorSourceConnector, and send config update requests to the Connect framework on topic changes. Just to clarify the "memory intensive" nature of the offset sync store, a full offset sync history of a single partition takes less than 2 KBs (1 offset sync contains the topic partition, and 2 longs, and the history has a max size of 64). We need to multiply this by the number of replicated partitions and the number of checkpoint tasks to get the full memory usage increase when the feature is enabled. Even with thousands of partitions and hundreds of tasks, this will be below 1 GB, which is distributed across the Connect worker nodes. So I don't think that this would be an alarming increase in memory usage, and the optimization is not worth it with the extra complexity of the dynamic config updates. 2. I don't really see the use-case for the reverse checkpointing group filter. If a group is already checkpointed in the opposite flow, it suggests that the intent is to be able to fail over. Why wouldn't the user want to also perform a failback for that same group? Not sure what you mean by replication being bidirectional, but topic replication not being bi-directional. With DefaultReplicationPolicy, we usually have the same topic on both clusters to be used by producers - the name is the same, but those are logically 2 different topics. With IdentityReplicationPolicy, we require users to avoid loops (i.e. replicating the same messages back and forth infinitely). As for being more flexible with failovers and failbacks, yes, I agree, the fact the re-processing is minimized, it might enable more use-cases, or allow more frequent failovers and failbacks. I'd say that in most use-cases, users will still want to avoid doing this procedure frequently, since it requires a client restart/reconfiguration, which is not risk-free. Thanks, Daniel Vidor Kanalas <vidor.kana...@gmail.com> ezt írta (időpont: 2024. okt. 30., Sze, 22:21): > Hi Daniel, > > > > This would indeed greatly reduce the duplicate processing on failbacks. > > > > Few questions: > > 1. Since having a second offset-sync store can be memory intensive, > would it make sense to filter the topics in it based on the > reverseCheckpointingTopicFilter? > 2. Would it make sense to add a reverseCheckpointingGroupFilter as well, > so that one can control not just the topics for reverse checkpointing > but > also the groups? > > > > Do I understand this correctly, that the replication flow itself must be > bidirectional, but the topic replication doesn’t? If so, this seems to > unlock another use case. With this change, one can more confidently fail > over the consumer group to the passive cluster and back (in the context of > the topic itself), without much reprocessing; I see this useful when a > cluster gets busy at times. Or even have a new consumer group consume > messages from the passive cluster for a while, before “failing it over” to > the active cluster. Is this something that you would recommend using the > feature for? > > > > Best, > > Vidor > > On Mon, Oct 28, 2024 at 7:25 PM Dániel Urbán <urb.dani...@gmail.com> > wrote: > > > Hi Viktor, > > > > SVV1. Not easy to provide a number, but yes, it does scale with the > number > > of replicated topic partitions. Enabling this feature will add the > overhead > > of an extra consumer, and allocates memory for an offset-sync index for > > each partition. The index is limited to 64 entries. I could give an upper > > bound of the memory usage as a function of the number of replicated > > topic-partitions, but not sure if it would be useful for users, and to > > where exactly document this. Wdyt? > > > > No worries, thanks for looking at the KIP! > > Daniel > > > > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> ezt írta > > (időpont: 2024. okt. 28., H, 17:07): > > > > > Hi Daniel, > > > > > > SVV1. Fair points about the performance impact. The next question is > that > > > can we quantify it somehow, ie. does it scale with the number of topics > > to > > > reverse checkpoints, the offsets emitted, etc.? > > > > > > I'll do one more pass on the KIP in the following days but I wanted to > > > reply to you with what I have so far to keep this going. > > > > > > Best, > > > Viktor > > > > > > On Fri, Oct 25, 2024 at 5:32 PM Dániel Urbán <urb.dani...@gmail.com> > > > wrote: > > > > > > > Hi, > > > > > > > > One more update. As I was working on the PR, I realized that the only > > way > > > > to support IdentityReplicationPolicy is to add an extra topic filter > to > > > the > > > > checkpointing. I updated the KIP accordingly. > > > > I also opened a draft PR to demonstrate the proposed changes: > > > > https://github.com/apache/kafka/pull/17593 > > > > > > > > Daniel > > > > > > > > Dániel Urbán <urb.dani...@gmail.com> ezt írta (időpont: 2024. okt. > > 24., > > > > Cs, > > > > 15:22): > > > > > > > > > Hi all, > > > > > Just a bump/minor update here: > > > > > As I've started working on a POC of the proposed solution, I've > > > realised > > > > > that the hard requirement related to the ReplicationPolicy > > > implementation > > > > > can be eliminated, updated the KIP accordingly. > > > > > Daniel > > > > > > > > > > Dániel Urbán <urb.dani...@gmail.com> ezt írta (időpont: 2024. okt. > > > 21., > > > > > H, 16:18): > > > > > > > > > >> Hi Mickael, > > > > >> Good point, I renamed the KIP and this thread: > > > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing+in+MirrorMaker > > > > >> Thank you, > > > > >> Daniel > > > > >> > > > > >> Mickael Maison <mickael.mai...@gmail.com> ezt írta (időpont: > 2024. > > > okt. > > > > >> 21., H, 15:22): > > > > >> > > > > >>> Hi Daniel, > > > > >>> > > > > >>> I've not had time to take a close look at the KIP but my initial > > > > >>> feedback would be to adjust the name to make it clear it's about > > > > >>> MirrorMaker. > > > > >>> The word "checkpoint" has several meanings in Kafka and from the > > > > >>> current KIP name it's not clear if it's about KRaft, Streams or > > > > >>> Connect. > > > > >>> > > > > >>> Thanks, > > > > >>> Mickael > > > > >>> > > > > >>> On Mon, Oct 21, 2024 at 2:55 PM Dániel Urbán < > > urb.dani...@gmail.com> > > > > >>> wrote: > > > > >>> > > > > > >>> > Hi Viktor, > > > > >>> > > > > > >>> > Thank you for the comments! > > > > >>> > > > > > >>> > SVV1: I think the feature has some performance implications. If > > the > > > > >>> reverse > > > > >>> > checkpointing is enabled, task startup will be possibly slower, > > > since > > > > >>> it > > > > >>> > will need to consume from a second offset-syncs topic; and it > > will > > > > >>> also use > > > > >>> > more memory, to keep the second offset-sync history. > > Additionally, > > > it > > > > >>> is > > > > >>> > also possible to have an offset-syncs topic present without an > > > > actual, > > > > >>> > opposite flow being active - I think only users can tell if the > > > > reverse > > > > >>> > checkpointing should be active, and they should be the one > opting > > > in > > > > >>> for > > > > >>> > the higher resource usage. > > > > >>> > > > > > >>> > SVV2: I mention the DefaultReplicationPolicy to provide > > examples. I > > > > >>> don't > > > > >>> > think it is required. The actual requirement related to the > > > > >>> > ReplicationPolicy is that the policy should be able to > correctly > > > tell > > > > >>> which > > > > >>> > topic was replicated from which cluster. Because of this, > > > > >>> > IdentityReplicationPolicy would not work, but > > > > >>> DefaultReplicationPolicy, or > > > > >>> > any other ReplicationPolicy implementations with a correctly > > > > >>> implemented > > > > >>> > "topicSource" method should work. I will make an explicit note > of > > > > this > > > > >>> in > > > > >>> > the KIP. > > > > >>> > > > > > >>> > Thank you, > > > > >>> > Daniel > > > > >>> > > > > > >>> > Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> ezt > > írta > > > > >>> > (időpont: 2024. okt. 18., Pén 17:28): > > > > >>> > > > > > >>> > > Hey Dan, > > > > >>> > > > > > > >>> > > I think this is a very useful idea. Two questions: > > > > >>> > > > > > > >>> > > SVV1: Do you think we need the feature flag at all? I know > that > > > not > > > > >>> having > > > > >>> > > this flag may technically render the KIP unnecessary (however > > it > > > > may > > > > >>> still > > > > >>> > > be useful to discuss this topic and create a concensus). As > you > > > > >>> wrote in > > > > >>> > > the KIP, we may be able to look up the target and source > topics > > > and > > > > >>> if we > > > > >>> > > can do this, we can probably detect if the replication is > > one-way > > > > or > > > > >>> > > prefixless (identity). So that means we don't need this flag > to > > > > >>> control > > > > >>> > > when we want to use this. Then it is really just there to act > > as > > > > >>> something > > > > >>> > > that can turn the feature on and off if needed, but I'm not > > > really > > > > >>> sure if > > > > >>> > > there is a great risk in just enabling this by default. If we > > > > really > > > > >>> just > > > > >>> > > turn back the B -> A checkpoints and save them in the A -> B, > > > then > > > > >>> maybe > > > > >>> > > it's not too risky and users would get this immediately by > just > > > > >>> upgrading. > > > > >>> > > > > > > >>> > > SVV2: You write that we need DefaultReplicationPolicy to use > > this > > > > >>> feature, > > > > >>> > > but most of the functionality is there on interface level in > > > > >>> > > ReplicationPolicy. Is there anything that is missing from > there > > > and > > > > >>> if so, > > > > >>> > > what do you think about pulling it into the interface? If > this > > > > >>> improvement > > > > >>> > > only works with the default replication policy, then it's > > > somewhat > > > > >>> limiting > > > > >>> > > as users may have their own policy for various reasons, but > if > > we > > > > >>> can make > > > > >>> > > it work on the interface level, then we could provide this > > > feature > > > > to > > > > >>> > > everyone. Of course there can be replication policies like > the > > > > >>> identity one > > > > >>> > > that by design disallows this feature, but for that, see my > > > > previous > > > > >>> point. > > > > >>> > > > > > > >>> > > Best, > > > > >>> > > Viktor > > > > >>> > > > > > > >>> > > On Fri, Oct 18, 2024 at 3:30 PM Dániel Urbán < > > > > urb.dani...@gmail.com> > > > > >>> > > wrote: > > > > >>> > > > > > > >>> > > > Hi everyone, > > > > >>> > > > > > > > >>> > > > I'd like to start the discussion on KIP-1098: Reverse > > > > >>> Checkpointing ( > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing > > > > >>> > > > ) > > > > >>> > > > which aims to minimize message reprocessing for consumers > in > > > > >>> failbacks. > > > > >>> > > > > > > > >>> > > > TIA, > > > > >>> > > > Daniel > > > > >>> > > > > > > > >>> > > > > > > >>> > > > > >> > > > > > > > > > >