Hi everyone,

I didn't spot this before, but it looks like the API of KeyValueStoreTestDriver will need to be updated to change the nomenclature from "flushed" to "committed":

numFlushedEntryRemoved() -> numCommittedEntryRemoved()
numFlushedEntryStored() -> numCommittedEntryStored()
flushedEntryRemoved(K) -> committedEntryRemoved(K)
flushedEntryStored(K) -> committedEntryStored(K)

The old methods will obviously be marked as @Deprecated (see the delegation sketch further below). Any objections before I add this to the KIP?

Regards,
Nick

On Wed, 29 May 2024 at 11:20, Nick Telford <nick.telf...@gmail.com> wrote:
> I've updated the KIP with the following: > > - Deprecation of StateStore#managesOffsets > - Change StateStore#commit to throw UnsupportedOperationException when > called from a Processor (via AbstractReadWriteDecorator) > - Updated consumer rebalance lag computation strategy > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata> > based on our Meet discussion > - I've added a bit more detail here than we discussed, in > particular around how we handle the offsets for tasks assigned to our > local > instance, and how we handle offsets when Tasks are closed/revoked. > - Improved downgrade behaviour > - Note: users that don't downgrade with upgrade.from will still get > the wipe-and-restore behaviour by default. > > I believe this covers all the outstanding changes that were requested. > Please let me know if I've missed anything or you think further changes are > needed. > > Regards, > Nick > > On Wed, 29 May 2024 at 09:28, Nick Telford <nick.telf...@gmail.com> wrote: > >> Hi everyone, >> >> Sorry I haven't got around to updating the KIP yet. Now that I've wrapped >> up KIP-989, I'm going to be working on 1035 starting today. >> >> I'll update the KIP first, and then call a vote. >> >> Regards, >> Nick >> >> On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org> wrote: >> >>> Totally agree on moving forward and starting the VOTE! >>> >>> However, the KIP should be updated with the new info before starting the >>> VOTE. >>> >>> Best, >>> Bruno >>> >>> On 5/29/24 2:36 AM, Matthias J. Sax wrote: >>> > Sounds like a good plan. -- I think we are still wrapping up 3.8 >>> > release, but would also like to move forward with this one. >>> > >>> > Should we start a VOTE? >>> > >>> > For merging PRs we need to wait until after code freeze, and 3.8 branch was >>> > cut. But we could start reviewing PRs before this already. >>> > >>> > >>> > -Matthias >>> > >>> > On 5/17/24 3:05 AM, Nick Telford wrote: >>> >> Hi everyone, >>> >> >>> >> As discussed on the Zoom call, we're going to handle rebalance >>> >> meta-data by: >>> >> >>> >> - On start-up, Streams will open each store and read its changelog >>> >> offsets >>> >> into an in-memory cache. This cache will be shared among all >>> >> StreamThreads. >>> >> - On rebalance, the cache will be consulted for Task offsets for any >>> Task >>> >> that is not active on any instance-local StreamThreads. If the Task is >>> >> active on *any* instance-local StreamThread, we will report the Task >>> >> lag as >>> >> "up to date" (i.e. -1), because we know that the local state is >>> currently >>> >> up-to-date. >>> >> >>> >> We will avoid caching offsets across restarts in the legacy >>> ".checkpoint" >>> >> file, so that we can eliminate the logic for handling this class. If >>> >> performance of opening/closing many state stores is poor, we can >>> >> parallelise it by forking off a thread for each Task directory when >>> >> reading >>> >> the offsets. 
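
[For reference, a minimal self-contained sketch of the delegation implied by the renames proposed at the top of this message. The return types, generics and stub bodies below are assumptions for illustration, not taken from the KIP or the actual KeyValueStoreTestDriver.]

// Illustrative stand-in only; the "committed" methods are stubbed so the
// sketch compiles on its own.
public class CommittedEntryRenameSketch<K> {

    public int numCommittedEntryStored() { return 0; }
    public int numCommittedEntryRemoved() { return 0; }
    public boolean committedEntryStored(final K key) { return false; }
    public boolean committedEntryRemoved(final K key) { return false; }

    // Old names kept for compatibility, deprecated and delegating.
    @Deprecated
    public int numFlushedEntryStored() { return numCommittedEntryStored(); }

    @Deprecated
    public int numFlushedEntryRemoved() { return numCommittedEntryRemoved(); }

    @Deprecated
    public boolean flushedEntryStored(final K key) { return committedEntryStored(key); }

    @Deprecated
    public boolean flushedEntryRemoved(final K key) { return committedEntryRemoved(key); }
}
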
>>> >> >>> >> I'll update the KIP later today to reflect this design, but I will >>> try to >>> >> keep it high-level, so that the exact implementation can vary. >>> >> >>> >> Regards, >>> >> >>> >> Nick >>> >> >>> >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman < >>> sop...@responsive.dev> >>> >> wrote: >>> >> >>> >>> 103: I like the idea of immediately deprecating #managesOffsets and >>> >>> aiming >>> >>> to make offset management mandatory in the long run. I assume we >>> >>> would also >>> >>> log a warning for any custom stores that return "false" from this >>> >>> method to >>> >>> encourage custom store implementations to start doing so? My only >>> >>> question/concern is that if we want folks to start managing their own >>> >>> offsets then we should make this transition easy for them, perhaps by >>> >>> exposing some public utility APIs for things that are currently >>> >>> handled by >>> >>> Kafka Streams such as reading/writing checkpoint files. Maybe it >>> >>> would be >>> >>> useful to include a small example in the KIP of what it would >>> >>> actually mean >>> >>> to "manage your own offsets" -- I know (all too well) that plugging >>> in >>> >>> custom storage implementations is not easy and most people who do >>> >>> this are >>> >>> probably fairly advanced users, but offset management will be a >>> >>> totally new >>> >>> ballgame to most people people and this kind of feels like throwing >>> them >>> >>> off the deep end. We should at least provide a lifejacket via some >>> >>> kind of >>> >>> utility API and/or example >>> >>> >>> >>> 200. There's been a lot of back and forth on the rebalance >>> metadata/task >>> >>> lag computation question, so forgive me if I missed any part of >>> this, >>> >>> but I >>> >>> think we've landed at the right idea here. To summarize: the "tl;dr" >>> >>> explanation is that we'll write the checkpoint file only on close >>> and >>> >>> will >>> >>> account for hard-crash scenarios by opening up the stores on startup >>> and >>> >>> writing a checkpoint file for any missing tasks. Does that sound >>> about >>> >>> right? >>> >>> >>> >>> A few clarifications: >>> >>> I think we're all more or less on the same page here but just to be >>> >>> absolutely clear, the task lags for each task directory found on >>> disk >>> >>> will >>> >>> be reported by only one of the StreamThreads, and each StreamThread >>> will >>> >>> report lags only for tasks that it already owns or are not assigned >>> >>> to any >>> >>> other StreamThread in the client. In other words, we only need to >>> get >>> >>> the >>> >>> task lag for completely unassigned/unlocked tasks, which means if >>> >>> there is >>> >>> a checkpoint file at all then it must be up-to-date, because there >>> is no >>> >>> other StreamThread actively writing to that state store (if so then >>> only >>> >>> that StreamThread would report lag for that particular task). >>> >>> >>> >>> This still leaves the "no checkpoint at all" case which as previously >>> >>> mentioned can occur after a hard-crash. Luckily we only have to >>> worry >>> >>> about this once, after starting up again following said hard crash. >>> >>> We can >>> >>> simply open up each of the state stores before ever joining the >>> >>> group, get >>> >>> the offsets from rocksdb, and write them to a new checkpoint file. 
>>> After >>> >>> that, we can depend on the checkpoints written at close and won't >>> >>> have to >>> >>> open up any stores that aren't already assigned for the reasons laid >>> >>> out in >>> >>> the paragraph above. >>> >>> >>> >>> As for the specific mechanism and which thread-does-what, since >>> there >>> >>> were >>> >>> some questions, this is how I'm imagining the process: >>> >>> >>> >>> 1. The general idea is that we simply go through each task >>> >>> directories >>> >>> with state but no checkpoint file and open the StateStore, call >>> >>> #committedOffset, and then write it to the checkpoint file. We >>> >>> can then >>> >>> close these stores and let things proceed as normal. >>> >>> 2. This only has to happen once, during startup, but we have two >>> >>> options: >>> >>> 1. Do this from KafkaStreams#start, ie before we even create >>> the >>> >>> StreamThreads >>> >>> 2. Do this from StreamThread#start, following a similar >>> >>> lock-based >>> >>> approach to the one used #computeTaskLags, where each >>> >>> StreamThread >>> >>> just >>> >>> makes a pass over the task directories on disk and attempts >>> to >>> >>> lock >>> >>> them >>> >>> one by one. If they obtain the lock, check whether there is >>> state >>> >>> but no >>> >>> checkpoint, and write the checkpoint if needed. If it can't >>> grab >>> >>> the lock, >>> >>> then we know one of the other StreamThreads must be handling >>> the >>> >>> checkpoint >>> >>> file for that task directory, and we can move on. >>> >>> >>> >>> Don't really feel too strongly about which approach is best, doing >>> >>> it in >>> >>> KafkaStreams#start is certainly the most simple while doing it in the >>> >>> StreamThread's startup is more efficient. If we're worried about >>> >>> adding too >>> >>> much weight to KafkaStreams#start then the 2nd option is probably >>> best, >>> >>> though slightly more complicated. >>> >>> >>> >>> Thoughts? >>> >>> >>> >>> On Tue, May 14, 2024 at 10:02 AM Nick Telford < >>> nick.telf...@gmail.com> >>> >>> wrote: >>> >>> >>> >>>> Hi everyone, >>> >>>> >>> >>>> Sorry for the delay in replying. I've finally now got some time to >>> work >>> >>> on >>> >>>> this. >>> >>>> >>> >>>> Addressing Matthias's comments: >>> >>>> >>> >>>> 100. >>> >>>> Good point. As Bruno mentioned, there's already >>> >>> AbstractReadWriteDecorator >>> >>>> which we could leverage to provide that protection. I'll add >>> details on >>> >>>> this to the KIP. >>> >>>> >>> >>>> 101,102. >>> >>>> It looks like these points have already been addressed by Bruno. >>> Let me >>> >>>> know if anything here is still unclear or you feel needs to be >>> detailed >>> >>>> more in the KIP. >>> >>>> >>> >>>> 103. >>> >>>> I'm in favour of anything that gets the old code removed sooner, but >>> >>>> wouldn't deprecating an API that we expect (some) users to implement >>> >>> cause >>> >>>> problems? >>> >>>> I'm thinking about implementers of custom StateStores, as they may >>> be >>> >>>> confused by managesOffsets() being deprecated, especially since they >>> >>> would >>> >>>> have to mark their implementation as @Deprecated in order to avoid >>> >>> compile >>> >>>> warnings. >>> >>>> If deprecating an API *while it's still expected to be implemented* >>> is >>> >>>> something that's generally done in the project, then I'm happy to >>> do so >>> >>>> here. >>> >>>> >>> >>>> 104. >>> >>>> I think this is technically possible, but at the cost of >>> considerable >>> >>>> additional code to maintain. 
Would we ever have a pathway to remove >>> >>>> this >>> >>>> downgrade code in the future? >>> >>>> >>> >>>> >>> >>>> Regarding rebalance metadata: >>> >>>> Opening all stores on start-up to read and cache their offsets is an >>> >>>> interesting idea, especially if we can avoid re-opening the stores >>> once >>> >>> the >>> >>>> Tasks have been assigned. Scalability shouldn't be too much of a >>> >>>> problem, >>> >>>> because typically users have a fairly short state.cleanup.delay, so >>> the >>> >>>> number of on-disk Task directories should rarely exceed the number >>> of >>> >>> Tasks >>> >>>> previously assigned to that instance. >>> >>>> An advantage of this approach is that it would also simplify >>> StateStore >>> >>>> implementations, as they would only need to guarantee that committed >>> >>>> offsets are available when the store is open. >>> >>>> >>> >>>> I'll investigate this approach this week for feasibility and report >>> >>>> back. >>> >>>> >>> >>>> I think that covers all the outstanding feedback, unless I missed >>> >>> anything? >>> >>>> >>> >>>> Regards, >>> >>>> Nick >>> >>>> >>> >>>> On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org> >>> wrote: >>> >>>> >>> >>>>> Hi Matthias, >>> >>>>> >>> >>>>> I see what you mean. >>> >>>>> >>> >>>>> To sum up: >>> >>>>> >>> >>>>> With this KIP the .checkpoint file is written when the store >>> closes. >>> >>>>> That is when: >>> >>>>> 1. a task moves away from Kafka Streams client >>> >>>>> 2. Kafka Streams client shuts down >>> >>>>> >>> >>>>> A Kafka Streams client needs the information in the .checkpoint >>> file >>> >>>>> 1. on startup because it does not have any open stores yet. >>> >>>>> 2. during rebalances for non-empty state directories of tasks that >>> are >>> >>>>> not assigned to the Kafka Streams client. >>> >>>>> >>> >>>>> With hard crashes, i.e., when the Streams client is not able to >>> close >>> >>>>> its state stores and write the .checkpoint file, the .checkpoint >>> file >>> >>>>> might be quite stale. That influences the next rebalance after >>> >>>>> failover >>> >>>>> negatively. >>> >>>>> >>> >>>>> >>> >>>>> My conclusion is that Kafka Streams either needs to open the state >>> >>>>> stores at start up or we write the checkpoint file more often. >>> >>>>> >>> >>>>> Writing the .checkpoint file during processing more often without >>> >>>>> controlling the flush to disk would work. However, Kafka Streams >>> would >>> >>>>> checkpoint offsets that are not yet persisted on disk by the state >>> >>>>> store. That is with a hard crash the offsets in the .checkpoint >>> file >>> >>>>> might be larger than the offsets checkpointed in the state store. >>> That >>> >>>>> might not be a problem if Kafka Streams uses the .checkpoint file >>> only >>> >>>>> to compute the task lag. The downside is that it makes the >>> managing of >>> >>>>> checkpoints more complex because now we have to maintain two >>> >>>>> checkpoints: one for restoration and one for computing the task >>> lag. >>> >>>>> I think we should explore the option where Kafka Streams opens the >>> >>> state >>> >>>>> stores at start up to get the offsets. >>> >>>>> >>> >>>>> I also checked when Kafka Streams needs the checkpointed offsets to >>> >>>>> compute the task lag during a rebalance. Turns out Kafka Streams >>> needs >>> >>>>> them before sending the join request. 
Now, I am wondering if >>> opening >>> >>> the >>> >>>>> state stores of unassigned tasks whose state directory exists >>> locally >>> >>> is >>> >>>>> actually such a big issue due to the expected higher latency since >>> it >>> >>>>> happens actually before the Kafka Streams client joins the >>> rebalance. >>> >>>>> >>> >>>>> Best, >>> >>>>> Bruno >>> >>>>> >>> >>>>> >>> >>>>> >>> >>>>> >>> >>>>> >>> >>>>> >>> >>>>> >>> >>>>> On 5/4/24 12:05 AM, Matthias J. Sax wrote: >>> >>>>>> That's good questions... I could think of a few approaches, but I >>> >>> admit >>> >>>>>> it might all be a little bit tricky to code up... >>> >>>>>> >>> >>>>>> However if we don't solve this problem, I think this KIP does not >>> >>>> really >>> >>>>>> solve the core issue we are facing? In the end, if we rely on the >>> >>>>>> `.checkpoint` file to compute a task assignment, but the >>> >>> `.checkpoint` >>> >>>>>> file can be arbitrary stale after a crash because we only write it >>> >>> on a >>> >>>>>> clean close, there would be still a huge gap that this KIP does >>> not >>> >>>>> close? >>> >>>>>> >>> >>>>>> For the case in which we keep the checkpoint file, this KIP would >>> >>> still >>> >>>>>> help for "soft errors" in which KS can recover, and roll back the >>> >>>> store. >>> >>>>>> A significant win for sure. -- But hard crashes would still be an >>> >>>>>> problem? We might assign tasks to "wrong" instance, ie, which are >>> not >>> >>>>>> most up to date, as the checkpoint information could be very >>> >>> outdated? >>> >>>>>> Would we end up with a half-baked solution? Would this be good >>> enough >>> >>>> to >>> >>>>>> justify the introduced complexity? In the, for soft failures it's >>> >>> still >>> >>>>>> a win. Just want to make sure we understand the limitations and >>> make >>> >>> an >>> >>>>>> educated decision. >>> >>>>>> >>> >>>>>> Or do I miss something? >>> >>>>>> >>> >>>>>> >>> >>>>>> -Matthias >>> >>>>>> >>> >>>>>> On 5/3/24 10:20 AM, Bruno Cadonna wrote: >>> >>>>>>> Hi Matthias, >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> 200: >>> >>>>>>> I like the idea in general. However, it is not clear to me how >>> the >>> >>>>>>> behavior should be with multiple stream threads in the same Kafka >>> >>>>>>> Streams client. What stream thread opens which store? How can a >>> >>> stream >>> >>>>>>> thread pass an open store to another stream thread that got the >>> >>>>>>> corresponding task assigned? How does a stream thread know that a >>> >>> task >>> >>>>>>> was not assigned to any of the stream threads of the Kafka >>> Streams >>> >>>>>>> client? I have the feeling we should just keep the .checkpoint >>> file >>> >>> on >>> >>>>>>> close for now to unblock this KIP and try to find a solution to >>> get >>> >>>>>>> totally rid of it later. >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> Best, >>> >>>>>>> Bruno >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> On 5/3/24 6:29 PM, Matthias J. Sax wrote: >>> >>>>>>>> 101: Yes, but what I am saying is, that we don't need to flush >>> the >>> >>>>>>>> .position file to disk periodically, but only maintain it in >>> main >>> >>>>>>>> memory, and only write it to disk on close() to preserve it >>> across >>> >>>>>>>> restarts. This way, it would never be ahead, but might only lag? >>> >>> But >>> >>>>>>>> with my better understanding about (102) it might be mood >>> anyway... >>> >>>>>>>> >>> >>>>>>>> >>> >>>>>>>> 102: Thanks for clarifying. Looked into the code now. Makes >>> sense. 
>>> >>>>>>>> Might be something to be worth calling out explicitly in the KIP >>> >>>>>>>> writeup. -- Now that I realize that the position is tracked >>> inside >>> >>>>>>>> the store (not outside as the changelog offsets) it makes much >>> more >>> >>>>>>>> sense to pull position into RocksDB itself. In the end, it's >>> >>> actually >>> >>>>>>>> a "store implementation" detail how it tracks the position (and >>> >>> kinda >>> >>>>>>>> leaky abstraction currently, that we re-use the checkpoint file >>> >>>>>>>> mechanism to track it and flush to disk). >>> >>>>>>>> >>> >>>>>>>> >>> >>>>>>>> 200: I was thinking about this a little bit more, and maybe it's >>> >>> not >>> >>>>>>>> too bad? When KS starts up, we could upon all stores we find on >>> >>> local >>> >>>>>>>> disk pro-actively, and keep them all open until the first >>> rebalance >>> >>>>>>>> finishes: For tasks we get assigned, we hand in the already >>> opened >>> >>>>>>>> store (this would amortize the cost to open the store before the >>> >>>>>>>> rebalance) and for non-assigned tasks, we know the offset >>> >>> information >>> >>>>>>>> won't change and we could just cache it in-memory for later >>> reuse >>> >>>>>>>> (ie, next rebalance) and close the store to free up resources? >>> -- >>> >>>>>>>> Assuming that we would get a large percentage of opened stores >>> >>>>>>>> assigned as tasks anyway, this could work? >>> >>>>>>>> >>> >>>>>>>> >>> >>>>>>>> -Matthias >>> >>>>>>>> >>> >>>>>>>> On 5/3/24 1:29 AM, Bruno Cadonna wrote: >>> >>>>>>>>> Hi Matthias, >>> >>>>>>>>> >>> >>>>>>>>> >>> >>>>>>>>> 101: >>> >>>>>>>>> Let's assume a RocksDB store, but I think the following might >>> be >>> >>>>>>>>> true also for other store implementations. With this KIP, if >>> Kafka >>> >>>>>>>>> Streams commits the offsets, the committed offsets will be >>> stored >>> >>> in >>> >>>>>>>>> an in-memory data structure (i.e. the memtable) and stay there >>> >>> until >>> >>>>>>>>> RocksDB decides that it is time to persist its in-memory data >>> >>>>>>>>> structure. If Kafka Streams writes its position to the >>> .position >>> >>>>>>>>> file during a commit and a crash happens before RocksDB persist >>> >>> the >>> >>>>>>>>> memtable then the position in the .position file is ahead of >>> the >>> >>>>>>>>> persisted offset. If IQ is done between the crash and the state >>> >>>>>>>>> store fully restored the changelog, the position might tell IQ >>> >>> that >>> >>>>>>>>> the state store is more up-to-date than it actually is. >>> >>>>>>>>> In contrast, if Kafka Streams handles persisting positions the >>> >>> same >>> >>>>>>>>> as persisting offset, the position should always be consistent >>> >>> with >>> >>>>>>>>> the offset, because they are persisted together. >>> >>>>>>>>> >>> >>>>>>>>> >>> >>>>>>>>> 102: >>> >>>>>>>>> I am confused about your confusion which tells me that we are >>> >>>>>>>>> talking about two different things. >>> >>>>>>>>> You asked >>> >>>>>>>>> >>> >>>>>>>>> "Do you intent to add this information [i.e. position] to the >>> map >>> >>>>>>>>> passed via commit(final Map<TopicPartition, Long> >>> >>>> changelogOffsets)?" >>> >>>>>>>>> >>> >>>>>>>>> and with what I wrote I meant that we do not need to pass the >>> >>>>>>>>> position into the implementation of the StateStore interface >>> since >>> >>>>>>>>> the position is updated within the implementation of the >>> >>> StateStore >>> >>>>>>>>> interface (e.g. RocksDBStore [1]). 
My statement describes the >>> >>>>>>>>> behavior now, not the change proposed in this KIP, so it does >>> not >>> >>>>>>>>> contradict what is stated in the KIP. >>> >>>>>>>>> >>> >>>>>>>>> >>> >>>>>>>>> 200: >>> >>>>>>>>> This is about Matthias' main concern about rebalance metadata. >>> >>>>>>>>> As far as I understand the KIP, Kafka Streams will only use the >>> >>>>>>>>> .checkpoint files to compute the task lag for unassigned tasks >>> >>> whose >>> >>>>>>>>> state is locally available. For assigned tasks, it will use the >>> >>>>>>>>> offsets managed by the open state store. >>> >>>>>>>>> >>> >>>>>>>>> Best, >>> >>>>>>>>> Bruno >>> >>>>>>>>> >>> >>>>>>>>> [1] >>> >>>>>>>>> >>> >>>>> >>> >>>> >>> >>> >>> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397 >>> >>>>>>>>> >>> >>>>>>>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote: >>> >>>>>>>>>> Thanks Bruno. >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> 101: I think I understand this better now. But just want to >>> make >>> >>>>>>>>>> sure I do. What do you mean by "they can diverge" and >>> "Recovering >>> >>>>>>>>>> after a failure might load inconsistent offsets and >>> positions." >>> >>>>>>>>>> >>> >>>>>>>>>> The checkpoint is the offset from the changelog, while the >>> >>> position >>> >>>>>>>>>> is the offset from the upstream source topic, right? -- In the >>> >>> end, >>> >>>>>>>>>> the position is about IQ, and if we fail to update it, it only >>> >>>>>>>>>> means that there is some gap when we might not be able to >>> query a >>> >>>>>>>>>> standby task, because we think it's not up-to-date enough >>> even if >>> >>>>>>>>>> it is, which would resolve itself soon? Ie, the position might >>> >>>>>>>>>> "lag", but it's not "inconsistent". Do we believe that this >>> lag >>> >>>>>>>>>> would be highly problematic? >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> 102: I am confused. >>> >>>>>>>>>> >>> >>>>>>>>>>> The position is maintained inside the state store, but is >>> >>>>>>>>>>> persisted in the .position file when the state store closes. >>> >>>>>>>>>> >>> >>>>>>>>>> This contradicts the KIP: >>> >>>>>>>>>> >>> >>>>>>>>>>> these position offsets will be stored in RocksDB, in the >>> same >>> >>>>>>>>>>> column family as the changelog offsets, instead of the >>> .position >>> >>>>> file >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> My main concern is currently about rebalance metadata -- >>> opening >>> >>>>>>>>>> RocksDB stores seems to be very expensive, but if we follow >>> the >>> >>>> KIP: >>> >>>>>>>>>> >>> >>>>>>>>>>> We will do this under EOS by updating the .checkpoint file >>> >>>>>>>>>>> whenever a store is close()d. >>> >>>>>>>>>> >>> >>>>>>>>>> It seems, having the offset inside RocksDB does not help us at >>> >>> all? >>> >>>>>>>>>> In the end, when we crash, we don't want to lose the state, >>> but >>> >>>>>>>>>> when we update the .checkpoint only on a clean close, the >>> >>>>>>>>>> .checkpoint might be stale (ie, still contains the checkpoint >>> >>> when >>> >>>>>>>>>> we opened the store when we got a task assigned). >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> -Matthias >>> >>>>>>>>>> >>> >>>>>>>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote: >>> >>>>>>>>>>> Hi all, >>> >>>>>>>>>>> >>> >>>>>>>>>>> 100 >>> >>>>>>>>>>> I think we already have such a wrapper. It is called >>> >>>>>>>>>>> AbstractReadWriteDecorator. 
>>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> 101 >>> >>>>>>>>>>> Currently, the position is checkpointed when a offset >>> checkpoint >>> >>>>>>>>>>> is written. If we let the state store manage the committed >>> >>>>>>>>>>> offsets, we need to also let the state store also manage the >>> >>>>>>>>>>> position otherwise they might diverge. State store managed >>> >>> offsets >>> >>>>>>>>>>> can get flushed (i.e. checkpointed) to the disk when the >>> state >>> >>>>>>>>>>> store decides to flush its in-memory data structures, but the >>> >>>>>>>>>>> position is only checkpointed at commit time. Recovering >>> after a >>> >>>>>>>>>>> failure might load inconsistent offsets and positions. >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> 102 >>> >>>>>>>>>>> The position is maintained inside the state store, but is >>> >>>>>>>>>>> persisted in the .position file when the state store closes. >>> The >>> >>>>>>>>>>> only public interface that uses the position is IQv2 in a >>> >>>>>>>>>>> read-only mode. So the position is only updated within the >>> state >>> >>>>>>>>>>> store and read from IQv2. No need to add anything to the >>> public >>> >>>>>>>>>>> StateStore interface. >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> 103 >>> >>>>>>>>>>> Deprecating managesOffsets() right away might be a good idea. >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> 104 >>> >>>>>>>>>>> I agree that we should try to support downgrades without >>> wipes. >>> >>> At >>> >>>>>>>>>>> least Nick should state in the KIP why we do not support it. >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> Best, >>> >>>>>>>>>>> Bruno >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote: >>> >>>>>>>>>>>> Thanks for splitting out this KIP. The discussion shows, >>> that >>> >>> it >>> >>>>>>>>>>>> is a complex beast by itself, so worth to discuss by its >>> own. >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> Couple of question / comment: >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be >>> called >>> >>>>>>>>>>>> by users" -- I would propose to put a guard in place for >>> this, >>> >>> by >>> >>>>>>>>>>>> either throwing an exception (preferable) or adding a no-op >>> >>>>>>>>>>>> implementation (at least for our own stores, by wrapping >>> them >>> >>> -- >>> >>>>>>>>>>>> we cannot enforce it for custom stores I assume), and >>> document >>> >>>>>>>>>>>> this contract explicitly. >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> 101 adding `.position` to the store: Why do we actually need >>> >>>>>>>>>>>> this? The KIP says "To ensure consistency with the committed >>> >>> data >>> >>>>>>>>>>>> and changelog offsets" but I am not sure if I can follow? >>> Can >>> >>> you >>> >>>>>>>>>>>> elaborate why leaving the `.position` file as-is won't work? >>> >>>>>>>>>>>> >>> >>>>>>>>>>>>> If it's possible at all, it will need to be done by >>> >>>>>>>>>>>>> creating temporary StateManagers and StateStores during >>> >>>>>>>>>>>>> rebalance. I think >>> >>>>>>>>>>>>> it is possible, and probably not too expensive, but the >>> devil >>> >>>>>>>>>>>>> will be in >>> >>>>>>>>>>>>> the detail. >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> This sounds like a significant overhead to me. We know that >>> >>>>>>>>>>>> opening a single RocksDB takes about 500ms, and thus opening >>> >>>>>>>>>>>> RocksDB to get this information might slow down rebalances >>> >>>>>>>>>>>> significantly. 
>>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> 102: It's unclear to me, how `.position` information is >>> added. >>> >>>>>>>>>>>> The KIP only says: "position offsets will be stored in >>> RocksDB, >>> >>>>>>>>>>>> in the same column family as the changelog offsets". Do you >>> >>>>>>>>>>>> intent to add this information to the map passed via >>> >>>>>>>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`? >>> The >>> >>>>>>>>>>>> KIP should describe this in more detail. Also, if my >>> assumption >>> >>>>>>>>>>>> is correct, we might want to rename the parameter and also >>> >>> have a >>> >>>>>>>>>>>> better JavaDoc description? >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> 103: Should we make it mandatory (long-term) that all stores >>> >>>>>>>>>>>> (including custom stores) manage their offsets internally? >>> >>>>>>>>>>>> Maintaining both options and thus both code paths puts a >>> burden >>> >>>>>>>>>>>> on everyone and make the code messy. I would strongly >>> prefer if >>> >>>>>>>>>>>> we could have mid-term path to get rid of supporting both. >>> -- >>> >>>>>>>>>>>> For this case, we should deprecate the newly added >>> >>>>>>>>>>>> `managesOffsets()` method right away, to point out that we >>> >>> intend >>> >>>>>>>>>>>> to remove it. If it's mandatory to maintain offsets for >>> stores, >>> >>>>>>>>>>>> we won't need this method any longer. In memory stores can >>> just >>> >>>>>>>>>>>> return null from #committedOffset(). >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> 104 "downgrading": I think it might be worth to add support >>> for >>> >>>>>>>>>>>> downgrading w/o the need to wipe stores? Leveraging >>> >>>>>>>>>>>> `upgrade.from` parameter, we could build a two rolling >>> bounce >>> >>>>>>>>>>>> downgrade: (1) the new code is started with `upgrade.from` >>> set >>> >>> to >>> >>>>>>>>>>>> a lower version, telling the runtime to do the cleanup on >>> >>>>>>>>>>>> `close()` -- (ie, ensure that all data is written into >>> >>>>>>>>>>>> `.checkpoint` and `.position` file, and the newly added CL >>> is >>> >>>>>>>>>>>> deleted). In a second, rolling bounce, the old code would be >>> >>> able >>> >>>>>>>>>>>> to open RocksDB. -- I understand that this implies much more >>> >>>>>>>>>>>> work, but downgrade seems to be common enough, that it >>> might be >>> >>>>>>>>>>>> worth it? Even if we did not always support this in the >>> past, >>> >>> we >>> >>>>>>>>>>>> have the face the fact that KS is getting more and more >>> adopted >>> >>>>>>>>>>>> and as a more mature product should support this? >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> -Matthias >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote: >>> >>>>>>>>>>>>> Hi all, >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> How should we proceed here? >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> 1. with the plain .checkpoint file >>> >>>>>>>>>>>>> 2. with a way to use the state store interface on >>> unassigned >>> >>> but >>> >>>>>>>>>>>>> locally existing task state >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> While I like option 2, I think option 1 is less risky and >>> will >>> >>>>>>>>>>>>> give us the benefits of transactional state stores sooner. >>> We >>> >>>>>>>>>>>>> should consider the interface approach afterwards, though. 
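
[For concreteness, a rough sketch of what "option 1" above (the plain .checkpoint file) involves at lag-computation time, assuming the internal OffsetCheckpoint helper; the class and method names here are illustrative, not the actual TaskManager code.]

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

public class CheckpointLagSketch {

    // For an unassigned task whose directory exists locally, read the plain
    // .checkpoint file to get the changelog offsets needed for the lag sum.
    public static Map<TopicPartition, Long> readCheckpointedOffsets(final File taskDir) throws IOException {
        return new OffsetCheckpoint(new File(taskDir, ".checkpoint")).read();
    }
}

[The write side is symmetric: on close, the offsets reported by the store would be written back via OffsetCheckpoint#write.]
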
>>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> Best, >>> >>>>>>>>>>>>> Bruno >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote: >>> >>>>>>>>>>>>>> Hi Nick and Sophie, >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> I think the task ID is not enough to create a state store >>> >>> that >>> >>>>>>>>>>>>>> can read the offsets of non-assigned tasks for lag >>> >>> computation >>> >>>>>>>>>>>>>> during rebalancing. The state store also needs the state >>> >>>>>>>>>>>>>> directory so that it knows where to find the information >>> that >>> >>>>>>>>>>>>>> it needs to return from changelogOffsets(). >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> In general, I think we should proceed with the plain >>> >>>>>>>>>>>>>> .checkpoint file for now and iterate back to the state >>> store >>> >>>>>>>>>>>>>> solution later since it seems it is not that >>> straightforward. >>> >>>>>>>>>>>>>> Alternatively, Nick could timebox an effort to better >>> >>>>>>>>>>>>>> understand what would be needed for the state store >>> solution. >>> >>>>>>>>>>>>>> Nick, let us know your decision. >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> Regarding your question about the state store instance. I >>> am >>> >>>>>>>>>>>>>> not too familiar with that part of the code, but I think >>> the >>> >>>>>>>>>>>>>> state store is build when the processor topology is build >>> and >>> >>>>>>>>>>>>>> the processor topology is build per stream task. So there >>> is >>> >>>>>>>>>>>>>> one instance of processor topology and state store per >>> stream >>> >>>>>>>>>>>>>> task. Try to follow the call in [1]. >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> Best, >>> >>>>>>>>>>>>>> Bruno >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> [1] >>> >>>>>>>>>>>>>> >>> >>>>> >>> >>>> >>> >>> >>> https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153 >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote: >>> >>>>>>>>>>>>>>> That does make sense. The one thing I can't figure out is >>> >>> how >>> >>>>>>>>>>>>>>> per-Task >>> >>>>>>>>>>>>>>> StateStore instances are constructed. >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> It looks like we construct one StateStore instance for >>> the >>> >>>>>>>>>>>>>>> whole Topology >>> >>>>>>>>>>>>>>> (in InternalTopologyBuilder), and pass that into >>> >>>>>>>>>>>>>>> ProcessorStateManager (via >>> >>>>>>>>>>>>>>> StateManagerUtil) for each Task, which then initializes >>> it. >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> This can't be the case though, otherwise multiple >>> partitions >>> >>>>>>>>>>>>>>> of the same >>> >>>>>>>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore >>> >>>>>>>>>>>>>>> instance, which >>> >>>>>>>>>>>>>>> they don't. >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> What am I missing? 
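
[Bruno's explanation above already points at the answer: the topology holds store suppliers/builders rather than stores, and a fresh store instance is produced when each task's ProcessorTopology is built. A small sketch using only the public Stores API, for illustration; the internal wiring goes through InternalTopologyBuilder and ActiveTaskCreator as per Bruno's link.]

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class PerTaskStoreSketch {
    public static void main(final String[] args) {
        // The topology registers a StoreBuilder, not a store instance.
        final StoreBuilder<KeyValueStore<String, String>> builder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"),
                Serdes.String(),
                Serdes.String());

        // Streams calls build() while constructing each task's ProcessorTopology,
        // so every stream task ends up with its own store instance.
        final KeyValueStore<String, String> taskZeroStore = builder.build();
        final KeyValueStore<String, String> taskOneStore = builder.build();
        System.out.println(taskZeroStore != taskOneStore); // prints "true"
    }
}
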
>>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman >>> >>>>>>>>>>>>>>> <sop...@responsive.dev> >>> >>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> I don't think we need to *require* a constructor accept >>> the >>> >>>>>>>>>>>>>>>> TaskId, but we >>> >>>>>>>>>>>>>>>> would definitely make sure that the RocksDB state store >>> >>>>>>>>>>>>>>>> changes its >>> >>>>>>>>>>>>>>>> constructor to one that accepts the TaskID (which we >>> can do >>> >>>>>>>>>>>>>>>> without >>> >>>>>>>>>>>>>>>> deprecation since its an internal API), and custom state >>> >>>>>>>>>>>>>>>> stores can just >>> >>>>>>>>>>>>>>>> decide for themselves whether they want to opt-in/use >>> the >>> >>>>>>>>>>>>>>>> TaskId param >>> >>>>>>>>>>>>>>>> or not. I mean custom state stores would have to opt-in >>> >>>>>>>>>>>>>>>> anyways by >>> >>>>>>>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and >>> the >>> >>>> only >>> >>>>>>>>>>>>>>>> reason to do that would be to have created a constructor >>> >>> that >>> >>>>>>>>>>>>>>>> accepts >>> >>>>>>>>>>>>>>>> a TaskId >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> Just to be super clear about the proposal, this is what >>> I >>> >>> had >>> >>>>>>>>>>>>>>>> in mind. >>> >>>>>>>>>>>>>>>> It's actually fairly simple and wouldn't add much to the >>> >>>>>>>>>>>>>>>> scope of the >>> >>>>>>>>>>>>>>>> KIP (I think -- if it turns out to be more complicated >>> than >>> >>>>>>>>>>>>>>>> I'm assuming, >>> >>>>>>>>>>>>>>>> we should definitely do whatever has the smallest LOE to >>> >>> get >>> >>>>>>>>>>>>>>>> this done >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> Anyways, the (only) public API changes would be to add >>> this >>> >>>> new >>> >>>>>>>>>>>>>>>> method to the StoreSupplier API: >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> default T get(final TaskId taskId) { >>> >>>>>>>>>>>>>>>> return get(); >>> >>>>>>>>>>>>>>>> } >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> We can decide whether or not to deprecate the old #get >>> but >>> >>>>>>>>>>>>>>>> it's not >>> >>>>>>>>>>>>>>>> really necessary and might cause a lot of turmoil, so >>> I'd >>> >>>>>>>>>>>>>>>> personally >>> >>>>>>>>>>>>>>>> say we just leave both APIs in place. >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> And that's it for public API changes! Internally, we >>> would >>> >>>>>>>>>>>>>>>> just adapt >>> >>>>>>>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement >>> this >>> >>>> new >>> >>>>>>>>>>>>>>>> API. So for example with the >>> >>>> RocksDBKeyValueBytesStoreSupplier, >>> >>>>>>>>>>>>>>>> we just add >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> @Override >>> >>>>>>>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId >>> >>> taskId) >>> >>>> { >>> >>>>>>>>>>>>>>>> return returnTimestampedStore ? >>> >>>>>>>>>>>>>>>> new RocksDBTimestampedStore(name, >>> metricsScope(), >>> >>>>>>>>>>>>>>>> taskId) : >>> >>>>>>>>>>>>>>>> new RocksDBStore(name, metricsScope(), >>> taskId); >>> >>>>>>>>>>>>>>>> } >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> And of course add the TaskId parameter to each of the >>> >>> actual >>> >>>>>>>>>>>>>>>> state store constructors returned here. >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> Does that make sense? 
It's entirely possible I'm missing >>> >>>>>>>>>>>>>>>> something >>> >>>>>>>>>>>>>>>> important here, but I think this would be a pretty small >>> >>>>>>>>>>>>>>>> addition that >>> >>>>>>>>>>>>>>>> would solve the problem you mentioned earlier while also >>> >>>> being >>> >>>>>>>>>>>>>>>> useful to anyone who uses custom state stores. >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford >>> >>>>>>>>>>>>>>>> <nick.telf...@gmail.com> >>> >>>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Hi Sophie, >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Interesting idea! Although what would that mean for the >>> >>>>>>>>>>>>>>>>> StateStore >>> >>>>>>>>>>>>>>>>> interface? Obviously we can't require that the >>> constructor >>> >>>>>>>>>>>>>>>>> take the >>> >>>>>>>>>>>>>>>> TaskId. >>> >>>>>>>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier? >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we >>> >>>>>>>>>>>>>>>>> over-complicating >>> >>>>>>>>>>>>>>>> it? >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Nick >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman >>> >>>>>>>>>>>>>>>>> <sop...@responsive.dev >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>> Somewhat minor point overall, but it actually drives >>> me >>> >>>>>>>>>>>>>>>>>> crazy that you >>> >>>>>>>>>>>>>>>>>> can't get access to the taskId of a StateStore until >>> >>> #init >>> >>>>>>>>>>>>>>>>>> is called. >>> >>>>>>>>>>>>>>>>> This >>> >>>>>>>>>>>>>>>>>> has caused me a huge headache personally (since the >>> same >>> >>> is >>> >>>>>>>>>>>>>>>>>> true for >>> >>>>>>>>>>>>>>>>>> processors and I was trying to do something that's >>> >>> probably >>> >>>>>>>>>>>>>>>>>> too hacky >>> >>>>>>>>>>>>>>>> to >>> >>>>>>>>>>>>>>>>>> actually complain about here lol) >>> >>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive >>> and >>> >>>>>>>>>>>>>>>>>> pass along the >>> >>>>>>>>>>>>>>>>>> taskId when creating a new store? Presumably by >>> adding a >>> >>>>>>>>>>>>>>>>>> new version of >>> >>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>> #get method that takes in a taskId parameter? We can >>> have >>> >>>>>>>>>>>>>>>>>> it default to >>> >>>>>>>>>>>>>>>>>> invoking the old one for compatibility reasons and it >>> >>>>>>>>>>>>>>>>>> should be >>> >>>>>>>>>>>>>>>>> completely >>> >>>>>>>>>>>>>>>>>> safe to tack on. >>> >>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier, >>> but >>> >>>>> that's >>> >>>>>>>>>>>>>>>> definitely >>> >>>>>>>>>>>>>>>>>> outside the scope of this KIP >>> >>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford >>> >>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> >>> >>>>>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> On further thought, it's clear that this can't work >>> for >>> >>>>>>>>>>>>>>>>>>> one simple >>> >>>>>>>>>>>>>>>>>> reason: >>> >>>>>>>>>>>>>>>>>>> StateStores don't know their associated TaskId (and >>> >>> hence, >>> >>>>>>>>>>>>>>>>>>> their >>> >>>>>>>>>>>>>>>>>>> StateDirectory) until the init() call. 
Therefore, >>> >>>>>>>>>>>>>>>>>>> committedOffset() >>> >>>>>>>>>>>>>>>>> can't >>> >>>>>>>>>>>>>>>>>>> be called before init(), unless we also added a >>> >>>>>>>>>>>>>>>>>>> StateStoreContext >>> >>>>>>>>>>>>>>>>>> argument >>> >>>>>>>>>>>>>>>>>>> to committedOffset(), which I think might be trying >>> to >>> >>>>>>>>>>>>>>>>>>> shoehorn too >>> >>>>>>>>>>>>>>>>> much >>> >>>>>>>>>>>>>>>>>>> into committedOffset(). >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> I still don't like the idea of the Streams engine >>> >>>>>>>>>>>>>>>>>>> maintaining the >>> >>>>>>>>>>>>>>>> cache >>> >>>>>>>>>>>>>>>>>> of >>> >>>>>>>>>>>>>>>>>>> changelog offsets independently of stores, mostly >>> >>> because >>> >>>>>>>>>>>>>>>>>>> of the >>> >>>>>>>>>>>>>>>>>>> maintenance burden of the code duplication, but it >>> looks >>> >>>>>>>>>>>>>>>>>>> like we'll >>> >>>>>>>>>>>>>>>>> have >>> >>>>>>>>>>>>>>>>>> to >>> >>>>>>>>>>>>>>>>>>> live with it. >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> Unless you have any better ideas? >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> Regards, >>> >>>>>>>>>>>>>>>>>>> Nick >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford >>> >>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> >>> >>>>>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>> Hi Bruno, >>> >>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>> Immediately after I sent my response, I looked at >>> the >>> >>>>>>>>>>>>>>>>>>>> codebase and >>> >>>>>>>>>>>>>>>>> came >>> >>>>>>>>>>>>>>>>>>> to >>> >>>>>>>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it >>> will >>> >>>>>>>>>>>>>>>>>>>> need to be >>> >>>>>>>>>>>>>>>> done >>> >>>>>>>>>>>>>>>>>> by >>> >>>>>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores >>> during >>> >>>>>>>>>>>>>>>>>>>> rebalance. >>> >>>>>>>>>>>>>>>> I >>> >>>>>>>>>>>>>>>>>>> think >>> >>>>>>>>>>>>>>>>>>>> it is possible, and probably not too expensive, but >>> the >>> >>>>>>>>>>>>>>>>>>>> devil will >>> >>>>>>>>>>>>>>>> be >>> >>>>>>>>>>>>>>>>>> in >>> >>>>>>>>>>>>>>>>>>>> the detail. >>> >>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>> I'll try to find some time to explore the idea to >>> see >>> >>> if >>> >>>>>>>>>>>>>>>>>>>> it's >>> >>>>>>>>>>>>>>>>> possible >>> >>>>>>>>>>>>>>>>>>> and >>> >>>>>>>>>>>>>>>>>>>> report back, because we'll need to determine this >>> >>> before >>> >>>>>>>>>>>>>>>>>>>> we can >>> >>>>>>>>>>>>>>>> vote >>> >>>>>>>>>>>>>>>>> on >>> >>>>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>> KIP. >>> >>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>> Regards, >>> >>>>>>>>>>>>>>>>>>>> Nick >>> >>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna >>> >>>>>>>>>>>>>>>>>>>> <cado...@apache.org> >>> >>>>>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> Hi Nick, >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> Thanks for reacting on my comments so quickly! >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> 2. >>> >>>>>>>>>>>>>>>>>>>>> Some thoughts on your proposal. >>> >>>>>>>>>>>>>>>>>>>>> State managers (and state stores) are parts of >>> tasks. >>> >>> If >>> >>>>>>>>>>>>>>>>>>>>> the task >>> >>>>>>>>>>>>>>>> is >>> >>>>>>>>>>>>>>>>>> not >>> >>>>>>>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. 
To >>> get >>> >>>>>>>>>>>>>>>>>>>>> the offsets >>> >>>>>>>>>>>>>>>>>> with >>> >>>>>>>>>>>>>>>>>>>>> your approach, we would need to either create kind >>> of >>> >>>>>>>>>>>>>>>>>>>>> inactive >>> >>>>>>>>>>>>>>>> tasks >>> >>>>>>>>>>>>>>>>>>>>> besides active and standby tasks or store and >>> manage >>> >>>> state >>> >>>>>>>>>>>>>>>> managers >>> >>>>>>>>>>>>>>>>> of >>> >>>>>>>>>>>>>>>>>>>>> non-assigned tasks differently than the state >>> managers >>> >>>>>>>>>>>>>>>>>>>>> of assigned >>> >>>>>>>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that >>> removes >>> >>>>>>>>>>>>>>>>>>>>> unassigned >>> >>>>>>>>>>>>>>>> task >>> >>>>>>>>>>>>>>>>>>>>> directories needs to concurrently delete those >>> >>> inactive >>> >>>>>>>>>>>>>>>>>>>>> tasks or >>> >>>>>>>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This >>> >>> seems >>> >>>>>>>>>>>>>>>>>>>>> all quite >>> >>>>>>>>>>>>>>>>>> messy >>> >>>>>>>>>>>>>>>>>>>>> to me. >>> >>>>>>>>>>>>>>>>>>>>> Could we create those state managers (or state >>> stores) >>> >>>>>>>>>>>>>>>>>>>>> for locally >>> >>>>>>>>>>>>>>>>>>>>> existing but unassigned tasks on demand when >>> >>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or >>> have a >>> >>>>>>>>>>>>>>>>>>>>> different >>> >>>>>>>>>>>>>>>>>>>>> encapsulation for the unused task directories? >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> Best, >>> >>>>>>>>>>>>>>>>>>>>> Bruno >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote: >>> >>>>>>>>>>>>>>>>>>>>>> Hi Bruno, >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> Thanks for the review! >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> 1, 4, 5. >>> >>>>>>>>>>>>>>>>>>>>>> Done >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> 3. >>> >>>>>>>>>>>>>>>>>>>>>> You're right. I've removed the offending >>> paragraph. I >>> >>>> had >>> >>>>>>>>>>>>>>>>> originally >>> >>>>>>>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in >>> KIP-892. >>> >>>>>>>>>>>>>>>>>>>>>> But it's >>> >>>>>>>>>>>>>>>>>>>>> difficult to >>> >>>>>>>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892 >>> >>>> transaction >>> >>>>>>>>>>>>>>>> buffers. >>> >>>>>>>>>>>>>>>>>>>>> Instead, >>> >>>>>>>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc >>> when >>> >>>>>>>>>>>>>>>>>>>>>> KIP-892 >>> >>>>>>>>>>>>>>>> lands. >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> 2. >>> >>>>>>>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that >>> was >>> >>>>>>>>>>>>>>>>>> (significantly) >>> >>>>>>>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My >>> >>> prototype >>> >>>>>>>>>>>>>>>>>>>>>> currently >>> >>>>>>>>>>>>>>>>>>>>> maintains >>> >>>>>>>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint, >>> but >>> >>>>>>>>>>>>>>>>>>>>>> doing so >>> >>>>>>>>>>>>>>>>>> becomes >>> >>>>>>>>>>>>>>>>>>>>> very >>> >>>>>>>>>>>>>>>>>>>>>> messy. 
My intent with this change was to try to >>> >>> better >>> >>>>>>>>>>>>>>>> encapsulate >>> >>>>>>>>>>>>>>>>>>> this >>> >>>>>>>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that >>> can >>> >>>>>>>>>>>>>>>>>>>>>> cheaply >>> >>>>>>>>>>>>>>>>>> provide >>> >>>>>>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>>>> offsets stored directly in them without needing to >>> >>>>>>>>>>>>>>>>>>>>>> duplicate >>> >>>>>>>>>>>>>>>> them >>> >>>>>>>>>>>>>>>>> in >>> >>>>>>>>>>>>>>>>>>>>> this >>> >>>>>>>>>>>>>>>>>>>>>> cache. >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> It's clear some more work is needed here to better >>> >>>>>>>>>>>>>>>>>>>>>> encapsulate >>> >>>>>>>>>>>>>>>>> this. >>> >>>>>>>>>>>>>>>>>>> My >>> >>>>>>>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but >>> don't >>> >>>>>>>>>>>>>>>> initialize* >>> >>>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>>>> StateManager and StateStores for every Task >>> directory >>> >>>>>>>>>>>>>>>>>>>>>> on-disk? >>> >>>>>>>>>>>>>>>>> That >>> >>>>>>>>>>>>>>>>>>>>> should >>> >>>>>>>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to >>> >>>>>>>>>>>>>>>>>>>>>> query the >>> >>>>>>>>>>>>>>>>> offsets >>> >>>>>>>>>>>>>>>>>>> for >>> >>>>>>>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If >>> the >>> >>>>>>>>>>>>>>>> StateManager >>> >>>>>>>>>>>>>>>>>>> (aka. >>> >>>>>>>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves >>> too >>> >>>>>>>>>>>>>>>>>>>>>> expensive >>> >>>>>>>>>>>>>>>> to >>> >>>>>>>>>>>>>>>>>> hold >>> >>>>>>>>>>>>>>>>>>>>> open >>> >>>>>>>>>>>>>>>>>>>>>> for closed stores, we could always have a >>> >>>>>>>>>>>>>>>>>>>>>> "StubStateManager" in >>> >>>>>>>>>>>>>>>>> its >>> >>>>>>>>>>>>>>>>>>>>> place, >>> >>>>>>>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing >>> >>> else? >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> IDK, what do you think? >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> Regards, >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> Nick >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna >>> >>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org> >>> >>>>>>>>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> Hi Nick, >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892! >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> Here a couple of comments/questions: >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> 1. >>> >>>>>>>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline >>> which >>> >>>>>>>>>>>>>>>>>>>>>>> says to not >>> >>>>>>>>>>>>>>>>> use >>> >>>>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could >>> >>> you >>> >>>>>>>>>>>>>>>>>>>>>>> please >>> >>>>>>>>>>>>>>>>> change >>> >>>>>>>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()? >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> 2. >>> >>>>>>>>>>>>>>>>>>>>>>> It is not clear to me how >>> >>>>> TaskManager#getTaskOffsetSums() >>> >>>>>>>>>>>>>>>> should >>> >>>>>>>>>>>>>>>>>> read >>> >>>>>>>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own >>> but >>> >>>>>>>>>>>>>>>>>>>>>>> that have a >>> >>>>>>>>>>>>>>>>>> state >>> >>>>>>>>>>>>>>>>>>>>>>> directory on the Streams client by calling >>> >>>>>>>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). 
If the thread >>> does >>> >>>>>>>>>>>>>>>>>>>>>>> not own a >>> >>>>>>>>>>>>>>>>> task >>> >>>>>>>>>>>>>>>>>>> it >>> >>>>>>>>>>>>>>>>>>>>>>> does also not create any state stores for the >>> task, >>> >>>>>>>>>>>>>>>>>>>>>>> which means >>> >>>>>>>>>>>>>>>>>> there >>> >>>>>>>>>>>>>>>>>>>>> is >>> >>>>>>>>>>>>>>>>>>>>>>> no state store on which to call >>> >>> getCommittedOffsets(). >>> >>>>>>>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint >>> file >>> >>> is >>> >>>>>>>>>>>>>>>>>>>>>>> written >>> >>>>>>>>>>>>>>>>> for >>> >>>>>>>>>>>>>>>>>>> all >>> >>>>>>>>>>>>>>>>>>>>>>> state stores on close -- not only for the >>> >>> RocksDBStore >>> >>>>>>>>>>>>>>>>>>>>>>> -- and >>> >>>>>>>>>>>>>>>>> that >>> >>>>>>>>>>>>>>>>>>> this >>> >>>>>>>>>>>>>>>>>>>>>>> checkpoint file is read in >>> >>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for >>> >>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>>> tasks >>> >>>>>>>>>>>>>>>>>>>>>>> that have a state directory on the client but are >>> >>> not >>> >>>>>>>>>>>>>>>>>>>>>>> currently >>> >>>>>>>>>>>>>>>>>>>>> assigned >>> >>>>>>>>>>>>>>>>>>>>>>> to any stream thread of the Streams client. >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> 3. >>> >>>>>>>>>>>>>>>>>>>>>>> In the javadocs for commit() you write >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or >>> since >>> >>>>>>>>>>>>>>>>>> init(StateStore) >>> >>>>>>>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a >>> >>> restart." >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> This is only true for a clean close before the >>> >>>>>>>>>>>>>>>>>>>>>>> restart, isn't >>> >>>>>>>>>>>>>>>> it? >>> >>>>>>>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka >>> Streams >>> >>>>>>>>>>>>>>>>>>>>>>> cannot >>> >>>>>>>>>>>>>>>>>> guarantee >>> >>>>>>>>>>>>>>>>>>>>>>> that the in-memory structures of the state store >>> >>> (e.g. >>> >>>>>>>>>>>>>>>>>>>>>>> memtable >>> >>>>>>>>>>>>>>>>> in >>> >>>>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records >>> and >>> >>>> the >>> >>>>>>>>>>>>>>>>> committed >>> >>>>>>>>>>>>>>>>>>>>>>> offsets are persisted. >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> 4. >>> >>>>>>>>>>>>>>>>>>>>>>> The wrapper that provides the legacy >>> checkpointing >>> >>>>>>>>>>>>>>>>>>>>>>> behavior is >>> >>>>>>>>>>>>>>>>>>> actually >>> >>>>>>>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from >>> the >>> >>>>>>>>>>>>>>>>>>>>>>> KIP, but >>> >>>>>>>>>>>>>>>>> still >>> >>>>>>>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior >>> will be >>> >>>>>>>>>>>>>>>>>>>>>>> supported >>> >>>>>>>>>>>>>>>>> when >>> >>>>>>>>>>>>>>>>>>> the >>> >>>>>>>>>>>>>>>>>>>>>>> state store does not manage the checkpoints. >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> 5. >>> >>>>>>>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the >>> >>> tags, >>> >>>>>>>>>>>>>>>>>>>>>>> and the >>> >>>>>>>>>>>>>>>>>>> recording >>> >>>>>>>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or >>> KIP-444. 
>>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> Best, >>> >>>>>>>>>>>>>>>>>>>>>>> Bruno >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote: >>> >>>>>>>>>>>>>>>>>>>>>>>> Hi everyone, >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out >>> >>> the >>> >>>>>>>>>>>>>>>>>>>>>>>> "Atomic >>> >>>>>>>>>>>>>>>>>>>>>>> Checkpointing" >>> >>>>>>>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics >>> for >>> >>>>>>>>>>>>>>>>>>>>>>>> StateStores, >>> >>>>>>>>>>>>>>>>>> into >>> >>>>>>>>>>>>>>>>>>>>> its >>> >>>>>>>>>>>>>>>>>>>>>>> own >>> >>>>>>>>>>>>>>>>>>>>>>>> KIP >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> >>> >>>>> >>> >>>> >>> >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes >>> >>> outlined >>> >>>>> in >>> >>>>>>>>>>>>>>>>> KIP-1035, >>> >>>>>>>>>>>>>>>>>>>>> these >>> >>>>>>>>>>>>>>>>>>>>>>>> changes were always the most contentious part, >>> and >>> >>>>>>>>>>>>>>>>>>>>>>>> continued >>> >>>>>>>>>>>>>>>> to >>> >>>>>>>>>>>>>>>>>> spur >>> >>>>>>>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted. >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been >>> >>>>>>>>>>>>>>>>>>>>>>>> removed from >>> >>>>>>>>>>>>>>>>>>> KIP-892, >>> >>>>>>>>>>>>>>>>>>>>>>> and >>> >>>>>>>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to >>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-892 in >>> >>>>>>>>>>>>>>>> their >>> >>>>>>>>>>>>>>>>>>>>> place. >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this >>> set >>> >>> of >>> >>>>>>>>>>>>>>>>>>>>>>>> changes, >>> >>>>>>>>>>>>>>>> we >>> >>>>>>>>>>>>>>>>>> can >>> >>>>>>>>>>>>>>>>>>>>>>>> deliver something that we're all happy with. >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>>> Regards, >>> >>>>>>>>>>>>>>>>>>>>>>>> Nick >>> >>>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> >>>>> >>> >>>> >>> >>> >>> >> >>> >>
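
[Several messages in this thread (Matthias's point 100, Bruno's pointer to AbstractReadWriteDecorator, and the KIP update at the top) converge on guarding StateStore#commit() against calls from Processors. A heavily simplified, hypothetical sketch of such a guard follows; the real one would live in Streams' internal AbstractReadWriteDecorator, and the commit signature is assumed from the thread.]

import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

public abstract class GuardedStoreDecorator implements StateStore {

    private final StateStore inner;

    protected GuardedStoreDecorator(final StateStore inner) {
        this.inner = inner;
    }

    // Processors only ever see this wrapper, so any commit() attempt fails fast;
    // commits are triggered by the Streams engine on the undecorated store.
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        throw new UnsupportedOperationException(
            "commit() must not be called by Processors; it is managed by Kafka Streams");
    }

    @Override
    public String name() {
        return inner.name();
    }

    // The remaining StateStore methods (init, flush, close, persistent,
    // isOpen, ...) would simply delegate to 'inner'.
}
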