Hi All,

I have updated the KIP with recent suggestions.

Thanks.

On Fri, Nov 8, 2024 at 11:53 AM Manikumar <manikumar.re...@gmail.com> wrote:
>
> Hi Matthias,
>
> Thanks for the comments.
>
> > I am personally not convinced that adding a new config
> > `auto.offset.reset.by.duration` is the best way.
>
> I think in this case, we felt it's better to add the strategy name to
> auto.offset.reset config and add additional strategy specific configs.
> This will be easy to understand and extend in future. I am open to
> change if there are similar concerns.
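A minimal sketch of the validation that the "strategy name plus strategy-specific config" approach implies, assuming the config names used in this thread (`auto.offset.reset` and `auto.offset.reset.by.duration`). The class and method names are hypothetical, and rejecting a stray duration (rather than silently ignoring it) is a design assumption, not something the KIP specifies.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper illustrating "strategy name + strategy-specific config".
// This is NOT Kafka's ConsumerConfig validation logic.
public class ResetConfigSketch {

    static final String RESET = "auto.offset.reset";
    static final String BY_DURATION = "auto.offset.reset.by.duration";

    // Returns how far back to rewind for "by_duration", or null for other strategies.
    static Duration validate(Properties props) {
        String strategy = props.getProperty(RESET, "latest").toLowerCase(Locale.ROOT);
        String rawDuration = props.getProperty(BY_DURATION);
        switch (strategy) {
            case "earliest":
            case "latest":
            case "earliest_local":
            case "none":
                // The strategy-specific config has no meaning here, so this sketch
                // rejects it instead of silently ignoring it (an assumption).
                if (rawDuration != null) {
                    throw new IllegalArgumentException(BY_DURATION + " requires " + RESET + "=by_duration");
                }
                return null;
            case "by_duration":
                if (rawDuration == null) {
                    throw new IllegalArgumentException("by_duration requires " + BY_DURATION);
                }
                Duration duration = Duration.parse(rawDuration); // ISO-8601, e.g. PT6H
                if (duration.isNegative()) {
                    throw new IllegalArgumentException(BY_DURATION + " must not be negative");
                }
                return duration;
            default:
                throw new IllegalArgumentException("Unknown " + RESET + " value: " + strategy);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(RESET, "by_duration");
        props.setProperty(BY_DURATION, "P1D");
        System.out.println(validate(props)); // PT24H
    }
}
```

Each extra branch is a combination that has to be checked or documented, which is the config-growth cost raised in the reply below.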
>
> > One question about "earliest_local": if we fetch this offset from the
> > server, and seek to it later, and start fetching, won't there be a race
> > condition? [...] The server would just need to reload it from the tier.
>
> Yes, your understanding is correct. The server will read from tiered
> storage.
> Anyhow, I will consider dropping earliest_local based on Jun's suggestion.
>
> > For Kafka Streams, just adding the new option to the existing enum does
> > not work IMHO. If a user would use BY_DURATION, there is no option to pass
> > in the actual duration (and we might need to throw an exception).
>
> I was assuming we will be using the underlying consumer config here. I
> will update based on your suggestions.
>
>
> Thanks,
>
>
>
>
> On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > Thanks for updating the KIP.
> >
> > I am personally not convinced that adding a new config
> > `auto.offset.reset.by.duration` is the best way. Kafka in general has
> > way too many configs and trying to avoid adding more configs seems to be
> > desirable?  -- It seems this might be a point of contention, and if the
> > majority of people wants this new config so be it. I just wanted to
> > stress my concerns about it.
> >
> > One question about "earliest_local": if we fetch this offset from the
> > server, and seek to it later, and start fetching, won't there be a race
> > condition? The oldest segment could have been discarded locally in the
> > meantime. Maybe not too big a deal: the server would just need
> > to reload it from the tier. I just want to ensure my understanding is
> > correct, and make sure nobody has concerns about it.
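To make the race concrete, here is a toy model (all names hypothetical, not broker classes) of a partition whose local log start offset moves forward between the consumer's lookup of "earliest_local" and its first fetch. In this model the fetch still succeeds but is served from the remote tier, which matches the reading above that the server would reload the data from the tier.

```java
// Toy model of a partition with tiered storage; hypothetical, not broker code.
public class TieredPartitionSketch {

    long logStartOffset;      // earliest offset still retained, remote tier included
    long localLogStartOffset; // earliest offset still on the broker's local disk

    TieredPartitionSketch(long logStartOffset, long localLogStartOffset) {
        this.logStartOffset = logStartOffset;
        this.localLogStartOffset = localLogStartOffset;
    }

    // Where a fetch starting at `offset` would be served from in this model.
    String servedFrom(long offset) {
        if (offset < logStartOffset) {
            return "out of range";
        }
        return offset < localLogStartOffset ? "remote tier" : "local log";
    }

    public static void main(String[] args) {
        TieredPartitionSketch partition = new TieredPartitionSketch(0, 500);
        long earliestLocal = partition.localLogStartOffset; // consumer looks up earliest_local
        partition.localLogStartOffset = 800;                // local segments are deleted meanwhile
        System.out.println(partition.servedFrom(earliestLocal)); // remote tier
    }
}
```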
> >
> > For Kafka Streams, just adding the new option to the existing enum does
> > not work IMHO.
> >
> > For example, we have a method:
> >
> >      addSource(final AutoOffsetReset offsetReset,
> >                final String name,
> >                final String... topics)
> >
> > If a user would use BY_DURATION, there is no option to pass in the
> > actual duration (and we might need to throw an exception). And if we
> > overload `addSource` adding a fourth parameter, it would be odd too, as
> > users could pass in `EARLIEST` plus a duration, and we would need to
> > throw an exception or ignore the given duration. Overall, not an
> > easy-to-use API.
> >
> > Thus, I would suggest deprecating the existing `AutoOffsetReset` enum
> > and adding a new class `AutoOffsetReset` that guides users accordingly
> > and avoids corner cases:
> >
> > public class AutoOffsetReset {
> >    public static AutoOffsetReset earliest();
> >    public static AutoOffsetReset latest();
> >    public static AutoOffsetReset byDuration(Duration duration);
> >    public static AutoOffsetReset earliestLocal();
> > }
> >
> > The existing enum is nested inside `Topology`, so we can add the new
> > class directly to package `org.apache.kafka.streams` without naming conflicts.
> >
> > Additionally, we would deprecate all methods taking the existing enum
> > `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are
> > affected) and add new overloads that accept the new `class
> > AutoOffsetReset` instead.
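A minimal sketch of how such a class could rule out the corner cases by construction, assuming a private constructor and the static factories listed above. The nested `Kind` enum and the `kind()`/`duration()` accessors are hypothetical internals, not the final Kafka Streams API; `earliestLocal()` is kept only because it appears in the list above.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of the proposed class; not the actual Kafka Streams API.
public final class AutoOffsetReset {

    enum Kind { EARLIEST, LATEST, BY_DURATION, EARLIEST_LOCAL }

    private final Kind kind;
    private final Duration duration; // non-null only for BY_DURATION

    // Private constructor: instances come only from the factories below,
    // so "EARLIEST plus a duration" cannot be expressed at all.
    private AutoOffsetReset(Kind kind, Duration duration) {
        this.kind = kind;
        this.duration = duration;
    }

    public static AutoOffsetReset earliest() { return new AutoOffsetReset(Kind.EARLIEST, null); }

    public static AutoOffsetReset latest() { return new AutoOffsetReset(Kind.LATEST, null); }

    public static AutoOffsetReset earliestLocal() { return new AutoOffsetReset(Kind.EARLIEST_LOCAL, null); }

    public static AutoOffsetReset byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + duration);
        }
        return new AutoOffsetReset(Kind.BY_DURATION, duration);
    }

    Kind kind() { return kind; }

    // Empty for every strategy except byDuration().
    Optional<Duration> duration() { return Optional.ofNullable(duration); }

    public static void main(String[] args) {
        AutoOffsetReset reset = AutoOffsetReset.byDuration(Duration.ofHours(6));
        System.out.println(reset.kind() + " " + reset.duration().orElse(null)); // BY_DURATION PT6H
    }
}
```

Because the only way to attach a duration is `byDuration(...)`, an `addSource` overload taking this class would have no "EARLIEST plus a duration" combination to reject.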
> >
> >
> > -Matthias
> >
> > On 11/7/24 1:57 PM, Jun Rao wrote:
> > > Hi, Manikumar,
> > >
> > > Thanks for the KIP. A couple of comments.
> > >
> > > JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
> > > sense for a consumer app to bootstrap based on time. The time that a topic
> > > is kept locally seems irrelevant to a consumer application.
> > >
> > > JR2. I took a look at the proposal of KIP-842. It seems that this KIP
> > > covers the common scenario in KIP-842 and is simpler. The scenario
> > > described in KIP-842 is about which initial offset to use when a new
> > > partition is added. This is more or less the same problem as choosing the
> > > initial offset for existing partitions. Having a time-based option provides
> > > more flexibility than earliest or latest, and is likely good enough for
> > > most usage.
> > >
> > > Jun
> > >
> > >
> > > On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
> > >
> > >> Thanks Manikumar and Andrew.
> > >>
> > >> For AM6: I was thinking of enhancing OffsetFetcher and adding
> > >> "seekToLocalBeginning", since OffsetFetcher already uses the
> > >> Admin.listOffsets API itself. However, I agree that this is just another
> > >> convenience and not necessary to support the functionality. Skipping it
> > >> sounds good to me.
> > >>
> > >> Regards,
> > >> Apoorv Mittal
> > >>
> > >>
> > >> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> > >>
> > >>> Hi Manikumar,
> > >>>
> > >>> AS7: In response to AM6, I agree that in general the combination
> > >>> of offsetsForTimes() and seek() will do what is required. The one case
> > >>> that I think is more tricky is seeking to the local earliest/beginning.
> > >>>
> > >>> The Consumer API has Consumer.beginningOffsets and
> > >>> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> > >>> Similarly there are methods for OffsetResetStrategy.LATEST.
> > >>>
> > >>> With the Consumer API, I don't think you can find the local earliest
> > >>> offset. That can be found using Admin.listOffsets with
> > >>> OffsetSpec.EarliestLocal. We could add local-earliest equivalents of
> > >>> Consumer.beginningOffsets and Consumer.seekToBeginning, but an
> > >>> application could always use the Admin API together with the Consumer API.
> > >>>
> > >>> Personally, I would not complicate this KIP with these new methods when
> > >>> the focus is more on duration-based offset reset and configs.
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>> ________________________________________
> > >>> From: Manikumar <manikumar.re...@gmail.com>
> > >>> Sent: 06 November 2024 04:20
> > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > >>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option for consumer clients
> > >>>
> > >>> Hi Apoorv,
> > >>>
> > >>> Thanks for the review.
> > >>>
> > >>> AM5: Thanks, updated the KIP.
> > >>>
> > >>> AM6: Currently we can achieve this with the combination of the
> > >>> offsetsForTimes() and seek() APIs.
> > >>> Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
> > >>> Long> timestampsToSearch) if required.
> > >>> Let's see what others think.
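For reference, the lookup behind that combination can be modeled without a broker. With a real consumer one would call `KafkaConsumer#offsetsForTimes` with `now - duration` for each partition and then `seek` to the returned offset; the sketch below reproduces only the documented lookup rule (the earliest offset whose timestamp is greater than or equal to the target, or null if there is none). Falling back to the log end offset when nothing is recent enough is an assumption of this sketch, and `Entry`, `offsetForTime` and `resetOffset` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Pure-Java model of a duration-based reset; no Kafka client involved.
public class ByDurationSketch {

    // One record in a partition: its offset and its record timestamp (epoch millis).
    record Entry(long offset, long timestampMs) {}

    // Mirrors the offsetsForTimes contract: earliest offset whose timestamp is
    // >= targetMs, or null when no such record exists.
    static Long offsetForTime(List<Entry> log, long targetMs) {
        for (Entry e : log) { // log is ordered by offset
            if (e.timestampMs() >= targetMs) {
                return e.offset();
            }
        }
        return null;
    }

    // Offset a duration-based reset would seek to. The log-end fallback is an
    // assumption of this sketch, not confirmed KIP behaviour.
    static long resetOffset(List<Entry> log, long logEndOffset, Instant now, Duration duration) {
        long targetMs = now.minus(duration).toEpochMilli();
        Long found = offsetForTime(log, targetMs);
        return found != null ? found : logEndOffset;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(new Entry(0, 1_000), new Entry(1, 5_000), new Entry(2, 9_000));
        Instant now = Instant.ofEpochMilli(10_000);
        System.out.println(resetOffset(log, 3, now, Duration.ofSeconds(6))); // 1
    }
}
```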
> > >>>
> > >>> Thanks,
> > >>>
> > >>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
> > >>>>
> > >>>> Hi Manikumar,
> > >>>> Thanks for the changes. Just a minor comment and a question:
> > >>>>
> > >>>> AM5: The description "How to initialize the share-partition start offset:"
> > >>>> for `share.auto.offset.reset` seems incomplete, as it ends with `:`.
> > >>>> Should we write the details as defined in KIP-932, i.e.:
> > >>>>
> > >>>> How to initialize the share-partition start offset:
> > >>>>
> > >>>>     - "earliest": automatically reset the offset to the earliest offset
> > >>>>     - "latest": automatically reset the offset to the latest offset
> > >>>>     - "earliest_local": automatically reset the offset to the earliest
> > >>>>       message stored in the local log on the broker
> > >>>>     - "by_duration": automatically reset the offset to the earliest offset
> > >>>>       whose timestamp is greater than or equal to the current time minus
> > >>>>       the configured duration (auto.offset.reset.by.duration)
> > >>>>
> > >>>>
> > >>>> Or maybe just replace ":" with "." i.e. "How to initialize the
> > >>>> share-partition start offset.", as the details of the new configs are
> > >>>> defined at the top in separate sections as well.
> > >>>>
> > >>>> AM6: Question: KafkaConsumer has public methods to "seek" according to
> > >>>> the reset strategy, like "seekToBeginning" and "seekToEnd", which use
> > >>>> the EARLIEST and LATEST OffsetResetStrategy respectively. Do you think it
> > >>>> would be sensible to add additional KafkaConsumer APIs for the newly
> > >>>> introduced strategies?
> > >>>>
> > >>>> Regards,
> > >>>> Apoorv Mittal
> > >>>>
> > >>>>
> > >>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Matthias,
> > >>>>>
> > >>>>> Thanks for the review.
> > >>>>>
> > >>>>> MS1: It looks like KIP-842 is stuck in voting. The current changes
> > >>>>> should fit into KIP-842 (a few updates are definitely required), as
> > >>>>> KIP-842 proposes adding new configs (auto.offset.reset.on.no.initial.offset
> > >>>>> and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > >>>>> after completing this KIP.
> > >>>>>
> > >>>>> MS2: Added support for an earliest-local config value to reset to the
> > >>>>> oldest local offset, plus changes to the kafka-consumer-groups tool.
> > >>>>>
> > >>>>> MS3: Based on suggestions, I have updated the config option to support
> > >>>>> the ISO-8601 duration format.
> > >>>>>
> > >>>>> MS4: I have included the AutoOffsetReset changes in the KIP. I
> > >>>>> definitely need your help on refining the API/solution for KS :)
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>
> > >>>>>> Thanks for the KIP.
> > >>>>>>
> > >>>>>> A somewhat orthogonal question I have is whether we should try to merge
> > >>>>>> this KIP with the existing KIP-842:
> > >>>>>>
> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > >>>>>>
> > >>>>>> While KIP-842 has a different intention, i.e., separating the case of
> > >>>>>> "no offset during initial startup" vs. "offset out of bounds during
> > >>>>>> processing", it might still be worth it? My motivation for this question
> > >>>>>> is that `auto.offset.reset` is one of the most central configs we have,
> > >>>>>> and thus doing a single larger change (i.e., both KIPs together) might be
> > >>>>>> better (less noise) than doing two independent changes. Thoughts? I know
> > >>>>>> it might be a little bit of a stretch, but asking cannot hurt :)
> > >>>>>>
> > >>>>>>
> > >>>>>> About the KIP itself, did we consider adding something like
> > >>>>>> "latest-local" to say reset to oldest local, but not fetch from tiered
> > >>>>>> storage?
> > >>>>>>
> > >>>>>> For the format of the config, did we consider what we do in
> > >>>>>> `bin/kafka-consumer-groups.sh`? It has multiple different options like
> > >>>>>>
> > >>>>>>     --by-duration <duration: format `PnDTnHnMnS`>
> > >>>>>>     --shift-by <number-of-records>
> > >>>>>>
> > >>>>>>     --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > >>>>>>     --to-offset <absolute offset>
> > >>>>>>
> > >>>>>> As it also uses ISO format, it might be good to also use them (even if I
> > >>>>>> can see the appeal and simplicity of what you proposed).
> > >>>>>>
> > >>>>>>
> > >>>>>> I also want to add to (AM1), as there is also the `Topology.AutoOffsetReset`
> > >>>>>> enum in Kafka Streams. I think we would need to convert this into a
> > >>>>>> class. While it could be done in a follow-up KIP, too, it seems best to
> > >>>>>> do this holistically in a single KIP, because KS is not something on top
> > >>>>>> of Kafka; it is part of Kafka. I am happy to help with the design and
> > >>>>>> even PRs if necessary, but would strongly prefer to do it all in a
> > >>>>>> single KIP.
> > >>>>>>
> > >>>>>>
> > >>>>>> Btw: if we add something like "latest-local", it might also be good to
> > >>>>>> extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool works
> > >>>>>> slightly differently, as it commits an offset, and there could be some
> > >>>>>> race condition between committing "latest-local", tiering, and when the
> > >>>>>> consumer is actually started).
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > >>>>>>> Hi Manikumar,
> > >>>>>>> Thanks for the KIP, this new strategy would be helpful in specifying
> > >>>>>>> fetch behaviour.
> > >>>>>>>
> > >>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the enum
> > >>>>>>> class OffsetResetStrategy, which is part of the kafka-clients javadoc
> > >>>>>>> <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > >>>>>>> Are we also proposing to somehow add new definitions to the same class?
> > >>>>>>> However, as the new configurations will be a string representation, are
> > >>>>>>> we moving away from the OffsetResetStrategy enum class altogether? Should
> > >>>>>>> we include the change in the KIP, as OffsetResetStrategy is part of the
> > >>>>>>> public javadoc?
> > >>>>>>>
> > >>>>>>> AM2: While I can see the ISO-8601 format is in the rejected alternatives,
> > >>>>>>> should we not follow some standard for defining durations that has
> > >>>>>>> already been adopted in other systems?
> > >>>>>>>
> > >>>>>>> AM3: We've introduced new config values using the format minus-n-hours,
> > >>>>>>> minus-n-days, minus-n-months, and minus-n-years. Should we explicitly
> > >>>>>>> define the "minus" prefix, or is it implied?
> > >>>>>>>
> > >>>>>>> AM4: When supporting duration-based resets, should we also allow users to
> > >>>>>>> specify a specific checkpoint time? For example, if a checkpoint occurs
> > >>>>>>> 2 days, 5 hours and 30 minutes earlier, the current four formats
> > >>>>>>> (minus-n-hours, minus-n-days, minus-n-months, minus-n-years) might not be
> > >>>>>>> sufficient. Should we consider adding a format to accommodate specific
> > >>>>>>> checkpoint times, or is there a reason to limit the supported formats?
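On AM3/AM4: assuming a Java implementation parses the ISO-8601 `PnDTnHnMnS` form (the same format `kafka-consumer-groups.sh --by-duration` documents) with `java.time.Duration.parse`, the sketch below shows that AM4's example fits in a single value, while months and years are not accepted by `Duration`. The `parseOrNull` helper is hypothetical.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// What java.time.Duration accepts for the ISO-8601 PnDTnHnMnS form.
public class IsoDurationSketch {

    // Returns the parsed duration, or null if the text is not a valid Duration.
    static Duration parseOrNull(String iso) {
        try {
            return Duration.parse(iso);
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // AM4's example, 2 days, 5 hours and 30 minutes, as a single value.
        System.out.println(parseOrNull("P2DT5H30M").getSeconds()); // 192600
        // A day is treated as exactly 24 hours.
        System.out.println(parseOrNull("P1D").equals(Duration.ofHours(24))); // true
        // Months and years are not part of the Duration grammar.
        System.out.println(parseOrNull("P1M")); // null
        System.out.println(parseOrNull("P1Y")); // null
    }
}
```

`java.time.Period` handles calendar-based amounts such as months and years, but it is a separate type with different semantics (a month has no fixed length).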
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Apoorv Mittal
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi everyone,
> > >>>>>>>> I would like to start a discussion on KIP-1106:
> > >>>>>>>>
> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > >>>>>>>>
> > >>>>>>>> This KIP proposes to add an additional auto offset reset strategy for
> > >>>>>>>> consumer clients.
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Manikumar
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>
> > >>
> > >
