Hi everyone,

As discussed on the Zoom call, we're going to handle rebalance metadata as follows:
- On start-up, Streams will open each store and read its changelog offsets into an in-memory cache. This cache will be shared among all StreamThreads.
- On rebalance, the cache will be consulted for Task offsets for any Task that is not active on any instance-local StreamThread. If the Task is active on *any* instance-local StreamThread, we will report the Task lag as "up to date" (i.e. -1), because we know that the local state is currently up-to-date.

We will avoid caching offsets across restarts in the legacy ".checkpoint" file, so that we can eliminate the logic for handling this file.

If the performance of opening/closing many state stores is poor, we can parallelise it by forking off a thread for each Task directory when reading the offsets.

I'll update the KIP later today to reflect this design, but I will try to keep it high-level, so that the exact implementation can vary.

Regards,
Nick

On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <sop...@responsive.dev> wrote:

> 103: I like the idea of immediately deprecating #managesOffsets and aiming
> to make offset management mandatory in the long run. I assume we would also
> log a warning for any custom stores that return "false" from this method,
> to encourage custom store implementations to start doing so? My only
> question/concern is that if we want folks to start managing their own
> offsets then we should make this transition easy for them, perhaps by
> exposing some public utility APIs for things that are currently handled by
> Kafka Streams, such as reading/writing checkpoint files. Maybe it would be
> useful to include a small example in the KIP of what it would actually mean
> to "manage your own offsets" -- I know (all too well) that plugging in
> custom storage implementations is not easy, and most people who do this are
> probably fairly advanced users, but offset management will be a totally new
> ballgame to most people, and this kind of feels like throwing them off the
> deep end. We should at least provide a lifejacket via some kind of utility
> API and/or example.
>
> 200. There's been a lot of back and forth on the rebalance metadata/task
> lag computation question, so forgive me if I missed any part of this, but I
> think we've landed at the right idea here. To summarize: the "tl;dr"
> explanation is that we'll write the checkpoint file only on close, and will
> account for hard-crash scenarios by opening up the stores on startup and
> writing a checkpoint file for any missing tasks. Does that sound about
> right?
>
> A few clarifications:
> I think we're all more or less on the same page here, but just to be
> absolutely clear: the task lags for each task directory found on disk will
> be reported by only one of the StreamThreads, and each StreamThread will
> report lags only for tasks that it already owns or that are not assigned to
> any other StreamThread in the client. In other words, we only need to get
> the task lag for completely unassigned/unlocked tasks, which means that if
> there is a checkpoint file at all then it must be up-to-date, because there
> is no other StreamThread actively writing to that state store (if there
> were, only that StreamThread would report lag for that particular task).
>
> This still leaves the "no checkpoint at all" case, which as previously
> mentioned can occur after a hard crash. Luckily we only have to worry about
> this once, after starting up again following said hard crash.
> We can simply open up each of the state stores before ever joining the
> group, get the offsets from RocksDB, and write them to a new checkpoint
> file. After that, we can depend on the checkpoints written at close and
> won't have to open up any stores that aren't already assigned, for the
> reasons laid out in the paragraph above.
>
> As for the specific mechanism and which thread does what, since there were
> some questions, this is how I'm imagining the process:
>
> 1. The general idea is that we simply go through each task directory with
>    state but no checkpoint file, open the StateStore, call
>    #committedOffset, and then write it to the checkpoint file. We can then
>    close these stores and let things proceed as normal.
> 2. This only has to happen once, during startup, but we have two options:
>    1. Do this from KafkaStreams#start, i.e. before we even create the
>       StreamThreads.
>    2. Do this from StreamThread#start, following a similar lock-based
>       approach to the one used in #computeTaskLags, where each
>       StreamThread just makes a pass over the task directories on disk
>       and attempts to lock them one by one. If it obtains the lock, it
>       checks whether there is state but no checkpoint, and writes the
>       checkpoint if needed. If it can't grab the lock, then we know one of
>       the other StreamThreads must be handling the checkpoint file for
>       that task directory, and we can move on.
>
> I don't really feel too strongly about which approach is best: doing it in
> KafkaStreams#start is certainly the simplest, while doing it in the
> StreamThread's startup is more efficient. If we're worried about adding too
> much weight to KafkaStreams#start then the 2nd option is probably best,
> though slightly more complicated.
>
> Thoughts?
>
> On Tue, May 14, 2024 at 10:02 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > Sorry for the delay in replying. I've finally now got some time to work
> > on this.
> >
> > Addressing Matthias's comments:
> >
> > 100.
> > Good point. As Bruno mentioned, there's already
> > AbstractReadWriteDecorator, which we could leverage to provide that
> > protection. I'll add details on this to the KIP.
> >
> > 101, 102.
> > It looks like these points have already been addressed by Bruno. Let me
> > know if anything here is still unclear or you feel needs to be detailed
> > more in the KIP.
> >
> > 103.
> > I'm in favour of anything that gets the old code removed sooner, but
> > wouldn't deprecating an API that we expect (some) users to implement
> > cause problems?
> > I'm thinking about implementers of custom StateStores, as they may be
> > confused by managesOffsets() being deprecated, especially since they
> > would have to mark their implementation as @Deprecated in order to avoid
> > compile warnings.
> > If deprecating an API *while it's still expected to be implemented* is
> > something that's generally done in the project, then I'm happy to do so
> > here.
> >
> > 104.
> > I think this is technically possible, but at the cost of considerable
> > additional code to maintain. Would we ever have a pathway to remove this
> > downgrade code in the future?
> >
> >
> > Regarding rebalance metadata:
> > Opening all stores on start-up to read and cache their offsets is an
> > interesting idea, especially if we can avoid re-opening the stores once
> > the Tasks have been assigned.
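For illustration only, the start-up pass described above (and the in-memory offset cache outlined at the top of this thread) might look roughly like the sketch below. None of these names exist in Kafka Streams -- ChangelogOffsetsCache, OffsetReadableStore and openStoresFor() are stand-ins meant to show the shape of the idea, not the eventual implementation:

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the start-up pass: scan the state directory, open each
// store found on disk, read its committed changelog offsets into a shared
// in-memory cache, then close the store again.
public abstract class ChangelogOffsetsCache {

    public interface OffsetReadableStore extends AutoCloseable {
        Map<TopicPartition, Long> committedOffsets();
        @Override
        void close();
    }

    // Keyed by task directory name (e.g. "0_1"); shared between all StreamThreads.
    private final Map<String, Map<TopicPartition, Long>> cache = new ConcurrentHashMap<>();

    // Stand-in for however the stores of a task directory end up being opened.
    protected abstract List<OffsetReadableStore> openStoresFor(File taskDir);

    // Called once on start-up, before joining the group. Could be parallelised
    // with one thread per task directory if opening many stores proves slow.
    public void populateFrom(final File stateDir) {
        final File[] taskDirs = stateDir.listFiles(File::isDirectory);
        if (taskDirs == null) {
            return;
        }
        for (final File taskDir : taskDirs) {
            final Map<TopicPartition, Long> offsets = new HashMap<>();
            for (final OffsetReadableStore store : openStoresFor(taskDir)) {
                try {
                    offsets.putAll(store.committedOffsets());
                } finally {
                    store.close();
                }
            }
            cache.put(taskDir.getName(), offsets);
        }
    }

    // Consulted during rebalances, but only for tasks that are not active on
    // any StreamThread of this instance (active tasks report "up to date").
    public Map<TopicPartition, Long> cachedOffsets(final String taskDirName) {
        return cache.get(taskDirName);
    }
}

KafkaStreams#start (or each StreamThread, under a directory lock) would call populateFrom() once before the first join request; the assignment code would then read cachedOffsets() only for tasks that are not active on any local StreamThread.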
Scalability shouldn't be too much of a problem, > > because typically users have a fairly short state.cleanup.delay, so the > > number of on-disk Task directories should rarely exceed the number of > Tasks > > previously assigned to that instance. > > An advantage of this approach is that it would also simplify StateStore > > implementations, as they would only need to guarantee that committed > > offsets are available when the store is open. > > > > I'll investigate this approach this week for feasibility and report back. > > > > I think that covers all the outstanding feedback, unless I missed > anything? > > > > Regards, > > Nick > > > > On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org> wrote: > > > > > Hi Matthias, > > > > > > I see what you mean. > > > > > > To sum up: > > > > > > With this KIP the .checkpoint file is written when the store closes. > > > That is when: > > > 1. a task moves away from Kafka Streams client > > > 2. Kafka Streams client shuts down > > > > > > A Kafka Streams client needs the information in the .checkpoint file > > > 1. on startup because it does not have any open stores yet. > > > 2. during rebalances for non-empty state directories of tasks that are > > > not assigned to the Kafka Streams client. > > > > > > With hard crashes, i.e., when the Streams client is not able to close > > > its state stores and write the .checkpoint file, the .checkpoint file > > > might be quite stale. That influences the next rebalance after failover > > > negatively. > > > > > > > > > My conclusion is that Kafka Streams either needs to open the state > > > stores at start up or we write the checkpoint file more often. > > > > > > Writing the .checkpoint file during processing more often without > > > controlling the flush to disk would work. However, Kafka Streams would > > > checkpoint offsets that are not yet persisted on disk by the state > > > store. That is with a hard crash the offsets in the .checkpoint file > > > might be larger than the offsets checkpointed in the state store. That > > > might not be a problem if Kafka Streams uses the .checkpoint file only > > > to compute the task lag. The downside is that it makes the managing of > > > checkpoints more complex because now we have to maintain two > > > checkpoints: one for restoration and one for computing the task lag. > > > I think we should explore the option where Kafka Streams opens the > state > > > stores at start up to get the offsets. > > > > > > I also checked when Kafka Streams needs the checkpointed offsets to > > > compute the task lag during a rebalance. Turns out Kafka Streams needs > > > them before sending the join request. Now, I am wondering if opening > the > > > state stores of unassigned tasks whose state directory exists locally > is > > > actually such a big issue due to the expected higher latency since it > > > happens actually before the Kafka Streams client joins the rebalance. > > > > > > Best, > > > Bruno > > > > > > > > > > > > > > > > > > > > > > > > On 5/4/24 12:05 AM, Matthias J. Sax wrote: > > > > That's good questions... I could think of a few approaches, but I > admit > > > > it might all be a little bit tricky to code up... > > > > > > > > However if we don't solve this problem, I think this KIP does not > > really > > > > solve the core issue we are facing? 
> > > > In the end, if we rely on the `.checkpoint` file to compute a task
> > > > assignment, but the `.checkpoint` file can be arbitrarily stale after
> > > > a crash because we only write it on a clean close, there would still
> > > > be a huge gap that this KIP does not close?
> > > >
> > > > For the case in which we keep the checkpoint file, this KIP would
> > > > still help for "soft errors" in which KS can recover, and roll back
> > > > the store. A significant win for sure. -- But hard crashes would
> > > > still be a problem? We might assign tasks to the "wrong" instance,
> > > > ie, one which is not the most up to date, as the checkpoint
> > > > information could be very outdated? Would we end up with a half-baked
> > > > solution? Would this be good enough to justify the introduced
> > > > complexity? In the end, for soft failures it's still a win. Just want
> > > > to make sure we understand the limitations and make an educated
> > > > decision.
> > > >
> > > > Or do I miss something?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 5/3/24 10:20 AM, Bruno Cadonna wrote:
> > > >> Hi Matthias,
> > > >>
> > > >>
> > > >> 200:
> > > >> I like the idea in general. However, it is not clear to me how the
> > > >> behavior should be with multiple stream threads in the same Kafka
> > > >> Streams client. What stream thread opens which store? How can a
> > > >> stream thread pass an open store to another stream thread that got
> > > >> the corresponding task assigned? How does a stream thread know that
> > > >> a task was not assigned to any of the stream threads of the Kafka
> > > >> Streams client? I have the feeling we should just keep the
> > > >> .checkpoint file on close for now to unblock this KIP and try to
> > > >> find a solution to get totally rid of it later.
> > > >>
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >>
> > > >>
> > > >> On 5/3/24 6:29 PM, Matthias J. Sax wrote:
> > > >>> 101: Yes, but what I am saying is that we don't need to flush the
> > > >>> .position file to disk periodically, but only maintain it in main
> > > >>> memory, and only write it to disk on close() to preserve it across
> > > >>> restarts. This way, it would never be ahead, but might only lag?
> > > >>> But with my better understanding about (102) it might be moot
> > > >>> anyway...
> > > >>>
> > > >>>
> > > >>> 102: Thanks for clarifying. Looked into the code now. Makes sense.
> > > >>> Might be something worth calling out explicitly in the KIP
> > > >>> writeup. -- Now that I realize that the position is tracked inside
> > > >>> the store (not outside, as the changelog offsets are), it makes
> > > >>> much more sense to pull position into RocksDB itself. In the end,
> > > >>> it's actually a "store implementation" detail how it tracks the
> > > >>> position (and kind of a leaky abstraction currently, that we re-use
> > > >>> the checkpoint file mechanism to track it and flush to disk).
> > > >>>
> > > >>>
> > > >>> 200: I was thinking about this a little bit more, and maybe it's
> > > >>> not too bad?
> > > >>> When KS starts up, we could open all stores we find on local disk
> > > >>> pro-actively, and keep them all open until the first rebalance
> > > >>> finishes: For tasks we get assigned, we hand in the already opened
> > > >>> store (this would amortize the cost to open the store before the
> > > >>> rebalance), and for non-assigned tasks, we know the offset
> > > >>> information won't change and we could just cache it in-memory for
> > > >>> later reuse (ie, the next rebalance) and close the store to free up
> > > >>> resources? -- Assuming that we would get a large percentage of
> > > >>> opened stores assigned as tasks anyway, this could work?
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 5/3/24 1:29 AM, Bruno Cadonna wrote:
> > > >>>> Hi Matthias,
> > > >>>>
> > > >>>>
> > > >>>> 101:
> > > >>>> Let's assume a RocksDB store, but I think the following might be
> > > >>>> true also for other store implementations. With this KIP, if Kafka
> > > >>>> Streams commits the offsets, the committed offsets will be stored
> > > >>>> in an in-memory data structure (i.e. the memtable) and stay there
> > > >>>> until RocksDB decides that it is time to persist its in-memory
> > > >>>> data structure. If Kafka Streams writes its position to the
> > > >>>> .position file during a commit and a crash happens before RocksDB
> > > >>>> persists the memtable, then the position in the .position file is
> > > >>>> ahead of the persisted offset. If IQ is done between the crash and
> > > >>>> the state store having fully restored the changelog, the position
> > > >>>> might tell IQ that the state store is more up-to-date than it
> > > >>>> actually is.
> > > >>>> In contrast, if Kafka Streams handles persisting positions the
> > > >>>> same as persisting offsets, the position should always be
> > > >>>> consistent with the offset, because they are persisted together.
> > > >>>>
> > > >>>>
> > > >>>> 102:
> > > >>>> I am confused about your confusion, which tells me that we are
> > > >>>> talking about two different things.
> > > >>>> You asked
> > > >>>>
> > > >>>> "Do you intent to add this information [i.e. position] to the map
> > > >>>> passed via commit(final Map<TopicPartition, Long>
> > > >>>> changelogOffsets)?"
> > > >>>>
> > > >>>> and with what I wrote I meant that we do not need to pass the
> > > >>>> position into the implementation of the StateStore interface since
> > > >>>> the position is updated within the implementation of the
> > > >>>> StateStore interface (e.g. RocksDBStore [1]). My statement
> > > >>>> describes the behavior now, not the change proposed in this KIP,
> > > >>>> so it does not contradict what is stated in the KIP.
> > > >>>>
> > > >>>>
> > > >>>> 200:
> > > >>>> This is about Matthias' main concern about rebalance metadata.
> > > >>>> As far as I understand the KIP, Kafka Streams will only use the
> > > >>>> .checkpoint files to compute the task lag for unassigned tasks
> > > >>>> whose state is locally available. For assigned tasks, it will use
> > > >>>> the offsets managed by the open state store.
> > > >>>>
> > > >>>> Best,
> > > >>>> Bruno
> > > >>>>
> > > >>>> [1]
> > > >>>> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
> > > >>>>
> > > >>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote:
> > > >>>>> Thanks Bruno.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 101: I think I understand this better now.
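In code terms, Bruno's 200 above boils down to choosing between two sources of offsets when a client gathers the information it reports for task-lag purposes. The sketch below is illustrative only -- the names are made up, and it deliberately leaves out where the lag itself ends up being computed:

import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Illustrative only: where the reported offsets come from, per the discussion above.
public final class OffsetSourceSketch {

    public interface OpenStore {
        Map<TopicPartition, Long> committedOffsets();   // offsets managed by the open store
    }

    public static Map<TopicPartition, Long> offsetsForTask(
            final OpenStore openStoreIfAssigned,                 // non-null only for assigned tasks
            final Map<TopicPartition, Long> checkpointOffsets) { // from .checkpoint / start-up scan
        if (openStoreIfAssigned != null) {
            // Assigned task: ask the open store, which owns its committed offsets.
            return openStoreIfAssigned.committedOffsets();
        }
        // Unassigned task with local state: fall back to the checkpoint written on close
        // (or recreated on start-up after a hard crash).
        return checkpointOffsets;
    }
}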
But just want to make > > > >>>>> sure I do. What do you mean by "they can diverge" and "Recovering > > > >>>>> after a failure might load inconsistent offsets and positions." > > > >>>>> > > > >>>>> The checkpoint is the offset from the changelog, while the > position > > > >>>>> is the offset from the upstream source topic, right? -- In the > end, > > > >>>>> the position is about IQ, and if we fail to update it, it only > > > >>>>> means that there is some gap when we might not be able to query a > > > >>>>> standby task, because we think it's not up-to-date enough even if > > > >>>>> it is, which would resolve itself soon? Ie, the position might > > > >>>>> "lag", but it's not "inconsistent". Do we believe that this lag > > > >>>>> would be highly problematic? > > > >>>>> > > > >>>>> > > > >>>>> > > > >>>>> 102: I am confused. > > > >>>>> > > > >>>>>> The position is maintained inside the state store, but is > > > >>>>>> persisted in the .position file when the state store closes. > > > >>>>> > > > >>>>> This contradicts the KIP: > > > >>>>> > > > >>>>>> these position offsets will be stored in RocksDB, in the same > > > >>>>>> column family as the changelog offsets, instead of the .position > > > file > > > >>>>> > > > >>>>> > > > >>>>> > > > >>>>> My main concern is currently about rebalance metadata -- opening > > > >>>>> RocksDB stores seems to be very expensive, but if we follow the > > KIP: > > > >>>>> > > > >>>>>> We will do this under EOS by updating the .checkpoint file > > > >>>>>> whenever a store is close()d. > > > >>>>> > > > >>>>> It seems, having the offset inside RocksDB does not help us at > all? > > > >>>>> In the end, when we crash, we don't want to lose the state, but > > > >>>>> when we update the .checkpoint only on a clean close, the > > > >>>>> .checkpoint might be stale (ie, still contains the checkpoint > when > > > >>>>> we opened the store when we got a task assigned). > > > >>>>> > > > >>>>> > > > >>>>> > > > >>>>> -Matthias > > > >>>>> > > > >>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote: > > > >>>>>> Hi all, > > > >>>>>> > > > >>>>>> 100 > > > >>>>>> I think we already have such a wrapper. It is called > > > >>>>>> AbstractReadWriteDecorator. > > > >>>>>> > > > >>>>>> > > > >>>>>> 101 > > > >>>>>> Currently, the position is checkpointed when a offset checkpoint > > > >>>>>> is written. If we let the state store manage the committed > > > >>>>>> offsets, we need to also let the state store also manage the > > > >>>>>> position otherwise they might diverge. State store managed > offsets > > > >>>>>> can get flushed (i.e. checkpointed) to the disk when the state > > > >>>>>> store decides to flush its in-memory data structures, but the > > > >>>>>> position is only checkpointed at commit time. Recovering after a > > > >>>>>> failure might load inconsistent offsets and positions. > > > >>>>>> > > > >>>>>> > > > >>>>>> 102 > > > >>>>>> The position is maintained inside the state store, but is > > > >>>>>> persisted in the .position file when the state store closes. The > > > >>>>>> only public interface that uses the position is IQv2 in a > > > >>>>>> read-only mode. So the position is only updated within the state > > > >>>>>> store and read from IQv2. No need to add anything to the public > > > >>>>>> StateStore interface. > > > >>>>>> > > > >>>>>> > > > >>>>>> 103 > > > >>>>>> Deprecating managesOffsets() right away might be a good idea. > > > >>>>>> > > > >>>>>> > > > >>>>>> 104 > > > >>>>>> I agree that we should try to support downgrades without wipes. 
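To make Bruno's 101/102 concrete: storing the position in the same column family as the changelog offsets means both are committed in a single atomic RocksDB write, so a crash can never leave one ahead of the other. The sketch below is only an illustration -- the key encoding and the dedicated offsets column family are assumptions, not the actual RocksDBStore code:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Sketch only: persist changelog offsets and IQ positions in the same column
// family, in the same atomic write, so they are flushed -- or lost -- as a unit.
public final class OffsetAndPositionCommit {

    public static void commit(final RocksDB db,
                              final ColumnFamilyHandle offsetsColumnFamily,
                              final Map<TopicPartition, Long> changelogOffsets,
                              final Map<TopicPartition, Long> positions) throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch();
             final WriteOptions options = new WriteOptions()) {
            for (final Map.Entry<TopicPartition, Long> entry : changelogOffsets.entrySet()) {
                batch.put(offsetsColumnFamily, key("offset", entry.getKey()), longToBytes(entry.getValue()));
            }
            for (final Map.Entry<TopicPartition, Long> entry : positions.entrySet()) {
                batch.put(offsetsColumnFamily, key("position", entry.getKey()), longToBytes(entry.getValue()));
            }
            // Both kinds of metadata hit the memtable (and, later, the SST files) together.
            db.write(options, batch);
        }
    }

    private static byte[] key(final String prefix, final TopicPartition tp) {
        return (prefix + ":" + tp.topic() + ":" + tp.partition()).getBytes(StandardCharsets.UTF_8);
    }

    private static byte[] longToBytes(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}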
> At > > > >>>>>> least Nick should state in the KIP why we do not support it. > > > >>>>>> > > > >>>>>> > > > >>>>>> Best, > > > >>>>>> Bruno > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote: > > > >>>>>>> Thanks for splitting out this KIP. The discussion shows, that > it > > > >>>>>>> is a complex beast by itself, so worth to discuss by its own. > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> Couple of question / comment: > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be called > > > >>>>>>> by users" -- I would propose to put a guard in place for this, > by > > > >>>>>>> either throwing an exception (preferable) or adding a no-op > > > >>>>>>> implementation (at least for our own stores, by wrapping them > -- > > > >>>>>>> we cannot enforce it for custom stores I assume), and document > > > >>>>>>> this contract explicitly. > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> 101 adding `.position` to the store: Why do we actually need > > > >>>>>>> this? The KIP says "To ensure consistency with the committed > data > > > >>>>>>> and changelog offsets" but I am not sure if I can follow? Can > you > > > >>>>>>> elaborate why leaving the `.position` file as-is won't work? > > > >>>>>>> > > > >>>>>>>> If it's possible at all, it will need to be done by > > > >>>>>>>> creating temporary StateManagers and StateStores during > > > >>>>>>>> rebalance. I think > > > >>>>>>>> it is possible, and probably not too expensive, but the devil > > > >>>>>>>> will be in > > > >>>>>>>> the detail. > > > >>>>>>> > > > >>>>>>> This sounds like a significant overhead to me. We know that > > > >>>>>>> opening a single RocksDB takes about 500ms, and thus opening > > > >>>>>>> RocksDB to get this information might slow down rebalances > > > >>>>>>> significantly. > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> 102: It's unclear to me, how `.position` information is added. > > > >>>>>>> The KIP only says: "position offsets will be stored in RocksDB, > > > >>>>>>> in the same column family as the changelog offsets". Do you > > > >>>>>>> intent to add this information to the map passed via > > > >>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`? The > > > >>>>>>> KIP should describe this in more detail. Also, if my assumption > > > >>>>>>> is correct, we might want to rename the parameter and also > have a > > > >>>>>>> better JavaDoc description? > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> 103: Should we make it mandatory (long-term) that all stores > > > >>>>>>> (including custom stores) manage their offsets internally? > > > >>>>>>> Maintaining both options and thus both code paths puts a burden > > > >>>>>>> on everyone and make the code messy. I would strongly prefer if > > > >>>>>>> we could have mid-term path to get rid of supporting both. -- > > > >>>>>>> For this case, we should deprecate the newly added > > > >>>>>>> `managesOffsets()` method right away, to point out that we > intend > > > >>>>>>> to remove it. If it's mandatory to maintain offsets for stores, > > > >>>>>>> we won't need this method any longer. In memory stores can just > > > >>>>>>> return null from #committedOffset(). > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> 104 "downgrading": I think it might be worth to add support for > > > >>>>>>> downgrading w/o the need to wipe stores? 
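Picking up Sophie's earlier request for a small example of what "managing your own offsets" would mean for a custom store (and Matthias's 103 above), the contract might look roughly like the following. The method names follow the ones discussed in this thread (managesOffsets(), commit(Map), committedOffsets()), but the final signatures belong to the KIP, not to this sketch:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Rough sketch of the contract a custom store takes on once it "manages its own
// offsets": atomically persist the changelog offsets alongside the data it
// writes, and hand them back on request. This is not the real StateStore
// interface -- only the three offset-related pieces discussed in the thread.
public class OffsetManagingStoreSketch {

    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    // Equivalent of the proposed StateStore#managesOffsets(): opt in to the new behaviour.
    public boolean managesOffsets() {
        return true;
    }

    // Equivalent of the proposed StateStore#commit(Map): make all writes since the
    // last commit durable *together with* the given changelog offsets (e.g. in the
    // same RocksDB write batch, or the same transaction for a custom engine).
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        persistDataAndOffsetsAtomically(changelogOffsets);   // engine-specific
        committedOffsets.putAll(changelogOffsets);
    }

    // Equivalent of the committed-offsets accessor: what Streams reads on start-up
    // and during rebalances to compute task lag. An in-memory store would return
    // null/empty here, since it has no state to recover after a restart.
    public Map<TopicPartition, Long> committedOffsets() {
        return Collections.unmodifiableMap(committedOffsets);
    }

    private void persistDataAndOffsetsAtomically(final Map<TopicPartition, Long> offsets) {
        // Placeholder for the storage engine's own atomic write path.
    }
}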
Leveraging > > > >>>>>>> `upgrade.from` parameter, we could build a two rolling bounce > > > >>>>>>> downgrade: (1) the new code is started with `upgrade.from` set > to > > > >>>>>>> a lower version, telling the runtime to do the cleanup on > > > >>>>>>> `close()` -- (ie, ensure that all data is written into > > > >>>>>>> `.checkpoint` and `.position` file, and the newly added CL is > > > >>>>>>> deleted). In a second, rolling bounce, the old code would be > able > > > >>>>>>> to open RocksDB. -- I understand that this implies much more > > > >>>>>>> work, but downgrade seems to be common enough, that it might be > > > >>>>>>> worth it? Even if we did not always support this in the past, > we > > > >>>>>>> have the face the fact that KS is getting more and more adopted > > > >>>>>>> and as a more mature product should support this? > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> -Matthias > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote: > > > >>>>>>>> Hi all, > > > >>>>>>>> > > > >>>>>>>> How should we proceed here? > > > >>>>>>>> > > > >>>>>>>> 1. with the plain .checkpoint file > > > >>>>>>>> 2. with a way to use the state store interface on unassigned > but > > > >>>>>>>> locally existing task state > > > >>>>>>>> > > > >>>>>>>> While I like option 2, I think option 1 is less risky and will > > > >>>>>>>> give us the benefits of transactional state stores sooner. We > > > >>>>>>>> should consider the interface approach afterwards, though. > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> Best, > > > >>>>>>>> Bruno > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote: > > > >>>>>>>>> Hi Nick and Sophie, > > > >>>>>>>>> > > > >>>>>>>>> I think the task ID is not enough to create a state store > that > > > >>>>>>>>> can read the offsets of non-assigned tasks for lag > computation > > > >>>>>>>>> during rebalancing. The state store also needs the state > > > >>>>>>>>> directory so that it knows where to find the information that > > > >>>>>>>>> it needs to return from changelogOffsets(). > > > >>>>>>>>> > > > >>>>>>>>> In general, I think we should proceed with the plain > > > >>>>>>>>> .checkpoint file for now and iterate back to the state store > > > >>>>>>>>> solution later since it seems it is not that straightforward. > > > >>>>>>>>> Alternatively, Nick could timebox an effort to better > > > >>>>>>>>> understand what would be needed for the state store solution. > > > >>>>>>>>> Nick, let us know your decision. > > > >>>>>>>>> > > > >>>>>>>>> Regarding your question about the state store instance. I am > > > >>>>>>>>> not too familiar with that part of the code, but I think the > > > >>>>>>>>> state store is build when the processor topology is build and > > > >>>>>>>>> the processor topology is build per stream task. So there is > > > >>>>>>>>> one instance of processor topology and state store per stream > > > >>>>>>>>> task. Try to follow the call in [1]. > > > >>>>>>>>> > > > >>>>>>>>> Best, > > > >>>>>>>>> Bruno > > > >>>>>>>>> > > > >>>>>>>>> [1] > > > >>>>>>>>> > > > > > > https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153 > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote: > > > >>>>>>>>>> That does make sense. 
The one thing I can't figure out is > how > > > >>>>>>>>>> per-Task > > > >>>>>>>>>> StateStore instances are constructed. > > > >>>>>>>>>> > > > >>>>>>>>>> It looks like we construct one StateStore instance for the > > > >>>>>>>>>> whole Topology > > > >>>>>>>>>> (in InternalTopologyBuilder), and pass that into > > > >>>>>>>>>> ProcessorStateManager (via > > > >>>>>>>>>> StateManagerUtil) for each Task, which then initializes it. > > > >>>>>>>>>> > > > >>>>>>>>>> This can't be the case though, otherwise multiple partitions > > > >>>>>>>>>> of the same > > > >>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore > > > >>>>>>>>>> instance, which > > > >>>>>>>>>> they don't. > > > >>>>>>>>>> > > > >>>>>>>>>> What am I missing? > > > >>>>>>>>>> > > > >>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman > > > >>>>>>>>>> <sop...@responsive.dev> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>> > > > >>>>>>>>>>> I don't think we need to *require* a constructor accept the > > > >>>>>>>>>>> TaskId, but we > > > >>>>>>>>>>> would definitely make sure that the RocksDB state store > > > >>>>>>>>>>> changes its > > > >>>>>>>>>>> constructor to one that accepts the TaskID (which we can do > > > >>>>>>>>>>> without > > > >>>>>>>>>>> deprecation since its an internal API), and custom state > > > >>>>>>>>>>> stores can just > > > >>>>>>>>>>> decide for themselves whether they want to opt-in/use the > > > >>>>>>>>>>> TaskId param > > > >>>>>>>>>>> or not. I mean custom state stores would have to opt-in > > > >>>>>>>>>>> anyways by > > > >>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and the > > only > > > >>>>>>>>>>> reason to do that would be to have created a constructor > that > > > >>>>>>>>>>> accepts > > > >>>>>>>>>>> a TaskId > > > >>>>>>>>>>> > > > >>>>>>>>>>> Just to be super clear about the proposal, this is what I > had > > > >>>>>>>>>>> in mind. > > > >>>>>>>>>>> It's actually fairly simple and wouldn't add much to the > > > >>>>>>>>>>> scope of the > > > >>>>>>>>>>> KIP (I think -- if it turns out to be more complicated than > > > >>>>>>>>>>> I'm assuming, > > > >>>>>>>>>>> we should definitely do whatever has the smallest LOE to > get > > > >>>>>>>>>>> this done > > > >>>>>>>>>>> > > > >>>>>>>>>>> Anyways, the (only) public API changes would be to add this > > new > > > >>>>>>>>>>> method to the StoreSupplier API: > > > >>>>>>>>>>> > > > >>>>>>>>>>> default T get(final TaskId taskId) { > > > >>>>>>>>>>> return get(); > > > >>>>>>>>>>> } > > > >>>>>>>>>>> > > > >>>>>>>>>>> We can decide whether or not to deprecate the old #get but > > > >>>>>>>>>>> it's not > > > >>>>>>>>>>> really necessary and might cause a lot of turmoil, so I'd > > > >>>>>>>>>>> personally > > > >>>>>>>>>>> say we just leave both APIs in place. > > > >>>>>>>>>>> > > > >>>>>>>>>>> And that's it for public API changes! Internally, we would > > > >>>>>>>>>>> just adapt > > > >>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement this > > new > > > >>>>>>>>>>> API. So for example with the > > RocksDBKeyValueBytesStoreSupplier, > > > >>>>>>>>>>> we just add > > > >>>>>>>>>>> > > > >>>>>>>>>>> @Override > > > >>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId > taskId) > > { > > > >>>>>>>>>>> return returnTimestampedStore ? 
> > > >>>>>>>>>>> new RocksDBTimestampedStore(name, metricsScope(), > > > >>>>>>>>>>> taskId) : > > > >>>>>>>>>>> new RocksDBStore(name, metricsScope(), taskId); > > > >>>>>>>>>>> } > > > >>>>>>>>>>> > > > >>>>>>>>>>> And of course add the TaskId parameter to each of the > actual > > > >>>>>>>>>>> state store constructors returned here. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Does that make sense? It's entirely possible I'm missing > > > >>>>>>>>>>> something > > > >>>>>>>>>>> important here, but I think this would be a pretty small > > > >>>>>>>>>>> addition that > > > >>>>>>>>>>> would solve the problem you mentioned earlier while also > > being > > > >>>>>>>>>>> useful to anyone who uses custom state stores. > > > >>>>>>>>>>> > > > >>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford > > > >>>>>>>>>>> <nick.telf...@gmail.com> > > > >>>>>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>>> Hi Sophie, > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Interesting idea! Although what would that mean for the > > > >>>>>>>>>>>> StateStore > > > >>>>>>>>>>>> interface? Obviously we can't require that the constructor > > > >>>>>>>>>>>> take the > > > >>>>>>>>>>> TaskId. > > > >>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier? > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we > > > >>>>>>>>>>>> over-complicating > > > >>>>>>>>>>> it? > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Nick > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman > > > >>>>>>>>>>>> <sop...@responsive.dev > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> Somewhat minor point overall, but it actually drives me > > > >>>>>>>>>>>>> crazy that you > > > >>>>>>>>>>>>> can't get access to the taskId of a StateStore until > #init > > > >>>>>>>>>>>>> is called. > > > >>>>>>>>>>>> This > > > >>>>>>>>>>>>> has caused me a huge headache personally (since the same > is > > > >>>>>>>>>>>>> true for > > > >>>>>>>>>>>>> processors and I was trying to do something that's > probably > > > >>>>>>>>>>>>> too hacky > > > >>>>>>>>>>> to > > > >>>>>>>>>>>>> actually complain about here lol) > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive and > > > >>>>>>>>>>>>> pass along the > > > >>>>>>>>>>>>> taskId when creating a new store? Presumably by adding a > > > >>>>>>>>>>>>> new version of > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>>> #get method that takes in a taskId parameter? We can have > > > >>>>>>>>>>>>> it default to > > > >>>>>>>>>>>>> invoking the old one for compatibility reasons and it > > > >>>>>>>>>>>>> should be > > > >>>>>>>>>>>> completely > > > >>>>>>>>>>>>> safe to tack on. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier, but > > > that's > > > >>>>>>>>>>> definitely > > > >>>>>>>>>>>>> outside the scope of this KIP > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford > > > >>>>>>>>>>>>> <nick.telf...@gmail.com> > > > >>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> On further thought, it's clear that this can't work for > > > >>>>>>>>>>>>>> one simple > > > >>>>>>>>>>>>> reason: > > > >>>>>>>>>>>>>> StateStores don't know their associated TaskId (and > hence, > > > >>>>>>>>>>>>>> their > > > >>>>>>>>>>>>>> StateDirectory) until the init() call. 
Therefore, > > > >>>>>>>>>>>>>> committedOffset() > > > >>>>>>>>>>>> can't > > > >>>>>>>>>>>>>> be called before init(), unless we also added a > > > >>>>>>>>>>>>>> StateStoreContext > > > >>>>>>>>>>>>> argument > > > >>>>>>>>>>>>>> to committedOffset(), which I think might be trying to > > > >>>>>>>>>>>>>> shoehorn too > > > >>>>>>>>>>>> much > > > >>>>>>>>>>>>>> into committedOffset(). > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> I still don't like the idea of the Streams engine > > > >>>>>>>>>>>>>> maintaining the > > > >>>>>>>>>>> cache > > > >>>>>>>>>>>>> of > > > >>>>>>>>>>>>>> changelog offsets independently of stores, mostly > because > > > >>>>>>>>>>>>>> of the > > > >>>>>>>>>>>>>> maintenance burden of the code duplication, but it looks > > > >>>>>>>>>>>>>> like we'll > > > >>>>>>>>>>>> have > > > >>>>>>>>>>>>> to > > > >>>>>>>>>>>>>> live with it. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Unless you have any better ideas? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Regards, > > > >>>>>>>>>>>>>> Nick > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford > > > >>>>>>>>>>>>>> <nick.telf...@gmail.com> > > > >>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Hi Bruno, > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Immediately after I sent my response, I looked at the > > > >>>>>>>>>>>>>>> codebase and > > > >>>>>>>>>>>> came > > > >>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it will > > > >>>>>>>>>>>>>>> need to be > > > >>>>>>>>>>> done > > > >>>>>>>>>>>>> by > > > >>>>>>>>>>>>>>> creating temporary StateManagers and StateStores during > > > >>>>>>>>>>>>>>> rebalance. > > > >>>>>>>>>>> I > > > >>>>>>>>>>>>>> think > > > >>>>>>>>>>>>>>> it is possible, and probably not too expensive, but the > > > >>>>>>>>>>>>>>> devil will > > > >>>>>>>>>>> be > > > >>>>>>>>>>>>> in > > > >>>>>>>>>>>>>>> the detail. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> I'll try to find some time to explore the idea to see > if > > > >>>>>>>>>>>>>>> it's > > > >>>>>>>>>>>> possible > > > >>>>>>>>>>>>>> and > > > >>>>>>>>>>>>>>> report back, because we'll need to determine this > before > > > >>>>>>>>>>>>>>> we can > > > >>>>>>>>>>> vote > > > >>>>>>>>>>>> on > > > >>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>> KIP. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Regards, > > > >>>>>>>>>>>>>>> Nick > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna > > > >>>>>>>>>>>>>>> <cado...@apache.org> > > > >>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Hi Nick, > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Thanks for reacting on my comments so quickly! > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> 2. > > > >>>>>>>>>>>>>>>> Some thoughts on your proposal. > > > >>>>>>>>>>>>>>>> State managers (and state stores) are parts of tasks. > If > > > >>>>>>>>>>>>>>>> the task > > > >>>>>>>>>>> is > > > >>>>>>>>>>>>> not > > > >>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. To get > > > >>>>>>>>>>>>>>>> the offsets > > > >>>>>>>>>>>>> with > > > >>>>>>>>>>>>>>>> your approach, we would need to either create kind of > > > >>>>>>>>>>>>>>>> inactive > > > >>>>>>>>>>> tasks > > > >>>>>>>>>>>>>>>> besides active and standby tasks or store and manage > > state > > > >>>>>>>>>>> managers > > > >>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>> non-assigned tasks differently than the state managers > > > >>>>>>>>>>>>>>>> of assigned > > > >>>>>>>>>>>>>>>> tasks. 
Additionally, the cleanup thread that removes > > > >>>>>>>>>>>>>>>> unassigned > > > >>>>>>>>>>> task > > > >>>>>>>>>>>>>>>> directories needs to concurrently delete those > inactive > > > >>>>>>>>>>>>>>>> tasks or > > > >>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This > seems > > > >>>>>>>>>>>>>>>> all quite > > > >>>>>>>>>>>>> messy > > > >>>>>>>>>>>>>>>> to me. > > > >>>>>>>>>>>>>>>> Could we create those state managers (or state stores) > > > >>>>>>>>>>>>>>>> for locally > > > >>>>>>>>>>>>>>>> existing but unassigned tasks on demand when > > > >>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or have a > > > >>>>>>>>>>>>>>>> different > > > >>>>>>>>>>>>>>>> encapsulation for the unused task directories? > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>> Bruno > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote: > > > >>>>>>>>>>>>>>>>> Hi Bruno, > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Thanks for the review! > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> 1, 4, 5. > > > >>>>>>>>>>>>>>>>> Done > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> 3. > > > >>>>>>>>>>>>>>>>> You're right. I've removed the offending paragraph. I > > had > > > >>>>>>>>>>>> originally > > > >>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in KIP-892. > > > >>>>>>>>>>>>>>>>> But it's > > > >>>>>>>>>>>>>>>> difficult to > > > >>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892 > > transaction > > > >>>>>>>>>>> buffers. > > > >>>>>>>>>>>>>>>> Instead, > > > >>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc when > > > >>>>>>>>>>>>>>>>> KIP-892 > > > >>>>>>>>>>> lands. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> 2. > > > >>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that was > > > >>>>>>>>>>>>> (significantly) > > > >>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My > prototype > > > >>>>>>>>>>>>>>>>> currently > > > >>>>>>>>>>>>>>>> maintains > > > >>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint, but > > > >>>>>>>>>>>>>>>>> doing so > > > >>>>>>>>>>>>> becomes > > > >>>>>>>>>>>>>>>> very > > > >>>>>>>>>>>>>>>>> messy. My intent with this change was to try to > better > > > >>>>>>>>>>> encapsulate > > > >>>>>>>>>>>>>> this > > > >>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that can > > > >>>>>>>>>>>>>>>>> cheaply > > > >>>>>>>>>>>>> provide > > > >>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>> offsets stored directly in them without needing to > > > >>>>>>>>>>>>>>>>> duplicate > > > >>>>>>>>>>> them > > > >>>>>>>>>>>> in > > > >>>>>>>>>>>>>>>> this > > > >>>>>>>>>>>>>>>>> cache. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> It's clear some more work is needed here to better > > > >>>>>>>>>>>>>>>>> encapsulate > > > >>>>>>>>>>>> this. > > > >>>>>>>>>>>>>> My > > > >>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but don't > > > >>>>>>>>>>> initialize* > > > >>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>> StateManager and StateStores for every Task directory > > > >>>>>>>>>>>>>>>>> on-disk? > > > >>>>>>>>>>>> That > > > >>>>>>>>>>>>>>>> should > > > >>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to > > > >>>>>>>>>>>>>>>>> query the > > > >>>>>>>>>>>> offsets > > > >>>>>>>>>>>>>> for > > > >>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If the > > > >>>>>>>>>>> StateManager > > > >>>>>>>>>>>>>> (aka. 
> > > >>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves too > > > >>>>>>>>>>>>>>>>> expensive > > > >>>>>>>>>>> to > > > >>>>>>>>>>>>> hold > > > >>>>>>>>>>>>>>>> open > > > >>>>>>>>>>>>>>>>> for closed stores, we could always have a > > > >>>>>>>>>>>>>>>>> "StubStateManager" in > > > >>>>>>>>>>>> its > > > >>>>>>>>>>>>>>>> place, > > > >>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing > else? > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> IDK, what do you think? > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Regards, > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Nick > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna > > > >>>>>>>>>>>>>>>>> <cado...@apache.org> > > > >>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Hi Nick, > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892! > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Here a couple of comments/questions: > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> 1. > > > >>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline which > > > >>>>>>>>>>>>>>>>>> says to not > > > >>>>>>>>>>>> use > > > >>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could > you > > > >>>>>>>>>>>>>>>>>> please > > > >>>>>>>>>>>> change > > > >>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()? > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> 2. > > > >>>>>>>>>>>>>>>>>> It is not clear to me how > > > TaskManager#getTaskOffsetSums() > > > >>>>>>>>>>> should > > > >>>>>>>>>>>>> read > > > >>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own but > > > >>>>>>>>>>>>>>>>>> that have a > > > >>>>>>>>>>>>> state > > > >>>>>>>>>>>>>>>>>> directory on the Streams client by calling > > > >>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread does > > > >>>>>>>>>>>>>>>>>> not own a > > > >>>>>>>>>>>> task > > > >>>>>>>>>>>>>> it > > > >>>>>>>>>>>>>>>>>> does also not create any state stores for the task, > > > >>>>>>>>>>>>>>>>>> which means > > > >>>>>>>>>>>>> there > > > >>>>>>>>>>>>>>>> is > > > >>>>>>>>>>>>>>>>>> no state store on which to call > getCommittedOffsets(). > > > >>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint file > is > > > >>>>>>>>>>>>>>>>>> written > > > >>>>>>>>>>>> for > > > >>>>>>>>>>>>>> all > > > >>>>>>>>>>>>>>>>>> state stores on close -- not only for the > RocksDBStore > > > >>>>>>>>>>>>>>>>>> -- and > > > >>>>>>>>>>>> that > > > >>>>>>>>>>>>>> this > > > >>>>>>>>>>>>>>>>>> checkpoint file is read in > > > >>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>> tasks > > > >>>>>>>>>>>>>>>>>> that have a state directory on the client but are > not > > > >>>>>>>>>>>>>>>>>> currently > > > >>>>>>>>>>>>>>>> assigned > > > >>>>>>>>>>>>>>>>>> to any stream thread of the Streams client. > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> 3. > > > >>>>>>>>>>>>>>>>>> In the javadocs for commit() you write > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or since > > > >>>>>>>>>>>>> init(StateStore) > > > >>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a > restart." > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> This is only true for a clean close before the > > > >>>>>>>>>>>>>>>>>> restart, isn't > > > >>>>>>>>>>> it? 
> > > >>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka Streams > > > >>>>>>>>>>>>>>>>>> cannot > > > >>>>>>>>>>>>> guarantee > > > >>>>>>>>>>>>>>>>>> that the in-memory structures of the state store > (e.g. > > > >>>>>>>>>>>>>>>>>> memtable > > > >>>>>>>>>>>> in > > > >>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records and > > the > > > >>>>>>>>>>>> committed > > > >>>>>>>>>>>>>>>>>> offsets are persisted. > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> 4. > > > >>>>>>>>>>>>>>>>>> The wrapper that provides the legacy checkpointing > > > >>>>>>>>>>>>>>>>>> behavior is > > > >>>>>>>>>>>>>> actually > > > >>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from the > > > >>>>>>>>>>>>>>>>>> KIP, but > > > >>>>>>>>>>>> still > > > >>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior will be > > > >>>>>>>>>>>>>>>>>> supported > > > >>>>>>>>>>>> when > > > >>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>> state store does not manage the checkpoints. > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> 5. > > > >>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the > tags, > > > >>>>>>>>>>>>>>>>>> and the > > > >>>>>>>>>>>>>> recording > > > >>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or KIP-444. > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Best, > > > >>>>>>>>>>>>>>>>>> Bruno > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote: > > > >>>>>>>>>>>>>>>>>>> Hi everyone, > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out > the > > > >>>>>>>>>>>>>>>>>>> "Atomic > > > >>>>>>>>>>>>>>>>>> Checkpointing" > > > >>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics for > > > >>>>>>>>>>>>>>>>>>> StateStores, > > > >>>>>>>>>>>>> into > > > >>>>>>>>>>>>>>>> its > > > >>>>>>>>>>>>>>>>>> own > > > >>>>>>>>>>>>>>>>>>> KIP > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes > outlined > > > in > > > >>>>>>>>>>>> KIP-1035, > > > >>>>>>>>>>>>>>>> these > > > >>>>>>>>>>>>>>>>>>> changes were always the most contentious part, and > > > >>>>>>>>>>>>>>>>>>> continued > > > >>>>>>>>>>> to > > > >>>>>>>>>>>>> spur > > > >>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been > > > >>>>>>>>>>>>>>>>>>> removed from > > > >>>>>>>>>>>>>> KIP-892, > > > >>>>>>>>>>>>>>>>>> and > > > >>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to > > > >>>>>>>>>>>>>>>>>>> KIP-892 in > > > >>>>>>>>>>> their > > > >>>>>>>>>>>>>>>> place. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this set > of > > > >>>>>>>>>>>>>>>>>>> changes, > > > >>>>>>>>>>> we > > > >>>>>>>>>>>>> can > > > >>>>>>>>>>>>>>>>>>> deliver something that we're all happy with. 
> > > >>>>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>>>> Nick