I am working with a Kafka 3.6.1 cluster in KRaft mode and would like 
some guidance on scaling Kafka brokers and controllers. Below are the details 
of my setup and the steps I followed, along with the challenges I encountered. 
Before going to production, I tested the scaling process in a 2-node test 
environment (brokers and KRaft controllers on the same nodes).

Test Cluster Setup:

    Initial Configuration:

    Nodes: 2
    Controller quorum configuration on nodes: 
controller.quorum.voters=0@172.26.1.103:9093,1@172.26.1.189:9093

    Scaling Process:

    Added a new node (172.26.1.81).
    Configured controller.quorum.voters on the new node as: 
controller.quorum.voters=0@172.26.1.103:9093,1@172.26.1.189:9093,2@172.26.1.81:9093
    Started Kafka on the new node, which connected successfully as an observer 
in the KRaft quorum.

Issues Encountered:

    The new node was listed as an observer instead of a voter after starting:

ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               0
LeaderEpoch:            7
HighWatermark:          33068
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [0,1]
CurrentObservers:       [2]
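
For reference, the role of a node can be read off that status output mechanically. This is a minimal sketch, assuming the output comes from kafka-metadata-quorum.sh describe --status and has the format shown above (Kafka 3.6.1; other versions may print the voter list differently). The function name node_role is my own and not part of Kafka.

```shell
#!/bin/sh
# node_role ID: read `describe --status` output on stdin and print
# "voter", "observer", or "unknown" for the given node id.
# Assumes list lines look like "CurrentVoters:   [0,1]" as in the output above.
node_role() {
    awk -v id="$1" '
        # Return 1 if id appears in the bracketed list on this line.
        function has(line, id,    n, i, parts) {
            gsub(/[^0-9,]/, "", line)   # keep only digits and commas
            n = split(line, parts, ",")
            for (i = 1; i <= n; i++)
                if (parts[i] == id) return 1
            return 0
        }
        /^CurrentVoters:/    { if (has($0, id)) role = "voter" }
        /^CurrentObservers:/ { if (role == "" && has($0, id)) role = "observer" }
        END { print (role == "" ? "unknown" : role) }
    '
}

# Example invocation (not run here; needs a live cluster):
#   /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server 172.26.1.103:9092 \
#       --command-config /etc/kafka/admin.properties describe --status | node_role 2
```

Fed the output above, this reports node 2 as an observer and nodes 0 and 1 as voters.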

    Updating controller.quorum.voters on the old nodes caused an error:

[2024-12-02 12:04:11,314] ERROR [SharedServer id=0] Got exception while starting SharedServer (kafka.server.SharedServer)
java.lang.IllegalStateException: Configured voter set: [0, 1, 2] is different from the voter set read from the state file: [0, 1]. Check if the quorum configuration is up to date, or wipe out the local state file if necessary
at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:132)
at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:375)
at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:248)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:174)
at kafka.server.SharedServer.start(SharedServer.scala:260)
at kafka.server.SharedServer.startForController(SharedServer.scala:132)
at kafka.server.ControllerServer.startup(ControllerServer.scala:192)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
[2024-12-02 12:04:11,325] INFO [ControllerServer id=0] Waiting for controller quorum voters future (kafka.server.ControllerServer)
[2024-12-02 12:04:11,328] INFO [ControllerServer id=0] Finished waiting for controller quorum voters future (kafka.server.ControllerServer)
[2024-12-02 12:04:11,331] ERROR Encountered fatal fault: caught exception (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.NullPointerException: Cannot invoke "kafka.raft.KafkaRaftManager.apiVersions()" because the return value of "kafka.server.SharedServer.raftManager()" is null
at kafka.server.ControllerServer.startup(ControllerServer.scala:205)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)

So, according to the logs, I need to “wipe out the local state file.” The 
file with “state” in its name (quorum-state) is located in the metadata log 
directory under the data directory:

/var/lib/kafka/data/__cluster-metadata-0

So I deleted that file on the old broker 103 and restarted Kafka, which started 
successfully. I then asked node 103 for the KRaft quorum status and got:

ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               2
LeaderEpoch:            125
HighWatermark:          84616
MaxFollowerLag:         84617
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [0,1,2]
CurrentObservers:       []

LeaderId is 2? What? :)

Okay, I asked the same on the old node 189 and got:

ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               1
LeaderEpoch:            8
HighWatermark:          -1
MaxFollowerLag:         74376
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [0,1]
CurrentObservers:       []

Then I asked the same on the new node 81 and got:

ClusterId:              mXMb-Ah9Q8uNFoMtqGrBag
LeaderId:               2
LeaderEpoch:            125
HighWatermark:          84813
MaxFollowerLag:         84814
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [0,1,2]
CurrentObservers:       []
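
The three views above can also be compared mechanically instead of by eye. This is a minimal sketch, assuming each node's status output has been saved to its own file and uses the field names shown above; quorum_view and views_agree are hypothetical helper names of mine, not Kafka tools. Only the leader, epoch and voter set are compared, since the high watermark naturally differs between samples.

```shell
#!/bin/sh
# quorum_view: condense one status output (stdin) into a single line
# holding the leader id, leader epoch and voter set.
quorum_view() {
    awk '
        /^LeaderId:/      { leader = $2 }
        /^LeaderEpoch:/   { epoch = $2 }
        /^CurrentVoters:/ { voters = $2 }
        END { printf "leader=%s epoch=%s voters=%s\n", leader, epoch, voters }
    '
}

# views_agree FILE...: print each condensed view and return 0 only if
# all files condense to the same line, 1 otherwise.
views_agree() {
    first=""
    rc=0
    for f in "$@"; do
        view=$(quorum_view < "$f")
        printf '%s: %s\n' "$f" "$view"
        if [ -z "$first" ]; then
            first=$view
        elif [ "$view" != "$first" ]; then
            rc=1
        fi
    done
    return "$rc"
}

# Example (file names are placeholders):
#   views_agree status-103.txt status-189.txt status-81.txt || echo "nodes disagree"
```

With the outputs above, nodes 103 and 81 agree (leader 2, epoch 125, voters [0,1,2]) while node 189 does not.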

So it seems that the old node 189 is out of sync with the other two nodes. 
Okay, so I deleted the quorum-state file on node 189 as well. After deleting 
that state file, I encountered the following error:

[2024-12-02 12:16:33,310] ERROR Encountered fatal fault: Unexpected error in raft IO thread (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.IllegalStateException: Cannot transition to Follower with leaderId=2 and epoch=125 since it is not one of the voters [0, 1]
at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:382)
at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:522)

What? :) Okay, I decided to delete the state file on the newest node (81) as 
well. Deleting the quorum-state file on the affected nodes resolved the issue, 
but the process felt risky and unstructured.
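
A slightly less risky variant of the deletions above would be to move the file aside rather than delete it, so the step can be undone. This is a minimal sketch and not a verified or recommended recovery procedure: it assumes Kafka on the node is already stopped, and stash_quorum_state and the backup directory are hypothetical names of mine.

```shell
#!/bin/sh
# stash_quorum_state META_DIR BACKUP_DIR: move META_DIR/quorum-state into
# BACKUP_DIR under a timestamped name and print the new path.
# Assumes the Kafka process on this node has been stopped beforehand.
stash_quorum_state() {
    meta_dir=$1
    backup_dir=$2
    src="$meta_dir/quorum-state"
    if [ ! -f "$src" ]; then
        echo "no quorum-state file in $meta_dir" >&2
        return 1
    fi
    mkdir -p "$backup_dir" || return 1
    dst="$backup_dir/quorum-state.$(date +%Y%m%d%H%M%S)"
    mv "$src" "$dst" && echo "$dst"
}

# Example (the backup path is a placeholder):
#   stash_quorum_state /var/lib/kafka/data/__cluster-metadata-0 /var/backups/kafka
```

Keeping the backup outside the Kafka data directory avoids leaving an unexpected file next to the metadata log.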

Rebalancing Partitions: After adding the new node, I rebalanced the partitions 
using the following commands:

/opt/kafka/bin/kafka-reassign-partitions.sh \
    --bootstrap-server 172.26.1.103:9092 \
    --command-config /etc/kafka/admin.properties \
    --topics-to-move-json-file topics.json --broker-list "0,1,2" \
    --generate > reassignment_plan.json
/opt/kafka/bin/kafka-reassign-partitions.sh \
    --bootstrap-server 172.26.1.103:9092 \
    --command-config /etc/kafka/admin.properties \
    --execute --reassignment-json-file reassignment_plan.json
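
As far as I understand, the plan-generation step prints both the current and the proposed assignment, each under a text header, so the redirected file can contain more than the plan itself. This is a minimal sketch of pulling out only the proposed JSON. It assumes the header text is "Proposed partition reassignment configuration" and that the JSON follows on the next non-empty line; proposed_plan is a hypothetical helper name.

```shell
#!/bin/sh
# proposed_plan: read the plan-generation output on stdin and print only
# the first non-empty line after the "Proposed partition reassignment
# configuration" header (assumed to be the proposed plan as JSON).
proposed_plan() {
    awk '
        found && NF { print; exit }
        /^Proposed partition reassignment configuration/ { found = 1 }
    '
}

# Example (not run here; needs a live cluster):
#   /opt/kafka/bin/kafka-reassign-partitions.sh \
#       --bootstrap-server 172.26.1.103:9092 \
#       --command-config /etc/kafka/admin.properties \
#       --topics-to-move-json-file topics.json --broker-list "0,1,2" \
#       --generate | proposed_plan > reassignment_plan.json
```

The same tool also has a --verify mode that takes the same --reassignment-json-file and reports whether the reassignment has completed, which may be more reliable than waiting a fixed time.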

The partitions balanced well across the brokers after waiting for 3–4 minutes.

Questions:

    What are the recommended steps to safely add new brokers and KRaft 
controllers to an existing Kafka cluster?
    Is it normal to require quorum-state file deletion during the scaling 
process?
    Are there tools or documentation specifically for scaling KRaft-based Kafka 
clusters that I might have missed?

Any advice or feedback on my approach would be greatly appreciated!
