[ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

linyue li updated KAFKA-7672:
-----------------------------
    Description: 
Normally, when a task is migrated to a new thread and no checkpoint file is 
found under its task folder, Kafka Streams needs to restore the local state 
completely from the remote changelog topic and then resume running. However, 
in some scenarios, we found that Kafka Streams does *NOT* restore this state 
even though no checkpoint was found; it just cleans the state folder and 
transitions to the RUNNING state directly, resulting in the loss of historic 
data. 

To be specific, the detailed Kafka Streams logs from our project below show 
this scenario:
{quote}2018-10-23 08:27:07,684 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
Revoking previously assigned partitions [audittrailbatch-57]

2018-10-23 08:27:07,684 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_ASSIGNED to 
PARTITIONS_REVOKED

2018-10-23 08:27:10,856 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
(Re-)joining group

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
Successfully joined group with generation 323

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_REVOKED to 
PARTITIONS_ASSIGNED

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] Creating producer client for task 1_1

2018-10-23 08:27:53,622 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] partition assignment took 469 ms.

2018-10-23 08:27:54,357 INFO  
org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
stream-thread [AuditTrailBatch-StreamThread-4] *No checkpoint found for task 
1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
Reinitializing the task and restore its state from the beginning.

2018-10-23 08:27:54,357 INFO  
org.apache.kafka.clients.consumer.internals.Fetcher          - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-restore-consumer, groupId=] *Resetting 
offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*

2018-10-23 08:27:54,653 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] *State transition from PARTITIONS_ASSIGNED to 
RUNNING*
{quote}
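
One way to read the excerpt above: the restore consumer resets the changelog 
partition to offset 0 at 08:27:54,357, and the thread reports RUNNING at 
08:27:54,653. The sketch below computes that gap from the log text. It is only 
an illustration: the class name RestoreGap and its methods are hypothetical, 
and it assumes each log entry has been unwrapped onto a single line starting 
with the timestamp.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka: measures the time between the
// restore consumer's offset reset and the thread's transition to RUNNING.
final class RestoreGap {
    // Timestamp layout used in the log excerpt, e.g. "2018-10-23 08:27:54,357".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Assumes the first 23 characters of the line are the timestamp.
    public static LocalDateTime timestampOf(String line) {
        return LocalDateTime.parse(line.substring(0, 23), TS);
    }

    // Returns the gap in milliseconds, or -1 if the two events are not
    // found in this order.
    public static long gapMillis(List<String> lines) {
        LocalDateTime reset = null;
        for (String line : lines) {
            if (line.contains("Resetting offset for partition")) {
                reset = timestampOf(line);
            } else if (reset != null && line.contains("to RUNNING")) {
                return Duration.between(reset, timestampOf(line)).toMillis();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The last two entries of the excerpt, unwrapped and without the
        // bold markers.
        List<String> lines = Arrays.asList(
            "2018-10-23 08:27:54,357 INFO  "
                + "org.apache.kafka.clients.consumer.internals.Fetcher - "
                + "Resetting offset for partition "
                + "AuditTrailBatch-store1-changelog-1 to offset 0.",
            "2018-10-23 08:27:54,653 INFO  "
                + "org.apache.kafka.streams.processor.internals.StreamThread - "
                + "State transition from PARTITIONS_ASSIGNED to RUNNING");
        System.out.println(gapMillis(lines) + " ms"); // prints "296 ms"
    }
}
```

A gap this short does not prove by itself that nothing was restored, but it 
matches the behaviour described above, where the task goes to RUNNING almost 
immediately after the reset.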

  was:
Normally, when a task is mitigated to a new thread and no checkpoint file was 
found under its task folder, Kafka Stream needs to restore the local state for 
remote changelog topic completely and then resume running. Howerver, in some 
scenarios, we found that Kafka Stream *NOT* restore this state even no 
checkpoint was found, but just clean the state folder and transition to running 
state directly, resulting the historic data loss. 

To be specific, I will give the detailed logs for Kafka Stream in our project 
to show this scenario:

2018-10-23 08:27:07,684 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
Revoking previously assigned partitions [audittrailbatch-57]

2018-10-23 08:27:07,684 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_ASSIGNED to 
PARTITIONS_REVOKED

2018-10-23 08:27:10,856 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
(Re-)joining group

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
Successfully joined group with generation 323

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_REVOKED to 
PARTITIONS_ASSIGNED

2018-10-23 08:27:53,153 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] Creating producer client for task 1_1

2018-10-23 08:27:53,622 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] partition assignment took 469 ms.

2018-10-23 08:27:54,357 INFO  
org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
stream-thread [AuditTrailBatch-StreamThread-4] *No checkpoint found for task 
1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
Reinitializing the task and restore its state from the beginning.

2018-10-23 08:27:54,357 INFO  
org.apache.kafka.clients.consumer.internals.Fetcher          - [Consumer 
clientId=AuditTrailBatch-StreamThread-4-restore-consumer, groupId=] *Resetting 
offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*

2018-10-23 08:27:54,653 INFO  
org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
[AuditTrailBatch-StreamThread-4] *State transition from PARTITIONS_ASSIGNED to 
RUNNING*


> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7672
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7672
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>            Reporter: linyue li
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file is 
> found under its task folder, Kafka Streams needs to restore the local state 
> completely from the remote changelog topic and then resume running. However, 
> in some scenarios, we found that Kafka Streams does *NOT* restore this state 
> even though no checkpoint was found; it just cleans the state folder and 
> transitions to the RUNNING state directly, resulting in the loss of historic 
> data. 
> To be specific, the detailed Kafka Streams logs from our project below show 
> this scenario:
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [audittrailbatch-57]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-4] Creating producer client for task 1_1
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-4] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-4] *No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> Reinitializing the task and restore its state from the beginning.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher          - [Consumer 
> clientId=AuditTrailBatch-StreamThread-4-restore-consumer, groupId=] 
> *Resetting offset for partition AuditTrailBatch-store1-changelog-1 to offset 
> 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-4] *State transition from PARTITIONS_ASSIGNED 
> to RUNNING*
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
