Thanks Manikumar and Andrew.

For AM6: I was thinking of enhancing OffsetFetcher and adding
"seekToLocalBeginning", where OffsetFetcher itself uses the Admin.listOffsets
API. However, I agree that this is just another convenience and is not
necessary to support the functionality. Skipping it sounds good to me.

Regards,
Apoorv Mittal


On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Manikumar,
>
> AS7: In response to AM6, I agree that in general the combination
> of offsetsForTimes() and seek() will do what is required. The one case that
> I think is more tricky is seeking to the local earliest/beginning.
>
> The Consumer API has Consumer.beginningOffsets and
> seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
> Similarly there are methods for OffsetResetStrategy.LATEST.
>
> With the Consumer API, I don't think you can find the local earliest
> offset. That can be found using Admin.listOffsets with
> OffsetSpec.EarliestLocal.
> We could add equivalents of Consumer.beginningOffsets and
> Consumer.seekToBeginning. An application could always use the
> Admin API with the Consumer API.
>
> Personally, I would not complicate this KIP with these new methods when
> the focus is more on duration-based offset reset and configs.
>
> Thanks,
> Andrew
> ________________________________________
> From: Manikumar <manikumar.re...@gmail.com>
> Sent: 06 November 2024 04:20
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
> for consumer clients
>
> Hi Apoorv,
>
> Thanks for the review.
>
> AM5: Thanks, updated the KIP.
>
> AM6: Currently we can achieve this with the combination of the
> offsetsForTimes() and seek() APIs.
> Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
> Long> timestampsToSearch) if required.
> Let's see what others think.
>
> Thanks,
>
> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com>
> wrote:
> >
> > Hi Manikumar,
> > Thanks for the changes. Just a minor comment and a question:
> >
> > AM5: The description "How to initialize the share-partition start offset:"
> > for "share.auto.offset.reset" seems incomplete as it ends with `:`.
> > Should we write the details as defined in KIP-932 i.e.
> >
> > How to initialize the share-partition start offset:
> >
> >    - "earliest": automatically reset the offset to the earliest offset
> >    - "latest": automatically reset the offset to the latest offset
> >    - "earliest_local": automatically resets the offset to the earliest
> >      message stored in the local log on the broker.
> >    - "by_duration": automatically resets the offset to the earliest offset
> >      whose timestamp is greater than or equal to the configured duration
> >      (auto.offset.reset.by.duration).
> >
> >
> > *or* maybe just replace ":" with "." i.e. "How to initialize the
> > share-partition start offset." as the details of the new configs are
> > defined at top in different sections as well.
> >
> > AM6: Question: KafkaConsumer has public methods defined to "seek" for the
> > reset strategy, like "seekToBeginning" and "seekToEnd", which use the
> > OffsetResetStrategy of EARLIEST and LATEST respectively. Do you think it
> > would be sensible to add additional KafkaConsumer APIs for the newly
> > introduced strategies?
> >
> > Regards,
> > Apoorv Mittal
> >
> >
> > On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com>
> wrote:
> >
> > > Hi Matthias,
> > >
> > > Thanks for the review.
> > >
> > > MS1: It looks like KIP-842 is stuck in voting. The current changes should
> > > fit into KIP-842 (a few updates are definitely required), as KIP-842
> > > proposes to add new configs (auto.offset.reset.on.no.initial.offset
> > > and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
> > > after completing this KIP.
> > >
> > > MS2: Added support for an earliest-local config to reset to the oldest
> > > local offset, and changes to the kafka-consumer-groups command.
> > >
> > > MS3: Based on suggestions, I have updated the config option to support
> > > ISO format.
> > >
> > > MS4: I have included the AutoOffsetReset changes to the KIP. I
> > > definitely need your help on refining the API/solution for KS :)
> > >
> > > Thanks
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org>
> wrote:
> > > >
> > > > Thanks for the KIP.
> > > >
> > > > A somewhat orthogonal question I have is, if we should try to merge
> > > > this KIP with the existing KIP-842:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
> > > >
> > > > While KIP-842 has a different intention, ie, separating the case of
> > > > "no offset during initial startup" vs "offset out of bounds during
> > > > processing", it might still be worth it? My motivation for this
> > > > question is, that `auto.offset.reset` is one of the most central
> > > > configs we have, and thus, doing a single larger change (ie, both KIPs
> > > > together) might be better (less noise) than doing two independent
> > > > changes? Thoughts? I know it might be a little bit of a stretch, but
> > > > asking cannot hurt :)
> > > >
> > > >
> > > > About the KIP itself, did we consider adding something like
> > > > "latest-local" to say reset to oldest local, but not fetch from
> > > > tiered storage?
> > > >
> > > > For the format of the config, did we consider what we do in
> > > > `bin/kafka-consumer-groups.sh`? It has multiple different options like
> > > >
> > > >    --by-duration <duration: format `PnDTnHnMnS`>
> > > >    --shift-by <number-of-records>
> > > >    --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
> > > >    --to-offset <absolute offset>
> > > >
> > > > As it also uses ISO format, it might be good to also use them (even
> > > > if I can see the appeal and simplicity of what you proposed).
> > > >
> > > >
> > > > I also want to add to (AM1), as there is also the
> > > > `Topology.AutoOffsetReset` enum in Kafka Streams. I think we would need
> > > > to convert this into a class. While it could be done in a follow-up
> > > > KIP, too, it seems best to do this holistically in a single KIP,
> > > > because KS is not something on top of Kafka, but it's part of Kafka. I
> > > > am happy to help with the design and even PRs if necessary, but would
> > > > strongly prefer to do it all in a single KIP.
> > > >
> > > >
> > > > Btw: if we add something like "latest-local", it might also be good
> > > > to extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool
> > > > works slightly differently, as it does commit an offset and there could
> > > > be some race condition between committing "latest-local", tiering, and
> > > > when the consumer is actually started?).
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 11/4/24 7:38 AM, Apoorv Mittal wrote:
> > > > > Hi Manikumar,
> > > > > > Thanks for the KIP, this new strategy would be helpful in
> > > > > > specifying fetch behaviour.
> > > > >
> > > > > > AM1: The config `auto.offset.reset` is currently applied as per the
> > > > > > enum class OffsetResetStrategy, which is part of the kafka-clients
> > > > > > javadoc
> > > > > > <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
> > > > > > Are we also proposing to somehow add new definitions in the same
> > > > > > class? However, as the new configurations will be a string
> > > > > > representation, are we moving away from the OffsetResetStrategy enum
> > > > > > class altogether? Should we include the change in the KIP, as
> > > > > > OffsetResetStrategy is part of the public javadoc?
> > > > >
> > > > > > AM2: While I can see the ISO-8601 format is in the rejected
> > > > > > alternative, should we not follow some standard of defining
> > > > > > duration which has already been adopted in other systems?
> > > > >
> > > > > > AM3: We've introduced new config values using the format
> > > > > > minus-n-hours, minus-n-days, minus-n-months, and minus-n-years.
> > > > > > Should we explicitly define the "minus" prefix, or is it implied?
> > > > >
> > > > > > AM4: When supporting duration-based resets, should we also allow
> > > > > > users to specify a specific checkpoint time? For example, if a
> > > > > > checkpoint occurs 2 days, 5 hours and 30 minutes earlier, the
> > > > > > current four formats (minus-n-hours, minus-n-days, minus-n-months,
> > > > > > minus-n-years) might not be sufficient. Should we consider adding
> > > > > > a format to accommodate specific checkpoint times, or is there a
> > > > > > reason to limit the supported formats?
> > > > >
> > > > >
> > > > > Regards,
> > > > > Apoorv Mittal
> > > > >
> > > > >
> > > > > On Mon, Nov 4, 2024 at 9:23 AM Manikumar <
> manikumar.re...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi everyone,
> > > > >> I would like to start a discussion on KIP-1106:
> > > > >>
> > > > >>
> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
> > > > >>
> > > > > >> This KIP proposes to add an additional auto offset reset strategy
> > > > > >> for consumer clients.
> > > > >>
> > > > >> Regards,
> > > > >> Manikumar
> > > > >>
> > > > >
> > >
>
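
Note on the by_duration semantics discussed in this thread: the following is
a minimal Java sketch, not the KIP's implementation. It assumes the configured
ISO-8601 duration is subtracted from the current time to get a target
timestamp, and that the reset position is the earliest offset whose record
timestamp is at or after that target. The class and method names
(DurationResetSketch, targetTimestampMs, earliestOffsetAtOrAfter) are
hypothetical, and the "log" is just an in-memory list standing in for a
partition; a real client would pass the target timestamp to offsetsForTimes()
and then call seek().

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.OptionalLong;

// Illustrative only: these names are hypothetical and are not Kafka APIs.
final class DurationResetSketch {

    // Parse an ISO-8601 duration (e.g. "P2DT5H30M") and subtract it from
    // "now" to obtain the target timestamp in epoch milliseconds.
    static long targetTimestampMs(String isoDuration, Instant now) {
        Duration duration = Duration.parse(isoDuration);
        return now.minus(duration).toEpochMilli();
    }

    // Toy model of a timestamp lookup: the list index plays the role of the
    // offset. Returns the earliest offset whose timestamp is >= targetMs,
    // or empty if no record qualifies.
    static OptionalLong earliestOffsetAtOrAfter(List<Long> recordTimestampsMs, long targetMs) {
        for (int offset = 0; offset < recordTimestampsMs.size(); offset++) {
            if (recordTimestampsMs.get(offset) >= targetMs) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-11-06T09:00:00Z");
        long target = targetTimestampMs("P2DT5H30M", now);

        // Assumed sample data: record timestamps for offsets 0..3.
        List<Long> log = List.of(
                Instant.parse("2024-11-03T00:00:00Z").toEpochMilli(),
                Instant.parse("2024-11-04T03:00:00Z").toEpochMilli(),
                Instant.parse("2024-11-04T04:00:00Z").toEpochMilli(),
                Instant.parse("2024-11-05T00:00:00Z").toEpochMilli());

        System.out.println("target = " + Instant.ofEpochMilli(target));
        System.out.println("offset = " + earliestOffsetAtOrAfter(log, target));
    }
}
```

The example uses the "2 days, 5 hours and 30 minutes" case from AM4, which a
single ISO-8601 value (P2DT5H30M) can express but the minus-n-hours style
formats cannot. What a client should do when no record qualifies is left open
here, since the thread does not specify it.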
