kkonstantine commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r512487036



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -260,7 +259,7 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Do not revoke resources for re-assignment while a delayed rebalance 
is active
         // Also we do not revoke in two consecutive rebalances by the same 
leader
         canRevoke = delay == 0 && canRevoke;
-
+        log.debug("Connector and task to revoke assignment post load balancer 
calculation: {}", toRevoke);

Review comment:
       Why do we log the assignment again here? It doesn't seem to have 
changed since we logged it [in line 
249](https://github.com/apache/kafka/pull/9319/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4L249).

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
 
-            if (candidateWorkerLoad.isPresent()) {
-                WorkerLoad workerLoad = candidateWorkerLoad.get();
-                log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-                lostAssignments.connectors().forEach(workerLoad::assign);
-                lostAssignments.tasks().forEach(workerLoad::assign);
+            if (!candidateWorkerLoad.isEmpty()) {
+                log.debug("Assigning lost tasks to {} candidate workers: {}", 
+                        candidateWorkerLoad.size(),
+                        
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+                Iterator<WorkerLoad> candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                for (String connector : lostAssignments.connectors()) {
+                    // Loop over the the candidate workers as many times as it 
takes

Review comment:
       So the lost tasks are now balanced, but only among the new workers. 
Have you checked how this interacts with the task revocation that is called 
right after?
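
For readers following the thread, a minimal stand-alone sketch of the round-robin distribution that the hunk above appears to introduce. It is an illustration under assumptions, not the PR's code: `RoundRobinSketch` and `assignRoundRobin` are hypothetical names, and plain strings stand in for `WorkerLoad`, connectors, and tasks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not the Kafka Connect WorkerLoad API.
class RoundRobinSketch {

    // Distributes the lost items over the candidate workers by cycling
    // through the worker list as many times as it takes.
    static Map<String, List<String>> assignRoundRobin(List<String> workers,
                                                      List<String> lostItems) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String worker : workers) {
            assignment.put(worker, new ArrayList<>());
        }
        if (workers.isEmpty()) {
            return assignment; // no candidate workers, nothing can be assigned
        }
        Iterator<String> workerIterator = workers.iterator();
        for (String item : lostItems) {
            if (!workerIterator.hasNext()) {
                workerIterator = workers.iterator(); // wrap around to the first worker
            }
            assignment.get(workerIterator.next()).add(item);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assignRoundRobin(
                List.of("worker-1", "worker-2"),
                List.of("conn-a", "task-0", "task-1")));
    }
}
```

Note that in this sketch only the workers passed in receive items, which is the behavior the comment above asks about: existing workers are not part of the rotation.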

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +596,14 @@ private void resetDelay() {
         numToRevoke = floorTasks;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            numToRevoke = existing.tasksSize() - ceilTasks;

Review comment:
       Also, we apply this logic to tasks only. Why not to connectors too, 
if it helps?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +595,14 @@ private void resetDelay() {
         numToRevoke = floorTasks;

Review comment:
       This assignment is unused now. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -577,15 +596,14 @@ private void resetDelay() {
         numToRevoke = floorTasks;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            numToRevoke = existing.tasksSize() - ceilTasks;

Review comment:
       Can you explain a bit what you aim to achieve with this change here? 
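
As context for the question, a small sketch of the arithmetic behind the added line `numToRevoke = existing.tasksSize() - ceilTasks`. It rests on assumptions that the hunk does not show: that `ceilTasks` is the ceiling of total tasks divided by total workers, and that a negative result should be treated as zero. `RevokeCountSketch` and both method names are hypothetical.

```java
// Hypothetical illustration of the per-worker revocation count; not the PR's code.
class RevokeCountSketch {

    // Assumption: ceilTasks is ceil(totalTasks / totalWorkers), computed in integers.
    static int ceilPerWorker(int totalTasks, int totalWorkers) {
        return (totalTasks + totalWorkers - 1) / totalWorkers;
    }

    // Number of tasks a worker holds above the ceiling.
    // Assumption: clamped at zero so an underloaded worker revokes nothing.
    static int numToRevoke(int workerTaskCount, int ceilTasks) {
        return Math.max(0, workerTaskCount - ceilTasks);
    }

    public static void main(String[] args) {
        int ceilTasks = ceilPerWorker(10, 3);
        System.out.println("ceilTasks = " + ceilTasks);
        System.out.println("worker with 7 tasks revokes " + numToRevoke(7, ceilTasks));
        System.out.println("worker with 2 tasks revokes " + numToRevoke(2, ceilTasks));
    }
}
```

Under these assumptions the count is computed per existing worker, rather than being one fixed value applied to every worker.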





