> I'm not sure there's a way to preserve the caching of intermediate writes for aggregations without effectively "double-buffering" writes.

Yes, I'm beginning to have the same opinion; we'll see as the development progresses.

-Bill

On Thu, Apr 30, 2026 at 8:38 AM Nick Telford <[email protected]> wrote:

> Regarding the cache: if we disable write-caching when transactional stores are enabled, that will disable the caching of intermediate values for aggregations, causing aggregation processors to run on every write. Depending on the application, this could be negligible (which I believe will be the case for basically any builtin aggregation), or it could be significant (if the aggregation processor does anything particularly heavyweight, like I/O).
>
> Is this an acceptable trade-off? I'm not sure there's a way to preserve the caching of intermediate writes for aggregations without effectively "double-buffering" writes.
>
> Could we also add the cache's "flush to downstream processors only on commit" behaviour to our transaction buffers? My concern with that is that our transaction buffering requires a WriteLock during commit, to prevent IQ threads seeing an inconsistent view of the store. Therefore, store commit should not block while awaiting extensive downstream processing.
>
> Regards,
> Nick
>
> On Thu, 30 Apr 2026 at 10:34, Nick Telford <[email protected]> wrote:
>
> > Thanks Bill!
> >
> > I've updated the KIP as requested:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >
> > 1. Updated the section on thread safety with specifics about RocksDBStore and InMemory stores.
> > 2. Added a section on State Store Caching
> >
> > Let me know what you think, especially about the caching section, and if there's any other changes you'd like to see.
> >
> > Cheers,
> > Nick
> >
> > On Wed, 29 Apr 2026 at 21:57, Bill Bejeck <[email protected]> wrote:
> >
> >> Thanks for the responses Nick.
> >>
> >> >BB1 fair enough.
> >>
> >> >BB2 - can we update the transactional thread safety part of the KIP with the response you provided here?
> >>
> >> >BB3b - I think that approach will work. My main concern is to provide a clear picture of the memory used and what's the interaction with both caches and flushing.
> >>
> >> On Tue, Apr 21, 2026 at 10:52 AM Nick Telford <[email protected]> wrote:
> >>
> >> > BB3b: Thinking a bit more about the cache vs. transaction buffer - what if we make a subtle change to the cache: when enable.transactional.statestores = true, we no longer cache writes (put/delete), only reads. Since the transaction buffer will already be storing recently written records (at least until commit), we don't need to keep them in another cache. We would still cache records read *from* the transaction buffer (because we would take the read of a recently written record as a signal that it's worth caching). The only caveat to this would be that the transaction buffer flushes recently written records on-commit, so cache hit-rate of recently written records will dip during commits.
> >> >
> >> > What do you think?
> >> >
> >> > On Tue, 21 Apr 2026 at 10:39, Nick Telford <[email protected]> wrote:
> >> >
> >> > > Hi Bill,
> >> > >
> >> > > Thanks for reviewing the updated KIP!
> >> > >
> >> > > BB1: I'm not quite sure I understand what you're saying here. We need to take a copy/snapshot of the read buffer because the read buffer is emptied on-commit, and any Iterator created on a ConcurrentSkipListMap automatically reflects those changes, so would terminate iteration prematurely. Also, any new writes to the buffer (put/delete) would be immediately reflected in the iterator, making its results inconsistent. TL;DR: copying it gives us full snapshot isolation for IQ iterators.
> >> > > FWIW, the implementation already takes an Iterator over the underlying store, as well as the buffer, and merges the two iterators together. But the read buffer's Iterator must be over a copy of the read buffer, to provide a consistent snapshot of the store.
> >> > >
> >> > > BB2: Basically, the RocksDBTransactionBuffer will maintain an extra "write buffer", which uses a RocksDB WriteBatch to stage writes during put/delete. This yields better throughput than iterating the ConcurrentSkipListMap and writing to a WriteBatch during commit, and reduces the time that IQ threads (that need a readLock on the ConcurrentSkipListMap) are blocked from creating iterators during commit.
> >> > >
> >> > > InMemoryTransactionBuffers do not need this extra "write buffer", as there is no more efficient implementation for them to use. This means that, vs. RocksDB, InMemoryStores will actually use less memory buffering records.
> >> > >
> >> > > BB3: This is a good question, and one that I've not really dug too deeply into. I suspect that there's a level of duplication of concerns here, but there are subtle differences: the cache is an LRU cache, which can cache records across multiple commits (if they're frequently read, for example), whereas the transaction buffer always exclusively contains uncommitted writes.
> >> > >
> >> > > In terms of correctness/consistency, I don't think we have a problem. For READ_COMMITTED IQ, we skip both the cache and transaction buffers. For READ_UNCOMMITTED IQ (and StreamThread reads) we read in the order dictated by the StateStore layering, which is: cache first, then transaction buffer, then underlying store. Since writes always update the cache, this is correct. The one wrinkle is that cached writes are only written to downstream store layers (and therefore buffered by the transaction buffer) when the entry is evicted, or the cache is flushed or committed. This essentially double-buffers writes, which is a bit wasteful of memory.
> >> > >
> >> > > I don't really know much about the cache, but as far as I can tell it might eliminate some (de)serialization overhead? Regardless, given that it implements a different behaviour (LRU vs. buffering uncommitted writes), eliminating it might cause some subtle performance regressions for *some* users.
> >> > >
> >> > > Maybe instead we have the *default* cache size change to 0 when enable.transactional.statestores = true and statestore.uncommitted.max.bytes > 0? That would reduce memory usage for users that aren't explicitly relying on the cache.
> >> > >
> >> > > BB4: Ahh yes, good catch! This is showing how old this KIP is :-)
> >> > >
> >> > > Regards,
> >> > > Nick
> >> > >
> >> > > On Mon, 20 Apr 2026 at 21:11, Bill Bejeck <[email protected]> wrote:
> >> > >
> >> > >> Hi Nick,
> >> > >>
> >> > >> Thanks for the update to the KIP! It's great to see this moving again. Overall the KIP updates LGTM, but I do have a couple of comments.
> >> > >>
> >> > >> BB1 - The KIP specifies using a ConcurrentSkipListMap for the transactional buffer, and for IQ READ_UNCOMMITTED queries KS will take a snapshot, locking the StreamThread during the copy with a ReentrantReadWriteLock. Now if the IQ queries will only be served from the in-memory transactional buffer, that's an acceptable trade-off, since even though the ConcurrentSkipListMap uses weakly consistent iterators, a commit mid-iteration would mean some records are never seen, because the buffer is cleared. Would it be worth exploring using IQ against the store and the buffer, thus removing the copy+locking? I think this could be worthwhile (unless it's technically infeasible), as the way it stands users will possibly have to make tradeoffs in performance by adjusting either the commit interval or the size of uncommitted bytes. Of course, as specified in the KIP, they have the option of going with READ_COMMITTED.
> >> > >>
> >> > >> BB2 - In the KIP section "Transaction Buffer Thread Safety" there's this sentence which I find confusing:
> >> > >>
> >> > >> > Reads from the StreamThread and READ_UNCOMMITTED IQ threads are serviced by the read buffer. The write buffer is (optionally) used to buffer writes for commit to the database, if doing so would yield better performance than using the read buffer.
> >> > >>
> >> > >> Can you clarify what this means?
> >> > >>
> >> > >> BB3 - How does the `statestore.uncommitted.max.bytes` work in conjunction with the `statestore.cache.max.bytes.buffering` config? If users elect to go with transactional stores, is the statestore cache still needed? Since the statestore cache flushes on commit() as well as the transactional cache, what's the workflow like between the two? The default size of the store cache is 10M, so if it becomes full and a commit is not called, would it empty into the transactional cache?
> >> > >>
> >> > >> BB4 - In discussing the `enable.transactional.statestores` config it mentions "exactly-once, exactly-once-v2 or exactly-once-beta", but we can go with just `exactly-once-v2`; we removed the others in 4.0.
> >> > >>
> >> > >> Thanks!
> >> > >> Bill
> >> > >>
> >> > >> On Fri, Apr 17, 2026 at 1:13 PM Nick Telford <[email protected]> wrote:
> >> > >>
> >> > >> > Hi everyone,
> >> > >> >
> >> > >> > We're circling back to KIP-892, with the goal of including it in 4.4. To that end, I've updated the KIP with substantial design improvements:
> >> > >> >
> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892:+Transactional+Semantics+for+StateStores
> >> > >> >
> >> > >> > Highlights:
> >> > >> >
> >> > >> >    - TransactionBuffers that support both RocksDB and InMemory stores
> >> > >> >    - Support for custom IsolationLevel of Interactive Queries, including a default (IQv1 and IQv2) and a per-request isolation level (IQv2 only)
> >> > >> >    - New store-level metric for uncommitted-bytes
> >> > >> >
> >> > >> > Let me know what you think!
> >> > >> >
> >> > >> > Regards,
> >> > >> > Nick
> >> > >> >
> >> > >> > On Wed, 17 Apr 2024 at 11:50, Nick Telford <[email protected]> wrote:
> >> > >> >
> >> > >> > > Hi Walker,
> >> > >> > >
> >> > >> > > Feel free to ask away, either on the mailing list or the Confluent Community Slack, where I hang out :-)
> >> > >> > >
> >> > >> > > The implementation is *mostly* complete, although it needs some polishing. It's worth noting that KIP-1035 is a hard prerequisite for this.
> >> > >> > >
> >> > >> > > Regards,
> >> > >> > > Nick
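
For readers following the BB1 exchange in the quoted thread, the snapshot-under-lock idea can be sketched roughly as below. This is only an illustration under stated assumptions, not the KIP-892 implementation: the class `SnapshotBuffer` and its methods are hypothetical names, keys and values are plain Strings, the "store" is an ordinary Map, and every mutation takes the write lock for simplicity.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration only; names and structure are assumptions,
// not taken from the KIP-892 code.
class SnapshotBuffer {
    // Uncommitted writes, kept in key order.
    private final ConcurrentSkipListMap<String, String> uncommitted =
            new ConcurrentSkipListMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Writer side: buffer a write (assumption: mutations take the write lock).
    void put(String key, String value) {
        lock.writeLock().lock();
        try {
            uncommitted.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Commit: move buffered writes into the store and empty the buffer.
    // The write lock keeps readers from copying a half-cleared buffer.
    void commit(Map<String, String> store) {
        lock.writeLock().lock();
        try {
            store.putAll(uncommitted);
            uncommitted.clear();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Reader side: copy the buffer under the read lock and iterate the copy,
    // so later puts or commits do not affect an iterator already handed out.
    Iterator<Map.Entry<String, String>> snapshotIterator() {
        lock.readLock().lock();
        try {
            ConcurrentSkipListMap<String, String> copy =
                    new ConcurrentSkipListMap<>(uncommitted);
            return copy.entrySet().iterator();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

An iterator obtained from `snapshotIterator()` before a commit still returns every entry that was buffered at the time of the copy, which is the snapshot-isolation property described under BB1; the cost is the copy itself plus the time readers and writers spend waiting on the lock.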
