[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106832#comment-17106832
 ] 

Sophie Blee-Goldman commented on KAFKA-9987:
--------------------------------------------

Pseudo-code sketch of the algorithm:
C_f := floor(P/N), the floor capacity (P = number of partitions, N = number of consumers)
C_c := ceil(P/N), the ceiling capacity

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size == C_f
        assign first C_f owned_partitions to member and remove from unassigned_partitions
    else
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        add member to max_capacity_members

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
    compute the remaining capacity as C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled_members:
    for member : remaining unfilled_members
        poll for first member in max_capacity_members and remove one partition
        assign this partition to the unfilled member

// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
    for partition : unassigned_partitions
        assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)
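
To make the pseudo-code above concrete, here is a minimal runnable sketch in Python. It is an illustration only, not the Kafka implementation: the function name `assign`, the `(topic, partition index)` tuple representation, and the `owned` mapping from memberId to previously owned partitions are all assumptions made for this example. It also assumes stale-generation ownership has already been cleared by the caller, and it steals repeatedly until an unfilled member reaches C_f rather than exactly once.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

Partition = Tuple[str, int]  # hypothetical representation: (topic, partition index)


def assign(owned: Dict[str, List[Partition]],
           partitions: List[Partition]) -> Dict[str, List[Partition]]:
    """Sticky, balanced assignment when all members subscribe to all topics."""
    members = sorted(owned)                          # sorted by memberId
    floor_cap = len(partitions) // len(members)      # C_f
    ceil_cap = -(-len(partitions) // len(members))   # C_c
    unassigned = set(partitions)
    assignment: Dict[str, List[Partition]] = {}
    unfilled: List[str] = []
    max_capacity: Deque[str] = deque()

    # Reassign as many previously owned partitions as possible.
    for m in members:
        # Drop duplicates, partitions that no longer exist, and ones already claimed.
        keep = [p for p in dict.fromkeys(owned[m]) if p in unassigned]
        if len(keep) < floor_cap:
            unfilled.append(m)
        elif len(keep) > floor_cap:
            keep = keep[:ceil_cap]
            max_capacity.append(m)
        assignment[m] = keep
        unassigned.difference_update(keep)

    # Partition order: t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ...
    queue = deque(sorted(unassigned, key=lambda p: (p[1], p[0])))

    # Fill remaining members up to C_f, stealing from a member at C_c
    # once the unassigned partitions run out.
    for m in unfilled:  # already in memberId order
        while len(assignment[m]) < floor_cap:
            if queue:
                assignment[m].append(queue.popleft())
            else:
                donor = max_capacity.popleft()  # donor drops back to C_f
                assignment[m].append(assignment[donor].pop())

    # Distribute leftovers, one per member that is not yet at C_c.
    full = set(max_capacity)
    spare = (m for m in members if m not in full)
    for p in queue:
        assignment[next(spare)].append(p)
    return assignment
```

For example, with seven partitions and three members where A and B each previously owned three partitions and C owned one, A and B are both initially kept at C_c = 3, C is one short of C_f = 2, and a single partition is stolen from A, giving sizes 2, 3 and 2 with every other partition staying where it was.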

> Improve sticky partition assignor algorithm
> -------------------------------------------
>
>                 Key: KAFKA-9987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9987
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Sophie Blee-Goldman
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor, which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case where 
> individual consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is a better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription; they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> then call on a more efficient assignment algorithm. 
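
The detection step described above could look roughly like the following Python sketch. The function name `all_subscriptions_equal` and the shape of `subscriptions` (a mapping from memberId to an iterable of topic names) are assumptions for illustration, not part of any Kafka API.

```python
from typing import Dict, Iterable


def all_subscriptions_equal(subscriptions: Dict[str, Iterable[str]]) -> bool:
    """Return True when every member subscribes to the same set of topics."""
    topic_sets = (set(topics) for topics in subscriptions.values())
    first = next(topic_sets, None)
    # Compare every remaining member's topic set against the first one.
    return all(s == first for s in topic_sets)
```

An assignor could run this check once per rebalance and fall back to the general algorithm only when it returns False.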



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
