Hi Manikumar,

AS7: In response to AM6, I agree that in general the combination
of offsetsForTimes() and seek() will do what is required. The one case that
I think is more tricky is seeking to the local earliest/beginning.

The Consumer API has Consumer.beginningOffsets and
seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
Similarly there are methods for OffsetResetStrategy.LATEST.

With the Consumer API, I don't think you can find the local earliest offset.
That can be found using Admin.listOffsets with OffsetSpec.EarliestLocal.
We could add local-earliest equivalents of Consumer.beginningOffsets and
Consumer.seekToBeginning. Alternatively, an application could always use
the Admin API alongside the Consumer API.

Personally, I would not complicate this KIP with these new methods when
the focus is more on duration-based offset reset and configs.

Thanks,
Andrew
________________________________________
From: Manikumar <manikumar.re...@gmail.com>
Sent: 06 November 2024 04:20
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option for consumer clients

Hi Apoorv,

Thanks for the review.

AM5: Thanks, updated the KIP.

AM6: Currently we can achieve this with the combination of the
offsetsForTimes() and seek() APIs.
Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
Long> timestampsToSearch) if required.
Let's see what others think.
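
As a rough illustration of the duration part of this approach, the sketch below turns an ISO-8601 duration into the epoch-millisecond timestamp that an application would then pass to offsetsForTimes() before calling seek(). It uses only java.time; the class name DurationResetSketch and the method resetTimestampMs are hypothetical names chosen for this example and are not part of the KIP or of any Kafka API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Kafka: converts an ISO-8601 duration
// (the same PnDTnHnMnS shape used by the --by-duration tool option) into
// a target timestamp in epoch milliseconds.
public class DurationResetSketch {

    // Returns "now minus duration" as epoch milliseconds.
    // Assumption for this sketch: negative durations are rejected,
    // since a reset point in the future makes no sense here.
    public static long resetTimestampMs(String isoDuration, Instant now) {
        Duration duration = Duration.parse(isoDuration);
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + isoDuration);
        }
        return now.minus(duration).toEpochMilli();
    }

    public static void main(String[] args) {
        // 2 days, 5 hours and 30 minutes before the current time.
        long target = resetTimestampMs("P2DT5H30M", Instant.now());
        System.out.println("timestamp to search for: " + target);
        // An application would next build a Map<TopicPartition, Long> with
        // this value for each assigned partition, call
        // consumer.offsetsForTimes(map), and seek() each partition to the
        // returned offset (handling partitions with no matching offset).
    }
}
```

Taking the current time as a parameter keeps the calculation deterministic and easy to check; it does not cover the local-earliest case, which has no timestamp to search for.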

Thanks,

On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
>
> Hi Manikumar,
> Thanks for the changes. Just a minor comment and a question:
>
> AM5: The description "How to initialize the share-partition start offset:"
> for "*share.auto.offset.reset*" seems incomplete as it ends with `:`.
> Should we write the details as defined in KIP-932 i.e.
>
> How to initialize the share-partition start offset:
>
>    - "earliest": automatically reset the offset to the earliest offset
>    - "latest": automatically reset the offset to the latest offset
>    - "earliest_local": automatically reset the offset to the earliest
>      message stored in the local log on the broker
>    - "by_duration": automatically reset the offset to the earliest offset
>      whose timestamp is greater than or equal to the configured duration
>      (auto.offset.reset.by.duration)
>
>
> *or* maybe just replace ":" with "." i.e. "How to initialize the
> share-partition start offset." as the details of the new configs are
> defined at top in different sections as well.
>
> AM6: Question: KafkaConsumer has public methods defined to "seek" for the
> reset strategy like "seekToBeginning" and "seekToEnd", which use the
> OffsetResetStrategy values EARLIEST and LATEST respectively. Do you think it
> would be sensible to add additional KafkaConsumer APIs for newly introduced
> strategies?
>
> Regards,
> Apoorv Mittal
>
>
> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > Thanks for the review.
> >
> > > MS1: Looks like KIP-842 is stuck in voting. The current changes should
> > > fit into KIP-842 (a few updates are definitely required), as KIP-842
> > > proposes adding new configs (auto.offset.reset.on.no.initial.offset
> > > and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > > after completing this KIP.
> >
> > > MS2: Added support for an earliest-local config value to reset to the
> > > oldest local offset, plus changes to the kafka-consumer-groups command.
> >
> > MS3: Based on suggestions, I have updated the config option to support
> > ISO format.
> >
> > MS4: I have included the AutoOffsetReset changes to the KIP. I
> > definitely need your help on refining the API/solution for KS :)
> >
> > Thanks
> >
> >
> >
> >
> >
> > On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > Thanks for the KIP.
> > >
> > > A somewhat orthogonal question I have is, if we should try to merge this
> > > KIP with the existing KIP-842:
> > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > >
> > > While KIP-842 has a different intention, ie, separating the case of "no
> > > offset during initial startup" vs "offset out of bounds during
> > > processing", it might still be worth it? My motivation for this question
> > > is, that `auto.offset.reset` is one of the most central configs we have,
> > > and thus, doing a single larger change (ie, both KIPs together) might be
> > > better (less noise) than doing two independent changes? Thoughts? I know
> > > it might be a little bit of a stretch, but asking cannot hurt :)
> > >
> > >
> > > > About the KIP itself, did we consider adding something like
> > > "latest-local" to say reset to oldest local, but not fetch from tiered
> > > storage?
> > >
> > > For the format of the config, did we consider what we do in
> > > > `bin/kafka-consumer-groups.sh`? It has multiple different options like
> > >
> > >    --by-duration <duration: format `PnDTnHnMnS`>
> > >    --shift-by <number-of-records>
> > >
> > >    --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > >    --to-offset <absolute offset>
> > >
> > > As it also uses ISO format, it might be good to also use them (even if I
> > > can see the appeal and simplicity of what you proposed).
> > >
> > >
> > > I also want to add to (AM1), as there is also `Topology.AutoOffsetReset`
> > > enum in Kafka Streams. I think we would need to convert this into a
> > > > class. While it could be done in a follow-up KIP, too, it seems best to
> > > do this holistically in a single KIP, because KS is not something on-top
> > > of Kafka, but it's part of Kafka. I am happy to help with the design and
> > > even PRs if necessary, but would strongly prefer to do it all in a
> > > single KIP.
> > >
> > >
> > > > Btw: if we add something like "latest-local", it might also be good to
> > > > extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool works
> > > > slightly differently, as it does commit an offset and there could be some
> > > > race condition between committing "latest-local", tiering, and when the
> > > > consumer is actually started)?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > > > Hi Manikumar,
> > > > > Thanks for the KIP, this new strategy would be helpful in specifying
> > > > > fetch behaviour.
> > > >
> > > > > AM1: The config `auto.offset.reset` is currently applied as per the
> > > > > enum class OffsetResetStrategy, which is part of the kafka-clients javadoc
> > > > > <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > > > > Are we also proposing to somehow add new definitions in the same class?
> > > > > However, as the new configurations will be string representations, are
> > > > > we moving away from the OffsetResetStrategy enum class altogether? Should
> > > > > we include the change in the KIP, as OffsetResetStrategy is part of the
> > > > > public javadoc?
> > > >
> > > > > AM2: While I can see the ISO-8601 format is in the rejected alternative,
> > > > > should we not follow some standard of defining duration which has already
> > > > > been adopted in other systems?
> > > >
> > > > AM3: We've introduced new config values using the format minus-n-hours,
> > > > minus-n-days, minus-n-months, and minus-n-years. Should we explicitly
> > > > define the "minus" prefix, or is it implied?
> > > >
> > > > > AM4: When supporting duration-based resets, should we also allow users to
> > > > > specify a specific checkpoint time? For example, if a checkpoint occurs 2
> > > > > days, 5 hours and 30 minutes earlier, the current four formats
> > > > > (minus-n-hours, minus-n-days, minus-n-months, minus-n-years) might not be
> > > > > sufficient. Should we consider adding a format to accommodate specific
> > > > > checkpoint times, or is there a reason to limit the supported formats?
> > > >
> > > >
> > > > Regards,
> > > > Apoorv Mittal
> > > >
> > > >
> > > > > On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com> wrote:
> > > >
> > > >> Hi everyone,
> > > >> I would like to start a discussion on KIP-1106:
> > > >>
> > > >>
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > > >>
> > > >> This KIP proposes to add an additional auto offset reset strategy for
> > > >> consumer clients.
> > > >>
> > > >> Regards,
> > > >> Manikumar
> > > >>
> > > >
> >
