showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r963289740


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -843,12 +1002,6 @@ public void 
testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
         performStandardRebalance();
         assertDelay(0);
         assertConnectorAllocations(1, 1);
-        assertTaskAllocations(2, 4);
-
-        // fourth rebalance after revocations

Review Comment:
   The comment in L1008 should change to 
   // `Fourth` rebalance should not change assignments



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -520,6 +554,9 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
                 newSubmissions.tasks().addAll(lostAssignments.tasks());
             }
             resetDelay();
+            // Resetting the flag as now we can permit successive revoking 
rebalances.
+            // since we have gone through the full rebalance delay
+            revokedInPrevious = false;

Review Comment:
   Should we also reset `numSuccessiveRevokingRebalances = 0;` here? Otherwise, 
after another successive revoking rebalance, the exponential backoff delay will 
increase immediately, right?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance. We should not perform any 
revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance 
delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in 
progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance but we should still revoke 
as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void 
testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance. We shouldn't revoke as 
maxDelay is 1 ms
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(1);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Follow up rebalance post revocations. No revocations should have 
happened
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);

Review Comment:
   Why is there a follow-up rebalance here? No revocation happened in the 
previous round, did it?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance. We should not perform any 
revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance 
delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in 
progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance but we should still revoke 
as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void 
testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned

Review Comment:
   Should this comment say 3 connectors instead of 2?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -877,15 +1030,9 @@ public void 
testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted()
         assertDelay(0);
         assertWorkers("worker1", "worker2");
         assertConnectorAllocations(0, 1);
-        assertTaskAllocations(0, 4);
-
-        // Third assignment after revocations
-        performStandardRebalance();
-        assertDelay(0);
-        assertConnectorAllocations(0, 1);
         assertTaskAllocations(0, 2);
 
-        // fourth rebalance after revocations
+        // Third assignment after revocations

Review Comment:
   The comment in L1042 should change to 
   // Fourth rebalance should not change assignments



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned

Review Comment:
   Should this comment say 3 connectors instead of 2?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance. We should not perform any 
revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance 
delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in 
progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned

Review Comment:
   Should this comment say 3 connectors instead of 2?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to