Hi everyone, I've drastically reduced the scope of this KIP to no longer include the StateStore management of checkpointing. This can be added as a KIP later on to further optimize the consistency and performance of state stores.
I've also added a section discussing some of the concerns around concurrency, especially in the presence of Iterators. I'm thinking of wrapping WriteBatchWithIndex with a reference-counting copy-on-write implementation (that only makes a copy if there's an active iterator), but I'm open to suggestions.
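To make that a little more concrete, here's a very rough sketch of the shape I have in mind. Only WriteBatchWithIndex and its put/delete methods are real RocksDB API; the wrapper, its name, and the copy mechanism are all illustrative and still open questions:

    import java.util.concurrent.atomic.AtomicInteger;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;

    // Sketch: writes fork the batch while any iterator is open, so open
    // iterators keep reading a stable snapshot. For simplicity this version
    // over-copies once a long-lived iterator exists; a real implementation
    // would track which batch each iterator was opened on.
    class CopyOnWriteBatch {
        private WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
        private final AtomicInteger openIterators = new AtomicInteger(0);

        void put(final byte[] key, final byte[] value) throws RocksDBException {
            maybeCopy();
            batch.put(key, value);
        }

        void delete(final byte[] key) throws RocksDBException {
            maybeCopy();
            batch.delete(key);
        }

        // Callers bracket iteration with these; alternatively we hand out a
        // wrapping iterator whose close() decrements the counter.
        void iteratorOpened() { openIterators.incrementAndGet(); }
        void iteratorClosed() { openIterators.decrementAndGet(); }

        private void maybeCopy() {
            if (openIterators.get() > 0) {
                batch = copyOf(batch);
            }
        }

        private WriteBatchWithIndex copyOf(final WriteBatchWithIndex old) {
            // Placeholder: the copy strategy (e.g. replaying the old batch's
            // entries into a new one) is exactly the part I'd like suggestions on.
            throw new UnsupportedOperationException("copy strategy TBD");
        }
    }

The nice property is that the happy path (no open iterators) stays a plain WriteBatchWithIndex, and we only pay for a copy when an iterator actually needs isolation.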
Regards,
Nick

On Mon, 28 Nov 2022 at 16:36, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Colt,
>
> I didn't do any profiling, but the 844 implementation:
>
>    - Writes uncommitted records to a temporary RocksDB instance
>       - Since tombstones need to be flagged, all record values are prefixed with a value/tombstone marker. This necessitates a memory copy.
>    - On-commit, iterates all records in this temporary instance and writes them to the main RocksDB store.
>       - While iterating, the value/tombstone marker needs to be parsed and the real value extracted. This necessitates another memory copy.
>
> My guess is that the cost of iterating the temporary RocksDB store is the major factor, with the 2 extra memory copies per-Record contributing a significant amount too.
>
> Regards,
> Nick
>
> On Mon, 28 Nov 2022 at 16:12, Colt McNealy <c...@littlehorse.io> wrote:
>
>> Hi all,
>>
>> Out of curiosity, why does the performance of the store degrade so significantly with the 844 implementation? I wouldn't be too surprised by a 50-60% drop (caused by each record being written twice), but 96% is extreme.
>>
>> The only thing I can think of which could create such a bottleneck would be that perhaps the 844 implementation deserializes and then re-serializes the store values when copying from the uncommitted to the committed store, but I wasn't able to figure that out when I scanned the PR.
>>
>> Colt McNealy
>> *Founder, LittleHorse.io*
>>
>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <nick.telf...@gmail.com> wrote:
>>
>> > Hi everyone,
>> >
>> > I've updated the KIP to resolve all the points that have been raised so far, with one exception: the ALOS default commit interval of 5 minutes is likely to cause WriteBatchWithIndex memory to grow too large.
>> >
>> > There are a couple of different things I can think of to solve this:
>> >
>> >    - We already have a memory/record limit in the KIP to prevent OOM errors. Should we choose a default value for these? My concern here is that anything we choose might seem rather arbitrary. We could change its behaviour such that under ALOS, it only triggers the commit of the StateStore, but under EOS, it triggers a commit of the Kafka transaction.
>> >    - We could introduce a separate `checkpoint.interval.ms` to allow ALOS to commit the StateStores more frequently than the general commit.interval.ms? My concern here is that the semantics of this config would depend on the processing.mode; under ALOS it would allow more frequently committing stores, whereas under EOS it couldn't.
>> >
>> > Any better ideas?
>> >
>> > On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com> wrote:
>> >
>> > > Hi Alex,
>> > >
>> > > Thanks for the feedback.
>> > >
>> > > I've updated the discussion of OOM issues by describing how we'll handle it. Here's the new text:
>> > >
>> > >> To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries() exceeds a threshold, configured by max.uncommitted.state.entries.per.task; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by max.uncommitted.state.bytes.per.task. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.
>> > >>
>> > >> These limits will be checked in StreamTask#process and a premature commit will be requested via Task#requestCommit().
>> > >>
>> > >> Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
>> > >
>> > > I've chosen to have the StateStore expose approximations of its buffer size/count instead of opaquely requesting a commit in order to delegate the decision making to the Task itself. This enables Tasks to look at *all* of their StateStores, and determine whether an early commit is necessary. Notably, it enables per-Task thresholds, instead of per-Store, which prevents Tasks with many StateStores from using much more memory than Tasks with one StateStore. This makes sense, since commits are done by-Task, not by-Store.
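>> > >
>> > > To illustrate how I expect this to hang together, the check in StreamTask#process might look roughly like the sketch below. The approximateNumUncommitted* methods and Task#requestCommit() are the new APIs described above; the surrounding field and accessor names are just placeholders, not actual Streams internals:
>> > >
>> > >     // Sketch only: invoked from StreamTask#process after handling a record.
>> > >     // maxUncommittedEntries/maxUncommittedBytes would be populated from
>> > >     // max.uncommitted.state.entries.per.task and
>> > >     // max.uncommitted.state.bytes.per.task.
>> > >     private void maybeForceEarlyCommit() {
>> > >         long entries = 0;
>> > >         long bytes = 0;
>> > >         for (final StateStore store : stateManager.stores()) {  // placeholder accessor
>> > >             entries += store.approximateNumUncommittedEntries();
>> > >             bytes += store.approximateNumUncommittedBytes();
>> > >         }
>> > >         if (entries > maxUncommittedEntries || bytes > maxUncommittedBytes) {
>> > >             requestCommit();  // Task#requestCommit()
>> > >         }
>> > >     }
>> > >
>> > > Summing across all of the Task's stores is what makes the thresholds per-Task rather than per-Store.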
>> > >
>> > > Prizes* for anyone who can come up with a better name for the new config properties!
>> > >
>> > > Thanks for pointing out the potential performance issues of WBWI. From the benchmarks that user posted[1], it looks like WBWI still performs considerably better than individual puts, which is the existing design, so I'd actually expect a performance boost from WBWI, just not as great as we'd get from a plain WriteBatch. This does suggest that a good optimization would be to use a regular WriteBatch for restoration (in RocksDBStore#restoreBatch), since we know that those records will never be queried before they're committed.
>> > >
>> > > 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> > >
>> > > * Just kidding, no prizes, sadly.
>> > >
>> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:
>> > >
>> > >> Hey Nick,
>> > >>
>> > >> Thank you for the KIP! With such a significant performance degradation in the secondary store approach, we should definitely consider WriteBatchWithIndex. I also like encapsulating checkpointing inside the default state store implementation to improve performance.
>> > >>
>> > >> +1 to John's comment to keep the current checkpointing as a fallback mechanism. We want to keep existing users' workflows intact if we can. A non-intrusive way would be to add a separate StateStore method, say, StateStore#managesCheckpointing(), that controls whether the state store implementation owns checkpointing.
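>> > >>
>> > >> For example, something along these lines would let existing stores keep the current checkpoint-file behaviour without any code changes (the method name is only a suggestion):
>> > >>
>> > >>     public interface StateStore {
>> > >>         // ... existing methods ...
>> > >>
>> > >>         // Stores that own their checkpointing (e.g. the proposed
>> > >>         // transactional RocksDB store) override this to return true;
>> > >>         // for everything else the runtime keeps writing the existing
>> > >>         // checkpoint files as it does today.
>> > >>         default boolean managesCheckpointing() {
>> > >>             return false;
>> > >>         }
>> > >>     }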
>> > >>
>> > >> I think that a solution to the transactional writes should address the OOMEs. One possible way to address that is to wire StateStore's commit request by adding, say, StateStore#commitNeeded that is checked in StreamTask#commitNeeded via the corresponding ProcessorStateManager. With that change, RocksDBStore will have to track the current transaction size and request a commit when the size goes over a (configurable) threshold.
>> > >>
>> > >> AFAIU WriteBatchWithIndex might perform significantly slower than non-txn puts as the batch size grows [1]. We should have a configuration to fall back to the current behavior (and/or disable txn stores for ALOS) unless the benchmarks show negligible overhead for longer commits / large-enough batch sizes.
>> > >>
>> > >> If you prefer to keep the KIP smaller, I would rather cut out state-store-managed checkpointing rather than proper OOME handling and being able to switch to non-txn behavior. The checkpointing is not necessary to solve the recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is in, it will be much easier to add state-store-managed checkpointing.
>> > >>
>> > >> If you share the current implementation, I am happy to help you address the OOME and configuration parts as well as review and test the patch.
>> > >>
>> > >> Best,
>> > >> Alex
>> > >>
>> > >> 1. https://github.com/facebook/rocksdb/issues/608
>> > >>
>> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:
>> > >>
>> > >> > Hi John,
>> > >> >
>> > >> > Thanks for the review and feedback!
>> > >> >
>> > >> > 1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes. I agree that the best solution would be to provide a default that checkpoints to a file. The one thing I would change is that the checkpointing is to a store-local file, instead of a per-Task file. This way the StateStore still technically owns its own checkpointing (via a default implementation), and the StateManager/Task execution engine doesn't need to know anything about checkpointing, which greatly simplifies some of the logic.
>> > >> >
>> > >> > 2. OOME errors: The main reasons why I didn't explore a solution to this are a) to keep this KIP as simple as possible, and b) because I'm not exactly sure how to signal that a Task should commit prematurely. I'm confident it's possible, and I think it's worth adding a section on handling this. Besides my proposal to force an early commit once memory usage reaches a threshold, is there any other approach that you might suggest for tackling this problem?
>> > >> >
>> > >> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional/non-transactional stores available, as it would considerably increase the complexity of the codebase, for very little benefit.
>> > >> >
>> > >> > 4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets. What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too, and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.
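>> > >> >
>> > >> > To illustrate the kind of thing I mean, the commit path could end up looking roughly like the sketch below. The reserved keys and helper methods are made up for the example; the point is just that the records, the changelog offsets and the Position all go into the same atomic write:
>> > >> >
>> > >> >     // Sketch: offsets are written into the same batch as the uncommitted
>> > >> >     // records, so a single RocksDB write makes them all visible atomically.
>> > >> >     void commit(final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
>> > >> >         for (final Map.Entry<TopicPartition, Long> offset : changelogOffsets.entrySet()) {
>> > >> >             batch.put(offsetKey(offset.getKey()), serialize(offset.getValue()));  // reserved key space (illustrative)
>> > >> >         }
>> > >> >         batch.put(positionKey(), serialize(position));  // Position moves into the same metadata
>> > >> >         db.write(writeOptions, batch);  // one atomic write of records + offsets + Position
>> > >> >         batch.clear();
>> > >> >     }
>> > >> >
>> > >> > That's what I mean by the Position offsets gaining the same guarantees as the changelog offsets.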
>> > >> >
>> > >> > Regards,
>> > >> > Nick
>> > >> >
>> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
>> > >> >
>> > >> > > Thanks for publishing this alternative, Nick!
>> > >> > >
>> > >> > > The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.
>> > >> > >
>> > >> > > There are a couple of points that would hold me back from approving this KIP right now:
>> > >> > >
>> > >> > > 1. Loss of coverage for custom stores.
>> > >> > > The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property. For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
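>> > >> > >
>> > >> > > Just to illustrate the shape of what I'm suggesting (the helper and the exact signature are hypothetical; only `commit(..)` and `isTransactional()` are the names I'm proposing):
>> > >> > >
>> > >> > >     // Illustrative defaults: non-transactional stores keep today's behaviour.
>> > >> > >     default void commit(final Map<TopicPartition, Long> changelogOffsets) {
>> > >> > >         flush();                                // same as the current flush()
>> > >> > >         writeCheckpointFile(changelogOffsets);  // hypothetical helper: the existing checkpoint-file logic
>> > >> > >     }
>> > >> > >
>> > >> > >     default boolean isTransactional() {
>> > >> > >         return false;  // runtime keeps managing checkpoint files for this store
>> > >> > >     }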
>> > >> > >
>> > >> > > 2. Guarding against OOME
>> > >> > > I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: imagine you upgrade to AK 3.next, your tests pass, so you deploy to production. That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.
>> > >> > >
>> > >> > > FYI, I know of multiple cases where people run EOS with much larger commit intervals to get better batching than the default, so I don't think this pathological case would be as rare as you suspect.
>> > >> > >
>> > >> > > Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution. We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.
>> > >> > >
>> > >> > > 3. ALOS mode.
>> > >> > > On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is five minutes, so the memory issue is far more pressing.
>> > >> > >
>> > >> > > As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS and we can therefore just swap over unconditionally. We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).
>> > >> > >
>> > >> > > 4. (minor point) Deprecation of methods
>> > >> > > You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint. It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks. However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?
>> > >> > >
>> > >> > > Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.
>> > >> > >
>> > >> > > -John
>> > >> > >
>> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
>> > >> > > > Hi everyone,
>> > >> > > >
>> > >> > > > As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.
>> > >> > > >
>> > >> > > > I've published this separately as KIP-892: Transactional Semantics for StateStores <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>, so that it can be discussed/reviewed separately from KIP-844.
>> > >> > > >
>> > >> > > > Alex: I'm especially interested in what you think!
>> > >> > > >
>> > >> > > > I have a nearly complete implementation of the changes outlined in this KIP, please let me know if you'd like me to push them for review in advance of a vote.
>> > >> > > >
>> > >> > > > Regards,
>> > >> > > >
>> > >> > > > Nick