[ https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jialun Peng updated KAFKA-19048:
--------------------------------
    Description:
h2. Motivation

Kafka clusters typically require rebalancing of topic replicas after horizontal scaling to evenly distribute the load across new and existing brokers. The current rebalancing approach does not consider the existing replica distribution, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and potentially disrupt ongoing production and consumption operations. Thus, a replica rebalancing strategy that minimizes movements while achieving an even distribution of replicas is necessary.

h2. Goals

The proposed approach prioritizes the following objectives:
# *Minimal Movement*: Minimize the number of replica relocations during rebalancing.
# *Replica Balancing*: Ensure that replicas are evenly distributed across brokers.
# *Anti-Affinity Support*: Support rack-aware allocation when enabled.
# *Leader Balancing*: Distribute leader replicas evenly across brokers.
# *ISR Order Optimization*: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures.

h2. Proposed Changes

h3. Rack-Level Replica Distribution

The following rules ensure balanced replica allocation at the rack level:
# *When {{rackCount = replicationFactor}}:*
** Each rack receives exactly {{partitionCount}} replicas.
# *When {{rackCount > replicationFactor}}:*
** If the weighted allocation {{(rackBrokers/totalBrokers × totalReplicas) ≥ partitionCount}}: the rack receives exactly {{partitionCount}} replicas.
** If the weighted allocation {{< partitionCount}}: distribute the remaining replicas using a weighted remainder allocation.

h3. Node-Level Replica Distribution

# If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one more replica than others.
# *When {{rackCount = replicationFactor}}:*
** If all racks have an equal number of nodes, each node will host an equal number of replicas.
** If rack sizes vary, nodes in larger racks will host fewer replicas on average.
# *When {{rackCount > replicationFactor}}:*
** If no rack has a significantly higher node weight, replicas will be evenly distributed.
** If a rack has a disproportionately high node weight, the nodes in that rack will receive fewer replicas.

h3. Anti-Affinity Support

When anti-affinity is enabled, the rebalance algorithm ensures that replicas of the same partition are not placed on the same rack. Brokers without a rack configuration are excluded from anti-affinity checks. This allows the rack-aware and non-rack-aware cases to share a single implementation.

*Replica Balancing Algorithm*

Through the above steps, we can calculate the ideal replica count for each node and rack. Based on the initial replica distribution of topics, we obtain the current partition replica allocation across nodes and racks, allowing us to identify which nodes violate anti-affinity rules.

We iterate through nodes with the following priority:
# First, process nodes that violate anti-affinity rules.
# Then, process nodes whose current replica count exceeds the desired replica count (prioritizing those with the largest discrepancy).

For these identified nodes, we relocate their replicas to target nodes that:
* Satisfy all anti-affinity constraints
* Have a current replica count below their ideal allocation

This process continues iteratively until:
* No nodes violate anti-affinity rules
* All nodes' current replica counts match their desired replica counts

Once these conditions are satisfied, replicas are evenly distributed across nodes.
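The relocation loop described above can be sketched roughly as follows. This is a minimal illustration, not the proposed implementation: the function names ({{even_targets}}, {{rebalance}}), the data shapes, and the tie-breaking rules are all assumptions, and {{even_targets}} uses a plain even split instead of the rack-weighted rules from the previous sections.

```python
from collections import Counter


def even_targets(brokers, total_replicas):
    """Hypothetical helper: plain even split, ignoring rack weights.

    The first `total_replicas % len(brokers)` brokers get one extra replica.
    """
    base, extra = divmod(total_replicas, len(brokers))
    return {b: base + (1 if i < extra else 0) for i, b in enumerate(sorted(brokers))}


def rebalance(assignment, racks, targets):
    """Return (new_assignment, moves), relocating only replicas that must move.

    assignment: {partition: [broker, ...]}
    racks:      {broker: rack or None}; rack-less brokers skip rack checks
    targets:    {broker: ideal replica count}
    """
    new = {p: list(replicas) for p, replicas in assignment.items()}
    load = Counter(b for replicas in new.values() for b in replicas)
    moves = []

    def conflicts(partition, broker, index):
        """True if `broker` at position `index` shares a broker or rack with another replica."""
        for i, other in enumerate(new[partition]):
            if i == index:
                continue
            if other == broker:
                return True
            if racks.get(broker) is not None and racks.get(broker) == racks.get(other):
                return True
        return False

    def move(partition, index):
        """Relocate one replica to the most under-loaded broker that fits, if any."""
        candidates = [b for b in targets
                      if load[b] < targets[b] and not conflicts(partition, b, index)]
        if not candidates:
            return False
        dst = min(candidates, key=lambda b: (load[b] - targets[b], b))
        src = new[partition][index]
        new[partition][index] = dst
        load[src] -= 1
        load[dst] += 1
        moves.append((partition, src, dst))
        return True

    # Priority 1: replicas that violate anti-affinity.
    for p in sorted(new):
        for i in reversed(range(len(new[p]))):
            if conflicts(p, new[p][i], i):
                move(p, i)

    # Priority 2: brokers above their ideal count, largest excess first.
    while True:
        over = sorted((b for b in load if load[b] > targets.get(b, 0)),
                      key=lambda b: (targets.get(b, 0) - load[b], b))
        moved = False
        for src in over:
            for p in sorted(new):
                if src in new[p] and move(p, new[p].index(src)):
                    moved = True
                    break
            if moved:
                break
        if not moved:
            return new, moves
```

For example, with six two-replica partitions spread over brokers 1 to 3 and a new, empty broker 4, the targets become three replicas per broker, and this sketch relocates one replica from each of the original brokers to broker 4 while leaving every other replica in place.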

*Leader Balancing Algorithm*
# *Target Leader Calculation:*
** Compute the baseline average: {{leader_avg = floor(total_partitions / total_nodes)}}
** Identify brokers where {{replica_count ≤ leader_avg}}:
*** Designate all replicas on these brokers as leaders
*** Subtract the allocated leaders: {{remaining_partitions -= assigned_leaders}}
*** Exclude these brokers: {{remaining_brokers -= processed_brokers}}
** Recalculate {{leader_avg}} over the remaining brokers and repeat until the remaining broker with the fewest replicas satisfies {{replica_count ≥ leader_avg}}
# *Leader Assignment Constraints:*
** Final targets:
*** Light brokers: {{target_leaders = replica_count}}
*** Normal brokers: {{target_leaders = leader_avg}}

For each partition, select the broker with the largest difference between its {{target_leaders}} and its current leader count to become that partition's leader. Once every partition has been processed, leaders are evenly distributed across all brokers.

> Minimal Movement Replica Balancing algorithm
> --------------------------------------------
>
>                 Key: KAFKA-19048
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19048
>             Project: Kafka
>          Issue Type: Improvement
>          Components: generator
>            Reporter: Jialun Peng
>            Assignee: Jialun Peng
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
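
The target-leader calculation and the per-partition leader selection from the Leader Balancing Algorithm in the description can be sketched as follows. This is an illustrative reading of the description under stated assumptions, not the proposed patch: {{leader_targets}} and {{assign_leaders}} are hypothetical names, candidates for leadership are assumed to be the partition's own replicas, and ties go to the first replica in the list. A broker whose replica count equals {{leader_avg}} gets the same target whether it is treated as light or normal, so the sketch uses a strict comparison to decide when to stop.

```python
def leader_targets(replica_count, total_partitions):
    """Compute a target leader count per broker.

    replica_count: {broker: number of replicas hosted}
    """
    remaining = dict(replica_count)
    remaining_partitions = total_partitions
    targets = {}
    while remaining:
        leader_avg = remaining_partitions // len(remaining)
        # "Light" brokers host fewer replicas than the average leader share.
        light = [b for b, n in remaining.items() if n < leader_avg]
        if not light:
            # Every remaining broker can carry the average: "normal" brokers.
            targets.update({b: leader_avg for b in remaining})
            break
        for b in light:
            # All replicas on a light broker become leaders.
            targets[b] = remaining[b]
            remaining_partitions -= remaining[b]
            del remaining[b]
    return targets


def assign_leaders(assignment, targets):
    """For each partition, pick the replica whose broker is furthest below its target.

    assignment: {partition: [broker, ...]}
    """
    current = {b: 0 for b in targets}
    leaders = {}
    for p in sorted(assignment):
        best = max(assignment[p], key=lambda b: targets[b] - current[b])
        leaders[p] = best
        current[best] += 1
    return leaders
```

Because {{leader_avg}} is rounded down, the targets can sum to slightly less than the number of partitions. In this sketch the leftover partitions still receive a leader from the greedy selection, which may push a broker one above its target.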