Regarding the cache: if we disable write-caching when transactional stores are enabled, that will disable the caching of intermediate values for aggregations, causing aggregation processors to run on every write. Depending on the application, this could be negligible (which I believe will be the case for basically any built-in aggregation), or it could be significant (if the aggregation processor does anything particularly heavyweight, like I/O).
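To make that cost concrete, here is a minimal sketch in plain Java. It is not Kafka Streams code: `WriteCacheSketch` and `downstreamCalls` are hypothetical names, and the "cache" is just a map of dirty keys that is flushed once on commit. It only counts how often a downstream processor would be invoked with and without write-caching.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WriteCacheSketch {

    /**
     * Counts downstream invocations for a batch of writes followed by one commit.
     * With cacheWrites = true, writes to the same key are coalesced until commit.
     * With cacheWrites = false, every write is forwarded downstream immediately.
     */
    public static int downstreamCalls(List<Map.Entry<String, Long>> writes, boolean cacheWrites) {
        int calls = 0;
        Map<String, Long> dirty = new LinkedHashMap<>();
        for (Map.Entry<String, Long> write : writes) {
            if (cacheWrites) {
                dirty.put(write.getKey(), write.getValue()); // overwrite the intermediate value
            } else {
                calls++; // forwarded on every write
            }
        }
        calls += dirty.size(); // commit: one downstream call per dirty key
        return calls;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> writes = List.of(
                Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 1L), Map.entry("a", 3L));
        System.out.println("with write-caching:    " + downstreamCalls(writes, true));
        System.out.println("without write-caching: " + downstreamCalls(writes, false));
    }
}
```

For four writes across two keys, the cached variant forwards two records on commit and the uncached variant forwards four. Whether that difference matters depends on how expensive the downstream processor is.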
Is this an acceptable trade-off? I'm not sure there's a way to preserve the caching of intermediate writes for aggregations without effectively "double-buffering" writes. Could we also add the cache's "flush to downstream processors only on commit" behaviour to our transaction buffers? My concern with that is that our transaction buffering requires a WriteLock during commit, to prevent IQ threads seeing an inconsistent view of the store. Therefore, store commit should not block while awaiting extensive downstream processing.

Regards,
Nick

On Thu, 30 Apr 2026 at 10:34, Nick Telford <[email protected]> wrote:
> Thanks Bill!
>
> I've updated the KIP as requested:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>
> 1. Updated the section on thread safety with specifics about RocksDBStore and InMemory stores.
> 2. Added a section on State Store Caching
>
> Let me know what you think, especially about the caching section, and if there's any other changes you'd like to see.
>
> Cheers,
> Nick
>
> On Wed, 29 Apr 2026 at 21:57, Bill Bejeck <[email protected]> wrote:
>
>> Thanks for the responses Nick.
>>
>> >BB1 fair enough.
>>
>> >BB2 - can we update the transactional thread safety part of the KIP with the response you provided here?
>>
>> >BB3b - I think that approach will work. My main concern is to provide a clear picture of the memory used and what's the interaction with both caches and flushing.
>>
>> On Tue, Apr 21, 2026 at 10:52 AM Nick Telford <[email protected]> wrote:
>>
>> > BB3b: Thinking a bit more about the cache vs. transaction buffer - what if we make a subtle change to the cache: when enable.transactional.statestores = true, we no longer cache writes (put/delete), only reads. Since the transaction buffer will already be storing recently written records (at least until commit), we don't need to keep them in another cache.
>> > We would still cache records read *from* the transaction buffer (because we would take the read of a recently written record as a signal that it's worth caching). The only caveat to this would be that the transaction buffer flushes recently written records on-commit, so cache hit-rate of recently written records will dip during commits.
>> >
>> > What do you think?
>> >
>> > On Tue, 21 Apr 2026 at 10:39, Nick Telford <[email protected]> wrote:
>> >
>> > > Hi Bill,
>> > >
>> > > Thanks for reviewing the updated KIP!
>> > >
>> > > BB1: I'm not quite sure I understand what you're saying here. We need to take a copy/snapshot of the read buffer because the read buffer is emptied on-commit, and any Iterator created on a ConcurrentSkipListMap automatically reflects those changes, so would terminate iteration prematurely. Also, any new writes to the buffer (put/delete) would be immediately reflected in the iterator, making its results inconsistent.
>> > > TL;DR: copying it gives us full snapshot isolation for IQ iterators.
>> > > FWIW, the implementation already takes an Iterator over the underlying store, as well as the buffer, and merges the two iterators together. But the read buffer's Iterator must be over a copy of the read buffer, to provide a consistent snapshot of the store.
>> > >
>> > > BB2: Basically, the RocksDBTransactionBuffer will maintain an extra "write buffer", which uses a RocksDB WriteBatch to stage writes during put/delete. This yields better throughput than iterating the ConcurrentSkipListMap and writing to a WriteBatch during commit, and reduces the time that IQ threads (that need a readLock on the ConcurrentSkipListMap) are blocked from creating iterators during commit.
>> > >
>> > > InMemoryTransactionBuffers do not need this extra "write buffer", as there is no more efficient implementation for them to use. This means that, vs. RocksDB, InMemoryStores will actually use less memory buffering records.
>> > >
>> > > BB3: This is a good question, and one that I've not really dug too deeply into. I suspect that there's a level of duplication of concerns here, but there are subtle differences: the cache is an LRU cache, which can cache records across multiple commits (if they're frequently read, for example), whereas the transaction buffer always exclusively contains uncommitted writes.
>> > >
>> > > In terms of correctness/consistency, I don't think we have a problem. For READ_COMMITTED IQ, we skip both the cache and transaction buffers. For READ_UNCOMMITTED IQ (and StreamThread reads) we read in the order dictated by the StateStore layering, which is: cache first, then transaction buffer, then underlying store. Since writes always update the cache, this is correct. The one wrinkle is that cached writes are only written to downstream store layers (and therefore buffered by the transaction buffer) when the entry is evicted, or the cache is flushed or committed. This essentially double-buffers writes, which is a bit wasteful of memory.
>> > >
>> > > I don't really know much about the cache, but as far as I can tell it might eliminate some (de)serialization overhead? Regardless, given that it implements a different behaviour (LRU vs. buffering uncommitted writes), eliminating it might cause some subtle performance regressions for *some* users.
>> > >
>> > > Maybe instead we have the *default* cache size change to 0 when enable.transactional.statestores = true and statestore.uncommitted.max.bytes > 0?
>> > > That would reduce memory usage for users that aren't explicitly relying on the cache.
>> > >
>> > > BB4: Ahh yes, good catch! This is showing how old this KIP is :-)
>> > >
>> > > Regards,
>> > > Nick
>> > >
>> > > On Mon, 20 Apr 2026 at 21:11, Bill Bejeck <[email protected]> wrote:
>> > >
>> > >> Hi Nick,
>> > >>
>> > >> Thanks for the update to the KIP! It's great to see this moving again. Overall the KIP updates LGTM, but I do have a couple of comments
>> > >>
>> > >> BB1- The KIP specifies using a ConcurrentSkipListMap for the transactional buffer and for IQ READ_UNCOMMITTED queries KS will take a snapshot and during the copy lock the StreamThread with ReentrantReadWriteLock. Now if the IQ queries will only be served from the in-memory transactional buffer, that's an acceptable trade-off since even though the ConcurrentSkipListMap uses weakly consistent iterators, a commit mid-iteration would mean those records not viewed as a result of clearing the buffer. Would it be worth exploring using IQ against the store and the buffer thus removing the copy+locking? I think this could be worthwhile (unless just technically unfeasible) as the way it stands users will have to possibly make tradeoffs in performance by adjusting either the commit interval or the size of uncommitted bytes. Of course as specified in the KIP they have the option of going with READ_COMMITTED.
>> > >>
>> > >> BB2 - In the KIP section "Transaction Buffer Thread Safety" there's this sentence which I find confusing:
>> > >>
>> > >> > Reads from the StreamThread and READ_UNCOMMITTED IQ threads are serviced by the read buffer. The write buffer is (optionally) used to buffer writes for commit to the database, if doing so would yield better performance than using the read buffer.
>> > >>
>> > >> Can you clarify what this means?
>> > >>
>> > >> BB3 - How does the `statestore.uncommitted.max.bytes` work in conjunction with the `statestore.cache.max.bytes.buffering` config? If users elect to go with transactional stores is the statestore cache still needed? Since the statestore cache flushes on commit() as well as the transactional cache what's the workflow like between the two? The default size of the store cache is 10M so if it becomes full and a commit is not called would it empty into the transactional cache?
>> > >>
>> > >> BB4- In discussing the `enable.transactional.statestores` config it mentions "exactly-once, exactly-once-v2 or exactly-once-beta" but we can go with just `exactly-once-v2`, we removed the others since 4.0
>> > >>
>> > >> Thanks!
>> > >> Bill
>> > >>
>> > >>
>> > >> On Fri, Apr 17, 2026 at 1:13 PM Nick Telford <[email protected]> wrote:
>> > >>
>> > >> > Hi everyone,
>> > >> >
>> > >> > We're circling back to KIP-892, with the goal of including it in 4.4. To that end, I've updated the KIP with substantial design improvements:
>> > >> >
>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892:+Transactional+Semantics+for+StateStores
>> > >> >
>> > >> > Highlights:
>> > >> >
>> > >> > - TransactionBuffers that support both RocksDB and InMemory stores
>> > >> > - Support for custom IsolationLevel of Interactive Queries, including a default (IQv1 and IQv2) and a per-request isolation level (IQv2 only)
>> > >> > - New store-level metric for uncommitted-bytes
>> > >> >
>> > >> > Let me know what you think!
>> > >> >
>> > >> > Regards,
>> > >> > Nick
>> > >> >
>> > >> > On Wed, 17 Apr 2024 at 11:50, Nick Telford <[email protected]> wrote:
>> > >> >
>> > >> > > Hi Walker,
>> > >> > >
>> > >> > > Feel free to ask away, either on the mailing list or the Confluent Community Slack, where I hang out :-)
>> > >> > >
>> > >> > > The implementation is *mostly* complete, although it needs some polishing. It's worth noting that KIP-1035 is a hard prerequisite for this.
>> > >> > >
>> > >> > > Regards,
>> > >> > > Nick
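
For reference, the snapshot argument from the BB1 discussion above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: `SnapshotSketch` and `keysSeen` are hypothetical names, a commit is modelled as `clear()` on the buffer, and the lock that would guard the copy against a concurrent StreamThread is omitted because the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SnapshotSketch {

    /**
     * Iterates over a buffer of uncommitted writes while a "commit" empties the
     * live buffer after the first key has been read.
     * snapshot = true iterates over a copy; snapshot = false iterates the live map.
     */
    public static List<String> keysSeen(boolean snapshot) {
        ConcurrentSkipListMap<String, String> buffer = new ConcurrentSkipListMap<>();
        buffer.put("a", "1");
        buffer.put("b", "2");
        buffer.put("c", "3");

        // The copy is what gives the reader a stable view.
        Map<String, String> view = snapshot ? new TreeMap<>(buffer) : buffer;

        List<String> seen = new ArrayList<>();
        Iterator<String> it = view.keySet().iterator();
        while (it.hasNext()) {
            seen.add(it.next());
            buffer.clear(); // models the buffer being emptied on commit
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("snapshot iterator saw: " + keysSeen(true));
        System.out.println("live iterator saw:     " + keysSeen(false));
    }
}
```

The iterator over the copy always sees all three keys. The iterator over the live map is only weakly consistent, so it is not guaranteed to see the entries that were removed mid-iteration, and what it returns can vary between JDK versions.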
