Hi all,

I was reading the sticky assignor algorithm as sketched in the JIRA ticket, quoted below.

=====================================================================================
Pseudo-code sketch of the algorithm:
C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size == C_f
        assign first C_f owned_partitions to member and remove from unassigned_partitions
    else
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        add member to max_capacity_members

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 .... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
    compute the remaining capacity as C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled members:
    for member : unfilled_members
        poll for first member in max_capacity_members and remove one partition
        assign this partition to the unfilled member

// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
    for partition : unassigned_partitions
        assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)
=========================================================================================================================


Wouldn't it be better to sort unfilled_members by the number of partitions
already assigned instead of by memberId? I was hand-debugging the code with the
following scenario: 3 consumers and 10 partitions, where the rebalance is
triggered by the addition of a new consumer, C4. Say C1 has 3 partitions, C2
has 5 partitions and C3 has 3 partitions. The case where C4 should take on more
partitions than C3 (because C4 has fewer assigned partitions) is not
considered. The same seems to hold in general whenever partitions are
reassigned/repacked: consumers with fewer assigned partitions are not
prioritized. Does that make sense? Am I missing something?
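
To make the scenario easier to trace, here is a rough Python sketch of the quoted pseudo-code. It is my own reading of the pseudo-code, not the actual Kafka implementation. Assumptions: all generations are current (so the "old generation" step is skipped), the steal step is folded into the fill loop, and the example uses 11 partitions on a single topic so that the ownership counts 3 + 5 + 3 add up. The function name sticky_assign and the by_load switch are hypothetical; by_load sorts unfilled members by number of assigned partitions (then memberId) instead of by memberId alone.

```python
from collections import deque


def sticky_assign(owned, partitions, by_load=False):
    """Sketch of the quoted pseudo-code (not the Kafka implementation).

    owned: memberId -> list of previously owned (topic, partition) tuples.
    by_load: hypothetical switch for the ordering proposed in this mail.
    """
    members = sorted(owned)
    c_f = len(partitions) // len(members)        # floor capacity
    c_c = -(-len(partitions) // len(members))    # ceiling capacity
    assignment = {m: [] for m in members}
    unassigned = set(partitions)
    unfilled, max_capacity = [], deque()

    # Reassign as many previously owned partitions as possible.
    for m in members:
        mine = [tp for tp in owned[m] if tp in unassigned]
        if len(mine) < c_f:
            unfilled.append(m)
        elif len(mine) > c_f:
            mine = mine[:c_c]
            max_capacity.append(m)
        assignment[m] = mine
        unassigned.difference_update(mine)

    # Partition order: t0_p0, t1_p0, t2_p0, t0_p1, ...
    pending = deque(sorted(unassigned, key=lambda tp: (tp[1], tp[0])))
    unfilled.sort(key=(lambda m: (len(assignment[m]), m)) if by_load else None)

    # Fill unfilled members up to C_f, stealing from a max-capacity member
    # once the unassigned partitions run out.
    for m in unfilled:
        while len(assignment[m]) < c_f:
            if pending:
                assignment[m].append(pending.popleft())
            elif max_capacity:
                donor = max_capacity.popleft()
                assignment[m].append(assignment[donor].pop())
            else:
                break

    # Hand out leftovers, one per member, to members not yet at C_c.
    at_max = set(max_capacity)
    for tp in pending:
        m = next(x for x in members if x not in at_max)
        assignment[m].append(tp)
        at_max.add(m)
    return assignment


# Scenario from this mail, with 11 partitions assumed (3 + 5 + 3).
partitions = [("t0", p) for p in range(11)]
owned = {
    "C1": partitions[0:3],
    "C2": partitions[3:8],
    "C3": partitions[8:11],
    "C4": [],
}
by_id = sticky_assign(owned, partitions)
by_load = sticky_assign(owned, partitions, by_load=True)
print({m: len(ps) for m, ps in by_id.items()})  # {'C1': 3, 'C2': 3, 'C3': 3, 'C4': 2}
print(by_id == by_load)                         # True
```

With this input only C4 ends up in unfilled_members, so in this sketch the sort key has no effect on the result. The ordering can only matter when several members are below C_f at the same time.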

Thank you.
