Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
>
> Hello there!
>
> Thanks everyone for the comments. There's a lot of back-and-forth going on, so I'll do my best to summarize what everyone's said in TLDR format:
>
> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly for the other methods.
> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> 3. Remove the `earliestOffset` parameter for performance reasons.
>
> If that's all fine with everyone, I'll update the KIP and we -- well, mostly Edu (: -- will open a PR.
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io> wrote:
>
> > Hello everyone,
> >
> > Thanks for all your feedback for this KIP!
> >
> > I think that the key to choosing proper names for this API is understanding the terms used inside the StoreChangelogReader. Currently, this class has two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion, using StandbyUpdateListener for the interface fits these terms better. The same applies to onUpdateStart/Suspended.
> >
> > StoreChangelogReader uses "the same mechanism" for active task restoration and standby task updates, but this is an implementation detail. Under normal circumstances (no rebalances or task migrations), the changelog reader will be in STANDBY_UPDATING, which means it will be updating standby tasks as long as there are new records in the changelog topic. That's why I prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100% align with StateRestoreListener, but either one is fine.
> >
> > Edu
> >
> > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> >
> > > Hello Colt,
> > >
> > > Thanks for writing the KIP! I have read through the updated KIP and overall it looks great.
> > > I only have minor naming comments (well, isn't naming the least boring thing to discuss, and the one that takes most of the time for KIPs :P):
> > >
> > > 1. I tend to agree with Sophie that we should not include "Standby" in the function names "onStandbyUpdateStart/Suspended", since that is also more consistent with the functions of "StateRestoreListener", where we do not name them "onStateRestoreState" etc.
> > >
> > > 2. I know in community discussions we sometimes say "a standby is promoted to active", but in the official code / java docs we did not have a term of "promotion", since what the code does is really recycle the task (while keeping its state stores open), and create a new active task that takes in the recycled state stores and just changes the other fields like task type etc. After thinking about this for a bit, I tend to feel that "promoted" is indeed a better name for user facing purposes while "recycle" is more of a technical detail inside the code and could be abstracted away from users. So I feel keeping the name "PROMOTED" is fine.
> > >
> > > 3. Regarding "earliestOffset", it does feel like we cannot always avoid another call to the Kafka API. And on the other hand, I also tend to think that such bookkeeping may be better done at the app level than from the Streams' public API level. I.e. the app could keep a "first ever starting offset" per "topic-partition-store" key, and when we have a rolling restart and hence some standby task keeps "jumping" from one client to another via task assignment, the app would update this value just once, when it finds the "topic-partition-store" key was never triggered before. What do you think?
> > >
> > > 4. I do not have a strong opinion either, but what about "onBatchUpdated"?
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > > > Sophie --
> > > >
> > > > Thank you very much for such a detailed review of the KIP. It might actually be longer than the original KIP in the first place!
> > > >
> > > > 1. Ack'ed and fixed.
> > > >
> > > > 2. Correct, this is a confusing passage and requires context:
> > > >
> > > > One thing on our list of TODO's regarding reliability is to determine how to configure `session.timeout.ms`. In our Kubernetes Environment, an instance of our Streams App can be terminated, restarted, and get back into the "RUNNING" Streams state in about 20 seconds. We have two options here: a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of unavailability for affected partitions, but avoid shuffling Tasks; or b) set session.timeout.ms to a low value, such as 6 seconds (heartbeat.interval.ms of 2000), and reduce the unavailability window during a rolling bounce but incur an "extra" rebalance. There are several different costs to a rebalance, including the shuffling of standby tasks. JMX metrics are not fine-grained enough to give us an accurate picture of what's going on with the whole Standby Task Shuffle Dance. I hypothesize that the Standby Update Listener might help us clarify just how the shuffling actually (not theoretically) works, which will help us make a more informed decision about the session timeout config.
> > > >
> > > > If you think this is worth putting in the KIP, I'll polish it and do so; else, I'll remove the current half-baked explanation.
> > > >
> > > > 3. Overall, I agree with this.
> > > > In our app, each Task has only one Store to reduce the number of changelog partitions, so I sometimes forget the distinction between the two concepts, as reflected in the KIP (:
> > > >
> > > > 3a. I don't like the word "Restore" here, since Restoration refers to an Active Task getting caught up in preparation to resume processing. `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a native Python speaker so I do prefer shorter names anyways (:
> > > >
> > > > 3b1. +1 to removing the word 'Task'.
> > > >
> > > > 3b2. I like `onUpdateStart()`, but with your permission I'd prefer `onStandbyUpdateStart()` which matches the name of the Interface "StandbyUpdateListener". (the python part of me hates this, however)
> > > >
> > > > 3b3. Going back to question 2), `earliestOffset` was intended to allow us to more easily calculate the amount of state _already loaded_ in the store by subtracting (startingOffset - earliestOffset). This would help us see how much inefficiency is introduced in a rolling restart -- if we end up going from a situation with an up-to-date standby before the restart, and then after the whole restart, the Task is shuffled onto an instance where there is no previous state, then that is expensive. However, if the final shuffling results in the Task back on an instance with a lot of pre-built state, it's not expensive.
> > > >
> > > > If a call over the network is required to determine the earliestOffset, then this is a "hard no-go" for me, and we will remove it (I'll have to check with Eduwer as he is close to having a working implementation). I think we can probably determine what we wanted to see in a different way, but it will take more thinking. If `earliestOffset` is confusing, perhaps rename it to `earliestChangelogOffset`?
> > > >
> > > > `startingOffset` is easy to remove as it can be determined from the first call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > >
> > > > Anyways, I've updated the JavaDoc in the interface; hopefully it's more clear. Awaiting further instructions here.
> > > >
> > > > 3c. Good point; after thinking, my order of preference is `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I am less fond of "processed" because when I was first learning Streams I mistakenly thought that standby tasks actually processed the input topic rather than loaded from the changelog. I'll defer to you here.
> > > >
> > > > 3d. +1 to `onUpdateSuspended()`, or better yet `onStandbyUpdateSuspended()`. Will check about the implementation of keeping track of the number of records loaded.
> > > >
> > > > 4a. I think this might be best in a separate KIP, especially given that this is my and Eduwer's first time contributing to Kafka (so we want to minimize the blast radius).
> > > >
> > > > 4b. I might respectfully (and timidly) push back here: RECYCLED for an Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense from the standpoint of an Active Task; recycling to me sounds like throwing stuff away, such that the resources (i.e. disk space) can be used by a separate Task. As an alternative, rather than trying to reuse the same enum, maybe rename it to `StandbySuspendReason` to avoid naming conflicts with `ActiveSuspendReason`? However, I could be convinced to rename PROMOTED -> RECYCLED, especially if Eduwer agrees.
> > > >
> > > > TLDR:
> > > >
> > > > T1. Agreed, will remove the word "Task" as it's incorrect.
> > > > T2. Will update to `onStandbyUpdateStart()`
> > > > T3. Awaiting further instructions on earliestOffset and startingOffset.
> > > > T4.
> > > > I don't like `onBatchProcessed()` too much, perhaps `onBatchLoaded()`?
> > > > T5. Will update to `onStandbyUpdateSuspended()`
> > > > T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather than renaming PROMOTED to RECYCLED? @Eduwer?
> > > >
> > > > Long Live the Otter,
> > > > Colt McNealy
> > > >
> > > > *Founder, LittleHorse.dev*
> > > >
> > > >
> > > > On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> > > >
> > > > > Hey Colt! Thanks for the KIP -- this will be a great addition to Streams, I can't believe we've gone so long without this.
> > > > >
> > > > > Overall the proposal makes sense, but I had a handful of fairly minor questions and suggestions/requests
> > > > >
> > > > > 1. Seems like the last sentence in the 2nd paragraph of the Motivation section is cut off and incomplete -- "want to be able to know " what exactly?
> > > > >
> > > > > 2. This isn't that important since the motivation as a whole is clear to me and convincing enough, but I'm not quite sure I understand the example at the end of the Motivation section. How are standby tasks (and the ability to hook into and monitor their status) related to the session.timeout.ms config?
> > > > >
> > > > > 3. To help both old and new users of Kafka Streams understand this new restore listener and its purpose/semantics, can we try to name the class and callbacks in a way that's more consistent with the active task restore listener?
> > > > >
> > > > > 3a. StandbyTaskUpdateListener:
> > > > > The existing restore listener is called StateRestoreListener, so the new one could be called something like StandbyStateRestoreListener.
> > > > > Although we typically refer to standby tasks as "processing" rather than "restoring" records -- ie restoration is a term for active task state specifically. I actually like the original suggestion if we just drop the "Task" part of the name, ie StandbyUpdateListener. I think either that or StandbyRestoreListener would be fine and probably the two best options.
> > > > > Also, this probably goes without saying but any change to the name of this class should of course be reflected in the KafkaStreams#setXXX API as well
> > > > >
> > > > > 3b. #onTaskCreated
> > > > > I know the "start" callback feels a bit different for the standby task updater vs an active task beginning restoration, but I think we should try to keep the various callbacks aligned to their active restore listener counterpart. We can/should just replace the term "restore" with "update" for the callback method names the same way we do for the class name, which in this case would give us #onUpdateStart. Personally I like this better, but it's ultimately up to you. However, I would push back against anything that includes the word "Task" (eg #onTaskCreated) as the listener is actually not scoped to the task itself but instead to the individual state store(s). This is the main reason I would prefer calling it something like #onUpdateStart, which keeps the focus on the store being updated rather than the task that just happens to own this store
> > > > > One last thing on this callback -- do we really need both the `earliestOffset` and `startingOffset`?
> > > > > I feel like this might be more confusing than it is helpful (tbh even I'm not completely sure I know what the earliestOffset is supposed to represent). More importantly, is this all information that is already available and able to be passed in to the callback by Streams? I haven't checked on this but it feels like the earliestOffset is likely to require a remote call, either by the embedded consumer or via the admin client. If so, the ROI on including this parameter seems quite low (if not outright negative)
> > > > >
> > > > > 3c. #onBatchRestored
> > > > > If we opt to use the term "update" in place of "restore" elsewhere, then we should consider doing so here as well. What do you think about #onBatchUpdated, or even #onBatchProcessed?
> > > > > I'm actually not super concerned about this particular API, and honestly I think we can use restore or update interchangeably here, so if you don't like any of the suggested names (and no one can think of anything better), I would just stick with #onBatchRestored. In this case, it kind of makes the most sense.
> > > > >
> > > > > 3d. #onTaskSuspended
> > > > > Along the same lines as 3b above, #onUpdateSuspended or just #onRestoreSuspended probably makes more sense for this callback. Also, I notice the StateRestoreListener passes in the total number of records restored to its #onRestoreSuspended. Assuming we already track that information in Streams and have it readily available to pass in at whatever point we would be invoking this callback, that might be a useful parameter for the standby listener to have as well
> > > > >
> > > > > 4. I totally love the SuspendReason thing, just two notes/requests:
> > > > >
> > > > > 4a.
> > > > > Feel free to push back against adding onto the scope of this KIP, but it would be great to expand the active state restore listener with this SuspendReason enum as well. It would be really useful for both variants of restore listener
> > > > >
> > > > > 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby tasks it means basically the same thing, the point is that active tasks can also be recycled into standbys through the same mechanism. This way they can share the SuspendReason enum -- not that it's necessary for them to share, I just think it would be a good idea to keep the two restore listeners aligned to the highest degree possible.
> > > > > I was actually considering proposing a short KIP with a new RecyclingListener (or something) specifically for this exact kind of thing, since we currently have literally zero insight into the recycling process. It's practically impossible to tell when a store has been converted from active to standby, or vice versa. So having access to the SuspendReason, and more importantly having a callback guaranteed to notify you when a state store is recycled whether active or standby, would be amazing.
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > > >
> > > > >
> > > > > > ---------- Forwarded message ---------
> > > > > > From: Colt McNealy <c...@littlehorse.io>
> > > > > > Date: Tue, Oct 3, 2023 at 12:48 PM
> > > > > > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > > > > > To: <dev@kafka.apache.org>
> > > > > >
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > We would like to propose a small KIP to improve the ability of Streams apps to monitor the progress of their standby tasks through a callback interface.
> > > > > >
> > > > > > We have a nearly-working implementation on our fork and are curious for feedback.
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > > > >
> > > > > > Thank you,
> > > > > > Colt McNealy
> > > > > >
> > > > > > *Founder, LittleHorse.dev*
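
For readers skimming the thread, here is a rough Java sketch of the listener shape the discussion converges on, together with the app-level "first ever starting offset" bookkeeping suggested in place of `earliestOffset`. It is illustrative only and not the KIP's actual interface: the parameter lists are assumptions, plain `String`/`int` stand in for Kafka's topic-partition type so the sketch compiles without Kafka on the classpath, `onBatchUpdated` is a placeholder because the batch callback's name was still being debated, and `FirstOffsetTracker` is a hypothetical class invented for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the enum discussed in the thread.
enum SuspendReason { PROMOTED, MIGRATED }

// Sketch only: method names follow the thread's summary; parameter lists are assumptions.
interface StandbyUpdateListener {
    void onUpdateStart(String topic, int partition, String storeName, long startingOffset);

    // The batch callback's name was not settled in the thread; this one is a placeholder.
    void onBatchUpdated(String topic, int partition, String storeName, long batchEndOffset, long batchSize);

    void onUpdateSuspended(String topic, int partition, String storeName, long storeOffset, SuspendReason reason);
}

// Hypothetical app-level bookkeeping keyed by "topic-partition-store".
final class FirstOffsetTracker implements StandbyUpdateListener {
    private final Map<String, Long> firstOffsets = new ConcurrentHashMap<>();
    private final Map<String, Long> loadedCounts = new ConcurrentHashMap<>();
    private final Map<String, SuspendReason> suspendReasons = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, String storeName) {
        return topic + "-" + partition + "-" + storeName;
    }

    @Override
    public void onUpdateStart(String topic, int partition, String storeName, long startingOffset) {
        // Record the offset only the first time this key is seen; later starts are ignored.
        firstOffsets.putIfAbsent(key(topic, partition, storeName), startingOffset);
    }

    @Override
    public void onBatchUpdated(String topic, int partition, String storeName, long batchEndOffset, long batchSize) {
        // Accumulate the number of records loaded for this store.
        loadedCounts.merge(key(topic, partition, storeName), batchSize, Long::sum);
    }

    @Override
    public void onUpdateSuspended(String topic, int partition, String storeName, long storeOffset, SuspendReason reason) {
        // Remember why updating last stopped (promoted to active, or migrated elsewhere).
        suspendReasons.put(key(topic, partition, storeName), reason);
    }

    // Returns -1 if the store has never started updating on this instance.
    long firstStartingOffset(String topic, int partition, String storeName) {
        return firstOffsets.getOrDefault(key(topic, partition, storeName), -1L);
    }

    long recordsLoaded(String topic, int partition, String storeName) {
        return loadedCounts.getOrDefault(key(topic, partition, storeName), 0L);
    }

    // Returns null if updating was never suspended for this store.
    SuspendReason lastSuspendReason(String topic, int partition, String storeName) {
        return suspendReasons.get(key(topic, partition, storeName));
    }
}
```

The maps here live in a single process, so a standby that "jumps" to another client would start with empty bookkeeping there; an app that wants the cross-client picture would presumably need to export these values to a shared metrics system, which is outside the scope of this sketch.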