Hi Lucas,

This sounds like an interesting approach, and I wonder whether anyone has the context for why this part of KIP-320 was not completed.
There is one small detail I can think of, which we can address later once the details are confirmed. KafkaShareConsumer also uses ConsumerRecords for delivering records to the application, but consumers in share groups do not have a position. If we do implement ConsumerRecords.nextOffsets, we need a way to indicate that no information is available for KafkaShareConsumer, such as returning an empty map. I'm happy to include this detail in KIP-932 if that proves necessary.

Thanks,
Andrew

> On 4 Sep 2024, at 07:37, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>
> Hi everyone,
>
> We noticed that KIP-320 (accepted) proposes implementing a new public
> method `ConsumerRecords.nextOffsets`, but the method was never
> implemented in Apache Kafka. We are now planning to implement this
> method, since it is a gap in the public interface of the consumer, in
> particular when consuming from topics that contain Kafka transaction
> control records. Since the method was already proposed and accepted
> back in the day, we formally do not need to write a new KIP for it and
> discuss the change again. We thought we'd announce this public
> interface change on the mailing list anyway, since the KIP is rather old.
>
> Background: the concrete gap that we want to fill in Kafka Streams
> (but which will affect other applications using Kafka transactions as
> well) is the following. Suppose a Kafka Streams application wants to
> commit offsets, and we have already processed all records in the
> internal buffers of Kafka Streams. The naive approach would be to
> commit the offset of the last record inside the ConsumerRecords
> object, plus one. However, with Kafka transactions, this leads to a
> constant non-zero lag for the consumer, since the last (data) record
> is typically followed by one or more control records, so we'd never
> actually commit the end offset of the topic.
> That is why Kafka Streams commits the offset returned by
> `KafkaConsumer.position`, which is the next offset to be fetched
> after the last fetched batch, in particular after a potential commit
> control record. However, `KafkaConsumer.position` does not include
> the leader epoch, and committing offsets without the corresponding
> leader epoch defeats the purpose of KIP-320; this is the gap that we
> are currently trying to close. We debated introducing something like
> `KafkaConsumer.positionWithMetadata`. However, the originally
> proposed method `ConsumerRecords.nextOffsets` from KIP-320 seems to
> be a more elegant approach, and it is also something already accepted
> by the community.
>
> Let us know if there are any concerns with implementing this method.
> If there are, we are open to writing a new KIP.
>
> Cheers,
> Lucas
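To make the lag Lucas describes concrete, here is a minimal stand-alone sketch in plain Java. It uses no Kafka APIs: the log layout, offsets, and class names are invented purely for illustration of the arithmetic (data records followed by a transaction commit marker that the application never sees).

```java
import java.util.List;

// Hypothetical stand-in for a fetched transactional partition: data records
// at offsets 0..4, then the transaction's commit control record at offset 5.
// Control records are filtered out by the consumer, so the application only
// ever sees offsets 0..4.
public class TransactionLagSketch {
    record LogRecord(long offset, boolean isControl) {}

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
                new LogRecord(0, false), new LogRecord(1, false),
                new LogRecord(2, false), new LogRecord(3, false),
                new LogRecord(4, false), new LogRecord(5, true));
        long logEndOffset = 6; // next offset the broker would assign

        // Naive commit: offset of the last *data* record seen, plus one.
        long lastDataOffset = log.stream()
                .filter(r -> !r.isControl())
                .mapToLong(LogRecord::offset)
                .max().orElseThrow();
        long naiveCommit = lastDataOffset + 1; // 5 -- stops at the marker

        // The fetch position has already skipped past the control record;
        // this is what KafkaConsumer.position reports after the batch.
        long positionCommit = logEndOffset; // 6

        System.out.println("naive lag    = " + (logEndOffset - naiveCommit));    // 1
        System.out.println("position lag = " + (logEndOffset - positionCommit)); // 0
    }
}
```

The "naive lag" of 1 never drains: each committed transaction appends another control record, so committing last-data-offset + 1 always trails the log end offset. Committing the fetch position closes the lag, and the proposed `ConsumerRecords.nextOffsets` would additionally carry the leader epoch that `position` lacks.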