Gentle bump - any comments are welcome. This could fill an important gap in MM2, and it would be nice to get it fixed.

TIA,
Daniel
On Mon, Nov 4, 2024 at 11:00 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
> Hi Vidor,
>
> Thank you for your comments!
>
> 1. I think the optimization sounds nice, but it would not work well with TopicFilter implementations that can be updated dynamically in the background (without touching the Connector configuration). If we actually dropped the offset sync records for a topic when the task is started, and the topic is then added to the filter (without triggering a task restart), we would not have the offset sync history for the "new" topic. I'm not saying that this makes the optimization you are suggesting impossible, but it is a larger chunk of work - we would need to start monitoring topics in MirrorCheckpointConnector the same way we do in MirrorSourceConnector, and send config update requests to the Connect framework on topic changes. Just to clarify the "memory intensive" nature of the offset sync store: a full offset sync history for a single partition takes less than 2 KB (one offset sync contains the topic partition and 2 longs, and the history has a max size of 64 entries). We need to multiply this by the number of replicated partitions and the number of checkpoint tasks to get the full memory usage increase when the feature is enabled. Even with thousands of partitions and hundreds of tasks, this will stay below 1 GB, distributed across the Connect worker nodes. So I don't think this would be an alarming increase in memory usage, and the optimization is not worth the extra complexity of the dynamic config updates.
>
> 2. I don't really see the use case for the reverse checkpointing group filter. If a group is already checkpointed in the opposite flow, it suggests that the intent is to be able to fail over. Why wouldn't the user want to also perform a failback for that same group?
>
> I'm not sure what you mean by the replication being bidirectional but the topic replication not being bidirectional. With DefaultReplicationPolicy, we usually have the same topic on both clusters to be used by producers - the name is the same, but those are logically 2 different topics. With IdentityReplicationPolicy, we require users to avoid loops (i.e. replicating the same messages back and forth infinitely).
>
> As for being more flexible with failovers and failbacks: yes, I agree that since reprocessing is minimized, it might enable more use cases, or allow more frequent failovers and failbacks. I'd say that in most use cases users will still want to avoid performing this procedure frequently, since it requires a client restart/reconfiguration, which is not risk-free.
>
> Thanks,
> Daniel
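To put concrete numbers on the memory estimate in point 1 above, here is a back-of-the-envelope sketch in Java. The 16-byte per-sync topic-partition overhead and the partition/task counts are illustrative assumptions, not measured values:

    // Rough sketch of the offset sync store memory estimate discussed above.
    // All sizes and counts below are assumptions for illustration.
    public class OffsetSyncMemoryEstimate {
        public static void main(String[] args) {
            int historySize = 64;                   // max offset syncs kept per partition
            int bytesPerSync = 2 * Long.BYTES + 16; // 2 longs + assumed topic-partition overhead

            long perPartition = (long) historySize * bytesPerSync; // ~2 KB per partition
            long partitions = 5_000;      // example: thousands of replicated partitions
            long checkpointTasks = 100;   // example: hundreds of checkpoint tasks

            long totalBytes = perPartition * partitions * checkpointTasks;
            // Prints roughly 976.6 MB - below 1 GB, matching the estimate above,
            // and this usage is spread across the Connect worker nodes.
            System.out.printf("~%.1f MB%n", totalBytes / 1024.0 / 1024.0);
        }
    }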
> On Wed, Oct 30, 2024 at 10:21 PM Vidor Kanalas <vidor.kana...@gmail.com> wrote:
>> Hi Daniel,
>>
>> This would indeed greatly reduce the duplicate processing on failbacks.
>>
>> A few questions:
>>
>> 1. Since having a second offset-sync store can be memory intensive, would it make sense to filter the topics in it based on the reverseCheckpointingTopicFilter?
>> 2. Would it make sense to add a reverseCheckpointingGroupFilter as well, so that one can control not just the topics for reverse checkpointing but also the groups?
>>
>> Do I understand correctly that the replication flow itself must be bidirectional, but the topic replication doesn't have to be? If so, this seems to unlock another use case. With this change, one can more confidently fail over a consumer group to the passive cluster and back (in the context of the topic itself) without much reprocessing; I see this being useful when a cluster gets busy at times. One could even have a new consumer group consume messages from the passive cluster for a while before "failing it over" to the active cluster. Is this something you would recommend using the feature for?
>>
>> Best,
>> Vidor
>>
>> On Mon, Oct 28, 2024 at 7:25 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>>> Hi Viktor,
>>>
>>> SVV1. Not easy to provide a number, but yes, it does scale with the number of replicated topic partitions. Enabling this feature adds the overhead of an extra consumer and allocates memory for an offset-sync index for each partition. The index is limited to 64 entries. I could give an upper bound on the memory usage as a function of the number of replicated topic-partitions, but I'm not sure whether it would be useful for users, or where exactly to document it. Wdyt?
>>>
>>> No worries, thanks for looking at the KIP!
>>> Daniel
>>>
>>> On Mon, Oct 28, 2024 at 5:07 PM Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
>>>> Hi Daniel,
>>>>
>>>> SVV1. Fair points about the performance impact. The next question is whether we can quantify it somehow, i.e. does it scale with the number of topics to reverse-checkpoint, the offsets emitted, etc.?
>>>>
>>>> I'll do one more pass on the KIP in the following days, but I wanted to reply with what I have so far to keep this going.
>>>>
>>>> Best,
>>>> Viktor
>>>>
>>>> On Fri, Oct 25, 2024 at 5:32 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> One more update. As I was working on the PR, I realized that the only way to support IdentityReplicationPolicy is to add an extra topic filter to the checkpointing. I updated the KIP accordingly. I also opened a draft PR to demonstrate the proposed changes: https://github.com/apache/kafka/pull/17593
>>>>>
>>>>> Daniel
>>>>>
>>>>> On Thu, Oct 24, 2024 at 3:22 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> Just a bump/minor update here: as I've started working on a POC of the proposed solution, I've realised that the hard requirement related to the ReplicationPolicy implementation can be eliminated. I updated the KIP accordingly.
>>>>>> Daniel
>>>>>>
>>>>>> On Mon, Oct 21, 2024 at 4:18 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>> Hi Mickael,
>>>>>>>
>>>>>>> Good point, I renamed the KIP and this thread: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing+in+MirrorMaker
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Daniel
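The SVV1 exchange above mentions a per-partition offset-sync index capped at 64 entries. Below is a deliberately simplified sketch of such a bounded history; the real MirrorMaker 2 OffsetSyncStore keeps its 64 entries with roughly exponential spacing rather than plain FIFO eviction, so the names and eviction policy here are illustrative only:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Simplified sketch of a bounded per-partition offset-sync history.
    // Illustrative only - the real OffsetSyncStore uses exponentially
    // spaced entries instead of this plain FIFO eviction.
    public class BoundedSyncHistory {
        record OffsetSync(long upstreamOffset, long downstreamOffset) {}

        private static final int MAX_ENTRIES = 64; // cap keeps per-partition memory small and constant
        private final Deque<OffsetSync> history = new ArrayDeque<>(MAX_ENTRIES);

        void record(long upstreamOffset, long downstreamOffset) {
            if (history.size() == MAX_ENTRIES) {
                history.removeFirst(); // evict the oldest sync once the cap is reached
            }
            history.addLast(new OffsetSync(upstreamOffset, downstreamOffset));
        }

        // Latest sync at or below the group's committed upstream offset;
        // used to translate the commit into a downstream offset.
        OffsetSync syncFor(long committedUpstreamOffset) {
            OffsetSync best = null;
            for (OffsetSync s : history) { // entries are appended in upstream-offset order
                if (s.upstreamOffset() <= committedUpstreamOffset) {
                    best = s;
                }
            }
            return best;
        }
    }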
>>>>>>> On Mon, Oct 21, 2024 at 3:22 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>>>> Hi Daniel,
>>>>>>>>
>>>>>>>> I've not had time to take a close look at the KIP, but my initial feedback would be to adjust the name to make it clear it's about MirrorMaker. The word "checkpoint" has several meanings in Kafka, and from the current KIP name it's not clear whether it's about KRaft, Streams, or Connect.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Mickael
>>>>>>>>
>>>>>>>> On Mon, Oct 21, 2024 at 2:55 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>>> Hi Viktor,
>>>>>>>>>
>>>>>>>>> Thank you for the comments!
>>>>>>>>>
>>>>>>>>> SVV1: I think the feature has some performance implications. If reverse checkpointing is enabled, task startup will possibly be slower, since the task will need to consume from a second offset-syncs topic; it will also use more memory, to keep the second offset-sync history. Additionally, it is possible to have an offset-syncs topic present without an actual opposite flow being active - I think only users can tell whether reverse checkpointing should be active, and they should be the ones opting in to the higher resource usage.
>>>>>>>>>
>>>>>>>>> SVV2: I mention DefaultReplicationPolicy to provide examples; I don't think it is required. The actual requirement on the ReplicationPolicy is that the policy should be able to correctly tell which topic was replicated from which cluster. Because of this, IdentityReplicationPolicy would not work, but DefaultReplicationPolicy, or any other ReplicationPolicy implementation with a correctly implemented "topicSource" method, should work. I will make an explicit note of this in the KIP.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Daniel
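Daniel's SVV2 answer above hinges on ReplicationPolicy#topicSource being able to name the cluster a topic was replicated from. Here is a small sketch of that contract using the stock DefaultReplicationPolicy; the cluster alias "A" and topic "orders" are made-up examples:

    import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

    // Shows why DefaultReplicationPolicy can support reverse checkpointing:
    // the source cluster alias is recoverable from the prefixed topic name.
    public class TopicSourceExample {
        public static void main(String[] args) {
            DefaultReplicationPolicy policy = new DefaultReplicationPolicy();

            String remote = policy.formatRemoteTopic("A", "orders"); // "A.orders"
            System.out.println(policy.topicSource(remote));   // "A" - replicated from cluster A
            System.out.println(policy.upstreamTopic(remote)); // "orders" - the original name
            System.out.println(policy.topicSource("orders")); // null - a local, non-replicated topic
        }
    }

With a prefixless (identity) setup, the replicated topic keeps the name "orders", so the policy cannot reliably tell a replicated topic from a local one - which is why the KIP adds an extra topic filter for that case.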
>>>>>>>>> On Fri, Oct 18, 2024 at 5:28 PM Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
>>>>>>>>>> Hey Dan,
>>>>>>>>>>
>>>>>>>>>> I think this is a very useful idea. Two questions:
>>>>>>>>>>
>>>>>>>>>> SVV1: Do you think we need the feature flag at all? I know that not having this flag may technically render the KIP unnecessary (however, it may still be useful to discuss this topic and build a consensus). As you wrote in the KIP, we may be able to look up the target and source topics, and if we can do this, we can probably detect whether the replication is one-way or prefixless (identity). That means we don't need this flag to control when we want to use this feature. It is then really just there to turn the feature on and off if needed, but I'm not really sure there is a great risk in enabling it by default. If we really just turn back the B -> A checkpoints and save them in the A -> B flow, then maybe it's not too risky, and users would get this immediately by just upgrading.
>>>>>>>>>>
>>>>>>>>>> SVV2: You write that we need DefaultReplicationPolicy to use this feature, but most of the functionality is there at the interface level in ReplicationPolicy. Is anything missing from there, and if so, what do you think about pulling it into the interface? If this improvement only works with the default replication policy, it is somewhat limiting, as users may have their own policy for various reasons; but if we can make it work at the interface level, we could provide this feature to everyone. Of course, there can be replication policies, like the identity one, that by design disallow this feature, but for that, see my previous point.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Viktor
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 18, 2024 at 3:30 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> I'd like to start the discussion on KIP-1098: Reverse Checkpointing (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing), which aims to minimize message reprocessing for consumers in failbacks.
>>>>>>>>>>>
>>>>>>>>>>> TIA,
>>>>>>>>>>> Daniel
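Finally, to make Viktor's "turn back the B -> A checkpoints and save them in the A -> B flow" idea concrete, here is a hypothetical sketch assuming the public Checkpoint class from the mirror-client module. This is not the KIP's actual implementation (see the draft PR linked above for that), and the topic-name remapping the ReplicationPolicy would have to perform is omitted:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.Checkpoint;

    // Hypothetical illustration: a checkpoint emitted by the B -> A flow
    // already pairs an upstream (B) offset with a downstream (A) offset,
    // so the A -> B flow could reuse it for failback by swapping the sides.
    public class ReverseCheckpointSketch {
        static Checkpoint reverse(Checkpoint oppositeFlowCheckpoint, TopicPartition topicOnA) {
            return new Checkpoint(
                    oppositeFlowCheckpoint.consumerGroupId(),
                    topicOnA,                                  // the partition as named on the A side
                    oppositeFlowCheckpoint.downstreamOffset(), // A-side offset becomes the upstream side
                    oppositeFlowCheckpoint.upstreamOffset(),   // B-side offset becomes the downstream side
                    oppositeFlowCheckpoint.metadata());
        }
    }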