cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r833117940



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,45 +108,27 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
-                movement.task,
-                c -> clientStates.get(c).hasStandbyTask(movement.task)
-            );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
-                );
-
-                moveActiveAndTryToWarmUp(
-                    remainingWarmupReplicas,
-                    movement.task,
-                    clientStates.get(sourceClient),
-                    clientStates.get(movement.destination),
-                    warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>())
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
-            } else {
-                // we found a candidate to trade standby/active state with our destination, so we don't need a warmup
-                swapStandbyAndActive(
-                    movement.task,
-                    clientStates.get(standbySourceClient),
-                    clientStates.get(movement.destination)
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(standbySourceClient, movement.destination));
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);
+
+            if (!moved) {
+                throw new IllegalStateException("Tried to move task to more caught-up client but none exist");

Review comment:
```suggestion
                throw new IllegalStateException("Tried to move task to more caught-up client as scheduled before but none exist");
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,45 +108,27 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
-                movement.task,
-                c -> clientStates.get(c).hasStandbyTask(movement.task)
-            );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
-                );
-
-                moveActiveAndTryToWarmUp(
-                    remainingWarmupReplicas,
-                    movement.task,
-                    clientStates.get(sourceClient),
-                    clientStates.get(movement.destination),
-                    warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>())
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));
-            } else {
-                // we found a candidate to trade standby/active state with our destination, so we don't need a warmup
-                swapStandbyAndActive(
-                    movement.task,
-                    clientStates.get(standbySourceClient),
-                    clientStates.get(movement.destination)
-                );
-                caughtUpClientsByTaskLoad.offerAll(asList(standbySourceClient, movement.destination));
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
+                    tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);

Review comment:
       Could you add tests to `TaskMovementTest` that cover the new `tryToMoveActiveToMostCaughtUpClient()` case?
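
       One property such tests could pin down is the fallback order implied by the `||` chain: a later strategy should run only when every earlier one returned `false`. The snippet below is a minimal, self-contained sketch of that short-circuit behaviour. It does not use the real `TaskMovement` code; the class `FallbackChainSketch`, the strategy labels, and the boolean inputs are all hypothetical stand-ins for the three `tryTo...` methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the three-step fallback in the diff; not Kafka code.
public class FallbackChainSketch {
    // Records which strategies actually ran, so short-circuiting is observable.
    static final List<String> attempted = new ArrayList<>();

    // Simulates one strategy: logs the attempt and reports whether it succeeded.
    static boolean attempt(final String name, final boolean succeeds) {
        attempted.add(name);
        return succeeds;
    }

    // Mirrors the shape of the `moved` expression: the first success wins,
    // and strategies after it are never evaluated.
    static boolean move(final boolean hasCaughtUpStandby,
                        final boolean hasCaughtUpClient,
                        final boolean hasMostCaughtUpClient) {
        attempted.clear();
        return attempt("swapWithCaughtUpStandby", hasCaughtUpStandby)
            || attempt("moveToCaughtUpClient", hasCaughtUpClient)
            || attempt("moveToMostCaughtUpClient", hasMostCaughtUpClient);
    }

    public static void main(final String[] args) {
        // No caught-up standby or client: only the last fallback succeeds.
        final boolean moved = move(false, false, true);
        System.out.println("moved=" + moved + ", attempted=" + attempted);

        // Nothing succeeds: this is the case where the diff throws.
        if (!move(false, false, false)) {
            System.out.println("no client found, attempted=" + attempted);
        }
    }
}
```

       In the real test class the equivalent checks would presumably assert on the resulting client states rather than on a log of attempts; the sketch only illustrates the ordering that the assertions would need to cover.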




