Hello Jun, Thanks for your comments! I'm replying inline below:
On Fri, May 1, 2020 at 12:36 PM Jun Rao <j...@confluent.io> wrote:

> Hi, Jason,
>
> Thanks for the KIP. Great writeup. A few comments below.
>
> 100. Do we need AlterQuorum in the first version? Quorum changes are rare and the implementation is involved. ZK doesn't have that until 4 years after the initial version. Dropping that in the first version could speed up this KIP.

Yes, the first version of the implementation does not necessarily need quorum reconfiguration. In fact, in our ongoing work we have not made it a top priority to complete before we push out the first working prototype: the quorum members can just be a static “quorum.voters” config. However, we still want to include the design of quorum reconfiguration in this KIP for discussion, so that we are confident that when we add the dynamic reconfiguration feature it is well aligned with the protocol we've proposed and implemented, and we will not need to rework much of the implementation from our first version.

> 101. Bootstrapping related issues.
> 101.1 Currently, we support auto broker id generation. Is this supported for bootstrap brokers?

The voter ids would just be the broker ids. "bootstrap.servers" would be similar to what client configs have today, whereas "quorum.voters" would be pre-defined config values.

> 101.2 As Colin mentioned, sometimes we may need to load the security credentials to be broker before it can be connected to. Could you provide a bit more detail on how this will work?

This is a good question. Whether the credentials are stored in a remote source or in a local JAAS file, I think we need to load them before the broker tries to find out the quorum.

> 102. Log compaction. One weak spot of log compaction is for the consumer to deal with deletes. When a key is deleted, it's retained as a tombstone first and then physically removed.
> If a client misses the tombstone (because it's physically removed), it may not be able to update its metadata properly. The way we solve this in Kafka is based on a configuration (log.cleaner.delete.retention.ms) and we expect a consumer having seen an old key to finish reading the deletion tombstone within that time. There is no strong guarantee for that since a broker could be down for a long time. It would be better if we can have a more reliable way of dealing with deletes.

We propose to capture this in the "FirstDirtyOffset" field of the quorum record fetch response: this is the maximum offset that log compaction has reached. If the follower has fetched beyond this offset, it is safe, since it has seen all records up to that offset. On getting the response, the follower can then check whether its end offset is actually below that dirty offset (and hence it may have missed some tombstones). If that's the case: 1) naively, it could re-bootstrap the metadata log from the very beginning to catch up; 2) during that time, it would refrain from answering MetadataRequest from any clients.

> 103. For the newly introduced configurations related to timeouts, could we describe the defaults?

We are still discussing the default values, and I think some benchmarking experiments will be needed. At the moment, based just on the literature, our general thinking is: 1) the fetch.timeout should be on about the same scale as the zk session timeout, which is now 18 seconds by default (in practice we've seen unstable networks having more than 10 secs of transient connectivity loss); 2) the election.timeout, however, should be smaller than the fetch timeout, as is also suggested as a practical optimization in the literature: https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf

> 104. "This proposal requires a persistent log as well as a separate file to maintain the current quorum state".
> In JBOD, are the quorum log and quorum state file kept together on the same disk?

I think for correctness there is no requirement that they be either both intact or both corrupted: the quorum-state file is the source of truth for the current quorum state, especially for the current candidate (if any) that this voter has voted for. If it is missing, the broker would just rely on gossiping FindQuorum to refresh its knowledge. The quorum log, on the other hand, just stores the metadata updates (including the quorum changes, for reconfiguration purposes) for Kafka. So it is okay if either of them is corrupted while the other is intact.

> 105. Quorum State: In addition to VotedId, do we need the epoch corresponding to VotedId? Over time, the same broker Id could be voted in different generations with different epoch.

Hmm, this is a good point. Originally I thought of the "LeaderEpoch" field in that file as corresponding to the "latest known leader epoch", not the "current leader epoch". For example, if the current epoch is N, and then a vote request with epoch N+1 is received and the voter grants the vote, then this voter knows the "latest epoch" is N+1, although it is unknown whether the sending candidate will indeed become the new leader (which it would only learn via a begin-quorum request). However, when persisting the quorum state, we would encode the leader epoch as N+1 while the leaderId is still the older leader.

But now thinking about this a bit more, I feel we should use two separate epochs, one for the "latest known" and one for the "current" to pair with the leaderId. I will update the wiki page.

> 106. "OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response." Observers can't truncate their logs. What should they do with OFFSET_OUT_OF_RANGE?

I'm not sure if I understand your question?
Observers should still be able to truncate their logs as well.

> 107. "The leader will continue sending BeginQuorumEpoch to each known voter until it has received its endorsement." If a voter is down for a long time, sending BeginQuorumEpoch seems to add unnecessary overhead. Similarly, if a follower stops sending FetchQuorumRecords, does the leader keep sending BeginQuorumEpoch?

Regarding BeginQuorumEpoch: that is a good point. The begin-quorum-epoch request is for voters to quickly get the new leader information; however, even if they do not receive it they can still eventually learn about the new leader from others via gossiping FindQuorum. I think we can adjust the logic to use e.g. exponential back-off or a limited num.retries.

Regarding FetchQuorumRecords: if the follower already sends FetchQuorumRecords, it means that the follower already knows that the broker is the leader, and hence we can stop retrying BeginQuorumEpoch. However, it is possible that a follower that has been sending FetchQuorumRecords suddenly stops sending them (possibly because it learned about a leader with a higher epoch), in which case this broker may be a "zombie" leader; we propose to use the fetch.timeout to let the leader try to verify whether it has become stale.

> 108. Do we store leaderEpoch history and HWM on disk for the quorum log as we do for regular logs?

Yes we will. I can make that clear in the doc too.

> 109. Does FetchQuorumRecordsRequest return only up to HWM for observers?

Yes, the current logic is that for voters we would return up to the log end, while for observers we would only return up to the HWM. This is already implemented in https://github.com/confluentinc/kafka/commit/9f52a740aba457dcf27c5babd12d6dbbe31d5e4f

I will update the doc too.

> 110. "Specifically a follower/observer must check for voter assignment messages". Do you mean LeaderChangeMessage?

It refers to "VoterAssignmentMessage", which is type==2, is stored as part of the metadata log, and is to be used for quorum reconfiguration. I will update the doc.

> 111. FetchQuorumRecords also serves as keep-alive for observers. I am wondering if the observer needs a separate EndObserving RPC during a controlled shutdown so that the controller can detect the planned failure faster than the timeout.

We've thought about special logic for broker startup / broker shutdown to replace the current ZK ephemeral node mechanism. For broker shutdown, the current thinking is to still piggy-back on the ControlledShutdown request from broker to controller. Then, in addition to partition migration, the controller would also do extra logic if the shutting-down broker is also part of the quorum (i.e. it is a voter).

> Jun
>
> On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Leonard,
> >
> > Thanks for your comments, I'm replying inline below:
> >
> > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge <w...@confluent.io> wrote:
> >
> > > Hi Kafka developers,
> > >
> > > It's great to see this proposal and it took me some time to finish reading it.
> > >
> > > And I have the following questions about the Proposal:
> > >
> > > - How do we plan to test this design to ensure its correctness? Or more broadly, how do we ensure that our new ‘pull’ based model is functional and correct given that it is different from the original RAFT implementation which has formal proof of correctness?
> >
> > We have two planned verifications on the correctness and liveness of the design.
> > One is via model verification (TLA+) https://github.com/guozhangwang/kafka-specification
> >
> > Another is via the concurrent simulation tests https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91
> >
> > > - Have we considered any sensible defaults for the configuration, i.e. all the election timeout, fetch time out, etc.? Or we want to leave this to a later stage when we do the performance testing, etc.
> >
> > This is a good question, the reason we did not set any default values for the timeout configurations is that we think it may take some benchmarking experiments to get these defaults right. Some high-level principles to consider: 1) the fetch.timeout should be around the same scale with zk session timeout, which is now 18 seconds by default -- in practice we've seen unstable networks having more than 10 secs of transient connectivity, 2) the election.timeout, however, should be smaller than the fetch timeout as is also suggested as a practical optimization in literature: https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf
> >
> > Some more discussions can be found here: https://github.com/confluentinc/kafka/pull/301/files#r415420081
> >
> > > - Have we considered piggybacking `BeginQuorumEpoch` with the `FetchQuorumRecords`? I might be missing something obvious but I am just wondering why don’t we just use the `FindQuorum` and `FetchQuorumRecords` APIs and remove the `BeginQuorumEpoch` API?
> >
> > Note that Begin/EndQuorumEpoch is sent from leader -> other voter followers, while FindQuorum / Fetch are sent from follower to leader. Arguably one can eventually realize the new leader and epoch via gossiping FindQuorum, but that could in practice require a long delay. Having a leader -> other voters request helps the new leader epoch to be propagated faster under a pull model.
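
A minimal sketch of the leader-side bookkeeping discussed in this thread: the new leader keeps sending BeginQuorumEpoch to each voter until that voter endorses it, stops early once the voter is seen fetching, and otherwise backs off exponentially up to a retry limit (after which the voter is left to learn the leader via FindQuorum gossip). The class name, method names, and the default back-off and retry values are hypothetical and chosen only for illustration; none of them come from the KIP or the prototype.

```python
class BeginQuorumEpochTracker:
    """Hypothetical leader-side tracker; names and defaults are assumptions."""

    def __init__(self, voters, base_backoff_ms=100, max_retries=5):
        self.base_backoff_ms = base_backoff_ms  # assumed value, not from the KIP
        self.max_retries = max_retries          # assumed value, not from the KIP
        # Every other voter starts out not yet knowing about the new epoch.
        self.pending = set(voters)
        self.attempts = {v: 0 for v in voters}

    def on_endorsement(self, voter_id):
        # The voter acknowledged BeginQuorumEpoch: nothing more to send.
        self.pending.discard(voter_id)

    def on_fetch(self, voter_id):
        # A fetch from this voter implies it already knows the current leader.
        self.pending.discard(voter_id)

    def next_backoff_ms(self, voter_id):
        """Delay before the next BeginQuorumEpoch, or None if none should be sent."""
        if voter_id not in self.pending:
            return None
        sent = self.attempts[voter_id]
        if sent >= self.max_retries:
            # Give up; the voter can still learn the leader through gossip.
            self.pending.discard(voter_id)
            return None
        self.attempts[voter_id] = sent + 1
        return self.base_backoff_ms * (2 ** sent)
```

With `max_retries=3`, a voter that never responds would be retried after 100, 200, and 400 ms and then dropped from the pending set, which bounds the overhead for a voter that is down for a long time.
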
> >
> > > - And about the `FetchQuorumRecords` response schema, in the `Records` field of the response, is it just one record or all the records starting from the FetchOffset? It seems a lot more efficient if we sent all the records during the bootstrapping of the brokers.
> >
> > Yes the fetching is batched: FetchOffset is just the starting offset of the batch of records.
> >
> > > - Regarding the disruptive broker issues, does our pull based model suffer from it? If so, have we considered the Pre-Vote stage? If not, why?
> >
> > The disruptive broker is stated in the original Raft paper which is the result of the push model design. Our analysis showed that with the pull model it is no longer an issue.
> >
> > > Thanks a lot for putting this up, and I hope that my questions can be of some value to make this KIP better.
> > >
> > > Hope to hear from you soon!
> > >
> > > Best wishes,
> > > Leonard
> > >
> > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe <cmcc...@apache.org> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > It's amazing to see this coming together :)
> > > >
> > > > I haven't had a chance to read in detail, but I read the outline and a few things jumped out at me.
> > > >
> > > > First, for every epoch that is 32 bits rather than 64, I sort of wonder if that's a good long-term choice. I keep reading about stuff like this: https://issues.apache.org/jira/browse/ZOOKEEPER-1277 . Obviously, that JIRA is about zxid, which increments much faster than we expect these leader epochs to, but it would still be good to see some rough calculations about how long 32 bits (or really, 31 bits) will last us in the cases where we're using it, and what the space savings we're getting really is. It seems like in most cases the tradeoff may not be worth it?
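
As a rough back-of-the-envelope calculation for the 31-bit question above: the sketch below estimates how long a signed 32-bit epoch lasts if it is bumped at a constant rate. The rates are illustrative assumptions only, not measurements from any cluster, and real epoch bumps are bursty rather than constant. The space saving relative to a 64-bit field is 4 bytes per stored epoch.

```python
MAX_EPOCH = 2**31 - 1                    # largest value of a signed 32-bit integer
SECONDS_PER_YEAR = 365 * 24 * 60 * 60    # ignoring leap years


def years_until_exhausted(epoch_bumps_per_second: float) -> float:
    """Years until a 31-bit epoch counter overflows at a constant bump rate."""
    return MAX_EPOCH / epoch_bumps_per_second / SECONDS_PER_YEAR


# Assumed rates: one bump per second, per minute, and per hour.
for label, rate in (("1/sec", 1.0), ("1/min", 1 / 60), ("1/hour", 1 / 3600)):
    print(f"{label:>7}: about {years_until_exhausted(rate):,.0f} years")
```

Even at a sustained one epoch bump per second, which would mean an election every second indefinitely, the counter lasts roughly 68 years; at one bump per minute it lasts about 60 times longer.
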
> > > >
> > > > Another thing I've been thinking about is how we do bootstrapping. I would prefer to be in a world where formatting a new Kafka node was a first class operation explicitly initiated by the admin, rather than something that happened implicitly when you started up the broker and things "looked blank."
> > > >
> > > > The first problem is that things can "look blank" accidentally if the storage system is having a bad day. Clearly in the non-Raft world, this leads to data loss if the broker that is (re)started this way was the leader for some partitions.
> > > >
> > > > The second problem is that we have a bit of a chicken and egg problem with certain configuration keys. For example, maybe you want to configure some connection security settings in your cluster, but you don't want them to ever be stored in a plaintext config file. (For example, SCRAM passwords, etc.) You could use a broker API to set the configuration, but that brings up the chicken and egg problem. The broker needs to be configured to know how to talk to you, but you need to configure it before you can talk to it. Using an external secret manager like Vault is one way to solve this, but not everyone uses an external secret manager.
> > > >
> > > > quorum.voters seems like a similar configuration key. In the current KIP, this is only read if there is no other configuration specifying the quorum voter set. If we had a kafka.mkfs command, we wouldn't need this key because we could assume that there was always quorum information stored locally.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson wrote:
> > > > > Hi All,
> > > > >
> > > > > I'd like to start a discussion on KIP-595:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum
> > > > >
> > > > > This proposal specifies a Raft protocol to ultimately replace Zookeeper as documented in KIP-500. Please take a look and share your thoughts.
> > > > >
> > > > > A few minor notes to set the stage a little bit:
> > > > >
> > > > > - This KIP does not specify the structure of the messages used to represent metadata in Kafka, nor does it specify the internal API that will be used by the controller. Expect these to come in later proposals. Here we are primarily concerned with the replication protocol and basic operational mechanics.
> > > > > - We expect many details to change as we get closer to integration with the controller. Any changes we make will be made either as amendments to this KIP or, in the case of larger changes, as new proposals.
> > > > > - We have a prototype implementation which I will put online within the next week which may help in understanding some details. It has diverged a little bit from our proposal, so I am taking a little time to bring it in line. I'll post an update to this thread when it is available for review.
> > > > >
> > > > > Finally, I want to mention that this proposal was drafted by myself, Boyang Chen, and Guozhang Wang.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > >
> > > --
> > > Leonard Ge
> > > Software Engineer Intern - Confluent
> >
> > --
> > -- Guozhang
>
--
-- Guozhang
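
P.S. On point 102 above, the follower-side check against "FirstDirtyOffset" could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the KIP's or the prototype's logic: the `FollowerState` class, the function name, the returned labels, and the choice to treat an end offset equal to the dirty offset as safe are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    """Hypothetical follower-side state; field names are assumptions."""
    log_end_offset: int
    serving_metadata: bool = True


def on_fetch_response(state: FollowerState, first_dirty_offset: int) -> str:
    """Decide what to do after seeing FirstDirtyOffset in a fetch response."""
    if state.log_end_offset < first_dirty_offset:
        if state.serving_metadata:
            # Compaction has passed this follower, so tombstones may have been
            # removed before it read them: stop answering MetadataRequest and
            # naively re-read the metadata log from the very beginning.
            state.serving_metadata = False
            state.log_end_offset = 0
            return "rebootstrap"
        # Already re-bootstrapping and still behind the compacted range.
        return "catching_up"
    # Assumption: reaching the dirty offset means nothing was missed.
    state.serving_metadata = True
    return "ok"
```

The follower stays out of metadata serving for the whole catch-up period and resumes only once its end offset has reached the dirty offset reported by the leader.
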