[
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698502#comment-16698502
]
Matthias J. Sax commented on KAFKA-7672:
----------------------------------------
Thanks for reporting this and for the detailed analysis!
> The local state not fully restored after KafkaStream rebalanced, resulting in
> data loss
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
> Reporter: linyue li
> Priority: Major
> Fix For: 2.1.0
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file is
> found under its task folder, Kafka Streams needs to restore the local state
> from the remote changelog topic completely and then resume running. However, in
> some scenarios, we found that Kafka Streams does *NOT* restore this state even
> though no checkpoint was found, but just cleans the state folder and transitions to
> the running state directly, resulting in loss of the historic data.
> To be specific, here are the detailed logs from Kafka Streams in our project
> that show this scenario:
> {quote}2018-10-23 08:27:07,684 INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO
> org.apache.kafka.streams.processor.internals.StoreChangelogReader -
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.*
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to
> RUNNING*
> {quote}
> From the logs above, we can trace the procedure for thread
> AuditTrailBatch-StreamThread-1:
> # the previous running task assigned to thread 1 was task 0_5 (the
> corresponding partition is AuditTrailBatch-0-5)
> # the group began to rebalance, and the new task 1_1 was assigned to thread 1.
> # no checkpoint was found under the 1_1 state folder, so the offset was reset
> to 0 and the local state folder cleaned.
> # thread 1 transitioned to the RUNNING state directly without restoring
> task 1_1, so the historic data for state 1_1 was lost for thread 1.
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the source code of
> Kafka Streams and found that the key is a variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers set, which is created during
> thread initialization and is not accessed by other threads. The
> completedRestorers set records the partitions that have been restored
> completely in that thread.
>
> {code:java}
> if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
>     restorer.restoreDone();
>     endOffsets.remove(partition);
>     completedRestorers.add(partition);
> }{code}
>
> Once a partition is added to the completedRestorers set, it is returned by
> restore() and passed to the next caller, updateRestored(), and then
> transitionToRunning() sets the task to the running state.
> However, we found that completedRestorers is never cleared during the life
> cycle of the thread, not even in the reset function:
>
> {code:java}
> @Override
> public void reset() {
>     partitionInfo.clear();
>     stateRestorers.clear();
>     needsRestoring.clear();
>     endOffsets.clear();
>     needsInitializing.clear();
> }
> {code}
> This causes a problem: assume that task 1 was once assigned to thread A,
> so its partition was added to completedRestorers. The task then migrated to
> another thread (possibly on another instance). After several rounds of
> rebalancing, it transitioned back to thread A, and no checkpoint was present
> for some reason. The right behavior is to clean the state and restore it from
> the beginning, but instead the thread finds this task's partition already in
> the completedRestorers set, so it considers the task fully restored and
> resumes running directly.
> To avoid missing the state restoration, we should clear the stale
> completedRestorers set every time after reassignment. So I added the clear
> operation in reset() and validated that it works.
> {code:java}
> @Override
> public void reset() {
>     partitionInfo.clear();
>     stateRestorers.clear();
>     needsRestoring.clear();
>     endOffsets.clear();
>     needsInitializing.clear();
>
>     // added by linyue li: drop stale restore state across reassignments
>     completedRestorers.clear();
> }{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)