Hi Mazen,
Sorry, I was wrong. If the consumers are not all subscribed to the same set
of topics, we invoke a different sticky assignor algorithm (the one from
KIP-54). I think that algorithm handles the unbalanced topic subscription
case well.
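
To illustrate the idea, here is a rough Python sketch of how the assignor
could decide between the two paths. The function names and the
"constrained"/"general" labels are made up for this example and are not
Kafka's actual API; the consumers and topics are the ones from my earlier
mail below.

```python
from typing import Dict, Set


def all_subscriptions_equal(subscriptions: Dict[str, Set[str]]) -> bool:
    """Return True if every consumer subscribes to exactly the same topics."""
    topic_sets = list(subscriptions.values())
    return all(topics == topic_sets[0] for topics in topic_sets)


def choose_algorithm(subscriptions: Dict[str, Set[str]]) -> str:
    """Pick the (hypothetical) assignment path for one rebalance."""
    if all_subscriptions_equal(subscriptions):
        # Everyone subscribes to the same topics: the simpler
        # floor/ceiling balancing from the pseudo-code applies.
        return "constrained"
    # Subscriptions differ: fall back to the general sticky algorithm.
    return "general"


# Scenario 1: C1..C4 all subscribe to "allTopic" only.
equal = {c: {"allTopic"} for c in ("C1", "C2", "C3", "C4")}
# Scenario 2: C2 additionally subscribes to "onlyC2".
unequal = dict(equal, C2={"allTopic", "onlyC2"})

print(choose_algorithm(equal))    # constrained
print(choose_algorithm(unequal))  # general
```

So the "onlyC2" scenario would not go through the pseudo-code you quoted at
all, as far as I understand it.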

Ref:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

Please let me know if you have other questions.

Thanks.
Luke

On Tue, Mar 9, 2021 at 2:52 PM Luke Chen <show...@gmail.com> wrote:

> Hi Mazen,
> I'm not sure if I understand your question correctly.
> For your question:
>
>> 3 consumers 10 partitions .... and the reason for rebalancing is the
>> addition of a new consumer C4 .... say C1 has 3 partitions, C2 has 5
>> partitions and C3 has 3 partitions.... the case where C4 shall handle more
>> partitions than C3 (as C4 has fewer assigned partitions) is not
>> considered...
>>
>
> Are you asking what the sticky assignor will produce when C1 has 3
> partitions, C2 has 5 partitions and C3 has 3 partitions, and a new member
> C4 is added? If that's the question, the 11 (3+5+3) partitions after
> assignment should look like:
> C1: 3 partitions
> C2: 3 partitions
> C3: 3 partitions
> C4: 2 partitions
>
> Is this what you thought?
>
> Or are you saying that there are cases where C2 subscribes to more topics
> than the other consumers, so that your scenario looks like this:
> C1/C2/C3/C4 all subscribe to topic "allTopic" with 10 partitions
> C2 also subscribes to topic "onlyC2" with 1 partition
> Before C4 joined, there were already 5 partitions assigned to C2 (suppose 1
> of them is from topic "onlyC2"), and 3 partitions each to C1 and C3. So after
> the rebalance, we should assign fewer partitions to C2, e.g.:
>
> C1: 3 partitions
> C2: 2 partitions  ( + 1 partition from topic "onlyC2")
> C3: 3 partitions
> C4: 3 partitions
>
> instead of
> C1: 3 partitions
> C2: 3 partitions  ( + 1 partition from topic "onlyC2") = 4 partitions
> C3: 3 partitions
> C4: 2 partitions
>
> If it's the latter case, then yes, we didn't consider this situation. Please
> open a JIRA ticket and I can help with it. I'm not sure whether others have
> different opinions.
>
>
> Thanks.
> Luke
>
> On Mon, Mar 8, 2021 at 7:58 PM Mazen Ezzeddine <
> mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:
>
>> Hi all,
>>
>> I was reading the sticky assignor code as shown in JIRA below.
>>
>>
>> =====================================================================================
>> Pseudo-code sketch of the algorithm:
>> C_f := (P/N)_floor, the floor capacity
>> C_c := (P/N)_ceil, the ceiling capacity
>>
>> members := the sorted set of all consumers
>> partitions := the set of all partitions
>> unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
>> unfilled_members := the set of consumers not yet at capacity, initialized to empty
>> max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
>> member.owned_partitions := the set of previously owned partitions encoded in the Subscription
>>
>> // Reassign as many previously owned partitions as possible
>> for member : members
>>     remove any partitions that are no longer in the subscription from its owned partitions
>>     remove all owned_partitions if the generation is old
>>     if member.owned_partitions.size < C_f
>>         assign all owned partitions to member and remove from unassigned_partitions
>>         add member to unfilled_members
>>     else if member.owned_partitions.size == C_f
>>         assign first C_f owned_partitions to member and remove from unassigned_partitions
>>     else
>>         assign first C_c owned_partitions to member and remove from unassigned_partitions
>>         add member to max_capacity_members
>>
>> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 .... (for data parallelism)
>> sort unfilled_members by memberId (for determinism)
>>
>> // Fill remaining members up to C_f
>> for member : unfilled_members
>>     compute the remaining capacity as C = C_f - num_assigned_partitions
>>     pop the first C partitions from unassigned_partitions and assign to member
>>
>> // Steal partitions from members with max_capacity if necessary
>> if we run out of partitions before getting to the end of unfilled members:
>>     for member : unfilled_members
>>         poll for first member in max_capacity_members and remove one partition
>>         assign this partition to the unfilled member
>>
>> // Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
>> if we run out of unfilled_members before assigning all partitions:
>>     for partition : unassigned_partitions
>>         assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)
>>
>> =========================================================================================================================
>>
>>
>> Wouldn't it be better to sort unfilled_members by the number of assigned
>> partitions instead of by member ID? I was hand-debugging the code with the
>> following scenario: 3 consumers, 10 partitions, and the reason for
>> rebalancing is the addition of a new consumer C4. Say C1 has 3 partitions,
>> C2 has 5 partitions and C3 has 3 partitions. The case where C4 should
>> handle more partitions than C3 (as C4 has fewer assigned partitions) is not
>> considered. This seems to be the case in general when partitions are to be
>> reassigned/repacked: consumers with fewer assigned partitions are not
>> prioritized. Does that make sense? Am I missing something?
>>
>> Thank you.
>>
>>
