Hi Mazen,
I'm not sure if I understand your question correctly.
For your question:

> 3 consumers and 10 partitions, and the reason for rebalancing is the
> addition of a new consumer C4. Say C1 has 3 partitions, C2 has 5
> partitions and C3 has 3 partitions. The case where C4 should handle more
> partitions than C3 (since C4 has fewer assigned partitions) is not
> considered...
>

Are you asking what the sticky assignor will do when C1 has 3
partitions, C2 has 5 partitions and C3 has 3 partitions, and member C4 is
then added? If that's the question, after the rebalance the 11 (3+3+5)
partitions should be assigned like this:
C1: 3 partitions
C2: 3 partitions
C3: 3 partitions
C4: 2 partitions

Is this what you thought?
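To double-check the first case, here is a minimal Python sketch of the pseudocode quoted further down (from the JIRA). It is not Kafka's actual assignor code. It assumes every consumer subscribes to the same partitions, that all owned partitions come from the current generation and don't overlap, and `sticky_assign` and the `("t0", i)` partition tuples are hypothetical names for illustration. Where the pseudocode leaves a choice open (which partition a max-capacity member gives up when stealing), the sketch simply takes that member's last partition.

```python
from typing import Dict, List, Tuple

Partition = Tuple[str, int]  # (topic, partition index), e.g. ("t0", 3)


def sticky_assign(partitions: List[Partition],
                  owned: Dict[str, List[Partition]]) -> Dict[str, List[Partition]]:
    """Hypothetical sketch of the quoted pseudocode (equal subscriptions only)."""
    members = sorted(owned)
    c_f = len(partitions) // len(members)      # floor capacity
    c_c = -(-len(partitions) // len(members))  # ceiling capacity
    valid = set(partitions)
    assignment: Dict[str, List[Partition]] = {}
    unassigned = set(partitions)
    unfilled: List[str] = []
    max_capacity: List[str] = []

    # Reassign as many previously owned partitions as possible.
    for m in members:
        still_owned = [p for p in owned[m] if p in valid]
        if len(still_owned) < c_f:
            keep = still_owned
            unfilled.append(m)
        elif len(still_owned) == c_f:
            keep = still_owned
        else:
            keep = still_owned[:c_c]
            max_capacity.append(m)
        assignment[m] = list(keep)
        unassigned -= set(keep)

    # Partition order t0_p0, t1_p0, ..., t0_p1, t1_p1, ...; members by id.
    queue = sorted(unassigned, key=lambda tp: (tp[1], tp[0]))
    unfilled.sort()

    # Fill remaining members up to C_f.
    for m in unfilled:
        need = c_f - len(assignment[m])
        assignment[m].extend(queue[:need])
        queue = queue[need:]

    # If partitions ran out, steal one partition per max-capacity member.
    for m in unfilled:
        while len(assignment[m]) < c_f and max_capacity:
            donor = max_capacity.pop(0)
            assignment[m].append(assignment[donor].pop())

    # Hand out leftovers, one per member, to members not yet at C_c.
    for p in queue:
        m = next(x for x in members if x not in max_capacity)
        assignment[m].append(p)
        max_capacity.append(m)
    return assignment


all_partitions = [("t0", i) for i in range(11)]
previous = {
    "C1": all_partitions[0:3],
    "C2": all_partitions[3:8],
    "C3": all_partitions[8:11],
    "C4": [],  # newly joined consumer
}
result = sticky_assign(all_partitions, previous)
print({m: len(ps) for m, ps in result.items()})
# {'C1': 3, 'C2': 3, 'C3': 3, 'C4': 2}
print(result["C4"])
# [('t0', 6), ('t0', 7)]
```

With these inputs only C2 gives anything up: it keeps 3 of its 5 previous partitions, and the other 2 go to C4 in the fill step, so the steal step never runs.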

Or are you saying that there are cases where C2 subscribes to more topics
than any other consumer, so your scenario is like:
C1/C2/C3/C4 all subscribe to topic "allTopic" with 10 partitions
C2 also subscribes to topic "onlyC2" with 1 partition
Before C4 joined, 5 partitions were already assigned to C2 (suppose 1 of
them is from topic "onlyC2"), and 3 partitions each to C1 and C3. So after
the rebalance, we should assign fewer "allTopic" partitions to C2, e.g.:

C1: 3 partitions
C2: 2 partitions  ( + 1 partition from topic "onlyC2") = 3 partitions
C3: 3 partitions
C4: 2 partitions

instead of
C1: 3 partitions
C2: 3 partitions  ( + 1 partition from topic "onlyC2") = 4 partitions
C3: 2 partitions
C4: 2 partitions
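A rough way to see the difference is to count every partition a consumer ends up with, across both topics. The sketch below is a plain least-loaded greedy (each "allTopic" partition goes to the consumer with the smallest total so far, ties broken by member id); it ignores stickiness and is not what the sticky assignor does. `least_loaded_counts` and `totals` are hypothetical helpers, and the inputs assume 10 "allTopic" partitions with C2 already holding 1 partition of "onlyC2".

```python
import heapq
from typing import Dict, List


def least_loaded_counts(members: List[str], num_partitions: int,
                        extra_load: Dict[str, int]) -> Dict[str, int]:
    """Hypothetical helper: spread one topic's partitions over members,
    always giving the next partition to the member with the smallest
    total load (extra_load counts partitions from other topics)."""
    counts = {m: 0 for m in members}
    heap = [(extra_load.get(m, 0), m) for m in members]
    heapq.heapify(heap)
    for _ in range(num_partitions):
        load, m = heapq.heappop(heap)  # ties broken by member id
        counts[m] += 1
        heapq.heappush(heap, (load + 1, m))
    return counts


def totals(counts: Dict[str, int], extra_load: Dict[str, int]) -> Dict[str, int]:
    """Partitions per member across all topics."""
    return {m: c + extra_load.get(m, 0) for m, c in counts.items()}


members = ["C1", "C2", "C3", "C4"]
only_c2 = {"C2": 1}  # C2's partition from topic "onlyC2"

aware = least_loaded_counts(members, 10, only_c2)  # counts "onlyC2" too
naive = least_loaded_counts(members, 10, {})       # looks at "allTopic" only
print(totals(aware, only_c2))  # {'C1': 3, 'C2': 3, 'C3': 3, 'C4': 2}
print(totals(naive, only_c2))  # {'C1': 3, 'C2': 4, 'C3': 2, 'C4': 2}
```

Only the first version gives C2 fewer "allTopic" partitions (2), so its total matches the other consumers; the naive version leaves C2 with 4 partitions in total.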

If it's the latter case, then yes, we didn't consider this situation. Please
open a JIRA ticket and I can help with it. I'm not sure whether others have
different opinions, though.


Thanks.
Luke

On Mon, Mar 8, 2021 at 7:58 PM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I was reading the sticky assignor code, as described in the JIRA below.
>
>
> =====================================================================================
> Pseudo-code sketch of the algorithm:
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
>
> members := the sorted set of all consumers
> partitions := the set of all partitions
> unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
> unfilled_members := the set of consumers not yet at capacity, initialized to empty
> max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
> member.owned_partitions := the set of previously owned partitions encoded in the Subscription
>
> // Reassign as many previously owned partitions as possible
> for member : members
>         remove any partitions that are no longer in the subscription from its owned partitions
>         remove all owned_partitions if the generation is old
>         if member.owned_partitions.size < C_f
>             assign all owned partitions to member and remove from unassigned_partitions
>             add member to unfilled_members
>         else if member.owned_partitions.size == C_f
>             assign first C_f owned_partitions to member and remove from unassigned_partitions
>         else
>             assign first C_c owned_partitions to member and remove from unassigned_partitions
>             add member to max_capacity_members
>
> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0,
> t0_p1, t1_p1 ... (for data parallelism)
> sort unfilled_members by memberId (for determinism)
>
> // Fill remaining members up to C_f
> for member : unfilled_members
>     compute the remaining capacity as C = C_f - num_assigned_partitions
>     pop the first C partitions from unassigned_partitions and assign to member
>
> // Steal partitions from members with max_capacity if necessary
> if we run out of partitions before getting to the end of unfilled members:
>     for member : unfilled_members
>         poll for first member in max_capacity_members and remove one partition
>         assign this partition to the unfilled member
>
> // Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
> if we run out of unfilled_members before assigning all partitions:
>     for partition : unassigned_partitions
>         assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)
>
> =========================================================================================================================
>
>
> Wouldn't it be better to sort unfilled_members by the number of partitions
> assigned instead of by member id? I was hand-debugging the code with the
> following: 3 consumers and 10 partitions, and the reason for rebalancing is
> the addition of a new consumer C4. Say C1 has 3 partitions, C2 has 5
> partitions and C3 has 3 partitions. The case where C4 should handle more
> partitions than C3 (since C4 has fewer assigned partitions) is not
> considered, and this is the case in general when partitions are to be
> reassigned/repacked: consumers with fewer assigned partitions are not
> prioritized. Does that make sense? Am I missing something?
>
> Thank you.
>
>
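The ordering suggested in the quoted message (sorting unfilled_members by how many partitions each already has, instead of by member id) only changes the sort key. A minimal sketch, where `order_unfilled` and the per-member counts are hypothetical; keeping the member id as a tie-breaker preserves the determinism the pseudocode sorts for. In the pseudocode, this order only matters in the fill step, i.e. for which unfilled member receives the remaining unassigned partitions first.

```python
from typing import Dict, List


def order_unfilled(unfilled: List[str], assigned: Dict[str, int],
                   by_load: bool) -> List[str]:
    """Order in which unfilled members are topped up to C_f.

    by_load=False: member id only (as in the quoted pseudocode).
    by_load=True: fewest assigned partitions first, member id as a
    tie-breaker so the result stays deterministic.
    """
    if by_load:
        return sorted(unfilled, key=lambda m: (assigned[m], m))
    return sorted(unfilled)


assigned = {"C3": 1, "C4": 0}  # hypothetical counts after the sticky step
print(order_unfilled(["C4", "C3"], assigned, by_load=False))  # ['C3', 'C4']
print(order_unfilled(["C4", "C3"], assigned, by_load=True))   # ['C4', 'C3']
```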
