Thanks for the additional feedback, Jun. Comments below.

On Fri, Feb 16, 2024 at 4:09 PM Jun Rao <j...@confluent.io.invalid> wrote:
> 10. "The controller state machine will instead push the brokers'
> kraft.version information to the KRaft client". If we do that, why do we
> need KRaftVersionRecord?

I am doing this as a reasonable compromise. Let me better explain why
we need KRaftVersionRecord and VotersRecord for voters as control
records.

First, the controller state machine (QuorumController in the code)
operates on committed data (records that have an offset smaller than
the HWM). That means that to read committed data, the HWM needs to be
established first. The HWM is discovered by the KRaft leader. To
establish the KRaft leader, the voters need to send RPCs to the other
voters. To be able to send those RPCs, the replicas need to be able
to read and process the locally uncommitted KRaftVersionRecord and
VotersRecord.

In short, the metadata layer (quorum controller) reads and processes
committed data while the KRaft layer reads and processes uncommitted
data. KRaft needs to read and process uncommitted data because that
data is required to establish a majority (consensus) and a leader.
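
To make the layering concrete, here is a rough sketch of the idea.
The class and method names are made up for illustration; this is not
the actual KafkaRaftClient or QuorumController code.

import java.util.ArrayList;
import java.util.List;

public class LayeringSketch {
    // Simplified stand-in for a record in the __cluster_metadata-0 log.
    record LogRecord(long offset, boolean isVotersRecord, String payload) {}

    static final List<LogRecord> log = new ArrayList<>();
    static long highWatermark = 0;  // unknown until a leader establishes it
    static final List<String> kraftLayerVoters = new ArrayList<>();    // uncommitted reads
    static final List<String> metadataLayerState = new ArrayList<>();  // committed reads

    // KRaft layer: applies VotersRecord/KRaftVersionRecord on append, before
    // the record is committed, because the voter set is needed to elect the
    // leader that will later advance the HWM.
    static void appendToLog(LogRecord rec) {
        log.add(rec);
        if (rec.isVotersRecord()) {
            kraftLayerVoters.add(rec.payload());
        }
    }

    // Metadata layer (state machine): only sees records once the HWM, set by
    // an elected leader, has moved past their offsets.
    static void advanceHighWatermark(long newHighWatermark) {
        for (LogRecord rec : log) {
            if (rec.offset() >= highWatermark && rec.offset() < newHighWatermark) {
                metadataLayerState.add(rec.payload());
            }
        }
        highWatermark = newHighWatermark;
    }

    public static void main(String[] args) {
        appendToLog(new LogRecord(0, true, "voters: 0, 1, 2"));
        // The KRaft layer already knows the voters and can hold an election,
        // even though nothing is committed yet.
        System.out.println("KRaft layer sees: " + kraftLayerVoters);
        System.out.println("Metadata layer sees: " + metadataLayerState);
        advanceHighWatermark(1);  // leader elected, HWM established
        System.out.println("Metadata layer now sees: " + metadataLayerState);
    }
}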

I am relaxing this for observers (brokers) for two reasons:
1. Observers are dynamic and unknown to the voters (leader). Voters
only need to handle Fetch and FetchSnapshot requests from observers
(brokers). The observers' information is not persisted to disk; it is
only tracked in memory for reporting purposes (DescribeQuorum) while
they continue to Fetch from the leader (see the sketch after this
list).
2. The voters don't need to read the uncommitted information about
the brokers (observers) to establish a majority and the leader. So
there is no strict requirement to include this information as a
control record in the log and snapshot.
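
Here is the sketch mentioned in reason 1: a rough, hypothetical
illustration (simplified types, not the actual leader-state code) of
tracking observers purely in memory for DescribeQuorum.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ObserverTrackingSketch {
    static final long OBSERVER_TIMEOUT_MS = 30_000;  // illustrative value
    static final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();

    // Leader side: remember when each observer last fetched. Nothing is
    // written to disk.
    static void onFetchFromObserver(int observerId, long nowMs) {
        lastFetchTimeMs.put(observerId, nowMs);
    }

    // DescribeQuorum reporting: only observers that fetched recently.
    static List<Integer> observersForDescribeQuorum(long nowMs) {
        return lastFetchTimeMs.entrySet().stream()
            .filter(entry -> nowMs - entry.getValue() <= OBSERVER_TIMEOUT_MS)
            .map(Map.Entry::getKey)
            .toList();
    }

    public static void main(String[] args) {
        onFetchFromObserver(10, 1_000L);
        onFetchFromObserver(11, 40_000L);
        // Observer 10 has not fetched recently, so only 11 is reported.
        System.out.println(observersForDescribeQuorum(45_000L));
    }
}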

> 15. Hmm, I thought controller.listener.names already provides the listener
> name. It's a list so that we could support changing security protocols.

Not sure if I fully understand the comment, but here is an example
that may illustrate why we need all of the information included in
the KIP (VotersRecord). Let's assume the following local
configuration:
controller.listener.names=CONTROLLER_SSL,CONTROLLER_PLAINTEXT

With this configuration the voter (controller) prefers connecting
through CONTROLLER_SSL first and CONTROLLER_PLAINTEXT second. To
establish consensus and leadership, the voters need to send the Vote
request to the other voters. Which host and endpoint should the voter
use? Let's assume the following VotersRecord:

{ "VoterId": 0, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER_SSL", "host": "controller-0", "port": 1234}, {"name":
"CONTROLLER_PLAINTEXT", ... } ]
{ "VoterId": 1, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER_SSL", "host": "controller-1", "port": 1234}, {"name":
"CONTROLLER_PLAINTEXT", ... } ]
{ "VoterId": 2, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER_SSL", "host": "controller-2", "port": 1234}, {"name":
"CONTROLLER_PLAINTEXT", ... } ]

In this configuration, the local replica can use CONTROLLER_SSL and
look up the host and port, because that is the preferred (first)
listener and it is supported by all of the voters.

Now let's assume the following VotersRecord:

{ "VoterId": 0, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER_SSL", "host": "controller-0", "port": 1234}, {"name":
"CONTROLLER_PLAINTEXT", ... } ]
{ "VoterId": 1, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER_PLAINTEXT", ... } ]
{ "VoterId": 2, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER_SSL", "host": "controller-2", "port": 1234}, {"name":
"CONTROLLER_PLAINTEXT", ... } ]

In this configuration, the local replica needs to use
CONTROLLER_PLAINTEXT because that is the only listener supported by
all of the voters.
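
To make the selection rule concrete, here is a rough sketch of the
algorithm: walk controller.listener.names in order and pick the first
listener that every voter advertises in its endpoints. The helper
names are made up; this is not the actual implementation.

import java.util.List;
import java.util.Optional;
import java.util.Set;

public class ListenerSelectionSketch {
    // Pick the first listener in controller.listener.names that every voter
    // advertises in its VotersRecord endpoints.
    static Optional<String> selectListener(
            List<String> controllerListenerNames,
            List<Set<String>> listenersPerVoter) {
        for (String name : controllerListenerNames) {
            boolean supportedByAll = listenersPerVoter.stream()
                .allMatch(listeners -> listeners.contains(name));
            if (supportedByAll) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> preference = List.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT");

        // First VotersRecord example: every voter supports both listeners.
        System.out.println(selectListener(preference, List.of(
            Set.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT"),
            Set.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT"),
            Set.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT"))));  // CONTROLLER_SSL

        // Second example: voter 1 only supports CONTROLLER_PLAINTEXT.
        System.out.println(selectListener(preference, List.of(
            Set.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT"),
            Set.of("CONTROLLER_PLAINTEXT"),
            Set.of("CONTROLLER_SSL", "CONTROLLER_PLAINTEXT"))));  // CONTROLLER_PLAINTEXT
    }
}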

> 17.1 "1. They are implemented at two different layers of the protocol. The
> Kafka controller is an application of the KRaft protocol. I wanted to
> keep this distinction in this design. The controller API is going to
> forward ControllerRegistrationRequest to the QuorumController and it
> is going to forward UpdateVoter to the KafkaRaftClient."
> Hmm, but the controller already pushes the brokers' kraft.version
> information to the KRaft client.

Right, but only for brokers (observers). Voters are different in
that their information is required to establish consensus. KRaft
needs to read this information as uncommitted data because the HWM,
and therefore committed data, can only be established after
leadership has been established.

> "2. If the voter getting updated is not part of the
> voter set the leader will reject the update."
> Would it be simpler to just relax that? The KIP already relaxed some of the
> checks during the vote.

We could, but in this KIP replicas only store information about
voters. If we relax that, replicas will start storing information
about observers (brokers), and we would then have to add a mechanism
for deleting this data, similar to UnregisterBrokerRequest.

> "3. The other semantic difference is that in KRaft, the voter set
> (which includes the replica ids and the endpoint) is based on
> uncommitted data. While the controller state machine only sees
> committed data."
> Hmm, does that prevent the supported versions from being propagated through
> ControllerRegistrationRequest?

Technically no, but it does lead to a clunky implementation.
1. KRaft needs to trust the application to provide correct information.
2. It is async. The local voter learns about the voters by reading
the uncommitted VotersRecord at some offset X, but the
RegisterBrokerRecord reflects some committed offset Y that has no
causal relation to X.

> 17.2 It seems the main purpose of UpdateVoter is to change the endpoints.
> Currently, this is handled through ControllerRegistrationRequest. Are we
> going to have two ways of changing endpoints?

Yes. If we want a clean separation of layers, we have to accept this
duplication in the current design. In a future KIP, I can explore
what it would take to change ControllerRegistrationRequest so that it
doesn't include endpoint information. Controllers (the metadata
layer) do not send RPCs to other controllers; only the KRaft layer
has point-to-point (RPC) communication between replicas (voters).
That endpoint information was added so that it could be sent to
clients in the METADATA response, but we can change that so that it
uses the information provided by the KRaft layer.

> 18.2 I thought controller.listener.names already provides the listener name.

I think I addressed this comment in one of my responses above, but
let me know if I missed anything.

> 18.3 When the new voter starts, does it send the
> ControllerRegistrationRequest? If so, it already includes the supported
> kraft.version.

It won't include kraft.version. kraft.version will behave
differently from other feature levels. For context, I decided to
model it as a Kafka feature level because I didn't want to design and
implement a mechanism similar to UpdateFeatures and the
kafka-features CLI. kraft.version will leverage UpdateFeatures and
ApiVersions for updating and reporting the kraft.version, but it will
not use the metadata layer (QuorumController) to update and persist
the finalized kraft.version.
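
As a rough illustration of the split described above (hypothetical
interfaces and illustrative feature levels, not the actual request
handling code), an UpdateFeatures request could be dispatched like
this:

import java.util.Map;

public class UpdateFeaturesDispatchSketch {
    // Hypothetical interfaces; the real code paths live in the controller
    // and KafkaRaftClient, not here.
    interface KraftLayer { void upgradeKraftVersion(short level); }
    interface MetadataLayer { void upgradeFeature(String name, short level); }

    static void handleUpdateFeatures(
            Map<String, Short> requestedLevels,
            KraftLayer kraft,
            MetadataLayer metadata) {
        requestedLevels.forEach((feature, level) -> {
            if (feature.equals("kraft.version")) {
                // Persisted by the KRaft layer (e.g. as a control record),
                // not by the QuorumController.
                kraft.upgradeKraftVersion(level);
            } else {
                // Every other feature keeps going through the metadata layer.
                metadata.upgradeFeature(feature, level);
            }
        });
    }

    public static void main(String[] args) {
        // The feature levels below are illustrative values only.
        handleUpdateFeatures(
            Map.of("kraft.version", (short) 1, "metadata.version", (short) 19),
            level -> System.out.println("KRaft layer finalizes kraft.version " + level),
            (name, level) -> System.out.println("Metadata layer finalizes " + name + " " + level));
    }
}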

> 19. Hmm, LeaderChangeMessage only has one voter set, not per partition. Is
> that a problem in the future?

Correct. LeaderChangeMessage is a control record that is stored per
KRaft topic partition. There is no need to include the topic partition
because it is implicit based on the log segment where the control
record was written. This also applies to the two new control records
added by this KIP: KRaftVersionRecord and VotersRecord.

> 20.1 The following are the changes to EndQuorumEpochRequest.
> PreferredSuccessors is still there.
> +        { "name": "PreferredSuccessors", "type": "[]int32", "versions":
> "0",
> +          "about": "A sorted list of preferred successors to start the
> election" },
> +        { "name": "PreferredCandidates", "type": "[]ReplicaInfo",
> "versions": "1+",
> +          "about": "A sorted list of preferred successors to start the
> election", "fields": [

Got it. Fixed.

> 20.2 "To all of these responses I have added endpoint information now that
> the voter set and endpoints are dynamic."
> This seems redundant. Shouldn't we just use 1 RPC (e.g.
> ControllerRegistrationRequest) to handle endPoint changes?

Let me illustrate the issue with an example. Let's assume that the
cluster has been fully enabled to use this feature and that we have
the following state in all of the replicas (observers and voters).

VotersRecord is:
{ "VoterId": 0, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER", "host": "controller-0", "port": 1234} ]
{ "VoterId": 1, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER", "host": "controller-1", "port": 1234} ]
{ "VoterId": 2, "VoterUuid": "...", "Endpoints": [ {"name":
"CONTROLLER", "host": "controller-2", "port": 1234} ]

The property controller.quorum.bootstrap.servers is set to
"controller-0:1234,controller-1:1234,controller-2:1234".

The operator adds a new broker to the cluster. What happens at this
point? The broker starts with no VotersRecords in the
__cluster_metadata-0 topic partition. The KRaft client sends a Fetch
request to a random endpoint from
controller.quorum.bootstrap.servers. Let's assume that the chosen
endpoint is not the leader. The remote replica will reply with a
Fetch response where CurrentLeader is set to the current leader id
and epoch. This is not enough information for the new broker to send
the next Fetch request; the broker needs to know the endpoint of the
leader.

This is just one example. There are many more examples where the
replica receiving the CurrentLeader in the response doesn't have
enough information to map the leader id (replica id) to an endpoint.
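
Here is a small, hypothetical sketch (simplified types, not the
actual KafkaRaftClient) of the problem: with only the leader id and
epoch in the response, the new broker cannot send the next Fetch
request unless the response also carries the leader's endpoint.

import java.util.Map;
import java.util.Optional;

public class LeaderDiscoverySketch {
    // Simplified stand-in for the relevant parts of a Fetch response.
    record FetchReply(int leaderId, int leaderEpoch, Optional<String> leaderEndpoint) {}

    // The new broker's map from voter id to endpoint. It starts empty because
    // the broker has no VotersRecord in __cluster_metadata-0 yet.
    static final Map<Integer, String> knownEndpoints = Map.of();

    static String nextFetchDestination(FetchReply response) {
        // With only the leader id, the broker is stuck unless it already knows
        // how to reach that replica.
        return response.leaderEndpoint()
            .or(() -> Optional.ofNullable(knownEndpoints.get(response.leaderId())))
            .orElseThrow(() -> new IllegalStateException(
                "Know the leader is replica " + response.leaderId()
                    + " but not how to reach it"));
    }

    public static void main(String[] args) {
        // Without an endpoint in the response the broker cannot make progress...
        try {
            nextFetchDestination(new FetchReply(1, 5, Optional.empty()));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // ...but with the leader's endpoint included it can send the next Fetch.
        System.out.println(nextFetchDestination(
            new FetchReply(1, 5, Optional.of("controller-1:1234"))));
    }
}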

The value in controller.quorum.bootstrap.servers can also be a DNS A
record or CNAME record that maps to all of the voters. E.g.
"controller:1234" where "controller" resolves to "controller-0",
"controller-1" and "controller-2". This is useful for deployment
environments like Kubernetes.

ControllerRegistrationRequest contains the endpoints but those
endpoints are propagated through the __cluster_metadata-0 partition.
To replicate the __cluster_metadata-0 partition all replicas need to
be able to contact the leader.

> 22. Could we at least name the additional checkpoint file more consistently
> with the existing one? e.g. kraft-bootstrap.checkpoint?

Valid names for a checkpoint are
<snapshot-id-offset>-<snapshot-id-epoch>.checkpoint (there is code
for this in o.a.k.s.Snapshots), and this naming is assumed throughout
the KRaft implementation. I want the bootstrapping checkpoint to
integrate with the rest of the implementation so that we can reuse
existing functionality like RaftClient.Listener::handleLoadSnapshot,
the snapshot deletion logic, etc.
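
As an illustration only, the bootstrap checkpoint could follow the
same pattern; the padding widths and the use of offset 0 / epoch 0
below are my assumptions, not a spec of the on-disk format.

public class CheckpointNameSketch {
    // Builds a <snapshot-id-offset>-<snapshot-id-epoch>.checkpoint name.
    static String checkpointFileName(long snapshotIdOffset, int snapshotIdEpoch) {
        return String.format("%020d-%010d.checkpoint", snapshotIdOffset, snapshotIdEpoch);
    }

    public static void main(String[] args) {
        // A bootstrap checkpoint at the very start of the log would fit the
        // same naming pattern as any other snapshot.
        System.out.println(checkpointFileName(0L, 0));
    }
}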

> 26. Since MinSupportedVersion and MaxSupportedVersion are stored in
> RegisterBrokerRecord, it's redundant to store them again in VotersRecord,
> right?

Yes, it is a duplication of the information. I would like to keep
this information consistent with the replica id and replica uuid.
Otherwise, we have issues where the information comes from different
points in time (different offsets in the log).

> 30. Currently, metadata.version controls all inter-broker RPCs, records and
> some cluster-wide functionalities. What does kraft.version control? Is
> there any RPC/record controlled by both features?

In summary, it controls the following changes to what is written to
disk:
1. Version 1 of QuorumStateData - I wanted to make this upgrade
explicit so that a software (binary) downgrade is possible as long as
the kraft.version is not increased. E.g. replacing the jars of a
Kafka node from version 3.8 with version 3.7, as long as the operator
hasn't explicitly upgraded the kraft.version to 1.
2. The writing of VotersRecords to the log segments or snapshot.
3. The writing of version 2 of the LeaderChangeMessage.

This also means that the kraft.version is checked in RPCs that
attempt to write this data to disk; that includes the AddVoter,
RemoveVoter and UpdateVoter RPCs.
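
As a rough sketch of the kind of check implied here (the error and
method names are made up, not the actual implementation):

public class KraftVersionGateSketch {
    enum VoterRpc { ADD_VOTER, REMOVE_VOTER, UPDATE_VOTER }

    // Reject RPCs that would write voter data to disk if the finalized
    // kraft.version does not support it yet.
    static void checkWriteAllowed(short finalizedKraftVersion, VoterRpc rpc) {
        if (finalizedKraftVersion < 1) {
            throw new UnsupportedOperationException(
                rpc + " requires kraft.version 1 or higher, but the finalized"
                    + " version is " + finalizedKraftVersion);
        }
    }

    public static void main(String[] args) {
        try {
            checkWriteAllowed((short) 0, VoterRpc.ADD_VOTER);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
        checkWriteAllowed((short) 1, VoterRpc.ADD_VOTER);  // allowed once upgraded
    }
}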

Thanks! I am going to update the KIP to document some of the feedback
provided above.
--
-José
