guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r464684660



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -267,80 +266,26 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                tasksToClose.add(task);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during 
#handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
-                }
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format(
-                    "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-                    task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of 
clean).
-                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
-                dirtyTasks.add(task);
+                tasksToCloseClean.add(task);
             }
         }
 
-        for (final Task oldTask : tasksToRecycle) {
-            final Task newTask;
-            try {
-                if (oldTask.isActive()) {
-                    if (!oldTask.state().equals(State.SUSPENDED)) {
-                        // Active tasks are revoked and suspended/committed 
during #handleRevocation
-                        log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-                                  oldTask.id(), oldTask.state());
-                        throw new IllegalStateException("Active task " + 
oldTask.id() + " should have been suspended");
-                    }
-                    final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(oldTask.id());
-                    newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
-                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
-                } else {
-                    oldTask.suspend();
-                    oldTask.prepareCommit();
-                    oldTask.postCommit();
-                    final Set<TopicPartition> partitions = 
activeTasksToCreate.remove(oldTask.id());
-                    newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, 
partitions, mainConsumer);
-                }
-                tasks.remove(oldTask.id());
-                addNewTask(newTask);
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
oldTask.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(oldTask.id(), e);
-                dirtyTasks.add(oldTask);
-            }
-        }
+        // close and recycle those tasks
+        handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, 
tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, 
taskCloseExceptions);
 
-        for (final Task task : dirtyTasks) {
+        // for tasks that cannot be cleanly closed or recycled, close them dirty
+        for (final Task task : tasksToCloseDirty) {

Review comment:
       That's a good point, I will update.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to