One more thing: I noted John's suggestion in the KIP, under "Rejected Alternatives". I still think it's an idea worth pursuing, but I believe it's out of scope for this KIP, because it solves a different set of problems, and the scope of this KIP has already grown quite large!
On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> I've updated the KIP (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
> with the latest changes; mostly bringing back "Atomic Checkpointing" (for what feels like the 10th time!). I think the one thing missing is some changes to metrics (notably the store "flush" metrics will need to be renamed to "commit").
>
> The reason I brought back Atomic Checkpointing was to decouple store flush from store commit. This is important, because with Transactional StateStores, we now need to call "flush" on *every* Task commit, and not just when the StateStore is closing, otherwise our transaction buffer will never be written and persisted, instead growing unbounded! I experimented with some simple solutions, like forcing a store flush whenever the transaction buffer was likely to exceed its configured size, but this was brittle: it prevented the transaction buffer from being configured to be unbounded, and it still would have required explicit flushes of RocksDB, yielding sub-optimal performance and memory utilization.
>
> I deemed Atomic Checkpointing to be the "right" way to resolve this problem. By ensuring that the changelog offsets that correspond to the most recently written records are always atomically written to the StateStore (by writing them to the same transaction buffer), we can avoid forcibly flushing the RocksDB memtables to disk, letting RocksDB flush them only when necessary, without losing any of our consistency guarantees. See the updated KIP for more info.
>
> I have fully implemented these changes, although I'm still not entirely happy with the implementation for segmented StateStores, so I plan to refactor that. Despite that, all tests pass. If you'd like to try out or review this highly experimental and incomplete branch, it's available here: https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built against Kafka 3.5.0 so that I had a stable base to build and test it on, and to enable easy apples-to-apples comparisons in a live environment. I plan to rebase it against trunk once it's nearer completion and has been proven on our main application.
>
> I would really appreciate help in reviewing and testing:
> - Segmented (Versioned, Session and Window) stores
> - Global stores
>
> As I do not currently use either of these, my primary test environment doesn't test these areas.
>
> I'm going on Parental Leave starting next week for a few weeks, so will not have time to move this forward until late August. That said, your feedback is welcome and appreciated, I just won't be able to respond as quickly as usual.
>
> Regards,
> Nick
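(For illustration: a minimal sketch of the Atomic Checkpointing idea described above, assuming a RocksDB-backed store that buffers writes in a WriteBatchWithIndex and records the corresponding changelog offset in a second column family of the same database. The class, column-family roles, key encoding and method names are assumptions for this example, not details taken from the KIP or the branch.)

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    class AtomicCheckpointSketch {
        private final RocksDB db;
        private final ColumnFamilyHandle dataCf;    // store records
        private final ColumnFamilyHandle offsetsCf; // changelog offsets, same DB
        private final WriteBatchWithIndex buffer = new WriteBatchWithIndex(true);

        AtomicCheckpointSketch(final RocksDB db,
                               final ColumnFamilyHandle dataCf,
                               final ColumnFamilyHandle offsetsCf) {
            this.db = db;
            this.dataCf = dataCf;
            this.offsetsCf = offsetsCf;
        }

        // Uncommitted writes accumulate in the batch (the "transaction buffer").
        void put(final byte[] key, final byte[] value) throws RocksDBException {
            buffer.put(dataCf, key, value);
        }

        // On Task commit: the buffered data and the changelog offset it corresponds
        // to are written in a single atomic RocksDB write. No explicit memtable
        // flush is required; whenever RocksDB flushes on its own schedule, the
        // offsets and the data can never drift apart.
        void commit(final String changelogPartition, final long offset) throws RocksDBException {
            buffer.put(offsetsCf,
                       changelogPartition.getBytes(StandardCharsets.UTF_8),
                       ByteBuffer.allocate(Long.BYTES).putLong(offset).array());
            try (final WriteOptions options = new WriteOptions()) {
                db.write(options, buffer);
            }
            buffer.clear();
        }
    }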
> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:
>
>> Hi Bruno
>>
>> Yes, that's correct, although the impact on IQ is not something I had considered.
>>
>>> What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals?
>>
>> I'm not quite sure I follow. Are you suggesting that we add an additional config for the max number of commit intervals between checkpoints? That way, we would checkpoint *either* when the transaction buffers are nearly full, *OR* whenever a certain number of commit intervals have elapsed, whichever comes first?
>>
>> That certainly seems reasonable, although this re-ignites an earlier debate about whether a config should be measured in "number of commit intervals", instead of just an absolute time.
>>
>> FWIW, I realised that this issue is the reason I was pursuing the Atomic Checkpoints, as it de-couples memtable flush from checkpointing, which enables us to just checkpoint on every commit without any performance impact. Atomic Checkpointing is definitely the "best" solution, but I'm not sure if this is enough to bring it back into this KIP.
>>
>> I'm currently working on moving all the transactional logic directly into RocksDBStore itself, which does away with the StateStore#newTransaction method, and reduces the number of new classes introduced, significantly reducing the complexity. If it works, and the complexity is drastically reduced, I may try bringing back Atomic Checkpoints into this KIP.
>>
>> Regards,
>> Nick
>>
>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:
>>
>>> Hi Nick,
>>>
>>> Thanks for the insights! Very interesting!
>>>
>>> As far as I understand, you want to atomically update the state store from the transaction buffer, flush the memtable of a state store and write the checkpoint not after the commit time elapsed but after the transaction buffer reached a size that would lead to exceeding statestore.transaction.buffer.max.bytes before the next commit interval ends.
>>> That means, the Kafka transaction would commit every commit interval but the state store would only be atomically updated roughly every statestore.transaction.buffer.max.bytes of data. Also IQ would then only see new data roughly every statestore.transaction.buffer.max.bytes. After a failure the state store needs to restore up to statestore.transaction.buffer.max.bytes.
>>>
>>> Is this correct?
>>>
>>> What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals? In such a way, we would have the same delay for records appearing in output topics and IQ because both would appear when the Kafka transaction is committed. However, after a failure the state store still needs to restore up to statestore.transaction.buffer.max.bytes and it might restore data that is already in the state store because the checkpoint lags behind the last stable offset (i.e. the last committed offset) of the changelog topics. Restoring data that is already in the state store is idempotent, so EOS should not be violated.
>>> This solution needs at least one new config to specify when a checkpoint should be written.
>>>
>>> A small correction to your previous e-mail that does not change anything you said: Under ALOS, the default commit interval is 30 seconds, not five seconds.
>>>
>>> Best,
>>> Bruno
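(A rough sketch of the checkpoint trigger Bruno describes above: update the store from the transaction buffer every commit interval, but flush the memtable and write the checkpoint only after a configured amount of data and/or number of commit intervals. The class name and both thresholds are hypothetical, standing in for the new config Bruno mentions.)

    // Hypothetical decision logic: the store is updated from the transaction
    // buffer on every commit, but the memtable flush + checkpoint only happen
    // once enough data or enough commit intervals have accumulated.
    class CheckpointPolicySketch {
        private final long maxBytesBetweenCheckpoints;  // assumption, e.g. 64 MiB
        private final int maxCommitsBetweenCheckpoints; // assumption, e.g. 10
        private long bytesSinceCheckpoint = 0;
        private int commitsSinceCheckpoint = 0;

        CheckpointPolicySketch(final long maxBytes, final int maxCommits) {
            this.maxBytesBetweenCheckpoints = maxBytes;
            this.maxCommitsBetweenCheckpoints = maxCommits;
        }

        // Called once per commit interval, after the store has been atomically
        // updated from the transaction buffer.
        boolean shouldCheckpoint(final long bytesWrittenThisCommit) {
            bytesSinceCheckpoint += bytesWrittenThisCommit;
            commitsSinceCheckpoint++;
            if (bytesSinceCheckpoint >= maxBytesBetweenCheckpoints
                    || commitsSinceCheckpoint >= maxCommitsBetweenCheckpoints) {
                bytesSinceCheckpoint = 0;
                commitsSinceCheckpoint = 0;
                return true; // flush the memtable and write the checkpoint file now
            }
            return false;
        }
    }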
>>> On 01.07.23 12:37, Nick Telford wrote:
>>> > Hi everyone,
>>> >
>>> > I've begun performance testing my branch on our staging environment, putting it through its paces in our non-trivial application. I'm already observing the same increased flush rate that we saw the last time we attempted to use a version of this KIP, but this time, I think I know why.
>>> >
>>> > Pre-KIP-892, StreamTask#postCommit, which is called at the end of the Task commit process, has the following behaviour:
>>> >
>>> > - Under ALOS: checkpoint the state stores. This includes flushing memtables in RocksDB. This is acceptable because the default commit.interval.ms is 5 seconds, so forcibly flushing memtables every 5 seconds is acceptable for most applications.
>>> > - Under EOS: checkpointing is not done, *unless* it's being forced, due to e.g. the Task closing or being revoked. This means that under normal processing conditions, the state stores will not be checkpointed, and will not have memtables flushed at all, unless RocksDB decides to flush them on its own. Checkpointing stores and force-flushing their memtables is only done when a Task is being closed.
>>> >
>>> > Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal Task commits, in order to write the RocksDB transaction buffers to the state stores, and to ensure the offsets are synced to disk to prevent restores from getting out of hand. Consequently, my current implementation calls maybeCheckpoint on *every* Task commit, which is far too frequent. This causes checkpoints every 10,000 records, which is a change in flush behaviour, potentially causing performance problems for some applications.
>>> >
>>> > I'm looking into possible solutions, and I'm currently leaning towards using the statestore.transaction.buffer.max.bytes configuration to checkpoint Tasks once we are likely to exceed it. This would complement the existing "early Task commit" functionality that this configuration provides, in the following way:
>>> >
>>> > - Currently, we use statestore.transaction.buffer.max.bytes to force an early Task commit if processing more records would cause our state store transactions to exceed the memory assigned to them.
>>> > - New functionality: when a Task *does* commit, we will not checkpoint the stores (and hence flush the transaction buffers) unless we expect to cross the statestore.transaction.buffer.max.bytes threshold before the next commit.
>>> >
>>> > I'm also open to suggestions.
>>> >
>>> > Regards,
>>> > Nick
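(A sketch of the checkpoint condition Nick outlines above: on a Task commit, only flush the transaction buffer and checkpoint if it is likely to exceed statestore.transaction.buffer.max.bytes before the next commit. The class and method names, and the use of the previous interval's write volume as the estimate, are assumptions for illustration.)

    // Illustrative only: checkpoint on commit only if the transaction buffer is
    // likely to exceed statestore.transaction.buffer.max.bytes before the *next*
    // commit completes, estimated from the bytes written since the last commit.
    class MaybeCheckpointSketch {
        private final long transactionBufferMaxBytes; // statestore.transaction.buffer.max.bytes

        MaybeCheckpointSketch(final long transactionBufferMaxBytes) {
            this.transactionBufferMaxBytes = transactionBufferMaxBytes;
        }

        boolean shouldCheckpoint(final long uncommittedBytes, final long bytesWrittenSinceLastCommit) {
            // If another commit interval's worth of writes would push the buffer
            // over the limit, flush it now; otherwise defer the checkpoint.
            return uncommittedBytes + bytesWrittenSinceLastCommit > transactionBufferMaxBytes;
        }
    }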
>>> > On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com> wrote:
>>> >
>>> >> Hi Bruno!
>>> >>
>>> >> 3.
>>> >> By "less predictable for users", I meant in terms of understanding the performance profile under various circumstances. The more complex the solution, the more difficult it would be for users to understand the performance they see. For example, spilling records to disk when the transaction buffer reaches a threshold would, I expect, reduce write throughput. This reduction in write throughput could be unexpected, and potentially difficult to diagnose/understand for users.
>>> >> At the moment, I think the "early commit" concept is relatively straightforward; it's easy to document, and conceptually fairly obvious to users. We could probably add a metric to make it easier to understand when it happens though.
>>> >>
>>> >> 3. (the second one)
>>> >> The IsolationLevel is *essentially* an indirect way of telling StateStores whether they should be transactional. READ_COMMITTED essentially requires transactions, because it dictates that two threads calling `newTransaction()` should not see writes from the other transaction until they have been committed. With READ_UNCOMMITTED, all bets are off, and stores can allow threads to observe written records at any time, which is essentially "no transactions". That said, StateStores are free to implement these guarantees however they can, which is a bit more relaxed than dictating "you must use transactions". For example, with RocksDB we would implement these as READ_COMMITTED == WBWI-based "transactions", READ_UNCOMMITTED == direct writes to the database. But with other storage engines, it might be preferable to *always* use transactions, even when unnecessary; or there may be storage engines that don't provide transactions, but the isolation guarantees can be met using a different technique.
>>> >> My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required.
>>> >> That said, I understand that "IsolationLevel" might not be the right abstraction, and we can always make it much more explicit if required, e.g. boolean transactional()
>>> >>
>>> >> 7-8.
>>> >> I can make these changes either later today or tomorrow.
>>> >>
>>> >> Small update:
>>> >> I've rebased my branch on trunk and fixed a bunch of issues that needed addressing. Currently, all the tests pass, which is promising, but it will need to undergo some performance testing. I haven't (yet) worked on removing the `newTransaction()` stuff, but I would expect that, behaviourally, it should make no difference. The branch is available at https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is interested in taking an early look.
>>> >>
>>> >> Regards,
>>> >> Nick
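(To make the IsolationLevel idea above concrete, a sketch of how a RocksDB-backed store might interpret it. Wiring the level in through a constructor and exposing a transactional() accessor are illustrative assumptions; the exact API surface is still being discussed in the KIP.)

    import org.apache.kafka.common.IsolationLevel;

    // Hypothetical: a RocksDB-backed store deciding, from the isolation level it
    // is given at init time, whether to route writes through a transaction buffer
    // (READ_COMMITTED) or directly to the database (READ_UNCOMMITTED).
    class IsolationAwareStoreSketch {
        private final boolean transactional;

        IsolationAwareStoreSketch(final IsolationLevel isolationLevel) {
            // READ_COMMITTED: other threads must not see writes until they are
            // committed, so buffer them (e.g. in a WriteBatchWithIndex) until the
            // Task commits.
            // READ_UNCOMMITTED: writes may be observed immediately, so no
            // transaction buffer is required for a RocksDB-based store.
            this.transactional = isolationLevel == IsolationLevel.READ_COMMITTED;
        }

        boolean transactional() {
            return transactional;
        }
    }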
>>> >> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:
>>> >>
>>> >>> Hi Nick,
>>> >>>
>>> >>> 1.
>>> >>> Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.
>>> >>>
>>> >>> 2.
>>> >>> I agree with John here, that actually it is public API. My question is how this usage pattern affects normal processing.
>>> >>>
>>> >>> 3.
>>> >>> My concern is that checking for the size of the transaction buffer and maybe triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean with "less predictable for users", though.
>>> >>>
>>> >>> I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!
>>> >>>
>>> >>> Some comments on your reply to my e-mail on June 20th:
>>> >>>
>>> >>> 3.
>>> >>> Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not?
>>> >>> With my comment, I was more concerned with how you would know whether a checkpoint file needs to be written under EOS if you do not have a way to know whether the state store is transactional. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.
>>> >>>
>>> >>> 7.
>>> >>> My point was about not only considering the bytes in memory in config statestore.uncommitted.max.bytes, but also bytes that might be spilled on disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make too much sense anymore.
>>> >>> I like the name statestore.transaction.buffer.max.bytes that you proposed.
>>> >>>
>>> >>> 8.
>>> >>> A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure, for instance how a transactional state store recovers after a failure, or what happens to the transaction buffer, etc. (that is what I meant by "fail-over" in point 9.)
>>> >>>
>>> >>> Best,
>>> >>> Bruno
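(For reference, a sketch of how the proposed config might be set. Note that statestore.transaction.buffer.max.bytes is a name proposed by this KIP and does not exist in any released Kafka version; the 64 MiB value is just an example.)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class TransactionBufferConfigExample {
        public static Properties streamsProperties() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Proposed in KIP-892 (not yet released): upper bound on the bytes
            // buffered in state store transactions before an early commit is forced.
            props.put("statestore.transaction.buffer.max.bytes", 64 * 1024 * 1024L);
            return props;
        }
    }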
>>> >>> On 21.06.23 18:50, Nick Telford wrote:
>>> >>>> Hi Bruno,
>>> >>>>
>>> >>>> 1.
>>> >>>> Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or being likely to exceed) the configured memory needs to trigger an early commit?
>>> >>>>
>>> >>>> 2.
>>> >>>> This is one of my big concerns. Ultimately, any approach based on cracking open RocksDB internals and using it in ways it's not really designed for is likely to have some unforeseen performance or consistency issues.
>>> >>>>
>>> >>>> 3.
>>> >>>> What's your motivation for removing these early commits? While not ideal, I think they're a decent compromise to ensure consistency whilst maintaining good and predictable performance. All 3 of your suggested ideas seem *very* complicated, and might actually make behaviour less predictable for users as a consequence.
>>> >>>>
>>> >>>> I'm a bit concerned that the scope of this KIP is growing a bit out of control. While it's good to discuss ideas for future improvements, I think it's important to narrow the scope down to a design that achieves the most pressing objectives (constant sized restorations during dirty close/unexpected errors). Any design that this KIP produces can ultimately be changed in the future, especially if the bulk of it is internal behaviour.
>>> >>>>
>>> >>>> I'm going to spend some time next week trying to re-work the original WriteBatchWithIndex design to remove the newTransaction() method, such that it's just an implementation detail of RocksDBStore. That way, if we want to replace WBWI with something in the future, like the SST file management outlined by John, then we can do so with little/no API changes.
>>> >>>>
>>> >>>> Regards,
>>> >>>>
>>> >>>> Nick
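(A minimal sketch of the shape that re-work might take, with the WriteBatchWithIndex hidden entirely inside the store and only a commit of the buffered writes plus changelog offsets exposed. The interface and method signatures below are illustrative, not the KIP's final API.)

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical interface: callers never see a transaction object. The store
    // buffers writes internally (e.g. in a WriteBatchWithIndex for RocksDB) and
    // atomically applies them, together with the changelog offsets, on commit.
    interface TransactionalStateStoreSketch {

        void put(byte[] key, byte[] value);

        byte[] get(byte[] key); // reads see this Task's uncommitted buffer

        // Atomically applies the buffered writes and records the given changelog
        // offsets, replacing the separate newTransaction()/flush() steps.
        void commit(Map<TopicPartition, Long> changelogOffsets);
    }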