cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966991995


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final 
long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) 
{
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = 
tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, 
taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, 
tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
   We still need the set of pending active tasks to suspend because an 
iteration of the poll loop might happen between the calls to 
`handleRevocation()` and `handleAssignment()`. During that iteration a revoked 
active task might be removed from the state updater because it is restored and 
processed which would violate the assumption that revoked tasks are not 
processed (i.e., commit offsets) anymore.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to