lucasbru opened a new pull request, #14454: URL: https://github.com/apache/kafka/pull/14454
Kafka Streams needs to include the leader epoch when committing offsets. The leader epoch is required to detect situations where a consumer with outdated metadata tries to fetch the committed offset of a partition after being assigned that partition during a rebalance. The committed offset may belong to a newer epoch than the consumer has in its metadata, leading to an OFFSET_OUT_OF_RANGE error and possible data loss.

Without an extension of the consumer interface, it is not possible to set the correct leader epoch in all circumstances. In particular, when we attempt to commit an offset that ends with a batch of control records, the leader epoch of the control records is not exposed to Kafka Streams. This can happen primarily in EOS mode, whenever Kafka Streams' internal record buffers are depleted.

This is a partial fix that avoids the situation described above in most cases; the final fix for EOS is still open.

When committing an offset while our internal record queue is non-empty, we commit the leader epoch of the next record in the queue. We extend `StampedRecord`, `RecordQueue`, `RecordDeserializer` and `PartitionGroup` to expose and keep track of leader epochs. If our internal record queue is empty, we commit the position of the consumer. If the position of the consumer happens to be the last consumed offset + 1, we use the last consumed leader epoch in the commit; otherwise, we omit the leader epoch.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
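
For readers skimming the description, the commit rule it describes can be sketched roughly as follows. This is an illustration only, not the code in this PR: `CommitEpochSketch`, `BufferedRecord`, `Commit` and `chooseCommit` are hypothetical names, the types are plain stand-ins rather than Kafka Streams classes, and the sketch assumes that the committed offset for a non-empty queue is the offset of the next buffered record.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Illustrative sketch only: hypothetical stand-in types, not Kafka Streams classes.
final class CommitEpochSketch {

    // A buffered record: its offset and the leader epoch it was fetched with, if known.
    static final class BufferedRecord {
        final long offset;
        final Optional<Integer> leaderEpoch;

        BufferedRecord(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // The (offset, leader epoch) pair that would be sent with the offset commit.
    static final class Commit {
        final long offset;
        final Optional<Integer> leaderEpoch;

        Commit(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }

        @Override
        public String toString() {
            return "Commit(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
        }
    }

    static Commit chooseCommit(Deque<BufferedRecord> queue,
                               long consumerPosition,
                               long lastConsumedOffset,
                               Optional<Integer> lastConsumedEpoch) {
        BufferedRecord next = queue.peekFirst();
        if (next != null) {
            // Queue is non-empty: use the next buffered record and its leader epoch.
            // (Assumption: the committed offset is that record's offset.)
            return new Commit(next.offset, next.leaderEpoch);
        }
        if (consumerPosition == lastConsumedOffset + 1) {
            // Queue is empty and the position directly follows the last consumed
            // record: reuse that record's leader epoch.
            return new Commit(consumerPosition, lastConsumedEpoch);
        }
        // Queue is empty and the position skipped ahead (for example past control
        // records): the matching leader epoch is unknown, so omit it.
        return new Commit(consumerPosition, Optional.empty());
    }

    public static void main(String[] args) {
        Deque<BufferedRecord> queue = new ArrayDeque<>();
        queue.add(new BufferedRecord(42L, Optional.of(7)));
        System.out.println(chooseCommit(queue, 50L, 41L, Optional.of(7)));

        queue.clear();
        System.out.println(chooseCommit(queue, 42L, 41L, Optional.of(7)));
        System.out.println(chooseCommit(queue, 45L, 41L, Optional.of(7)));
    }
}
```

The third call in `main` corresponds to the case that remains open for EOS: the consumer position has moved past the last consumed record, so the sketch commits without a leader epoch rather than guessing one.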