Hey Nick,

Thank you for the KIP! With such a significant performance degradation in the secondary store approach, we should definitely consider WriteBatchWithIndex. I also like encapsulating checkpointing inside the default state store implementation to improve performance.
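To make the WriteBatchWithIndex idea concrete, here is a minimal sketch under deliberately simplified assumptions: it does not use RocksDB at all. Two TreeMaps stand in for the committed database and for the indexed write batch, and the names (BufferedStore, commit, abort, committedOffset, committedValue) are hypothetical, not Kafka Streams or RocksJava APIs. It shows the two properties the approach relies on: reads see the store's own uncommitted writes, and a commit applies the buffered writes together with the changelog offset, which is what would let the store own its checkpoint.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch, not RocksDB or Kafka Streams code: TreeMaps stand in for the
// committed RocksDB data and for a WriteBatchWithIndex holding uncommitted writes.
final class BufferedStore {
    private final Map<String, String> db = new TreeMap<>();              // committed data
    private final Map<String, Optional<String>> batch = new TreeMap<>(); // uncommitted writes; empty = delete
    private long committedOffset = -1L; // changelog offset stored alongside the data

    void put(String key, String value) {
        batch.put(key, Optional.of(value));
    }

    void delete(String key) {
        batch.put(key, Optional.empty()); // buffered tombstone
    }

    // Read-your-writes: the batch overlays the committed data, similar to what
    // WriteBatchWithIndex#getFromBatchAndDB does for a real RocksDB instance.
    String get(String key) {
        Optional<String> buffered = batch.get(key);
        if (buffered != null) {
            return buffered.orElse(null);
        }
        return db.get(key);
    }

    // Apply all buffered writes and record the changelog offset in the same step.
    // In RocksDB this would be one atomic write of the batch plus offset metadata,
    // so no separate checkpoint file would be needed.
    void commit(long changelogOffset) {
        for (Map.Entry<String, Optional<String>> e : batch.entrySet()) {
            if (e.getValue().isPresent()) {
                db.put(e.getKey(), e.getValue().get());
            } else {
                db.remove(e.getKey());
            }
        }
        batch.clear();
        committedOffset = changelogOffset;
    }

    // Discard uncommitted writes; after a crash they would simply be lost,
    // leaving the committed data and its offset consistent with each other.
    void abort() {
        batch.clear();
    }

    long committedOffset() { return committedOffset; }

    String committedValue(String key) { return db.get(key); }

    public static void main(String[] args) {
        BufferedStore store = new BufferedStore();
        store.put("a", "1");
        System.out.println(store.get("a"));            // 1 (visible before commit)
        System.out.println(store.committedValue("a")); // null (not yet committed)
        store.commit(42L);
        store.put("a", "2");
        store.abort();                                 // simulate losing uncommitted work
        System.out.println(store.get("a") + " @ " + store.committedOffset()); // 1 @ 42
    }
}
```

In this sketch the offset only advances in the same step that applies the batch, so a restart would never see data that is newer or older than the offset it restores from.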
+1 to John's comment to keep the current checkpointing as a fallback mechanism. We want to keep existing users' workflows intact if we can. A non-intrusive way would be to add a separate StateStore method, say, StateStore#managesCheckpointing(), that controls whether the state store implementation owns checkpointing.

I think that a solution for transactional writes should also address the OOMEs. One possible way to do that is to let a StateStore request a commit by adding, say, StateStore#commitNeeded, which is checked in StreamTask#commitNeeded via the corresponding ProcessorStateManager. With that change, RocksDBStore will have to track the current transaction size and request a commit when the size goes over a (configurable) threshold.

AFAIU WriteBatchWithIndex might perform significantly slower than non-txn puts as the batch size grows [1]. We should have a configuration to fall back to the current behavior (and/or disable txn stores for ALOS) unless the benchmarks show negligible overhead for longer commits / large-enough batch sizes.

If you prefer to keep the KIP smaller, I would rather cut out state-store-managed checkpointing than proper OOME handling and the ability to switch to non-txn behavior. The checkpointing is not necessary to solve the recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is in, it will be much easier to add state-store-managed checkpointing.

If you share the current implementation, I am happy to help you address the OOME and configuration parts as well as review and test the patch.

Best,
Alex

[1] https://github.com/facebook/rocksdb/issues/608

On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:
> Hi John,
>
> Thanks for the review and feedback!
>
> 1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes.
> I agree that the best solution would be to provide a default that checkpoints to a file. The one thing I would change is that the checkpoint goes to a store-local file instead of a per-Task file. This way the StateStore still technically owns its own checkpointing (via a default implementation), and the StateManager/Task execution engine doesn't need to know anything about checkpointing, which greatly simplifies some of the logic.
>
> 2. OOME errors: The main reasons I didn't explore a solution to this are a) to keep this KIP as simple as possible, and b) because I'm not exactly sure how to signal that a Task should commit prematurely. I'm confident it's possible, and I think it's worth adding a section on handling this. Besides my proposal to force an early commit once memory usage reaches a threshold, is there any other approach that you might suggest for tackling this problem?
>
> 3. ALOS: I can add an explicit paragraph, but my assumption is that since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional and non-transactional stores available, as it would considerably increase the complexity of the codebase for very little benefit.
>
> 4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets.
> What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.
>
> Regards,
> Nick
>
> On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for publishing this alternative, Nick!
> >
> > The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.
> >
> > There are a couple of points that would hold me back from approving this KIP right now:
> >
> > 1. Loss of coverage for custom stores.
> > The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property. For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
> >
> > 2. Guarding against OOME
> > I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: imagine you upgrade to AK 3.next, your tests pass, so you deploy to production.
> > That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.
> >
> > FYI, I know of multiple cases where people run EOS with much larger commit intervals than the default to get better batching, so I don't think this pathological case would be as rare as you suspect.
> >
> > Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution. We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.
> >
> > 3. ALOS mode.
> > On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is 30 seconds (versus 100 ms under EOS), so the memory issue is far more pressing.
> >
> > As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS and that we can therefore just swap over unconditionally. We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> >
> > 4. (minor point) Deprecation of methods
> >
> > You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint.
> > It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks. However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?
> >
> > Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.
> >
> > -John
> >
> > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > Hi everyone,
> > >
> > > As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.
> > >
> > > I've published this separately as KIP-892: Transactional Semantics for StateStores <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>, so that it can be discussed/reviewed separately from KIP-844.
> > >
> > > Alex: I'm especially interested in what you think!
> > >
> > > I have a nearly complete implementation of the changes outlined in this KIP; please let me know if you'd like me to push them for review in advance of a vote.
> > >
> > > Regards,
> > >
> > > Nick
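Alex's proposal at the top of this thread (a store-level StateStore#commitNeeded consulted by StreamTask#commitNeeded via the ProcessorStateManager, a size threshold tracked by the RocksDB store, and StateStore#managesCheckpointing() with today's checkpoint file as the fallback) might look roughly like the sketch below. It is not the Kafka Streams API: SketchStore is a cut-down hypothetical stand-in for StateStore, and TxnStore, Task, maxUncommittedBytes, markIntervalElapsed and legacyCheckpointsWritten are invented for illustration. The store counts the bytes it has buffered since its last commit, and the task commits when either the commit interval has elapsed or any store asks for it.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point first so a single-file launch picks it up as the main class.
final class CommitSketch {
    public static void main(String[] args) {
        TxnStore store = new TxnStore(16); // hypothetical 16-byte threshold
        Task task = new Task();
        task.register(store);
        store.put(new byte[4], new byte[4]);
        System.out.println(task.commitNeeded()); // false: 8 of 16 bytes buffered
        store.put(new byte[4], new byte[4]);
        System.out.println(task.commitNeeded()); // true: threshold reached
        task.commit();
        System.out.println(task.commitNeeded()); // false: counter reset by commit
    }
}

// Hypothetical, cut-down stand-in for the StateStore methods discussed in this
// thread; it is not the real org.apache.kafka.streams.processor.StateStore.
interface SketchStore {
    // false (the default) keeps today's runtime-managed checkpoint file as the fallback.
    default boolean managesCheckpointing() { return false; }

    // false (the default): stores that do not buffer writes never request early commits.
    default boolean commitNeeded() { return false; }

    void commit();
}

// A transactional store that tracks how many bytes it has buffered since the last
// commit and asks for a commit once a configurable threshold is reached.
final class TxnStore implements SketchStore {
    private final long maxUncommittedBytes; // hypothetical config value
    private long uncommittedBytes = 0L;

    TxnStore(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    void put(byte[] key, byte[] value) {
        // A real store would append to its WriteBatchWithIndex here.
        uncommittedBytes += key.length + value.length;
    }

    @Override public boolean managesCheckpointing() { return true; }

    @Override public boolean commitNeeded() { return uncommittedBytes >= maxUncommittedBytes; }

    @Override public void commit() { uncommittedBytes = 0L; }
}

// Stand-in for StreamTask#commitNeeded consulting each store through the state manager.
final class Task {
    private final List<SketchStore> stores = new ArrayList<>();
    private boolean intervalElapsed = false;
    private int legacyCheckpointsWritten = 0;

    void register(SketchStore store) { stores.add(store); }

    void markIntervalElapsed() { intervalElapsed = true; }

    boolean commitNeeded() {
        if (intervalElapsed) {
            return true;
        }
        for (SketchStore s : stores) {
            if (s.commitNeeded()) {
                return true;
            }
        }
        return false;
    }

    void commit() {
        for (SketchStore s : stores) {
            s.commit();
            if (!s.managesCheckpointing()) {
                legacyCheckpointsWritten++; // stands in for writing the existing checkpoint file
            }
        }
        intervalElapsed = false;
    }

    int legacyCheckpointsWritten() { return legacyCheckpointsWritten; }
}
```

Defaulting both methods to false is what keeps existing custom stores on the current checkpoint-file path without any code changes, which is the fallback John and Alex asked for.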