>
>  I'm not sure there's a way to preserve the
> caching of intermediate writes for aggregations without effectively
> "double-buffering" writes.


Yes, I'm beginning to have the same opinion; we'll see as the development
progresses.

-Bill

On Thu, Apr 30, 2026 at 8:38 AM Nick Telford <[email protected]> wrote:

> Regarding the cache: if we disable write-caching when transactional stores
> are enabled, that will disable the caching of intermediate values for
> aggregations, causing aggregation processors to run on every write.
> Depending on the application, this could be negligible (which I believe
> will be the case for basically any builtin aggregation), or it could be
> significant (if the aggregation processor does anything particularly
> heavyweight, like I/O).
>
> Is this an acceptable trade-off? I'm not sure there's a way to preserve the
> caching of intermediate writes for aggregations without effectively
> "double-buffering" writes.
>
> Could we also add the cache's "flush to downstream processors only on
> commit" behaviour to our transaction buffers? My concern with that is that
> our transaction buffering requires a WriteLock during commit, to prevent IQ
> threads seeing an inconsistent view of the store. Therefore, store commit
> should not block while awaiting extensive downstream processing.
>
> Regards,
> Nick
>
> On Thu, 30 Apr 2026 at 10:34, Nick Telford <[email protected]> wrote:
>
> > Thanks Bill!
> >
> > I've updated the KIP as requested:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >
> > 1. Updated the section on thread safety with specifics about RocksDBStore
> > and InMemory stores.
> > 2. Added a section on State Store Caching
> >
> > Let me know what you think, especially about the caching section, and if
> > there are any other changes you'd like to see.
> >
> > Cheers,
> > Nick
> >
> > On Wed, 29 Apr 2026 at 21:57, Bill Bejeck <[email protected]> wrote:
> >
> >> Thanks for the responses Nick.
> >>
> >> >BB1 fair enough.
> >>
> >> >BB2 - can we update the transactional thread safety part of the KIP with
> >> the response you provided here?
> >>
> >> >BB3b - I think that approach will work.  My main concern is to provide a
> >> clear picture of the memory used and what's the interaction with both
> >> caches and flushing.
> >>
> >> On Tue, Apr 21, 2026 at 10:52 AM Nick Telford <[email protected]>
> >> wrote:
> >>
> >> > BB3b: Thinking a bit more about the cache vs. transaction buffer - what if
> >> > we make a subtle change to the cache: when
> >> > enable.transactional.statestores = true, we no longer cache writes
> >> > (put/delete), only reads. Since the transaction buffer will already be
> >> > storing recently written records (at least until commit), we don't need to
> >> > keep them in another cache. We would still cache records read *from* the
> >> > transaction buffer (because we would take the read of a recently written
> >> > record as a signal that it's worth caching). The only caveat to this would
> >> > be that the transaction buffer flushes recently written records on-commit,
> >> > so cache hit-rate of recently written records will dip during commits.
> >> >
> >> > What do you think?
> >> >
> >> > On Tue, 21 Apr 2026 at 10:39, Nick Telford <[email protected]> wrote:
> >> >
> >> > > Hi Bill,
> >> > >
> >> > > Thanks for reviewing the updated KIP!
> >> > >
> >> > > BB1: I'm not quite sure I understand what you're saying here. We need to
> >> > > take a copy/snapshot of the read buffer because the read buffer is emptied
> >> > > on-commit, and any Iterator created on a ConcurrentSkipListMap
> >> > > automatically reflects those changes, so would terminate iteration
> >> > > prematurely. Also, any new writes to the buffer (put/delete) would be
> >> > > immediately reflected in the iterator, making its results inconsistent.
> >> > > TL;DR: copying it gives us full snapshot isolation for IQ iterators.
> >> > > FWIW, the implementation already takes an Iterator over the underlying
> >> > > store, as well as the buffer, and merges the two iterators together. But
> >> > > the read buffer's Iterator must be over a copy of the read buffer, to
> >> > > provide a consistent snapshot of the store.
> >> > >
> >> > > BB2: Basically, the RocksDBTransactionBuffer will maintain an extra "write
> >> > > buffer", which uses a RocksDB WriteBatch to stage writes during
> >> > > put/delete. This yields better throughput than iterating the
> >> > > ConcurrentSkipListMap and writing to a WriteBatch during commit, and
> >> > > reduces the time that IQ threads (that need a readLock on the
> >> > > ConcurrentSkipListMap) are blocked from creating iterators during commit.
> >> > >
> >> > > InMemoryTransactionBuffers do not need this extra "write buffer", as there
> >> > > is no more efficient implementation for them to use. This means that, vs.
> >> > > RocksDB, InMemoryStores will actually use less memory buffering records.
> >> > >
> >> > > BB3: This is a good question, and one that I've not really dug too deeply
> >> > > into. I suspect that there's a level of duplication of concerns here, but
> >> > > there are subtle differences: the cache is an LRU cache, which can cache
> >> > > records across multiple commits (if they're frequently read, for example),
> >> > > whereas the transaction buffer always exclusively contains uncommitted
> >> > > writes.
> >> > >
> >> > > In terms of correctness/consistency, I don't think we have a problem. For
> >> > > READ_COMMITTED IQ, we skip both the cache and transaction buffers. For
> >> > > READ_UNCOMMITTED IQ (and StreamThread reads) we read in the order dictated
> >> > > by the StateStore layering, which is: cache first, then transaction buffer,
> >> > > then underlying store. Since writes always update the cache, this is
> >> > > correct. The one wrinkle is that cached writes are only written to
> >> > > downstream store layers (and therefore buffered by the transaction buffer)
> >> > > when the entry is evicted, or the cache is flushed or committed. This
> >> > > essentially double-buffers writes, which is a bit wasteful of memory.
> >> > >
> >> > > I don't really know much about the cache, but as far as I can tell it
> >> > > might eliminate some (de)serialization overhead? Regardless, given that it
> >> > > implements a different behaviour (LRU vs. buffering uncommitted writes),
> >> > > eliminating it might cause some subtle performance regressions for *some*
> >> > > users.
> >> > >
> >> > > Maybe instead we have the *default* cache size change to 0 when
> >> > > enable.transactional.statestores = true and
> >> > > statestore.uncommitted.max.bytes > 0? That would reduce memory usage for
> >> > > users that aren't explicitly relying on the cache.
> >> > >
> >> > > BB4: Ahh yes, good catch! This is showing how old this KIP is :-)
> >> > >
> >> > > Regards,
> >> > > Nick
> >> > >
> >> > > On Mon, 20 Apr 2026 at 21:11, Bill Bejeck <[email protected]> wrote:
> >> > >
> >> > >> Hi Nick,
> >> > >>
> >> > >> Thanks for the update to the KIP!  It's great to see this moving
> >> again.
> >> > >> Overall the KIP updates LGTM, but I do have a couple of comments:
> >> > >>
> >> > >> BB1 - The KIP specifies using a ConcurrentSkipListMap for the
> >> > >> transactional buffer, and for IQ READ_UNCOMMITTED queries KS will take a
> >> > >> snapshot and, during the copy, lock the StreamThread with a
> >> > >> ReentrantReadWriteLock.
> >> > >> Now if the IQ queries will only be served from the in-memory transactional
> >> > >> buffer, that's an acceptable trade-off, since even though the
> >> > >> ConcurrentSkipListMap uses weakly consistent iterators, a commit
> >> > >> mid-iteration would mean those records are not seen as a result of clearing
> >> > >> the buffer.  Would it be worth exploring using IQ against the store and the
> >> > >> buffer, thus removing the copy+locking?  I think this could be
> >> > >> worthwhile (unless just technically unfeasible), as the way it stands users
> >> > >> will possibly have to make tradeoffs in performance by adjusting either the
> >> > >> commit interval or the size of uncommitted bytes. Of course, as specified in
> >> > >> the KIP, they have the option of going with READ_COMMITTED.
> >> > >>
> >> > >> BB2 - In the KIP section "Transaction Buffer Thread Safety" there's this
> >> > >> sentence which I find confusing:
> >> > >>
> >> > >> > Reads from the StreamThread and READ_UNCOMMITTED IQ threads are serviced
> >> > >> > by the read buffer. The write buffer is (optionally) used to buffer
> >> > >> > writes for commit to the database, if doing so would yield better
> >> > >> > performance than using the read buffer.
> >> > >>
> >> > >> Can you clarify what this means?
> >> > >>
> >> > >> BB3 - How does `statestore.uncommitted.max.bytes` work in
> >> > >> conjunction with the `statestore.cache.max.bytes.buffering` config?  If
> >> > >> users elect to go with transactional stores, is the statestore cache still
> >> > >> needed?  Since the statestore cache flushes on commit() as well as the
> >> > >> transactional cache, what's the workflow like between the two?  The default
> >> > >> size of the store cache is 10 MB, so if it becomes full and a commit is not
> >> > >> called, would it empty into the transactional cache?
> >> > >>
> >> > >> BB4 - In discussing the `enable.transactional.statestores` config it
> >> > >> mentions "exactly-once, exactly-once-v2 or exactly-once-beta", but we can go
> >> > >> with just `exactly-once-v2`; we removed the others in 4.0.
> >> > >>
> >> > >> Thanks!
> >> > >> Bill
> >> > >>
> >> > >>
> >> > >> On Fri, Apr 17, 2026 at 1:13 PM Nick Telford <[email protected]>
> >> > >> wrote:
> >> > >>
> >> > >> > Hi everyone,
> >> > >> >
> >> > >> > We're circling back to KIP-892, with the goal of including it in 4.4. To
> >> > >> > that end, I've updated the KIP with substantial design improvements:
> >> > >> >
> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892:+Transactional+Semantics+for+StateStores
> >> > >> >
> >> > >> > Highlights:
> >> > >> >
> >> > >> >    - TransactionBuffers that support both RocksDB and InMemory stores
> >> > >> >    - Support for custom IsolationLevel of Interactive Queries, including a
> >> > >> >    default (IQv1 and IQv2) and a per-request isolation level (IQv2 only)
> >> > >> >    - New store-level metric for uncommitted-bytes
> >> > >> >
> >> > >> > Let me know what you think!
> >> > >> >
> >> > >> > Regards,
> >> > >> > Nick
> >> > >> >
> >> > >> > On Wed, 17 Apr 2024 at 11:50, Nick Telford <[email protected]>
> >> > >> > wrote:
> >> > >> >
> >> > >> > > Hi Walker,
> >> > >> > >
> >> > >> > > Feel free to ask away, either on the mailing list or the Confluent
> >> > >> > > Community Slack, where I hang out :-)
> >> > >> > >
> >> > >> > > The implementation is *mostly* complete, although it needs some polishing.
> >> > >> > > It's worth noting that KIP-1035 is a hard prerequisite for this.
> >> > >> > >
> >> > >> > > Regards,
> >> > >> > > Nick
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> >
> >>
> >
>
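
For anyone following the BB1 exchange quoted above, here is a minimal sketch of
why an iterator over a live ConcurrentSkipListMap cannot give snapshot
isolation, while an iterator over a copy can. This is not the KIP's
implementation: the class and method names are hypothetical, a plain TreeMap
stands in for whatever copy the real buffer takes, and the locking that would
guard the copy is left out. A single remove and a single put stand in for a
commit and a concurrent write.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical illustration only; not code from the KIP-892 implementation.
public class BufferSnapshotSketch {

    // Scans the keys of a small "read buffer" while the buffer is mutated
    // mid-iteration. With useSnapshot = true the scan runs over a copy taken
    // before the mutations; otherwise it runs over the live map.
    static List<String> scan(boolean useSnapshot) {
        ConcurrentSkipListMap<String, String> buffer = new ConcurrentSkipListMap<>();
        buffer.put("a", "1");
        buffer.put("b", "2");
        buffer.put("c", "3");

        // Assumption: the copy would normally be taken while holding a lock;
        // that is omitted here because the example is single-threaded.
        NavigableMap<String, String> view =
            useSnapshot ? new TreeMap<String, String>(buffer) : buffer;

        Iterator<String> it = view.keySet().iterator();
        List<String> seen = new ArrayList<>();
        seen.add(it.next()); // the scan has started

        // Mutations that arrive while the scan is in progress.
        buffer.remove("c");   // e.g. an entry dropped from the buffer
        buffer.put("d", "4"); // e.g. a new uncommitted write

        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("live:     " + scan(false));
        System.out.println("snapshot: " + scan(true));
    }
}
```

The live scan loses "c" and picks up "d" part-way through, whereas the scan
over the copy still sees the three keys that existed when it started.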
