[ 
https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15344:
--------------------------------------

    Assignee: Lucas Brutschy

> Kafka Streams should include the message leader epoch when committing offsets
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-15344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: David Mao
>            Assignee: Lucas Brutschy
>            Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
> network partition and streams task rebalance and subsequently reset its 
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
> stale metadata for P.
> 2. Consumer 1 fetches partition P with offset 50, epoch 8. commits the offset 
> 50 without an epoch.
> 3. The consumer group rebalances and P is now assigned to consumer 2. 
> Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). 
> Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a 
> zombie leader due to a network partition, the zombie leader may accept 
> consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 
> 2.
> If in step 1, consumer 1 committed the leader epoch for the message, then 
> when consumer 2 receives assignment P it would force a metadata refresh to 
> discover a sufficiently new leader epoch for the committed offset.
> The low-hanging fruit fix would be to have streams pass in the message epoch 
> for each commit. Another fix discussed with [~hachikuji] is to have the 
> consumer cache leader epoch ranges, similar to how the broker maintains a 
> leader epoch cache.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to