[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849218#comment-16849218 ]
Guozhang Wang commented on KAFKA-8187:
--------------------------------------

[~hustclf] Thanks for filing the PR! [~bbejeck] and I will take a look and see if it can be merged to 2.3 asap.

> State store record loss across multiple reassignments when using standby tasks
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-8187
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8187
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: William Greer
>            Assignee: Bill Bejeck
>            Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another
> StreamThread/TaskManager on the same JVM, there can be lock contention that
> prevents the StandbyTask from acquiring the lock on the StreamThread it is
> currently assigned to. That StreamThread also does not retry acquiring the
> lock, because all of its active StreamTasks are running. If the StandbyTask
> does not acquire the lock before the StreamThread enters the RUNNING state,
> then the StandbyTask will not consume any records. If there is no subsequent
> reassignment before the second execution of the stateDirCleaner thread, then
> the task directory for the StandbyTask will be deleted. When the next
> reassignment occurs, the offset that the StandbyTask read at creation time,
> before acquiring the lock, is written back to the state store directory,
> which re-creates the directory.
>
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same
> streams application.
> StreamThread(A) has StandbyTask 1_0.
> StreamThread(B) has no tasks.
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's
> one task.
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event for a standby
> task for 1_0.
>
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask, which reads the current checkpoint
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks.
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the
> check at [4], which only checks that the assigned active tasks are running.
> StreamThread(B) state is set to RUNNING.
> StreamThread(A) closes the previous StandbyTask, specifically calling
> closeStateManager(). [5]
> StreamThread(A) state is set to RUNNING.
> The streams application on this host has completed rebalancing and is now in
> the RUNNING state.
>
> State at this point is the following: the state directory exists for 1_0 and
> all data is present.
>
> Then, 1 to 2 intervals of [6] (10 minutes by default) after the reassignment
> has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8]: it finds the directory 1_0, finds that
> there isn't an active lock for that directory, acquires the lock, and deletes
> the directory.
>
> State at this point is the following: the state directory does not exist for
> 1_0.
>
> When the next reassignment occurs, the offset that was read by
> StreamThread(B) during construction of the StandbyTask for 1_0 will be
> written back to disk. This write re-creates the state store directory and
> writes the .checkpoint file with the old offset.
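The sequence above can be replayed in a toy model. This is a minimal Java sketch, assuming a single-JVM simulation in which `FakeStateDirectory`, `allTasksRunning`, `Outcome`, `simulate` and the offset value 100 are all hypothetical stand-ins (not Kafka classes or APIs) for the lock table, the check at [4] and the stateDirCleaner at [8]. It shows how a failed standby lock still lets StreamThread(B) reach RUNNING, how the cleaner then deletes the unlocked directory, and how the stale offset comes back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, heavily simplified model of the race in KAFKA-8187.
 * None of these types are Kafka classes; they only replay the sequence of events.
 */
final class StandbyRaceSketch {

    /** Stand-in for StateDirectory: per-task locks plus what is on disk per task. */
    static final class FakeStateDirectory {
        final Map<String, String> lockOwners = new HashMap<>(); // taskId -> owning thread
        final Map<String, Long> checkpoints = new HashMap<>();  // taskId -> .checkpoint offset
        final Map<String, Boolean> storeData = new HashMap<>(); // taskId -> store files present

        boolean lock(String taskId, String thread) {
            String owner = lockOwners.putIfAbsent(taskId, thread);
            return owner == null || owner.equals(thread);
        }

        void unlock(String taskId, String thread) {
            lockOwners.remove(taskId, thread);
        }

        /** Like the cleaner at [8]: delete every task directory whose lock it can take. */
        void cleanRemovedTasks() {
            for (String taskId : new ArrayList<>(checkpoints.keySet())) {
                if (lock(taskId, "cleaner")) {
                    checkpoints.remove(taskId);
                    storeData.remove(taskId);
                    unlock(taskId, "cleaner");
                }
            }
        }
    }

    /** Like the check at [4]: only the active tasks gate the RUNNING transition. */
    static boolean allTasksRunning(boolean activesRunning, boolean standbysRunning) {
        return activesRunning; // standbysRunning is ignored, which is the gap
    }

    static final class Outcome {
        boolean threadBRunning;
        Long checkpoint;
        boolean storeDataPresent;
    }

    static Outcome simulate() {
        FakeStateDirectory dir = new FakeStateDirectory();
        String task = "1_0";
        // StreamThread(A) holds standby 1_0 with full store data, checkpointed at offset 100.
        dir.lock(task, "A");
        dir.checkpoints.put(task, 100L);
        dir.storeData.put(task, true);

        // Rebalance: B constructs the standby, reading the checkpoint before it has the lock.
        long offsetReadByB = dir.checkpoints.get(task);
        boolean bLocked = dir.lock(task, "B"); // fails: A still holds the lock
        Outcome out = new Outcome();
        out.threadBRunning = allTasksRunning(true, bLocked); // B goes RUNNING anyway
        dir.unlock(task, "A"); // A closes its old standby; B never retries the lock

        dir.cleanRemovedTasks(); // 10-20 minutes later: 1_0 is unlocked, so it is deleted

        // Next rebalance: B writes its stale in-memory offset back, re-creating the directory.
        dir.checkpoints.put(task, offsetReadByB);
        out.checkpoint = dir.checkpoints.get(task);
        out.storeDataPresent = dir.storeData.getOrDefault(task, false);
        return out;
    }

    public static void main(String[] args) {
        Outcome out = simulate();
        System.out.println("running=" + out.threadBRunning + " checkpoint=" + out.checkpoint
                + " storeData=" + out.storeDataPresent);
    }
}
```

Running `main` prints `running=true checkpoint=100 storeData=false`: a `.checkpoint` file holding the old offset with no store data beside it.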
>
> State at this point is the following: the state directory exists for 1_0 with
> a '.checkpoint' file in it, but there is no other state store data in the
> directory.
>
> If this host is assigned the active task for 1_0, then all the history in the
> state store from before the offset that was read at the previous reassignment
> will be missing.
> If this host is assigned the standby task for 1_0, then the lock will be
> acquired and the standby will start to consume records, but it will still be
> missing all records from before the offset that was read at the previous
> reassignment.
> If this host is not assigned 1_0, then the state directory will be cleaned up
> by the stateDirCleaner thread 10 to 20 minutes later and the record loss
> issue will be hidden.
>
> [0]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
> [1]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L324-L340
> [2]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L65-L84
> [3]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L212-L236
> [4]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L332
> [5]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L245-L264
> [6]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L797
> [7]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L798-L803
> [8]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java#L262-L327
>
> How we recovered from the record loss:
> 1. Stop the streams application.
> 2. Delete the impacted task directory to remove the .checkpoint file.
> 3. Restart the streams application.
>
> Some possible ways of addressing this issue could be the following:
> 1. Check that the assigned standby tasks are running, in addition to the
> assigned active tasks, before returning in this method. [1]
> 2. Only write the checkpoint file for a task if the thread still holds the
> state directory lock for the task [9]; on close, StandbyTasks commit the
> offsets they have in memory. [10]
> 3. Read the checkpoint file after acquiring the lock for a task in the
> StreamThread.
>
> [9]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L325
> [10]
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java#L125-L148

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
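The three remedies proposed in the issue can be sketched in a small toy model. This is a minimal Java sketch under stated assumptions, not the change that was merged for KAFKA-8187: `FakeStateDirectory`, `Standby`, `initialize`, `writeCheckpoint` and `allTasksRunning` are hypothetical names standing in for the real StreamThread, TaskManager and StandbyTask code. Remedy 1 requires standbys to be running before the thread counts as ready, remedy 2 refuses to write a checkpoint without holding the lock, and remedy 3 defers reading the checkpoint until the lock is held.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the three proposed remedies. The types are
 * illustrative stand-ins, not Kafka classes.
 */
final class StandbyFixSketch {

    /** Stand-in for the per-task directory locks and .checkpoint files. */
    static final class FakeStateDirectory {
        final Map<String, String> lockOwners = new HashMap<>(); // taskId -> owning thread
        final Map<String, Long> checkpoints = new HashMap<>();  // taskId -> .checkpoint offset

        boolean lock(String taskId, String thread) {
            String owner = lockOwners.putIfAbsent(taskId, thread);
            return owner == null || owner.equals(thread);
        }

        void unlock(String taskId, String thread) {
            lockOwners.remove(taskId, thread);
        }

        boolean holdsLock(String taskId, String thread) {
            return thread.equals(lockOwners.get(taskId));
        }
    }

    /** Remedy 1: a thread is ready only when its standbys, not just its actives, are running. */
    static boolean allTasksRunning(boolean activesRunning, boolean standbysRunning) {
        return activesRunning && standbysRunning;
    }

    static final class Standby {
        final String taskId;
        final String thread;
        final FakeStateDirectory dir;
        Long offset; // unknown until the lock is held (remedy 3)

        Standby(String taskId, String thread, FakeStateDirectory dir) {
            this.taskId = taskId;
            this.thread = thread;
            this.dir = dir;
        }

        /** Remedy 3: read the checkpoint only after acquiring the task lock. */
        boolean initialize() {
            if (!dir.lock(taskId, thread)) {
                return false; // caller must retry rather than treat the thread as running
            }
            offset = dir.checkpoints.get(taskId);
            return true;
        }

        /** Remedy 2: write the checkpoint only while this thread still holds the lock. */
        boolean writeCheckpoint() {
            if (offset == null || !dir.holdsLock(taskId, thread)) {
                return false;
            }
            dir.checkpoints.put(taskId, offset);
            return true;
        }
    }

    public static void main(String[] args) {
        FakeStateDirectory dir = new FakeStateDirectory();
        dir.lock("1_0", "A");
        dir.checkpoints.put("1_0", 100L);

        Standby b = new Standby("1_0", "B", dir);
        boolean firstTry = b.initialize();               // false: A still holds the lock
        boolean ready = allTasksRunning(true, firstTry); // false: B must keep retrying
        dir.unlock("1_0", "A");                          // A closes its old standby
        boolean secondTry = b.initialize();              // true: offset is read under the lock
        System.out.println(firstTry + " " + ready + " " + secondTry + " " + b.offset);
    }
}
```

Running `main` prints `false false true 100`: the contended lock keeps StreamThread(B) from being treated as ready, and the offset is read only after StreamThread(A) has released the lock, so a directory deleted in between cannot be re-created from a stale in-memory value.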