Hi Matthias,

Thanks for the explanation.

> I am wondering if AutoOffsetReset should be an interface (we usually
> just use classes)?

I have converted `AutoOffsetReset` to a class now.

Thanks.
On Fri, Nov 8, 2024 at 11:41 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> >> I was assuming we will be using the underlying consumer config here. I
> >> will update based on your suggestions.
>
> Well, yes we can, but for KS we only use the underlying consumer config
> if there is no code-based override. The consumer config only serves as
> the default. It works similarly to `default.api.timeout.ms`, which can be
> overridden on a per-method basis.
>
> For such a code-based override, we need a good API.
>
> For Kafka Streams, it's in general more complicated. We can read from
> more than one input topic, and allow setting a different reset policy per
> topic. Thus, under the hood, KS often needs to configure the consumer
> with `auto.offset.reset=none`, catch the corresponding exception, and
> do an explicit `seekToBeginning()` or `seekToEnd()` on a per-topic
> basis.
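The per-topic handling described above can be pictured with a small, Kafka-free sketch. `PerTopicReset` and its inputs are hypothetical: in the real client the trigger is an `InvalidOffsetException` (for example `NoOffsetForPartitionException`) thrown by `poll()` under `auto.offset.reset=none`, and the action is `seekToBeginning()` or `seekToEnd()` on the affected partitions. Here both are reduced to choosing between a log-start and a log-end offset.

```java
import java.util.Map;

// Hypothetical sketch (not Kafka Streams code): the consumer runs with
// auto.offset.reset=none, and the caller resolves a reset per topic.
final class PerTopicReset {
    enum Policy { EARLIEST, LATEST }

    private final Map<String, Policy> policyByTopic;
    private final Policy defaultPolicy; // plays the role of the consumer-level config

    PerTopicReset(Map<String, Policy> policyByTopic, Policy defaultPolicy) {
        this.policyByTopic = Map.copyOf(policyByTopic);
        this.defaultPolicy = defaultPolicy;
    }

    // Offset to seek to when a partition of `topic` has no valid position.
    // logStart/logEnd stand in for what seekToBeginning()/seekToEnd() would pick.
    long resetOffset(String topic, long logStart, long logEnd) {
        Policy policy = policyByTopic.getOrDefault(topic, defaultPolicy);
        return policy == Policy.EARLIEST ? logStart : logEnd;
    }
}
```

The per-topic map is the part a single consumer-level `auto.offset.reset` value cannot express, which is why the consumer config can only serve as the default here.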
>
>
> I am wondering if AutoOffsetReset should be an interface (we usually
> just use classes)? I would also not expose getters `name()` and
> `timestamp()` -- in KS, we usually use non-public sub-classes which add
> the necessary getters. Users don't need getters, so we try to hide them
> to keep the public API surface area as small as possible.
>
> Cf. `GroupedInternal`, which extends `Grouped`:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
>
> We don't have an interface as we don't need one, and `GroupedInternal`
> adds the necessary getters. Of course, the KIP does not need to define
> `AutoOffsetResetInternal` but only the public / user-facing
> `AutoOffsetReset` class.
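A minimal sketch of that pattern applied to the proposed class, assuming a shape modeled on `Grouped`/`GroupedInternal`: the public `AutoOffsetReset` offers only static factories, and a non-public `AutoOffsetResetInternal` copies it and adds the getters the runtime needs. The field names, the `Type` enum, and the validation are assumptions; `earliestLocal()` is left out, and in Kafka the two classes would live in different packages rather than one file.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// User-facing class (sketch): static factories only, no public getters.
class AutoOffsetReset {
    protected enum Type { EARLIEST, LATEST, BY_DURATION }

    protected final Type type;
    protected final Optional<Duration> duration;

    protected AutoOffsetReset(Type type, Optional<Duration> duration) {
        this.type = type;
        this.duration = duration;
    }

    // Copy constructor, mirroring how GroupedInternal wraps a Grouped.
    protected AutoOffsetReset(AutoOffsetReset other) {
        this(other.type, other.duration);
    }

    public static AutoOffsetReset earliest() {
        return new AutoOffsetReset(Type.EARLIEST, Optional.empty());
    }

    public static AutoOffsetReset latest() {
        return new AutoOffsetReset(Type.LATEST, Optional.empty());
    }

    public static AutoOffsetReset byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        return new AutoOffsetReset(Type.BY_DURATION, Optional.of(duration));
    }
}

// Non-public subclass used only by the runtime to read the settings back.
final class AutoOffsetResetInternal extends AutoOffsetReset {
    AutoOffsetResetInternal(AutoOffsetReset reset) {
        super(reset);
    }

    Type type() {
        return type;
    }

    Optional<Duration> duration() {
        return duration;
    }
}
```

Because a duration can only be supplied through `byDuration()`, combinations such as "earliest plus a duration" cannot be constructed in the first place.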
>
>
> -Matthias
>
> On 11/7/24 10:23 PM, Manikumar wrote:
> > Hi Matthias,
> >
> > Thanks for the comments.
> >
> > >> I am personally not convinced that adding a new config
> > >> `auto.offset.reset.by.duration` is the best way.
> >
> > I think in this case, we felt it's better to add the strategy name to the
> > `auto.offset.reset` config and add additional strategy-specific configs.
> > This will be easy to understand and to extend in the future. I am open to
> > changing this if others share these concerns.
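To make the two-config shape concrete, here is a hypothetical validation sketch. It uses the two keys discussed in this thread (`auto.offset.reset` with value `by_duration`, plus `auto.offset.reset.by.duration` holding an ISO-8601 duration) and reads them from a plain `Map`; `ResetConfig` is an illustrative name, not the consumer's actual config parsing.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

// Hypothetical validation of the two-config proposal; the Map stands in
// for real consumer config parsing.
final class ResetConfig {
    static final String RESET = "auto.offset.reset";
    static final String BY_DURATION = "auto.offset.reset.by.duration";

    final String strategy;
    final Optional<Duration> duration;

    private ResetConfig(String strategy, Optional<Duration> duration) {
        this.strategy = strategy;
        this.duration = duration;
    }

    static ResetConfig from(Map<String, String> props) {
        String strategy = props.getOrDefault(RESET, "latest"); // "latest" is today's default
        String raw = props.get(BY_DURATION);
        switch (strategy) {
            case "earliest":
            case "latest":
            case "earliest_local":
            case "none":
                // A duration without by_duration is treated as a misconfiguration.
                if (raw != null) {
                    throw new IllegalArgumentException(BY_DURATION + " requires " + RESET + "=by_duration");
                }
                return new ResetConfig(strategy, Optional.empty());
            case "by_duration":
                if (raw == null) {
                    throw new IllegalArgumentException(BY_DURATION + " must be set for by_duration");
                }
                Duration d = Duration.parse(raw); // ISO-8601, e.g. PT1H
                if (d.isNegative()) {
                    throw new IllegalArgumentException(BY_DURATION + " must not be negative");
                }
                return new ResetConfig(strategy, Optional.of(d));
            default:
                throw new IllegalArgumentException("unsupported " + RESET + ": " + strategy);
        }
    }
}
```

Rejecting a duration that is set without `by_duration` is one design choice; silently ignoring it is the other, and it has the same drawback Matthias raises below for an `addSource` overload that accepts `EARLIEST` plus a duration.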
> >
> > >> One question about "earliest_local": if we fetch this offset from the
> > >> server, and seek to it later, and start fetching, won't there be a race
> > >> condition? [...] the server would just need to reload it from the tier.
> >
> > Yes, your understanding is correct. The server will read from tiered
> > storage.
> > Anyhow, I will consider dropping earliest_local based on Jun's suggestion.
> >
> > >> For Kafka Streams, just adding the new option to the existing enum does
> > >> not work IMHO. If a user used BY_DURATION, there would be no option to
> > >> pass in the actual duration (and we might need to throw an exception).
> >
> > I was assuming we will be using the underlying consumer config here. I
> > will update based on your suggestions.
> >
> >
> > Thanks,
> >
> >
> >
> >
> > On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >> Thanks for updating the KIP.
> >>
> >> I am personally not convinced that adding a new config
> >> `auto.offset.reset.by.duration` is the best way. Kafka in general has
> >> way too many configs, and avoiding adding more configs seems
> >> desirable. It seems this might be a point of contention, and if the
> >> majority of people want this new config, so be it. I just wanted to
> >> stress my concerns about it.
> >>
> >> One question about "earliest_local": if we fetch this offset from the
> >> server, and seek to it later, and start fetching, won't there be a race
> >> condition? The oldest segment could have been discarded locally in the
> >> meantime. Maybe not too much of a big deal: the server would just need
> >> to reload it from the tier. I just want to ensure my understanding is
> >> correct, and make sure nobody has concerns about it.
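The race can be pictured with a toy model (hypothetical `TieredLogModel`, not broker code). An offset looked up as the earliest local offset stays valid after local retention moves on: it simply shifts from the local range into the remote-only range, so the fetch is still served, just from the tier.

```java
// Toy model of one tiered partition (not broker code):
// [logStart, localLogStart) exists only in remote storage,
// [localLogStart, logEnd] is still on the broker's local disk.
// Fields are mutable so the example can simulate retention moving them.
final class TieredLogModel {
    enum Source { LOCAL, REMOTE, OUT_OF_RANGE }

    long logStart;
    long localLogStart;
    long logEnd;

    TieredLogModel(long logStart, long localLogStart, long logEnd) {
        this.logStart = logStart;
        this.localLogStart = localLogStart;
        this.logEnd = logEnd;
    }

    // Where a fetch starting at `offset` would be served from.
    Source servedFrom(long offset) {
        if (offset < logStart || offset > logEnd) {
            return Source.OUT_OF_RANGE; // would trigger another offset reset
        }
        return offset < localLogStart ? Source.REMOTE : Source.LOCAL;
    }
}
```

Only if the overall log start also moved past the stored offset (for example through remote retention) would the fetch fall out of range and trigger another reset.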
> >>
> >> For Kafka Streams, just adding the new option to the existing enum does
> >> not work IMHO.
> >>
> >> For example, we have a method:
> >>
> >>       addSource(final AutoOffsetReset offsetReset,
> >>                 final String name,
> >>                 final String... topics)
> >>
> >> If a user used BY_DURATION, there would be no option to pass in the
> >> actual duration (and we might need to throw an exception). And if we
> >> overloaded `addSource` with a fourth parameter, it would be odd too, as
> >> users could pass in `EARLIEST` plus a duration, and we would need to
> >> throw an exception or ignore the given duration. Overall, not an
> >> easy-to-use API.
> >>
> >> Thus, I would suggest deprecating the existing `AutoOffsetReset` enum
> >> and adding a new class `AutoOffsetReset`, which lets us guide users
> >> accordingly and avoid these corner cases:
> >>
> >> public class AutoOffsetReset {
> >>     public static AutoOffsetReset earliest();
> >>     public static AutoOffsetReset latest();
> >>     public static AutoOffsetReset byDuration(Duration duration);
> >>     public static AutoOffsetReset earliestLocal();
> >> }
> >>
> >> The existing enum is nested inside `Topology`, so we can add the new
> >> class directly to package `org.apache.kafka.streams` without naming
> >> conflicts.
> >>
> >> Additionally, we would deprecate all methods taking the existing enum
> >> `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are
> >> affected) and add new overloads that accept the new `AutoOffsetReset`
> >> class instead.
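One possible way to wire the deprecation, sketched with hypothetical stand-ins (`LegacyReset` for the existing enum, `ResetPolicy` for the proposed class, `SourceRegistry` for `Topology`/`Consumed`): the enum-based overload is marked `@Deprecated` and only converts its argument before delegating, so both entry points share one code path. Null handling (falling back to the consumer config) is omitted.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-ins for Topology/Consumed and the old and new reset types.
final class SourceRegistry {
    enum LegacyReset { EARLIEST, LATEST }

    static final class ResetPolicy {
        final String name;
        final Duration duration; // null unless by_duration

        private ResetPolicy(String name, Duration duration) {
            this.name = name;
            this.duration = duration;
        }

        static ResetPolicy earliest() { return new ResetPolicy("earliest", null); }
        static ResetPolicy latest() { return new ResetPolicy("latest", null); }
        static ResetPolicy byDuration(Duration d) {
            return new ResetPolicy("by_duration", Objects.requireNonNull(d));
        }
    }

    final List<String> registered = new ArrayList<>();

    /** @deprecated use {@link #addSource(ResetPolicy, String, String...)} instead. */
    @Deprecated
    SourceRegistry addSource(LegacyReset reset, String name, String... topics) {
        // Convert and delegate; a null reset throws NullPointerException here.
        switch (reset) {
            case EARLIEST: return addSource(ResetPolicy.earliest(), name, topics);
            case LATEST: return addSource(ResetPolicy.latest(), name, topics);
            default: throw new IllegalArgumentException("unknown reset: " + reset);
        }
    }

    SourceRegistry addSource(ResetPolicy reset, String name, String... topics) {
        registered.add(name + ":" + reset.name + ":" + String.join(",", topics));
        return this;
    }
}
```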
> >>
> >>
> >> -Matthias
> >>
> >> On 11/7/24 1:57 PM, Jun Rao wrote:
> >>> Hi, Manikumar,
> >>>
> >>> Thanks for the KIP. A couple of comments.
> >>>
> >>> JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
> >>> sense for a consumer app to bootstrap based on time. The time that a topic
> >>> is kept locally seems irrelevant to a consumer application.
> >>>
> >>> JR2. I took a look at the proposal of KIP-842. It seems that this KIP
> >>> covers the common scenario in KIP-842 and is simpler. The scenario
> >>> described in KIP-842 is about which initial offset to use when a new
> >>> partition is added. This is more or less the same problem for choosing the
> >>> initial offset for existing partitions. Having a time-based option
> >>> provides more flexibility than earliest or latest, and is likely good
> >>> enough for most usage.
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> >>> wrote:
> >>>
> >>>> Thanks Manikumar and Andrew.
> >>>>
> >>>> For AM6: I was thinking of enhancing OffsetFetcher and adding
> >>>> "seekToLocalBeginning", since OffsetFetcher itself uses the
> >>>> Admin.listOffsets API. However, I agree that this is just a convenience
> >>>> and not necessary to support the functionality. Skipping it sounds good
> >>>> to me.
> >>>>
> >>>> Regards,
> >>>> Apoorv Mittal
> >>>>
> >>>>
> >>>> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <
> >>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>
> >>>>> Hi Manikumar,
> >>>>>
> >>>>> AS7: In response to AM6, I agree that in general the combination
> >>>>> of offsetsForTimes() and seek() will do what is required. The one case
> >>>>> that I think is trickier is seeking to the local earliest/beginning.
> >>>>>
> >>>>> The Consumer API has Consumer.beginningOffsets and
> >>>>> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> >>>>> Similarly there are methods for OffsetResetStrategy.LATEST.
> >>>>>
> >>>>> With the Consumer API, I don't think you can find the local earliest
> >>>>> offset. That can be found using Admin.listOffsets with
> >>>>> OffsetSpec.EarliestLocal.
> >>>>> We could add equivalents of Consumer.beginningOffsets and
> >>>>> Consumer.seekToBeginning. An application could always use the
> >>>>> Admin API with the Consumer API.
> >>>>>
> >>>>> Personally, I would not complicate this KIP with these new methods when
> >>>>> the focus is more on duration-based offset reset and configs.
> >>>>>
> >>>>> Thanks,
> >>>>> Andrew
> >>>>> ________________________________________
> >>>>> From: Manikumar <manikumar.re...@gmail.com>
> >>>>> Sent: 06 November 2024 04:20
> >>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>>>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
> >>>>> for consumer clients
> >>>>>
> >>>>> Hi Apoorv,
> >>>>>
> >>>>> Thanks for the review.
> >>>>>
> >>>>> AM5: Thanks, updated the KIP.
> >>>>>
> >>>>> AM6: Currently we can achieve this with a combination of the
> >>>>> offsetsForTimes() and seek() APIs. Maybe we can add a generic API like
> >>>>> seekToTimes(Map<TopicPartition, Long> timestampsToSearch) if required.
> >>>>> Let's see what others think.
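A sketch of what the suggested `seekToTimes` helper could look like when built from the two existing operations. `TimeSeekable` is a hypothetical stand-in for the relevant `KafkaConsumer` methods, with partitions as plain strings instead of `TopicPartition`. Like `offsetsForTimes()`, its lookup yields null for a partition with no record at or after the timestamp; falling back to the end of the partition in that case is an assumption of this sketch.

```java
import java.util.Map;

// Hypothetical stand-in for the KafkaConsumer methods involved; partitions are
// plain strings instead of TopicPartition to keep the sketch dependency-free.
interface TimeSeekable {
    // Earliest offset with timestamp >= the requested one, or null if none.
    Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch);

    void seek(String partition, long offset);

    void seekToEnd(String partition);
}

final class SeekHelpers {
    // Sketch of the proposed seekToTimes(...) helper.
    static void seekToTimes(TimeSeekable consumer, Map<String, Long> timestampsToSearch) {
        Map<String, Long> found = consumer.offsetsForTimes(timestampsToSearch);
        for (String partition : timestampsToSearch.keySet()) {
            Long offset = found.get(partition);
            if (offset != null) {
                consumer.seek(partition, offset);
            } else {
                consumer.seekToEnd(partition); // assumption: nothing that new, start at the end
            }
        }
    }
}
```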
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi Manikumar,
> >>>>>> Thanks for the changes. Just a minor comment and a question:
> >>>>>>
> >>>>>> AM5: The description "How to initialize the share-partition start
> >>>>>> offset:" for "share.auto.offset.reset" seems incomplete as it ends
> >>>>>> with `:`. Should we write out the details as defined in KIP-932, i.e.
> >>>>>>
> >>>>>> How to initialize the share-partition start offset:
> >>>>>>
> >>>>>>   - "earliest": automatically reset the offset to the earliest offset
> >>>>>>   - "latest": automatically reset the offset to the latest offset
> >>>>>>   - "earliest_local": automatically reset the offset to the earliest
> >>>>>>     message stored in the local log on the broker
> >>>>>>   - "by_duration": automatically reset the offset to the earliest
> >>>>>>     offset whose timestamp is greater than or equal to the current
> >>>>>>     time minus the configured duration (auto.offset.reset.by.duration)
> >>>>>>
> >>>>>> or maybe just replace ":" with "." i.e. "How to initialize the
> >>>>>> share-partition start offset.", as the details of the new configs are
> >>>>>> defined at the top in different sections as well.
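The four strategies above can be compared on a toy partition. This is a hypothetical sketch, not consumer code: `ResetResolver` models the partition as a list of record timestamps, reads `by_duration` as "earliest offset whose timestamp is at least now minus the duration", and uses a linear scan where the broker would consult its time index. Returning the log end when no record is recent enough is an assumption.

```java
import java.time.Duration;
import java.util.List;

// Toy resolver (hypothetical names, not consumer code). The partition is a list
// of record timestamps where list index i holds offset logStart + i.
final class ResetResolver {
    enum Strategy { EARLIEST, LATEST, EARLIEST_LOCAL, BY_DURATION }

    static long resolve(Strategy strategy, Duration duration, long nowMs,
                        long logStart, long localLogStart, List<Long> timestampsMs) {
        long logEnd = logStart + timestampsMs.size();
        switch (strategy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            case EARLIEST_LOCAL:
                return localLogStart;
            case BY_DURATION:
                long targetMs = nowMs - duration.toMillis();
                // First offset whose timestamp is >= now - duration.
                for (int i = 0; i < timestampsMs.size(); i++) {
                    if (timestampsMs.get(i) >= targetMs) {
                        return logStart + i;
                    }
                }
                return logEnd; // assumption: nothing that recent, behave like latest
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }
}
```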
> >>>>>>
> >>>>>> AM6: Question: KafkaConsumer has public methods to "seek" according
> >>>>>> to a reset strategy, like "seekToBeginning" and "seekToEnd", which use
> >>>>>> OffsetResetStrategy EARLIEST and LATEST respectively. Do you think it
> >>>>>> would be sensible to add additional KafkaConsumer APIs for the newly
> >>>>>> introduced strategies?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Apoorv Mittal
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for the review.
> >>>>>>>
> >>>>>>> MS1: It looks like KIP-842 is stuck in voting. The current changes
> >>>>>>> should fit into KIP-842 (a few updates are definitely required), as
> >>>>>>> KIP-842 proposes to add new configs
> >>>>>>> (auto.offset.reset.on.no.initial.offset and
> >>>>>>> auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> >>>>>>> after completing this KIP.
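The distinction KIP-842 draws can be sketched as follows. `CauseAwareReset` and its methods are hypothetical; only the two config names come from the message above. Today a single `auto.offset.reset` value covers both cases.

```java
import java.util.Optional;

// Hypothetical sketch of KIP-842's split: pick the reset policy based on
// why there is no usable position.
final class CauseAwareReset {
    enum Cause { NO_INITIAL_OFFSET, INVALID_OFFSET }

    private final String onNoInitialOffset; // auto.offset.reset.on.no.initial.offset
    private final String onInvalidOffset;   // auto.offset.reset.on.invalid.offset

    CauseAwareReset(String onNoInitialOffset, String onInvalidOffset) {
        this.onNoInitialOffset = onNoInitialOffset;
        this.onInvalidOffset = onInvalidOffset;
    }

    // committed == null means the group has never committed for this partition.
    static Optional<Cause> classify(Long committed, long logStart, long logEnd) {
        if (committed == null) {
            return Optional.of(Cause.NO_INITIAL_OFFSET);
        }
        if (committed < logStart || committed > logEnd) {
            return Optional.of(Cause.INVALID_OFFSET); // e.g. removed by retention
        }
        return Optional.empty(); // position is usable; no reset needed
    }

    String policyFor(Cause cause) {
        return cause == Cause.NO_INITIAL_OFFSET ? onNoInitialOffset : onInvalidOffset;
    }
}
```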
> >>>>>>>
> >>>>>>> MS2: Added support for an earliest-local option that resets to the
> >>>>>>> oldest local offset, and the corresponding kafka-consumer-groups changes.
> >>>>>>>
> >>>>>>> MS3: Based on the suggestions, I have updated the config option to
> >>>>>>> support ISO format.
> >>>>>>>
> >>>>>>> MS4: I have included the AutoOffsetReset changes to the KIP. I
> >>>>>>> definitely need your help on refining the API/solution for KS :)
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the KIP.
> >>>>>>>>
> >>>>>>>> A somewhat orthogonal question I have is whether we should try to
> >>>>>>>> merge this KIP with the existing KIP-842:
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> >>>>>>>>
> >>>>>>>> While KIP-842 has a different intention, i.e., separating the case of
> >>>>>>>> "no offset during initial startup" vs. "offset out of bounds during
> >>>>>>>> processing", it might still be worth it? My motivation for this
> >>>>>>>> question is that `auto.offset.reset` is one of the most central
> >>>>>>>> configs we have, and thus, doing a single larger change (i.e., both
> >>>>>>>> KIPs together) might be better (less noise) than doing two independent
> >>>>>>>> changes. Thoughts? I know it might be a little bit of a stretch, but
> >>>>>>>> asking cannot hurt :)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> About the KIP itself, did we consider adding something like
> >>>>>>>> "latest-local", i.e., reset to the oldest local offset, but do not
> >>>>>>>> fetch from tiered storage?
> >>>>>>>>
> >>>>>>>> For the format of the config, did we consider what we do in
> >>>>>>>> `bin/kafka-consumer-groups.sh`? It has multiple different options,
> >>>>>>>> like
> >>>>>>>>
> >>>>>>>>      --by-duration <duration: format `PnDTnHnMnS`>
> >>>>>>>>      --shift-by <number-of-records>
> >>>>>>>>
> >>>>>>>>      --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> >>>>>>>>      --to-offset <absolute offset>
> >>>>>>>>
> >>>>>>>> As it also uses ISO format, it might be good to use the same formats
> >>>>>>>> here (even if I can see the appeal and simplicity of what you
> >>>>>>>> proposed).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I also want to add to (AM1), as there is also the
> >>>>>>>> `Topology.AutoOffsetReset` enum in Kafka Streams. I think we would
> >>>>>>>> need to convert this into a class. While it could be done in a
> >>>>>>>> follow-up KIP, too, it seems best to do this holistically in a single
> >>>>>>>> KIP, because KS is not something on top of Kafka, but part of Kafka.
> >>>>>>>> I am happy to help with the design and even PRs if necessary, but
> >>>>>>>> would strongly prefer to do it all in a single KIP.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Btw: if we add something like "latest-local", it might also be good
> >>>>>>>> to extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool
> >>>>>>>> works slightly differently, as it commits an offset, and there could
> >>>>>>>> be a race condition between committing "latest-local", tiering, and
> >>>>>>>> when the consumer is actually started).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> >>>>>>>>> Hi Manikumar,
> >>>>>>>>> Thanks for the KIP, this new strategy would be helpful in specifying
> >>>>>>>>> fetch behaviour.
> >>>>>>>>>
> >>>>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the
> >>>>>>>>> enum class OffsetResetStrategy, which is part of the kafka-clients
> >>>>>>>>> javadoc <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> >>>>>>>>> Are we also proposing to somehow add new definitions in the same
> >>>>>>>>> class? However, as new configurations will be a string
> >>>>>>>>> representation, are we moving away from the OffsetResetStrategy enum
> >>>>>>>>> class altogether? Should we include the change in the KIP, as
> >>>>>>>>> OffsetResetStrategy is part of the public javadoc?
> >>>>>>>>>
> >>>>>>>>> AM2: While I can see the ISO-8601 format is in the rejected
> >>>>>>>>> alternatives, should we not follow some standard for defining
> >>>>>>>>> durations that has already been adopted in other systems?
> >>>>>>>>>
> >>>>>>>>> AM3: We've introduced new config values using the format
> >>>>>>>>> minus-n-hours, minus-n-days, minus-n-months, and minus-n-years.
> >>>>>>>>> Should we explicitly define the "minus" prefix, or is it implied?
> >>>>>>>>>
> >>>>>>>>> AM4: When supporting duration-based resets, should we also allow
> >>>>>>>>> users to specify a specific checkpoint time? For example, if a
> >>>>>>>>> checkpoint occurs 2 days, 5 hours and 30 minutes earlier, the current
> >>>>>>>>> four formats (minus-n-hours, minus-n-days, minus-n-months,
> >>>>>>>>> minus-n-years) might not be sufficient. Should we consider adding a
> >>>>>>>>> format to accommodate specific checkpoint times, or is there a reason
> >>>>>>>>> to limit the supported formats?
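For reference, the ISO-8601 form covers the checkpoint in AM4 without a dedicated format: 2 days, 5 hours and 30 minutes is `P2DT5H30M`, which the JDK's `java.time.Duration.parse` accepts. The helper names below are hypothetical. Note that `Duration.parse` only accepts days and smaller units, so month or year values such as `P1M` (as in minus-n-months or minus-n-years) are rejected; those are calendar-dependent and belong to `java.time.Period`.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Illustration with the JDK's java.time.Duration; the helper names are hypothetical.
final class IsoDurations {
    // Timestamp a by-duration reset would search from: now minus the parsed duration.
    static long resetTimestampMs(String isoDuration, long nowMs) {
        return nowMs - Duration.parse(isoDuration).toMillis();
    }

    // Duration.parse accepts PnDTnHnMn.nS; months/years (e.g. "P1M") are rejected.
    static boolean isAccepted(String isoDuration) {
        try {
            Duration.parse(isoDuration);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

For example, `Duration.parse("P2DT5H30M").toMinutes()` is 3210 (2 × 1440 + 5 × 60 + 30).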
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Apoorv Mittal
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <
> >>>>> manikumar.re...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>> I would like to start a discussion on KIP-1106:
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> >>>>>>>>>>
> >>>>>>>>>> This KIP proposes to add an additional auto offset reset strategy for
> >>>>>>>>>> consumer clients.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Manikumar
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
