Nick,

Thanks for the response.

> Can you clarify how much state was restored in those 11 seconds?

That was a full restoration of ~650MB of state after I wiped the state directory. The restoration after a crash with your branch is nearly instantaneous, whereas with plain Kafka 3.5.0 a crash triggers a full restoration (8 seconds).

Additionally, I pulled, rebuilt, and re-tested your changes and now the restoration time with your branch is the same as with vanilla Streams! Fantastic work!

I plan to do some more testing with larger state stores over the next couple weeks, both with RocksDB and Speedb OSS. And perhaps I might even try enabling some of the experimental Speedb OSS features, such as the [Improved Write Flow](https://docs.speedb.io/speedb-features/write-flow). As far as I understand, this isn't possible to do through the standard RocksDBConfigSetter since some of the config options are Speedb-specific.

Cheers,
Colt McNealy
*Founder, LittleHorse.dev*

On Mon, Sep 11, 2023 at 4:29 AM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Colt,
>
> Thanks for taking the time to run your benchmarks on this, that's
> incredibly helpful.
>
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (!!!!). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
>
> Can you clarify how much state was restored in those 11 seconds? Was this
> the time to do the full restore regardless, or was it the time to only
> restore a small fraction of the state (e.g. the last aborted transaction)?
>
> > -- QUESTION: Because we observed a significant (30% or so) and reproducible
> > slowdown during restoration, it seems like KIP-892 uses the checkpointing
> > behavior during restoration as well? If so, I would argue that this might
> > not be necessary, because everything we write is already committed, so we
> > don't need to change the behavior during restoration or standby tasks.
> > Perhaps we could write the offsets to RocksDB on every batch (or even every
> > 5 seconds or so).
>
> Restore has always used a completely separate code-path to regular writes,
> and continues to do so. I had a quick pass over the code and I suspect I
> know what's causing the performance degradation: for every restored record,
> I was adding the changelog offset of that record to the batch along with
> the record. This is different to the regular write-path, which only adds
> the current offsets once, on-commit. This writeOffset method is fairly
> expensive, since it has to serialize the TopicPartition and offset that's
> being written to the database.
>
> Assuming this is the cause, I've already pushed a fix to my branch that
> will only call writeOffset once per-batch, and also adds some caching to
> the serialization in writeOffset, that should also enhance performance of
> state commit in the normal write-path.
>
> Please let me know if this addresses the issue!
>
> Regards,
> Nick
>
>
> On Mon, 11 Sept 2023 at 05:38, Colt McNealy <c...@littlehorse.io> wrote:
>
> > Howdy folks,
> >
> > First I wanted to say fantastic work and thank you to Nick. I built your
> > branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
> > some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
> > branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1. And
> > it worked! Including the global store (we don't have any segmented stores,
> > unfortunately).
> >
> > The test I ran involved running 3,000 workflows with 100 tasks each, and
> > roughly 650MB state total.
> >
> > With Streams 3.5.0, I indeed verified that unclean shutdown caused a fresh
> > restore from scratch.
I also benchmarked my application at: > > - Running the benchmark took 211 seconds > > - 1,421 tasks per second on one partition > > - 8 seconds to restore the state (650MB or so) > > > > With KIP 892, I verified that unclean shutdown does not cause a fresh > > restore (!!!!). I got the following benchmark results: > > - Benchmark took 216 seconds > > - 1,401 tasks per second on one partition > > - 11 seconds to restore the state > > > > I ran the restorations many times to ensure that there was no rounding > > error or noise; the results were remarkably consistent. Additionally, I > ran > > the restorations with KIP-892 built with Speedb OSS. The restoration time > > consistently came out as 10 seconds, which was an improvement from the 11 > > seconds observed with RocksDB + KIP-892. > > > > My application is bottlenecked mostly by serialization and > deserialization, > > so improving the performance of the state store doesn't really impact our > > throughput that much. And the processing performance (benchmark time, > > tasks/second) are pretty close in KIP-892 vs Streams 3.5.0. However, at > > larger state store sizes, RocksDB performance begins to degrade, so that > > might not be true once we pass 20GB per partition. > > > > -- QUESTION: Because we observed a significant (30% or so) and > reproducible > > slowdown during restoration, it seems like KIP-892 uses the checkpointing > > behavior during restoration as well? If so, I would argue that this might > > not be necessary, because everything we write is already committed, so we > > don't need to change the behavior during restoration or standby tasks. > > Perhaps we could write the offsets to RocksDB on every batch (or even > every > > 5 seconds or so). > > > > -- Note: This was a very small-scale test, with <1GB of state (as I > didn't > > have time to spend hours building up state). In the past I have noted > that > > RocksDB performance degrades significantly after 25GB of state in one > > store. 
Future work involves determining the performance impact of KIP-892 > > relative to trunk at larger scale, since it's possible that the relative > > behaviors are far different (i.e. relative to trunk, 892's processing and > > restoration throughput might be much better or much worse). > > > > -- Note: For those who want to replicate the tests, you can find the > branch > > of our streams app here: > > > > > https://github.com/littlehorse-enterprises/littlehorse/tree/minor/testing-streams-forks > > . The example I ran was `examples/hundred-tasks`, and I ran the server > with > > `./local-dev/do-server.sh one-partition`. The `STREAMS_TESTS.md` file > has a > > detailed breakdown of the testing. > > > > Anyways, I'm super excited about this KIP and if a bit more future > testing > > goes well, we plan to ship our product with a build of KIP-892, Speedb > OSS, > > and potentially a few other minor tweaks that we are thinking about. > > > > Thanks Nick! > > > > Ride well, > > Colt McNealy > > > > *Founder, LittleHorse.dev* > > > > > > On Thu, Aug 24, 2023 at 3:19 AM Nick Telford <nick.telf...@gmail.com> > > wrote: > > > > > Hi Bruno, > > > > > > Thanks for taking the time to review the KIP. I'm back from leave now > and > > > intend to move this forwards as quickly as I can. > > > > > > Addressing your points: > > > > > > 1. > > > Because flush() is part of the StateStore API, it's exposed to custom > > > Processors, which might be making calls to flush(). This was actually > the > > > case in a few integration tests. > > > To maintain as much compatibility as possible, I'd prefer not to make > > this > > > an UnsupportedOperationException, as it will cause previously working > > > Processors to start throwing exceptions at runtime. > > > I agree that it doesn't make sense for it to proxy commit(), though, as > > > that would cause it to violate the "StateStores commit only when the > Task > > > commits" rule. > > > Instead, I think we should make this a no-op. 
That way, existing user > > > Processors will continue to work as-before, without violation of store > > > consistency that would be caused by premature flush/commit of > StateStore > > > data to disk. > > > What do you think? > > > > > > 2. > > > As stated in the JavaDoc, when a StateStore implementation is > > > transactional, but is unable to estimate the uncommitted memory usage, > > the > > > method will return -1. > > > The intention here is to permit third-party implementations that may > not > > be > > > able to estimate memory usage. > > > > > > Yes, it will be 0 when nothing has been written to the store yet. I > > thought > > > that was implied by "This method will return an approximation of the > > memory > > > would be freed by the next call to {@link #commit(Map)}" and "@return > The > > > approximate size of all records awaiting {@link #commit(Map)}", > however, > > I > > > can add it explicitly to the JavaDoc if you think this is unclear? > > > > > > 3. > > > I realise this is probably the most contentious point in my design, and > > I'm > > > open to changing it if I'm unable to convince you of the benefits. > > > Nevertheless, here's my argument: > > > The Interactive Query (IQ) API(s) are directly provided StateStores to > > > query, and it may be important for users to programmatically know which > > > mode the StateStore is operating under. If we simply provide an > > > "eosEnabled" boolean (as used throughout the internal streams engine), > or > > > similar, then users will need to understand the operation and > > consequences > > > of each available processing mode and how it pertains to their > > StateStore. > > > > > > Interactive Query users aren't the only people that care about the > > > processing.mode/IsolationLevel of a StateStore: implementers of custom > > > StateStores also need to understand the behaviour expected of their > > > implementation. 
KIP-892 introduces some assumptions into the Streams > > Engine > > > about how StateStores operate under each processing mode, and it's > > > important that custom implementations adhere to those assumptions in > > order > > > to maintain the consistency guarantees. > > > > > > IsolationLevels provide a high-level contract on the behaviour of the > > > StateStore: a user knows that under READ_COMMITTED, they will see > writes > > > only after the Task has committed, and under READ_UNCOMMITTED they will > > see > > > writes immediately. No understanding of the details of each > > processing.mode > > > is required, either for IQ users or StateStore implementers. > > > > > > An argument can be made that these contractual guarantees can simply be > > > documented for the processing.mode (i.e. that exactly-once and > > > exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves > like > > > READ_UNCOMMITTED), but there are several small issues with this I'd > > prefer > > > to avoid: > > > > > > - Where would we document these contracts, in a way that is > difficult > > > for users/implementers to miss/ignore? > > > - It's not clear to users that the processing mode is communicating > > > an expectation of read isolation, unless they read the > documentation. > > > Users > > > rarely consult documentation unless they feel they need to, so it's > > > likely > > > this detail would get missed by many users. > > > - It tightly couples processing modes to read isolation. Adding new > > > processing modes, or changing the read isolation of existing > > processing > > > modes would be difficult/impossible. > > > > > > Ultimately, the cost of introducing IsolationLevels is just a single > > > method, since we re-use the existing IsolationLevel enum from Kafka. > This > > > gives us a clear place to document the contractual guarantees expected > > > of/provided by StateStores, that is accessible both by the StateStore > > > itself, and by IQ users. 
> > > > > > (Writing this I've just realised that the StateStore and IQ APIs > actually > > > don't provide access to StateStoreContext that IQ users would have > direct > > > access to... Perhaps StateStore should expose isolationLevel() itself > > too?) > > > > > > 4. > > > Yeah, I'm not comfortable renaming the metrics in-place either, as > it's a > > > backwards incompatible change. My concern is that, if we leave the > > existing > > > "flush" metrics in place, they will be confusing to users. Right now, > > > "flush" metrics record explicit flushes to disk, but under KIP-892, > even > > a > > > commit() will not explicitly flush data to disk - RocksDB will decide > on > > > when to flush memtables to disk itself. > > > > > > If we keep the existing "flush" metrics, we'd have two options, which > > both > > > seem pretty bad to me: > > > > > > 1. Have them record calls to commit(), which would be misleading, as > > > data is no longer explicitly "flushed" to disk by this call. > > > 2. Have them record nothing at all, which is equivalent to removing > > the > > > metrics, except that users will see the metric still exists and so > > > assume > > > that the metric is correct, and that there's a problem with their > > system > > > when there isn't. > > > > > > I agree that removing them is also a bad solution, and I'd like some > > > guidance on the best path forward here. > > > > > > 5. > > > Position files are updated on every write to a StateStore. Since our > > writes > > > are now buffered until commit(), we can't update the Position file > until > > > commit() has been called, otherwise it would be inconsistent with the > > data > > > in the event of a rollback. Consequently, we need to manage these > offsets > > > the same way we manage the checkpoint offsets, and ensure they're only > > > written on commit(). > > > > > > 6. > > > Agreed, although I'm not exactly sure yet what tests to write. How > > explicit > > > do we need to be here in the KIP? 
> > > > > > As for upgrade/downgrade: upgrade is designed to be seamless, and we > > should > > > definitely add some tests around that. Downgrade, it transpires, isn't > > > currently possible, as the extra column family for offset storage is > > > incompatible with the pre-KIP-892 implementation: when you open a > RocksDB > > > database, you must open all available column families or receive an > > error. > > > What currently happens on downgrade is that it attempts to open the > > store, > > > throws an error about the offsets column family not being opened, which > > > triggers a wipe and rebuild of the Task. Given that downgrades should > be > > > uncommon, I think this is acceptable behaviour, as the end-state is > > > consistent, even if it results in an undesirable state restore. > > > > > > Should I document the upgrade/downgrade behaviour explicitly in the > KIP? > > > > > > -- > > > > > > Regards, > > > Nick > > > > > > > > > On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> > wrote: > > > > > > > Hi Nick! > > > > > > > > Thanks for the updates! > > > > > > > > 1. > > > > Why does StateStore#flush() default to > > > > StateStore#commit(Collections.emptyMap())? > > > > Since calls to flush() will not exist anymore after this KIP is > > > > released, I would rather throw an unsupported operation exception by > > > > default. > > > > > > > > > > > > 2. > > > > When would a state store return -1 from > > > > StateStore#approximateNumUncommittedBytes() while being > transactional? > > > > > > > > Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if > > > > the state store is transactional but nothing has been written to the > > > > state store yet? > > > > > > > > > > > > 3. > > > > Sorry for bringing this up again. Does this KIP really need to > > introduce > > > > StateStoreContext#isolationLevel()? 
StateStoreContext has already > > > > appConfigs() which basically exposes the same information, i.e., if > EOS > > > > is enabled or not. > > > > In one of your previous e-mails you wrote: > > > > > > > > "My idea was to try to keep the StateStore interface as loosely > coupled > > > > from the Streams engine as possible, to give implementers more > freedom, > > > > and reduce the amount of internal knowledge required." > > > > > > > > While I understand the intent, I doubt that it decreases the coupling > > of > > > > a StateStore interface and the Streams engine. READ_COMMITTED only > > > > applies to IQ but not to reads by processors. Thus, implementers need > > to > > > > understand how Streams accesses the state stores. > > > > > > > > I would like to hear what others think about this. > > > > > > > > > > > > 4. > > > > Great exposing new metrics for transactional state stores! However, I > > > > would prefer to add new metrics and deprecate (in the docs) the old > > > > ones. You can find examples of deprecated metrics here: > > > > https://kafka.apache.org/documentation/#selector_monitoring > > > > > > > > > > > > 5. > > > > Why does the KIP mention position files? I do not think they are > > related > > > > to transactions or flushes. > > > > > > > > > > > > 6. > > > > I think we will also need to adapt/add integration tests besides unit > > > > tests. Additionally, we probably need integration or system tests to > > > > verify that upgrades and downgrades between transactional and > > > > non-transactional state stores work as expected. > > > > > > > > > > > > Best, > > > > Bruno > > > > > > > > > > > > > > > > > > > > > > > > On 7/21/23 10:34 PM, Nick Telford wrote: > > > > > One more thing: I noted John's suggestion in the KIP, under > "Rejected > > > > > Alternatives". 
I still think it's an idea worth pursuing, but I > > believe > > > > > that it's out of the scope of this KIP, because it solves a > different > > > set > > > > > of problems to this KIP, and the scope of this one has already > grown > > > > quite > > > > > large! > > > > > > > > > > On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com > > > > > > wrote: > > > > > > > > > >> Hi everyone, > > > > >> > > > > >> I've updated the KIP ( > > > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > > > > ) > > > > >> with the latest changes; mostly bringing back "Atomic > Checkpointing" > > > > (for > > > > >> what feels like the 10th time!). I think the one thing missing is > > some > > > > >> changes to metrics (notably the store "flush" metrics will need to > > be > > > > >> renamed to "commit"). > > > > >> > > > > >> The reason I brought back Atomic Checkpointing was to decouple > store > > > > flush > > > > >> from store commit. This is important, because with Transactional > > > > >> StateStores, we now need to call "flush" on *every* Task commit, > and > > > not > > > > >> just when the StateStore is closing, otherwise our transaction > > buffer > > > > will > > > > >> never be written and persisted, instead growing unbounded! I > > > > experimented > > > > >> with some simple solutions, like forcing a store flush whenever > the > > > > >> transaction buffer was likely to exceed its configured size, but > > this > > > > was > > > > >> brittle: it prevented the transaction buffer from being configured > > to > > > be > > > > >> unbounded, and it still would have required explicit flushes of > > > RocksDB, > > > > >> yielding sub-optimal performance and memory utilization. > > > > >> > > > > >> I deemed Atomic Checkpointing to be the "right" way to resolve > this > > > > >> problem. 
By ensuring that the changelog offsets that correspond to > > the > > > > most > > > > >> recently written records are always atomically written to the > > > StateStore > > > > >> (by writing them to the same transaction buffer), we can avoid > > > forcibly > > > > >> flushing the RocksDB memtables to disk, letting RocksDB flush them > > > only > > > > >> when necessary, without losing any of our consistency guarantees. > > See > > > > the > > > > >> updated KIP for more info. > > > > >> > > > > >> I have fully implemented these changes, although I'm still not > > > entirely > > > > >> happy with the implementation for segmented StateStores, so I plan > > to > > > > >> refactor that. Despite that, all tests pass. If you'd like to try > > out > > > or > > > > >> review this highly experimental and incomplete branch, it's > > available > > > > here: > > > > >> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: > it's > > > > built > > > > >> against Kafka 3.5.0 so that I had a stable base to build and test > it > > > on, > > > > >> and to enable easy apples-to-apples comparisons in a live > > > environment. I > > > > >> plan to rebase it against trunk once it's nearer completion and > has > > > been > > > > >> proven on our main application. > > > > >> > > > > >> I would really appreciate help in reviewing and testing: > > > > >> - Segmented (Versioned, Session and Window) stores > > > > >> - Global stores > > > > >> > > > > >> As I do not currently use either of these, so my primary test > > > > environment > > > > >> doesn't test these areas. > > > > >> > > > > >> I'm going on Parental Leave starting next week for a few weeks, so > > > will > > > > >> not have time to move this forward until late August. That said, > > your > > > > >> feedback is welcome and appreciated, I just won't be able to > respond > > > as > > > > >> quickly as usual. 
> > > > >> > > > > >> Regards, > > > > >> Nick > > > > >> > > > > >> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com > > > > > > wrote: > > > > >> > > > > >>> Hi Bruno > > > > >>> > > > > >>> Yes, that's correct, although the impact on IQ is not something I > > had > > > > >>> considered. > > > > >>> > > > > >>> What about atomically updating the state store from the > transaction > > > > >>>> buffer every commit interval and writing the checkpoint (thus, > > > > flushing > > > > >>>> the memtable) every configured amount of data and/or number of > > > commit > > > > >>>> intervals? > > > > >>>> > > > > >>> > > > > >>> I'm not quite sure I follow. Are you suggesting that we add an > > > > additional > > > > >>> config for the max number of commit intervals between > checkpoints? > > > That > > > > >>> way, we would checkpoint *either* when the transaction buffers > are > > > > nearly > > > > >>> full, *OR* whenever a certain number of commit intervals have > > > elapsed, > > > > >>> whichever comes first? > > > > >>> > > > > >>> That certainly seems reasonable, although this re-ignites an > > earlier > > > > >>> debate about whether a config should be measured in "number of > > commit > > > > >>> intervals", instead of just an absolute time. > > > > >>> > > > > >>> FWIW, I realised that this issue is the reason I was pursuing the > > > > Atomic > > > > >>> Checkpoints, as it de-couples memtable flush from checkpointing, > > > which > > > > >>> enables us to just checkpoint on every commit without any > > performance > > > > >>> impact. Atomic Checkpointing is definitely the "best" solution, > but > > > > I'm not > > > > >>> sure if this is enough to bring it back into this KIP. 
> > > > >>> > > > > >>> I'm currently working on moving all the transactional logic > > directly > > > > into > > > > >>> RocksDBStore itself, which does away with the > > > StateStore#newTransaction > > > > >>> method, and reduces the number of new classes introduced, > > > significantly > > > > >>> reducing the complexity. If it works, and the complexity is > > > drastically > > > > >>> reduced, I may try bringing back Atomic Checkpoints into this > KIP. > > > > >>> > > > > >>> Regards, > > > > >>> Nick > > > > >>> > > > > >>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> > > > wrote: > > > > >>> > > > > >>>> Hi Nick, > > > > >>>> > > > > >>>> Thanks for the insights! Very interesting! > > > > >>>> > > > > >>>> As far as I understand, you want to atomically update the state > > > store > > > > >>>> from the transaction buffer, flush the memtable of a state store > > and > > > > >>>> write the checkpoint not after the commit time elapsed but after > > the > > > > >>>> transaction buffer reached a size that would lead to exceeding > > > > >>>> statestore.transaction.buffer.max.bytes before the next commit > > > > interval > > > > >>>> ends. > > > > >>>> That means, the Kafka transaction would commit every commit > > interval > > > > but > > > > >>>> the state store will only be atomically updated roughly every > > > > >>>> statestore.transaction.buffer.max.bytes of data. Also IQ would > > then > > > > only > > > > >>>> see new data roughly every > > statestore.transaction.buffer.max.bytes. > > > > >>>> After a failure the state store needs to restore up to > > > > >>>> statestore.transaction.buffer.max.bytes. > > > > >>>> > > > > >>>> Is this correct? 
> > > > >>>>
> > > > >>>> What about atomically updating the state store from the transaction
> > > > >>>> buffer every commit interval and writing the checkpoint (thus, flushing
> > > > >>>> the memtable) every configured amount of data and/or number of commit
> > > > >>>> intervals? In such a way, we would have the same delay for records
> > > > >>>> appearing in output topics and IQ because both would appear when the
> > > > >>>> Kafka transaction is committed. However, after a failure the state store
> > > > >>>> still needs to restore up to statestore.transaction.buffer.max.bytes and
> > > > >>>> it might restore data that is already in the state store because the
> > > > >>>> checkpoint lags behind the last stable offset (i.e. the last committed
> > > > >>>> offset) of the changelog topics. Restoring data that is already in the
> > > > >>>> state store is idempotent, so EOS should not be violated.
> > > > >>>> This solution needs at least one new config to specify when a checkpoint
> > > > >>>> should be written.
> > > > >>>>
> > > > >>>> A small correction to your previous e-mail that does not change anything
> > > > >>>> you said: Under ALOS the default commit interval is 30 seconds, not five
> > > > >>>> seconds.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Bruno
> > > > >>>>
> > > > >>>> On 01.07.23 12:37, Nick Telford wrote:
> > > > >>>>> Hi everyone,
> > > > >>>>>
> > > > >>>>> I've begun performance testing my branch on our staging environment,
> > > > >>>>> putting it through its paces in our non-trivial application. I'm already
> > > > >>>>> observing the same increased flush rate that we saw the last time we
> > > > >>>>> attempted to use a version of this KIP, but this time, I think I know why.
> > > > >>>>> > > > > >>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end > of > > > the > > > > >>>> Task > > > > >>>>> commit process, has the following behaviour: > > > > >>>>> > > > > >>>>> - Under ALOS: checkpoint the state stores. This includes > > > > >>>>> flushing memtables in RocksDB. This is acceptable because > > the > > > > >>>> default > > > > >>>>> commit.interval.ms is 5 seconds, so forcibly flushing > > > memtables > > > > >>>> every 5 > > > > >>>>> seconds is acceptable for most applications. > > > > >>>>> - Under EOS: checkpointing is not done, *unless* it's > being > > > > >>>> forced, due > > > > >>>>> to e.g. the Task closing or being revoked. This means that > > > under > > > > >>>> normal > > > > >>>>> processing conditions, the state stores will not be > > > > checkpointed, > > > > >>>> and will > > > > >>>>> not have memtables flushed at all , unless RocksDB decides > > to > > > > >>>> flush them on > > > > >>>>> its own. Checkpointing stores and force-flushing their > > > memtables > > > > >>>> is only > > > > >>>>> done when a Task is being closed. > > > > >>>>> > > > > >>>>> Under EOS, KIP-892 needs to checkpoint stores on at least > *some* > > > > normal > > > > >>>>> Task commits, in order to write the RocksDB transaction buffers > > to > > > > the > > > > >>>>> state stores, and to ensure the offsets are synced to disk to > > > prevent > > > > >>>>> restores from getting out of hand. Consequently, my current > > > > >>>> implementation > > > > >>>>> calls maybeCheckpoint on *every* Task commit, which is far too > > > > >>>> frequent. > > > > >>>>> This causes checkpoints every 10,000 records, which is a change > > in > > > > >>>> flush > > > > >>>>> behaviour, potentially causing performance problems for some > > > > >>>> applications. 
> > > > >>>>> > > > > >>>>> I'm looking into possible solutions, and I'm currently leaning > > > > towards > > > > >>>>> using the statestore.transaction.buffer.max.bytes configuration > > to > > > > >>>>> checkpoint Tasks once we are likely to exceed it. This would > > > > >>>> complement the > > > > >>>>> existing "early Task commit" functionality that this > > configuration > > > > >>>>> provides, in the following way: > > > > >>>>> > > > > >>>>> - Currently, we use > statestore.transaction.buffer.max.bytes > > to > > > > >>>> force an > > > > >>>>> early Task commit if processing more records would cause > our > > > > state > > > > >>>> store > > > > >>>>> transactions to exceed the memory assigned to them. > > > > >>>>> - New functionality: when a Task *does* commit, we will > not > > > > >>>> checkpoint > > > > >>>>> the stores (and hence flush the transaction buffers) > unless > > we > > > > >>>> expect to > > > > >>>>> cross the statestore.transaction.buffer.max.bytes > threshold > > > > before > > > > >>>> the next > > > > >>>>> commit > > > > >>>>> > > > > >>>>> I'm also open to suggestions. > > > > >>>>> > > > > >>>>> Regards, > > > > >>>>> Nick > > > > >>>>> > > > > >>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford < > > nick.telf...@gmail.com > > > > > > > > >>>> wrote: > > > > >>>>> > > > > >>>>>> Hi Bruno! > > > > >>>>>> > > > > >>>>>> 3. > > > > >>>>>> By "less predictable for users", I meant in terms of > > understanding > > > > the > > > > >>>>>> performance profile under various circumstances. The more > > complex > > > > the > > > > >>>>>> solution, the more difficult it would be for users to > understand > > > the > > > > >>>>>> performance they see. For example, spilling records to disk > when > > > the > > > > >>>>>> transaction buffer reaches a threshold would, I expect, reduce > > > write > > > > >>>>>> throughput. 
This reduction in write throughput could be > > > unexpected, > > > > >>>> and > > > > >>>>>> potentially difficult to diagnose/understand for users. > > > > >>>>>> At the moment, I think the "early commit" concept is > relatively > > > > >>>>>> straightforward; it's easy to document, and conceptually > fairly > > > > >>>> obvious to > > > > >>>>>> users. We could probably add a metric to make it easier to > > > > understand > > > > >>>> when > > > > >>>>>> it happens though. > > > > >>>>>> > > > > >>>>>> 3. (the second one) > > > > >>>>>> The IsolationLevel is *essentially* an indirect way of telling > > > > >>>> StateStores > > > > >>>>>> whether they should be transactional. READ_COMMITTED > essentially > > > > >>>> requires > > > > >>>>>> transactions, because it dictates that two threads calling > > > > >>>>>> `newTransaction()` should not see writes from the other > > > transaction > > > > >>>> until > > > > >>>>>> they have been committed. With READ_UNCOMMITTED, all bets are > > off, > > > > and > > > > >>>>>> stores can allow threads to observe written records at any > time, > > > > >>>> which is > > > > >>>>>> essentially "no transactions". That said, StateStores are free > > to > > > > >>>> implement > > > > >>>>>> these guarantees however they can, which is a bit more relaxed > > > than > > > > >>>>>> dictating "you must use transactions". For example, with > RocksDB > > > we > > > > >>>> would > > > > >>>>>> implement these as READ_COMMITTED == WBWI-based > "transactions", > > > > >>>>>> READ_UNCOMMITTED == direct writes to the database. But with > > other > > > > >>>> storage > > > > >>>>>> engines, it might be preferable to *always* use transactions, > > even > > > > >>>> when > > > > >>>>>> unnecessary; or there may be storage engines that don't > provide > > > > >>>>>> transactions, but the isolation guarantees can be met using a > > > > >>>> different > > > > >>>>>> technique. 
> > > > >>>>>> My idea was to try to keep the StateStore interface as loosely
> > > > >>>>>> coupled from the Streams engine as possible, to give
> > > > >>>>>> implementers more freedom, and reduce the amount of internal
> > > > >>>>>> knowledge required.
> > > > >>>>>> That said, I understand that "IsolationLevel" might not be the
> > > > >>>>>> right abstraction, and we can always make it much more explicit
> > > > >>>>>> if required, e.g. boolean transactional()
> > > > >>>>>>
> > > > >>>>>> 7-8.
> > > > >>>>>> I can make these changes either later today or tomorrow.
> > > > >>>>>>
> > > > >>>>>> Small update:
> > > > >>>>>> I've rebased my branch on trunk and fixed a bunch of issues that
> > > > >>>>>> needed addressing. Currently, all the tests pass, which is
> > > > >>>>>> promising, but it will need to undergo some performance testing.
> > > > >>>>>> I haven't (yet) worked on removing the `newTransaction()` stuff,
> > > > >>>>>> but I would expect that, behaviourally, it should make no
> > > > >>>>>> difference. The branch is available at
> > > > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
> > > > >>>>>> interested in taking an early look.
> > > > >>>>>>
> > > > >>>>>> Regards,
> > > > >>>>>> Nick
> > > > >>>>>>
> > > > >>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Nick,
> > > > >>>>>>>
> > > > >>>>>>> 1.
> > > > >>>>>>> Yeah, I agree with you. That was actually also my point. I
> > > > >>>>>>> understood that John was proposing the ingestion path as a way
> > > > >>>>>>> to avoid the early commits. Probably, I misinterpreted the
> > > > >>>>>>> intent.
> > > > >>>>>>>
> > > > >>>>>>> 2.
> > > > >>>>>>> I agree with John here, that actually it is public API.
> > > > >>>>>>> My question is how this usage pattern affects normal
> > > > >>>>>>> processing.
> > > > >>>>>>>
> > > > >>>>>>> 3.
> > > > >>>>>>> My concern is that checking for the size of the transaction
> > > > >>>>>>> buffer and maybe triggering an early commit affects the whole
> > > > >>>>>>> processing of Kafka Streams. The transactionality of a state
> > > > >>>>>>> store is not confined to the state store itself, but spills
> > > > >>>>>>> over and changes the behavior of other parts of the system. I
> > > > >>>>>>> agree with you that it is a decent compromise. I just wanted to
> > > > >>>>>>> analyse the downsides and list the options to overcome them. I
> > > > >>>>>>> also agree with you that all options seem quite heavy compared
> > > > >>>>>>> with your KIP. I do not understand what you mean with "less
> > > > >>>>>>> predictable for users", though.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I found the discussions about the alternatives really
> > > > >>>>>>> interesting. But I also think that your plan sounds good and we
> > > > >>>>>>> should continue with it!
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Some comments on your reply to my e-mail on June 20th:
> > > > >>>>>>>
> > > > >>>>>>> 3.
> > > > >>>>>>> Ah, now, I understand the reasoning behind putting isolation
> > > > >>>>>>> level in the state store context. Thanks! Should that also be a
> > > > >>>>>>> way to give the state store the opportunity to decide whether
> > > > >>>>>>> to turn on transactions or not?
> > > > >>>>>>> With my comment, I was more concerned about how do you know if
> > > > >>>>>>> a checkpoint file needs to be written under EOS, if you do not
> > > > >>>>>>> have a way to know if the state store is transactional or not.
> > > > >>>>>>> If a state store is transactional, the checkpoint file can be
> > > > >>>>>>> written during normal processing under EOS. If the state store
> > > > >>>>>>> is not transactional, the checkpoint file must not be written
> > > > >>>>>>> under EOS.
> > > > >>>>>>>
> > > > >>>>>>> 7.
> > > > >>>>>>> My point was about not only considering the bytes in memory in
> > > > >>>>>>> config statestore.uncommitted.max.bytes, but also bytes that
> > > > >>>>>>> might be spilled on disk. Basically, I was wondering whether
> > > > >>>>>>> you should remove the "memory" in "Maximum number of memory
> > > > >>>>>>> bytes to be used to buffer uncommitted state-store records."
> > > > >>>>>>> My thinking was that even if a state store spills uncommitted
> > > > >>>>>>> bytes to disk, limiting the overall bytes might make sense.
> > > > >>>>>>> Thinking about it again and considering the recent discussions,
> > > > >>>>>>> it does not make too much sense anymore.
> > > > >>>>>>> I like the name statestore.transaction.buffer.max.bytes that
> > > > >>>>>>> you proposed.
> > > > >>>>>>>
> > > > >>>>>>> 8.
> > > > >>>>>>> A high-level description (without implementation details) of
> > > > >>>>>>> how Kafka Streams will manage the commit of changelog
> > > > >>>>>>> transactions, state store transactions and checkpointing would
> > > > >>>>>>> be great. Would be great if you could also add some sentences
> > > > >>>>>>> about the behavior in case of a failure. For instance how does
> > > > >>>>>>> a transactional state store recover after a failure or what
> > > > >>>>>>> happens with the transaction buffer, etc. (that is what I meant
> > > > >>>>>>> by "fail-over" in point 9.)
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Bruno
> > > > >>>>>>>
> > > > >>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > > > >>>>>>>> Hi Bruno,
> > > > >>>>>>>>
> > > > >>>>>>>> 1.
> > > > >>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex
> > > > >>>>>>>> transactions have, whereby exceeding (or likely to exceed)
> > > > >>>>>>>> configured memory needs to trigger an early commit?
> > > > >>>>>>>>
> > > > >>>>>>>> 2.
> > > > >>>>>>>> This is one of my big concerns. Ultimately, any approach based
> > > > >>>>>>>> on cracking open RocksDB internals and using it in ways it's
> > > > >>>>>>>> not really designed for is likely to have some unforeseen
> > > > >>>>>>>> performance or consistency issues.
> > > > >>>>>>>>
> > > > >>>>>>>> 3.
> > > > >>>>>>>> What's your motivation for removing these early commits? While
> > > > >>>>>>>> not ideal, I think they're a decent compromise to ensure
> > > > >>>>>>>> consistency whilst maintaining good and predictable
> > > > >>>>>>>> performance.
> > > > >>>>>>>> All 3 of your suggested ideas seem *very* complicated, and
> > > > >>>>>>>> might actually make behaviour less predictable for users as a
> > > > >>>>>>>> consequence.
> > > > >>>>>>>>
> > > > >>>>>>>> I'm a bit concerned that the scope of this KIP is growing a
> > > > >>>>>>>> bit out of control. While it's good to discuss ideas for
> > > > >>>>>>>> future improvements, I think it's important to narrow the
> > > > >>>>>>>> scope down to a design that achieves the most pressing
> > > > >>>>>>>> objectives (constant sized restorations during dirty
> > > > >>>>>>>> close/unexpected errors). Any design that this KIP produces
> > > > >>>>>>>> can ultimately be changed in the future, especially if the
> > > > >>>>>>>> bulk of it is internal behaviour.
> > > > >>>>>>>>
> > > > >>>>>>>> I'm going to spend some time next week trying to re-work the
> > > > >>>>>>>> original WriteBatchWithIndex design to remove the
> > > > >>>>>>>> newTransaction() method, such that it's just an implementation
> > > > >>>>>>>> detail of RocksDBStore. That way, if we want to replace WBWI
> > > > >>>>>>>> with something in the future, like the SST file management
> > > > >>>>>>>> outlined by John, then we can do so with little/no API
> > > > >>>>>>>> changes.
> > > > >>>>>>>>
> > > > >>>>>>>> Regards,
> > > > >>>>>>>>
> > > > >>>>>>>> Nick
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >
> > > >
> > >
> >
>
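For reference, the two uses of statestore.transaction.buffer.max.bytes described earlier in this thread (the existing early Task commit, and the proposed checkpoint-on-commit decision) could be sketched roughly as below. `BufferPolicy` and its method names are hypothetical, and the sketch assumes the caller can supply an estimate of the bytes that will be buffered before the next commit; the thread does not say how such an estimate would be computed.

```java
// Hypothetical helper; not part of Kafka Streams or of the KIP-892 branch.
class BufferPolicy {
    // Value of statestore.transaction.buffer.max.bytes.
    private final long maxBytes;

    BufferPolicy(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Existing behaviour: force an early Task commit if buffering the next
    // record would push the transaction buffers past the configured limit.
    boolean shouldCommitEarly(long bufferedBytes, long nextRecordBytes) {
        return bufferedBytes + nextRecordBytes > maxBytes;
    }

    // Proposed behaviour: when a Task commits, only checkpoint (and hence
    // flush the buffers) if the limit is expected to be crossed before the
    // next commit. The estimate is an assumption supplied by the caller.
    boolean shouldCheckpointOnCommit(long bufferedBytes,
                                     long expectedBytesBeforeNextCommit) {
        return bufferedBytes + expectedBytesBeforeNextCommit > maxBytes;
    }
}
```

For example, with a 64 MiB limit, 40 MiB already buffered, and roughly 10 MiB expected before the next commit, `shouldCheckpointOnCommit` returns false and the buffers are left alone; with 30 MiB expected it returns true and the stores are checkpointed.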