Thanks for the reply, Jose.

10. "The controller state machine will instead push the brokers' kraft.version information to the KRaft client". If we do that, why do we need KRaftVersionRecord?

15. Hmm, I thought controller.listener.names already provides the listener name. It's a list so that we could support changing security protocols.

17.1 "1. They are implemented at two different layers of the protocol. The Kafka controller is an application of the KRaft protocol. I wanted to keep this distinction in this design. The controller API is going to forward ControllerRegistrationRequest to the QuorumController and it is going to forward UpdateVoter to the KafkaRaftClient." Hmm, but the controller already pushes the brokers' kraft.version information to the KRaft client.

"2. If the voter getting updated is not part of the voter set the leader will reject the update." Would it be simpler to just relax that? The KIP already relaxed some of the checks during the vote.

"3. The other semantic difference is that in KRaft, the voter set (which includes the replica ids and the endpoint) is based on uncommitted data. While the controller state machine only sees committed data." Hmm, does that prevent the supported versions from being propagated through ControllerRegistrationRequest?

17.2 It seems the main purpose of UpdateVoter is to change the endpoints. Currently, this is handled through ControllerRegistrationRequest. Are we going to have two ways of changing endpoints?

18.2 I thought controller.listener.names already provides the listener name.

18.3 When the new voter starts, does it send the ControllerRegistrationRequest? If so, it already includes the supported kraft.version.

19. Hmm, LeaderChangeMessage only has one voter set, not per partition. Is that a problem in the future?

20.1 The following are the changes to EndQuorumEpochRequest. PreferredSuccessors is still there.

+ { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0",
+   "about": "A sorted list of preferred successors to start the election" },
+ { "name": "PreferredCandidates", "type": "[]ReplicaInfo", "versions": "1+",
+   "about": "A sorted list of preferred successors to start the election", "fields": [

20.2 "To all of these responses I have added endpoint information now that the voter set and endpoints are dynamic." This seems redundant. Shouldn't we just use 1 RPC (e.g. ControllerRegistrationRequest) to handle endPoint changes?

22. Could we at least name the additional checkpoint file more consistently with the existing one? e.g. kraft-bootstrap.checkpoint?

26. Since MinSupportedVersion and MaxSupportedVersion are stored in RegisterBrokerRecord, it's redundant to store them again in VotersRecord, right?

30. Currently, metadata.version controls all inter-broker RPCs, records and some cluster-wide functionalities. What does kraft.version control? Is there any RPC/record controlled by both features?

Thanks,

Jun

On Wed, Feb 14, 2024 at 11:12 AM José Armando García Sancio <jsan...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thanks for the feedback. Excuse the delay, it took me a while to properly address your detailed feedback. See my comments below.
>
> I am going to update the KIP as outlined in this email. I will send another email when I have made all of the changes.
>
> On Fri, Feb 2, 2024 at 10:54 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > 10. kraft.version: Functionality wise, this seems very similar to metadata.version, which is to make sure that all brokers/controllers are on a supported version before enabling a new feature. Could you explain why we need a new one instead of just relying on metadata.version?
>
> Yes, they are trying to solve similar problems but they are doing them at a different abstraction layer. "metadata.version" uses FeatureLevelRecord which is a "data" record. I am trying to design the KRaft protocol (KafkaRaftClient implementation) to not assume a specific application (metadata and the controller). To support multiple applications possibly using KRaft means that we need to use control records to define KRaft's behavior. Instead of using FeatureLevelRecord which is a cluster metadata record, this KIP uses KRaftVersionRecord which will be a control record.
>
> The reason why we need to consider observers (brokers) when determining the kraft.version is because of snapshot (compaction). Since snapshots need to include all logical records in the log segments and observers (brokers) maintain the log on disk, this means that they need to be able to decode and encode both KRaftVersionRecord and VotersRecord before the leader is allowed to write them.
>
> > 11. Both the quorum-state file and controller.quorum.bootstrap.servers contain endpoints. Which one takes precedence?
>
> I updated the KIP (section "Reference explanation / Endpoints information") after Jack's email. KRaft is going to keep two sets of endpoints.
>
> 1. Voters set. This set of endpoints is used by voters to establish leadership with the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. The precedence order is the VotersRecord in the snapshot and log. If there are no VotersRecords in the snapshot or log, it will use the configuration in controller.quorum.voters.
>
> 2. Bootstrap servers. This is mainly used by observers (brokers) to discover the leader through Fetch RPCs. The precedence order is controller.quorum.bootstrap.servers first, controller.quorum.voters second. Voters won't use this property as they discover the leader and its endpoint from the BeginQuorumEpoch RPC from the leader.
>
> I believe that the original intent of the voters in the quorum state file was as a cache of the controller.quorum.voters configuration and to identify invalid changes in the configuration. I was trying to keep this functionality in the new version of the quorum state file.
>
> The more I think about the implementation, I don't think this is useful or even implementable. KRaft needs to keep all voters sets from the latest snapshot to the LEO so that it could include the correct voters set when generating a snapshot. I am going to update the KIP to remove voter information from the quorum state file.
>
> > 12. It seems that downgrading from this KIP is not supported? Could we have a section to make it explicit?
>
> Yes. Downgrades will not be supported. I updated the "Compatibility, deprecation and migration plan". There is a sentence about this in the "Public interfaces / Command line interface / kafka-features" section. I'll also update the "Proposed changes / Reference explanation / Supported features" and "Public interfaces / RPCs / UpdateFeatures / Handling" sections.
>
> > 13. controller.quorum.auto.join.enable: If this is set true, when does the controller issue the addVoter RPC? Does it need to wait until it's caught up? Does it issue the addVoter RPC on every restart?
>
> The controller will monitor the voters set. If the controller finds that the voters set doesn't contain itself, it will send an AddVoter RPC to the leader.
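>
> To make that concrete, here is a rough sketch of the check I have in mind (the type and method names below are invented for illustration; they are not the actual KafkaRaftClient or controller interfaces):
>
> import java.util.Set;
>
> public final class AutoJoinSketch {
>     // A voter is identified by its replica id plus the directory uuid of its metadata log dir.
>     record ReplicaKey(int id, String directoryId) {}
>
>     // Hypothetical view of what the raft client knows locally.
>     interface RaftView {
>         Set<ReplicaKey> lastKnownVoters(); // voters from the latest VotersRecord seen locally
>         ReplicaKey localReplica();         // this controller's id and directory uuid
>     }
>
>     // Hypothetical hook for sending the AddVoter RPC to the current leader.
>     interface AddVoterSender {
>         void sendAddVoter(ReplicaKey replica);
>     }
>
>     // Evaluated while controller.quorum.auto.join.enable is true: if the local
>     // replica is not in the voters set, ask the leader to add it.
>     static void maybeJoin(RaftView raft, AddVoterSender sender) {
>         ReplicaKey local = raft.localReplica();
>         if (!raft.lastKnownVoters().contains(local)) {
>             sender.sendAddVoter(local);
>         }
>     }
> }
>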
> To avoid the quorum adding a node and becoming unavailable, I will change the handling of the AddVoter RPC to not allow duplicate replica ids. If there is any replica with the same id but a different uuid, the old uuid must be removed first (with the RemoveVoter RPC) before the new uuid can be added.
>
> Here is an example that shows how a KRaft partition can become unavailable if we allow automatically adding duplicate replica ids. Assume that the partition starts with voters v1 and v2. The operator starts controller 3 as v3 (replica id 3, some generated replica uuid) and it attempts to automatically join the cluster by sending the AddVoter RPC. Assume that the voter set (v1, v2, v3) gets committed by v1 and v2. Controller 3 restarts with a new disk as v3' and sends an AddVoter RPC. Because (v1, v2, v3) was committed the leader is able to change the quorum to (v1, v2, v3, v3') but won't be able to commit it if controller 3 restarts again and comes back with a new disk as v3''.
>
> This case is avoided if the KRaft leader rejects any AddVoter RPC that duplicates the replica id.
>
> > 14. "using the AddVoter RPC, the Admin client or the kafka-metadata-quorum CLI.": In general, the operator doesn't know how to make RPC calls. So the practical options are either CLI or adminClient.
>
> Sure. I can remove the explicit mention of using the AddVoter RPC. I mentioned it because 3rd party libraries for Kafka could also implement the client side of the AddVoter RPC.
>
> > 15. VotersRecord: Why does it need to include name and SecurityProtocol in EndPoints? It's meant to replace controller.quorum.voters, which only includes host/port.
>
> I can remove the security protocol but we should keep the name. This would allow the local replica to look up the security protocol in the security protocol map to negotiate the connection to the remote node.
>
> The current implementation always assumes that all of the endpoints in controller.quorum.voters expect the security protocol of the first controller listener. This is difficult to satisfy if the operator tries to change the security protocol of the controllers and remain available.
>
> Assume the following remote voter:
> [ {name: CONTROLLER_SSL, host: controller-0, port: 9000}, {name: CONTROLLER, host: controller-0, port: 9001} ]
>
> Also assume that the local controller has the configuration for the CONTROLLER listener and not the CONTROLLER_SSL listener because the local node has yet to be configured to use SSL. In this case we want the local replica to be able to connect to the remote replica using CONTROLLER instead of CONTROLLER_SSL.
>
> > 16. "The KRaft leader cannot do this for observers (brokers) since their supported versions are not directly known by the KRaft leader." Hmm, the KRaft leader receives BrokerRegistrationRequest that includes supported feature versions, right?
>
> Yeah. I should define my terms better. I'll update the glossary section. When I mention KRaft (KRaft leader, voter, etc.), I am referring to the KRaft protocol and the KafkaRaftClient implementation and not the controller state machine that is implemented using KRaft.
>
> As you point out, the controller does know about all of the registered brokers through BrokerRegistrationRequest.
> The following sentence mentions that the controller state machine will make this information available to the KRaft implementation (KafkaRaftClient): "The controller state machine will instead push the brokers' kraft.version information to the KRaft client." The controller will push this information down to the RaftClient because the KRaft implementation doesn't know how to decode the application's (cluster metadata) RegisterBrokerRecord.
>
> > 17. UpdateVoter:
> > 17.1 "The leader will learn the range of supported version from the UpdateVoter RPC". KIP-919 introduced ControllerRegistrationRequest to do that. Do we need a new one?
>
> They are similar but there are 3 differences as to why I decided to add this new RPC.
>
> 1. They are implemented at two different layers of the protocol. The Kafka controller is an application of the KRaft protocol. I wanted to keep this distinction in this design. The controller API is going to forward ControllerRegistrationRequest to the QuorumController and it is going to forward UpdateVoter to the KafkaRaftClient.
> 2. The semantics are different. ControllerRegistrationRequest is an update and an insert (upsert) operation while UpdateVoter is only an update operation. If the voter getting updated is not part of the voter set the leader will reject the update.
> 3. The other semantic difference is that in KRaft, the voter set (which includes the replica ids and the endpoint) is based on uncommitted data. While the controller state machine only sees committed data.
>
> > 17.2 Do we support changing the voter's endpoint dynamically? If not, it seems that can be part of ControllerRegistrationRequest too.
>
> KRaft should allow for the endpoints of a remote replica to change without having to restart the local replica. It is okay to require a restart to change the endpoints of the local replica.
>
> > 18. AddVoter
> > 18.1 "This RPC can be sent to a broker or controller, when sent to a broker, the broker will forward the request to the active controller." If it's sent to a non-active controller, it will also be forwarded to the active controller, right?
>
> I guess it could but I wasn't planning to implement that. We don't have a forwarding manager from controller to active controller. We only have that from brokers to controllers.
>
> When the Admin client is connected to the controllers, the admin client will connect and send requests directly to the active controller. This is implemented in the LeastLoadedBrokerOrActiveKController node provider in the admin client.
>
> > 18.2 Why do we need the name/security protocol fields in the request? Currently, they can be derived from the configs.
> > { "name": "Listeners", "type": "[]Listener", "versions": "0+",
> >   "about": "The endpoints that can be used to communicate with the voter", "fields": [
> >   { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
> >     "about": "The name of the endpoint" },
> >   { "name": "Host", "type": "string", "versions": "0+",
> >     "about": "The hostname" },
> >   { "name": "Port", "type": "uint16", "versions": "0+",
> >     "about": "The port" },
> >   { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
> >     "about": "The security protocol" }
> > ]}
>
> This is the listener name of the remote replica. The local replica needs to know how to connect to the remote replica, which means knowing its host, port and security protocol.
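>
> As a rough illustration only (the names below are invented for this example, not the real classes), the lookup amounts to resolving the remote listener name against the local listener.security.protocol.map:
>
> import java.util.Map;
> import java.util.Optional;
>
> public final class ListenerLookupSketch {
>     // An endpoint advertised by a remote voter, as in the Listeners field above.
>     record Endpoint(String listenerName, String host, int port) {}
>
>     // localProtocolMap is parsed from the local listener.security.protocol.map
>     // configuration, e.g. "CONTROLLER:PLAINTEXT,CONTROLLER_SSL:SSL". An empty
>     // result means the local replica has no listener by that name and cannot
>     // negotiate a connection to that endpoint.
>     static Optional<String> securityProtocolFor(Map<String, String> localProtocolMap, Endpoint remote) {
>         return Optional.ofNullable(localProtocolMap.get(remote.listenerName()));
>     }
> }
>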
> The local replica can look up the connection mechanism by using the listener name as long as the local replica also has a configuration for that listener name.
>
> I'll remove the security protocol from the Listener struct. We can add it later if we find it helpful for debug purposes.
>
> > 18.3 "4. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the new voter." Hmm, I thought that we found using ApiVersions unreliable (https://issues.apache.org/jira/browse/KAFKA-15230) and therefore introduced ControllerRegistrationRequest to propagate this information. ControllerRegistrationRequest can be made at step 1 during catchup.
>
> The context of that Jira is around the UpdateFeature request. When upgrading a feature the controller was checking the ApiVersions responses in the NetworkClient. This is incorrect because those entries and connections can time out. The other issue is that they wanted UpdateFeature to work even if the controller were offline, similar to UpdateFeature working if the brokers are offline.
>
> I don't have this concern with AddVoter since it is already expected that the KRaft voter (controller) is online and fetching from the leader when the user attempts to add the voter.
>
> > 18.4 "In 4., the new replica will be part of the quorum so the leader will start sending BeginQuorumEpoch requests to this replica." Hmm, the leader should have sent BeginQuorumEpoch at step 1 so that the new replica can catch up from it, right? Also, step 4 above only mentions ApiVersions RPC, not BeginQuorumEpoch.
>
> Apologies, that should read: "In 7., ..." I think I updated the bulleted outline and forgot to update the section that follows.
>
> Before the VotersRecord gets appended to the log the new replica should be an observer. Observers discover the leader by sending Fetch requests to the bootstrap servers (either controller.quorum.bootstrap.servers or controller.quorum.voters).
>
> This is part of the current behavior and is needed so that the new voters (currently observers) can catch up to the leader before getting added to the voters set.
>
> The leader only sends BeginQuorumEpoch to voters because those are the only replicas and endpoints that the leader knows about and knows how to connect to.
>
> > 19. Vote: It's kind of weird that VoterUuid is at the partition level. VoterId and VoterUuid uniquely identify a node, right? So it seems that it should be at the same level as VoterId?
>
> Yeah, it is. The issue is that one node (replica) can have multiple log directories (log.dirs property) and a different metadata log dir (metadata.log.dir). So it is possible for one voter id (node.id) to have a different voter uuid for different partitions.
>
> In practice this won't happen because cluster metadata is the only KRaft partition, but it can happen in the future.
>
> > 20. EndQuorumEpoch: "PreferredSuccessor which is an array is replica ids, will be replaced by PreferredCandidates"
> > 20.1 But PreferredSuccessor is still listed in the response.
>
> It shouldn't be in the response. I don't see it in EndQuorumEpochResponse. Did you mean that I used "preferred successor" instead of "preferred candidates" in the "about" section of the JSON schema? If so, I'll fix that.
>
> > 20.2 Why does the response need to include endPoint?
>
> In KRaft most RPC responses return the leader id and leader epoch if the epoch in the request is smaller than the epoch of the receiving replica. KRaft does this to propagate the latest leader and epoch as fast as possible.
>
> To all of these responses I have added endpoint information now that the voter set and endpoints are dynamic.
>
> > 21. "The KRaft implementation and protocol describe in KIP-595 and KIP-630 never read from the log or snapshot." This seems confusing. The KIP-595 protocol does read the log to replicate it, right?
>
> You are right. This is not accurate. KRaft reads from the log and snapshot. I meant to say that the KRaft protocol described in those KIPs is not dependent on the content of the control records in the snapshot or log. That is changing with this KIP.
>
> > 22. "create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum."
> > 22.1 It seems that currently, the bootstrap checkpoint is named bootstrap.checkpoint. Are we changing it to 00000000000000000000-0000000000.checkpoint?
>
> bootstrap.checkpoint is a controller feature and not integrated into the KRaft layer. For example, that file is read directly by the controller without interacting with KRaft. For this feature we need a checkpoint that integrates with the KRaft checkpoint mechanisms.
>
> Do you mind if I resolve this inconsistency in a future KIP? It should be possible to get rid of the bootstrap.checkpoint and fix the controller implementation to always let the RaftClient manage the checkpoint's lifecycle.
>
> > 22.2 Just to be clear, records in the bootstrap snapshot will be propagated to QuorumStateData, right?
>
> This is true in the current KIP. As I mentioned earlier in this email, I am going to remove voter information from QuorumStateData. It is not useful since KRaft needs to support the case when log truncation includes VotersRecord control records. This means that the local replica will always discover the voters set by reading and decoding the local snapshot and log segments.
>
> > 23. kafka-metadata-quorum: In the output, we include the endpoints for the voters, but not the observers. Why the inconsistency?
>
> Yes. This is intentional. At the KRaft layer the leader doesn't know the endpoints for the observers. The leader doesn't need to know those endpoints because in the KRaft protocol the leader never sends requests to observers. The KRaft leader will only print the endpoints contained in the VotersRecord control records in the local log.
>
> > 24. "The tool will set the TimoutMs for the AddVoterRequest to 30 seconds." Earlier, we have "The TimoutMs for the AddVoter RPC will be set to 10 minutes". Should we make them consistent?
>
> Yeah. They are different clients: the admin client and "kafka-metadata-quorum add-controller" will use 30 seconds. The controller's network client was going to use 10 minutes. Now that I think about it, it is better to keep the timeout lower and consistent. The controller needs to implement request retrying anyways. I'll change the KIP to use 30 seconds for both cases.
>
> > 25. "The replicas will update these order list of voters set whenever the latest snapshot id increases, a VotersRecord control record is read from the log and the log is truncated."
> > Why will the voters set change when a new snapshot is created?
>
> The implementation needs to keep a list of voter sets (e.g. List[(Offset: Int, Voters: Set[Voter])]). It needs to keep this list of voters sets for any VotersRecord between the latest snapshot and the LEO. It needs to do this for two reasons:
>
> 1. The replica will need to generate a snapshot at some offset X between the latest snapshot and the HWM. The replica needs to know the latest voters set (VotersRecord) at that offset.
> 2. The replica may be asked (through the DivergingEpoch in the Fetch response) to truncate its LEO. It is more efficient to just remove elements from the list of voters sets than to re-read the latest snapshot and the following log segments.
>
> > 26. VotersRecord includes MinSupportedVersion and MaxSupportedVersion. In FeatureLevelRecord, we only have one finalized version. Should we be consistent?
>
> The VotersRecord control record in KRaft is similar to the RegisterBrokerRecord in metadata (controller). VotersRecord describes all of the voters, their endpoints and supported kraft protocol versions (e.g. 0 to 1).
>
> The KRaftVersionRecord control record in KRaft is similar to the FeatureLevelRecord in metadata (controller). It specifies the currently active or finalized kraft.version.
>
> > 27. The Raft dissertation mentioned the issue of disruptive servers after they are removed. "Once the cluster leader has created the Cnew entry, a server that is not in Cnew will no longer receive heartbeats, so it will time out and start new elections." Have we addressed this issue?
>
> To summarize, Diego suggests that Vote requests should be denied if the receiving voter is a follower with an active leader. He also suggests ignoring the epoch in the request even if it is a greater epoch than the epoch of the local replica.
>
> My plan is to rely on KIP-996: Pre-vote:
> "When servers receive VoteRequests with the PreVote field set to true, they will respond with VoteGranted set to
> * true if they are not a Follower and the epoch and offsets in the Pre-Vote request satisfy the same requirements as a standard vote
> * false if otherwise"
>
> Similar to the pre-vote KIP, I don't think we should implement Diego's suggestion of also ignoring the higher epoch for "standard" Vote requests:
> "if a server receives a RequestVote request within the minimum election timeout of hearing from a current leader, it does not update its term or grant its vote"
>
> I don't think we should do this because otherwise the replica that sent the Vote request may never be able to participate in the KRaft partition (even as an observer) because its on-disk epoch is greater than the epoch of the rest of the replicas. In KRaft (and Raft) we have an invariant where epochs are monotonically increasing.
>
> > 28. There are quite a few typos in the KIP.
> > 28.1 "the voters are the replica ID and UUID is in its own voters set"
> > Does not read well.
> > 28.2 "used to configured: > > used to configure > > 28.3 "When at voter becomes a leader" > > when a voter > > 28.4 "write an VotersRecord controler" > > a VotersRecord; controller > > 28.5 "will get bootstrap" > > bootstrapped > > 28.6 "the leader of the KRaft cluster metadata leader" > > the leader of the KRaft cluster metadata partition > > 28.7 "until the call as been acknowledge" > > has been acknowledged > > 28.8 "As describe in KIP-595" > > described > > 28.9 "The KRaft implementation and protocol describe" > > described > > 28.10 "In other, the KRaft topic partition won't" > > In other words > > 28.11 "greater than their epic" > > epoch > > 28.12 "so their state will be tracking using their ID and UUID" > > tracked > > Thanks. I'll fix these typos and see if I can find more typos in the KIP. > -- > -José >