Hi Mazen, I'm not sure if I understand your question correctly. For your question:
> 3 consumers 10 partitions .... and the reason for rebalancing is the
> addition of a new consumer C4 .... say C1 has 3 partitions, C2 has 5
> partitions and C3 has 3 partitions.... the case where C4 shall handle more
> partitions than C3 (as C4 has less assigned partitions) is not
> considered...

Are you asking what the sticky assignor will do when C1 has 3 partitions, C2 has 5 partitions, and C3 has 3 partitions, and a new member C4 joins? If that's the question, the 11 (3+3+5) partitions should be assigned like this after the rebalance:

C1: 3 partitions
C2: 3 partitions
C3: 3 partitions
C4: 2 partitions

Is this what you expected?

Or are you saying that there are cases where C2 subscribes to more topics than the other consumers, so that your scenario looks like this:

C1/C2/C3/C4 all subscribe to topic "allTopic" with 10 partitions
C2 also subscribes to topic "onlyC2" with 1 partition

Before C4 joins, 5 partitions are already assigned to C2 (suppose 1 of them is from topic "onlyC2"), and 3 partitions each to C1 and C3. So after the rebalance, we should assign fewer partitions to C2, e.g.:

C1: 3 partitions
C2: 2 partitions (+ 1 partition from topic "onlyC2")
C3: 3 partitions
C4: 3 partitions

instead of:

C1: 3 partitions
C2: 3 partitions (+ 1 partition from topic "onlyC2") = 4 partitions
C3: 3 partitions
C4: 2 partitions

If it's the latter case, then yes, we didn't consider this situation. Please open a Jira ticket and I can help with it. Not sure if there are different opinions.

Thanks.
Luke

On Mon, Mar 8, 2021 at 7:58 PM Mazen Ezzeddine <mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I was reading the sticky assignor code as shown in JIRA below.
>
> =====================================================================
> Pseudo-code sketch of the algorithm:
>
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
>
> members := the sorted set of all consumers
> partitions := the set of all partitions
> unassigned_partitions := the set of partitions not yet assigned,
>     initialized to be all partitions
> unfilled_members := the set of consumers not yet at capacity,
>     initialized to empty
> max_capacity_members := the set of members with exactly C_c partitions
>     assigned, initialized to empty
> member.owned_partitions := the set of previously owned partitions
>     encoded in the Subscription
>
> // Reassign as many previously owned partitions as possible
> for member : members
>     remove any partitions that are no longer in the subscription from
>         its owned partitions
>     remove all owned_partitions if the generation is old
>     if member.owned_partitions.size < C_f
>         assign all owned partitions to member and remove from
>             unassigned_partitions
>         add member to unfilled_members
>     else if member.owned_partitions.size == C_f
>         assign first C_f owned_partitions to member and remove from
>             unassigned_partitions
>     else
>         assign first C_c owned_partitions to member and remove from
>             unassigned_partitions
>         add member to max_capacity_members
>
> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0,
>     t0_p1, t1_p1 .... (for data parallelism)
> sort unfilled_members by memberId (for determinism)
>
> // Fill remaining members up to C_f
> for member : unfilled_members
>     compute the remaining capacity as C = C_f - num_assigned_partitions
>     pop the first C partitions from unassigned_partitions and assign to
>         member
>
> // Steal partitions from members with max_capacity if necessary
> if we run out of partitions before getting to the end of unfilled members:
>     for member : unfilled_members
>         poll for first member in max_capacity_members and remove one
>             partition
>         assign this partition to the unfilled member
>
> // Distribute remaining partitions, one per consumer, to fill some up to
> // C_c if necessary
> if we run out of unfilled_members before assigning all partitions:
>     for partition : unassigned_partitions
>         assign to next member in members that is not in
>             max_capacity_members (then add member to max_capacity_members)
> =====================================================================
>
> Wouldn't it be better to sort unfilled_members by the number of partitions
> assigned instead of by member id? Indeed, I was hand-debugging the code
> with the following: 3 consumers 10 partitions .... and the reason for
> rebalancing is the addition of a new consumer C4 .... say C1 has 3
> partitions, C2 has 5 partitions and C3 has 3 partitions.... the case where
> C4 shall handle more partitions than C3 (as C4 has less assigned
> partitions) is not considered... and this is the case in general when
> partitions are to be reassigned/repacked: consumers with fewer assigned
> partitions are not prioritized .... Does that make sense? Am I missing
> something?
>
> Thank you.
>
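
For reference, the quoted pseudo-code can be sketched in Python roughly as below. This is only an illustration under simplifying assumptions, not Kafka's actual implementation: the function name sticky_assign and its arguments are hypothetical, every consumer subscribes to the same single topic, partitions are plain integers, previously owned partitions do not overlap between consumers, and the generation check is left out. With those assumptions it gives the 3/3/3/2 split for the 11-partition scenario discussed above.

```python
from collections import deque


def sticky_assign(owned, partitions):
    """Simplified sketch of the quoted pseudo-code (hypothetical helper).

    owned: dict of member id -> list of previously owned partitions
           (assumed disjoint across members).
    partitions: list of all partitions (plain ints, single topic assumed).
    """
    members = sorted(owned)                        # sorted set of all consumers
    n = len(members)
    floor_cap = len(partitions) // n               # C_f
    ceil_cap = -(-len(partitions) // n)            # C_c
    valid = set(partitions)

    assignment = {}
    unfilled = []                                  # members below C_f
    max_capacity = deque()                         # members that kept C_c

    # Reassign as many previously owned partitions as possible.
    for m in members:
        prev = [p for p in owned[m] if p in valid]
        if len(prev) < floor_cap:
            assignment[m] = prev
            unfilled.append(m)
        elif len(prev) == floor_cap:
            assignment[m] = prev
        else:
            assignment[m] = prev[:ceil_cap]
            max_capacity.append(m)

    taken = {p for ps in assignment.values() for p in ps}
    unassigned = deque(p for p in sorted(partitions) if p not in taken)

    # Fill unfilled members up to C_f, in member-id order. If the pool runs
    # dry, steal one partition from a member that holds C_c.
    for m in unfilled:
        while len(assignment[m]) < floor_cap:
            if unassigned:
                assignment[m].append(unassigned.popleft())
            else:
                donor = max_capacity.popleft()
                assignment[m].append(assignment[donor].pop())

    # Distribute leftovers, one per member, to members still below C_c.
    for m in members:
        if not unassigned:
            break
        if len(assignment[m]) < ceil_cap:
            assignment[m].append(unassigned.popleft())

    return assignment


# Scenario from this thread: C1=3, C2=5, C3=3 partitions, then C4 joins.
owned = {
    "C1": [0, 1, 2],
    "C2": [3, 4, 5, 6, 7],
    "C3": [8, 9, 10],
    "C4": [],
}
result = sticky_assign(owned, list(range(11)))
print({m: len(ps) for m, ps in result.items()})
# {'C1': 3, 'C2': 3, 'C3': 3, 'C4': 2}
```

In this sketch the unfilled members are visited in member-id order, which is the ordering the question above proposes to replace with an ordering by number of assigned partitions.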