Eswarar Siva created KAFKA-20721:
------------------------------------

             Summary: Streams state updater: remove() during rebalance revocation can complete with null and throw a fatal "Task was not found in the state updater. This indicates a bug."
                 Key: KAFKA-20721
                 URL: https://issues.apache.org/jira/browse/KAFKA-20721
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 4.2.1, 4.1.2, 4.3.0
          Environment: Kafka Streams, exactly_once_v2, default state updater, several stateful tasks. Hit during a cold start with a lot of rebalancing. Verified the code at 4.1.2, 4.2.1, 4.3.0 and trunk.
            Reporter: Eswarar Siva
            Assignee: Eswarar Siva
         Attachments: captured-stacktrace.txt, task-not-found-repro.patch

I ran into this on a Streams app running exactly_once_v2, during a cold start while a number of stateful tasks were still restoring and the group kept rebalancing. One of the StreamThreads simply dies with:

{noformat}
java.lang.IllegalStateException: Task 1_2 was not found in the state updater. This indicates a bug.
  Please report at https://issues.apache.org/jira/projects/KAFKA/issues ...
    at org.apache.kafka.streams.processor.internals.TaskManager.waitForFuture(TaskManager.java:711)
    at org.apache.kafka.streams.processor.internals.TaskManager.lambda$getNonFailedTasks$10(TaskManager.java:672)
    at org.apache.kafka.streams.processor.internals.TaskManager.revokeTasksInStateUpdater(TaskManager.java:1168)
    at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:1058)
{noformat}

It is thrown inside the rebalance callback, so it surfaces as "User rebalance callback throws an error"; the default handler responds with SHUTDOWN_CLIENT, and the client goes from REBALANCING to PENDING_ERROR. In my case there was also a native RocksDB SIGSEGV on the way down. The thread does not recover without a restart.

After digging into it, this turns out to be a race, not a genuinely "impossible" state.

revokeTasksInStateUpdater walks stateUpdater.tasks() and calls stateUpdater.remove(taskId) for every revoked active task it still sees. tasks() (streamOfNonPausedTasks) also includes tasks sitting in restoredActiveTasks, i.e. tasks that have already finished restoring but that the stream thread has not drained yet. The remove itself is only queued; the state updater thread executes it later in removeTask. If the task leaves all of the updater collections (updatingTasks, pausedTasks, restoredActiveTasks, exceptionsAndFailedTasks) before the state updater thread actually processes the queued remove (for example, drained by handleRestoredTasksFromStateUpdater, or pulled out by another action during the churn), removeTask matches none of its four branches and calls future.complete(null). waitForFuture then sees the null and throws the fatal "This indicates a bug".
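To make the ordering concrete, here is a minimal single-threaded sketch of the lost race. It replays the same steps as the DefaultStateUpdaterTest repro: tasks() still reports the restored task, the task is drained, and only then does the queued remove run. ToyStateUpdater, RaceDemo and runLostRace are hypothetical names, and the model is a heavy simplification under stated assumptions (string task ids, no actual restoration, the updater thread replaced by a method call), not the real DefaultStateUpdater or TaskManager code.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Entry point comes first so `java RaceDemo.java` (single-file launcher) runs it.
class RaceDemo {
    // Mirrors the current behaviour: a null remove result is treated as a fatal bug.
    static String waitForFuture(String taskId, CompletableFuture<String> future) {
        String removed = future.join();
        if (removed == null) {
            throw new IllegalStateException("Task " + taskId
                    + " was not found in the state updater. This indicates a bug.");
        }
        return removed;
    }

    // Replays the problematic interleaving deterministically on one thread.
    static CompletableFuture<String> runLostRace() {
        ToyStateUpdater updater = new ToyStateUpdater();
        updater.restoredActiveTasks.add("1_2");      // finished restoring, not drained yet
        if (!updater.tasks().contains("1_2")) {      // revocation still sees the task
            throw new AssertionError("tasks() should report restored tasks");
        }
        updater.drainRestoredActiveTasks();          // task now leaves every collection
        CompletableFuture<String> future = updater.remove("1_2"); // only queued
        updater.processPendingRemoves();             // "updater thread" runs removeTask
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = runLostRace();
        System.out.println("remove result: " + future.join()); // remove result: null
        try {
            waitForFuture("1_2", future);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical, simplified stand-in for the state updater's bookkeeping.
class ToyStateUpdater {
    final Set<String> updatingTasks = new HashSet<>();
    final Set<String> pausedTasks = new HashSet<>();
    final Set<String> restoredActiveTasks = new HashSet<>();
    final Set<String> exceptionsAndFailedTasks = new HashSet<>();

    private static final class PendingRemove {
        final String taskId;
        final CompletableFuture<String> future = new CompletableFuture<>();
        PendingRemove(String taskId) { this.taskId = taskId; }
    }

    private final Queue<PendingRemove> pendingRemoves = new ArrayDeque<>();

    // Roughly like tasks() as described above: restored tasks that have not
    // been drained yet are still reported (paused tasks omitted in this model).
    Set<String> tasks() {
        Set<String> all = new HashSet<>(updatingTasks);
        all.addAll(restoredActiveTasks);
        return all;
    }

    // Stream-thread side: hand restored tasks over and forget them.
    Set<String> drainRestoredActiveTasks() {
        Set<String> drained = new HashSet<>(restoredActiveTasks);
        restoredActiveTasks.clear();
        return drained;
    }

    // remove() only enqueues the request; the result arrives later.
    CompletableFuture<String> remove(String taskId) {
        PendingRemove request = new PendingRemove(taskId);
        pendingRemoves.add(request);
        return request.future;
    }

    // Updater-thread side: like removeTask, check the four collections in turn.
    void processPendingRemoves() {
        PendingRemove request;
        while ((request = pendingRemoves.poll()) != null) {
            String id = request.taskId;
            boolean found = updatingTasks.remove(id) || pausedTasks.remove(id)
                    || restoredActiveTasks.remove(id) || exceptionsAndFailedTasks.remove(id);
            // No branch matched: the future completes with null, as in the report.
            request.future.complete(found ? id : null);
        }
    }
}
```

Running main prints "remove result: null" followed by the same "Task 1_2 was not found in the state updater. This indicates a bug." message as in the stack trace.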

So the task was simply already gone by the time the remove ran. That is a benign lost race, and it should not kill the thread.

The problem is still present in the latest code. I read the source at 4.1.2, 4.2.1, 4.3.0 and trunk: the throw in waitForFuture and the future.complete(null) in removeTask are the same in all of them, and the tasks()/streamOfNonPausedTasks logic that exposes a restored task is identical between 4.3.0 and trunk. So upgrading does not help.

h3. How to reproduce

I could not find a thread interleaving that fires it reliably (it is a timing race), so I split the reproduction into two deterministic tests, both green on trunk. Patch attached.

* DefaultStateUpdaterTest: uses a real DefaultStateUpdater, lets a task finish restoring so it lands in restoredActiveTasks, confirms tasks() still reports it, calls drainRestoredActiveTasks(), and then remove(id).get() comes back null.
* TaskManagerTest: drives the real handleRevocation -> revokeTasksInStateUpdater path with tasks() reporting the task and its remove() future completing with null (the outcome the first test shows is real), and it throws the IllegalStateException above. The stack trace at the top is the real one from this test.

h3. Fix idea

Treat "task is not in any collection when remove runs" as already removed instead of as a fatal bug. Either have remove() complete with an explicit "already removed" result that revokeTasksInStateUpdater / getNonFailedTasks skip, or let waitForFuture log and skip a null result on the revoke path instead of throwing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)