cadonna commented on code in PR #12600: URL: https://github.com/apache/kafka/pull/12600#discussion_r966987932
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartiti } } - private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, - final Map<Task, Set<TopicPartition>> tasksToRecycle, - final Set<Task> tasksToCloseClean) { + private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, + final Map<Task, Set<TopicPartition>> tasksToRecycle, + final Set<Task> tasksToCloseClean) { + handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); + handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate); + } + + private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, + final Map<Task, Set<TopicPartition>> tasksToRecycle, + final Set<Task> tasksToCloseClean) { for (final Task task : tasks.allTasks()) { + if (!task.isActive()) { + throw new IllegalStateException("Standby tasks should only be managed by the state updater"); + } final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { - if (task.isActive()) { - final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId); - if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) { - task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id())); - } - task.resume(); - } else { - throw new IllegalStateException("Standby tasks should only be managed by the state updater"); - } + handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId)); activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { - if 
(!task.isActive()) { - throw new IllegalStateException("Standby tasks should only be managed by the state updater"); - } else { - tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); - } + tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); standbyTasksToCreate.remove(taskId); } else { tasksToCloseClean.add(task); } } } - private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, - final Map<Task, Set<TopicPartition>> tasksToRecycle, - final Set<Task> tasksToCloseClean) { - classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); + private void handleReAssignedActiveTask(final Task task, + final Set<TopicPartition> inputPartitions) { + if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) { + task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); + } + if (task.state() == State.SUSPENDED) { + task.resume(); + moveTaskFromTasksRegistryToStateUpdater(task); + } + } + + private void moveTaskFromTasksRegistryToStateUpdater(final Task task) { + tasks.removeTask(task); + stateUpdater.add(task); + } + + private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate, + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) { for (final Task task : stateUpdater.getTasks()) { final TaskId taskId = task.id(); - final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId); + tasks.removePendingActiveTaskToSuspend(taskId); Review Comment: We try to remove the task in the state updater from the set of pending tasks to suspend in any case. 
A revoked active task will be handled in one of the following ways:
- reassigned as active with unmodified input partitions: the task stays in the state updater.
- reassigned as active with modified input partitions: the task is removed from the state updater and added to the set of pending tasks whose input partitions need to be updated.
- reassigned as standby: the task is removed from the state updater and added to the pending tasks to recycle.
- not reassigned: the task is removed from the state updater and added to the pending tasks to close cleanly.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org