Hi everyone,

As discussed on the Zoom call, we're going to handle rebalance metadata as follows:

- On start-up, Streams will open each store and read its changelog offsets
into an in-memory cache. This cache will be shared among all StreamThreads.
- On rebalance, the cache will be consulted for Task offsets for any Task
that is not active on any instance-local StreamThread. If the Task is
active on *any* instance-local StreamThread, we will report the Task lag as
"up to date" (i.e. -1), because we know that the local state is currently
up-to-date. A rough sketch of this cache is included below.
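
To make this concrete, here's a rough sketch of what the shared cache and
the rebalance-time lookup could look like. This is purely illustrative:
TaskOffsetCache, put() and offsetSumFor() are hypothetical names, not
proposed API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

// Illustrative only: a process-wide cache of changelog offsets, populated
// once on start-up and shared by all StreamThreads.
public class TaskOffsetCache {

    // Sentinel meaning "this Task's local state is currently up to date".
    private static final long UP_TO_DATE = -1L;

    // taskId -> (changelog partition -> committed offset)
    private final Map<TaskId, Map<TopicPartition, Long>> cache = new ConcurrentHashMap<>();

    // Filled during the start-up scan of on-disk state stores.
    public void put(final TaskId taskId, final Map<TopicPartition, Long> offsets) {
        cache.put(taskId, Map.copyOf(offsets));
    }

    // Consulted when computing Task lags for the rebalance metadata.
    public long offsetSumFor(final TaskId taskId, final boolean activeOnThisInstance) {
        if (activeOnThisInstance) {
            return UP_TO_DATE;
        }
        return cache.getOrDefault(taskId, Map.of())
                    .values().stream().mapToLong(Long::longValue).sum();
    }
}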

We will avoid caching offsets across restarts in the legacy ".checkpoint"
file, so that we can eliminate the logic for handling this class of file. If
the performance of opening/closing many state stores turns out to be poor,
we can parallelise it by forking off a thread for each Task directory when
reading the offsets (roughly sketched below).
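
For example, the start-up scan might look something like this. Again, just a
sketch: TaskDirectory, taskId() and readCommittedOffsets() are stand-ins for
whatever we end up with, not existing APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

// Illustrative only: fork one thread per on-disk Task directory to read its
// changelog offsets, so slow store opens overlap instead of running serially.
Map<TaskId, Map<TopicPartition, Long>> readAllOffsets(final List<TaskDirectory> dirs)
        throws InterruptedException, ExecutionException {
    final Map<TaskId, Map<TopicPartition, Long>> result = new ConcurrentHashMap<>();
    final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, dirs.size()));
    try {
        final List<Future<?>> futures = new ArrayList<>();
        for (final TaskDirectory dir : dirs) {
            futures.add(pool.submit(() -> {
                // Open the store(s) only long enough to read their committed offsets.
                result.put(dir.taskId(), readCommittedOffsets(dir)); // hypothetical helper
            }));
        }
        for (final Future<?> future : futures) {
            future.get(); // surface any failure from the per-directory reads
        }
    } finally {
        pool.shutdown();
    }
    return result;
}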

I'll update the KIP later today to reflect this design, but I will try to
keep it high-level, so that the exact implementation can vary.

Regards,

Nick

On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> 103: I like the idea of immediately deprecating #managesOffsets and aiming
> to make offset management mandatory in the long run. I assume we would also
> log a warning for any custom stores that return "false" from this method to
> encourage custom store implementations to start doing so? My only
> question/concern is that if we want folks to start managing their own
> offsets then we should make this transition easy for them, perhaps by
> exposing some public utility APIs for things that are currently handled by
> Kafka Streams such as reading/writing checkpoint files. Maybe it would be
> useful to include a small example in the KIP of what it would actually mean
> to "manage your own offsets" -- I know (all too well) that plugging in
> custom storage implementations is not easy and most people who do this are
> probably fairly advanced users, but offset management will be a totally new
> ballgame to most people and this kind of feels like throwing them
> off the deep end. We should at least provide a lifejacket via some kind of
> utility API and/or example.
>
> 200. There's been a lot of back and forth on the rebalance metadata/task
> lag computation question, so forgive me if I missed any part of this, but I
> think we've landed at the right idea here. To summarize: the "tl;dr"
> explanation is that we'll write the checkpoint file only on close and will
> account for hard-crash scenarios by opening up the stores on startup and
> writing a checkpoint file for any missing tasks. Does that sound about
> right?
>
> A few clarifications:
> I think we're all more or less on the same page here but just to be
> absolutely clear, the task lags for each task directory found on disk will
> be reported by only one of the StreamThreads, and each StreamThread will
> report lags only for tasks that it already owns or that are not assigned to any
> other StreamThread in the client. In other words, we only need to get the
> task lag for completely unassigned/unlocked tasks, which means if there is
> a checkpoint file at all then it must be up-to-date, because there is no
> other StreamThread actively writing to that state store (if so then only
> that StreamThread would report lag for that particular task).
>
> This still leaves the "no checkpoint at all" case which as previously
> mentioned can occur after a hard crash. Luckily we only have to worry
> about this once, after starting up again following said hard crash. We can
> simply open up each of the state stores before ever joining the group, get
> the offsets from rocksdb, and write them to a new checkpoint file. After
> that, we can depend on the checkpoints written at close and won't have to
> open up any stores that aren't already assigned for the reasons laid out in
> the paragraph above.
>
> As for the specific mechanism and which thread-does-what, since there were
> some questions, this is how I'm imagining the process:
>
>    1. The general idea is that we simply go through each task directory
>    with state but no checkpoint file, open the StateStore, call
>    #committedOffset, and then write the result to the checkpoint file. We
>    can then close these stores and let things proceed as normal.
>    2. This only has to happen once, during startup, but we have two
>    options:
>       1. Do this from KafkaStreams#start, ie before we even create the
>       StreamThreads.
>       2. Do this from StreamThread#start, following a similar lock-based
>       approach to the one used in #computeTaskLags, where each StreamThread
>       just makes a pass over the task directories on disk and attempts to
>       lock them one by one. If it obtains the lock, it checks whether there
>       is state but no checkpoint, and writes the checkpoint if needed. If it
>       can't grab the lock, then we know one of the other StreamThreads must
>       be handling the checkpoint file for that task directory, and we can
>       move on.
>
> I don't feel too strongly about which approach is best: doing it in
> KafkaStreams#start is certainly the simplest, while doing it in the
> StreamThread's startup is more efficient. If we're worried about adding too
> much weight to KafkaStreams#start then the 2nd option is probably best,
> though slightly more complicated.
>
> Thoughts?
>
> On Tue, May 14, 2024 at 10:02 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > Sorry for the delay in replying. I've finally now got some time to work
> on
> > this.
> >
> > Addressing Matthias's comments:
> >
> > 100.
> > Good point. As Bruno mentioned, there's already
> AbstractReadWriteDecorator
> > which we could leverage to provide that protection. I'll add details on
> > this to the KIP.
> >
> > 101,102.
> > It looks like these points have already been addressed by Bruno. Let me
> > know if anything here is still unclear or you feel needs to be detailed
> > more in the KIP.
> >
> > 103.
> > I'm in favour of anything that gets the old code removed sooner, but
> > wouldn't deprecating an API that we expect (some) users to implement
> cause
> > problems?
> > I'm thinking about implementers of custom StateStores, as they may be
> > confused by managesOffsets() being deprecated, especially since they
> would
> > have to mark their implementation as @Deprecated in order to avoid
> compile
> > warnings.
> > If deprecating an API *while it's still expected to be implemented* is
> > something that's generally done in the project, then I'm happy to do so
> > here.
> >
> > 104.
> > I think this is technically possible, but at the cost of considerable
> > additional code to maintain. Would we ever have a pathway to remove this
> > downgrade code in the future?
> >
> >
> > Regarding rebalance metadata:
> > Opening all stores on start-up to read and cache their offsets is an
> > interesting idea, especially if we can avoid re-opening the stores once
> the
> > Tasks have been assigned. Scalability shouldn't be too much of a problem,
> > because typically users have a fairly short state.cleanup.delay, so the
> > number of on-disk Task directories should rarely exceed the number of
> Tasks
> > previously assigned to that instance.
> > An advantage of this approach is that it would also simplify StateStore
> > implementations, as they would only need to guarantee that committed
> > offsets are available when the store is open.
> >
> > I'll investigate this approach this week for feasibility and report back.
> >
> > I think that covers all the outstanding feedback, unless I missed
> anything?
> >
> > Regards,
> > Nick
> >
> > On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org> wrote:
> >
> > > Hi Matthias,
> > >
> > > I see what you mean.
> > >
> > > To sum up:
> > >
> > > With this KIP the .checkpoint file is written when the store closes.
> > > That is when:
> > > 1. a task moves away from Kafka Streams client
> > > 2. Kafka Streams client shuts down
> > >
> > > A Kafka Streams client needs the information in the .checkpoint file
> > > 1. on startup because it does not have any open stores yet.
> > > 2. during rebalances for non-empty state directories of tasks that are
> > > not assigned to the Kafka Streams client.
> > >
> > > With hard crashes, i.e., when the Streams client is not able to close
> > > its state stores and write the .checkpoint file, the .checkpoint file
> > > might be quite stale. That influences the next rebalance after failover
> > > negatively.
> > >
> > >
> > > My conclusion is that Kafka Streams either needs to open the state
> > > stores at start up or we write the checkpoint file more often.
> > >
> > > Writing the .checkpoint file during processing more often without
> > > controlling the flush to disk would work. However, Kafka Streams would
> > > checkpoint offsets that are not yet persisted on disk by the state
> > > store. That is with a hard crash the offsets in the .checkpoint file
> > > might be larger than the offsets checkpointed in the state store. That
> > > might not be a problem if Kafka Streams uses the .checkpoint file only
> > > to compute the task lag. The downside is that it makes the managing of
> > > checkpoints more complex because now we have to maintain two
> > > checkpoints: one for restoration and one for computing the task lag.
> > > I think we should explore the option where Kafka Streams opens the
> state
> > > stores at start up to get the offsets.
> > >
> > > I also checked when Kafka Streams needs the checkpointed offsets to
> > > compute the task lag during a rebalance. Turns out Kafka Streams needs
> > > them before sending the join request. Now, I am wondering if opening
> the
> > > state stores of unassigned tasks whose state directory exists locally
> is
> > > actually such a big issue (the concern being the expected higher latency),
> > > since it happens before the Kafka Streams client joins the rebalance.
> > >
> > > Best,
> > > Bruno
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On 5/4/24 12:05 AM, Matthias J. Sax wrote:
> > > > That's good questions... I could think of a few approaches, but I
> admit
> > > > it might all be a little bit tricky to code up...
> > > >
> > > > However if we don't solve this problem, I think this KIP does not
> > really
> > > > solve the core issue we are facing? In the end, if we rely on the
> > > > `.checkpoint` file to compute a task assignment, but the
> `.checkpoint`
> > > > file can be arbitrarily stale after a crash because we only write it
> on a
> > > > clean close, there would still be a huge gap that this KIP does not
> > > close?
> > > >
> > > > For the case in which we keep the checkpoint file, this KIP would
> still
> > > > help for "soft errors" in which KS can recover, and roll back the
> > store.
> > > > A significant win for sure. -- But hard crashes would still be a
> > > > problem? We might assign tasks to "wrong" instance, ie, which are not
> > > > most up to date, as the checkpoint information could be very
> outdated?
> > > > Would we end up with a half-baked solution? Would this be good enough
> > to
> > > > justify the introduced complexity? In the end, for soft failures it's
> still
> > > > a win. Just want to make sure we understand the limitations and make
> an
> > > > educated decision.
> > > >
> > > > Or do I miss something?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 5/3/24 10:20 AM, Bruno Cadonna wrote:
> > > >> Hi Matthias,
> > > >>
> > > >>
> > > >> 200:
> > > >> I like the idea in general. However, it is not clear to me how the
> > > >> behavior should be with multiple stream threads in the same Kafka
> > > >> Streams client. What stream thread opens which store? How can a
> stream
> > > >> thread pass an open store to another stream thread that got the
> > > >> corresponding task assigned? How does a stream thread know that a
> task
> > > >> was not assigned to any of the stream threads of the Kafka Streams
> > > >> client? I have the feeling we should just keep the .checkpoint file
> on
> > > >> close for now to unblock this KIP and try to find a solution to get
> > > >> totally rid of it later.
> > > >>
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >>
> > > >>
> > > >> On 5/3/24 6:29 PM, Matthias J. Sax wrote:
> > > >>> 101: Yes, but what I am saying is, that we don't need to flush the
> > > >>> .position file to disk periodically, but only maintain it in main
> > > >>> memory, and only write it to disk on close() to preserve it across
> > > >>> restarts. This way, it would never be ahead, but might only lag?
> But
> > > >>> with my better understanding about (102) it might be moot anyway...
> > > >>>
> > > >>>
> > > >>> 102: Thanks for clarifying. Looked into the code now. Makes sense.
> > > >>> Might be something worth calling out explicitly in the KIP
> > > >>> writeup. -- Now that I realize that the position is tracked inside
> > > >>> the store (not outside as the changelog offsets) it makes much more
> > > >>> sense to pull position into RocksDB itself. In the end, it's
> actually
> > > >>> a "store implementation" detail how it tracks the position (and
> kinda
> > > >>> leaky abstraction currently, that we re-use the checkpoint file
> > > >>> mechanism to track it and flush to disk).
> > > >>>
> > > >>>
> > > >>> 200: I was thinking about this a little bit more, and maybe it's
> not
> > > >>> too bad? When KS starts up, we could open all stores we find on
> local
> > > >>> disk pro-actively, and keep them all open until the first rebalance
> > > >>> finishes: For tasks we get assigned, we hand in the already opened
> > > >>> store (this would amortize the cost to open the store before the
> > > >>> rebalance) and for non-assigned tasks, we know the offset
> information
> > > >>> won't change and we could just cache it in-memory for later reuse
> > > >>> (ie, next rebalance) and close the store to free up resources? --
> > > >>> Assuming that we would get a large percentage of opened stores
> > > >>> assigned as tasks anyway, this could work?
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 5/3/24 1:29 AM, Bruno Cadonna wrote:
> > > >>>> Hi Matthias,
> > > >>>>
> > > >>>>
> > > >>>> 101:
> > > >>>> Let's assume a RocksDB store, but I think the following might be
> > > >>>> true also for other store implementations. With this KIP, if Kafka
> > > >>>> Streams commits the offsets, the committed offsets will be stored
> in
> > > >>>> an in-memory data structure (i.e. the memtable) and stay there
> until
> > > >>>> RocksDB decides that it is time to persist its in-memory data
> > > >>>> structure. If Kafka Streams writes its position to the .position
> > > >>>> file during a commit and a crash happens before RocksDB persist
> the
> > > >>>> memtable then the position in the .position file is ahead of the
> > > >>>> persisted offset. If IQ is done between the crash and the state
> > > >>>> store having fully restored the changelog, the position might tell IQ
> that
> > > >>>> the state store is more up-to-date than it actually is.
> > > >>>> In contrast, if Kafka Streams handles persisting positions the
> same
> > > >>>> as persisting offset, the position should always be consistent
> with
> > > >>>> the offset, because they are persisted together.
> > > >>>>
> > > >>>>
> > > >>>> 102:
> > > >>>> I am confused about your confusion which tells me that we are
> > > >>>> talking about two different things.
> > > >>>> You asked
> > > >>>>
> > > >>>> "Do you intent to add this information [i.e. position] to the map
> > > >>>> passed via commit(final Map<TopicPartition, Long>
> > changelogOffsets)?"
> > > >>>>
> > > >>>> and with what I wrote I meant that we do not need to pass the
> > > >>>> position into the implementation of the StateStore interface since
> > > >>>> the position is updated within the implementation of the
> StateStore
> > > >>>> interface (e.g. RocksDBStore [1]). My statement describes the
> > > >>>> behavior now, not the change proposed in this KIP, so it does not
> > > >>>> contradict what is stated in the KIP.
> > > >>>>
> > > >>>>
> > > >>>> 200:
> > > >>>> This is about Matthias' main concern about rebalance metadata.
> > > >>>> As far as I understand the KIP, Kafka Streams will only use the
> > > >>>> .checkpoint files to compute the task lag for unassigned tasks
> whose
> > > >>>> state is locally available. For assigned tasks, it will use the
> > > >>>> offsets managed by the open state store.
> > > >>>>
> > > >>>> Best,
> > > >>>> Bruno
> > > >>>>
> > > >>>> [1]
> > > >>>>
> > >
> >
> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
> > > >>>>
> > > >>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote:
> > > >>>>> Thanks Bruno.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 101: I think I understand this better now. But just want to make
> > > >>>>> sure I do. What do you mean by "they can diverge" and "Recovering
> > > >>>>> after a failure might load inconsistent offsets and positions."
> > > >>>>>
> > > >>>>> The checkpoint is the offset from the changelog, while the
> position
> > > >>>>> is the offset from the upstream source topic, right? -- In the
> end,
> > > >>>>> the position is about IQ, and if we fail to update it, it only
> > > >>>>> means that there is some gap when we might not be able to query a
> > > >>>>> standby task, because we think it's not up-to-date enough even if
> > > >>>>> it is, which would resolve itself soon? Ie, the position might
> > > >>>>> "lag", but it's not "inconsistent". Do we believe that this lag
> > > >>>>> would be highly problematic?
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 102: I am confused.
> > > >>>>>
> > > >>>>>> The position is maintained inside the state store, but is
> > > >>>>>> persisted in the .position file when the state store closes.
> > > >>>>>
> > > >>>>> This contradicts the KIP:
> > > >>>>>
> > > >>>>>>  these position offsets will be stored in RocksDB, in the same
> > > >>>>>> column family as the changelog offsets, instead of the .position
> > > file
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> My main concern is currently about rebalance metadata -- opening
> > > >>>>> RocksDB stores seems to be very expensive, but if we follow the
> > KIP:
> > > >>>>>
> > > >>>>>> We will do this under EOS by updating the .checkpoint file
> > > >>>>>> whenever a store is close()d.
> > > >>>>>
> > > >>>>> It seems, having the offset inside RocksDB does not help us at
> all?
> > > >>>>> In the end, when we crash, we don't want to lose the state, but
> > > >>>>> when we update the .checkpoint only on a clean close, the
> > > >>>>> .checkpoint might be stale (ie, still contains the checkpoint
> when
> > > >>>>> we opened the store when we got a task assigned).
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote:
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> 100
> > > >>>>>> I think we already have such a wrapper. It is called
> > > >>>>>> AbstractReadWriteDecorator.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 101
> > > >>>>>> Currently, the position is checkpointed when an offset checkpoint
> > > >>>>>> is written. If we let the state store manage the committed
> > > >>>>>> offsets, we need to let the state store also manage the
> > > >>>>>> position, otherwise they might diverge. State store managed
> offsets
> > > >>>>>> can get flushed (i.e. checkpointed) to the disk when the state
> > > >>>>>> store decides to flush its in-memory data structures, but the
> > > >>>>>> position is only checkpointed at commit time. Recovering after a
> > > >>>>>> failure might load inconsistent offsets and positions.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 102
> > > >>>>>> The position is maintained inside the state store, but is
> > > >>>>>> persisted in the .position file when the state store closes. The
> > > >>>>>> only public interface that uses the position is IQv2 in a
> > > >>>>>> read-only mode. So the position is only updated within the state
> > > >>>>>> store and read from IQv2. No need to add anything to the public
> > > >>>>>> StateStore interface.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 103
> > > >>>>>> Deprecating managesOffsets() right away might be a good idea.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 104
> > > >>>>>> I agree that we should try to support downgrades without wipes.
> At
> > > >>>>>> least Nick should state in the KIP why we do not support it.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Bruno
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote:
> > > >>>>>>> Thanks for splitting out this KIP. The discussion shows that
> it
> > > >>>>>>> is a complex beast by itself, so it is worth discussing on its own.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Couple of question / comment:
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be called
> > > >>>>>>> by users" -- I would propose to put a guard in place for this,
> by
> > > >>>>>>> either throwing an exception (preferable) or adding a no-op
> > > >>>>>>> implementation (at least for our own stores, by wrapping them
> --
> > > >>>>>>> we cannot enforce it for custom stores I assume), and document
> > > >>>>>>> this contract explicitly.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 101 adding `.position` to the store: Why do we actually need
> > > >>>>>>> this? The KIP says "To ensure consistency with the committed
> data
> > > >>>>>>> and changelog offsets" but I am not sure if I can follow? Can
> you
> > > >>>>>>> elaborate why leaving the `.position` file as-is won't work?
> > > >>>>>>>
> > > >>>>>>>> If it's possible at all, it will need to be done by
> > > >>>>>>>> creating temporary StateManagers and StateStores during
> > > >>>>>>>> rebalance. I think
> > > >>>>>>>> it is possible, and probably not too expensive, but the devil
> > > >>>>>>>> will be in
> > > >>>>>>>> the detail.
> > > >>>>>>>
> > > >>>>>>> This sounds like a significant overhead to me. We know that
> > > >>>>>>> opening a single RocksDB takes about 500ms, and thus opening
> > > >>>>>>> RocksDB to get this information might slow down rebalances
> > > >>>>>>> significantly.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 102: It's unclear to me, how `.position` information is added.
> > > >>>>>>> The KIP only says: "position offsets will be stored in RocksDB,
> > > >>>>>>> in the same column family as the changelog offsets". Do you
> > > >>>>>>> intent to add this information to the map passed via
> > > >>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`? The
> > > >>>>>>> KIP should describe this in more detail. Also, if my assumption
> > > >>>>>>> is correct, we might want to rename the parameter and also
> have a
> > > >>>>>>> better JavaDoc description?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 103: Should we make it mandatory (long-term) that all stores
> > > >>>>>>> (including custom stores) manage their offsets internally?
> > > >>>>>>> Maintaining both options and thus both code paths puts a burden
> > > >>>>>>> on everyone and makes the code messy. I would strongly prefer if
> > > >>>>>>> we could have a mid-term path to get rid of supporting both. --
> > > >>>>>>> For this case, we should deprecate the newly added
> > > >>>>>>> `managesOffsets()` method right away, to point out that we
> intend
> > > >>>>>>> to remove it. If it's mandatory to maintain offsets for stores,
> > > >>>>>>> we won't need this method any longer. In memory stores can just
> > > >>>>>>> return null from #committedOffset().
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> 104 "downgrading": I think it might be worth adding support for
> > > >>>>>>> downgrading w/o the need to wipe stores? Leveraging
> > > >>>>>>> `upgrade.from` parameter, we could build a two rolling bounce
> > > >>>>>>> downgrade: (1) the new code is started with `upgrade.from` set
> to
> > > >>>>>>> a lower version, telling the runtime to do the cleanup on
> > > >>>>>>> `close()` -- (ie, ensure that all data is written into
> > > >>>>>>> `.checkpoint` and `.position` file, and the newly added CL is
> > > >>>>>>> deleted). In a second, rolling bounce, the old code would be
> able
> > > >>>>>>> to open RocksDB. -- I understand that this implies much more
> > > >>>>>>> work, but downgrade seems to be common enough, that it might be
> > > >>>>>>> worth it? Even if we did not always support this in the past,
> we
> > > >>>>>>> have to face the fact that KS is getting more and more adopted
> > > >>>>>>> and as a more mature product should support this?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> -Matthias
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote:
> > > >>>>>>>> Hi all,
> > > >>>>>>>>
> > > >>>>>>>> How should we proceed here?
> > > >>>>>>>>
> > > >>>>>>>> 1. with the plain .checkpoint file
> > > >>>>>>>> 2. with a way to use the state store interface on unassigned
> but
> > > >>>>>>>> locally existing task state
> > > >>>>>>>>
> > > >>>>>>>> While I like option 2, I think option 1 is less risky and will
> > > >>>>>>>> give us the benefits of transactional state stores sooner. We
> > > >>>>>>>> should consider the interface approach afterwards, though.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Bruno
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote:
> > > >>>>>>>>> Hi Nick and Sophie,
> > > >>>>>>>>>
> > > >>>>>>>>> I think the task ID is not enough to create a state store
> that
> > > >>>>>>>>> can read the offsets of non-assigned tasks for lag
> computation
> > > >>>>>>>>> during rebalancing. The state store also needs the state
> > > >>>>>>>>> directory so that it knows where to find the information that
> > > >>>>>>>>> it needs to return from changelogOffsets().
> > > >>>>>>>>>
> > > >>>>>>>>> In general, I think we should proceed with the plain
> > > >>>>>>>>> .checkpoint file for now and iterate back to the state store
> > > >>>>>>>>> solution later since it seems it is not that straightforward.
> > > >>>>>>>>> Alternatively, Nick could timebox an effort to better
> > > >>>>>>>>> understand what would be needed for the state store solution.
> > > >>>>>>>>> Nick, let us know your decision.
> > > >>>>>>>>>
> > > >>>>>>>>> Regarding your question about the state store instance. I am
> > > >>>>>>>>> not too familiar with that part of the code, but I think the
> > > >>>>>>>>> state store is built when the processor topology is built and
> > > >>>>>>>>> the processor topology is built per stream task. So there is
> > > >>>>>>>>> one instance of processor topology and state store per stream
> > > >>>>>>>>> task. Try to follow the call in [1].
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Bruno
> > > >>>>>>>>>
> > > >>>>>>>>> [1]
> > > >>>>>>>>>
> > >
> >
> https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote:
> > > >>>>>>>>>> That does make sense. The one thing I can't figure out is
> how
> > > >>>>>>>>>> per-Task
> > > >>>>>>>>>> StateStore instances are constructed.
> > > >>>>>>>>>>
> > > >>>>>>>>>> It looks like we construct one StateStore instance for the
> > > >>>>>>>>>> whole Topology
> > > >>>>>>>>>> (in InternalTopologyBuilder), and pass that into
> > > >>>>>>>>>> ProcessorStateManager (via
> > > >>>>>>>>>> StateManagerUtil) for each Task, which then initializes it.
> > > >>>>>>>>>>
> > > >>>>>>>>>> This can't be the case though, otherwise multiple partitions
> > > >>>>>>>>>> of the same
> > > >>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore
> > > >>>>>>>>>> instance, which
> > > >>>>>>>>>> they don't.
> > > >>>>>>>>>>
> > > >>>>>>>>>> What am I missing?
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman
> > > >>>>>>>>>> <sop...@responsive.dev>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> I don't think we need to *require* a constructor accept the
> > > >>>>>>>>>>> TaskId, but we
> > > >>>>>>>>>>> would definitely make sure that the RocksDB state store
> > > >>>>>>>>>>> changes its
> > > >>>>>>>>>>> constructor to one that accepts the TaskID (which we can do
> > > >>>>>>>>>>> without
> > > >>>>>>>>>>> deprecation since it's an internal API), and custom state
> > > >>>>>>>>>>> stores can just
> > > >>>>>>>>>>> decide for themselves whether they want to opt-in/use the
> > > >>>>>>>>>>> TaskId param
> > > >>>>>>>>>>> or not. I mean custom state stores would have to opt-in
> > > >>>>>>>>>>> anyways by
> > > >>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and the
> > only
> > > >>>>>>>>>>> reason to do that would be to have created a constructor
> that
> > > >>>>>>>>>>> accepts
> > > >>>>>>>>>>> a TaskId
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Just to be super clear about the proposal, this is what I
> had
> > > >>>>>>>>>>> in mind.
> > > >>>>>>>>>>> It's actually fairly simple and wouldn't add much to the
> > > >>>>>>>>>>> scope of the
> > > >>>>>>>>>>> KIP (I think -- if it turns out to be more complicated than
> > > >>>>>>>>>>> I'm assuming,
> > > >>>>>>>>>>> we should definitely do whatever has the smallest LOE to
> get
> > > >>>>>>>>>>> this done).
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Anyways, the (only) public API changes would be to add this
> > new
> > > >>>>>>>>>>> method to the StoreSupplier API:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> default T get(final TaskId taskId) {
> > > >>>>>>>>>>>      return get();
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> We can decide whether or not to deprecate the old #get but
> > > >>>>>>>>>>> it's not
> > > >>>>>>>>>>> really necessary and might cause a lot of turmoil, so I'd
> > > >>>>>>>>>>> personally
> > > >>>>>>>>>>> say we just leave both APIs in place.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> And that's it for public API changes! Internally, we would
> > > >>>>>>>>>>> just adapt
> > > >>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement this
> > new
> > > >>>>>>>>>>> API. So for example with the
> > RocksDBKeyValueBytesStoreSupplier,
> > > >>>>>>>>>>> we just add
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> @Override
> > > >>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
> > > >>>>>>>>>>>     return returnTimestampedStore ?
> > > >>>>>>>>>>>         new RocksDBTimestampedStore(name, metricsScope(), taskId) :
> > > >>>>>>>>>>>         new RocksDBStore(name, metricsScope(), taskId);
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> And of course add the TaskId parameter to each of the
> actual
> > > >>>>>>>>>>> state store constructors returned here.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Does that make sense? It's entirely possible I'm missing
> > > >>>>>>>>>>> something
> > > >>>>>>>>>>> important here, but I think this would be a pretty small
> > > >>>>>>>>>>> addition that
> > > >>>>>>>>>>> would solve the problem you mentioned earlier while also
> > being
> > > >>>>>>>>>>> useful to anyone who uses custom state stores.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford
> > > >>>>>>>>>>> <nick.telf...@gmail.com>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Sophie,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Interesting idea! Although what would that mean for the
> > > >>>>>>>>>>>> StateStore
> > > >>>>>>>>>>>> interface? Obviously we can't require that the constructor
> > > >>>>>>>>>>>> take the
> > > >>>>>>>>>>> TaskId.
> > > >>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we
> > > >>>>>>>>>>>> over-complicating
> > > >>>>>>>>>>> it?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Nick
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman
> > > >>>>>>>>>>>> <sop...@responsive.dev
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Somewhat minor point overall, but it actually drives me
> > > >>>>>>>>>>>>> crazy that you
> > > >>>>>>>>>>>>> can't get access to the taskId of a StateStore until
> #init
> > > >>>>>>>>>>>>> is called.
> > > >>>>>>>>>>>> This
> > > >>>>>>>>>>>>> has caused me a huge headache personally (since the same
> is
> > > >>>>>>>>>>>>> true for
> > > >>>>>>>>>>>>> processors and I was trying to do something that's
> probably
> > > >>>>>>>>>>>>> too hacky
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>> actually complain about here lol)
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive and
> > > >>>>>>>>>>>>> pass along the
> > > >>>>>>>>>>>>> taskId when creating a new store? Presumably by adding a
> > > >>>>>>>>>>>>> new version of
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>> #get method that takes in a taskId parameter? We can have
> > > >>>>>>>>>>>>> it default to
> > > >>>>>>>>>>>>> invoking the old one for compatibility reasons and it
> > > >>>>>>>>>>>>> should be
> > > >>>>>>>>>>>> completely
> > > >>>>>>>>>>>>> safe to tack on.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier, but
> > > that's
> > > >>>>>>>>>>> definitely
> > > >>>>>>>>>>>>> outside the scope of this KIP
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford
> > > >>>>>>>>>>>>> <nick.telf...@gmail.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On further thought, it's clear that this can't work for
> > > >>>>>>>>>>>>>> one simple
> > > >>>>>>>>>>>>> reason:
> > > >>>>>>>>>>>>>> StateStores don't know their associated TaskId (and
> hence,
> > > >>>>>>>>>>>>>> their
> > > >>>>>>>>>>>>>> StateDirectory) until the init() call. Therefore,
> > > >>>>>>>>>>>>>> committedOffset()
> > > >>>>>>>>>>>> can't
> > > >>>>>>>>>>>>>> be called before init(), unless we also added a
> > > >>>>>>>>>>>>>> StateStoreContext
> > > >>>>>>>>>>>>> argument
> > > >>>>>>>>>>>>>> to committedOffset(), which I think might be trying to
> > > >>>>>>>>>>>>>> shoehorn too
> > > >>>>>>>>>>>> much
> > > >>>>>>>>>>>>>> into committedOffset().
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I still don't like the idea of the Streams engine
> > > >>>>>>>>>>>>>> maintaining the
> > > >>>>>>>>>>> cache
> > > >>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>> changelog offsets independently of stores, mostly
> because
> > > >>>>>>>>>>>>>> of the
> > > >>>>>>>>>>>>>> maintenance burden of the code duplication, but it looks
> > > >>>>>>>>>>>>>> like we'll
> > > >>>>>>>>>>>> have
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>> live with it.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Unless you have any better ideas?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>> Nick
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford
> > > >>>>>>>>>>>>>> <nick.telf...@gmail.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Bruno,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Immediately after I sent my response, I looked at the
> > > >>>>>>>>>>>>>>> codebase and
> > > >>>>>>>>>>>> came
> > > >>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it will
> > > >>>>>>>>>>>>>>> need to be
> > > >>>>>>>>>>> done
> > > >>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>> creating temporary StateManagers and StateStores during
> > > >>>>>>>>>>>>>>> rebalance.
> > > >>>>>>>>>>> I
> > > >>>>>>>>>>>>>> think
> > > >>>>>>>>>>>>>>> it is possible, and probably not too expensive, but the
> > > >>>>>>>>>>>>>>> devil will
> > > >>>>>>>>>>> be
> > > >>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>> the detail.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I'll try to find some time to explore the idea to see
> if
> > > >>>>>>>>>>>>>>> it's
> > > >>>>>>>>>>>> possible
> > > >>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>> report back, because we'll need to determine this
> before
> > > >>>>>>>>>>>>>>> we can
> > > >>>>>>>>>>> vote
> > > >>>>>>>>>>>> on
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> KIP.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>> Nick
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna
> > > >>>>>>>>>>>>>>> <cado...@apache.org>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Nick,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thanks for reacting on my comments so quickly!
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> 2.
> > > >>>>>>>>>>>>>>>> Some thoughts on your proposal.
> > > >>>>>>>>>>>>>>>> State managers (and state stores) are parts of tasks.
> If
> > > >>>>>>>>>>>>>>>> the task
> > > >>>>>>>>>>> is
> > > >>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. To get
> > > >>>>>>>>>>>>>>>> the offsets
> > > >>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>>> your approach, we would need to either create kind of
> > > >>>>>>>>>>>>>>>> inactive
> > > >>>>>>>>>>> tasks
> > > >>>>>>>>>>>>>>>> besides active and standby tasks or store and manage
> > state
> > > >>>>>>>>>>> managers
> > > >>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>> non-assigned tasks differently than the state managers
> > > >>>>>>>>>>>>>>>> of assigned
> > > >>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that removes
> > > >>>>>>>>>>>>>>>> unassigned
> > > >>>>>>>>>>> task
> > > >>>>>>>>>>>>>>>> directories needs to concurrently delete those
> inactive
> > > >>>>>>>>>>>>>>>> tasks or
> > > >>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This
> seems
> > > >>>>>>>>>>>>>>>> all quite
> > > >>>>>>>>>>>>> messy
> > > >>>>>>>>>>>>>>>> to me.
> > > >>>>>>>>>>>>>>>> Could we create those state managers (or state stores)
> > > >>>>>>>>>>>>>>>> for locally
> > > >>>>>>>>>>>>>>>> existing but unassigned tasks on demand when
> > > >>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or have a
> > > >>>>>>>>>>>>>>>> different
> > > >>>>>>>>>>>>>>>> encapsulation for the unused task directories?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>> Bruno
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote:
> > > >>>>>>>>>>>>>>>>> Hi Bruno,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Thanks for the review!
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> 1, 4, 5.
> > > >>>>>>>>>>>>>>>>> Done
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> 3.
> > > >>>>>>>>>>>>>>>>> You're right. I've removed the offending paragraph. I
> > had
> > > >>>>>>>>>>>> originally
> > > >>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in KIP-892.
> > > >>>>>>>>>>>>>>>>> But it's
> > > >>>>>>>>>>>>>>>> difficult to
> > > >>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892
> > transaction
> > > >>>>>>>>>>> buffers.
> > > >>>>>>>>>>>>>>>> Instead,
> > > >>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc when
> > > >>>>>>>>>>>>>>>>> KIP-892
> > > >>>>>>>>>>> lands.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> 2.
> > > >>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that was
> > > >>>>>>>>>>>>> (significantly)
> > > >>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My
> prototype
> > > >>>>>>>>>>>>>>>>> currently
> > > >>>>>>>>>>>>>>>> maintains
> > > >>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint, but
> > > >>>>>>>>>>>>>>>>> doing so
> > > >>>>>>>>>>>>> becomes
> > > >>>>>>>>>>>>>>>> very
> > > >>>>>>>>>>>>>>>>> messy. My intent with this change was to try to
> better
> > > >>>>>>>>>>> encapsulate
> > > >>>>>>>>>>>>>> this
> > > >>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that can
> > > >>>>>>>>>>>>>>>>> cheaply
> > > >>>>>>>>>>>>> provide
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> offsets stored directly in them without needing to
> > > >>>>>>>>>>>>>>>>> duplicate
> > > >>>>>>>>>>> them
> > > >>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>> this
> > > >>>>>>>>>>>>>>>>> cache.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> It's clear some more work is needed here to better
> > > >>>>>>>>>>>>>>>>> encapsulate
> > > >>>>>>>>>>>> this.
> > > >>>>>>>>>>>>>> My
> > > >>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but don't
> > > >>>>>>>>>>> initialize*
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> StateManager and StateStores for every Task directory
> > > >>>>>>>>>>>>>>>>> on-disk?
> > > >>>>>>>>>>>> That
> > > >>>>>>>>>>>>>>>> should
> > > >>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to
> > > >>>>>>>>>>>>>>>>> query the
> > > >>>>>>>>>>>> offsets
> > > >>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If the
> > > >>>>>>>>>>> StateManager
> > > >>>>>>>>>>>>>> (aka.
> > > >>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves too
> > > >>>>>>>>>>>>>>>>> expensive
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>> hold
> > > >>>>>>>>>>>>>>>> open
> > > >>>>>>>>>>>>>>>>> for closed stores, we could always have a
> > > >>>>>>>>>>>>>>>>> "StubStateManager" in
> > > >>>>>>>>>>>> its
> > > >>>>>>>>>>>>>>>> place,
> > > >>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing
> else?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> IDK, what do you think?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Nick
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna
> > > >>>>>>>>>>>>>>>>> <cado...@apache.org>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi Nick,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892!
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Here a couple of comments/questions:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 1.
> > > >>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline which
> > > >>>>>>>>>>>>>>>>>> says to not
> > > >>>>>>>>>>>> use
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could
> you
> > > >>>>>>>>>>>>>>>>>> please
> > > >>>>>>>>>>>> change
> > > >>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 2.
> > > >>>>>>>>>>>>>>>>>> It is not clear to me how
> > > TaskManager#getTaskOffsetSums()
> > > >>>>>>>>>>> should
> > > >>>>>>>>>>>>> read
> > > >>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own but
> > > >>>>>>>>>>>>>>>>>> that have a
> > > >>>>>>>>>>>>> state
> > > >>>>>>>>>>>>>>>>>> directory on the Streams client by calling
> > > >>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread does
> > > >>>>>>>>>>>>>>>>>> not own a
> > > >>>>>>>>>>>> task
> > > >>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>>>> does also not create any state stores for the task,
> > > >>>>>>>>>>>>>>>>>> which means
> > > >>>>>>>>>>>>> there
> > > >>>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>> no state store on which to call
> getCommittedOffsets().
> > > >>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint file
> is
> > > >>>>>>>>>>>>>>>>>> written
> > > >>>>>>>>>>>> for
> > > >>>>>>>>>>>>>> all
> > > >>>>>>>>>>>>>>>>>> state stores on close -- not only for the
> RocksDBStore
> > > >>>>>>>>>>>>>>>>>> -- and
> > > >>>>>>>>>>>> that
> > > >>>>>>>>>>>>>> this
> > > >>>>>>>>>>>>>>>>>> checkpoint file is read in
> > > >>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> tasks
> > > >>>>>>>>>>>>>>>>>> that have a state directory on the client but are
> not
> > > >>>>>>>>>>>>>>>>>> currently
> > > >>>>>>>>>>>>>>>> assigned
> > > >>>>>>>>>>>>>>>>>> to any stream thread of the Streams client.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 3.
> > > >>>>>>>>>>>>>>>>>> In the javadocs for commit() you write
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or since
> > > >>>>>>>>>>>>> init(StateStore)
> > > >>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a
> restart."
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> This is only true for a clean close before the
> > > >>>>>>>>>>>>>>>>>> restart, isn't
> > > >>>>>>>>>>> it?
> > > >>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka Streams
> > > >>>>>>>>>>>>>>>>>> cannot
> > > >>>>>>>>>>>>> guarantee
> > > >>>>>>>>>>>>>>>>>> that the in-memory structures of the state store
> (e.g.
> > > >>>>>>>>>>>>>>>>>> memtable
> > > >>>>>>>>>>>> in
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records and
> > the
> > > >>>>>>>>>>>> committed
> > > >>>>>>>>>>>>>>>>>> offsets are persisted.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 4.
> > > >>>>>>>>>>>>>>>>>> The wrapper that provides the legacy checkpointing
> > > >>>>>>>>>>>>>>>>>> behavior is
> > > >>>>>>>>>>>>>> actually
> > > >>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from the
> > > >>>>>>>>>>>>>>>>>> KIP, but
> > > >>>>>>>>>>>> still
> > > >>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior will be
> > > >>>>>>>>>>>>>>>>>> supported
> > > >>>>>>>>>>>> when
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> state store does not manage the checkpoints.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 5.
> > > >>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the
> tags,
> > > >>>>>>>>>>>>>>>>>> and the
> > > >>>>>>>>>>>>>> recording
> > > >>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or KIP-444.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>> Bruno
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote:
> > > >>>>>>>>>>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out
> the
> > > >>>>>>>>>>>>>>>>>>> "Atomic
> > > >>>>>>>>>>>>>>>>>> Checkpointing"
> > > >>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics for
> > > >>>>>>>>>>>>>>>>>>> StateStores,
> > > >>>>>>>>>>>>> into
> > > >>>>>>>>>>>>>>>> its
> > > >>>>>>>>>>>>>>>>>> own
> > > >>>>>>>>>>>>>>>>>>> KIP
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes
> outlined
> > > in
> > > >>>>>>>>>>>> KIP-1035,
> > > >>>>>>>>>>>>>>>> these
> > > >>>>>>>>>>>>>>>>>>> changes were always the most contentious part, and
> > > >>>>>>>>>>>>>>>>>>> continued
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>> spur
> > > >>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been
> > > >>>>>>>>>>>>>>>>>>> removed from
> > > >>>>>>>>>>>>>> KIP-892,
> > > >>>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to
> > > >>>>>>>>>>>>>>>>>>> KIP-892 in
> > > >>>>>>>>>>> their
> > > >>>>>>>>>>>>>>>> place.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this set
> of
> > > >>>>>>>>>>>>>>>>>>> changes,
> > > >>>>>>>>>>> we
> > > >>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>>>>> deliver something that we're all happy with.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>>>> Nick
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > >
> >
>
