Hi, ManiKumar,

Thanks for the updated PR. Another comment.

JR3. Is there any benefit to using ISO8601 for auto.offset.reset.by.duration
instead of just ms as a long? The latter covers a broader range. Also,
AutoOffsetResetByDuration uses Java Duration, which covers a different
granularity than ISO8601. It might be better to just use ms for both.

Jun

On Fri, Nov 8, 2024 at 5:51 AM Manikumar <manikumar.re...@gmail.com> wrote:

> Hi All,
>
> I have updated the KIP with recent suggestions.
>
> Thanks.
>
> On Fri, Nov 8, 2024 at 11:53 AM Manikumar <manikumar.re...@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > Thanks for the comments.
> >
> > > I am personally not convinced that adding a new config
> > > `auto.offset.reset.by.duration` is the best way.
> >
> > I think in this case, we felt it's better to add the strategy name to the
> > auto.offset.reset config and add additional strategy-specific configs.
> > This will be easy to understand and extend in the future. I am open to
> > change if there are similar concerns.
> >
> > > One question about "earliest_local": if we fetch this offset from the
> > > server, and seek to it later, and start fetching, won't there be a race
> > > condition? the server would just need to reload it from the tier.
> >
> > Yes, your understanding is correct. The server will read from tiered
> > storage. Anyhow, I will consider dropping earliest_local based on Jun's
> > suggestion.
> >
> > > For Kafka Streams, just adding the new option to the existing enum does
> > > not work IMHO. If a user would use BY_DURATION, there is no option to
> > > pass in the actual duration (and we might need to throw an exception).
> >
> > I was assuming we will be using the underlying consumer config here. I
> > will update based on your suggestions.
> >
> > Thanks,
> >
> > On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > Thanks for updating the KIP.
> > >
> > > I am personally not convinced that adding a new config
> > > `auto.offset.reset.by.duration` is the best way.
> > > Kafka in general has way too many configs and trying to avoid adding
> > > more configs seems to be desirable? -- It seems this might be a point of
> > > contention, and if the majority of people want this new config, so be it.
> > > I just wanted to stress my concerns about it.
> > >
> > > One question about "earliest_local": if we fetch this offset from the
> > > server, and seek to it later, and start fetching, won't there be a race
> > > condition? The oldest segment could have been discarded locally in the
> > > meantime? Maybe not too much of a big deal: the server would just need
> > > to reload it from the tier. Just want to ensure my understanding is
> > > correct, and make sure nobody has concerns about it.
> > >
> > > For Kafka Streams, just adding the new option to the existing enum does
> > > not work IMHO.
> > >
> > > For example, we have a method:
> > >
> > >     addSource(final AutoOffsetReset offsetReset,
> > >               final String name,
> > >               final String... topics)
> > >
> > > If a user would use BY_DURATION, there is no option to pass in the
> > > actual duration (and we might need to throw an exception). And if we
> > > overload `addSource` by adding a fourth parameter, it would be odd too,
> > > as users could pass in `EARLIEST` plus a duration, and we would need to
> > > throw an exception or ignore the given duration. Overall not an
> > > easy-to-use API.
> > >
> > > Thus, I would suggest deprecating the existing `AutoOffsetReset` enum
> > > and adding a new class `AutoOffsetReset`, with which we can guide users
> > > accordingly and avoid corner cases:
> > >
> > >     public class AutoOffsetReset {
> > >         public static AutoOffsetReset earliest();
> > >         public static AutoOffsetReset latest();
> > >         public static AutoOffsetReset byDuration(Duration duration);
> > >         public static AutoOffsetReset earliestLocal();
> > >     }
> > >
> > > The existing enum is contained inside `Topology`, so we can add the new
> > > class directly in package `org.apache.kafka.streams` to avoid naming
> > > conflicts.
> > >
> > > Additionally, we would deprecate all methods taking the existing enum
> > > `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are
> > > affected) and add new overloads which accept the new class
> > > `AutoOffsetReset` instead.
> > >
> > > -Matthias
> > >
> > > On 11/7/24 1:57 PM, Jun Rao wrote:
> > > > Hi, Manikumar,
> > > >
> > > > Thanks for the KIP. A couple of comments.
> > > >
> > > > JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
> > > > sense for a consumer app to bootstrap based on time. The time that a
> > > > topic is kept locally seems irrelevant to a consumer application.
> > > >
> > > > JR2. I took a look at the proposal of KIP-842. It seems that this KIP
> > > > covers the common scenario in KIP-842 and is simpler. The scenario
> > > > described in KIP-842 is about which initial offset to use when a new
> > > > partition is added. This is more or less the same problem as choosing
> > > > the initial offset for existing partitions. Having a time-based option
> > > > provides more flexibility than earliest or latest, and is likely good
> > > > enough for most usage.
> > > >
> > > > Jun
> > > >
> > > > On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
> > > >
> > > >> Thanks Manikumar and Andrew.
> > > >>
> > > >> For AM6: I was thinking of enhancing OffsetFetcher and adding
> > > >> "seekToLocalBeginning", where OffsetFetcher itself uses the
> > > >> Admin.listOffsets API. However, I agree that this is just another
> > > >> convenience and not necessary to support the functionality. Skipping
> > > >> it sounds good to me.
> > > >>
> > > >> Regards,
> > > >> Apoorv Mittal
> > > >>
> > > >> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> > > >>
> > > >>> Hi Manikumar,
> > > >>>
> > > >>> AS7: In response to AM6, I agree that in general the combination
> > > >>> of offsetsForTimes() and seek() will do what is required. The one case
> > > >>> that I think is trickier is seeking to the local earliest/beginning.
> > > >>>
> > > >>> The Consumer API has Consumer.beginningOffsets and
> > > >>> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> > > >>> Similarly, there are methods for OffsetResetStrategy.LATEST.
> > > >>>
> > > >>> With the Consumer API, I don't think you can find the local earliest
> > > >>> offset. That can be found using Admin.listOffsets with
> > > >>> OffsetSpec.EarliestLocal. We could add equivalents of
> > > >>> Consumer.beginningOffsets and Consumer.seekToBeginning. An application
> > > >>> could always use the Admin API with the Consumer API.
> > > >>>
> > > >>> Personally, I would not complicate this KIP with these new methods when
> > > >>> the focus is more on duration-based offset reset and configs.
> > > >>>
> > > >>> Thanks,
> > > >>> Andrew
> > > >>> ________________________________________
> > > >>> From: Manikumar <manikumar.re...@gmail.com>
> > > >>> Sent: 06 November 2024 04:20
> > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > >>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option for consumer clients
> > > >>>
> > > >>> Hi Apoorv,
> > > >>>
> > > >>> Thanks for the review.
> > > >>>
> > > >>> AM5: Thanks, updated the KIP.
> > > >>>
> > > >>> AM6: Currently we can achieve this with the combination of the
> > > >>> offsetsForTimes() and seek() APIs.
> > > >>> Maybe we can add a generic API like
> > > >>> seekToTimes(Map<TopicPartition, Long> timestampsToSearch) if required.
> > > >>> Let's see what others think.
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
> > > >>>>
> > > >>>> Hi Manikumar,
> > > >>>> Thanks for the changes. Just a minor comment and a question:
> > > >>>>
> > > >>>> AM5: The description "How to initialize the share-partition start
> > > >>>> offset:" for "share.auto.offset.reset" seems incomplete as it ends
> > > >>>> with `:`. Should we write the details as defined in KIP-932, i.e.
> > > >>>>
> > > >>>> How to initialize the share-partition start offset:
> > > >>>>
> > > >>>>    - "earliest": automatically reset the offset to the earliest offset
> > > >>>>    - "latest": automatically reset the offset to the latest offset
> > > >>>>    - "earliest_local": automatically resets the offset to the earliest
> > > >>>>      message stored in the local log on the broker.
> > > >>>>    - "by_duration": automatically resets the offset to the earliest
> > > >>>>      offset whose timestamp is greater than or equal to the configured
> > > >>>>      duration (auto.offset.reset.by.duration).
> > > >>>>
> > > >>>> or maybe just replace ":" with ".", i.e. "How to initialize the
> > > >>>> share-partition start offset.", as the details of the new configs are
> > > >>>> defined at the top in different sections as well.
> > > >>>>
> > > >>>> AM6: Question: KafkaConsumer has public methods defined to "seek" for
> > > >>>> the reset strategy, like "seekToBeginning" and "seekToEnd", which use
> > > >>>> the OffsetResetStrategy of EARLIEST and LATEST respectively. Do you
> > > >>>> think it would be sensible to add additional KafkaConsumer APIs for
> > > >>>> the newly introduced strategies?
> > > >>>>
> > > >>>> Regards,
> > > >>>> Apoorv Mittal
> > > >>>>
> > > >>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi Matthias,
> > > >>>>>
> > > >>>>> Thanks for the review.
> > > >>>>>
> > > >>>>> MS1: It looks like KIP-842 is stuck in voting. The current changes
> > > >>>>> should fit into KIP-842 (a few updates are definitely required), as
> > > >>>>> KIP-842 is proposing to add new configs
> > > >>>>> (auto.offset.reset.on.no.initial.offset and
> > > >>>>> auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > > >>>>> after completing this KIP.
> > > >>>>>
> > > >>>>> MS2: Added support for an earliest-local config to reset to the
> > > >>>>> oldest local offset, and changes to the kafka-consumer-groups cmd.
> > > >>>>>
> > > >>>>> MS3: Based on suggestions, I have updated the config option to
> > > >>>>> support ISO format.
> > > >>>>>
> > > >>>>> MS4: I have included the AutoOffsetReset changes in the KIP. I
> > > >>>>> definitely need your help on refining the API/solution for KS :)
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>> Thanks for the KIP.
> > > >>>>>>
> > > >>>>>> A somewhat orthogonal question I have is whether we should try to
> > > >>>>>> merge this KIP with the existing KIP-842:
> > > >>>>>>
> > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > > >>>>>>
> > > >>>>>> While KIP-842 has a different intention, i.e., separating the case
> > > >>>>>> of "no offset during initial startup" vs "offset out of bounds
> > > >>>>>> during processing", it might still be worth it? My motivation for
> > > >>>>>> this question is that `auto.offset.reset` is one of the most
> > > >>>>>> central configs we have, and thus, doing a single larger change
> > > >>>>>> (i.e., both KIPs together) might be better (less noise) than doing
> > > >>>>>> two independent changes? Thoughts? I know it might be a little bit
> > > >>>>>> of a stretch, but asking cannot hurt :)
> > > >>>>>>
> > > >>>>>> About the KIP itself, did we consider adding something like
> > > >>>>>> "latest-local" to say reset to oldest local, but not fetch from
> > > >>>>>> tiered storage?
> > > >>>>>>
> > > >>>>>> For the format of the config, did we consider what we do in
> > > >>>>>> `bin/kafka-consumer-groups.sh`? It has multiple different options
> > > >>>>>> like
> > > >>>>>>
> > > >>>>>>     --by-duration <duration: format `PnDTnHnMnS`>
> > > >>>>>>     --shift-by <number-of-records>
> > > >>>>>>     --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > > >>>>>>     --to-offset <absolute offset>
> > > >>>>>>
> > > >>>>>> As it also uses ISO format, it might be good to also use them (even
> > > >>>>>> if I can see the appeal and simplicity of what you proposed).
> > > >>>>>>
> > > >>>>>> I also want to add to (AM1), as there is also a
> > > >>>>>> `Topology.AutoOffsetReset` enum in Kafka Streams.
> > > >>>>>> I think we would need to convert this into a class. While it could
> > > >>>>>> be done in a follow-up KIP, too, it seems best to do this
> > > >>>>>> holistically in a single KIP, because KS is not something on top of
> > > >>>>>> Kafka, but it's part of Kafka. I am happy to help with the design
> > > >>>>>> and even PRs if necessary, but would strongly prefer to do it all
> > > >>>>>> in a single KIP.
> > > >>>>>>
> > > >>>>>> Btw: if we add something like "latest-local", it might also be good
> > > >>>>>> to extend `bin/kafka-consumer-groups.sh` accordingly (even if the
> > > >>>>>> tool works slightly differently, as it does commit an offset and
> > > >>>>>> there could be some race condition between committing
> > > >>>>>> "latest-local", tiering, and when the consumer is actually
> > > >>>>>> started).
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > > >>>>>>> Hi Manikumar,
> > > >>>>>>> Thanks for the KIP, this new strategy would be helpful in
> > > >>>>>>> specifying fetch behaviour.
> > > >>>>>>>
> > > >>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the
> > > >>>>>>> enum class OffsetResetStrategy, which is part of the kafka-clients
> > > >>>>>>> javadoc
> > > >>>>>>> <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > > >>>>>>> Are we also proposing to somehow add new definitions in the same
> > > >>>>>>> class? However, as the new configurations will be a string
> > > >>>>>>> representation, are we moving away from the OffsetResetStrategy
> > > >>>>>>> enum class altogether? Should we include the change in the KIP, as
> > > >>>>>>> OffsetResetStrategy is part of the public javadoc?
> > > >>>>>>>
> > > >>>>>>> AM2: While I can see the ISO-8601 format is in the rejected
> > > >>>>>>> alternatives, should we not follow some standard of defining
> > > >>>>>>> duration which has already been adopted in other systems?
> > > >>>>>>>
> > > >>>>>>> AM3: We've introduced new config values using the format
> > > >>>>>>> minus-n-hours, minus-n-days, minus-n-months, and minus-n-years.
> > > >>>>>>> Should we explicitly define the "minus" prefix, or is it implied?
> > > >>>>>>>
> > > >>>>>>> AM4: When supporting duration-based resets, should we also allow
> > > >>>>>>> users to specify a specific checkpoint time? For example, if a
> > > >>>>>>> checkpoint occurs 2 days, 5 hours and 30 minutes earlier, the
> > > >>>>>>> current four formats (minus-n-hours, minus-n-days, minus-n-months,
> > > >>>>>>> minus-n-years) might not be sufficient. Should we consider adding a
> > > >>>>>>> format to accommodate specific checkpoint times, or is there a
> > > >>>>>>> reason to limit the supported formats?
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Apoorv Mittal
> > > >>>>>>>
> > > >>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi everyone,
> > > >>>>>>>> I would like to start a discussion on KIP-1106:
> > > >>>>>>>>
> > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > > >>>>>>>>
> > > >>>>>>>> This KIP proposes to add an additional auto offset reset strategy
> > > >>>>>>>> for consumer clients.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Manikumar
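
For anyone weighing JR3 at the top of this thread, here is a minimal Java sketch (not taken from the KIP) of the granularity difference between java.time.Duration and full ISO-8601 durations. It relies only on the JDK; the class name ResetDurationSketch and its two helper methods are hypothetical and exist purely for illustration. Duration.parse accepts the day/time subset (PnDTnHnMnS) but rejects calendar units such as months, whereas a plain millisecond long needs no parsing step at all.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper for illustration only; not part of Kafka or KIP-1106.
final class ResetDurationSketch {

    // Converts an ISO-8601 style duration string to milliseconds via the JDK parser.
    public static long isoToMillis(String iso) {
        return Duration.parse(iso).toMillis();
    }

    // Returns true when java.time.Duration is able to parse the given string.
    public static boolean parsesAsJavaDuration(String iso) {
        try {
            Duration.parse(iso);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Day/time based durations are accepted and map directly to milliseconds.
        System.out.println("P1DT2H -> " + isoToMillis("P1DT2H") + " ms");
        // "P1M" (one month) is a valid ISO-8601 duration, but Duration.parse rejects it.
        System.out.println("P1M parses: " + parsesAsJavaDuration("P1M"));
        // "PT1M" (one minute) falls inside the supported day/time subset.
        System.out.println("PT1M parses: " + parsesAsJavaDuration("PT1M"));
    }
}
```

Under these assumptions, a config expressed in milliseconds sidesteps the question of which ISO-8601 units are supported, at the cost of being less readable than a value such as P1DT2H.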