Addendum: I think we would also face the same problem with the approach John outlined earlier (using the record cache as a transaction buffer and flushing it straight to SST files). This is because the record cache (the ThreadCache class) is not thread-safe, so every commit would invalidate open IQ Iterators in the same way that RocksDB WriteBatches do. -- Nick
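For anyone following along, here's a rough, self-contained sketch of the READ_UNCOMMITTED read path described in the thread below, using RocksJava's WriteBatchWithIndex directly (the actual RocksJava method is WriteBatchWithIndex#newIteratorWithBase(); the TransactionalStoreSketch class and its overall shape are purely illustrative and are not the KIP-892 implementation):

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

public class TransactionalStoreSketch implements AutoCloseable {

    private final RocksDB db;
    private final ReadOptions readOptions = new ReadOptions();
    private final WriteOptions writeOptions = new WriteOptions();
    private final WriteBatchWithIndex uncommitted = new WriteBatchWithIndex(true);

    public TransactionalStoreSketch(final String path) throws RocksDBException {
        this.db = RocksDB.open(new Options().setCreateIfMissing(true), path);
    }

    public void put(final byte[] key, final byte[] value) throws RocksDBException {
        // Staged in the batch only; invisible to READ_COMMITTED readers until commit().
        uncommitted.put(key, value);
    }

    public byte[] get(final byte[] key, final boolean readCommitted) throws RocksDBException {
        if (readCommitted) {
            return db.get(readOptions, key);  // committed data only
        }
        // READ_UNCOMMITTED: check the batch first, fall back to the database.
        return uncommitted.getFromBatchAndDB(db, readOptions, key);
    }

    public RocksIterator newIterator(final boolean readCommitted) {
        final RocksIterator base = db.newIterator();
        if (readCommitted) {
            return base;
        }
        // READ_UNCOMMITTED: overlay the uncommitted batch on the base iterator.
        // This is the iterator an Interactive Query may still hold open...
        return uncommitted.newIteratorWithBase(base);
    }

    public void commit() throws RocksDBException {
        db.write(writeOptions, uncommitted);  // apply the staged writes atomically
        uncommitted.clear();                  // ...and this clear() invalidates it.
    }

    @Override
    public void close() {
        uncommitted.close();
        writeOptions.close();
        readOptions.close();
        db.close();
    }
}

The "chain of WriteBatches" idea mentioned below would amount to switching to a fresh WriteBatchWithIndex whenever an Interactive Query starts reading, rather than clearing the current one, keeping old batches alive until their iterators are closed - at the memory and commit-time cost discussed below.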
On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com> wrote: > Hi Bruno, > > I've updated the KIP based on our conversation. The only things I've not > yet done are: > > 1. Using transactions under ALOS and EOS. > 2. Making IsolationLevel a query-time constraint, rather than linking it > to the processing.guarantee. > > There's a wrinkle that makes this a challenge: Interactive Queries that > open an Iterator, when using transactions and READ_UNCOMMITTED. > The problem is that under READ_UNCOMMITTED, queries need to be able to > read records from the currently uncommitted transaction buffer > (WriteBatch). This includes for Iterators, which should iterate both the > transaction buffer and underlying database (using > WriteBatch#iteratorWithBase()). > > The issue is that when the StreamThread commits, it writes the current > WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the > WriteBatch while an Interactive Query holds an open Iterator on it will > invalidate the Iterator. Worse, it turns out that Iterators over a > WriteBatch become invalidated not just when the WriteBatch is cleared, but > also when the Iterators' current key receives a new write. > > Now that I'm writing this, I remember that this is the major reason that I > switched the original design from having a query-time IsolationLevel to > having the IsolationLevel linked to the transactionality of the stores > themselves. > > It *might* be possible to resolve this, by having a "chain" of > WriteBatches, with the StreamThread switching to a new WriteBatch whenever > a new Interactive Query attempts to read from the database, but that could > cause some performance problems/memory pressure when subjected to a high > Interactive Query load. It would also reduce the efficiency of WriteBatches > on-commit, as we'd have to write N WriteBatches, where N is the number of > Interactive Queries since the last commit. > > I realise this is getting into the weeds of the implementation, and you'd > rather we focus on the API for now, but I think it's important to consider > how to implement the desired API, in case we come up with an API that > cannot be implemented efficiently, or even at all! > > Thoughts? > -- > Nick > > On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org> wrote: > >> Hi Nick, >> >> 6. >> Of course, you are right! My bad! >> Wiping out the state in the downgrading case is fine. >> >> >> 3a. >> Focus on the public facing changes for the KIP. We will manage to get >> the internals right. Regarding state stores that do not support >> READ_COMMITTED, they should throw an error stating that they do not >> support READ_COMMITTED. No need to adapt all state stores immediately. >> >> 3b. >> I am in favor of using transactions also for ALOS. >> >> >> Best, >> Bruno >> >> On 9/13/23 11:57 AM, Nick Telford wrote: >> > Hi Bruno, >> > >> > Thanks for getting back to me! >> > >> > 2. >> > The fact that implementations can always track estimated memory usage in >> > the wrapper is a good point. I can remove -1 as an option, and I'll >> clarify >> > the JavaDoc that 0 is not just for non-transactional stores, which is >> > currently misleading. >> > >> > 6. >> > The problem with catching the exception in the downgrade process is that >> > would require new code in the Kafka version being downgraded to. Since >> > users could conceivably downgrade to almost *any* older version of Kafka >> > Streams, I'm not sure how we could add that code? 
>> > The only way I can think of doing it would be to provide a dedicated >> > downgrade tool, that goes through every local store and removes the >> > offsets column families. But that seems like an unnecessary amount of >> extra >> > code to maintain just to handle a somewhat niche situation, when the >> > alternative (automatically wipe and restore stores) should be >> acceptable. >> > >> > 1, 4, 5: Agreed. I'll make the changes you've requested. >> > >> > 3a. >> > I agree that IsolationLevel makes more sense at query-time, and I >> actually >> > initially attempted to place the IsolationLevel at query-time, but I ran >> > into some problems: >> > - The key issue is that, under ALOS we're not staging writes in >> > transactions, so can't perform writes at the READ_COMMITTED isolation >> > level. However, this may be addressed if we decide to *always* use >> > transactions as discussed under 3b. >> > - IQv1 and IQv2 have quite different implementations. I remember having >> > some difficulty understanding the IQv1 internals, which made it >> difficult >> > to determine what needed to be changed. However, I *think* this can be >> > addressed for both implementations by wrapping the RocksDBStore in an >> > IsolationLevel-dependent wrapper, that overrides read methods (get, >> etc.) >> > to either read directly from the database or from the ongoing >> transaction. >> > But IQv1 might still be difficult. >> > - If IsolationLevel becomes a query constraint, then all other >> StateStores >> > will need to respect it, including the in-memory stores. This would >> require >> > us to adapt in-memory stores to stage their writes so they can be >> isolated >> > from READ_COMMITTED queries. It would also become an important >> > consideration for third-party stores on upgrade, as without changes, >> they >> > would not support READ_COMMITTED queries correctly. >> > >> > Ultimately, I may need some help making the necessary change to IQv1 to >> > support this, but I don't think it's fundamentally impossible, if we >> want >> > to pursue this route. >> > >> > 3b. >> > The main reason I chose to keep ALOS un-transactional was to minimize >> > behavioural change for most users (I believe most Streams users use the >> > default configuration, which is ALOS). That said, it's clear that if >> ALOS >> > also used transactional stores, the only change in behaviour would be >> that >> > it would become *more correct*, which could be considered a "bug fix" by >> > users, rather than a change they need to handle. >> > >> > I believe that performance using transactions (aka. RocksDB >> WriteBatches) >> > should actually be *better* than the un-batched write-path that is >> > currently used[1]. The only "performance" consideration will be the >> > increased memory usage that transactions require. Given the mitigations >> for >> > this memory that we have in place, I would expect that this is not a >> > problem for most users. >> > >> > If we're happy to do so, we can make ALOS also use transactions. >> > >> > Regards, >> > Nick >> > >> > Link 1: >> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results >> > >> > On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org> >> wrote: >> > >> >> Hi Nick, >> >> >> >> Thanks for the updates and sorry for the delay on my side! >> >> >> >> >> >> 1. >> >> Making the default implementation for flush() a no-op sounds good to >> me. >> >> >> >> >> >> 2. 
>> >> I think what was bugging me here is that a third-party state store >> needs >> >> to implement the state store interface. That means they need to >> >> implement a wrapper around the actual state store as we do for RocksDB >> >> with RocksDBStore. So, a third-party state store can always estimate >> the >> >> uncommitted bytes, if it wants, because the wrapper can record the >> added >> >> bytes. >> >> One case I can think of where returning -1 makes sense is when Streams >> >> does not need to estimate the size of the write batch and trigger >> >> extraordinary commits, because the third-party state store takes care >> of >> >> memory. But in that case the method could also just return 0. Even that >> >> case would be better solved with a method that returns whether the >> state >> >> store manages itself the memory used for uncommitted bytes or not. >> >> Said that, I am fine with keeping the -1 return value, I was just >> >> wondering when and if it will be used. >> >> >> >> Regarding returning 0 for transactional state stores when the batch is >> >> empty, I was just wondering because you explicitly stated >> >> >> >> "or {@code 0} if this StateStore does not support transactions." >> >> >> >> So it seemed to me returning 0 could only happen for non-transactional >> >> state stores. >> >> >> >> >> >> 3. >> >> >> >> a) What do you think if we move the isolation level to IQ (v1 and v2)? >> >> In the end this is the only component that really needs to specify the >> >> isolation level. It is similar to the Kafka consumer that can choose >> >> with what isolation level to read the input topic. >> >> For IQv1 the isolation level should go into StoreQueryParameters. For >> >> IQv2, I would add it to the Query interface. >> >> >> >> b) Point a) raises the question what should happen during at-least-once >> >> processing when the state store does not use transactions? John in the >> >> past proposed to also use transactions on state stores for >> >> at-least-once. I like that idea, because it avoids aggregating the same >> >> records over and over again in the case of a failure. We had a case in >> >> the past where a Streams applications in at-least-once mode was failing >> >> continuously for some reasons I do not remember before committing the >> >> offsets. After each failover, the app aggregated again and again the >> >> same records. Of course the aggregate increased to very wrong values >> >> just because of the failover. With transactions on the state stores we >> >> could have avoided this. The app would have output the same aggregate >> >> multiple times (i.e., after each failover) but at least the value of >> the >> >> aggregate would not depend on the number of failovers. Outputting the >> >> same aggregate multiple times would be incorrect under exactly-once but >> >> it is OK for at-least-once. >> >> If it makes sense to add a config to turn on and off transactions on >> >> state stores under at-least-once or just use transactions in any case >> is >> >> a question we should also discuss in this KIP. It depends a bit on the >> >> performance trade-off. Maybe to be safe, I would add a config. >> >> >> >> >> >> 4. >> >> Your points are all valid. I tend to say to keep the metrics around >> >> flush() until we remove flush() completely from the interface. Calls to >> >> flush() might still exist since existing processors might still call >> >> flush() explicitly as you mentioned in 1). 
For sure, we need to >> document >> >> how the metrics change due to the transactions in the upgrade notes. >> >> >> >> >> >> 5. >> >> I see. Then you should describe how the .position files are handled in >> >> a dedicated section of the KIP or incorporate the description in the >> >> "Atomic Checkpointing" section instead of only mentioning it in the >> >> "Compatibility, Deprecation, and Migration Plan". >> >> >> >> >> >> 6. >> >> Describing upgrading and downgrading in the KIP is a good idea. >> >> Regarding downgrading, I think you could also catch the exception and >> do >> >> what is needed to downgrade, e.g., drop the column family. See here for >> >> an example: >> >> >> >> >> >> >> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75 >> >> >> >> It is a bit brittle, but it works. >> >> >> >> >> >> Best, >> >> Bruno >> >> >> >> >> >> On 8/24/23 12:18 PM, Nick Telford wrote: >> >>> Hi Bruno, >> >>> >> >>> Thanks for taking the time to review the KIP. I'm back from leave now >> and >> >>> intend to move this forwards as quickly as I can. >> >>> >> >>> Addressing your points: >> >>> >> >>> 1. >> >>> Because flush() is part of the StateStore API, it's exposed to custom >> >>> Processors, which might be making calls to flush(). This was actually >> the >> >>> case in a few integration tests. >> >>> To maintain as much compatibility as possible, I'd prefer not to make >> >> this >> >>> an UnsupportedOperationException, as it will cause previously working >> >>> Processors to start throwing exceptions at runtime. >> >>> I agree that it doesn't make sense for it to proxy commit(), though, >> as >> >>> that would cause it to violate the "StateStores commit only when the >> Task >> >>> commits" rule. >> >>> Instead, I think we should make this a no-op. That way, existing user >> >>> Processors will continue to work as-before, without violation of store >> >>> consistency that would be caused by premature flush/commit of >> StateStore >> >>> data to disk. >> >>> What do you think? >> >>> >> >>> 2. >> >>> As stated in the JavaDoc, when a StateStore implementation is >> >>> transactional, but is unable to estimate the uncommitted memory usage, >> >> the >> >>> method will return -1. >> >>> The intention here is to permit third-party implementations that may >> not >> >> be >> >>> able to estimate memory usage. >> >>> >> >>> Yes, it will be 0 when nothing has been written to the store yet. I >> >> thought >> >>> that was implied by "This method will return an approximation of the >> >> memory >> >>> would be freed by the next call to {@link #commit(Map)}" and "@return >> The >> >>> approximate size of all records awaiting {@link #commit(Map)}", >> however, >> >> I >> >>> can add it explicitly to the JavaDoc if you think this is unclear? >> >>> >> >>> 3. >> >>> I realise this is probably the most contentious point in my design, >> and >> >> I'm >> >>> open to changing it if I'm unable to convince you of the benefits. >> >>> Nevertheless, here's my argument: >> >>> The Interactive Query (IQ) API(s) are directly provided StateStores to >> >>> query, and it may be important for users to programmatically know >> which >> >>> mode the StateStore is operating under. 
If we simply provide an >> >>> "eosEnabled" boolean (as used throughout the internal streams >> engine), or >> >>> similar, then users will need to understand the operation and >> >> consequences >> >>> of each available processing mode and how it pertains to their >> >> StateStore. >> >>> >> >>> Interactive Query users aren't the only people that care about the >> >>> processing.mode/IsolationLevel of a StateStore: implementers of custom >> >>> StateStores also need to understand the behaviour expected of their >> >>> implementation. KIP-892 introduces some assumptions into the Streams >> >> Engine >> >>> about how StateStores operate under each processing mode, and it's >> >>> important that custom implementations adhere to those assumptions in >> >> order >> >>> to maintain the consistency guarantees. >> >>> >> >>> IsolationLevels provide a high-level contract on the behaviour of the >> >>> StateStore: a user knows that under READ_COMMITTED, they will see >> writes >> >>> only after the Task has committed, and under READ_UNCOMMITTED they >> will >> >> see >> >>> writes immediately. No understanding of the details of each >> >> processing.mode >> >>> is required, either for IQ users or StateStore implementers. >> >>> >> >>> An argument can be made that these contractual guarantees can simply >> be >> >>> documented for the processing.mode (i.e. that exactly-once and >> >>> exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves >> like >> >>> READ_UNCOMMITTED), but there are several small issues with this I'd >> >> prefer >> >>> to avoid: >> >>> >> >>> - Where would we document these contracts, in a way that is >> difficult >> >>> for users/implementers to miss/ignore? >> >>> - It's not clear to users that the processing mode is >> communicating >> >>> an expectation of read isolation, unless they read the >> >> documentation. Users >> >>> rarely consult documentation unless they feel they need to, so >> it's >> >> likely >> >>> this detail would get missed by many users. >> >>> - It tightly couples processing modes to read isolation. Adding >> new >> >>> processing modes, or changing the read isolation of existing >> >> processing >> >>> modes would be difficult/impossible. >> >>> >> >>> Ultimately, the cost of introducing IsolationLevels is just a single >> >>> method, since we re-use the existing IsolationLevel enum from Kafka. >> This >> >>> gives us a clear place to document the contractual guarantees expected >> >>> of/provided by StateStores, that is accessible both by the StateStore >> >>> itself, and by IQ users. >> >>> >> >>> (Writing this I've just realised that the StateStore and IQ APIs >> actually >> >>> don't provide access to StateStoreContext that IQ users would have >> direct >> >>> access to... Perhaps StateStore should expose isolationLevel() itself >> >> too?) >> >>> >> >>> 4. >> >>> Yeah, I'm not comfortable renaming the metrics in-place either, as >> it's a >> >>> backwards incompatible change. My concern is that, if we leave the >> >> existing >> >>> "flush" metrics in place, they will be confusing to users. Right now, >> >>> "flush" metrics record explicit flushes to disk, but under KIP-892, >> even >> >> a >> >>> commit() will not explicitly flush data to disk - RocksDB will decide >> on >> >>> when to flush memtables to disk itself. >> >>> >> >>> If we keep the existing "flush" metrics, we'd have two options, which >> >> both >> >>> seem pretty bad to me: >> >>> >> >>> 1. 
Have them record calls to commit(), which would be >> misleading, as >> >>> data is no longer explicitly "flushed" to disk by this call. >> >>> 2. Have them record nothing at all, which is equivalent to >> removing >> >> the >> >>> metrics, except that users will see the metric still exists and >> so >> >> assume >> >>> that the metric is correct, and that there's a problem with their >> >> system >> >>> when there isn't. >> >>> >> >>> I agree that removing them is also a bad solution, and I'd like some >> >>> guidance on the best path forward here. >> >>> >> >>> 5. >> >>> Position files are updated on every write to a StateStore. Since our >> >> writes >> >>> are now buffered until commit(), we can't update the Position file >> until >> >>> commit() has been called, otherwise it would be inconsistent with the >> >> data >> >>> in the event of a rollback. Consequently, we need to manage these >> offsets >> >>> the same way we manage the checkpoint offsets, and ensure they're only >> >>> written on commit(). >> >>> >> >>> 6. >> >>> Agreed, although I'm not exactly sure yet what tests to write. How >> >> explicit >> >>> do we need to be here in the KIP? >> >>> >> >>> As for upgrade/downgrade: upgrade is designed to be seamless, and we >> >> should >> >>> definitely add some tests around that. Downgrade, it transpires, isn't >> >>> currently possible, as the extra column family for offset storage is >> >>> incompatible with the pre-KIP-892 implementation: when you open a >> RocksDB >> >>> database, you must open all available column families or receive an >> >> error. >> >>> What currently happens on downgrade is that it attempts to open the >> >> store, >> >>> throws an error about the offsets column family not being opened, >> which >> >>> triggers a wipe and rebuild of the Task. Given that downgrades should >> be >> >>> uncommon, I think this is acceptable behaviour, as the end-state is >> >>> consistent, even if it results in an undesirable state restore. >> >>> >> >>> Should I document the upgrade/downgrade behaviour explicitly in the >> KIP? >> >>> >> >>> -- >> >>> >> >>> Regards, >> >>> Nick >> >>> >> >>> >> >>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> >> wrote: >> >>> >> >>>> Hi Nick! >> >>>> >> >>>> Thanks for the updates! >> >>>> >> >>>> 1. >> >>>> Why does StateStore#flush() default to >> >>>> StateStore#commit(Collections.emptyMap())? >> >>>> Since calls to flush() will not exist anymore after this KIP is >> >>>> released, I would rather throw an unsupported operation exception by >> >>>> default. >> >>>> >> >>>> >> >>>> 2. >> >>>> When would a state store return -1 from >> >>>> StateStore#approximateNumUncommittedBytes() while being >> transactional? >> >>>> >> >>>> Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if >> >>>> the state store is transactional but nothing has been written to the >> >>>> state store yet? >> >>>> >> >>>> >> >>>> 3. >> >>>> Sorry for bringing this up again. Does this KIP really need to >> introduce >> >>>> StateStoreContext#isolationLevel()? StateStoreContext has already >> >>>> appConfigs() which basically exposes the same information, i.e., if >> EOS >> >>>> is enabled or not. >> >>>> In one of your previous e-mails you wrote: >> >>>> >> >>>> "My idea was to try to keep the StateStore interface as loosely >> coupled >> >>>> from the Streams engine as possible, to give implementers more >> freedom, >> >>>> and reduce the amount of internal knowledge required." 
>> >>>> >> >>>> While I understand the intent, I doubt that it decreases the >> coupling of >> >>>> a StateStore interface and the Streams engine. READ_COMMITTED only >> >>>> applies to IQ but not to reads by processors. Thus, implementers >> need to >> >>>> understand how Streams accesses the state stores. >> >>>> >> >>>> I would like to hear what others think about this. >> >>>> >> >>>> >> >>>> 4. >> >>>> Great exposing new metrics for transactional state stores! However, I >> >>>> would prefer to add new metrics and deprecate (in the docs) the old >> >>>> ones. You can find examples of deprecated metrics here: >> >>>> https://kafka.apache.org/documentation/#selector_monitoring >> >>>> >> >>>> >> >>>> 5. >> >>>> Why does the KIP mention position files? I do not think they are >> related >> >>>> to transactions or flushes. >> >>>> >> >>>> >> >>>> 6. >> >>>> I think we will also need to adapt/add integration tests besides unit >> >>>> tests. Additionally, we probably need integration or system tests to >> >>>> verify that upgrades and downgrades between transactional and >> >>>> non-transactional state stores work as expected. >> >>>> >> >>>> >> >>>> Best, >> >>>> Bruno >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> On 7/21/23 10:34 PM, Nick Telford wrote: >> >>>>> One more thing: I noted John's suggestion in the KIP, under >> "Rejected >> >>>>> Alternatives". I still think it's an idea worth pursuing, but I >> believe >> >>>>> that it's out of the scope of this KIP, because it solves a >> different >> >> set >> >>>>> of problems to this KIP, and the scope of this one has already grown >> >>>> quite >> >>>>> large! >> >>>>> >> >>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> >> >>>> wrote: >> >>>>> >> >>>>>> Hi everyone, >> >>>>>> >> >>>>>> I've updated the KIP ( >> >>>>>> >> >>>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> >>>> ) >> >>>>>> with the latest changes; mostly bringing back "Atomic >> Checkpointing" >> >>>> (for >> >>>>>> what feels like the 10th time!). I think the one thing missing is >> some >> >>>>>> changes to metrics (notably the store "flush" metrics will need to >> be >> >>>>>> renamed to "commit"). >> >>>>>> >> >>>>>> The reason I brought back Atomic Checkpointing was to decouple >> store >> >>>> flush >> >>>>>> from store commit. This is important, because with Transactional >> >>>>>> StateStores, we now need to call "flush" on *every* Task commit, >> and >> >> not >> >>>>>> just when the StateStore is closing, otherwise our transaction >> buffer >> >>>> will >> >>>>>> never be written and persisted, instead growing unbounded! I >> >>>> experimented >> >>>>>> with some simple solutions, like forcing a store flush whenever the >> >>>>>> transaction buffer was likely to exceed its configured size, but >> this >> >>>> was >> >>>>>> brittle: it prevented the transaction buffer from being configured >> to >> >> be >> >>>>>> unbounded, and it still would have required explicit flushes of >> >> RocksDB, >> >>>>>> yielding sub-optimal performance and memory utilization. >> >>>>>> >> >>>>>> I deemed Atomic Checkpointing to be the "right" way to resolve this >> >>>>>> problem. 
By ensuring that the changelog offsets that correspond to >> the >> >>>> most >> >>>>>> recently written records are always atomically written to the >> >> StateStore >> >>>>>> (by writing them to the same transaction buffer), we can avoid >> >> forcibly >> >>>>>> flushing the RocksDB memtables to disk, letting RocksDB flush them >> >> only >> >>>>>> when necessary, without losing any of our consistency guarantees. >> See >> >>>> the >> >>>>>> updated KIP for more info. >> >>>>>> >> >>>>>> I have fully implemented these changes, although I'm still not >> >> entirely >> >>>>>> happy with the implementation for segmented StateStores, so I plan >> to >> >>>>>> refactor that. Despite that, all tests pass. If you'd like to try >> out >> >> or >> >>>>>> review this highly experimental and incomplete branch, it's >> available >> >>>> here: >> >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: >> it's >> >>>> built >> >>>>>> against Kafka 3.5.0 so that I had a stable base to build and test >> it >> >> on, >> >>>>>> and to enable easy apples-to-apples comparisons in a live >> >> environment. I >> >>>>>> plan to rebase it against trunk once it's nearer completion and has >> >> been >> >>>>>> proven on our main application. >> >>>>>> >> >>>>>> I would really appreciate help in reviewing and testing: >> >>>>>> - Segmented (Versioned, Session and Window) stores >> >>>>>> - Global stores >> >>>>>> >> >>>>>> As I do not currently use either of these, so my primary test >> >>>> environment >> >>>>>> doesn't test these areas. >> >>>>>> >> >>>>>> I'm going on Parental Leave starting next week for a few weeks, so >> >> will >> >>>>>> not have time to move this forward until late August. That said, >> your >> >>>>>> feedback is welcome and appreciated, I just won't be able to >> respond >> >> as >> >>>>>> quickly as usual. >> >>>>>> >> >>>>>> Regards, >> >>>>>> Nick >> >>>>>> >> >>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> >> >>>> wrote: >> >>>>>> >> >>>>>>> Hi Bruno >> >>>>>>> >> >>>>>>> Yes, that's correct, although the impact on IQ is not something I >> had >> >>>>>>> considered. >> >>>>>>> >> >>>>>>> What about atomically updating the state store from the >> transaction >> >>>>>>>> buffer every commit interval and writing the checkpoint (thus, >> >>>> flushing >> >>>>>>>> the memtable) every configured amount of data and/or number of >> >> commit >> >>>>>>>> intervals? >> >>>>>>>> >> >>>>>>> >> >>>>>>> I'm not quite sure I follow. Are you suggesting that we add an >> >>>> additional >> >>>>>>> config for the max number of commit intervals between checkpoints? >> >> That >> >>>>>>> way, we would checkpoint *either* when the transaction buffers are >> >>>> nearly >> >>>>>>> full, *OR* whenever a certain number of commit intervals have >> >> elapsed, >> >>>>>>> whichever comes first? >> >>>>>>> >> >>>>>>> That certainly seems reasonable, although this re-ignites an >> earlier >> >>>>>>> debate about whether a config should be measured in "number of >> commit >> >>>>>>> intervals", instead of just an absolute time. >> >>>>>>> >> >>>>>>> FWIW, I realised that this issue is the reason I was pursuing the >> >>>> Atomic >> >>>>>>> Checkpoints, as it de-couples memtable flush from checkpointing, >> >> which >> >>>>>>> enables us to just checkpoint on every commit without any >> performance >> >>>>>>> impact. Atomic Checkpointing is definitely the "best" solution, >> but >> >>>> I'm not >> >>>>>>> sure if this is enough to bring it back into this KIP. 
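(To make the atomic checkpointing idea concrete: the point is that the changelog offsets are staged in the same transaction buffer as the data, so both are applied in a single atomic write and no explicit memtable flush is needed. A minimal sketch, assuming a dedicated offsets column family and RocksJava's WriteBatchWithIndex - all names illustrative, not the actual KIP-892 code:)

import java.nio.charset.StandardCharsets;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

public class AtomicCheckpointSketch {

    private final RocksDB db;
    private final ColumnFamilyHandle dataCf;     // regular store data
    private final ColumnFamilyHandle offsetsCf;  // hypothetical "offsets" column family
    private final WriteOptions writeOptions = new WriteOptions();
    private final WriteBatchWithIndex buffer = new WriteBatchWithIndex(true);

    public AtomicCheckpointSketch(final RocksDB db,
                                  final ColumnFamilyHandle dataCf,
                                  final ColumnFamilyHandle offsetsCf) {
        this.db = db;
        this.dataCf = dataCf;
        this.offsetsCf = offsetsCf;
    }

    public void put(final byte[] key, final byte[] value) throws RocksDBException {
        buffer.put(dataCf, key, value);  // staged until the Task commits
    }

    // On Task commit: the records and the changelog offset they correspond to
    // are applied in one atomic write, so the offsets can never run ahead of
    // (or lag behind) the data they describe.
    public void commit(final String changelogTopic, final int partition, final long offset)
            throws RocksDBException {
        final byte[] offsetKey =
                (changelogTopic + "-" + partition).getBytes(StandardCharsets.UTF_8);
        final byte[] offsetValue = Long.toString(offset).getBytes(StandardCharsets.UTF_8);
        buffer.put(offsetsCf, offsetKey, offsetValue);

        db.write(writeOptions, buffer);
        buffer.clear();
    }
}

On restart, the offset read back from the offsets column family is, by construction, consistent with the data that was applied alongside it, which is what bounds how much of the changelog needs to be restored after a dirty close.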
>> >>>>>>> >> >>>>>>> I'm currently working on moving all the transactional logic >> directly >> >>>> into >> >>>>>>> RocksDBStore itself, which does away with the >> >> StateStore#newTransaction >> >>>>>>> method, and reduces the number of new classes introduced, >> >> significantly >> >>>>>>> reducing the complexity. If it works, and the complexity is >> >> drastically >> >>>>>>> reduced, I may try bringing back Atomic Checkpoints into this KIP. >> >>>>>>> >> >>>>>>> Regards, >> >>>>>>> Nick >> >>>>>>> >> >>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> >> >> wrote: >> >>>>>>> >> >>>>>>>> Hi Nick, >> >>>>>>>> >> >>>>>>>> Thanks for the insights! Very interesting! >> >>>>>>>> >> >>>>>>>> As far as I understand, you want to atomically update the state >> >> store >> >>>>>>>> from the transaction buffer, flush the memtable of a state store >> and >> >>>>>>>> write the checkpoint not after the commit time elapsed but after >> the >> >>>>>>>> transaction buffer reached a size that would lead to exceeding >> >>>>>>>> statestore.transaction.buffer.max.bytes before the next commit >> >>>> interval >> >>>>>>>> ends. >> >>>>>>>> That means, the Kafka transaction would commit every commit >> interval >> >>>> but >> >>>>>>>> the state store will only be atomically updated roughly every >> >>>>>>>> statestore.transaction.buffer.max.bytes of data. Also IQ would >> then >> >>>> only >> >>>>>>>> see new data roughly every >> statestore.transaction.buffer.max.bytes. >> >>>>>>>> After a failure the state store needs to restore up to >> >>>>>>>> statestore.transaction.buffer.max.bytes. >> >>>>>>>> >> >>>>>>>> Is this correct? >> >>>>>>>> >> >>>>>>>> What about atomically updating the state store from the >> transaction >> >>>>>>>> buffer every commit interval and writing the checkpoint (thus, >> >>>> flushing >> >>>>>>>> the memtable) every configured amount of data and/or number of >> >> commit >> >>>>>>>> intervals? In such a way, we would have the same delay for >> records >> >>>>>>>> appearing in output topics and IQ because both would appear when >> the >> >>>>>>>> Kafka transaction is committed. However, after a failure the >> state >> >>>> store >> >>>>>>>> still needs to restore up to >> statestore.transaction.buffer.max.bytes >> >>>> and >> >>>>>>>> it might restore data that is already in the state store because >> the >> >>>>>>>> checkpoint lags behind the last stable offset (i.e. the last >> >> committed >> >>>>>>>> offset) of the changelog topics. Restoring data that is already >> in >> >> the >> >>>>>>>> state store is idempotent, so eos should not violated. >> >>>>>>>> This solution needs at least one new config to specify when a >> >>>> checkpoint >> >>>>>>>> should be written. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> A small correction to your previous e-mail that does not change >> >>>> anything >> >>>>>>>> you said: Under alos the default commit interval is 30 seconds, >> not >> >>>> five >> >>>>>>>> seconds. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> Best, >> >>>>>>>> Bruno >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On 01.07.23 12:37, Nick Telford wrote: >> >>>>>>>>> Hi everyone, >> >>>>>>>>> >> >>>>>>>>> I've begun performance testing my branch on our staging >> >> environment, >> >>>>>>>>> putting it through its paces in our non-trivial application. I'm >> >>>>>>>> already >> >>>>>>>>> observing the same increased flush rate that we saw the last >> time >> >> we >> >>>>>>>>> attempted to use a version of this KIP, but this time, I think I >> >> know >> >>>>>>>> why. 
>> >>>>>>>>> >> >>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end >> of >> >> the >> >>>>>>>> Task >> >>>>>>>>> commit process, has the following behaviour: >> >>>>>>>>> >> >>>>>>>>> - Under ALOS: checkpoint the state stores. This includes >> >>>>>>>>> flushing memtables in RocksDB. This is acceptable >> because the >> >>>>>>>> default >> >>>>>>>>> commit.interval.ms is 5 seconds, so forcibly flushing >> >> memtables >> >>>>>>>> every 5 >> >>>>>>>>> seconds is acceptable for most applications. >> >>>>>>>>> - Under EOS: checkpointing is not done, *unless* it's >> being >> >>>>>>>> forced, due >> >>>>>>>>> to e.g. the Task closing or being revoked. This means >> that >> >> under >> >>>>>>>> normal >> >>>>>>>>> processing conditions, the state stores will not be >> >>>> checkpointed, >> >>>>>>>> and will >> >>>>>>>>> not have memtables flushed at all , unless RocksDB >> decides to >> >>>>>>>> flush them on >> >>>>>>>>> its own. Checkpointing stores and force-flushing their >> >> memtables >> >>>>>>>> is only >> >>>>>>>>> done when a Task is being closed. >> >>>>>>>>> >> >>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least *some* >> >>>> normal >> >>>>>>>>> Task commits, in order to write the RocksDB transaction buffers >> to >> >>>> the >> >>>>>>>>> state stores, and to ensure the offsets are synced to disk to >> >> prevent >> >>>>>>>>> restores from getting out of hand. Consequently, my current >> >>>>>>>> implementation >> >>>>>>>>> calls maybeCheckpoint on *every* Task commit, which is far too >> >>>>>>>> frequent. >> >>>>>>>>> This causes checkpoints every 10,000 records, which is a change >> in >> >>>>>>>> flush >> >>>>>>>>> behaviour, potentially causing performance problems for some >> >>>>>>>> applications. >> >>>>>>>>> >> >>>>>>>>> I'm looking into possible solutions, and I'm currently leaning >> >>>> towards >> >>>>>>>>> using the statestore.transaction.buffer.max.bytes configuration >> to >> >>>>>>>>> checkpoint Tasks once we are likely to exceed it. This would >> >>>>>>>> complement the >> >>>>>>>>> existing "early Task commit" functionality that this >> configuration >> >>>>>>>>> provides, in the following way: >> >>>>>>>>> >> >>>>>>>>> - Currently, we use >> statestore.transaction.buffer.max.bytes >> >> to >> >>>>>>>> force an >> >>>>>>>>> early Task commit if processing more records would cause >> our >> >>>> state >> >>>>>>>> store >> >>>>>>>>> transactions to exceed the memory assigned to them. >> >>>>>>>>> - New functionality: when a Task *does* commit, we will >> not >> >>>>>>>> checkpoint >> >>>>>>>>> the stores (and hence flush the transaction buffers) >> unless >> >> we >> >>>>>>>> expect to >> >>>>>>>>> cross the statestore.transaction.buffer.max.bytes >> threshold >> >>>> before >> >>>>>>>> the next >> >>>>>>>>> commit >> >>>>>>>>> >> >>>>>>>>> I'm also open to suggestions. >> >>>>>>>>> >> >>>>>>>>> Regards, >> >>>>>>>>> Nick >> >>>>>>>>> >> >>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford < >> nick.telf...@gmail.com >> >>> >> >>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>>> Hi Bruno! >> >>>>>>>>>> >> >>>>>>>>>> 3. >> >>>>>>>>>> By "less predictable for users", I meant in terms of >> understanding >> >>>> the >> >>>>>>>>>> performance profile under various circumstances. The more >> complex >> >>>> the >> >>>>>>>>>> solution, the more difficult it would be for users to >> understand >> >> the >> >>>>>>>>>> performance they see. 
For example, spilling records to disk >> when >> >> the >> >>>>>>>>>> transaction buffer reaches a threshold would, I expect, reduce >> >> write >> >>>>>>>>>> throughput. This reduction in write throughput could be >> >> unexpected, >> >>>>>>>> and >> >>>>>>>>>> potentially difficult to diagnose/understand for users. >> >>>>>>>>>> At the moment, I think the "early commit" concept is relatively >> >>>>>>>>>> straightforward; it's easy to document, and conceptually fairly >> >>>>>>>> obvious to >> >>>>>>>>>> users. We could probably add a metric to make it easier to >> >>>> understand >> >>>>>>>> when >> >>>>>>>>>> it happens though. >> >>>>>>>>>> >> >>>>>>>>>> 3. (the second one) >> >>>>>>>>>> The IsolationLevel is *essentially* an indirect way of telling >> >>>>>>>> StateStores >> >>>>>>>>>> whether they should be transactional. READ_COMMITTED >> essentially >> >>>>>>>> requires >> >>>>>>>>>> transactions, because it dictates that two threads calling >> >>>>>>>>>> `newTransaction()` should not see writes from the other >> >> transaction >> >>>>>>>> until >> >>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all bets are >> off, >> >>>> and >> >>>>>>>>>> stores can allow threads to observe written records at any >> time, >> >>>>>>>> which is >> >>>>>>>>>> essentially "no transactions". That said, StateStores are free >> to >> >>>>>>>> implement >> >>>>>>>>>> these guarantees however they can, which is a bit more relaxed >> >> than >> >>>>>>>>>> dictating "you must use transactions". For example, with >> RocksDB >> >> we >> >>>>>>>> would >> >>>>>>>>>> implement these as READ_COMMITTED == WBWI-based "transactions", >> >>>>>>>>>> READ_UNCOMMITTED == direct writes to the database. But with >> other >> >>>>>>>> storage >> >>>>>>>>>> engines, it might be preferable to *always* use transactions, >> even >> >>>>>>>> when >> >>>>>>>>>> unnecessary; or there may be storage engines that don't provide >> >>>>>>>>>> transactions, but the isolation guarantees can be met using a >> >>>>>>>> different >> >>>>>>>>>> technique. >> >>>>>>>>>> My idea was to try to keep the StateStore interface as loosely >> >>>> coupled >> >>>>>>>>>> from the Streams engine as possible, to give implementers more >> >>>>>>>> freedom, and >> >>>>>>>>>> reduce the amount of internal knowledge required. >> >>>>>>>>>> That said, I understand that "IsolationLevel" might not be the >> >> right >> >>>>>>>>>> abstraction, and we can always make it much more explicit if >> >>>>>>>> required, e.g. >> >>>>>>>>>> boolean transactional() >> >>>>>>>>>> >> >>>>>>>>>> 7-8. >> >>>>>>>>>> I can make these changes either later today or tomorrow. >> >>>>>>>>>> >> >>>>>>>>>> Small update: >> >>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of issues >> that >> >>>>>>>> needed >> >>>>>>>>>> addressing. Currently, all the tests pass, which is promising, >> but >> >>>> it >> >>>>>>>> will >> >>>>>>>>>> need to undergo some performance testing. I haven't (yet) >> worked >> >> on >> >>>>>>>>>> removing the `newTransaction()` stuff, but I would expect that, >> >>>>>>>>>> behaviourally, it should make no difference. The branch is >> >> available >> >>>>>>>> at >> >>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone >> is >> >>>>>>>>>> interested in taking an early look. >> >>>>>>>>>> >> >>>>>>>>>> Regards, >> >>>>>>>>>> Nick >> >>>>>>>>>> >> >>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna < >> cado...@apache.org> >> >>>>>>>> wrote: >> >>>>>>>>>> >> >>>>>>>>>>> Hi Nick, >> >>>>>>>>>>> >> >>>>>>>>>>> 1. 
>> >>>>>>>>>>> Yeah, I agree with you. That was actually also my point. I >> >>>> understood >> >>>>>>>>>>> that John was proposing the ingestion path as a way to avoid >> the >> >>>>>>>> early >> >>>>>>>>>>> commits. Probably, I misinterpreted the intent. >> >>>>>>>>>>> >> >>>>>>>>>>> 2. >> >>>>>>>>>>> I agree with John here, that actually it is public API. My >> >> question >> >>>>>>>> is >> >>>>>>>>>>> how this usage pattern affects normal processing. >> >>>>>>>>>>> >> >>>>>>>>>>> 3. >> >>>>>>>>>>> My concern is that checking for the size of the transaction >> >> buffer >> >>>>>>>> and >> >>>>>>>>>>> maybe triggering an early commit affects the whole processing >> of >> >>>>>>>> Kafka >> >>>>>>>>>>> Streams. The transactionality of a state store is not >> confined to >> >>>> the >> >>>>>>>>>>> state store itself, but spills over and changes the behavior >> of >> >>>> other >> >>>>>>>>>>> parts of the system. I agree with you that it is a decent >> >>>>>>>> compromise. I >> >>>>>>>>>>> just wanted to analyse the downsides and list the options to >> >>>> overcome >> >>>>>>>>>>> them. I also agree with you that all options seem quite heavy >> >>>>>>>> compared >> >>>>>>>>>>> with your KIP. I do not understand what you mean with "less >> >>>>>>>> predictable >> >>>>>>>>>>> for users", though. >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> I found the discussions about the alternatives really >> >> interesting. >> >>>>>>>> But I >> >>>>>>>>>>> also think that your plan sounds good and we should continue >> with >> >>>> it! >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> Some comments on your reply to my e-mail on June 20th: >> >>>>>>>>>>> >> >>>>>>>>>>> 3. >> >>>>>>>>>>> Ah, now, I understand the reasoning behind putting isolation >> >> level >> >>>> in >> >>>>>>>>>>> the state store context. Thanks! Should that also be a way to >> >> give >> >>>>>>>> the >> >>>>>>>>>>> the state store the opportunity to decide whether to turn on >> >>>>>>>>>>> transactions or not? >> >>>>>>>>>>> With my comment, I was more concerned about how do you know >> if a >> >>>>>>>>>>> checkpoint file needs to be written under EOS, if you do not >> >> have a >> >>>>>>>> way >> >>>>>>>>>>> to know if the state store is transactional or not. If a state >> >>>> store >> >>>>>>>> is >> >>>>>>>>>>> transactional, the checkpoint file can be written during >> normal >> >>>>>>>>>>> processing under EOS. If the state store is not transactional, >> >> the >> >>>>>>>>>>> checkpoint file must not be written under EOS. >> >>>>>>>>>>> >> >>>>>>>>>>> 7. >> >>>>>>>>>>> My point was about not only considering the bytes in memory in >> >>>> config >> >>>>>>>>>>> statestore.uncommitted.max.bytes, but also bytes that might be >> >>>>>>>> spilled >> >>>>>>>>>>> on disk. Basically, I was wondering whether you should remove >> the >> >>>>>>>>>>> "memory" in "Maximum number of memory bytes to be used to >> >>>>>>>>>>> buffer uncommitted state-store records." My thinking was that >> >> even >> >>>>>>>> if a >> >>>>>>>>>>> state store spills uncommitted bytes to disk, limiting the >> >> overall >> >>>>>>>> bytes >> >>>>>>>>>>> might make sense. Thinking about it again and considering the >> >>>> recent >> >>>>>>>>>>> discussions, it does not make too much sense anymore. >> >>>>>>>>>>> I like the name statestore.transaction.buffer.max.bytes that >> you >> >>>>>>>> proposed. >> >>>>>>>>>>> >> >>>>>>>>>>> 8. 
>> >>>>>>>>>>> A high-level description (without implementation details) of >> how >> >>>>>>>> Kafka >> >>>>>>>>>>> Streams will manage the commit of changelog transactions, >> state >> >>>> store >> >>>>>>>>>>> transactions and checkpointing would be great. Would be great >> if >> >>>> you >> >>>>>>>>>>> could also add some sentences about the behavior in case of a >> >>>>>>>> failure. >> >>>>>>>>>>> For instance how does a transactional state store recover >> after a >> >>>>>>>>>>> failure or what happens with the transaction buffer, etc. >> (that >> >> is >> >>>>>>>> what >> >>>>>>>>>>> I meant by "fail-over" in point 9.) >> >>>>>>>>>>> >> >>>>>>>>>>> Best, >> >>>>>>>>>>> Bruno >> >>>>>>>>>>> >> >>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote: >> >>>>>>>>>>>> Hi Bruno, >> >>>>>>>>>>>> >> >>>>>>>>>>>> 1. >> >>>>>>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex >> >>>>>>>> transactions >> >>>>>>>>>>>> have, whereby exceeding (or likely to exceed) configured >> memory >> >>>>>>>> needs to >> >>>>>>>>>>>> trigger an early commit? >> >>>>>>>>>>>> >> >>>>>>>>>>>> 2. >> >>>>>>>>>>>> This is one of my big concerns. Ultimately, any approach >> based >> >> on >> >>>>>>>>>>> cracking >> >>>>>>>>>>>> open RocksDB internals and using it in ways it's not really >> >>>> designed >> >>>>>>>>>>> for is >> >>>>>>>>>>>> likely to have some unforseen performance or consistency >> issues. >> >>>>>>>>>>>> >> >>>>>>>>>>>> 3. >> >>>>>>>>>>>> What's your motivation for removing these early commits? >> While >> >> not >> >>>>>>>>>>> ideal, I >> >>>>>>>>>>>> think they're a decent compromise to ensure consistency >> whilst >> >>>>>>>>>>> maintaining >> >>>>>>>>>>>> good and predictable performance. >> >>>>>>>>>>>> All 3 of your suggested ideas seem *very* complicated, and >> might >> >>>>>>>>>>> actually >> >>>>>>>>>>>> make behaviour less predictable for users as a consequence. >> >>>>>>>>>>>> >> >>>>>>>>>>>> I'm a bit concerned that the scope of this KIP is growing a >> bit >> >>>> out >> >>>>>>>> of >> >>>>>>>>>>>> control. While it's good to discuss ideas for future >> >>>> improvements, I >> >>>>>>>>>>> think >> >>>>>>>>>>>> it's important to narrow the scope down to a design that >> >> achieves >> >>>>>>>> the >> >>>>>>>>>>> most >> >>>>>>>>>>>> pressing objectives (constant sized restorations during dirty >> >>>>>>>>>>>> close/unexpected errors). Any design that this KIP produces >> can >> >>>>>>>>>>> ultimately >> >>>>>>>>>>>> be changed in the future, especially if the bulk of it is >> >> internal >> >>>>>>>>>>>> behaviour. >> >>>>>>>>>>>> >> >>>>>>>>>>>> I'm going to spend some time next week trying to re-work the >> >>>>>>>> original >> >>>>>>>>>>>> WriteBatchWithIndex design to remove the newTransaction() >> >> method, >> >>>>>>>> such >> >>>>>>>>>>> that >> >>>>>>>>>>>> it's just an implementation detail of RocksDBStore. That >> way, if >> >>>> we >> >>>>>>>>>>> want to >> >>>>>>>>>>>> replace WBWI with something in the future, like the SST file >> >>>>>>>> management >> >>>>>>>>>>>> outlined by John, then we can do so with little/no API >> changes. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Regards, >> >>>>>>>>>>>> >> >>>>>>>>>>>> Nick >> >>>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>> >> >>>>>>>> >> >>>>>>> >> >>>>> >> >>>> >> >>> >> >> >> > >> >
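For completeness, here is a minimal sketch of the statestore.transaction.buffer.max.bytes policy discussed earlier in this thread: forcing an early Task commit before the transaction buffer overflows, and only checkpointing on a scheduled commit when the buffer is likely to overflow before the next one. Class and method names are illustrative only; the real logic is internal to Streams and may differ.

import java.util.Collection;

public final class TransactionBufferPolicy {

    private final long maxBytes;  // statestore.transaction.buffer.max.bytes

    public TransactionBufferPolicy(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Aggregate StateStore#approximateNumUncommittedBytes() across a Task's stores,
    // ignoring stores that return -1 (they manage their own uncommitted memory).
    public static long totalUncommittedBytes(final Collection<Long> perStoreEstimates) {
        return perStoreEstimates.stream()
                .filter(bytes -> bytes >= 0)
                .mapToLong(Long::longValue)
                .sum();
    }

    // 1. Force an early Task commit before the transaction buffer overflows.
    public boolean shouldForceEarlyCommit(final long uncommittedBytes) {
        return maxBytes > 0 && uncommittedBytes >= maxBytes;
    }

    // 2. On a scheduled commit, only checkpoint (i.e. flush the transaction buffer)
    //    if we expect to cross the threshold before the next commit interval.
    public boolean shouldCheckpoint(final long uncommittedBytes,
                                    final long estimatedBytesPerInterval) {
        return maxBytes > 0 && uncommittedBytes + estimatedBytesPerInterval >= maxBytes;
    }
}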