That all sounds good to me! Thanks for the KIP.

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy <c...@littlehorse.io> wrote:
> Hi Sophie, Matthias, Bruno, and Eduwer,
>
> Thanks for your patience as I have been scrambling to catch up after a week
> of business travel (and a few days with no time to code). I'd like to tie
> up some loose ends here, but in short, I don't think the KIP document
> itself needs any changes (our internal implementation does, however).
>
> 1. In the interest of a) not changing the KIP after it's already out for a
> vote, and b) making sure our English grammar is "correct", let's stick with
> `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
>
> 2. For me (and, thankfully, the community as well) adding a remote network
> call at any point in this KIP is a non-starter. We'll ensure that
> our implementation does not introduce one.
>
> 3. I really don't like changing API behavior, even if it's not documented
> in the javadoc. As such, I am strongly against modifying the behavior of
> endOffsets() on the consumer as some people may implicitly depend on the
> contract.
> 3a. The Consumer#currentLag() method gives us exactly what we want without
> a network call (current lag from a cache, from which we can compute the
> offset).
>
> 4. I have no opinion about whether we should pass endOffset or currentLag
> to the callback. Either one has the same exact information inside it. In
> the interest of not changing the KIP after the vote has started, I'll leave
> it as endOffset.
>
> As such, I believe the KIP doesn't need any updates, nor has it been
> updated since the vote started.
>
> Would anyone else like to discuss something before the Otter Council
> adjourns regarding this matter?
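To make point 3a concrete: the consumer's lag is the end offset minus the current position, so the end offset can be recovered locally by adding the two. Below is a minimal Java sketch of just that arithmetic. It assumes the inputs would come from `Consumer#position()` and `Consumer#currentLag()` (the latter returns an `OptionalLong` that is empty when the lag is not yet known); the class and method names are made up for illustration and are not part of the KIP.

```java
import java.util.OptionalLong;

// Illustrative only: EndOffsetFromLag and endOffset() are hypothetical names.
class EndOffsetFromLag {

    // Assumption: lag = endOffset - position, so endOffset = position + lag.
    // If the lag is unknown (empty), the end offset is unknown as well.
    static OptionalLong endOffset(long position, OptionalLong currentLag) {
        if (currentLag.isPresent()) {
            return OptionalLong.of(position + currentLag.getAsLong());
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // In a real application these values would be read from the consumer.
        System.out.println(endOffset(120L, OptionalLong.of(30L)));
        System.out.println(endOffset(120L, OptionalLong.empty()));
    }
}
```

Because both inputs are served from the consumer's local state, this keeps the "no remote call" constraint from point 2, at the cost of the value being only as fresh as the last fetch response.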
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <sop...@responsive.dev>
> wrote:
>
> > Just want to checkpoint the current state of this KIP and make sure we're
> > on track to get it in to 3.7 (we still have a few weeks) -- looks like
> > there are two remaining open questions, both relating to the
> > middle/intermediate callback:
> >
> > 1. What to name it: seems like the primary candidates are onBatchLoaded and
> > onBatchUpdated (and maybe also onStandbyUpdated?)
> > 2. What additional information can we pass in that would strike a good
> > balance between being helpful and impacting performance.
> >
> > Regarding #1, I think all of the current options are reasonable enough that
> > we should just let Colt decide which he prefers. I personally think
> > #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> > that English grammar can be sticky and while it could be argued that it is
> > the store which is updated, not the batch, I feel that it is perfectly
> > clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
> > at all. That's just my two cents in case it helps, but again, whatever
> > makes sense to you Colt is fine.
> >
> > When it comes to #2 -- as much as I would love to dig into the Consumer
> > client lore and see if we can modify existing APIs or add new ones in order
> > to get the desired offset metadata in an efficient way, I think we're
> > starting to go down a rabbit hole that is going to expand the scope way
> > beyond what Colt thought he was signing up for. I would advocate to focus
> > on just the basic feature for now and drop the end-offset from the
> > callback. Once we have a standby listener it will be easy to expand on with
> > a followup KIP if/when we find an efficient way to add additional useful
> > information.
> > I think it will also become more clear what is and isn't
> > useful after more people get to using it in the real world.
> >
> > Colt/Eduwer: how necessary is receiving the end offset during a batch
> > update to your own application use case?
> >
> > Also, for those who really do need to check the current end offset, I
> > believe in theory you should be able to use the KafkaStreams#metrics API to
> > get the current lag and/or end offset for the changelog -- it's possible
> > this does not represent the most up-to-date end offset (I'm not sure it
> > does or does not), but it should be close enough to be reliable and useful
> > for the purpose of monitoring -- I mean it is a metric, after all.
> >
> > Hope this helps -- in the end, it's up to you (Colt) to decide what you
> > want to bring in scope or not. We still have more than 3 weeks until the
> > KIP freeze as currently proposed, so in theory you could even implement
> > this KIP without the end offset and then do a followup KIP to add the end
> > offset within the same release, ie without any deprecations. There are
> > plenty of paths forward here, so don't let us drag this out forever if you
> > know what you want.
> >
> > Cheers,
> > Sophie
> >
> > On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > > Forgot one thing:
> > >
> > > We could also pass `currentLag()` into `onBatchLoaded()` instead of
> > > end-offset.
> > >
> > >
> > > -Matthias
> > >
> > > On 10/20/23 10:56 AM, Matthias J. Sax wrote:
> > > > Thanks for digging into this Bruno.
> > > >
> > > > The JavaDoc on the consumer does not say anything specific about
> > > > `endOffset` guarantees:
> > > >
> > > >> Get the end offsets for the given partitions. In the default {@code
> > > >> read_uncommitted} isolation level, the end
> > > >> offset is the high watermark (that is, the offset of the last
> > > >> successfully replicated message plus one).
> > > >> For {@code read_committed} consumers, the end offset is the last stable
> > > >> offset (LSO), which is the minimum of
> > > >> the high watermark and the smallest offset of any open transaction.
> > > >> Finally, if the partition has never been
> > > >> written to, the end offset is 0.
> > > >
> > > > Thus, I actually believe that it would be ok to change the
> > > > implementation and serve the answer from the `TopicPartitionState`?
> > > >
> > > > Another idea would be to use `currentLag()` in combination with
> > > > `position()` (or the offset of the last read record) to compute the
> > > > end-offset on the fly?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 10/20/23 4:00 AM, Bruno Cadonna wrote:
> > > >> Hi,
> > > >>
> > > >> Matthias is correct that the end offsets are stored somewhere in the
> > > >> metadata of the consumer. More precisely, they are stored in the
> > > >> `TopicPartitionState`. However, I could not find public API on the
> > > >> consumer other than currentLag() that uses the stored end offsets. If
> > > >> I understand the code correctly, method endOffsets() always triggers a
> > > >> remote call.
> > > >>
> > > >> I am a bit concerned about doing remote calls every commit.interval.ms
> > > >> (by default 200ms under EOS). At the moment the remote calls are only
> > > >> issued if an optimization for KTables is turned on where changelog
> > > >> topics are replaced with the input topic of the KTable. The current
> > > >> remote calls retrieve all committed offsets of the group at once. If I
> > > >> understand correctly, that is one single remote call. Remote calls for
> > > >> getting end offsets of changelog topics -- as I understand you are
> > > >> planning to issue -- will probably result in multiple remote calls to
> > > >> multiple leaders of the changelog topic partitions.
> > > >>
> > > >> Please correct me if I misunderstood anything of the above.
> > > >>
> > > >> If my understanding is correct, I propose to modify the consumer in
> > > >> such a way to get the end offset from the locally stored metadata
> > > >> whenever possible as part of the implementation of this KIP. I do not
> > > >> know what the implications are of such a change of the consumer and if
> > > >> a KIP is needed for it. Maybe, endOffsets() guarantees to return the
> > > >> freshest end offsets possible, which would not be satisfied with the
> > > >> modification.
> > > >>
> > > >> Regarding the naming, I do not completely agree with Matthias. While
> > > >> the pattern might be consistent with onBatchUpdated, what is the
> > > >> meaning of onBatchUpdated? Is the batch updated? The names
> > > >> onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO.
> > > >> With "restore" the pattern works better. If I restore a batch of
> > > >> records in a state, the records are not there although they should be
> > > >> there and I add them. If I update a batch of records in a state, this
> > > >> sounds like the batch of records is in the state and I modify the
> > > >> existing records within the state. That is clearly not the meaning of
> > > >> the event for which the listener should be called.
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >>
> > > >>
> > > >> On 10/19/23 2:12 AM, Matthias J. Sax wrote:
> > > >>> Thanks for the KIP. Seems I am almost late to the party.
> > > >>>
> > > >>> About naming (fun, fun, fun): I like the current proposal overall,
> > > >>> except `onBatchLoaded`, but would prefer `onBatchUpdated`.
> > > >>> It better aligns to everything else:
> > > >>>
> > > >>> - it's an update-listener, not loaded-listener
> > > >>> - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
> > > >>> `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
> > > >>> - `StandbyUpdateListener` should have `onUpdateStart`,
> > > >>> `onUpdateSuspended` and `onBatchUpdated` to be equally consistent
> > > >>> (using "loaded" breaks the pattern)
> > > >>>
> > > >>>
> > > >>> About the end-offset question: I am relatively sure that the consumer
> > > >>> gets the latest end-offset as attached metadata in every fetch
> > > >>> response. (We exploit this behavior to track end-offsets for input
> > > >>> topic with regard to `max.task.idle.ms` without overhead -- it was
> > > >>> also a concern when we did the corresponding KIP how we could track
> > > >>> lag with no overhead).
> > > >>>
> > > >>> Thus, I believe we would "just" need to modify the code accordingly
> > > >>> to get this information from the restore-consumer
> > > >>> (`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
> > > >>> internal metadata cache) for free, and pass into the listener.
> > > >>>
> > > >>> Please double check / verify this claim and keep me honest about it.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 10/17/23 6:38 AM, Eduwer Camacaro wrote:
> > > >>>> Hi Bruno,
> > > >>>>
> > > >>>> Thanks for your observation; surely it will require a network call
> > > >>>> using the admin client in order to know this "endOffset" and that will
> > > >>>> have an impact on performance. We can either find a solution that has a
> > > >>>> low impact on performance or ideally zero impact; unfortunately, I don't
> > > >>>> see a way to have zero impact on performance.
> > > >>>> However, we can leverage the existing
> > > >>>> #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the
> > > >>>> admin client to ask for these "endOffset"s. As far as I can understand,
> > > >>>> this update is done periodically using the "commit.interval.ms"
> > > >>>> configuration. I believe this option will force us to invoke
> > > >>>> StandbyUpdateListener once this interval is reached.
> > > >>>>
> > > >>>> On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Thanks for the KIP, Colt and Eduwer,
> > > >>>>>
> > > >>>>> Are you sure there is also not a significant performance impact for
> > > >>>>> passing into the callback `currentEndOffset`?
> > > >>>>>
> > > >>>>> I am asking because the comment here:
> > > >>>>>
> > > >>>>> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
> > > >>>>>
> > > >>>>> says that the end-offset is only updated once for standby tasks whose
> > > >>>>> changelog topic is not piggy-backed on input topics. I could also not
> > > >>>>> find the update of end-offset for those standbys.
> > > >>>>>
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Bruno
> > > >>>>>
> > > >>>>> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> it's a nice improvement! I don't have anything to add on top of the
> > > >>>>>> previous comments, just came here to say that it seems to me
> > > >>>>>> consensus has been reached and the result looks good to me.
> > > >>>>>>
> > > >>>>>> Thanks Colt and Eduwer!
> > > >>>>>> Lucas
> > > >>>>>>
> > > >>>>>> On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Thanks, Guozhang. I've updated the KIP and will start a vote.
> > > >>>>>>>
> > > >>>>>>> Colt McNealy
> > > >>>>>>>
> > > >>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Thanks for the summary, that looks good to me.
> > > >>>>>>>>
> > > >>>>>>>> Guozhang
> > > >>>>>>>>
> > > >>>>>>>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Hello there!
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks everyone for the comments. There's a lot of back-and-forth
> > > >>>>>>>>> going on, so I'll do my best to summarize what everyone's said in
> > > >>>>>>>>> TLDR format:
> > > >>>>>>>>>
> > > >>>>>>>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do
> > > >>>>>>>>> similarly for the other methods.
> > > >>>>>>>>> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > > >>>>>>>>> 3. Remove the `earliestOffset` parameter for performance reasons.
> > > >>>>>>>>>
> > > >>>>>>>>> If that's all fine with everyone, I'll update the KIP and we -- well,
> > > >>>>>>>>> mostly Edu (: -- will open a PR.
> > > >>>>>>>>>
> > > >>>>>>>>> Cheers,
> > > >>>>>>>>> Colt McNealy
> > > >>>>>>>>>
> > > >>>>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hello everyone,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for all your feedback for this KIP!
> > > >>>>>>>>>>
> > > >>>>>>>>>> I think that the key to choosing proper names for this API is
> > > >>>>>>>>>> understanding the terms used inside the StoreChangelogReader.
> > > >>>>>>>>>> Currently, this class has
> > > >>>>>>>>>> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> > > >>>>>>>>>> opinion, using StandbyUpdateListener for the interface fits better
> > > >>>>>>>>>> on these terms. Same applies for onUpdateStart/Suspended.
> > > >>>>>>>>>>
> > > >>>>>>>>>> StoreChangelogReader uses "the same mechanism" for active task
> > > >>>>>>>>>> restoration and standby task updates, but this is an implementation
> > > >>>>>>>>>> detail. Under normal circumstances (no rebalances or task migrations),
> > > >>>>>>>>>> the changelog reader will be in STANDBY_UPDATING, which means it will
> > > >>>>>>>>>> be updating standby tasks as long as there are new records in the
> > > >>>>>>>>>> changelog topic. That's why I prefer onStandbyUpdated instead of
> > > >>>>>>>>>> onBatchUpdated, even if it doesn't 100% align with
> > > >>>>>>>>>> StateRestoreListener, but either one is fine.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Edu
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hello Colt,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks for writing the KIP! I have read through the updated KIP and
> > > >>>>>>>>>>> overall it looks great. I only have minor naming comments (well,
> > > >>>>>>>>>>> isn't naming the least boring stuff to discuss and that takes the
> > > >>>>>>>>>>> most of the time for KIPs :P):
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 1.
> > > >>>>>>>>>>> I tend to agree with Sophie regarding whether or not to include
> > > >>>>>>>>>>> "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > >>>>>>>>>>> it is also more consistent with the functions of
> > > >>>>>>>>>>> "StateRestoreListener" where we do not name it as
> > > >>>>>>>>>>> "onStateRestoreState" etc.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 2. I know in community discussions we sometimes say "a standby is
> > > >>>>>>>>>>> promoted to active", but in the official code / java docs we did not
> > > >>>>>>>>>>> have a term of "promotion", since what the code does is really recycle
> > > >>>>>>>>>>> the task (while keeping its state stores open), and create a new
> > > >>>>>>>>>>> active task that takes in the recycled state stores and just changes
> > > >>>>>>>>>>> the other fields like task type etc. After thinking about this for a
> > > >>>>>>>>>>> bit, I tend to feel that "promoted" is indeed a better name for user
> > > >>>>>>>>>>> facing purposes while "recycle" is more of a technical detail inside
> > > >>>>>>>>>>> the code and could be abstracted away from users. So I feel keeping
> > > >>>>>>>>>>> the name "PROMOTED" is fine.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 3. Regarding "earliestOffset", it does feel like we cannot always
> > > >>>>>>>>>>> avoid another call to the Kafka API. And on the other hand, I also
> > > >>>>>>>>>>> tend to think that such bookkeeping may be better done at the app
> > > >>>>>>>>>>> level than from the Streams' public API level. I.e.
> > > >>>>>>>>>>> the app could keep
> > > >>>>>>>>>>> a "first ever starting offset" per "topic-partition-store" key, and
> > > >>>>>>>>>>> when we have rolling restart and hence some standby task keeps
> > > >>>>>>>>>>> "jumping" from one client to another via task assignment, the app
> > > >>>>>>>>>>> would update this value just once when it finds the
> > > >>>>>>>>>>> "topic-partition-store" was never triggered before. What do you
> > > >>>>>>>>>>> think?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 4. I do not have a strong opinion either, but what about
> > > >>>>>>>>>>> "onBatchUpdated"?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Guozhang
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Sophie,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you very much for such a detailed review of the KIP. It might
> > > >>>>>>>>>>>> actually be longer than the original KIP in the first place!
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 1. Ack'ed and fixed.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 2. Correct, this is a confusing passage and requires context:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> One thing on our list of TODO's regarding reliability is to determine
> > > >>>>>>>>>>>> how to configure `session.timeout.ms`. In our Kubernetes Environment,
> > > >>>>>>>>>>>> an instance of our Streams App can be terminated, restarted, and get
> > > >>>>>>>>>>>> back into the "RUNNING" Streams state in about 20 seconds.
> > > >>>>>>>>>>>> We have two options here:
> > > >>>>>>>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal with 20
> > > >>>>>>>>>>>> seconds of unavailability for affected partitions, but avoid shuffling
> > > >>>>>>>>>>>> Tasks; or
> > > >>>>>>>>>>>> b) set session.timeout.ms to a low value, such as 6 seconds
> > > >>>>>>>>>>>> (heartbeat.interval.ms of 2000), and reduce the unavailability window
> > > >>>>>>>>>>>> during a rolling bounce but incur an "extra" rebalance.
> > > >>>>>>>>>>>> There are several different costs to a rebalance, including the
> > > >>>>>>>>>>>> shuffling of standby tasks. JMX metrics are not fine-grained enough to
> > > >>>>>>>>>>>> give us an accurate picture of what's going on with the whole Standby
> > > >>>>>>>>>>>> Task Shuffle Dance. I hypothesize that the Standby Update Listener
> > > >>>>>>>>>>>> might help us clarify just how the shuffling actually (not
> > > >>>>>>>>>>>> theoretically) works, which will help us make a more informed decision
> > > >>>>>>>>>>>> about the session timeout config.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If you think this is worth putting in the KIP, I'll polish it and do
> > > >>>>>>>>>>>> so; else, I'll remove the current half-baked explanation.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3. Overall, I agree with this.
> > > >>>>>>>>>>>> In our app, each Task has only one Store to
> > > >>>>>>>>>>>> reduce the number of changelog partitions, so I sometimes forget the
> > > >>>>>>>>>>>> distinction between the two concepts, as reflected in the KIP (:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3a. I don't like the word "Restore" here, since Restoration refers to
> > > >>>>>>>>>>>> an Active Task getting caught up in preparation to resume processing.
> > > >>>>>>>>>>>> `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
> > > >>>>>>>>>>>> native Python speaker so I do prefer shorter names anyways (:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3b1. +1 to removing the word 'Task'.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> > > >>>>>>>>>>>> `onStandbyUpdateStart()` which matches the name of the Interface
> > > >>>>>>>>>>>> "StandbyUpdateListener". (the python part of me hates this, however)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3b3. Going back to question 2), `earliestOffset` was intended to allow
> > > >>>>>>>>>>>> us to more easily calculate the amount of state _already loaded_ in
> > > >>>>>>>>>>>> the store by subtracting (startingOffset - earliestOffset).
> > > >>>>>>>>>>>> This would help us see
> > > >>>>>>>>>>>> how much inefficiency is introduced in a rolling restart -- if we end
> > > >>>>>>>>>>>> up going from a situation with an up-to-date standby before the
> > > >>>>>>>>>>>> restart, and then after the whole restart, the Task is shuffled onto
> > > >>>>>>>>>>>> an instance where there is no previous state, then that is expensive.
> > > >>>>>>>>>>>> However, if the final shuffling results in the Task back on an
> > > >>>>>>>>>>>> instance with a lot of pre-built state, it's not expensive.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If a call over the network is required to determine the
> > > >>>>>>>>>>>> earliestOffset, then this is a "hard no-go" for me, and we will remove
> > > >>>>>>>>>>>> it (I'll have to check with Eduwer as he is close to having a working
> > > >>>>>>>>>>>> implementation). I think we can probably determine what we wanted to
> > > >>>>>>>>>>>> see in a different way, but it will take more thinking. If
> > > >>>>>>>>>>>> `earliestOffset` is confusing, perhaps rename it to
> > > >>>>>>>>>>>> `earliestChangelogOffset`?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> `startingOffset` is easy to remove as it can be determined from the
> > > >>>>>>>>>>>> first call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Anyways, I've updated the JavaDoc in the interface; hopefully it's
> > > >>>>>>>>>>>> more clear. Awaiting further instructions here.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3c.
> > > >>>>>>>>>>>> Good point; after thinking, my preference (most to least preferred) is
> > > >>>>>>>>>>>> `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` ->
> > > >>>>>>>>>>>> `onBatchRestored()`. I am less fond of "processed" because when I was
> > > >>>>>>>>>>>> first learning Streams I mistakenly thought that standby tasks
> > > >>>>>>>>>>>> actually processed the input topic rather than loaded from the
> > > >>>>>>>>>>>> changelog. I'll defer to you here.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet
> > > >>>>>>>>>>>> `onStandbyUpdateSuspended()`. Will check about the implementation of
> > > >>>>>>>>>>>> keeping track of the number of records loaded.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 4a. I think this might be best in a separate KIP, especially given
> > > >>>>>>>>>>>> that this is my and Eduwer's first time contributing to Kafka (so we
> > > >>>>>>>>>>>> want to minimize the blast radius).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 4b. I might respectfully (and timidly) push back here: RECYCLED for an
> > > >>>>>>>>>>>> Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense
> > > >>>>>>>>>>>> from the standpoint of an Active Task; recycling to me sounds like
> > > >>>>>>>>>>>> throwing stuff away, such that the resources (i.e. disk space) can be
> > > >>>>>>>>>>>> used by a separate Task. As an alternative rather than trying to reuse
> > > >>>>>>>>>>>> the same enum, maybe rename it to `StandbySuspendReason` to avoid
> > > >>>>>>>>>>>> naming conflicts with `ActiveSuspendReason`?
> > > >>>>>>>>>>>> However, I could be convinced to rename PROMOTED ->
> > > >>>>>>>>>>>> RECYCLED, especially if Eduwer agrees.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> TLDR:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> T1. Agreed, will remove the word "Task" as it's incorrect.
> > > >>>>>>>>>>>> T2. Will update to `onStandbyUpdateStart()`
> > > >>>>>>>>>>>> T3. Awaiting further instructions on earliestOffset and startingOffset.
> > > >>>>>>>>>>>> T4. I don't like `onBatchProcessed()` too much, perhaps `onBatchLoaded()`?
> > > >>>>>>>>>>>> T5. Will update to `onStandbyUpdateSuspended()`
> > > >>>>>>>>>>>> T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather
> > > >>>>>>>>>>>> than renaming PROMOTED to RECYCLED? @Eduwer?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Long Live the Otter,
> > > >>>>>>>>>>>> Colt McNealy
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <sop...@responsive.dev>
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hey Colt! Thanks for the KIP -- this will be a great addition to
> > > >>>>>>>>>>>>> Streams, I can't believe we've gone so long without this.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Overall the proposal makes sense, but I had a handful of fairly minor
> > > >>>>>>>>>>>>> questions and suggestions/requests
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of the Motivation
> > > >>>>>>>>>>>>> section is cut off and incomplete -- "want to be able to know " what
> > > >>>>>>>>>>>>> exactly?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2.
> > > >>>>>>>>>>>>> This isn't that important since the motivation as a whole is clear to
> > > >>>>>>>>>>>>> me and convincing enough, but I'm not quite sure I understand the
> > > >>>>>>>>>>>>> example at the end of the Motivation section. How are standby tasks
> > > >>>>>>>>>>>>> (and the ability to hook into and monitor their status) related to the
> > > >>>>>>>>>>>>> session.timeout.ms config?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3. To help both old and new users of Kafka Streams understand this new
> > > >>>>>>>>>>>>> restore listener and its purpose/semantics, can we try to name the
> > > >>>>>>>>>>>>> class and callbacks in a way that's more consistent with the active
> > > >>>>>>>>>>>>> task restore listener?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3a. StandbyTaskUpdateListener:
> > > >>>>>>>>>>>>> The existing restore listener is called StateRestoreListener, so the
> > > >>>>>>>>>>>>> new one could be called something like StandbyStateRestoreListener.
> > > >>>>>>>>>>>>> Although we typically refer to standby tasks as "processing" rather
> > > >>>>>>>>>>>>> than "restoring" records -- ie restoration is a term for active task
> > > >>>>>>>>>>>>> state specifically. I actually like the original suggestion if we just
> > > >>>>>>>>>>>>> drop the "Task" part of the name, ie StandbyUpdateListener. I think
> > > >>>>>>>>>>>>> either that or StandbyRestoreListener would be fine and probably the
> > > >>>>>>>>>>>>> two best options.
> > > >>>>>>>>>>>>> Also, this probably goes without saying but any change to the name of
> > > >>>>>>>>>>>>> this class should of course be reflected in the KafkaStreams#setXXX
> > > >>>>>>>>>>>>> API as well
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3b. #onTaskCreated
> > > >>>>>>>>>>>>> I know the "start" callback feels a bit different for the standby task
> > > >>>>>>>>>>>>> updater vs an active task beginning restoration, but I think we should
> > > >>>>>>>>>>>>> try to keep the various callbacks aligned to their active restore
> > > >>>>>>>>>>>>> listener counterpart. We can/should just replace the term "restore"
> > > >>>>>>>>>>>>> with "update" for the callback method names the same way we do for the
> > > >>>>>>>>>>>>> class name, which in this case would give us #onUpdateStart.
> > > >>>>>>>>>>>>> Personally I like this better, but it's ultimately up to you. However,
> > > >>>>>>>>>>>>> I would push back against anything that includes the word "Task" (eg
> > > >>>>>>>>>>>>> #onTaskCreated) as the listener is actually not scoped to the task
> > > >>>>>>>>>>>>> itself but instead to the individual state store(s). This is the main
> > > >>>>>>>>>>>>> reason I would prefer calling it something like #onUpdateStart, which
> > > >>>>>>>>>>>>> keeps the focus on the store being updated rather than the task that
> > > >>>>>>>>>>>>> just happens to own this store
> > > >>>>>>>>>>>>> One last thing on this callback -- do we really need both the
> > > >>>>>>>>>>>>> `earliestOffset` and `startingOffset`?
> I feel like this might be more confusing than it is helpful (tbh even I'm
> not completely sure I know what the earliestOffset is supposed to
> represent). More importantly, is this all information that is already
> available and able to be passed in to the callback by Streams? I haven't
> checked on this but it feels like the earliestOffset is likely to require
> a remote call, either by the embedded consumer or via the admin client. If
> so, the ROI on including this parameter seems quite low (if not outright
> negative).
>
> 3c. #onBatchRestored
> If we opt to use the term "update" in place of "restore" elsewhere, then we
> should consider doing so here as well. What do you think about
> #onBatchUpdated, or even #onBatchProcessed?
> I'm actually not super concerned about this particular API, and honestly I
> think we can use restore or update interchangeably here, so if you don't
> like any of the suggested names (and no one can think of anything better),
> I would just stick with #onBatchRestored. In this case, it kind of makes
> the most sense.
>
> 3d. #onTaskSuspended
> Along the same lines as 3b above, #onUpdateSuspended or just
> #onRestoreSuspended probably makes more sense for this callback.
> Also, I notice the StateRestoreListener passes in the total number of
> records restored to its #onRestoreSuspended. Assuming we already track
> that information in Streams and have it readily available to pass in at
> whatever point we would be invoking this callback, that might be a useful
> parameter for the standby listener to have as well.
>
> 4. I totally love the SuspendReason thing, just two notes/requests:
>
> 4a. Feel free to push back against adding onto the scope of this KIP, but
> it would be great to expand the active state restore listener with this
> SuspendReason enum as well. It would be really useful for both variants of
> restore listener.
>
> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby
> tasks it means basically the same thing, the point is that active tasks
> can also be recycled into standbys through the same mechanism. This way
> they can share the SuspendReason enum -- not that it's necessary for them
> to share, I just think it would be a good idea to keep the two restore
> listeners aligned to the highest degree possible.
> I was actually considering proposing a short KIP with a new
> RecyclingListener (or something) specifically for this exact kind of
> thing, since we currently have literally zero insight into the recycling
> process. It's practically impossible to tell when a store has been
> converted from active to standby, or vice versa. So having access to the
> SuspendReason, and more importantly having a callback guaranteed to notify
> you when a state store is recycled whether active or standby, would be
> amazing.
>
> Thanks for the KIP!
>
> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
>
>
> ---------- Forwarded message ---------
> > From: Colt McNealy <c...@littlehorse.io>
> > Date: Tue, Oct 3, 2023 at 12:48 PM
> > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > To: <dev@kafka.apache.org>
> >
> >
> > Hi all,
> >
> > We would like to propose a small KIP to improve the ability of Streams
> > apps to monitor the progress of their standby tasks through a callback
> > interface.
> >
> > We have a nearly-working implementation on our fork and are curious for
> > feedback.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >
> > Thank you,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
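
For reference, the naming suggested in points 3a through 4b of the quoted review can be sketched in plain Java. This is only an illustration under stated assumptions, not the KIP-988 API: the parameter lists, the MIGRATED constant, and the RecordingListener helper are hypothetical, and topic partitions are left out so the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the names discussed in the thread; not the KIP-988 API. */
class StandbyListenerSketch {

    /** RECYCLED is the rename proposed in 4b; MIGRATED is an assumed second reason. */
    enum SuspendReason { MIGRATED, RECYCLED }

    /** Callbacks are scoped to a state store, so no method name mentions "Task" (3b). */
    interface StandbyUpdateListener {
        void onUpdateStart(String storeName, long startingOffset);

        void onBatchUpdated(String storeName, long batchEndOffset, long batchSize);

        // totalUpdated mirrors the total passed to StateRestoreListener#onRestoreSuspended (3d).
        void onUpdateSuspended(String storeName, long storeOffset, long totalUpdated,
                               SuspendReason reason);
    }

    /** Hypothetical listener that keeps a per-store record count and the last suspend reason. */
    static final class RecordingListener implements StandbyUpdateListener {
        private final Map<String, Long> totals = new HashMap<>();
        private final Map<String, SuspendReason> reasons = new HashMap<>();

        @Override
        public void onUpdateStart(String storeName, long startingOffset) {
            totals.put(storeName, 0L);   // a new update cycle starts counting from zero
            reasons.remove(storeName);   // the store is no longer suspended
        }

        @Override
        public void onBatchUpdated(String storeName, long batchEndOffset, long batchSize) {
            totals.merge(storeName, batchSize, Long::sum);
        }

        @Override
        public void onUpdateSuspended(String storeName, long storeOffset, long totalUpdated,
                                      SuspendReason reason) {
            reasons.put(storeName, reason);
        }

        long totalUpdated(String storeName) {
            return totals.getOrDefault(storeName, 0L);
        }

        SuspendReason lastReason(String storeName) {
            return reasons.get(storeName);
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onUpdateStart("store-a", 100L);
        listener.onBatchUpdated("store-a", 110L, 10L);
        listener.onBatchUpdated("store-a", 115L, 5L);
        listener.onUpdateSuspended("store-a", 115L, listener.totalUpdated("store-a"),
                SuspendReason.RECYCLED);
        System.out.println("store-a: " + listener.totalUpdated("store-a")
                + " records, suspended because " + listener.lastReason("store-a"));
    }
}
```

Keying everything by store name rather than by task follows the argument in 3b that the listener is scoped to individual state stores.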