[ https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-8574.
----------------------------------
Fix Version/s: 2.6.0
Assignee: Guozhang Wang
Resolution: Fixed

EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1
----------------------------------------------------------------------------------------------------

Key: KAFKA-8574
URL: https://issues.apache.org/jira/browse/KAFKA-8574
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.0.1
Reporter: William Greer
Assignee: Guozhang Wang
Priority: Major
Fix For: 2.6.0

*Overview*
While using EOS in Kafka Streams, there is a race condition where the previous owning thread (Thread A) writes the checkpoint file after the new owning thread (Thread B) has already read it. Because Thread B found no checkpoint file, it starts a full restoration. A rebalance occurs before Thread B completes the restoration, and a third thread (Thread C) becomes the owner. Thread C reads the checkpoint file written by Thread A, which does not correspond to the current state of the RocksDB state store. When this race condition occurs, the state store contains the most recent records and some of the oldest records, but is missing an unknown range of records in between. If A->Z represents the entire changelog up to the present, the state store would contain records [A->K and Y->Z] and be missing records K->Y.

This race condition is possible because of dirty writes and dirty reads of the checkpoint file.

*Example:*
"Thread" refers to a Kafka Streams StreamThread [0].
Threads A, B and C are running in the same JVM in the same Streams application.

Scenario:
Thread-A is in the RUNNING state and up to date on partition 1.
Thread-A is suspended on 1.
This does not write a checkpoint file because EOS is enabled. [1]
Thread-B is assigned 1.
Thread-B does not find a checkpoint in the StateManager. [2]
Thread-A is assigned a different partition. The task manager writes the checkpoints of suspended tasks to disk, so the checkpoint for 1 is written. [3]
Thread-B deletes the LocalStore and starts restoring. Deleting the LocalStore does not delete the checkpoint file. [4]
Thread-C is revoked.
Thread-A is revoked.
Thread-B's assignment is revoked. It does not write a checkpoint file.
 - Note: Thread-B never reaches the RUNNING state; it remains in PARTITIONS_ASSIGNED until it transitions to PARTITIONS_REVOKED.
Thread-C is assigned 1.
Thread-C finds a checkpoint in the StateManager. This checkpoint corresponds to where Thread-A left the state store for partition 1, not to where Thread-B left it.
Thread-C begins restoring from the checkpoint. At this point the state store is missing an unknown number of records.
Thread-B is assigned and does not write a checkpoint file for partition 1, because it had not reached the RUNNING state before being revoked.

[0] [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
[1] [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
[2] [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
[3] [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
& [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
[4] [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
& [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
Specifically, [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119] is where the state store is deleted but the checkpoint file is not.

*How we recovered:*
1. Deleted the impacted state store. This triggered multiple exceptions and initiated a rebalance.

*Possible approaches to address this issue:*
1. Add a collection of global task locks to protect the checkpoint file against concurrent access. The locks for suspended tasks would be released after closeNonAssignedSuspendedTasks, and the locks for newly assigned tasks would be acquired only after that release.
2. Delete the checkpoint file under EOS when partitions are revoked. This doesn't address the race condition, but it would guarantee that under EOS the checkpoint file is never ahead of the LocalStore. The trade-off is a higher likelihood of triggering a full restoration of a LocalStore whenever a partition moves between threads on the same host.
3. Configure task stickiness for StreamThreads. For example, if a host with multiple StreamThreads is assigned a task it had before, prefer to assign that task to the same thread on the host that owned it before.
4. Add a new state that splits PARTITIONS_ASSIGNED into a "clean up previous assignment" step and a "bootstrap new assignment" step. All valid threads would have to complete the clean-up step before any thread could progress to the bootstrap step.
5. Force a checkpoint of the current position during PARTITIONS_REVOKED. I don't think this addresses the race condition, but I think it mitigates the truncation scenario.
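Approach 1 can be illustrated with a minimal, hypothetical sketch. It is not Kafka Streams code: TaskCheckpointLocks, acquire, release, writeCheckpoint and readCheckpoint are invented names, an in-memory map stands in for the on-disk checkpoint file, and the task id "0_1" is only an example. It assumes every StreamThread in the JVM shares one lock per task: the previous owner takes the lock when it suspends the task and releases it only after writing its checkpoint, while a new owner must take the lock before reading. Under that assumption, the dirty read in the scenario (Thread-B reading before Thread-A writes) cannot occur.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of approach 1, not Kafka Streams code: a JVM-wide lock per
// task that a StreamThread must hold while reading or writing that task's checkpoint.
public class TaskCheckpointLocks {
    // One lock per task id, shared by every StreamThread in the JVM.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // In-memory stand-in for the per-task checkpoint file (task id -> changelog offset).
    private final Map<String, Long> checkpoints = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(String taskId) {
        return locks.computeIfAbsent(taskId, id -> new ReentrantLock());
    }

    // Previous owner: call when suspending the task.
    // New owner: call before reading the checkpoint; blocks until the previous owner releases.
    public void acquire(String taskId) {
        lockFor(taskId).lock();
    }

    // Previous owner: call only after its checkpoint write (the issue suggests after
    // closeNonAssignedSuspendedTasks). ReentrantLock must be unlocked by the thread that locked it.
    public void release(String taskId) {
        lockFor(taskId).unlock();
    }

    public void writeCheckpoint(String taskId, long offset) {
        requireHeld(taskId);
        checkpoints.put(taskId, offset);
    }

    public OptionalLong readCheckpoint(String taskId) {
        requireHeld(taskId);
        Long offset = checkpoints.get(taskId);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    // Rejects unguarded access, i.e. the dirty reads and writes described in the issue.
    private void requireHeld(String taskId) {
        if (!lockFor(taskId).isHeldByCurrentThread()) {
            throw new IllegalStateException("checkpoint lock not held for task " + taskId);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TaskCheckpointLocks guard = new TaskCheckpointLocks();
        String task = "0_1"; // hypothetical task id for partition 1
        AtomicReference<OptionalLong> seenByB = new AtomicReference<>();

        guard.acquire(task); // Thread-A (played by the main thread) suspends the task
        Thread threadB = new Thread(() -> {
            guard.acquire(task); // Thread-B blocks until Thread-A releases
            try {
                seenByB.set(guard.readCheckpoint(task));
            } finally {
                guard.release(task);
            }
        }, "Thread-B");
        threadB.start();

        guard.writeCheckpoint(task, 42L); // Thread-A writes the suspended task's checkpoint...
        guard.release(task);              // ...and only then lets Thread-B read it
        threadB.join();

        // Thread-B sees offset 42 instead of "no checkpoint".
        System.out.println("Thread-B read: " + seenByB.get());
    }
}
```

The sketch only orders a single handoff. Whether Thread-C's later read is safe still depends on what Thread-B does before releasing the lock: if Thread-B has deleted the store, it would also need to write or delete the checkpoint first, which is where approach 2 or 5 could complement the lock.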

*Made less likely by KAFKA-7672*
It seems the fix for https://issues.apache.org/jira/browse/KAFKA-7672 introduces a forced checkpoint under EOS, so this truncation scenario may be less likely in 2.2.0, but not in earlier versions. The change set for KAFKA-7672 doesn't address the race conditions around reading and writing the checkpoint files. As far as I can tell, it is still possible for one StreamThread to not have finished writing the checkpoint in PARTITIONS_REVOKED before another StreamThread has finished reading it in PARTITIONS_ASSIGNED.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)