Hi José,

Thanks for the replies and questions.

RE JS1: "Can you clarify that this KIP removes the need for all Kafka nodes
to be formatted prior to starting Kafka." Hmm, in the static cluster case
that skips formatting, a newer software version combined with an older MV
is not a possible case, so I will remove that mention from the KIP. We
should default to the latest MV when formatting is skipped, which supports
writing a ClusterIdRecord.

Right now, it is not completely clear to me how we can allow bootstrap
controllers (this applies mainly for kraft.version=0, since kraft.version=1
cannot elect a leader without proper formatting) to also skip formatting.
That is why I said in the proposed changes: "Remove the requirement for
brokers and observer controllers to format before starting Kafka". I agree
that KRaft can still elect a leader without a clusterId in this case, but
I'm not completely sure how a QuorumController with an "empty" clusterId
that needs to be set later should behave. My working idea is detailed in RE
JS6. This is required because the active controller needs to generate a
clusterId and write it back to KRaft upon activation in order for the
committed `ClusterIdRecord` to appear in records passed to
`RaftListener#handleCommit()`, so we cannot block its initialization.
Keeping the assumption that QuorumController.clusterId is final and
non-null would be nice, but that requires all KRaft voters to format with a
cluster.id. Let me know what you think about the best way to remove this
requirement.

RE JS2: My plan was to continue to write meta.properties V1 during
formatting with a `cluster.id` field like today, but also write a
`ClusterIdRecord` to the bootstrap snapshot for redundancy if the MV
supports it (I'm not sure if Kafka is expected to handle only partial log
directory corruption/destruction). If the "bootstrap controller cluster id
check" from JS4 is correct, then the initial active controller is
guaranteed to have a non-null `cluster.id` in meta.properties. So long as
the MV supports it, the active controller would then write ClusterIdRecord
as part of the bootstrap records.
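To make the plan concrete, here is a minimal sketch of that formatting logic. Everything here (the `FormatSketch` class, the `BootstrapSnapshotWriter` interface, the string-based `append`) is a hypothetical stand-in, not an actual Kafka API:

```java
import java.util.Properties;

// Hypothetical sketch of the formatting plan described above. The
// BootstrapSnapshotWriter interface is an illustrative stand-in for
// whatever writes records into the bootstrap snapshot.
public class FormatSketch {

    public interface BootstrapSnapshotWriter {
        void append(String recordType, String payload);
    }

    public static void format(String clusterId,
                              boolean mvSupportsClusterIdRecord,
                              Properties metaProperties,
                              BootstrapSnapshotWriter bootstrap) {
        // Keep writing meta.properties V1 with cluster.id, as today.
        metaProperties.setProperty("cluster.id", clusterId);

        // Redundantly record the cluster id in the bootstrap snapshot so the
        // initial active controller commits it with the bootstrap records.
        if (mvSupportsClusterIdRecord) {
            bootstrap.append("ClusterIdRecord", clusterId);
        }
    }
}
```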

RE JS3: When I said this, I meant that waiting to discover cluster.id
before persisting it to meta.properties during broker startup is no more
restrictive than what already exists today: a broker must be caught up to
the HWM in order to register with the active controller.

RE JS4: Yeah, I thought about this, specifically around the
kraft.version=1 case since it is less straightforward what a "bootstrap
controller" is. Under the current design, in kraft.version=0, any node
that is part of the `controller.quorum.voters` config must have
`meta.properties` with `cluster.id`. In kraft.version=1, any node that has
a `0-0.checkpoint` is considered a "bootstrap controller." This is a
heuristic, but I believe it is correct: for the 0-0.checkpoint to be
missing on a node that formatted with --standalone or --initial-controllers,
there must either have been another checkpoint with committed records,
which implies an elected initial leader, or a disk loss.
Whenever a voter with id X and initial directory-id A comes back as (X, B),
this process incarnation is an observer from the perspective of KRaft, and
I think we can assume it has neither `meta.properties` nor a
`0-0.checkpoint` if the operator did not format it (per the assumption
from RE JS2 that the storage failures we expect to handle are not partial
directory failures). In this case, the "bootstrap controller" check does
not apply to (X, B), and if auto-join is enabled, it will follow the steps
detailed in RE JS5 to recover and rejoin the voter set. If we remove the
requirement that all nodes format, we would not need to implement these
checks.
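For concreteness, the heuristic could look something like the sketch below. The class and method names are hypothetical; only the `0-0.checkpoint` file name and the behavior come from the discussion above:

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch of the kraft.version=1 "bootstrap controller"
// heuristic described above. Class and method names are hypothetical.
public class BootstrapControllerCheck {

    // A node formatted with --standalone or --initial-controllers keeps its
    // 0-0.checkpoint until a later checkpoint replaces it, so its presence
    // marks the node as a "bootstrap controller".
    public static boolean isBootstrapController(Path metadataLogDir) {
        return Files.exists(metadataLogDir.resolve("0-0.checkpoint"));
    }

    // The check from JS4: a bootstrap controller without a cluster.id in
    // meta.properties is a formatting error.
    public static void requireClusterId(Path metadataLogDir, String clusterId) {
        if (isBootstrapController(metadataLogDir) && clusterId == null) {
            throw new RuntimeException(
                "Bootstrap controller has no cluster.id in meta.properties");
        }
    }
}
```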

RE JS5: An observer without a clusterId that can auto-join will fetch until
its KafkaRaftClient updates the cluster id in-memory (basically, auto-join
is off until it discovers the leader's clusterId). If the observer has
clusterId, it needs to match the leader's to perform a successful fetch,
which is required for successfully adding a voter via auto-join.
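The gating itself could be as simple as the sketch below (class and method names hypothetical): auto-join stays off until the observer has learned the leader's cluster id.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of gating auto-join on cluster id discovery, as
// described above. Names are illustrative, not actual Kafka classes.
public class AutoJoinGate {

    // null until the observer discovers the leader's cluster id via fetching.
    private final AtomicReference<String> clusterId = new AtomicReference<>(null);

    // Auto-join (adding ourselves to the voter set) is disabled until then.
    public boolean canAutoJoin() {
        return clusterId.get() != null;
    }

    // Called once a fetch response or committed record reveals the cluster id.
    public void onDiscoveredClusterId(String id) {
        clusterId.compareAndSet(null, id);
    }
}
```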

RE JS6: Apologies, I meant to say a MetadataPublisher registered to the
MetadataLoader. Although, looking at this again, maybe this discovery +
persistence of clusterId can be handled by a new RaftListener instead. I
don't think we need the overhead of the MetadataImage + MetadataDelta for
this feature, since the records passed to a RaftListener's `handleCommit()`
and `handleLoadSnapshot()` already include the `ClusterIdRecord`. However,
this means
needing a third listener besides the MetadataLoader and QuorumMetaListener,
and therefore an additional call to log#read() when handling KRaft commits
+ snapshots. From my reading, it seems like the Kafka log layer does not
attempt any caching, and instead we rely on the OS page cache. Because of
this, I think we should be using MetadataPublisher, but let me know what
you think.

I am thinking of using an AtomicReference<String> to represent the
clusterId in-memory. This RaftListener/MetadataPublisher will be the only
writer to this value if it is not already defined by meta.properties, but
there are many readers of this value. The initial value of this reference
is null or the cluster.id from meta.properties. Upon reading
`ClusterIdRecord`, the listener will throw an exception if it has a
non-null clusterId and reads a ClusterIdRecord with a different ID. If it
does not have cluster.id set and reads a ClusterIdRecord, it will update
the AtomicReference and persist cluster.id to meta.properties. Let me know
if this approach sounds reasonable to you.
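In code, the listener logic I have in mind looks roughly like this sketch. The class name and the `persistToMetaProperties` hook are hypothetical stand-ins; only the AtomicReference behavior described above is intended to be accurate:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the clusterId discovery logic described above.
public class ClusterIdListener {

    // null until discovered, or pre-set from meta.properties at startup.
    private final AtomicReference<String> clusterId;

    public ClusterIdListener(String fromMetaProperties) {
        this.clusterId = new AtomicReference<>(fromMetaProperties);
    }

    // Called when a ClusterIdRecord is seen in handleCommit/handleLoadSnapshot.
    public void onClusterIdRecord(String recordClusterId) {
        String current = clusterId.get();
        if (current != null) {
            if (!current.equals(recordClusterId)) {
                throw new IllegalStateException(
                    "Committed cluster id " + recordClusterId
                        + " does not match local cluster id " + current);
            }
            return; // already known and consistent
        }
        // Persist before publishing the value in-memory, so readers never
        // observe a cluster id that is not yet durable.
        persistToMetaProperties(recordClusterId);
        clusterId.set(recordClusterId);
    }

    public String clusterId() {
        return clusterId.get();
    }

    // Placeholder for writing cluster.id back to the meta.properties files.
    protected void persistToMetaProperties(String id) { }
}
```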

RE JS7: From what I understand about MetaPropertiesEnsemble and its
verify() method, I think it is reasonable to say our
RaftListener/MetadataPublisher will know how many (if any)
`meta.properties` files it is responsible for persisting cluster.id to
during the current process incarnation when it starts up. Currently we only
validate the MetaPropertiesEnsemble in two places: during formatting, and
during node startup. From what I understand, scenarios 1 and 2 should only
occur alongside a restart of the Kafka process (to generate a new
directory-id and/or update log.dirs), but please correct me if this
assumption is wrong. I'm not sure if scenario 3 is referring to a partial
write of a given meta.properties (i.e. it does not contain cluster.id), or
not writing the discovered cluster.id to all meta.properties files on the
node before a crash. If a meta.properties does not exist in a log/metadata
log directory during startup, we need to write a V2 one without a cluster.id,
but we would be aware of this. If we succeed writing cluster.id to at least
one meta.properties via the ClusterIdRecord, I believe it is safe to write
that same value to the other meta.properties upon restart if they exist
because cluster.id does not change.

I may have previously removed this from the KIP, but given this discussion,
I believe it is only safe to update the in-memory cluster.id after writing
it to all meta.properties files on the node.
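That ordering constraint could be sketched as follows: write the discovered cluster.id to every meta.properties file first, and only then publish it in-memory. If the process crashes partway through, a restart re-discovers the value and safely rewrites it, since cluster.id never changes. The class name is hypothetical, and plain `Properties` load/store stands in for the real meta.properties machinery:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the persist-then-publish ordering described above.
public class ClusterIdPersister {

    private final AtomicReference<String> inMemoryClusterId = new AtomicReference<>(null);

    // One meta.properties per log directory and metadata log directory.
    public void persistAndPublish(String clusterId, List<Path> metaPropertiesFiles)
            throws IOException {
        for (Path file : metaPropertiesFiles) {
            Properties props = new Properties();
            if (Files.exists(file)) {
                try (InputStream in = Files.newInputStream(file)) {
                    props.load(in);
                }
            }
            props.setProperty("cluster.id", clusterId);
            try (OutputStream out = Files.newOutputStream(file)) {
                props.store(out, null);
            }
        }
        // Publish in-memory only after every file has been written.
        inMemoryClusterId.set(clusterId);
    }

    public String clusterId() {
        return inMemoryClusterId.get();
    }
}
```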

RE JS8: Okay, maybe I will just rewrite the section. My point was that a
node's discovery of the leader's committed cluster.id relies on the
discovery of the HWM and on our RaftListener/MetadataPublisher being
registered with the raft client, and that we need to wait for both before
the startup logic in Controller/BrokerServer executes. However, if our
listener does not see the ClusterIdRecord in `handleCommit` or
`handleLoadSnapshot`, it cannot do anything meaningful, so it is more
accurate to say we need to wait until the ClusterIdRecord is committed.

On Thu, Mar 19, 2026 at 12:57 AM José Armando García Sancio via dev <
[email protected]> wrote:

> Hi Kevin, Thanks for the KIP and excuse my delayed response.
>
> JS1: Can you clarify that this KIP removes the need for all Kafka
> nodes to be formatted prior to starting Kafka. However, this doesn't
> prevent users from formatting their broker with a cluster ID if they
> prefer. This is especially needed for Kafka nodes formatted for a
> cluster using an MV that doesn't support this feature.
>
> JS2: How are you planning to implement "kafka-storage format
> --clusterid YYY --standalone"? Is that going to behave like it does
> today by writing the cluster id to the meta.properties files? Or are
> you planning to write the cluster id using the ClusterIdRecord to the
> bootstrap.checkpoint or 0-0.checkpoint (after KIP-1170)?
>
> JS3: In one of your replies you say "Discovering the cluster id value
> for the first time would only require a single FetchSnapshot or a
> Fetch of the bootstrap metadata records." This is not entirely
> accurate. The best we can say is that brokers need to catch up to the
> HWM before they can send a registration request to the active
> controller or start a few internal components. However, the
> broker already had this requirement prior to this KIP, so it is not
> new.
>
> JS4: In the KIP you mention "if meta.properties does not exist and the
> node is a bootstrap controller, throw a runtime exception." Can you
> explain how you plan to implement this? One important aspect to
> consider is that in KRaft voters (controllers) are identified by the
> node ID and directory ID. A node can recover from a disk failure by
> coming back with the same node ID but a different directory ID. In
> this case, the controller should auto-recover if the auto-join feature
> is enabled.
>
> JS5: In the KIP you mention "One detail here is that observer
> controllers with auto-join must wait until they have a cluster id
> before trying to add or remove themselves." I understand the reason
> for this requirement. If a node auto-joins the controller cluster, you
> must guarantee that it knows the cluster id in case it becomes the
> leader and needs to write the ClusterIDRecord. Can you elaborate on
> your implementation plan?
>
> JS6: In the KIP you mention "This can be implemented as a
> MetadataPublisher that registers to the raft client alongside the
> MetadataLoader." Metadata publishers don't register with the KRaft
> client. RaftClient.Listeners register with the KRaft client. Metadata
> publishers register with the metadata loader instead.
>
> JS7: One complexity is that there is a meta.properties per log
> directory and metadata log directory. This means that in the stable
> case the cluster ID exists in all the meta.properties files.
> Unfortunately, this may not be the case for several reasons: 1) the
> disk was replaced, 2) a new disk was added, or 3) the write operation
> was only partially successful. How do you plan to handle this case?
> Consider that the controller and the broker can run on the same JVM
> and use a log directory different from the metadata log directory.
> Controllers only read and write to the metadata log directory.
>
> JS8: In the KIP you mention "Learning of a HWM from the leader, which
> the leader allows for because it will send valid fetch responses back
> to nodes that do not have a cluster id." One implementation complexity
> is that KRaft can discover the HWM and send a handleCommit event
> without having fetched all data up to the HWM. What KRaft guarantees
> is that the active leader will not receive a handleLeaderChange event
> until it has caught up to the leader's epoch. How do you plan to
> implement this?
>
> Thanks,
> --
> -José
>
