ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r431482714



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -272,6 +274,30 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
+        if (taskCloseExceptions.isEmpty()) {
+            for (final Task oldTask : tasksToRecycle) {
+                final Task newTask;
+                try {
+                    if (oldTask.isActive()) {
+                        final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
+                        newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    } else {
+                        final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
+                        newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
+                    }
+                    tasks.remove(oldTask.id());
+                    addNewTask(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id());
+                    log.error(uncleanMessage, e);
+                    taskCloseExceptions.put(oldTask.id(), e);
+                    dirtyTasks.add(oldTask);
+                }
+            }
+        } else {
+            dirtyTasks.addAll(tasksToRecycle);
+        }

Review comment:
       Yeah, it still seems to me that if we have to close any task as dirty,
we will ultimately have to do so for all of them (as in `handleLostAll`). But
that's a big assumption, and even if it's true now, it may not always be. I'll
remove this.
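
For illustration only, here is a minimal sketch of per-task failure handling, assuming "this" refers to the all-or-nothing guard around the recycle loop. Each task is converted independently, and only the ones whose conversion throws are marked dirty. `RecycleSketch`, `Result`, and `recycleEach` are hypothetical names, and plain strings stand in for task ids; none of this is the actual `TaskManager` code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the recycle loop; not part of Kafka Streams.
public class RecycleSketch {

    /** Outcome of one recycling pass. */
    public static final class Result {
        public final List<String> recycled = new ArrayList<>();
        public final List<String> dirty = new ArrayList<>();
        public final Map<String, RuntimeException> exceptions = new LinkedHashMap<>();
    }

    /**
     * Tries to convert every task. A failure is recorded for that task only,
     * so the remaining tasks are still attempted instead of all being
     * closed dirty up front.
     */
    public static Result recycleEach(final List<String> taskIds,
                                     final Function<String, String> converter) {
        final Result result = new Result();
        for (final String id : taskIds) {
            try {
                result.recycled.add(converter.apply(id));
            } catch (final RuntimeException e) {
                // Remember the first-class cause and mark only this task dirty.
                result.exceptions.put(id, e);
                result.dirty.add(id);
            }
        }
        return result;
    }
}
```

With this shape, a caller would close the entries in `dirty` uncleanly and re-throw from `exceptions` afterwards, without assuming that one dirty task implies all tasks are dirty.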



