Hi, Jose,

Thanks for the updated KIP. A few more comments below.
20. "Metadata Cache: The component that generates snapshots, reads snapshots and reads logs for observer replicas of the topic partition __cluster_metadata." It seems this is needed on every broker, not just observers? 21. Our current convention is to use exclusive offset for naming checkpoint files. For example, a producer snapshot file of 1234.snapshot means that the file includes the producer state up to, but not including offset 1234. So, we probably want to follow the same convention for the new checkpoint file. 22. Snapshot Format: KIP-631 only defines the format for individual records. It seems that we need to define the container format here. For example, we need to store the length of each record. Also, does the snapshot file need a CRC field? 23. Could we provide the default value for the new configs controller.snapshot.minimum.records and max.replication.lag.ms. Also, max.replication.lag.ms seems to just control the snapshot frequency by time and not directly relate to replication. So, maybe it should be called sth like controller.snapshot.minimum.interval.ms? 24. "Kafka allows the clients to delete records that are less than a given offset by using the DeleteRecords RPC . Those requests will be validated using the same logic enumerated above." Hmm, should we allow deleteRecord on the metadata topic? If we do, does it trim the snapshot accordingly? 25. "The followers of the __cluster_metadata topic partition will concurrently fetch the snapshot and replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the replicated log." My understanding is that a follower will either fetch from the snapshot or the log, but not both at the same time. Could you explain how the concurrent part works? Also, what's an incomplete snapshot? 26. FetchRequest: 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch already tells us the next offset to fetch. So, we don't need to set NextOffsetAndEpoch in the response. 26.2 Is there a reason to rename LogStartOffset to LogBeginOffset? I am not sure if they are truly identical semantically. For example, currently, the follower moves it's logStartOffset based on the leader's. Will we do the same thing with LogBeginOffset? 27. FetchSnapshotRequest: It seems that SnapshotOffsetAndEpoch shouldn't be optional. Also, its version number 12 is incorrect. 28. FetchSnapshotResponse: Do we need the position field? It seems it's the same as in the request. 29. "OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s offset is greater than the size of the snapshot." By offset, do you mean position? 30. It's possible for a broker to die while copying the snapshot file from the leader or saving its locally generated snapshot. On restart, how does the broker know whether a local snapshot file is complete or not? Thanks, Jun On Fri, Sep 18, 2020 at 1:38 PM Jason Gustafson <ja...@confluent.io> wrote: > Hi Jose, > > A few comments/questions below: > > 1. There is a comment in the proposal which suggests that we will maintain > multiple snapshots: > > > Having multiple snapshots is useful for minimizing re-fetching of the > snapshot when a new snapshot is generated. > > However, the document later says that snapshots get deleted as the LBO > advances. Just wanted to clarify the intent. Will we generally only have > one snapshot? > > 2. 
Thanks,

Jun

On Fri, Sep 18, 2020 at 1:38 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Jose,
>
> A few comments/questions below:
>
> 1. There is a comment in the proposal which suggests that we will maintain multiple snapshots:
>
> > Having multiple snapshots is useful for minimizing re-fetching of the snapshot when a new snapshot is generated.
>
> However, the document later says that snapshots get deleted as the LBO advances. Just wanted to clarify the intent. Will we generally only have one snapshot?
>
> 2. The proposal says the following:
>
> > During leader election, followers with incomplete or missing snapshot will send a vote request and response as if they had an empty log.
>
> Maybe you can help me understand the scenario we're talking about since I'm not sure I understand the point of this. If the intent is to not allow such a follower to become leader, why would it ever become a candidate? On the other hand, if the intent is to still allow it to become leader in some disaster scenario, then why would it not use its latest log state? For inbound Vote requests, I think it should definitely still consider its latest log state when deciding whether to grant a vote.
>
> 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as well? It looks like we are specifying this at the partition level, but it might be more useful to track the maximum bytes at the request level. On a related note, it might be useful to think through heuristics for balancing between the partitions in a request. Unlike fetches, it seems like we'd want to complete snapshot loading partition by partition. I wonder if it would be simpler for FetchSnapshot to handle just one partition.
>
> 4. It would help if the document motivated the need to track the snapshot epoch. Since we are only snapshotting below the high watermark, are you thinking about recovering from data loss scenarios?
>
> 5. Might need to fix the following:
>
> > Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d)
>
> We ended up renaming the next fetch offset and epoch. I think we should just leave it empty in this case. The snapshot offset and epoch seem sufficient.
>
> Thanks,
> Jason
>
> On Fri, Aug 7, 2020 at 11:33 AM Jose Garcia Sancio <jsan...@confluent.io> wrote:
> >
> > Thanks for your feedback Jun.
> >
> > Here are my changes to the KIP:
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763&selectedPageVersions=21&selectedPageVersions=20
> >
> > My comments are below...
> >
> > On Wed, Aug 5, 2020 at 1:59 PM Jun Rao <j...@confluent.io> wrote:
> > >
> > > Hi, Jose,
> > >
> > > Thanks for the KIP. A few comments below.
> > >
> > > 10. I agree with Jason that it's useful to document the motivation a bit more clearly. Regarding semantic/performance, one benefit of snapshotting is that it allows changes to be encoded incrementally instead of using the full post image. For example, in KIP-631, each partition has multiple fields like assigned replicas, leader, epoch, isr, etc. If only isr is changed, the snapshotting approach allows the change to be represented with just the new value in isr. Compaction will require all existing fields to be included in order to represent just an isr change. This is just because we can customize the combining logic with snapshotting.
> >
> > Yes. Right now the IsrChange record from KIP-631 has both the ISR and the leader and epoch. I think we can split this record into two records:
> >
> > 1. An ISR change record that includes the topic, partition, and isr.
> > 2. A leader change record that includes the topic, partition, leader, and leader epoch.
> >
> > I'll bring this up in the discussion thread for that KIP.
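The split could look roughly like the following; these are purely illustrative shapes with assumed field names, not the schemas defined in KIP-631:

```java
import java.util.List;

// Hypothetical record shapes for the split described above.
final class IsrChangeRecord {
    final String topic;
    final int partition;
    final List<Integer> isr; // only the new in-sync replica set

    IsrChangeRecord(String topic, int partition, List<Integer> isr) {
        this.topic = topic;
        this.partition = partition;
        this.isr = isr;
    }
}

final class LeaderChangeRecord {
    final String topic;
    final int partition;
    final int leader;      // new leader id
    final int leaderEpoch; // new leader epoch

    LeaderChangeRecord(String topic, int partition, int leader, int leaderEpoch) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
    }
}
```

With a split like this, an ISR-only change carries just the new ISR, which is the incremental-encoding benefit described in comment 10.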
> > > As for the performance benefit, I guess in theory snapshotting allows the snapshot to be updated in-place incrementally without having to read the full state in the snapshot. BTW, during compaction, we only read the cleaned data once instead of 3 times.
> >
> > Doesn't compaction need to read the cleaned records to check whether each key is in the map of keys to offsets? I made the following changes to the KIP:
> >
> > 2. With log compaction the broker needs to
> >   a. read 1MB/s from the head of the log to update the in-memory state
> >   b. read 1MB/s to update the map of keys to offsets
> >   c. read 3MB/s from the older segments (100MB from the already compacted log plus 50MB of new key-value records, i.e. 150MB read every 50 seconds). The log will accumulate 50MB (50 seconds worth of changes) before compacting because the default configuration has a minimum clean ratio of 50%.
> >
> > The "100MB in the already compacted log" are these cleaned records. Let me know what you think and if I am missing something.
> >
> > > 11. The KIP mentions topic id. Currently there is no topic id. Does this KIP depend on KIP-516?
> >
> > For the purpose of measuring the impact, I was using the records proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for its design and implementation. I was just referencing that KIP in the motivation and analysis. The KIP only assumes the changes in KIP-595, which has been approved but is not yet part of trunk.
> >
> > In the overview section the KIP mentions: "This KIP assumes that KIP-595 has been approved and implemented."
> >
> > > 12. Is there a need to keep more than 1 snapshot? It seems we always expose the latest snapshot to clients.
> >
> > The KIP proposes keeping more than one snapshot so as not to invalidate any pending/concurrent `FetchSnapshot` requests that are attempting to fetch a snapshot that can be deleted. I'll remove this wording as the first version of this implementation probably won't have this feature, since it requires extra coordination. The implementation will still allow for multiple snapshots because generating a snapshot is not atomic with respect to increasing the LBO.
> >
> > > 13. "During leader election, followers with incomplete or missing snapshot will send a vote request and response as if they had an empty log." Hmm, a follower may not have a snapshot created, but that doesn't imply its log is empty.
> >
> > Yes. I fixed the "Validation of Snapshot and Log" section and that sentence. I basically added an additional condition where a snapshot is not required if the LBO is 0.
> >
> > > 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we compare an offset to a time?
> >
> > Yeah. This may be hard to implement. I am trying to avoid invalidating followers and observers by aggressively deleting an offset/record which they are trying to fetch. It is possible that `controller.snapshot.minimum.records` is good enough to throttle increasing the LBO.
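A minimal sketch of that record-count-based throttling, assuming (purely for illustration) that snapshots are only generated after controller.snapshot.minimum.records have been applied and that the LBO never advances past the newest snapshot:

```java
// Hypothetical names and structure; only meant to illustrate how the record
// count knob could bound how quickly the log begin offset (LBO) advances.
class SnapshotPolicySketch {
    private final long minimumRecords;         // e.g. controller.snapshot.minimum.records
    private long recordsSinceLastSnapshot = 0;
    private long latestSnapshotEndOffset = 0;  // exclusive end offset of the newest snapshot

    SnapshotPolicySketch(long minimumRecords) {
        this.minimumRecords = minimumRecords;
    }

    // Called after applying a batch of records to the state machine.
    void onRecordsApplied(long count, long nextOffset) {
        recordsSinceLastSnapshot += count;
        if (recordsSinceLastSnapshot >= minimumRecords) {
            latestSnapshotEndOffset = nextOffset; // a new snapshot would be generated here
            recordsSinceLastSnapshot = 0;
        }
    }

    // The LBO never moves past the newest snapshot, so a slow follower can still find
    // either the records it wants in the log or a snapshot that covers them.
    long logBeginOffsetUpperBound() {
        return latestSnapshotEndOffset;
    }
}
```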
> > > 15. "Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local state machine has generated a snapshot that includes to the log begin offset minus one." Does the observer store the log? I thought it only needed to maintain a snapshot. If so, the observer doesn't need to maintain LBO.
> >
> > In KIP-595 observers are similar to voters/followers in that they Fetch the log and the snapshot. Two of the distinctions are that they don't participate in the leader election and they are not included when computing the high-watermark. Regarding storing: it is possible that observers never need to store the log since they don't vote or become leaders. I think in the future we would like to implement "KIP-642: Dynamic quorum reassignment", which would add the capability to upgrade an observer to a voter. In that case we do want to store the log.
> >
> > > 16. "There are two cases when the Kafka Controller needs to load the snapshot: When it is booting. When the follower and observer replicas finishes fetching a new snapshot from the leader." For faster failover, it seems it's useful for a non-controller voter to maintain the in-memory metadata state. In order to do that, it seems that every voter needs to load the snapshot on booting?
> >
> > Yeah. I think there is some confusion on terminology. This document refers to the Kafka Controller as the component in every voter that reads the snapshot and log. The Active Controller is the leader from that set of voters (Kafka Controllers). I added a terminology section to the Overview section.
> >
> > > 17. "There is an invariant that every log must have at least one snapshot where the offset of the snapshot is between LogBeginOffset - 1 and High-Watermark. If this is not the case then the replica should assume that the log is empty." Does that invariant hold when there is no snapshot initially?
> >
> > You are correct that this invariant doesn't hold for a replica that has an LBO of 0. I fixed this section to make this additional case clear.
> >
> > > 18. "The __cluster_metadata topic will have an cleanup.policy value of snapshot" Is there a need to make this configurable if it's read-only?
> >
> > Hmm. Every topic has a default cleanup.policy of delete. Looking at the code, we also require that at least one of them is set. We need a way to disable the existing delete and compaction cleanup policies. In LogConfig.scala we have:
> >
> > ```
> > ...
> > .define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy,
> >   ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
> >   KafkaConfig.LogCleanupPolicyProp)
> > ...
> > ```
> >
> > > 19. OFFSET_OUT_OF_RANGE in FetchSnapshotResponse: It seems that POSITION_OUT_OF_RANGE is more appropriate?
> >
> > Are you suggesting we introduce a new error type? For OFFSET_OUT_OF_RANGE we have:
> >
> > OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.", OffsetOutOfRangeException::new),
> >
> > Are you arguing that this is not entirely correct, and that the value in the request is really a position within the snapshot rather than a log offset?
> >
> > --
> > -Jose
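Coming back to the OFFSET_OUT_OF_RANGE vs. POSITION_OUT_OF_RANGE question (comments 19 and 29), here is a minimal sketch of the validation being discussed, with hypothetical parameter names; it only illustrates that the value being checked is a byte position within the snapshot file, not a log offset:

```java
import org.apache.kafka.common.protocol.Errors;

// Sketch only: validating the position in a FetchSnapshot request against the
// size of the snapshot. Whether this should return OFFSET_OUT_OF_RANGE or a new
// POSITION_OUT_OF_RANGE error is exactly the open question in the thread.
final class FetchSnapshotValidationSketch {
    static Errors validatePosition(long requestedPosition, long snapshotSizeInBytes) {
        if (requestedPosition < 0 || requestedPosition > snapshotSizeInBytes) {
            return Errors.OFFSET_OUT_OF_RANGE; // or a dedicated POSITION_OUT_OF_RANGE
        }
        return Errors.NONE;
    }
}
```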