[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106832#comment-17106832 ]
Sophie Blee-Goldman commented on KAFKA-9987:
--------------------------------------------

Pseudo-code sketch of the algorithm:

{{C_f := (P/N)_floor, the floor capacity}}
{{C_c := (P/N)_ceil, the ceiling capacity}}
{{members := the sorted set of all consumers}}
{{partitions := the set of all partitions}}
{{unassigned_partitions := the set of partitions not yet assigned, initialized to all partitions}}
{{unfilled_members := the set of consumers not yet at capacity, initialized to empty}}
{{max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty}}
{{member.owned_partitions := the set of previously owned partitions encoded in the Subscription}}

{{// Reassign as many previously owned partitions as possible}}
{{for member : members}}
{{    remove any partitions that are no longer in the subscription from its owned partitions}}
{{    remove all owned_partitions if the generation is old}}
{{    if member.owned_partitions.size < C_f}}
{{        assign all owned partitions to member and remove them from unassigned_partitions}}
{{        add member to unfilled_members}}
{{    else if member.owned_partitions.size == C_f}}
{{        assign the first C_f owned_partitions to member and remove them from unassigned_partitions}}
{{    else}}
{{        assign the first C_c owned_partitions to member and remove them from unassigned_partitions}}
{{        add member to max_capacity_members}}

{{sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 ... (for data parallelism)}}
{{sort unfilled_members by memberId (for determinism)}}

{{// Fill remaining members up to C_f}}
{{for member : unfilled_members}}
{{    compute the remaining capacity as C = C_f - num_assigned_partitions}}
{{    pop the first C partitions from unassigned_partitions and assign them to member}}

{{// Steal partitions from members at max capacity if necessary}}
{{if we run out of partitions before getting to the end of unfilled_members:}}
{{    for each remaining member : unfilled_members}}
{{        poll the first member in max_capacity_members and remove one of its partitions}}
{{        assign this partition to the unfilled member}}

{{// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary}}
{{if we run out of unfilled_members before assigning all partitions:}}
{{    for partition : unassigned_partitions}}
{{        assign it to the next member in members that is not in max_capacity_members (then add that member to max_capacity_members)}}
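To make the above easier to follow, here is a rough, self-contained Java sketch of the constrained path. It is not the actual AbstractStickyAssignor code: the names ({{allSubscriptionsEqual}}, {{constrainedAssign}}, {{ConstrainedStickyAssignmentSketch}}) are made up for illustration, partitions and members are plain strings rather than TopicPartition and Subscription objects, and the inputs are assumed to be pre-filtered (stale generations and unsubscribed topics already dropped) and pre-sorted as the pseudo-code requires. The first helper is the check mentioned in the ticket description for detecting that every member is subscribed to the same topics.

{code:java}
import java.util.*;

/**
 * Rough sketch of the constrained assignment path described in the pseudo-code above.
 * This is NOT the actual AbstractStickyAssignor implementation; partitions and members
 * are plain strings and all names are illustrative only.
 */
public class ConstrainedStickyAssignmentSketch {

    // The case the ticket proposes to detect: every member subscribed to the same topics.
    static boolean allSubscriptionsEqual(Map<String, Set<String>> topicsPerMember) {
        return new HashSet<>(topicsPerMember.values()).size() <= 1;
    }

    /**
     * ownedByMember: previously owned partitions per member, assumed to be already
     *                filtered to the current subscription and current generation.
     * allPartitions: every partition of the subscribed topics, assumed pre-sorted in
     *                "partition order" (t0-0, t1-0, ..., t0-1, t1-1, ...).
     */
    static Map<String, List<String>> constrainedAssign(SortedMap<String, List<String>> ownedByMember,
                                                       List<String> allPartitions) {
        int numMembers = ownedByMember.size();
        int floorCap = allPartitions.size() / numMembers;                            // C_f
        int ceilCap = floorCap + (allPartitions.size() % numMembers == 0 ? 0 : 1);   // C_c

        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Set<String> unassigned = new LinkedHashSet<>(allPartitions);
        Deque<String> unfilled = new ArrayDeque<>();      // members still below C_f
        Deque<String> atMaxCapacity = new ArrayDeque<>(); // members already holding C_c

        // Step 1: keep as many previously owned partitions as possible, capped at C_c.
        for (Map.Entry<String, List<String>> entry : ownedByMember.entrySet()) {
            List<String> owned = entry.getValue();
            int keep = Math.min(owned.size(), owned.size() > floorCap ? ceilCap : floorCap);
            List<String> kept = new ArrayList<>(owned.subList(0, keep));
            assignment.put(entry.getKey(), kept);
            unassigned.removeAll(kept);
            if (kept.size() < floorCap) {
                unfilled.add(entry.getKey());
            } else if (kept.size() == ceilCap && ceilCap > floorCap) {
                atMaxCapacity.add(entry.getKey());
            }
        }

        // Steps 2 + 3: top up the unfilled members to C_f, first from the free pool,
        // then (if the pool runs dry) by stealing from members already at max capacity.
        Iterator<String> pool = unassigned.iterator();
        while (!unfilled.isEmpty()) {
            List<String> target = assignment.get(unfilled.peek());
            if (target.size() >= floorCap) {
                unfilled.poll();
            } else if (pool.hasNext()) {
                target.add(pool.next());
                pool.remove();
            } else {
                // A donor must exist here: if the pool is empty while someone is still
                // below C_f, some other member must be holding C_c > C_f partitions.
                List<String> donor = assignment.get(atMaxCapacity.poll());
                target.add(donor.remove(donor.size() - 1));
            }
        }

        // Step 4: distribute any leftover partitions, at most one each, to members not yet at C_c.
        Iterator<String> leftovers = unassigned.iterator();
        for (List<String> target : assignment.values()) {
            if (!leftovers.hasNext()) {
                break;
            }
            if (target.size() < ceilCap) {
                target.add(leftovers.next());
            }
        }
        return assignment;
    }
}
{code}

The {{LinkedHashSet}} is used for the unassigned pool so the pre-sorted partition order survives the removals in step 1, which is what keeps the fill-up step spreading partitions across topics for data parallelism. The idea from the ticket description below is that a check like {{allSubscriptionsEqual}} gates this path; when it returns false, the existing general algorithm still applies.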
> Improve sticky partition assignor algorithm
> -------------------------------------------
>
>                 Key: KAFKA-9987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9987
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Sophie Blee-Goldman
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>
> In [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] we added the new CooperativeStickyAssignor, which leverages the underlying sticky assignment algorithm of the existing StickyAssignor (moved to AbstractStickyAssignor). The algorithm is fairly complex because it tries to optimize stickiness while satisfying perfect balance _in the case where individual consumers may be subscribed to different subsets of the topics._ While it does a pretty good job at what it promises to do, it doesn't scale well with large numbers of consumers and partitions.
>
> To give a concrete example, users have reported that it takes 2.5 minutes for the assignment to complete with just 2100 consumers reading from 2100 partitions. Since partitions revoked during the first of two cooperative rebalances remain unassigned until the end of the second rebalance, it's important for the rebalance to be as fast as possible. And since one of the primary improvements of the cooperative rebalancing protocol is a better scaling experience, the only OOTB cooperative assignor should not itself scale poorly.
>
> If we can constrain the problem a bit, we can simplify the algorithm greatly. In many cases the individual consumers won't be subscribed to some random subset of the total subscription; they will all be subscribed to the same set of topics and rely on the assignor to balance the partition workload.
>
> We can detect this case by checking the group's individual subscriptions and then invoke a more efficient assignment algorithm.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)