Hi All,

The following changes were made to the KIP while implementing the Kafka
Streams changes.

1. We have renamed the AutoOffsetReset.duration(...) API to
   AutoOffsetReset.byDuration(...).
2. Added a "none" reset strategy option to the newly added
   org.apache.kafka.streams.AutoOffsetReset class.
3. Updated org/apache/kafka/streams/scala/kstream/Consumed.scala to
   support the new AutoOffsetReset class.
   We deprecated the two existing methods and added two new overloaded
   methods:
     - deprecated `with(TimestampExtractor, Topology.AutoOffsetReset)` method
     - deprecated `with(Topology.AutoOffsetReset)` method
     - added new `with(TimestampExtractor, AutoOffsetReset)` method
     - added new `with(AutoOffsetReset)` method
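
For illustration only, here is a minimal Java sketch of the deprecate-and-delegate pattern described in item 3 (the actual change is in Scala's `Consumed.scala`). Every type in it is a hypothetical stand-in: `LegacyResetPolicy` for `Topology.AutoOffsetReset`, `ResetPolicy` for the new `AutoOffsetReset`, `ConsumedSketch` for `Consumed`, and a `ToLongFunction<Object>` for `TimestampExtractor`. Only `byDuration(...)` and the "none" option come from the list above. The `none()`/`earliest()`/`latest()` factory names and the rejection of negative durations are assumptions.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for the old Topology.AutoOffsetReset enum.
enum LegacyResetPolicy { EARLIEST, LATEST }

// Hypothetical stand-in for the new AutoOffsetReset class.
final class ResetPolicy {
    enum Kind { NONE, EARLIEST, LATEST, BY_DURATION }

    final Kind kind;
    final Optional<Duration> duration; // present only for BY_DURATION

    private ResetPolicy(Kind kind, Optional<Duration> duration) {
        this.kind = kind;
        this.duration = duration;
    }

    static ResetPolicy none() {
        return new ResetPolicy(Kind.NONE, Optional.empty());
    }

    static ResetPolicy earliest() {
        return new ResetPolicy(Kind.EARLIEST, Optional.empty());
    }

    static ResetPolicy latest() {
        return new ResetPolicy(Kind.LATEST, Optional.empty());
    }

    // Corresponds to byDuration(...), formerly duration(...) in the KIP draft.
    static ResetPolicy byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        return new ResetPolicy(Kind.BY_DURATION, Optional.of(duration));
    }

    // Converts the legacy enum so the deprecated overloads can delegate.
    static ResetPolicy fromLegacy(LegacyResetPolicy legacy) {
        Objects.requireNonNull(legacy, "legacy");
        return legacy == LegacyResetPolicy.EARLIEST ? earliest() : latest();
    }
}

// Hypothetical stand-in for Consumed; ToLongFunction plays the TimestampExtractor role.
final class ConsumedSketch {
    final ToLongFunction<Object> timestampExtractor; // null means "use the default"
    final ResetPolicy resetPolicy;

    private ConsumedSketch(ToLongFunction<Object> timestampExtractor, ResetPolicy resetPolicy) {
        this.timestampExtractor = timestampExtractor;
        this.resetPolicy = Objects.requireNonNull(resetPolicy, "resetPolicy");
    }

    @Deprecated
    static ConsumedSketch with(ToLongFunction<Object> extractor, LegacyResetPolicy policy) {
        return with(extractor, ResetPolicy.fromLegacy(policy));
    }

    @Deprecated
    static ConsumedSketch with(LegacyResetPolicy policy) {
        return with(ResetPolicy.fromLegacy(policy));
    }

    static ConsumedSketch with(ToLongFunction<Object> extractor, ResetPolicy policy) {
        return new ConsumedSketch(extractor, policy);
    }

    static ConsumedSketch with(ResetPolicy policy) {
        return new ConsumedSketch(null, policy);
    }
}
```

The deprecated overloads only convert the legacy enum and forward to the new ones, so both paths produce the same kind of `ResetPolicy` value. The legacy enum has no equivalent of `none()` or `byDuration(...)`, so only the new overloads can express those.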


Thanks

On Tue, Nov 19, 2024 at 5:36 PM Manikumar <manikumar.re...@gmail.com> wrote:

> Hi All,
>
> I have made a minor update to the KIP.
> Since we are deprecating the OffsetResetStrategy enum, I have deprecated
> the `MockConsumer(OffsetResetStrategy offsetResetStrategy)` constructor
> and added a new constructor.
>
>
> Thanks,
>
>
> On Thu, Nov 14, 2024 at 10:25 AM Manikumar <manikumar.re...@gmail.com>
> wrote:
>
>> Hi Jun,
>>
>> JR10: Yes, we can remove the name field. Updated the KIP.
>>
>>
>> Thanks,
>>
>> On Thu, Nov 14, 2024 at 3:57 AM Jun Rao <j...@confluent.io.invalid> wrote:
>>
>>> Hi, Manikumar,
>>>
>>> Thanks for the explanation. We can follow the existing KStreams pattern.
>>>
>>> Could AutoOffsetReset(String name, long duration) just
>>> be AutoOffsetReset(long duration)?
>>>
>>> Jun
>>>
>>> On Wed, Nov 13, 2024 at 11:53 AM Manikumar <manikumar.re...@gmail.com>
>>> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > JR10: I was trying to follow the KS convention where we use a non-public
>>> > sub-class with the necessary getters.
>>> > Please check Matthias's suggestion here:
>>> > https://lists.apache.org/thread/mln783cbswgj14wt4hykcks4kfyl9zpf
>>> > There will be an AutoOffsetResetInternal class that will have the getter
>>> > methods.
>>> > We can follow your suggestions if we do not use the internal class
>>> > approach.
>>> >
>>> >
>>> > Thanks,
>>> >
>>> > On Wed, Nov 13, 2024 at 11:12 PM Jun Rao <j...@confluent.io.invalid>
>>> > wrote:
>>> >
>>> > > Hi, Manikumar,
>>> > >
>>> > > Thanks for the updated KIP.
>>> > >
>>> > > JR10. Regarding the new AutoOffsetReset class: could the following
>>> > > constructors be private?
>>> > >     protected AutoOffsetReset(String name, long duration)
>>> > >     protected AutoOffsetReset(String name)
>>> > > Could AutoOffsetReset(String name, long duration) just
>>> > > be AutoOffsetReset(long duration)?
>>> > > Do we need the following field?
>>> > >     protected final String name;
>>> > > Instead of the following field, is it better to expose it through a
>>> > > public method?
>>> > >     protected final Optional<Long> duration;
>>> > >
>>> > > Jun
>>> > >
>>> > > On Tue, Nov 12, 2024 at 9:41 PM Manikumar <manikumar.re...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Matthias,
>>> > > >
>>> > > > Thanks for the review.
>>> > > >
>>> > > > 1. It looks like the majority of us are leaning towards the
>>> > > > BY_DURATION naming. I have updated the KIP accordingly.
>>> > > >
>>> > > > 2. Thanks. I have updated the KIP to remove the private/internal
>>> > > > implementation details.
>>> > > >
>>> > > >
>>> > > > Thanks,
>>> > > >
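
Here is a minimal Java sketch of parsing the single `auto.offset.reset` value. It assumes a `by_duration:` prefix (following the BY_DURATION naming above; earlier drafts in this thread used `back:`) followed by an ISO-8601 duration. It also assumes case-insensitive matching and rejects negative durations. `ResetConfigParser` is a hypothetical name and is not the consumer's actual config validation. The sketch relies on `java.time.Duration.parse`, which accepts values such as `P3D` and `P23DT23H`.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Locale;
import java.util.Optional;

final class ResetConfigParser {
    enum Strategy { EARLIEST, LATEST, NONE, BY_DURATION }

    // Result of parsing one auto.offset.reset value.
    static final class Parsed {
        final Strategy strategy;
        final Optional<Duration> duration; // present only for BY_DURATION

        Parsed(Strategy strategy, Optional<Duration> duration) {
            this.strategy = strategy;
            this.duration = duration;
        }
    }

    // Assumed prefix, following the BY_DURATION naming.
    private static final String BY_DURATION_PREFIX = "by_duration:";

    private ResetConfigParser() {}

    static Parsed parse(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        switch (v) {
            case "earliest":
                return new Parsed(Strategy.EARLIEST, Optional.empty());
            case "latest":
                return new Parsed(Strategy.LATEST, Optional.empty());
            case "none":
                return new Parsed(Strategy.NONE, Optional.empty());
            default:
                break;
        }
        if (v.startsWith(BY_DURATION_PREFIX)) {
            // Duration.parse accepts ISO-8601 forms such as "P3D" or "P23DT23H",
            // with the letters in either case.
            String iso = v.substring(BY_DURATION_PREFIX.length());
            try {
                Duration d = Duration.parse(iso);
                if (d.isNegative()) {
                    throw new IllegalArgumentException("Negative duration: " + value);
                }
                return new Parsed(Strategy.BY_DURATION, Optional.of(d));
            } catch (DateTimeParseException e) {
                throw new IllegalArgumentException("Invalid ISO-8601 duration: " + value, e);
            }
        }
        throw new IllegalArgumentException("Unsupported auto.offset.reset value: " + value);
    }
}
```

For example, `parse("by_duration:P3D")` yields `BY_DURATION` with a three-day duration. Under these assumptions, the older `back:P3D` form is rejected.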
>>> > > > On Wed, Nov 13, 2024 at 6:28 AM Matthias J. Sax <mj...@apache.org>
>>> > > > wrote:
>>> > > >
>>> > > > > Thanks for updating the KIP.
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > I am happy to see that we seem to agree on using a single config
>>> > > > > only :)
>>> > > > >
>>> > > > > Obviously, I need to bikeshed on the format: `BACK:<duration>` does
>>> > > > > not read well IMHO, and I think
>>> > > > > `auto.offset.reset="BACK_BY:<duration>"` would read much better.
>>> > > > > Andrew's suggestion of `BY_DURATION:<duration>` might be even better
>>> > > > > (but I would be ok with either one).
>>> > > > >
>>> > > > > (I would not use `DURATION:<duration>` personally; similar to
>>> > > > > `BACK:<duration>`, it does not read well from my POV.)
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > About the KS-related changes: overall LGTM. What is the reason for
>>> > > > > having
>>> > > > >
>>> > > > >    AutoOffsetReset.LATEST
>>> > > > >    AutoOffsetReset.EARLIEST
>>> > > > >
>>> > > > > as `public` variables? It seems we don't need them, and that they
>>> > > > > are rather an internal implementation detail?
>>> > > > >
>>> > > > > In general, I would recommend omitting everything `private` from the
>>> > > > > KIP (including method implementations), as it's not user-facing, and
>>> > > > > only having the method signatures in the KIP.
>>> > > > >
>>> > > > > What I believe we need to add is a `protected` constructor (for the
>>> > > > > internal sub-class):
>>> > > > >
>>> > > > >     protected AutoOffsetReset(AutoOffsetReset autoOffsetReset);
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > -Matthias
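
Here is a minimal Java sketch of the Streams convention. It uses protected state, a protected copy constructor, and an internal sub-class with getters, as in the `AutoOffsetResetInternal` approach mentioned above. `UserFacingResetPolicy` and `UserFacingResetPolicyInternal` are hypothetical names, not the classes in the KIP, and only two factory methods are shown. Jun's JR10 alternative (exposing the duration through a public method) would instead put the getter directly on the user-facing class.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical user-facing class (stand-in for AutoOffsetReset). Following the
// Streams convention, state lives in protected fields and has no public getters.
class UserFacingResetPolicy {
    protected enum Kind { NONE, EARLIEST, LATEST, BY_DURATION }

    protected final Kind kind;
    protected final Optional<Duration> duration;

    private UserFacingResetPolicy(Kind kind, Optional<Duration> duration) {
        this.kind = kind;
        this.duration = duration;
    }

    // Protected copy constructor: lets an internal sub-class wrap an
    // instance that a user passed in.
    protected UserFacingResetPolicy(UserFacingResetPolicy other) {
        this(Objects.requireNonNull(other, "other").kind, other.duration);
    }

    public static UserFacingResetPolicy earliest() {
        return new UserFacingResetPolicy(Kind.EARLIEST, Optional.empty());
    }

    public static UserFacingResetPolicy byDuration(Duration duration) {
        return new UserFacingResetPolicy(Kind.BY_DURATION, Optional.of(duration));
    }
    // none() and latest() omitted for brevity.
}

// Hypothetical internal sub-class (stand-in for AutoOffsetResetInternal). In
// Streams it would live in an internals package, so these getters never
// become public API.
final class UserFacingResetPolicyInternal extends UserFacingResetPolicy {
    UserFacingResetPolicyInternal(UserFacingResetPolicy policy) {
        super(policy);
    }

    boolean isByDuration() {
        return kind == Kind.BY_DURATION;
    }

    Optional<Duration> duration() {
        return duration;
    }
}
```

The runtime would wrap whatever the user passed, for example `new UserFacingResetPolicyInternal(policy)`, and read the state only through the internal class.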
>>> > > > >
>>> > > > > On 11/12/24 6:09 AM, Apoorv Mittal wrote:
>>> > > > > > Thanks Manikumar for explaining, sounds good to me.
>>> > > > > >
>>> > > > > > Regards,
>>> > > > > > Apoorv Mittal
>>> > > > > >
>>> > > > > >
>>> > > > > > On Tue, Nov 12, 2024 at 1:47 PM Andrew Schofield <
>>> > > > > > andrew_schofield_j...@outlook.com> wrote:
>>> > > > > >
>>> > > > > >> Hi,
>>> > > > > >> Looks good now. Just one suggestion.
>>> > > > > >>
>>> > > > > >> AS8: Instead of "back:30D", I wonder whether the word 'duration'
>>> > > > > >> ought to be used, for consistency with kafka-consumer-groups.sh.
>>> > > > > >> So, "by-duration:P3D" or "duration:P3D" might be better.
>>> > > > > >>
>>> > > > > >> The overall idea of merging the configs into one config, as described
>>> > > > > >> in the current text of the KIP, is fine.
>>> > > > > >>
>>> > > > > >> Thanks,
>>> > > > > >> Andrew
>>> > > > > >>
>>> > > > > >> ________________________________________
>>> > > > > >> From: Manikumar <manikumar.re...@gmail.com>
>>> > > > > >> Sent: 12 November 2024 13:30
>>> > > > > >> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>> > > > > >> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset
>>> > > > > >> option for consumer clients
>>> > > > > >>
>>> > > > > >> Hi Apoorv,
>>> > > > > >>
>>> > > > > >> AM7: AutoOffsetReset.java is for the Kafka Streams API. I am not
>>> > > > > >> proposing any public interface/class for the Kafka Consumer.
>>> > > > > >> As mentioned in the KIP, even though OffsetResetStrategy is a public
>>> > > > > >> class, it's not used in any public APIs. I think new internal
>>> > > > > >> classes should be sufficient.
>>> > > > > >>
>>> > > > > >> AM8: Fixed
>>> > > > > >>
>>> > > > > >> AM9: This class is for the Kafka Streams API.
>>> > > > > >>
>>> > > > > >> AM10: I was using the EARLIEST_TIMESTAMP and LATEST_TIMESTAMP
>>> > > > > >> constant values from ListOffsetsRequest
>>> > > > > >> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java#L41>
>>> > > > > >>
>>> > > > > >> But yes, we can use Optional in the AutoOffsetReset class. Updated
>>> > > > > >> the KIP.
>>> > > > > >>
>>> > > > > >>
>>> > > > > >> Thanks.
>>> > > > > >>
>>> > > > > >>
>>> > > > > >>
>>> > > > > >> On Tue, Nov 12, 2024 at 6:15 PM Apoorv Mittal <apoorvmitta...@gmail.com>
>>> > > > > >> wrote:
>>> > > > > >>
>>> > > > > >>> Hi,
>>> > > > > >>> I read the changes for the single configuration and the deprecated
>>> > > > > >>> OffsetResetStrategy.java.
>>> > > > > >>>
>>> > > > > >>> AM7: Question: The KIP says that the previously supported values
>>> > > > > >>> were earliest/latest/none and that a new back:<duration> config
>>> > > > > >>> would be added. We have no definition of "none" in the newly
>>> > > > > >>> introduced AutoOffsetReset.java class, hence I am assuming that if
>>> > > > > >>> "none" is specified as a config option, then that config will be
>>> > > > > >>> ignored, correct? Or are we deprecating the usage of "none"
>>> > > > > >>> altogether?
>>> > > > > >>>
>>> > > > > >>> AM8: Minor: The new class AutoOffsetReset under interfaces mentions
>>> > > > > >>> the name as OffsetResetStrategy.java. This requires correction.
>>> > > > > >>>
>>> > > > > >>> AM9: Is the package name org.apache.kafka.streams correct for
>>> > > > > >>> AutoOffsetReset? Shouldn't it be under the clients package?
>>> > > > > >>>
>>> > > > > >>> AM10: What do -1L and -2L mean as long values for latest and
>>> > > > > >>> earliest in AutoOffsetReset.java? Is it just a long placeholder that
>>> > > > > >>> will never be used elsewhere for latest and earliest? If yes, does
>>> > > > > >>> it make sense to keep the long as an Optional, and use
>>> > > > > >>> Optional.empty() for latest and earliest?
>>> > > > > >>>
>>> > > > > >>> Regards,
>>> > > > > >>> Apoorv Mittal
>>> > > > > >>>
>>> > > > > >>>
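
The AM10 question can be illustrated with a small Java sketch. It keeps the duration as an `Optional` and produces the `-2L`/`-1L` sentinels only when building a ListOffsets-style target timestamp. The constant values match `EARLIEST_TIMESTAMP` and `LATEST_TIMESTAMP` in `ListOffsetsRequest` (linked in the reply above). The class name `ListOffsetsTargets`, its `Mode` enum and the method shape are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

final class ListOffsetsTargets {
    // Same values as EARLIEST_TIMESTAMP / LATEST_TIMESTAMP in
    // org.apache.kafka.common.requests.ListOffsetsRequest.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    enum Mode { EARLIEST, LATEST, BY_DURATION }

    private ListOffsetsTargets() {}

    // The caller-facing value keeps the duration as an Optional (empty for
    // earliest/latest). The -2L/-1L sentinels appear only when building the
    // ListOffsets-style target timestamp.
    static long targetTimestamp(Mode mode, Optional<Duration> duration, Instant now) {
        switch (mode) {
            case EARLIEST:
                return EARLIEST_TIMESTAMP;
            case LATEST:
                return LATEST_TIMESTAMP;
            case BY_DURATION: {
                Duration d = duration.orElseThrow(
                        () -> new IllegalArgumentException("BY_DURATION requires a duration"));
                return now.minus(d).toEpochMilli();
            }
            default:
                throw new AssertionError("unknown mode " + mode);
        }
    }
}
```

With `now` at 10,000,000 ms and a one-hour duration, the `BY_DURATION` target is 6,400,000 ms. Earliest and latest map straight to the sentinels.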
>>> > > > > >>> On Tue, Nov 12, 2024 at 12:06 PM Manikumar <manikumar.re...@gmail.com>
>>> > > > > >>> wrote:
>>> > > > > >>>
>>> > > > > >>>> Thanks Ismael and Lianet for the reviews.
>>> > > > > >>>>
>>> > > > > >>>> Based on the suggestions, I have updated the KIP in favour of
>>> > > > > >>>> having a single config (auto.offset.reset).
>>> > > > > >>>> I have also adopted Lianet's suggestion on naming.
>>> > > > > >>>>
>>> > > > > >>>> auto.offset.reset=back:P3D -> reset back 3 days
>>> > > > > >>>>
>>> > > > > >>>>
>>> > > > > >>>> Let me know if there are any concerns.
>>> > > > > >>>>
>>> > > > > >>>>
>>> > > > > >>>> Thanks,
>>> > > > > >>>>
>>> > > > > >>>>
>>> > > > > >>>> On Sat, Nov 9, 2024 at 10:06 PM Lianet M. <liane...@gmail.com>
>>> > > > > >>>> wrote:
>>> > > > > >>>>>
>>> > > > > >>>>> Hi all. Thanks Manikumar for the nice improvement, useful indeed.
>>> > > > > >>>>>
>>> > > > > >>>>> I also lean towards having a single config, given that it's all
>>> > > > > >>>>> about the reset policy: it's the same "what" (auto reset policy),
>>> > > > > >>>>> and we are just extending it with a new behaviour. Regarding the
>>> > > > > >>>>> naming (I agree that "duration" is confusing), what about
>>> > > > > >>>>> something that shows it's simply about how far back to reset:
>>> > > > > >>>>>
>>> > > > > >>>>> auto.offset.reset=EARLIEST
>>> > > > > >>>>> auto.offset.reset=BACK:P3D -> reset back 3 days (combined with
>>> > > > > >>>>> ISO8601, it seems really easy to read/understand from the config
>>> > > > > >>>>> definition itself)
>>> > > > > >>>>>
>>> > > > > >>>>> Something like that would definitely break consistency with the
>>> > > > > >>>>> command line tool argument "by_duration", but if it seems clearer,
>>> > > > > >>>>> we should consider the tradeoff and not penalize the consumer
>>> > > > > >>>>> API/configs.
>>> > > > > >>>>>
>>> > > > > >>>>> Thanks!
>>> > > > > >>>>>
>>> > > > > >>>>> Lianet
>>> > > > > >>>>>
>>> > > > > >>>>>
>>> > > > > >>>>> On Sat, Nov 9, 2024 at 2:47 AM Ismael Juma <m...@ismaeljuma.com>
>>> > > > > >>>>> wrote:
>>> > > > > >>>>>
>>> > > > > >>>>>> Thanks for the KIP, this is useful. A comment below.
>>> > > > > >>>>>>
>>> > > > > >>>>>> On Thu, Nov 7, 2024 at 4:51 PM Matthias J. Sax <mj...@apache.org>
>>> > > > > >>>>>> wrote:
>>> > > > > >>>>>>
>>> > > > > >>>>>>> I am personally not convinced that adding a new config
>>> > > > > >>>>>>> `auto.offset.reset.by.duration` is the best way. Kafka in general
>>> > > > > >>>>>>> has way too many configs, and trying to avoid adding more configs
>>> > > > > >>>>>>> seems to be desirable? It seems this might be a point of
>>> > > > > >>>>>>> contention, and if the majority of people want this new config,
>>> > > > > >>>>>>> so be it. I just wanted to stress my concerns about it.
>>> > > > > >>>>>>
>>> > > > > >>>>>>
>>> > > > > >>>>>> I agree that we don't need a new config. We can simply use a value
>>> > > > > >>>>>> for the existing config. I think a prefix followed by the relevant
>>> > > > > >>>>>> ISO8601 string would be clear enough. For example,
>>> > > > > >>>>>> "by-duration:P23DT23H" or something along those lines. I do find
>>> > > > > >>>>>> the "by-duration" description a bit confusing for what we're doing
>>> > > > > >>>>>> here (i.e. current time - duration), although there is precedent
>>> > > > > >>>>>> in the reset offsets tool.
>>> > > > > >>>>>>
>>> > > > > >>>>>> Ismael
>>> > > > > >>>>>>
>>> > > > > >>>>
>>> > > > > >>>
>>> > > > > >>
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
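
The "current time - duration" semantics mentioned above can be illustrated with an in-memory Java sketch that models a partition as a list of record timestamps. It follows the documented behaviour of `KafkaConsumer#offsetsForTimes` (the earliest offset whose timestamp is greater than or equal to the target), using a linear scan. The following are assumptions, not behaviour specified by the KIP: the class name `DurationResetLookup`, the list-based log model, and the fall-back to the log end offset when no record is recent enough.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

final class DurationResetLookup {
    private DurationResetLookup() {}

    /**
     * Offset to reset to for a duration-based policy: the earliest offset whose
     * record timestamp is >= (now - duration). If no record is that recent, this
     * sketch falls back to the log end offset (an assumption).
     *
     * @param logStartOffset offset of the first entry in timestampsMs
     * @param timestampsMs   record timestamps (epoch millis) in offset order
     */
    static long resetOffset(long logStartOffset, List<Long> timestampsMs,
                            Instant now, Duration duration) {
        long targetMs = now.minus(duration).toEpochMilli();
        // Linear scan for clarity; a broker would consult its time index instead.
        for (int i = 0; i < timestampsMs.size(); i++) {
            if (timestampsMs.get(i) >= targetMs) {
                return logStartOffset + i;
            }
        }
        return logStartOffset + timestampsMs.size(); // log end offset
    }
}
```

Consider a log starting at offset 100 with timestamps 1000, 2000, 3000 and 4000 ms, with `now` at 5000 ms and a 2500 ms duration. The target is 2500 ms, so the sketch returns offset 102.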
