Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-06-10 Thread Matthias J. Sax
Thanks Nick. 201: This makes sense. Would it make sense to actually document the hard requirement? 202: SG. 204: Ah yes. Thanks for clarifying. 205: SG. I think you can start a VOTE. -Matthias On 6/6/24 2:53 AM, Nick Telford wrote: Hi Matthias, Thanks for your thorough review. 200.

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-06-06 Thread Nick Telford
Hi Matthias, Thanks for your thorough review. 200. (#managesOffsets requirements) Done 201. (#commit atomicity recommendation vs. guarantee) There are possible StateStore implementations, including existing ones, that can't guarantee atomicity - because the underlying database/system doesn't sup

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-06-04 Thread Matthias J. Sax
Nick, Thanks a lot for updating the KIP. I made a pass over it. Overall LGTM. A few nits and some more minor questions: 200: nit (Javadocs for `StateStore.managesOffsets()`): This is highly recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-30 Thread Nick Telford
Hi everyone, I didn't spot this before, but it looks like the API of KeyValueStoreTestDriver will need to be updated to change the nomenclature from "flushed" to "committed": numFlushedEntryRemoved() -> numCommittedEntryRemoved() numFlushedEntryStored() -> numCommittedEntryStored() flushedEntryRe

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-29 Thread Nick Telford
I've updated the KIP with the following: - Deprecation of StateStore#managesOffsets - Change StateStore#commit to throw UnsupportedOperationException when called from a Processor (via AbstractReadWriteDecorator) - Updated consumer rebalance lag computation strategy
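The commit guard in the second item can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the Kafka Streams implementation: `OffsetStore`, `InMemoryOffsetStore`, and `ProcessorFacingStore` are hypothetical names standing in for the real `StateStore` and `AbstractReadWriteDecorator` types, and changelog partitions are represented as plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store interface (NOT the real StateStore API).
interface OffsetStore {
    void put(String key, String value);
    String get(String key);
    void commit(Map<String, Long> changelogOffsets);
}

// Plain in-memory store that records the offsets passed to commit().
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    public void put(String key, String value) { data.put(key, value); }
    public String get(String key) { return data.get(key); }
    public void commit(Map<String, Long> changelogOffsets) { committed.putAll(changelogOffsets); }

    Map<String, Long> committedOffsets() { return committed; }
}

// Hypothetical wrapper handed to user code: reads and writes are forwarded,
// but commit() is rejected so only the runtime can commit on the inner store.
class ProcessorFacingStore implements OffsetStore {
    private final OffsetStore inner;

    ProcessorFacingStore(OffsetStore inner) { this.inner = inner; }

    public void put(String key, String value) { inner.put(key, value); }
    public String get(String key) { return inner.get(key); }

    public void commit(Map<String, Long> changelogOffsets) {
        throw new UnsupportedOperationException("commit() must not be called from a Processor");
    }
}
```

In this sketch the runtime keeps a reference to the inner store and commits through it directly, while processors only ever see the wrapper.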

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-29 Thread Nick Telford
Hi everyone, Sorry I haven't got around to updating the KIP yet. Now that I've wrapped up KIP-989, I'm going to be working on 1035 starting today. I'll update the KIP first, and then call a vote. Regards, Nick On Wed, 29 May 2024 at 07:25, Bruno Cadonna wrote: > Totally agree on moving forwar

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-28 Thread Bruno Cadonna
Totally agree on moving forward and starting the VOTE! However, the KIP should be updated with the new info before starting the VOTE. Best, Bruno On 5/29/24 2:36 AM, Matthias J. Sax wrote: Sounds like a good plan. -- I think we are still wrapping up 3.8 release, but would also like to move f

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-28 Thread Matthias J. Sax
Sounds like a good plan. -- I think we are still wrapping up the 3.8 release, but would also like to move forward with this one. Should we start a VOTE? For merging PRs we need to wait until after code freeze, and the 3.8 branch was cut. But we could start reviewing PRs before this already. -Matthias On

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-17 Thread Nick Telford
Hi everyone, As discussed on the Zoom call, we're going to handle rebalance meta-data by: - On start-up, Streams will open each store and read its changelog offsets into an in-memory cache. This cache will be shared among all StreamThreads. - On rebalance, the cache will be consulted for Task off
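A shared in-memory cache of this kind might look something like the sketch below. It is only an illustration of the idea in the message: `ChangelogOffsetCache` and its methods are hypothetical names, task IDs and changelog partitions are modelled as plain strings, and the real Streams data structures may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache: task ID -> (changelog partition -> offset).
// Concurrent maps are used because the cache is shared by several threads.
class ChangelogOffsetCache {
    private final Map<String, Map<String, Long>> offsetsByTask = new ConcurrentHashMap<>();

    // Record the latest known offset for one changelog partition of a task,
    // e.g. when a store is opened at start-up.
    void update(String taskId, String changelogPartition, long offset) {
        offsetsByTask
            .computeIfAbsent(taskId, id -> new ConcurrentHashMap<>())
            .put(changelogPartition, offset);
    }

    // Consulted on rebalance: returns an immutable snapshot of the task's
    // offsets, or an empty map if nothing is cached for that task.
    Map<String, Long> offsetsFor(String taskId) {
        Map<String, Long> offsets = offsetsByTask.get(taskId);
        return offsets == null ? Map.of() : Map.copyOf(offsets);
    }
}
```

Returning a snapshot rather than the live map keeps callers on the rebalance path from observing updates halfway through a computation.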

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-15 Thread Sophie Blee-Goldman
103: I like the idea of immediately deprecating #managesOffsets and aiming to make offset management mandatory in the long run. I assume we would also log a warning for any custom stores that return "false" from this method to encourage custom store implementations to start doing so? My only questi

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-14 Thread Nick Telford
Hi everyone, Sorry for the delay in replying. I've finally now got some time to work on this. Addressing Matthias's comments: 100. Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator which we could leverage to provide that protection. I'll add details on this to the KIP.

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-06 Thread Bruno Cadonna
Hi Matthias, I see what you mean. To sum up: With this KIP the .checkpoint file is written when the store closes. That is when: 1. a task moves away from Kafka Streams client 2. Kafka Streams client shuts down A Kafka Streams client needs the information in the .checkpoint file 1. on startup

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
Those are good questions... I could think of a few approaches, but I admit it might all be a little bit tricky to code up... However if we don't solve this problem, I think this KIP does not really solve the core issue we are facing? In the end, if we rely on the `.checkpoint` file to compute a t

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna
Hi Matthias, 200: I like the idea in general. However, it is not clear to me how the behavior should be with multiple stream threads in the same Kafka Streams client. What stream thread opens which store? How can a stream thread pass an open store to another stream thread that got the corres

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
101: Yes, but what I am saying is, that we don't need to flush the .position file to disk periodically, but only maintain it in main memory, and only write it to disk on close() to preserve it across restarts. This way, it would never be ahead, but might only lag? But with my better understandi

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna
Hi Matthias, 101: Let's assume a RocksDB store, but I think the following might be true also for other store implementations. With this KIP, if Kafka Streams commits the offsets, the committed offsets will be stored in an in-memory data structure (i.e. the memtable) and stay there until Rock

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Matthias J. Sax
Thanks Bruno. 101: I think I understand this better now. But just want to make sure I do. What do you mean by "they can diverge" and "Recovering after a failure might load inconsistent offsets and positions." The checkpoint is the offset from the changelog, while the position is the offset

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Bruno Cadonna
Hi all, 100 I think we already have such a wrapper. It is called AbstractReadWriteDecorator. 101 Currently, the position is checkpointed when an offset checkpoint is written. If we let the state store manage the committed offsets, we also need to let the state store manage the position

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-22 Thread Matthias J. Sax
Thanks for splitting out this KIP. The discussion shows that it is a complex beast by itself, so it is worth discussing on its own. A couple of questions/comments: 100 `StateStore#commit()`: The JavaDoc says "must not be called by users" -- I would propose to put a guard in place for this, by eithe

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-21 Thread Bruno Cadonna
Hi all, How should we proceed here? 1. with the plain .checkpoint file 2. with a way to use the state store interface on unassigned but locally existing task state While I like option 2, I think option 1 is less risky and will give us the benefits of transactional state stores sooner. We sho

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-17 Thread Bruno Cadonna
Hi Nick and Sophie, I think the task ID is not enough to create a state store that can read the offsets of non-assigned tasks for lag computation during rebalancing. The state store also needs the state directory so that it knows where to find the information that it needs to return from chan

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Nick Telford
That does make sense. The one thing I can't figure out is how per-Task StateStore instances are constructed. It looks like we construct one StateStore instance for the whole Topology (in InternalTopologyBuilder), and pass that into ProcessorStateManager (via StateManagerUtil) for each Task, which

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Sophie Blee-Goldman
I don't think we need to *require* a constructor accept the TaskId, but we would definitely make sure that the RocksDB state store changes its constructor to one that accepts the TaskID (which we can do without deprecation since it's an internal API), and custom state stores can just decide for them

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-15 Thread Nick Telford
Hi Sophie, Interesting idea! Although what would that mean for the StateStore interface? Obviously we can't require that the constructor take the TaskId. Is it enough to add the parameter to the StoreSupplier? Would doing this be in-scope for this KIP, or are we over-complicating it? Nick On Fr

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Sophie Blee-Goldman
Somewhat minor point overall, but it actually drives me crazy that you can't get access to the taskId of a StateStore until #init is called. This has caused me a huge headache personally (since the same is true for processors and I was trying to do something that's probably too hacky to actually co

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Nick Telford
On further thought, it's clear that this can't work for one simple reason: StateStores don't know their associated TaskId (and hence, their StateDirectory) until the init() call. Therefore, committedOffset() can't be called before init(), unless we also added a StateStoreContext argument to committ

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno, Immediately after I sent my response, I looked at the codebase and came to the same conclusion. If it's possible at all, it will need to be done by creating temporary StateManagers and StateStores during rebalance. I think it is possible, and probably not too expensive, but the devil wil

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Bruno Cadonna
Hi Nick, Thanks for reacting to my comments so quickly! 2. Some thoughts on your proposal. State managers (and state stores) are parts of tasks. If the task is not assigned locally, we do not create those tasks. To get the offsets with your approach, we would need to either create kind of ina

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno, Thanks for the review! 1, 4, 5. Done 3. You're right. I've removed the offending paragraph. I had originally adapted this from the guarantees outlined in KIP-892. But it's difficult to provide these guarantees without the KIP-892 transaction buffers. Instead, we'll add the guarantees b

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-09 Thread Bruno Cadonna
Hi Nick, Thanks for breaking out the KIP from KIP-892! Here are a couple of comments/questions: 1. In Kafka Streams, we have a design guideline which says to not use the "get"-prefix for getters on the public API. Could you please change getCommittedOffsets() to committedOffsets()? 2. It is no

[DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-07 Thread Nick Telford
Hi everyone, Based on some offline discussion, I've split out the "Atomic Checkpointing" section from KIP-892: Transactional Semantics for StateStores, into its own KIP KIP-1035: StateStore managed changelog offsets https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+