guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r466105496
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -267,80 +266,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, // check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { - tasksToClose.add(task); - } - } - - for (final Task task : tasksToClose) { - try { - if (task.isActive()) { - // Active tasks are revoked and suspended/committed during #handleRevocation - if (!task.state().equals(State.SUSPENDED)) { - log.error("Active task {} should be suspended prior to attempting to close but was in {}", - task.id(), task.state()); - throw new IllegalStateException("Active task " + task.id() + " should have been suspended"); - } - } else { - task.suspend(); - task.prepareCommit(); - task.postCommit(); - } - completeTaskCloseClean(task); - cleanUpTaskProducer(task, taskCloseExceptions); - tasks.remove(task.id()); - } catch (final RuntimeException e) { - final String uncleanMessage = String.format( - "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", - task.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(task.id(), e); - // We've already recorded the exception (which is the point of clean). - // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. 
- dirtyTasks.add(task); - } - } - - for (final Task oldTask : tasksToRecycle) { - final Task newTask; - try { - if (oldTask.isActive()) { - if (!oldTask.state().equals(State.SUSPENDED)) { - // Active tasks are revoked and suspended/committed during #handleRevocation - log.error("Active task {} should be suspended prior to attempting to close but was in {}", - oldTask.id(), oldTask.state()); - throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended"); - } - final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); - newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); - cleanUpTaskProducer(oldTask, taskCloseExceptions); - } else { - oldTask.suspend(); - oldTask.prepareCommit(); - oldTask.postCommit(); - final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id()); - newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer); - } - tasks.remove(oldTask.id()); - addNewTask(newTask); - } catch (final RuntimeException e) { - final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(oldTask.id(), e); - dirtyTasks.add(oldTask); + tasksToCloseClean.add(task); } } - for (final Task task : dirtyTasks) { - closeTaskDirty(task); - cleanUpTaskProducer(task, taskCloseExceptions); - tasks.remove(task.id()); - } + // close and recycle those tasks + handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, taskCloseExceptions); if (!taskCloseExceptions.isEmpty()) { + log.error("Hit exceptions while closing / recycling tasks: {}", taskCloseExceptions); Review comment: Sounds good. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org