Hi everyone, I've updated the KIP to reflect the latest version of the design: https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
There are several changes in there that reflect feedback from this thread,
and there's a new section and a bunch of interface changes relating to Atomic
Checkpointing, which is the final piece of the puzzle to making everything
robust.

Let me know what you think!

Regards,
Nick

On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Lucas,
>
> Thanks for looking over my KIP.
>
> A) The bound is per-instance, not per-Task. This was a typo in the KIP
> that I've now corrected. It was originally per-Task, but I changed it to
> per-instance for exactly the reason you highlighted.
> B) It's worth noting that transactionality is only enabled under EOS, and
> in the default mode of operation (ALOS), there should be no change in
> behavior at all. I think, under EOS, we can mitigate the impact on users by
> choosing sufficiently low default values for the memory bound configuration.
> I understand your hesitation to include a significant change of behaviour,
> especially in a minor release, but I suspect that most users will prefer
> the memory impact (under EOS) to the existing behaviour of frequent state
> restorations! If this is a problem, the changes can wait until the next
> major release. I'll be running a patched version of streams in production
> with these changes as soon as they're ready, so it won't disrupt me :-D
> C) The main purpose of this sentence was just to note that some changes
> will need to be made to the way Segments are handled in order to ensure
> they also benefit from transactions. At the time I wrote it, I hadn't
> figured out the specific changes necessary, so it was deliberately vague.
> This is the one outstanding problem I'm currently working on, and I'll
> update this section with more detail once I have figured out the exact
> changes required.
> D) newTransaction() provides the necessary isolation guarantees. While
> the RocksDB implementation of transactions doesn't technically *need*
> read-only users to call newTransaction(), other implementations (e.g. a
> hypothetical PostgresStore) may require it. Calling newTransaction() when
> no transaction is necessary is essentially free, as it will just return
> this.
>
> I didn't do any profiling of the KIP-844 PoC, but I think it should be
> fairly obvious where the performance problems stem from: writes under
> KIP-844 require 3 extra memory-copies: 1 to encode the record with the
> tombstone/record flag, 1 to decode it from the tombstone/record flag, and 1
> to copy the record from the "temporary" store to the "main" store when the
> transaction commits. The different approach taken by KIP-892 should perform
> much better, as it avoids all these copies, and may actually perform
> slightly better than trunk, due to batched writes in RocksDB performing
> better than non-batched writes.[1]
>
> Regards,
> Nick
>
> 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>
> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <lbruts...@confluent.io.invalid>
> wrote:
>
>> Hi Nick,
>>
>> I'm just starting to read up on the whole discussion about KIP-892 and
>> KIP-844. Thanks a lot for your work on this, I do think
>> `WriteBatchWithIndex` may be the way to go here. I do have some
>> questions about the latest draft.
>>
>> A) If I understand correctly, you propose to put a bound on the
>> (native) memory consumed by each task. However, I wonder if this is
>> sufficient if we have temporary imbalances in the cluster.
For >> example, depending on the timing of rebalances during a cluster >> restart, it could happen that a single streams node is assigned a lot >> more tasks than expected. With your proposed change, this would mean >> that the memory required by this one node could be a multiple of what >> is required during normal operation. I wonder if it wouldn't be safer >> to put a global bound on the memory use, across all tasks. >> B) Generally, the memory concerns still give me the feeling that this >> should not be enabled by default for all users in a minor release. >> C) In section "Transaction Management": the sentence "A similar >> analogue will be created to automatically manage `Segment` >> transactions.". Maybe this is just me lacking some background, but I >> do not understand this, it would be great if you could clarify what >> you mean here. >> D) Could you please clarify why IQ has to call newTransaction(), when >> it's read-only. >> >> And one last thing not strictly related to your KIP: if there is an >> easy way for you to find out why the KIP-844 PoC is 20x slower (e.g. >> by providing a flame graph), that would be quite interesting. >> >> Cheers, >> Lucas >> >> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <nick.telf...@gmail.com> >> wrote: >> > >> > Hi everyone, >> > >> > I've updated the KIP with a more detailed design, which reflects the >> > implementation I've been working on: >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> > >> > This new design should address the outstanding points already made in >> the >> > thread. >> > >> > Please let me know if there are areas that are unclear or need more >> > clarification. >> > >> > I have a (nearly) working implementation. I'm confident that the >> remaining >> > work (making Segments behave) will not impact the documented design. >> > >> > Regards, >> > >> > Nick >> > >> > On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io> wrote: >> > >> > > Nick, >> > > >> > > Thank you for the reply; that makes sense. I was hoping that, since >> reading >> > > uncommitted records from IQ in EOS isn't part of the documented API, >> maybe >> > > you *wouldn't* have to wait for the next major release to make that >> change; >> > > but given that it would be considered a major change, I like your >> approach >> > > the best. >> > > >> > > Wishing you a speedy recovery and happy coding! >> > > >> > > Thanks, >> > > Colt McNealy >> > > *Founder, LittleHorse.io* >> > > >> > > >> > > On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <nick.telf...@gmail.com> >> > > wrote: >> > > >> > > > Hi Colt, >> > > > >> > > > 10: Yes, I agree it's not ideal. I originally intended to try to >> keep the >> > > > behaviour unchanged as much as possible, otherwise we'd have to >> wait for >> > > a >> > > > major version release to land these changes. >> > > > 20: Good point, ALOS doesn't need the same level of guarantee, and >> the >> > > > typically longer commit intervals would be problematic when reading >> only >> > > > "committed" records. >> > > > >> > > > I've been away for 5 days recovering from minor surgery, but I >> spent a >> > > > considerable amount of that time working through ideas for possible >> > > > solutions in my head. I think your suggestion of keeping ALOS >> as-is, but >> > > > buffering writes for EOS is the right path forwards, although I >> have a >> > > > solution that both expands on this, and provides for some more >> formal >> > > > guarantees. 
>> > > > >> > > > Essentially, adding support to KeyValueStores for "Transactions", >> with >> > > > clearly defined IsolationLevels. Using "Read Committed" when under >> EOS, >> > > and >> > > > "Read Uncommitted" under ALOS. >> > > > >> > > > The nice thing about this approach is that it gives us much more >> clearly >> > > > defined isolation behaviour that can be properly documented to >> ensure >> > > users >> > > > know what to expect. >> > > > >> > > > I'm still working out the kinks in the design, and will update the >> KIP >> > > when >> > > > I have something. The main struggle is trying to implement this >> without >> > > > making any major changes to the existing interfaces or breaking >> existing >> > > > implementations, because currently everything expects to operate >> directly >> > > > on a StateStore, and not a Transaction of that store. I think I'm >> getting >> > > > close, although sadly I won't be able to progress much until next >> week >> > > due >> > > > to some work commitments. >> > > > >> > > > Regards, >> > > > Nick >> > > > >> > > > On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io> >> wrote: >> > > > >> > > > > Nick, >> > > > > >> > > > > Thank you for the explanation, and also for the updated KIP. I am >> quite >> > > > > eager for this improvement to be released as it would greatly >> reduce >> > > the >> > > > > operational difficulties of EOS streams apps. >> > > > > >> > > > > Two questions: >> > > > > >> > > > > 10) >> > > > > >When reading records, we will use the >> > > > > WriteBatchWithIndex#getFromBatchAndDB >> > > > > and WriteBatchWithIndex#newIteratorWithBase utilities in order to >> > > ensure >> > > > > that uncommitted writes are available to query. >> > > > > Why do extra work to enable the reading of uncommitted writes >> during >> > > IQ? >> > > > > Code complexity aside, reading uncommitted writes is, in my >> opinion, a >> > > > > minor flaw in EOS IQ; it would be very nice to have the guarantee >> that, >> > > > > with EOS, IQ only reads committed records. In order to avoid dirty >> > > reads, >> > > > > one currently must query a standby replica (but this still doesn't >> > > fully >> > > > > guarantee monotonic reads). >> > > > > >> > > > > 20) Is it also necessary to enable this optimization on ALOS >> stores? >> > > The >> > > > > motivation of KIP-844 was mainly to reduce the need to restore >> state >> > > from >> > > > > scratch on unclean EOS shutdowns; with ALOS it was acceptable to >> accept >> > > > > that there may have been uncommitted writes on disk. On a side >> note, if >> > > > you >> > > > > enable this type of store on ALOS processors, the community would >> > > > > definitely want to enable queries on dirty reads; otherwise users >> would >> > > > > have to wait 30 seconds (default) to see an update. >> > > > > >> > > > > Thank you for doing this fantastic work! >> > > > > Colt McNealy >> > > > > *Founder, LittleHorse.io* >> > > > > >> > > > > >> > > > > On Wed, Nov 30, 2022 at 10:44 AM Nick Telford < >> nick.telf...@gmail.com> >> > > > > wrote: >> > > > > >> > > > > > Hi everyone, >> > > > > > >> > > > > > I've drastically reduced the scope of this KIP to no longer >> include >> > > the >> > > > > > StateStore management of checkpointing. This can be added as a >> KIP >> > > > later >> > > > > on >> > > > > > to further optimize the consistency and performance of state >> stores. 
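
For illustration, a minimal sketch of the newTransaction()/IsolationLevel idea
discussed in this thread. The names and signatures below are hypothetical, not
the interface proposed by the KIP; org.apache.kafka.common.IsolationLevel is
borrowed here only because it already defines READ_COMMITTED and
READ_UNCOMMITTED.

    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical sketch: a store that can hand out a transactional view of
    // itself, with the isolation level chosen by the processing mode
    // (READ_COMMITTED under EOS, READ_UNCOMMITTED under ALOS).
    interface TransactionalKeyValueStore<K, V> extends KeyValueStore<K, V> {

        // Returns a view of this store whose writes are buffered until the
        // task commits. An implementation that needs no isolation, or a
        // read-only caller hitting such an implementation, can simply be
        // handed 'this', which is why calling newTransaction() is essentially
        // free in that case.
        KeyValueStore<K, V> newTransaction(IsolationLevel isolationLevel);
    }

Under a shape like this the rest of the runtime keeps operating on a plain
KeyValueStore, which matches the constraint mentioned in the thread of not
breaking implementations that expect to work directly on a StateStore rather
than on a transaction of that store.
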
>> > > > > > >> > > > > > I've also added a section discussing some of the concerns around >> > > > > > concurrency, especially in the presence of Iterators. I'm >> thinking of >> > > > > > wrapping WriteBatchWithIndex with a reference-counting >> copy-on-write >> > > > > > implementation (that only makes a copy if there's an active >> > > iterator), >> > > > > but >> > > > > > I'm open to suggestions. >> > > > > > >> > > > > > Regards, >> > > > > > Nick >> > > > > > >> > > > > > On Mon, 28 Nov 2022 at 16:36, Nick Telford < >> nick.telf...@gmail.com> >> > > > > wrote: >> > > > > > >> > > > > > > Hi Colt, >> > > > > > > >> > > > > > > I didn't do any profiling, but the 844 implementation: >> > > > > > > >> > > > > > > - Writes uncommitted records to a temporary RocksDB >> instance >> > > > > > > - Since tombstones need to be flagged, all record >> values are >> > > > > > > prefixed with a value/tombstone marker. This >> necessitates a >> > > > > memory >> > > > > > copy. >> > > > > > > - On-commit, iterates all records in this temporary >> instance and >> > > > > > > writes them to the main RocksDB store. >> > > > > > > - While iterating, the value/tombstone marker needs to be >> parsed >> > > > and >> > > > > > > the real value extracted. This necessitates another memory >> copy. >> > > > > > > >> > > > > > > My guess is that the cost of iterating the temporary RocksDB >> store >> > > is >> > > > > the >> > > > > > > major factor, with the 2 extra memory copies per-Record >> > > contributing >> > > > a >> > > > > > > significant amount too. >> > > > > > > >> > > > > > > Regards, >> > > > > > > Nick >> > > > > > > >> > > > > > > On Mon, 28 Nov 2022 at 16:12, Colt McNealy < >> c...@littlehorse.io> >> > > > > wrote: >> > > > > > > >> > > > > > >> Hi all, >> > > > > > >> >> > > > > > >> Out of curiosity, why does the performance of the store >> degrade so >> > > > > > >> significantly with the 844 implementation? I wouldn't be too >> > > > surprised >> > > > > > by >> > > > > > >> a >> > > > > > >> 50-60% drop (caused by each record being written twice), but >> 96% >> > > is >> > > > > > >> extreme. >> > > > > > >> >> > > > > > >> The only thing I can think of which could create such a >> bottleneck >> > > > > would >> > > > > > >> be >> > > > > > >> that perhaps the 844 implementation deserializes and then >> > > > > re-serializes >> > > > > > >> the >> > > > > > >> store values when copying from the uncommitted to committed >> store, >> > > > > but I >> > > > > > >> wasn't able to figure that out when I scanned the PR. >> > > > > > >> >> > > > > > >> Colt McNealy >> > > > > > >> *Founder, LittleHorse.io* >> > > > > > >> >> > > > > > >> >> > > > > > >> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford < >> > > > nick.telf...@gmail.com> >> > > > > > >> wrote: >> > > > > > >> >> > > > > > >> > Hi everyone, >> > > > > > >> > >> > > > > > >> > I've updated the KIP to resolve all the points that have >> been >> > > > raised >> > > > > > so >> > > > > > >> > far, with one exception: the ALOS default commit interval >> of 5 >> > > > > minutes >> > > > > > >> is >> > > > > > >> > likely to cause WriteBatchWithIndex memory to grow too >> large. >> > > > > > >> > >> > > > > > >> > There's a couple of different things I can think of to >> solve >> > > this: >> > > > > > >> > >> > > > > > >> > - We already have a memory/record limit in the KIP to >> prevent >> > > > OOM >> > > > > > >> > errors. Should we choose a default value for these? 
My >> > > concern >> > > > > here >> > > > > > >> is >> > > > > > >> > that >> > > > > > >> > anything we choose might seem rather arbitrary. We could >> > > change >> > > > > > >> > its behaviour such that under ALOS, it only triggers the >> > > commit >> > > > > of >> > > > > > >> the >> > > > > > >> > StateStore, but under EOS, it triggers a commit of the >> Kafka >> > > > > > >> > transaction. >> > > > > > >> > - We could introduce a separate `checkpoint.interval.ms` >> to >> > > > > allow >> > > > > > >> ALOS >> > > > > > >> > to commit the StateStores more frequently than the >> general >> > > > > > >> > commit.interval.ms? My concern here is that the >> semantics of >> > > > > this >> > > > > > >> > config >> > > > > > >> > would depend on the processing.mode; under ALOS it would >> > > allow >> > > > > more >> > > > > > >> > frequently committing stores, whereas under EOS it >> couldn't. >> > > > > > >> > >> > > > > > >> > Any better ideas? >> > > > > > >> > >> > > > > > >> > On Wed, 23 Nov 2022 at 16:25, Nick Telford < >> > > > nick.telf...@gmail.com> >> > > > > > >> wrote: >> > > > > > >> > >> > > > > > >> > > Hi Alex, >> > > > > > >> > > >> > > > > > >> > > Thanks for the feedback. >> > > > > > >> > > >> > > > > > >> > > I've updated the discussion of OOM issues by describing >> how >> > > > we'll >> > > > > > >> handle >> > > > > > >> > > it. Here's the new text: >> > > > > > >> > > >> > > > > > >> > > To mitigate this, we will automatically force a Task >> commit if >> > > > the >> > > > > > >> total >> > > > > > >> > >> uncommitted records returned by >> > > > > > >> > >> StateStore#approximateNumUncommittedEntries() exceeds a >> > > > > threshold, >> > > > > > >> > >> configured by max.uncommitted.state.entries.per.task; >> or the >> > > > > total >> > > > > > >> > >> memory used for buffering uncommitted records returned >> by >> > > > > > >> > >> StateStore#approximateNumUncommittedBytes() exceeds the >> > > > threshold >> > > > > > >> > >> configured by max.uncommitted.state.bytes.per.task. >> This will >> > > > > > roughly >> > > > > > >> > >> bound the memory required per-Task for buffering >> uncommitted >> > > > > > records, >> > > > > > >> > >> irrespective of the commit.interval.ms, and will >> effectively >> > > > > bound >> > > > > > >> the >> > > > > > >> > >> number of records that will need to be restored in the >> event >> > > > of a >> > > > > > >> > failure. >> > > > > > >> > >> >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > These limits will be checked in StreamTask#process and a >> > > > premature >> > > > > > >> commit >> > > > > > >> > >> will be requested via Task#requestCommit(). >> > > > > > >> > >> >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > Note that these new methods provide default >> implementations >> > > that >> > > > > > >> ensure >> > > > > > >> > >> existing custom stores and non-transactional stores >> (e.g. >> > > > > > >> > >> InMemoryKeyValueStore) do not force any early commits. >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > I've chosen to have the StateStore expose approximations >> of >> > > its >> > > > > > buffer >> > > > > > >> > > size/count instead of opaquely requesting a commit in >> order to >> > > > > > >> delegate >> > > > > > >> > the >> > > > > > >> > > decision making to the Task itself. This enables Tasks >> to look >> > > > at >> > > > > > >> *all* >> > > > > > >> > of >> > > > > > >> > > their StateStores, and determine whether an early commit >> is >> > > > > > necessary. 
>> > > > > > >> > > Notably, it enables pre-Task thresholds, instead of >> per-Store, >> > > > > which >> > > > > > >> > > prevents Tasks with many StateStores from using much more >> > > memory >> > > > > > than >> > > > > > >> > Tasks >> > > > > > >> > > with one StateStore. This makes sense, since commits are >> done >> > > > > > by-Task, >> > > > > > >> > not >> > > > > > >> > > by-Store. >> > > > > > >> > > >> > > > > > >> > > Prizes* for anyone who can come up with a better name >> for the >> > > > new >> > > > > > >> config >> > > > > > >> > > properties! >> > > > > > >> > > >> > > > > > >> > > Thanks for pointing out the potential performance issues >> of >> > > > WBWI. >> > > > > > From >> > > > > > >> > the >> > > > > > >> > > benchmarks that user posted[1], it looks like WBWI still >> > > > performs >> > > > > > >> > > considerably better than individual puts, which is the >> > > existing >> > > > > > >> design, >> > > > > > >> > so >> > > > > > >> > > I'd actually expect a performance boost from WBWI, just >> not as >> > > > > great >> > > > > > >> as >> > > > > > >> > > we'd get from a plain WriteBatch. This does suggest that >> a >> > > good >> > > > > > >> > > optimization would be to use a regular WriteBatch for >> > > > restoration >> > > > > > (in >> > > > > > >> > > RocksDBStore#restoreBatch), since we know that those >> records >> > > > will >> > > > > > >> never >> > > > > > >> > be >> > > > > > >> > > queried before they're committed. >> > > > > > >> > > >> > > > > > >> > > 1: >> > > > > > >> > >> > > > > > >> > > > >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results >> > > > > > >> > > >> > > > > > >> > > * Just kidding, no prizes, sadly. >> > > > > > >> > > >> > > > > > >> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov >> > > > > > >> > > <asorokou...@confluent.io.invalid> wrote: >> > > > > > >> > > >> > > > > > >> > >> Hey Nick, >> > > > > > >> > >> >> > > > > > >> > >> Thank you for the KIP! With such a significant >> performance >> > > > > > >> degradation >> > > > > > >> > in >> > > > > > >> > >> the secondary store approach, we should definitely >> consider >> > > > > > >> > >> WriteBatchWithIndex. I also like encapsulating >> checkpointing >> > > > > inside >> > > > > > >> the >> > > > > > >> > >> default state store implementation to improve >> performance. >> > > > > > >> > >> >> > > > > > >> > >> +1 to John's comment to keep the current checkpointing >> as a >> > > > > > fallback >> > > > > > >> > >> mechanism. We want to keep existing users' workflows >> intact >> > > if >> > > > we >> > > > > > >> can. A >> > > > > > >> > >> non-intrusive way would be to add a separate StateStore >> > > method, >> > > > > > say, >> > > > > > >> > >> StateStore#managesCheckpointing(), that controls >> whether the >> > > > > state >> > > > > > >> store >> > > > > > >> > >> implementation owns checkpointing. >> > > > > > >> > >> >> > > > > > >> > >> I think that a solution to the transactional writes >> should >> > > > > address >> > > > > > >> the >> > > > > > >> > >> OOMEs. One possible way to address that is to wire >> > > StateStore's >> > > > > > >> commit >> > > > > > >> > >> request by adding, say, StateStore#commitNeeded that is >> > > checked >> > > > > in >> > > > > > >> > >> StreamTask#commitNeeded via the corresponding >> > > > > > ProcessorStateManager. 
>> > > > > > >> > With >> > > > > > >> > >> that change, RocksDBStore will have to track the current >> > > > > > transaction >> > > > > > >> > size >> > > > > > >> > >> and request a commit when the size goes over a >> (configurable) >> > > > > > >> threshold. >> > > > > > >> > >> >> > > > > > >> > >> AFAIU WriteBatchWithIndex might perform significantly >> slower >> > > > than >> > > > > > >> > non-txn >> > > > > > >> > >> puts as the batch size grows [1]. We should have a >> > > > configuration >> > > > > to >> > > > > > >> fall >> > > > > > >> > >> back to the current behavior (and/or disable txn stores >> for >> > > > ALOS) >> > > > > > >> unless >> > > > > > >> > >> the benchmarks show negligible overhead for longer >> commits / >> > > > > > >> > large-enough >> > > > > > >> > >> batch sizes. >> > > > > > >> > >> >> > > > > > >> > >> If you prefer to keep the KIP smaller, I would rather >> cut out >> > > > > > >> > >> state-store-managed checkpointing rather than proper >> OOMe >> > > > > handling >> > > > > > >> and >> > > > > > >> > >> being able to switch to non-txn behavior. The >> checkpointing >> > > is >> > > > > not >> > > > > > >> > >> necessary to solve the recovery-under-EOS problem. On >> the >> > > other >> > > > > > hand, >> > > > > > >> > once >> > > > > > >> > >> WriteBatchWithIndex is in, it will be much easier to add >> > > > > > >> > >> state-store-managed checkpointing. >> > > > > > >> > >> >> > > > > > >> > >> If you share the current implementation, I am happy to >> help >> > > you >> > > > > > >> address >> > > > > > >> > >> the >> > > > > > >> > >> OOMe and configuration parts as well as review and test >> the >> > > > > patch. >> > > > > > >> > >> >> > > > > > >> > >> Best, >> > > > > > >> > >> Alex >> > > > > > >> > >> >> > > > > > >> > >> >> > > > > > >> > >> 1. https://github.com/facebook/rocksdb/issues/608 >> > > > > > >> > >> >> > > > > > >> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford < >> > > > > > nick.telf...@gmail.com >> > > > > > >> > >> > > > > > >> > >> wrote: >> > > > > > >> > >> >> > > > > > >> > >> > Hi John, >> > > > > > >> > >> > >> > > > > > >> > >> > Thanks for the review and feedback! >> > > > > > >> > >> > >> > > > > > >> > >> > 1. Custom Stores: I've been mulling over this problem >> > > myself. >> > > > > As >> > > > > > it >> > > > > > >> > >> stands, >> > > > > > >> > >> > custom stores would essentially lose checkpointing >> with no >> > > > > > >> indication >> > > > > > >> > >> that >> > > > > > >> > >> > they're expected to make changes, besides a line in >> the >> > > > release >> > > > > > >> > notes. I >> > > > > > >> > >> > agree that the best solution would be to provide a >> default >> > > > that >> > > > > > >> > >> checkpoints >> > > > > > >> > >> > to a file. The one thing I would change is that the >> > > > > checkpointing >> > > > > > >> is >> > > > > > >> > to >> > > > > > >> > >> a >> > > > > > >> > >> > store-local file, instead of a per-Task file. This >> way the >> > > > > > >> StateStore >> > > > > > >> > >> still >> > > > > > >> > >> > technically owns its own checkpointing (via a default >> > > > > > >> implementation), >> > > > > > >> > >> and >> > > > > > >> > >> > the StateManager/Task execution engine doesn't need >> to know >> > > > > > >> anything >> > > > > > >> > >> about >> > > > > > >> > >> > checkpointing, which greatly simplifies some of the >> logic. >> > > > > > >> > >> > >> > > > > > >> > >> > 2. 
OOME errors: The main reasons why I didn't explore >> a >> > > > > solution >> > > > > > to >> > > > > > >> > >> this is >> > > > > > >> > >> > a) to keep this KIP as simple as possible, and b) >> because >> > > I'm >> > > > > not >> > > > > > >> > >> exactly >> > > > > > >> > >> > how to signal that a Task should commit prematurely. >> I'm >> > > > > > confident >> > > > > > >> > it's >> > > > > > >> > >> > possible, and I think it's worth adding a section on >> > > handling >> > > > > > this. >> > > > > > >> > >> Besides >> > > > > > >> > >> > my proposal to force an early commit once memory usage >> > > > reaches >> > > > > a >> > > > > > >> > >> threshold, >> > > > > > >> > >> > is there any other approach that you might suggest for >> > > > tackling >> > > > > > >> this >> > > > > > >> > >> > problem? >> > > > > > >> > >> > >> > > > > > >> > >> > 3. ALOS: I can add in an explicit paragraph, but my >> > > > assumption >> > > > > is >> > > > > > >> that >> > > > > > >> > >> > since transactional behaviour comes at little/no >> cost, that >> > > > it >> > > > > > >> should >> > > > > > >> > be >> > > > > > >> > >> > available by default on all stores, irrespective of >> the >> > > > > > processing >> > > > > > >> > mode. >> > > > > > >> > >> > While ALOS doesn't use transactions, the Task itself >> still >> > > > > > >> "commits", >> > > > > > >> > so >> > > > > > >> > >> > the behaviour should be correct under ALOS too. I'm >> not >> > > > > convinced >> > > > > > >> that >> > > > > > >> > >> it's >> > > > > > >> > >> > worth having both transactional/non-transactional >> stores >> > > > > > >> available, as >> > > > > > >> > >> it >> > > > > > >> > >> > would considerably increase the complexity of the >> codebase, >> > > > for >> > > > > > >> very >> > > > > > >> > >> little >> > > > > > >> > >> > benefit. >> > > > > > >> > >> > >> > > > > > >> > >> > 4. Method deprecation: Are you referring to >> > > > > > >> StateStore#getPosition()? >> > > > > > >> > >> As I >> > > > > > >> > >> > understand it, Position contains the position of the >> > > *source* >> > > > > > >> topics, >> > > > > > >> > >> > whereas the commit offsets would be the *changelog* >> > > offsets. >> > > > So >> > > > > > >> it's >> > > > > > >> > >> still >> > > > > > >> > >> > necessary to retain the Position data, as well as the >> > > > changelog >> > > > > > >> > offsets. >> > > > > > >> > >> > What I meant in the KIP is that Position offsets are >> > > > currently >> > > > > > >> stored >> > > > > > >> > >> in a >> > > > > > >> > >> > file, and since we can atomically store metadata >> along with >> > > > the >> > > > > > >> record >> > > > > > >> > >> > batch we commit to RocksDB, we can move our Position >> > > offsets >> > > > in >> > > > > > to >> > > > > > >> > this >> > > > > > >> > >> > metadata too, and gain the same transactional >> guarantees >> > > that >> > > > > we >> > > > > > >> will >> > > > > > >> > >> for >> > > > > > >> > >> > changelog offsets, ensuring that the Position offsets >> are >> > > > > > >> consistent >> > > > > > >> > >> with >> > > > > > >> > >> > the records that are read from the database. >> > > > > > >> > >> > >> > > > > > >> > >> > Regards, >> > > > > > >> > >> > Nick >> > > > > > >> > >> > >> > > > > > >> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler < >> > > > > vvcep...@apache.org> >> > > > > > >> > wrote: >> > > > > > >> > >> > >> > > > > > >> > >> > > Thanks for publishing this alternative, Nick! 
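
To make the Position/changelog-offset point above concrete, here is a rough
sketch against the RocksDB Java API of committing offset metadata in the same
batch as the buffered records, so that offsets and data become visible
atomically. The class name, column family and key layout are illustrative
assumptions, not the KIP's design.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    final class AtomicOffsetCommitSketch {

        // Appends offset metadata to the same batch that holds the buffered
        // records, then applies everything with a single atomic write(). The
        // offsets therefore can never get ahead of, or behind, the data they
        // describe, even if the process crashes mid-commit.
        static void commit(final RocksDB db,
                           final ColumnFamilyHandle offsetsColumnFamily,  // illustrative
                           final WriteBatchWithIndex uncommittedWrites,
                           final String changelogTopicPartition,
                           final long changelogOffset) throws RocksDBException {
            final byte[] key = changelogTopicPartition.getBytes(StandardCharsets.UTF_8);
            final byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array();
            uncommittedWrites.put(offsetsColumnFamily, key, value);

            try (final WriteOptions options = new WriteOptions()) {
                db.write(options, uncommittedWrites);
            }
        }
    }

During restoration a plain WriteBatch could be used instead, as suggested
elsewhere in the thread, since restored records are never queried before they
are committed.
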
>> > > > > > >> > >> > > >> > > > > > >> > >> > > The benchmark you mentioned in the KIP-844 >> discussion >> > > seems >> > > > > > like >> > > > > > >> a >> > > > > > >> > >> > > compelling reason to revisit the built-in >> > > transactionality >> > > > > > >> > mechanism. >> > > > > > >> > >> I >> > > > > > >> > >> > > also appreciate you analysis, showing that for most >> use >> > > > > cases, >> > > > > > >> the >> > > > > > >> > >> write >> > > > > > >> > >> > > batch approach should be just fine. >> > > > > > >> > >> > > >> > > > > > >> > >> > > There are a couple of points that would hold me >> back from >> > > > > > >> approving >> > > > > > >> > >> this >> > > > > > >> > >> > > KIP right now: >> > > > > > >> > >> > > >> > > > > > >> > >> > > 1. Loss of coverage for custom stores. >> > > > > > >> > >> > > The fact that you can plug in a (relatively) simple >> > > > > > >> implementation >> > > > > > >> > of >> > > > > > >> > >> the >> > > > > > >> > >> > > XStateStore interfaces and automagically get a >> > > distributed >> > > > > > >> database >> > > > > > >> > >> out >> > > > > > >> > >> > of >> > > > > > >> > >> > > it is a significant benefit of Kafka Streams. I'd >> hate to >> > > > > lose >> > > > > > >> it, >> > > > > > >> > so >> > > > > > >> > >> it >> > > > > > >> > >> > > would be better to spend some time and come up with >> a way >> > > > to >> > > > > > >> > preserve >> > > > > > >> > >> > that >> > > > > > >> > >> > > property. For example, can we provide a default >> > > > > implementation >> > > > > > of >> > > > > > >> > >> > > `commit(..)` that re-implements the existing >> > > > checkpoint-file >> > > > > > >> > >> approach? Or >> > > > > > >> > >> > > perhaps add an `isTransactional()` flag to the state >> > > store >> > > > > > >> interface >> > > > > > >> > >> so >> > > > > > >> > >> > > that the runtime can decide whether to continue to >> manage >> > > > > > >> checkpoint >> > > > > > >> > >> > files >> > > > > > >> > >> > > vs delegating transactionality to the stores? >> > > > > > >> > >> > > >> > > > > > >> > >> > > 2. Guarding against OOME >> > > > > > >> > >> > > I appreciate your analysis, but I don't think it's >> > > > sufficient >> > > > > > to >> > > > > > >> say >> > > > > > >> > >> that >> > > > > > >> > >> > > we will solve the memory problem later if it becomes >> > > > > necessary. >> > > > > > >> The >> > > > > > >> > >> > > experience leading to that situation would be quite >> bad: >> > > > > > Imagine, >> > > > > > >> > you >> > > > > > >> > >> > > upgrade to AK 3.next, your tests pass, so you >> deploy to >> > > > > > >> production. >> > > > > > >> > >> That >> > > > > > >> > >> > > night, you get paged because your app is now >> crashing >> > > with >> > > > > > >> OOMEs. As >> > > > > > >> > >> with >> > > > > > >> > >> > > all OOMEs, you'll have a really hard time finding >> the >> > > root >> > > > > > cause, >> > > > > > >> > and >> > > > > > >> > >> > once >> > > > > > >> > >> > > you do, you won't have a clear path to resolve the >> issue. >> > > > You >> > > > > > >> could >> > > > > > >> > >> only >> > > > > > >> > >> > > tune down the commit interval and cache buffer size >> until >> > > > you >> > > > > > >> stop >> > > > > > >> > >> > getting >> > > > > > >> > >> > > crashes. 
>> > > > > > >> > >> > > >> > > > > > >> > >> > > FYI, I know of multiple cases where people run EOS >> with >> > > > much >> > > > > > >> larger >> > > > > > >> > >> > commit >> > > > > > >> > >> > > intervals to get better batching than the default, >> so I >> > > > don't >> > > > > > >> think >> > > > > > >> > >> this >> > > > > > >> > >> > > pathological case would be as rare as you suspect. >> > > > > > >> > >> > > >> > > > > > >> > >> > > Given that we already have the rudiments of an idea >> of >> > > what >> > > > > we >> > > > > > >> could >> > > > > > >> > >> do >> > > > > > >> > >> > to >> > > > > > >> > >> > > prevent this downside, we should take the time to >> design >> > > a >> > > > > > >> solution. >> > > > > > >> > >> We >> > > > > > >> > >> > owe >> > > > > > >> > >> > > it to our users to ensure that awesome new features >> don't >> > > > > come >> > > > > > >> with >> > > > > > >> > >> > bitter >> > > > > > >> > >> > > pills unless we can't avoid it. >> > > > > > >> > >> > > >> > > > > > >> > >> > > 3. ALOS mode. >> > > > > > >> > >> > > On the other hand, I didn't see an indication of how >> > > stores >> > > > > > will >> > > > > > >> be >> > > > > > >> > >> > > handled under ALOS (aka non-EOS) mode. >> Theoretically, the >> > > > > > >> > >> > transactionality >> > > > > > >> > >> > > of the store and the processing mode are >> orthogonal. A >> > > > > > >> transactional >> > > > > > >> > >> > store >> > > > > > >> > >> > > would serve ALOS just as well as a >> non-transactional one >> > > > (if >> > > > > > not >> > > > > > >> > >> better). >> > > > > > >> > >> > > Under ALOS, though, the default commit interval is >> five >> > > > > > minutes, >> > > > > > >> so >> > > > > > >> > >> the >> > > > > > >> > >> > > memory issue is far more pressing. >> > > > > > >> > >> > > >> > > > > > >> > >> > > As I see it, we have several options to resolve this >> > > point. >> > > > > We >> > > > > > >> could >> > > > > > >> > >> > > demonstrate that transactional stores work just >> fine for >> > > > ALOS >> > > > > > >> and we >> > > > > > >> > >> can >> > > > > > >> > >> > > therefore just swap over unconditionally. We could >> also >> > > > > disable >> > > > > > >> the >> > > > > > >> > >> > > transactional mechanism under ALOS so that stores >> operate >> > > > > just >> > > > > > >> the >> > > > > > >> > >> same >> > > > > > >> > >> > as >> > > > > > >> > >> > > they do today when run in ALOS mode. Finally, we >> could do >> > > > the >> > > > > > >> same >> > > > > > >> > as >> > > > > > >> > >> in >> > > > > > >> > >> > > KIP-844 and make transactional stores opt-in (it'd >> be >> > > > better >> > > > > to >> > > > > > >> > avoid >> > > > > > >> > >> the >> > > > > > >> > >> > > extra opt-in mechanism, but it's a good >> > > > get-out-of-jail-free >> > > > > > >> card). >> > > > > > >> > >> > > >> > > > > > >> > >> > > 4. (minor point) Deprecation of methods >> > > > > > >> > >> > > >> > > > > > >> > >> > > You mentioned that the new `commit` method replaces >> > > flush, >> > > > > > >> > >> > > updateChangelogOffsets, and checkpoint. It seems to >> me >> > > that >> > > > > the >> > > > > > >> > point >> > > > > > >> > >> > about >> > > > > > >> > >> > > atomicity and Position also suggests that it >> replaces the >> > > > > > >> Position >> > > > > > >> > >> > > callbacks. However, the proposal only deprecates >> `flush`. >> > > > > > Should >> > > > > > >> we >> > > > > > >> > be >> > > > > > >> > >> > > deprecating other methods as well? 
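
As a purely illustrative sketch of the default-implementation idea raised in
points 1 and 4 (the KIP's actual method names and signatures may differ), a
new commit method could default to today's behaviour so that existing custom
stores keep working unchanged:

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical fragment, not the real StateStore interface: shows how a
    // consolidated commit() could be introduced without breaking existing
    // implementations.
    interface StateStoreCommitSketch {

        void flush();  // existing method; the proposal deprecates it in favour of commit()

        // Flushes outstanding writes and records the given changelog offsets.
        // The default just flushes, leaving checkpoint files (and Position
        // handling) to the runtime exactly as today, so custom and
        // non-transactional stores are unaffected.
        default void commit(final Map<TopicPartition, Long> changelogOffsets) {
            flush();
        }

        // Along the lines of the managesCheckpointing()/isTransactional()
        // suggestions in this thread: lets the runtime decide whether it still
        // needs to write per-Task checkpoint files for this store.
        default boolean managesCheckpointing() {
            return false;
        }
    }

Whether updateChangelogOffsets and checkpoint should be deprecated alongside
flush is exactly the open question raised above.
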
>> > > > > > >> > >> > > >> > > > > > >> > >> > > Thanks again for the KIP! It's really nice that you >> and >> > > > Alex >> > > > > > will >> > > > > > >> > get >> > > > > > >> > >> the >> > > > > > >> > >> > > chance to collaborate on both directions so that we >> can >> > > get >> > > > > the >> > > > > > >> best >> > > > > > >> > >> > > outcome for Streams and its users. >> > > > > > >> > >> > > >> > > > > > >> > >> > > -John >> > > > > > >> > >> > > >> > > > > > >> > >> > > >> > > > > > >> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote: >> > > > > > >> > >> > > > Hi everyone, >> > > > > > >> > >> > > > >> > > > > > >> > >> > > > As I mentioned in the discussion thread for >> KIP-844, >> > > I've >> > > > > > been >> > > > > > >> > >> working >> > > > > > >> > >> > on >> > > > > > >> > >> > > > an alternative approach to achieving better >> > > transactional >> > > > > > >> > semantics >> > > > > > >> > >> for >> > > > > > >> > >> > > > Kafka Streams StateStores. >> > > > > > >> > >> > > > >> > > > > > >> > >> > > > I've published this separately as KIP-892: >> > > Transactional >> > > > > > >> Semantics >> > > > > > >> > >> for >> > > > > > >> > >> > > > StateStores >> > > > > > >> > >> > > > < >> > > > > > >> > >> > > >> > > > > > >> > >> > >> > > > > > >> > >> >> > > > > > >> > >> > > > > > >> >> > > > > > >> > > > > >> > > > >> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> > > > > > >> > >> > > >, >> > > > > > >> > >> > > > so that it can be discussed/reviewed separately >> from >> > > > > KIP-844. >> > > > > > >> > >> > > > >> > > > > > >> > >> > > > Alex: I'm especially interested in what you think! >> > > > > > >> > >> > > > >> > > > > > >> > >> > > > I have a nearly complete implementation of the >> changes >> > > > > > >> outlined in >> > > > > > >> > >> this >> > > > > > >> > >> > > > KIP, please let me know if you'd like me to push >> them >> > > for >> > > > > > >> review >> > > > > > >> > in >> > > > > > >> > >> > > advance >> > > > > > >> > >> > > > of a vote. >> > > > > > >> > >> > > > >> > > > > > >> > >> > > > Regards, >> > > > > > >> > >> > > > >> > > > > > >> > >> > > > Nick >> > > > > > >> > >> > > > >> > > > > > >> > >> > > >> > > > > > >> > >> > >> > > > > > >> > >> >> > > > > > >> > > >> > > > > > >> > >> > > > > > >> >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> >