Hi Matthias,

Thanks for the explanation.
> I am wondering if AutoOffsetReset should be an interface (we usually just use classes)?

I have converted `AutoOffsetReset` to a class now.

Thanks.

On Fri, Nov 8, 2024 at 11:41 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> >> I was assuming we will be using the underlying consumer config here. I will update based on your suggestions.
>
> Well, yes we can, but for KS we only use the underlying consumer config if there is no code-based override. The consumer config only serves as the default. It works similarly to `default.api.timeout.ms`, which might be overridden on a per-method basis.
>
> For such a code-based override, we need a good API.
>
> For Kafka Streams, it's in general more complicated. We can read from more than one input topic, and allow setting a different reset policy per topic. Thus, under the hood, KS often needs to configure the consumer with `auto.offset.reset=none`, catch the corresponding exception, and do an explicit `seekToBeginning()` or `seekToEnd()` on a per-topic basis.
>
> I am wondering if AutoOffsetReset should be an interface (we usually just use classes)? I would also not expose the getters `name()` and `timestamp()` -- in KS, we usually use non-public subclasses that add the necessary getters. Users don't need getters, so we try to hide them to keep the public API surface area as small as possible.
>
> Cf. GroupedInternal, which extends Grouped:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
>
> We don't have an interface as we don't need one, and GroupedInternal has the necessary getters. Of course, the KIP does not need to define `AutoOffsetResetInternal`, but only the public / user-facing `AutoOffsetReset` class.
>
> -Matthias
>
> On 11/7/24 10:23 PM, Manikumar wrote:
> > Hi Matthias,
> >
> > Thanks for the comments.
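A minimal sketch of the class-based design suggested above, assuming the same pattern as `Grouped`/`GroupedInternal`: the user-facing `AutoOffsetReset` exposes only static factories and no public getters, while a non-public `AutoOffsetResetInternal` subclass adds the getters the runtime needs. The `StrategyType` enum, the field names, and the negative-duration check are illustrative assumptions, not the API adopted by the KIP; the classes are package-private here only so the sketch compiles as a single file.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of a class-based AutoOffsetReset (it would be public in a real API).
// Users only see the static factories; there are no public getters.
class AutoOffsetReset {
    // Hypothetical internal tag for the chosen policy.
    enum StrategyType { EARLIEST, LATEST, BY_DURATION }

    protected final StrategyType type;
    protected final Optional<Duration> duration;

    private AutoOffsetReset(final StrategyType type, final Optional<Duration> duration) {
        this.type = type;
        this.duration = duration;
    }

    // Copy constructor so an internal subclass can wrap a user-supplied instance,
    // mirroring how GroupedInternal wraps Grouped.
    protected AutoOffsetReset(final AutoOffsetReset other) {
        this(other.type, other.duration);
    }

    public static AutoOffsetReset earliest() {
        return new AutoOffsetReset(StrategyType.EARLIEST, Optional.empty());
    }

    public static AutoOffsetReset latest() {
        return new AutoOffsetReset(StrategyType.LATEST, Optional.empty());
    }

    // The duration travels with the policy, so "EARLIEST plus a duration" cannot be expressed.
    public static AutoOffsetReset byDuration(final Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + duration);
        }
        return new AutoOffsetReset(StrategyType.BY_DURATION, Optional.of(duration));
    }
}

// Hypothetical non-public subclass that exposes getters to the runtime only.
class AutoOffsetResetInternal extends AutoOffsetReset {
    AutoOffsetResetInternal(final AutoOffsetReset reset) {
        super(reset);
    }

    StrategyType type() {
        return type;
    }

    Optional<Duration> duration() {
        return duration;
    }
}
```

Because a duration can only be supplied through `byDuration(...)`, an overload such as `addSource(EARLIEST, duration)` has no counterpart in this shape, which is the corner case the class is meant to avoid.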
> >
> > >> I am personally not convinced that adding a new config `auto.offset.reset.by.duration` is the best way.
> >
> > I think in this case, we felt it's better to add the strategy name to the auto.offset.reset config and add additional strategy-specific configs. This will be easy to understand and extend in the future. I am open to changing this if there are similar concerns.
> >
> > >> One question about "earliest_local": if we fetch this offset from the server, and seek to it later, and start fetching, won't there be a race condition? The server would just need to reload it from the tier.
> >
> > Yes, your understanding is correct. The server will read from tiered storage. Anyhow, I will consider dropping earliest_local based on Jun's suggestion.
> >
> > >> For Kafka Streams, just adding the new option to the existing enum does not work IMHO. If a user uses BY_DURATION, there is no option to pass in the actual duration (and we might need to throw an exception).
> >
> > I was assuming we will be using the underlying consumer config here. I will update based on your suggestions.
> >
> > Thanks,
> >
> > On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >> Thanks for updating the KIP.
> >>
> >> I am personally not convinced that adding a new config `auto.offset.reset.by.duration` is the best way. Kafka in general has way too many configs, and trying to avoid adding more configs seems to be desirable? -- It seems this might be a point of contention, and if the majority of people want this new config, so be it. I just wanted to stress my concerns about it.
> >>
> >> One question about "earliest_local": if we fetch this offset from the server, and seek to it later, and start fetching, won't there be a race condition? The oldest segment could have been discarded locally in the meantime?
> >> Maybe not too much of a big deal: the server would just need to reload it from the tier. I just want to ensure my understanding is correct, and make sure nobody has concerns about it.
> >>
> >> For Kafka Streams, just adding the new option to the existing enum does not work IMHO.
> >>
> >> For example, we have a method:
> >>
> >>     addSource(final AutoOffsetReset offsetReset,
> >>               final String name,
> >>               final String... topics)
> >>
> >> If a user uses BY_DURATION, there is no option to pass in the actual duration (and we might need to throw an exception). And if we overload `addSource` by adding a fourth parameter, it would be odd too, as users could pass in `EARLIEST` plus a duration, and we would need to throw an exception or ignore the given duration. Overall, not an easy-to-use API.
> >>
> >> Thus, I would suggest deprecating the existing `AutoOffsetReset` enum, and adding a new class `AutoOffsetReset` with which we can guide users accordingly, avoiding corner cases:
> >>
> >>     public class AutoOffsetReset {
> >>         // API outline only; method bodies omitted
> >>         public static AutoOffsetReset earliest();
> >>         public static AutoOffsetReset latest();
> >>         public static AutoOffsetReset byDuration(Duration duration);
> >>         public static AutoOffsetReset earliestLocal();
> >>     }
> >>
> >> The existing enum is nested inside `Topology`, so we can add the new class directly to package `org.apache.kafka.streams` without naming conflicts.
> >>
> >> Additionally, we would deprecate all methods taking the existing enum `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are affected) and add new overloads that accept the new `class AutoOffsetReset` instead.
> >>
> >> -Matthias
> >>
> >> On 11/7/24 1:57 PM, Jun Rao wrote:
> >>> Hi, Manikumar,
> >>>
> >>> Thanks for the KIP. A couple of comments.
> >>>
> >>> JR1. It doesn't seem that we need earliest_local. Intuitively, it makes sense for a consumer app to bootstrap based on time.
> >>> The time that a topic is kept locally seems irrelevant to a consumer application.
> >>>
> >>> JR2. I took a look at the proposal of KIP-842. It seems that this KIP covers the common scenario in KIP-842 and is simpler. The scenario described in KIP-842 is about which initial offset to use when a new partition is added. This is more or less the same problem as choosing the initial offset for existing partitions. Having a time-based option provides more flexibility than earliest or latest, and is likely good enough for most usage.
> >>>
> >>> Jun
> >>>
> >>> On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
> >>>
> >>>> Thanks Manikumar and Andrew.
> >>>>
> >>>> For AM6: I was thinking of enhancing OffsetFetcher and adding "seekToLocalBeginning", as OffsetFetcher itself uses the Admin.listOffsets API. However, I agree that this is just another convenience and not necessary to support the functionality. Skipping it sounds good to me.
> >>>>
> >>>> Regards,
> >>>> Apoorv Mittal
> >>>>
> >>>> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >>>>
> >>>>> Hi Manikumar,
> >>>>>
> >>>>> AS7: In response to AM6, I agree that in general the combination of offsetsForTimes() and seek() will do what is required. The one case that I think is trickier is seeking to the local earliest/beginning.
> >>>>>
> >>>>> The Consumer API has Consumer.beginningOffsets and seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST. Similarly, there are methods for OffsetResetStrategy.LATEST.
> >>>>>
> >>>>> With the Consumer API, I don't think you can find the local earliest offset. That can be found using Admin.listOffsets with OffsetSpec.EarliestLocal. We could add equivalents of Consumer.beginningOffsets and Consumer.seekToBeginning.
> >>>>> An application could always use the Admin API with the Consumer API.
> >>>>>
> >>>>> Personally, I would not complicate this KIP with these new methods when the focus is more on duration-based offset reset and configs.
> >>>>>
> >>>>> Thanks,
> >>>>> Andrew
> >>>>> ________________________________________
> >>>>> From: Manikumar <manikumar.re...@gmail.com>
> >>>>> Sent: 06 November 2024 04:20
> >>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>>>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option for consumer clients
> >>>>>
> >>>>> Hi Apoorv,
> >>>>>
> >>>>> Thanks for the review.
> >>>>>
> >>>>> AM5: Thanks, updated the KIP.
> >>>>>
> >>>>> AM6: Currently we can achieve this with the combination of the offsetsForTimes() and seek() APIs. Maybe we can add a generic API like seekToTimes(Map<TopicPartition, Long> timestampsToSearch) if required. Let's see what others think.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi Manikumar,
> >>>>>> Thanks for the changes. Just a minor comment and a question:
> >>>>>>
> >>>>>> AM5: The description "How to initialize the share-partition start offset:" for "share.auto.offset.reset" seems incomplete, as it ends with `:`. Should we write the details as defined in KIP-932, i.e.
> >>>>>>
> >>>>>> How to initialize the share-partition start offset:
> >>>>>>
> >>>>>>    - "earliest": automatically reset the offset to the earliest offset
> >>>>>>    - "latest": automatically reset the offset to the latest offset
> >>>>>>    - "earliest_local": automatically reset the offset to the earliest message stored in the local log on the broker
> >>>>>>    - "by_duration": automatically reset the offset to the earliest offset whose timestamp is greater than or equal to the current time minus the configured duration (auto.offset.reset.by.duration)
> >>>>>>
> >>>>>> Or maybe just replace ":" with "." i.e. "How to initialize the share-partition start offset.", as the details of the new configs are also defined at the top in different sections.
> >>>>>>
> >>>>>> AM6: Question: KafkaConsumer has public methods to "seek" according to a reset strategy, like "seekToBeginning" and "seekToEnd", which use the OffsetResetStrategy values EARLIEST and LATEST respectively. Do you think it would be sensible to add additional KafkaConsumer APIs for the newly introduced strategies?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Apoorv Mittal
> >>>>>>
> >>>>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for the review.
> >>>>>>>
> >>>>>>> MS1: It looks like KIP-842 is stuck in voting. The current changes should fit into KIP-842 (a few updates are definitely required), as KIP-842 proposes adding new configs (auto.offset.reset.on.no.initial.offset and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842 after completing this KIP.
> >>>>>>>
> >>>>>>> MS2: Added support for an earliest-local option to reset to the oldest local offset, and the corresponding changes to the kafka-consumer-groups command.
> >>>>>>>
> >>>>>>> MS3: Based on suggestions, I have updated the config option to support ISO format.
> >>>>>>>
> >>>>>>> MS4: I have included the AutoOffsetReset changes in the KIP.
> >>>>>>> I definitely need your help on refining the API/solution for KS :)
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>>
> >>>>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the KIP.
> >>>>>>>>
> >>>>>>>> A somewhat orthogonal question I have is whether we should try to merge this KIP with the existing KIP-842:
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> >>>>>>>>
> >>>>>>>> While KIP-842 has a different intention, i.e., separating the case of "no offset during initial startup" vs. "offset out of bounds during processing", it might still be worth it? My motivation for this question is that `auto.offset.reset` is one of the most central configs we have, and thus, doing a single larger change (i.e., both KIPs together) might be better (less noise) than doing two independent changes? Thoughts? I know it might be a little bit of a stretch, but asking cannot hurt :)
> >>>>>>>>
> >>>>>>>> About the KIP itself, did we consider adding something like "latest-local" to say reset to oldest local, but not fetch from tiered storage?
> >>>>>>>>
> >>>>>>>> For the format of the config, did we consider what we do in `bin/kafka-consumer-groups.sh`? It has multiple different options like
> >>>>>>>>
> >>>>>>>>     --by-duration <duration: format `PnDTnHnMnS`>
> >>>>>>>>     --shift-by <number-of-records>
> >>>>>>>>     --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> >>>>>>>>     --to-offset <absolute offset>
> >>>>>>>>
> >>>>>>>> As it also uses ISO format, it might be good to also use them (even if I can see the appeal and simplicity of what you proposed).
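A broker-free sketch of how a `PnDTnHnMnS` value (the `--by-duration` format listed above) might map to a reset position: parse it with `java.time.Duration.parse`, subtract it from the current time, then pick the earliest offset whose timestamp is at or after that instant, which is the documented contract of `KafkaConsumer.offsetsForTimes()`. The `DurationReset` class and its methods are hypothetical, and the in-memory timestamp list stands in for a partition's time index under the simplifying assumption that timestamps never decrease.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical simulation of a duration-based reset; a real consumer would ask the
// broker via offsetsForTimes() and then seek() to the returned offset.
final class DurationReset {
    private DurationReset() {}

    // Target timestamp in epoch millis = now minus an ISO-8601 duration such as "PT6H".
    static long targetTimestamp(final String isoDuration, final Instant now) {
        final Duration d = Duration.parse(isoDuration); // DateTimeParseException on bad input
        if (d.isNegative()) {
            throw new IllegalArgumentException("duration must be non-negative: " + isoDuration);
        }
        return now.minus(d).toEpochMilli();
    }

    // Offset (list index) of the earliest record with timestamp >= target, found by binary
    // search; assumes non-decreasing timestamps. Returns timestamps.size() ("log end")
    // when no record qualifies, which is a choice of this sketch.
    static int earliestOffsetAtOrAfter(final List<Long> timestamps, final long target) {
        int lo = 0;
        int hi = timestamps.size();
        while (lo < hi) {
            final int mid = (lo + hi) >>> 1;
            if (timestamps.get(mid) >= target) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo;
    }
}
```

In the real client, `offsetsForTimes()` returns `null` for a partition with no qualifying record, so how that case is handled would be up to the KIP rather than this sketch.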
> >>>>>>>>
> >>>>>>>> I also want to add to (AM1), as there is also the `Topology.AutoOffsetReset` enum in Kafka Streams. I think we would need to convert this into a class. While it could be done in a follow-up KIP, too, it seems best to do this holistically in a single KIP, because KS is not something on top of Kafka, but part of Kafka. I am happy to help with the design and even PRs if necessary, but would strongly prefer to do it all in a single KIP.
> >>>>>>>>
> >>>>>>>> Btw: if we add something like "latest-local", it might also be good to extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool works slightly differently, as it commits an offset, and there could be some race condition between committing "latest-local", tiering, and when the consumer is actually started).
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> >>>>>>>>> Hi Manikumar,
> >>>>>>>>> Thanks for the KIP; this new strategy would be helpful in specifying fetch behaviour.
> >>>>>>>>>
> >>>>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the enum class OffsetResetStrategy, which is part of the kafka-clients javadoc <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>. Are we also proposing to somehow add new definitions in the same class? However, since the new configurations will be string representations, are we moving away from the OffsetResetStrategy enum class altogether? Should we include the change in the KIP, as OffsetResetStrategy is part of the public javadoc?
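To make the AM1 question concrete: once one strategy needs a parameter, a bare enum value no longer carries everything the consumer needs. A minimal sketch, assuming the two-config shape discussed in this thread (`auto.offset.reset` plus the proposed `auto.offset.reset.by.duration`); the `ResetConfig` class, its `parse` method, and the choice to reject a stray duration rather than ignore it are hypothetical, and `earliest_local` is left out.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Optional;

// Hypothetical holder pairing a reset strategy with its optional parameter.
final class ResetConfig {
    enum Strategy { EARLIEST, LATEST, NONE, BY_DURATION }

    final Strategy strategy;
    final Optional<Duration> duration;

    private ResetConfig(final Strategy strategy, final Optional<Duration> duration) {
        this.strategy = strategy;
        this.duration = duration;
    }

    // resetValue: the auto.offset.reset string; durationValue: the proposed
    // auto.offset.reset.by.duration string, or null when unset.
    static ResetConfig parse(final String resetValue, final String durationValue) {
        final Strategy strategy;
        try {
            strategy = Strategy.valueOf(resetValue.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown auto.offset.reset value: " + resetValue, e);
        }
        if (strategy == Strategy.BY_DURATION) {
            if (durationValue == null) {
                throw new IllegalArgumentException("by_duration requires auto.offset.reset.by.duration");
            }
            // ISO-8601, e.g. "PT6H"; throws DateTimeParseException on malformed input.
            return new ResetConfig(strategy, Optional.of(Duration.parse(durationValue)));
        }
        if (durationValue != null) {
            // Reject instead of silently ignoring, e.g. "earliest" plus a duration.
            throw new IllegalArgumentException("auto.offset.reset.by.duration is only valid with by_duration");
        }
        return new ResetConfig(strategy, Optional.empty());
    }
}
```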
> >>>>>>>>>
> >>>>>>>>> AM2: While I can see the ISO-8601 format is in the rejected alternatives, should we not follow some standard for defining durations that has already been adopted in other systems?
> >>>>>>>>>
> >>>>>>>>> AM3: We've introduced new config values using the format minus-n-hours, minus-n-days, minus-n-months, and minus-n-years. Should we explicitly define the "minus" prefix, or is it implied?
> >>>>>>>>>
> >>>>>>>>> AM4: When supporting duration-based resets, should we also allow users to specify a specific checkpoint time? For example, if a checkpoint occurs 2 days, 5 hours and 30 minutes earlier, the current four formats (minus-n-hours, minus-n-days, minus-n-months, minus-n-years) might not be sufficient. Should we consider adding a format to accommodate specific checkpoint times, or is there a reason to limit the supported formats?
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Apoorv Mittal
> >>>>>>>>>
> >>>>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>> I would like to start a discussion on KIP-1106:
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> >>>>>>>>>>
> >>>>>>>>>> This KIP proposes to add an additional auto offset reset strategy for consumer clients.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Manikumar
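On AM2 to AM4: in Java, an ISO-8601 duration covers AM4's example directly, since `P2DT5H30M` means 2 days, 5 hours and 30 minutes. One caveat for the minus-n-months and minus-n-years formats is that `java.time.Duration.parse` only accepts days, hours, minutes, and seconds; month and year amounts have no fixed length and need `java.time.Period` with calendar arithmetic. The `IsoDurations` helper is a hypothetical illustration, not part of any Kafka API.

```java
import java.time.Duration;
import java.time.Period;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;

// Hypothetical helpers contrasting fixed-length Durations with calendar-based Periods.
final class IsoDurations {
    private IsoDurations() {}

    // True if the value parses as a java.time.Duration (PnDTnHnMn.nS: days down to seconds).
    static boolean isFixedDuration(final String iso) {
        try {
            Duration.parse(iso);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Calendar-aware "now minus n months/years"; the length subtracted depends on the start date.
    static ZonedDateTime minusPeriod(final ZonedDateTime now, final String isoPeriod) {
        return now.minus(Period.parse(isoPeriod));
    }
}
```

For example, one month before 2024-03-31 resolves to 2024-02-29, so "minus one month" is not a fixed number of milliseconds.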