guozhangwang opened a new pull request, #12519: URL: https://github.com/apache/kafka/pull/12519
1. In the state updater, when handling a task-corrupted exception caused by an invalid restoring offset, first delete the affected partitions from the checkpoint before reporting the exception back to the stream thread. This mimics the behavior of the stream thread's StateManager#handleCorruption#closeDirtyAndRevive. Doing it inside the restore thread is cleaner, and it lets us optimize by deleting only the corrupted partitions rather than all of them.

2. In the state manager, handle the drained exceptions as follows (this is the same as handling all exceptions from `handleAssignment`):
   1) Task-migrated: throw all the way to the stream thread as `handleTaskMigrated`.
   2) Any fatal Streams exception: throw all the way to the stream thread to trigger the exception handler.
   3) Task-corrupted: throw to the stream thread as `handleCorruption`.

   Note that for 3), we specifically distinguish whether the corrupted tasks are already closed (when they are thrown from `handleAssignment`) or not (when they are thrown from the state updater).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
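
As a rough illustration of the two changes described above, the following sketch shows selective checkpoint cleanup (item 1) and the routing of drained exceptions (item 2). It is not the code in this PR: the class names `TaskMigratedSketch`, `TaskCorruptedSketch`, and `DrainedExceptionSketch`, the `Action` enum, and the use of plain strings for partitions are all hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the exception categories named in the description;
// these are NOT the real Kafka Streams classes.
class TaskMigratedSketch extends RuntimeException {}

class TaskCorruptedSketch extends RuntimeException {
    final Set<String> corruptedPartitions;
    // Assumed flag: true when thrown from handleAssignment, false when from the state updater.
    final boolean tasksAlreadyClosed;

    TaskCorruptedSketch(Set<String> corruptedPartitions, boolean tasksAlreadyClosed) {
        this.corruptedPartitions = corruptedPartitions;
        this.tasksAlreadyClosed = tasksAlreadyClosed;
    }
}

class DrainedExceptionSketch {
    // Hypothetical labels for what the stream thread would do next.
    enum Action {
        HANDLE_TASK_MIGRATED,
        HANDLE_CORRUPTION_TASKS_CLOSED,
        HANDLE_CORRUPTION_TASKS_OPEN,
        TRIGGER_EXCEPTION_HANDLER
    }

    // Item 1: drop only the corrupted partitions from the checkpointed offsets,
    // leaving the offsets of healthy partitions untouched.
    static Map<String, Long> dropCorrupted(Map<String, Long> checkpoint, Set<String> corrupted) {
        Map<String, Long> pruned = new HashMap<>(checkpoint);
        pruned.keySet().removeAll(corrupted);
        return pruned;
    }

    // Item 2: classify a drained exception into the three cases from the description.
    static Action route(RuntimeException e) {
        if (e instanceof TaskMigratedSketch) {
            return Action.HANDLE_TASK_MIGRATED;
        }
        if (e instanceof TaskCorruptedSketch) {
            TaskCorruptedSketch corrupted = (TaskCorruptedSketch) e;
            return corrupted.tasksAlreadyClosed
                ? Action.HANDLE_CORRUPTION_TASKS_CLOSED
                : Action.HANDLE_CORRUPTION_TASKS_OPEN;
        }
        // Anything else is treated as fatal in this sketch.
        return Action.TRIGGER_EXCEPTION_HANDLER;
    }
}
```

Keeping the "already closed" distinction on the exception itself is only one possible design; the actual change may carry that information differently.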