Oh! One other concern I haven't mentioned: if we make IsolationLevel a query-time constraint, then we need to add support for READ_COMMITTED to InMemoryKeyValueStore too, which will require some changes to the implementation.
On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com> wrote: > Hi everyone, > > I agree that having IsolationLevel be determined at query-time is the > ideal design, but there are a few sticking points: > > 1. > There needs to be some way to communicate the IsolationLevel down to the > RocksDBStore itself, so that the query can respect it. Since stores are > "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.), > we need some way to deliver that information to the bottom layer. For IQv2, > we can use the existing State#query() method, but IQv1 has no way to do > this. > > A simple approach, which would potentially open up other options, would be > to add something like: ReadOnlyKeyValueStore<K, V> > readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and > similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.). > > 2. > As mentioned above, RocksDB WriteBatches are not thread-safe, which causes > a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a > look at RocksDB Transactions[1], but they solve a very different problem, > and have the same thread-safety issue. > > One possible approach that I mentioned is chaining WriteBatches: every > time a new Interactive Query is received (i.e. readOnlyView, see above, > is called) we "freeze" the existing WriteBatch, and start a new one for new > writes. The Interactive Query queries the "chain" of previous WriteBatches > + the underlying database; while the StreamThread starts writing to the > *new* WriteBatch. On-commit, the StreamThread would write *all* > WriteBatches in the chain to the database (that have not yet been written). > > WriteBatches would be closed/freed only when they have been both > committed, and all open Interactive Queries on them have been closed. This > would require some reference counting. > > Obviously a drawback of this approach is the potential for increased > memory usage: if an Interactive Query is long-lived, for example by doing a > full scan over a large database, or even just pausing in the middle of an > iteration, then the existing chain of WriteBatches could be kept around for > a long time, potentially forever. > > -- > > A. > Going off on a tangent, it looks like in addition to supporting > READ_COMMITTED queries, we could go further and support REPEATABLE_READ > queries (i.e. where subsequent reads to the same key in the same > Interactive Query are guaranteed to yield the same value) by making use of > RocksDB Snapshots[2]. These are fairly lightweight, so the performance > impact is likely to be negligible, but they do require that the Interactive > Query session can be explicitly closed. > > This could be achieved if we made the above readOnlyView interface look > more like: > > interface ReadOnlyKeyValueView<K, V> implements ReadOnlyKeyValueStore<K, > V>, AutoCloseable {} > > interface ReadOnlyKeyValueStore<K, V> { > ... > ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel); > } > > But this would be a breaking change, as existing IQv1 queries are > guaranteed to never call store.close(), and therefore these would leak > memory under REPEATABLE_READ. > > B. > One thing that's notable: MyRocks states that they support READ_COMMITTED > and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4]. > This could be because doing so is technically difficult/impossible using > the primitives available in RocksDB. > > -- > > Lucas, to address your points: > > U1. 
> It's only "SHOULD" to permit alternative (i.e. non-RocksDB) > implementations of StateStore that do not support atomic writes. Obviously > in those cases, the guarantees Kafka Streams provides/expects would be > relaxed. Do you think we should require all implementations to support > atomic writes? > > U2. > Stores can support multiple IsolationLevels. As we've discussed above, the > ideal scenario would be to specify the IsolationLevel at query-time. > Failing that, I think the second-best approach is to define the > IsolationLevel for *all* queries based on the processing.mode, which is > what the default StateStoreContext#isolationLevel() achieves. Would you > prefer an alternative? > > While the existing implementation is equivalent to READ_UNCOMMITTED, this > can yield unexpected results/errors under EOS, if a transaction is rolled > back. While this would be a change in behaviour for users, it would look > more like a bug fix than a breaking change. That said, we *could* make it > configurable, and default to the existing behaviour (READ_UNCOMMITTED) > instead of inferring it from the processing.mode? > > N1, N2. > These were only primitives to avoid boxing costs, but since this is not a > performance sensitive area, it should be fine to change if that's desirable. > > N3. > It's because the store "manages its own offsets", which includes both > committing the offset, *and providing it* via getCommittedOffset(). > Personally, I think "managesOffsets" conveys this best, but I don't mind > changing it if the nomenclature is unclear. > > Sorry for the massive emails/essays! > -- > Nick > > 1: https://github.com/facebook/rocksdb/wiki/Transactions > 2: https://github.com/facebook/rocksdb/wiki/Snapshot > 3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation > 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/ > > On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy > <lbruts...@confluent.io.invalid> wrote: > >> Hi Nick, >> >> since I last read it in April, the KIP has become much cleaner and >> easier to read. Great work! >> >> It feels to me the last big open point is whether we can implement >> isolation level as a query parameter. I understand that there are >> implementation concerns, but as Colt says, it would be a great >> addition, and would also simplify the migration path for this change. >> Is the implementation problem you mentioned caused by the WriteBatch >> not having a notion of a snapshot, as the underlying DB iterator does? >> In that case, I am not sure a chain of WriteBatches as you propose >> would fully solve the problem, but maybe I didn't dig enough into the >> details to fully understand it. >> >> If it's not possible to implement it now, would it be an option to >> make sure in this KIP that we do not fully close the door on per-query >> isolation levels in the interface, as it may be possible to implement >> the missing primitives in RocksDB or Speedb in the future. >> >> Understanding: >> >> * U1) Why is it only "SHOULD" for changelogOffsets to be persisted >> atomically with the records? >> * U2) Don't understand the default implementation of `isolationLevel`. >> The isolation level should be a property of the underlying store, and >> not be defined by the default config? Existing stores probably don't >> guarantee READ_COMMITTED, so the default should be to return >> READ_UNCOMMITTED. >> >> Nits: >> * N1) Could `getComittedOffset` use an `OptionalLong` return type, to >> avoid the `null`? 
>> * N2) Could `apporixmateNumUncomittedBytes` use an `OptionalLong` >> return type, to avoid the `-1`? >> * N3) I don't understand why `managesOffsets` uses the 'manage' verb, >> whereas all other methods use the "commits" verb. I'd suggest >> `commitsOffsets`. >> >> Either way, it feels this KIP is very close to the finish line, I'm >> looking forward to seeing this in production! >> >> Cheers, >> Lucas >> >> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <c...@littlehorse.io> wrote: >> > >> > > Making IsolationLevel a query-time constraint, rather than linking it >> to >> > the processing.guarantee. >> > >> > As I understand it, would this allow even a user of EOS to control >> whether >> > reading committed or uncommitted records? If so, I am highly in favor of >> > this. >> > >> > I know that I was one of the early people to point out the current >> > shortcoming that IQ reads uncommitted records, but just this morning I >> > realized a pattern we use which means that (for certain queries) our >> system >> > needs to be able to read uncommitted records, which is the current >> behavior >> > of Kafka Streams in EOS.*** >> > >> > If IsolationLevel being a query-time decision allows for this, then that >> > would be amazing. I would also vote that the default behavior should be >> for >> > reading uncommitted records, because it is totally possible for a valid >> > application to depend on that behavior, and breaking it in a minor >> release >> > might be a bit strong. >> > >> > *** (Note, for the curious reader....) Our use-case/query pattern is a >> bit >> > complex, but reading "uncommitted" records is actually safe in our case >> > because processing is deterministic. Additionally, IQ being able to read >> > uncommitted records is crucial to enable "read your own writes" on our >> API: >> > Due to the deterministic processing, we send an "ack" to the client who >> > makes the request as soon as the processor processes the result. If they >> > can't read uncommitted records, they may receive a "201 - Created" >> > response, immediately followed by a "404 - Not Found" when doing a >> lookup >> > for the object they just created). >> > >> > Thanks, >> > Colt McNealy >> > >> > *Founder, LittleHorse.dev* >> > >> > >> > On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <nick.telf...@gmail.com> >> wrote: >> > >> > > Addendum: >> > > >> > > I think we would also face the same problem with the approach John >> outlined >> > > earlier (using the record cache as a transaction buffer and flushing >> it >> > > straight to SST files). This is because the record cache (the >> ThreadCache >> > > class) is not thread-safe, so every commit would invalidate open IQ >> > > Iterators in the same way that RocksDB WriteBatches do. >> > > -- >> > > Nick >> > > >> > > On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com> >> > > wrote: >> > > >> > > > Hi Bruno, >> > > > >> > > > I've updated the KIP based on our conversation. The only things >> I've not >> > > > yet done are: >> > > > >> > > > 1. Using transactions under ALOS and EOS. >> > > > 2. Making IsolationLevel a query-time constraint, rather than >> linking it >> > > > to the processing.guarantee. >> > > > >> > > > There's a wrinkle that makes this a challenge: Interactive Queries >> that >> > > > open an Iterator, when using transactions and READ_UNCOMMITTED. >> > > > The problem is that under READ_UNCOMMITTED, queries need to be able >> to >> > > > read records from the currently uncommitted transaction buffer >> > > > (WriteBatch). 
This includes for Iterators, which should iterate >> both the >> > > > transaction buffer and underlying database (using >> > > > WriteBatch#iteratorWithBase()). >> > > > >> > > > The issue is that when the StreamThread commits, it writes the >> current >> > > > WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the >> > > > WriteBatch while an Interactive Query holds an open Iterator on it >> will >> > > > invalidate the Iterator. Worse, it turns out that Iterators over a >> > > > WriteBatch become invalidated not just when the WriteBatch is >> cleared, >> > > but >> > > > also when the Iterators' current key receives a new write. >> > > > >> > > > Now that I'm writing this, I remember that this is the major reason >> that >> > > I >> > > > switched the original design from having a query-time >> IsolationLevel to >> > > > having the IsolationLevel linked to the transactionality of the >> stores >> > > > themselves. >> > > > >> > > > It *might* be possible to resolve this, by having a "chain" of >> > > > WriteBatches, with the StreamThread switching to a new WriteBatch >> > > whenever >> > > > a new Interactive Query attempts to read from the database, but that >> > > could >> > > > cause some performance problems/memory pressure when subjected to a >> high >> > > > Interactive Query load. It would also reduce the efficiency of >> > > WriteBatches >> > > > on-commit, as we'd have to write N WriteBatches, where N is the >> number of >> > > > Interactive Queries since the last commit. >> > > > >> > > > I realise this is getting into the weeds of the implementation, and >> you'd >> > > > rather we focus on the API for now, but I think it's important to >> > > consider >> > > > how to implement the desired API, in case we come up with an API >> that >> > > > cannot be implemented efficiently, or even at all! >> > > > >> > > > Thoughts? >> > > > -- >> > > > Nick >> > > > >> > > > On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org> >> wrote: >> > > > >> > > >> Hi Nick, >> > > >> >> > > >> 6. >> > > >> Of course, you are right! My bad! >> > > >> Wiping out the state in the downgrading case is fine. >> > > >> >> > > >> >> > > >> 3a. >> > > >> Focus on the public facing changes for the KIP. We will manage to >> get >> > > >> the internals right. Regarding state stores that do not support >> > > >> READ_COMMITTED, they should throw an error stating that they do not >> > > >> support READ_COMMITTED. No need to adapt all state stores >> immediately. >> > > >> >> > > >> 3b. >> > > >> I am in favor of using transactions also for ALOS. >> > > >> >> > > >> >> > > >> Best, >> > > >> Bruno >> > > >> >> > > >> On 9/13/23 11:57 AM, Nick Telford wrote: >> > > >> > Hi Bruno, >> > > >> > >> > > >> > Thanks for getting back to me! >> > > >> > >> > > >> > 2. >> > > >> > The fact that implementations can always track estimated memory >> usage >> > > in >> > > >> > the wrapper is a good point. I can remove -1 as an option, and >> I'll >> > > >> clarify >> > > >> > the JavaDoc that 0 is not just for non-transactional stores, >> which is >> > > >> > currently misleading. >> > > >> > >> > > >> > 6. >> > > >> > The problem with catching the exception in the downgrade process >> is >> > > that >> > > >> > would require new code in the Kafka version being downgraded to. >> Since >> > > >> > users could conceivably downgrade to almost *any* older version >> of >> > > Kafka >> > > >> > Streams, I'm not sure how we could add that code? 
>> > > >> > The only way I can think of doing it would be to provide a >> dedicated >> > > >> > downgrade tool, that goes through every local store and removes >> the >> > > >> > offsets column families. But that seems like an unnecessary >> amount of >> > > >> extra >> > > >> > code to maintain just to handle a somewhat niche situation, when >> the >> > > >> > alternative (automatically wipe and restore stores) should be >> > > >> acceptable. >> > > >> > >> > > >> > 1, 4, 5: Agreed. I'll make the changes you've requested. >> > > >> > >> > > >> > 3a. >> > > >> > I agree that IsolationLevel makes more sense at query-time, and I >> > > >> actually >> > > >> > initially attempted to place the IsolationLevel at query-time, >> but I >> > > ran >> > > >> > into some problems: >> > > >> > - The key issue is that, under ALOS we're not staging writes in >> > > >> > transactions, so can't perform writes at the READ_COMMITTED >> isolation >> > > >> > level. However, this may be addressed if we decide to *always* >> use >> > > >> > transactions as discussed under 3b. >> > > >> > - IQv1 and IQv2 have quite different implementations. I remember >> > > having >> > > >> > some difficulty understanding the IQv1 internals, which made it >> > > >> difficult >> > > >> > to determine what needed to be changed. However, I *think* this >> can be >> > > >> > addressed for both implementations by wrapping the RocksDBStore >> in an >> > > >> > IsolationLevel-dependent wrapper, that overrides read methods >> (get, >> > > >> etc.) >> > > >> > to either read directly from the database or from the ongoing >> > > >> transaction. >> > > >> > But IQv1 might still be difficult. >> > > >> > - If IsolationLevel becomes a query constraint, then all other >> > > >> StateStores >> > > >> > will need to respect it, including the in-memory stores. This >> would >> > > >> require >> > > >> > us to adapt in-memory stores to stage their writes so they can be >> > > >> isolated >> > > >> > from READ_COMMITTTED queries. It would also become an important >> > > >> > consideration for third-party stores on upgrade, as without >> changes, >> > > >> they >> > > >> > would not support READ_COMMITTED queries correctly. >> > > >> > >> > > >> > Ultimately, I may need some help making the necessary change to >> IQv1 >> > > to >> > > >> > support this, but I don't think it's fundamentally impossible, >> if we >> > > >> want >> > > >> > to pursue this route. >> > > >> > >> > > >> > 3b. >> > > >> > The main reason I chose to keep ALOS un-transactional was to >> minimize >> > > >> > behavioural change for most users (I believe most Streams users >> use >> > > the >> > > >> > default configuration, which is ALOS). That said, it's clear >> that if >> > > >> ALOS >> > > >> > also used transactional stores, the only change in behaviour >> would be >> > > >> that >> > > >> > it would become *more correct*, which could be considered a "bug >> fix" >> > > by >> > > >> > users, rather than a change they need to handle. >> > > >> > >> > > >> > I believe that performance using transactions (aka. RocksDB >> > > >> WriteBatches) >> > > >> > should actually be *better* than the un-batched write-path that >> is >> > > >> > currently used[1]. The only "performance" consideration will be >> the >> > > >> > increased memory usage that transactions require. Given the >> > > mitigations >> > > >> for >> > > >> > this memory that we have in place, I would expect that this is >> not a >> > > >> > problem for most users. 
>> > > >> > >> > > >> > If we're happy to do so, we can make ALOS also use transactions. >> > > >> > >> > > >> > Regards, >> > > >> > Nick >> > > >> > >> > > >> > Link 1: >> > > >> > >> > > >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results >> > > >> > >> > > >> > On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org >> > >> > > >> wrote: >> > > >> > >> > > >> >> Hi Nick, >> > > >> >> >> > > >> >> Thanks for the updates and sorry for the delay on my side! >> > > >> >> >> > > >> >> >> > > >> >> 1. >> > > >> >> Making the default implementation for flush() a no-op sounds >> good to >> > > >> me. >> > > >> >> >> > > >> >> >> > > >> >> 2. >> > > >> >> I think what was bugging me here is that a third-party state >> store >> > > >> needs >> > > >> >> to implement the state store interface. That means they need to >> > > >> >> implement a wrapper around the actual state store as we do for >> > > RocksDB >> > > >> >> with RocksDBStore. So, a third-party state store can always >> estimate >> > > >> the >> > > >> >> uncommitted bytes, if it wants, because the wrapper can record >> the >> > > >> added >> > > >> >> bytes. >> > > >> >> One case I can think of where returning -1 makes sense is when >> > > Streams >> > > >> >> does not need to estimate the size of the write batch and >> trigger >> > > >> >> extraordinary commits, because the third-party state store >> takes care >> > > >> of >> > > >> >> memory. But in that case the method could also just return 0. >> Even >> > > that >> > > >> >> case would be better solved with a method that returns whether >> the >> > > >> state >> > > >> >> store manages itself the memory used for uncommitted bytes or >> not. >> > > >> >> Said that, I am fine with keeping the -1 return value, I was >> just >> > > >> >> wondering when and if it will be used. >> > > >> >> >> > > >> >> Regarding returning 0 for transactional state stores when the >> batch >> > > is >> > > >> >> empty, I was just wondering because you explicitly stated >> > > >> >> >> > > >> >> "or {@code 0} if this StateStore does not support transactions." >> > > >> >> >> > > >> >> So it seemed to me returning 0 could only happen for >> > > non-transactional >> > > >> >> state stores. >> > > >> >> >> > > >> >> >> > > >> >> 3. >> > > >> >> >> > > >> >> a) What do you think if we move the isolation level to IQ (v1 >> and >> > > v2)? >> > > >> >> In the end this is the only component that really needs to >> specify >> > > the >> > > >> >> isolation level. It is similar to the Kafka consumer that can >> choose >> > > >> >> with what isolation level to read the input topic. >> > > >> >> For IQv1 the isolation level should go into >> StoreQueryParameters. For >> > > >> >> IQv2, I would add it to the Query interface. >> > > >> >> >> > > >> >> b) Point a) raises the question what should happen during >> > > at-least-once >> > > >> >> processing when the state store does not use transactions? John >> in >> > > the >> > > >> >> past proposed to also use transactions on state stores for >> > > >> >> at-least-once. I like that idea, because it avoids aggregating >> the >> > > same >> > > >> >> records over and over again in the case of a failure. We had a >> case >> > > in >> > > >> >> the past where a Streams applications in at-least-once mode was >> > > failing >> > > >> >> continuously for some reasons I do not remember before >> committing the >> > > >> >> offsets. After each failover, the app aggregated again and >> again the >> > > >> >> same records. 
Of course the aggregate increased to very wrong >> values >> > > >> >> just because of the failover. With transactions on the state >> stores >> > > we >> > > >> >> could have avoided this. The app would have output the same >> aggregate >> > > >> >> multiple times (i.e., after each failover) but at least the >> value of >> > > >> the >> > > >> >> aggregate would not depend on the number of failovers. >> Outputting the >> > > >> >> same aggregate multiple times would be incorrect under >> exactly-once >> > > but >> > > >> >> it is OK for at-least-once. >> > > >> >> If it makes sense to add a config to turn on and off >> transactions on >> > > >> >> state stores under at-least-once or just use transactions in >> any case >> > > >> is >> > > >> >> a question we should also discuss in this KIP. It depends a bit >> on >> > > the >> > > >> >> performance trade-off. Maybe to be safe, I would add a config. >> > > >> >> >> > > >> >> >> > > >> >> 4. >> > > >> >> Your points are all valid. I tend to say to keep the metrics >> around >> > > >> >> flush() until we remove flush() completely from the interface. >> Calls >> > > to >> > > >> >> flush() might still exist since existing processors might still >> call >> > > >> >> flush() explicitly as you mentioned in 1). For sure, we need to >> > > >> document >> > > >> >> how the metrics change due to the transactions in the upgrade >> notes. >> > > >> >> >> > > >> >> >> > > >> >> 5. >> > > >> >> I see. Then you should describe how the .position files are >> handled >> > > in >> > > >> >> a dedicated section of the KIP or incorporate the description >> in the >> > > >> >> "Atomic Checkpointing" section instead of only mentioning it in >> the >> > > >> >> "Compatibility, Deprecation, and Migration Plan". >> > > >> >> >> > > >> >> >> > > >> >> 6. >> > > >> >> Describing upgrading and downgrading in the KIP is a good idea. >> > > >> >> Regarding downgrading, I think you could also catch the >> exception and >> > > >> do >> > > >> >> what is needed to downgrade, e.g., drop the column family. See >> here >> > > for >> > > >> >> an example: >> > > >> >> >> > > >> >> >> > > >> >> >> > > >> >> > > >> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75 >> > > >> >> >> > > >> >> It is a bit brittle, but it works. >> > > >> >> >> > > >> >> >> > > >> >> Best, >> > > >> >> Bruno >> > > >> >> >> > > >> >> >> > > >> >> On 8/24/23 12:18 PM, Nick Telford wrote: >> > > >> >>> Hi Bruno, >> > > >> >>> >> > > >> >>> Thanks for taking the time to review the KIP. I'm back from >> leave >> > > now >> > > >> and >> > > >> >>> intend to move this forwards as quickly as I can. >> > > >> >>> >> > > >> >>> Addressing your points: >> > > >> >>> >> > > >> >>> 1. >> > > >> >>> Because flush() is part of the StateStore API, it's exposed to >> > > custom >> > > >> >>> Processors, which might be making calls to flush(). This was >> > > actually >> > > >> the >> > > >> >>> case in a few integration tests. >> > > >> >>> To maintain as much compatibility as possible, I'd prefer not >> to >> > > make >> > > >> >> this >> > > >> >>> an UnsupportedOperationException, as it will cause previously >> > > working >> > > >> >>> Processors to start throwing exceptions at runtime. 
>> > > >> >>> I agree that it doesn't make sense for it to proxy commit(), >> though, >> > > >> as >> > > >> >>> that would cause it to violate the "StateStores commit only >> when the >> > > >> Task >> > > >> >>> commits" rule. >> > > >> >>> Instead, I think we should make this a no-op. That way, >> existing >> > > user >> > > >> >>> Processors will continue to work as-before, without violation >> of >> > > store >> > > >> >>> consistency that would be caused by premature flush/commit of >> > > >> StateStore >> > > >> >>> data to disk. >> > > >> >>> What do you think? >> > > >> >>> >> > > >> >>> 2. >> > > >> >>> As stated in the JavaDoc, when a StateStore implementation is >> > > >> >>> transactional, but is unable to estimate the uncommitted memory >> > > usage, >> > > >> >> the >> > > >> >>> method will return -1. >> > > >> >>> The intention here is to permit third-party implementations >> that may >> > > >> not >> > > >> >> be >> > > >> >>> able to estimate memory usage. >> > > >> >>> >> > > >> >>> Yes, it will be 0 when nothing has been written to the store >> yet. I >> > > >> >> thought >> > > >> >>> that was implied by "This method will return an approximation >> of the >> > > >> >> memory >> > > >> >>> would be freed by the next call to {@link #commit(Map)}" and >> > > "@return >> > > >> The >> > > >> >>> approximate size of all records awaiting {@link #commit(Map)}", >> > > >> however, >> > > >> >> I >> > > >> >>> can add it explicitly to the JavaDoc if you think this is >> unclear? >> > > >> >>> >> > > >> >>> 3. >> > > >> >>> I realise this is probably the most contentious point in my >> design, >> > > >> and >> > > >> >> I'm >> > > >> >>> open to changing it if I'm unable to convince you of the >> benefits. >> > > >> >>> Nevertheless, here's my argument: >> > > >> >>> The Interactive Query (IQ) API(s) are directly provided >> StateStores >> > > to >> > > >> >>> query, and it may be important for users to programmatically >> know >> > > >> which >> > > >> >>> mode the StateStore is operating under. If we simply provide an >> > > >> >>> "eosEnabled" boolean (as used throughout the internal streams >> > > >> engine), or >> > > >> >>> similar, then users will need to understand the operation and >> > > >> >> consequences >> > > >> >>> of each available processing mode and how it pertains to their >> > > >> >> StateStore. >> > > >> >>> >> > > >> >>> Interactive Query users aren't the only people that care about >> the >> > > >> >>> processing.mode/IsolationLevel of a StateStore: implementers of >> > > custom >> > > >> >>> StateStores also need to understand the behaviour expected of >> their >> > > >> >>> implementation. KIP-892 introduces some assumptions into the >> Streams >> > > >> >> Engine >> > > >> >>> about how StateStores operate under each processing mode, and >> it's >> > > >> >>> important that custom implementations adhere to those >> assumptions in >> > > >> >> order >> > > >> >>> to maintain the consistency guarantees. >> > > >> >>> >> > > >> >>> IsolationLevels provide a high-level contract on the behaviour >> of >> > > the >> > > >> >>> StateStore: a user knows that under READ_COMMITTED, they will >> see >> > > >> writes >> > > >> >>> only after the Task has committed, and under READ_UNCOMMITTED >> they >> > > >> will >> > > >> >> see >> > > >> >>> writes immediately. No understanding of the details of each >> > > >> >> processing.mode >> > > >> >>> is required, either for IQ users or StateStore implementers. 
>> > > >> >>> >> > > >> >>> An argument can be made that these contractual guarantees can >> simply >> > > >> be >> > > >> >>> documented for the processing.mode (i.e. that exactly-once and >> > > >> >>> exactly-once-v2 behave like READ_COMMITTED and at-least-once >> behaves >> > > >> like >> > > >> >>> READ_UNCOMMITTED), but there are several small issues with >> this I'd >> > > >> >> prefer >> > > >> >>> to avoid: >> > > >> >>> >> > > >> >>> - Where would we document these contracts, in a way that >> is >> > > >> difficult >> > > >> >>> for users/implementers to miss/ignore? >> > > >> >>> - It's not clear to users that the processing mode is >> > > >> communicating >> > > >> >>> an expectation of read isolation, unless they read the >> > > >> >> documentation. Users >> > > >> >>> rarely consult documentation unless they feel they need >> to, so >> > > >> it's >> > > >> >> likely >> > > >> >>> this detail would get missed by many users. >> > > >> >>> - It tightly couples processing modes to read isolation. >> Adding >> > > >> new >> > > >> >>> processing modes, or changing the read isolation of >> existing >> > > >> >> processing >> > > >> >>> modes would be difficult/impossible. >> > > >> >>> >> > > >> >>> Ultimately, the cost of introducing IsolationLevels is just a >> single >> > > >> >>> method, since we re-use the existing IsolationLevel enum from >> Kafka. >> > > >> This >> > > >> >>> gives us a clear place to document the contractual guarantees >> > > expected >> > > >> >>> of/provided by StateStores, that is accessible both by the >> > > StateStore >> > > >> >>> itself, and by IQ users. >> > > >> >>> >> > > >> >>> (Writing this I've just realised that the StateStore and IQ >> APIs >> > > >> actually >> > > >> >>> don't provide access to StateStoreContext that IQ users would >> have >> > > >> direct >> > > >> >>> access to... Perhaps StateStore should expose isolationLevel() >> > > itself >> > > >> >> too?) >> > > >> >>> >> > > >> >>> 4. >> > > >> >>> Yeah, I'm not comfortable renaming the metrics in-place >> either, as >> > > >> it's a >> > > >> >>> backwards incompatible change. My concern is that, if we leave >> the >> > > >> >> existing >> > > >> >>> "flush" metrics in place, they will be confusing to users. >> Right >> > > now, >> > > >> >>> "flush" metrics record explicit flushes to disk, but under >> KIP-892, >> > > >> even >> > > >> >> a >> > > >> >>> commit() will not explicitly flush data to disk - RocksDB will >> > > decide >> > > >> on >> > > >> >>> when to flush memtables to disk itself. >> > > >> >>> >> > > >> >>> If we keep the existing "flush" metrics, we'd have two options, >> > > which >> > > >> >> both >> > > >> >>> seem pretty bad to me: >> > > >> >>> >> > > >> >>> 1. Have them record calls to commit(), which would be >> > > >> misleading, as >> > > >> >>> data is no longer explicitly "flushed" to disk by this >> call. >> > > >> >>> 2. Have them record nothing at all, which is equivalent to >> > > >> removing >> > > >> >> the >> > > >> >>> metrics, except that users will see the metric still >> exists and >> > > >> so >> > > >> >> assume >> > > >> >>> that the metric is correct, and that there's a problem >> with >> > > their >> > > >> >> system >> > > >> >>> when there isn't. >> > > >> >>> >> > > >> >>> I agree that removing them is also a bad solution, and I'd >> like some >> > > >> >>> guidance on the best path forward here. >> > > >> >>> >> > > >> >>> 5. >> > > >> >>> Position files are updated on every write to a StateStore. 
>> Since our >> > > >> >> writes >> > > >> >>> are now buffered until commit(), we can't update the Position >> file >> > > >> until >> > > >> >>> commit() has been called, otherwise it would be inconsistent >> with >> > > the >> > > >> >> data >> > > >> >>> in the event of a rollback. Consequently, we need to manage >> these >> > > >> offsets >> > > >> >>> the same way we manage the checkpoint offsets, and ensure >> they're >> > > only >> > > >> >>> written on commit(). >> > > >> >>> >> > > >> >>> 6. >> > > >> >>> Agreed, although I'm not exactly sure yet what tests to write. >> How >> > > >> >> explicit >> > > >> >>> do we need to be here in the KIP? >> > > >> >>> >> > > >> >>> As for upgrade/downgrade: upgrade is designed to be seamless, >> and we >> > > >> >> should >> > > >> >>> definitely add some tests around that. Downgrade, it >> transpires, >> > > isn't >> > > >> >>> currently possible, as the extra column family for offset >> storage is >> > > >> >>> incompatible with the pre-KIP-892 implementation: when you >> open a >> > > >> RocksDB >> > > >> >>> database, you must open all available column families or >> receive an >> > > >> >> error. >> > > >> >>> What currently happens on downgrade is that it attempts to >> open the >> > > >> >> store, >> > > >> >>> throws an error about the offsets column family not being >> opened, >> > > >> which >> > > >> >>> triggers a wipe and rebuild of the Task. Given that downgrades >> > > should >> > > >> be >> > > >> >>> uncommon, I think this is acceptable behaviour, as the >> end-state is >> > > >> >>> consistent, even if it results in an undesirable state restore. >> > > >> >>> >> > > >> >>> Should I document the upgrade/downgrade behaviour explicitly >> in the >> > > >> KIP? >> > > >> >>> >> > > >> >>> -- >> > > >> >>> >> > > >> >>> Regards, >> > > >> >>> Nick >> > > >> >>> >> > > >> >>> >> > > >> >>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna < >> cado...@apache.org> >> > > >> wrote: >> > > >> >>> >> > > >> >>>> Hi Nick! >> > > >> >>>> >> > > >> >>>> Thanks for the updates! >> > > >> >>>> >> > > >> >>>> 1. >> > > >> >>>> Why does StateStore#flush() default to >> > > >> >>>> StateStore#commit(Collections.emptyMap())? >> > > >> >>>> Since calls to flush() will not exist anymore after this KIP >> is >> > > >> >>>> released, I would rather throw an unsupported operation >> exception >> > > by >> > > >> >>>> default. >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> 2. >> > > >> >>>> When would a state store return -1 from >> > > >> >>>> StateStore#approximateNumUncommittedBytes() while being >> > > >> transactional? >> > > >> >>>> >> > > >> >>>> Wouldn't StateStore#approximateNumUncommittedBytes() also >> return 0 >> > > if >> > > >> >>>> the state store is transactional but nothing has been written >> to >> > > the >> > > >> >>>> state store yet? >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> 3. >> > > >> >>>> Sorry for bringing this up again. Does this KIP really need to >> > > >> introduce >> > > >> >>>> StateStoreContext#isolationLevel()? StateStoreContext has >> already >> > > >> >>>> appConfigs() which basically exposes the same information, >> i.e., if >> > > >> EOS >> > > >> >>>> is enabled or not. >> > > >> >>>> In one of your previous e-mails you wrote: >> > > >> >>>> >> > > >> >>>> "My idea was to try to keep the StateStore interface as >> loosely >> > > >> coupled >> > > >> >>>> from the Streams engine as possible, to give implementers more >> > > >> freedom, >> > > >> >>>> and reduce the amount of internal knowledge required." 
>> > > >> >>>> >> > > >> >>>> While I understand the intent, I doubt that it decreases the >> > > >> coupling of >> > > >> >>>> a StateStore interface and the Streams engine. READ_COMMITTED >> only >> > > >> >>>> applies to IQ but not to reads by processors. Thus, >> implementers >> > > >> need to >> > > >> >>>> understand how Streams accesses the state stores. >> > > >> >>>> >> > > >> >>>> I would like to hear what others think about this. >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> 4. >> > > >> >>>> Great exposing new metrics for transactional state stores! >> > > However, I >> > > >> >>>> would prefer to add new metrics and deprecate (in the docs) >> the old >> > > >> >>>> ones. You can find examples of deprecated metrics here: >> > > >> >>>> https://kafka.apache.org/documentation/#selector_monitoring >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> 5. >> > > >> >>>> Why does the KIP mention position files? I do not think they >> are >> > > >> related >> > > >> >>>> to transactions or flushes. >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> 6. >> > > >> >>>> I think we will also need to adapt/add integration tests >> besides >> > > unit >> > > >> >>>> tests. Additionally, we probably need integration or system >> tests >> > > to >> > > >> >>>> verify that upgrades and downgrades between transactional and >> > > >> >>>> non-transactional state stores work as expected. >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> Best, >> > > >> >>>> Bruno >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> >> > > >> >>>> On 7/21/23 10:34 PM, Nick Telford wrote: >> > > >> >>>>> One more thing: I noted John's suggestion in the KIP, under >> > > >> "Rejected >> > > >> >>>>> Alternatives". I still think it's an idea worth pursuing, >> but I >> > > >> believe >> > > >> >>>>> that it's out of the scope of this KIP, because it solves a >> > > >> different >> > > >> >> set >> > > >> >>>>> of problems to this KIP, and the scope of this one has >> already >> > > grown >> > > >> >>>> quite >> > > >> >>>>> large! >> > > >> >>>>> >> > > >> >>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford < >> > > nick.telf...@gmail.com> >> > > >> >>>> wrote: >> > > >> >>>>> >> > > >> >>>>>> Hi everyone, >> > > >> >>>>>> >> > > >> >>>>>> I've updated the KIP ( >> > > >> >>>>>> >> > > >> >>>> >> > > >> >> >> > > >> >> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> > > >> >>>> ) >> > > >> >>>>>> with the latest changes; mostly bringing back "Atomic >> > > >> Checkpointing" >> > > >> >>>> (for >> > > >> >>>>>> what feels like the 10th time!). I think the one thing >> missing is >> > > >> some >> > > >> >>>>>> changes to metrics (notably the store "flush" metrics will >> need >> > > to >> > > >> be >> > > >> >>>>>> renamed to "commit"). >> > > >> >>>>>> >> > > >> >>>>>> The reason I brought back Atomic Checkpointing was to >> decouple >> > > >> store >> > > >> >>>> flush >> > > >> >>>>>> from store commit. This is important, because with >> Transactional >> > > >> >>>>>> StateStores, we now need to call "flush" on *every* Task >> commit, >> > > >> and >> > > >> >> not >> > > >> >>>>>> just when the StateStore is closing, otherwise our >> transaction >> > > >> buffer >> > > >> >>>> will >> > > >> >>>>>> never be written and persisted, instead growing unbounded! 
I >> > > >> >>>> experimented >> > > >> >>>>>> with some simple solutions, like forcing a store flush >> whenever >> > > the >> > > >> >>>>>> transaction buffer was likely to exceed its configured >> size, but >> > > >> this >> > > >> >>>> was >> > > >> >>>>>> brittle: it prevented the transaction buffer from being >> > > configured >> > > >> to >> > > >> >> be >> > > >> >>>>>> unbounded, and it still would have required explicit >> flushes of >> > > >> >> RocksDB, >> > > >> >>>>>> yielding sub-optimal performance and memory utilization. >> > > >> >>>>>> >> > > >> >>>>>> I deemed Atomic Checkpointing to be the "right" way to >> resolve >> > > this >> > > >> >>>>>> problem. By ensuring that the changelog offsets that >> correspond >> > > to >> > > >> the >> > > >> >>>> most >> > > >> >>>>>> recently written records are always atomically written to >> the >> > > >> >> StateStore >> > > >> >>>>>> (by writing them to the same transaction buffer), we can >> avoid >> > > >> >> forcibly >> > > >> >>>>>> flushing the RocksDB memtables to disk, letting RocksDB >> flush >> > > them >> > > >> >> only >> > > >> >>>>>> when necessary, without losing any of our consistency >> guarantees. >> > > >> See >> > > >> >>>> the >> > > >> >>>>>> updated KIP for more info. >> > > >> >>>>>> >> > > >> >>>>>> I have fully implemented these changes, although I'm still >> not >> > > >> >> entirely >> > > >> >>>>>> happy with the implementation for segmented StateStores, so >> I >> > > plan >> > > >> to >> > > >> >>>>>> refactor that. Despite that, all tests pass. If you'd like >> to try >> > > >> out >> > > >> >> or >> > > >> >>>>>> review this highly experimental and incomplete branch, it's >> > > >> available >> > > >> >>>> here: >> > > >> >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. >> Note: >> > > >> it's >> > > >> >>>> built >> > > >> >>>>>> against Kafka 3.5.0 so that I had a stable base to build >> and test >> > > >> it >> > > >> >> on, >> > > >> >>>>>> and to enable easy apples-to-apples comparisons in a live >> > > >> >> environment. I >> > > >> >>>>>> plan to rebase it against trunk once it's nearer completion >> and >> > > has >> > > >> >> been >> > > >> >>>>>> proven on our main application. >> > > >> >>>>>> >> > > >> >>>>>> I would really appreciate help in reviewing and testing: >> > > >> >>>>>> - Segmented (Versioned, Session and Window) stores >> > > >> >>>>>> - Global stores >> > > >> >>>>>> >> > > >> >>>>>> As I do not currently use either of these, so my primary >> test >> > > >> >>>> environment >> > > >> >>>>>> doesn't test these areas. >> > > >> >>>>>> >> > > >> >>>>>> I'm going on Parental Leave starting next week for a few >> weeks, >> > > so >> > > >> >> will >> > > >> >>>>>> not have time to move this forward until late August. That >> said, >> > > >> your >> > > >> >>>>>> feedback is welcome and appreciated, I just won't be able to >> > > >> respond >> > > >> >> as >> > > >> >>>>>> quickly as usual. >> > > >> >>>>>> >> > > >> >>>>>> Regards, >> > > >> >>>>>> Nick >> > > >> >>>>>> >> > > >> >>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford < >> > > nick.telf...@gmail.com> >> > > >> >>>> wrote: >> > > >> >>>>>> >> > > >> >>>>>>> Hi Bruno >> > > >> >>>>>>> >> > > >> >>>>>>> Yes, that's correct, although the impact on IQ is not >> something >> > > I >> > > >> had >> > > >> >>>>>>> considered. 
>> > > >> >>>>>>> >> > > >> >>>>>>> What about atomically updating the state store from the >> > > >> transaction >> > > >> >>>>>>>> buffer every commit interval and writing the checkpoint >> (thus, >> > > >> >>>> flushing >> > > >> >>>>>>>> the memtable) every configured amount of data and/or >> number of >> > > >> >> commit >> > > >> >>>>>>>> intervals? >> > > >> >>>>>>>> >> > > >> >>>>>>> >> > > >> >>>>>>> I'm not quite sure I follow. Are you suggesting that we >> add an >> > > >> >>>> additional >> > > >> >>>>>>> config for the max number of commit intervals between >> > > checkpoints? >> > > >> >> That >> > > >> >>>>>>> way, we would checkpoint *either* when the transaction >> buffers >> > > are >> > > >> >>>> nearly >> > > >> >>>>>>> full, *OR* whenever a certain number of commit intervals >> have >> > > >> >> elapsed, >> > > >> >>>>>>> whichever comes first? >> > > >> >>>>>>> >> > > >> >>>>>>> That certainly seems reasonable, although this re-ignites >> an >> > > >> earlier >> > > >> >>>>>>> debate about whether a config should be measured in >> "number of >> > > >> commit >> > > >> >>>>>>> intervals", instead of just an absolute time. >> > > >> >>>>>>> >> > > >> >>>>>>> FWIW, I realised that this issue is the reason I was >> pursuing >> > > the >> > > >> >>>> Atomic >> > > >> >>>>>>> Checkpoints, as it de-couples memtable flush from >> checkpointing, >> > > >> >> which >> > > >> >>>>>>> enables us to just checkpoint on every commit without any >> > > >> performance >> > > >> >>>>>>> impact. Atomic Checkpointing is definitely the "best" >> solution, >> > > >> but >> > > >> >>>> I'm not >> > > >> >>>>>>> sure if this is enough to bring it back into this KIP. >> > > >> >>>>>>> >> > > >> >>>>>>> I'm currently working on moving all the transactional logic >> > > >> directly >> > > >> >>>> into >> > > >> >>>>>>> RocksDBStore itself, which does away with the >> > > >> >> StateStore#newTransaction >> > > >> >>>>>>> method, and reduces the number of new classes introduced, >> > > >> >> significantly >> > > >> >>>>>>> reducing the complexity. If it works, and the complexity is >> > > >> >> drastically >> > > >> >>>>>>> reduced, I may try bringing back Atomic Checkpoints into >> this >> > > KIP. >> > > >> >>>>>>> >> > > >> >>>>>>> Regards, >> > > >> >>>>>>> Nick >> > > >> >>>>>>> >> > > >> >>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna < >> cado...@apache.org> >> > > >> >> wrote: >> > > >> >>>>>>> >> > > >> >>>>>>>> Hi Nick, >> > > >> >>>>>>>> >> > > >> >>>>>>>> Thanks for the insights! Very interesting! >> > > >> >>>>>>>> >> > > >> >>>>>>>> As far as I understand, you want to atomically update the >> state >> > > >> >> store >> > > >> >>>>>>>> from the transaction buffer, flush the memtable of a state >> > > store >> > > >> and >> > > >> >>>>>>>> write the checkpoint not after the commit time elapsed but >> > > after >> > > >> the >> > > >> >>>>>>>> transaction buffer reached a size that would lead to >> exceeding >> > > >> >>>>>>>> statestore.transaction.buffer.max.bytes before the next >> commit >> > > >> >>>> interval >> > > >> >>>>>>>> ends. >> > > >> >>>>>>>> That means, the Kafka transaction would commit every >> commit >> > > >> interval >> > > >> >>>> but >> > > >> >>>>>>>> the state store will only be atomically updated roughly >> every >> > > >> >>>>>>>> statestore.transaction.buffer.max.bytes of data. Also IQ >> would >> > > >> then >> > > >> >>>> only >> > > >> >>>>>>>> see new data roughly every >> > > >> statestore.transaction.buffer.max.bytes. 
>> > > >> >>>>>>>> After a failure the state store needs to restore up to >> > > >> >>>>>>>> statestore.transaction.buffer.max.bytes. >> > > >> >>>>>>>> >> > > >> >>>>>>>> Is this correct? >> > > >> >>>>>>>> >> > > >> >>>>>>>> What about atomically updating the state store from the >> > > >> transaction >> > > >> >>>>>>>> buffer every commit interval and writing the checkpoint >> (thus, >> > > >> >>>> flushing >> > > >> >>>>>>>> the memtable) every configured amount of data and/or >> number of >> > > >> >> commit >> > > >> >>>>>>>> intervals? In such a way, we would have the same delay for >> > > >> records >> > > >> >>>>>>>> appearing in output topics and IQ because both would >> appear >> > > when >> > > >> the >> > > >> >>>>>>>> Kafka transaction is committed. However, after a failure >> the >> > > >> state >> > > >> >>>> store >> > > >> >>>>>>>> still needs to restore up to >> > > >> statestore.transaction.buffer.max.bytes >> > > >> >>>> and >> > > >> >>>>>>>> it might restore data that is already in the state store >> > > because >> > > >> the >> > > >> >>>>>>>> checkpoint lags behind the last stable offset (i.e. the >> last >> > > >> >> committed >> > > >> >>>>>>>> offset) of the changelog topics. Restoring data that is >> already >> > > >> in >> > > >> >> the >> > > >> >>>>>>>> state store is idempotent, so eos should not violated. >> > > >> >>>>>>>> This solution needs at least one new config to specify >> when a >> > > >> >>>> checkpoint >> > > >> >>>>>>>> should be written. >> > > >> >>>>>>>> >> > > >> >>>>>>>> >> > > >> >>>>>>>> >> > > >> >>>>>>>> A small correction to your previous e-mail that does not >> change >> > > >> >>>> anything >> > > >> >>>>>>>> you said: Under alos the default commit interval is 30 >> seconds, >> > > >> not >> > > >> >>>> five >> > > >> >>>>>>>> seconds. >> > > >> >>>>>>>> >> > > >> >>>>>>>> >> > > >> >>>>>>>> Best, >> > > >> >>>>>>>> Bruno >> > > >> >>>>>>>> >> > > >> >>>>>>>> >> > > >> >>>>>>>> On 01.07.23 12:37, Nick Telford wrote: >> > > >> >>>>>>>>> Hi everyone, >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> I've begun performance testing my branch on our staging >> > > >> >> environment, >> > > >> >>>>>>>>> putting it through its paces in our non-trivial >> application. >> > > I'm >> > > >> >>>>>>>> already >> > > >> >>>>>>>>> observing the same increased flush rate that we saw the >> last >> > > >> time >> > > >> >> we >> > > >> >>>>>>>>> attempted to use a version of this KIP, but this time, I >> > > think I >> > > >> >> know >> > > >> >>>>>>>> why. >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at >> the end >> > > >> of >> > > >> >> the >> > > >> >>>>>>>> Task >> > > >> >>>>>>>>> commit process, has the following behaviour: >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> - Under ALOS: checkpoint the state stores. This >> > > includes >> > > >> >>>>>>>>> flushing memtables in RocksDB. This is acceptable >> > > >> because the >> > > >> >>>>>>>> default >> > > >> >>>>>>>>> commit.interval.ms is 5 seconds, so forcibly >> flushing >> > > >> >> memtables >> > > >> >>>>>>>> every 5 >> > > >> >>>>>>>>> seconds is acceptable for most applications. >> > > >> >>>>>>>>> - Under EOS: checkpointing is not done, *unless* >> it's >> > > >> being >> > > >> >>>>>>>> forced, due >> > > >> >>>>>>>>> to e.g. the Task closing or being revoked. 
This >> means >> > > >> that >> > > >> >> under >> > > >> >>>>>>>> normal >> > > >> >>>>>>>>> processing conditions, the state stores will not >> be >> > > >> >>>> checkpointed, >> > > >> >>>>>>>> and will >> > > >> >>>>>>>>> not have memtables flushed at all , unless RocksDB >> > > >> decides to >> > > >> >>>>>>>> flush them on >> > > >> >>>>>>>>> its own. Checkpointing stores and force-flushing >> their >> > > >> >> memtables >> > > >> >>>>>>>> is only >> > > >> >>>>>>>>> done when a Task is being closed. >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least >> > > *some* >> > > >> >>>> normal >> > > >> >>>>>>>>> Task commits, in order to write the RocksDB transaction >> > > buffers >> > > >> to >> > > >> >>>> the >> > > >> >>>>>>>>> state stores, and to ensure the offsets are synced to >> disk to >> > > >> >> prevent >> > > >> >>>>>>>>> restores from getting out of hand. Consequently, my >> current >> > > >> >>>>>>>> implementation >> > > >> >>>>>>>>> calls maybeCheckpoint on *every* Task commit, which is >> far too >> > > >> >>>>>>>> frequent. >> > > >> >>>>>>>>> This causes checkpoints every 10,000 records, which is a >> > > change >> > > >> in >> > > >> >>>>>>>> flush >> > > >> >>>>>>>>> behaviour, potentially causing performance problems for >> some >> > > >> >>>>>>>> applications. >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> I'm looking into possible solutions, and I'm currently >> leaning >> > > >> >>>> towards >> > > >> >>>>>>>>> using the statestore.transaction.buffer.max.bytes >> > > configuration >> > > >> to >> > > >> >>>>>>>>> checkpoint Tasks once we are likely to exceed it. This >> would >> > > >> >>>>>>>> complement the >> > > >> >>>>>>>>> existing "early Task commit" functionality that this >> > > >> configuration >> > > >> >>>>>>>>> provides, in the following way: >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> - Currently, we use >> > > >> statestore.transaction.buffer.max.bytes >> > > >> >> to >> > > >> >>>>>>>> force an >> > > >> >>>>>>>>> early Task commit if processing more records would >> > > cause >> > > >> our >> > > >> >>>> state >> > > >> >>>>>>>> store >> > > >> >>>>>>>>> transactions to exceed the memory assigned to >> them. >> > > >> >>>>>>>>> - New functionality: when a Task *does* commit, >> we will >> > > >> not >> > > >> >>>>>>>> checkpoint >> > > >> >>>>>>>>> the stores (and hence flush the transaction >> buffers) >> > > >> unless >> > > >> >> we >> > > >> >>>>>>>> expect to >> > > >> >>>>>>>>> cross the statestore.transaction.buffer.max.bytes >> > > >> threshold >> > > >> >>>> before >> > > >> >>>>>>>> the next >> > > >> >>>>>>>>> commit >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> I'm also open to suggestions. >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> Regards, >> > > >> >>>>>>>>> Nick >> > > >> >>>>>>>>> >> > > >> >>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford < >> > > >> nick.telf...@gmail.com >> > > >> >>> >> > > >> >>>>>>>> wrote: >> > > >> >>>>>>>>> >> > > >> >>>>>>>>>> Hi Bruno! >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>> 3. >> > > >> >>>>>>>>>> By "less predictable for users", I meant in terms of >> > > >> understanding >> > > >> >>>> the >> > > >> >>>>>>>>>> performance profile under various circumstances. The >> more >> > > >> complex >> > > >> >>>> the >> > > >> >>>>>>>>>> solution, the more difficult it would be for users to >> > > >> understand >> > > >> >> the >> > > >> >>>>>>>>>> performance they see. 
For example, spilling records to >> disk >> > > >> when >> > > >> >> the >> > > >> >>>>>>>>>> transaction buffer reaches a threshold would, I expect, >> > > reduce >> > > >> >> write >> > > >> >>>>>>>>>> throughput. This reduction in write throughput could be >> > > >> >> unexpected, >> > > >> >>>>>>>> and >> > > >> >>>>>>>>>> potentially difficult to diagnose/understand for users. >> > > >> >>>>>>>>>> At the moment, I think the "early commit" concept is >> > > relatively >> > > >> >>>>>>>>>> straightforward; it's easy to document, and conceptually >> > > fairly >> > > >> >>>>>>>> obvious to >> > > >> >>>>>>>>>> users. We could probably add a metric to make it easier >> to >> > > >> >>>> understand >> > > >> >>>>>>>> when >> > > >> >>>>>>>>>> it happens though. >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>> 3. (the second one) >> > > >> >>>>>>>>>> The IsolationLevel is *essentially* an indirect way of >> > > telling >> > > >> >>>>>>>> StateStores >> > > >> >>>>>>>>>> whether they should be transactional. READ_COMMITTED >> > > >> essentially >> > > >> >>>>>>>> requires >> > > >> >>>>>>>>>> transactions, because it dictates that two threads >> calling >> > > >> >>>>>>>>>> `newTransaction()` should not see writes from the other >> > > >> >> transaction >> > > >> >>>>>>>> until >> > > >> >>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all >> bets are >> > > >> off, >> > > >> >>>> and >> > > >> >>>>>>>>>> stores can allow threads to observe written records at >> any >> > > >> time, >> > > >> >>>>>>>> which is >> > > >> >>>>>>>>>> essentially "no transactions". That said, StateStores >> are >> > > free >> > > >> to >> > > >> >>>>>>>> implement >> > > >> >>>>>>>>>> these guarantees however they can, which is a bit more >> > > relaxed >> > > >> >> than >> > > >> >>>>>>>>>> dictating "you must use transactions". For example, with >> > > >> RocksDB >> > > >> >> we >> > > >> >>>>>>>> would >> > > >> >>>>>>>>>> implement these as READ_COMMITTED == WBWI-based >> > > "transactions", >> > > >> >>>>>>>>>> READ_UNCOMMITTED == direct writes to the database. But >> with >> > > >> other >> > > >> >>>>>>>> storage >> > > >> >>>>>>>>>> engines, it might be preferable to *always* use >> transactions, >> > > >> even >> > > >> >>>>>>>> when >> > > >> >>>>>>>>>> unnecessary; or there may be storage engines that don't >> > > provide >> > > >> >>>>>>>>>> transactions, but the isolation guarantees can be met >> using a >> > > >> >>>>>>>> different >> > > >> >>>>>>>>>> technique. >> > > >> >>>>>>>>>> My idea was to try to keep the StateStore interface as >> > > loosely >> > > >> >>>> coupled >> > > >> >>>>>>>>>> from the Streams engine as possible, to give >> implementers >> > > more >> > > >> >>>>>>>> freedom, and >> > > >> >>>>>>>>>> reduce the amount of internal knowledge required. >> > > >> >>>>>>>>>> That said, I understand that "IsolationLevel" might not >> be >> > > the >> > > >> >> right >> > > >> >>>>>>>>>> abstraction, and we can always make it much more >> explicit if >> > > >> >>>>>>>> required, e.g. >> > > >> >>>>>>>>>> boolean transactional() >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>> 7-8. >> > > >> >>>>>>>>>> I can make these changes either later today or tomorrow. >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>> Small update: >> > > >> >>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of >> issues >> > > >> that >> > > >> >>>>>>>> needed >> > > >> >>>>>>>>>> addressing. 
Currently, all the tests pass, which is >> > > promising, >> > > >> but >> > > >> >>>> it >> > > >> >>>>>>>> will >> > > >> >>>>>>>>>> need to undergo some performance testing. I haven't >> (yet) >> > > >> worked >> > > >> >> on >> > > >> >>>>>>>>>> removing the `newTransaction()` stuff, but I would >> expect >> > > that, >> > > >> >>>>>>>>>> behaviourally, it should make no difference. The branch >> is >> > > >> >> available >> > > >> >>>>>>>> at >> > > >> >>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if >> > > anyone >> > > >> is >> > > >> >>>>>>>>>> interested in taking an early look. >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>> Regards, >> > > >> >>>>>>>>>> Nick >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna < >> > > >> cado...@apache.org> >> > > >> >>>>>>>> wrote: >> > > >> >>>>>>>>>> >> > > >> >>>>>>>>>>> Hi Nick, >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> 1. >> > > >> >>>>>>>>>>> Yeah, I agree with you. That was actually also my >> point. I >> > > >> >>>> understood >> > > >> >>>>>>>>>>> that John was proposing the ingestion path as a way to >> avoid >> > > >> the >> > > >> >>>>>>>> early >> > > >> >>>>>>>>>>> commits. Probably, I misinterpreted the intent. >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> 2. >> > > >> >>>>>>>>>>> I agree with John here, that actually it is public >> API. My >> > > >> >> question >> > > >> >>>>>>>> is >> > > >> >>>>>>>>>>> how this usage pattern affects normal processing. >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> 3. >> > > >> >>>>>>>>>>> My concern is that checking for the size of the >> transaction >> > > >> >> buffer >> > > >> >>>>>>>> and >> > > >> >>>>>>>>>>> maybe triggering an early commit affects the whole >> > > processing >> > > >> of >> > > >> >>>>>>>> Kafka >> > > >> >>>>>>>>>>> Streams. The transactionality of a state store is not >> > > >> confined to >> > > >> >>>> the >> > > >> >>>>>>>>>>> state store itself, but spills over and changes the >> behavior >> > > >> of >> > > >> >>>> other >> > > >> >>>>>>>>>>> parts of the system. I agree with you that it is a >> decent >> > > >> >>>>>>>> compromise. I >> > > >> >>>>>>>>>>> just wanted to analyse the downsides and list the >> options to >> > > >> >>>> overcome >> > > >> >>>>>>>>>>> them. I also agree with you that all options seem quite >> > > heavy >> > > >> >>>>>>>> compared >> > > >> >>>>>>>>>>> with your KIP. I do not understand what you mean with >> "less >> > > >> >>>>>>>> predictable >> > > >> >>>>>>>>>>> for users", though. >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> I found the discussions about the alternatives really >> > > >> >> interesting. >> > > >> >>>>>>>> But I >> > > >> >>>>>>>>>>> also think that your plan sounds good and we should >> continue >> > > >> with >> > > >> >>>> it! >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> Some comments on your reply to my e-mail on June 20th: >> > > >> >>>>>>>>>>> >> > > >> >>>>>>>>>>> 3. >> > > >> >>>>>>>>>>> Ah, now, I understand the reasoning behind putting >> isolation >> > > >> >> level >> > > >> >>>> in >> > > >> >>>>>>>>>>> the state store context. Thanks! Should that also be a >> way >> > > to >> > > >> >> give >> > > >> >>>>>>>> the >> > > >> >>>>>>>>>>> the state store the opportunity to decide whether to >> turn on >> > > >> >>>>>>>>>>> transactions or not? 
With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS, if you do not have a way to know whether the state store is transactional or not. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.

7.
My point was about not only considering the bytes in memory in the config statestore.uncommitted.max.bytes, but also bytes that might be spilled to disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make too much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure: for instance, how does a transactional state store recover after a failure, and what happens with the transaction buffer? (That is what I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:

Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or being likely to exceed) the configured memory needs to trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based on cracking open RocksDB internals and using it in ways it's not really designed for is likely to have some unforeseen performance or consistency issues.
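To make the early-commit trigger in point 1 concrete, here is a minimal, self-contained sketch under assumed names (none of this is the actual KIP-892 code): the engine tracks roughly how many bytes are buffered in the uncommitted write batch and commits the Task ahead of the regular commit interval once statestore.transaction.buffer.max.bytes is reached.

class TransactionBufferSketch {
    private final long maxBufferBytes;   // value of statestore.transaction.buffer.max.bytes
    private long uncommittedBytes = 0L;

    TransactionBufferSketch(final long maxBufferBytes) {
        this.maxBufferBytes = maxBufferBytes;
    }

    // Called for every write buffered in the uncommitted WriteBatchWithIndex.
    void onBufferedWrite(final byte[] key, final byte[] value) {
        uncommittedBytes += key.length + value.length;   // rough accounting; ignores batch overhead
    }

    // Checked by the Streams engine after processing each record; when true, the
    // Task is committed early so the batch can be flushed and the buffer reset.
    boolean shouldCommitEarly() {
        return uncommittedBytes >= maxBufferBytes;
    }

    void onCommit() {
        uncommittedBytes = 0L;   // the buffer has been flushed to the database
    }
}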
3.
What's your motivation for removing these early commits? While not ideal, I think they're a decent compromise to ensure consistency whilst maintaining good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might actually make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing out of control. While it's good to discuss ideas for future improvements, I think it's important to narrow the scope down to a design that achieves the most pressing objectives (constant-sized restorations during dirty close/unexpected errors). Any design that this KIP produces can ultimately be changed in the future, especially if the bulk of it is internal behaviour.

I'm going to spend some time next week trying to re-work the original WriteBatchWithIndex design to remove the newTransaction() method, such that it's just an implementation detail of RocksDBStore. That way, if we want to replace WBWI with something in the future, like the SST file management outlined by John, then we can do so with little/no API changes.

Regards,

Nick
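For illustration, a rough sketch of the direction Nick describes above, with no newTransaction() on the public interface: the store buffers writes internally in a WriteBatchWithIndex, serves reads from the batch plus the underlying database, and flushes the batch on commit. The class and method names are placeholders, and the RocksDB calls show only one way this could be wired up, not the eventual implementation:

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

class RocksDBStoreSketch {
    private final RocksDB db;
    private final WriteBatchWithIndex buffer = new WriteBatchWithIndex(true);

    RocksDBStoreSketch(final RocksDB db) {
        this.db = db;
    }

    void put(final byte[] key, final byte[] value) throws RocksDBException {
        buffer.put(key, value);   // purely an internal detail; callers never see a "transaction"
    }

    byte[] get(final byte[] key) throws RocksDBException {
        // Reads consult the uncommitted buffer first, falling back to the database.
        try (ReadOptions readOptions = new ReadOptions()) {
            return buffer.getFromBatchAndDB(db, readOptions, key);
        }
    }

    void commit() throws RocksDBException {
        try (WriteOptions writeOptions = new WriteOptions()) {
            db.write(writeOptions, buffer);   // atomically apply the buffered writes
        }
        buffer.clear();
    }
}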