[ https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jialun Peng updated KAFKA-19048:
--------------------------------
    Labels: pull-request-available  (was: )

> Minimal Movement Replica Balancing algorithm
> --------------------------------------------
>
>                 Key: KAFKA-19048
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19048
>             Project: Kafka
>          Issue Type: Improvement
>          Components: generator
>            Reporter: Jialun Peng
>            Assignee: Jialun Peng
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Motivation
> Kafka clusters typically require rebalancing of topic replicas after horizontal scaling so that load is distributed evenly across new and existing brokers. The current rebalancing approach does not take the existing replica distribution into account, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and can disrupt ongoing production and consumption. A replica rebalancing strategy that minimizes movement while still achieving an even distribution of replicas is therefore necessary.
> h2. Goals
> The proposed approach prioritizes the following objectives:
> # *Minimal Movement*: Minimize the number of replica relocations during rebalancing.
> # *Replica Balancing*: Ensure that replicas are evenly distributed across brokers.
> # *Anti-Affinity Support*: Support rack-aware allocation when enabled.
> # *Leader Balancing*: Distribute leader replicas evenly across brokers.
> # *ISR Order Optimization*: Optimize adjacency relationships to prevent failover traffic from concentrating on a single broker when a broker fails.
> h2. Proposed Changes
> h3. Rack-Level Replica Distribution
> The following rules ensure balanced replica allocation at the rack level:
> # *When* {{rackCount = replicationFactor}}: each rack receives exactly {{partitionCount}} replicas.
> # *When* {{rackCount > replicationFactor}}:
> #* If the weighted allocation {{(rackBrokers / totalBrokers) × totalReplicas ≥ partitionCount}}: the rack receives exactly {{partitionCount}} replicas.
> #* If the weighted allocation is {{< partitionCount}}: distribute the remaining replicas using weighted remainder allocation.
> h3. Node-Level Replica Distribution
> # If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes host one more replica than others.
> # *When* {{rackCount = replicationFactor}}:
> #* If all racks have an equal number of nodes, each node hosts an equal number of replicas.
> #* If rack sizes vary, nodes in larger racks host fewer replicas on average.
> # *When* {{rackCount > replicationFactor}}:
> #* If no rack has a disproportionately high node weight, replicas are distributed evenly across nodes.
> #* If a rack has a disproportionately high node weight, its nodes receive fewer replicas.
> h3. Anti-Affinity Support
> When anti-affinity is enabled, the rebalancing algorithm ensures that replicas of the same partition are not colocated on the same rack. Brokers without a rack configuration are excluded from anti-affinity checks. This unifies the implementation logic of the rack-aware and non-rack-aware cases.
>
> *Replica Balancing Algorithm*
> From the steps above we can calculate the ideal replica count for each node and rack. From the initial replica distribution of the topics we obtain the current replica allocation across nodes and racks, which lets us identify the nodes that violate anti-affinity rules.
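The rack-level rules above can be sketched in Python. This is an illustrative sketch, not the code from the pull request; the function and parameter names ({{ideal_rack_counts}}, {{rack_brokers}}) are hypothetical. It caps each rack at {{partitionCount}} replicas (the anti-affinity ceiling) and hands out the remainder by largest fractional share:

```python
from math import floor

def ideal_rack_counts(rack_brokers, replication_factor, partition_count):
    """Ideal replica count per rack via weighted remainder allocation.

    rack_brokers: dict mapping rack id -> number of brokers in that rack.
    A rack never receives more than partition_count replicas, since
    anti-affinity allows at most one replica of a partition per rack.
    """
    total_brokers = sum(rack_brokers.values())
    total_replicas = partition_count * replication_factor

    # Exact weighted share for each rack, capped at partition_count.
    shares = {r: min(n / total_brokers * total_replicas, partition_count)
              for r, n in rack_brokers.items()}
    counts = {r: floor(s) for r, s in shares.items()}

    # Hand out the remainder to racks with the largest fractional parts,
    # skipping racks already at the per-rack cap.
    remainder = total_replicas - sum(counts.values())
    by_fraction = sorted(shares, key=lambda r: shares[r] - counts[r], reverse=True)
    for r in by_fraction:
        if remainder == 0:
            break
        if counts[r] < partition_count:
            counts[r] += 1
            remainder -= 1
    return counts
```

With {{rackCount = replicationFactor}} (e.g. 3 racks of 2 brokers, RF 3, 6 partitions) every rack lands exactly on {{partitionCount}}; with {{rackCount > replicationFactor}} the weighted shares and remainder take over, matching the two cases described above.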
> We iterate through the nodes with the following priority:
> # First process nodes that violate anti-affinity rules.
> # Then process nodes whose current replica count exceeds their desired replica count, prioritizing those with the largest discrepancy.
> For each such node, we relocate replicas to target nodes that:
> * satisfy all anti-affinity constraints, and
> * have a current replica count below their ideal allocation.
> This process continues iteratively until:
> * no node violates anti-affinity rules, and
> * every node's current replica count matches its desired replica count.
> Once these conditions hold, replica distribution across nodes is balanced.
>
> *Leader Balancing Algorithm*
> *Target Leader Calculation:*
> Compute the baseline average: {{leader_avg = floor(total_partitions / total_nodes)}}.
> Identify brokers where {{replica_count ≤ leader_avg}}:
> * Designate all replicas on these brokers as leaders.
> * Subtract the allocated leaders: {{remaining_partitions -= assigned_leaders}}.
> * Exclude those brokers: {{remaining_brokers -= processed_brokers}}.
> Iteratively recalculate {{leader_avg}} until the brokers with the fewest replicas satisfy {{replica_count ≥ leader_avg}}.
> *Leader Assignment Constraints:*
> Final targets:
> * Light brokers: {{target_leaders = replica_count}}
> * Normal brokers: {{target_leaders = leader_avg}}
> For each partition, select the broker with the largest difference between its {{target_leaders}} and its current leader count to become that partition's leader. Upon completing this traversal, leaders are distributed uniformly across all brokers.
>
> *Optimizing ISR Order*
> During ISR-order optimization, the leader of each partition (as fixed by leader balancing) does not change.
> *Tracking Node Pair Frequencies*:
> * Iterate through all partitions and record the first replica (which is the leader).
> * Track the occurrences of broker pairs formed by the first and second replicas of each partition.
> *Optimized Selection of Subsequent Replicas*:
> * For each partition, when selecting the second replica, choose a broker that forms the least frequent pair with the first replica.
> * Continue this process iteratively for the remaining replicas of the partition.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
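The ISR adjacency optimization described above can be sketched as follows. This is a minimal, hypothetical sketch of the pair-frequency idea only (restricted to the second-replica choice), not the code from the pull request; {{order_followers}} and its inputs are illustrative names:

```python
from collections import Counter

def order_followers(assignments):
    """For each partition, pick as second replica the follower forming the
    least-seen (leader, follower) pair so far, spreading failover load.

    assignments: list of replica lists, one per partition; the first
    element of each list is the fixed leader (it is never moved).
    """
    pair_counts = Counter()
    result = []
    for replicas in assignments:
        leader, followers = replicas[0], list(replicas[1:])
        # Least frequent (leader, candidate) pair; ties keep input order.
        second = min(followers, key=lambda b: pair_counts[(leader, b)])
        pair_counts[(leader, second)] += 1
        followers.remove(second)
        result.append([leader, second] + followers)
    return result
```

For three partitions all assigned {{[1, 2, 3]}}, the second replica alternates between brokers 2 and 3, so a failure of leader 1 does not shift all traffic onto a single broker.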