[ https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna reassigned KAFKA-16017:
-------------------------------------

    Assignee: Bruno Cadonna

> Checkpointed offset is incorrect when task is revived and restoring
> --------------------------------------------------------------------
>
>                 Key: KAFKA-16017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16017
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a
> {{TaskCorruptedException}} and the task is then migrated to another stream
> thread during restoration.
> This might happen in a situation like the following if the Streams
> application runs under EOS:
> 1. Streams encounters a network error which triggers a
> {{TaskCorruptedException}}.
> 2. The task that encountered the exception is closed dirty and revived. The
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received, the revived task is restoring.
> 4. When the sync is received, the revived task is revoked and a new rebalance
> is triggered. During the revocation the task is closed cleanly and a
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from
> which it was revoked, reads the checkpoint and starts restoring. (It might be
> enough if the task moves to a stream thread on the same Streams client that
> shares the same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue one can restart the stream thread and delete the state
> on disk. After that the state restores completely from the changelog topic
> and no longer misses any records.
> The root cause is that the checkpoint that is written in step 4 contains the
> offset that the record collector stored when it sent the records to the
> changelog topic. However, since in step 2 the state directory is wiped out,
> the state does not contain those records anymore. It only contains the
> records that it restored in step 3, which might be fewer. The root cause of
> this is that the offsets in the record collector are not cleaned up when the
> record collector is closed.
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
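
The root cause described in the issue can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the Kafka Streams code: `CheckpointSketch`, `SketchCollector`, `checkpointAfterRevival` and the partition name are hypothetical, and the rule that collector offsets override restored offsets when the checkpoint is written is an assumption made to mirror the description above.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {

    /** Hypothetical stand-in for a record collector: remembers the last offset sent per changelog partition. */
    static class SketchCollector {
        private final Map<String, Long> offsets = new HashMap<>();
        private final boolean clearOnClose;

        SketchCollector(boolean clearOnClose) {
            this.clearOnClose = clearOnClose;
        }

        void send(String changelogPartition, long offset) {
            offsets.put(changelogPartition, offset);
        }

        Map<String, Long> offsets() {
            return new HashMap<>(offsets);
        }

        void close() {
            // The behaviour described in the issue corresponds to clearOnClose == false.
            if (clearOnClose) {
                offsets.clear();
            }
        }
    }

    /** Returns the offset that ends up in the checkpoint after the task is revived and revoked. */
    static long checkpointAfterRevival(boolean clearOnClose, long writtenOffset, long restoredOffset) {
        String partition = "store-changelog-0"; // hypothetical partition name
        SketchCollector collector = new SketchCollector(clearOnClose);

        // Normal processing: records are sent to the changelog up to writtenOffset.
        collector.send(partition, writtenOffset);

        // Step 2: the task is closed dirty and the state directory is wiped.
        collector.close();

        // Step 3: restoration has only reached restoredOffset when the task is revoked.
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put(partition, restoredOffset);

        // Step 4: clean close writes the checkpoint; assumed here that
        // offsets still held by the collector take precedence.
        checkpoint.putAll(collector.offsets());
        return checkpoint.get(partition);
    }

    public static void main(String[] args) {
        // Stale collector offset wins: checkpoint claims 100 although only 40 was restored.
        System.out.println(checkpointAfterRevival(false, 100L, 40L)); // prints 100
        // Offsets cleared on close: checkpoint matches the restored state.
        System.out.println(checkpointAfterRevival(true, 100L, 40L));  // prints 40
    }
}
```

In the first call the checkpoint is ahead of what the wiped state store actually contains, so a later restoration starting from that offset would skip the records in between, which matches step 6 of the scenario.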