Hi, ManiKumar,

Thanks for the updated KIP. Another comment.

JR3. Is there any benefit of using ISO8601
for auto.offset.reset.by.duration instead of just ms in long? The latter
covers a broader range. Also, AutoOffsetResetByDuration uses Java Duration,
which covers a different granularity than ISO8601. It might be better to
just use ms for both.
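
To illustrate the granularity point, here is a minimal sketch, assuming the
config value would be parsed with java.time.Duration.parse. The class and
method names are hypothetical and not taken from the KIP. It shows that
java.time.Duration accepts only the day/hour/minute/second part of ISO8601,
while year and month designators are rejected.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper, for illustration only; not part of the KIP or of Kafka.
final class DurationConfigSketch {

    // Parses an ISO8601 duration string with java.time.Duration and
    // converts it to milliseconds, the unit a plain long config would use.
    static long isoToMillis(String iso) {
        return Duration.parse(iso).toMillis();
    }

    // Returns true if java.time.Duration can parse the given string.
    static boolean acceptedByJavaDuration(String iso) {
        try {
            Duration.parse(iso);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 2 days, 5 hours and 30 minutes expressed in ISO8601.
        System.out.println(isoToMillis("P2DT5H30M"));      // 192600000
        // Month and year designators are valid ISO8601 durations,
        // but java.time.Duration does not accept them.
        System.out.println(acceptedByJavaDuration("P1M")); // false
        System.out.println(acceptedByJavaDuration("P1Y")); // false
    }
}
```

With a plain ms value the same reset point would simply be configured as
192600000, with no parsing rules to document.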

Jun

On Fri, Nov 8, 2024 at 5:51 AM Manikumar <manikumar.re...@gmail.com> wrote:

> Hi All,
>
> I have updated the KIP with recent suggestions.
>
> Thanks.
>
> On Fri, Nov 8, 2024 at 11:53 AM Manikumar <manikumar.re...@gmail.com>
> wrote:
> >
> > Hi Matthias,
> >
> > Thanks for the comments.
> >
> > > I am personally not convinced that adding a new config
> > `auto.offset.reset.by.duration` is the best way.
> >
> > I think in this case, we felt it's better to add the strategy name to
> > auto.offset.reset config and add additional strategy specific configs.
> > This will be easy to understand and extend in future. I am open to
> > change if there are similar concerns.
> >
> > > One question about "earliest_local": if we fetch this offset from the
> >  server, and seek to it later, and start fetching, won't there be a race
> >  condition? The server would just need to reload it from the tier.
> >
> > Yes, your understanding is correct. The server will read from tiered
> > storage. Anyhow, I will consider dropping earliest_local based on Jun's
> > suggestion.
> >
> > > For Kafka Streams, just adding the new option to the existing enum does
> >  not work IMHO. If a user would use BY_DURATION, there is no option to
> >  pass in the actual duration (and we might need to throw an exception).
> >
> > I was assuming we will be using the underlying consumer config here. I
> > will update based on your suggestions.
> >
> >
> > Thanks,
> >
> >
> >
> >
> > On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > Thanks for updating the KIP.
> > >
> > > I am personally not convinced that adding a new config
> > > `auto.offset.reset.by.duration` is the best way. Kafka in general has
> > > way too many configs and trying to avoid adding more configs seems to
> > > be desirable?  -- It seems this might be a point of contention, and if
> > > the majority of people want this new config so be it. I just wanted to
> > > stress my concerns about it.
> > >
> > > One question about "earliest_local": if we fetch this offset from the
> > > server, and seek to it later, and start fetching, won't there be a race
> > > condition? The oldest segment could have been discarded locally in the
> > > mean time? Maybe not too much of a big deal: the server would just need
> > > to reload it from the tier. Just want to ensure my understanding is
> > > correct, and make sure nobody has concerns about it.
> > >
> > > For Kafka Streams, just adding the new option to the existing enum does
> > > not work IMHO.
> > >
> > > For example, we have a method:
> > >
> > >      addSource(final AutoOffsetReset offsetReset,
> > >                final String name,
> > >                final String... topics)
> > >
> > > If a user would use BY_DURATION, there is no option to pass in the
> > > actual duration (and we might need to throw an exception). And if we
> > > overload `addSource` adding a fourth parameter, it would be odd too, as
> > > users could pass in `EARLIEST` plus a duration, and we would need to
> > > throw an exception or ignore the given duration. Overall not an easy to
> > > use API.
> > >
> > > Thus, I would suggest to deprecate the existing `AutoOffsetReset` enum,
> > > and add a new class `AutoOffsetReset` for which we can guide users
> > > accordingly avoiding corner cases:
> > >
> > > public class AutoOffsetReset {
> > >    public static AutoOffsetReset earliest();
> > >    public static AutoOffsetReset latest();
> > >    public static AutoOffsetReset byDuration(Duration duration);
> > >    public static AutoOffsetReset earliestLocal();
> > > }
> > >
> > > The existing enum is contained inside `Topology` so we can add the new
> > > class just for package `org.apache.kafka.streams` to avoid naming
> > > conflicts.
> > >
> > > Additionally, we would deprecate all methods taking the existing enum
> > > `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are
> > > affected) and add new overloads which accept the new `class
> > > AutoOffsetReset` instead.
> > >
> > >
> > > -Matthias
> > >
> > > On 11/7/24 1:57 PM, Jun Rao wrote:
> > > > Hi, Manikumar,
> > > >
> > > > Thanks for the KIP. A couple of comments.
> > > >
> > > > JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
> > > > sense for a consumer app to bootstrap based on time. The time that a
> > > > topic is kept locally seems irrelevant to a consumer application.
> > > >
> > > > JR2. I took a look at the proposal of KIP-842. It seems that this KIP
> > > > covers the common scenario in KIP-842 and is simpler. The scenario
> > > > described in KIP-842 is about which initial offset to use when a new
> > > > partition is added. This is more or less the same problem as choosing
> > > > the initial offset for existing partitions. Having a time-based option
> > > > provides more flexibility than earliest or latest, and is likely good
> > > > enough for most usage.
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <
> apoorvmitta...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks Manikumar and Andrew.
> > > >>
> > > >> For AM6: I was thinking of enhancing OffsetFetcher and adding
> > > >> "seekToLocalBeginning", where OffsetFetcher uses the Admin.listOffsets
> > > >> API itself. However, I agree that this is just another convenience and
> > > >> not necessary to support the functionality. Skipping it sounds good to me.
> > > >>
> > > >> Regards,
> > > >> Apoorv Mittal
> > > >>
> > > >>
> > > >> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <
> > > >> andrew_schofield_j...@outlook.com> wrote:
> > > >>
> > > >>> Hi Manikumar,
> > > >>>
> > > >>> AS7: In response to AM6, I agree that in general the combination
> > > >>> of offsetsForTimes() and seek() will do what is required. The one case
> > > >>> that I think is more tricky is seeking to the local earliest/beginning.
> > > >>>
> > > >>> The Consumer API has Consumer.beginningOffsets and
> > > >>> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> > > >>> Similarly there are methods for OffsetResetStrategy.LATEST.
> > > >>>
> > > >>> With the Consumer API, I don't think you can find the local earliest
> > > >>> offset. That can be found using Admin.listOffsets with
> > > >>> OffsetSpec.EarliestLocal. We could add equivalents of
> > > >>> Consumer.beginningOffsets and Consumer.seekToBeginning. An application
> > > >>> could always use the Admin API with the Consumer API.
> > > >>>
> > > >>> Personally, I would not complicate this KIP with these new methods when
> > > >>> the focus is more on duration-based offset reset and configs.
> > > >>>
> > > >>> Thanks,
> > > >>> Andrew
> > > >>> ________________________________________
> > > >>> From: Manikumar <manikumar.re...@gmail.com>
> > > >>> Sent: 06 November 2024 04:20
> > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > >>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset
> option
> > > >>> for consumer clients
> > > >>>
> > > >>> Hi Apoorv,
> > > >>>
> > > >>> Thanks for the review.
> > > >>>
> > > >>> AM5: Thanks, updated the KIP.
> > > >>>
> > > >>> AM6: Currently we can achieve this with the combination of
> > > >>> offsetsForTimes() and seek() API.
> > > >>> Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
> > > >>> Long> timestampsToSearch) if required.
> > > >>> Let's see what others think.
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <
> apoorvmitta...@gmail.com>
> > > >>> wrote:
> > > >>>>
> > > >>>> Hi Manikumar,
> > > >>>> Thanks for the changes. Just minor comment and a question:
> > > >>>>
> > > >>>> AM5: The description "How to initialize the share-partition start offset:"
> > > >>>> for "share.auto.offset.reset" seems incomplete as it ends with `:`.
> > > >>>> Should we write the details as defined in KIP-932 i.e.
> > > >>>>
> > > >>>> How to initialize the share-partition start offset:
> > > >>>>
> > > >>>>     - "earliest": automatically reset the offset to the earliest offset
> > > >>>>     - "latest": automatically reset the offset to the latest offset
> > > >>>>     - "earliest_local": automatically resets the offset to the earliest
> > > >>>>       message stored in the local log on the broker.
> > > >>>>     - "by_duration": automatically resets the offset to the earliest
> > > >>>>       offset whose timestamp is greater than or equal to the configured
> > > >>>>       duration (auto.offset.reset.by.duration).
> > > >>>>
> > > >>>> or maybe just replace ":" with "." i.e. "How to initialize the
> > > >>>> share-partition start offset." as the details of the new configs are
> > > >>>> defined at top in different sections as well.
> > > >>>>
> > > >>>> AM6: Question: KafkaConsumer has public methods defined to "seek" for the
> > > >>>> reset strategy like "seekToBeginning" and "seekToEnd" which use
> > > >>>> OffsetResetStrategy of EARLIEST and LATEST respectively. Do you think it
> > > >>>> would be sensible to add additional KafkaConsumer APIs for the newly
> > > >>>> introduced strategies?
> > > >>>>
> > > >>>> Regards,
> > > >>>> Apoorv Mittal
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <
> manikumar.re...@gmail.com>
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi Matthias,
> > > >>>>>
> > > >>>>> Thanks for the review.
> > > >>>>>
> > > >>>>> MS1: looks like KIP-842 is stuck in the voting. Current changes
> > > >>>>> should fit into KIP-842 (few updates definitely required) as KIP-842
> > > >>>>> is proposing to add new configs
> > > >>>>> (auto.offset.reset.on.no.initial.offset and
> > > >>>>> auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > > >>>>> after completing this KIP.
> > > >>>>>
> > > >>>>> MS2: Added support for earliest-local config to reset to oldest local
> > > >>>>> offset and changes to kafka-consumer-groups cmd.
> > > >>>>>
> > > >>>>> MS3: Based on suggestions, I have updated the config option to support
> > > >>>>> ISO format.
> > > >>>>>
> > > >>>>> MS4: I have included the AutoOffsetReset changes to the KIP. I
> > > >>>>> definitely need your help on refining the API/solution for KS :)
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org
> >
> > > >>> wrote:
> > > >>>>>>
> > > >>>>>> Thanks for the KIP.
> > > >>>>>>
> > > >>>>>> A somewhat orthogonal question I have is, if we should try to merge this
> > > >>>>>> KIP with the existing KIP-842:
> > > >>>>>>
> > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > > >>>>>>
> > > >>>>>> While KIP-842 has a different intention, ie, separating the case of "no
> > > >>>>>> offset during initial startup" vs "offset out of bounds during
> > > >>>>>> processing", it might still be worth it? My motivation for this question
> > > >>>>>> is, that `auto.offset.reset` is one of the most central configs we have,
> > > >>>>>> and thus, doing a single larger change (ie, both KIPs together) might be
> > > >>>>>> better (less noise) than doing two independent changes? Thoughts? I know
> > > >>>>>> it might be a little bit of a stretch, but asking cannot hurt :)
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> About the KIP itself, did we consider to add something like
> > > >>>>>> "latest-local" to say reset to oldest local, but not fetch from tiered
> > > >>>>>> storage?
> > > >>>>>>
> > > >>>>>> For the format of the config, did we consider what we do in
> > > >>>>>> `bin/kafka-consumer-groups.sh`? It has multiple different options like
> > > >>>>>>
> > > >>>>>>     --by-duration <duration: format `PnDTnHnMnS`>
> > > >>>>>>     --shift-by <number-of-records>
> > > >>>>>>
> > > >>>>>>     --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > > >>>>>>     --to-offset <absolute offset>
> > > >>>>>>
> > > >>>>>> As it also uses ISO format, it might be good to also use them (even if I
> > > >>>>>> can see the appeal and simplicity of what you proposed).
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> I also want to add to (AM1), as there is also `Topology.AutoOffsetReset`
> > > >>>>>> enum in Kafka Streams. I think we would need to convert this into a
> > > >>>>>> class. While it could be done in a follow-up KIP, too, it seems best to
> > > >>>>>> do this holistically in a single KIP, because KS is not something on-top
> > > >>>>>> of Kafka, but it's part of Kafka. I am happy to help with the design and
> > > >>>>>> even PRs if necessary, but would strongly prefer to do it all in a
> > > >>>>>> single KIP.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Btw: if we add something like "latest-local", it might also be good to
> > > >>>>>> extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool works
> > > >>>>>> slightly differently, as it does commit an offset and there could be some
> > > >>>>>> race condition between committing "latest-local", tiering, and when the
> > > >>>>>> consumer is actually started?)
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > > >>>>>>> Hi Manikumar,
> > > >>>>>>> Thanks for the KIP, this new strategy would be helpful in specifying
> > > >>>>>>> fetch behaviour.
> > > >>>>>>>
> > > >>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the enum
> > > >>>>>>> class OffsetResetStrategy which is part of kafka-clients javadoc
> > > >>>>>>> <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > > >>>>>>> Are we also proposing to somehow add new definitions in the same class?
> > > >>>>>>> However, as new configurations will be a string representation, are we
> > > >>>>>>> moving away from OffsetResetStrategy enum class altogether? Should we
> > > >>>>>>> include the change in the KIP as OffsetResetStrategy is part of public
> > > >>>>>>> javadoc?
> > > >>>>>>>
> > > >>>>>>> AM2: While I can see the ISO-8601 format is in the rejected
> > > >>>>>>> alternatives, should we not follow some standard of defining duration
> > > >>>>>>> which has already been adopted in other systems?
> > > >>>>>>>
> > > >>>>>>> AM3: We've introduced new config values using the format minus-n-hours,
> > > >>>>>>> minus-n-days, minus-n-months, and minus-n-years. Should we explicitly
> > > >>>>>>> define the "minus" prefix, or is it implied?
> > > >>>>>>>
> > > >>>>>>> AM4: When supporting duration-based resets, should we also allow users
> > > >>>>>>> to specify a specific checkpoint time? For example, if a checkpoint
> > > >>>>>>> occurs 2 days, 5 hours and 30 minutes earlier, the current four formats
> > > >>>>>>> (minus-n-hours, minus-n-days, minus-n-months, minus-n-years) might not
> > > >>>>>>> be sufficient. Should we consider adding a format to accommodate
> > > >>>>>>> specific checkpoint times, or is there a reason to limit the supported
> > > >>>>>>> formats?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Apoorv Mittal
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <
> > > >>> manikumar.re...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi everyone,
> > > >>>>>>>> I would like to start a discussion on KIP-1106:
> > > >>>>>>>>
> > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > > >>>>>>>>
> > > >>>>>>>> This KIP proposes to add an additional auto offset reset strategy for
> > > >>>>>>>> consumer clients.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Manikumar
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > > >
>
