Hi José,

Thanks for the replies and questions.
RE JS1: "Can you clarify that this KIP removes the need for all Kafka nodes to be formatted prior to starting Kafka." In the static cluster case that skips formatting, a newer software version combined with an older MV is not a possible case, so I will remove that mention from the KIP. We should default to the latest MV when we skip formatting, which will support writing a ClusterIdRecord. Right now, it is not completely clear to me how we can also allow bootstrap controllers to skip formatting (this applies mainly to kraft.version=0, since kraft.version=1 cannot elect a leader without proper formatting). That is why I said in the proposed changes: "*Remove the requirement of brokers and observer controllers to format before starting kafka*". I agree that KRaft can still elect a leader without a clusterId in this case, but I'm not completely sure how a QuorumController with an "empty" clusterId, which needs to be set later, should behave. My working idea is detailed in RE JS6. This is required because the active controller needs to generate a clusterId and write it back to KRaft upon activation in order for the committed `ClusterIdRecord` to appear in the records passed to `RaftListener#handleCommit()`, so we cannot block its initialization. Keeping the assumption that QuorumController.clusterId is final and non-null would be nice, but that requires all KRaft voters to format with a cluster.id. Let me know what you think about the best way to remove this requirement.

RE JS2: My plan was to continue writing meta.properties V1 during formatting with a `cluster.id` field as today, but also to write a `ClusterIdRecord` to the bootstrap snapshot for redundancy if the MV supports it (I'm not sure whether Kafka is expected to handle partial log directory corruption/destruction). If the "bootstrap controller cluster id check" from JS4 is correct, then the initial active controller is guaranteed to have a non-null `cluster.id` in meta.properties.
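To make the RE JS2 dual-write concrete, here is a rough sketch of what I mean. All class and field names are mine, the on-disk state is simulated with in-memory collections, and MIN_MV_FOR_CLUSTER_ID_RECORD is a placeholder rather than a real Kafka metadata version:

```java
// Hypothetical sketch of the dual-write format step from RE JS2.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FormatSketch {
    // Placeholder for the first MV that supports ClusterIdRecord.
    static final int MIN_MV_FOR_CLUSTER_ID_RECORD = 20;

    // Simulated on-disk state: meta.properties fields and bootstrap records.
    final Map<String, String> metaProperties = new HashMap<>();
    final List<String> bootstrapRecords = new ArrayList<>();

    void format(String clusterId, int metadataVersion) {
        // Continue to write cluster.id into meta.properties V1, as today.
        metaProperties.put("cluster.id", clusterId);
        // Additionally write a ClusterIdRecord into the bootstrap snapshot
        // for redundancy, but only when the MV supports it.
        if (metadataVersion >= MIN_MV_FOR_CLUSTER_ID_RECORD) {
            bootstrapRecords.add("ClusterIdRecord(" + clusterId + ")");
        }
    }
}
```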
So long as the MV supports it, the active controller would then write the ClusterIdRecord as part of the bootstrap records.

RE JS3: When I said this, I meant that the restriction of waiting for the discovery of cluster.id before persisting it to meta.properties during broker startup is no more restrictive than what already exists today: being caught up to the HWM in order to register with the active controller.

RE JS4: Yeah, I thought about this, specifically around the kraft.version=1 case, since it is less straightforward there what a "bootstrap controller" is. Under the current design, in kraft.version=0, any node that is part of the `controller.quorum.voters` config must have `meta.properties` with `cluster.id`. In kraft.version=1, any node that has a `0-0.checkpoint` is considered a "bootstrap controller." This is a heuristic, but I believe it is correct: for the 0-0.checkpoint to be missing on a node that formatted with --standalone or --initial-controllers, there must have been either another checkpoint with committed records, which implies an elected initial leader, or a disk loss. Whenever a voter with id X and initial directory-id A comes back as (X, B), this process incarnation is an observer from the perspective of KRaft, and I think we can assume it has neither `meta.properties` nor `0-0.checkpoint` if the operator did not format it (per the assumption in RE JS2 that the storage failures we expect to handle are not partial directory failures). In this case, the "bootstrap controller" check does not apply to (X, B), and if auto-join is enabled, it will follow the steps detailed in RE JS5 to recover and rejoin the voter set. If we remove the requirement on all nodes to format, then we would not need to implement these checks.

RE JS5: An observer without a clusterId that can auto-join will fetch until its KafkaRaftClient updates the cluster id in memory (basically, auto-join is off until it discovers the leader's clusterId).
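A rough sketch of the RE JS4 heuristic and the RE JS5 auto-join gate, to make the intent concrete. Class and method names are mine and not Kafka's actual API; the `0-0.checkpoint` filename is the one discussed above:

```java
// Illustrative sketch of the kraft.version=1 "bootstrap controller"
// heuristic (RE JS4) and the auto-join gate (RE JS5).
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

public class AutoJoinGate {
    private final AtomicReference<String> clusterId;

    // Initial value comes from meta.properties, or null if the node was
    // never formatted.
    AutoJoinGate(String clusterIdFromMetaProperties) {
        this.clusterId = new AtomicReference<>(clusterIdFromMetaProperties);
    }

    // Heuristic: a node formatted with --standalone or --initial-controllers
    // keeps its 0-0.checkpoint unless a later checkpoint was taken (an
    // initial leader was elected) or the disk was lost.
    static boolean isBootstrapController(Path metadataLogDir) {
        return Files.exists(metadataLogDir.resolve("0-0.checkpoint"));
    }

    // Auto-join stays off until the observer has discovered the leader's
    // cluster id via fetching.
    boolean canAutoJoin() {
        return clusterId.get() != null;
    }

    // Called once the KafkaRaftClient learns the leader's cluster id.
    void onDiscoveredClusterId(String id) {
        clusterId.compareAndSet(null, id);
    }
}
```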
If the observer has a clusterId, it needs to match the leader's to perform a successful fetch, which is required for successfully adding a voter via auto-join.

RE JS6: Apologies, I meant to say a MetadataPublisher registered with the MetadataLoader. Although, looking at this again, maybe this discovery and persistence of the clusterId can be handled by a new RaftListener instead. I don't think we need the overhead of the MetadataImage + MetadataDelta for this feature, since a RaftListener's `handleCommit()` and `handleLoadSnapshot()` already deliver the `ClusterIdRecord`. However, this means needing a third listener besides the MetadataLoader and QuorumMetaListener, and therefore an additional call to log#read() when handling KRaft commits and snapshots. From my reading, it seems the Kafka log layer does not attempt any caching and instead relies on the OS page cache. Because of this, I think we should use a MetadataPublisher, but let me know what you think. I am thinking of using an AtomicReference<String> to represent the clusterId in memory. This RaftListener/MetadataPublisher will be the only writer to this value if it is not already defined by meta.properties, but there are many readers. The initial value of this reference is either null or the cluster.id from meta.properties. Upon reading a `ClusterIdRecord`, the listener will throw an exception if it has a non-null clusterId and the record carries a different ID. If it does not have cluster.id set, it will persist cluster.id to meta.properties and update the AtomicReference. Let me know if this approach sounds reasonable to you.

RE JS7: From what I understand about MetaPropertiesEnsemble and its verify() method, I think it is reasonable to say our RaftListener/MetadataPublisher will know at startup how many (if any) `meta.properties` files it is responsible for persisting cluster.id to during the current process incarnation.
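A minimal sketch of the listener behavior I have in mind for RE JS6. All names are illustrative; the Consumer stands in for writing cluster.id to every log/metadata log directory's meta.properties before the in-memory value becomes visible to readers:

```java
// Illustrative sketch of the RaftListener/MetadataPublisher from RE JS6.
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ClusterIdTracker {
    private final AtomicReference<String> clusterId;
    private final Consumer<String> persistToAllMetaProperties;

    ClusterIdTracker(String clusterIdFromMetaProperties,
                     Consumer<String> persistToAllMetaProperties) {
        // Initial value: null, or the cluster.id from meta.properties.
        this.clusterId = new AtomicReference<>(clusterIdFromMetaProperties);
        this.persistToAllMetaProperties = persistToAllMetaProperties;
    }

    // Invoked from handleCommit()/handleLoadSnapshot() when a
    // ClusterIdRecord is seen. This is the single writer; the many readers
    // use clusterId().
    void onClusterIdRecord(String recordClusterId) {
        String current = clusterId.get();
        if (current != null) {
            if (!current.equals(recordClusterId)) {
                throw new IllegalStateException("Cluster id mismatch: have "
                        + current + " but record has " + recordClusterId);
            }
            return; // already known and consistent
        }
        // Persist to every meta.properties first, then publish in memory,
        // so readers never observe an id that is not fully on disk.
        persistToAllMetaProperties.accept(recordClusterId);
        clusterId.set(recordClusterId);
    }

    String clusterId() {
        return clusterId.get();
    }
}
```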
Currently we only validate the MetaPropertiesEnsemble in two places: during formatting and during node startup. From what I understand, scenarios 1 and 2 should only occur alongside a restart of the kafka process (to generate a new directory-id and/or update log.dirs), but please correct me if this assumption is wrong. I'm not sure if scenario 3 refers to a partial write of a given meta.properties (i.e. it does not contain cluster.id), or to not writing the discovered cluster.id to all meta.properties files on the node before a crash. If a meta.properties does not exist in a log/metadata log directory during startup, we need to write a V2 one without a cluster.id, but we would be aware of this. If we succeed in writing cluster.id to at least one meta.properties via the ClusterIdRecord, I believe it is safe to write that same value to the other meta.properties files upon restart, if they exist, because cluster.id does not change. I may have previously removed this from the KIP, but given this discussion, I believe it is only safe to update the in-memory cluster.id after writing it to all meta.properties files on the node.

RE JS8: Okay, maybe I will just rewrite the section. My point was something like: a node's discovery of the leader's committed cluster.id relies on the discovery of a HWM and on our RaftListener/MetadataPublisher being registered with the raft client, and we need to wait for these things before the startup logic in Controller/BrokerServer executes. However, if our listener does not see the ClusterIdRecord in `handleCommit` or `handleLoadSnapshot`, it can't do anything meaningful, so it is more accurate to say we need to wait until the ClusterIdRecord is committed.

On Thu, Mar 19, 2026 at 12:57 AM José Armando García Sancio via dev <[email protected]> wrote:

> Hi Kevin, Thanks for the KIP and excuse my delay response.
>
> JS1: Can you clarify that this KIP removes the need for all Kafka nodes to be formatted pior to starting Kafka.
> However, this doesn't prevent users from formatting their broker with a cluster ID if they prefer. This is especially needed for Kafka nodes formatted for a cluster using an MV that doesn't support this feature.
>
> JS2: How are you planning to implement "kafka-storage format --clusterid YYY --standalone"? Is that going to behave like it does today by writing the cluster id to the meta.properties files? Or are you planning to write the cluster id using the ClusterIdRecord to the bootstrap.checkpoint or 0-0.checkpoint (after KIP-1170)?
>
> JS3: In one of your replies you say "Discovering the cluster id value for the first time would only require a single FetchSnapshot or a Fetch of the bootstrap metadata records." This is not entirely accurate. The best we can say is that brokers need to catch up to the HWM before they can send a registration requests to the active controller or it can start a few internal component. However, the broker already had this requirement prior to this KIP, so it is not new.
>
> JS4: In the KIP you mention "if meta.properties does not exist and the node is a bootstrap controller, throw a runtime exception." Can you explain how you plan to implement this? One important aspect to consider is that in KRaft voters (controllers) are identified by the node ID and directory ID. A node can recover from a disk failure by coming back with the same node ID but a different directory ID. In this case, the controller should auto-recover if the auto-join feature is enabled.
>
> JS5: In the KIP you mention "One detail here is that observer controllers with auto-join must wait until they have a cluster id before trying to add or remove themselves." I understand the reason for this requirement. If a node auto-joins the controller cluster, you must guarantee that it knows the cluster id in case it becomes the leader and needs to write the ClusterIDRecord. Can you elaborate on your implementation plan?
>
> JS6: In the KIP you mention "This can be implemented as a MetadataPublisher that registers to the raft client alongside the MetadataLoader." Metadata publishers don't register with the KRaft client. RaftClient.Listener register with the KRaft client. Metadata publisher register with the metadata loader instead.
>
> JS7: One complexity is that there is a meta.properties per log directory and metadata log directory. This means that in the stable case the cluster ID exists in all the meta.properties files. Unfortunately, this may not be the case for several reasons: 1) the disk was replaced, 2) a new disk was added, or 3) the write operation was only partially successful. How do you plan to handle this case? Consider that the controller and the broker can run on the same JVM and use a log directory different from the metadata log directory. Controllers only read and write to the metadata log directory.
>
> JS8: In the KIP you mention "Learning of a HWM from the leader, which the leader allows for because it will send valid fetch responses back to nodes that do not have a cluster id." One implementation complexity is that KRaft can discover the HWM and send a handleCommit event without having fetched all data up to the HWM. What KRaft guarantees is that the active leader will not receive a handleLeaderChange event until it has caught up to the leader's epoch. How do you plan to implement this?
>
> Thanks,
> --
> -José
