Thanks for the additional feedback, Jun. Comments below.

On Fri, Feb 16, 2024 at 4:09 PM Jun Rao <j...@confluent.io.invalid> wrote:

> 10. "The controller state machine will instead push the brokers'
> kraft.version information to the KRaft client". If we do that, why do we
> need KRaftVersionRecord?

I am doing this as a reasonable compromise. Let me better explain why we need KRaftVersionRecord and VotersRecord for voters as control records.

First, the controller state machine (QuorumController in the code) operates on committed data (records that have an offset smaller than the HWM). That means that to read committed data, the HWM needs to be established. The HWM is discovered by the KRaft leader. To establish the KRaft leader, the voters need to send RPCs to the other voters. To be able to send those RPCs, the replicas need to be able to read and process the locally uncommitted KRaftVersionRecord and VotersRecord.

In short, the metadata layer (quorum controller) reads and processes committed data, while the KRaft layer reads and processes uncommitted data. KRaft needs to read and process uncommitted data because that data is required to establish a majority (consensus) and a leader.

I am relaxing this for observers (brokers) for two reasons:

1. Observers are dynamic and unknown to the voters (leader). Voters only need to handle Fetch and FetchSnapshot requests from observers (brokers). Their information is not persisted to disk; it is only tracked in memory for reporting purposes (DescribeQuorum) while they continue to fetch from the leader.

2. The voters don't need to read the uncommitted information about the brokers (observers) to establish a majority and the leader, so there is no strict requirement to include this information as a control record in the log and snapshot.
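To make that split concrete, here is a toy sketch in Java. The names (ToyReplica, ToyLogRecord) are made up for illustration and are not the KafkaRaftClient or QuorumController APIs; the only point it encodes is the visibility rule described above: the KRaft layer scans the whole local log (including the uncommitted suffix) for the voter-set control records, while the metadata layer only sees records below an established HWM.

    import java.util.List;

    // Toy model only (made-up names): contrasts what the KRaft layer and the
    // metadata layer are allowed to read from the same local log.
    final class ToyReplica {
        // A "record" with an offset and a flag marking the voter-set control
        // records (KRaftVersionRecord/VotersRecord).
        record ToyLogRecord(long offset, boolean isVoterSetControlRecord) {}

        private final List<ToyLogRecord> log; // local log, including uncommitted records
        private final long highWatermark;     // -1 until a leader establishes it

        ToyReplica(List<ToyLogRecord> log, long highWatermark) {
            this.log = log;
            this.highWatermark = highWatermark;
        }

        // KRaft layer: must see the voter set even before a leader (and hence
        // the HWM) exists, so it scans the entire local log.
        List<ToyLogRecord> voterSetRecordsForRaftLayer() {
            return log.stream().filter(ToyLogRecord::isVoterSetControlRecord).toList();
        }

        // Metadata layer (QuorumController): only sees committed records,
        // i.e. records with an offset smaller than the HWM.
        List<ToyLogRecord> recordsForController() {
            return log.stream().filter(r -> r.offset() < highWatermark).toList();
        }

        public static void main(String[] args) {
            ToyReplica replica = new ToyReplica(
                List.of(new ToyLogRecord(0, true), new ToyLogRecord(1, false)),
                -1L); // no leader elected yet, so the HWM is unknown
            System.out.println(replica.voterSetRecordsForRaftLayer()); // sees the voter set
            System.out.println(replica.recordsForController());        // sees nothing yet
        }
    }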
> 15. Hmm, I thought controller.listener.names already provides the listener
> name. It's a list so that we could support changing security protocols.

Not sure if I fully understand the comment, but here is an example that maybe illustrates why we need all of the information included in the KIP (VotersRecord).

Let's assume the following local configuration:

    controller.listener.names=CONTROLLER_SSL,CONTROLLER_PLAINTEXT

With this configuration the voter (controller) prefers connecting through CONTROLLER_SSL first and CONTROLLER_PLAINTEXT second. To establish consensus and leadership, the voters need to send the Vote request to the other voters. Which host and endpoint should the voter use?

Let's assume the following VotersRecord:

    { "VoterId": 0, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER_SSL", "host": "controller-0", "port": 1234},
        {"name": "CONTROLLER_PLAINTEXT", ... } ] }
    { "VoterId": 1, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER_SSL", "host": "controller-1", "port": 1234},
        {"name": "CONTROLLER_PLAINTEXT", ... } ] }
    { "VoterId": 2, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER_SSL", "host": "controller-2", "port": 1234},
        {"name": "CONTROLLER_PLAINTEXT", ... } ] }

In this configuration, the local replica can use CONTROLLER_SSL and look up the host and port because that is the preferred (first) listener and it is supported by all of the voters.

Now let's assume the following VotersRecord:

    { "VoterId": 0, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER_SSL", "host": "controller-0", "port": 1234},
        {"name": "CONTROLLER_PLAINTEXT", ... } ] }
    { "VoterId": 1, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER_PLAINTEXT", ... } ] }
    { "VoterId": 2, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER_SSL", "host": "controller-2", "port": 1234},
        {"name": "CONTROLLER_PLAINTEXT", ... } ] }

In this configuration, the local replica needs to use CONTROLLER_PLAINTEXT because that is the only listener supported by all of the voters.
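Here is a minimal sketch of that selection logic in Java. The class and method names are hypothetical (this is not the KafkaRaftClient API), and the CONTROLLER_PLAINTEXT host:port values are made up since they are elided in the example above; it just walks controller.listener.names in preference order and picks the first listener that every voter advertises.

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    final class ListenerSelection {
        // voterEndpoints: voter id -> (listener name -> "host:port")
        static Optional<String> preferredCommonListener(
                List<String> controllerListenerNames,
                Map<Integer, Map<String, String>> voterEndpoints) {
            for (String listener : controllerListenerNames) {
                boolean supportedByAllVoters = voterEndpoints.values().stream()
                    .allMatch(endpoints -> endpoints.containsKey(listener));
                if (supportedByAllVoters) {
                    return Optional.of(listener);
                }
            }
            return Optional.empty();
        }

        public static void main(String[] args) {
            // Second example above: voter 1 only advertises CONTROLLER_PLAINTEXT.
            Map<Integer, Map<String, String>> voters = Map.of(
                0, Map.of("CONTROLLER_SSL", "controller-0:1234",
                          "CONTROLLER_PLAINTEXT", "controller-0:1235"),
                1, Map.of("CONTROLLER_PLAINTEXT", "controller-1:1235"),
                2, Map.of("CONTROLLER_SSL", "controller-2:1234",
                          "CONTROLLER_PLAINTEXT", "controller-2:1235"));

            // Prints CONTROLLER_PLAINTEXT, the first listener every voter supports.
            System.out.println(preferredCommonListener(
                List.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT"), voters)
                .orElse("no common listener"));
        }
    }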
> 17.1 "1. They are implemented at two different layers of the protocol. The
> Kafka controller is an application of the KRaft protocol. I wanted to
> keep this distinction in this design. The controller API is going to
> forward ControllerRegistrationRequest to the QuorumController and it
> is going to forward UpdateVoter to the KafkaRaftClient."
> Hmm, but the controller already pushes the brokers' kraft.version
> information to the KRaft client.

Right, but only for brokers (observers). Voters are different in that their information is required to establish consensus. KRaft needs to read this information as uncommitted data because committed data (the HWM) can only be established after leadership has been established.

> "2. If the voter getting updated is not part of the
> voter set the leader will reject the update."
> Would it be simpler to just relax that? The KIP already relaxed some of the
> checks during the vote.

We could, but in this KIP replicas only store information about voters. If we relax that, replicas will start storing information about observers (or brokers), and we would then have to add a mechanism for deleting this data, similar to UnregisterBrokerRequest.

> "3. The other semantic difference is that in KRaft, the voter set
> (which includes the replica ids and the endpoint) is based on
> uncommitted data. While the controller state machine only sees
> committed data."
> Hmm, does that prevent the supported versions from being propagated through
> ControllerRegistrationRequest?

Technically no, but it leads to a clunky implementation:

1. KRaft needs to trust the application to provide correct information.

2. It is asynchronous. The local voter learns about the voters by reading the uncommitted VotersRecord at some offset X, but the RegisterBrokerRecord is for some committed offset Y that has no causal relation to X.

> 17.2 It seems the main purpose of UpdateVoter is to change the endpoints.
> Currently, this is handled through ControllerRegistrationRequest. Are we
> going to have two ways of changing endpoints?

Yes. If we want a clean separation of layers, we have to accept this duplication with the current design. In a future KIP, I can explore what it would take to change ControllerRegistrationRequest so that it doesn't include endpoint information. Controllers (the metadata layer) do not send RPCs to other controllers; only the KRaft layer has point-to-point (RPC) communication between replicas (voters). That information was added so that it can be sent to clients in the METADATA response, but we can change that so it uses the information provided by the KRaft layer.

> 18.2 I thought controller.listener.names already provides the listener name.

I think I addressed this in one of my comments above, but let me know if I missed anything.

> 18.3 When the new voter starts, does it send the
> ControllerRegistrationRequest? If so, it already includes the supported
> kraft.version.

It won't include kraft.version. kraft.version will behave differently from other feature levels. For context, I decided to use Kafka's feature levels because I didn't want to design and implement a mechanism similar to UpdateFeatures and the kafka-features CLI. kraft.version will leverage UpdateFeatures and ApiVersions for updating and reporting the kraft.version, but it will not use the metadata layer (QuorumController) to update and persist the finalized kraft.version.
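From a client's perspective, that should look like any other feature update. Here is a minimal sketch using the existing Java Admin client APIs (describeFeatures/updateFeatures); the bootstrap address and the target level 1 are illustrative values I chose, not something prescribed by the KIP.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.FeatureUpdate;
    import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

    public final class UpgradeKRaftVersion {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-0:9092");

            try (Admin admin = Admin.create(props)) {
                // Report the currently finalized features; kraft.version would
                // show up here once this KIP is implemented.
                System.out.println(admin.describeFeatures().featureMetadata().get());

                // Request an upgrade of the finalized kraft.version to level 1.
                FeatureUpdate update =
                    new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE);
                admin.updateFeatures(Map.of("kraft.version", update),
                                     new UpdateFeaturesOptions()).all().get();
            }
        }
    }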
> 19. Hmm, LeaderChangeMessage only has one voter set, not per partition. Is
> that a problem in the future?

Correct. LeaderChangeMessage is a control record that is stored per KRaft topic partition. There is no need to include the topic partition because it is implicit from the log segment where the control record was written. This also applies to the two new control records added by this KIP: KRaftVersionRecord and VotersRecord.

> 20.1 The following are the changes to EndQuorumEpochRequest.
> PreferredSuccessors is still there.
> +      { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0",
> +        "about": "A sorted list of preferred successors to start the election" },
> +      { "name": "PreferredCandidates", "type": "[]ReplicaInfo", "versions": "1+",
> +        "about": "A sorted list of preferred successors to start the election", "fields": [

Got it. Fixed.

> 20.2 "To all of these responses I have added endpoint information now that
> the voter set and endpoints are dynamic."
> This seems redundant. Shouldn't we just use 1 RPC (e.g.
> ControllerRegistrationRequest) to handle endPoint changes?

Let me illustrate the issue with an example. Let's assume that the cluster has been fully enabled to use this feature and that all of the replicas (observers and voters) store the following VotersRecord:

    { "VoterId": 0, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER", "host": "controller-0", "port": 1234} ] }
    { "VoterId": 1, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER", "host": "controller-1", "port": 1234} ] }
    { "VoterId": 2, "VoterUuid": "...", "Endpoints": [
        {"name": "CONTROLLER", "host": "controller-2", "port": 1234} ] }

The property controller.quorum.bootstrap.servers is set to "controller-0:1234,controller-1:1234,controller-2:1234".

The operator adds a new broker to the cluster. What happens at this point? The broker starts with no VotersRecords in the __cluster_metadata-0 topic partition. The KRaft client sends a Fetch request to a random endpoint in controller.quorum.bootstrap.servers. Let's assume that the chosen endpoint is not the leader. The remote replica will reply with a Fetch response where the CurrentLeader is set to the current leader id and epoch. This is not enough information for the new broker to send the next Fetch request; the broker also needs to know the endpoint of the leader. This is just one example. There are many more cases where the replica receiving the CurrentLeader in the response doesn't have enough information to map the leader id (replica id) to an endpoint.

The value in controller.quorum.bootstrap.servers can also be a DNS A record or CNAME record that maps to all of the voters, e.g. "controller:1234" where "controller" resolves to "controller-0", "controller-1" and "controller-2". This is useful for deployment environments like Kubernetes.

ControllerRegistrationRequest contains the endpoints, but those endpoints are propagated through the __cluster_metadata-0 partition. To replicate the __cluster_metadata-0 partition, all replicas need to be able to contact the leader in the first place.

> 22. Could we at least name the additional checkpoint file more consistently
> with the existing one? e.g. kraft-bootstrap.checkpoint?

Valid names for a checkpoint are <snapshot-id-offset>-<snapshot-id-epoch>.checkpoint (there is code for this in o.a.k.s.Snapshots), and the KRaft implementation assumes this. I want the bootstrapping checkpoint to integrate with the rest of the implementation so that we can reuse existing functionality like RaftClient.Listener::handleLoadSnapshot, the snapshot deletion logic, etc.
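To make the naming constraint concrete, here is a small sketch of composing such a file name. The zero-padding widths are an assumption based on existing snapshot file names, not copied from o.a.k.s.Snapshots, and the class name is made up.

    final class CheckpointNames {
        // Sketch of the <snapshot-id-offset>-<snapshot-id-epoch>.checkpoint
        // convention; the 20/10 digit padding is assumed, not authoritative.
        static String checkpointFileName(long endOffset, int epoch) {
            return String.format("%020d-%010d.checkpoint", endOffset, epoch);
        }

        public static void main(String[] args) {
            // A bootstrap checkpoint at offset 0 and epoch 0 would be named
            // "00000000000000000000-0000000000.checkpoint".
            System.out.println(checkpointFileName(0L, 0));
        }
    }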
> 26. Since MinSupportedVersion and MaxSupportedVersion are stored in
> RegisterBrokerRecord, it's redundant to store them again in VotersRecord,
> right?

Yes, it is a duplication of the information, but I would like to keep it consistent with the replica id and replica uuid. Otherwise, we have cases where the information comes from different points in time (different offsets in the log).

> 30. Currently, metadata.version controls all inter-broker RPCs, records and
> some cluster-wide functionalities. What does kraft.version control? Is
> there any RPC/record controlled by both features?

In summary, it controls the following changes to what is written to disk:

1. Version 1 of QuorumStateData. I wanted to make this upgrade explicit so that a software (binary) downgrade is possible as long as the kraft.version is not increased. E.g. the jar of a Kafka node can be replaced from version 3.8 with version 3.7 as long as the operator doesn't explicitly upgrade the kraft.version to 1.

2. The writing of VotersRecords to the log segments or snapshots.

3. The writing of version 2 of LeaderChangeMessage.

This also means that kraft.version is checked in RPCs that attempt to write this data to disk. That includes the AddVoter, RemoveVoter and UpdateVoter RPCs.

Thanks! I am going to update the KIP to document some of the feedback provided above.

--
-José