Eswarar Siva created KAFKA-20731:
------------------------------------

             Summary: Streams: transitToUpdateStandby is called without checking ACTIVE_RESTORING, crashing the state updater thread
                 Key: KAFKA-20731
                 URL: https://issues.apache.org/jira/browse/KAFKA-20731
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 4.1.2, 4.3.0
            Reporter: Eswarar Siva


The state updater thread can die with the following exception (captured on 4.3.0):

java.lang.IllegalStateException: The changelog reader is not restoring active tasks (is STANDBY_UPDATING) while trying to transit to update standby tasks: {...}
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.transitToUpdateStandby(StoreChangelogReader.java:326)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.resumeTask(DefaultStateUpdater.java:661)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.resumeTasks(DefaultStateUpdater.java:241)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.runOnce(DefaultStateUpdater.java:197)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.run(DefaultStateUpdater.java:163)

transitToUpdateStandby() is documented as not idempotent and throws when the 
reader state is not ACTIVE_RESTORING. Two call sites on the state updater 
thread call it without checking the state first:

1. resumeTask, in the standby branch: when a standby task is resumed and
   updatingTasks.size() == 1, it calls changelogReader.transitToUpdateStandby()
   with no guard on the reader state.
2. transitToUpdateStandbysIfOnlyStandbysLeft: when onlyStandbyTasksUpdating()
   is true, it calls transitToUpdateStandby() with no guard on the reader state.
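A minimal sketch of the two unguarded call sites, written against a hypothetical two-state toy model rather than the real StoreChangelogReader and DefaultStateUpdater classes. ToyChangelogReader, ToyUpdater, ReaderState, resumeStandby, the onlyStandbysUpdating flag and the task id "1_0" are illustrative assumptions; only the state names, the method names transitToUpdateStandby and transitToUpdateStandbysIfOnlyStandbysLeft, and the throw-unless-ACTIVE_RESTORING contract come from this report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-state model of the changelog reader (not the real StoreChangelogReader).
enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

class ToyChangelogReader {
    ReaderState state = ReaderState.ACTIVE_RESTORING;

    // Not idempotent: throws unless the reader is currently ACTIVE_RESTORING,
    // mirroring the documented contract described in this report.
    void transitToUpdateStandby() {
        if (state != ReaderState.ACTIVE_RESTORING) {
            throw new IllegalStateException(
                "The changelog reader is not restoring active tasks (is " + state
                    + ") while trying to transit to update standby tasks");
        }
        state = ReaderState.STANDBY_UPDATING;
    }
}

class ToyUpdater {
    final ToyChangelogReader changelogReader = new ToyChangelogReader();
    final List<String> updatingTasks = new ArrayList<>(); // hypothetical: task ids only
    boolean onlyStandbysUpdating = true; // hypothetical stand-in for onlyStandbyTasksUpdating()

    // Call site 1 (simplified standby branch of resumeTask): no guard on reader state.
    void resumeStandby(String taskId) {
        updatingTasks.add(taskId);
        if (updatingTasks.size() == 1) {
            changelogReader.transitToUpdateStandby();
        }
    }

    // Call site 2 (simplified transitToUpdateStandbysIfOnlyStandbysLeft): no guard either.
    void transitToUpdateStandbysIfOnlyStandbysLeft() {
        if (onlyStandbysUpdating) {
            changelogReader.transitToUpdateStandby();
        }
    }

    public static void main(String[] args) {
        ToyUpdater updater = new ToyUpdater();
        updater.transitToUpdateStandbysIfOnlyStandbysLeft(); // ACTIVE_RESTORING -> STANDBY_UPDATING
        try {
            updater.resumeStandby("1_0"); // size() == 1, second unguarded transition
        } catch (IllegalStateException e) {
            System.out.println("call site 1 throws: " + e.getMessage());
        }
    }
}
```

In this model, once either call site has moved the reader to STANDBY_UPDATING, reaching the other one (or the same one again) without a move back to ACTIVE_RESTORING in between throws IllegalStateException.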

This is a single-thread problem and needs no cross-thread race. The following
ordering on the state updater thread is enough:

-> the reader is already STANDBY_UPDATING, which is the normal state once only
   standbys are left,
-> the updating tasks are paused (pauseTask does not move the reader back to
   ACTIVE_RESTORING),
-> one standby is resumed, updatingTasks.size() becomes 1, and resumeTask calls
   transitToUpdateStandby() on a reader that is already STANDBY_UPDATING; the
   call throws and kills the state updater thread.
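The ordering above can be replayed in a small, self-contained toy model. This is an assumption-laden sketch, not Kafka code: PauseResumeRepro, ReaderState, the task ids "1_0" and "1_1", and the set-based bookkeeping are hypothetical, and the real pauseTask and resumeTask do more than move ids between sets. It only shows that nothing in the sequence resets the reader before the unguarded transition.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical two-state reader model; not the real StoreChangelogReader.
enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

class PauseResumeRepro {
    ReaderState readerState = ReaderState.ACTIVE_RESTORING;
    final Set<String> updatingTasks = new LinkedHashSet<>();
    final Set<String> pausedTasks = new LinkedHashSet<>();

    // Non-idempotent transition: throws unless the reader is ACTIVE_RESTORING.
    void transitToUpdateStandby() {
        if (readerState != ReaderState.ACTIVE_RESTORING) {
            throw new IllegalStateException("reader is " + readerState + ", not ACTIVE_RESTORING");
        }
        readerState = ReaderState.STANDBY_UPDATING;
    }

    // Moves the task out of updatingTasks but leaves the reader state untouched.
    void pauseTask(String taskId) {
        if (updatingTasks.remove(taskId)) {
            pausedTasks.add(taskId);
        }
    }

    // Simplified standby branch of resumeTask, unguarded as described above.
    void resumeStandbyTask(String taskId) {
        if (pausedTasks.remove(taskId)) {
            updatingTasks.add(taskId);
            if (updatingTasks.size() == 1) {
                transitToUpdateStandby();
            }
        }
    }

    public static void main(String[] args) {
        PauseResumeRepro updater = new PauseResumeRepro();
        // Step 1: only standbys are left, so the reader is already STANDBY_UPDATING.
        updater.updatingTasks.add("1_0");
        updater.updatingTasks.add("1_1");
        updater.transitToUpdateStandby();
        // Step 2: all updating tasks are paused; the reader stays STANDBY_UPDATING.
        updater.pauseTask("1_0");
        updater.pauseTask("1_1");
        // Step 3: resuming one standby makes updatingTasks.size() == 1 and throws.
        try {
            updater.resumeStandbyTask("1_0");
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            System.out.println("state updater thread would die: " + e.getMessage());
        }
    }
}
```

Running main takes the catch branch rather than printing "no exception", which corresponds to the third step of the ordering.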

Reproduced on 4.1.2 and 4.3.0 with a standalone Streams application under 
topology pause and resume churn (the same reproducer used for KAFKA-20724). The 
code path is unchanged on trunk.

Relationship to KAFKA-17946: that ticket tracked the flaky test
shouldResumeStandbyTask. Its fix (PR #18253) stabilized the test but did not
change resumeTask or transitToUpdateStandby, so the production transition is
still not idempotent and the failing path is still reachable.

Suggested fix: preserve the invariant that transitToUpdateStandby is only
called when the reader is ACTIVE_RESTORING. Either guard both call sites with
isRestoringActive(), or make transitToUpdateStandby idempotent so that it is a
no-op when the reader is already STANDBY_UPDATING.
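Both suggested fixes, sketched on a hypothetical toy reader. ToyReader, GuardedCallSites, transitToUpdateStandbyIdempotent, updatingTaskCount and the onlyStandbysUpdating parameter are illustrative names, not the actual Kafka API, and the model assumes the reader has only the two states named in this report. The first option keeps transitToUpdateStandby strict and guards each call site with isRestoringActive(); the second adds an idempotent variant that returns early when the reader is already STANDBY_UPDATING.

```java
// Hypothetical two-state model; not the real StoreChangelogReader.
enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

class ToyReader {
    ReaderState state = ReaderState.ACTIVE_RESTORING;

    boolean isRestoringActive() {
        return state == ReaderState.ACTIVE_RESTORING;
    }

    // Strict variant: keeps the documented contract and throws if misused.
    void transitToUpdateStandby() {
        if (state != ReaderState.ACTIVE_RESTORING) {
            throw new IllegalStateException("not ACTIVE_RESTORING: " + state);
        }
        state = ReaderState.STANDBY_UPDATING;
    }

    // Fix 2 (hypothetical name): idempotent variant, a no-op when already STANDBY_UPDATING.
    void transitToUpdateStandbyIdempotent() {
        if (state == ReaderState.STANDBY_UPDATING) {
            return;
        }
        transitToUpdateStandby();
    }
}

class GuardedCallSites {
    final ToyReader changelogReader = new ToyReader();
    int updatingTaskCount = 0; // hypothetical stand-in for updatingTasks.size()

    // Fix 1: guard the standby branch of resumeTask with isRestoringActive().
    void resumeStandby() {
        updatingTaskCount++;
        if (updatingTaskCount == 1 && changelogReader.isRestoringActive()) {
            changelogReader.transitToUpdateStandby();
        }
    }

    // Fix 1: same guard for transitToUpdateStandbysIfOnlyStandbysLeft.
    void transitToUpdateStandbysIfOnlyStandbysLeft(boolean onlyStandbysUpdating) {
        if (onlyStandbysUpdating && changelogReader.isRestoringActive()) {
            changelogReader.transitToUpdateStandby();
        }
    }

    public static void main(String[] args) {
        GuardedCallSites sites = new GuardedCallSites();
        sites.changelogReader.state = ReaderState.STANDBY_UPDATING; // as after the pause/resume ordering
        sites.resumeStandby(); // guarded: transition skipped, no exception
        sites.transitToUpdateStandbysIfOnlyStandbysLeft(true); // guarded: skipped as well
        System.out.println("reader state: " + sites.changelogReader.state);
    }
}
```

One trade-off, as far as this model shows: guarding the call sites keeps the strict contract, so any other unexpected caller still fails loudly, while the idempotent variant is a smaller change but silently absorbs repeated calls.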



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
