Hi Alex,

Thanks for the feedback.
I've updated the discussion of OOM issues by describing how we'll handle it. Here's the new text:

> To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries() exceeds a threshold, configured by max.uncommitted.state.entries.per.task; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by max.uncommitted.state.bytes.per.task. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure. These limits will be checked in StreamTask#process and a premature commit will be requested via Task#requestCommit().
>
> Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.

I've chosen to have the StateStore expose approximations of its buffer size/count instead of opaquely requesting a commit, in order to delegate the decision-making to the Task itself. This enables Tasks to look at *all* of their StateStores and determine whether an early commit is necessary. Notably, it enables per-Task thresholds instead of per-Store thresholds, which prevents Tasks with many StateStores from using much more memory than Tasks with one StateStore. This makes sense, since commits are done per-Task, not per-Store.

Prizes* for anyone who can come up with a better name for the new config properties!

Thanks for pointing out the potential performance issues of WBWI.
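For illustration, the per-Task threshold check from the new text might look roughly like the sketch below. The two config names referenced in the comments come from the KIP text; the classes, threshold values, and helper method are my own assumptions, not the real Kafka Streams internals.

```java
import java.util.List;

// A minimal sketch of the per-Task early-commit check described in the new
// KIP text. Everything here other than the two StateStore method names is
// an illustrative stand-in.
public class EarlyCommitSketch {

    // Mirrors the proposed StateStore defaults: existing custom stores and
    // non-transactional stores report zero, so they never force an early commit.
    interface StateStore {
        default long approximateNumUncommittedEntries() { return 0L; }
        default long approximateNumUncommittedBytes() { return 0L; }
    }

    // Stand-ins for max.uncommitted.state.entries.per.task and
    // max.uncommitted.state.bytes.per.task (values are arbitrary).
    static final long MAX_ENTRIES_PER_TASK = 10_000L;
    static final long MAX_BYTES_PER_TASK = 64L * 1024 * 1024;

    // The thresholds are per-Task, not per-Store: totals are summed across
    // all of the Task's stores before being compared, so a Task with many
    // stores is bounded the same as a Task with one.
    static boolean earlyCommitNeeded(List<StateStore> stores) {
        long entries = 0L;
        long bytes = 0L;
        for (StateStore store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > MAX_ENTRIES_PER_TASK || bytes > MAX_BYTES_PER_TASK;
    }

    public static void main(String[] args) {
        StateStore small = new StateStore() {
            @Override public long approximateNumUncommittedEntries() { return 100L; }
            @Override public long approximateNumUncommittedBytes() { return 4_096L; }
        };
        StateStore large = new StateStore() {
            @Override public long approximateNumUncommittedEntries() { return 9_950L; }
            @Override public long approximateNumUncommittedBytes() { return 1_048_576L; }
        };
        // Neither store exceeds a threshold alone, but the Task-wide total does.
        System.out.println(earlyCommitNeeded(List.of(small)));        // false
        System.out.println(earlyCommitNeeded(List.of(small, large))); // true
    }
}
```

In the real proposal this check would live in StreamTask#process, with the commit requested via Task#requestCommit() rather than returned as a boolean.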
From the benchmarks that user posted [1], it looks like WBWI still performs considerably better than individual puts, which is the existing design, so I'd actually expect a performance boost from WBWI, just not as great as we'd get from a plain WriteBatch. This does suggest that a good optimization would be to use a regular WriteBatch for restoration (in RocksDBStore#restoreBatch), since we know that those records will never be queried before they're committed.

1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results

* Just kidding, no prizes, sadly.

On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

> Hey Nick,
>
> Thank you for the KIP! With such a significant performance degradation in the secondary store approach, we should definitely consider WriteBatchWithIndex. I also like encapsulating checkpointing inside the default state store implementation to improve performance.
>
> +1 to John's comment to keep the current checkpointing as a fallback mechanism. We want to keep existing users' workflows intact if we can. A non-intrusive way would be to add a separate StateStore method, say, StateStore#managesCheckpointing(), that controls whether the state store implementation owns checkpointing.
>
> I think that a solution to the transactional writes should address the OOMEs. One possible way to address that is to wire StateStore's commit request by adding, say, StateStore#commitNeeded that is checked in StreamTask#commitNeeded via the corresponding ProcessorStateManager. With that change, RocksDBStore will have to track the current transaction size and request a commit when the size goes over a (configurable) threshold.
>
> AFAIU WriteBatchWithIndex might perform significantly slower than non-txn puts as the batch size grows [1].
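As an aside, Alex's commitNeeded suggestion — the store tracking its own transaction size and signalling when it wants a commit — could look roughly like this sketch. The class, fields, and methods here are illustrative assumptions, not the real RocksDBStore internals.

```java
// Sketch of a store that tracks the size of its current transaction and
// reports commitNeeded() once a configurable threshold is crossed. In the
// real store, put() would also append the record to the WriteBatchWithIndex,
// and commit() would write the batch to RocksDB.
public class TxnSizeTrackingStore {
    private final long maxTxnBytes;   // the configurable threshold
    private long currentTxnBytes = 0L;

    public TxnSizeTrackingStore(long maxTxnBytes) {
        this.maxTxnBytes = maxTxnBytes;
    }

    // Called on every write: accumulate the approximate size of the
    // uncommitted data buffered in the current transaction.
    public void put(byte[] key, byte[] value) {
        currentTxnBytes += key.length + value.length;
    }

    // Checked by the task each processing cycle (e.g. via the
    // corresponding ProcessorStateManager, as Alex suggests).
    public boolean commitNeeded() {
        return currentTxnBytes > maxTxnBytes;
    }

    // On commit, the batch is flushed and the counter resets.
    public void commit() {
        currentTxnBytes = 0L;
    }
}
```

The trade-off versus the approach in the updated KIP text is that here the threshold is per-Store, whereas exposing approximate sizes lets the Task enforce a per-Task bound across all of its stores.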
> We should have a configuration to fall back to the current behavior (and/or disable txn stores for ALOS) unless the benchmarks show negligible overhead for longer commits / large-enough batch sizes.
>
> If you prefer to keep the KIP smaller, I would rather cut out state-store-managed checkpointing than proper OOME handling and the ability to switch to non-txn behavior. The checkpointing is not necessary to solve the recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is in, it will be much easier to add state-store-managed checkpointing.
>
> If you share the current implementation, I am happy to help you address the OOME and configuration parts as well as review and test the patch.
>
> Best,
> Alex
>
> 1. https://github.com/facebook/rocksdb/issues/608
>
> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi John,
> >
> > Thanks for the review and feedback!
> >
> > 1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes. I agree that the best solution would be to provide a default that checkpoints to a file. The one thing I would change is that the checkpointing is to a store-local file, instead of a per-Task file. This way the StateStore still technically owns its own checkpointing (via a default implementation), and the StateManager/Task execution engine doesn't need to know anything about checkpointing, which greatly simplifies some of the logic.
> >
> > 2. OOME errors: The main reasons why I didn't explore a solution to this are a) to keep this KIP as simple as possible, and b) because I'm not exactly sure how to signal that a Task should commit prematurely. I'm confident it's possible, and I think it's worth adding a section on handling this.
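The store-local checkpoint default that Nick describes in point 1 might look something like the sketch below. The file name, format, and helper class are assumptions for illustration; the point is that each store owns its own checkpoint file under its state directory, so the Task execution engine needs no knowledge of checkpointing.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of a default, store-local checkpoint: the store writes its own
// committed changelog offset to a small file in its state directory.
public class StoreLocalCheckpoint {
    private final Path checkpointFile;

    public StoreLocalCheckpoint(Path storeDir) {
        this.checkpointFile = storeDir.resolve(".checkpoint");
    }

    public void write(long changelogOffset) throws IOException {
        // Write to a temp file and atomically move it into place, so a
        // crash mid-write never leaves a torn checkpoint behind.
        Path tmp = checkpointFile.resolveSibling(".checkpoint.tmp");
        Files.writeString(tmp, Long.toString(changelogOffset), StandardCharsets.UTF_8);
        Files.move(tmp, checkpointFile,
                StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING);
    }

    public long read() throws IOException {
        if (!Files.exists(checkpointFile)) {
            return -1L; // no checkpoint yet: restore from the beginning
        }
        return Long.parseLong(
                Files.readString(checkpointFile, StandardCharsets.UTF_8).trim());
    }
}
```

A default `commit(..)` implementation on the StateStore interface could delegate to something like this, preserving today's behaviour for custom stores that don't override it.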
> > Besides my proposal to force an early commit once memory usage reaches a threshold, is there any other approach that you might suggest for tackling this problem?
> >
> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that, since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional and non-transactional stores available, as it would considerably increase the complexity of the codebase for very little benefit.
> >
> > 4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets. What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too, and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.
> >
> > Regards,
> > Nick
> >
> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks for publishing this alternative, Nick!
> > >
> > > The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis showing that, for most use cases, the write batch approach should be just fine.
> > > There are a couple of points that would hold me back from approving this KIP right now:
> > >
> > > 1. Loss of coverage for custom stores.
> > > The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property. For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
> > >
> > > 2. Guarding against OOME
> > > I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: imagine you upgrade to AK 3.next, your tests pass, so you deploy to production. That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.
> > >
> > > FYI, I know of multiple cases where people run EOS with much larger commit intervals to get better batching than the default, so I don't think this pathological case would be as rare as you suspect.
> > >
> > > Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution.
> > > We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.
> > >
> > > 3. ALOS mode.
> > > On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is five minutes, so the memory issue is far more pressing.
> > >
> > > As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS and we can therefore just swap over unconditionally. We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> > >
> > > 4. (minor point) Deprecation of methods
> > > You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint. It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks. However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?
> > >
> > > Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.
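The atomicity property behind Nick's point 4 and John's point 4 — changelog offsets (and Position metadata) committed in the same batch as the records themselves — can be sketched in miniature as below. The Map-backed "store" and the reserved offset key are stand-ins for RocksDB and a WriteBatch, purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of committing offsets atomically with records: because the offset
// entry is part of the same batch as the data, the store can never contain
// records that are inconsistent with its recorded offset. A real
// implementation would apply the batch to RocksDB, not a HashMap.
public class AtomicOffsetBatch {
    static final String OFFSET_KEY = "__changelog_offset__"; // reserved key, an assumption

    final Map<String, String> store = new HashMap<>();       // committed state
    final Map<String, String> uncommitted = new HashMap<>(); // the "write batch"

    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    // Records and offset land in the store in a single step: either the
    // whole batch (including the offset entry) is applied, or none of it.
    void commit(long changelogOffset) {
        uncommitted.put(OFFSET_KEY, Long.toString(changelogOffset));
        store.putAll(uncommitted);
        uncommitted.clear();
    }

    long committedOffset() {
        String value = store.get(OFFSET_KEY);
        return value == null ? -1L : Long.parseLong(value);
    }
}
```

This is also why the new `commit` plausibly subsumes the Position callbacks, as John notes: once offsets and Position live in the batch's metadata, the separate file-based mechanisms have nothing left to do.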
> > > -John
> > >
> > > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > > Hi everyone,
> > > >
> > > > As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.
> > > >
> > > > I've published this separately as KIP-892: Transactional Semantics for StateStores <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>, so that it can be discussed/reviewed separately from KIP-844.
> > > >
> > > > Alex: I'm especially interested in what you think!
> > > >
> > > > I have a nearly complete implementation of the changes outlined in this KIP, please let me know if you'd like me to push them for review in advance of a vote.
> > > >
> > > > Regards,
> > > >
> > > > Nick