guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r464684660
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -267,80 +266,26 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, // check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { - tasksToClose.add(task); - } - } - - for (final Task task : tasksToClose) { - try { - if (task.isActive()) { - // Active tasks are revoked and suspended/committed during #handleRevocation - if (!task.state().equals(State.SUSPENDED)) { - log.error("Active task {} should be suspended prior to attempting to close but was in {}", - task.id(), task.state()); - throw new IllegalStateException("Active task " + task.id() + " should have been suspended"); - } - } else { - task.suspend(); - task.prepareCommit(); - task.postCommit(); - } - completeTaskCloseClean(task); - cleanUpTaskProducer(task, taskCloseExceptions); - tasks.remove(task.id()); - } catch (final RuntimeException e) { - final String uncleanMessage = String.format( - "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", - task.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(task.id(), e); - // We've already recorded the exception (which is the point of clean). - // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. 
- dirtyTasks.add(task); + tasksToCloseClean.add(task); } } - for (final Task oldTask : tasksToRecycle) { - final Task newTask; - try { - if (oldTask.isActive()) { - if (!oldTask.state().equals(State.SUSPENDED)) { - // Active tasks are revoked and suspended/committed during #handleRevocation - log.error("Active task {} should be suspended prior to attempting to close but was in {}", - oldTask.id(), oldTask.state()); - throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended"); - } - final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); - newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); - cleanUpTaskProducer(oldTask, taskCloseExceptions); - } else { - oldTask.suspend(); - oldTask.prepareCommit(); - oldTask.postCommit(); - final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id()); - newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer); - } - tasks.remove(oldTask.id()); - addNewTask(newTask); - } catch (final RuntimeException e) { - final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(oldTask.id(), e); - dirtyTasks.add(oldTask); - } - } + // close and recycle those tasks + handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, taskCloseExceptions); - for (final Task task : dirtyTasks) { + // for tasks that cannot be cleanly closed or recycled, close them dirty + for (final Task task : tasksToCloseDirty) { Review comment: That's a good point, I will update. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org