[ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
----------------------------------
    Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to stream thread from which it 
was revoked, read the checkpoint and starts restoring. (I might be enough if 
the task moves to a stream thread on the same Streams client that shares the 
same state directory).
6. The state of the task misses some records

To mitigate the issue one can restart the the stream thread and delete of the 
state on disk. After that the state restores completely from the changelog 
topic and the state does not miss any records anymore.

The root cause is that the checkpoint that is written in step 4 contains the 
offset that the record collector stored when it sent the records to the 
changelog topic. However, since in step 2 the state directory is wiped out, the 
state does not contain those records anymore. It only contains the records that 
it restored in step 3 which might be less. The root cause of this is that the 
offsets in the record collector are not cleaned up when the record collector is 
closed. 

I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.

The repro can be started with

{code}
./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
{code}

The repro writes records into a state store and tries to retrieve them again 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
 It will throw an {{IllegalStateException}} if it cannot find a record in the 
state 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
 Once the offsets in the record collector are cleared on close 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
 and 
https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
 the {{IllegalStateException}} does not occur anymore.

In the logs you can check for 
- {{Restore batch end offset is}} which are the restored offsets in the state.
- {{task [0_1] Writing checkpoint:}} which are the written checkpoints.
- {{task [0_1] Checkpointable offsets}} which show the offsets coming from the 
sending records to the changelog topic 
{{RestoreIntegrationTesttest-stateStore-changelog-1}}
Always the last instances of these before the {{IllegalStateException}} is 
thrown.

You will see that the restored offsets are less than the offsets that are 
written to the checkpoint. The offsets written to the checkpoint come from the 
offsets stored when sending the records to the changelog topic.  



  was:
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to stream thread from which it 
was revoked, read the checkpoint and starts restoring. (I might be enough if 
the task moves to a stream thread on the same Streams client that shares the 
same state directory).
6. The state of the task misses some records

To mitigate the issue one can restart the the stream thread and delete of the 
state on disk. After that the state restores completely from the changelog 
topic and the state does not miss any records anymore.

The root cause is that the checkpoint that is written in step 4 contains the 
offset that the record collector stored when it sent the records to the 
changelog topic. However, since in step 2 the state directory is wiped out, the 
state does not contain those records anymore. It only contains the records that 
it restored in step 3 which might be less. The root cause of this is that the 
offsets in the record collector are not cleaned up when the record collector is 
closed. 

I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.


> Checkpointed offset is incorrect when task is revived and restoring 
> --------------------------------------------------------------------
>
>                 Key: KAFKA-16017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16017
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to stream thread from which it 
> was revoked, read the checkpoint and starts restoring. (I might be enough if 
> the task moves to a stream thread on the same Streams client that shares the 
> same state directory).
> 6. The state of the task misses some records
> To mitigate the issue one can restart the the stream thread and delete of the 
> state on disk. After that the state restores completely from the changelog 
> topic and the state does not miss any records anymore.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since in step 2 the state directory is wiped out, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3 which might be less. The root cause of 
> this is that the offsets in the record collector are not cleaned up when the 
> record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.
> The repro can be started with
> {code}
> ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
> spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
> {code}
> The repro writes records into a state store and tries to retrieve them again 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
>  It will throw an {{IllegalStateException}} if it cannot find a record in the 
> state 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
>  Once the offsets in the record collector are cleared on close 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
>  and 
> https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
>  the {{IllegalStateException}} does not occur anymore.
> In the logs you can check for 
> - {{Restore batch end offset is}} which are the restored offsets in the state.
> - {{task [0_1] Writing checkpoint:}} which are the written checkpoints.
> - {{task [0_1] Checkpointable offsets}} which show the offsets coming from 
> the sending records to the changelog topic 
> {{RestoreIntegrationTesttest-stateStore-changelog-1}}
> Always the last instances of these before the {{IllegalStateException}} is 
> thrown.
> You will see that the restored offsets are less than the offsets that are 
> written to the checkpoint. The offsets written to the checkpoint come from 
> the offsets stored when sending the records to the changelog topic.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to