Thanks for updating the KIP.

I am personally not convinced that adding a new config `auto.offset.reset.by.duration` is the best way. Kafka in general has way too many configs, and trying to avoid adding more seems desirable. It seems this might be a point of contention, and if the majority of people want this new config, so be it. I just wanted to stress my concerns about it.

One question about "earliest_local": if we fetch this offset from the server, seek to it later, and start fetching, won't there be a race condition? The oldest segment could have been discarded locally in the meantime. Maybe not too much of a big deal: the server would just need to reload it from the tier. Just want to ensure my understanding is correct, and make sure nobody has concerns about it.

For Kafka Streams, just adding the new option to the existing enum does not work IMHO.

For example, we have a method:

    addSource(final AutoOffsetReset offsetReset,
              final String name,
              final String... topics)

If a user would use BY_DURATION, there is no option to pass in the actual duration (and we might need to throw an exception). And if we overload `addSource` adding a fourth parameter, it would be odd too, as users could pass in `EARLIEST` plus a duration, and we would need to throw an exception or ignore the given duration. Overall not an easy to use API.

Thus, I would suggest deprecating the existing `AutoOffsetReset` enum and adding a new class `AutoOffsetReset`, with which we can guide users accordingly and avoid corner cases:

public class AutoOffsetReset {
  public static AutoOffsetReset earliest();
  public static AutoOffsetReset latest();
  public static AutoOffsetReset byDuration(Duration duration);
  public static AutoOffsetReset earliestLocal();
}

The existing enum is nested inside `Topology`, so we can add the new class as a top-level class in package `org.apache.kafka.streams` and avoid naming conflicts.

Additionally, we would deprecate all methods taking the existing enum `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are affected) and add new overloads that accept the new `class AutoOffsetReset` instead.
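
To make the idea concrete, here is a minimal sketch of how such a class could be implemented. Only the four factory method names come from the proposal above; the nested `Strategy` enum, the `strategy()` and `duration()` accessors, and the validation in `byDuration` are assumptions for illustration and not part of the KIP.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of the proposed class (it would be public in the real API).
final class AutoOffsetReset {

    // Assumed internal tag for the chosen strategy; not part of the proposal.
    enum Strategy { EARLIEST, LATEST, BY_DURATION, EARLIEST_LOCAL }

    private final Strategy strategy;
    private final Duration duration; // non-null only for BY_DURATION

    private AutoOffsetReset(final Strategy strategy, final Duration duration) {
        this.strategy = strategy;
        this.duration = duration;
    }

    public static AutoOffsetReset earliest() {
        return new AutoOffsetReset(Strategy.EARLIEST, null);
    }

    public static AutoOffsetReset latest() {
        return new AutoOffsetReset(Strategy.LATEST, null);
    }

    public static AutoOffsetReset earliestLocal() {
        return new AutoOffsetReset(Strategy.EARLIEST_LOCAL, null);
    }

    // The only factory that accepts a duration; rejecting negative values
    // is an assumption of this sketch.
    public static AutoOffsetReset byDuration(final Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        return new AutoOffsetReset(Strategy.BY_DURATION, duration);
    }

    public Strategy strategy() {
        return strategy;
    }

    // Empty for every strategy except BY_DURATION.
    public Optional<Duration> duration() {
        return Optional.ofNullable(duration);
    }
}
```

Because a duration can only be supplied through `byDuration(...)`, a combination like "earliest plus a duration" cannot be expressed at all, so there is nothing to reject or silently ignore at runtime.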


-Matthias

On 11/7/24 1:57 PM, Jun Rao wrote:
Hi, Manikumar,

Thanks for the KIP. A couple of comments.

JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
sense for a consumer app to bootstrap based on time. The time that a topic
is kept locally seems irrelevant to a consumer application.

JR2. I took a look at the proposal of KIP-842. It seems that this KIP
covers the common scenario in KIP-842 and is simpler. The scenario
described in KIP-842 is about which initial offset to use when a new
partition is added. This is more or less the same problem for choosing the
initial offset for existing partitions. Having a time based option provides
more flexibility than earliest or latest, and is likely good enough for
most usage.

Jun


On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com>
wrote:

Thanks Manikumar and Andrew.

For AM6: I was thinking of enhancing OffsetFetcher and adding
"seekToLocalBeginning", where OffsetFetcher does use Admin.listOffsets API
itself. However, I agree that this is just another convenience and not
necessary to support the functionality. Skipping it sounds good to me.

Regards,
Apoorv Mittal


On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

Hi Manikumar,

AS7: In response to AM6, I agree that in general the combination
of offsetsForTimes() and seek() will do what is required. The one case
that I think is more tricky is seeking to the local earliest/beginning.

The Consumer API has Consumer.beginningOffsets and
seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
Similarly there are methods for OffsetResetStrategy.LATEST.

With the Consumer API, I don't think you can find the local earliest
offset. That can be found using Admin.listOffsets with
OffsetSpec.EarliestLocal. We could add equivalents of
Consumer.beginningOffsets and Consumer.seekToBeginning. An application
could always use the Admin API with the Consumer API.

Personally, I would not complicate this KIP with these new methods when
the focus is more on duration-based offset reset and configs.

Thanks,
Andrew
________________________________________
From: Manikumar <manikumar.re...@gmail.com>
Sent: 06 November 2024 04:20
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
for consumer clients

Hi Apoorv,

Thanks for the review.

AM5: Thanks, updated the KIP.

AM6: Currently we can achieve this with the combination of the
offsetsForTimes() and seek() APIs.
Maybe we can add a generic API like
seekToTimes(Map<TopicPartition, Long> timestampsToSearch) if required.
Let's see what others think.

Thanks,

On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com>
wrote:

Hi Manikumar,
Thanks for the changes. Just minor comment and a question:

AM5: The description "How to initialize the share-partition start offset:"
for "*share.auto.offset.reset*" seems incomplete as it ends with `:`.
Should we write the details as defined in KIP-932 i.e.

How to initialize the share-partition start offset:

    - "earliest": automatically reset the offset to the earliest offset
    - "latest": automatically reset the offset to the latest offset
    - "earliest_local": automatically resets the offset to the earliest
      message stored in the local log on the broker.
    - "by_duration": automatically resets the offset to the earliest
      offset whose timestamp is greater than or equal to the configured
      duration (auto.offset.reset.by.duration).


*or* maybe just replace ":" with "." i.e. "How to initialize the
share-partition start offset." as the details of the new configs are
defined at top in different sections as well.
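
The "by_duration" rule above can be illustrated with a small, self-contained sketch. It assumes the intended cutoff is "now minus the configured duration" and that record timestamps increase with offsets; the `ByDurationLookup` class and its in-memory `timeIndex` map are hypothetical stand-ins for a partition's time index, not Kafka APIs. In a real client, the equivalent lookup is what Consumer.offsetsForTimes() provides.

```java
import java.time.Duration;
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical helper that mimics the "by_duration" lookup in memory.
final class ByDurationLookup {

    // timeIndex maps a record timestamp (epoch ms) to the offset of the
    // first record carrying that timestamp.
    static OptionalLong resetOffset(final NavigableMap<Long, Long> timeIndex,
                                    final long nowMs,
                                    final Duration duration) {
        // Assumed semantics: the cutoff is "now" minus the configured duration.
        final long cutoffMs = nowMs - duration.toMillis();
        // Earliest entry whose timestamp is >= the cutoff, if any.
        final Map.Entry<Long, Long> entry = timeIndex.ceilingEntry(cutoffMs);
        return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
    }

    public static void main(final String[] args) {
        final NavigableMap<Long, Long> timeIndex = new TreeMap<>();
        timeIndex.put(1_000L, 0L);
        timeIndex.put(2_000L, 5L);
        timeIndex.put(3_000L, 9L);
        // Cutoff is 4000 - 1500 = 2500 ms, so the first match is offset 9.
        System.out.println(resetOffset(timeIndex, 4_000L, Duration.ofMillis(1_500)));
    }
}
```

When no record is at or after the cutoff, the sketch returns an empty result; what a client should do in that case is not covered by the description above.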

AM6: Question: KafkaConsumer has public methods defined to "seek" for the
reset strategy, like "seekToBeginning" and "seekToEnd", which use the
OffsetResetStrategy of EARLIEST and LATEST respectively. Do you think it
would be sensible to add additional KafkaConsumer APIs for the newly
introduced strategies?

Regards,
Apoorv Mittal


On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com>
wrote:

Hi Matthias,

Thanks for the review.

MS1: Looks like KIP-842 is stuck in voting. The current changes should
fit into KIP-842 (a few updates are definitely required), as KIP-842 is
proposing to add new configs (auto.offset.reset.on.no.initial.offset
and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
after completing this KIP.

MS2: Added support for earliest-local config to reset to oldest local
offset and changes to kafka-consumer-group cmd.

MS3: Based on suggestions, I have updated the config option to support
ISO format.
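
As a rough illustration of what an ISO-8601 duration value buys us, the sketch below parses one with the JDK's java.time.Duration and turns it into a cutoff timestamp. The `ResetDurationExample` class and `resetTimestampMs` helper are hypothetical names, and rejecting negative durations is an assumption of the sketch rather than something stated in the KIP.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper; not part of the Kafka clients.
final class ResetDurationExample {

    // Parses an ISO-8601 duration (PnDTnHnMnS) and returns the timestamp,
    // in epoch milliseconds, that lies that far before "now".
    static long resetTimestampMs(final String isoDuration, final Instant now) {
        final Duration duration = Duration.parse(isoDuration);
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + isoDuration);
        }
        return now.minus(duration).toEpochMilli();
    }

    public static void main(final String[] args) {
        // 2 days, 5 hours and 30 minutes back from the current time.
        System.out.println(resetTimestampMs("P2DT5H30M", Instant.now()));
    }
}
```

A single value such as `P2DT5H30M` covers the "2 days, 5 hours and 30 minutes" case raised in AM4. Note that java.time.Duration treats a day as exactly 24 hours and does not accept month or year components.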

MS4: I have included the AutoOffsetReset changes to the KIP. I
definitely need your help on refining the API/solution for KS :)

Thanks





On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org>
wrote:

Thanks for the KIP.

A somewhat orthogonal question I have is whether we should try to merge
this KIP with the existing KIP-842:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms

While KIP-842 has a different intention, i.e., separating the case of
"no offset during initial startup" vs "offset out of bounds during
processing", it might still be worth it? My motivation for this question
is that `auto.offset.reset` is one of the most central configs we have,
and thus, doing a single larger change (i.e., both KIPs together) might
be better (less noise) than doing two independent changes? Thoughts? I
know it might be a little bit of a stretch, but asking cannot hurt :)


About the KIP itself, did we consider adding something like
"latest-local" to say reset to oldest local, but not fetch from tiered
storage?

For the format of the config, did we consider what we do in
`bin/kafka-consumer-group.sh`? It has multiple different options like

    --by-duration <duration: format `PnDTnHnMnS`>
    --shift-by <number-of-records>

    --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
    --to-offset <absolute offset>

As it also uses ISO format, it might be good to also use them (even if I
can see the appeal and simplicity of what you proposed).


I also want to add to (AM1), as there is also the
`Topology.AutoOffsetReset` enum in Kafka Streams. I think we would need
to convert this into a class. While it could be done in a follow-up KIP,
too, it seems best to do this holistically in a single KIP, because KS
is not something on top of Kafka, but it's part of Kafka. I am happy to
help with the design and even PRs if necessary, but would strongly
prefer to do it all in a single KIP.


Btw: if we add something like "latest-local", it might also be good to
extend `bin/kafka-consumer-group.sh` accordingly (even if the tool works
slightly differently, as it does commit an offset and there could be some
race condition between committing "latest-local", tiering, and when the
consumer is actually started?)


-Matthias


On 11/4/24 7:38 AM, Apoorv Mittal wrote:
Hi Manikumar,
Thanks for the KIP, this new strategy would be helpful in specifying
fetch behaviour.

AM1: The config `auto.offset.reset` is currently applied as per the enum
class OffsetResetStrategy which is part of the kafka-clients javadoc
<https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>.
Are we also proposing to somehow add new definitions in the same class?
However, as the new configurations will be a string representation, are
we moving away from the OffsetResetStrategy enum class altogether? Should
we include the change in the KIP, as OffsetResetStrategy is part of the
public javadoc?

AM2: While I can see the ISO-8601 format is in the rejected alternatives,
should we not follow some standard of defining duration which has already
been adopted in other systems?

AM3: We've introduced new config values using the format minus-n-hours,
minus-n-days, minus-n-months, and minus-n-years. Should we explicitly
define the "minus" prefix, or is it implied?

AM4: When supporting duration-based resets, should we also allow users to
specify a specific checkpoint time? For example, if a checkpoint occurs 2
days, 5 hours and 30 minutes earlier, the current four formats
(minus-n-hours, minus-n-days, minus-n-months, minus-n-years) might not be
sufficient. Should we consider adding a format to accommodate specific
checkpoint times, or is there a reason to limit the supported formats?


Regards,
Apoorv Mittal


On Mon, Nov 4, 2024 at 9:23 AM Manikumar <
manikumar.re...@gmail.com>
wrote:

Hi everyone,
I would like to start a discussion on KIP-1106:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients

This KIP proposes to add an additional auto offset reset strategy for
consumer clients.

Regards,
Manikumar





