Hi all, I think I like your suggestion of allowing EOS with READ_UNCOMMITTED, but keeping the state wipe on error, and I'd vote for this solution when introducing `default.state.isolation.level`. This way, we'd have the lowest-risk roll-out of this feature (no behavior change without reconfiguration), with the possibility of switching to the most sane / battle-tested default settings in 4.0. Essentially, we'd have a feature flag but call it `default.state.isolation.level`, and we won't have to deprecate it later.
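To make the opt-in concrete, here is a rough sketch of what enabling the feature could look like for an application (the `default.state.isolation.level` key is the one proposed in the KIP; everything else is standard Streams configuration, and the topology is just a placeholder):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    import java.util.Properties;

    public class Kip892OptInSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
            // Proposed by KIP-892. Defaults to READ_UNCOMMITTED, so existing
            // applications see no behavior change unless they opt in:
            props.put("default.state.isolation.level", "READ_COMMITTED");

            final Topology topology = new Topology();
            topology.addSource("input", "input-topic"); // placeholder topology
            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
        }
    }
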
So the possible configurations would then be these: 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ reads from DB. 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from WriteBatch/DB. Flush on error (see note below). 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ reads from DB. Wipe state on error. 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from WriteBatch/DB. I believe the feature is important enough that we will see good adoption even without changing the default. In 4.0, once we have seen this adopted and battle-tested, we make READ_COMMITTED the default for EOS, or even READ_COMMITTED always the default, depending on our experiences. And we could add a clever implementation of READ_UNCOMMITTED with WriteBatches later. The only smell here is that `default.state.isolation.level` wouldn't be purely an IQ setting, but it would also (slightly) change the behavior of the processing; that seems unavoidable as long as we haven't solved READ_UNCOMMITTED IQ with WriteBatches. Minor: As for Bruno's point 4, I think if we are concerned about this behavior (we don't necessarily have to be, because it doesn't violate ALOS guarantees as far as I can see), we could make ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing the WriteBatch on error (obviously, only if we have a chance to do that). Cheers, Lucas On Mon, Oct 16, 2023 at 12:19 PM Nick Telford <nick.telf...@gmail.com> wrote: > > Hi Guozhang, > > The KIP as it stands introduces a new configuration, > default.state.isolation.level, which is independent of processing.mode. > It's intended that this new configuration be used to configure a global IQ > isolation level in the short term, with a future KIP introducing the > capability to change the isolation level on a per-query basis, falling back > to the "default" defined by this config. That's why I called it "default", > for future-proofing. > > However, it currently includes the caveat that READ_UNCOMMITTED is not > available under EOS. I think this is the coupling you are alluding to? > > This isn't intended to be a restriction of the API, but is currently a > technical limitation. However, after discussing with some users about > use-cases that would require READ_UNCOMMITTED under EOS, I'm inclined to > remove that clause and put in the necessary work to make that combination > possible now. > > I currently see two possible approaches: > > 1. Disable TX StateStores internally when the IsolationLevel is > READ_UNCOMMITTED and the processing.mode is EOS. This is more difficult > than it sounds, as there are many assumptions being made throughout the > internals about the guarantees StateStores provide. It would definitely add > a lot of extra "if (read_uncommitted && eos)" branches, complicating > maintenance and testing. > 2. Invest the time *now* to make READ_UNCOMMITTED reads of EOS StateStores > possible. I have some ideas on how this could be achieved, but they would > need testing and could introduce some additional issues. The benefit of > this approach is that it would make query-time IsolationLevels much simpler > to implement in the future. > > Unfortunately, both will require considerable work that will further delay > this KIP, which was the reason I placed the restriction in the KIP in the > first place. 
> > Regards, > Nick > > On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <guozhang.wang...@gmail.com> > wrote: > > > Hello Nick, > > > > First of all, thanks a lot for the great effort you've put in driving > > this KIP! I really like it coming through finally, as many people in > > the community have raised this. At the same time I honestly feel a bit > > ashamed for not putting enough of my time supporting it and pushing it > > through the finish line (you raised this KIP almost a year ago). > > > > I briefly passed through the DISCUSS thread so far, not sure I've 100 > > percent digested all the bullet points. But with the goal of trying to > > help take it through the finish line in mind, I'd want to throw out > > thoughts off the top of my head, only on point #4 above, which I felt may > > be the main hurdle for the current KIP to reach a consensus now. > > > > The general question I asked myself is whether we want to couple "IQ > > reading mode" with "processing mode". While technically I tend to > > agree with you that it feels like a bug if some single user chose > > "EOS" for processing mode while choosing "read uncommitted" for IQ > > reading mode, at the same time, I'm thinking if it's possible that > > there could be two different persons (or even two teams) that would be > > using the stream API to build the app, and the IQ API to query the > > running state of the app. I know this is less of a technical thing and > > rather a design question, but if it could ever be the case, I'm > > wondering if the person using the IQ API knows about the risks of > > using read uncommitted but still chose it for the sake of > > performance, no matter if the underlying stream processing mode > > configured by another person is EOS or not. In that regard, I'm > > leaning towards a "leave the door open, and close it later if we > > find it's a bad idea" approach with a configuration that we can > > potentially deprecate, rather than "shut the door, clean for everyone". More > > specifically, allowing the processing mode / IQ read mode to be > > decoupled, and if we find that there are no such cases as I speculated > > above, or people start complaining a lot, we can still enforce > > coupling them. > > > > Again, just my 2c here. Thanks again for the great patience and > > diligence on this KIP. > > > > > > Guozhang > > > > > > > > On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <nick.telf...@gmail.com> > > wrote: > > > > > > Hi Bruno, > > > > > > 4. > > > I'll hold off on making that change until we have a consensus as to what > > > configuration to use to control all of this, as it'll be affected by the > > > decision on EOS isolation levels. > > > > > > 5. > > > Done. I've chosen "committedOffsets". > > > > > > Regards, > > > Nick > > > > > > On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org> wrote: > > > > > > > Hi Nick, > > > > > > > > 1. > > > > Yeah, you are probably right that it does not make too much sense. > > > > Thanks for the clarification! > > > > > > > > > > > > 4. > > > > Yes, sorry for the back and forth, but I think for the sake of the KIP > > > > it is better to leave the ALOS behavior as it is for now due to the > > > > possible issues you would run into. Maybe we can find a solution in the > > > > future. Now the question returns to whether we really need > > > > default.state.isolation.level. Maybe the config could be the feature > > > > flag Sophie requested. > > > > > > > > > > > > 5. 
> > > > There is a guideline in Kafka not to use the get prefix for getters (at > > > > least in the public API). Thus, could you please rename > > > > > > > > getCommittedOffset(TopicPartition partition) -> > > > > committedOffsetFor(TopicPartition partition) > > > > > > > > You can also propose an alternative to committedOffsetFor(). > > > > > > > > > > > > Best, > > > > Bruno > > > > > > > > > > > > On 10/13/23 3:21 PM, Nick Telford wrote: > > > > > Hi Bruno, > > > > > > > > > > Thanks for getting back to me. > > > > > > > > > > 1. > > > > > I think this should be possible. Are you thinking of the situation > > where > > > > a > > > > > user may downgrade to a previous version of Kafka Streams? In that > > case, > > > > > sadly, the RocksDBStore would get wiped by the older version of Kafka > > > > > Streams anyway, because that version wouldn't understand the extra > > column > > > > > family (that holds offsets), so the missing Position file would > > > > > automatically get rebuilt when the store is rebuilt from the > > changelog. > > > > > Are there situations other than downgrade where a transactional store > > > > could > > > > > be replaced by a non-transactional one? I can't think of any. > > > > > > > > > > 2. > > > > > Ahh yes, the Test Plan - my Kryptonite! This section definitely > > needs to > > > > be > > > > > fleshed out. I'll work on that. How much detail do you need? > > > > > > > > > > 3. > > > > > See my previous email discussing this. > > > > > > > > > > 4. > > > > > Hmm, this is an interesting point. Are you suggesting that under ALOS > > > > > READ_COMMITTED should not be supported? > > > > > > > > > > Regards, > > > > > Nick > > > > > > > > > > On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <cado...@apache.org> > > wrote: > > > > > > > > > >> Hi Nick, > > > > >> > > > > >> I think the KIP is converging! > > > > >> > > > > >> > > > > >> 1. > > > > >> I am wondering whether it makes sense to write the position file > > during > > > > >> close as we do for the checkpoint file, so that in case the state > > store > > > > >> is replaced with a non-transactional state store, the > > non-transactional > > > > >> state store finds the position file. I think this is not strictly > > > > >> needed, but would be a nice behavior instead of just deleting the > > > > >> position file. > > > > >> > > > > >> > > > > >> 2. > > > > >> The test plan does not mention integration tests. Do you not need to > > > > >> extend existing ones and add new ones? Also for upgrading and > > > > >> downgrading you might need integration and/or system tests. > > > > >> > > > > >> > > > > >> 3. > > > > >> I think Sophie made a point. That said, IQ reading from uncommitted > > data > > > > >> under EOS might be considered a bug by some people. Thus, your KIP > > would > > > > >> fix a bug rather than changing the intended behavior. However, I > > also > > > > >> see that a feature flag would help users that rely on this buggy > > > > >> behavior (at least until AK 4.0). > > > > >> > > > > >> > > > > >> 4. > > > > >> This is related to the previous point. I assume that the difference > > > > >> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the > > > > >> former you enable transactions on the state store and in the latter > > you > > > > >> disable them. If my assumption is correct, I think that is an issue. 
> > > > >> Let's assume under ALOS Streams fails over a couple of times more or > > > > >> less at the same step in processing after value 3 is added to an > > > > >> aggregation but the offset of the corresponding input record was not > > > > >> committed. Without transactions, the aggregation value > > would > > > > >> increase by 3 with each failover. With transactions enabled, value 3 > > > > >> would only be added to the aggregation once, when the offset of the > > input > > > > >> record is committed and the transaction finally completes. So the > > > > >> content of the state store would change depending on the > > configuration > > > > >> for IQ. IMO, the content of the state store should be independent > > of > > > > >> IQ. Given this issue, I propose to not use transactions with ALOS at > > > > >> all. I was a big proponent of using transactions with ALOS, but I > > > > >> realized that transactions with ALOS are not as easy as enabling > > > > >> transactions on state stores. Another aspect that is problematic is > > that > > > > >> the changelog topic, which actually replicates the state store, is not > > > > >> transactional under ALOS. Thus, it might happen that the state > > store and > > > > >> the changelog differ in their content. All of this is maybe solvable > > > > >> somehow, but for the sake of this KIP, I would leave it for the > > future. > > > > >> > > > > >> > > > > >> Best, > > > > >> Bruno > > > > >> > > > > >> > > > > >> > > > > >> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote: > > > > >>> Hey Nick! First of all, thanks for taking up this awesome feature; > > I'm > > > > >> sure > > > > >>> every single > > > > >>> Kafka Streams user and dev would agree that it is sorely needed. > > > > >>> > > > > >>> I've just been catching up on the KIP and surrounding discussion, > > so > > > > >> please > > > > >>> forgive me > > > > >>> for any misunderstandings or misinterpretations of the current > > plan and > > > > >>> don't hesitate to > > > > >>> correct me. > > > > >>> > > > > >>> Before I jump in, I just want to say that having seen this drag on > > for > > > > so > > > > >>> long, my singular > > > > >>> goal in responding is to help this KIP past a perceived impasse so > > we > > > > can > > > > >>> finally move on > > > > >>> to voting and implementing it. Long discussions are to be expected > > for > > > > >>> major features like > > > > >>> this, but it's completely on us as the Streams devs to make sure > > there > > > > is > > > > >> an > > > > >>> end in sight > > > > >>> for any ongoing discussion. > > > > >>> > > > > >>> With that said, it's my understanding that the KIP as currently > > > > proposed > > > > >> is > > > > >>> just not tenable > > > > >>> for Kafka Streams, and would prevent some EOS users from upgrading > > to > > > > the > > > > >>> version it > > > > >>> first appears in. Given that we can't predict or guarantee whether > > any > > > > of > > > > >>> the followup KIPs > > > > >>> would be completed in the same release cycle as this one, we need > > to > > > > make > > > > >>> sure that the > > > > >>> feature is either compatible with all current users or else > > > > >> feature-flagged > > > > >>> so that they may > > > > >>> opt in/out. > > > > >>> > > > > >>> Therefore, IIUC we need to have either (or both) of these as > > > > >>> fully-implemented config options: > > > > >>> 1. default.state.isolation.level > > > > >>> 2. 
enable.transactional.state.stores > > > > >>> > > > > >>> This way EOS users for whom read_committed semantics are not > > viable can > > > > >>> still upgrade, > > > > >>> and either use the isolation.level config to leverage the new txn > > state > > > > >>> stores without sacrificing > > > > >>> their application semantics, or else simply keep the transactional > > > > state > > > > >>> stores disabled until we > > > > >>> are able to fully implement the isolation level configuration at > > either > > > > >> an > > > > >>> application or query level. > > > > >>> > > > > >>> Frankly you are the expert here and know much more about the > > tradeoffs > > > > in > > > > >>> both semantics and > > > > >>> effort level of implementing one of these configs vs the other. In > > my > > > > >>> opinion, either option would > > > > >>> be fine and I would leave the decision of which one to include in > > this > > > > >> KIP > > > > >>> completely up to you. > > > > >>> I just don't see a way for the KIP to proceed without some > > variation of > > > > >> the > > > > >>> above that would allow > > > > >>> EOS users to opt-out of read_committed. > > > > >>> > > > > >>> (If it's all the same to you, I would recommend always including a > > > > >> feature > > > > >>> flag in large structural > > > > >>> changes like this. No matter how much I trust someone or myself to > > > > >>> implement a feature, you just > > > > >>> never know what kind of bugs might slip in, especially with the > > very > > > > >> first > > > > >>> iteration that gets released. > > > > >>> So personally, my choice would be to add the feature flag and > > leave it > > > > >> off > > > > >>> by default. If all goes well > > > > >>> you can do a quick KIP to enable it by default as soon as the > > > > >>> isolation.level config has been > > > > >>> completed. But feel free to just pick whichever option is easiest > > or > > > > >>> quickest for you to implement) > > > > >>> > > > > >>> Hope this helps move the discussion forward, > > > > >>> Sophie > > > > >>> > > > > >>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford < > > nick.telf...@gmail.com> > > > > >> wrote: > > > > >>> > > > > >>>> Hi Bruno, > > > > >>>> > > > > >>>> Agreed, I can live with that for now. > > > > >>>> > > > > >>>> In an effort to keep the scope of this KIP from expanding, I'm > > leaning > > > > >>>> towards just providing a configurable > > default.state.isolation.level > > > > and > > > > >>>> removing IsolationLevel from the StateStoreContext. This would be > > > > >>>> compatible with adding support for query-time IsolationLevels in > > the > > > > >>>> future, whilst providing a way for users to select an isolation > > level > > > > >> now. > > > > >>>> > > > > >>>> The big problem with this, however, is that if a user selects > > > > >>>> processing.mode > > > > >>>> = "exactly-once(-v2|-beta)", and default.state.isolation.level = > > > > >>>> "READ_UNCOMMITTED", we need to guarantee that the data isn't > > written > > > > to > > > > >>>> disk until commit() is called, but we also need to permit IQ > > threads > > > > to > > > > >>>> read from the ongoing transaction. > > > > >>>> > > > > >>>> A simple solution would be to (temporarily) forbid this > > combination of > > > > >>>> configuration, and have default.state.isolation.level > > automatically > > > > >> switch > > > > >>>> to READ_COMMITTED when processing.mode is anything other than > > > > >>>> at-least-once. Do you think this would be acceptable? 
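For what it's worth, a rough sketch of that temporary fallback, with hypothetical names (`configuredLevel` stands in for however the new config would be surfaced internally; `IsolationLevel` is Kafka's existing `org.apache.kafka.common.IsolationLevel`):

    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.streams.StreamsConfig;

    final class IsolationLevelFallbackSketch {
        // Hypothetical helper: derive the effective IQ isolation level from
        // the processing mode. Under EOS, writes are buffered in the
        // transaction until commit(), so READ_UNCOMMITTED cannot be honoured
        // yet and we fall back to READ_COMMITTED.
        static IsolationLevel effectiveIsolationLevel(final StreamsConfig config,
                                                      final IsolationLevel configuredLevel) {
            final String guarantee = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
            if (!StreamsConfig.AT_LEAST_ONCE.equals(guarantee)) {
                return IsolationLevel.READ_COMMITTED;
            }
            return configuredLevel;
        }

        private IsolationLevelFallbackSketch() {}
    }
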
> > > > >>>> > > > > >>>> In a later KIP, we can add support for query-time isolation > levels and > > > > >>>> solve this particular problem there, which would relax this > > > > restriction. > > > > >>>> > > > > >>>> Regards, > > > > >>>> Nick > > > > >>>> > > > > >>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org> > > > > >> wrote: > > > > >>>> > > > > >>>>> Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I > > > > think > > > > >>>>> it is perfectly valid to say InMemoryKeyValueStore does not support > > > > >>>>> READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto > > > > default > > > > >>>>> at the moment. > > > > >>>>> > > > > >>>>> Best, > > > > >>>>> Bruno > > > > >>>>> > > > > >>>>> On 9/18/23 7:12 PM, Nick Telford wrote: > > > > >>>>>> Oh! One other concern I haven't mentioned: if we make > > > > IsolationLevel a > > > > >>>>>> query-time constraint, then we need to add support for > > > > READ_COMMITTED > > > > >>>> to > > > > >>>>>> InMemoryKeyValueStore too, which will require some changes to > > the > > > > >>>>>> implementation. > > > > >>>>>> > > > > >>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford < > > nick.telf...@gmail.com > > > > > > > > > >>>>> wrote: > > > > >>>>>> > > > > >>>>>>> Hi everyone, > > > > >>>>>>> > > > > >>>>>>> I agree that having IsolationLevel be determined at query-time > > is > > > > the > > > > >>>>>>> ideal design, but there are a few sticking points: > > > > >>>>>>> > > > > >>>>>>> 1. > > > > >>>>>>> There needs to be some way to communicate the IsolationLevel > > down > > > > to > > > > >>>> the > > > > >>>>>>> RocksDBStore itself, so that the query can respect it. Since > > stores > > > > >>>> are > > > > >>>>>>> "layered" in functionality (i.e. ChangeLoggingStore, > > MeteredStore, > > > > >>>>> etc.), > > > > >>>>>>> we need some way to deliver that information to the bottom > > layer. > > > > For > > > > >>>>> IQv2, > > > > >>>>>>> we can use the existing StateStore#query() method, but IQv1 has no > > way > > > > to > > > > >>>> do > > > > >>>>>>> this. > > > > >>>>>>> > > > > >>>>>>> A simple approach, which would potentially open up other > > options, > > > > >>>> would > > > > >>>>> be > > > > >>>>>>> to add something like: ReadOnlyKeyValueStore<K, V> > > > > >>>>>>> readOnlyView(IsolationLevel isolationLevel) to > > > > ReadOnlyKeyValueStore > > > > >>>>> (and > > > > >>>>>>> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.). > > > > >>>>>>> > > > > >>>>>>> 2. > > > > >>>>>>> As mentioned above, RocksDB WriteBatches are not thread-safe, > > which > > > > >>>>> causes > > > > >>>>>>> a problem if we want to provide READ_UNCOMMITTED Iterators. I > > also > > > > >>>> had a > > > > >>>>>>> look at RocksDB Transactions[1], but they solve a very > > different > > > > >>>>> problem, > > > > >>>>>>> and have the same thread-safety issue. > > > > >>>>>>> > > > > >>>>>>> One possible approach that I mentioned is chaining > > WriteBatches: > > > > >> every > > > > >>>>>>> time a new Interactive Query is received (i.e. readOnlyView, > > see > > > > >>>> above, > > > > >>>>>>> is called) we "freeze" the existing WriteBatch, and start a > > new one > > > > >>>> for > > > > >>>>> new > > > > >>>>>>> writes. The Interactive Query queries the "chain" of previous > > > > >>>>> WriteBatches > > > > >>>>>>> + the underlying database, while the StreamThread starts > > writing to > > > > >>>> the > > > > >>>>>>> *new* WriteBatch. 
On-commit, the StreamThread would write *all* > > > > >>>>>>> WriteBatches in the chain (that have not yet > been > > > > >>>>> written) to the database. > > > > >>>>>>> > > > > >>>>>>> WriteBatches would be closed/freed only when they have been > > both > > > > >>>>>>> committed, and all open Interactive Queries on them have been > > > > closed. > > > > >>>>> This > > > > >>>>>>> would require some reference counting. > > > > >>>>>>> > > > > >>>>>>> Obviously a drawback of this approach is the potential for > > > > increased > > > > >>>>>>> memory usage: if an Interactive Query is long-lived, for > > example by > > > > >>>>> doing a > > > > >>>>>>> full scan over a large database, or even just pausing in the > > middle > > > > >> of > > > > >>>>> an > > > > >>>>>>> iteration, then the existing chain of WriteBatches could be > > kept > > > > >>>> around > > > > >>>>> for > > > > >>>>>>> a long time, potentially forever. > > > > >>>>>>> > > > > >>>>>>> -- > > > > >>>>>>> > > > > >>>>>>> A. > > > > >>>>>>> Going off on a tangent, it looks like in addition to supporting > > > > >>>>>>> READ_COMMITTED queries, we could go further and support > > > > >>>> REPEATABLE_READ > > > > >>>>>>> queries (i.e. where subsequent reads to the same key in the > > same > > > > >>>>>>> Interactive Query are guaranteed to yield the same value) by > > making > > > > >>>> use > > > > >>>>> of > > > > >>>>>>> RocksDB Snapshots[2]. These are fairly lightweight, so the > > > > >> performance > > > > >>>>>>> impact is likely to be negligible, but they do require that the > > > > >>>>> Interactive > > > > >>>>>>> Query session can be explicitly closed. > > > > >>>>>>> > > > > >>>>>>> This could be achieved if we made the above readOnlyView > > interface > > > > >>>> look > > > > >>>>>>> more like: > > > > >>>>>>> > > > > >>>>>>> interface ReadOnlyKeyValueView<K, V> extends > > > > >>>> ReadOnlyKeyValueStore<K, > > > > >>>>>>> V>, AutoCloseable {} > > > > >>>>>>> > > > > >>>>>>> interface ReadOnlyKeyValueStore<K, V> { > > > > >>>>>>> ... > > > > >>>>>>> ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel > > > > >>>>> isolationLevel); > > > > >>>>>>> } > > > > >>>>>>> > > > > >>>>>>> But this would be a breaking change, as existing IQv1 queries > > are > > > > >>>>>>> guaranteed to never call store.close(), and therefore these > > would > > > > >> leak > > > > >>>>>>> memory under REPEATABLE_READ. > > > > >>>>>>> > > > > >>>>>>> B. > > > > >>>>>>> One thing that's notable: MyRocks states that they support > > > > >>>>> READ_COMMITTED > > > > >>>>>>> and REPEATABLE_READ, but they make no mention of > > > > >>>> READ_UNCOMMITTED[3][4]. > > > > >>>>>>> This could be because doing so is technically > > difficult/impossible > > > > >>>> using > > > > >>>>>>> the primitives available in RocksDB. > > > > >>>>>>> > > > > >>>>>>> -- > > > > >>>>>>> > > > > >>>>>>> Lucas, to address your points: > > > > >>>>>>> > > > > >>>>>>> U1. > > > > >>>>>>> It's only "SHOULD" to permit alternative (i.e. non-RocksDB) > > > > >>>>>>> implementations of StateStore that do not support atomic > > writes. > > > > >>>>> Obviously > > > > >>>>>>> in those cases, the guarantees Kafka Streams provides/expects > > would > > > > >> be > > > > >>>>>>> relaxed. Do you think we should require all implementations to > > > > >> support > > > > >>>>>>> atomic writes? > > > > >>>>>>> > > > > >>>>>>> U2. > > > > >>>>>>> Stores can support multiple IsolationLevels. 
As we've discussed > > > > >> above, > > > > >>>>> the > > > > >>>>>>> ideal scenario would be to specify the IsolationLevel at > > > > query-time. > > > > >>>>>>> Failing that, I think the second-best approach is to define the > > > > >>>>>>> IsolationLevel for *all* queries based on the processing.mode, > > > > which > > > > >>>> is > > > > >>>>>>> what the default StateStoreContext#isolationLevel() achieves. > > Would > > > > >>>> you > > > > >>>>>>> prefer an alternative? > > > > >>>>>>> > > > > >>>>>>> While the existing implementation is equivalent to > > > > READ_UNCOMMITTED, > > > > >>>>> this > > > > >>>>>>> can yield unexpected results/errors under EOS, if a > > transaction is > > > > >>>>> rolled > > > > >>>>>>> back. While this would be a change in behaviour for users, it > > would > > > > >>>> look > > > > >>>>>>> more like a bug fix than a breaking change. That said, we > > *could* > > > > >> make > > > > >>>>> it > > > > >>>>>>> configurable, and default to the existing behaviour > > > > >> (READ_UNCOMMITTED) > > > > >>>>>>> instead of inferring it from the processing.mode? > > > > >>>>>>> > > > > >>>>>>> N1, N2. > > > > >>>>>>> These were only primitives to avoid boxing costs, but since > > this is > > > > >>>> not > > > > >>>>> a > > > > >>>>>>> performance sensitive area, it should be fine to change if > > that's > > > > >>>>> desirable. > > > > >>>>>>> > > > > >>>>>>> N3. > > > > >>>>>>> It's because the store "manages its own offsets", which > > includes > > > > both > > > > >>>>>>> committing the offset, *and providing it* via > > getCommittedOffset(). > > > > >>>>>>> Personally, I think "managesOffsets" conveys this best, but I > > don't > > > > >>>> mind > > > > >>>>>>> changing it if the nomenclature is unclear. > > > > >>>>>>> > > > > >>>>>>> Sorry for the massive emails/essays! > > > > >>>>>>> -- > > > > >>>>>>> Nick > > > > >>>>>>> > > > > >>>>>>> 1: https://github.com/facebook/rocksdb/wiki/Transactions > > > > >>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot > > > > >>>>>>> 3: > > > > https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation > > > > >>>>>>> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/ > > > > >>>>>>> > > > > >>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy > > > > >>>>>>> <lbruts...@confluent.io.invalid> wrote: > > > > >>>>>>> > > > > >>>>>>>> Hi Nick, > > > > >>>>>>>> > > > > >>>>>>>> since I last read it in April, the KIP has become much > > cleaner and > > > > >>>>>>>> easier to read. Great work! > > > > >>>>>>>> > > > > >>>>>>>> It feels to me the last big open point is whether we can > > implement > > > > >>>>>>>> isolation level as a query parameter. I understand that there > > are > > > > >>>>>>>> implementation concerns, but as Colt says, it would be a great > > > > >>>>>>>> addition, and would also simplify the migration path for this > > > > >> change. > > > > >>>>>>>> Is the implementation problem you mentioned caused by the > > > > WriteBatch > > > > >>>>>>>> not having a notion of a snapshot, as the underlying DB > > iterator > > > > >>>> does? > > > > >>>>>>>> In that case, I am not sure a chain of WriteBatches as you > > propose > > > > >>>>>>>> would fully solve the problem, but maybe I didn't dig enough > > into > > > > >> the > > > > >>>>>>>> details to fully understand it. 
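For reference, the READ_UNCOMMITTED read path being discussed overlays the uncommitted batch on the committed data, roughly like this sketch against RocksDB's Java API (`WriteBatchWithIndex` is the indexed batch variant that supports reads; the batch/db setup is assumed to exist elsewhere):

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksIterator;
    import org.rocksdb.WriteBatchWithIndex;

    final class UncommittedScanSketch {
        // Sketch: iterate uncommitted writes merged with the committed data.
        // As noted above, this iterator is invalidated if the batch is
        // cleared mid-iteration (or its current key receives a new write),
        // which is exactly the problem with clearing the WriteBatch on commit.
        static void scanUncommitted(final RocksDB db, final WriteBatchWithIndex batch) {
            // the merged iterator takes ownership of the base DB iterator
            try (final RocksIterator merged = batch.newIteratorWithBase(db.newIterator())) {
                for (merged.seekToFirst(); merged.isValid(); merged.next()) {
                    final byte[] key = merged.key();
                    final byte[] value = merged.value();
                    // hand key/value to the interactive query (placeholder)
                }
            }
        }

        private UncommittedScanSketch() {}
    }
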
> > > > >>>>>>>> > > > > >>>>>>>> If it's not possible to implement it now, would it be an > option to > > > > >>>>>>>> make sure in this KIP that we do not fully close the door on > > > > >>>> per-query > > > > >>>>>>>> isolation levels in the interface, as it may be possible to > > > > >> implement > > > > >>>>>>>> the missing primitives in RocksDB or Speedb in the future? > > > > >>>>>>>> > > > > >>>>>>>> Understanding: > > > > >>>>>>>> > > > > >>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to be > > persisted > > > > >>>>>>>> atomically with the records? > > > > >>>>>>>> * U2) Don't understand the default implementation of > > > > >>>> `isolationLevel`. > > > > >>>>>>>> The isolation level should be a property of the underlying > > store, > > > > >> and > > > > >>>>>>>> not be defined by the default config? Existing stores probably > > > > don't > > > > >>>>>>>> guarantee READ_COMMITTED, so the default should be to return > > > > >>>>>>>> READ_UNCOMMITTED. > > > > >>>>>>>> > > > > >>>>>>>> Nits: > > > > >>>>>>>> * N1) Could `getCommittedOffset` use an `OptionalLong` return > > type, > > > > >> to > > > > >>>>>>>> avoid the `null`? > > > > >>>>>>>> * N2) Could `approximateNumUncommittedBytes` use an > > `OptionalLong` > > > > >>>>>>>> return type, to avoid the `-1`? > > > > >>>>>>>> * N3) I don't understand why `managesOffsets` uses the > > 'manage' > > > > >> verb, > > > > >>>>>>>> whereas all other methods use the "commits" verb. I'd suggest > > > > >>>>>>>> `commitsOffsets`. > > > > >>>>>>>> > > > > >>>>>>>> Either way, it feels like this KIP is very close to the finish > > line, > > > > I'm > > > > >>>>>>>> looking forward to seeing this in production! > > > > >>>>>>>> > > > > >>>>>>>> Cheers, > > > > >>>>>>>> Lucas > > > > >>>>>>>> > > > > >>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy < > > c...@littlehorse.io > > > > > > > > > >>>>> wrote: > > > > >>>>>>>>> > > > > >>>>>>>>>> Making IsolationLevel a query-time constraint, rather than > > > > linking > > > > >>>> it > > > > >>>>>>>> to > > > > >>>>>>>>> the processing.guarantee. > > > > >>>>>>>>> > > > > >>>>>>>>> As I understand it, would this allow even a user of EOS to > > > > control > > > > >>>>>>>> whether > > > > >>>>>>>>> to read committed or uncommitted records? If so, I am highly > > in > > > > >>>> favor > > > > >>>>> of > > > > >>>>>>>>> this. > > > > >>>>>>>>> > > > > >>>>>>>>> I know that I was one of the early people to point out the > > > > current > > > > >>>>>>>>> shortcoming that IQ reads uncommitted records, but just this > > > > >>>> morning I > > > > >>>>>>>>> realized a pattern we use which means that (for certain > > queries) > > > > >> our > > > > >>>>>>>> system > > > > >>>>>>>>> needs to be able to read uncommitted records, which is the > > > > current > > > > >>>>>>>> behavior > > > > >>>>>>>>> of Kafka Streams in EOS.*** > > > > >>>>>>>>> > > > > >>>>>>>>> If IsolationLevel being a query-time decision allows for > > this, > > > > then > > > > >>>>> that > > > > >>>>>>>>> would be amazing. I would also vote that the default behavior > > > > >> should > > > > >>>>> be > > > > >>>>>>>> for > > > > >>>>>>>>> reading uncommitted records, because it is totally possible > > for a > > > > >>>>> valid > > > > >>>>>>>>> application to depend on that behavior, and breaking it in a > > > > minor > > > > >>>>>>>> release > > > > >>>>>>>>> might be a bit strong. > > > > >>>>>>>>> > > > > >>>>>>>>> *** (Note, for the curious reader....) 
Our use-case/query > pattern > > > > >>>> is a > > > > >>>>>>>> bit > > > > >>>>>>>>> complex, but reading "uncommitted" records is actually safe > > in > > > > our > > > > >>>>> case > > > > >>>>>>>>> because processing is deterministic. Additionally, IQ being > > able > > > > to > > > > >>>>> read > > > > >>>>>>>>> uncommitted records is crucial to enable "read your own > > writes" > > > > on > > > > >>>> our > > > > >>>>>>>> API: > > > > >>>>>>>>> Due to the deterministic processing, we send an "ack" to the > > > > client > > > > >>>>> who > > > > >>>>>>>>> makes the request as soon as the processor processes the > > result. > > > > If > > > > >>>>> they > > > > >>>>>>>>> can't read uncommitted records, they may receive a "201 - > > > > Created" > > > > >>>>>>>>> response, immediately followed by a "404 - Not Found" when > > doing > > > > a > > > > >>>>>>>> lookup > > > > >>>>>>>>> for the object they just created. > > > > >>>>>>>>> > > > > >>>>>>>>> Thanks, > > > > >>>>>>>>> Colt McNealy > > > > >>>>>>>>> > > > > >>>>>>>>> *Founder, LittleHorse.dev* > > > > >>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford < > > > > >>>> nick.telf...@gmail.com> > > > > >>>>>>>> wrote: > > > > >>>>>>>>> > > > > >>>>>>>>>> Addendum: > > > > >>>>>>>>>> > > > > >>>>>>>>>> I think we would also face the same problem with the > > approach > > > > John > > > > >>>>>>>> outlined > > > > >>>>>>>>>> earlier (using the record cache as a transaction buffer and > > > > >>>> flushing > > > > >>>>>>>> it > > > > >>>>>>>>>> straight to SST files). This is because the record cache > > (the > > > > >>>>>>>> ThreadCache > > > > >>>>>>>>>> class) is not thread-safe, so every commit would invalidate > > open > > > > >> IQ > > > > >>>>>>>>>> Iterators in the same way that RocksDB WriteBatches do. > > > > >>>>>>>>>> -- > > > > >>>>>>>>>> Nick > > > > >>>>>>>>>> > > > > >>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford < > > > > >>>> nick.telf...@gmail.com> > > > > >>>>>>>>>> wrote: > > > > >>>>>>>>>> > > > > >>>>>>>>>>> Hi Bruno, > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> I've updated the KIP based on our conversation. The only > > things > > > > >>>>>>>> I've not > > > > >>>>>>>>>>> yet done are: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 1. Using transactions under ALOS and EOS. > > > > >>>>>>>>>>> 2. Making IsolationLevel a query-time constraint, rather > > than > > > > >>>>>>>> linking it > > > > >>>>>>>>>>> to the processing.guarantee. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> There's a wrinkle that makes this a challenge: Interactive > > > > >> Queries > > > > >>>>>>>> that > > > > >>>>>>>>>>> open an Iterator, when using transactions and > > READ_UNCOMMITTED. > > > > >>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries need > > to be > > > > >>>> able > > > > >>>>>>>> to > > > > >>>>>>>>>>> read records from the currently uncommitted transaction > > buffer > > > > >>>>>>>>>>> (WriteBatch). This includes Iterators, which should > > iterate > > > > >>>>>>>> both the > > > > >>>>>>>>>>> transaction buffer and underlying database (using > > > > >>>>>>>>>>> WriteBatch#iteratorWithBase()). > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> The issue is that when the StreamThread commits, it writes > > the > > > > >>>>>>>> current > > > > >>>>>>>>>>> WriteBatch to RocksDB *and then clears the WriteBatch*. 
> > > > Clearing > > > > >>>> the > > > > >>>>>>>>>>> WriteBatch while an Interactive Query holds an open > > Iterator on > > > > >> it > > > > >>>>>>>> will > > > > >>>>>>>>>>> invalidate the Iterator. Worse, it turns out that Iterators > > > > over > > > > >> a > > > > >>>>>>>>>>> WriteBatch become invalidated not just when the WriteBatch > > is > > > > >>>>>>>> cleared, > > > > >>>>>>>>>> but > > > > >>>>>>>>>>> also when the Iterators' current key receives a new write. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Now that I'm writing this, I remember that this is the > > major > > > > >>>> reason > > > > >>>>>>>> that > > > > >>>>>>>>>> I > > > > >>>>>>>>>>> switched the original design from having a query-time > > > > >>>>>>>> IsolationLevel to > > > > >>>>>>>>>>> having the IsolationLevel linked to the transactionality > > of the > > > > >>>>>>>> stores > > > > >>>>>>>>>>> themselves. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> It *might* be possible to resolve this, by having a > > "chain" of > > > > >>>>>>>>>>> WriteBatches, with the StreamThread switching to a new > > > > WriteBatch > > > > >>>>>>>>>> whenever > > > > >>>>>>>>>>> a new Interactive Query attempts to read from the > > database, but > > > > >>>> that > > > > >>>>>>>>>> could > > > > >>>>>>>>>>> cause some performance problems/memory pressure when > > subjected > > > > to > > > > >>>> a > > > > >>>>>>>> high > > > > >>>>>>>>>>> Interactive Query load. It would also reduce the > > efficiency of > > > > >>>>>>>>>> WriteBatches > > > > >>>>>>>>>>> on-commit, as we'd have to write N WriteBatches, where N > > is the > > > > >>>>>>>> number of > > > > >>>>>>>>>>> Interactive Queries since the last commit. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> I realise this is getting into the weeds of the > > implementation, > > > > >>>> and > > > > >>>>>>>> you'd > > > > >>>>>>>>>>> rather we focus on the API for now, but I think it's > > important > > > > to > > > > >>>>>>>>>> consider > > > > >>>>>>>>>>> how to implement the desired API, in case we come up with > > an > > > > API > > > > >>>>>>>> that > > > > >>>>>>>>>>> cannot be implemented efficiently, or even at all! > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Thoughts? > > > > >>>>>>>>>>> -- > > > > >>>>>>>>>>> Nick > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna < > > > > cado...@apache.org > > > > >>> > > > > >>>>>>>> wrote: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>>> Hi Nick, > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> 6. > > > > >>>>>>>>>>>> Of course, you are right! My bad! > > > > >>>>>>>>>>>> Wiping out the state in the downgrading case is fine. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> 3a. > > > > >>>>>>>>>>>> Focus on the public facing changes for the KIP. We will > > manage > > > > >> to > > > > >>>>>>>> get > > > > >>>>>>>>>>>> the internals right. Regarding state stores that do not > > > > support > > > > >>>>>>>>>>>> READ_COMMITTED, they should throw an error stating that > > they > > > > do > > > > >>>> not > > > > >>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state stores > > > > >>>>>>>> immediately. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> 3b. > > > > >>>>>>>>>>>> I am in favor of using transactions also for ALOS. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Best, > > > > >>>>>>>>>>>> Bruno > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote: > > > > >>>>>>>>>>>>> Hi Bruno, > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Thanks for getting back to me! 
> > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 2. > > > > >>>>>>>>>>>>> The fact that implementations can always track estimated > > > > memory > > > > >>>>>>>> usage > > > > >>>>>>>>>> in > > > > >>>>>>>>>>>>> the wrapper is a good point. I can remove -1 as an > > option, > > > > and > > > > >>>>>>>> I'll > > > > >>>>>>>>>>>> clarify > > > > >>>>>>>>>>>>> the JavaDoc that 0 is not just for non-transactional > > stores, > > > > >>>>>>>> which is > > > > >>>>>>>>>>>>> currently misleading. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 6. > > > > >>>>>>>>>>>>> The problem with catching the exception in the downgrade > > > > >> process > > > > >>>>>>>> is > > > > >>>>>>>>>> that > > > > >>>>>>>>>>>>> it would require new code in the Kafka version being > > downgraded > > > > >> to. > > > > >>>>>>>> Since > > > > >>>>>>>>>>>>> users could conceivably downgrade to almost *any* older > > > > version > > > > >>>>>>>> of > > > > >>>>>>>>>> Kafka > > > > >>>>>>>>>>>>> Streams, I'm not sure how we could add that code? > > > > >>>>>>>>>>>>> The only way I can think of doing it would be to provide > > a > > > > >>>>>>>> dedicated > > > > >>>>>>>>>>>>> downgrade tool that goes through every local store and > > > > removes > > > > >>>>>>>> the > > > > >>>>>>>>>>>>> offsets column families. But that seems like an > > unnecessary > > > > >>>>>>>> amount of > > > > >>>>>>>>>>>> extra > > > > >>>>>>>>>>>>> code to maintain just to handle a somewhat niche > > situation, > > > > >> when > > > > >>>>>>>> the > > > > >>>>>>>>>>>>> alternative (automatically wipe and restore stores) > > should be > > > > >>>>>>>>>>>> acceptable. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've requested. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 3a. > > > > >>>>>>>>>>>>> I agree that IsolationLevel makes more sense at > > query-time, > > > > and > > > > >>>> I > > > > >>>>>>>>>>>> actually > > > > >>>>>>>>>>>>> initially attempted to place the IsolationLevel at > > > > query-time, > > > > >>>>>>>> but I > > > > >>>>>>>>>> ran > > > > >>>>>>>>>>>>> into some problems: > > > > >>>>>>>>>>>>> - The key issue is that, under ALOS we're not staging > > writes > > > > in > > > > >>>>>>>>>>>>> transactions, so we can't perform writes at the > > READ_COMMITTED > > > > >>>>>>>> isolation > > > > >>>>>>>>>>>>> level. However, this may be addressed if we decide to > > > > *always* > > > > >>>>>>>> use > > > > >>>>>>>>>>>>> transactions as discussed under 3b. > > > > >>>>>>>>>>>>> - IQv1 and IQv2 have quite different implementations. I > > > > >> remember > > > > >>>>>>>>>> having > > > > >>>>>>>>>>>>> some difficulty understanding the IQv1 internals, which > > made > > > > it > > > > >>>>>>>>>>>> difficult > > > > >>>>>>>>>>>>> to determine what needed to be changed. However, I > > *think* > > > > this > > > > >>>>>>>> can be > > > > >>>>>>>>>>>>> addressed for both implementations by wrapping the > > > > RocksDBStore > > > > >>>>>>>> in an > > > > >>>>>>>>>>>>> IsolationLevel-dependent wrapper that overrides read > > methods > > > > >>>>>>>> (get, > > > > >>>>>>>>>>>> etc.) > > > > >>>>>>>>>>>>> to either read directly from the database or from the > > ongoing > > > > >>>>>>>>>>>> transaction. > > > > >>>>>>>>>>>>> But IQv1 might still be difficult. > > > > >>>>>>>>>>>>> - If IsolationLevel becomes a query constraint, then all > > > > other > > > > >>>>>>>>>>>> StateStores > > > > >>>>>>>>>>>>> will need to respect it, including the in-memory stores. 
> This > > > > >>>>>>>> would > > > > >>>>>>>>>>>> require > > > > >>>>>>>>>>>>> us to adapt in-memory stores to stage their writes so > > they > > > > can > > > > >>>> be > > > > >>>>>>>>>>>> isolated > > > > >>>>>>>>>>>>> from READ_COMMITTED queries. It would also become an > > > > important > > > > >>>>>>>>>>>>> consideration for third-party stores on upgrade, as > > without > > > > >>>>>>>> changes, > > > > >>>>>>>>>>>> they > > > > >>>>>>>>>>>>> would not support READ_COMMITTED queries correctly. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Ultimately, I may need some help making the necessary > > change > > > > to > > > > >>>>>>>> IQv1 > > > > >>>>>>>>>> to > > > > >>>>>>>>>>>>> support this, but I don't think it's fundamentally > > > > impossible, > > > > >>>>>>>> if we > > > > >>>>>>>>>>>> want > > > > >>>>>>>>>>>>> to pursue this route. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 3b. > > > > >>>>>>>>>>>>> The main reason I chose to keep ALOS un-transactional > > was to > > > > >>>>>>>> minimize > > > > >>>>>>>>>>>>> behavioural change for most users (I believe most Streams > > > > users > > > > >>>>>>>> use > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>> default configuration, which is ALOS). That said, it's > > clear > > > > >>>>>>>> that if > > > > >>>>>>>>>>>> ALOS > > > > >>>>>>>>>>>>> also used transactional stores, the only change in > > behaviour > > > > >>>>>>>> would be > > > > >>>>>>>>>>>> that > > > > >>>>>>>>>>>>> it would become *more correct*, which could be > > considered a > > > > >> "bug > > > > >>>>>>>> fix" > > > > >>>>>>>>>> by > > > > >>>>>>>>>>>>> users, rather than a change they need to handle. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> I believe that performance using transactions (aka. > > RocksDB > > > > >>>>>>>>>>>> WriteBatches) > > > > >>>>>>>>>>>>> should actually be *better* than the un-batched > > write-path > > > > that > > > > >>>>>>>> is > > > > >>>>>>>>>>>>> currently used[1]. The only "performance" consideration > > will > > > > be > > > > >>>>>>>> the > > > > >>>>>>>>>>>>> increased memory usage that transactions require. Given > > the > > > > >>>>>>>>>> mitigations > > > > >>>>>>>>>>>> for > > > > >>>>>>>>>>>>> this memory usage that we have in place, I would expect that > > this > > > > is > > > > >>>>>>>> not a > > > > >>>>>>>>>>>>> problem for most users. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> If we're happy to do so, we can make ALOS also use > > > > >> transactions. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Regards, > > > > >>>>>>>>>>>>> Nick > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Link 1: > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>> > > > > >>>>> > > > > >> > > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna < > > > > >>>> cado...@apache.org > > > > >>>>>>>>> > > > > >>>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Hi Nick, > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Thanks for the updates and sorry for the delay on my > > side! > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 1. > > > > >>>>>>>>>>>>>> Making the default implementation for flush() a no-op > > sounds > > > > >>>>>>>> good to > > > > >>>>>>>>>>>> me. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 2. 
> > > > >>>>>>>>>>>>>> I think what was bugging me here is that a third-party > > state > > > > >>>>>>>> store > > > > >>>>>>>>>>>> needs > > > > >>>>>>>>>>>>>> to implement the state store interface. That means they > > need > > > > >> to > > > > >>>>>>>>>>>>>> implement a wrapper around the actual state store as we > > do > > > > for > > > > >>>>>>>>>> RocksDB > > > > >>>>>>>>>>>>>> with RocksDBStore. So, a third-party state store can > > always > > > > >>>>>>>> estimate > > > > >>>>>>>>>>>> the > > > > >>>>>>>>>>>>>> uncommitted bytes, if it wants, because the wrapper can > > > > record > > > > >>>>>>>> the > > > > >>>>>>>>>>>> added > > > > >>>>>>>>>>>>>> bytes. > > > > >>>>>>>>>>>>>> One case I can think of where returning -1 makes sense > > is > > > > when > > > > >>>>>>>>>> Streams > > > > >>>>>>>>>>>>>> does not need to estimate the size of the write batch > > and > > > > >>>>>>>> trigger > > > > >>>>>>>>>>>>>> extraordinary commits, because the third-party state > > store > > > > >>>>>>>> takes care > > > > >>>>>>>>>>>> of > > > > >>>>>>>>>>>>>> memory. But in that case the method could also just > > return > > > > 0. > > > > >>>>>>>> Even > > > > >>>>>>>>>> that > > > > >>>>>>>>>>>>>> case would be better solved with a method that returns > > > > whether > > > > >>>>>>>> the > > > > >>>>>>>>>>>> state > > > > >>>>>>>>>>>>>> store itself manages the memory used for uncommitted > > bytes > > > > or > > > > >>>>>>>> not. > > > > >>>>>>>>>>>>>> That said, I am fine with keeping the -1 return value; > > I was > > > > >>>>>>>> just > > > > >>>>>>>>>>>>>> wondering when and if it will be used. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Regarding returning 0 for transactional state stores > > when > > > > the > > > > >>>>>>>> batch > > > > >>>>>>>>>> is > > > > >>>>>>>>>>>>>> empty, I was just wondering because you explicitly > > stated > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> "or {@code 0} if this StateStore does not support > > > > >>>> transactions." > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> So it seemed to me returning 0 could only happen for > > > > >>>>>>>>>> non-transactional > > > > >>>>>>>>>>>>>> state stores. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 3. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> a) What do you think of moving the isolation level to > > IQ > > > > (v1 > > > > >>>>>>>> and > > > > >>>>>>>>>> v2)? > > > > >>>>>>>>>>>>>> In the end this is the only component that really needs > > to > > > > >>>>>>>> specify > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>>> isolation level. It is similar to the Kafka consumer > > that > > > > can > > > > >>>>>>>> choose > > > > >>>>>>>>>>>>>> the isolation level with which to read the input topic. > > > > >>>>>>>>>>>>>> For IQv1 the isolation level should go into > > > > >>>>>>>> StoreQueryParameters. For > > > > >>>>>>>>>>>>>> IQv2, I would add it to the Query interface. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> b) Point a) raises the question of what should happen > > during > > > > >>>>>>>>>> at-least-once > > > > >>>>>>>>>>>>>> processing when the state store does not use > > transactions. > > > > >> John > > > > >>>>>>>> in > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>>> past proposed to also use transactions on state stores > > for > > > > >>>>>>>>>>>>>> at-least-once. I like that idea, because it avoids > > > > aggregating > > > > >>>>>>>> the > > > > >>>>>>>>>> same > > > > >>>>>>>>>>>>>> records over and over again in the case of a failure. 
We > > > > had a > > >>>>>>>> case > > >>>>>>>>>> in > > >>>>>>>>>>>>>> the past where a Streams application in at-least-once > > mode > > > > >> was > > > > >>>>>>>>>> failing > > > > >>>>>>>>>>>>>> continuously, for some reason I do not remember, before > > > > >>>>>>>> committing the > > > > >>>>>>>>>>>>>> offsets. After each failover, the app aggregated the > > same > > > > >>>>>>>> records again and > > > > >>>>>>>>>>>>>> again. Of course the aggregate increased to very > > > > wrong > > > > >>>>>>>> values > > > > >>>>>>>>>>>>>> just because of the failovers. With transactions on the > > state > > > > >>>>>>>> stores > > > > >>>>>>>>>> we > > > > >>>>>>>>>>>>>> could have avoided this. The app would have output the > > same > > > > >>>>>>>> aggregate > > > > >>>>>>>>>>>>>> multiple times (i.e., after each failover) but at least > > the > > > > >>>>>>>> value of > > > > >>>>>>>>>>>> the > > > > >>>>>>>>>>>>>> aggregate would not depend on the number of failovers. > > > > >>>>>>>> Outputting the > > > > >>>>>>>>>>>>>> same aggregate multiple times would be incorrect under > > > > >>>>>>>> exactly-once > > > > >>>>>>>>>> but > > > > >>>>>>>>>>>>>> it is OK for at-least-once. > > > > >>>>>>>>>>>>>> Whether it makes sense to add a config to turn > > > > >>>>>>>> transactions on > > > > >>>>>>>>>>>>>> state stores on and off under at-least-once, or to just use > > transactions in > > > > >>>>>>>> any case, > > > > >>>>>>>>>>>> is > > > > >>>>>>>>>>>>>> a question we should also discuss in this KIP. It > > depends a > > > > >> bit > > > > >>>>>>>> on > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>>> performance trade-off. Maybe to be safe, I would add a > > > > config. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 4. > > > > >>>>>>>>>>>>>> Your points are all valid. I tend to say we should keep the > > metrics > > > > >>>>>>>> around > > > > >>>>>>>>>>>>>> flush() until we remove flush() completely from the > > > > interface. > > > > >>>>>>>> Calls > > > > >>>>>>>>>> to > > > > >>>>>>>>>>>>>> flush() might still exist since existing processors > > might > > > > >> still > > > > >>>>>>>> call > > > > >>>>>>>>>>>>>> flush() explicitly, as you mentioned in 1). For sure, we > > need > > > > >> to > > > > >>>>>>>>>>>> document > > > > >>>>>>>>>>>>>> how the metrics change due to the transactions in the > > > > upgrade > > > > >>>>>>>> notes. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 5. > > > > >>>>>>>>>>>>>> I see. Then you should describe how the .position files > > are > > > > >>>>>>>> handled > > > > >>>>>>>>>> in > > > > >>>>>>>>>>>>>> a dedicated section of the KIP or incorporate the > > > > description > > > > >>>>>>>> in the > > > > >>>>>>>>>>>>>> "Atomic Checkpointing" section instead of only > > mentioning it > > > > >> in > > > > >>>>>>>> the > > > > >>>>>>>>>>>>>> "Compatibility, Deprecation, and Migration Plan". > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 6. > > > > >>>>>>>>>>>>>> Describing upgrading and downgrading in the KIP is a > > good > > > > >> idea. > > > > >>>>>>>>>>>>>> Regarding downgrading, I think you could also catch the > > > > >>>>>>>> exception and > > > > >>>>>>>>>>>> do > > > > >>>>>>>>>>>>>> what is needed to downgrade, e.g., drop the column > > family. 
> > > > See > > > > >>>>>>>> here > > > > >>>>>>>>>> for > > > > >>>>>>>>>>>>>> an example: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>> > > > > >>>>> > > > > >>>> > > > > >> > > > > > > https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75 > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> It is a bit brittle, but it works. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Best, > > > > >>>>>>>>>>>>>> Bruno > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote: > > > > >>>>>>>>>>>>>>> Hi Bruno, > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Thanks for taking the time to review the KIP. I'm back > > from > > > > >>>>>>>> leave > > > > >>>>>>>>>> now > > > > >>>>>>>>>>>> and > > > > >>>>>>>>>>>>>>> intend to move this forwards as quickly as I can. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Addressing your points: > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> 1. > > > > >>>>>>>>>>>>>>> Because flush() is part of the StateStore API, it's > > exposed > > > > >> to > > > > >>>>>>>>>> custom > > > > >>>>>>>>>>>>>>> Processors, which might be making calls to flush(). > > This > > > > was > > > > >>>>>>>>>> actually > > > > >>>>>>>>>>>> the > > > > >>>>>>>>>>>>>>> case in a few integration tests. > > > > >>>>>>>>>>>>>>> To maintain as much compatibility as possible, I'd > > prefer > > > > not > > > > >>>>>>>> to > > > > >>>>>>>>>> make > > > > >>>>>>>>>>>>>> this > > > > >>>>>>>>>>>>>>> an UnsupportedOperationException, as it will cause > > > > previously > > > > >>>>>>>>>> working > > > > >>>>>>>>>>>>>>> Processors to start throwing exceptions at runtime. > > > > >>>>>>>>>>>>>>> I agree that it doesn't make sense for it to proxy > > > > commit(), > > > > >>>>>>>> though, > > > > >>>>>>>>>>>> as > > > > >>>>>>>>>>>>>>> that would cause it to violate the "StateStores commit > > only > > > > >>>>>>>> when the > > > > >>>>>>>>>>>> Task > > > > >>>>>>>>>>>>>>> commits" rule. > > > > >>>>>>>>>>>>>>> Instead, I think we should make this a no-op. That way, > > > > >>>>>>>> existing > > > > >>>>>>>>>> user > > > > >>>>>>>>>>>>>>> Processors will continue to work as-before, without > > > > violation > > > > >>>>>>>> of > > > > >>>>>>>>>> store > > > > >>>>>>>>>>>>>>> consistency that would be caused by premature > > flush/commit > > > > of > > > > >>>>>>>>>>>> StateStore > > > > >>>>>>>>>>>>>>> data to disk. > > > > >>>>>>>>>>>>>>> What do you think? > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> 2. > > > > >>>>>>>>>>>>>>> As stated in the JavaDoc, when a StateStore > > implementation > > > > is > > > > >>>>>>>>>>>>>>> transactional, but is unable to estimate the > > uncommitted > > > > >>>> memory > > > > >>>>>>>>>> usage, > > > > >>>>>>>>>>>>>> the > > > > >>>>>>>>>>>>>>> method will return -1. > > > > >>>>>>>>>>>>>>> The intention here is to permit third-party > > implementations > > > > >>>>>>>> that may > > > > >>>>>>>>>>>> not > > > > >>>>>>>>>>>>>> be > > > > >>>>>>>>>>>>>>> able to estimate memory usage. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Yes, it will be 0 when nothing has been written to the > > > > store > > > > >>>>>>>> yet. 
I > > > > >>>>>>>>>>>>>> thought > > > > >>>>>>>>>>>>>>> that was implied by "This method will return an > > > > approximation > > > > >>>>>>>> of the > > > > >>>>>>>>>>>>>> memory > > > > >>>>>>>>>>>>>>> that would be freed by the next call to {@link > > #commit(Map)}" > > > > and > > > > >>>>>>>>>> "@return > > > > >>>>>>>>>>>> The > > > > >>>>>>>>>>>>>>> approximate size of all records awaiting {@link > > > > >>>> #commit(Map)}", > > > > >>>>>>>>>>>> however, > > > > >>>>>>>>>>>>>> I > > > > >>>>>>>>>>>>>>> can add it explicitly to the JavaDoc if you think this > > is > > > > >>>>>>>> unclear? > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> 3. > > > > >>>>>>>>>>>>>>> I realise this is probably the most contentious point > > in my > > > > >>>>>>>> design, > > > > >>>>>>>>>>>> and > > > > >>>>>>>>>>>>>> I'm > > > > >>>>>>>>>>>>>>> open to changing it if I'm unable to convince you of > > the > > > > >>>>>>>> benefits. > > > > >>>>>>>>>>>>>>> Nevertheless, here's my argument: > > > > >>>>>>>>>>>>>>> The Interactive Query (IQ) API(s) are provided > > > > >>>>>>>> StateStores directly > > > > >>>>>>>>>> to > > > > >>>>>>>>>>>>>>> query, and it may be important for users to > > > > programmatically > > > > >>>>>>>> know > > > > >>>>>>>>>>>> which > > > > >>>>>>>>>>>>>>> mode the StateStore is operating under. If we simply > > > > provide > > > > >>>> an > > > > >>>>>>>>>>>>>>> "eosEnabled" boolean (as used throughout the internal > > > > streams > > > > >>>>>>>>>>>> engine), or > > > > >>>>>>>>>>>>>>> similar, then users will need to understand the > > operation > > > > and > > > > >>>>>>>>>>>>>> consequences > > > > >>>>>>>>>>>>>>> of each available processing mode and how it pertains > > to > > > > >> their > > > > >>>>>>>>>>>>>> StateStore. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Interactive Query users aren't the only people that > > care > > > > >> about > > > > >>>>>>>> the > > > > >>>>>>>>>>>>>>> processing.mode/IsolationLevel of a StateStore: > > > > implementers > > > > >>>> of > > > > >>>>>>>>>> custom > > > > >>>>>>>>>>>>>>> StateStores also need to understand the behaviour > > expected > > > > of > > > > >>>>>>>> their > > > > >>>>>>>>>>>>>>> implementation. KIP-892 introduces some assumptions > > into > > > > the > > > > >>>>>>>> Streams > > > > >>>>>>>>>>>>>> Engine > > > > >>>>>>>>>>>>>>> about how StateStores operate under each processing > > mode, > > > > and > > > > >>>>>>>> it's > > > > >>>>>>>>>>>>>>> important that custom implementations adhere to those > > > > >>>>>>>> assumptions in > > > > >>>>>>>>>>>>>> order > > > > >>>>>>>>>>>>>>> to maintain the consistency guarantees. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> IsolationLevels provide a high-level contract on the > > > > >> behaviour > > > > >>>>>>>> of > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>>>>> StateStore: a user knows that under READ_COMMITTED, > > they > > > > will > > > > >>>>>>>> see > > > > >>>>>>>>>>>> writes > > > > >>>>>>>>>>>>>>> only after the Task has committed, and under > > > > READ_UNCOMMITTED > > > > >>>>>>>> they > > > > >>>>>>>>>>>> will > > > > >>>>>>>>>>>>>> see > > > > >>>>>>>>>>>>>>> writes immediately. No understanding of the details of > > each > > > > >>>>>>>>>>>>>> processing.mode > > > > >>>>>>>>>>>>>>> is required, either for IQ users or StateStore > > > > implementers. 
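To illustrate the shape of that contract, a sketch of what the single added method might look like (the interface name and the default body here are illustrative, not the KIP's final wording; `appConfigs()` already exists on StateStoreContext):

    import java.util.Map;
    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.streams.StreamsConfig;

    // Illustrative sketch: one method on the store's context, re-using
    // Kafka's existing IsolationLevel enum, with a default that maps the
    // configured processing mode to the read-isolation guarantee above.
    interface StateStoreContextSketch {
        Map<String, Object> appConfigs();

        default IsolationLevel isolationLevel() {
            final Object guarantee = appConfigs().get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
            // at-least-once writes straight to the store: READ_UNCOMMITTED;
            // exactly-once(-v2) buffers writes until commit: READ_COMMITTED.
            return StreamsConfig.AT_LEAST_ONCE.equals(guarantee)
                    ? IsolationLevel.READ_UNCOMMITTED
                    : IsolationLevel.READ_COMMITTED;
        }
    }
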
3.
I realise this is probably the most contentious point in my design, and I'm open to changing it if I'm unable to convince you of the benefits. Nevertheless, here's my argument:
The Interactive Query (IQ) APIs are directly provided StateStores to query, and it may be important for users to programmatically know which mode the StateStore is operating under. If we simply provide an "eosEnabled" boolean (as used throughout the internal streams engine), or similar, then users will need to understand the operation and consequences of each available processing mode and how it pertains to their StateStore.

Interactive Query users aren't the only people that care about the processing.mode/IsolationLevel of a StateStore: implementers of custom StateStores also need to understand the behaviour expected of their implementation. KIP-892 introduces some assumptions into the Streams Engine about how StateStores operate under each processing mode, and it's important that custom implementations adhere to those assumptions in order to maintain the consistency guarantees.

IsolationLevels provide a high-level contract on the behaviour of the StateStore: a user knows that under READ_COMMITTED they will see writes only after the Task has committed, and under READ_UNCOMMITTED they will see writes immediately. No understanding of the details of each processing.mode is required, either for IQ users or StateStore implementers.

An argument can be made that these contractual guarantees can simply be documented for the processing.mode (i.e. that exactly-once and exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves like READ_UNCOMMITTED), but there are several small issues with this that I'd prefer to avoid:

    - Where would we document these contracts, in a way that is difficult for users/implementers to miss/ignore?
    - It's not clear to users that the processing mode is communicating an expectation of read isolation, unless they read the documentation. Users rarely consult documentation unless they feel they need to, so it's likely this detail would get missed by many users.
    - It tightly couples processing modes to read isolation. Adding new processing modes, or changing the read isolation of existing processing modes, would be difficult/impossible.

Ultimately, the cost of introducing IsolationLevels is just a single method, since we re-use the existing IsolationLevel enum from Kafka. This gives us a clear place to document the contractual guarantees expected of/provided by StateStores, that is accessible both by the StateStore itself and by IQ users.

(Writing this I've just realised that the StateStore and IQ APIs actually don't provide access to the StateStoreContext that IQ users would have direct access to... Perhaps StateStore should expose isolationLevel() itself too?)
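To make the "single method" concrete, the addition would look roughly like this (a sketch; whether it lives on StateStoreContext, StateStore, or both is exactly the open question above):

    import org.apache.kafka.common.IsolationLevel;

    public interface StateStoreContext {
        // Sketch: the read isolation the store must provide, derived from
        // processing.mode (EOS -> READ_COMMITTED, ALOS -> READ_UNCOMMITTED).
        IsolationLevel isolationLevel();
    }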
4.
Yeah, I'm not comfortable renaming the metrics in-place either, as it's a backwards-incompatible change. My concern is that, if we leave the existing "flush" metrics in place, they will be confusing to users. Right now, "flush" metrics record explicit flushes to disk, but under KIP-892, even a commit() will not explicitly flush data to disk: RocksDB will decide on when to flush memtables to disk itself.

If we keep the existing "flush" metrics, we'd have two options, which both seem pretty bad to me:

    1. Have them record calls to commit(), which would be misleading, as data is no longer explicitly "flushed" to disk by this call.
    2. Have them record nothing at all, which is equivalent to removing the metrics, except that users will see the metric still exists and so assume that the metric is correct, and that there's a problem with their system when there isn't.

I agree that removing them is also a bad solution, and I'd like some guidance on the best path forward here.

5.
Position files are updated on every write to a StateStore. Since our writes are now buffered until commit(), we can't update the Position file until commit() has been called, otherwise it would be inconsistent with the data in the event of a rollback. Consequently, we need to manage these offsets the same way we manage the checkpoint offsets, and ensure they're only written on commit().

6.
Agreed, although I'm not exactly sure yet what tests to write. How explicit do we need to be here in the KIP?

As for upgrade/downgrade: upgrade is designed to be seamless, and we should definitely add some tests around that. Downgrade, it transpires, isn't currently possible, as the extra column family for offset storage is incompatible with the pre-KIP-892 implementation: when you open a RocksDB database, you must open all available column families or receive an error.
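To illustrate the constraint (the path is illustrative and this is just the shape of the RocksDB Java API, not our actual code):

    import java.util.ArrayList;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class OpenAllColumnFamilies {
        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            final String path = "/tmp/kafka-streams/my-store"; // illustrative
            try (Options options = new Options()) {
                // A pre-KIP-892 runtime only asks for the "default" column
                // family; if the store also contains the KIP-892 offsets
                // column family, open() below throws.
                final List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
                for (final byte[] name : RocksDB.listColumnFamilies(options, path)) {
                    descriptors.add(new ColumnFamilyDescriptor(name));
                }
                final List<ColumnFamilyHandle> handles = new ArrayList<>();
                try (DBOptions dbOptions = new DBOptions();
                     RocksDB db = RocksDB.open(dbOptions, path, descriptors, handles)) {
                    // opening succeeds only when every column family is listed
                }
            }
        }
    }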
What currently happens on downgrade is that it attempts to open the store, throws an error about the offsets column family not being opened, which triggers a wipe and rebuild of the Task. Given that downgrades should be uncommon, I think this is acceptable behaviour, as the end-state is consistent, even if it results in an undesirable state restore.

Should I document the upgrade/downgrade behaviour explicitly in the KIP?

--

Regards,
Nick

On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick!

Thanks for the updates!

1.
Why does StateStore#flush() default to StateStore#commit(Collections.emptyMap())?
Since calls to flush() will not exist anymore after this KIP is released, I would rather throw an unsupported operation exception by default.

2.
When would a state store return -1 from StateStore#approximateNumUncommittedBytes() while being transactional?

Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if the state store is transactional but nothing has been written to the state store yet?

3.
Sorry for bringing this up again. Does this KIP really need to introduce StateStoreContext#isolationLevel()? StateStoreContext already has appConfigs() which basically exposes the same information, i.e., if EOS is enabled or not.
In one of your previous e-mails you wrote:

"My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required."

While I understand the intent, I doubt that it decreases the coupling of a StateStore interface and the Streams engine.
READ_COMMITTED only applies to IQ but not to reads by processors. Thus, implementers need to understand how Streams accesses the state stores.

I would like to hear what others think about this.

4.
Great that you are exposing new metrics for transactional state stores! However, I would prefer to add new metrics and deprecate (in the docs) the old ones. You can find examples of deprecated metrics here:
https://kafka.apache.org/documentation/#selector_monitoring

5.
Why does the KIP mention position files? I do not think they are related to transactions or flushes.

6.
I think we will also need to adapt/add integration tests besides unit tests. Additionally, we probably need integration or system tests to verify that upgrades and downgrades between transactional and non-transactional state stores work as expected.

Best,
Bruno

On 7/21/23 10:34 PM, Nick Telford wrote:

One more thing: I noted John's suggestion in the KIP, under "Rejected Alternatives". I still think it's an idea worth pursuing, but I believe that it's out of the scope of this KIP, because it solves a different set of problems to this KIP, and the scope of this one has already grown quite large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've updated the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores) with the latest changes; mostly bringing back "Atomic Checkpointing" (for what feels like the 10th time!).
I think the one thing missing is some changes to metrics (notably the store "flush" metrics will need to be renamed to "commit").

The reason I brought back Atomic Checkpointing was to decouple store flush from store commit. This is important, because with Transactional StateStores, we now need to call "flush" on *every* Task commit, and not just when the StateStore is closing, otherwise our transaction buffer will never be written and persisted, instead growing unbounded! I experimented with some simple solutions, like forcing a store flush whenever the transaction buffer was likely to exceed its configured size, but this was brittle: it prevented the transaction buffer from being configured to be unbounded, and it still would have required explicit flushes of RocksDB, yielding sub-optimal performance and memory utilization.

I deemed Atomic Checkpointing to be the "right" way to resolve this problem. By ensuring that the changelog offsets that correspond to the most recently written records are always atomically written to the StateStore (by writing them to the same transaction buffer), we can avoid forcibly flushing the RocksDB memtables to disk, letting RocksDB flush them only when necessary, without losing any of our consistency guarantees. See the updated KIP for more info.
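For those who haven't read the updated KIP yet, the core idea can be sketched like this (names are hypothetical; the real implementation lives in RocksDBStore and is more involved):

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    public class AtomicCheckpointSketch {
        private final RocksDB db;
        private final ColumnFamilyHandle dataCf;    // regular records
        private final ColumnFamilyHandle offsetsCf; // changelog offsets
        private final WriteBatchWithIndex buffer = new WriteBatchWithIndex();

        public AtomicCheckpointSketch(final RocksDB db,
                                      final ColumnFamilyHandle dataCf,
                                      final ColumnFamilyHandle offsetsCf) {
            this.db = db;
            this.dataCf = dataCf;
            this.offsetsCf = offsetsCf;
        }

        public void put(final byte[] key, final byte[] value) throws RocksDBException {
            buffer.put(dataCf, key, value); // buffered until the Task commits
        }

        // Records and their changelog offsets land in the same write batch,
        // so they reach the database atomically: no explicit memtable flush
        // is needed, because after a crash they are either both present or
        // both absent.
        public void commit(final String changelogPartition, final long offset)
                throws RocksDBException {
            buffer.put(offsetsCf,
                       changelogPartition.getBytes(StandardCharsets.UTF_8),
                       Long.toString(offset).getBytes(StandardCharsets.UTF_8));
            try (WriteOptions writeOptions = new WriteOptions()) {
                db.write(writeOptions, buffer);
            }
            buffer.clear();
        }
    }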
I have fully implemented these changes, although I'm still not entirely happy with the implementation for segmented StateStores, so I plan to refactor that. Despite that, all tests pass. If you'd like to try out or review this highly experimental and incomplete branch, it's available here: https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built against Kafka 3.5.0 so that I had a stable base to build and test it on, and to enable easy apples-to-apples comparisons in a live environment. I plan to rebase it against trunk once it's nearer completion and has been proven on our main application.

I would really appreciate help in reviewing and testing:
- Segmented (Versioned, Session and Window) stores
- Global stores

as I do not currently use either of these, so my primary test environment doesn't cover these areas.

I'm going on Parental Leave starting next week for a few weeks, so I will not have time to move this forward until late August. That said, your feedback is welcome and appreciated, I just won't be able to respond as quickly as usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno,

Yes, that's correct, although the impact on IQ is not something I had considered.

"What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals?"

I'm not quite sure I follow. Are you suggesting that we add an additional config for the max number of commit intervals between checkpoints?
That way, we would checkpoint *either* when the transaction buffers are nearly full, *OR* whenever a certain number of commit intervals have elapsed, whichever comes first?

That certainly seems reasonable, although this re-ignites an earlier debate about whether a config should be measured in "number of commit intervals", instead of just an absolute time.

FWIW, I realised that this issue is the reason I was pursuing the Atomic Checkpoints, as it de-couples memtable flush from checkpointing, which enables us to just checkpoint on every commit without any performance impact. Atomic Checkpointing is definitely the "best" solution, but I'm not sure if this is enough to bring it back into this KIP.

I'm currently working on moving all the transactional logic directly into RocksDBStore itself, which does away with the StateStore#newTransaction method, and reduces the number of new classes introduced, significantly reducing the complexity. If it works, and the complexity is drastically reduced, I may try bringing back Atomic Checkpoints into this KIP.

Regards,
Nick

On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for the insights! Very interesting!

As far as I understand, you want to atomically update the state store from the transaction buffer, flush the memtable of a state store and write the checkpoint not after the commit time elapsed but after the transaction buffer reached a size that would lead to exceeding statestore.transaction.buffer.max.bytes before the next commit interval ends.
That means, the Kafka transaction would commit every commit interval, but the state store would only be atomically updated roughly every statestore.transaction.buffer.max.bytes of data. Also, IQ would then only see new data roughly every statestore.transaction.buffer.max.bytes.
After a failure, the state store needs to restore up to statestore.transaction.buffer.max.bytes.

Is this correct?

What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals? In such a way, we would have the same delay for records appearing in output topics and IQ, because both would appear when the Kafka transaction is committed. However, after a failure the state store still needs to restore up to statestore.transaction.buffer.max.bytes, and it might restore data that is already in the state store because the checkpoint lags behind the last stable offset (i.e. the last committed offset) of the changelog topics. Restoring data that is already in the state store is idempotent, so EOS should not be violated.
This solution needs at least one new config to specify when a checkpoint should be written.
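Just to illustrate what I mean (all names and thresholds here are hypothetical):

    public class CheckpointTrigger {
        private final long maxBytes;    // e.g. a new "statestore.checkpoint.max.bytes"
        private final int maxIntervals; // e.g. a new "statestore.checkpoint.max.commit.intervals"

        public CheckpointTrigger(final long maxBytes, final int maxIntervals) {
            this.maxBytes = maxBytes;
            this.maxIntervals = maxIntervals;
        }

        // The store is updated from the transaction buffer on every commit;
        // the checkpoint (and thus the memtable flush) happens only when
        // enough data has accumulated or enough commit intervals have elapsed.
        public boolean shouldCheckpoint(final long bytesSinceCheckpoint,
                                        final int intervalsSinceCheckpoint) {
            return bytesSinceCheckpoint >= maxBytes
                || intervalsSinceCheckpoint >= maxIntervals;
        }
    }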
A small correction to your previous e-mail that does not change anything you said: under ALOS the default commit interval is 30 seconds, not five seconds.

Best,
Bruno

On 01.07.23 12:37, Nick Telford wrote:

Hi everyone,

I've begun performance testing my branch on our staging environment, putting it through its paces in our non-trivial application. I'm already observing the same increased flush rate that we saw the last time we attempted to use a version of this KIP, but this time, I think I know why.

Pre-KIP-892, StreamTask#postCommit, which is called at the end of the Task commit process, has the following behaviour:

    - Under ALOS: checkpoint the state stores. This includes flushing memtables in RocksDB. This is acceptable because the default commit.interval.ms is 5 seconds, so forcibly flushing memtables every 5 seconds is acceptable for most applications.
    - Under EOS: checkpointing is not done, *unless* it's being forced, due to e.g. the Task closing or being revoked. This means that under normal processing conditions, the state stores will not be checkpointed, and will not have memtables flushed at all, unless RocksDB decides to flush them on its own. Checkpointing stores and force-flushing their memtables is only done when a Task is being closed.

Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal Task commits, in order to write the RocksDB transaction buffers to the state stores, and to ensure the offsets are synced to disk to prevent restores from getting out of hand.
Consequently, my current implementation calls maybeCheckpoint on *every* Task commit, which is far too frequent. This causes checkpoints every 10,000 records, which is a change in flush behaviour, potentially causing performance problems for some applications.

I'm looking into possible solutions, and I'm currently leaning towards using the statestore.transaction.buffer.max.bytes configuration to checkpoint Tasks once we are likely to exceed it. This would complement the existing "early Task commit" functionality that this configuration provides, in the following way:

    - Currently, we use statestore.transaction.buffer.max.bytes to force an early Task commit if processing more records would cause our state store transactions to exceed the memory assigned to them.
    - New functionality: when a Task *does* commit, we will not checkpoint the stores (and hence flush the transaction buffers) unless we expect to cross the statestore.transaction.buffer.max.bytes threshold before the next commit. See the sketch below for roughly what that check would look like.
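Roughly (method and parameter names hypothetical, and the estimate deliberately naive):

    public final class MaybeCheckpoint {
        private MaybeCheckpoint() {}

        // Checkpoint on commit only if the transaction buffer is likely to
        // exceed statestore.transaction.buffer.max.bytes before the next
        // commit, assuming the next interval writes about as much as the
        // last one did.
        public static boolean shouldCheckpoint(final long uncommittedBytes,
                                               final long bytesWrittenLastInterval,
                                               final long maxBytes) {
            return uncommittedBytes + bytesWrittenLastInterval >= maxBytes;
        }
    }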
I'm also open to suggestions.

Regards,
Nick

On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno!

3.
By "less predictable for users", I meant in terms of understanding the performance profile under various circumstances. The more complex the solution, the more difficult it would be for users to understand the performance they see. For example, spilling records to disk when the transaction buffer reaches a threshold would, I expect, reduce write throughput. This reduction in write throughput could be unexpected, and potentially difficult to diagnose/understand for users.
At the moment, I think the "early commit" concept is relatively straightforward; it's easy to document, and conceptually fairly obvious to users. We could probably add a metric to make it easier to understand when it happens, though.

3. (the second one)
The IsolationLevel is *essentially* an indirect way of telling StateStores whether they should be transactional. READ_COMMITTED essentially requires transactions, because it dictates that two threads calling `newTransaction()` should not see writes from the other transaction until they have been committed. With READ_UNCOMMITTED, all bets are off, and stores can allow threads to observe written records at any time, which is essentially "no transactions". That said, StateStores are free to implement these guarantees however they can, which is a bit more relaxed than dictating "you must use transactions". For example, with RocksDB we would implement these as READ_COMMITTED == WBWI-based "transactions", READ_UNCOMMITTED == direct writes to the database. But with other storage engines, it might be preferable to *always* use transactions, even when unnecessary; or there may be storage engines that don't provide transactions, but where the isolation guarantees can be met using a different technique.
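For RocksDB, that distinction would look something like this (a sketch of the shape, not the actual KIP-892 code):

    import org.apache.kafka.common.IsolationLevel;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;

    public class IsolationAwareWrites {
        private final RocksDB db;
        private final IsolationLevel isolationLevel;
        private final WriteBatchWithIndex batch = new WriteBatchWithIndex();

        public IsolationAwareWrites(final RocksDB db, final IsolationLevel isolationLevel) {
            this.db = db;
            this.isolationLevel = isolationLevel;
        }

        public void put(final byte[] key, final byte[] value) throws RocksDBException {
            if (isolationLevel == IsolationLevel.READ_COMMITTED) {
                // buffered in the WBWI; other threads only see it once the
                // batch is written to the database on commit
                batch.put(key, value);
            } else {
                // READ_UNCOMMITTED: immediately visible to all readers
                db.put(key, value);
            }
        }
    }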
My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required.
That said, I understand that "IsolationLevel" might not be the right abstraction, and we can always make it much more explicit if required, e.g. boolean transactional().

7-8.
I can make these changes either later today or tomorrow.

Small update:
I've rebased my branch on trunk and fixed a bunch of issues that needed addressing. Currently, all the tests pass, which is promising, but it will need to undergo some performance testing. I haven't (yet) worked on removing the `newTransaction()` stuff, but I would expect that, behaviourally, it should make no difference. The branch is available at https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is interested in taking an early look.

Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.

2.
I agree with John here that it actually is public API. My question is how this usage pattern affects normal processing.
3.
My concern is that checking for the size of the transaction buffer and maybe triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean with "less predictable for users", though.

I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!

Some comments on your reply to my e-mail on June 20th:

3.
Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not?
With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS, if you do not have a way to know whether the state store is transactional or not. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.
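In other words, something along these lines (a sketch; `transactional()` is a hypothetical accessor, one possible shape of the signal discussed above):

    public final class CheckpointPolicy {
        private CheckpointPolicy() {}

        public interface TransactionalAware {
            boolean transactional(); // hypothetical accessor on the store
        }

        // Under EOS, only a transactional store may checkpoint during normal
        // processing: a non-transactional store may have uncommitted writes
        // on disk, and a checkpoint would wrongly survive an aborted
        // transaction.
        public static boolean mayWriteCheckpoint(final boolean eosEnabled,
                                                 final TransactionalAware store) {
            return !eosEnabled || store.transactional();
        }
    }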
7.
My point was about not only considering the bytes in memory in the config statestore.uncommitted.max.bytes, but also bytes that might be spilled on disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make too much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure. For instance, how does a transactional state store recover after a failure, and what happens with the transaction buffer, etc.? (That is what I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:

Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions have, whereby exceeding (or being likely to exceed) configured memory needs to trigger an early commit?

2.
This is one of my big concerns.
Ultimately, any approach based on cracking open RocksDB internals and using it in ways it's not really designed for is likely to have some unforeseen performance or consistency issues.

3.
What's your motivation for removing these early commits? While not ideal, I think they're a decent compromise to ensure consistency whilst maintaining good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might actually make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing a bit out of control. While it's good to discuss ideas for future improvements, I think it's important to narrow the scope down to a design that achieves the most pressing objectives (constant-sized restorations during dirty close/unexpected errors). Any design that this KIP produces can ultimately be changed in the future, especially if the bulk of it is internal behaviour.

I'm going to spend some time next week trying to re-work the original WriteBatchWithIndex design to remove the newTransaction() method, such that it's just an implementation detail of RocksDBStore. That way, if we want to replace WBWI with something in the future, like the SST file management outlined by John, then we can do so with little/no API changes.
Regards,

Nick