cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966991995

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   We still need the set of pending active tasks to suspend because an iteration of the poll loop might happen between the calls to `handleRevocation()` and `handleAssignment()`. During that iteration, a revoked active task might finish restoring, be removed from the state updater, and then be processed. That would violate the assumption that revoked tasks are no longer processed (i.e., no longer commit offsets).
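
   To make the race concrete, here is a minimal, self-contained sketch of the idea. It is not the actual `TaskManager` code: `PendingSuspendSketch`, `ToyTask`, `State`, `addRestoredTask()`, and `handleRestoredTasks()` are hypothetical names for illustration, and the state updater is reduced to a plain list. It only assumes that revocation records the task ID in a pending set, and that a later poll-loop iteration checks that set when a restored task is drained.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical, not Kafka Streams code) of the pending-suspend bookkeeping.
class PendingSuspendSketch {
    enum State { RESTORING, RUNNING, SUSPENDED }

    static final class ToyTask {
        final String id;
        State state = State.RESTORING;

        ToyTask(final String id) {
            this.id = id;
        }
    }

    // IDs of revoked active tasks that are still owned by the (toy) state updater.
    private final Set<String> pendingActiveTasksToSuspend = new HashSet<>();
    // Stand-in for the restored tasks that the state updater would hand back.
    private final List<ToyTask> restoredInStateUpdater = new ArrayList<>();
    // Tasks handed back to the (toy) task manager.
    final List<ToyTask> ownedTasks = new ArrayList<>();

    void addRestoredTask(final ToyTask task) {
        restoredInStateUpdater.add(task);
    }

    // Revocation cannot suspend the task directly because the state updater
    // still owns it, so only the intent is recorded here.
    void handleRevocation(final String taskId) {
        pendingActiveTasksToSuspend.add(taskId);
    }

    // One poll-loop iteration that may run before the assignment is handled.
    void handleRestoredTasks() {
        for (final ToyTask task : restoredInStateUpdater) {
            if (pendingActiveTasksToSuspend.remove(task.id)) {
                // Revoked in the meantime: suspend instead of processing.
                task.state = State.SUSPENDED;
            } else {
                task.state = State.RUNNING;
            }
            ownedTasks.add(task);
        }
        restoredInStateUpdater.clear();
    }

    public static void main(final String[] args) {
        final PendingSuspendSketch sketch = new PendingSuspendSketch();
        final ToyTask revoked = new ToyTask("0_0");
        final ToyTask kept = new ToyTask("0_1");
        sketch.addRestoredTask(revoked);
        sketch.addRestoredTask(kept);

        sketch.handleRevocation("0_0");  // rebalance revokes 0_0
        sketch.handleRestoredTasks();    // poll-loop iteration before the assignment

        System.out.println(revoked.id + " -> " + revoked.state);
        System.out.println(kept.id + " -> " + kept.state);
    }
}
```

   Without the pending set, the sketch would have no way to tell the two tasks apart when they come back from the state updater, and both would transition to running.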