Eswarar Siva created KAFKA-20721:
------------------------------------
Summary: Streams state updater: remove() during rebalance revocation can complete with null and throw a fatal "Task was not found in the state updater. This indicates a bug."
Key: KAFKA-20721
URL: https://issues.apache.org/jira/browse/KAFKA-20721
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 4.2.1, 4.1.2, 4.3.0
Environment: Kafka Streams, exactly_once_v2, default state updater, several stateful tasks. Hit during a cold start with a lot of rebalancing. Verified the code at 4.1.2, 4.2.1, 4.3.0 and trunk.
Reporter: Eswarar Siva
Assignee: Eswarar Siva
Attachments: captured-stacktrace.txt, task-not-found-repro.patch
Ran into this on a Streams app running exactly_once_v2 during a cold start, while a number of stateful tasks were still restoring and the group kept rebalancing. One of the StreamThreads died with:
{noformat}
java.lang.IllegalStateException: Task 1_2 was not found in the state updater. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues ...
	at org.apache.kafka.streams.processor.internals.TaskManager.waitForFuture(TaskManager.java:711)
	at org.apache.kafka.streams.processor.internals.TaskManager.lambda$getNonFailedTasks$10(TaskManager.java:672)
	at org.apache.kafka.streams.processor.internals.TaskManager.revokeTasksInStateUpdater(TaskManager.java:1168)
	at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:1058)
{noformat}
It is thrown inside the rebalance callback, so it surfaces as "User rebalance callback throws an error", the default handler responds with SHUTDOWN_CLIENT, and the client transitions from REBALANCING to PENDING_ERROR. In my case there was also a native RocksDB SIGSEGV during shutdown. The thread does not recover without a restart.
After digging into it, this turns out to be a race, not a genuinely "impossible" state. revokeTasksInStateUpdater walks stateUpdater.tasks() and calls stateUpdater.remove(taskId) for every revoked active task it still sees. tasks() (via streamOfNonPausedTasks) also includes tasks sitting in restoredActiveTasks, i.e. tasks that have already finished restoring but that the stream thread has not drained yet.
If such a task leaves all of the updater collections (updatingTasks, pausedTasks, restoredActiveTasks, exceptionsAndFailedTasks) before the state updater thread actually processes the queued remove (for example because handleRestoredTasksFromStateUpdater drained it, or another action pulled it out during the churn), removeTask matches none of its four branches and calls future.complete(null). waitForFuture then sees the null and throws the fatal "This indicates a bug" exception.
So the task was simply already gone by the time the remove ran. That is a benign lost race, and it should not kill the thread.
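A simplified, single-threaded model of the updater-side bookkeeping may help make the race concrete. Everything in it is hypothetical: ModelStateUpdater is not a Kafka class, task ids are plain strings, tasks() is reduced to updating plus restored tasks, and the queued remove is collapsed into a direct call. It only assumes what is described above: tasks() still reports restored-but-undrained tasks, and a remove that finds the task in none of the four collections completes with null.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical single-threaded model of the state updater bookkeeping; not Kafka code.
class ModelStateUpdater {
    final Set<String> updatingTasks = new HashSet<>();
    final Set<String> pausedTasks = new HashSet<>();
    final Set<String> restoredActiveTasks = new HashSet<>();
    final Set<String> exceptionsAndFailedTasks = new HashSet<>();

    // Simplified non-paused view: restored-but-undrained tasks are still visible here.
    Set<String> tasks() {
        return Stream.concat(updatingTasks.stream(), restoredActiveTasks.stream())
                .collect(Collectors.toSet());
    }

    // Stream thread side: take ownership of tasks that finished restoring.
    List<String> drainRestoredActiveTasks() {
        List<String> drained = new ArrayList<>(restoredActiveTasks);
        restoredActiveTasks.clear();
        return drained;
    }

    // Model of removeTask: try each collection in turn; if none holds the task, complete with null.
    CompletableFuture<String> remove(String taskId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean found = updatingTasks.remove(taskId)
                || pausedTasks.remove(taskId)
                || restoredActiveTasks.remove(taskId)
                || exceptionsAndFailedTasks.remove(taskId);
        future.complete(found ? taskId : null);
        return future;
    }

    public static void main(String[] args) {
        ModelStateUpdater updater = new ModelStateUpdater();
        updater.restoredActiveTasks.add("1_2");         // task 1_2 finished restoring
        Set<String> seenByRevocation = updater.tasks(); // revocation still sees 1_2 ...
        updater.drainRestoredActiveTasks();             // ... but the stream thread drains it first
        for (String id : seenByRevocation) {
            System.out.println(id + " -> " + updater.remove(id).join()); // prints "1_2 -> null"
        }
    }
}
```

The snapshot, drain, remove ordering in main is the same sequence the DefaultStateUpdaterTest in the attached patch forces deterministically.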
The problem is still present on the latest code. I read the source at 4.1.2, 4.2.1, 4.3.0 and trunk: the throw in waitForFuture and the future.complete(null) in removeTask are the same in all of them, and the tasks()/streamOfNonPausedTasks logic that exposes a restored task is identical between 4.3.0 and trunk. So upgrading does not help.
h3. How to reproduce
I could not find a thread interleaving that triggers it reliably (it is a timing race), so I split it into two deterministic tests, both green on trunk. Patch attached.
* DefaultStateUpdaterTest: with a real DefaultStateUpdater, let a task finish restoring so it lands in restoredActiveTasks, confirm tasks() still reports it, call drainRestoredActiveTasks(), then remove(id).get() comes back null.
* TaskManagerTest: drive the real handleRevocation -> revokeTasksInStateUpdater path with tasks() reporting the task and its remove() future completing with null (the outcome the first test shows is real), and it throws the IllegalStateException above. The stack trace at the top is the real one from this test.
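The TaskManager half can be modeled the same way. ModelRevokePath is hypothetical and is not the real TaskManager: its waitForFuture only mimics the behavior described above (a null result is treated as fatal) and reuses the message from the stack trace, and issuing all removes before waiting is a simplification of this sketch. Feeding it a remove() stub whose future completes with null reproduces the IllegalStateException, mirroring the TaskManagerTest case.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

// Hypothetical model of the current revoke path; not the real TaskManager code.
class ModelRevokePath {
    // Strict wait: a null result is treated as an impossible state.
    static <T> T waitForFuture(String taskId, CompletableFuture<T> future) {
        final T result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while removing task " + taskId, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Removing task " + taskId + " failed", e.getCause());
        }
        if (result == null) {
            throw new IllegalStateException("Task " + taskId
                    + " was not found in the state updater. This indicates a bug.");
        }
        return result;
    }

    // Issue a remove for every revoked task the updater reported, then wait for each one.
    static List<String> revoke(Collection<String> revokedInUpdater,
                               Function<String, CompletableFuture<String>> remove) {
        List<String> ids = new ArrayList<>(revokedInUpdater);
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String id : ids) {
            futures.add(remove.apply(id));
        }
        List<String> removed = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            removed.add(waitForFuture(ids.get(i), futures.get(i)));
        }
        return removed;
    }

    public static void main(String[] args) {
        try {
            // The stub plays the lost race: the task was already drained, so remove() yields null.
            revoke(List.of("1_2"), id -> CompletableFuture.<String>completedFuture(null));
        } catch (IllegalStateException e) {
            // prints "Task 1_2 was not found in the state updater. This indicates a bug."
            System.out.println(e.getMessage());
        }
    }
}
```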
h3. Fix idea
Treat "task is not in any collection when remove runs" as already removed instead of as a fatal bug. Either have remove() complete with an explicit "already removed" result that revokeTasksInStateUpdater / getNonFailedTasks skip, or let waitForFuture log and skip a null result on the revoke path instead of throwing.
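A sketch of the second option, under the same simplifying assumptions as a model rather than a patch: TolerantRevokePath and waitForFutureAllowingAbsent are hypothetical names, not existing Kafka APIs. A null result on the revoke path is logged and skipped, while an exceptionally completed future still fails loudly, so genuine removal errors are not masked. Whether the drained task then needs any extra handling on the stream thread is left out of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical sketch of the "log and skip on null" option for the revoke path only.
class TolerantRevokePath {
    private static final Logger LOG = Logger.getLogger(TolerantRevokePath.class.getName());

    // Empty Optional means the task had already left the state updater (lost race).
    static <T> Optional<T> waitForFutureAllowingAbsent(String taskId, CompletableFuture<T> future) {
        try {
            T result = future.get();
            if (result == null) {
                LOG.info("Task " + taskId + " was already removed from the state updater; skipping");
            }
            return Optional.ofNullable(result);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while removing task " + taskId, e);
        } catch (ExecutionException e) {
            // A real failure during removal is still surfaced.
            throw new IllegalStateException("Removing task " + taskId + " failed", e.getCause());
        }
    }

    // Collect only the tasks that the updater actually handed back.
    static List<String> revoke(Collection<String> revokedInUpdater,
                               Function<String, CompletableFuture<String>> remove) {
        List<String> removed = new ArrayList<>();
        for (String id : revokedInUpdater) {
            waitForFutureAllowingAbsent(id, remove.apply(id)).ifPresent(removed::add);
        }
        return removed;
    }
}
```

The first option (an explicit "already removed" result from remove()) would avoid overloading null, but would likely require changing what remove() returns; this sketch does not attempt that.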