guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r463790190
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +313,96 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
+        }
+
+        // for all tasks to close or recycle, we should first write a checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Always try to first suspend and commit the task before checkpointing it;
+                // some tasks may already be suspended which should be a no-op.
+                //
+                // Also since active tasks should already be suspended / committed and
+                // standby tasks should have no offsets to commit, we should expect nothing to commit
+                task.suspend();
+
+                // Note that we are not actually committing here but just check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully,
+                //    and their changelog positions should not change at all, so postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();

Review comment:
I tried a few approaches and ended up explicitly calling suspend / postCommit for `standby` tasks only, and using `prepareCommit` to check whether the previous revocation failed. Personally, I'm happy with the current workflow.
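
For readers following the thread, here is a minimal sketch of the workflow the comment describes, as I read it: `prepareCommit` is only used as a check, and suspend / postCommit are called for standby tasks only. It is not the code in the PR. `SketchTask`, `FakeTask` and `checkpointBeforeClose` are hypothetical stand-ins rather than Kafka Streams classes, offsets are simplified to a `Map<String, Long>`, and treating a non-empty result as "close dirty" is an assumption about what a failed revocation should lead to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CloseAndRecycleSketch {

    /** Hypothetical stand-in for a task; NOT the real Kafka Streams Task interface. */
    interface SketchTask {
        String id();
        boolean isActive();
        void suspend();
        Map<String, Long> prepareCommit(); // offsets still waiting to be committed
        void postCommit();                 // would write the checkpoint file
    }

    /** Test double that records which lifecycle calls were made on it. */
    static final class FakeTask implements SketchTask {
        final String id;
        final boolean active;
        final Map<String, Long> pendingOffsets;
        final List<String> calls = new ArrayList<>();

        FakeTask(final String id, final boolean active, final Map<String, Long> pendingOffsets) {
            this.id = id;
            this.active = active;
            this.pendingOffsets = pendingOffsets;
        }

        @Override public String id() { return id; }
        @Override public boolean isActive() { return active; }
        @Override public void suspend() { calls.add("suspend"); }
        @Override public Map<String, Long> prepareCommit() {
            calls.add("prepareCommit");
            return pendingOffsets;
        }
        @Override public void postCommit() { calls.add("postCommit"); }
    }

    /** Returns the tasks that should be closed dirty (assumed consequence of a failed commit). */
    static List<SketchTask> checkpointBeforeClose(final List<? extends SketchTask> tasks) {
        final List<SketchTask> closeDirty = new ArrayList<>();
        for (final SketchTask task : tasks) {
            // Nothing is committed here; a non-empty result only signals that the
            // commit during the previous revocation did not go through.
            final Map<String, Long> offsets = task.prepareCommit();
            if (!offsets.isEmpty()) {
                closeDirty.add(task);
            } else if (!task.isActive()) {
                // Standby tasks have no offsets to commit, so suspend and checkpoint explicitly.
                task.suspend();
                task.postCommit();
            }
            // Active tasks with empty offsets were already suspended and committed: nothing to do.
        }
        return closeDirty;
    }

    public static void main(final String[] args) {
        final Map<String, Long> none = Collections.emptyMap();
        final Map<String, Long> pending = new HashMap<>();
        pending.put("topic-0", 42L);

        final FakeTask standby = new FakeTask("0_0", false, none);
        final FakeTask cleanActive = new FakeTask("0_1", true, none);
        final FakeTask failedActive = new FakeTask("0_2", true, pending);

        final List<FakeTask> tasks = new ArrayList<>();
        tasks.add(standby);
        tasks.add(cleanActive);
        tasks.add(failedActive);

        for (final SketchTask task : checkpointBeforeClose(tasks)) {
            System.out.println("close dirty: " + task.id());
        }
        System.out.println("standby calls: " + standby.calls);
    }
}
```

The point of the design is that active tasks are never suspended or checkpointed a second time: an empty `prepareCommit` result means revocation already did that work, and a non-empty one is treated as evidence that it failed.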