Hi Colt,

10: Yes, I agree it's not ideal. I originally intended to try to keep the behaviour unchanged as much as possible, otherwise we'd have to wait for a major version release to land these changes.

20: Good point, ALOS doesn't need the same level of guarantee, and the typically longer commit intervals would be problematic when reading only "committed" records.

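To make the distinction in point 20 concrete, here is a minimal Java sketch of a store that buffers writes until commit and lets the caller choose whether reads see the buffer. It is not the KIP's API: `BufferingStore`, its methods, and the `readUncommitted` flag are hypothetical names used only for illustration, and deletes/tombstones are left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not a Kafka Streams class.
final class BufferingStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();

    // Writes land in the buffer and stay invisible to "committed" readers.
    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    // readUncommitted = true: consult the buffer first, then the committed data.
    // readUncommitted = false: only return data that has already been committed.
    String get(String key, boolean readUncommitted) {
        if (readUncommitted && uncommitted.containsKey(key)) {
            return uncommitted.get(key);
        }
        return committed.get(key);
    }

    // Moves every buffered write into the committed data and empties the buffer.
    void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }
}
```

With a long commit interval, a reader restricted to committed data would not see a `put` until the next `commit()`, which is the ALOS concern above; a reader allowed to see the buffer gets the new value immediately.
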
I've been away for 5 days recovering from minor surgery, but I spent a considerable amount of that time working through ideas for possible solutions in my head. I think your suggestion of keeping ALOS as-is, but buffering writes for EOS is the right path forwards, although I have a solution that both expands on this, and provides for some more formal guarantees.

Essentially, adding support to KeyValueStores for "Transactions", with clearly defined IsolationLevels. Using "Read Committed" when under EOS, and "Read Uncommitted" under ALOS.

The nice thing about this approach is that it gives us much more clearly defined isolation behaviour that can be properly documented to ensure users know what to expect.

I'm still working out the kinks in the design, and will update the KIP when I have something. The main struggle is trying to implement this without making any major changes to the existing interfaces or breaking existing implementations, because currently everything expects to operate directly on a StateStore, and not a Transaction of that store. I think I'm getting close, although sadly I won't be able to progress much until next week due to some work commitments.

Regards,
Nick

On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io> wrote:

> Nick,
>
> Thank you for the explanation, and also for the updated KIP. I am quite eager for this improvement to be released as it would greatly reduce the operational difficulties of EOS streams apps.
>
> Two questions:
>
> 10)
> > When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query.
> Why do extra work to enable the reading of uncommitted writes during IQ? Code complexity aside, reading uncommitted writes is, in my opinion, a minor flaw in EOS IQ; it would be very nice to have the guarantee that, with EOS, IQ only reads committed records.
> In order to avoid dirty reads, one currently must query a standby replica (but this still doesn't fully guarantee monotonic reads).
>
> 20) Is it also necessary to enable this optimization on ALOS stores? The motivation of KIP-844 was mainly to reduce the need to restore state from scratch on unclean EOS shutdowns; with ALOS it was acceptable to accept that there may have been uncommitted writes on disk. On a side note, if you enable this type of store on ALOS processors, the community would definitely want to enable queries on dirty reads; otherwise users would have to wait 30 seconds (default) to see an update.
>
> Thank you for doing this fantastic work!
> Colt McNealy
> *Founder, LittleHorse.io*
>
> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I've drastically reduced the scope of this KIP to no longer include the StateStore management of checkpointing. This can be added as a KIP later on to further optimize the consistency and performance of state stores.
> >
> > I've also added a section discussing some of the concerns around concurrency, especially in the presence of Iterators. I'm thinking of wrapping WriteBatchWithIndex with a reference-counting copy-on-write implementation (that only makes a copy if there's an active iterator), but I'm open to suggestions.
> >
> > Regards,
> > Nick
> >
> > On Mon, 28 Nov 2022 at 16:36, Nick Telford <nick.telf...@gmail.com> wrote:
> >
> > > Hi Colt,
> > >
> > > I didn't do any profiling, but the 844 implementation:
> > >
> > > - Writes uncommitted records to a temporary RocksDB instance
> > >   - Since tombstones need to be flagged, all record values are prefixed with a value/tombstone marker. This necessitates a memory copy.
> > > - On-commit, iterates all records in this temporary instance and writes them to the main RocksDB store.
> > >   - While iterating, the value/tombstone marker needs to be parsed and the real value extracted. This necessitates another memory copy.
> > >
> > > My guess is that the cost of iterating the temporary RocksDB store is the major factor, with the 2 extra memory copies per-Record contributing a significant amount too.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Mon, 28 Nov 2022 at 16:12, Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Out of curiosity, why does the performance of the store degrade so significantly with the 844 implementation? I wouldn't be too surprised by a 50-60% drop (caused by each record being written twice), but 96% is extreme.
> > >>
> > >> The only thing I can think of which could create such a bottleneck would be that perhaps the 844 implementation deserializes and then re-serializes the store values when copying from the uncommitted to committed store, but I wasn't able to figure that out when I scanned the PR.
> > >>
> > >> Colt McNealy
> > >> *Founder, LittleHorse.io*
> > >>
> > >> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <nick.telf...@gmail.com> wrote:
> > >>
> > >> > Hi everyone,
> > >> >
> > >> > I've updated the KIP to resolve all the points that have been raised so far, with one exception: the ALOS default commit interval of 5 minutes is likely to cause WriteBatchWithIndex memory to grow too large.
> > >> >
> > >> > There's a couple of different things I can think of to solve this:
> > >> >
> > >> > - We already have a memory/record limit in the KIP to prevent OOM errors. Should we choose a default value for these? My concern here is that anything we choose might seem rather arbitrary.
> > >> >   We could change its behaviour such that under ALOS, it only triggers the commit of the StateStore, but under EOS, it triggers a commit of the Kafka transaction.
> > >> > - We could introduce a separate `checkpoint.interval.ms` to allow ALOS to commit the StateStores more frequently than the general commit.interval.ms? My concern here is that the semantics of this config would depend on the processing.mode; under ALOS it would allow more frequently committing stores, whereas under EOS it couldn't.
> > >> >
> > >> > Any better ideas?
> > >> >
> > >> > On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com> wrote:
> > >> >
> > >> > > Hi Alex,
> > >> > >
> > >> > > Thanks for the feedback.
> > >> > >
> > >> > > I've updated the discussion of OOM issues by describing how we'll handle it. Here's the new text:
> > >> > >
> > >> > >> To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries() exceeds a threshold, configured by max.uncommitted.state.entries.per.task; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by max.uncommitted.state.bytes.per.task. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.
> > >> > >>
> > >> > >> These limits will be checked in StreamTask#process and a premature commit will be requested via Task#requestCommit().
> > >> > >>
> > >> > >> Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
> > >> > >
> > >> > > I've chosen to have the StateStore expose approximations of its buffer size/count instead of opaquely requesting a commit in order to delegate the decision making to the Task itself. This enables Tasks to look at *all* of their StateStores, and determine whether an early commit is necessary. Notably, it enables per-Task thresholds, instead of per-Store, which prevents Tasks with many StateStores from using much more memory than Tasks with one StateStore. This makes sense, since commits are done by-Task, not by-Store.
> > >> > >
> > >> > > Prizes* for anyone who can come up with a better name for the new config properties!
> > >> > >
> > >> > > Thanks for pointing out the potential performance issues of WBWI. From the benchmarks that user posted[1], it looks like WBWI still performs considerably better than individual puts, which is the existing design, so I'd actually expect a performance boost from WBWI, just not as great as we'd get from a plain WriteBatch. This does suggest that a good optimization would be to use a regular WriteBatch for restoration (in RocksDBStore#restoreBatch), since we know that those records will never be queried before they're committed.
> > >> > >
> > >> > > 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > >> > >
> > >> > > * Just kidding, no prizes, sadly.
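The per-Task check described in the message above could look roughly like the following Java sketch. The method names `approximateNumUncommittedEntries` and `approximateNumUncommittedBytes` and the two thresholds come from the thread; the `UncommittedStats` interface, `FixedStats`, `TaskCommitPolicy`, and the choice of 0 as the default return value are assumptions made for illustration and are not the actual Kafka Streams code.

```java
import java.util.List;

// Hypothetical stand-in for the two StateStore methods named in the thread.
interface UncommittedStats {
    // Assumed default: report nothing buffered, so a store that does not
    // override these methods can never push a Task over its limits.
    default long approximateNumUncommittedEntries() { return 0L; }
    default long approximateNumUncommittedBytes() { return 0L; }
}

// Hypothetical store that reports fixed buffer sizes, for demonstration.
final class FixedStats implements UncommittedStats {
    private final long entries;
    private final long bytes;

    FixedStats(long entries, long bytes) {
        this.entries = entries;
        this.bytes = bytes;
    }

    @Override public long approximateNumUncommittedEntries() { return entries; }
    @Override public long approximateNumUncommittedBytes() { return bytes; }
}

// Hypothetical per-Task policy: totals are summed over ALL stores of a Task.
final class TaskCommitPolicy {
    private final long maxEntriesPerTask; // cf. max.uncommitted.state.entries.per.task
    private final long maxBytesPerTask;   // cf. max.uncommitted.state.bytes.per.task

    TaskCommitPolicy(long maxEntriesPerTask, long maxBytesPerTask) {
        this.maxEntriesPerTask = maxEntriesPerTask;
        this.maxBytesPerTask = maxBytesPerTask;
    }

    // True when either summed total exceeds its threshold.
    boolean commitNeeded(List<? extends UncommittedStats> stores) {
        long entries = 0L;
        long bytes = 0L;
        for (UncommittedStats store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > maxEntriesPerTask || bytes > maxBytesPerTask;
    }
}
```

Because the totals are summed per Task, two stores buffering 60 entries each cross a limit of 100 even though neither would on its own, which is the point of per-Task rather than per-Store thresholds.
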
> > >> > >
> > >> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:
> > >> > >
> > >> > >> Hey Nick,
> > >> > >>
> > >> > >> Thank you for the KIP! With such a significant performance degradation in the secondary store approach, we should definitely consider WriteBatchWithIndex. I also like encapsulating checkpointing inside the default state store implementation to improve performance.
> > >> > >>
> > >> > >> +1 to John's comment to keep the current checkpointing as a fallback mechanism. We want to keep existing users' workflows intact if we can. A non-intrusive way would be to add a separate StateStore method, say, StateStore#managesCheckpointing(), that controls whether the state store implementation owns checkpointing.
> > >> > >>
> > >> > >> I think that a solution to the transactional writes should address the OOMEs. One possible way to address that is to wire StateStore's commit request by adding, say, StateStore#commitNeeded that is checked in StreamTask#commitNeeded via the corresponding ProcessorStateManager. With that change, RocksDBStore will have to track the current transaction size and request a commit when the size goes over a (configurable) threshold.
> > >> > >>
> > >> > >> AFAIU WriteBatchWithIndex might perform significantly slower than non-txn puts as the batch size grows [1]. We should have a configuration to fall back to the current behavior (and/or disable txn stores for ALOS) unless the benchmarks show negligible overhead for longer commits / large-enough batch sizes.
> > >> > >>
> > >> > >> If you prefer to keep the KIP smaller, I would rather cut out state-store-managed checkpointing rather than proper OOMe handling and being able to switch to non-txn behavior. The checkpointing is not necessary to solve the recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is in, it will be much easier to add state-store-managed checkpointing.
> > >> > >>
> > >> > >> If you share the current implementation, I am happy to help you address the OOMe and configuration parts as well as review and test the patch.
> > >> > >>
> > >> > >> Best,
> > >> > >> Alex
> > >> > >>
> > >> > >> 1. https://github.com/facebook/rocksdb/issues/608
> > >> > >>
> > >> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:
> > >> > >>
> > >> > >> > Hi John,
> > >> > >> >
> > >> > >> > Thanks for the review and feedback!
> > >> > >> >
> > >> > >> > 1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes. I agree that the best solution would be to provide a default that checkpoints to a file. The one thing I would change is that the checkpointing is to a store-local file, instead of a per-Task file. This way the StateStore still technically owns its own checkpointing (via a default implementation), and the StateManager/Task execution engine doesn't need to know anything about checkpointing, which greatly simplifies some of the logic.
> > >> > >> >
> > >> > >> > 2. OOME errors: The main reasons why I didn't explore a solution to this are a) to keep this KIP as simple as possible, and b) because I'm not exactly sure how to signal that a Task should commit prematurely. I'm confident it's possible, and I think it's worth adding a section on handling this. Besides my proposal to force an early commit once memory usage reaches a threshold, is there any other approach that you might suggest for tackling this problem?
> > >> > >> >
> > >> > >> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional/non-transactional stores available, as it would considerably increase the complexity of the codebase, for very little benefit.
> > >> > >> >
> > >> > >> > 4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets.
> > >> > >> > What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too, and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.
> > >> > >> >
> > >> > >> > Regards,
> > >> > >> > Nick
> > >> > >> >
> > >> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
> > >> > >> >
> > >> > >> > > Thanks for publishing this alternative, Nick!
> > >> > >> > >
> > >> > >> > > The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.
> > >> > >> > >
> > >> > >> > > There are a couple of points that would hold me back from approving this KIP right now:
> > >> > >> > >
> > >> > >> > > 1. Loss of coverage for custom stores.
> > >> > >> > > The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property.
> > >> > >> > > For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
> > >> > >> > >
> > >> > >> > > 2. Guarding against OOME
> > >> > >> > > I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: Imagine, you upgrade to AK 3.next, your tests pass, so you deploy to production. That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.
> > >> > >> > >
> > >> > >> > > FYI, I know of multiple cases where people run EOS with much larger commit intervals to get better batching than the default, so I don't think this pathological case would be as rare as you suspect.
> > >> > >> > >
> > >> > >> > > Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution.
> > >> > >> > > We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.
> > >> > >> > >
> > >> > >> > > 3. ALOS mode.
> > >> > >> > > On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is five minutes, so the memory issue is far more pressing.
> > >> > >> > >
> > >> > >> > > As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS and we can therefore just swap over unconditionally. We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> > >> > >> > >
> > >> > >> > > 4. (minor point) Deprecation of methods
> > >> > >> > >
> > >> > >> > > You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint. It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks.
> > >> > >> > > However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?
> > >> > >> > >
> > >> > >> > > Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.
> > >> > >> > >
> > >> > >> > > -John
> > >> > >> > >
> > >> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
> > >> > >> > > > Hi everyone,
> > >> > >> > > >
> > >> > >> > > > As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.
> > >> > >> > > >
> > >> > >> > > > I've published this separately as KIP-892: Transactional Semantics for StateStores <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>, so that it can be discussed/reviewed separately from KIP-844.
> > >> > >> > > >
> > >> > >> > > > Alex: I'm especially interested in what you think!
> > >> > >> > > >
> > >> > >> > > > I have a nearly complete implementation of the changes outlined in this KIP, please let me know if you'd like me to push them for review in advance of a vote.
> > >> > >> > > >
> > >> > >> > > > Regards,
> > >> > >> > > >
> > >> > >> > > > Nick