showuon commented on code in PR #12561: URL: https://github.com/apache/kafka/pull/12561#discussion_r963289740
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -843,12 +1002,6 @@ public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { performStandardRebalance(); assertDelay(0); assertConnectorAllocations(1, 1); - assertTaskAllocations(2, 4); - - // fourth rebalance after revocations Review Comment: The comment at line 1008 should be changed to: // `Fourth` rebalance should not change assignments ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ########## @@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, newSubmissions.tasks().addAll(lostAssignments.tasks()); } resetDelay(); + // Resetting the flag as now we can permit successive revoking rebalances. + // since we have gone through the full rebalance delay + revokedInPrevious = false; Review Comment: Should we also reset `numSuccessiveRevokingRebalances = 0;`? Otherwise, after another successive revoking rebalance, the exponential backoff will increase immediately, right? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void testAssignmentsWhenWorkersJoinAfterRevocations() { + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. 
+ addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance. We should not perform any revocations + // in this round + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); + + // Fourth assignment and a fourth worker joining + // Since the worker is joining immediately and within the rebalance delay + // there should not be any revoking rebalance + addNewEmptyWorkers("worker4"); + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4"); + assertConnectorAllocations(0, 0, 1, 2); + assertTaskAllocations(0, 3, 3, 6); + + // Add new worker immediately. Since a scheduled rebalance is in progress, + // There should still not be be any revocations + addNewEmptyWorkers("worker5"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5"); + assertConnectorAllocations(0, 0, 0, 1, 2); + assertTaskAllocations(0, 0, 3, 3, 6); + + // Add new worker but this time after crossing the delay. 
+ // There would be revocations allowed + time.sleep(assignor.delay); + addNewEmptyWorkers("worker6"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 0, 1, 1); + assertTaskAllocations(0, 0, 0, 2, 2, 2); + + // Follow up rebalance since there were revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 1, 1, 1); + assertTaskAllocations(2, 2, 2, 2, 2, 2); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testImmediateRevocationsWhenMaxDelayIs0() { + + // Customize assignor for this test case + rebalanceDelay = 0; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. 
+ // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0 + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 1); + assertTaskAllocations(3, 3, 4); + + // Follow up rebalance post revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(1, 1, 1); + assertTaskAllocations(4, 4, 4); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() { + + rebalanceDelay = 1; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertDelay(1); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); + + // Follow up rebalance post revocations. No revocations should have happened + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); Review Comment: Why is there a follow-up rebalance? No revocation happened in the previous round, did it? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void testAssignmentsWhenWorkersJoinAfterRevocations() { + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance. We should not perform any revocations + // in this round + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); + + // Fourth assignment and a fourth worker joining + // Since the worker is joining immediately and within the rebalance delay + // there should not be any revoking rebalance + addNewEmptyWorkers("worker4"); + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4"); + assertConnectorAllocations(0, 0, 1, 2); + assertTaskAllocations(0, 3, 3, 6); + + // Add new worker immediately. 
Since a scheduled rebalance is in progress, + // There should still not be be any revocations + addNewEmptyWorkers("worker5"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5"); + assertConnectorAllocations(0, 0, 0, 1, 2); + assertTaskAllocations(0, 0, 3, 3, 6); + + // Add new worker but this time after crossing the delay. + // There would be revocations allowed + time.sleep(assignor.delay); + addNewEmptyWorkers("worker6"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 0, 1, 1); + assertTaskAllocations(0, 0, 0, 2, 2, 2); + + // Follow up rebalance since there were revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 1, 1, 1); + assertTaskAllocations(2, 2, 2, 2, 2, 2); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testImmediateRevocationsWhenMaxDelayIs0() { + + // Customize assignor for this test case + rebalanceDelay = 0; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. 
+ // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0 + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 1); + assertTaskAllocations(3, 3, 4); + + // Follow up rebalance post revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(1, 1, 1); + assertTaskAllocations(4, 4, 4); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() { + + rebalanceDelay = 1; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned Review Comment: 3 connectors? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -877,15 +1030,9 @@ public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() assertDelay(0); assertWorkers("worker1", "worker2"); assertConnectorAllocations(0, 1); - assertTaskAllocations(0, 4); - - // Third assignment after revocations - performStandardRebalance(); - assertDelay(0); - assertConnectorAllocations(0, 1); assertTaskAllocations(0, 2); - // fourth rebalance after revocations + // Third assignment after revocations Review Comment: The comment at line 1042 should be changed to: // Fourth rebalance should not change assignments ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void testAssignmentsWhenWorkersJoinAfterRevocations() { + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned Review Comment: 3 connectors? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void testAssignmentsWhenWorkersJoinAfterRevocations() { + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance. We should not perform any revocations + // in this round + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); + + // Fourth assignment and a fourth worker joining + // Since the worker is joining immediately and within the rebalance delay + // there should not be any revoking rebalance + addNewEmptyWorkers("worker4"); + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4"); + assertConnectorAllocations(0, 0, 1, 2); + assertTaskAllocations(0, 3, 3, 6); + + // Add new worker immediately. 
Since a scheduled rebalance is in progress, + // There should still not be be any revocations + addNewEmptyWorkers("worker5"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5"); + assertConnectorAllocations(0, 0, 0, 1, 2); + assertTaskAllocations(0, 0, 3, 3, 6); + + // Add new worker but this time after crossing the delay. + // There would be revocations allowed + time.sleep(assignor.delay); + addNewEmptyWorkers("worker6"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 0, 1, 1); + assertTaskAllocations(0, 0, 0, 2, 2, 2); + + // Follow up rebalance since there were revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 1, 1, 1); + assertTaskAllocations(2, 2, 2, 2, 2, 2); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testImmediateRevocationsWhenMaxDelayIs0() { + + // Customize assignor for this test case + rebalanceDelay = 0; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned Review Comment: 3 connectors? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org