Lucas Brutschy created KAFKA-20724:
--------------------------------------
Summary: Topology pausing may have race condition
Key: KAFKA-20724
URL: https://issues.apache.org/jira/browse/KAFKA-20724
Project: Kafka
Issue Type: Task
Components: streams
Reporter: Lucas Brutschy
h2. Summary
{{DefaultStateUpdater.tasks()}} can return the same task twice when a topology
pause or resume is in progress, causing {{handleAssignment}} and
{{handleLostAll}} to queue two REMOVE actions for the same task ID. The second
REMOVE finds nothing and completes its future with null.
{{TaskManager.waitForFuture}} treats null as a fatal bug and throws
{{IllegalStateException}}, killing the StreamThread.
h2. Root cause
{{pauseTask}} and {{resumeTask}} in {{StateUpdaterThread}} move tasks between
{{updatingTasks}} and {{pausedTasks}} with no lock held:
{{// pauseTask: no lock held
pausedTasks.put(taskId, task);   // task now visible in pausedTasks
updatingTasks.remove(taskId);    // task still visible in updatingTasks until here}}
{{// resumeTask: no lock held
updatingTasks.put(taskId, task); // task now visible in updatingTasks
pausedTasks.remove(taskId);      // task still visible in pausedTasks until here}}
{{DefaultStateUpdater.tasks()}} calls {{executeWithQueuesLocked}}, which
holds {{tasksAndActionsLock}}, {{restoredActiveTasksLock}}, and
{{exceptionsAndFailedTasksLock}}, but no lock covering {{updatingTasks}} or
{{pausedTasks}}. {{streamOfTasks()}} streams {{updatingTasks}} and
{{pausedTasks}} as two separate sub-streams. If either of the above transitions
is in flight, the task appears in both, and {{tasks()}} returns two distinct
{{ReadOnlyTask}} wrappers for it ({{ReadOnlyTask}} does not override
{{equals}}/{{hashCode}}, so the two wrappers are not treated as equal).
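The window can be replayed deterministically. The following is a minimal single-threaded sketch, not the actual {{StateUpdaterThread}} code: the class name {{PauseWindowSketch}}, the string task values, and the {{snapshotTaskIds}} helper are assumptions made for illustration. It performs the two steps of {{pauseTask}} and takes a reader snapshot between them, which is the interleaving a concurrent {{tasks()}} call could observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the two task maps; not Kafka Streams code.
class PauseWindowSketch {
    final Map<String, String> updatingTasks = new ConcurrentHashMap<>();
    final Map<String, String> pausedTasks = new ConcurrentHashMap<>();

    // Mirrors the described streamOfTasks(): two separate reads, no common lock.
    List<String> snapshotTaskIds() {
        List<String> ids = new ArrayList<>(updatingTasks.keySet());
        ids.addAll(pausedTasks.keySet());
        return ids;
    }

    // Replays pauseTask step by step and snapshots inside the window.
    static List<String> snapshotDuringPause() {
        PauseWindowSketch s = new PauseWindowSketch();
        s.updatingTasks.put("0_0", "task");
        s.pausedTasks.put("0_0", "task");         // step 1: visible in pausedTasks
        List<String> seen = s.snapshotTaskIds();  // a concurrent reader lands here
        s.updatingTasks.remove("0_0");            // step 2: gone from updatingTasks
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(snapshotDuringPause());
    }
}
```

The snapshot taken inside the window contains the task ID twice, once from each map. Each map is individually thread-safe; the duplicate comes from the move across the two maps not being atomic.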
h2. Why that becomes fatal
{{handleTasksInStateUpdater}} (called from {{handleAssignment}}) and
{{removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit}} (called from
{{handleLostAll}}) iterate {{stateUpdater.tasks()}}, queue one REMOVE per
task, and record each returned future in a
{{LinkedHashMap<TaskId, CompletableFuture>}}. A duplicate task ID means
{{put}} silently overwrites the first future. Two REMOVE actions are now in
the queue, but only the second future is tracked.
The StateUpdater thread processes both REMOVEs. The first finds the task,
removes it, and completes the orphaned first future, which nobody waits on. The
second finds nothing and calls {{future.complete(null)}}. Back on the
StreamThread, {{waitForFuture}} sees null and throws:
{{IllegalStateException: Task X was not found in the state updater. This
indicates a bug.}}
This propagates out of the rebalance callback, and the default uncaught
exception handler shuts down the client.
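The bookkeeping failure can be modelled in isolation. The sketch below is an assumption-laden simplification, not the {{TaskManager}} or state updater code: {{DuplicateRemoveSketch}}, the string task IDs, and the in-memory queue are hypothetical. It only shows how a duplicate ID leaves the caller tracking the future that ends up completed with null.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the REMOVE bookkeeping; not Kafka Streams code.
class DuplicateRemoveSketch {

    // Returns the result of the future that the caller still tracks for "0_0".
    static String trackedResultForDuplicate() {
        Map<String, String> tasksInUpdater = new HashMap<>();
        tasksInUpdater.put("0_0", "task");

        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        Queue<Map.Entry<String, CompletableFuture<String>>> removeQueue = new ArrayDeque<>();

        // Caller side: tasks() reported the same task ID twice.
        for (String taskId : List.of("0_0", "0_0")) {
            CompletableFuture<String> future = new CompletableFuture<>();
            removeQueue.add(Map.entry(taskId, future)); // one REMOVE per occurrence
            futures.put(taskId, future);                // second put replaces the first future
        }

        // State updater side: process the queued REMOVEs in order.
        for (Map.Entry<String, CompletableFuture<String>> remove : removeQueue) {
            // The first REMOVE yields the task; the second finds nothing and yields null.
            remove.getValue().complete(tasksInUpdater.remove(remove.getKey()));
        }

        return futures.get("0_0").join();
    }

    public static void main(String[] args) {
        System.out.println("tracked result: " + trackedResultForDuplicate());
    }
}
```

The tracked future resolves to null even though the task was removed successfully, because the successful result went to the overwritten future.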
h2. Affected paths
* {{handleAssignment}} → {{handleRestoringAndUpdatingTasks}} →
{{handleTasksInStateUpdater}} (line 606): triggered on every assignment when
topology is paused/resuming.
* {{handleLostAll}} →
{{removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit}} (line 1230):
triggered on producer fence with paused topology.
{{revokeTasksInStateUpdater}} is NOT affected: it removes matched partitions
from {{remainingRevokedPartitions}} after each task, so the second occurrence
of a duplicate task fails the {{containsAll}} guard and no second REMOVE is
queued.
h2. Fix
Make {{pauseTask}} and {{resumeTask}} hold {{restoredActiveTasksLock}} (or a
dedicated lock) during the two-step transition, consistent with how
{{addToRestoredTasks}} and
{{addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks}} protect their
equivalent transitions. Alternatively, add a guard in
{{handleTasksInStateUpdater}} (and the lost-all path) analogous to the
{{remainingRevokedPartitions}} guard in {{revokeTasksInStateUpdater}} — skip a
task ID already present in the futures map.
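A minimal sketch of the lock-based option follows, under stated assumptions: {{LockedTransitionSketch}} and {{transitionLock}} are hypothetical names standing in for the real classes and for {{restoredActiveTasksLock}} (or a dedicated lock), and tasks are plain strings. The point it illustrates is that the two-step move and the reader's snapshot must take the same lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the lock-based fix; not Kafka Streams code.
class LockedTransitionSketch {
    private final Map<String, String> updatingTasks = new ConcurrentHashMap<>();
    private final Map<String, String> pausedTasks = new ConcurrentHashMap<>();
    // Stand-in for restoredActiveTasksLock or a dedicated lock.
    private final ReentrantLock transitionLock = new ReentrantLock();

    void add(String taskId, String task) {
        updatingTasks.put(taskId, task);
    }

    // Moves a task from one map to the other while holding the lock.
    private void move(Map<String, String> from, Map<String, String> to, String taskId) {
        transitionLock.lock();
        try {
            String task = from.get(taskId);
            if (task != null) {
                to.put(taskId, task);
                from.remove(taskId);
            }
        } finally {
            transitionLock.unlock();
        }
    }

    void pauseTask(String taskId) { move(updatingTasks, pausedTasks, taskId); }

    void resumeTask(String taskId) { move(pausedTasks, updatingTasks, taskId); }

    // The reader holds the same lock, so it never sees a half-finished move.
    List<String> snapshotTaskIds() {
        transitionLock.lock();
        try {
            List<String> ids = new ArrayList<>(updatingTasks.keySet());
            ids.addAll(pausedTasks.keySet());
            return ids;
        } finally {
            transitionLock.unlock();
        }
    }

    // Flips one task back and forth on a second thread while snapshotting;
    // returns the largest number of IDs seen in any single snapshot.
    static int maxIdsSeenUnderContention(int snapshots) {
        LockedTransitionSketch s = new LockedTransitionSketch();
        s.add("0_0", "task");
        Thread flipper = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                s.pauseTask("0_0");
                s.resumeTask("0_0");
            }
        });
        flipper.setDaemon(true);
        flipper.start();
        int max = 0;
        for (int i = 0; i < snapshots; i++) {
            max = Math.max(max, s.snapshotTaskIds().size());
        }
        flipper.interrupt();
        try {
            flipper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println("max ids per snapshot: " + maxIdsSeenUnderContention(100_000));
    }
}
```

Locking only the writer side would not be enough in this sketch. It works here because the reader takes the same lock, which matches the description above that {{tasks()}} already holds {{restoredActiveTasksLock}} via {{executeWithQueuesLocked}}.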
h2. Reproduction condition
Any application that calls {{KafkaStreams.pauseTopology()}} or
{{resumeTopology()}} during a rebalance can hit this. The window is only two
{{ConcurrentHashMap}} operations wide, so hitting it requires timing luck but
no injected delay.