Hi Mazen,

Sorry, I was wrong. If not all consumers are subscribed to the same set of topics, we invoke a different sticky assignor algorithm (KIP-54), and I think that algorithm handles the unbalanced topic subscription case well.
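As a rough illustration of that branching condition, the check can be sketched in Python as below. `uses_constrained_path` and the subscription map are hypothetical names chosen for this sketch, and Kafka's Java implementation may organize the decision differently. The example reuses the "allTopic"/"onlyC2" scenario from the earlier message quoted further down.

```python
from typing import Dict, Set


def uses_constrained_path(subscriptions: Dict[str, Set[str]]) -> bool:
    """Hypothetical check: True when every consumer subscribes to the same topics.

    When it returns False, the general (KIP-54) sticky algorithm would be used.
    """
    distinct = {frozenset(topics) for topics in subscriptions.values()}
    return len(distinct) <= 1


# The scenario from the thread: C2 additionally subscribes to "onlyC2".
subs = {
    "C1": {"allTopic"},
    "C2": {"allTopic", "onlyC2"},
    "C3": {"allTopic"},
    "C4": {"allTopic"},
}
print(uses_constrained_path(subs))  # False -> general KIP-54 algorithm
```

Comparing frozensets makes the check independent of the order in which topics are listed.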
Ref: https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

Please let me know if you have other questions.

Thanks.
Luke

On Tue, Mar 9, 2021 at 2:52 PM Luke Chen <show...@gmail.com> wrote:
> Hi Mazen,
>
> I'm not sure if I understand your question correctly. For your question:
>
>> 3 consumers 10 partitions .... and the reason for rebalancing is the
>> addition of a new consumer C4 .... say C1 has 3 partitions, C2 has 5
>> partitions and C3 has 3 partitions.... the case where C4 should handle more
>> partitions than C3 (as C4 has fewer assigned partitions) is not
>> considered...
>
> Are you asking what happens with the sticky assignor when C1 has 3
> partitions, C2 has 5 partitions and C3 has 3 partitions, and a new member
> C4 joins? If that's the question, the 11 (3+3+5) partitions should be
> assigned like this:
>
> C1: 3 partitions
> C2: 3 partitions
> C3: 3 partitions
> C4: 2 partitions
>
> Is this what you thought?
>
> Or are you saying that there are cases where C2 subscribes to more topics
> than any other consumer? Then your scenario is like:
>
> C1/C2/C3/C4 all subscribe to topic "allTopic" with 10 partitions
> C2 also subscribes to topic "onlyC2" with 1 partition
>
> Before C4 joined, there were already 5 partitions assigned to C2 (suppose 1
> partition is from topic "onlyC2"), and 3 partitions each for C1 and C3. So
> after the rebalance, we should assign fewer partitions to C2, e.g.:
>
> C1: 3 partitions
> C2: 2 partitions (+ 1 partition from topic "onlyC2")
> C3: 3 partitions
> C4: 3 partitions
>
> instead of
>
> C1: 3 partitions
> C2: 3 partitions (+ 1 partition from topic "onlyC2") = 4 partitions
> C3: 3 partitions
> C4: 2 partitions
>
> If it's the latter case, then yes, we didn't consider this situation.
> Please open a JIRA ticket and I can help with it. Not sure if there are
> different opinions.
>
> Thanks.
> Luke
>
> On Mon, Mar 8, 2021 at 7:58 PM Mazen Ezzeddine <
> mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:
>
>> Hi all,
>>
>> I was reading the sticky assignor code, as sketched in the JIRA below.
>>
>> =====================================================================================
>> Pseudo-code sketch of the algorithm:
>> C_f := (P/N)_floor, the floor capacity (P partitions, N consumers)
>> C_c := (P/N)_ceil, the ceiling capacity
>>
>> members := the sorted set of all consumers
>> partitions := the set of all partitions
>> unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
>> unfilled_members := the set of consumers not yet at capacity, initialized to empty
>> max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
>> member.owned_partitions := the set of previously owned partitions encoded in the Subscription
>>
>> // Reassign as many previously owned partitions as possible
>> for member : members
>>     remove any partitions that are no longer in the subscription from its owned partitions
>>     remove all owned_partitions if the generation is old
>>     if member.owned_partitions.size < C_f
>>         assign all owned partitions to member and remove from unassigned_partitions
>>         add member to unfilled_members
>>     else if member.owned_partitions.size == C_f
>>         assign first C_f owned_partitions to member and remove from unassigned_partitions
>>     else
>>         assign first C_c owned_partitions to member and remove from unassigned_partitions
>>         add member to max_capacity_members
>>
>> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
>> sort unfilled_members by memberId (for determinism)
>>
>> // Fill remaining members up to C_f
>> for member : unfilled_members
>>     compute the remaining capacity as C = C_f - num_assigned_partitions
>>     pop the first C partitions from unassigned_partitions and assign to member
>>
>> // Steal partitions from members with max_capacity if necessary
>> if we run out of partitions before getting to the end of unfilled members:
>>     for member : unfilled_members
>>         poll for first member in max_capacity_members and remove one partition
>>         assign this partition to the unfilled member
>>
>> // Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
>> if we run out of unfilled_members before assigning all partitions:
>>     for partition : unassigned_partitions
>>         assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)
>>
>> =====================================================================================
>>
>> Wouldn't it be better to sort unfilled_members by the number of assigned
>> partitions instead of by member id? I was hand-debugging the code with the
>> following: 3 consumers 10 partitions .... and the reason for rebalancing is
>> the addition of a new consumer C4 .... say C1 has 3 partitions, C2 has 5
>> partitions and C3 has 3 partitions.... the case where C4 should handle more
>> partitions than C3 (as C4 has fewer assigned partitions) is not
>> considered... and this is the case in general when partitions are to be
>> reassigned/repacked: consumers with fewer assigned partitions are not
>> prioritized .... Does that make sense? Am I missing something?
>>
>> Thank you.
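To hand-trace the pseudocode quoted above, it can help to run a minimal Python sketch of it. This is not Kafka's Java implementation. It assumes every consumer subscribes to the same topics, that all owned partitions come from the current generation (so the "generation is old" step is skipped), and that no two consumers claim the same partition. The function name `constrained_assign` and the `(topic, partition)` tuples are hypothetical choices for this sketch.

```python
from typing import Dict, List, Tuple

# Hypothetical representation of a topic partition: (topic, partition number).
TopicPartition = Tuple[str, int]


def constrained_assign(
    owned: Dict[str, List[TopicPartition]],
    all_partitions: List[TopicPartition],
) -> Dict[str, List[TopicPartition]]:
    """Sketch of the pseudocode for consumers that share one subscription.

    Assumes owned partitions are from the current generation and that no two
    consumers claim the same partition.
    """
    n, p = len(owned), len(all_partitions)
    c_f, c_c = p // n, -(-p // n)  # floor and ceiling capacity
    valid = set(all_partitions)
    members = sorted(owned)  # members sorted by member id
    assignment: Dict[str, List[TopicPartition]] = {}
    unassigned = set(all_partitions)
    unfilled: List[str] = []
    max_capacity: List[str] = []

    # Reassign as many previously owned partitions as possible.
    for m in members:
        mine = [tp for tp in owned[m] if tp in valid]  # drop unsubscribed partitions
        if len(mine) < c_f:
            kept = mine
            unfilled.append(m)
        elif len(mine) == c_f:
            kept = mine
        else:
            kept = mine[:c_c]
            max_capacity.append(m)
        assignment[m] = list(kept)
        unassigned.difference_update(kept)

    # Partition order t0_p0, t1_p0, ..., t0_p1, t1_p1, ... (key: partition, then topic).
    pool = sorted(unassigned, key=lambda tp: (tp[1], tp[0]))

    # Fill unfilled members (already in member-id order) up to C_f.
    for m in unfilled:
        need = c_f - len(assignment[m])
        assignment[m].extend(pool[:need])
        pool = pool[need:]

    # If the pool ran dry, steal one partition at a time from max-capacity members.
    for m in unfilled:
        while len(assignment[m]) < c_f and max_capacity:
            donor = max_capacity.pop(0)
            assignment[m].append(assignment[donor].pop())

    # Hand out leftovers, one per member, to members not yet at C_c.
    at_max = set(max_capacity)
    receivers = iter([m for m in members if m not in at_max])
    for tp in pool:
        assignment[next(receivers)].append(tp)
    return assignment


# Luke's first example: 11 partitions of one topic, C4 joins owning nothing.
owned = {
    "C1": [("t0", i) for i in range(0, 3)],
    "C2": [("t0", i) for i in range(3, 8)],
    "C3": [("t0", i) for i in range(8, 11)],
    "C4": [],
}
result = constrained_assign(owned, [("t0", i) for i in range(11)])
print({m: len(tps) for m, tps in result.items()})
# {'C1': 3, 'C2': 3, 'C3': 3, 'C4': 2}
```

In this sketch every consumer ends with either C_f or C_c partitions, and every unfilled member is topped up to C_f (from the pool or by stealing) before leftovers are handed out, so the order of unfilled_members changes which partitions a consumer receives rather than how many. Which consumers end at C_c depends on who kept C_c owned partitions in the first step and, for leftovers, on member-id order.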