That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy <c...@littlehorse.io> wrote:

> Hi Sophie, Matthias, Bruno, and Eduwer—
>
> Thanks for your patience as I have been scrambling to catch up after a week
> of business travel (and a few days with no time to code). I'd like to tie
> up some loose ends here, but in short, I don't think the KIP document
> itself needs any changes (our internal implementation does, however).
>
> 1. In the interest of a) not changing the KIP after it's already out for a
> vote, and b) making sure our English grammar is "correct", let's stick with
> 'onBatchLoaded()`. It is the Store that gets updated, not the Batch.
>
> 2. For me (and, thankfully, the community as well) adding a remote network
> call at any point in this KIP is a non-starter. We'll ensure that
> our implementation does not introduce one.
>
> 3. I really don't like changing API behavior, even if it's not documented
> in the javadoc. As such, I am strongly against modifying the behavior of
> endOffsets() on the consumer as some people may implicitly depend on the
> contract.
> 3a. The Consumer#currentLag() method gives us exactly what we want without
> a network call (current lag from a cache, from which we can compute the
> offset).
>
> 4. I have no opinion about whether we should pass endOffset or currentLag
> to the callback. Either one has the same exact information inside it. In
> the interest of not changing the KIP after the vote has started, I'll leave
> it as endOffset.
>
> As such, I believe the KIP doesn't need any updates, nor has it been
> updated since the vote started.
>
> Would anyone else like to discuss something before the Otter Council
> adjourns regarding this matter?
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
> sop...@responsive.dev>
> wrote:
>
> > Just want to checkpoint the current state of this KIP and make sure we're
> > on track to get it in to 3.7 (we still have a few weeks)  -- looks like
> > there are two remaining open questions, both relating to the
> > middle/intermediate callback:
> >
> > 1. What to name it: seems like the primary candidates are onBatchLoaded
> and
> > onBatchUpdated (and maybe also onStandbyUpdated?)
> > 2. What additional information can we pass in that would strike a good
> > balance between being helpful and impacting performance.
> >
> > Regarding #1, I think all of the current options are reasonable enough
> that
> > we should just let Colt decide which he prefers. I personally think
> > #onBatchUpdated is fine -- Bruno does make a fair point but the truth is
> > that English grammar can be sticky and while it could be argued that it
> is
> > the store which is updated, not the batch, I feel that it is perfectly
> > clear what is meant by "onBatchUpdated" and to me, this doesn't sound
> weird
> > at all. That's just my two cents in case it helps, but again, whatever
> > makes sense to you Colt is fine
> >
> > When it comes to #2 -- as much as I would love to dig into the Consumer
> > client lore and see if we can modify existing APIs or add new ones in
> order
> > to get the desired offset metadata in an efficient way, I think we're
> > starting to go down a rabbit hole that is going to expand the scope way
> > beyond what Colt thought he was signing up for. I would advocate to focus
> > on just the basic feature for now and drop the end-offset from the
> > callback. Once we have a standby listener it will be easy to expand on
> with
> > a followup KIP if/when we find an efficient way to add additional useful
> > information. I think it will also become more clear what is and isn't
> > useful after more people get to using it in the real world
> >
> > Colt/Eduwer: how necessary is receiving the end offset during a batch
> > update to your own application use case?
> >
> > Also, for those who really do need to check the current end offset, I
> > believe in theory you should be able to use the KafkaStreams#metrics API
> to
> > get the current lag and/or end offset for the changelog -- it's possible
> > this does not represent the most up-to-date end offset (I'm not sure it
> > does or does not), but it should be close enough to be reliable and
> useful
> > for the purpose of monitoring -- I mean it is a metric, after all.
> >
> > Hope this helps -- in the end, it's up to you (Colt) to decide what you
> > want to bring in scope or not. We still have more than 3 weeks until the
> > KIP freeze as currently proposed, so in theory you could even implement
> > this KIP without the end offset and then do a followup KIP to add the end
> > offset within the same release, ie without any deprecations. There are
> > plenty of paths forward here, so don't let us drag this out forever if
> you
> > know what you want
> >
> > Cheers,
> > Sophie
> >
> > On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> > > Forgot one thing:
> > >
> > > We could also pass `currentLag()` into `onBachLoaded()` instead of
> > > end-offset.
> > >
> > >
> > > -Matthias
> > >
> > > On 10/20/23 10:56 AM, Matthias J. Sax wrote:
> > > > Thanks for digging into this Bruno.
> > > >
> > > > The JavaDoc on the consumer does not say anything specific about
> > > > `endOffset` guarantees:
> > > >
> > > >> Get the end offsets for the given partitions. In the default {@code
> > > >> read_uncommitted} isolation level, the end
> > > >> offset is the high watermark (that is, the offset of the last
> > > >> successfully replicated message plus one). For
> > > >> {@code read_committed} consumers, the end offset is the last stable
> > > >> offset (LSO), which is the minimum of
> > > >> the high watermark and the smallest offset of any open transaction.
> > > >> Finally, if the partition has never been
> > > >> written to, the end offset is 0.
> > > >
> > > > Thus, I actually believe that it would be ok to change the
> > > > implementation and serve the answer from the `TopicPartitionState`?
> > > >
> > > > Another idea would be, to use `currentLag()` in combination with
> > > > `position()` (or the offset of the last read record) to compute the
> > > > end-offset of the fly?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 10/20/23 4:00 AM, Bruno Cadonna wrote:
> > > >> Hi,
> > > >>
> > > >> Matthias is correct that the end offsets are stored somewhere in the
> > > >> metadata of the consumer. More precisely, they are stored in the
> > > >> `TopicPartitionState`. However, I could not find public API on the
> > > >> consumer other than currentLag() that uses the stored end offsets.
> If
> > > >> I understand the code correctly, method endOffSets() always
> triggers a
> > > >> remote call.
> > > >>
> > > >> I am a bit concerned about doing remote calls every
> > commit.interval.ms
> > > >> (by default 200ms under EOS). At the moment the remote calls are
> only
> > > >> issued if an optimization for KTables is turned on where changelog
> > > >> topics are replaced with the input topic of the KTable. The current
> > > >> remote calls retrieve all committed offsets of the group at once.
> If I
> > > >> understand correctly, that is one single remote call. Remote calls
> for
> > > >> getting end offsets of changelog topics -- as I understand you are
> > > >> planning to issue -- will probably result in multiple remote calls
> to
> > > >> multiple leaders of the changelog topic partitions.
> > > >>
> > > >> Please correct me if I misunderstood anything of the above.
> > > >>
> > > >> If my understanding is correct, I propose to modify the consumer in
> > > >> such a way to get the end offset from the locally stored metadata
> > > >> whenever possible as part of the implementation of this KIP. I do
> not
> > > >> know what the implications are of such a change of the consumer and
> if
> > > >> a KIP is needed for it. Maybe, endOffsets() guarantees to return the
> > > >> freshest end offsets possible, which would not be satisfied with the
> > > >> modification.
> > > >>
> > > >> Regarding the naming, I do not completely agree with Matthias. While
> > > >> the pattern might be consistent with onBatchUpdated, what is the
> > > >> meaning of onBatchUpdated? Is the batch updated? The names
> > > >> onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO.
> > > >> With "restore" the pattern works better. If I restore a batch of
> > > >> records in a state, the records are not there although they should
> be
> > > >> there and I add them. If I update a batch of records in a state.
> This
> > > >> sounds like the batch of records is in the state and I modify the
> > > >> existing records within the state. That is clearly not the meaning
> of
> > > >> the event for which the listener should be called.
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >>
> > > >>
> > > >> On 10/19/23 2:12 AM, Matthias J. Sax wrote:
> > > >>> Thanks for the KIP. Seems I am almost late to the party.
> > > >>>
> > > >>> About naming (fun, fun, fun): I like the current proposal overall,
> > > >>> except `onBachLoaded`, but would prefer `onBatchUpdated`. It better
> > > >>> aligns to everything else:
> > > >>>
> > > >>>   - it's an update-listener, not loaded-listener
> > > >>>   - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
> > > >>> `onRestoreSuspended, and `onBachRestored` (it's very consistent
> > > >>>   - `StandbyUpdateListener` should have `onUpdateStart`,
> > > >>> `onUpdateSuspended` and `onBatchUpdated`  to be equally consistent
> > > >>> (using "loaded" breaks the pattern)
> > > >>>
> > > >>>
> > > >>> About the end-offset question: I am relatively sure that the
> consumer
> > > >>> gets the latest end-offset as attached metadata in every fetch
> > > >>> response. (We exploit this behavior to track end-offsets for input
> > > >>> topic with regard to `max.task.idle.ms` without overhead -- it was
> > > >>> also a concern when we did the corresponding KIP how we could track
> > > >>> lag with no overhead).
> > > >>>
> > > >>> Thus, I believe we would "just" need to modify the code accordingly
> > > >>> to get this information from the restore-consumer
> > > >>> (`restorConsumer.endOffsets(...)`; should be served w/o RPC but
> from
> > > >>> internal metadata cache) for free, and pass into the listener.
> > > >>>
> > > >>> Please double check / verify this claim and keep me honest about
> it.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 10/17/23 6:38 AM, Eduwer Camacaro wrote:
> > > >>>> Hi Bruno,
> > > >>>>
> > > >>>> Thanks for your observation; surely it will require a network call
> > > >>>> using
> > > >>>> the admin client in order to know this "endOffset" and that will
> > > >>>> have an
> > > >>>> impact on performance. We can either find a solution that has a
> low
> > > >>>> impact
> > > >>>> on performance or ideally zero impact; unfortunately, I don't see
> a
> > > >>>> way to
> > > >>>> have zero impact on performance. However, we can leverage the
> > existing
> > > >>>> #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses
> the
> > > >>>> admin
> > > >>>> client to ask for these "endOffset"s. As far I can understand,
> this
> > > >>>> update
> > > >>>> is done periodically using the "commit.interval.ms"
> configuration.
> > I
> > > >>>> believe this option will force us to invoke StandbyUpdateLister
> once
> > > >>>> this
> > > >>>> interval is reached.
> > > >>>>
> > > >>>> On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org
> >
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Thanks for the KIP, Colt and Eduwer,
> > > >>>>>
> > > >>>>> Are you sure there is also not a significant performance impact
> for
> > > >>>>> passing into the callback `currentEndOffset`?
> > > >>>>>
> > > >>>>> I am asking because the comment here:
> > > >>>>>
> > > >>>>>
> > >
> >
> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
> > > >>>>>
> > > >>>>> says that the end-offset is only updated once for standby tasks
> > whose
> > > >>>>> changelog topic is not piggy-backed on input topics. I could also
> > not
> > > >>>>> find the update of end-offset for those standbys.
> > > >>>>>
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Bruno
> > > >>>>>
> > > >>>>> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> it's a nice improvement! I don't have anything to add on top of
> > the
> > > >>>>>> previous comments, just came here to say that it seems to me
> > > >>>>>> consensus
> > > >>>>>> has been reached and the result looks good to me.
> > > >>>>>>
> > > >>>>>> Thanks Colt and Eduwer!
> > > >>>>>> Lucas
> > > >>>>>>
> > > >>>>>> On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <
> c...@littlehorse.io
> > >
> > > >>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Thanks, Guozhang. I've updated the KIP and will start a vote.
> > > >>>>>>>
> > > >>>>>>> Colt McNealy
> > > >>>>>>>
> > > >>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <
> > > >>>>> guozhang.wang...@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Thanks for the summary, that looks good to me.
> > > >>>>>>>>
> > > >>>>>>>> Guozhang
> > > >>>>>>>>
> > > >>>>>>>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <
> > c...@littlehorse.io
> > > >
> > > >>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Hello there!
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks everyone for the comments. There's a lot of
> > back-and-forth
> > > >>>>> going
> > > >>>>>>>> on,
> > > >>>>>>>>> so I'll do my best to summarize what everyone's said in TLDR
> > > >>>>>>>>> format:
> > > >>>>>>>>>
> > > >>>>>>>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and
> > do
> > > >>>>>>>> similarly
> > > >>>>>>>>> for the other methods.
> > > >>>>>>>>> 2. Keep `SuspendReason.PROMOTED` and
> `SuspendReason.MIGRATED`.
> > > >>>>>>>>> 3. Remove the `earliestOffset` parameter for performance
> > reasons.
> > > >>>>>>>>>
> > > >>>>>>>>> If that's all fine with everyone, I'll update the KIP and
> > > we—well,
> > > >>>>> mostly
> > > >>>>>>>>> Edu (:  —will open a PR.
> > > >>>>>>>>>
> > > >>>>>>>>> Cheers,
> > > >>>>>>>>> Colt McNealy
> > > >>>>>>>>>
> > > >>>>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <
> > > >>>>> edu...@littlehorse.io>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hello everyone,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for all your feedback for this KIP!
> > > >>>>>>>>>>
> > > >>>>>>>>>> I think that the key to choosing proper names for this API
> is
> > > >>>>>>>> understanding
> > > >>>>>>>>>> the terms used inside the StoreChangelogReader. Currently,
> > > >>>>>>>>>> this class
> > > >>>>>>>> has
> > > >>>>>>>>>> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING.
> In
> > > my
> > > >>>>>>>> opinion,
> > > >>>>>>>>>> using StandbyUpdateListener for the interface fits better on
> > > >>>>>>>>>> these
> > > >>>>>>>> terms.
> > > >>>>>>>>>> Same applies for onUpdateStart/Suspended.
> > > >>>>>>>>>>
> > > >>>>>>>>>> StoreChangelogReader uses "the same mechanism" for active
> task
> > > >>>>>>>> restoration
> > > >>>>>>>>>> and standby task updates, but this is an implementation
> > > >>>>>>>>>> detail. Under
> > > >>>>>>>>>> normal circumstances (no rebalances or task migrations), the
> > > >>>>> changelog
> > > >>>>>>>>>> reader will be in STANDBY_UPDATING, which means it will be
> > > >>>>>>>>>> updating
> > > >>>>>>>> standby
> > > >>>>>>>>>> tasks as long as there are new records in the changelog
> topic.
> > > >>>>>>>>>> That's
> > > >>>>>>>> why I
> > > >>>>>>>>>> prefer onStandbyUpdated instead of onBatchUpdated, even if
> it
> > > >>>>>>>>>> doesn't
> > > >>>>>>>> 100%
> > > >>>>>>>>>> align with StateRestoreListener, but either one is fine.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Edu
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> > > >>>>>>>> guozhang.wang...@gmail.com>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hello Colt,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks for writing the KIP! I have read through the updated
> > > >>>>>>>>>>> KIP and
> > > >>>>>>>>>>> overall it looks great. I only have minor naming comments
> > > (well,
> > > >>>>>>>>>>> aren't naming the least boring stuff to discuss and that
> > > >>>>>>>>>>> takes the
> > > >>>>>>>>>>> most of the time for KIPs :P):
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 1. I tend to agree with Sophie regarding whether or not to
> > > >>>>>>>>>>> include
> > > >>>>>>>>>>> "Standby" in the functions of
> > "onStandbyUpdateStart/Suspended",
> > > >>>>> since
> > > >>>>>>>>>>> it is also more consistent with the functions of
> > > >>>>>>>>>>> "StateRestoreListener" where we do not name it as
> > > >>>>>>>>>>> "onStateRestoreState" etc.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 2. I know in community discussions we sometimes say "a
> > > >>>>>>>>>>> standby is
> > > >>>>>>>>>>> promoted to active", but in the official code / java docs
> we
> > > >>>>>>>>>>> did not
> > > >>>>>>>>>>> have a term of "promotion", since what the code does is
> > really
> > > >>>>>>>> recycle
> > > >>>>>>>>>>> the task (while keeping its state stores open), and create
> a
> > > new
> > > >>>>>>>>>>> active task that takes in the recycled state stores and
> just
> > > >>>>> changing
> > > >>>>>>>>>>> the other fields like task type etc. After thinking about
> > > >>>>>>>>>>> this for a
> > > >>>>>>>>>>> bit, I tend to feel that "promoted" is indeed a better name
> > > >>>>>>>>>>> for user
> > > >>>>>>>>>>> facing purposes while "recycle" is more of a technical
> detail
> > > >>>>>>>>>>> inside
> > > >>>>>>>>>>> the code and could be abstracted away from users. So I feel
> > > >>>>>>>>>>> keeping
> > > >>>>>>>>>>> the name "PROMOTED" is fine.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 3. Regarding "earliestOffset", it does feel like we cannot
> > > >>>>>>>>>>> always
> > > >>>>>>>>>>> avoid another call to the Kafka API. And on the other
> hand, I
> > > >>>>>>>>>>> also
> > > >>>>>>>>>>> tend to think that such bookkeeping may be better done at
> the
> > > >>>>>>>>>>> app
> > > >>>>>>>>>>> level than from the Streams' public API level. I.e. the app
> > > >>>>>>>>>>> could
> > > >>>>>>>> keep
> > > >>>>>>>>>>> a "first ever starting offset" per "topic-partition-store"
> > > >>>>>>>>>>> key, and
> > > >>>>> a
> > > >>>>>>>>>>> when we have rolling restart and hence some standby task
> > keeps
> > > >>>>>>>>>>> "jumping" from one client to another via task assignment,
> the
> > > >>>>>>>>>>> app
> > > >>>>>>>>>>> would update this value just one when it finds the
> > > >>>>>>>>>>> ""topic-partition-store" was never triggered before. What
> do
> > > you
> > > >>>>>>>>>>> think?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 4. I do not have a strong opinion either, but what about
> > > >>>>>>>>>> "onBatchUpdated" ?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Guozhang
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy
> > > >>>>>>>>>>> <c...@littlehorse.io>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Sohpie—
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you very much for such a detailed review of the KIP.
> > > >>>>>>>>>>>> It might
> > > >>>>>>>>>>>> actually be longer than the original KIP in the first
> place!
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 1. Ack'ed and fixed.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 2. Correct, this is a confusing passage and requires
> > context:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> One thing on our list of TODO's regarding reliability is
> to
> > > >>>>>>>> determine
> > > >>>>>>>>>> how
> > > >>>>>>>>>>>> to configure `session.timeout.ms`. In our Kubernetes
> > > >>>>>>>>>>>> Environment,
> > > >>>>>>>> an
> > > >>>>>>>>>>>> instance of our Streams App can be terminated, restarted,
> > > >>>>>>>>>>>> and get
> > > >>>>>>>> back
> > > >>>>>>>>>>> into
> > > >>>>>>>>>>>> the "RUNNING" Streams state in about 20 seconds. We have
> two
> > > >>>>>>>> options
> > > >>>>>>>>>>> here:
> > > >>>>>>>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal
> > with
> > > 20
> > > >>>>>>>>>> seconds
> > > >>>>>>>>>>> of
> > > >>>>>>>>>>>> unavailability for affected partitions, but avoid
> shuffling
> > > >>>>>>>>>>>> Tasks;
> > > >>>>>>>> or
> > > >>>>>>>>>> b)
> > > >>>>>>>>>>>> set session.timeout.ms to a low value, such as 6 seconds
> (
> > > >>>>>>>>>>>> heartbeat.interval.ms of 2000), and reduce the
> > unavailability
> > > >>>>>>>> window
> > > >>>>>>>>>>> during
> > > >>>>>>>>>>>> a rolling bounce but incur an "extra" rebalance. There are
> > > >>>>>>>>>>>> several
> > > >>>>>>>>>>>> different costs to a rebalance, including the shuffling of
> > > >>>>>>>>>>>> standby
> > > >>>>>>>>>> tasks.
> > > >>>>>>>>>>>> JMX metrics are not fine-grained enough to give us an
> > accurate
> > > >>>>>>>> picture
> > > >>>>>>>>>> of
> > > >>>>>>>>>>>> what's going on with the whole Standby Task Shuffle
> Dance. I
> > > >>>>>>>>>> hypothesize
> > > >>>>>>>>>>>> that the Standby Update Listener might help us clarify
> just
> > > >>>>>>>>>>>> how the
> > > >>>>>>>>>>>> shuffling actually (not theoretically) works, which will
> > > >>>>>>>>>>>> help us
> > > >>>>>>>> make a
> > > >>>>>>>>>>>> more informed decision about the session timeout config.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If you think this is worth putting in the KIP, I'll polish
> > > >>>>>>>>>>>> it and
> > > >>>>>>>> do
> > > >>>>>>>>>> so;
> > > >>>>>>>>>>>> else, I'll remove the current half-baked explanation.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3. Overall, I agree with this. In our app, each Task has
> > > >>>>>>>>>>>> only one
> > > >>>>>>>> Store
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>> reduce the number of changelog partitions, so I sometimes
> > > >>>>>>>>>>>> forget
> > > >>>>>>>> the
> > > >>>>>>>>>>>> distinction between the two concepts, as reflected in the
> > > >>>>>>>>>>>> KIP (:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3a. I don't like the word "Restore" here, since
> Restoration
> > > >>>>>>>>>>>> refers
> > > >>>>>>>> to
> > > >>>>>>>>>> an
> > > >>>>>>>>>>>> Active Task getting caught up in preparation to resume
> > > >>>>>>>>>>>> processing.
> > > >>>>>>>>>>>> `StandbyUpdateListener` is fine by me; I have updated the
> > > >>>>>>>>>>>> KIP. I
> > > >>>>>>>> am a
> > > >>>>>>>>>>>> native Python speaker so I do prefer shorter names anyways
> > (:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3b1. +1 to removing the word 'Task'.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3b2. I like `onUpdateStart()`, but with your permission
> I'd
> > > >>>>>>>>>>>> prefer
> > > >>>>>>>>>>>> `onStandbyUpdateStart()` which matches the name of the
> > > >>>>>>>>>>>> Interface
> > > >>>>>>>>>>>> "StandbyUpdateListener". (the python part of me hates
> this,
> > > >>>>>>>> however)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3b3. Going back to question 2), `earliestOffset` was
> > > >>>>>>>>>>>> intended to
> > > >>>>>>>> allow
> > > >>>>>>>>>> us
> > > >>>>>>>>>>>> to more easily calculate the amount of state _already
> > > >>>>>>>>>>>> loaded_ in
> > > >>>>>>>> the
> > > >>>>>>>>>>> store
> > > >>>>>>>>>>>> by subtracting (startingOffset - earliestOffset). This
> would
> > > >>>>>>>>>>>> help
> > > >>>>>>>> us
> > > >>>>>>>>>> see
> > > >>>>>>>>>>>> how much inefficiency is introduced in a rolling
> restart—if
> > > >>>>>>>>>>>> we end
> > > >>>>>>>> up
> > > >>>>>>>>>>> going
> > > >>>>>>>>>>>> from a situation with an up-to-date standby before the
> > > >>>>>>>>>>>> restart, and
> > > >>>>>>>>>> then
> > > >>>>>>>>>>>> after the whole restart, the Task is shuffled onto an
> > instance
> > > >>>>>>>> where
> > > >>>>>>>>>>> there
> > > >>>>>>>>>>>> is no previous state, then that is expensive. However, if
> > > >>>>>>>>>>>> the final
> > > >>>>>>>>>>>> shuffling results in the Task back on an instance with a
> lot
> > > of
> > > >>>>>>>>>> pre-built
> > > >>>>>>>>>>>> state, it's not expensive.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If a call over the network is required to determine the
> > > >>>>>>>> earliestOffset,
> > > >>>>>>>>>>>> then this is a "hard no-go" for me, and we will remove it
> > > (I'll
> > > >>>>>>>> have to
> > > >>>>>>>>>>>> check with Eduwer as he is close to having a working
> > > >>>>>>>> implementation). I
> > > >>>>>>>>>>>> think we can probably determine what we wanted to see in a
> > > >>>>>>>> different
> > > >>>>>>>>>>>> way, but it will take more thinking.. If `earliestOffset`
> is
> > > >>>>>>>> confusing,
> > > >>>>>>>>>>>> perhaps rename it to `earliestChangelogOffset`?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> `startingOffset` is easy to remove as it can be determined
> > > >>>>>>>>>>>> from the
> > > >>>>>>>>>> first
> > > >>>>>>>>>>>> call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Anyways, I've updated the JavaDoc in the interface;
> > > >>>>>>>>>>>> hopefully it's
> > > >>>>>>>> more
> > > >>>>>>>>>>>> clear. Awaiting further instructions here.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3c. Good point; after thinking, my preference is
> > > >>>>>>>> `onBatchLoaded()`  ->
> > > >>>>>>>>>>>> `onBatchUpdated()` -> `onBatchProcessed()` ->
> > > >>>>>>>>>>>> `onBatchRestored()`.
> > > >>>>>>>> I am
> > > >>>>>>>>>>>> less fond of "processed" because when I was first learning
> > > >>>>>>>>>>>> Streams
> > > >>>>>>>> I
> > > >>>>>>>>>>>> mistakenly thought that standby tasks actually processed
> the
> > > >>>>>>>>>>>> input
> > > >>>>>>>>>> topic
> > > >>>>>>>>>>>> rather than loaded from the changelog. I'll defer to you
> > here.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet
> > > >>>>>>>>>>>> `onStandbyUpdateSuspended()`. Will check about the
> > > >>>>>>>>>>>> implementation
> > > >>>>>>>> of
> > > >>>>>>>>>>>> keeping track of the number of records loaded.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 4a. I think this might be best in a separate KIP,
> especially
> > > >>>>>>>>>>>> given
> > > >>>>>>>> that
> > > >>>>>>>>>>>> this is my and Eduwer's first time contributing to Kafka
> (so
> > > we
> > > >>>>>>>> want to
> > > >>>>>>>>>>>> minimize the blast radius).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 4b. I might respectfully (and timidly) push back here,
> > > RECYCLED
> > > >>>>>>>> for an
> > > >>>>>>>>>>>> Active Task is a bit confusing to me. DEMOTED and MIGRATED
> > > make
> > > >>>>>>>> sense
> > > >>>>>>>>>>> from
> > > >>>>>>>>>>>> the standpoint of an Active Task, recycling to me sounds
> > like
> > > >>>>>>>> throwing
> > > >>>>>>>>>>>> stuff away, such that the resources (i.e. disk space) can
> be
> > > >>>>>>>>>>>> used
> > > >>>>>>>> by a
> > > >>>>>>>>>>>> separate Task. As an alternative rather than trying to
> reuse
> > > >>>>>>>>>>>> the
> > > >>>>>>>> same
> > > >>>>>>>>>>> enum,
> > > >>>>>>>>>>>> maybe rename it to `StandbySuspendReason` to avoid naming
> > > >>>>>>>>>>>> conflicts
> > > >>>>>>>>>> with
> > > >>>>>>>>>>>> `ActiveSuspendReason`? However, I could be convinced to
> > rename
> > > >>>>>>>> PROMOTED
> > > >>>>>>>>>>> ->
> > > >>>>>>>>>>>> RECYCLED, especially if Eduwer agrees.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> TLDR:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> T1. Agreed, will remove the word "Task" as it's incorrect.
> > > >>>>>>>>>>>> T2. Will update to `onStandbyUpdateStart()`
> > > >>>>>>>>>>>> T3. Awaiting further instructions on earliestOffset and
> > > >>>>>>>> startingOffset.
> > > >>>>>>>>>>>> T4. I don't like `onBatchProcessed()` too much, perhaps
> > > >>>>>>>>>>> `onBatchLoaded()`?
> > > >>>>>>>>>>>> T5. Will update to `onStandbyUpdateSuspended()`
> > > >>>>>>>>>>>> T6. Thoughts on renaming SuspendReason to
> > > StandbySuspendReason,
> > > >>>>>>>> rather
> > > >>>>>>>>>>> than
> > > >>>>>>>>>>>> renaming PROMOTED to RECYCLED? @Eduwer?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Long Live the Otter,
> > > >>>>>>>>>>>> Colt McNealy
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <
> > > >>>>>>>>>>> sop...@responsive.dev>
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hey Colt! Thanks for the KIP -- this will be a great
> > > >>>>>>>>>>>>> addition to
> > > >>>>>>>>>>> Streams, I
> > > >>>>>>>>>>>>> can't believe we've gone so long without this.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Overall the proposal makes sense, but I had a handful of
> > > >>>>>>>>>>>>> fairly
> > > >>>>>>>> minor
> > > >>>>>>>>>>>>> questions and suggestions/requests
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of
> the
> > > >>>>>>>>>> Motivation
> > > >>>>>>>>>>>>> section is cut off and incomplete -- "want to be able to
> > > >>>>>>>>>>>>> know "
> > > >>>>>>>> what
> > > >>>>>>>>>>>>> exactly?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2. This isn't that important since the motivation as a
> > > >>>>>>>>>>>>> whole is
> > > >>>>>>>> clear
> > > >>>>>>>>>>> to me
> > > >>>>>>>>>>>>> and convincing enough, but I'm not quite sure I
> understand
> > > the
> > > >>>>>>>>>> example
> > > >>>>>>>>>>> at
> > > >>>>>>>>>>>>> the end of the Motivation section. How are standby tasks
> > > >>>>>>>>>>>>> (and the
> > > >>>>>>>>>>> ability
> > > >>>>>>>>>>>>> to hook into and monitor their status) related to the
> > > >>>>>>>>>>> session.timeout.ms
> > > >>>>>>>>>>>>> config?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3. To help both old and new users of Kafka Streams
> > understand
> > > >>>>>>>> this
> > > >>>>>>>>>> new
> > > >>>>>>>>>>>>> restore listener and its purpose/semantics, can we try to
> > > name
> > > >>>>>>>> the
> > > >>>>>>>>>>> class
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>    callbacks in a way that's more consistent with the
> > > >>>>>>>>>>>>> active task
> > > >>>>>>>>>> restore
> > > >>>>>>>>>>>>> listener?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3a. StandbyTaskUpdateListener:
> > > >>>>>>>>>>>>> The existing restore listener is called
> > > >>>>>>>>>>>>> StateRestoreListener, so
> > > >>>>>>>> the
> > > >>>>>>>>>>> new
> > > >>>>>>>>>>>>> one could be called something like
> > > >>>>>>>>>>>>> StandbyStateRestoreListener.
> > > >>>>>>>>>>> Although
> > > >>>>>>>>>>>>> we typically refer to standby tasks as "processing"
> rather
> > > >>>>>>>>>>>>> than
> > > >>>>>>>>>>> "restoring"
> > > >>>>>>>>>>>>> records -- ie restoration is a term for active task state
> > > >>>>>>>>>>> specifically. I
> > > >>>>>>>>>>>>> actually
> > > >>>>>>>>>>>>> like the original suggestion if we just drop the "Task"
> > > >>>>>>>>>>>>> part of
> > > >>>>>>>> the
> > > >>>>>>>>>>> name,
> > > >>>>>>>>>>>>> ie StandbyUpdateListener. I think either that or
> > > >>>>>>>>>> StandbyRestoreListener
> > > >>>>>>>>>>>>> would be fine and probably the two best options.
> > > >>>>>>>>>>>>> Also, this probably goes without saying but any change to
> > the
> > > >>>>>>>> name of
> > > >>>>>>>>>>> this
> > > >>>>>>>>>>>>> class should of course be reflected in the
> > > KafkaStreams#setXXX
> > > >>>>>>>> API as
> > > >>>>>>>>>>> well
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3b. #onTaskCreated
> > > >>>>>>>>>>>>>    I know the "start" callback feels a bit different for
> > the
> > > >>>>>>>> standby
> > > >>>>>>>>>> task
> > > >>>>>>>>>>>>> updater vs an active task beginning restoration, but I
> > > >>>>>>>>>>>>> think we
> > > >>>>>>>>>> should
> > > >>>>>>>>>>> try
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>> keep the various callbacks aligned to their active
> restore
> > > >>>>>>>> listener
> > > >>>>>>>>>>>>> counterpart. We can/should just replace the term
> "restore"
> > > >>>>>>>>>>>>> with
> > > >>>>>>>>>>> "update"
> > > >>>>>>>>>>>>> for the
> > > >>>>>>>>>>>>> callback method names the same way we do for the class
> > name,
> > > >>>>>>>> which in
> > > >>>>>>>>>>> this
> > > >>>>>>>>>>>>> case would give us #onUpdateStart. Personally I like this
> > > >>>>>>>>>>>>> better,
> > > >>>>>>>>>>>>> but it's ultimately up to you. However, I would push back
> > > >>>>>>>>>>>>> against
> > > >>>>>>>>>>> anything
> > > >>>>>>>>>>>>> that includes the word "Task" (eg #onTaskCreated) as the
> > > >>>>>>>>>>>>> listener
> > > >>>>>>>>>>>>>    is actually not scoped to the task itself but instead
> to
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>> individual
> > > >>>>>>>>>>>>> state store(s). This is the main reason I would prefer
> > > >>>>>>>>>>>>> calling it
> > > >>>>>>>>>>> something
> > > >>>>>>>>>>>>> like #onUpdateStart, which keeps the focus on the store
> > being
> > > >>>>>>>> updated
> > > >>>>>>>>>>>>> rather than the task that just happens to own this store
> > > >>>>>>>>>>>>> One last thing on this callback -- do we really need both
> > the
> > > >>>>>>>>>>>>> `earliestOffset` and `startingOffset`? I feel like this
> > > >>>>>>>>>>>>> might be
> > > >>>>>>>> more
> > > >>>>>>>>>>>>> confusing than it
> > > >>>>>>>>>>>>> is helpful (tbh even I'm not completely sure I know what
> > the
> > > >>>>>>>>>>> earliestOffset
> > > >>>>>>>>>>>>> is supposed to represent) More importantly, is this all
> > > >>>>>>>> information
> > > >>>>>>>>>>>>> that is already available and able to be passed in to the
> > > >>>>>>>> callback by
> > > >>>>>>>>>>>>> Streams? I haven't checked on this but it feels like the
> > > >>>>>>>>>>> earliestOffset is
> > > >>>>>>>>>>>>> likely to require a remote call, either by the embedded
> > > >>>>>>>>>>>>> consumer
> > > >>>>>>>> or
> > > >>>>>>>>>>> via the
> > > >>>>>>>>>>>>> admin client. If so, the ROI on including this parameter
> > > seems
> > > >>>>>>>>>>>>> quite low (if not outright negative)
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3c. #onBatchRestored
> > > >>>>>>>>>>>>> If we opt to use the term "update" in place of "restore"
> > > >>>>>>>> elsewhere,
> > > >>>>>>>>>>> then we
> > > >>>>>>>>>>>>> should consider doing so here as well. What do you think
> > > about
> > > >>>>>>>>>>>>> #onBatchUpdated, or even #onBatchProcessed?
> > > >>>>>>>>>>>>> I'm actually not super concerned about this particular
> API,
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>> honestly I
> > > >>>>>>>>>>>>> think we can use restore or update interchangeably here,
> so
> > > if
> > > >>>>>>>> you
> > > >>>>>>>>>>>>>    don't like any of the suggested names (and no one can
> > > >>>>>>>>>>>>> think of
> > > >>>>>>>>>>> anything
> > > >>>>>>>>>>>>> better), I would just stick with #onBatchRestored. In
> this
> > > >>>>>>>>>>>>> case,
> > > >>>>>>>>>>>>> it kind of makes the most sense.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3d. #onTaskSuspended
> > > >>>>>>>>>>>>> Along the same lines as 3b above, #onUpdateSuspended or
> > just
> > > >>>>>>>>>>>>> #onRestoreSuspended probably makes more sense for this
> > > >>>>>>>>>>>>> callback.
> > > >>>>>>>>>> Also,
> > > >>>>>>>>>>>>>    I notice the StateRestoreListener passes in the total
> > > >>>>>>>>>>>>> number of
> > > >>>>>>>>>>> records
> > > >>>>>>>>>>>>> restored to its #onRestoreSuspended. Assuming we already
> > > track
> > > >>>>>>>>>>>>> that information in Streams and have it readily available
> > to
> > > >>>>>>>> pass in
> > > >>>>>>>>>> at
> > > >>>>>>>>>>>>> whatever point we would be invoking this callback, that
> > > >>>>>>>>>>>>> might be
> > > >>>>>>>> a
> > > >>>>>>>>>>>>> useful  parameter for the standby listener to have as
> well
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 4. I totally love the SuspendReason thing, just two
> > > >>>>>>>> notes/requests:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 4a. Feel free to push back against adding onto the scope
> of
> > > >>>>>>>>>>>>> this
> > > >>>>>>>> KIP,
> > > >>>>>>>>>>> but
> > > >>>>>>>>>>>>> it would be great to expand the active state restore
> > listener
> > > >>>>>>>> with
> > > >>>>>>>>>> this
> > > >>>>>>>>>>>>> SuspendReason enum as well. It would be really useful for
> > > both
> > > >>>>>>>>>>> variants of
> > > >>>>>>>>>>>>> restore listener
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED
> --
> > > >>>>>>>>>>>>> for
> > > >>>>>>>>>> standby
> > > >>>>>>>>>>>>> tasks it means basically the same thing, the point is
> that
> > > >>>>>>>>>>>>> active
> > > >>>>>>>>>>>>> tasks can also be recycled into standbys through the same
> > > >>>>>>>> mechanism.
> > > >>>>>>>>>>> This
> > > >>>>>>>>>>>>> way they can share the SuspendReason enum -- not that
> it's
> > > >>>>>>>>>>>>> necessary for them to share, I just think it would be a
> > good
> > > >>>>>>>> idea to
> > > >>>>>>>>>>> keep
> > > >>>>>>>>>>>>> the two restore listeners aligned to the highest degree
> > > >>>>>>>>>>>>> possible
> > > >>>>>>>> for
> > > >>>>>>>>>> as
> > > >>>>>>>>>>>>> we can.
> > > >>>>>>>>>>>>> I was actually considering proposing a short KIP with a
> new
> > > >>>>>>>>>>>>> RecyclingListener (or something) specifically for this
> > exact
> > > >>>>>>>> kind of
> > > >>>>>>>>>>> thing,
> > > >>>>>>>>>>>>> since we
> > > >>>>>>>>>>>>> currently have literally zero insight into the recycling
> > > >>>>>>>>>>>>> process.
> > > >>>>>>>>>> It's
> > > >>>>>>>>>>>>> practically impossible to tell when a store has been
> > > converted
> > > >>>>>>>> from
> > > >>>>>>>>>>> active
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>> standby, or vice versa. So having access to the
> > > SuspendReason,
> > > >>>>>>>> and
> > > >>>>>>>>>> more
> > > >>>>>>>>>>>>> importantly having a callback guaranteed to notify you
> > when a
> > > >>>>>>>>>>>>> state store is recycled whether active or standby, would
> be
> > > >>>>>>>> amazing.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for the KIP!
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> ---------- Forwarded message ---------
> > > >>>>>>>>>>>>>> From: Colt McNealy <c...@littlehorse.io>
> > > >>>>>>>>>>>>>> Date: Tue, Oct 3, 2023 at 12:48 PM
> > > >>>>>>>>>>>>>> Subject: [DISCUSS] KIP-988 Streams Standby Task Update
> > > >>>>>>>>>>>>>> Listener
> > > >>>>>>>>>>>>>> To: <dev@kafka.apache.org>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> We would like to propose a small KIP to improve the
> > > >>>>>>>>>>>>>> ability of
> > > >>>>>>>>>>> Streams
> > > >>>>>>>>>>>>> apps
> > > >>>>>>>>>>>>>> to monitor the progress of their standby tasks through a
> > > >>>>>>>> callback
> > > >>>>>>>>>>>>>> interface.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> We have a nearly-working implementation on our fork and
> > are
> > > >>>>>>>> curious
> > > >>>>>>>>>>> for
> > > >>>>>>>>>>>>>> feedback.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>>>> Colt McNealy
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>
> > > >>>>
> > >
> >
>

Reply via email to