Hi all, I was reading the sticky assignor algorithm as described in the JIRA quoted below.
=====================================================================================
Pseudo-code sketch of the algorithm:

C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity
members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size == C_f
        assign first C_f owned_partitions to member and remove from unassigned_partitions
    else
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        add member to max_capacity_members

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
    compute the remaining capacity as C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled members:
    for member : unfilled_members
        poll for first member in max_capacity_members and remove one partition
        assign this partition to the unfilled member

// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
    for partition : unassigned_partitions
        assign to next member in members that is not in max_capacity_members
        (then add member to max_capacity_members)
=====================================================================================

Wouldn't it be better to sort unfilled_members by the number of partitions already assigned instead of by member id?

I was hand-debugging the algorithm with the following setup: 3 consumers and 10 partitions, with the rebalance triggered by the addition of a new consumer, C4. Say C1 has 3 partitions, C2 has 5 partitions and C3 has 3 partitions. The case where C4 should take more partitions than C3 (since C4 has fewer assigned partitions) is not considered. The same holds in general whenever partitions are reassigned/repacked: consumers with fewer assigned partitions are not prioritized.

Does that make sense? Am I missing something?

Thank you.
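To check my reasoning, I wrote a rough Python sketch of the pseudo-code above. It is only my reading of the JIRA description, not the actual Kafka (Java) implementation: `sticky_assign`, `TopicPartition` and the `sort_key` hook are hypothetical names of my own, and I assume the owned partitions have already been filtered for subscription changes and old generations. The `sort_key` hook only controls the order of unfilled_members, so the member-id ordering and the proposed "fewest assigned partitions first" ordering can be compared on the same input.

```python
from collections import deque
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical names for this sketch; not Kafka's API.
TopicPartition = Tuple[str, int]  # (topic, partition), e.g. ("t0", 1)
Assignment = Dict[str, List[TopicPartition]]
SortKey = Callable[[str, Assignment], object]


def sticky_assign(partitions: List[TopicPartition], owned: Assignment,
                  sort_key: Optional[SortKey] = None) -> Assignment:
    """Sketch of the JIRA pseudo-code.

    Assumes `owned` (member id -> previously owned partitions) was already
    filtered for subscription changes and old generations. `sort_key(member,
    assignment)` orders unfilled_members; the default is member id.
    """
    members = sorted(owned)
    n, p = len(members), len(partitions)
    c_f, c_c = p // n, -(-p // n)  # floor and ceiling capacity
    assignment: Assignment = {m: [] for m in members}
    unassigned = set(partitions)
    unfilled: List[str] = []
    max_capacity: deque = deque()

    # Reassign as many previously owned partitions as possible.
    for m in members:
        # Dedupe (keeping order) and drop partitions that no longer exist
        # or were already claimed by an earlier member.
        mine = [tp for tp in dict.fromkeys(owned[m]) if tp in unassigned]
        if len(mine) < c_f:
            take = mine
            unfilled.append(m)
        elif len(mine) == c_f:
            take = mine[:c_f]
        else:
            take = mine[:c_c]
            max_capacity.append(m)
        assignment[m].extend(take)
        unassigned.difference_update(take)

    # Partition order: t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ...
    pool = deque(sorted(unassigned, key=lambda tp: (tp[1], tp[0])))
    key = sort_key or (lambda m, a: m)
    unfilled.sort(key=lambda m: key(m, assignment))

    # Fill unfilled members up to C_f; once the pool is empty, steal one
    # partition from the next max-capacity member instead.
    for m in unfilled:
        while len(assignment[m]) < c_f:
            if pool:
                assignment[m].append(pool.popleft())
            else:
                donor = max_capacity.popleft()
                assignment[m].append(assignment[donor].pop())

    # Leftovers: one each, in member order, to members still below C_c
    # (stands in for "not in max_capacity_members").
    for m in members:
        if not pool:
            break
        if len(assignment[m]) < c_c:
            assignment[m].append(pool.popleft())
    return assignment


if __name__ == "__main__":
    # Made-up input: 9 partitions of topic "t0", C4 has just joined.
    parts = [("t0", i) for i in range(9)]
    owned = {"C1": parts[0:5], "C2": parts[5:8], "C3": parts[8:9], "C4": []}
    by_member_id = sticky_assign(parts, owned)
    # Proposed alternative: fewest assigned partitions first, then member id.
    fewest_first = sticky_assign(parts, owned, lambda m, a: (len(a[m]), m))
    for m in sorted(owned):
        print(m, by_member_id[m], fewest_first[m])
```

Running both variants on the same input is an easy way to see whether the ordering changes how many partitions each consumer ends up with, or only which partitions move.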