Hi, Manikumar,

Thanks for the KIP. A couple of comments.
JR1. It doesn't seem that we need earliest_local. Intuitively, it makes sense for a consumer app to bootstrap based on time. How long a topic is kept locally seems irrelevant to a consumer application.

JR2. I took a look at the proposal in KIP-842. It seems that this KIP covers the common scenario in KIP-842 and is simpler. The scenario described in KIP-842 is about which initial offset to use when a new partition is added. This is more or less the same problem as choosing the initial offset for existing partitions. Having a time-based option provides more flexibility than earliest or latest, and is likely good enough for most usage.

Jun

On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:

> Thanks Manikumar and Andrew.
>
> For AM6: I was thinking of enhancing OffsetFetcher and adding
> "seekToLocalBeginning", where OffsetFetcher itself uses the
> Admin.listOffsets API. However, I agree that this is just another
> convenience and is not necessary to support the functionality. Skipping
> it sounds good to me.
>
> Regards,
> Apoorv Mittal
>
> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>
> > Hi Manikumar,
> >
> > AS7: In response to AM6, I agree that in general the combination
> > of offsetsForTimes() and seek() will do what is required. The one case
> > that I think is trickier is seeking to the local earliest/beginning.
> >
> > The Consumer API has Consumer.beginningOffsets and
> > seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> > Similarly, there are methods for OffsetResetStrategy.LATEST.
> >
> > With the Consumer API, I don't think you can find the local earliest
> > offset. That can be found using Admin.listOffsets with
> > OffsetSpec.earliestLocal(). We could add local-earliest equivalents of
> > Consumer.beginningOffsets and Consumer.seekToBeginning. An application
> > could always use the Admin API with the Consumer API.
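For reference, the offsetsForTimes() route in AS7 relies on KafkaConsumer.offsetsForTimes(Map<TopicPartition, Long>), which returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested timestamp (or null when there is no such record); the application then calls seek() with that offset. The sketch below simulates only that lookup for one partition using the Java standard library, so it runs without a broker. PartitionLogSketch and its methods are hypothetical, it assumes timestamps never decrease with offset (real logs do not guarantee this), and falling back to the log end offset when nothing is recent enough is an assumption of the sketch, not documented consumer behaviour.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a single partition. It assumes record
// timestamps never decrease as offsets grow, which real Kafka logs do not
// guarantee (the broker consults a time index instead).
final class PartitionLogSketch {
    private final long baseOffset;
    private final List<Long> timestampsMs = new ArrayList<>();

    PartitionLogSketch(long baseOffset) {
        this.baseOffset = baseOffset;
    }

    void append(long timestampMs) {
        timestampsMs.add(timestampMs);
    }

    // Offset that the next appended record would receive.
    long logEndOffset() {
        return baseOffset + timestampsMs.size();
    }

    // Binary search for the earliest offset whose timestamp is >= targetMs.
    // Returns -1 when no record qualifies, where offsetsForTimes() would
    // report null for the partition.
    long earliestOffsetAtOrAfter(long targetMs) {
        int lo = 0;
        int hi = timestampsMs.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestampsMs.get(mid) >= targetMs) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo == timestampsMs.size() ? -1 : baseOffset + lo;
    }

    // Duration-based reset: search for (now - duration). If every record is
    // older than that, fall back to the log end offset (sketch assumption).
    long resetOffsetByDuration(Instant now, Duration duration) {
        long targetMs = now.minus(duration).toEpochMilli();
        long offset = earliestOffsetAtOrAfter(targetMs);
        return offset >= 0 ? offset : logEndOffset();
    }
}
```

In a real consumer, the same target timestamp would be passed per partition to offsetsForTimes(), followed by seek(partition, result.offset()) for each non-null result.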
> >
> > Personally, I would not complicate this KIP with these new methods when
> > the focus is more on duration-based offset reset and configs.
> >
> > Thanks,
> > Andrew
> > ________________________________________
> > From: Manikumar <manikumar.re...@gmail.com>
> > Sent: 06 November 2024 04:20
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
> > for consumer clients
> >
> > Hi Apoorv,
> >
> > Thanks for the review.
> >
> > AM5: Thanks, updated the KIP.
> >
> > AM6: Currently we can achieve this with the combination of the
> > offsetsForTimes() and seek() APIs. Maybe we can add a generic API like
> > seekToTimes(Map<TopicPartition, Long> timestampsToSearch) if required.
> > Let's see what others think.
> >
> > Thanks,
> >
> > On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> > wrote:
> > >
> > > Hi Manikumar,
> > > Thanks for the changes. Just a minor comment and a question:
> > >
> > > AM5: The description "How to initialize the share-partition start
> > > offset:" for "share.auto.offset.reset" seems incomplete as it ends
> > > with `:`. Should we write out the details as defined in KIP-932, i.e.
> > >
> > > How to initialize the share-partition start offset:
> > >
> > > - "earliest": automatically reset the offset to the earliest offset
> > > - "latest": automatically reset the offset to the latest offset
> > > - "earliest_local": automatically reset the offset to the earliest
> > >   message stored in the local log on the broker
> > > - "by_duration": automatically reset the offset to the earliest offset
> > >   whose timestamp is greater than or equal to the current time minus
> > >   the configured duration (auto.offset.reset.by.duration)
> > >
> > > *or* maybe just replace ":" with "." i.e. "How to initialize the
> > > share-partition start offset."
> > > as the details of the new configs are defined at the top in different
> > > sections as well.
> > >
> > > AM6: Question: KafkaConsumer has public "seek" methods for the reset
> > > strategies, "seekToBeginning" and "seekToEnd", which correspond to the
> > > OffsetResetStrategy values EARLIEST and LATEST respectively. Do you
> > > think it would be sensible to add KafkaConsumer APIs for the newly
> > > introduced strategies?
> > >
> > > Regards,
> > > Apoorv Mittal
> > >
> > > On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com>
> > > wrote:
> > > >
> > > > Hi Matthias,
> > > >
> > > > Thanks for the review.
> > > >
> > > > MS1: It looks like KIP-842 is stuck in voting. The current changes
> > > > should fit into KIP-842 (a few updates will definitely be required),
> > > > as KIP-842 proposes adding new configs
> > > > (auto.offset.reset.on.no.initial.offset and
> > > > auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > > > after completing this KIP.
> > > >
> > > > MS2: Added support for an earliest-local option to reset to the
> > > > oldest local offset, plus the corresponding changes to the
> > > > kafka-consumer-groups command.
> > > >
> > > > MS3: Based on suggestions, I have updated the config option to
> > > > support ISO-8601 format.
> > > >
> > > > MS4: I have included the AutoOffsetReset changes in the KIP. I
> > > > definitely need your help refining the API/solution for KS :)
> > > >
> > > > Thanks
> > > >
> > > > On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org>
> > > > wrote:
> > > > >
> > > > > Thanks for the KIP.
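The four values listed under AM5, combined with the move to ISO-8601 in MS3, suggest a simple validation step for the draft configs. This is a minimal sketch under stated assumptions: the value names follow this thread's draft (earliest, latest, earliest_local, by_duration, with the duration supplied separately as in auto.offset.reset.by.duration), ResetConfigSketch is a hypothetical helper rather than Kafka's own config machinery, and it relies on java.time.Duration.parse, which accepts ISO-8601 durations of the form PnDTnHnMn.nS.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Optional;
import java.util.Set;

// Hypothetical validator for the draft values discussed under AM5/MS3.
// The strategy names and the separate duration setting follow this thread's
// draft; they are not a released Kafka configuration API.
final class ResetConfigSketch {
    static final Set<String> STRATEGIES =
            Set.of("earliest", "latest", "earliest_local", "by_duration");

    // Returns the parsed duration for "by_duration" and an empty Optional for
    // the other strategies; throws IllegalArgumentException on bad input.
    static Optional<Duration> validate(String strategy, String durationValue) {
        // Set.of(...).contains(null) throws, so check for null first.
        if (strategy == null || !STRATEGIES.contains(strategy)) {
            throw new IllegalArgumentException("Unknown reset strategy: " + strategy);
        }
        if (!"by_duration".equals(strategy)) {
            return Optional.empty();
        }
        if (durationValue == null) {
            throw new IllegalArgumentException("by_duration requires a duration value");
        }
        final Duration duration;
        try {
            // ISO-8601 duration such as "PT12H" or "P1DT2H".
            duration = Duration.parse(durationValue);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Not an ISO-8601 duration: " + durationValue, e);
        }
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration must not be negative: " + durationValue);
        }
        return Optional.of(duration);
    }
}
```

Rejecting unknown names and malformed durations up front keeps a typo such as "1d" from silently falling back to some other reset behaviour.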
> > > > >
> > > > > A somewhat orthogonal question I have is whether we should try to
> > > > > merge this KIP with the existing KIP-842:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > > > >
> > > > > While KIP-842 has a different intention, i.e., separating the case
> > > > > of "no offset during initial startup" from "offset out of bounds
> > > > > during processing", it might still be worth it? My motivation for
> > > > > this question is that `auto.offset.reset` is one of the most central
> > > > > configs we have, and thus doing a single larger change (i.e., both
> > > > > KIPs together) might be better (less noise) than doing two
> > > > > independent changes. Thoughts? I know it might be a bit of a
> > > > > stretch, but asking cannot hurt :)
> > > > >
> > > > > About the KIP itself, did we consider adding something like
> > > > > "latest-local" to say reset to the oldest local offset, but do not
> > > > > fetch from tiered storage?
> > > > >
> > > > > For the format of the config, did we consider what we do in
> > > > > `bin/kafka-consumer-groups.sh`? It has multiple different options
> > > > > like
> > > > >
> > > > >   --by-duration <duration: format `PnDTnHnMnS`>
> > > > >   --shift-by <number-of-records>
> > > > >   --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > > > >   --to-offset <absolute offset>
> > > > >
> > > > > As it uses ISO formats, it might be good to use them here as well
> > > > > (even if I can see the appeal and simplicity of what you proposed).
> > > > >
> > > > > I also want to add to (AM1), as there is also a
> > > > > `Topology.AutoOffsetReset` enum in Kafka Streams. I think we would
> > > > > need to convert this into a class.
> > > > > While it could be done in a follow-up KIP, too, it seems best to
> > > > > do this holistically in a single KIP, because KS is not something
> > > > > on top of Kafka; it's part of Kafka. I am happy to help with the
> > > > > design and even PRs if necessary, but would strongly prefer to do
> > > > > it all in a single KIP.
> > > > >
> > > > > Btw: if we add something like "latest-local", it might also be good
> > > > > to extend `bin/kafka-consumer-groups.sh` accordingly, even if the
> > > > > tool works slightly differently: it commits an offset, so there
> > > > > could be a race condition between committing "latest-local",
> > > > > tiering, and when the consumer is actually started.
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > > > > > Hi Manikumar,
> > > > > > Thanks for the KIP. This new strategy would be helpful in
> > > > > > specifying fetch behaviour.
> > > > > >
> > > > > > AM1: The config `auto.offset.reset` is currently applied as per
> > > > > > the enum class OffsetResetStrategy, which is part of the
> > > > > > kafka-clients javadoc
> > > > > > <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > > > > > Are we also proposing to add new definitions in the same class?
> > > > > > Since the new configuration values will be string
> > > > > > representations, are we moving away from the OffsetResetStrategy
> > > > > > enum class altogether? Should we include the change in the KIP,
> > > > > > as OffsetResetStrategy is part of the public javadoc?
> > > > > >
> > > > > > AM2: While I can see that the ISO-8601 format is in the rejected
> > > > > > alternatives, should we not follow a standard for defining
> > > > > > durations that has already been adopted in other systems?
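AM1 and Matthias's point about `Topology.AutoOffsetReset` come down to the same constraint: an enum has a fixed set of constants, so a value such as "reset to six hours ago" cannot be one of them. One possible shape is a small immutable value class with static factories, sketched below. AutoOffsetResetSketch, its factory names and its toString format are hypothetical illustrations, not the API adopted by the KIP or by Kafka Streams.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical value class illustrating why an enum such as
// OffsetResetStrategy or Topology.AutoOffsetReset may need to become a class:
// enum constants are fixed, but a duration-based reset carries its own value.
final class AutoOffsetResetSketch {
    enum Type { EARLIEST, LATEST, BY_DURATION }

    private final Type type;
    private final Duration duration; // non-null only for BY_DURATION

    private AutoOffsetResetSketch(Type type, Duration duration) {
        this.type = type;
        this.duration = duration;
    }

    static AutoOffsetResetSketch earliest() {
        return new AutoOffsetResetSketch(Type.EARLIEST, null);
    }

    static AutoOffsetResetSketch latest() {
        return new AutoOffsetResetSketch(Type.LATEST, null);
    }

    static AutoOffsetResetSketch byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        return new AutoOffsetResetSketch(Type.BY_DURATION, duration);
    }

    Type type() {
        return type;
    }

    Optional<Duration> duration() {
        return Optional.ofNullable(duration);
    }

    // Value semantics, so two resets of the same length compare equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AutoOffsetResetSketch)) return false;
        AutoOffsetResetSketch other = (AutoOffsetResetSketch) o;
        return type == other.type && Objects.equals(duration, other.duration);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, duration);
    }

    @Override
    public String toString() {
        return duration == null ? type.name() : type.name() + "(" + duration + ")";
    }
}
```

Keeping the constructor private and exposing factories lets new strategies be added later without breaking callers, which is harder to do once a public enum is in use.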
> > > > > >
> > > > > > AM3: We've introduced new config values using the formats
> > > > > > minus-n-hours, minus-n-days, minus-n-months, and minus-n-years.
> > > > > > Should we explicitly define the "minus" prefix, or is it implied?
> > > > > >
> > > > > > AM4: When supporting duration-based resets, should we also allow
> > > > > > users to specify a specific checkpoint time? For example, if a
> > > > > > checkpoint occurred 2 days, 5 hours and 30 minutes earlier, the
> > > > > > current four formats (minus-n-hours, minus-n-days, minus-n-months,
> > > > > > minus-n-years) might not be sufficient. Should we consider adding
> > > > > > a format to accommodate specific checkpoint times, or is there a
> > > > > > reason to limit the supported formats?
> > > > > >
> > > > > > Regards,
> > > > > > Apoorv Mittal
> > > > > >
> > > > > > On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > > I would like to start a discussion on KIP-1106:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > > > > > >
> > > > > > > This KIP proposes to add an additional auto offset reset
> > > > > > > strategy for consumer clients.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Manikumar
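AM3 and AM4 touch on a java.time detail worth keeping in mind if the ISO-8601 format is adopted. java.time.Duration.parse accepts days, hours, minutes and seconds (treating a day as exactly 24 hours), so AM4's example "2 days, 5 hours and 30 minutes" is simply P2DT5H30M. It rejects months and years, however, because their length depends on the calendar. The sketch below contrasts that with java.time.Period, which applies months and years relative to a date in a time zone. ResetTimestampSketch is hypothetical, and using UTC as the zone is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.Period;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

// Hypothetical helpers contrasting fixed-length durations with calendar periods.
final class ResetTimestampSketch {
    // Fixed-length ISO-8601 duration: days count as exactly 24 hours.
    static Instant minusDuration(Instant now, String isoDuration) {
        return now.minus(Duration.parse(isoDuration));
    }

    // Months and years have no fixed length, so they are applied to a calendar
    // date in a zone. UTC is an assumption of this sketch.
    static Instant minusPeriod(Instant now, String isoPeriod) {
        return now.atZone(ZoneOffset.UTC).minus(Period.parse(isoPeriod)).toInstant();
    }

    // True if java.time.Duration can represent the value (no months or years).
    static boolean isFixedLength(String iso) {
        try {
            Duration.parse(iso);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

Under this sketch, minus-n-months and minus-n-years style values would need Period semantics and a zone, while the compound offset in AM4 fits a plain Duration.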