Hello folks, I've also updated the KIP wiki page, adding a section on the proposed metrics for this Raft protocol:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP-595:ARaftProtocolfortheMetadataQuorum-Metrics

Please let us know if you have any thoughts about them as well.

Guozhang

On Thu, May 7, 2020 at 4:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Jun. Further replies are inlined.
>
> On Mon, May 4, 2020 at 11:58 AM Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Guozhang,
>>
>> Thanks for the reply. A few more replies inlined below.
>>
>> On Sun, May 3, 2020 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Jun,
>> >
>> > Thanks for your comments! I'm replying inline below:
>> >
>> > On Fri, May 1, 2020 at 12:36 PM Jun Rao <j...@confluent.io> wrote:
>> >
>> > > 101. Bootstrapping related issues.
>> > > 101.1 Currently, we support auto broker id generation. Is this supported for bootstrap brokers?
>> >
>> > The voter ids would just be the broker ids. "bootstrap.servers" would be similar to what client configs have today, while "quorum.voters" would be pre-defined config values.
>>
>> My question was on the auto generated broker id. Currently, the broker can choose to have its broker id auto generated. The generation is done through ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id is auto generated. "quorum.voters" also can't be set statically if broker ids are auto generated.
>
> Jason has explained some ideas that we've discussed so far; the reason we intentionally did not include them is that we feel they are outside the scope of KIP-595. Under the umbrella of KIP-500 we should definitely address them though.
>
> At a high level, our belief is that "joining a quorum" and "joining (or more specifically, registering brokers in) the cluster" would be decoupled a bit, where the former should be completed before we do the latter. More specifically, assuming the quorum is already up and running, after the newly started broker finds the leader of the quorum it can send a specific RegisterBroker request including its listener / protocol / etc., and upon handling it the leader can send back the uniquely generated broker id to the new broker, while also executing the "startNewBroker" callback as the controller.
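> To make that flow a bit more concrete, here is a rough sketch of what the leader-side handling could look like --- purely illustrative, since this API is not part of KIP-595 and all of the names below (RegisterBrokerRequest, startNewBroker, etc.) are placeholders for whatever the KIP-500 follow-up work ends up specifying:
>
>     // Hypothetical leader-side handling of broker registration (illustrative only).
>     RegisterBrokerResponse handleRegisterBroker(RegisterBrokerRequest request) {
>         // Generate a cluster-unique broker id; uniqueness is guaranteed by the
>         // single quorum leader rather than by a ZK sequential node.
>         int brokerId = nextBrokerId++;
>
>         // Record the broker's listeners / protocols in the metadata log so the
>         // assignment survives leader fail-over (again, an assumption here).
>         appendToMetadataLog(new BrokerRegistrationRecord(brokerId, request.listeners()));
>
>         // As the controller, run the usual new-broker bookkeeping.
>         startNewBroker(brokerId);
>
>         return new RegisterBrokerResponse(brokerId);
>     }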
>> > > 102. Log compaction. One weak spot of log compaction is for the consumer to deal with deletes. When a key is deleted, it's retained as a tombstone first and then physically removed. If a client misses the tombstone (because it's physically removed), it may not be able to update its metadata properly. The way we solve this in Kafka is based on a configuration (log.cleaner.delete.retention.ms), and we expect a consumer having seen an old key to finish reading the deletion tombstone within that time. There is no strong guarantee for that since a broker could be down for a long time. It would be better if we can have a more reliable way of dealing with deletes.
>>
>> > We propose to capture this in the "FirstDirtyOffset" field of the quorum record fetch response: the offset is the maximum offset that log compaction has reached up to. If the follower has fetched beyond this offset, it is safe since it has seen all records up to that offset. On getting the response, the follower can then decide whether its end offset is actually below that dirty offset (and hence it may have missed some tombstones). If that's the case:
>> >
>> > 1) Naively, it could re-bootstrap the metadata log from the very beginning to catch up.
>> > 2) During that time, it would refrain from answering MetadataRequests from any clients.
>>
>> I am not sure that the "FirstDirtyOffset" field fully addresses the issue. Currently, the deletion tombstone is not removed immediately after a round of cleaning. It's removed after a delay in a subsequent round of cleaning. Consider an example where a key insertion is at offset 200 and a deletion tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A follower/observer fetches from offset 0 and fetches the key at offset 200. A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the tombstone at 400 is physically removed. The follower/observer continues the fetch, but misses offset 400. It catches up all the way to FirstDirtyOffset and declares its metadata as ready. However, its metadata could be stale since it actually misses the deletion of the key.
>
> Yeah, good question; I should have put more details in my explanation :)
>
> The idea is that we will adjust log compaction for this Raft-based metadata log. Before getting into the details, note that we have two types of "watermarks" here: in Kafka, the watermark indicates the offset up to which every replica has replicated, whereas in Raft the watermark indicates the offset up to which the majority of replicas (here only counting voters of the quorum, not observers) have replicated. Let's call them the Kafka watermark and the Raft watermark. For this special log, we would maintain both watermarks.
>
> When log compacting on the leader, we would only compact up to the Kafka watermark, i.e. if there is at least one voter that has not replicated an entry, it would not be compacted. The "dirty-offset" is the offset that we've compacted up to and is communicated to the other voters, and the other voters would also compact up to this value --- i.e. the difference here is that instead of letting each replica do log compaction independently, we'll have the leader decide which offset to compact to and propagate this value to the others to follow, in a more coordinated manner. Also note that when new voters join the quorum that have not replicated up to the dirty-offset, or when voters have truncated their logs to below the dirty-offset because of other issues, they'd have to re-bootstrap from the beginning; during this period of time the leader, having learned about the lagging voter, would not advance the watermark (nor would it decrement it), and hence would not compact either, until the voter(s) have caught up to that dirty-offset.
>
> So back to your example above: before the bootstrapping voter gets to 300, no log compaction would happen on the leader; and only later, when the voter has gotten beyond 400 and hence replicated that tombstone, could log compaction get to that tombstone and remove it. Say the leader's log compaction later reaches 500; it can send this back to the voter, who can then also compact locally up to 500.
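> To put the rule in code form (a minimal sketch only; none of these names are from the KIP):
>
>     // Leader: only compact up to the minimum offset replicated by *all* voters
>     // (the "Kafka watermark"), so no current voter can fall below the dirty offset.
>     long kafkaWatermark = voterEndOffsets.values().stream()
>         .mapToLong(Long::longValue)
>         .min()
>         .orElse(0L);
>     cleaner.cleanUpTo(kafkaWatermark);
>     long firstDirtyOffset = cleaner.lastCleanedOffset(); // advertised in fetch responses
>
>     // Follower / observer: on each FetchQuorumRecords response, detect a
>     // potential tombstone gap and re-bootstrap if needed.
>     if (logEndOffset() < response.firstDirtyOffset()) {
>         truncateToBeginning();   // naive option: re-fetch the log from offset 0
>         stopServingMetadata();   // refrain from answering MetadataRequests meanwhile
>     }
>     // Followers also compact, but only up to the leader-advertised dirty offset,
>     // never independently beyond it.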
>> > > 105. Quorum State: In addition to VotedId, do we need the epoch corresponding to VotedId? Over time, the same broker id could be voted in different generations with different epochs.
>> >
>> > Hmm, this is a good point. Originally I think the "LeaderEpoch" field in that file corresponds to the "latest known leader epoch", not the "current leader epoch". For example, if the current epoch is N, and then a vote-request with epoch N+1 is received and the voter granted the vote for it, then for this voter the "latest known epoch" is N+1, although it is unknown whether the sending candidate will indeed become the new leader (which would only be communicated via a begin-quorum request). However, when persisting the quorum state, we would encode the leader-epoch as N+1, while the leaderId would be the older leader.
>> >
>> > But now thinking about this a bit more, I feel we should use two separate epochs, one for the "latest known" and one for the "current" to pair with the leaderId. I will update the wiki page.
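>> > Just to illustrate the pairing I have in mind (a sketch only --- the actual field names will be whatever lands on the wiki):
>> >
>> >     // Persisted quorum state with two separate epochs (illustrative).
>> >     class QuorumState {
>> >         int leaderId;     // last leader this voter learned about
>> >         int leaderEpoch;  // the "current" epoch, paired with leaderId
>> >         int votedId;      // candidate this voter granted its vote to, or -1
>> >         int votedEpoch;   // the "latest known" epoch, bumped when granting a
>> >                           // vote; may run ahead of leaderEpoch until the
>> >                           // election resolves
>> >     }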
>> Hmm, it's kind of weird to bump up the leader epoch before the new leader is actually elected, right?
>>
>> > > 106. "OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response." Observers can't truncate their logs. What should they do with OFFSET_OUT_OF_RANGE?
>> >
>> > I'm not sure I understand your question? Observers should still be able to truncate their logs as well.
>>
>> Hmm, I thought only the quorum nodes have local logs and observers don't?
>>
>> > > 107. "The leader will continue sending BeginQuorumEpoch to each known voter until it has received its endorsement." If a voter is down for a long time, sending BeginQuorumEpoch seems to add unnecessary overhead. Similarly, if a follower stops sending FetchQuorumRecords, does the leader keep sending BeginQuorumEpoch?
>> >
>> > Regarding BeginQuorumEpoch: that is a good point. The begin-quorum-epoch request is for voters to quickly get the new leader information; however, even if they do not get it they can still eventually learn about it from others via gossiping FindQuorum. I think we can adjust the logic to e.g. exponential back-off or a limited num.retries.
>> >
>> > Regarding FetchQuorumRecords: if the follower sends FetchQuorumRecords already, it means that the follower already knows that this broker is the leader, and hence we can stop retrying BeginQuorumEpoch; however, it is possible that after a follower has sent FetchQuorumRecords, it suddenly stops sending it (possibly because it learned about a higher epoch leader), and hence this broker may be a "zombie" leader. We propose to use the fetch.timeout to let the leader try to verify whether it has already become stale.
>>
>> It just seems that we should handle these two cases in a consistent way?
>
> Yes I agree. On the leader's side, the FetchQuorumRecords from a follower could mean that we no longer need to send BeginQuorumEpoch --- and this is already part of our current implementation in https://github.com/confluentinc/kafka/commits/kafka-raft
>
>> Thanks,
>>
>> Jun
>>
>> > > Jun
>> > >
>> > > On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > > > Hello Leonard,
>> > > >
>> > > > Thanks for your comments, I'm replying inline below:
>> > > >
>> > > > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge <w...@confluent.io> wrote:
>> > > >
>> > > > > Hi Kafka developers,
>> > > > >
>> > > > > It's great to see this proposal, and it took me some time to finish reading it.
>> > > > >
>> > > > > And I have the following questions about the proposal:
>> > > > >
>> > > > > - How do we plan to test this design to ensure its correctness? Or more broadly, how do we ensure that our new ‘pull’ based model is functional and correct given that it is different from the original Raft implementation, which has a formal proof of correctness?
>> > > >
>> > > > We have two planned verifications of the correctness and liveness of the design. One is via model verification (TLA+): https://github.com/guozhangwang/kafka-specification
>> > > >
>> > > > Another is via the concurrent simulation tests: https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91
>> > > >
>> > > > > - Have we considered any sensible defaults for the configuration, i.e. all the election timeout, fetch timeout, etc.? Or do we want to leave this to a later stage when we do the performance testing, etc.?
>> > > >
>> > > > This is a good question. The reason we did not set any default values for the timeout configurations is that we think it may take some benchmarking experiments to get these defaults right. Some high-level principles to consider: 1) the fetch.timeout should be on roughly the same scale as the ZK session timeout, which is now 18 seconds by default --- in practice we've seen unstable networks having more than 10 seconds of transient connectivity issues; 2) the election.timeout, however, should be smaller than the fetch timeout, as is also suggested as a practical optimization in the literature: https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf
>> > > >
>> > > > Some more discussion can be found here: https://github.com/confluentinc/kafka/pull/301/files#r415420081
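>> > > > Just as a straw-man, something in this ballpark would satisfy both principles (the values are made up for illustration, not proposed defaults):
>> > > >
>> > > >     # fetch.timeout on the same scale as the ZK session timeout (18s default),
>> > > >     # large enough to ride out ~10s transient connectivity issues:
>> > > >     quorum.fetch.timeout.ms=15000
>> > > >     # election.timeout well below the fetch timeout, per the Raft literature:
>> > > >     quorum.election.timeout.ms=5000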
>> > > > > - Have we considered piggybacking `BeginQuorumEpoch` with the `FetchQuorumRecords`? I might be missing something obvious, but I am just wondering why we don't just use the `FindQuorum` and `FetchQuorumRecords` APIs and remove the `BeginQuorumEpoch` API?
>> > > >
>> > > > Note that Begin/EndQuorumEpoch is sent from the leader to the other voter followers, while FindQuorum / Fetch are sent from a follower to the leader. Arguably one could eventually realize the new leader and epoch via gossiping FindQuorum, but that could in practice require a long delay. Having a leader -> other voters request helps the new leader epoch be propagated faster under a pull model.
>> > > >
>> > > > > - And about the `FetchQuorumRecords` response schema: in the `Records` field of the response, is it just one record or all the records starting from the FetchOffset? It seems a lot more efficient if we sent all the records during the bootstrapping of the brokers.
>> > > >
>> > > > Yes, the fetching is batched: FetchOffset is just the starting offset of the batch of records.
>> > > >
>> > > > > - Regarding the disruptive broker issues, does our pull based model suffer from it? If so, have we considered the Pre-Vote stage? If not, why?
>> > > >
>> > > > The disruptive broker issue is described in the original Raft paper and is a result of the push model design. Our analysis showed that with the pull model it is no longer an issue.
>> > > >
>> > > > > Thanks a lot for putting this up, and I hope that my questions can be of some value to make this KIP better.
>> > > > >
>> > > > > Hope to hear from you soon!
>> > > > >
>> > > > > Best wishes,
>> > > > > Leonard
>> > > > >
>> > > > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe <cmcc...@apache.org> wrote:
>> > > > >
>> > > > > > Hi Jason,
>> > > > > >
>> > > > > > It's amazing to see this coming together :)
>> > > > > >
>> > > > > > I haven't had a chance to read it in detail, but I read the outline and a few things jumped out at me.
>> > > > > >
>> > > > > > First, for every epoch that is 32 bits rather than 64, I sort of wonder if that's a good long-term choice. I keep reading about stuff like this: https://issues.apache.org/jira/browse/ZOOKEEPER-1277 . Obviously, that JIRA is about zxid, which increments much faster than we expect these leader epochs to, but it would still be good to see some rough calculations about how long 32 bits (or really, 31 bits) will last us in the cases where we're using it, and what the space savings we're really getting are. It seems like in most cases the tradeoff may not be worth it?
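>> > > > > > (For a rough sense of scale: 2^31 is about 2.1 billion, and leader epochs only advance on elections, so even an absurd pace of one election per second would take roughly 2^31 / 31,536,000 ≈ 68 years to exhaust 31 bits --- the zxid counter rolls over much faster because its low 32 bits increment on every transaction.)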
>> > > > > > Another thing I've been thinking about is how we do bootstrapping. I would prefer to be in a world where formatting a new Kafka node was a first class operation explicitly initiated by the admin, rather than something that happened implicitly when you started up the broker and things "looked blank."
>> > > > > >
>> > > > > > The first problem is that things can "look blank" accidentally if the storage system is having a bad day. Clearly in the non-Raft world, this leads to data loss if the broker that is (re)started this way was the leader for some partitions.
>> > > > > >
>> > > > > > The second problem is that we have a bit of a chicken and egg problem with certain configuration keys. For example, maybe you want to configure some connection security settings in your cluster, but you don't want them to ever be stored in a plaintext config file. (For example, SCRAM passwords, etc.) You could use a broker API to set the configuration, but that brings up the chicken and egg problem. The broker needs to be configured to know how to talk to you, but you need to configure it before you can talk to it. Using an external secret manager like Vault is one way to solve this, but not everyone uses an external secret manager.
>> > > > > >
>> > > > > > quorum.voters seems like a similar configuration key. In the current KIP, this is only read if there is no other configuration specifying the quorum voter set. If we had a kafka.mkfs command, we wouldn't need this key because we could assume that there was always quorum information stored locally.
>> > > > > >
>> > > > > > best,
>> > > > > > Colin
>> > > > > >
>> > > > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson wrote:
>> > > > > > > Hi All,
>> > > > > > >
>> > > > > > > I'd like to start a discussion on KIP-595: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum . This proposal specifies a Raft protocol to ultimately replace Zookeeper as documented in KIP-500. Please take a look and share your thoughts.
>> > > > > > >
>> > > > > > > A few minor notes to set the stage a little bit:
>> > > > > > >
>> > > > > > > - This KIP does not specify the structure of the messages used to represent metadata in Kafka, nor does it specify the internal API that will be used by the controller. Expect these to come in later proposals. Here we are primarily concerned with the replication protocol and basic operational mechanics.
>> > > > > > > - We expect many details to change as we get closer to integration with the controller. Any changes we make will be made either as amendments to this KIP or, in the case of larger changes, as new proposals.
>> > > > > > > - We have a prototype implementation which I will put online within the next week which may help in understanding some details. It has diverged a little bit from our proposal, so I am taking a little time to bring it in line. I'll post an update to this thread when it is available for review.
>> > > > > > >
>> > > > > > > Finally, I want to mention that this proposal was drafted by myself, Boyang Chen, and Guozhang Wang.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Jason
>> > > > >
>> > > > > --
>> > > > > Leonard Ge
>> > > > > Software Engineer Intern - Confluent
>> > > >
>> > > > --
>> > > > -- Guozhang
>> >
>> > --
>> > -- Guozhang
>
> --
> -- Guozhang

--
-- Guozhang