Hi Matthias and everyone,

Some clarification is necessary just for posterity. It turns out that, on a fresh standby task before we start polling for records, we wouldn't be able to get the current end offset without a network call. This leaves us three options:
A) Make it an Optional<Long> or use a sentinel value to mark that it's not present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the strong majority of cases, the Optional would be empty. Option B) is undesirable because of the performance considerations: if we're going to make a network round trip, we might as well get some records back! That leaves us with option C), which is the least-bad of all of them.

At LittleHorse we actually do care about the endOffset in the onUpdateStart() method, and having it would be useful to us. However, the work-around isn't horrible, because the endOffset will be passed into the first call to onBatchLoaded(), which normally follows onUpdateStart() within <100ms.

Thanks,
Colt McNealy
*Founder, LittleHorse.dev*

On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax <mj...@apache.org> wrote:

> > parameter is somewhat irrelevant to our use case
>
> Sounds like a weird justification to change the KIP. Providing more
> information is usually better than less, so it seems it won't hurt to
> just keep it (seems useful in general to get the current end offset in
> this callback) -- you can always ignore it, if it's not relevant for
> your use case.
>
>
> -Matthias
>
>
> On 11/30/23 6:56 AM, Eduwer Camacaro wrote:
> > Hello everyone,
> >
> > We have come to the conclusion, during our work on this KIP's
> > implementation, that the #onUpdateStart callback's "currentEndOffset"
> > parameter is somewhat irrelevant to our use case. When this callback is
> > invoked, I think this value is usually unknown. Our choice to delete this
> > parameter from the #onUpdateStart callback requires an update to the KIP.
> >
> > Please feel free to review the PR and provide any comments you may have. :)
> > Thanks in advance
> >
> > Edu-
> >
> > On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks. SGTM.
> >>
> >> On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> >>> That all sounds good to me! Thanks for the KIP
> >>>
> >>> On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy <c...@littlehorse.io> wrote:
> >>>
> >>>> Hi Sophie, Matthias, Bruno, and Eduwer,
> >>>>
> >>>> Thanks for your patience as I have been scrambling to catch up after a week
> >>>> of business travel (and a few days with no time to code). I'd like to tie
> >>>> up some loose ends here, but in short, I don't think the KIP document
> >>>> itself needs any changes (our internal implementation does, however).
> >>>>
> >>>> 1. In the interest of a) not changing the KIP after it's already out for a
> >>>> vote, and b) making sure our English grammar is "correct", let's stick with
> >>>> `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
> >>>>
> >>>> 2. For me (and, thankfully, the community as well) adding a remote network
> >>>> call at any point in this KIP is a non-starter. We'll ensure that
> >>>> our implementation does not introduce one.
> >>>>
> >>>> 3. I really don't like changing API behavior, even if it's not documented
> >>>> in the javadoc. As such, I am strongly against modifying the behavior of
> >>>> endOffsets() on the consumer as some people may implicitly depend on the
> >>>> contract.
> >>>> 3a. The Consumer#currentLag() method gives us exactly what we want without
> >>>> a network call (current lag from a cache, from which we can compute the
> >>>> offset).
> >>>>
> >>>> 4. I have no opinion about whether we should pass endOffset or currentLag
> >>>> to the callback. Either one has the same exact information inside it. In
> >>>> the interest of not changing the KIP after the vote has started, I'll leave
> >>>> it as endOffset.
> >>>>
> >>>> As such, I believe the KIP doesn't need any updates, nor has it been
> >>>> updated since the vote started.
> >>>>
> >>>> Would anyone else like to discuss something before the Otter Council
> >>>> adjourns regarding this matter?
> >>>>
> >>>> Cheers,
> >>>> Colt McNealy
> >>>>
> >>>> *Founder, LittleHorse.dev*
> >>>>
> >>>>
> >>>> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>
> >>>>> Just want to checkpoint the current state of this KIP and make sure we're
> >>>>> on track to get it in to 3.7 (we still have a few weeks) -- looks like
> >>>>> there are two remaining open questions, both relating to the
> >>>>> middle/intermediate callback:
> >>>>>
> >>>>> 1. What to name it: seems like the primary candidates are onBatchLoaded and
> >>>>> onBatchUpdated (and maybe also onStandbyUpdated?)
> >>>>> 2. What additional information can we pass in that would strike a good
> >>>>> balance between being helpful and impacting performance.
> >>>>>
> >>>>> Regarding #1, I think all of the current options are reasonable enough that
> >>>>> we should just let Colt decide which he prefers. I personally think
> >>>>> #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> >>>>> that English grammar can be sticky and while it could be argued that it is
> >>>>> the store which is updated, not the batch, I feel that it is perfectly
> >>>>> clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
> >>>>> at all. That's just my two cents in case it helps, but again, whatever
> >>>>> makes sense to you Colt is fine
> >>>>>
> >>>>> When it comes to #2 -- as much as I would love to dig into the Consumer
> >>>>> client lore and see if we can modify existing APIs or add new ones in order
> >>>>> to get the desired offset metadata in an efficient way, I think we're
> >>>>> starting to go down a rabbit hole that is going to expand the scope way
> >>>>> beyond what Colt thought he was signing up for.
> >>>>> I would advocate to focus
> >>>>> on just the basic feature for now and drop the end-offset from the
> >>>>> callback. Once we have a standby listener it will be easy to expand on with
> >>>>> a followup KIP if/when we find an efficient way to add additional useful
> >>>>> information. I think it will also become more clear what is and isn't
> >>>>> useful after more people get to using it in the real world
> >>>>>
> >>>>> Colt/Eduwer: how necessary is receiving the end offset during a batch
> >>>>> update to your own application use case?
> >>>>>
> >>>>> Also, for those who really do need to check the current end offset, I
> >>>>> believe in theory you should be able to use the KafkaStreams#metrics API to
> >>>>> get the current lag and/or end offset for the changelog -- it's possible
> >>>>> this does not represent the most up-to-date end offset (I'm not sure it
> >>>>> does or does not), but it should be close enough to be reliable and useful
> >>>>> for the purpose of monitoring -- I mean it is a metric, after all.
> >>>>>
> >>>>> Hope this helps -- in the end, it's up to you (Colt) to decide what you
> >>>>> want to bring in scope or not. We still have more than 3 weeks until the
> >>>>> KIP freeze as currently proposed, so in theory you could even implement
> >>>>> this KIP without the end offset and then do a followup KIP to add the end
> >>>>> offset within the same release, ie without any deprecations. There are
> >>>>> plenty of paths forward here, so don't let us drag this out forever if you
> >>>>> know what you want
> >>>>>
> >>>>> Cheers,
> >>>>> Sophie
> >>>>>
> >>>>> On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> Forgot one thing:
> >>>>>>
> >>>>>> We could also pass `currentLag()` into `onBatchLoaded()` instead of
> >>>>>> end-offset.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 10/20/23 10:56 AM, Matthias J.
> >>>>>> Sax wrote:
> >>>>>>> Thanks for digging into this Bruno.
> >>>>>>>
> >>>>>>> The JavaDoc on the consumer does not say anything specific about
> >>>>>>> `endOffset` guarantees:
> >>>>>>>
> >>>>>>>> Get the end offsets for the given partitions. In the default {@code
> >>>>>>>> read_uncommitted} isolation level, the end
> >>>>>>>> offset is the high watermark (that is, the offset of the last
> >>>>>>>> successfully replicated message plus one). For
> >>>>>>>> {@code read_committed} consumers, the end offset is the last stable
> >>>>>>>> offset (LSO), which is the minimum of
> >>>>>>>> the high watermark and the smallest offset of any open transaction.
> >>>>>>>> Finally, if the partition has never been
> >>>>>>>> written to, the end offset is 0.
> >>>>>>>
> >>>>>>> Thus, I actually believe that it would be ok to change the
> >>>>>>> implementation and serve the answer from the `TopicPartitionState`?
> >>>>>>>
> >>>>>>> Another idea would be to use `currentLag()` in combination with
> >>>>>>> `position()` (or the offset of the last read record) to compute the
> >>>>>>> end-offset on the fly?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 10/20/23 4:00 AM, Bruno Cadonna wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Matthias is correct that the end offsets are stored somewhere in the
> >>>>>>>> metadata of the consumer. More precisely, they are stored in the
> >>>>>>>> `TopicPartitionState`. However, I could not find public API on the
> >>>>>>>> consumer other than currentLag() that uses the stored end offsets. If
> >>>>>>>> I understand the code correctly, method endOffsets() always triggers a
> >>>>>>>> remote call.
> >>>>>>>>
> >>>>>>>> I am a bit concerned about doing remote calls every commit.interval.ms
> >>>>>>>> (by default 200ms under EOS).
> >>>>>>>> At the moment the remote calls are only
> >>>>>>>> issued if an optimization for KTables is turned on where changelog
> >>>>>>>> topics are replaced with the input topic of the KTable. The current
> >>>>>>>> remote calls retrieve all committed offsets of the group at once. If I
> >>>>>>>> understand correctly, that is one single remote call. Remote calls for
> >>>>>>>> getting end offsets of changelog topics -- as I understand you are
> >>>>>>>> planning to issue -- will probably result in multiple remote calls to
> >>>>>>>> multiple leaders of the changelog topic partitions.
> >>>>>>>>
> >>>>>>>> Please correct me if I misunderstood anything of the above.
> >>>>>>>>
> >>>>>>>> If my understanding is correct, I propose to modify the consumer in
> >>>>>>>> such a way to get the end offset from the locally stored metadata
> >>>>>>>> whenever possible as part of the implementation of this KIP. I do not
> >>>>>>>> know what the implications are of such a change of the consumer and if
> >>>>>>>> a KIP is needed for it. Maybe endOffsets() guarantees to return the
> >>>>>>>> freshest end offsets possible, which would not be satisfied with the
> >>>>>>>> modification.
> >>>>>>>>
> >>>>>>>> Regarding the naming, I do not completely agree with Matthias. While
> >>>>>>>> the pattern might be consistent with onBatchUpdated, what is the
> >>>>>>>> meaning of onBatchUpdated? Is the batch updated? The names
> >>>>>>>> onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO.
> >>>>>>>> With "restore" the pattern works better. If I restore a batch of
> >>>>>>>> records in a state, the records are not there although they should be
> >>>>>>>> there and I add them. If I update a batch of records in a state, this
> >>>>>>>> sounds like the batch of records is in the state and I modify the
> >>>>>>>> existing records within the state.
> >>>>>>>> That is clearly not the meaning of
> >>>>>>>> the event for which the listener should be called.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/19/23 2:12 AM, Matthias J. Sax wrote:
> >>>>>>>>> Thanks for the KIP. Seems I am almost late to the party.
> >>>>>>>>>
> >>>>>>>>> About naming (fun, fun, fun): I like the current proposal overall,
> >>>>>>>>> except `onBatchLoaded`, but would prefer `onBatchUpdated`. It better
> >>>>>>>>> aligns to everything else:
> >>>>>>>>>
> >>>>>>>>> - it's an update-listener, not loaded-listener
> >>>>>>>>> - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
> >>>>>>>>> `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
> >>>>>>>>> - `StandbyUpdateListener` should have `onUpdateStart`,
> >>>>>>>>> `onUpdateSuspended` and `onBatchUpdated` to be equally consistent
> >>>>>>>>> (using "loaded" breaks the pattern)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About the end-offset question: I am relatively sure that the consumer
> >>>>>>>>> gets the latest end-offset as attached metadata in every fetch
> >>>>>>>>> response. (We exploit this behavior to track end-offsets for input
> >>>>>>>>> topic with regard to `max.task.idle.ms` without overhead -- it was
> >>>>>>>>> also a concern when we did the corresponding KIP how we could track
> >>>>>>>>> lag with no overhead).
> >>>>>>>>>
> >>>>>>>>> Thus, I believe we would "just" need to modify the code accordingly
> >>>>>>>>> to get this information from the restore-consumer
> >>>>>>>>> (`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
> >>>>>>>>> internal metadata cache) for free, and pass into the listener.
> >>>>>>>>>
> >>>>>>>>> Please double check / verify this claim and keep me honest about it.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 10/17/23 6:38 AM, Eduwer Camacaro wrote:
> >>>>>>>>>> Hi Bruno,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your observation; surely it will require a network call using
> >>>>>>>>>> the admin client in order to know this "endOffset" and that will have an
> >>>>>>>>>> impact on performance. We can either find a solution that has a low impact
> >>>>>>>>>> on performance or ideally zero impact; unfortunately, I don't see a way to
> >>>>>>>>>> have zero impact on performance. However, we can leverage the existing
> >>>>>>>>>> #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
> >>>>>>>>>> client to ask for these "endOffset"s. As far as I can understand, this update
> >>>>>>>>>> is done periodically using the "commit.interval.ms" configuration. I
> >>>>>>>>>> believe this option will force us to invoke StandbyUpdateListener once this
> >>>>>>>>>> interval is reached.
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP, Colt and Eduwer,
> >>>>>>>>>>>
> >>>>>>>>>>> Are you sure there is also not a significant performance impact for
> >>>>>>>>>>> passing into the callback `currentEndOffset`?
> >>>>>>>>>>>
> >>>>>>>>>>> I am asking because the comment here:
> >>>>>>>>>>>
> >>>>>>>>>>> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
> >>>>>>>>>>>
> >>>>>>>>>>> says that the end-offset is only updated once for standby tasks whose
> >>>>>>>>>>> changelog topic is not piggy-backed on input topics.
I could > also > >>>>> not > >>>>>>>>>>> find the update of end-offset for those standbys. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Bruno > >>>>>>>>>>> > >>>>>>>>>>> On 10/16/23 10:55 AM, Lucas Brutschy wrote: > >>>>>>>>>>>> Hi all, > >>>>>>>>>>>> > >>>>>>>>>>>> it's a nice improvement! I don't have anything to add on top > of > >>>>> the > >>>>>>>>>>>> previous comments, just came here to say that it seems to me > >>>>>>>>>>>> consensus > >>>>>>>>>>>> has been reached and the result looks good to me. > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks Colt and Eduwer! > >>>>>>>>>>>> Lucas > >>>>>>>>>>>> > >>>>>>>>>>>> On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy < > >>>> c...@littlehorse.io > >>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, Guozhang. I've updated the KIP and will start a vote. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>> > >>>>>>>>>>>>> *Founder, LittleHorse.dev* > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang < > >>>>>>>>>>> guozhang.wang...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for the summary, that looks good to me. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy < > >>>>> c...@littlehorse.io > >>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hello there! > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks everyone for the comments. There's a lot of > >>>>> back-and-forth > >>>>>>>>>>> going > >>>>>>>>>>>>>> on, > >>>>>>>>>>>>>>> so I'll do my best to summarize what everyone's said in > TLDR > >>>>>>>>>>>>>>> format: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, > and > >>>>> do > >>>>>>>>>>>>>> similarly > >>>>>>>>>>>>>>> for the other methods. > >>>>>>>>>>>>>>> 2. Keep `SuspendReason.PROMOTED` and > >>>> `SuspendReason.MIGRATED`. > >>>>>>>>>>>>>>> 3. 
Remove the `earliestOffset` parameter for performance > >>>>> reasons. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> If that's all fine with everyone, I'll update the KIP and > >>>>>> we—well, > >>>>>>>>>>> mostly > >>>>>>>>>>>>>>> Edu (: —will open a PR. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> *Founder, LittleHorse.dev* > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro < > >>>>>>>>>>> edu...@littlehorse.io> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hello everyone, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for all your feedback for this KIP! > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I think that the key to choosing proper names for this API > >>>> is > >>>>>>>>>>>>>> understanding > >>>>>>>>>>>>>>>> the terms used inside the StoreChangelogReader. Currently, > >>>>>>>>>>>>>>>> this class > >>>>>>>>>>>>>> has > >>>>>>>>>>>>>>>> two possible states: ACTIVE_RESTORING and > STANDBY_UPDATING. > >>>> In > >>>>>> my > >>>>>>>>>>>>>> opinion, > >>>>>>>>>>>>>>>> using StandbyUpdateListener for the interface fits better > on > >>>>>>>>>>>>>>>> these > >>>>>>>>>>>>>> terms. > >>>>>>>>>>>>>>>> Same applies for onUpdateStart/Suspended. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> StoreChangelogReader uses "the same mechanism" for active > >>>> task > >>>>>>>>>>>>>> restoration > >>>>>>>>>>>>>>>> and standby task updates, but this is an implementation > >>>>>>>>>>>>>>>> detail. Under > >>>>>>>>>>>>>>>> normal circumstances (no rebalances or task migrations), > the > >>>>>>>>>>> changelog > >>>>>>>>>>>>>>>> reader will be in STANDBY_UPDATING, which means it will be > >>>>>>>>>>>>>>>> updating > >>>>>>>>>>>>>> standby > >>>>>>>>>>>>>>>> tasks as long as there are new records in the changelog > >>>> topic. 
> >>>>>>>>>>>>>>>> That's > >>>>>>>>>>>>>> why I > >>>>>>>>>>>>>>>> prefer onStandbyUpdated instead of onBatchUpdated, even if > >>>> it > >>>>>>>>>>>>>>>> doesn't > >>>>>>>>>>>>>> 100% > >>>>>>>>>>>>>>>> align with StateRestoreListener, but either one is fine. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Edu > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang < > >>>>>>>>>>>>>> guozhang.wang...@gmail.com> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hello Colt, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks for writing the KIP! I have read through the > updated > >>>>>>>>>>>>>>>>> KIP and > >>>>>>>>>>>>>>>>> overall it looks great. I only have minor naming comments > >>>>>> (well, > >>>>>>>>>>>>>>>>> aren't naming the least boring stuff to discuss and that > >>>>>>>>>>>>>>>>> takes the > >>>>>>>>>>>>>>>>> most of the time for KIPs :P): > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 1. I tend to agree with Sophie regarding whether or not > to > >>>>>>>>>>>>>>>>> include > >>>>>>>>>>>>>>>>> "Standby" in the functions of > >>>>> "onStandbyUpdateStart/Suspended", > >>>>>>>>>>> since > >>>>>>>>>>>>>>>>> it is also more consistent with the functions of > >>>>>>>>>>>>>>>>> "StateRestoreListener" where we do not name it as > >>>>>>>>>>>>>>>>> "onStateRestoreState" etc. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 2. I know in community discussions we sometimes say "a > >>>>>>>>>>>>>>>>> standby is > >>>>>>>>>>>>>>>>> promoted to active", but in the official code / java docs > >>>> we > >>>>>>>>>>>>>>>>> did not > >>>>>>>>>>>>>>>>> have a term of "promotion", since what the code does is > >>>>> really > >>>>>>>>>>>>>> recycle > >>>>>>>>>>>>>>>>> the task (while keeping its state stores open), and > create > >>>> a > >>>>>> new > >>>>>>>>>>>>>>>>> active task that takes in the recycled state stores and > >>>> just > >>>>>>>>>>> changing > >>>>>>>>>>>>>>>>> the other fields like task type etc. 
> >>>>>>>>>>>>>>>>> After thinking about this for a
> >>>>>>>>>>>>>>>>> bit, I tend to feel that "promoted" is indeed a better name for user
> >>>>>>>>>>>>>>>>> facing purposes while "recycle" is more of a technical detail inside
> >>>>>>>>>>>>>>>>> the code and could be abstracted away from users. So I feel keeping
> >>>>>>>>>>>>>>>>> the name "PROMOTED" is fine.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. Regarding "earliestOffset", it does feel like we cannot always
> >>>>>>>>>>>>>>>>> avoid another call to the Kafka API. And on the other hand, I also
> >>>>>>>>>>>>>>>>> tend to think that such bookkeeping may be better done at the app
> >>>>>>>>>>>>>>>>> level than from the Streams' public API level. I.e. the app could keep
> >>>>>>>>>>>>>>>>> a "first ever starting offset" per "topic-partition-store" key, and
> >>>>>>>>>>>>>>>>> when we have rolling restart and hence some standby task keeps
> >>>>>>>>>>>>>>>>> "jumping" from one client to another via task assignment, the app
> >>>>>>>>>>>>>>>>> would update this value just once when it finds the
> >>>>>>>>>>>>>>>>> "topic-partition-store" was never triggered before. What do you
> >>>>>>>>>>>>>>>>> think?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 4. I do not have a strong opinion either, but what about
> >>>>>>>>>>>>>>>>> "onBatchUpdated"?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Sophie,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thank you very much for such a detailed review of the KIP.
> >>>>>>>>>>>>>>>>>> It might > >>>>>>>>>>>>>>>>>> actually be longer than the original KIP in the first > >>>> place! > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 1. Ack'ed and fixed. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 2. Correct, this is a confusing passage and requires > >>>>> context: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> One thing on our list of TODO's regarding reliability is > >>>> to > >>>>>>>>>>>>>> determine > >>>>>>>>>>>>>>>> how > >>>>>>>>>>>>>>>>>> to configure `session.timeout.ms`. In our Kubernetes > >>>>>>>>>>>>>>>>>> Environment, > >>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>> instance of our Streams App can be terminated, > restarted, > >>>>>>>>>>>>>>>>>> and get > >>>>>>>>>>>>>> back > >>>>>>>>>>>>>>>>> into > >>>>>>>>>>>>>>>>>> the "RUNNING" Streams state in about 20 seconds. We have > >>>> two > >>>>>>>>>>>>>> options > >>>>>>>>>>>>>>>>> here: > >>>>>>>>>>>>>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal > >>>>> with > >>>>>> 20 > >>>>>>>>>>>>>>>> seconds > >>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> unavailability for affected partitions, but avoid > >>>> shuffling > >>>>>>>>>>>>>>>>>> Tasks; > >>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>> b) > >>>>>>>>>>>>>>>>>> set session.timeout.ms to a low value, such as 6 > seconds > >>>> ( > >>>>>>>>>>>>>>>>>> heartbeat.interval.ms of 2000), and reduce the > >>>>> unavailability > >>>>>>>>>>>>>> window > >>>>>>>>>>>>>>>>> during > >>>>>>>>>>>>>>>>>> a rolling bounce but incur an "extra" rebalance. There > are > >>>>>>>>>>>>>>>>>> several > >>>>>>>>>>>>>>>>>> different costs to a rebalance, including the shuffling > of > >>>>>>>>>>>>>>>>>> standby > >>>>>>>>>>>>>>>> tasks. > >>>>>>>>>>>>>>>>>> JMX metrics are not fine-grained enough to give us an > >>>>> accurate > >>>>>>>>>>>>>> picture > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> what's going on with the whole Standby Task Shuffle > >>>> Dance. 
I > >>>>>>>>>>>>>>>> hypothesize > >>>>>>>>>>>>>>>>>> that the Standby Update Listener might help us clarify > >>>> just > >>>>>>>>>>>>>>>>>> how the > >>>>>>>>>>>>>>>>>> shuffling actually (not theoretically) works, which will > >>>>>>>>>>>>>>>>>> help us > >>>>>>>>>>>>>> make a > >>>>>>>>>>>>>>>>>> more informed decision about the session timeout config. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If you think this is worth putting in the KIP, I'll > polish > >>>>>>>>>>>>>>>>>> it and > >>>>>>>>>>>>>> do > >>>>>>>>>>>>>>>> so; > >>>>>>>>>>>>>>>>>> else, I'll remove the current half-baked explanation. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3. Overall, I agree with this. In our app, each Task has > >>>>>>>>>>>>>>>>>> only one > >>>>>>>>>>>>>> Store > >>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> reduce the number of changelog partitions, so I > sometimes > >>>>>>>>>>>>>>>>>> forget > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> distinction between the two concepts, as reflected in > the > >>>>>>>>>>>>>>>>>> KIP (: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3a. I don't like the word "Restore" here, since > >>>> Restoration > >>>>>>>>>>>>>>>>>> refers > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>> Active Task getting caught up in preparation to resume > >>>>>>>>>>>>>>>>>> processing. > >>>>>>>>>>>>>>>>>> `StandbyUpdateListener` is fine by me; I have updated > the > >>>>>>>>>>>>>>>>>> KIP. I > >>>>>>>>>>>>>> am a > >>>>>>>>>>>>>>>>>> native Python speaker so I do prefer shorter names > anyways > >>>>> (: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3b1. +1 to removing the word 'Task'. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3b2. I like `onUpdateStart()`, but with your permission > >>>> I'd > >>>>>>>>>>>>>>>>>> prefer > >>>>>>>>>>>>>>>>>> `onStandbyUpdateStart()` which matches the name of the > >>>>>>>>>>>>>>>>>> Interface > >>>>>>>>>>>>>>>>>> "StandbyUpdateListener". 
(the python part of me hates > >>>> this, > >>>>>>>>>>>>>> however) > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3b3. Going back to question 2), `earliestOffset` was > >>>>>>>>>>>>>>>>>> intended to > >>>>>>>>>>>>>> allow > >>>>>>>>>>>>>>>> us > >>>>>>>>>>>>>>>>>> to more easily calculate the amount of state _already > >>>>>>>>>>>>>>>>>> loaded_ in > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> store > >>>>>>>>>>>>>>>>>> by subtracting (startingOffset - earliestOffset). This > >>>> would > >>>>>>>>>>>>>>>>>> help > >>>>>>>>>>>>>> us > >>>>>>>>>>>>>>>> see > >>>>>>>>>>>>>>>>>> how much inefficiency is introduced in a rolling > >>>> restart—if > >>>>>>>>>>>>>>>>>> we end > >>>>>>>>>>>>>> up > >>>>>>>>>>>>>>>>> going > >>>>>>>>>>>>>>>>>> from a situation with an up-to-date standby before the > >>>>>>>>>>>>>>>>>> restart, and > >>>>>>>>>>>>>>>> then > >>>>>>>>>>>>>>>>>> after the whole restart, the Task is shuffled onto an > >>>>> instance > >>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>> is no previous state, then that is expensive. However, > if > >>>>>>>>>>>>>>>>>> the final > >>>>>>>>>>>>>>>>>> shuffling results in the Task back on an instance with a > >>>> lot > >>>>>> of > >>>>>>>>>>>>>>>> pre-built > >>>>>>>>>>>>>>>>>> state, it's not expensive. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If a call over the network is required to determine the > >>>>>>>>>>>>>> earliestOffset, > >>>>>>>>>>>>>>>>>> then this is a "hard no-go" for me, and we will remove > it > >>>>>> (I'll > >>>>>>>>>>>>>> have to > >>>>>>>>>>>>>>>>>> check with Eduwer as he is close to having a working > >>>>>>>>>>>>>> implementation). I > >>>>>>>>>>>>>>>>>> think we can probably determine what we wanted to see > in a > >>>>>>>>>>>>>> different > >>>>>>>>>>>>>>>>>> way, but it will take more thinking.. If > `earliestOffset` > >>>> is > >>>>>>>>>>>>>> confusing, > >>>>>>>>>>>>>>>>>> perhaps rename it to `earliestChangelogOffset`? 
> >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> `startingOffset` is easy to remove as it can be > determined > >>>>>>>>>>>>>>>>>> from the > >>>>>>>>>>>>>>>> first > >>>>>>>>>>>>>>>>>> call to `onBatch{Restored/Updated/Processed/Loaded}()`. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Anyways, I've updated the JavaDoc in the interface; > >>>>>>>>>>>>>>>>>> hopefully it's > >>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>> clear. Awaiting further instructions here. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3c. Good point; after thinking, my preference is > >>>>>>>>>>>>>> `onBatchLoaded()` -> > >>>>>>>>>>>>>>>>>> `onBatchUpdated()` -> `onBatchProcessed()` -> > >>>>>>>>>>>>>>>>>> `onBatchRestored()`. > >>>>>>>>>>>>>> I am > >>>>>>>>>>>>>>>>>> less fond of "processed" because when I was first > learning > >>>>>>>>>>>>>>>>>> Streams > >>>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>>> mistakenly thought that standby tasks actually processed > >>>> the > >>>>>>>>>>>>>>>>>> input > >>>>>>>>>>>>>>>> topic > >>>>>>>>>>>>>>>>>> rather than loaded from the changelog. I'll defer to you > >>>>> here. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet > >>>>>>>>>>>>>>>>>> `onStandbyUpdateSuspended()`. Will check about the > >>>>>>>>>>>>>>>>>> implementation > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> keeping track of the number of records loaded. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 4a. I think this might be best in a separate KIP, > >>>> especially > >>>>>>>>>>>>>>>>>> given > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> this is my and Eduwer's first time contributing to Kafka > >>>> (so > >>>>>> we > >>>>>>>>>>>>>> want to > >>>>>>>>>>>>>>>>>> minimize the blast radius). > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 4b. I might respectfully (and timidly) push back here, > >>>>>> RECYCLED > >>>>>>>>>>>>>> for an > >>>>>>>>>>>>>>>>>> Active Task is a bit confusing to me. 
DEMOTED and > MIGRATED > >>>>>> make > >>>>>>>>>>>>>> sense > >>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>> the standpoint of an Active Task, recycling to me sounds > >>>>> like > >>>>>>>>>>>>>> throwing > >>>>>>>>>>>>>>>>>> stuff away, such that the resources (i.e. disk space) > can > >>>> be > >>>>>>>>>>>>>>>>>> used > >>>>>>>>>>>>>> by a > >>>>>>>>>>>>>>>>>> separate Task. As an alternative rather than trying to > >>>> reuse > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>> same > >>>>>>>>>>>>>>>>> enum, > >>>>>>>>>>>>>>>>>> maybe rename it to `StandbySuspendReason` to avoid > naming > >>>>>>>>>>>>>>>>>> conflicts > >>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>> `ActiveSuspendReason`? However, I could be convinced to > >>>>> rename > >>>>>>>>>>>>>> PROMOTED > >>>>>>>>>>>>>>>>> -> > >>>>>>>>>>>>>>>>>> RECYCLED, especially if Eduwer agrees. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> TLDR: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> T1. Agreed, will remove the word "Task" as it's > incorrect. > >>>>>>>>>>>>>>>>>> T2. Will update to `onStandbyUpdateStart()` > >>>>>>>>>>>>>>>>>> T3. Awaiting further instructions on earliestOffset and > >>>>>>>>>>>>>> startingOffset. > >>>>>>>>>>>>>>>>>> T4. I don't like `onBatchProcessed()` too much, perhaps > >>>>>>>>>>>>>>>>> `onBatchLoaded()`? > >>>>>>>>>>>>>>>>>> T5. Will update to `onStandbyUpdateSuspended()` > >>>>>>>>>>>>>>>>>> T6. Thoughts on renaming SuspendReason to > >>>>>> StandbySuspendReason, > >>>>>>>>>>>>>> rather > >>>>>>>>>>>>>>>>> than > >>>>>>>>>>>>>>>>>> renaming PROMOTED to RECYCLED? @Eduwer? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Long Live the Otter, > >>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev* > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman < > >>>>>>>>>>>>>>>>> sop...@responsive.dev> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hey Colt! 
> >>>>>>>>>>>>>>>>>>> Thanks for the KIP -- this will be a great addition to Streams, I can't believe we've gone so long without this.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Overall the proposal makes sense, but I had a handful of fairly minor questions and suggestions/requests
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of the Motivation section is cut off and incomplete -- "want to be able to know " what exactly?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2. This isn't that important since the motivation as a whole is clear to me and convincing enough, but I'm not quite sure I understand the example at the end of the Motivation section. How are standby tasks (and the ability to hook into and monitor their status) related to the session.timeout.ms config?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3. To help both old and new users of Kafka Streams understand this new restore listener and its purpose/semantics, can we try to name the class and callbacks in a way that's more consistent with the active task restore listener?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3a.
> >>>>>>>>>>>>>>>>>>> StandbyTaskUpdateListener:
> >>>>>>>>>>>>>>>>>>> The existing restore listener is called StateRestoreListener, so the new one could be called something like StandbyStateRestoreListener. Although we typically refer to standby tasks as "processing" rather than "restoring" records -- ie restoration is a term for active task state specifically. I actually like the original suggestion if we just drop the "Task" part of the name, ie StandbyUpdateListener. I think either that or StandbyRestoreListener would be fine and probably the two best options.
> >>>>>>>>>>>>>>>>>>> Also, this probably goes without saying but any change to the name of this class should of course be reflected in the KafkaStreams#setXXX API as well
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3b. #onTaskCreated
> >>>>>>>>>>>>>>>>>>> I know the "start" callback feels a bit different for the standby task updater vs an active task beginning restoration, but I think we should try to keep the various callbacks aligned to their active restore listener counterpart.
> >>>>>>>>>>>>>>>>>>> We can/should just replace the term "restore" with "update" for the callback method names the same way we do for the class name, which in this case would give us #onUpdateStart. Personally I like this better, but it's ultimately up to you. However, I would push back against anything that includes the word "Task" (eg #onTaskCreated) as the listener is actually not scoped to the task itself but instead to the individual state store(s). This is the main reason I would prefer calling it something like #onUpdateStart, which keeps the focus on the store being updated rather than the task that just happens to own this store
> >>>>>>>>>>>>>>>>>>> One last thing on this callback -- do we really need both the `earliestOffset` and `startingOffset`? I feel like this might be more confusing than it is helpful (tbh even I'm not completely sure I know what the earliestOffset is supposed to represent) More importantly, is this all information that is already available and able to be passed in to the callback by Streams?
> >>>>>>>>>>>>>>>>>>> I haven't checked on this but it feels like the earliestOffset is likely to require a remote call, either by the embedded consumer or via the admin client. If so, the ROI on including this parameter seems quite low (if not outright negative)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3c. #onBatchRestored
> >>>>>>>>>>>>>>>>>>> If we opt to use the term "update" in place of "restore" elsewhere, then we should consider doing so here as well. What do you think about #onBatchUpdated, or even #onBatchProcessed?
> >>>>>>>>>>>>>>>>>>> I'm actually not super concerned about this particular API, and honestly I think we can use restore or update interchangeably here, so if you don't like any of the suggested names (and no one can think of anything better), I would just stick with #onBatchRestored. In this case, it kind of makes the most sense.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3d. #onTaskSuspended
> >>>>>>>>>>>>>>>>>>> Along the same lines as 3b above, #onUpdateSuspended or just #onRestoreSuspended probably makes more sense for this callback. Also, I notice the StateRestoreListener passes in the total number of records restored to its #onRestoreSuspended.
> >>>>>>>>>>>>>>>>>>> Assuming we already track that information in Streams and have it readily available to pass in at whatever point we would be invoking this callback, that might be a useful parameter for the standby listener to have as well
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4. I totally love the SuspendReason thing, just two notes/requests:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4a. Feel free to push back against adding onto the scope of this KIP, but it would be great to expand the active state restore listener with this SuspendReason enum as well. It would be really useful for both variants of restore listener
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby tasks it means basically the same thing, the point is that active tasks can also be recycled into standbys through the same mechanism. This way they can share the SuspendReason enum -- not that it's necessary for them to share, I just think it would be a good idea to keep the two restore listeners aligned to the highest degree possible.
> >>>>>>>>>>>>>>>>>>> I was actually considering proposing a short KIP with a new RecyclingListener (or something) specifically for this exact kind of thing, since we currently have literally zero insight into the recycling process. It's practically impossible to tell when a store has been converted from active to standby, or vice versa. So having access to the SuspendReason, and more importantly having a callback guaranteed to notify you when a state store is recycled whether active or standby, would be amazing.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> ---------- Forwarded message ---------
> >>>>>>>>>>>>>>>>>>>> From: Colt McNealy <c...@littlehorse.io>
> >>>>>>>>>>>>>>>>>>>> Date: Tue, Oct 3, 2023 at 12:48 PM
> >>>>>>>>>>>>>>>>>>>> Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> >>>>>>>>>>>>>>>>>>>> To: <dev@kafka.apache.org>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We would like to propose a small KIP to improve the ability of Streams apps to monitor the progress of their standby tasks through a callback interface.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We have a nearly-working implementation on our fork and are curious for feedback.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thank you,
> >>>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev*