Hi Matthias and everyone—

Some clarification is necessary, just for posterity. It turns out that on a
fresh standby task, before we start polling for records, we can't get the
current end offset without a network call. This leaves us three options:

A) Make it an Optional<Long> or use a sentinel value to mark that it's not
present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the
strong majority of cases, the Optional would be empty. Option B) is
undesirable for performance reasons: if we're going to make a network round
trip, we might as well get some records back! That leaves us with option C),
which is the least bad of all of them.

At LittleHorse we actually do care about the endOffset in the
onUpdateStart() method, and having it would be useful to us. However, the
work-around isn't horrible, because the endOffset will be passed into the
first call to onBatchLoaded(), which normally follows onUpdateStart()
within <100ms.
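A minimal sketch of that work-around. It assumes a simplified, hypothetical listener interface: the method names follow this thread, but the parameter lists and the tracker class are illustrative, not the Kafka Streams API. The idea is to remember the starting offset from onUpdateStart() and take the end offset from the first onBatchLoaded() call that follows it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified callback shape; NOT the real StandbyUpdateListener signature.
interface SimpleStandbyUpdateListener {
    void onUpdateStart(String topicPartition, String storeName, long startingOffset);

    void onBatchLoaded(String topicPartition, String storeName,
                       long batchEndOffset, long batchSize, long currentEndOffset);
}

// Remembers where each standby update started and the end offset reported by the
// first batch after that start, since onUpdateStart() no longer receives one.
class FirstBatchEndOffsetTracker implements SimpleStandbyUpdateListener {
    private final Map<String, Long> startingOffsets = new HashMap<>();
    private final Map<String, Long> firstBatchEndOffsets = new HashMap<>();

    private static String key(String topicPartition, String storeName) {
        return topicPartition + "/" + storeName;
    }

    @Override
    public void onUpdateStart(String topicPartition, String storeName, long startingOffset) {
        String k = key(topicPartition, storeName);
        startingOffsets.put(k, startingOffset);
        firstBatchEndOffsets.remove(k); // new update cycle: discard the previous value
    }

    @Override
    public void onBatchLoaded(String topicPartition, String storeName,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        // putIfAbsent keeps only the first batch's end offset for this update cycle.
        firstBatchEndOffsets.putIfAbsent(key(topicPartition, storeName), currentEndOffset);
    }

    // Approximate number of records the standby had to catch up on when it started.
    // The end offset may move in the short gap before the first batch, so this is an
    // estimate, and it stays empty until the first batch has been loaded.
    OptionalLong initialLag(String topicPartition, String storeName) {
        String k = key(topicPartition, storeName);
        Long start = startingOffsets.get(k);
        Long end = firstBatchEndOffsets.get(k);
        if (start == null || end == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(Math.max(0L, end - start));
    }
}
```

The estimate is only as stale as the gap between onUpdateStart() and the first batch, which, per the note above, is normally under 100ms.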

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax <mj...@apache.org> wrote:

> > parameter is somewhat irrelevant to our use case
>
> Sounds like a weird justification to change the KIP. Providing more
> information is usually better than less, so it seems it won't hurt to
> just keep it (seems useful in general to get the current end offset in
> this callback) -- you can always ignore it, if it's not relevant for
> your use case.
>
>
> -Matthias
>
>
> On 11/30/23 6:56 AM, Eduwer Camacaro wrote:
> > Hello everyone,
> >
> > We have come to the conclusion, during our work on this KIP's
> > implementation, that the #onUpdateStart callback's "currentEndOffset"
> > parameter is somewhat irrelevant to our use case. When this callback is
> > invoked, I think this value is usually unknown. Our choice to delete this
> > parameter from the #onUpdateStart callback requires an update to the KIP.
> >
> > Please feel free to review the PR and provide any comments you may have. :)
> > Thanks in advance
> >
> > Edu-
> >
> > On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks. SGTM.
> >>
> >> On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:
> >>> That all sounds good to me! Thanks for the KIP
> >>>
> >>> On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy <c...@littlehorse.io> wrote:
> >>>
> >>>> Hi Sophie, Matthias, Bruno, and Eduwer—
> >>>>
> >>>> Thanks for your patience as I have been scrambling to catch up after a
> >>>> week of business travel (and a few days with no time to code). I'd like
> >>>> to tie up some loose ends here, but in short, I don't think the KIP
> >>>> document itself needs any changes (our internal implementation does,
> >>>> however).
> >>>>
> >>>> 1. In the interest of a) not changing the KIP after it's already out for
> >>>> a vote, and b) making sure our English grammar is "correct", let's stick
> >>>> with `onBatchLoaded()`. It is the Store that gets updated, not the Batch.
> >>>>
> >>>> 2. For me (and, thankfully, the community as well) adding a remote
> >>>> network call at any point in this KIP is a non-starter. We'll ensure
> >>>> that our implementation does not introduce one.
> >>>>
> >>>> 3. I really don't like changing API behavior, even if it's not
> >>>> documented in the javadoc. As such, I am strongly against modifying the
> >>>> behavior of endOffsets() on the consumer, as some people may implicitly
> >>>> depend on the contract.
> >>>> 3a. The Consumer#currentLag() method gives us exactly what we want
> >>>> without a network call (the current lag, served from a local cache, from
> >>>> which we can compute the end offset).
> >>>>
> >>>> 4. I have no opinion about whether we should pass endOffset or
> >>>> currentLag to the callback. Either one carries exactly the same
> >>>> information. In the interest of not changing the KIP after the vote has
> >>>> started, I'll leave it as endOffset.
> >>>>
> >>>> As such, I believe the KIP doesn't need any updates, nor has it been
> >>>> updated since the vote started.
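For reference, the computation behind 3a is simply lag = endOffset - position, so endOffset = position + lag. Below is a minimal sketch. It assumes the caller passes in `consumer.position(tp)` and `consumer.currentLag(tp)`, where the latter returns an `OptionalLong` that is empty when no lag is known locally. The helper class itself is hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical helper: derives an end offset from locally known values only.
final class LocalEndOffset {
    private LocalEndOffset() {}

    // Uses lag = endOffset - position, so endOffset = position + lag.
    // An unknown lag stays unknown instead of being replaced by a remote lookup.
    static OptionalLong fromPositionAndLag(long position, OptionalLong cachedLag) {
        if (!cachedLag.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + cachedLag.getAsLong());
    }
}
```

With a real consumer, the call would look like `LocalEndOffset.fromPositionAndLag(consumer.position(tp), consumer.currentLag(tp))`. Note that `position()` can itself issue a remote call if the consumer has no position yet for the partition.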
> >>>>
> >>>> Would anyone else like to discuss something before the Otter Council
> >>>> adjourns regarding this matter?
> >>>>
> >>>> Cheers,
> >>>> Colt McNealy
> >>>>
> >>>> *Founder, LittleHorse.dev*
> >>>>
> >>>>
> >>>> On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>
> >>>>> Just want to checkpoint the current state of this KIP and make sure
> >>>>> we're on track to get it into 3.7 (we still have a few weeks) -- looks
> >>>>> like there are two remaining open questions, both relating to the
> >>>>> middle/intermediate callback:
> >>>>>
> >>>>> 1. What to name it: seems like the primary candidates are onBatchLoaded
> >>>>> and onBatchUpdated (and maybe also onStandbyUpdated?)
> >>>>> 2. What additional information can we pass in that strikes a good
> >>>>> balance between being helpful and not hurting performance?
> >>>>>
> >>>>> Regarding #1, I think all of the current options are reasonable enough
> >>>>> that we should just let Colt decide which he prefers. I personally think
> >>>>> #onBatchUpdated is fine -- Bruno does make a fair point, but the truth is
> >>>>> that English grammar can be sticky, and while it could be argued that it
> >>>>> is the store which is updated, not the batch, I feel that it is
> >>>>> perfectly clear what is meant by "onBatchUpdated", and to me, this
> >>>>> doesn't sound weird at all. That's just my two cents in case it helps,
> >>>>> but again, whatever makes sense to you, Colt, is fine.
> >>>>>
> >>>>> When it comes to #2 -- as much as I would love to dig into the Consumer
> >>>>> client lore and see if we can modify existing APIs or add new ones in
> >>>>> order to get the desired offset metadata in an efficient way, I think
> >>>>> we're starting to go down a rabbit hole that is going to expand the
> >>>>> scope way beyond what Colt thought he was signing up for. I would
> >>>>> advocate focusing on just the basic feature for now and dropping the
> >>>>> end-offset from the callback. Once we have a standby listener, it will
> >>>>> be easy to expand on it with a followup KIP if/when we find an efficient
> >>>>> way to add additional useful information. I think it will also become
> >>>>> clearer what is and isn't useful after more people start using it in
> >>>>> the real world.
> >>>>>
> >>>>> Colt/Eduwer: how necessary is receiving the end offset during a batch
> >>>>> update to your own application use case?
> >>>>>
> >>>>> Also, for those who really do need to check the current end offset, I
> >>>>> believe in theory you should be able to use the KafkaStreams#metrics
> >>>>> API to get the current lag and/or end offset for the changelog -- it's
> >>>>> possible this does not represent the most up-to-date end offset (I'm not
> >>>>> sure whether it does), but it should be close enough to be reliable and
> >>>>> useful for the purpose of monitoring -- I mean, it is a metric, after
> >>>>> all.
> >>>>>
> >>>>> Hope this helps -- in the end, it's up to you (Colt) to decide what you
> >>>>> want to bring in scope or not. We still have more than 3 weeks until the
> >>>>> KIP freeze as currently proposed, so in theory you could even implement
> >>>>> this KIP without the end offset and then do a followup KIP to add the
> >>>>> end offset within the same release, i.e. without any deprecations.
> >>>>> There are plenty of paths forward here, so don't let us drag this out
> >>>>> forever if you know what you want.
> >>>>>
> >>>>> Cheers,
> >>>>> Sophie
> >>>>>
> >>>>> On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> Forgot one thing:
> >>>>>>
> >>>>>> We could also pass `currentLag()` into `onBatchLoaded()` instead of the
> >>>>>> end-offset.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 10/20/23 10:56 AM, Matthias J. Sax wrote:
> >>>>>>> Thanks for digging into this Bruno.
> >>>>>>>
> >>>>>>> The JavaDoc on the consumer does not say anything specific about
> >>>>>>> `endOffset` guarantees:
> >>>>>>>
> >>>>>>>> Get the end offsets for the given partitions. In the default {@code
> >>>>>>>> read_uncommitted} isolation level, the end
> >>>>>>>> offset is the high watermark (that is, the offset of the last
> >>>>>>>> successfully replicated message plus one). For
> >>>>>>>> {@code read_committed} consumers, the end offset is the last stable
> >>>>>>>> offset (LSO), which is the minimum of
> >>>>>>>> the high watermark and the smallest offset of any open transaction.
> >>>>>>>> Finally, if the partition has never been
> >>>>>>>> written to, the end offset is 0.
> >>>>>>>
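The rule in that JavaDoc can be written as a small pure function. This is a sketch for illustration only. The enum and class are hypothetical stand-ins, not Kafka's own `IsolationLevel` type, and `firstOpenTxnOffset` is empty when no transaction is open.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the consumer's isolation.level setting.
enum ReadIsolation { READ_UNCOMMITTED, READ_COMMITTED }

final class EndOffsetRule {
    private EndOffsetRule() {}

    // highWatermark: offset of the last successfully replicated record plus one
    //                (0 for a partition that has never been written to).
    // firstOpenTxnOffset: smallest offset of any open transaction, if one exists.
    static long endOffset(ReadIsolation isolation, long highWatermark,
                          OptionalLong firstOpenTxnOffset) {
        if (isolation == ReadIsolation.READ_UNCOMMITTED || !firstOpenTxnOffset.isPresent()) {
            return highWatermark;
        }
        // read_committed: last stable offset = min(high watermark, first open txn offset).
        return Math.min(highWatermark, firstOpenTxnOffset.getAsLong());
    }
}
```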
> >>>>>>> Thus, I actually believe that it would be ok to change the
> >>>>>>> implementation and serve the answer from the `TopicPartitionState`?
> >>>>>>>
> >>>>>>> Another idea would be to use `currentLag()` in combination with
> >>>>>>> `position()` (or the offset of the last read record) to compute the
> >>>>>>> end-offset on the fly?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 10/20/23 4:00 AM, Bruno Cadonna wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Matthias is correct that the end offsets are stored somewhere in the
> >>>>>>>> metadata of the consumer. More precisely, they are stored in the
> >>>>>>>> `TopicPartitionState`. However, I could not find any public API on the
> >>>>>>>> consumer other than currentLag() that uses the stored end offsets. If
> >>>>>>>> I understand the code correctly, the method endOffsets() always
> >>>>>>>> triggers a remote call.
> >>>>>>>>
> >>>>>>>> I am a bit concerned about doing remote calls every commit.interval.ms
> >>>>>>>> (by default 100ms under EOS). At the moment, the remote calls are only
> >>>>>>>> issued if an optimization for KTables is turned on, where changelog
> >>>>>>>> topics are replaced with the input topic of the KTable. The current
> >>>>>>>> remote calls retrieve all committed offsets of the group at once. If I
> >>>>>>>> understand correctly, that is one single remote call. Remote calls for
> >>>>>>>> getting end offsets of changelog topics -- as I understand you are
> >>>>>>>> planning to issue -- will probably result in multiple remote calls to
> >>>>>>>> multiple leaders of the changelog topic partitions.
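To make the cost concern concrete: end-offset lookups are answered by partition leaders, so one lookup round needs roughly one request per distinct leader broker. Here is a back-of-the-envelope sketch. The class, the partition names, and the leader IDs are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical estimate of end-offset lookup cost for standby changelog partitions.
final class EndOffsetLookupCost {
    private EndOffsetLookupCost() {}

    // Groups partitions by leader broker ID; each group needs its own request.
    static Map<Integer, List<String>> partitionsByLeader(Map<String, Integer> leaderOf) {
        Map<Integer, List<String>> grouped = new HashMap<>();
        for (Map.Entry<String, Integer> entry : leaderOf.entrySet()) {
            grouped.computeIfAbsent(entry.getValue(), leader -> new ArrayList<>())
                   .add(entry.getKey());
        }
        return grouped;
    }

    static int requestsPerLookup(Map<String, Integer> leaderOf) {
        return partitionsByLeader(leaderOf).size();
    }

    // Requests per minute if a lookup runs once per commit interval.
    static long requestsPerMinute(Map<String, Integer> leaderOf, long commitIntervalMs) {
        return requestsPerLookup(leaderOf) * (60_000L / commitIntervalMs);
    }
}
```

For example, four changelog partitions spread over three leaders and looked up every 100 ms come to 3 * 600 = 1,800 requests per minute per client. By contrast, the committed-offsets fetch described above needs a single request per lookup.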
> >>>>>>>>
> >>>>>>>> Please correct me if I misunderstood anything of the above.
> >>>>>>>>
> >>>>>>>> If my understanding is correct, I propose to modify the consumer so
> >>>>>>>> that it gets the end offset from the locally stored metadata whenever
> >>>>>>>> possible, as part of the implementation of this KIP. I do not know
> >>>>>>>> what the implications of such a change to the consumer are, or
> >>>>>>>> whether a KIP is needed for it. Maybe endOffsets() guarantees to
> >>>>>>>> return the freshest end offsets possible, which would not be
> >>>>>>>> satisfied with the modification.
> >>>>>>>>
> >>>>>>>> Regarding the naming, I do not completely agree with Matthias. While
> >>>>>>>> the pattern might be consistent with onBatchUpdated, what is the
> >>>>>>>> meaning of onBatchUpdated? Is the batch updated? The names
> >>>>>>>> onBatchLoaded, onBatchWritten, or onBatchAdded are clearer IMO.
> >>>>>>>> With "restore" the pattern works better: if I restore a batch of
> >>>>>>>> records in a state, the records are not there although they should
> >>>>>>>> be, and I add them. If I update a batch of records in a state, this
> >>>>>>>> sounds like the batch of records is already in the state and I modify
> >>>>>>>> the existing records within the state. That is clearly not the
> >>>>>>>> meaning of the event for which the listener should be called.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/19/23 2:12 AM, Matthias J. Sax wrote:
> >>>>>>>>> Thanks for the KIP. Seems I am almost late to the party.
> >>>>>>>>>
> >>>>>>>>> About naming (fun, fun, fun): I like the current proposal overall,
> >>>>>>>>> except `onBatchLoaded`; I would prefer `onBatchUpdated`. It better
> >>>>>>>>> aligns with everything else:
> >>>>>>>>>
> >>>>>>>>>     - it's an update-listener, not a loaded-listener
> >>>>>>>>>     - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
> >>>>>>>>>       `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
> >>>>>>>>>     - `StandbyUpdateListener` should have `onUpdateStart`,
> >>>>>>>>>       `onUpdateSuspended`, and `onBatchUpdated` to be equally
> >>>>>>>>>       consistent (using "loaded" breaks the pattern)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> About the end-offset question: I am relatively sure that the consumer
> >>>>>>>>> gets the latest end-offset as attached metadata in every fetch
> >>>>>>>>> response. (We exploit this behavior to track end-offsets for input
> >>>>>>>>> topics with regard to `max.task.idle.ms` without overhead -- how we
> >>>>>>>>> could track lag with no overhead was also a concern when we did the
> >>>>>>>>> corresponding KIP.)
> >>>>>>>>>
> >>>>>>>>> Thus, I believe we would "just" need to modify the code accordingly
> >>>>>>>>> to get this information from the restore-consumer for free
> >>>>>>>>> (`restoreConsumer.endOffsets(...)` should be served w/o RPC, from the
> >>>>>>>>> internal metadata cache), and pass it into the listener.
> >>>>>>>>>
> >>>>>>>>> Please double check / verify this claim and keep me honest about it.
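The idea here is that fetch responses already carry each partition's high watermark, so the latest value can be kept in memory and read back without an extra RPC. The sketch below is a simplified, hypothetical model of that idea, not the consumer's actual `TopicPartitionState` code. Note that the cache is empty until the first fetch response arrives, which is exactly the fresh-standby case discussed at the top of this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model: remembers the end offset reported in the most recent fetch response.
final class FetchedEndOffsetCache {
    private final Map<String, Long> latestHighWatermark = new HashMap<>();

    // Called for every partition included in a fetch response.
    void onFetchResponse(String topicPartition, long highWatermark) {
        latestHighWatermark.put(topicPartition, highWatermark);
    }

    // No network call: empty until at least one fetch response for the partition arrived.
    OptionalLong cachedEndOffset(String topicPartition) {
        Long value = latestHighWatermark.get(topicPartition);
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```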
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 10/17/23 6:38 AM, Eduwer Camacaro wrote:
> >>>>>>>>>> Hi Bruno,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your observation; surely it will require a network call
> >>>>>>>>>> using the admin client in order to know this "endOffset", and that
> >>>>>>>>>> will have an impact on performance. We can either find a solution
> >>>>>>>>>> that has a low impact on performance or, ideally, zero impact;
> >>>>>>>>>> unfortunately, I don't see a way to have zero impact on
> >>>>>>>>>> performance. However, we can leverage the existing
> >>>>>>>>>> #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the
> >>>>>>>>>> admin client to ask for these "endOffset"s. As far as I can
> >>>>>>>>>> understand, this update is done periodically, based on the
> >>>>>>>>>> "commit.interval.ms" configuration. I believe this option will force
> >>>>>>>>>> us to invoke the StandbyUpdateListener once this interval is
> >>>>>>>>>> reached.
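A minimal sketch of the throttling this implies. The names are hypothetical, and this is not the code of #maybeUpdateLimitOffsetsForStandbyChangelogs. A remote end-offset lookup, and hence a listener call carrying a fresh value, happens at most once per commit interval, so the value the listener sees can be up to one interval stale.

```java
// Hypothetical gate: allows one refresh per interval, based on a caller-supplied clock.
final class IntervalRefreshGate {
    private final long intervalMs;
    private boolean refreshedBefore = false;
    private long lastRefreshMs;

    IntervalRefreshGate(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
    }

    // Returns true, and records the time, when a refresh is due at nowMs.
    boolean tryRefresh(long nowMs) {
        if (!refreshedBefore || nowMs - lastRefreshMs >= intervalMs) {
            refreshedBefore = true;
            lastRefreshMs = nowMs;
            return true;
        }
        return false;
    }
}
```

The caller passes the clock in (for example `System.currentTimeMillis()`) rather than the gate reading it internally. This keeps the gate deterministic and easy to test.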
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP, Colt and Eduwer,
> >>>>>>>>>>>
> >>>>>>>>>>> Are you sure there is also no significant performance impact from
> >>>>>>>>>>> passing `currentEndOffset` into the callback?
> >>>>>>>>>>>
> >>>>>>>>>>> I am asking because the comment here:
> >>>>>>>>>>>
> >>>>>>>>>>> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
> >>>>>>>>>>>
> >>>>>>>>>>> says that the end-offset is only updated once for standby tasks whose
> >>>>>>>>>>> changelog topic is not piggy-backed on input topics. I also could not
> >>>>>>>>>>> find where the end-offset is updated for those standbys.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> It's a nice improvement! I don't have anything to add on top of the
> >>>>>>>>>>>> previous comments; I just came here to say that it seems to me
> >>>>>>>>>>>> consensus has been reached and the result looks good to me.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks Colt and Eduwer!
> >>>>>>>>>>>> Lucas
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks, Guozhang. I've updated the KIP and will start a vote.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the summary, that looks good to me.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello there!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks everyone for the comments. There's a lot of back-and-forth
> >>>>>>>>>>>>>>> going on, so I'll do my best to summarize what everyone's said in
> >>>>>>>>>>>>>>> TLDR format:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do
> >>>>>>>>>>>>>>> similarly for the other methods.
> >>>>>>>>>>>>>>> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> >>>>>>>>>>>>>>> 3. Remove the `earliestOffset` parameter for performance reasons.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If that's all fine with everyone, I'll update the KIP, and we (well,
> >>>>>>>>>>>>>>> mostly Edu) will open a PR (:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello everyone,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for all your feedback for this KIP!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think that the key to choosing proper names for this API is
> >>>>>>>>>>>>>>>> understanding the terms used inside the StoreChangelogReader.
> >>>>>>>>>>>>>>>> Currently, this class has two possible states: ACTIVE_RESTORING
> >>>>>>>>>>>>>>>> and STANDBY_UPDATING. In my opinion, using StandbyUpdateListener
> >>>>>>>>>>>>>>>> for the interface fits these terms better. The same applies to
> >>>>>>>>>>>>>>>> onUpdateStart/Suspended.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> StoreChangelogReader uses "the same mechanism" for active task
> >>>>>>>>>>>>>>>> restoration and standby task updates, but this is an
> >>>>>>>>>>>>>>>> implementation detail. Under normal circumstances (no rebalances
> >>>>>>>>>>>>>>>> or task migrations), the changelog reader will be in
> >>>>>>>>>>>>>>>> STANDBY_UPDATING, which means it will be updating standby tasks
> >>>>>>>>>>>>>>>> as long as there are new records in the changelog topic. That's
> >>>>>>>>>>>>>>>> why I prefer onStandbyUpdated over onBatchUpdated, even if it
> >>>>>>>>>>>>>>>> doesn't 100% align with StateRestoreListener, but either one is
> >>>>>>>>>>>>>>>> fine.
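To illustrate the "same mechanism, two states" point, here is a hypothetical sketch in which one changelog-reading path hands each loaded batch to a different callback depending on the reader's state. Only the two state names come from the message above. The interface and class are illustrative, not StoreChangelogReader's code.

```java
// State names as described above; everything else is hypothetical.
enum ChangelogReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

// Illustrative callback for a batch of changelog records written into a store.
interface BatchCallback {
    void onBatch(String storeName, long batchEndOffset, long batchSize);
}

final class ChangelogBatchDispatcher {
    private final BatchCallback restoreCallback; // restore-listener role
    private final BatchCallback standbyCallback; // standby-update-listener role

    ChangelogBatchDispatcher(BatchCallback restoreCallback, BatchCallback standbyCallback) {
        this.restoreCallback = restoreCallback;
        this.standbyCallback = standbyCallback;
    }

    // Same batch, different listener: only the reader's state decides which one fires.
    void dispatch(ChangelogReaderState state, String storeName,
                  long batchEndOffset, long batchSize) {
        switch (state) {
            case ACTIVE_RESTORING:
                restoreCallback.onBatch(storeName, batchEndOffset, batchSize);
                break;
            case STANDBY_UPDATING:
                standbyCallback.onBatch(storeName, batchEndOffset, batchSize);
                break;
            default:
                throw new IllegalStateException("Unknown state: " + state);
        }
    }
}
```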
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Edu
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hello Colt,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for writing the KIP! I have read through the updated KIP,
> >>>>>>>>>>>>>>>>> and overall it looks great. I only have minor naming comments
> >>>>>>>>>>>>>>>>> (well, isn't naming the least boring stuff to discuss, and the
> >>>>>>>>>>>>>>>>> thing that takes most of the time for KIPs :P):
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. I tend to agree with Sophie regarding whether or not to
> >>>>>>>>>>>>>>>>> include "Standby" in the functions of
> >>>>>>>>>>>>>>>>> "onStandbyUpdateStart/Suspended", since dropping it is also more
> >>>>>>>>>>>>>>>>> consistent with the functions of "StateRestoreListener", where
> >>>>>>>>>>>>>>>>> we do not name them "onStateRestoreStart" etc.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. I know in community discussions we sometimes say "a standby is
> >>>>>>>>>>>>>>>>> promoted to active", but in the official code / java docs we did
> >>>>>>>>>>>>>>>>> not have a term for "promotion", since what the code really does
> >>>>>>>>>>>>>>>>> is recycle the task (while keeping its state stores open) and
> >>>>>>>>>>>>>>>>> create a new active task that takes in the recycled state stores,
> >>>>>>>>>>>>>>>>> just changing the other fields like task type etc. After
> >>>>>>>>>>>>>>>>> thinking about this for a bit, I tend to feel that "promoted" is
> >>>>>>>>>>>>>>>>> indeed a better name for user-facing purposes, while "recycle" is
> >>>>>>>>>>>>>>>>> more of a technical detail inside the code that could be
> >>>>>>>>>>>>>>>>> abstracted away from users. So I feel keeping the name
> >>>>>>>>>>>>>>>>> "PROMOTED" is fine.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. Regarding "earliestOffset", it does feel like we cannot always
> >>>>>>>>>>>>>>>>> avoid another call to the Kafka API. On the other hand, I also
> >>>>>>>>>>>>>>>>> tend to think that such bookkeeping may be better done at the app
> >>>>>>>>>>>>>>>>> level than at the Streams public API level. I.e., the app could
> >>>>>>>>>>>>>>>>> keep a "first ever starting offset" per "topic-partition-store"
> >>>>>>>>>>>>>>>>> key, and when we have a rolling restart, and hence some standby
> >>>>>>>>>>>>>>>>> task keeps "jumping" from one client to another via task
> >>>>>>>>>>>>>>>>> assignment, the app would update this value just once, when it
> >>>>>>>>>>>>>>>>> finds the "topic-partition-store" was never triggered before.
> >>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 4. I do not have a strong opinion either, but what about
> >>>>>>>>>>>>>>>>> "onBatchUpdated"?
> >>>>>>>>>>>>>>>>>
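The app-level bookkeeping in point 3 could look roughly like the sketch below: record the starting offset the first time a "topic-partition-store" key is seen, and never overwrite it. The class is hypothetical, and an in-memory map only illustrates the rule. For the value to survive a standby "jumping" between clients, the map would have to live in storage shared by all instances.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical application-side registry of first-ever starting offsets.
final class FirstStartingOffsetRegistry {
    private final Map<String, Long> firstStartingOffsets = new ConcurrentHashMap<>();

    // "/" cannot appear in Kafka topic names, so it separates the key parts unambiguously.
    private static String key(String topic, int partition, String storeName) {
        return topic + "/" + partition + "/" + storeName;
    }

    // Meant to be called from the standby listener's update-start callback.
    // Returns true only the first time this topic-partition-store key is seen.
    boolean recordIfFirst(String topic, int partition, String storeName, long startingOffset) {
        return firstStartingOffsets.putIfAbsent(key(topic, partition, storeName), startingOffset) == null;
    }

    OptionalLong firstStartingOffset(String topic, int partition, String storeName) {
        Long value = firstStartingOffsets.get(key(topic, partition, storeName));
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```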
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Sophie,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thank you very much for such a detailed review of the KIP. It
> >>>>>>>>>>>>>>>>>> might actually be longer than the original KIP in the first place!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. Ack'ed and fixed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2. Correct, this is a confusing passage and requires context:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> One thing on our list of TODOs regarding reliability is to
> >>>>>>>>>>>>>>>>>> determine how to configure `session.timeout.ms`. In our Kubernetes
> >>>>>>>>>>>>>>>>>> environment, an instance of our Streams app can be terminated,
> >>>>>>>>>>>>>>>>>> restarted, and get back into the "RUNNING" Streams state in about
> >>>>>>>>>>>>>>>>>> 20 seconds. We have two options here:
> >>>>>>>>>>>>>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal with 20
> >>>>>>>>>>>>>>>>>> seconds of unavailability for affected partitions, but avoid
> >>>>>>>>>>>>>>>>>> shuffling Tasks; or
> >>>>>>>>>>>>>>>>>> b) set session.timeout.ms to a low value, such as 6 seconds (with
> >>>>>>>>>>>>>>>>>> a heartbeat.interval.ms of 2000), and reduce the unavailability
> >>>>>>>>>>>>>>>>>> window during a rolling bounce, but incur an "extra" rebalance.
> >>>>>>>>>>>>>>>>>> There are several different costs to a rebalance, including the
> >>>>>>>>>>>>>>>>>> shuffling of standby tasks. JMX metrics are not fine-grained
> >>>>>>>>>>>>>>>>>> enough to give us an accurate picture of what's going on with the
> >>>>>>>>>>>>>>>>>> whole Standby Task Shuffle Dance. I hypothesize that the Standby
> >>>>>>>>>>>>>>>>>> Update Listener might help us clarify just how the shuffling
> >>>>>>>>>>>>>>>>>> actually (not theoretically) works, which will help us make a
> >>>>>>>>>>>>>>>>>> more informed decision about the session timeout config.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If you think this is worth putting in the KIP, I'll polish it and
> >>>>>>>>>>>>>>>>>> do so; otherwise, I'll remove the current half-baked explanation.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3. Overall, I agree with this. In our app, each Task has only one
> >>>>>>>>>>>>>>>>>> Store to reduce the number of changelog partitions, so I sometimes
> >>>>>>>>>>>>>>>>>> forget the distinction between the two concepts, as reflected in
> >>>>>>>>>>>>>>>>>> the KIP (:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3a. I don't like the word "Restore" here, since Restoration refers
> >>>>>>>>>>>>>>>>>> to an Active Task getting caught up in preparation to resume
> >>>>>>>>>>>>>>>>>> processing. `StandbyUpdateListener` is fine by me; I have updated
> >>>>>>>>>>>>>>>>>> the KIP. I am a native Python speaker, so I do prefer shorter
> >>>>>>>>>>>>>>>>>> names anyway (:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3b1. +1 to removing the word 'Task'.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> >>>>>>>>>>>>>>>>>> `onStandbyUpdateStart()`, which matches the name of the interface
> >>>>>>>>>>>>>>>>>> "StandbyUpdateListener". (The Python part of me hates this,
> >>>>>>>>>>>>>>>>>> however.)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3b3. Going back to question 2), `earliestOffset` was intended to
> >>>>>>>>>>>>>>>>>> allow us to more easily calculate the amount of state _already
> >>>>>>>>>>>>>>>>>> loaded_ in the store by computing (startingOffset -
> >>>>>>>>>>>>>>>>>> earliestOffset). This would help us see how much inefficiency is
> >>>>>>>>>>>>>>>>>> introduced in a rolling restart: if we go from an up-to-date
> >>>>>>>>>>>>>>>>>> standby before the restart to a situation where, after the whole
> >>>>>>>>>>>>>>>>>> restart, the Task is shuffled onto an instance with no previous
> >>>>>>>>>>>>>>>>>> state, that is expensive. However, if the final shuffling puts
> >>>>>>>>>>>>>>>>>> the Task back on an instance with a lot of pre-built state, it's
> >>>>>>>>>>>>>>>>>> not expensive.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If a call over the network is required to determine the
> >>>>>>>>>>>>>>>>>> earliestOffset, then this is a "hard no-go" for me, and we will
> >>>>>>>>>>>>>>>>>> remove it (I'll have to check with Eduwer, as he is close to
> >>>>>>>>>>>>>>>>>> having a working implementation). I think we can probably
> >>>>>>>>>>>>>>>>>> determine what we wanted to see in a different way, but it will
> >>>>>>>>>>>>>>>>>> take more thinking. If `earliestOffset` is confusing, perhaps
> >>>>>>>>>>>>>>>>>> rename it to `earliestChangelogOffset`?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> `startingOffset` is easy to remove, as it can be determined from
> >>>>>>>>>>>>>>>>>> the first call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Anyway, I've updated the JavaDoc in the interface; hopefully it's
> >>>>>>>>>>>>>>>>>> clearer. Awaiting further instructions here.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3c. Good point; after thinking, my order of preference is
> >>>>>>>>>>>>>>>>>> `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` ->
> >>>>>>>>>>>>>>>>>> `onBatchRestored()`. I am less fond of "processed" because, when
> >>>>>>>>>>>>>>>>>> I was first learning Streams, I mistakenly thought that standby
> >>>>>>>>>>>>>>>>>> tasks actually processed the input topic rather than loading from
> >>>>>>>>>>>>>>>>>> the changelog. I'll defer to you here.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet
> >>>>>>>>>>>>>>>>>> `onStandbyUpdateSuspended()`. Will check on the implementation of
> >>>>>>>>>>>>>>>>>> keeping track of the number of records loaded.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 4a. I think this might be best in a separate KIP, especially given
> >>>>>>>>>>>>>>>>>> that this is my and Eduwer's first time contributing to Kafka (so
> >>>>>>>>>>>>>>>>>> we want to minimize the blast radius).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 4b. I might respectfully (and timidly) push back here: RECYCLED for
> >>>>>>>>>>>>>>>>>> an Active Task is a bit confusing to me. DEMOTED and MIGRATED make
> >>>>>>>>>>>>>>>>>> sense from the standpoint of an Active Task, but recycling, to me,
> >>>>>>>>>>>>>>>>>> sounds like throwing stuff away so that the resources (i.e. disk
> >>>>>>>>>>>>>>>>>> space) can be used by a separate Task. As an alternative to
> >>>>>>>>>>>>>>>>>> reusing the same enum, maybe rename it to `StandbySuspendReason`
> >>>>>>>>>>>>>>>>>> to avoid naming conflicts with `ActiveSuspendReason`? However, I
> >>>>>>>>>>>>>>>>>> could be convinced to rename PROMOTED -> RECYCLED, especially if
> >>>>>>>>>>>>>>>>>> Eduwer agrees.
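Below, the two `session.timeout.ms` choices from item 2 above are sketched as Kafka Streams properties. The values come from the message. The `consumer.` prefix (the form `StreamsConfig.consumerPrefix(...)` produces) forwards a setting to the embedded consumers. The helper class is hypothetical. Kafka's consumer documentation recommends keeping `heartbeat.interval.ms` no higher than one third of `session.timeout.ms`, and option b) meets that exactly.

```java
import java.util.Properties;

// Hypothetical helper holding the two settings discussed in item 2.
final class SessionTimeoutOptions {
    private SessionTimeoutOptions() {}

    // a) Ride out a ~20 s restart without a rebalance, accepting ~20 s of
    //    unavailability for the affected partitions.
    static Properties tolerateRestart() {
        Properties props = new Properties();
        props.put("consumer.session.timeout.ms", "30000");
        return props;
    }

    // b) Detect the departed instance quickly and rebalance, shrinking the
    //    unavailability window at the cost of an extra rebalance.
    static Properties detectQuickly() {
        Properties props = new Properties();
        props.put("consumer.session.timeout.ms", "6000");
        // One third of the session timeout, per the consumer docs' guidance.
        props.put("consumer.heartbeat.interval.ms", "2000");
        return props;
    }
}
```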
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> TLDR:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> T1. Agreed, will remove the word "Task" as it's incorrect.
> >>>>>>>>>>>>>>>>>> T2. Will update to `onStandbyUpdateStart()`.
> >>>>>>>>>>>>>>>>>> T3. Awaiting further instructions on earliestOffset and
> >>>>>>>>>>>>>>>>>> startingOffset.
> >>>>>>>>>>>>>>>>>> T4. I don't like `onBatchProcessed()` too much; perhaps
> >>>>>>>>>>>>>>>>>> `onBatchLoaded()`?
> >>>>>>>>>>>>>>>>>> T5. Will update to `onStandbyUpdateSuspended()`.
> >>>>>>>>>>>>>>>>>> T6. Thoughts on renaming SuspendReason to StandbySuspendReason,
> >>>>>>>>>>>>>>>>>> rather than renaming PROMOTED to RECYCLED? @Eduwer?
> >>>>>>>>>>>>>>>>>>
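The TLDR naming decisions can be sketched as a small Java interface. This is hypothetical. The parameter lists (a store name, a plain `int` partition, and offsets) are assumptions chosen to keep the example self-contained. `earliestOffset` is left out because T3 is still open. `RecordingListener` is an illustrative implementation only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener using the callback names from the TLDR (T2, T4, T5, T6).
// Parameter lists are assumptions, not the KIP's final signatures.
interface StandbyUpdateListener {
    enum StandbySuspendReason { MIGRATED, PROMOTED }

    void onStandbyUpdateStart(String storeName, int partition, long startingOffset);

    void onBatchLoaded(String storeName, int partition, long batchEndOffset, long batchSize);

    void onStandbyUpdateSuspended(String storeName, int partition, long storeOffset,
                                  StandbySuspendReason reason);
}

// Illustrative implementation that records each callback as a line of text.
class RecordingListener implements StandbyUpdateListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onStandbyUpdateStart(String storeName, int partition, long startingOffset) {
        events.add("start " + storeName + "-" + partition + " @" + startingOffset);
    }

    @Override
    public void onBatchLoaded(String storeName, int partition, long batchEndOffset, long batchSize) {
        events.add("batch " + storeName + "-" + partition + " +" + batchSize + " ->" + batchEndOffset);
    }

    @Override
    public void onStandbyUpdateSuspended(String storeName, int partition, long storeOffset,
                                         StandbySuspendReason reason) {
        events.add("suspend " + storeName + "-" + partition + " @" + storeOffset + " " + reason);
    }
}
```

Calling `onStandbyUpdateStart("counts", 0, 100L)`, then `onBatchLoaded("counts", 0, 149L, 50L)`, then `onStandbyUpdateSuspended("counts", 0, 149L, PROMOTED)` leaves three entries in `events`. The last entry is `suspend counts-0 @149 PROMOTED`.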
> >>>>>>>>>>>>>>>>>> Long Live the Otter,
> >>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hey Colt! Thanks for the KIP -- this will be a great addition to
> >>>>>>>>>>>>>>>>>>> Streams. I can't believe we've gone so long without this.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Overall the proposal makes sense, but I had a handful of fairly minor
> >>>>>>>>>>>>>>>>>>> questions and suggestions/requests:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of the Motivation
> >>>>>>>>>>>>>>>>>>> section is cut off and incomplete -- "want to be able to know " what
> >>>>>>>>>>>>>>>>>>> exactly?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2. This isn't that important since the motivation as a whole is clear
> >>>>>>>>>>>>>>>>>>> to me and convincing enough, but I'm not quite sure I understand the
> >>>>>>>>>>>>>>>>>>> example at the end of the Motivation section. How are standby tasks
> >>>>>>>>>>>>>>>>>>> (and the ability to hook into and monitor their status) related to the
> >>>>>>>>>>>>>>>>>>> session.timeout.ms config?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3. To help both old and new users of Kafka Streams understand this new
> >>>>>>>>>>>>>>>>>>> restore listener and its purpose/semantics, can we try to name the
> >>>>>>>>>>>>>>>>>>> class and callbacks in a way that's more consistent with the active
> >>>>>>>>>>>>>>>>>>> task restore listener?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3a. StandbyTaskUpdateListener:
> >>>>>>>>>>>>>>>>>>> The existing restore listener is called StateRestoreListener, so the
> >>>>>>>>>>>>>>>>>>> new one could be called something like StandbyStateRestoreListener.
> >>>>>>>>>>>>>>>>>>> That said, we typically refer to standby tasks as "processing" rather
> >>>>>>>>>>>>>>>>>>> than "restoring" records -- ie restoration is a term for active task
> >>>>>>>>>>>>>>>>>>> state specifically. I actually like the original suggestion if we just
> >>>>>>>>>>>>>>>>>>> drop the "Task" part of the name, ie StandbyUpdateListener. I think
> >>>>>>>>>>>>>>>>>>> either that or StandbyRestoreListener would be fine and probably the
> >>>>>>>>>>>>>>>>>>> two best options.
> >>>>>>>>>>>>>>>>>>> Also, this probably goes without saying, but any change to the name of
> >>>>>>>>>>>>>>>>>>> this class should of course be reflected in the KafkaStreams#setXXX API
> >>>>>>>>>>>>>>>>>>> as well.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3b. #onTaskCreated
> >>>>>>>>>>>>>>>>>>> I know the "start" callback feels a bit different for the standby task
> >>>>>>>>>>>>>>>>>>> updater vs an active task beginning restoration, but I think we should
> >>>>>>>>>>>>>>>>>>> try to keep the various callbacks aligned to their active restore
> >>>>>>>>>>>>>>>>>>> listener counterpart. We can/should just replace the term "restore"
> >>>>>>>>>>>>>>>>>>> with "update" for the callback method names the same way we do for the
> >>>>>>>>>>>>>>>>>>> class name, which in this case would give us #onUpdateStart.
> >>>>>>>>>>>>>>>>>>> Personally I like this better, but it's ultimately up to you. However,
> >>>>>>>>>>>>>>>>>>> I would push back against anything that includes the word "Task" (eg
> >>>>>>>>>>>>>>>>>>> #onTaskCreated), as the listener is actually not scoped to the task
> >>>>>>>>>>>>>>>>>>> itself but instead to the individual state store(s). This is the main
> >>>>>>>>>>>>>>>>>>> reason I would prefer calling it something like #onUpdateStart, which
> >>>>>>>>>>>>>>>>>>> keeps the focus on the store being updated rather than the task that
> >>>>>>>>>>>>>>>>>>> just happens to own this store.
> >>>>>>>>>>>>>>>>>>> One last thing on this callback -- do we really need both the
> >>>>>>>>>>>>>>>>>>> `earliestOffset` and `startingOffset`? I feel like this might be more
> >>>>>>>>>>>>>>>>>>> confusing than it is helpful (tbh even I'm not completely sure I know
> >>>>>>>>>>>>>>>>>>> what the earliestOffset is supposed to represent). More importantly, is
> >>>>>>>>>>>>>>>>>>> this all information that is already available and able to be passed
> >>>>>>>>>>>>>>>>>>> in to the callback by Streams? I haven't checked on this, but it feels
> >>>>>>>>>>>>>>>>>>> like the earliestOffset is likely to require a remote call, either by
> >>>>>>>>>>>>>>>>>>> the embedded consumer or via the admin client. If so, the ROI on
> >>>>>>>>>>>>>>>>>>> including this parameter seems quite low (if not outright negative).
> >>>>>>>>>>>>>>>>>>>
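3b argues that the listener is scoped to individual state stores rather than to tasks. The minimal sketch below illustrates that point. It assumes callbacks named `onUpdateStart` and `onBatchUpdated`, following the suggestions in 3b and 3c, and keys progress by store name and changelog partition. `StoreKey`, `StoreProgress`, and the parameter lists are hypothetical. The tracker uses only offsets handed to the callbacks, so it makes no remote call of its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key: one entry per (store, changelog partition), not per task,
// because a single task can own several stores.
final class StoreKey {
    final String storeName;
    final int partition;

    StoreKey(String storeName, int partition) {
        this.storeName = storeName;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StoreKey)) return false;
        StoreKey other = (StoreKey) o;
        return partition == other.partition && storeName.equals(other.storeName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(storeName, partition);
    }
}

// Hypothetical tracker fed by store-scoped callbacks.
class StoreProgress {
    // Most recent changelog offset reported for each store.
    private final Map<StoreKey, Long> lastOffset = new HashMap<>();

    void onUpdateStart(String storeName, int partition, long startingOffset) {
        lastOffset.put(new StoreKey(storeName, partition), startingOffset);
    }

    void onBatchUpdated(String storeName, int partition, long batchEndOffset) {
        lastOffset.put(new StoreKey(storeName, partition), batchEndOffset);
    }

    // Returns -1 for a store that has not reported yet.
    long offsetOf(String storeName, int partition) {
        return lastOffset.getOrDefault(new StoreKey(storeName, partition), -1L);
    }
}
```

Two stores owned by the same task keep independent positions here, which a task-keyed map could not express.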
> >>>>>>>>>>>>>>>>>>> 3c. #onBatchRestored
> >>>>>>>>>>>>>>>>>>> If we opt to use the term "update" in place of "restore" elsewhere,
> >>>>>>>>>>>>>>>>>>> then we should consider doing so here as well. What do you think about
> >>>>>>>>>>>>>>>>>>> #onBatchUpdated, or even #onBatchProcessed?
> >>>>>>>>>>>>>>>>>>> I'm actually not super concerned about this particular API, and
> >>>>>>>>>>>>>>>>>>> honestly I think we can use restore or update interchangeably here, so
> >>>>>>>>>>>>>>>>>>> if you don't like any of the suggested names (and no one can think of
> >>>>>>>>>>>>>>>>>>> anything better), I would just stick with #onBatchRestored. In this
> >>>>>>>>>>>>>>>>>>> case, it kind of makes the most sense.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3d. #onTaskSuspended
> >>>>>>>>>>>>>>>>>>> Along the same lines as 3b above, #onUpdateSuspended or just
> >>>>>>>>>>>>>>>>>>> #onRestoreSuspended probably makes more sense for this callback. Also,
> >>>>>>>>>>>>>>>>>>> I notice the StateRestoreListener passes in the total number of
> >>>>>>>>>>>>>>>>>>> records restored to its #onRestoreSuspended. Assuming we already track
> >>>>>>>>>>>>>>>>>>> that information in Streams and have it readily available to pass in
> >>>>>>>>>>>>>>>>>>> at whatever point we would be invoking this callback, that might be a
> >>>>>>>>>>>>>>>>>>> useful parameter for the standby listener to have as well.
> >>>>>>>>>>>>>>>>>>>
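For 3d, here is a minimal sketch of the bookkeeping that would let a suspend callback receive a total record count, analogous to the total that `StateRestoreListener#onRestoreSuspended` receives. It assumes Streams sees every loaded batch size. `SuspendCallback`, `StandbyLoadCounter`, and their method names are hypothetical and not part of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback receiving the number of records loaded before suspension.
interface SuspendCallback {
    void onUpdateSuspended(String storeName, long totalRecordsLoaded);
}

// Hypothetical Streams-side counter: sums batch sizes per store and hands the
// total to the callback when the standby update is suspended.
class StandbyLoadCounter {
    private final Map<String, Long> loaded = new HashMap<>();
    private final SuspendCallback callback;

    StandbyLoadCounter(SuspendCallback callback) {
        this.callback = callback;
    }

    // Called once per batch applied to the standby store.
    void batchLoaded(String storeName, long batchSize) {
        loaded.merge(storeName, batchSize, Long::sum);
    }

    // Reports the running total (0 if nothing was loaded), then resets it.
    void suspend(String storeName) {
        long total = loaded.getOrDefault(storeName, 0L);
        loaded.remove(storeName);
        callback.onUpdateSuspended(storeName, total);
    }
}
```

Resetting the count on suspend is a design choice: the reported total then covers one assignment of the store rather than the lifetime of the instance.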
> >>>>>>>>>>>>>>>>>>> 4. I totally love the SuspendReason thing, just two notes/requests:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4a. Feel free to push back against adding onto the scope of this KIP,
> >>>>>>>>>>>>>>>>>>> but it would be great to expand the active state restore listener with
> >>>>>>>>>>>>>>>>>>> this SuspendReason enum as well. It would be really useful for both
> >>>>>>>>>>>>>>>>>>> variants of restore listener.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby
> >>>>>>>>>>>>>>>>>>> tasks it means basically the same thing; the point is that active
> >>>>>>>>>>>>>>>>>>> tasks can also be recycled into standbys through the same mechanism.
> >>>>>>>>>>>>>>>>>>> This way they can share the SuspendReason enum -- not that it's
> >>>>>>>>>>>>>>>>>>> necessary for them to share, I just think it would be a good idea to
> >>>>>>>>>>>>>>>>>>> keep the two restore listeners aligned to the highest degree possible.
> >>>>>>>>>>>>>>>>>>> I was actually considering proposing a short KIP with a new
> >>>>>>>>>>>>>>>>>>> RecyclingListener (or something) specifically for this exact kind of
> >>>>>>>>>>>>>>>>>>> thing, since we currently have literally zero insight into the
> >>>>>>>>>>>>>>>>>>> recycling process. It's practically impossible to tell when a store
> >>>>>>>>>>>>>>>>>>> has been converted from active to standby, or vice versa. So having
> >>>>>>>>>>>>>>>>>>> access to the SuspendReason, and more importantly having a callback
> >>>>>>>>>>>>>>>>>>> guaranteed to notify you when a state store is recycled, whether
> >>>>>>>>>>>>>>>>>>> active or standby, would be amazing.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> ---------- Forwarded message ---------
> >>>>>>>>>>>>>>>>>>>> From: Colt McNealy <c...@littlehorse.io>
> >>>>>>>>>>>>>>>>>>>> Date: Tue, Oct 3, 2023 at 12:48 PM
> >>>>>>>>>>>>>>>>>>>> Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> >>>>>>>>>>>>>>>>>>>> To: <dev@kafka.apache.org>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We would like to propose a small KIP to improve the ability of Streams
> >>>>>>>>>>>>>>>>>>>> apps to monitor the progress of their standby tasks through a callback
> >>>>>>>>>>>>>>>>>>>> interface.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We have a nearly-working implementation on our fork and are curious
> >>>>>>>>>>>>>>>>>>>> for feedback.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thank you,
> >>>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>>>>>>>>>
