Hi Bruno,

Agreed, I can live with that for now.
In an effort to keep the scope of this KIP from expanding, I'm leaning towards just providing a configurable default.state.isolation.level and removing IsolationLevel from the StateStoreContext. This would be compatible with adding support for query-time IsolationLevels in the future, whilst providing a way for users to select an isolation level now. The big problem with this, however, is that if a user selects processing.mode = "exactly-once(-v2|-beta)", and default.state.isolation.level = "READ_UNCOMMITTED", we need to guarantee that the data isn't written to disk until commit() is called, but we also need to permit IQ threads to read from the ongoing transaction. A simple solution would be to (temporarily) forbid this combination of configuration, and have default.state.isolation.level automatically switch to READ_COMMITTED when processing.mode is anything other than at-least-once. Do you think this would be acceptable? In a later KIP, we can add support for query-time isolation levels and solve this particular problem there, which would relax this restriction. Regards, Nick On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org> wrote: > Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think > it is perfectly valid to say InMemoryKeyValueStore do not support > READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default > at the moment. > > Best, > Bruno > > On 9/18/23 7:12 PM, Nick Telford wrote: > > Oh! One other concern I haven't mentioned: if we make IsolationLevel a > > query-time constraint, then we need to add support for READ_COMMITTED to > > InMemoryKeyValueStore too, which will require some changes to the > > implementation. > > > > On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com> > wrote: > > > >> Hi everyone, > >> > >> I agree that having IsolationLevel be determined at query-time is the > >> ideal design, but there are a few sticking points: > >> > >> 1. > >> There needs to be some way to communicate the IsolationLevel down to the > >> RocksDBStore itself, so that the query can respect it. Since stores are > >> "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, > etc.), > >> we need some way to deliver that information to the bottom layer. For > IQv2, > >> we can use the existing State#query() method, but IQv1 has no way to do > >> this. > >> > >> A simple approach, which would potentially open up other options, would > be > >> to add something like: ReadOnlyKeyValueStore<K, V> > >> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore > (and > >> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.). > >> > >> 2. > >> As mentioned above, RocksDB WriteBatches are not thread-safe, which > causes > >> a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a > >> look at RocksDB Transactions[1], but they solve a very different > problem, > >> and have the same thread-safety issue. > >> > >> One possible approach that I mentioned is chaining WriteBatches: every > >> time a new Interactive Query is received (i.e. readOnlyView, see above, > >> is called) we "freeze" the existing WriteBatch, and start a new one for > new > >> writes. The Interactive Query queries the "chain" of previous > WriteBatches > >> + the underlying database; while the StreamThread starts writing to the > >> *new* WriteBatch. On-commit, the StreamThread would write *all* > >> WriteBatches in the chain to the database (that have not yet been > written). 
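To make the chaining idea above concrete, here is a minimal sketch (assuming RocksDB's WriteBatchWithIndex; the class and method names are hypothetical, and the reference counting discussed just below is left out):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    class WriteBatchChain {
        private final RocksDB db;
        private final Deque<WriteBatchWithIndex> frozen = new ArrayDeque<>();
        private WriteBatchWithIndex active = new WriteBatchWithIndex(true);

        WriteBatchChain(final RocksDB db) {
            this.db = db;
        }

        // StreamThread writes always go to the newest ("active") batch.
        synchronized void put(final byte[] key, final byte[] value) throws RocksDBException {
            active.put(key, value);
        }

        // Called when an Interactive Query starts: freeze the active batch so the
        // query can iterate it safely, and start a new batch for subsequent writes.
        synchronized Deque<WriteBatchWithIndex> freezeForQuery() {
            frozen.addLast(active);
            active = new WriteBatchWithIndex(true);
            return new ArrayDeque<>(frozen);
        }

        // On Task commit: write every frozen batch (oldest first) and then the
        // active batch to the database.
        synchronized void commit(final WriteOptions options) throws RocksDBException {
            for (final WriteBatchWithIndex batch : frozen) {
                db.write(options, batch);
            }
            db.write(options, active);
            active.clear();
            // Simplification: here the committed batches are simply dropped; the real
            // design would keep them (reference counted) until all open Interactive
            // Queries reading them have been closed.
            frozen.forEach(WriteBatchWithIndex::close);
            frozen.clear();
        }
    }

A query would then merge the returned chain with a base iterator over the database, while the StreamThread keeps writing to the new active batch.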
> >> > >> WriteBatches would be closed/freed only when they have been both > >> committed, and all open Interactive Queries on them have been closed. > This > >> would require some reference counting. > >> > >> Obviously a drawback of this approach is the potential for increased > >> memory usage: if an Interactive Query is long-lived, for example by > doing a > >> full scan over a large database, or even just pausing in the middle of > an > >> iteration, then the existing chain of WriteBatches could be kept around > for > >> a long time, potentially forever. > >> > >> -- > >> > >> A. > >> Going off on a tangent, it looks like in addition to supporting > >> READ_COMMITTED queries, we could go further and support REPEATABLE_READ > >> queries (i.e. where subsequent reads to the same key in the same > >> Interactive Query are guaranteed to yield the same value) by making use > of > >> RocksDB Snapshots[2]. These are fairly lightweight, so the performance > >> impact is likely to be negligible, but they do require that the > Interactive > >> Query session can be explicitly closed. > >> > >> This could be achieved if we made the above readOnlyView interface look > >> more like: > >> > >> interface ReadOnlyKeyValueView<K, V> implements ReadOnlyKeyValueStore<K, > >> V>, AutoCloseable {} > >> > >> interface ReadOnlyKeyValueStore<K, V> { > >> ... > >> ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel > isolationLevel); > >> } > >> > >> But this would be a breaking change, as existing IQv1 queries are > >> guaranteed to never call store.close(), and therefore these would leak > >> memory under REPEATABLE_READ. > >> > >> B. > >> One thing that's notable: MyRocks states that they support > READ_COMMITTED > >> and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4]. > >> This could be because doing so is technically difficult/impossible using > >> the primitives available in RocksDB. > >> > >> -- > >> > >> Lucas, to address your points: > >> > >> U1. > >> It's only "SHOULD" to permit alternative (i.e. non-RocksDB) > >> implementations of StateStore that do not support atomic writes. > Obviously > >> in those cases, the guarantees Kafka Streams provides/expects would be > >> relaxed. Do you think we should require all implementations to support > >> atomic writes? > >> > >> U2. > >> Stores can support multiple IsolationLevels. As we've discussed above, > the > >> ideal scenario would be to specify the IsolationLevel at query-time. > >> Failing that, I think the second-best approach is to define the > >> IsolationLevel for *all* queries based on the processing.mode, which is > >> what the default StateStoreContext#isolationLevel() achieves. Would you > >> prefer an alternative? > >> > >> While the existing implementation is equivalent to READ_UNCOMMITTED, > this > >> can yield unexpected results/errors under EOS, if a transaction is > rolled > >> back. While this would be a change in behaviour for users, it would look > >> more like a bug fix than a breaking change. That said, we *could* make > it > >> configurable, and default to the existing behaviour (READ_UNCOMMITTED) > >> instead of inferring it from the processing.mode? > >> > >> N1, N2. > >> These were only primitives to avoid boxing costs, but since this is not > a > >> performance sensitive area, it should be fine to change if that's > desirable. > >> > >> N3. > >> It's because the store "manages its own offsets", which includes both > >> committing the offset, *and providing it* via getCommittedOffset(). 
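Illustrative only: a rough shape of the offset-management methods being discussed; the exact signatures below are assumptions, not the KIP's API:

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    interface OffsetManagingStateStore {
        // True if the store persists its own changelog offsets atomically with
        // its data, and provides them back to the Streams engine.
        boolean managesOffsets();

        // Null if no offset has been committed yet.
        // (Lucas suggests OptionalLong here instead of a nullable Long.)
        Long getCommittedOffset(TopicPartition partition);

        // Commit all written records along with the given changelog offsets.
        void commit(Map<TopicPartition, Long> changelogOffsets);
    }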
> >> Personally, I think "managesOffsets" conveys this best, but I don't mind > >> changing it if the nomenclature is unclear. > >> > >> Sorry for the massive emails/essays! > >> -- > >> Nick > >> > >> 1: https://github.com/facebook/rocksdb/wiki/Transactions > >> 2: https://github.com/facebook/rocksdb/wiki/Snapshot > >> 3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation > >> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/ > >> > >> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy > >> <lbruts...@confluent.io.invalid> wrote: > >> > >>> Hi Nick, > >>> > >>> since I last read it in April, the KIP has become much cleaner and > >>> easier to read. Great work! > >>> > >>> It feels to me the last big open point is whether we can implement > >>> isolation level as a query parameter. I understand that there are > >>> implementation concerns, but as Colt says, it would be a great > >>> addition, and would also simplify the migration path for this change. > >>> Is the implementation problem you mentioned caused by the WriteBatch > >>> not having a notion of a snapshot, as the underlying DB iterator does? > >>> In that case, I am not sure a chain of WriteBatches as you propose > >>> would fully solve the problem, but maybe I didn't dig enough into the > >>> details to fully understand it. > >>> > >>> If it's not possible to implement it now, would it be an option to > >>> make sure in this KIP that we do not fully close the door on per-query > >>> isolation levels in the interface, as it may be possible to implement > >>> the missing primitives in RocksDB or Speedb in the future. > >>> > >>> Understanding: > >>> > >>> * U1) Why is it only "SHOULD" for changelogOffsets to be persisted > >>> atomically with the records? > >>> * U2) Don't understand the default implementation of `isolationLevel`. > >>> The isolation level should be a property of the underlying store, and > >>> not be defined by the default config? Existing stores probably don't > >>> guarantee READ_COMMITTED, so the default should be to return > >>> READ_UNCOMMITTED. > >>> > >>> Nits: > >>> * N1) Could `getComittedOffset` use an `OptionalLong` return type, to > >>> avoid the `null`? > >>> * N2) Could `apporixmateNumUncomittedBytes` use an `OptionalLong` > >>> return type, to avoid the `-1`? > >>> * N3) I don't understand why `managesOffsets` uses the 'manage' verb, > >>> whereas all other methods use the "commits" verb. I'd suggest > >>> `commitsOffsets`. > >>> > >>> Either way, it feels this KIP is very close to the finish line, I'm > >>> looking forward to seeing this in production! > >>> > >>> Cheers, > >>> Lucas > >>> > >>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <c...@littlehorse.io> > wrote: > >>>> > >>>>> Making IsolationLevel a query-time constraint, rather than linking it > >>> to > >>>> the processing.guarantee. > >>>> > >>>> As I understand it, would this allow even a user of EOS to control > >>> whether > >>>> reading committed or uncommitted records? If so, I am highly in favor > of > >>>> this. > >>>> > >>>> I know that I was one of the early people to point out the current > >>>> shortcoming that IQ reads uncommitted records, but just this morning I > >>>> realized a pattern we use which means that (for certain queries) our > >>> system > >>>> needs to be able to read uncommitted records, which is the current > >>> behavior > >>>> of Kafka Streams in EOS.*** > >>>> > >>>> If IsolationLevel being a query-time decision allows for this, then > that > >>>> would be amazing. 
I would also vote that the default behavior should > be > >>> for > >>>> reading uncommitted records, because it is totally possible for a > valid > >>>> application to depend on that behavior, and breaking it in a minor > >>> release > >>>> might be a bit strong. > >>>> > >>>> *** (Note, for the curious reader....) Our use-case/query pattern is a > >>> bit > >>>> complex, but reading "uncommitted" records is actually safe in our > case > >>>> because processing is deterministic. Additionally, IQ being able to > read > >>>> uncommitted records is crucial to enable "read your own writes" on our > >>> API: > >>>> Due to the deterministic processing, we send an "ack" to the client > who > >>>> makes the request as soon as the processor processes the result. If > they > >>>> can't read uncommitted records, they may receive a "201 - Created" > >>>> response, immediately followed by a "404 - Not Found" when doing a > >>> lookup > >>>> for the object they just created). > >>>> > >>>> Thanks, > >>>> Colt McNealy > >>>> > >>>> *Founder, LittleHorse.dev* > >>>> > >>>> > >>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <nick.telf...@gmail.com> > >>> wrote: > >>>> > >>>>> Addendum: > >>>>> > >>>>> I think we would also face the same problem with the approach John > >>> outlined > >>>>> earlier (using the record cache as a transaction buffer and flushing > >>> it > >>>>> straight to SST files). This is because the record cache (the > >>> ThreadCache > >>>>> class) is not thread-safe, so every commit would invalidate open IQ > >>>>> Iterators in the same way that RocksDB WriteBatches do. > >>>>> -- > >>>>> Nick > >>>>> > >>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com> > >>>>> wrote: > >>>>> > >>>>>> Hi Bruno, > >>>>>> > >>>>>> I've updated the KIP based on our conversation. The only things > >>> I've not > >>>>>> yet done are: > >>>>>> > >>>>>> 1. Using transactions under ALOS and EOS. > >>>>>> 2. Making IsolationLevel a query-time constraint, rather than > >>> linking it > >>>>>> to the processing.guarantee. > >>>>>> > >>>>>> There's a wrinkle that makes this a challenge: Interactive Queries > >>> that > >>>>>> open an Iterator, when using transactions and READ_UNCOMMITTED. > >>>>>> The problem is that under READ_UNCOMMITTED, queries need to be able > >>> to > >>>>>> read records from the currently uncommitted transaction buffer > >>>>>> (WriteBatch). This includes for Iterators, which should iterate > >>> both the > >>>>>> transaction buffer and underlying database (using > >>>>>> WriteBatch#iteratorWithBase()). > >>>>>> > >>>>>> The issue is that when the StreamThread commits, it writes the > >>> current > >>>>>> WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the > >>>>>> WriteBatch while an Interactive Query holds an open Iterator on it > >>> will > >>>>>> invalidate the Iterator. Worse, it turns out that Iterators over a > >>>>>> WriteBatch become invalidated not just when the WriteBatch is > >>> cleared, > >>>>> but > >>>>>> also when the Iterators' current key receives a new write. > >>>>>> > >>>>>> Now that I'm writing this, I remember that this is the major reason > >>> that > >>>>> I > >>>>>> switched the original design from having a query-time > >>> IsolationLevel to > >>>>>> having the IsolationLevel linked to the transactionality of the > >>> stores > >>>>>> themselves. 
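For reference, the READ_UNCOMMITTED read path being described looks roughly like this, assuming RocksJava exposes iteratorWithBase as WriteBatchWithIndex#newIteratorWithBase (illustrative sketch only):

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksIterator;
    import org.rocksdb.WriteBatchWithIndex;

    final class UncommittedReads {

        // READ_UNCOMMITTED iterator: the uncommitted WriteBatch overlaid on top of
        // whatever has already been written to the database.
        static RocksIterator uncommittedIterator(final RocksDB db, final WriteBatchWithIndex batch) {
            final RocksIterator base = db.newIterator();
            // This merged iterator is exactly what gets invalidated when the
            // StreamThread clears the batch on commit, or when the key the
            // iterator is positioned on receives a new write.
            return batch.newIteratorWithBase(base);
        }
    }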
> >>>>>> > >>>>>> It *might* be possible to resolve this, by having a "chain" of > >>>>>> WriteBatches, with the StreamThread switching to a new WriteBatch > >>>>> whenever > >>>>>> a new Interactive Query attempts to read from the database, but that > >>>>> could > >>>>>> cause some performance problems/memory pressure when subjected to a > >>> high > >>>>>> Interactive Query load. It would also reduce the efficiency of > >>>>> WriteBatches > >>>>>> on-commit, as we'd have to write N WriteBatches, where N is the > >>> number of > >>>>>> Interactive Queries since the last commit. > >>>>>> > >>>>>> I realise this is getting into the weeds of the implementation, and > >>> you'd > >>>>>> rather we focus on the API for now, but I think it's important to > >>>>> consider > >>>>>> how to implement the desired API, in case we come up with an API > >>> that > >>>>>> cannot be implemented efficiently, or even at all! > >>>>>> > >>>>>> Thoughts? > >>>>>> -- > >>>>>> Nick > >>>>>> > >>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org> > >>> wrote: > >>>>>> > >>>>>>> Hi Nick, > >>>>>>> > >>>>>>> 6. > >>>>>>> Of course, you are right! My bad! > >>>>>>> Wiping out the state in the downgrading case is fine. > >>>>>>> > >>>>>>> > >>>>>>> 3a. > >>>>>>> Focus on the public facing changes for the KIP. We will manage to > >>> get > >>>>>>> the internals right. Regarding state stores that do not support > >>>>>>> READ_COMMITTED, they should throw an error stating that they do not > >>>>>>> support READ_COMMITTED. No need to adapt all state stores > >>> immediately. > >>>>>>> > >>>>>>> 3b. > >>>>>>> I am in favor of using transactions also for ALOS. > >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Bruno > >>>>>>> > >>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote: > >>>>>>>> Hi Bruno, > >>>>>>>> > >>>>>>>> Thanks for getting back to me! > >>>>>>>> > >>>>>>>> 2. > >>>>>>>> The fact that implementations can always track estimated memory > >>> usage > >>>>> in > >>>>>>>> the wrapper is a good point. I can remove -1 as an option, and > >>> I'll > >>>>>>> clarify > >>>>>>>> the JavaDoc that 0 is not just for non-transactional stores, > >>> which is > >>>>>>>> currently misleading. > >>>>>>>> > >>>>>>>> 6. > >>>>>>>> The problem with catching the exception in the downgrade process > >>> is > >>>>> that > >>>>>>>> would require new code in the Kafka version being downgraded to. > >>> Since > >>>>>>>> users could conceivably downgrade to almost *any* older version > >>> of > >>>>> Kafka > >>>>>>>> Streams, I'm not sure how we could add that code? > >>>>>>>> The only way I can think of doing it would be to provide a > >>> dedicated > >>>>>>>> downgrade tool, that goes through every local store and removes > >>> the > >>>>>>>> offsets column families. But that seems like an unnecessary > >>> amount of > >>>>>>> extra > >>>>>>>> code to maintain just to handle a somewhat niche situation, when > >>> the > >>>>>>>> alternative (automatically wipe and restore stores) should be > >>>>>>> acceptable. > >>>>>>>> > >>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've requested. > >>>>>>>> > >>>>>>>> 3a. > >>>>>>>> I agree that IsolationLevel makes more sense at query-time, and I > >>>>>>> actually > >>>>>>>> initially attempted to place the IsolationLevel at query-time, > >>> but I > >>>>> ran > >>>>>>>> into some problems: > >>>>>>>> - The key issue is that, under ALOS we're not staging writes in > >>>>>>>> transactions, so can't perform writes at the READ_COMMITTED > >>> isolation > >>>>>>>> level. 
However, this may be addressed if we decide to *always* > >>> use > >>>>>>>> transactions as discussed under 3b. > >>>>>>>> - IQv1 and IQv2 have quite different implementations. I remember > >>>>> having > >>>>>>>> some difficulty understanding the IQv1 internals, which made it > >>>>>>> difficult > >>>>>>>> to determine what needed to be changed. However, I *think* this > >>> can be > >>>>>>>> addressed for both implementations by wrapping the RocksDBStore > >>> in an > >>>>>>>> IsolationLevel-dependent wrapper, that overrides read methods > >>> (get, > >>>>>>> etc.) > >>>>>>>> to either read directly from the database or from the ongoing > >>>>>>> transaction. > >>>>>>>> But IQv1 might still be difficult. > >>>>>>>> - If IsolationLevel becomes a query constraint, then all other > >>>>>>> StateStores > >>>>>>>> will need to respect it, including the in-memory stores. This > >>> would > >>>>>>> require > >>>>>>>> us to adapt in-memory stores to stage their writes so they can be > >>>>>>> isolated > >>>>>>>> from READ_COMMITTTED queries. It would also become an important > >>>>>>>> consideration for third-party stores on upgrade, as without > >>> changes, > >>>>>>> they > >>>>>>>> would not support READ_COMMITTED queries correctly. > >>>>>>>> > >>>>>>>> Ultimately, I may need some help making the necessary change to > >>> IQv1 > >>>>> to > >>>>>>>> support this, but I don't think it's fundamentally impossible, > >>> if we > >>>>>>> want > >>>>>>>> to pursue this route. > >>>>>>>> > >>>>>>>> 3b. > >>>>>>>> The main reason I chose to keep ALOS un-transactional was to > >>> minimize > >>>>>>>> behavioural change for most users (I believe most Streams users > >>> use > >>>>> the > >>>>>>>> default configuration, which is ALOS). That said, it's clear > >>> that if > >>>>>>> ALOS > >>>>>>>> also used transactional stores, the only change in behaviour > >>> would be > >>>>>>> that > >>>>>>>> it would become *more correct*, which could be considered a "bug > >>> fix" > >>>>> by > >>>>>>>> users, rather than a change they need to handle. > >>>>>>>> > >>>>>>>> I believe that performance using transactions (aka. RocksDB > >>>>>>> WriteBatches) > >>>>>>>> should actually be *better* than the un-batched write-path that > >>> is > >>>>>>>> currently used[1]. The only "performance" consideration will be > >>> the > >>>>>>>> increased memory usage that transactions require. Given the > >>>>> mitigations > >>>>>>> for > >>>>>>>> this memory that we have in place, I would expect that this is > >>> not a > >>>>>>>> problem for most users. > >>>>>>>> > >>>>>>>> If we're happy to do so, we can make ALOS also use transactions. > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Nick > >>>>>>>> > >>>>>>>> Link 1: > >>>>>>>> > >>>>> > >>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>> > >>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org > >>>> > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Nick, > >>>>>>>>> > >>>>>>>>> Thanks for the updates and sorry for the delay on my side! > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 1. > >>>>>>>>> Making the default implementation for flush() a no-op sounds > >>> good to > >>>>>>> me. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 2. > >>>>>>>>> I think what was bugging me here is that a third-party state > >>> store > >>>>>>> needs > >>>>>>>>> to implement the state store interface. That means they need to > >>>>>>>>> implement a wrapper around the actual state store as we do for > >>>>> RocksDB > >>>>>>>>> with RocksDBStore. 
So, a third-party state store can always > >>> estimate > >>>>>>> the > >>>>>>>>> uncommitted bytes, if it wants, because the wrapper can record > >>> the > >>>>>>> added > >>>>>>>>> bytes. > >>>>>>>>> One case I can think of where returning -1 makes sense is when > >>>>> Streams > >>>>>>>>> does not need to estimate the size of the write batch and > >>> trigger > >>>>>>>>> extraordinary commits, because the third-party state store > >>> takes care > >>>>>>> of > >>>>>>>>> memory. But in that case the method could also just return 0. > >>> Even > >>>>> that > >>>>>>>>> case would be better solved with a method that returns whether > >>> the > >>>>>>> state > >>>>>>>>> store manages itself the memory used for uncommitted bytes or > >>> not. > >>>>>>>>> Said that, I am fine with keeping the -1 return value, I was > >>> just > >>>>>>>>> wondering when and if it will be used. > >>>>>>>>> > >>>>>>>>> Regarding returning 0 for transactional state stores when the > >>> batch > >>>>> is > >>>>>>>>> empty, I was just wondering because you explicitly stated > >>>>>>>>> > >>>>>>>>> "or {@code 0} if this StateStore does not support transactions." > >>>>>>>>> > >>>>>>>>> So it seemed to me returning 0 could only happen for > >>>>> non-transactional > >>>>>>>>> state stores. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 3. > >>>>>>>>> > >>>>>>>>> a) What do you think if we move the isolation level to IQ (v1 > >>> and > >>>>> v2)? > >>>>>>>>> In the end this is the only component that really needs to > >>> specify > >>>>> the > >>>>>>>>> isolation level. It is similar to the Kafka consumer that can > >>> choose > >>>>>>>>> with what isolation level to read the input topic. > >>>>>>>>> For IQv1 the isolation level should go into > >>> StoreQueryParameters. For > >>>>>>>>> IQv2, I would add it to the Query interface. > >>>>>>>>> > >>>>>>>>> b) Point a) raises the question what should happen during > >>>>> at-least-once > >>>>>>>>> processing when the state store does not use transactions? John > >>> in > >>>>> the > >>>>>>>>> past proposed to also use transactions on state stores for > >>>>>>>>> at-least-once. I like that idea, because it avoids aggregating > >>> the > >>>>> same > >>>>>>>>> records over and over again in the case of a failure. We had a > >>> case > >>>>> in > >>>>>>>>> the past where a Streams applications in at-least-once mode was > >>>>> failing > >>>>>>>>> continuously for some reasons I do not remember before > >>> committing the > >>>>>>>>> offsets. After each failover, the app aggregated again and > >>> again the > >>>>>>>>> same records. Of course the aggregate increased to very wrong > >>> values > >>>>>>>>> just because of the failover. With transactions on the state > >>> stores > >>>>> we > >>>>>>>>> could have avoided this. The app would have output the same > >>> aggregate > >>>>>>>>> multiple times (i.e., after each failover) but at least the > >>> value of > >>>>>>> the > >>>>>>>>> aggregate would not depend on the number of failovers. > >>> Outputting the > >>>>>>>>> same aggregate multiple times would be incorrect under > >>> exactly-once > >>>>> but > >>>>>>>>> it is OK for at-least-once. > >>>>>>>>> If it makes sense to add a config to turn on and off > >>> transactions on > >>>>>>>>> state stores under at-least-once or just use transactions in > >>> any case > >>>>>>> is > >>>>>>>>> a question we should also discuss in this KIP. It depends a bit > >>> on > >>>>> the > >>>>>>>>> performance trade-off. Maybe to be safe, I would add a config. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 4. 
> >>>>>>>>> Your points are all valid. I tend to say to keep the metrics > >>> around > >>>>>>>>> flush() until we remove flush() completely from the interface. > >>> Calls > >>>>> to > >>>>>>>>> flush() might still exist since existing processors might still > >>> call > >>>>>>>>> flush() explicitly as you mentioned in 1). For sure, we need to > >>>>>>> document > >>>>>>>>> how the metrics change due to the transactions in the upgrade > >>> notes. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 5. > >>>>>>>>> I see. Then you should describe how the .position files are > >>> handled > >>>>> in > >>>>>>>>> a dedicated section of the KIP or incorporate the description > >>> in the > >>>>>>>>> "Atomic Checkpointing" section instead of only mentioning it in > >>> the > >>>>>>>>> "Compatibility, Deprecation, and Migration Plan". > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 6. > >>>>>>>>> Describing upgrading and downgrading in the KIP is a good idea. > >>>>>>>>> Regarding downgrading, I think you could also catch the > >>> exception and > >>>>>>> do > >>>>>>>>> what is needed to downgrade, e.g., drop the column family. See > >>> here > >>>>> for > >>>>>>>>> an example: > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>> > https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75 > >>>>>>>>> > >>>>>>>>> It is a bit brittle, but it works. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Bruno > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote: > >>>>>>>>>> Hi Bruno, > >>>>>>>>>> > >>>>>>>>>> Thanks for taking the time to review the KIP. I'm back from > >>> leave > >>>>> now > >>>>>>> and > >>>>>>>>>> intend to move this forwards as quickly as I can. > >>>>>>>>>> > >>>>>>>>>> Addressing your points: > >>>>>>>>>> > >>>>>>>>>> 1. > >>>>>>>>>> Because flush() is part of the StateStore API, it's exposed to > >>>>> custom > >>>>>>>>>> Processors, which might be making calls to flush(). This was > >>>>> actually > >>>>>>> the > >>>>>>>>>> case in a few integration tests. > >>>>>>>>>> To maintain as much compatibility as possible, I'd prefer not > >>> to > >>>>> make > >>>>>>>>> this > >>>>>>>>>> an UnsupportedOperationException, as it will cause previously > >>>>> working > >>>>>>>>>> Processors to start throwing exceptions at runtime. > >>>>>>>>>> I agree that it doesn't make sense for it to proxy commit(), > >>> though, > >>>>>>> as > >>>>>>>>>> that would cause it to violate the "StateStores commit only > >>> when the > >>>>>>> Task > >>>>>>>>>> commits" rule. > >>>>>>>>>> Instead, I think we should make this a no-op. That way, > >>> existing > >>>>> user > >>>>>>>>>> Processors will continue to work as-before, without violation > >>> of > >>>>> store > >>>>>>>>>> consistency that would be caused by premature flush/commit of > >>>>>>> StateStore > >>>>>>>>>> data to disk. > >>>>>>>>>> What do you think? > >>>>>>>>>> > >>>>>>>>>> 2. > >>>>>>>>>> As stated in the JavaDoc, when a StateStore implementation is > >>>>>>>>>> transactional, but is unable to estimate the uncommitted memory > >>>>> usage, > >>>>>>>>> the > >>>>>>>>>> method will return -1. > >>>>>>>>>> The intention here is to permit third-party implementations > >>> that may > >>>>>>> not > >>>>>>>>> be > >>>>>>>>>> able to estimate memory usage. > >>>>>>>>>> > >>>>>>>>>> Yes, it will be 0 when nothing has been written to the store > >>> yet. 
I > >>>>>>>>> thought > >>>>>>>>>> that was implied by "This method will return an approximation > >>> of the > >>>>>>>>> memory > >>>>>>>>>> would be freed by the next call to {@link #commit(Map)}" and > >>>>> "@return > >>>>>>> The > >>>>>>>>>> approximate size of all records awaiting {@link #commit(Map)}", > >>>>>>> however, > >>>>>>>>> I > >>>>>>>>>> can add it explicitly to the JavaDoc if you think this is > >>> unclear? > >>>>>>>>>> > >>>>>>>>>> 3. > >>>>>>>>>> I realise this is probably the most contentious point in my > >>> design, > >>>>>>> and > >>>>>>>>> I'm > >>>>>>>>>> open to changing it if I'm unable to convince you of the > >>> benefits. > >>>>>>>>>> Nevertheless, here's my argument: > >>>>>>>>>> The Interactive Query (IQ) API(s) are directly provided > >>> StateStores > >>>>> to > >>>>>>>>>> query, and it may be important for users to programmatically > >>> know > >>>>>>> which > >>>>>>>>>> mode the StateStore is operating under. If we simply provide an > >>>>>>>>>> "eosEnabled" boolean (as used throughout the internal streams > >>>>>>> engine), or > >>>>>>>>>> similar, then users will need to understand the operation and > >>>>>>>>> consequences > >>>>>>>>>> of each available processing mode and how it pertains to their > >>>>>>>>> StateStore. > >>>>>>>>>> > >>>>>>>>>> Interactive Query users aren't the only people that care about > >>> the > >>>>>>>>>> processing.mode/IsolationLevel of a StateStore: implementers of > >>>>> custom > >>>>>>>>>> StateStores also need to understand the behaviour expected of > >>> their > >>>>>>>>>> implementation. KIP-892 introduces some assumptions into the > >>> Streams > >>>>>>>>> Engine > >>>>>>>>>> about how StateStores operate under each processing mode, and > >>> it's > >>>>>>>>>> important that custom implementations adhere to those > >>> assumptions in > >>>>>>>>> order > >>>>>>>>>> to maintain the consistency guarantees. > >>>>>>>>>> > >>>>>>>>>> IsolationLevels provide a high-level contract on the behaviour > >>> of > >>>>> the > >>>>>>>>>> StateStore: a user knows that under READ_COMMITTED, they will > >>> see > >>>>>>> writes > >>>>>>>>>> only after the Task has committed, and under READ_UNCOMMITTED > >>> they > >>>>>>> will > >>>>>>>>> see > >>>>>>>>>> writes immediately. No understanding of the details of each > >>>>>>>>> processing.mode > >>>>>>>>>> is required, either for IQ users or StateStore implementers. > >>>>>>>>>> > >>>>>>>>>> An argument can be made that these contractual guarantees can > >>> simply > >>>>>>> be > >>>>>>>>>> documented for the processing.mode (i.e. that exactly-once and > >>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and at-least-once > >>> behaves > >>>>>>> like > >>>>>>>>>> READ_UNCOMMITTED), but there are several small issues with > >>> this I'd > >>>>>>>>> prefer > >>>>>>>>>> to avoid: > >>>>>>>>>> > >>>>>>>>>> - Where would we document these contracts, in a way that > >>> is > >>>>>>> difficult > >>>>>>>>>> for users/implementers to miss/ignore? > >>>>>>>>>> - It's not clear to users that the processing mode is > >>>>>>> communicating > >>>>>>>>>> an expectation of read isolation, unless they read the > >>>>>>>>> documentation. Users > >>>>>>>>>> rarely consult documentation unless they feel they need > >>> to, so > >>>>>>> it's > >>>>>>>>> likely > >>>>>>>>>> this detail would get missed by many users. > >>>>>>>>>> - It tightly couples processing modes to read isolation. 
> >>> Adding > >>>>>>> new > >>>>>>>>>> processing modes, or changing the read isolation of > >>> existing > >>>>>>>>> processing > >>>>>>>>>> modes would be difficult/impossible. > >>>>>>>>>> > >>>>>>>>>> Ultimately, the cost of introducing IsolationLevels is just a > >>> single > >>>>>>>>>> method, since we re-use the existing IsolationLevel enum from > >>> Kafka. > >>>>>>> This > >>>>>>>>>> gives us a clear place to document the contractual guarantees > >>>>> expected > >>>>>>>>>> of/provided by StateStores, that is accessible both by the > >>>>> StateStore > >>>>>>>>>> itself, and by IQ users. > >>>>>>>>>> > >>>>>>>>>> (Writing this I've just realised that the StateStore and IQ > >>> APIs > >>>>>>> actually > >>>>>>>>>> don't provide access to StateStoreContext that IQ users would > >>> have > >>>>>>> direct > >>>>>>>>>> access to... Perhaps StateStore should expose isolationLevel() > >>>>> itself > >>>>>>>>> too?) > >>>>>>>>>> > >>>>>>>>>> 4. > >>>>>>>>>> Yeah, I'm not comfortable renaming the metrics in-place > >>> either, as > >>>>>>> it's a > >>>>>>>>>> backwards incompatible change. My concern is that, if we leave > >>> the > >>>>>>>>> existing > >>>>>>>>>> "flush" metrics in place, they will be confusing to users. > >>> Right > >>>>> now, > >>>>>>>>>> "flush" metrics record explicit flushes to disk, but under > >>> KIP-892, > >>>>>>> even > >>>>>>>>> a > >>>>>>>>>> commit() will not explicitly flush data to disk - RocksDB will > >>>>> decide > >>>>>>> on > >>>>>>>>>> when to flush memtables to disk itself. > >>>>>>>>>> > >>>>>>>>>> If we keep the existing "flush" metrics, we'd have two options, > >>>>> which > >>>>>>>>> both > >>>>>>>>>> seem pretty bad to me: > >>>>>>>>>> > >>>>>>>>>> 1. Have them record calls to commit(), which would be > >>>>>>> misleading, as > >>>>>>>>>> data is no longer explicitly "flushed" to disk by this > >>> call. > >>>>>>>>>> 2. Have them record nothing at all, which is equivalent to > >>>>>>> removing > >>>>>>>>> the > >>>>>>>>>> metrics, except that users will see the metric still > >>> exists and > >>>>>>> so > >>>>>>>>> assume > >>>>>>>>>> that the metric is correct, and that there's a problem > >>> with > >>>>> their > >>>>>>>>> system > >>>>>>>>>> when there isn't. > >>>>>>>>>> > >>>>>>>>>> I agree that removing them is also a bad solution, and I'd > >>> like some > >>>>>>>>>> guidance on the best path forward here. > >>>>>>>>>> > >>>>>>>>>> 5. > >>>>>>>>>> Position files are updated on every write to a StateStore. > >>> Since our > >>>>>>>>> writes > >>>>>>>>>> are now buffered until commit(), we can't update the Position > >>> file > >>>>>>> until > >>>>>>>>>> commit() has been called, otherwise it would be inconsistent > >>> with > >>>>> the > >>>>>>>>> data > >>>>>>>>>> in the event of a rollback. Consequently, we need to manage > >>> these > >>>>>>> offsets > >>>>>>>>>> the same way we manage the checkpoint offsets, and ensure > >>> they're > >>>>> only > >>>>>>>>>> written on commit(). > >>>>>>>>>> > >>>>>>>>>> 6. > >>>>>>>>>> Agreed, although I'm not exactly sure yet what tests to write. > >>> How > >>>>>>>>> explicit > >>>>>>>>>> do we need to be here in the KIP? > >>>>>>>>>> > >>>>>>>>>> As for upgrade/downgrade: upgrade is designed to be seamless, > >>> and we > >>>>>>>>> should > >>>>>>>>>> definitely add some tests around that. 
Downgrade, it > >>> transpires, > >>>>> isn't > >>>>>>>>>> currently possible, as the extra column family for offset > >>> storage is > >>>>>>>>>> incompatible with the pre-KIP-892 implementation: when you > >>> open a > >>>>>>> RocksDB > >>>>>>>>>> database, you must open all available column families or > >>> receive an > >>>>>>>>> error. > >>>>>>>>>> What currently happens on downgrade is that it attempts to > >>> open the > >>>>>>>>> store, > >>>>>>>>>> throws an error about the offsets column family not being > >>> opened, > >>>>>>> which > >>>>>>>>>> triggers a wipe and rebuild of the Task. Given that downgrades > >>>>> should > >>>>>>> be > >>>>>>>>>> uncommon, I think this is acceptable behaviour, as the > >>> end-state is > >>>>>>>>>> consistent, even if it results in an undesirable state restore. > >>>>>>>>>> > >>>>>>>>>> Should I document the upgrade/downgrade behaviour explicitly > >>> in the > >>>>>>> KIP? > >>>>>>>>>> > >>>>>>>>>> -- > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> Nick > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna < > >>> cado...@apache.org> > >>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Nick! > >>>>>>>>>>> > >>>>>>>>>>> Thanks for the updates! > >>>>>>>>>>> > >>>>>>>>>>> 1. > >>>>>>>>>>> Why does StateStore#flush() default to > >>>>>>>>>>> StateStore#commit(Collections.emptyMap())? > >>>>>>>>>>> Since calls to flush() will not exist anymore after this KIP > >>> is > >>>>>>>>>>> released, I would rather throw an unsupported operation > >>> exception > >>>>> by > >>>>>>>>>>> default. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 2. > >>>>>>>>>>> When would a state store return -1 from > >>>>>>>>>>> StateStore#approximateNumUncommittedBytes() while being > >>>>>>> transactional? > >>>>>>>>>>> > >>>>>>>>>>> Wouldn't StateStore#approximateNumUncommittedBytes() also > >>> return 0 > >>>>> if > >>>>>>>>>>> the state store is transactional but nothing has been written > >>> to > >>>>> the > >>>>>>>>>>> state store yet? > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 3. > >>>>>>>>>>> Sorry for bringing this up again. Does this KIP really need to > >>>>>>> introduce > >>>>>>>>>>> StateStoreContext#isolationLevel()? StateStoreContext has > >>> already > >>>>>>>>>>> appConfigs() which basically exposes the same information, > >>> i.e., if > >>>>>>> EOS > >>>>>>>>>>> is enabled or not. > >>>>>>>>>>> In one of your previous e-mails you wrote: > >>>>>>>>>>> > >>>>>>>>>>> "My idea was to try to keep the StateStore interface as > >>> loosely > >>>>>>> coupled > >>>>>>>>>>> from the Streams engine as possible, to give implementers more > >>>>>>> freedom, > >>>>>>>>>>> and reduce the amount of internal knowledge required." > >>>>>>>>>>> > >>>>>>>>>>> While I understand the intent, I doubt that it decreases the > >>>>>>> coupling of > >>>>>>>>>>> a StateStore interface and the Streams engine. READ_COMMITTED > >>> only > >>>>>>>>>>> applies to IQ but not to reads by processors. Thus, > >>> implementers > >>>>>>> need to > >>>>>>>>>>> understand how Streams accesses the state stores. > >>>>>>>>>>> > >>>>>>>>>>> I would like to hear what others think about this. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 4. > >>>>>>>>>>> Great exposing new metrics for transactional state stores! > >>>>> However, I > >>>>>>>>>>> would prefer to add new metrics and deprecate (in the docs) > >>> the old > >>>>>>>>>>> ones. 
You can find examples of deprecated metrics here: > >>>>>>>>>>> https://kafka.apache.org/documentation/#selector_monitoring > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 5. > >>>>>>>>>>> Why does the KIP mention position files? I do not think they > >>> are > >>>>>>> related > >>>>>>>>>>> to transactions or flushes. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> 6. > >>>>>>>>>>> I think we will also need to adapt/add integration tests > >>> besides > >>>>> unit > >>>>>>>>>>> tests. Additionally, we probably need integration or system > >>> tests > >>>>> to > >>>>>>>>>>> verify that upgrades and downgrades between transactional and > >>>>>>>>>>> non-transactional state stores work as expected. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Bruno > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote: > >>>>>>>>>>>> One more thing: I noted John's suggestion in the KIP, under > >>>>>>> "Rejected > >>>>>>>>>>>> Alternatives". I still think it's an idea worth pursuing, > >>> but I > >>>>>>> believe > >>>>>>>>>>>> that it's out of the scope of this KIP, because it solves a > >>>>>>> different > >>>>>>>>> set > >>>>>>>>>>>> of problems to this KIP, and the scope of this one has > >>> already > >>>>> grown > >>>>>>>>>>> quite > >>>>>>>>>>>> large! > >>>>>>>>>>>> > >>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford < > >>>>> nick.telf...@gmail.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I've updated the KIP ( > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>> ) > >>>>>>>>>>>>> with the latest changes; mostly bringing back "Atomic > >>>>>>> Checkpointing" > >>>>>>>>>>> (for > >>>>>>>>>>>>> what feels like the 10th time!). I think the one thing > >>> missing is > >>>>>>> some > >>>>>>>>>>>>> changes to metrics (notably the store "flush" metrics will > >>> need > >>>>> to > >>>>>>> be > >>>>>>>>>>>>> renamed to "commit"). > >>>>>>>>>>>>> > >>>>>>>>>>>>> The reason I brought back Atomic Checkpointing was to > >>> decouple > >>>>>>> store > >>>>>>>>>>> flush > >>>>>>>>>>>>> from store commit. This is important, because with > >>> Transactional > >>>>>>>>>>>>> StateStores, we now need to call "flush" on *every* Task > >>> commit, > >>>>>>> and > >>>>>>>>> not > >>>>>>>>>>>>> just when the StateStore is closing, otherwise our > >>> transaction > >>>>>>> buffer > >>>>>>>>>>> will > >>>>>>>>>>>>> never be written and persisted, instead growing unbounded! I > >>>>>>>>>>> experimented > >>>>>>>>>>>>> with some simple solutions, like forcing a store flush > >>> whenever > >>>>> the > >>>>>>>>>>>>> transaction buffer was likely to exceed its configured > >>> size, but > >>>>>>> this > >>>>>>>>>>> was > >>>>>>>>>>>>> brittle: it prevented the transaction buffer from being > >>>>> configured > >>>>>>> to > >>>>>>>>> be > >>>>>>>>>>>>> unbounded, and it still would have required explicit > >>> flushes of > >>>>>>>>> RocksDB, > >>>>>>>>>>>>> yielding sub-optimal performance and memory utilization. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I deemed Atomic Checkpointing to be the "right" way to > >>> resolve > >>>>> this > >>>>>>>>>>>>> problem. 
By ensuring that the changelog offsets that > >>> correspond > >>>>> to > >>>>>>> the > >>>>>>>>>>> most > >>>>>>>>>>>>> recently written records are always atomically written to > >>> the > >>>>>>>>> StateStore > >>>>>>>>>>>>> (by writing them to the same transaction buffer), we can > >>> avoid > >>>>>>>>> forcibly > >>>>>>>>>>>>> flushing the RocksDB memtables to disk, letting RocksDB > >>> flush > >>>>> them > >>>>>>>>> only > >>>>>>>>>>>>> when necessary, without losing any of our consistency > >>> guarantees. > >>>>>>> See > >>>>>>>>>>> the > >>>>>>>>>>>>> updated KIP for more info. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I have fully implemented these changes, although I'm still > >>> not > >>>>>>>>> entirely > >>>>>>>>>>>>> happy with the implementation for segmented StateStores, so > >>> I > >>>>> plan > >>>>>>> to > >>>>>>>>>>>>> refactor that. Despite that, all tests pass. If you'd like > >>> to try > >>>>>>> out > >>>>>>>>> or > >>>>>>>>>>>>> review this highly experimental and incomplete branch, it's > >>>>>>> available > >>>>>>>>>>> here: > >>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. > >>> Note: > >>>>>>> it's > >>>>>>>>>>> built > >>>>>>>>>>>>> against Kafka 3.5.0 so that I had a stable base to build > >>> and test > >>>>>>> it > >>>>>>>>> on, > >>>>>>>>>>>>> and to enable easy apples-to-apples comparisons in a live > >>>>>>>>> environment. I > >>>>>>>>>>>>> plan to rebase it against trunk once it's nearer completion > >>> and > >>>>> has > >>>>>>>>> been > >>>>>>>>>>>>> proven on our main application. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I would really appreciate help in reviewing and testing: > >>>>>>>>>>>>> - Segmented (Versioned, Session and Window) stores > >>>>>>>>>>>>> - Global stores > >>>>>>>>>>>>> > >>>>>>>>>>>>> As I do not currently use either of these, so my primary > >>> test > >>>>>>>>>>> environment > >>>>>>>>>>>>> doesn't test these areas. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I'm going on Parental Leave starting next week for a few > >>> weeks, > >>>>> so > >>>>>>>>> will > >>>>>>>>>>>>> not have time to move this forward until late August. That > >>> said, > >>>>>>> your > >>>>>>>>>>>>> feedback is welcome and appreciated, I just won't be able to > >>>>>>> respond > >>>>>>>>> as > >>>>>>>>>>>>> quickly as usual. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Regards, > >>>>>>>>>>>>> Nick > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford < > >>>>> nick.telf...@gmail.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Bruno > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Yes, that's correct, although the impact on IQ is not > >>> something > >>>>> I > >>>>>>> had > >>>>>>>>>>>>>> considered. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> What about atomically updating the state store from the > >>>>>>> transaction > >>>>>>>>>>>>>>> buffer every commit interval and writing the checkpoint > >>> (thus, > >>>>>>>>>>> flushing > >>>>>>>>>>>>>>> the memtable) every configured amount of data and/or > >>> number of > >>>>>>>>> commit > >>>>>>>>>>>>>>> intervals? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I'm not quite sure I follow. Are you suggesting that we > >>> add an > >>>>>>>>>>> additional > >>>>>>>>>>>>>> config for the max number of commit intervals between > >>>>> checkpoints? 
> >>>>>>>>> That > >>>>>>>>>>>>>> way, we would checkpoint *either* when the transaction > >>> buffers > >>>>> are > >>>>>>>>>>> nearly > >>>>>>>>>>>>>> full, *OR* whenever a certain number of commit intervals > >>> have > >>>>>>>>> elapsed, > >>>>>>>>>>>>>> whichever comes first? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> That certainly seems reasonable, although this re-ignites > >>> an > >>>>>>> earlier > >>>>>>>>>>>>>> debate about whether a config should be measured in > >>> "number of > >>>>>>> commit > >>>>>>>>>>>>>> intervals", instead of just an absolute time. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> FWIW, I realised that this issue is the reason I was > >>> pursuing > >>>>> the > >>>>>>>>>>> Atomic > >>>>>>>>>>>>>> Checkpoints, as it de-couples memtable flush from > >>> checkpointing, > >>>>>>>>> which > >>>>>>>>>>>>>> enables us to just checkpoint on every commit without any > >>>>>>> performance > >>>>>>>>>>>>>> impact. Atomic Checkpointing is definitely the "best" > >>> solution, > >>>>>>> but > >>>>>>>>>>> I'm not > >>>>>>>>>>>>>> sure if this is enough to bring it back into this KIP. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I'm currently working on moving all the transactional logic > >>>>>>> directly > >>>>>>>>>>> into > >>>>>>>>>>>>>> RocksDBStore itself, which does away with the > >>>>>>>>> StateStore#newTransaction > >>>>>>>>>>>>>> method, and reduces the number of new classes introduced, > >>>>>>>>> significantly > >>>>>>>>>>>>>> reducing the complexity. If it works, and the complexity is > >>>>>>>>> drastically > >>>>>>>>>>>>>> reduced, I may try bringing back Atomic Checkpoints into > >>> this > >>>>> KIP. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna < > >>> cado...@apache.org> > >>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the insights! Very interesting! > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> As far as I understand, you want to atomically update the > >>> state > >>>>>>>>> store > >>>>>>>>>>>>>>> from the transaction buffer, flush the memtable of a state > >>>>> store > >>>>>>> and > >>>>>>>>>>>>>>> write the checkpoint not after the commit time elapsed but > >>>>> after > >>>>>>> the > >>>>>>>>>>>>>>> transaction buffer reached a size that would lead to > >>> exceeding > >>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes before the next > >>> commit > >>>>>>>>>>> interval > >>>>>>>>>>>>>>> ends. > >>>>>>>>>>>>>>> That means, the Kafka transaction would commit every > >>> commit > >>>>>>> interval > >>>>>>>>>>> but > >>>>>>>>>>>>>>> the state store will only be atomically updated roughly > >>> every > >>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes of data. Also IQ > >>> would > >>>>>>> then > >>>>>>>>>>> only > >>>>>>>>>>>>>>> see new data roughly every > >>>>>>> statestore.transaction.buffer.max.bytes. > >>>>>>>>>>>>>>> After a failure the state store needs to restore up to > >>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Is this correct? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> What about atomically updating the state store from the > >>>>>>> transaction > >>>>>>>>>>>>>>> buffer every commit interval and writing the checkpoint > >>> (thus, > >>>>>>>>>>> flushing > >>>>>>>>>>>>>>> the memtable) every configured amount of data and/or > >>> number of > >>>>>>>>> commit > >>>>>>>>>>>>>>> intervals? 
In such a way, we would have the same delay for > >>>>>>> records > >>>>>>>>>>>>>>> appearing in output topics and IQ because both would > >>> appear > >>>>> when > >>>>>>> the > >>>>>>>>>>>>>>> Kafka transaction is committed. However, after a failure > >>> the > >>>>>>> state > >>>>>>>>>>> store > >>>>>>>>>>>>>>> still needs to restore up to > >>>>>>> statestore.transaction.buffer.max.bytes > >>>>>>>>>>> and > >>>>>>>>>>>>>>> it might restore data that is already in the state store > >>>>> because > >>>>>>> the > >>>>>>>>>>>>>>> checkpoint lags behind the last stable offset (i.e. the > >>> last > >>>>>>>>> committed > >>>>>>>>>>>>>>> offset) of the changelog topics. Restoring data that is > >>> already > >>>>>>> in > >>>>>>>>> the > >>>>>>>>>>>>>>> state store is idempotent, so eos should not violated. > >>>>>>>>>>>>>>> This solution needs at least one new config to specify > >>> when a > >>>>>>>>>>> checkpoint > >>>>>>>>>>>>>>> should be written. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> A small correction to your previous e-mail that does not > >>> change > >>>>>>>>>>> anything > >>>>>>>>>>>>>>> you said: Under alos the default commit interval is 30 > >>> seconds, > >>>>>>> not > >>>>>>>>>>> five > >>>>>>>>>>>>>>> seconds. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>> Bruno > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote: > >>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I've begun performance testing my branch on our staging > >>>>>>>>> environment, > >>>>>>>>>>>>>>>> putting it through its paces in our non-trivial > >>> application. > >>>>> I'm > >>>>>>>>>>>>>>> already > >>>>>>>>>>>>>>>> observing the same increased flush rate that we saw the > >>> last > >>>>>>> time > >>>>>>>>> we > >>>>>>>>>>>>>>>> attempted to use a version of this KIP, but this time, I > >>>>> think I > >>>>>>>>> know > >>>>>>>>>>>>>>> why. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at > >>> the end > >>>>>>> of > >>>>>>>>> the > >>>>>>>>>>>>>>> Task > >>>>>>>>>>>>>>>> commit process, has the following behaviour: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> - Under ALOS: checkpoint the state stores. This > >>>>> includes > >>>>>>>>>>>>>>>> flushing memtables in RocksDB. This is acceptable > >>>>>>> because the > >>>>>>>>>>>>>>> default > >>>>>>>>>>>>>>>> commit.interval.ms is 5 seconds, so forcibly > >>> flushing > >>>>>>>>> memtables > >>>>>>>>>>>>>>> every 5 > >>>>>>>>>>>>>>>> seconds is acceptable for most applications. > >>>>>>>>>>>>>>>> - Under EOS: checkpointing is not done, *unless* > >>> it's > >>>>>>> being > >>>>>>>>>>>>>>> forced, due > >>>>>>>>>>>>>>>> to e.g. the Task closing or being revoked. This > >>> means > >>>>>>> that > >>>>>>>>> under > >>>>>>>>>>>>>>> normal > >>>>>>>>>>>>>>>> processing conditions, the state stores will not > >>> be > >>>>>>>>>>> checkpointed, > >>>>>>>>>>>>>>> and will > >>>>>>>>>>>>>>>> not have memtables flushed at all , unless RocksDB > >>>>>>> decides to > >>>>>>>>>>>>>>> flush them on > >>>>>>>>>>>>>>>> its own. Checkpointing stores and force-flushing > >>> their > >>>>>>>>> memtables > >>>>>>>>>>>>>>> is only > >>>>>>>>>>>>>>>> done when a Task is being closed. 
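Paraphrasing those two bullets as a sketch (this is not the actual StreamTask code; the names below are illustrative):

    // Pre-KIP-892 decision described above, not the real Kafka Streams internals.
    class PreKip892PostCommitSketch {
        private final boolean eosEnabled; // processing.mode != at-least-once

        PreKip892PostCommitSketch(final boolean eosEnabled) {
            this.eosEnabled = eosEnabled;
        }

        void postCommit(final boolean enforceCheckpoint) {
            if (!eosEnabled || enforceCheckpoint) {
                // ALOS: checkpoint (and flush memtables) on every commit interval.
                // EOS: only when forced, e.g. because the Task is closing or revoked.
                checkpointAndFlushStores();
            }
            // Otherwise (EOS, normal processing): no checkpoint and no forced
            // memtable flush; RocksDB decides when to flush on its own.
        }

        private void checkpointAndFlushStores() { /* write checkpoint file, flush memtables */ }
    }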
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least > >>>>> *some* > >>>>>>>>>>> normal > >>>>>>>>>>>>>>>> Task commits, in order to write the RocksDB transaction > >>>>> buffers > >>>>>>> to > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> state stores, and to ensure the offsets are synced to > >>> disk to > >>>>>>>>> prevent > >>>>>>>>>>>>>>>> restores from getting out of hand. Consequently, my > >>> current > >>>>>>>>>>>>>>> implementation > >>>>>>>>>>>>>>>> calls maybeCheckpoint on *every* Task commit, which is > >>> far too > >>>>>>>>>>>>>>> frequent. > >>>>>>>>>>>>>>>> This causes checkpoints every 10,000 records, which is a > >>>>> change > >>>>>>> in > >>>>>>>>>>>>>>> flush > >>>>>>>>>>>>>>>> behaviour, potentially causing performance problems for > >>> some > >>>>>>>>>>>>>>> applications. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I'm looking into possible solutions, and I'm currently > >>> leaning > >>>>>>>>>>> towards > >>>>>>>>>>>>>>>> using the statestore.transaction.buffer.max.bytes > >>>>> configuration > >>>>>>> to > >>>>>>>>>>>>>>>> checkpoint Tasks once we are likely to exceed it. This > >>> would > >>>>>>>>>>>>>>> complement the > >>>>>>>>>>>>>>>> existing "early Task commit" functionality that this > >>>>>>> configuration > >>>>>>>>>>>>>>>> provides, in the following way: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> - Currently, we use > >>>>>>> statestore.transaction.buffer.max.bytes > >>>>>>>>> to > >>>>>>>>>>>>>>> force an > >>>>>>>>>>>>>>>> early Task commit if processing more records would > >>>>> cause > >>>>>>> our > >>>>>>>>>>> state > >>>>>>>>>>>>>>> store > >>>>>>>>>>>>>>>> transactions to exceed the memory assigned to > >>> them. > >>>>>>>>>>>>>>>> - New functionality: when a Task *does* commit, > >>> we will > >>>>>>> not > >>>>>>>>>>>>>>> checkpoint > >>>>>>>>>>>>>>>> the stores (and hence flush the transaction > >>> buffers) > >>>>>>> unless > >>>>>>>>> we > >>>>>>>>>>>>>>> expect to > >>>>>>>>>>>>>>>> cross the statestore.transaction.buffer.max.bytes > >>>>>>> threshold > >>>>>>>>>>> before > >>>>>>>>>>>>>>> the next > >>>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I'm also open to suggestions. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford < > >>>>>>> nick.telf...@gmail.com > >>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Bruno! > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 3. > >>>>>>>>>>>>>>>>> By "less predictable for users", I meant in terms of > >>>>>>> understanding > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> performance profile under various circumstances. The > >>> more > >>>>>>> complex > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> solution, the more difficult it would be for users to > >>>>>>> understand > >>>>>>>>> the > >>>>>>>>>>>>>>>>> performance they see. For example, spilling records to > >>> disk > >>>>>>> when > >>>>>>>>> the > >>>>>>>>>>>>>>>>> transaction buffer reaches a threshold would, I expect, > >>>>> reduce > >>>>>>>>> write > >>>>>>>>>>>>>>>>> throughput. This reduction in write throughput could be > >>>>>>>>> unexpected, > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> potentially difficult to diagnose/understand for users. > >>>>>>>>>>>>>>>>> At the moment, I think the "early commit" concept is > >>>>> relatively > >>>>>>>>>>>>>>>>> straightforward; it's easy to document, and conceptually > >>>>> fairly > >>>>>>>>>>>>>>> obvious to > >>>>>>>>>>>>>>>>> users. 
We could probably add a metric to make it easier > >>> to > >>>>>>>>>>> understand > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>> it happens though. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 3. (the second one) > >>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an indirect way of > >>>>> telling > >>>>>>>>>>>>>>> StateStores > >>>>>>>>>>>>>>>>> whether they should be transactional. READ_COMMITTED > >>>>>>> essentially > >>>>>>>>>>>>>>> requires > >>>>>>>>>>>>>>>>> transactions, because it dictates that two threads > >>> calling > >>>>>>>>>>>>>>>>> `newTransaction()` should not see writes from the other > >>>>>>>>> transaction > >>>>>>>>>>>>>>> until > >>>>>>>>>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all > >>> bets are > >>>>>>> off, > >>>>>>>>>>> and > >>>>>>>>>>>>>>>>> stores can allow threads to observe written records at > >>> any > >>>>>>> time, > >>>>>>>>>>>>>>> which is > >>>>>>>>>>>>>>>>> essentially "no transactions". That said, StateStores > >>> are > >>>>> free > >>>>>>> to > >>>>>>>>>>>>>>> implement > >>>>>>>>>>>>>>>>> these guarantees however they can, which is a bit more > >>>>> relaxed > >>>>>>>>> than > >>>>>>>>>>>>>>>>> dictating "you must use transactions". For example, with > >>>>>>> RocksDB > >>>>>>>>> we > >>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>> implement these as READ_COMMITTED == WBWI-based > >>>>> "transactions", > >>>>>>>>>>>>>>>>> READ_UNCOMMITTED == direct writes to the database. But > >>> with > >>>>>>> other > >>>>>>>>>>>>>>> storage > >>>>>>>>>>>>>>>>> engines, it might be preferable to *always* use > >>> transactions, > >>>>>>> even > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>> unnecessary; or there may be storage engines that don't > >>>>> provide > >>>>>>>>>>>>>>>>> transactions, but the isolation guarantees can be met > >>> using a > >>>>>>>>>>>>>>> different > >>>>>>>>>>>>>>>>> technique. > >>>>>>>>>>>>>>>>> My idea was to try to keep the StateStore interface as > >>>>> loosely > >>>>>>>>>>> coupled > >>>>>>>>>>>>>>>>> from the Streams engine as possible, to give > >>> implementers > >>>>> more > >>>>>>>>>>>>>>> freedom, and > >>>>>>>>>>>>>>>>> reduce the amount of internal knowledge required. > >>>>>>>>>>>>>>>>> That said, I understand that "IsolationLevel" might not > >>> be > >>>>> the > >>>>>>>>> right > >>>>>>>>>>>>>>>>> abstraction, and we can always make it much more > >>> explicit if > >>>>>>>>>>>>>>> required, e.g. > >>>>>>>>>>>>>>>>> boolean transactional() > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 7-8. > >>>>>>>>>>>>>>>>> I can make these changes either later today or tomorrow. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Small update: > >>>>>>>>>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of > >>> issues > >>>>>>> that > >>>>>>>>>>>>>>> needed > >>>>>>>>>>>>>>>>> addressing. Currently, all the tests pass, which is > >>>>> promising, > >>>>>>> but > >>>>>>>>>>> it > >>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>> need to undergo some performance testing. I haven't > >>> (yet) > >>>>>>> worked > >>>>>>>>> on > >>>>>>>>>>>>>>>>> removing the `newTransaction()` stuff, but I would > >>> expect > >>>>> that, > >>>>>>>>>>>>>>>>> behaviourally, it should make no difference. The branch > >>> is > >>>>>>>>> available > >>>>>>>>>>>>>>> at > >>>>>>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if > >>>>> anyone > >>>>>>> is > >>>>>>>>>>>>>>>>> interested in taking an early look. 
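A small sketch of the READ_COMMITTED == WBWI-based transactions / READ_UNCOMMITTED == direct writes mapping described above, together with the processing.mode-derived default mentioned earlier in the thread. It uses the existing org.apache.kafka.common.IsolationLevel enum; everything else here is hypothetical:

    import java.util.Map;
    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.streams.StreamsConfig;

    final class IsolationLevelSketch {

        // Roughly the default discussed earlier: derive the isolation level from
        // processing.guarantee (READ_COMMITTED under EOS, READ_UNCOMMITTED under
        // ALOS). Not the KIP's actual code.
        static IsolationLevel fromAppConfigs(final Map<String, Object> appConfigs) {
            final Object guarantee = appConfigs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
            return StreamsConfig.AT_LEAST_ONCE.equals(guarantee)
                    ? IsolationLevel.READ_UNCOMMITTED
                    : IsolationLevel.READ_COMMITTED;
        }

        // The RocksDB mapping described above: READ_COMMITTED == WBWI-based
        // "transactions", READ_UNCOMMITTED == direct writes to the database.
        static boolean useTransactionBuffer(final IsolationLevel isolationLevel) {
            return isolationLevel == IsolationLevel.READ_COMMITTED;
        }
    }

Whether this should be surfaced as an IsolationLevel or simply as a boolean transactional() is exactly the open question above.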
Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.

2.
I agree with John here, that it actually is public API. My question is how this usage pattern affects normal processing.

3.
My concern is that checking the size of the transaction buffer and maybe triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean by "less predictable for users", though.

I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!

Some comments on your reply to my e-mail on June 20th:

3.
Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not?
With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS, if you do not have a way to know whether the state store is transactional. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.
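In other words, the rule would be roughly the following (just a sketch to illustrate the point; `transactional()` and the surrounding names are the hypothetical method discussed above, not an existing API):

    // Sketch only: TransactionalAware and transactional() are hypothetical,
    // used here purely to illustrate the checkpointing rule under EOS.
    interface TransactionalAware {
        boolean transactional();
    }

    final class CheckpointRule {
        /** Under EOS, only transactional stores may checkpoint during normal processing. */
        static boolean mayCheckpointDuringProcessing(final boolean eosEnabled,
                                                     final TransactionalAware store) {
            return !eosEnabled || store.transactional();
        }
    }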
7.
My point was about considering not only the bytes in memory in the config statestore.uncommitted.max.bytes, but also bytes that might be spilled to disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make too much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure: for instance, how does a transactional state store recover after a failure, and what happens with the transaction buffer, etc.? (That is what I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:

Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or being likely to exceed) the configured memory needs to trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based on cracking open RocksDB internals and using it in ways it's not really designed for is likely to have some unforeseen performance or consistency issues.

3.
What's your motivation for removing these early commits? While not ideal, I think they're a decent compromise to ensure consistency whilst maintaining good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might actually make behaviour less predictable for users as a consequence.
I'm a bit concerned that the scope of this KIP is growing a bit out of control. While it's good to discuss ideas for future improvements, I think it's important to narrow the scope down to a design that achieves the most pressing objectives (constant-sized restorations during dirty close/unexpected errors). Any design that this KIP produces can ultimately be changed in the future, especially if the bulk of it is internal behaviour.

I'm going to spend some time next week trying to re-work the original WriteBatchWithIndex design to remove the newTransaction() method, such that it's just an implementation detail of RocksDBStore. That way, if we want to replace WBWI with something in the future, like the SST file management outlined by John, then we can do so with little/no API changes.

Regards,

Nick