[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521853#comment-17521853 ]
Konstantine Karantasis commented on KAFKA-12495:
------------------------------------------------

Thanks for documenting the issue in detail [~showuon]. Adding here the comment I added to the PR.

My main concern is indeed related to the proposed change of applying consecutive rebalances that perform revocations. The current incremental cooperative rebalancing algorithm uses two consecutive rebalances to move tasks between workers: one rebalance during which revocations happen and one during which the revoked tasks are reassigned. Although this is clearly not an atomic process (as this issue also demonstrates), I find that it's a good property to maintain and reason about. Allowing consecutive revocations that happen immediately when an imbalance is detected might mean that the workers overreact to external circumstances that caused an imbalance between the initial calculation of task assignments in the revocation rebalance and the subsequent rebalance that assigns the revoked tasks. Such circumstances might have to do with rolling upgrades, scaling a cluster up or down, or simply temporary instability. We were first able to reproduce this issue in integration tests with the test that is currently disabled.

My main thought was that, instead of risking shuffling tasks too aggressively within a short period of time and opening the door to bugs that make workers oscillate between imbalanced task assignments continuously and in a tight loop, we could use the existing mechanism of scheduling delayed rebalances to program workers to perform a pair of rebalances (revocation + reassignment) soon after an imbalance is detected.

Regarding when an imbalance is detected, the good news is that the leader worker sending the assignment during the second rebalance of a pair of rebalances knows that it will send an imbalanced assignment (there's no code to detect that right now, but it can easily be added just before the assignment is sent). The idea here would be to send this assignment anyway, but also schedule a follow-up rebalance that will have the opportunity to balance tasks soon with our standard pair of rebalances, which works dependably as long as no new workers are added or removed between the two rebalances.

We can discuss what a good setting for the delay is. One obvious possibility is to reuse the existing property; adding another config just for that seems unwarranted. To shield ourselves from infinite such rebalances, the leader should also keep track of how many such attempts have been made and stop attempting to balance out tasks after a certain number of tries. Of course, every other normal rebalance should reset both this counter and possibly the delay.

I'd be interested to hear what you think of this approach, which is quite similar to what you have demonstrated already but potentially less risky in terms of changes in the assignor logic and in how aggressively the leader attempts to fix an imbalance.
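For concreteness, here is a minimal sketch of the mechanism proposed above: just before sending the second assignment of the pair, the leader notices the result is imbalanced, sends it anyway, schedules a delayed follow-up rebalance, and gives up after a bounded number of consecutive attempts. All class, method, and field names below are hypothetical and do not reflect the actual IncrementalCooperativeAssignor code; the delay is assumed to come from the existing scheduled.rebalance.max.delay.ms property, which is presumably the "existing property" referred to above.

{code:java}
// Sketch only: hypothetical names, not the actual IncrementalCooperativeAssignor code.
import java.util.Collection;
import java.util.Map;

class DelayedRebalanceOnImbalanceSketch {
    // Hypothetical cap on consecutive balancing attempts; reset by any normal rebalance.
    private static final int MAX_BALANCING_ATTEMPTS = 5;
    private int balancingAttempts = 0;

    /**
     * Called by the leader just before it sends the assignment of the second
     * (reassignment) rebalance. If that assignment is imbalanced it is still sent,
     * but a delayed follow-up rebalance is scheduled so that a normal
     * revocation + reassignment pair can even things out later.
     *
     * @return the delay to attach for a follow-up rebalance, or 0 if none is needed
     */
    long followUpDelayMs(Map<String, Collection<String>> assignment,
                         long scheduledRebalanceMaxDelayMs) {
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (Collection<String> tasks : assignment.values()) {
            min = Math.min(min, tasks.size());
            max = Math.max(max, tasks.size());
        }
        boolean imbalanced = max - min > 1;  // more than the unavoidable remainder
        if (!imbalanced) {
            balancingAttempts = 0;           // a balanced result resets the counter
            return 0L;
        }
        if (balancingAttempts >= MAX_BALANCING_ATTEMPTS) {
            return 0L;                       // stop trying, to avoid endless rebalances
        }
        balancingAttempts++;
        // Reuse the existing scheduled-rebalance delay rather than adding a new config.
        return scheduledRebalanceMaxDelayMs;
    }
}
{code}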
> Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Blocker
>             Fix For: 3.2.0
>
>         Attachments: image-2021-03-18-15-04-57-854.png, image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]). However, the algorithm implementation makes a bad assumption: that after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.
>
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance completes and before the 2nd rebalance starts? Let's see what happens with this example (we'll use 10 tasks here):
>
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member,
> // but we didn't revoke any more C/T in this round, which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive rebalances (under the same leader), we end up with this uneven distribution in this situation. We should allow a consecutive rebalance to perform another round of revocation so that the revoked C/T can go to the other members in this case.
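For illustration, here is a minimal sketch of the extra revocation round proposed here, using hypothetical names rather than the real assignor API: when the member count grew between the two rebalances, the leader could revoke from any worker holding more than the ceiling of totalTasks / workerCount and let a follow-up rebalance hand those tasks to the under-loaded workers, which produces the expected trace shown below.

{code:java}
// Sketch only: hypothetical helper, not the actual IncrementalCooperativeAssignor API.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConsecutiveRevocationSketch {

    // Tasks to revoke per worker so that nobody keeps more than
    // ceil(totalTasks / workerCount) after new members joined mid-cycle.
    static Map<String, List<String>> extraRevocations(Map<String, List<String>> current,
                                                      int workerCount) {
        int total = current.values().stream().mapToInt(List::size).sum();
        int ceilingShare = (total + workerCount - 1) / workerCount;
        Map<String, List<String>> revoked = new HashMap<>();
        for (Map.Entry<String, List<String>> e : current.entrySet()) {
            List<String> tasks = e.getValue();
            if (tasks.size() > ceilingShare) {
                // Revoke the tail of the assignment; the follow-up rebalance
                // reassigns these to the under-loaded workers (W4 in the example).
                revoked.put(e.getKey(),
                            new ArrayList<>(tasks.subList(ceilingShare, tasks.size())));
            }
        }
        return revoked;
    }

    public static void main(String[] args) {
        // State after the second rebalance in the trace above: W1 still holds 4
        // of the 12 connectors/tasks while the newly joined W4 got only 2.
        Map<String, List<String>> current = new LinkedHashMap<>();
        current.put("W1", List.of("AC0", "AT1", "AT2", "AT3"));
        current.put("W2", List.of("AT4", "AT5", "BC0"));
        current.put("W3", List.of("BT1", "BT2", "BT3"));
        current.put("W4", List.of("BT4", "BT5"));
        // Ceiling share is ceil(12 / 4) = 3, so only W1 is over and revokes AT3.
        System.out.println(extraRevocations(current, 4)); // prints {W1=[AT3]}
    }
}
{code}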
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assign all the previously revoked Connectors/Tasks to the new member, **and also revoke some C/T**
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the newly revoked C/T to the other members
> W1 rejoins with assignment: [AC0, AT1, AT2]
> Rebalance is triggered
> W2 joins with assignment: [AT4, AT5, BC0]
> W3 joins with assignment: [BT1, BT2, BT3]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> // (final) We assign all the previously revoked Connectors/Tasks to the members
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
> {code}
> Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round.
>
> Note2: this issue makes the KAFKA-12283 test flaky.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)