Hi Matthias,

Thanks for the comments.

> I am personally not convinced that adding a new config
`auto.offset.reset.by.duration` is the best way.

In this case, we felt it is better to add the strategy name to the
auto.offset.reset config and to add strategy-specific configs alongside it.
This should be easy to understand and to extend in the future. I am open to
changing this if others share similar concerns.
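
To illustrate the intent, here is a minimal sketch of how a duration value
could be turned into the timestamp used for the offset lookup. It assumes the
ISO-8601 duration format discussed in this thread; the class and method names
are hypothetical and are not part of the KIP or of the consumer code.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, for illustration only: not part of kafka-clients.
final class ByDurationResetSketch {

    // Converts an ISO-8601 duration (e.g. "P2DT5H30M") into the epoch-millis
    // timestamp that lies that far before the given reference time.
    static long resetTimestampMs(String isoDuration, Instant now) {
        Duration duration = Duration.parse(isoDuration);
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + isoDuration);
        }
        return now.minus(duration).toEpochMilli();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-11-08T06:00:00Z");
        long timestampMs = resetTimestampMs("P2DT5H30M", now);
        // prints 2024-11-06T00:30:00Z
        System.out.println(Instant.ofEpochMilli(timestampMs));
    }
}
```

The resulting timestamp is what a by_duration reset would then resolve to an
offset, in the same way offsetsForTimes() does today.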

> One question about "earliest_local": if we fetch this offset from the
 server, and seek to it later, and start fetching, won't there be a race
 condition? [...] the server would just need to reload it from the tier.

Yes, your understanding is correct. The server will read from tiered storage.
Anyhow, I will consider dropping earliest_local based on Jun's suggestion.

> For Kafka Streams, just adding the new option to the existing enum does
 not work IMHO. If a user would use BY_DURATION, there is no option to pass
 in the actual duration (and we might need to throw an exception).

I was assuming we would use the underlying consumer config here. I
will update the KIP based on your suggestions.
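
As a starting point for that update, here is a rough sketch of the class-based
shape you suggested. It is only an assumption about how it could look: the
class name AutoOffsetResetSketch, the nested Strategy enum and the accessors
are hypothetical, and earliestLocal() is left out since that option may be
dropped.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of a class-based replacement for the enum; names are
// illustrative and are not the final Kafka Streams API.
final class AutoOffsetResetSketch {

    enum Strategy { EARLIEST, LATEST, BY_DURATION }

    private final Strategy strategy;
    private final Optional<Duration> duration;

    private AutoOffsetResetSketch(Strategy strategy, Optional<Duration> duration) {
        this.strategy = strategy;
        this.duration = duration;
    }

    static AutoOffsetResetSketch earliest() {
        return new AutoOffsetResetSketch(Strategy.EARLIEST, Optional.empty());
    }

    static AutoOffsetResetSketch latest() {
        return new AutoOffsetResetSketch(Strategy.LATEST, Optional.empty());
    }

    // Only this factory accepts a duration, so "EARLIEST plus a duration"
    // cannot be expressed at all.
    static AutoOffsetResetSketch byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        return new AutoOffsetResetSketch(Strategy.BY_DURATION, Optional.of(duration));
    }

    Strategy strategy() {
        return strategy;
    }

    Optional<Duration> duration() {
        return duration;
    }
}
```

With this shape, the duration can only be supplied together with the
by-duration strategy, which avoids the corner cases you described for an
overloaded addSource.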


Thanks,




On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> Thanks for updating the KIP.
>
> I am personally not convinced that adding a new config
> `auto.offset.reset.by.duration` is the best way. Kafka in general has
> way too many configs and trying to avoid adding more configs seems to be
> desirable?  -- It seems this might be a point of contention, and if the
> majority of people wants this new config so be it. I just wanted to
> stress my concerns about it.
>
> One question about "earliest_local": if we fetch this offset from the
> server, and seek to it later, and start fetching, won't there be a race
> condition? The oldest segment could have been discarded locally in the
> mean time? Maybe not too much of a big deal: the server would just need
> to reload it from the tier. Just want to ensure my understanding is
> correct, and make sure nobody has concerns about it.
>
> For Kafka Streams, just adding the new option to the existing enum does
> not work IMHO.
>
> For example, we have a method:
>
>      addSource(final AutoOffsetReset offsetReset,
>                final String name,
>                final String... topics)
>
> If a user would use BY_DURATION, there is no option to pass in the
> actual duration (and we might need to throw an exception). And if we
> overload `addSource` adding a fourth parameter, it would be odd too, as
> users could pass in `EARLIEST` plus a duration, and we would need to
> throw an exception or ignore the given duration. Overall not an easy to
> use API.
>
> Thus, I would suggest to deprecate the existing `AutoOffsetReset` enum,
> and add a new class `AutoOffsetReset` for which we can guide users
> accordingly avoiding corner cases:
>
> public class AutoOffsetReset {
>    public static AutoOffsetReset earliest();
>    public static AutoOffsetReset latest();
>    public static AutoOffsetReset byDuration(Duration duration);
>    public static AutoOffsetReset earliestLocal();
> }
>
> The existing enum is contained inside `Topology` so we can add the new
> class just for package `org.apache.kafka.streams` to avoid naming conflicts.
>
> Additionally, we would deprecate all methods taking existing enum
> `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are
> affected) and add new overload which accepts the new `class
> AutoOffsetReset` instead.
>
>
> -Matthias
>
> On 11/7/24 1:57 PM, Jun Rao wrote:
> > Hi, Manikumar,
> >
> > Thanks for the KIP. A couple of comments.
> >
> > JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
> > sense for a consumer app to bootstrap based on time. The time that a topic
> > is kept locally seems irrelevant to a consumer application.
> >
> > JR2. I took a look at the proposal of KIP-842. It seems that this KIP
> > covers the common scenario in KIP-842 and is simpler. The scenario
> > described in KIP-842 is about which initial offset to use when a new
> > partition is added. This is more or less the same problem for choosing the
> > initial offset for existing partitions. Having a time based option provides
> > more flexibility than earliest or latest, and is likely good enough for
> > most usage.
> >
> > Jun
> >
> >
> > On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> > wrote:
> >
> >> Thanks Manikumar and Andrew.
> >>
> >> For AM6: Though I was thinking of enhancing OffsetFetcher and adding
> >> "seekToLocalBeginning" where OffsetFetcher does use Admin.listOffsets API
> >> itself. However I agree that this is just another convenient way and not
> >> necessary to support functionality. Skipping it sounds good to me.
> >>
> >> Regards,
> >> Apoorv Mittal
> >>
> >>
> >> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <
> >> andrew_schofield_j...@outlook.com> wrote:
> >>
> >>> Hi Manikumar,
> >>>
> >>> AS7: In response to AM6, I agree that in general the combination
> >>> of offsetsForTimes() and seek() will do what is required. The one case
> >> that
> >>> I think is more tricky is seeking to the local earliest/beginning.
> >>>
> >>> The Consumer API has Consumer.beginningOffsets and
> >>> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> >>> Similarly there are methods for OffsetResetStrategy.LATEST.
> >>>
> >>> With the Consumer API, I don't think you can find the local earliest
> >>> offset.
> >>> That can be found using Admin.listOffsets with OffsetSpec.EarliestLocal.
> >>> We could add equivalents of Consumer.beginningOffsets and
> >>> Consumer.seekToBeginning. An application could always use the
> >>> Admin API with the Consumer API.
> >>>
> >>> Personally, I would not complicate this KIP with these new methods when
> >>> the focus is more on duration-based offset reset and configs.
> >>>
> >>> Thanks,
> >>> Andrew
> >>> ________________________________________
> >>> From: Manikumar <manikumar.re...@gmail.com>
> >>> Sent: 06 November 2024 04:20
> >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
> >>> for consumer clients
> >>>
> >>> Hi Apoorv,
> >>>
> >>> Thanks for the review.
> >>>
> >>> AM5: Thanks, updated the KIP.
> >>>
> >>> AM6: Currently we can achieve this with the combination of
> >>> offsetsForTimes() and seek() APIs.
> >>> Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
> >>> Long> timestampsToSearch) if required.
> >>> Let's see what others think.
> >>>
> >>> Thanks,
> >>>
> >>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> >>> wrote:
> >>>>
> >>>> Hi Manikumar,
> >>>> Thanks for the changes. Just minor comment and a question:
> >>>>
> >>>> AM5: The description "How to initialize the share-partition start
> >>> offset:"
> >>>> for "*share.auto.offset.reset*" seems incomplete as it ends with `:`.
> >>>> Should we write the details as defined in KIP-932 i.e.
> >>>>
> >>>> How to initialize the share-partition start offset:
> >>>>
> >>>>     -
> >>>>
> >>>>     "earliest" : automatically reset the offset to the earliest offset
> >>>>     -
> >>>>
> >>>>     "latest" : automatically reset the offset to the latest offset
> >>>>     -
> >>>>
> >>>>     "*earliest_local": *automatically resets the offset to the earliest
> >>>>     message stored in the local log on the broker.
> >>>>     -
> >>>>
> >>>>     "*by_duration": *automatically resets the offset to the earliest
> >>> offset
> >>>>     whose timestamp is greater than or equal to the configured duration
> >>>>     * (auto.offset.reset.by.duration).*
> >>>>
> >>>>
> >>>> *or* maybe just replace ":" with "." i.e. "How to initialize the
> >>>> share-partition start offset." as the details of the new configs are
> >>>> defined at top in different sections as well.
> >>>>
> >>>> AM6: Question: KafkaConsumer has public methods defined to "seek" for
> >> the
> >>>> reset strategy like "seekToBeginning" and "seekToEnd" which uses
> >>>> OffsetResetStrategy of EARLIEST and LATEST respectively. Do you think
> >> it
> >>>> would be sensible to add additional KafkaConsumer APIs for newly
> >>> introduced
> >>>> strategies?
> >>>>
> >>>> Regards,
> >>>> Apoorv Mittal
> >>>>
> >>>>
> >>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Thanks for the review.
> >>>>>
> >>>>> MS1: looks like KIP-842 is stuck in the voting. Current changes
> >> should
> >>>>> fit into KIP-842 (few updates definitely required) as KIP-842 is
> >>>>> proposing to add new configs (auto.offset.reset.on.no.initial.offset
> >>>>> and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> >>>>> after completing this KIP.
> >>>>>
> >>>>> MS2: Added support for earliest-local config to reset to oldest local
> >>>>> offset and changes to kafka-consumer-group cmd.
> >>>>>
> >>>>> MS3: Based on suggestions, I have updated the config option to
> >> support
> >>>>> ISO format.
> >>>>>
> >>>>> MS4: I have included the AutoOffsetReset changes to the KIP. I
> >>>>> definitely need your help on refining the API/solution for KS :)
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>>>>
> >>>>>> Thanks for the KIP.
> >>>>>>
> >>>>>> A somewhat orthogonal question I have is, if we should try to merge
> >>> this
> >>>>>> KIP with the existing KIP-842:
> >>>>>>
> >>>>>
> >>>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> >>>>>>
> >>>>>> While KIP-842 has a different intention, ie, separating the case of
> >>> "no
> >>>>>> offset during initial startup" vs "offset out of bounds during
> >>>>>> processing", it might still be worth it? My motivation for this
> >>> question
> >>>>>> is, that `auto.offset.reset` is one of the most central configs we
> >>> have,
> >>>>>> and thus, doing a single larger change (ie, both KIPs together)
> >>> might be
> >>>>>> better (less noise) than doing two independent changes? Thoughts? I
> >>> know
> >>>>>> it might be a little bit of a stretch, but asking cannot hurt :)
> >>>>>>
> >>>>>>
> >>>>>> About the KIP itself, did we consider to add something like
> >>>>>> "latest-local" to say reset to oldest local, but not fetch from
> >>> tiered
> >>>>>> storage?
> >>>>>>
> >>>>>> For the format of the config, did we consider what we do in
> >>>>>> `bin/kafka-consumer-group.sh`? It has multiple different options
> >> like
> >>>>>>
> >>>>>>     --by-duration <duration: format `PnDTnHnMnS`>
> >>>>>>     --shift-by <number-of-records>
> >>>>>>
> >>>>>>     --to-datetime <fixed point in time: format
> >>> `YYYY-MM-DDTHH:mm:SS.sss`>
> >>>>>>     --to-offset <absolute offset>
> >>>>>>
> >>>>>> As it also uses ISO format, it might be good to also use them (even
> >>> if I
> >>>>>> can see the appeal and simplicity of what you proposed).
> >>>>>>
> >>>>>>
> >>>>>> I also want to add to (AM1), as there is also
> >>> `Topology.AutoOffsetReset`
> >>>>>> enum in Kafka Streams. I think we would need to convert this into a
> >>>>>> class. While it could be done in a follow-up KIP, too, it seems
> >> best
> >>> to
> >>>>>> do this holistically in a single KIP, because KS is not something
> >>> on-top
> >>>>>> of Kafka, but it's part of Kafka. I am happy to help with the
> >> design
> >>> and
> >>>>>> even PRs if necessary, but would strongly prefer to do it all in a
> >>>>>> single KIP.
> >>>>>>
> >>>>>>
> >>>>>> Btw: if we add something like "latest-local", it might also be good
> >>> to
> >>>>>> extend `bin/kafka-consumer-group.sh` accordingly (even if the tool
> >>> works
> >>>>>> slightly different, as it does commit an offset and there could be
> >>> some
> >>>>>> race condition between committing "latest-local", tiering, and when
> >>> the
> >>>>>> consumer is actually started)?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> >>>>>>> Hi Manikumar,
> >>>>>>> Thanks for the KIP, this new strategy would be helpful in
> >>> specifying
> >>>>> fetch
> >>>>>>> behaviour.
> >>>>>>>
> >>>>>>> AM1: The config `auto.offset.reset` is currently applied as per
> >> the
> >>>>> enum
> >>>>>>> class OffsetResetStrategy which is part of kafka-clients javadoc
> >>>>>>> <
> >>>>>
> >>> https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html
> >>> .
> >>>>>>> Are we also proposing to somehow add new definitions in the same
> >>> class?
> >>>>>>> However as new configurations will be a string representation
> >> hence
> >>>>> are we
> >>>>>>> moving away from OffsetResetStrategy enum class altogether?
> >> Should
> >>> we
> >>>>>>> include the change in the KIP as OffsetResetStrategy is part of
> >>> public
> >>>>>>> javadoc?
> >>>>>>>
> >>>>>>> AM2: While I can see the ISO-8601 format is in the rejected
> >>>>> alternative,
> >>>>>>> should we not follow some standard of defining duration which has
> >>>>> already
> >>>>>>> been adopted in other systems?
> >>>>>>>
> >>>>>>> AM3: We've introduced new config values using the format
> >>> minus-n-hours,
> >>>>>>> minus-n-days, minus-n-months, and minus-n-years. Should we
> >>> explicitly
> >>>>>>> define the "minus" prefix, or is it implied?
> >>>>>>>
> >>>>>>> AM4: When supporting duration-based resets, should we also allow
> >>> users
> >>>>> to
> >>>>>>> specify a specific checkpoint time? For example, if a checkpoint
> >>>>> occurs 2
> >>>>>>> days, 5 hours and 30 minutes earlier, the current four formats
> >>>>>>> (minus-n-hours, minus-n-days, minus-n-months, minus-n-years)
> >> might
> >>> not
> >>>>> be
> >>>>>>> sufficient. Should we consider adding a format to accommodate
> >>> specific
> >>>>>>> checkpoint times, or is there a reason to limit the supported
> >>> formats?
> >>>>>>>
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Apoorv Mittal
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <
> >>> manikumar.re...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>> I would like to start a discussion on KIP-1106:
> >>>>>>>>
> >>>>>>>>
> >>>>>
> >>>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> >>>>>>>>
> >>>>>>>> This KIP proposes to add an additional auto offset reset
> >> strategy
> >>> for
> >>>>>>>> consumer clients.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Manikumar
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> >>
> >
