Lucas Brutschy created KAFKA-20724:
--------------------------------------

             Summary: Topology pausing may have race condition
                 Key: KAFKA-20724
                 URL: https://issues.apache.org/jira/browse/KAFKA-20724
             Project: Kafka
          Issue Type: Task
          Components: streams
            Reporter: Lucas Brutschy


h2. Summary

{{DefaultStateUpdater.tasks()}} can return the same task twice when a topology 
pause or resume is in progress, causing {{handleAssignment}} and 
{{handleLostAll}} to queue two REMOVE actions for the same task ID. The second 
REMOVE finds nothing and completes its future with null. 
{{TaskManager.waitForFuture}} treats null as a fatal bug and throws 
{{IllegalStateException}}, killing the StreamThread.
h2. Root cause

{{pauseTask}} and {{resumeTask}} in {{StateUpdaterThread}} move tasks between 
{{updatingTasks}} and {{pausedTasks}} with no lock held:

{{// pauseTask: no lock held
pausedTasks.put(taskId, task);   // task now visible in pausedTasks
updatingTasks.remove(taskId);    // task still visible in updatingTasks until here}}

{{// resumeTask: no lock held
updatingTasks.put(taskId, task); // task now visible in updatingTasks
pausedTasks.remove(taskId);      // task still visible in pausedTasks until here}}

{{DefaultStateUpdater.tasks()}} calls {{executeWithQueuesLocked}}, which 
holds {{tasksAndActionsLock}}, {{restoredActiveTasksLock}}, and 
{{exceptionsAndFailedTasksLock}}, but no lock covering {{updatingTasks}} or 
{{pausedTasks}}. {{streamOfTasks()}} streams {{updatingTasks}} and 
{{pausedTasks}} as two separate sub-streams. If either of the above transitions 
is in flight, the task appears in both, and {{tasks()}} returns two distinct 
{{ReadOnlyTask}} wrappers (no {{equals}}/{{hashCode}} override on 
{{ReadOnlyTask}}).
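
The interleaving can be illustrated with a minimal, self-contained sketch. This is not Kafka code: {{PauseRaceSketch}}, its string-valued maps, and {{snapshotMidPause}} are hypothetical stand-ins, and the reader is placed between the two steps by hand rather than by a real second thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the two task maps of the state updater; not Kafka code.
class PauseRaceSketch {
    static final Map<String, String> updatingTasks = new ConcurrentHashMap<>();
    static final Map<String, String> pausedTasks = new ConcurrentHashMap<>();

    // Same shape as the described streamOfTasks(): two sub-streams concatenated,
    // with no lock that covers both maps.
    static List<String> tasks() {
        return Stream.concat(updatingTasks.keySet().stream(), pausedTasks.keySet().stream())
                .collect(Collectors.toList());
    }

    // Runs step 1 of the pause transition, lets a reader take a snapshot,
    // then runs step 2. Returns what the reader saw in between.
    static List<String> snapshotMidPause(String taskId) {
        updatingTasks.clear();
        pausedTasks.clear();
        updatingTasks.put(taskId, "task");

        pausedTasks.put(taskId, updatingTasks.get(taskId)); // step 1: visible in both maps
        List<String> seen = tasks();                        // reader interleaves here
        updatingTasks.remove(taskId);                       // step 2: visible only in pausedTasks
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(snapshotMidPause("0_0")); // prints [0_0, 0_0]
        System.out.println(tasks());                 // prints [0_0]
    }
}
```

In the real code the reader is a different thread, so the duplicate only appears when its snapshot happens to land between the two map operations.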
h2. Why that becomes fatal

{{handleTasksInStateUpdater}} (called from {{handleAssignment}}) and 
{{removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit}} (called from 
{{handleLostAll}}) iterate {{stateUpdater.tasks()}} and queue a REMOVE per 
task, recording each future in a {{LinkedHashMap<TaskId, CompletableFuture>}}. 
A duplicate task ID means {{put}} silently overwrites the first future. Two 
REMOVE actions are now in the queue but only the second future is tracked.

The StateUpdater thread processes both REMOVEs. The first finds the task, 
removes it, and completes the orphaned first future, which nobody waits on. The 
second finds nothing and calls {{future.complete(null)}}. Back on the 
StreamThread, {{waitForFuture}} sees null and throws:

{{IllegalStateException: Task X was not found in the state updater. This indicates a bug.}}

This propagates out of the rebalance callback, and the default uncaught 
exception handler shuts down the client.
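
The overwrite-then-null sequence can be sketched in isolation. The class and method names below are hypothetical, task IDs are plain strings, and both "threads" run sequentially; the sketch only models the bookkeeping described above, not the actual {{TaskManager}} or state updater code.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the REMOVE bookkeeping; not Kafka code.
class DuplicateRemoveSketch {

    // Queues one REMOVE per listed task ID, processes the queue, and returns
    // the value of the future that the caller still tracks for task "0_0".
    static String removeAll(List<String> taskIdsFromTasks) {
        Map<String, String> stateUpdaterTasks = new HashMap<>();
        stateUpdaterTasks.put("0_0", "task-0_0");

        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        Queue<Map.Entry<String, CompletableFuture<String>>> actions = new ArrayDeque<>();

        // Caller side: a duplicate ID overwrites the earlier future in the map,
        // but both REMOVE actions stay in the queue.
        for (String id : taskIdsFromTasks) {
            CompletableFuture<String> future = new CompletableFuture<>();
            futures.put(id, future);
            actions.add(Map.entry(id, future));
        }

        // State updater side: the first REMOVE finds the task, the second
        // finds nothing and completes its future with null.
        for (Map.Entry<String, CompletableFuture<String>> action : actions) {
            action.getValue().complete(stateUpdaterTasks.remove(action.getKey()));
        }

        return futures.get("0_0").join();
    }

    public static void main(String[] args) {
        System.out.println(removeAll(List.of("0_0")));        // prints task-0_0
        System.out.println(removeAll(List.of("0_0", "0_0"))); // prints null
    }
}
```

The null in the second case is the value that {{waitForFuture}} is described as rejecting with {{IllegalStateException}}.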
h2. Affected paths
 * {{handleAssignment}} → {{handleRestoringAndUpdatingTasks}} → 
{{handleTasksInStateUpdater}} (line 606): triggered on every assignment when 
topology is paused/resuming.
 * {{handleLostAll}} → 
{{removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit}} (line 1230): 
triggered on producer fence with paused topology.

{{revokeTasksInStateUpdater}} is NOT affected: it removes matched partitions 
from {{remainingRevokedPartitions}} after each task, so the second occurrence 
of a duplicate task fails the {{containsAll}} guard and no second REMOVE is 
queued.
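
A rough sketch of why that guard absorbs duplicates, assuming each task is represented only by its set of input partitions (names and types here are hypothetical, not the real method signature):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the containsAll guard; not Kafka code.
class RevokeGuardSketch {

    // Counts how many REMOVE actions would be queued for the listed tasks.
    static int countRemoves(List<Set<String>> inputPartitionsPerTask, Set<String> revokedPartitions) {
        Set<String> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
        int removes = 0;
        for (Set<String> inputPartitions : inputPartitionsPerTask) {
            if (remainingRevokedPartitions.containsAll(inputPartitions)) {
                removes++;
                // Consuming the partitions makes a second occurrence of the
                // same task fail the guard above.
                remainingRevokedPartitions.removeAll(inputPartitions);
            }
        }
        return removes;
    }

    public static void main(String[] args) {
        Set<String> task = Set.of("topic-0");
        // The same task listed twice, as tasks() may return mid-transition.
        System.out.println(countRemoves(List.of(task, task), Set.of("topic-0"))); // prints 1
    }
}
```

The sketch relies on every task having at least one input partition; an empty set would pass {{containsAll}} every time.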
h2. Fix

Make {{pauseTask}} and {{resumeTask}} hold {{restoredActiveTasksLock}} (or a 
dedicated lock) during the two-step transition, consistent with how 
{{addToRestoredTasks}} and 
{{addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks}} protect their 
equivalent transitions. Alternatively, add a guard in 
{{handleTasksInStateUpdater}} (and the lost-all path) analogous to the 
{{remainingRevokedPartitions}} guard in {{revokeTasksInStateUpdater}} — skip a 
task ID already present in the futures map.
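
Both options can be sketched as follows. This is an illustration under assumptions, not a patch: {{LockedTransitionSketch}}, {{transitionLock}}, and {{queueRemoves}} are hypothetical names, tasks are plain strings, and the lock stands in for whichever lock {{tasks()}} already holds while reading.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the two proposed fixes; not Kafka code.
class LockedTransitionSketch {
    private final Map<String, String> updatingTasks = new ConcurrentHashMap<>();
    private final Map<String, String> pausedTasks = new ConcurrentHashMap<>();
    private final ReentrantLock transitionLock = new ReentrantLock();

    LockedTransitionSketch(String taskId) {
        updatingTasks.put(taskId, "task");
    }

    // Option 1: both steps of a transition happen under the lock readers take.
    private void move(String taskId, Map<String, String> from, Map<String, String> to) {
        transitionLock.lock();
        try {
            String task = from.get(taskId);
            if (task != null) {
                to.put(taskId, task);
                from.remove(taskId);
            }
        } finally {
            transitionLock.unlock();
        }
    }

    void pauseTask(String taskId) { move(taskId, updatingTasks, pausedTasks); }

    void resumeTask(String taskId) { move(taskId, pausedTasks, updatingTasks); }

    List<String> tasks() {
        transitionLock.lock();
        try {
            List<String> all = new ArrayList<>(updatingTasks.keySet());
            all.addAll(pausedTasks.keySet());
            return all;
        } finally {
            transitionLock.unlock();
        }
    }

    // Toggles one task on a background thread while reading concurrently;
    // returns the largest snapshot size observed.
    static int maxSnapshotSize(int iterations) {
        LockedTransitionSketch sketch = new LockedTransitionSketch("0_0");
        Thread toggler = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                sketch.pauseTask("0_0");
                sketch.resumeTask("0_0");
            }
        });
        toggler.start();
        int max = 0;
        while (toggler.isAlive()) {
            max = Math.max(max, sketch.tasks().size());
        }
        try {
            toggler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Math.max(max, sketch.tasks().size());
    }

    // Option 2: caller-side guard that skips a task ID already in the futures map.
    static int queueRemoves(List<String> taskIds) {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        int queued = 0;
        for (String id : taskIds) {
            if (futures.containsKey(id)) {
                continue; // duplicate from tasks(): a REMOVE is already queued
            }
            futures.put(id, new CompletableFuture<>());
            queued++;
        }
        return queued;
    }

    public static void main(String[] args) {
        System.out.println(maxSnapshotSize(10_000));                 // prints 1
        System.out.println(queueRemoves(List.of("0_0", "0_0", "0_1"))); // prints 2
    }
}
```

Option 1 removes the duplicate at its source, while option 2 only makes the two affected callers tolerant of it, so other readers of {{tasks()}} could still observe the same task twice.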
h2. Reproduction condition

Any application that calls {{KafkaStreams.pauseTopology()}} or 
{{resumeTopology()}} during a rebalance. The window is two 
{{ConcurrentHashMap}} operations wide, so it requires timing luck but no 
injected delay.


