[ https://issues.apache.org/jira/browse/KAFKA-9972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103577#comment-17103577 ]

Sophie Blee-Goldman commented on KAFKA-9972:
--------------------------------------------

Not exactly. I was proposing to remove the filter entirely and just handle each 
state inside Task#commit. That is, instead of throwing an exception inside 
Task#commit if the task is in any state other than RUNNING or RESTORING, we 
just don't do anything, as we already do in Task#suspend, for example. The 
TaskManager shouldn't have to keep track of which task APIs it's allowed to 
call on a task in a given state; it should just call the API and let the Task 
implementation choose the appropriate action based on its state.
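
A minimal sketch of the proposal follows. It assumes a simplified task model: the `SketchTask` class, its `State` enum, and the `commitAll` helper are hypothetical stand-ins, not the real Kafka Streams internals. The task decides what `commit()` means in its current state and quietly does nothing outside RUNNING and RESTORING. This lets the caller commit every task without filtering by state.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model; not the real Kafka Streams Task interface.
public class StateAwareCommitSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final class SketchTask {
        final String id;
        State state;
        int commits = 0;

        SketchTask(String id, State state) {
            this.id = id;
            this.state = state;
        }

        // The task, not the caller, decides what commit means in its current state.
        void commit() {
            switch (state) {
                case RUNNING:
                case RESTORING:
                    commits++; // stand-in for the real commit work
                    break;
                default:
                    // Any other state: nothing to commit, so return quietly instead of throwing.
                    break;
            }
        }
    }

    // The caller commits every task without checking its state first.
    static void commitAll(List<SketchTask> tasks) {
        for (SketchTask task : tasks) {
            task.commit();
        }
    }

    public static void main(String[] args) {
        SketchTask running = new SketchTask("0_0", State.RUNNING);
        SketchTask revived = new SketchTask("1_1", State.CREATED); // e.g. a revived standby task
        commitAll(Arrays.asList(running, revived));
        System.out.println(running.id + " commits=" + running.commits);
        System.out.println(revived.id + " commits=" + revived.commits);
    }
}
```

Running `main` should print `0_0 commits=1` and `1_1 commits=0`. The CREATED task is simply skipped rather than raising an IllegalStateException.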

 

We've pushed most of the task state handling into the Task implementation 
already, but some of it still bleeds into the TaskManager. It would be nice to 
be consistent and establish reasonable expectations for the TaskManager. To 
take another example, Task#initializeIfNeeded accepts any task state but only 
takes action if the state is CREATED, whereas Task#completeRestoration throws 
an exception if the task is not in RESTORING.
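
To illustrate the inconsistency, here is a hypothetical sketch. Only the method names `initializeIfNeeded` and `completeRestoration` mirror the ones mentioned above; the class, its state transitions, and the method bodies are simplified assumptions. The tolerant method can be called on every task, while the strict one forces the caller to filter by state first.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch contrasting a tolerant and a strict task method; not Kafka code.
public class TaskContractSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final class SketchTask {
        State state;

        SketchTask(State state) {
            this.state = state;
        }

        // Tolerant: callable in any state, but only acts when the task is CREATED.
        void initializeIfNeeded() {
            if (state == State.CREATED) {
                state = State.RESTORING;
            }
        }

        // Strict: throws unless the task is RESTORING, so callers must check first.
        void completeRestoration() {
            if (state != State.RESTORING) {
                throw new IllegalStateException(
                    "Illegal state " + state + " while completing restoration");
            }
            state = State.RUNNING;
        }
    }

    public static void main(String[] args) {
        List<SketchTask> tasks = Arrays.asList(new SketchTask(State.CREATED), new SketchTask(State.RUNNING));

        // Tolerant API: the caller can invoke it on every task.
        for (SketchTask task : tasks) {
            task.initializeIfNeeded();
        }

        // Strict API: the caller has to filter by state, so state logic leaks out of the task.
        for (SketchTask task : tasks) {
            if (task.state == State.RESTORING) {
                task.completeRestoration();
            }
        }

        tasks.forEach(task -> System.out.println(task.state));
    }
}
```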

> Corrupted standby task could be committed
> -----------------------------------------
>
>                 Key: KAFKA-9972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9972
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> A corrupted standby task could be revived and transition back to the CREATED 
> state; `taskManager.commitAll` in the next runOnce then tries to commit it, 
> causing an IllegalStateException:
> ```
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,646] WARN 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException 
> fetching records from restore consumer for partitions 
> [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1], it 
> is likely that the consumer's position has fallen out of the topic partition 
> offset range because the topic was truncated or compacted on the broker, 
> marking the corresponding tasks as corrupted and re-initializing it later. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,646] WARN 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Detected the states of tasks 
> {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]}
>  are corrupted. Will close the task as dirty and re-create and bootstrap from 
> scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
> {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]}
>  are corrupted and hence needs to be re-initialized
>         at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,652] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> [Consumer 
> clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer,
>  groupId=null] Unsubscribed all topics or patterns and assigned partitions 
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,652] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> standby-task [1_1] Prepared dirty close 
> (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,679] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> standby-task [1_1] Closed dirty 
> (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,751] ERROR 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
> java.lang.IllegalStateException: Illegal state CREATED while preparing 
> standby task 1_1 for committing
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,751] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> ```
> There are two possible fixes: either deprecate `commitAll` and always check 
> task state so that only eligible tasks are committed, or add a state check 
> inside the standby task's `commitNeeded` call. I added a fix for option one 
> here.
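
The failure and the two fixes described above can be modeled in a few lines. This is a hypothetical sketch, not Kafka code. `SketchStandbyTask`, `closeDirtyAndRevive()`, the `stateAwareCommitNeeded` flag, and the `commit(tasks, filter)` helper are invented for illustration, and only the exception message is modeled on the log. An unfiltered commit hands the revived CREATED task to `prepareCommit()` and throws. Either a state filter in the caller (option one) or a state-aware `commitNeeded()` (option two) skips the task instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the bug and both proposed fixes; not Kafka Streams code.
public class StandbyCommitSketch {

    enum State { CREATED, RUNNING }

    static final class SketchStandbyTask {
        final String id;
        final boolean stateAwareCommitNeeded; // true models option two
        State state = State.RUNNING;

        SketchStandbyTask(String id, boolean stateAwareCommitNeeded) {
            this.id = id;
            this.stateAwareCommitNeeded = stateAwareCommitNeeded;
        }

        // Models "closed dirty, then revived": the task is back in CREATED.
        void closeDirtyAndRevive() {
            state = State.CREATED;
        }

        // Without the state check the task always reports a pending commit;
        // with it (option two), only a RUNNING task does.
        boolean commitNeeded() {
            return !stateAwareCommitNeeded || state == State.RUNNING;
        }

        // Strict check modeled on the exception in the log.
        void prepareCommit() {
            if (state != State.RUNNING) {
                throw new IllegalStateException(
                    "Illegal state " + state + " while preparing standby task " + id + " for committing");
            }
        }
    }

    // Commits every task that passes the caller's filter and reports commitNeeded().
    static int commit(List<SketchStandbyTask> tasks, Predicate<SketchStandbyTask> filter) {
        int committed = 0;
        for (SketchStandbyTask task : tasks) {
            if (filter.test(task) && task.commitNeeded()) {
                task.prepareCommit();
                committed++;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        SketchStandbyTask revived = new SketchStandbyTask("1_1", false);
        revived.closeDirtyAndRevive();

        // Unfiltered, like commitAll: the CREATED task reaches prepareCommit and throws.
        try {
            commit(Arrays.asList(revived), task -> true);
        } catch (IllegalStateException e) {
            System.out.println("commitAll: " + e.getMessage());
        }

        // Option one: the caller only commits RUNNING tasks.
        System.out.println("option one: " + commit(Arrays.asList(revived), task -> task.state == State.RUNNING));

        // Option two: commitNeeded() itself returns false outside RUNNING.
        SketchStandbyTask revivedAware = new SketchStandbyTask("1_1", true);
        revivedAware.closeDirtyAndRevive();
        System.out.println("option two: " + commit(Arrays.asList(revivedAware), task -> true));
    }
}
```

The trade-off mirrors the comment above. Option one keeps the state check in the TaskManager, while option two keeps it inside the task.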



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
