[ 
https://issues.apache.org/jira/browse/KAFKA-13350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13350:
------------------------------------
    Description: 
When we hit an `OffsetOutOfRangeException` during restore, we close the task as 
dirty and retry the restore process from scratch. In this case, we wipe out 
the task's state stores.

If a task has multiple state stores, we also wipe out state that is actually 
clean and thus redo work unnecessarily. Instead of wiping out all state 
stores, we should wipe out only the single state store that corresponds to the 
changelog topic partition that hit the `OffsetOutOfRangeException`, and 
preserve the restore progress of all other state stores.
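
A rough sketch of the per-store idea, assuming a hypothetical `storesByChangelog` 
mapping and a hypothetical `wipeStoreDirectory` helper (neither exists in Streams 
today); it is illustrative only, not the actual restore code path:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

import java.util.Map;
import java.util.Set;

// Illustrative only: wipe just the stores whose changelog partitions are corrupted,
// instead of wiping the whole task state directory.
final class PerStoreCorruptionHandler {
    private final Map<TopicPartition, StateStore> storesByChangelog; // hypothetical mapping

    PerStoreCorruptionHandler(final Map<TopicPartition, StateStore> storesByChangelog) {
        this.storesByChangelog = storesByChangelog;
    }

    void handleCorrupted(final Set<TopicPartition> corruptedChangelogs) {
        for (final TopicPartition changelog : corruptedChangelogs) {
            final StateStore store = storesByChangelog.get(changelog);
            store.close();                     // its restore progress is lost anyway
            wipeStoreDirectory(store.name());  // delete only this store's local files
        }
        // all other stores of the task keep their restore progress
    }

    private void wipeStoreDirectory(final String storeName) {
        // hypothetical placeholder: delete <state.dir>/<application.id>/<taskId>/<storeName>
    }
}
```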

We need to distinguish between persistent and in-memory stores: for persistent 
stores, it would be fine to close the unaffected stores cleanly and also write 
the checkpoint file. For in-memory stores, however, we should not close the 
store, to avoid dropping the in-memory data.
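
A minimal sketch of the proposed close path for the unaffected stores, using the 
`StateStore#persistent()` flag; the checkpoint update is only indicated as a 
comment because the actual checkpointing code lives in the state manager:

```java
import org.apache.kafka.streams.processor.StateStore;

import java.util.List;

// Illustrative only: handle the *unaffected* stores of a task whose sibling store is corrupted.
final class UnaffectedStoreCloser {
    void closeUnaffected(final List<StateStore> unaffectedStores) {
        for (final StateStore store : unaffectedStores) {
            if (store.persistent()) {
                // persistent store: safe to flush and close, and to record its restored
                // changelog offset in the task's checkpoint file (omitted here)
                store.flush();
                store.close();
            }
            // in-memory store: keep it open; closing it would drop the restored data
        }
    }
}
```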

*TODO:* verify consumer behavior: if a consumer subscribes to multiple 
partitions and two or more of them are on the same broker, a single fetch 
request could trigger an `OffsetOutOfRangeException` for both at the same 
time. However, it seems that the consumer reports only a single 
`TopicPartition` when it raises an `OffsetOutOfRangeException`. Thus, we need 
to ensure that we do not lose information, and we may need to update the 
consumer to report all affected partitions at once. Or maybe it won't be an 
issue, because the next fetch request would send the same offset for the 
"missed" partitions and we would get a new `OffsetOutOfRangeException` anyway 
(though it might still be more efficient to get all affected partitions at once).
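
For the verification, the exception's public accessors already expose the affected 
partitions (the consumer throws it from `poll()` when there is no applicable 
`auto.offset.reset` policy), so a small probe along these lines — a sketch 
assuming an already-assigned consumer — could show whether a single fetch failure 
ever reports more than one partition:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

final class OffsetOutOfRangeProbe {
    // Poll once and report which partitions a single OffsetOutOfRangeException covers.
    static void probe(final Consumer<byte[], byte[]> consumer) {
        try {
            consumer.poll(Duration.ofMillis(100));
        } catch (final OffsetOutOfRangeException e) {
            final Set<TopicPartition> affected = e.partitions();
            final Map<TopicPartition, Long> fetchOffsets = e.offsetOutOfRangePartitions();
            System.out.println("partitions in one exception: " + affected
                + ", out-of-range fetch offsets: " + fetchOffsets);
        }
    }
}
```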

  was:
When we hit an `OffsetOutOfRangeException` during restore, we close the task as 
dirty and retry the restore process from scratch. In this case, we wipe out 
the task's state stores.

If a task has multiple state stores, we also wipe out state that is actually 
clean and thus redo work unnecessarily. Instead of wiping out all state 
stores, we should wipe out only the single state store that corresponds to the 
changelog topic partition that hit the `OffsetOutOfRangeException`, and 
preserve the restore progress of all other state stores.

We need to distinguish between persistent and in-memory stores: for persistent 
stores, it would be fine to close the unaffected stores cleanly and also write 
the checkpoint file. For in-memory stores, however, we should not close the 
store, to avoid dropping the in-memory data.


> Handle task corrupted exception on a per state store basis
> ----------------------------------------------------------
>
>                 Key: KAFKA-13350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13350
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> When we hit an `OffsetOutOfRangeException` during restore, we close the task 
> as dirty and retry the restore process from scratch. In this case, we wipe 
> out the task's state stores.
> If a task has multiple state stores, we also wipe out state that is actually 
> clean and thus redo work unnecessarily. Instead of wiping out all state 
> stores, we should wipe out only the single state store that corresponds to 
> the changelog topic partition that hit the `OffsetOutOfRangeException`, and 
> preserve the restore progress of all other state stores.
> We need to distinguish between persistent and in-memory stores: for persistent 
> stores, it would be fine to close the unaffected stores cleanly and also write 
> the checkpoint file. For in-memory stores, however, we should not close the 
> store, to avoid dropping the in-memory data.
> *TODO:* verify consumer behavior: if a consumer subscribes to multiple 
> partitions and two or more of them are on the same broker, a single fetch 
> request could trigger an `OffsetOutOfRangeException` for both at the same 
> time. However, it seems that the consumer reports only a single 
> `TopicPartition` when it raises an `OffsetOutOfRangeException`. Thus, we need 
> to ensure that we do not lose information, and we may need to update the 
> consumer to report all affected partitions at once. Or maybe it won't be an 
> issue, because the next fetch request would send the same offset for the 
> "missed" partitions and we would get a new `OffsetOutOfRangeException` anyway 
> (though it might still be more efficient to get all affected partitions at once).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
