Hi, Andrew,

Thanks for the reply. I agree that ISO8601 is more human-readable and less error-prone to configure. It probably covers all common usage, so we can keep it as it is.
Jun

On Fri, Nov 8, 2024 at 10:26 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> Hi Jun,
>
> I suggest there are two reasons why ISO8601 is a more appropriate choice than ms.
>
> First, kafka-consumer-groups.sh --reset-offsets --by-duration uses ISO8601. This KIP is essentially providing the Kafka consumer equivalent of this tool. I favour consistency with the existing way of doing this.
>
> Second, I suggest that people are going to be thinking of larger time units than milliseconds in practice, such as a few hours or days. They should not have to calculate how many milliseconds there are in a day.
>
> Thanks,
> Andrew
> ________________________________________
> From: Jun Rao <j...@confluent.io.INVALID>
> Sent: 08 November 2024 17:37
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option for consumer clients
>
> Hi, Manikumar,
>
> Thanks for the updated PR. Another comment.
>
> JR3. Is there any benefit of using ISO8601 for auto.offset.reset.by.duration instead of just ms in long? The latter covers a broader range. Also, AutoOffsetResetByDuration uses Java Duration, which covers a different granularity than ISO8601. It might be better to just use ms for both.
>
> Jun
>
> On Fri, Nov 8, 2024 at 5:51 AM Manikumar <manikumar.re...@gmail.com> wrote:
>> Hi All,
>>
>> I have updated the KIP with recent suggestions.
>>
>> Thanks.
>>
>> On Fri, Nov 8, 2024 at 11:53 AM Manikumar <manikumar.re...@gmail.com> wrote:
>>> Hi Matthias,
>>>
>>> Thanks for the comments.
>>>
>>>> I am personally not convinced that adding a new config `auto.offset.reset.by.duration` is the best way.
>>>
>>> I think in this case, we felt it's better to add the strategy name to the auto.offset.reset config and add additional strategy-specific configs. This will be easy to understand and extend in the future.
>>> I am open to change if there are similar concerns.
>>>
>>>> One question about "earliest_local": if we fetch this offset from the server, and seek to it later, and start fetching, won't there be a race condition? The server would just need to reload it from the tier.
>>>
>>> Yes, your understanding is correct. The server will read from the tiered storage. Anyhow, I will consider dropping earliest_local based on Jun's suggestion.
>>>
>>>> For Kafka Streams, just adding the new option to the existing enum does not work IMHO. If a user would use BY_DURATION, there is no option to pass in the actual duration (and we might need to throw an exception).
>>>
>>> I was assuming we will be using the underlying consumer config here. I will update based on your suggestions.
>>>
>>> Thanks,
>>>
>>> On Fri, Nov 8, 2024 at 6:21 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>> Thanks for updating the KIP.
>>>>
>>>> I am personally not convinced that adding a new config `auto.offset.reset.by.duration` is the best way. Kafka in general has way too many configs, and trying to avoid adding more configs seems to be desirable? It seems this might be a point of contention, and if the majority of people want this new config, so be it. I just wanted to stress my concerns about it.
>>>>
>>>> One question about "earliest_local": if we fetch this offset from the server, and seek to it later, and start fetching, won't there be a race condition? The oldest segment could have been discarded locally in the meantime? Maybe not too much of a big deal: the server would just need to reload it from the tier. Just want to ensure my understanding is correct, and make sure nobody has concerns about it.
>>>>
>>>> For Kafka Streams, just adding the new option to the existing enum does not work IMHO.
>>>>
>>>> For example, we have a method:
>>>>
>>>>     addSource(final AutoOffsetReset offsetReset,
>>>>               final String name,
>>>>               final String... topics)
>>>>
>>>> If a user would use BY_DURATION, there is no option to pass in the actual duration (and we might need to throw an exception). And if we overload `addSource` by adding a fourth parameter, it would be odd too, as users could pass in `EARLIEST` plus a duration, and we would need to throw an exception or ignore the given duration. Overall, not an easy-to-use API.
>>>>
>>>> Thus, I would suggest deprecating the existing `AutoOffsetReset` enum and adding a new class `AutoOffsetReset` through which we can guide users accordingly, avoiding corner cases:
>>>>
>>>>     public class AutoOffsetReset {
>>>>         public static AutoOffsetReset earliest();
>>>>         public static AutoOffsetReset latest();
>>>>         public static AutoOffsetReset byDuration(Duration duration);
>>>>         public static AutoOffsetReset earliestLocal();
>>>>     }
>>>>
>>>> The existing enum is contained inside `Topology`, so we can add the new class directly in package `org.apache.kafka.streams` to avoid naming conflicts.
>>>>
>>>> Additionally, we would deprecate all methods taking the existing enum `AutoOffsetReset` (only two classes, `Topology` and `Consumed`, are affected) and add new overloads which accept the new `class AutoOffsetReset` instead.
>>>>
>>>> -Matthias
>>>>
>>>> On 11/7/24 1:57 PM, Jun Rao wrote:
>>>>> Hi, Manikumar,
>>>>>
>>>>> Thanks for the KIP. A couple of comments.
>>>>>
>>>>> JR1. It doesn't seem that we need earliest_local. Intuitively, it makes sense for a consumer app to bootstrap based on time.
>>>>> The time that a topic is kept locally seems irrelevant to a consumer application.
>>>>>
>>>>> JR2. I took a look at the proposal of KIP-842. It seems that this KIP covers the common scenario in KIP-842 and is simpler. The scenario described in KIP-842 is about which initial offset to use when a new partition is added. This is more or less the same problem as choosing the initial offset for existing partitions. Having a time-based option provides more flexibility than earliest or latest, and is likely good enough for most usage.
>>>>>
>>>>> Jun
>>>>>
>>>>> On Wed, Nov 6, 2024 at 4:00 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
>>>>>> Thanks Manikumar and Andrew.
>>>>>>
>>>>>> For AM6: I was thinking of enhancing OffsetFetcher, which itself uses the Admin.listOffsets API, and adding "seekToLocalBeginning". However, I agree that this is just another convenience and not necessary to support the functionality. Skipping it sounds good to me.
>>>>>>
>>>>>> Regards,
>>>>>> Apoorv Mittal
>>>>>>
>>>>>> On Wed, Nov 6, 2024 at 9:12 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>>>>>>> Hi Manikumar,
>>>>>>>
>>>>>>> AS7: In response to AM6, I agree that in general the combination of offsetsForTimes() and seek() will do what is required. The one case that I think is more tricky is seeking to the local earliest/beginning.
>>>>>>>
>>>>>>> The Consumer API has Consumer.beginningOffsets and seekToBeginning, which correspond to OffsetResetStrategy.EARLIEST. Similarly, there are methods for OffsetResetStrategy.LATEST.
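The offsetsForTimes() plus seek() combination discussed under AM6 and AS7 can be sketched as follows. To keep the example runnable without kafka-clients on the classpath, it codes against a hypothetical `SeekableConsumer` interface whose methods only loosely mirror `KafkaConsumer.offsetsForTimes()`, `endOffsets()` and `seek()`, with partitions reduced to plain strings; `DurationReset` and `InMemoryConsumer` are likewise illustrative names. Falling back to the end offset when no record is recent enough is an assumption of this sketch, not something the thread specifies.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the KafkaConsumer calls named in the thread.
// Partitions are plain strings so the sketch runs without kafka-clients.
interface SeekableConsumer {
    // Like offsetsForTimes(): earliest offset whose timestamp >= the target,
    // or a null value when the partition has no such record.
    Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch);

    // Offset that the next appended record would get (like endOffsets()).
    long endOffset(String partition);

    void seek(String partition, long offset);
}

final class DurationReset {
    // Positions each partition at the first record no older than `lookback`.
    static void seekByDuration(SeekableConsumer consumer, Set<String> partitions,
                               Duration lookback, long nowMs) {
        long targetMs = nowMs - lookback.toMillis();
        Map<String, Long> query = new HashMap<>();
        for (String p : partitions) {
            query.put(p, targetMs);
        }
        Map<String, Long> found = consumer.offsetsForTimes(query);
        for (String p : partitions) {
            Long offset = found.get(p);
            // Assumption: nothing recent enough means "start at the end".
            consumer.seek(p, offset != null ? offset : consumer.endOffset(p));
        }
    }
}

// Toy in-memory log: the record at offset i of a partition has timestamp log[i].
final class InMemoryConsumer implements SeekableConsumer {
    final Map<String, long[]> logs = new HashMap<>();
    final Map<String, Long> positions = new HashMap<>();

    @Override
    public Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch) {
        Map<String, Long> result = new HashMap<>();
        timestampsToSearch.forEach((p, target) -> {
            long[] log = logs.get(p);
            Long hit = null;
            for (int i = 0; i < log.length && hit == null; i++) {
                if (log[i] >= target) {
                    hit = (long) i;
                }
            }
            result.put(p, hit);
        });
        return result;
    }

    @Override
    public long endOffset(String partition) {
        return logs.get(partition).length;
    }

    @Override
    public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
}
```

For example, with a partition whose record timestamps are 1000, 5000 and 9000 ms, a six-second lookback evaluated at 12000 ms targets 6000 ms and positions the partition at offset 2. Only the local-earliest case from AS7 falls outside this pattern, because the Consumer API exposes no equivalent lookup.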
>>>>>>>
>>>>>>> With the Consumer API, I don't think you can find the local earliest offset. That can be found using Admin.listOffsets with OffsetSpec.EarliestLocal. We could add equivalents of Consumer.beginningOffsets and Consumer.seekToBeginning. An application could always use the Admin API with the Consumer API.
>>>>>>>
>>>>>>> Personally, I would not complicate this KIP with these new methods when the focus is more on duration-based offset reset and configs.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>> ________________________________________
>>>>>>> From: Manikumar <manikumar.re...@gmail.com>
>>>>>>> Sent: 06 November 2024 04:20
>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-1106: Add duration based offset reset option for consumer clients
>>>>>>>
>>>>>>> Hi Apoorv,
>>>>>>>
>>>>>>> Thanks for the review.
>>>>>>>
>>>>>>> AM5: Thanks, updated the KIP.
>>>>>>>
>>>>>>> AM6: Currently we can achieve this with the combination of the offsetsForTimes() and seek() APIs. Maybe we can add a generic API like seekToTimes(Map<TopicPartition, Long> timestampsToSearch) if required. Let's see what others think.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> On Wed, Nov 6, 2024 at 1:45 AM Apoorv Mittal <apoorvmitta...@gmail.com> wrote:
>>>>>>>> Hi Manikumar,
>>>>>>>>
>>>>>>>> Thanks for the changes. Just a minor comment and a question:
>>>>>>>>
>>>>>>>> AM5: The description "How to initialize the share-partition start offset:" for "share.auto.offset.reset" seems incomplete, as it ends with `:`. Should we write the details as defined in KIP-932, i.e.
>>>>>>>>
>>>>>>>> How to initialize the share-partition start offset:
>>>>>>>>
>>>>>>>> - "earliest": automatically reset the offset to the earliest offset
>>>>>>>> - "latest": automatically reset the offset to the latest offset
>>>>>>>> - "earliest_local": automatically reset the offset to the earliest message stored in the local log on the broker
>>>>>>>> - "by_duration": automatically reset the offset to the earliest offset whose timestamp is greater than or equal to the current time minus the configured duration (auto.offset.reset.by.duration)
>>>>>>>>
>>>>>>>> Or maybe just replace ":" with "." (i.e. "How to initialize the share-partition start offset."), as the details of the new configs are defined at the top in different sections as well.
>>>>>>>>
>>>>>>>> AM6: Question: KafkaConsumer has public methods defined to "seek" for a reset strategy, like "seekToBeginning" and "seekToEnd", which use OffsetResetStrategy EARLIEST and LATEST respectively. Do you think it would be sensible to add additional KafkaConsumer APIs for the newly introduced strategies?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Apoorv Mittal
>>>>>>>>
>>>>>>>> On Tue, Nov 5, 2024 at 6:10 PM Manikumar <manikumar.re...@gmail.com> wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> Thanks for the review.
>>>>>>>>>
>>>>>>>>> MS1: Looks like KIP-842 is stuck in voting.
>>>>>>>>> The current changes should fit into KIP-842 (a few updates are definitely required), as KIP-842 proposes adding new configs (auto.offset.reset.on.no.initial.offset and auto.offset.reset.on.invalid.offset). I prefer to revive KIP-842 after completing this KIP.
>>>>>>>>>
>>>>>>>>> MS2: Added support for an earliest-local option to reset to the oldest local offset, and the corresponding changes to kafka-consumer-groups.sh.
>>>>>>>>>
>>>>>>>>> MS3: Based on suggestions, I have updated the config option to support ISO format.
>>>>>>>>>
>>>>>>>>> MS4: I have included the AutoOffsetReset changes in the KIP. I definitely need your help on refining the API/solution for KS :)
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Tue, Nov 5, 2024 at 7:26 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>
>>>>>>>>>> A somewhat orthogonal question I have is whether we should try to merge this KIP with the existing KIP-842:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms
>>>>>>>>>>
>>>>>>>>>> While KIP-842 has a different intention, i.e., separating the case of "no offset during initial startup" vs "offset out of bounds during processing", it might still be worth it? My motivation for this question is that `auto.offset.reset` is one of the most central configs we have, and thus doing a single larger change (i.e., both KIPs together) might be better (less noise) than doing two independent changes? Thoughts?
>>>>>>>>>> I know it might be a little bit of a stretch, but asking cannot hurt :)
>>>>>>>>>>
>>>>>>>>>> About the KIP itself, did we consider adding something like "latest-local" to say reset to oldest local, but not fetch from tiered storage?
>>>>>>>>>>
>>>>>>>>>> For the format of the config, did we consider what we do in `bin/kafka-consumer-groups.sh`? It has multiple different options, like
>>>>>>>>>>
>>>>>>>>>>     --by-duration <duration: format `PnDTnHnMnS`>
>>>>>>>>>>     --shift-by <number-of-records>
>>>>>>>>>>     --to-datetime <fixed point in time: format `YYYY-MM-DDTHH:mm:SS.sss`>
>>>>>>>>>>     --to-offset <absolute offset>
>>>>>>>>>>
>>>>>>>>>> As it also uses ISO format, it might be good to use it here too (even if I can see the appeal and simplicity of what you proposed).
>>>>>>>>>>
>>>>>>>>>> I also want to add to (AM1), as there is also the `Topology.AutoOffsetReset` enum in Kafka Streams. I think we would need to convert this into a class. While it could be done in a follow-up KIP, too, it seems best to do this holistically in a single KIP, because KS is not something on top of Kafka, but part of Kafka. I am happy to help with the design and even PRs if necessary, but would strongly prefer to do it all in a single KIP.
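The class-based replacement for `Topology.AutoOffsetReset` proposed in this thread could look roughly like the sketch below. Only the factory-method names (`earliest()`, `latest()`, `byDuration(Duration)`, `earliestLocal()`) come from that proposal; the nested `StrategyType` enum, the accessors and the rejection of negative durations are illustrative assumptions, not the final Kafka Streams API.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Sketch only: factory names follow the proposal in this thread; everything
// else (StrategyType, accessors, validation) is a hypothetical illustration.
final class AutoOffsetReset {
    enum StrategyType { EARLIEST, LATEST, BY_DURATION, EARLIEST_LOCAL }

    private final StrategyType type;
    private final Duration duration; // non-null only for BY_DURATION

    private AutoOffsetReset(StrategyType type, Duration duration) {
        this.type = type;
        this.duration = duration;
    }

    public static AutoOffsetReset earliest() {
        return new AutoOffsetReset(StrategyType.EARLIEST, null);
    }

    public static AutoOffsetReset latest() {
        return new AutoOffsetReset(StrategyType.LATEST, null);
    }

    public static AutoOffsetReset earliestLocal() {
        return new AutoOffsetReset(StrategyType.EARLIEST_LOCAL, null);
    }

    // The duration travels with the strategy, so "EARLIEST plus a duration"
    // cannot be expressed at all.
    public static AutoOffsetReset byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative: " + duration);
        }
        return new AutoOffsetReset(StrategyType.BY_DURATION, duration);
    }

    StrategyType type() {
        return type;
    }

    Optional<Duration> duration() {
        return Optional.ofNullable(duration);
    }
}
```

Because a duration can only enter through `byDuration()`, an `addSource(AutoOffsetReset, String, String...)` overload taking this class needs no extra parameter and no runtime check for a mismatched strategy/duration pair.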
>>>>>>>>>>
>>>>>>>>>> Btw: if we add something like "latest-local", it might also be good to extend `bin/kafka-consumer-groups.sh` accordingly (even if the tool works slightly differently, as it does commit an offset, and there could be some race condition between committing "latest-local", tiering, and when the consumer is actually started)?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 11/4/24 7:38 AM, Apoorv Mittal wrote:
>>>>>>>>>>> Hi Manikumar,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP, this new strategy would be helpful in specifying fetch behaviour.
>>>>>>>>>>>
>>>>>>>>>>> AM1: The config `auto.offset.reset` is currently applied as per the enum class OffsetResetStrategy, which is part of the kafka-clients javadoc <https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/index.html>. Are we also proposing to somehow add new definitions in the same class? However, as the new configurations will be a string representation, are we moving away from the OffsetResetStrategy enum class altogether? Should we include the change in the KIP, as OffsetResetStrategy is part of the public javadoc?
>>>>>>>>>>>
>>>>>>>>>>> AM2: While I can see the ISO-8601 format is in the rejected alternatives, should we not follow some standard for defining durations which has already been adopted in other systems?
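To make the ISO-8601 question (AM2, MS3, JR3) concrete, the sketch below parses a reset duration with the JDK's `java.time.Duration.parse`, which accepts the `PnDTnHnMn.nS` shape used by `kafka-consumer-groups.sh --by-duration`. It also illustrates the granularity point from JR3: `Duration.parse` rejects the calendar-based year, month and week designators that ISO-8601 itself allows. The `ResetDurations` class and `lookbackMillis` helper are hypothetical names, and treating negative durations as invalid is an assumption.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Optional;

final class ResetDurations {
    // Hypothetical helper: converts an ISO-8601 duration such as "P1D" or
    // "PT6H" into a lookback in milliseconds. Empty means "reject this value".
    static Optional<Long> lookbackMillis(String iso8601) {
        try {
            Duration d = Duration.parse(iso8601);
            // Assumption: a negative lookback makes no sense for an offset reset.
            return d.isNegative() ? Optional.empty() : Optional.of(d.toMillis());
        } catch (DateTimeParseException | ArithmeticException e) {
            // Unparseable text, or a duration too large to express in ms.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // AM4's "2 days, 5 hours and 30 minutes earlier" fits in one value.
        System.out.println(lookbackMillis("P2DT5H30M")); // Optional[192600000]
        // Valid ISO-8601, but months are not supported by java.time.Duration.
        System.out.println(lookbackMillis("P1M"));       // Optional.empty
    }
}
```

`P1D` yields 86,400,000 ms, so users never have to compute that figure by hand, which is the usability argument made for ISO-8601 over raw milliseconds.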
>>>>>>>>>>>
>>>>>>>>>>> AM3: We've introduced new config values using the format minus-n-hours, minus-n-days, minus-n-months, and minus-n-years. Should we explicitly define the "minus" prefix, or is it implied?
>>>>>>>>>>>
>>>>>>>>>>> AM4: When supporting duration-based resets, should we also allow users to specify a specific checkpoint time? For example, if a checkpoint occurs 2 days, 5 hours and 30 minutes earlier, the current four formats (minus-n-hours, minus-n-days, minus-n-months, minus-n-years) might not be sufficient. Should we consider adding a format to accommodate specific checkpoint times, or is there a reason to limit the supported formats?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Apoorv Mittal
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 4, 2024 at 9:23 AM Manikumar <manikumar.re...@gmail.com> wrote:
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start a discussion on KIP-1106:
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients
>>>>>>>>>>>>
>>>>>>>>>>>> This KIP proposes to add an additional auto offset reset strategy for consumer clients.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Manikumar