[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Brutschy reassigned KAFKA-15344:
--------------------------------------

    Assignee: Lucas Brutschy

> Kafka Streams should include the message leader epoch when committing offsets
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-15344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: David Mao
>            Assignee: Lucas Brutschy
>            Priority: Major
>
> We noticed an application receive an OFFSET_OUT_OF_RANGE error following a network partition and a Streams task rebalance; it subsequently reset its offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like:
> {code:java}
> Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the Streams code, it looks like Kafka Streams calls `commitSync` with an explicit OffsetAndMetadata object but does not populate the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise, after a consumer group rebalance, a consumer may fetch with a leader epoch that is stale with respect to the committed offset and get an offset-out-of-range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1, which has up-to-date metadata for P. Consumer 2 has stale metadata for P.
> 2. Consumer 1 fetches partition P at offset 50, leader epoch 8, and commits offset 50 without an epoch.
> 3. The consumer group rebalances and P is now assigned to consumer 2, which has a stale leader epoch for P (say, leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If there is a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return OFFSET_OUT_OF_RANGE to consumer 2.
> If consumer 1 had committed the leader epoch of the message in step 1, then consumer 2, upon being assigned P, would be forced to refresh its metadata and discover a sufficiently new leader epoch for the committed offset.
> The low-hanging-fruit fix would be to have Streams pass in the leader epoch of the last consumed message with each commit. Another fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.
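> As an illustration of the low-hanging-fruit fix, here is a minimal consumer-side sketch. It assumes only the public consumer API from KIP-320 (`ConsumerRecord#leaderEpoch` and the `OffsetAndMetadata` constructor that takes an Optional leader epoch); the helper name is hypothetical, and this is not the actual Streams patch:
> {code:java}
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Optional;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
>
> // Hypothetical helper: commit offsets together with the leader epoch of the
> // last consumed record, so the coordinator stores offset + epoch as a pair.
> static void pollAndCommitWithEpoch(KafkaConsumer<byte[], byte[]> consumer) {
>     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
>     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
>         // leaderEpoch() is Optional.empty() if the broker did not supply one.
>         Optional<Integer> epoch = record.leaderEpoch();
>         offsets.put(new TopicPartition(record.topic(), record.partition()),
>                     new OffsetAndMetadata(record.offset() + 1, epoch, ""));
>     }
>     // A committed epoch forces a consumer that later takes over the partition
>     // to refresh metadata until it sees a leader epoch >= the committed one,
>     // rather than fetching with a stale epoch that a zombie leader may accept.
>     consumer.commitSync(offsets);
> }
> {code}
> The actual fix in Streams would need to thread the epoch from the consumed record through to the `commitSync` call rather than live in a helper like this.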