[ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16017:
-------------------------------------

    Assignee: Bruno Cadonna

> Checkpointed offset is incorrect when task is revived and restoring 
> --------------------------------------------------------------------
>
>                 Key: KAFKA-16017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16017
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to stream thread from which it 
> was revoked, read the checkpoint and starts restoring. (I might be enough if 
> the task moves to a stream thread on the same Streams client that shares the 
> same state directory).
> 6. The state of the task misses some records
> To mitigate the issue one can restart the the stream thread and delete of the 
> state on disk. After that the state restores completely from the changelog 
> topic and the state does not miss any records anymore.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since in step 2 the state directory is wiped out, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3 which might be less. The root cause of 
> this is that the offsets in the record collector are not cleaned up when the 
> record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to