Hello Jose,

Thanks for the KIP. Overall it looks great. I have a few meta / minor questions, or maybe just clarifications, below:
Meta:

1. I want to clarify whether only the active controller would generate snapshots, or any voter in the quorum, or even observers. Originally I thought it was the last case, but reading through the doc I got confused by some paragraphs. E.g. you mentioned snapshots are generated by the Controller module, and observers would not have that module.

2. Following on Jun's previous comment: currently the __cluster_metadata log is replicated on ALL brokers, since all voters and observers would replicate that topic. I know this may be out of the scope of this KIP, but I think letting only the voters replicate (and periodically truncate) the log, while observers only maintain the in-memory state and snapshots, is a good trade-off here, assuming snapshot loading is relatively fast.

3. When a raft client is in the middle of loading a snapshot, should it reject any vote / begin-/end-/describe-quorum requests at that time? More generally, while a snapshot is being loaded, how should we treat the current state of the client when handling Raft requests?

Minor:

4. "All of the live replicas (followers and observers) have replicated LBO". Today the raft layer does not yet maintain the LBO across all replicas; is this information kept in the controller layer? I'm asking because I do not see relevant docs in KIP-631 and hence am a bit confused about which layer is responsible for bookkeeping the LBOs of all replicas.

5. "Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log begin offset." Not sure I follow this: 1) If observers do not generate snapshots since they do not have a Controller module on them, then it is possible that observers have no snapshots at all if they never get one from the leader, in which case they would never truncate their logs locally; 2) could you clarify "value sent on the fetch response": are you referring to the "HighWatermark", the "LogStartOffset" in the schema, or some other field?

6. The term "SnapshotId" is introduced without a definition at first. My understanding is that it is defined as a combo of <endOffset, epoch>; could you clarify if this is the case? BTW I think "endOffset" is a term used per log, and maybe calling that part of the SnapshotId "nextOffset" is better, since that offset is likely already filled with a record.

7. This is a very nit one: "If the latest snapshot has an epoch E and end offset O and it is newer than the LEO of the replicated log, then the replica must set the LBO and LEO to O." On the wiki `O` and `0` look very much the same and that confused me a couple of times... I'd suggest we phrase such occurrences as "an epoch e1 and offset o1". Also, for the LEO, since we would not really know what its epoch would be (it may be bumped), when comparing we only care about the offset and not the epoch, right? If yes, please clarify that in the doc as well.

8. "LEO - log end offset - the largest offset and epoch that has been written to disk." I think LEO is the "next" offset to be written to the log, right? That also seems consistent with your diagrams.

9. "... will send a vote request and response as if they had an empty log." Not sure I completely follow this: do you mean that they will set "LastOffsetEpoch/LastOffset" to "-1/0" when sending a vote request, and upon receiving a vote request they would compare the request's "LastOffsetEpoch/LastOffset" against "-1/0" as well? (See the rough sketch right after this list for what I have in mind.)

10. In the FetchSnapshot response schema, just to clarify: the "Position" ("The byte position within the snapshot.") is referring to the starting byte position of the returned snapshot data, right?
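For 9., here is a rough sketch of how I am reading it; the method and field names are made up only to illustrate the question, they are not from the KIP:

    // My reading of "as if they had an empty log" (hypothetical names, not the
    // actual raft client code): a replica that has not finished loading its
    // snapshot advertises the empty-log values, e.g. LastOffsetEpoch = -1 and
    // LastOffset = 0, and also uses those values for its own log when deciding
    // whether to grant a vote.
    static boolean candidateLogIsAtLeastAsUpToDate(int candidateLastEpoch, long candidateLastOffset,
                                                   int localLastEpoch, long localLastOffset) {
        if (candidateLastEpoch != localLastEpoch) {
            return candidateLastEpoch > localLastEpoch;
        }
        return candidateLastOffset >= localLastOffset;
    }

If that is the intended behavior, i.e. such a replica would effectively grant a vote to any candidate, it may be worth spelling that out in the KIP.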
Thanks,
Guozhang

On Fri, Sep 25, 2020 at 4:42 PM Jun Rao <j...@confluent.io> wrote:

> Hi, Jose,
>
> Thanks for the reply. A few more comments below.
>
> 20. Good point on metadata cache. I think we need to make a decision consistently. For example, if we decide that dedicated voter nodes don't serve metadata requests, then we don't need to expose the voters' host/port to the client. Which KIP should make this decision?
>
> 31. controller.snapshot.minimum.records: For a compacted topic, we use a ratio instead of the number of records to determine when to compact. This has some advantages. For example, if we use controller.snapshot.minimum.records and set it to 1000, then it will trigger the generation of a new snapshot when the existing snapshot is either 10MB or 1GB. Intuitively, the larger the snapshot, the more expensive it is to write to disk. So, we want to wait for more data to be accumulated before generating the next snapshot. The ratio-based setting achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be accumulated to regenerate a 10MB/1GB snapshot, respectively.
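> To make the ratio idea concrete, here is a rough sketch of the kind of trigger condition I have in mind (the names and the exact formula are illustrative only, not a concrete proposal):
>
>     // Hypothetical ratio-based trigger, analogous to min.cleanable.dirty.ratio
>     // for compacted topics: generate a new snapshot once the bytes appended
>     // since the last snapshot make up at least `minRatio` of the total.
>     boolean shouldGenerateSnapshot(long newBytesSinceSnapshot,
>                                    long latestSnapshotSizeBytes,
>                                    double minRatio) { // e.g. 0.5 for 50%
>         long total = newBytesSinceSnapshot + latestSnapshotSizeBytes;
>         return total > 0 && (double) newBytesSinceSnapshot / total >= minRatio;
>     }
>
> With a 50% ratio, a 10MB snapshot is regenerated only after roughly 10MB of new data and a 1GB snapshot only after roughly 1GB, matching the example above.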
> 32. max.replication.lag.ms: It seems this is specific to the metadata topic. Could we make that clear in the name?
>
> Thanks,
>
> Jun
>
> On Fri, Sep 25, 2020 at 12:43 PM Jose Garcia Sancio <jsan...@confluent.io> wrote:
>
> > Thanks for the detailed feedback Jun.
> >
> > The changes are here:
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763&selectedPageVersions=25&selectedPageVersions=24
> >
> > Here is a summary of the change to the KIP:
> > 1. Use end offset for snapshot and snapshot id.
> > 2. Include defaults for all of the new configuration options.
> > 3. Provide more detail in the response handling for FetchSnapshot.
> >
> > > 20. "Metadata Cache: The component that generates snapshots, reads snapshots and reads logs for observer replicas of the topic partition __cluster_metadata." It seems this is needed on every broker, not just observers?
> >
> > Yes. I think we need some clarification and consensus here. Some people are advocating for Kafka brokers to only be observers and to only contain a Metadata Cache, with the Kafka Controllers being separate nodes that are voters (follower, candidate or leader) and not observers. Others are advocating for Kafka brokers to be able to host both the Kafka Controller and the Metadata Cache. In the latter case, if the Controller and Metadata Cache share the same underlying topic partition, then we need to make sure that we unify the snapshotting logic.
> >
> > I would like to be able to unify the in-memory state for both the Kafka Controller and the Metadata Cache so that we can share the same replicated log and snapshot.
> >
> > > 21. Our current convention is to use an exclusive offset for naming checkpoint files. For example, a producer snapshot file of 1234.snapshot means that the file includes the producer state up to, but not including, offset 1234. So, we probably want to follow the same convention for the new checkpoint file.
> >
> > Thanks for pointing this out. This sounds good to me. This was a detail that I was struggling with when reading the replication code. Updated the KIP. Wherever the offset is exclusive, I renamed it to "end offset" (EndOffset).
> >
> > > 22. Snapshot Format: KIP-631 only defines the format for individual records. It seems that we need to define the container format here. For example, we need to store the length of each record. Also, does the snapshot file need a CRC field?
> >
> > Yes. I have added more information on this. In summary, we are going to use Kafka's log format version 2. This will give us support for compression and CRC at the record batch level. The Kafka Controller and Metadata Cache can control how big they want the batches to be.
> >
> > > 23. Could we provide the default values for the new configs controller.snapshot.minimum.records and max.replication.lag.ms? Also, max.replication.lag.ms seems to just control the snapshot frequency by time and not directly relate to replication. So, maybe it should be called something like controller.snapshot.minimum.interval.ms?
> >
> > "max.replication.lag.ms" is very similar to "replica.lag.time.max.ms". Kafka uses "replica.lag.time.max.ms" to make progress on the high watermark when replicas are slow or offline. We want to use "max.replication.lag.ms" to make progress on the LBO when replicas are slow or offline. These very similar names are confusing. How about "replica.lbo.lag.time.max.ms"?
> >
> > How often snapshotting will happen is determined by "controller.snapshot.minimum.records".
> >
> > > 24. "Kafka allows the clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be validated using the same logic enumerated above." Hmm, should we allow DeleteRecords on the metadata topic? If we do, does it trim the snapshot accordingly?
> >
> > Yeah. After thinking about it some more, I don't think we should allow DeleteRecords to succeed on the __cluster_metadata partition. For the error that we return, it looks like our options are the existing POLICY_VIOLATION (the description for this error is "Request parameters do not satisfy the configured policy.") or introducing a new error. I think we should just return POLICY_VIOLATION; what do you think?
> >
> > > 25. "The followers of the __cluster_metadata topic partition will concurrently fetch the snapshot and replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the replicated log." My understanding is that a follower will either fetch from the snapshot or the log, but not both at the same time. Could you explain how the concurrent part works? Also, what's an incomplete snapshot?
> >
> > Yes. I rewrote this section based on your comment and Jason's comments. Let me know if this addresses your concerns.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-630:+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ChangestoLeaderElection
> >
> > > 26. FetchRequest:
> > > 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch already tells us the next offset to fetch. So, we don't need to set NextOffsetAndEpoch in the response.
> >
> > Agreed. The response will set one or the other. If SnapshotId (field renamed in the latest version of the KIP) is set, then the follower will fetch the snapshot and follow the logic set in this KIP. If DivergingEpoch (field renamed in the latest version of KIP-595) is set, then the follower will follow the truncation logic set in KIP-595.
> >
> > > 26.2 Is there a reason to rename LogStartOffset to LogBeginOffset? I am not sure if they are truly identical semantically. For example, currently the follower moves its logStartOffset based on the leader's. Will we do the same thing with LogBeginOffset?
> >
> > They are identical. I renamed it so that we can use the acronym LBO and to match the concept of log end offset (LEO). We can't use the acronym LSO because we use that for last stable offset. What do you think?
> >
> > In this KIP the follower will also use the leader's LogBeginOffset (currently known as log start offset) to advance its LBO. The difference is that followers can only advance it if they also have a snapshot with an end offset greater than or equal to the new LBO. This is documented in the "When to Increase the Log Begin Offset" section:
> >
> > Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log begin offset.
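> >
> > To make that condition concrete, here is a rough sketch (the names are illustrative only, not the actual implementation):
> >
> >     // Sketch: a follower or observer only advances its log begin offset to
> >     // the value sent by the leader if it has a local snapshot whose end
> >     // offset covers that value; otherwise it keeps its current LBO and
> >     // continues fetching.
> >     long maybeAdvanceLogBeginOffset(long leaderSentLogBeginOffset,
> >                                     long currentLogBeginOffset,
> >                                     long latestLocalSnapshotEndOffset) {
> >         if (latestLocalSnapshotEndOffset >= leaderSentLogBeginOffset) {
> >             return Math.max(currentLogBeginOffset, leaderSentLogBeginOffset);
> >         }
> >         return currentLogBeginOffset;
> >     }
> >
> > This is what keeps the LBO from moving past data that the replica could not recover from a local snapshot.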
> >
> > > 27. FetchSnapshotRequest: It seems that SnapshotOffsetAndEpoch shouldn't be optional. Also, its version number 12 is incorrect.
> >
> > Fixed.
> >
> > > 28. FetchSnapshotResponse: Do we need the position field? It seems it's the same as in the request.
> >
> > I decided to return the position field so that the follower doesn't need to remember the request it sent.
> >
> > > 29. "OFFSET_OUT_OF_RANGE - when the fetch snapshot request's offset is greater than the size of the snapshot." By offset, do you mean position?
> >
> > Yes. Fixed.
> >
> > > 30. It's possible for a broker to die while copying the snapshot file from the leader or saving its locally generated snapshot. On restart, how does the broker know whether a local snapshot file is complete or not?
> >
> > When fetching a snapshot, the current implementation writes the content to a "temporary" file in the "checkpoints" directory (FYI, I think this checkpoints directory was added after your review) of the topic partition. The name of this file is "<SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part". After the follower has finished downloading the entire snapshot, it does an atomic move/rename to "<SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint".
> >
> > We can safely ignore the file named *.part when determining the state of the replica with respect to leader election. When a replica starts up, it can resume fetching the snapshot based on the state of the local *.part file.
> >
> > I updated the section "Response Handler" for FetchSnapshot with this information.
> >
> > --
> > -Jose
> >

--
-- Guozhang