Thanks for updating the KIP.

I am personally not convinced that adding a new config `` is the best way. Kafka in general has way too many configs, and avoiding more configs seems desirable. This might be a point of contention, and if the majority of people want this new config, so be it. I just wanted to stress my concerns about it.

One question about "earliest_local": if we fetch this offset from the server, seek to it later, and start fetching, won't there be a race condition? The oldest local segment could have been deleted locally in the meantime. Maybe that is not a big deal: the server would just need to reload it from the tier. I just want to ensure my understanding is correct, and make sure nobody has concerns about it.

For Kafka Streams, just adding the new option to the existing enum does not work IMHO.

For example, we have a method:

    addSource(final AutoOffsetReset offsetReset,
              final String name,
              final String... topics)

If a user used BY_DURATION, there would be no way to pass in the actual duration (and we might need to throw an exception). And if we overloaded `addSource` with a fourth parameter, it would be odd too, as users could pass in `EARLIEST` plus a duration, and we would need to throw an exception or ignore the given duration. Overall, not an easy-to-use API.

Thus, I would suggest deprecating the existing `AutoOffsetReset` enum and adding a new class `AutoOffsetReset` whose factory methods guide users and avoid these corner cases:

    public class AutoOffsetReset {
      // factory methods only; bodies omitted in this outline
      public static AutoOffsetReset earliest();
      public static AutoOffsetReset latest();
      public static AutoOffsetReset byDuration(final Duration duration);
      public static AutoOffsetReset earliestLocal();
    }

The existing enum is nested inside `Topology` (`Topology.AutoOffsetReset`), so we can add the new class directly to package `org.apache.kafka.streams` without a naming conflict.

Additionally, we would deprecate all methods taking the existing `AutoOffsetReset` enum (only two classes, `Topology` and `Consumed`, are affected) and add new overloads that accept the new `AutoOffsetReset` class instead.
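To make the proposal more concrete, here is a minimal, self-contained sketch of how such a class could carry the strategy together with an optional duration, so that "`EARLIEST` plus a duration" simply cannot be constructed. Only the four factory method names come from the proposal above; the `StrategyType` enum, the accessors, and the rejection of negative durations are assumptions for illustration. The class is package-private only so the sketch compiles as a single file.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of the proposed Kafka Streams AutoOffsetReset class.
// Only the factory method names come from the proposal; everything else is assumed.
final class AutoOffsetReset {

    // Hypothetical discriminator; a real implementation may represent this differently.
    enum StrategyType { EARLIEST, LATEST, BY_DURATION, EARLIEST_LOCAL }

    private final StrategyType type;
    private final Duration duration; // non-null only for BY_DURATION

    // Private constructor: instances come only from the factory methods,
    // so an invalid combination such as EARLIEST plus a duration is not expressible.
    private AutoOffsetReset(StrategyType type, Duration duration) {
        this.type = type;
        this.duration = duration;
    }

    static AutoOffsetReset earliest() {
        return new AutoOffsetReset(StrategyType.EARLIEST, null);
    }

    static AutoOffsetReset latest() {
        return new AutoOffsetReset(StrategyType.LATEST, null);
    }

    static AutoOffsetReset earliestLocal() {
        return new AutoOffsetReset(StrategyType.EARLIEST_LOCAL, null);
    }

    static AutoOffsetReset byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration must not be null");
        // Assumption: a negative duration would point into the future, so reject it.
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + duration);
        }
        return new AutoOffsetReset(StrategyType.BY_DURATION, duration);
    }

    StrategyType type() {
        return type;
    }

    // Empty for every strategy except BY_DURATION.
    Optional<Duration> duration() {
        return Optional.ofNullable(duration);
    }

    @Override
    public String toString() {
        return duration == null ? type.name() : type.name() + "(" + duration + ")";
    }
}
```

With this shape, a hypothetical new overload such as `addSource(AutoOffsetReset, String, String...)` needs only one reset parameter, e.g. `addSource(AutoOffsetReset.byDuration(Duration.ofHours(6)), "source", "input-topic")`, and the duration travels with the strategy instead of being a separate argument that may or may not apply.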


On 11/7/24 1:57 PM, Jun Rao wrote:
Hi, Manikumar,

Thanks for the KIP. A couple of comments.

JR1. It doesn't seem that we need earliest_local. Intuitively, it makes
sense for a consumer app to bootstrap based on time. The time that a topic
is kept locally seems irrelevant to a consumer application.

JR2. I took a look at the proposal of KIP-842. It seems that this KIP
covers the common scenario in KIP-842 and is simpler. The scenario
described in KIP-842 is about which initial offset to use when a new
partition is added. This is more or less the same problem for choosing the
initial offset for existing partitions. Having a time based option provides
more flexibility than earliest or latest, and is likely good enough for
most usage.


On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <> wrote:

Thanks Manikumar and Andrew.

For AM6: I was thinking of enhancing OffsetFetcher and adding
"seekToLocalBeginning", where OffsetFetcher itself would use the
Admin.listOffsets API. However, I agree that this is just another convenience
and is not necessary to support the functionality. Skipping it sounds good to me.

Apoorv Mittal

On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <> wrote:

Hi Manikumar,

AS7: In response to AM6, I agree that in general the combination
of offsetsForTimes() and seek() will do what is required. The one case
I think is more tricky is seeking to the local earliest/beginning.

The Consumer API has Consumer.beginningOffsets and
seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST.
Similarly there are methods for OffsetResetStrategy.LATEST.

With the Consumer API, I don't think you can find the local earliest offset.
That can be found using Admin.listOffsets with OffsetSpec.earliestLocal().
We could add equivalents of Consumer.beginningOffsets and
Consumer.seekToBeginning for it, but an application can always use the
Admin API together with the Consumer API.

Personally, I would not complicate this KIP with these new methods when
the focus is more on duration-based offset reset and configs.

From: Manikumar <>
Sent: 06 November 2024 04:20
To: <>
Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option
for consumer clients

Hi Apoorv,

Thanks for the review.

AM5: Thanks, updated the KIP.

AM6: Currently we can achieve this with a combination of the
offsetsForTimes() and seek() APIs.
Maybe we can add a generic API like seekToTimes(Map<TopicPartition,
Long> timestampsToSearch) if required.
Let's see what others think.
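For reference, `offsetsForTimes()` returns, per partition, the earliest offset whose timestamp is greater than or equal to the given timestamp, or `null` if no such message exists; the application then passes that offset to `seek()`. The JDK-only sketch below models just that lookup rule over an in-memory array of record timestamps. The class and method names are hypothetical, and it does not talk to a broker.

```java
import java.util.OptionalLong;

// Hypothetical in-memory model of the offsetsForTimes() lookup rule; not Kafka code.
final class TimestampLookup {

    private TimestampLookup() {}

    /**
     * Returns the earliest offset whose record timestamp is >= targetTimestamp,
     * scanning in offset order (timestamps need not be monotonic), or empty
     * if no such record exists.
     *
     * @param logStartOffset  offset of the record at timestamps[0]
     * @param timestamps      record timestamps in offset order
     * @param targetTimestamp timestamp to search for, in epoch milliseconds
     */
    static OptionalLong earliestOffsetAtOrAfter(long logStartOffset, long[] timestamps, long targetTimestamp) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= targetTimestamp) {
                return OptionalLong.of(logStartOffset + i);
            }
        }
        // offsetsForTimes() reports null for the partition in this case.
        return OptionalLong.empty();
    }
}
```

An application would then seek to the returned offset. When the result is empty it has to pick a fallback itself (for example, the log end offset), which is one detail a generic `seekToTimes()` would need to specify.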


On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <> wrote:

Hi Manikumar,
Thanks for the changes. Just a minor comment and a question:

AM5: The description "How to initialize the share-partition start offset:"
for "**" seems incomplete, as it ends with `:`.
Should we write out the details as defined in KIP-932, i.e.:
How to initialize the share-partition start offset:

    "earliest": automatically reset the offset to the earliest offset

    "latest": automatically reset the offset to the latest offset

    "earliest_local": automatically reset the offset to the earliest
    message stored in the local log on the broker

    "by_duration": automatically reset the offset to the earliest offset
    whose timestamp is greater than or equal to the current time minus
    the configured duration

*or* maybe just replace ":" with ".", i.e., "How to initialize the
share-partition start offset.", as the details of the new config values are
also defined in separate sections at the top.
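As a sketch of the "by_duration" semantics, assuming the configured duration is measured back from the current time: the reset target is "now minus duration" in epoch milliseconds, which would then be resolved to an offset with the same "earliest offset whose timestamp is >= target" rule that `offsetsForTimes()` uses. The helper below is hypothetical and takes a `java.time.Clock` so it can be tested deterministically.

```java
import java.time.Clock;
import java.time.Duration;

// Hypothetical helper: computes the timestamp a by_duration reset would search for.
final class ByDurationReset {

    private ByDurationReset() {}

    static long targetTimestampMs(Clock clock, Duration duration) {
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + duration);
        }
        // "now minus duration", expressed in epoch milliseconds like record timestamps.
        return clock.instant().minus(duration).toEpochMilli();
    }
}
```

For example, with a clock fixed at epoch millisecond 1,000,000,000,000 and `Duration.ofDays(1)`, the helper returns 999,913,600,000.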

AM6: Question: KafkaConsumer has public methods to "seek" according to a
reset strategy, like "seekToBeginning" and "seekToEnd", which use the
EARLIEST and LATEST OffsetResetStrategy respectively. Do you think it
would be sensible to add additional KafkaConsumer APIs for the newly
added reset strategies?
Apoorv Mittal

On Tue, Nov 5, 2024 at 6:10 PM Manikumar <> wrote:

Hi Matthias,

Thanks for the review.

MS1: It looks like KIP-842 is stuck in voting. The current changes
fit into KIP-842 (a few updates are definitely required), as KIP-842 is
proposing to add new configs (
and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842
after completing this KIP.

MS2: Added support for an earliest-local option to reset to the oldest local
offset, and the corresponding changes to the kafka-consumer-groups command.

MS3: Based on the suggestions, I have updated the config option to use the
ISO-8601 format.

MS4: I have included the AutoOffsetReset changes in the KIP. I
definitely need your help refining the API/solution for KS :)


On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <> wrote:

Thanks for the KIP.

A somewhat orthogonal question I have is whether we should try to merge
this KIP with the existing KIP-842:

While KIP-842 has a different intention, i.e., separating the case of
"offset during initial startup" vs "offset out of bounds during
processing", it might still be worth it? My motivation for this
is that `auto.offset.reset` is one of the most central configs we have,
and thus doing a single larger change (i.e., both KIPs together) might be
better (less noise) than doing two independent changes? Thoughts? I know
it might be a little bit of a stretch, but asking cannot hurt :)

About the KIP itself, did we consider adding something like
"latest-local" to say "reset to the oldest local offset, but do not fetch
from remote storage"?
For the format of the config, did we consider what we do in
`bin/`? It has multiple different options:

    --by-duration <duration: format `PnDTnHnMnS`>
    --shift-by <number-of-records>

    --to-datetime <fixed point in time: format
    --to-offset <absolute offset>

As it also uses ISO formats, it might be good to use them here as well
(even if I can see the appeal and simplicity of what you proposed).
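If the config value followed the same `PnDTnHnMnS` shape as the tool's `--by-duration` option, the JDK's `java.time.Duration.parse` could validate it directly. The sketch below (the `ResetDurationParser` class is hypothetical) also shows two properties relevant to AM3/AM4: mixed units such as 2 days, 5 hours and 30 minutes are expressible in one value, while calendar units like months (`P1M`) are rejected by `Duration.parse` rather than silently reinterpreted.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical validation helper for a duration-valued reset config.
final class ResetDurationParser {

    private ResetDurationParser() {}

    static Duration parseResetDuration(String value) {
        try {
            // Accepts ISO-8601 durations of the form PnDTnHnMn.nS (days and smaller units only).
            Duration d = Duration.parse(value);
            if (d.isNegative()) {
                throw new IllegalArgumentException("Negative duration not allowed: " + value);
            }
            return d;
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid ISO-8601 duration: " + value, e);
        }
    }
}
```

`Duration` only models fixed-length units, which is why months and years are not accepted; a format that needs them would have to use `java.time.Period` or define its own semantics for variable-length units.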

I also want to add to (AM1), as there is also an `AutoOffsetReset`
enum in Kafka Streams. I think we would need to convert it into a
class. While it could be done in a follow-up KIP, too, it seems better to
do this holistically in a single KIP, because KS is not something outside
of Kafka, but part of Kafka. I am happy to help with the KIP, or
even PRs if necessary, but would strongly prefer to do it all in a
single KIP.

Btw: if we add something like "latest-local", it might also be good to
extend `bin/` accordingly (even if the tool works
slightly differently, as it commits an offset, and there could be a
race condition between committing "latest-local", tiering, and when the
consumer is actually started)?


On 11/4/24 7:38 AM, Apoorv Mittal wrote:
Hi Manikumar,
Thanks for the KIP, this new strategy would be helpful in

AM1: The config `auto.offset.reset` is currently applied as per
class OffsetResetStrategy, which is part of the kafka-clients javadoc.
Are we also proposing to somehow add new definitions to the same enum?
However, as the new configurations will have a string representation,
are we moving away from the OffsetResetStrategy enum class altogether?
Can we include the change in the KIP, as OffsetResetStrategy is part of
the public API?

AM2: While I can see the ISO-8601 format is in the rejected alternatives,
should we not follow some standard for defining durations that has
been adopted in other systems?

AM3: The KIP introduces new config values using the formats
minus-n-days, minus-n-months, and minus-n-years. Should we explicitly
define the "minus" prefix, or is it implied?

AM4: When supporting duration-based resets, should we also allow users to
specify a specific checkpoint time? For example, if a checkpoint occurred
2 days, 5 hours and 30 minutes earlier, the current four formats
(minus-n-hours, minus-n-days, minus-n-months, minus-n-years) may not be
sufficient. Should we consider adding a format to accommodate such
checkpoint times, or is there a reason to limit the supported formats?

Apoorv Mittal

On Mon, Nov 4, 2024 at 9:23 AM Manikumar <> wrote:

Hi everyone,
I would like to start a discussion on KIP-1106:

This KIP proposes adding an additional auto offset reset option for
consumer clients.

