Hi All,

I have updated the KIP with recent suggestions.

Thanks.

On Fri, Nov 8, 2024 at 11:53 AM Manikumar <manikumar.re...@gmail.com> wrote:
>
> Hi Matthias,
>
> Thanks for the comments.
>
> > I am personally not convinced that adding a new config
> > `auto.offset.reset.by.duration` is the best way.
>
> I think in this case, we felt it's better to add the strategy name to
> auto.offset.reset config and add additional strategy specific configs.
> This will be easy to understand and extend in future. I am open to
> change if there are similar concerns.
>
> > One question about "earliest_local": if we fetch this offset from the
> > server, and seek it later, and start fetching, won't there be a race
> > condition? the server would just need to reload it from the tier.
>
> Yes, your understanding is correct. The server will read from the tier
> storage.
> Anyhow, I will consider dropping earliest_local based on Jun's suggestion.
>
> > For Kafka Streams, just adding the new option to the existing enum does
> > not work IMHO. If a user would use BY_DURATION, there is no option to pass
> > in the actual duration (and we might need to throw an exception).
>
> I was assuming we will be using the underlying consumer config here. I
> will update based on your suggestions.
>
>
> Thanks,
>
>
> On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > Thanks for updating the KIP.
> >
> > I am personally not convinced that adding a new config
> > `auto.offset.reset.by.duration` is the best way. Kafka in general has
> > way too many configs and trying to avoid adding more configs seems to be
> > desirable? -- It seems this might be a point of contention, and if the
> > majority of people wants this new config so be it. I just wanted to
> > stress my concerns about it.
> >
> > One question about "earliest_local": if we fetch this offset from the
> > server, and seek to it later, and start fetching, won't there be a race
> > condition? The oldest segment could have been discarded locally in the
> > mean time?
> > Maybe not too much of a big deal: the server would just need
> > to reload it from the tier. Just want to ensure my understanding is
> > correct, and make sure nobody has concerns about it.
> >
> > For Kafka Streams, just adding the new option to the existing enum does
> > not work IMHO.
> >
> > For example, we have a method:
> >
> >    addSource(final AutoOffsetReset offsetReset,
> >              final String name,
> >              final String... topics)
> >
> > If a user would use BY_DURATION, there is no option to pass in the
> > actual duration (and we might need to throw an exception). And if we
> > overload `addSource` adding a fourth parameter, it would be odd too, as
> > users could pass in `EARLIEST` plus a duration, and we would need to
> > throw an exception or ignore the given duration. Overall not an easy to
> > use API.
> >
> > Thus, I would suggest to deprecate the existing `AutoOffsetReset` enum,
> > and add a new class `AutoOffsetReset` for which we can guide users
> > accordingly avoiding corner cases:
> >
> >    public class AutoOffsetReset {
> >      public static AutoOffsetReset earliest();
> >      public static AutoOffsetReset latest();
> >      public static AutoOffsetReset byDuration(Duration duration);
> >      public static AutoOffsetReset earliestLocal();
> >    }
> >
> > The existing enum is contained inside `Topology` so we can add the new
> > class just for package `org.apache.kafka.streams` to avoid naming conflicts.
> >
> > Additionally, we would deprecate all methods taking existing enum
> > `AutoOffsetReset` (only two classes `Topology` and `Consumed` are
> > affected) and add new overload which accepts the new `class
> > AutoOffsetReset` instead.
> >
> >
> > -Matthias
> >
> > On 11/7/24 1:57 PM, Jun Rao wrote:
> > > Hi, Manikumar,
> > >
> > > Thanks for the KIP. A couple of comments.
> > >
> > > JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
> > > sense for a consumer app to bootstrap based on time.
> > > The time that a topic is kept locally seems irrelevant to a consumer
> > > application.
> > >
> > > JR2. I took a look at the proposal of KIP-842. It seems that this KIP
> > > covers the common scenario in KIP-842 and is simpler. The scenario
> > > described in KIP-842 is about which initial offset to use when a new
> > > partition is added. This is more or less the same problem for choosing the
> > > initial offset for existing partitions. Having a time based option
> > > provides more flexibility than earliest or latest, and is likely good
> > > enough for most usage.
> > >
> > > Jun
> > >
> > >
> > > On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> > > wrote:
> > >
> > >> Thanks Manikumar and Andrew.
> > >>
> > >> For AM6: Though I was thinking of enhancing OffsetFetcher and adding
> > >> "seekToLocalBeginning" where OffsetFetcher does use Admin.listOffsets API
> > >> itself. However I agree that this is just another convenient way and not
> > >> necessary to support functionality. Skipping it sounds good to me.
> > >>
> > >> Regards,
> > >> Apoorv Mittal
> > >>
> > >>
> > >> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <
> > >> andrew_schofield_j...@outlook.com> wrote:
> > >>
> > >>> Hi Manikumar,
> > >>>
> > >>> AS7: In response to AM6, I agree that in general the combination
> > >>> of offsetsForTimes() and seek() will do what is required. The one case
> > >>> that I think is more tricky is seeking to the local earliest/beginning.
> > >>>
> > >>> The Consumer API has Consumer.beginningOffsets and
> > >>> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> > >>> Similarly there are methods for OffsetResetStrategy.LATEST.
> > >>>
> > >>> With the Consumer API, I don't think you can find the local earliest
> > >>> offset.
> > >>> That can be found using Admin.listOffsets with OffsetSpec.EarliestLocal.
> > >>> We could add equivalents of Consumer.beginningOffsets and
> > >>> Consumer.seekToBeginning.
> > >>> An application could always use the
> > >>> Admin API with the Consumer API.
> > >>>
> > >>> Personally, I would not complicate this KIP with these new methods when
> > >>> the focus is more on duration-based offset reset and configs.
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>> ________________________________________
> > >>> From: Manikumar <manikumar.re...@gmail.com>
> > >>> Sent: 06 November 2024 04:20
> > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > >>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
> > >>> for consumer clients
> > >>>
> > >>> Hi Apoorv,
> > >>>
> > >>> Thanks for the review.
> > >>>
> > >>> AM5: Thanks, Updated the KIP.
> > >>>
> > >>> AM6: Currently we can achieve this with the combination of
> > >>> offsetsForTimes() and seek() API.
> > >>> Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
> > >>> Long> timestampsToSearch) if required.
> > >>> Let's see what others think.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>> Hi Manikumar,
> > >>>> Thanks for the changes. Just minor comment and a question:
> > >>>>
> > >>>> AM5: The description "How to initialize the share-partition start
> > >>>> offset:" for "share.auto.offset.reset" seems incomplete as it ends
> > >>>> with `:`.
> > >>>> Should we write the details as defined in KIP-932 i.e.
> > >>>>
> > >>>> How to initialize the share-partition start offset:
> > >>>>
> > >>>>    - "earliest" : automatically reset the offset to the earliest offset
> > >>>>    - "latest" : automatically reset the offset to the latest offset
> > >>>>    - "earliest_local": automatically resets the offset to the earliest
> > >>>>    message stored in the local log on the broker.
> > >>>>    - "by_duration": automatically resets the offset to the earliest
> > >>>>    offset whose timestamp is greater than or equal to the configured
> > >>>>    duration (auto.offset.reset.by.duration).
> > >>>>
> > >>>>
> > >>>> *or* maybe just replace ":" with "." i.e. "How to initialize the
> > >>>> share-partition start offset." as the details of the new configs are
> > >>>> defined at top in different sections as well.
> > >>>>
> > >>>> AM6: Question: KafkaConsumer has public methods defined to "seek" for
> > >>>> the reset strategy like "seekToBeginning" and "seekToEnd" which uses
> > >>>> OffsetResetStrategy of EARLIEST and LATEST respectively. Do you think
> > >>>> it would be sensible to add additional KafkaConsumer APIs for newly
> > >>>> introduced strategies?
> > >>>>
> > >>>> Regards,
> > >>>> Apoorv Mittal
> > >>>>
> > >>>>
> > >>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Matthias,
> > >>>>>
> > >>>>> Thanks for the review.
> > >>>>>
> > >>>>> MS1: looks like KIP-842 is stuck in the voting. Current changes
> > >>>>> should fit into KIP-842 (few updates definitely required) as KIP-842 is
> > >>>>> proposing to add new configs (auto.offset.reset.on.no.initial.offset
> > >>>>> and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > >>>>> after completing this KIP.
> > >>>>>
> > >>>>> MS2: Added support for earliest-local config to reset to oldest local
> > >>>>> offset and changes to kafka-consumer-groups cmd.
> > >>>>>
> > >>>>> MS3: Based on suggestions, I have updated the config option to
> > >>>>> support ISO format.
> > >>>>>
> > >>>>> MS4: I have included the AutoOffsetReset changes to the KIP. I
> > >>>>> definitely need your help on refining the API/solution for KS :)
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J.
> > >>>>> Sax <mj...@apache.org> wrote:
> > >>>>>>
> > >>>>>> Thanks for the KIP.
> > >>>>>>
> > >>>>>> A somewhat orthogonal question I have is, if we should try to merge
> > >>>>>> this KIP with the existing KIP-842:
> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > >>>>>>
> > >>>>>> While KIP-842 has a different intention, ie, separating the case of
> > >>>>>> "no offset during initial startup" vs "offset out of bounds during
> > >>>>>> processing", it might still be worth it? My motivation for this
> > >>>>>> question is, that `auto.offset.reset` is one of the most central
> > >>>>>> configs we have, and thus, doing a single larger change (ie, both
> > >>>>>> KIPs together) might be better (less noise) than doing two
> > >>>>>> independent changes? Thoughts? I know it might be a little bit of a
> > >>>>>> stretch, but asking cannot hurt :)
> > >>>>>>
> > >>>>>>
> > >>>>>> About the KIP itself, did we consider to add something like
> > >>>>>> "latest-local" to say reset to oldest local, but not fetch from
> > >>>>>> tiered storage?
> > >>>>>>
> > >>>>>> For the format of the config, did we consider what we do in
> > >>>>>> `bin/kafka-consumer-groups.sh`? It has multiple different options like
> > >>>>>>
> > >>>>>>    --by-duration <duration: format `PnDTnHnMnS`>
> > >>>>>>    --shift-by <number-of-records>
> > >>>>>>    --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > >>>>>>    --to-offset <absolute offset>
> > >>>>>>
> > >>>>>> As it also uses ISO format, it might be good to also use them (even
> > >>>>>> if I can see the appeal and simplicity of what you proposed).
> > >>>>>>
> > >>>>>>
> > >>>>>> I also want to add to (AM1), as there is also
> > >>>>>> `Topology.AutoOffsetReset` enum in Kafka Streams. I think we would
> > >>>>>> need to convert this into a class.
> > >>>>>> While it could be done in a follow-up KIP, too, it seems best to
> > >>>>>> do this holistically in a single KIP, because KS is not something
> > >>>>>> on-top of Kafka, but it's part of Kafka. I am happy to help with the
> > >>>>>> design and even PRs if necessary, but would strongly prefer to do it
> > >>>>>> all in a single KIP.
> > >>>>>>
> > >>>>>>
> > >>>>>> Btw: if we add something like "latest-local", it might also be good
> > >>>>>> to extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool
> > >>>>>> works slightly different, as it does commit an offset and there could
> > >>>>>> be some race condition between committing "latest-local", tiering,
> > >>>>>> and when the consumer is actually started)?
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > >>>>>>> Hi Manikumar,
> > >>>>>>> Thanks for the KIP, this new strategy would be helpful in
> > >>>>>>> specifying fetch behaviour.
> > >>>>>>>
> > >>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the
> > >>>>>>> enum class OffsetResetStrategy which is part of kafka-clients javadoc
> > >>>>>>> <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > >>>>>>> Are we also proposing to somehow add new definitions in the same
> > >>>>>>> class? However as new configurations will be a string representation
> > >>>>>>> hence are we moving away from OffsetResetStrategy enum class
> > >>>>>>> altogether? Should we include the change in the KIP as
> > >>>>>>> OffsetResetStrategy is part of public javadoc?
> > >>>>>>>
> > >>>>>>> AM2: While I can see the ISO-8601 format is in the rejected
> > >>>>>>> alternative, should we not follow some standard of defining duration
> > >>>>>>> which has already been adopted in other systems?
> > >>>>>>>
> > >>>>>>> AM3: We've introduced new config values using the format
> > >>>>>>> minus-n-hours, minus-n-days, minus-n-months, and minus-n-years.
> > >>>>>>> Should we explicitly define the "minus" prefix, or is it implied?
> > >>>>>>>
> > >>>>>>> AM4: When supporting duration-based resets, should we also allow
> > >>>>>>> users to specify a specific checkpoint time? For example, if a
> > >>>>>>> checkpoint occurs 2 days, 5 hours and 30 minutes earlier, the current
> > >>>>>>> four formats (minus-n-hours, minus-n-days, minus-n-months,
> > >>>>>>> minus-n-years) might not be sufficient. Should we consider adding a
> > >>>>>>> format to accommodate specific checkpoint times, or is there a reason
> > >>>>>>> to limit the supported formats?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Apoorv Mittal
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi everyone,
> > >>>>>>>> I would like to start a discussion on KIP-1106:
> > >>>>>>>>
> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > >>>>>>>>
> > >>>>>>>> This KIP proposes to add an additional auto offset reset strategy
> > >>>>>>>> for consumer clients.
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Manikumar
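
To make the Kafka Streams suggestion in this thread a bit more concrete, here is a minimal, self-contained sketch of a factory-style reset class along the lines Matthias proposed. It is an illustration only and not the API that KIP-1106 defines: the class name `AutoOffsetResetSketch`, the nested `Strategy` enum, and the `resetTimestamp` helper are all hypothetical, and `earliestLocal()` is left out because the thread leans toward dropping it. The duration is parsed with `java.time.Duration.parse`, which accepts the ISO-8601 `PnDTnHnMnS` format mentioned above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical sketch of a factory-style reset policy; not the actual Kafka Streams class. */
final class AutoOffsetResetSketch {

    /** Hypothetical strategy names, used only inside this sketch. */
    enum Strategy { EARLIEST, LATEST, BY_DURATION }

    private final Strategy strategy;
    private final Duration duration; // null unless the strategy is BY_DURATION

    private AutoOffsetResetSketch(Strategy strategy, Duration duration) {
        this.strategy = strategy;
        this.duration = duration;
    }

    static AutoOffsetResetSketch earliest() {
        return new AutoOffsetResetSketch(Strategy.EARLIEST, null);
    }

    static AutoOffsetResetSketch latest() {
        return new AutoOffsetResetSketch(Strategy.LATEST, null);
    }

    /** The only factory that accepts a duration, so "EARLIEST plus a duration" cannot be expressed. */
    static AutoOffsetResetSketch byDuration(Duration duration) {
        if (duration == null || duration.isNegative()) {
            throw new IllegalArgumentException("duration must be non-null and non-negative");
        }
        return new AutoOffsetResetSketch(Strategy.BY_DURATION, duration);
    }

    Strategy strategy() {
        return strategy;
    }

    Optional<Duration> duration() {
        return Optional.ofNullable(duration);
    }

    /** Timestamp to search from (now minus duration); empty for EARLIEST and LATEST. */
    Optional<Instant> resetTimestamp(Instant now) {
        return duration().map(d -> now.minus(d));
    }

    public static void main(String[] args) {
        // 2 days, 5 hours and 30 minutes back, the example from AM4.
        AutoOffsetResetSketch reset = byDuration(Duration.parse("P2DT5H30M"));
        System.out.println(reset.strategy() + " -> " + reset.resetTimestamp(Instant.now()));
    }
}
```

Because a duration can only be supplied through `byDuration(...)`, a caller cannot combine `earliest()` with a duration, which is the corner case the overloaded `addSource` alternative would have had to reject at runtime. In a plain consumer, the timestamp returned by `resetTimestamp` would presumably be what gets passed to the `offsetsForTimes()` plus `seek()` combination discussed in AM6.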