[ 
https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jialun Peng updated KAFKA-19048:
--------------------------------
    Labels: pull-request-available  (was: )

> Minimal Movement Replica Balancing algorithm
> --------------------------------------------
>
>                 Key: KAFKA-19048
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19048
>             Project: Kafka
>          Issue Type: Improvement
>          Components: generator
>            Reporter: Jialun Peng
>            Assignee: Jialun Peng
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Motivation
> Kafka clusters typically require rebalancing of topic replicas after 
> horizontal scaling to evenly distribute the load across new and existing 
> brokers. The current rebalancing approach does not consider the existing 
> replica distribution, often resulting in excessive and unnecessary replica 
> movements. These unnecessary movements increase rebalance duration, consume 
> significant bandwidth and CPU resources, and potentially disrupt ongoing 
> production and consumption operations. Thus, a replica rebalancing strategy 
> that minimizes movements while achieving an even distribution of replicas is 
> necessary.
> h2. Goals
> The proposed approach prioritizes the following objectives:
>  # {*}Minimal Movement{*}: Minimize the number of replica relocations during 
> rebalancing.
>  # {*}Replica Balancing{*}: Ensure that replicas are evenly distributed 
> across brokers.
>  # {*}Anti-Affinity Support{*}: Support rack-aware allocation when enabled.
>  # {*}Leader Balancing{*}: Distribute leader replicas evenly across brokers.
>  # {*}ISR Order Optimization{*}: Optimize adjacency relationships to prevent 
> failover traffic concentration in case of broker failures.
> h2. Proposed Changes
> h3. Rack-Level Replica Distribution
> The following rules ensure balanced replica allocation at the rack level:
>  # *When* {{rackCount = replicationFactor}}:
>  * Each rack receives exactly {{partitionCount}} replicas.
>  # *When* {{rackCount > replicationFactor}}:
>  * If the weighted allocation {{(rackBrokers / totalBrokers) × totalReplicas ≥ partitionCount}}: the rack receives exactly {{partitionCount}} replicas.
>  * If the weighted allocation is {{< partitionCount}}: distribute the remaining replicas using weighted remainder allocation (see the sketch below).
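> The weighted remainder step can be illustrated with a short, self-contained sketch. This is not the code from the pull request; the names ({{rackQuotas}}, {{rackBrokerCounts}}) are hypothetical, and it only demonstrates the floor-then-largest-remainder idea with the per-rack cap of {{partitionCount}}:
> {code:java}
> import java.util.*;
> 
> // Sketch only: weighted remainder allocation of replicas across racks,
> // capping each rack at partitionCount (at most one replica of a given
> // partition per rack under anti-affinity).
> public class RackQuotaSketch {
>     static Map<String, Integer> rackQuotas(Map<String, Integer> rackBrokerCounts,
>                                            int totalReplicas, int partitionCount) {
>         int totalBrokers = rackBrokerCounts.values().stream().mapToInt(Integer::intValue).sum();
>         Map<String, Integer> quotas = new HashMap<>();
>         Map<String, Double> remainders = new HashMap<>();
>         int assigned = 0;
>         for (Map.Entry<String, Integer> e : rackBrokerCounts.entrySet()) {
>             double weighted = (double) e.getValue() / totalBrokers * totalReplicas;
>             int base = (int) Math.min(Math.floor(weighted), partitionCount);
>             quotas.put(e.getKey(), base);
>             remainders.put(e.getKey(), weighted - base);
>             assigned += base;
>         }
>         // Hand out leftover replicas by largest fractional remainder,
>         // skipping racks that already sit at the partitionCount cap.
>         List<String> byRemainder = new ArrayList<>(rackBrokerCounts.keySet());
>         byRemainder.sort((a, b) -> Double.compare(remainders.get(b), remainders.get(a)));
>         for (int i = 0; assigned < totalReplicas && i < byRemainder.size() * partitionCount; i++) {
>             String rack = byRemainder.get(i % byRemainder.size());
>             if (quotas.get(rack) < partitionCount) {
>                 quotas.merge(rack, 1, Integer::sum);
>                 assigned++;
>             }
>         }
>         return quotas;
>     }
> 
>     public static void main(String[] args) {
>         // 3 racks with 4/3/3 brokers; 12 partitions with replication factor 3.
>         System.out.println(rackQuotas(Map.of("r1", 4, "r2", 3, "r3", 3), 36, 12));
>     }
> }
> {code}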
> h3. Node-Level Replica Distribution
>  # If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes host one additional replica compared to others (see the sketch below).
>  # *When* {{rackCount = replicationFactor}}:
>  * If all racks have an equal number of nodes, each node hosts an equal number of replicas.
>  * If rack sizes vary, nodes in larger racks host fewer replicas on average, because every rack holds the same total number of replicas.
>  # *When* {{rackCount > replicationFactor}}:
>  * If no rack has a disproportionately high node weight, replicas are distributed evenly across nodes.
>  * If a rack has a disproportionately high node weight, its nodes receive fewer replicas.
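> The split within a single rack reduces to dividing the rack quota as evenly as possible across its nodes. A minimal sketch (hypothetical names, not the actual patch):
> {code:java}
> import java.util.Arrays;
> 
> // Sketch only: split a rack's replica quota across its nodes. If the
> // quota is not a multiple of the node count, the first (quota % nodeCount)
> // nodes host one extra replica.
> public class NodeQuotaSketch {
>     static int[] nodeQuotas(int rackQuota, int nodeCount) {
>         int[] quotas = new int[nodeCount];
>         int base = rackQuota / nodeCount;   // every node gets at least this many
>         int extra = rackQuota % nodeCount;  // this many nodes get one more
>         for (int i = 0; i < nodeCount; i++) {
>             quotas[i] = base + (i < extra ? 1 : 0);
>         }
>         return quotas;
>     }
> 
>     public static void main(String[] args) {
>         // A quota of 10 replicas over 3 nodes yields [4, 3, 3].
>         System.out.println(Arrays.toString(nodeQuotas(10, 3)));
>     }
> }
> {code}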
> h3. Anti-Affinity Support
> When anti-affinity is enabled, the rebalance algorithm ensures that replicas 
> of the same partition do not colocate on the same rack. Brokers without rack 
> configuration are excluded from anti-affinity checks.
> In this way we unify the implementation logic of the rack-aware and non-rack-aware cases.
>  
> h3. Replica Balancing Algorithm
> Through the above steps, we can calculate the ideal replica count for each 
> node and rack.
> Based on the initial replica distribution of topics, we obtain the current 
> replica partition allocation across nodes and racks, allowing us to identify 
> which nodes violate anti-affinity rules.
> We iterate through nodes with the following priority:
>  # First process nodes that violate anti-affinity rules
>  # Then process nodes whose current replica count exceeds the desired replica 
> count (prioritizing those with the largest discrepancy)
> For these identified nodes, we relocate their replicas to target nodes that:
>  * Satisfy all anti-affinity constraints
>  * Have a current replica count below their ideal allocation
> This process continues iteratively until:
>  * No nodes violate anti-affinity rules
>  * All nodes' current replica counts match their desired replica counts
> Upon satisfying these conditions, we achieve balanced replica distribution 
> across nodes.
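> A compact sketch of this relocation loop follows; the data structures and names ({{Replica}}, {{desired}}, {{rackOf}}) are assumptions for illustration, and unlike the real algorithm it does not order sources by anti-affinity violations or largest discrepancy first:
> {code:java}
> import java.util.*;
> 
> // Sketch only: greedily move replicas from brokers above their desired
> // count to brokers below it, respecting rack anti-affinity.
> public class ReplicaMoveSketch {
>     record Replica(String topic, int partition) {}
> 
>     static int rebalance(Map<String, List<Replica>> current,
>                          Map<String, Integer> desired,
>                          Map<String, String> rackOf) {
>         int moves = 0;
>         for (String source : current.keySet()) {
>             // Drain brokers whose current count exceeds their desired count.
>             while (current.get(source).size() > desired.get(source)) {
>                 Replica r = current.get(source).get(0);
>                 String target = pickTarget(r, source, current, desired, rackOf);
>                 if (target == null) break; // no legal destination for this replica
>                 current.get(source).remove(0);
>                 current.get(target).add(r);
>                 moves++;
>             }
>         }
>         return moves;
>     }
> 
>     // A legal target is below its desired count, and its rack (if any) must
>     // not already hold another replica of the same partition.
>     static String pickTarget(Replica r, String source,
>                              Map<String, List<Replica>> current,
>                              Map<String, Integer> desired,
>                              Map<String, String> rackOf) {
>         for (String node : current.keySet()) {
>             if (node.equals(source) || current.get(node).size() >= desired.get(node)) continue;
>             String rack = rackOf.get(node);
>             boolean clash = rack != null && current.entrySet().stream()
>                     .filter(e -> !e.getKey().equals(source) && rack.equals(rackOf.get(e.getKey())))
>                     .anyMatch(e -> e.getValue().contains(r));
>             if (!clash) return node;
>         }
>         return null;
>     }
> 
>     public static void main(String[] args) {
>         Map<String, List<Replica>> current = new HashMap<>();
>         current.put("b1", new ArrayList<>(List.of(new Replica("t", 0), new Replica("t", 1))));
>         current.put("b2", new ArrayList<>());
>         int moves = rebalance(current, Map.of("b1", 1, "b2", 1), Map.of("b1", "r1", "b2", "r2"));
>         System.out.println(moves + " move(s): " + current);
>     }
> }
> {code}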
>  
> h3. Leader Balancing Algorithm
> *Target Leader Calculation:*
> Compute the baseline average: {{leader_avg = floor(total_partitions / total_nodes)}}
> Identify brokers where {{replica_count ≤ leader_avg}}:
>  * Designate all replicas on these brokers as leaders
>  * Subtract the allocated leaders: {{remaining_partitions -= assigned_leaders}}
>  * Exclude the processed nodes: {{remaining_brokers -= processed_brokers}}
> Iteratively recalculate {{leader_avg}} until every remaining broker satisfies {{replica_count ≥ leader_avg}}.
> *Leader Assignment Constraints:*
> Final targets:
>  * Light brokers: {{target_leaders = replica_count}}
>  * Normal brokers: {{target_leaders = leader_avg}}
>  
> For each partition, select the broker with the largest difference between its {{target_leaders}} and current leader count to become that partition's leader. Upon completing this traversal, leaders are distributed uniformly across all brokers (a sketch follows).
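> The iterative recalculation can be sketched as follows (names assumed; note the floor division can leave a small remainder of leaders, which the per-partition assignment pass above absorbs):
> {code:java}
> import java.util.*;
> 
> // Sketch only: iteratively compute each broker's target leader count.
> // Brokers with replica_count <= leader_avg ("light" brokers) lead all of
> // their replicas and are removed before the average is recomputed.
> public class LeaderTargetSketch {
>     static Map<String, Integer> targetLeaders(Map<String, Integer> replicaCounts,
>                                               int totalPartitions) {
>         Map<String, Integer> targets = new HashMap<>();
>         Map<String, Integer> remaining = new HashMap<>(replicaCounts);
>         int remainingPartitions = totalPartitions;
>         while (!remaining.isEmpty()) {
>             int avg = remainingPartitions / remaining.size(); // floor
>             List<String> light = remaining.entrySet().stream()
>                     .filter(e -> e.getValue() <= avg)
>                     .map(Map.Entry::getKey).toList();
>             if (light.isEmpty()) {
>                 // Every remaining broker satisfies replica_count >= leader_avg.
>                 for (String b : remaining.keySet()) targets.put(b, avg);
>                 return targets;
>             }
>             for (String b : light) {
>                 int leaders = remaining.remove(b); // all of its replicas lead
>                 targets.put(b, leaders);
>                 remainingPartitions -= leaders;
>             }
>         }
>         return targets;
>     }
> 
>     public static void main(String[] args) {
>         // 9 partitions over brokers hosting 2/5/5 replicas:
>         // avg = 3, so b1 is light and leads 2; then avg = floor(7/2) = 3 for b2 and b3.
>         System.out.println(targetLeaders(Map.of("b1", 2, "b2", 5, "b3", 5), 9));
>     }
> }
> {code}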
>  
> h3. Optimizing ISR Order
> After leader balancing, the leader of each partition is fixed and does not change during this step.
> {*}Tracking Node Pair Frequencies{*}:
>  * Iterate through all partitions and record the first replica (which is the leader).
>  * Track the occurrences of the broker pairs formed by the first and second replicas of each partition.
> {*}Optimized Selection of Subsequent Replicas{*}:
>  * For each partition, when selecting the second replica, choose the broker that forms the least frequent pair with the first replica.
>  * Continue this process iteratively for the remaining replicas in the partition (see the sketch below).
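> A sketch of the pair-frequency heuristic (structures assumed; here the pair counted is between each replica and its predecessor in the replica list):
> {code:java}
> import java.util.*;
> 
> // Sketch only: order followers so that consecutive (predecessor, follower)
> // broker pairs are spread as evenly as possible across partitions, which
> // spreads failover traffic if a leader's broker fails.
> public class IsrOrderSketch {
>     // assignment: one replica list per partition; element 0 is the fixed leader.
>     static void orderFollowers(List<List<String>> assignment) {
>         Map<String, Integer> pairCounts = new HashMap<>();
>         for (List<String> replicas : assignment) {
>             for (int i = 1; i < replicas.size(); i++) {
>                 String prev = replicas.get(i - 1);
>                 // Among the not-yet-placed replicas, pick the broker forming
>                 // the least frequent pair with the preceding replica.
>                 int best = i;
>                 for (int j = i + 1; j < replicas.size(); j++) {
>                     if (count(pairCounts, prev, replicas.get(j))
>                             < count(pairCounts, prev, replicas.get(best))) {
>                         best = j;
>                     }
>                 }
>                 Collections.swap(replicas, i, best);
>                 pairCounts.merge(prev + "->" + replicas.get(i), 1, Integer::sum);
>             }
>         }
>     }
> 
>     static int count(Map<String, Integer> pairCounts, String a, String b) {
>         return pairCounts.getOrDefault(a + "->" + b, 0);
>     }
> 
>     public static void main(String[] args) {
>         List<List<String>> assignment = new ArrayList<>();
>         for (int p = 0; p < 3; p++) assignment.add(new ArrayList<>(List.of("b1", "b2", "b3")));
>         orderFollowers(assignment);
>         System.out.println(assignment); // e.g. [[b1, b2, b3], [b1, b3, b2], [b1, b2, b3]]
>     }
> }
> {code}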



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
