Guozhang—I agree, I am in favor of moving forward with the KIP now that the Transactional State Stores will be behind a feature flag.
Nick—I just did a bit more light testing of your branch `KIP-892-3.5.0` with your most recent changes. I couldn't detect a performance difference versus trunk (in the past there was a slight degradation of performance on the restoration path, but that has been fixed). I don't believe that your branch has the state updater thread enabled, so I didn't test that path too heavily. As expected, however, our internal correctness tests failed due to the IQ read-your-own-writes issue we discussed previously. The community as a whole would vastly benefit from this KIP getting over the finish line in 3.7.0, and so long as it is behind a feature flag so that we at LittleHorse can still guarantee RYOW for our users, I think it's purely a win for the community. Until we can figure out how to get read_committed, we will just be smart with standbys + rebalances etc (: Thanks Nick! This improvement is long overdue for the streams community. Colt McNealy *Founder, LittleHorse.dev* On Sun, Oct 29, 2023 at 11:30 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote: > I'd agree with you guys that as long as we are in agreement about the > configuration semantics, that would be a big win to move forward for > this KIP. As for the TaskCorruptedException handling like wiping state > stores, we can discuss that in the PR rather than in the KIP. > > Just to clarify, I'm onboard with the latest proposal, and probably we > can move on for voting on this KIP now? > > Guozhang > > On Thu, Oct 19, 2023 at 5:33 AM Bruno Cadonna <cado...@apache.org> wrote: > > > > Hi Nick, > > > > What you and Lucas wrote about the different configurations of ALOS/EOS > > and READ_COMMITTED/READ_UNCOMMITTED makes sense to me. My earlier > > concerns about changelogs diverging from the content of the local state > > stores turned out to not apply. So I think we can move on with those > > configurations. > > > > Regarding the TaskCorruptedException and wiping out the state stores > > under EOS, couldn't we abort the transaction on the state store and > > close the task dirty? If the Kafka transaction was indeed committed, the > > store would restore the missing part from the changelog topic. If the > > Kafka transaction was not committed, changelog topic and state store are > > in-sync. > > > > In any case, IMO those are implementation details that we do not need to > > discuss and solve in the KIP discussion. We can solve them on the PR. > > The important thing is that the processing guarantees hold. > > > > Best, > > Bruno > > > > On 10/18/23 3:56 PM, Nick Telford wrote: > > > Hi Lucas, > > > > > > TaskCorruptedException is how Streams signals that the Task state > needs to > > > be wiped, so we can't retain that exception without also wiping state > on > > > timeouts. > > > > > > Regards, > > > Nick > > > > > > On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy <lbruts...@confluent.io > .invalid> > > > wrote: > > > > > >> Hi Nick, > > >> > > >> I think indeed the better behavior would be to retry commitTransaction > > >> until we risk running out of time to meet `max.poll.interval.ms`. > > >> > > >> However, if it's handled as a `TaskCorruptedException` at the moment, > > >> I would do the same in this KIP, and leave exception handling > > >> improvements to future work. This KIP is already improving the > > >> situation a lot by not wiping the state store. 
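[To make the retry-until-deadline idea concrete, a loop bounded by `max.poll.interval.ms` might look roughly like the sketch below; `maxPollIntervalMs` and `safetyMarginMs` are assumed names, and this is not the actual Streams code.]

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;

// Retry commitTransaction() until it succeeds, or until we risk exceeding
// max.poll.interval.ms and being evicted from the consumer group.
static void commitWithRetries(final Producer<?, ?> producer,
                              final long maxPollIntervalMs,
                              final long safetyMarginMs) {
    final long deadline = System.currentTimeMillis() + maxPollIntervalMs - safetyMarginMs;
    while (true) {
        try {
            producer.commitTransaction();
            return;
        } catch (final TimeoutException e) {
            // commitTransaction() may be retried after a timeout; draw the
            // line once retrying would make us miss the poll deadline
            if (System.currentTimeMillis() >= deadline) {
                throw e;
            }
        }
    }
}
```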
> > >> > > >> Cheers, > > >> Lucas > > >> > > >> On Tue, Oct 17, 2023 at 3:51 PM Nick Telford <nick.telf...@gmail.com> > > >> wrote: > > >>> > > >>> Hi Lucas, > > >>> > > >>> Yeah, this is pretty much the direction I'm thinking of going in > now. You > > >>> make an interesting point about committing on-error under > > >>> ALOS/READ_COMMITTED, although I haven't had a chance to think > through the > > >>> implications yet. > > >>> > > >>> Something that I ran into earlier this week is an issue with the new > > >>> handling of TimeoutException. Without TX stores, TimeoutException > under > > >> EOS > > >>> throws a TaskCorruptedException, which wipes the stores. However, > with TX > > >>> stores, TimeoutException is now just bubbled up and dealt with as it > is > > >>> under ALOS. The problem arises when the Producer#commitTransaction > call > > >>> times out: Streams attempts to ignore the error and continue > producing, > > >>> which causes the next call to Producer#send to throw > > >>> "IllegalStateException: Cannot attempt operation `send` because the > > >>> previous call to `commitTransaction` timed out and must be retried". > > >>> > > >>> I'm not sure what we should do here: retrying the commitTransaction > seems > > >>> logical, but what if it times out again? Where do we draw the line > and > > >>> shut down the instance? > > >>> > > >>> Regards, > > >>> Nick > > >>> > > >>> On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy <lbruts...@confluent.io > > >> .invalid> > > >>> wrote: > > >>> > > >>>> Hi all, > > >>>> > > >>>> I think I liked your suggestion of allowing EOS with > READ_UNCOMMITTED, > > >>>> but keep wiping the state on error, and I'd vote for this solution > > >>>> when introducing `default.state.isolation.level`. This way, we'd > have > > >>>> the most low-risk roll-out of this feature (no behavior change > without > > >>>> reconfiguration), with the possibility of switching to the most > sane / > > >>>> battle-tested default settings in 4.0. Essentially, we'd have a > > >>>> feature flag but call it `default.state.isolation.level` and don't > > >>>> have to deprecate it later. > > >>>> > > >>>> So the possible configurations would then be this: > > >>>> > > >>>> 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, > IQ > > >>>> reads from DB. > > >>>> 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from > > >>>> WriteBatch/DB. Flush on error (see note below). > > >>>> 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ > > >>>> reads from DB. Wipe state on error. > > >>>> 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from > > >>>> WriteBatch/DB. > > >>>> > > >>>> I believe the feature is important enough that we will see good > > >>>> adoption even without changing the default. In 4.0, when we have > seen > > >>>> this being adopted and it is battle-tested, we make READ_COMMITTED the > > >>>> default for EOS, or even READ_COMMITTED always the default, depending > > >>>> on our experiences. And we could add a clever implementation of > > >>>> READ_UNCOMMITTED with WriteBatches later. > > >>>> > > >>>> The only smell here is that `default.state.isolation.level` wouldn't > > >>>> be purely an IQ setting, but it would also (slightly) change the > > >>>> behavior of the processing, but that seems unavoidable as long as we > > >>>> haven't solved READ_UNCOMMITTED IQ with WriteBatches. 
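[To make the four combinations above concrete, selecting combination 4 would look something like the sketch below. `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `EXACTLY_ONCE_V2` are existing Streams configs; `default.state.isolation.level` is the config proposed by this KIP, so it has no constant yet and is written as a raw string here.]

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// EOS processing...
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// ...with READ_COMMITTED interactive queries (combination 4 above):
// processing uses a WriteBatch, IQ reads from WriteBatch/DB
props.put("default.state.isolation.level", "READ_COMMITTED");
```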
> > >>>> > > >>>> Minor: As for Bruno's point 4, I think if we are concerned about > this > > >>>> behavior (we don't necessarily have to be, because it doesn't > violate > > >>>> ALOS guarantees as far as I can see), we could make > > >>>> ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing > > >>>> the WriteBatch on error (obviously, only if we have a chance to do > > >>>> that). > > >>>> > > >>>> Cheers, > > >>>> Lucas > > >>>> > > >>>> On Mon, Oct 16, 2023 at 12:19 PM Nick Telford < > nick.telf...@gmail.com> > > >>>> wrote: > > >>>>> > > >>>>> Hi Guozhang, > > >>>>> > > >>>>> The KIP as it stands introduces a new configuration, > > >>>>> default.state.isolation.level, which is independent of > > >> processing.mode. > > >>>>> It's intended that this new configuration be used to configure a > > >> global > > >>>> IQ > > >>>>> isolation level in the short term, with a future KIP introducing > the > > >>>>> capability to change the isolation level on a per-query basis, > > >> falling > > >>>> back > > >>>>> to the "default" defined by this config. That's why I called it > > >>>> "default", > > >>>>> for future-proofing. > > >>>>> > > >>>>> However, it currently includes the caveat that READ_UNCOMMITTED is > > >> not > > >>>>> available under EOS. I think this is the coupling you are alluding > > >> to? > > >>>>> > > >>>>> This isn't intended to be a restriction of the API, but is > currently > > >> a > > >>>>> technical limitation. However, after discussing with some users > about > > >>>>> use-cases that would require READ_UNCOMMITTED under EOS, I'm > > >> inclined to > > >>>>> remove that clause and put in the necessary work to make that > > >> combination > > >>>>> possible now. > > >>>>> > > >>>>> I currently see two possible approaches: > > >>>>> > > >>>>> 1. Disable TX StateStores internally when the IsolationLevel is > > >>>>> READ_UNCOMMITTED and the processing.mode is EOS. This is more > > >>>> difficult > > >>>>> than it sounds, as there are many assumptions being made > > >> throughout > > >>>> the > > >>>>> internals about the guarantees StateStores provide. It would > > >>>> definitely add > > >>>>> a lot of extra "if (read_uncommitted && eos)" branches, > > >> complicating > > >>>>> maintenance and testing. > > >>>>> 2. Invest the time *now* to make READ_UNCOMMITTED on EOS > > >> StateStores > > >>>>> possible. I have some ideas on how this could be achieved, but > > >> they > > >>>> would > > >>>>> need testing and could introduce some additional issues. The > > >> benefit > > >>>> of > > >>>>> this approach is that it would make query-time IsolationLevels > > >> much > > >>>> simpler > > >>>>> to implement in the future. > > >>>>> > > >>>>> Unfortunately, both will require considerable work that will > further > > >>>> delay > > >>>>> this KIP, which was the reason I placed the restriction in the KIP > > >> in the > > >>>>> first place. > > >>>>> > > >>>>> Regards, > > >>>>> Nick > > >>>>> > > >>>>> On Sat, 14 Oct 2023 at 03:30, Guozhang Wang < > > >> guozhang.wang...@gmail.com> > > >>>>> wrote: > > >>>>> > > >>>>>> Hello Nick, > > >>>>>> > > >>>>>> First of all, thanks a lot for the great effort you've put in > > >> driving > > >>>>>> this KIP! I'm really glad to see it finally coming through, as many people > > >> in > > >>>>>> the community have raised this. 
At the same time I honestly feel a > > >> bit > > >>>>>> ashamed for not putting enough of my time supporting it and > > >> pushing it > > >>>>>> through the finish line (you raised this KIP almost a year ago). > > >>>>>> > > >>>>>> I briefly passed through the DISCUSS thread so far, not sure I've > > >> 100 > > >>>>>> percent digested all the bullet points. But with the goal of > > >> trying to > > >>>>>> help take it through the finish line in mind, I'd want to throw > > >>>>>> thoughts on top of my head only on the point #4 above which I felt > > >> may > > >>>>>> be the main hurdle for the current KIP to drive to a consensus > now. > > >>>>>> > > >>>>>> The general question I asked myself is whether we want to couple > "IQ > > >>>>>> reading mode" with "processing mode". While technically I tend to > > >>>>>> agree with you that it feels like a bug if some single user > > >> chose > > >>>>>> "EOS" for processing mode while choosing "read uncommitted" for IQ > > >>>>>> reading mode, at the same time, I'm thinking if it's possible that > > >>>>>> there could be two different persons (or even two teams) that > > >> would be > > >>>>>> using the stream API to build the app, and the IQ API to query the > > >>>>>> running state of the app. I know this is less of a technical thing > > >> and > > >>>>>> more of a design one, but if that could ever be the case, I'm > > >>>>>> wondering if the person using the IQ API knows about the risks > > >> of > > >>>>>> using read uncommitted but still chose so in favor of > > >>>>>> performance, no matter if the underlying stream processing mode > > >>>>>> configured by another person is EOS or not. In that regard, I'm > > >>>>>> leaning towards a "leaving the door open, and close it later if we > > >>>>>> found it's a bad idea" approach with a configuration that we can > > >>>>>> potentially deprecate, rather than "shut the door, clean for everyone". > > >> More > > >>>>>> specifically, allowing the processing mode / IQ read mode to be > > >>>>>> decoupled, and if we found that there are no such cases as I > > >> speculated > > >>>>>> above or people started complaining a lot, we can still enforce > > >>>>>> coupling them. > > >>>>>> > > >>>>>> Again, just my 2c here. Thanks again for the great patience and > > >>>>>> diligence on this KIP. > > >>>>>> > > >>>>>> > > >>>>>> Guozhang > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> On Fri, Oct 13, 2023 at 8:48 AM Nick Telford < > > >> nick.telf...@gmail.com> > > >>>>>> wrote: > > >>>>>>> > > >>>>>>> Hi Bruno, > > >>>>>>> > > >>>>>>> 4. > > >>>>>>> I'll hold off on making that change until we have a consensus as > > >> to > > >>>> what > > >>>>>>> configuration to use to control all of this, as it'll be > > >> affected by > > >>>> the > > >>>>>>> decision on EOS isolation levels. > > >>>>>>> > > >>>>>>> 5. > > >>>>>>> Done. I've chosen "committedOffsets". > > >>>>>>> > > >>>>>>> Regards, > > >>>>>>> Nick > > >>>>>>> > > >>>>>>> On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org> > > >>>> wrote: > > >>>>>>> > > >>>>>>>> Hi Nick, > > >>>>>>>> > > >>>>>>>> 1. > > >>>>>>>> Yeah, you are probably right that it does not make too much > > >> sense. > > >>>>>>>> Thanks for the clarification! > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> 4. > > >>>>>>>> Yes, sorry for the back and forth, but I think for the sake of > > >> the > > >>>> KIP > > >>>>>>>> it is better to leave the ALOS behavior as it is for now due to > > >> the > > >>>>>>>> possible issues you would run into. 
Maybe we can find a > > >> solution > > >>>> in the > > >>>>>>>> future. Now the question returns to whether we really need > > >>>>>>>> default.state.isolation.level. Maybe the config could be the > > >>>> feature > > >>>>>>>> flag Sophie requested. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> 5. > > >>>>>>>> There is a guideline in Kafka not to use the get prefix for > > >>>> getters (at > > >>>>>>>> least in the public API). Thus, could you please rename > > >>>>>>>> > > >>>>>>>> getCommittedOffset(TopicPartition partition) -> > > >>>>>>>> committedOffsetFor(TopicPartition partition) > > >>>>>>>> > > >>>>>>>> You can also propose an alternative to committedOffsetFor(). > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Best, > > >>>>>>>> Bruno > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On 10/13/23 3:21 PM, Nick Telford wrote: > > >>>>>>>>> Hi Bruno, > > >>>>>>>>> > > >>>>>>>>> Thanks for getting back to me. > > >>>>>>>>> > > >>>>>>>>> 1. > > >>>>>>>>> I think this should be possible. Are you thinking of the > > >>>> situation > > >>>>>> where > > >>>>>>>> a > > >>>>>>>>> user may downgrade to a previous version of Kafka Streams? In > > >>>> that > > >>>>>> case, > > >>>>>>>>> sadly, the RocksDBStore would get wiped by the older version > > >> of > > >>>> Kafka > > >>>>>>>>> Streams anyway, because that version wouldn't understand the > > >>>> extra > > >>>>>> column > > >>>>>>>>> family (that holds offsets), so the missing Position file > > >> would > > >>>>>>>>> automatically get rebuilt when the store is rebuilt from the > > >>>>>> changelog. > > >>>>>>>>> Are there other situations than downgrade where a > > >> transactional > > >>>> store > > >>>>>>>> could > > >>>>>>>>> be replaced by a non-transactional one? I can't think of any. > > >>>>>>>>> > > >>>>>>>>> 2. > > >>>>>>>>> Ahh yes, the Test Plan - my Kryptonite! This section > > >> definitely > > >>>>>> needs to > > >>>>>>>> be > > >>>>>>>>> fleshed out. I'll work on that. How much detail do you need? > > >>>>>>>>> > > >>>>>>>>> 3. > > >>>>>>>>> See my previous email discussing this. > > >>>>>>>>> > > >>>>>>>>> 4. > > >>>>>>>>> Hmm, this is an interesting point. Are you suggesting that > > >> under > > >>>> ALOS > > >>>>>>>>> READ_COMMITTED should not be supported? > > >>>>>>>>> > > >>>>>>>>> Regards, > > >>>>>>>>> Nick > > >>>>>>>>> > > >>>>>>>>> On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna < > > >> cado...@apache.org> > > >>>>>> wrote: > > >>>>>>>>> > > >>>>>>>>>> Hi Nick, > > >>>>>>>>>> > > >>>>>>>>>> I think the KIP is converging! > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 1. > > >>>>>>>>>> I am wondering whether it makes sense to write the position > > >> file > > >>>>>> during > > >>>>>>>>>> close as we do for the checkpoint file, so that in case the > > >>>> state > > >>>>>> store > > >>>>>>>>>> is replaced with a non-transactional state store, the > > >>>>>> non-transactional > > >>>>>>>>>> state store finds the position file. I think this is not > > >>>> strictly > > >>>>>>>>>> needed, but would be a nice behavior instead of just > > >> deleting > > >>>> the > > >>>>>>>>>> position file. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 2. > > >>>>>>>>>> The test plan does not mention integration tests. Do you not > > >>>> need to > > >>>>>>>>>> extend existing ones and add new ones? Also for upgrading > > >> and > > >>>>>>>>>> downgrading you might need integration and/or system tests. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 3. > > >>>>>>>>>> I think Sophie made a point. 
Although, IQ reading from > > >>>> uncommitted > > >>>>>> data > > >>>>>>>>>> under EOS might be considered a bug by some people. Thus, > > >> your > > >>>> KIP > > >>>>>> would > > >>>>>>>>>> fix a bug rather than changing the intended behavior. > > >> However, I > > >>>>>> also > > >>>>>>>>>> see that a feature flag would help users that rely on this > > >> buggy > > >>>>>>>>>> behavior (at least until AK 4.0). > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 4. > > >>>>>>>>>> This is related to the previous point. I assume that the > > >>>> difference > > >>>>>>>>>> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is > > >> that in > > >>>> the > > >>>>>>>>>> former you enable transactions on the state store and in the > > >>>> latter > > >>>>>> you > > >>>>>>>>>> disable them. If my assumption is correct, I think that is > > >> an > > >>>> issue. > > >>>>>>>>>> Let's assume under ALOS Streams fails over a couple of times > > >>>> more or > > >>>>>>>>>> less at the same step in processing after value 3 is added > > >> to an > > >>>>>>>>>> aggregation but the offset of the corresponding input record > > >>>> was not > > >>>>>>>>>> committed. With transactions disabled, the aggregation > > >> value > > >>>>>> would > > >>>>>>>>>> increase by 3 with each failover (see the sketch below). With transactions enabled, > > >>>> value 3 > > >>>>>>>>>> would only be added to the aggregation once when the offset > > >> of > > >>>> the > > >>>>>> input > > >>>>>>>>>> record is committed and the transaction finally completes. > > >> So > > >>>> the > > >>>>>>>>>> content of the state store would change depending on the > > >>>>>> configuration > > >>>>>>>>>> for IQ. IMO, the content of the state store should be > > >>>> independent > > >>>>>> of > > >>>>>>>>>> IQ. Given this issue, I propose to not use transactions with > > >>>> ALOS at > > >>>>>>>>>> all. I was a big proponent of using transactions with ALOS, > > >> but > > >>>> I > > >>>>>>>>>> realized that transactions with ALOS are not as easy as > > >> enabling > > >>>>>>>>>> transactions on state stores. Another aspect that is > > >>>> problematic is > > >>>>>> that > > >>>>>>>>>> the changelog topic which actually replicates the state > > >> store > > >>>> is not > > >>>>>>>>>> transactional under ALOS. Thus, it might happen that the > > >> state > > >>>>>> store and > > >>>>>>>>>> the changelog differ in their content. All of this is maybe > > >>>> solvable > > >>>>>>>>>> somehow, but for the sake of this KIP, I would leave it for > > >> the > > >>>>>> future. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> Best, > > >>>>>>>>>> Bruno > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote: > > >>>>>>>>>>> Hey Nick! First of all thanks for taking up this awesome > > >>>> feature, > > >>>>>> I'm > > >>>>>>>>>> sure > > >>>>>>>>>>> every single > > >>>>>>>>>>> Kafka Streams user and dev would agree that it is sorely > > >>>> needed. > > >>>>>>>>>>> > > >>>>>>>>>>> I've just been catching up on the KIP and surrounding > > >>>> discussion, > > >>>>>> so > > >>>>>>>>>> please > > >>>>>>>>>>> forgive me > > >>>>>>>>>>> for any misunderstandings or misinterpretations of the > > >> current > > >>>>>> plan and > > >>>>>>>>>>> don't hesitate to > > >>>>>>>>>>> correct me. 
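[To spell out Bruno's point-4 failover example above with concrete numbers, an assumed timeline, not real API calls:]

```java
// ALOS, store transactions disabled (current behavior):
//   process(record with value 3) -> aggregate = 3  (written straight to the store)
//   crash before the input offset is committed
//   restart: the record is redelivered -> aggregate = 6
//   another failover at the same step  -> aggregate = 9
//
// ALOS, store transactions enabled:
//   the +3 stays in the uncommitted WriteBatch until the input offset is
//   committed, so each failover discards it and the aggregate stays 3.
//   The store content now depends on the IQ isolation-level configuration,
//   which is exactly the coupling Bruno objects to.
```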
> > >>>>>>>>>>> > > >>>>>>>>>>> Before I jump in, I just want to say that having seen this > > >>>> drag on > > >>>>>> for > > >>>>>>>> so > > >>>>>>>>>>> long, my singular > > >>>>>>>>>>> goal in responding is to help this KIP past a perceived > > >>>> impasse so > > >>>>>> we > > >>>>>>>> can > > >>>>>>>>>>> finally move on > > >>>>>>>>>>> to voting and implementing it. Long discussions are to be > > >>>> expected > > >>>>>> for > > >>>>>>>>>>> major features like > > >>>>>>>>>>> this but it's completely on us as the Streams devs to make > > >> sure > > >>>>>> there > > >>>>>>>> is > > >>>>>>>>>> an > > >>>>>>>>>>> end in sight > > >>>>>>>>>>> for any ongoing discussion. > > >>>>>>>>>>> > > >>>>>>>>>>> With that said, it's my understanding that the KIP as > > >> currently > > >>>>>>>> proposed > > >>>>>>>>>> is > > >>>>>>>>>>> just not tenable > > >>>>>>>>>>> for Kafka Streams, and would prevent some EOS users from > > >>>> upgrading > > >>>>>> to > > >>>>>>>> the > > >>>>>>>>>>> version it > > >>>>>>>>>>> first appears in. Given that we can't predict or guarantee > > >>>> whether > > >>>>>> any > > >>>>>>>> of > > >>>>>>>>>>> the followup KIPs > > >>>>>>>>>>> would be completed in the same release cycle as this one, > > >> we > > >>>> need > > >>>>>> to > > >>>>>>>> make > > >>>>>>>>>>> sure that the > > >>>>>>>>>>> feature is either compatible with all current users or else > > >>>>>>>>>> feature-flagged > > >>>>>>>>>>> so that they may > > >>>>>>>>>>> opt in/out. > > >>>>>>>>>>> > > >>>>>>>>>>> Therefore, IIUC we need to have either (or both) of these > > >> as > > >>>>>>>>>>> fully-implemented config options: > > >>>>>>>>>>> 1. default.state.isolation.level > > >>>>>>>>>>> 2. enable.transactional.state.stores > > >>>>>>>>>>> > > >>>>>>>>>>> This way EOS users for whom read_committed semantics are > > >> not > > >>>>>> viable can > > >>>>>>>>>>> still upgrade, > > >>>>>>>>>>> and either use the isolation.level config to leverage the > > >> new > > >>>> txn > > >>>>>> state > > >>>>>>>>>>> stores without sacrificing > > >>>>>>>>>>> their application semantics, or else simply keep the > > >>>> transactional > > >>>>>>>> state > > >>>>>>>>>>> stores disabled until we > > >>>>>>>>>>> are able to fully implement the isolation level > > >> configuration > > >>>> at > > >>>>>> either > > >>>>>>>>>> an > > >>>>>>>>>>> application or query level. > > >>>>>>>>>>> > > >>>>>>>>>>> Frankly you are the expert here and know much more about > > >> the > > >>>>>> tradeoffs > > >>>>>>>> in > > >>>>>>>>>>> both semantics and > > >>>>>>>>>>> effort level of implementing one of these configs vs the > > >>>> other. In > > >>>>>> my > > >>>>>>>>>>> opinion, either option would > > >>>>>>>>>>> be fine and I would leave the decision of which one to > > >> include > > >>>> in > > >>>>>> this > > >>>>>>>>>> KIP > > >>>>>>>>>>> completely up to you. > > >>>>>>>>>>> I just don't see a way for the KIP to proceed without some > > >>>>>> variation of > > >>>>>>>>>> the > > >>>>>>>>>>> above that would allow > > >>>>>>>>>>> EOS users to opt-out of read_committed. > > >>>>>>>>>>> > > >>>>>>>>>>> (If it's all the same to you, I would recommend always > > >>>> including a > > >>>>>>>>>> feature > > >>>>>>>>>>> flag in large structural > > >>>>>>>>>>> changes like this. 
No matter how much I trust someone or > > >>>> myself to > > >>>>>>>>>>> implement a feature, you just > > >>>>>>>>>>> never know what kind of bugs might slip in, especially > > >> with the > > >>>>>> very > > >>>>>>>>>> first > > >>>>>>>>>>> iteration that gets released. > > >>>>>>>>>>> So personally, my choice would be to add the feature flag > > >> and > > >>>>>> leave it > > >>>>>>>>>> off > > >>>>>>>>>>> by default. If all goes well > > >>>>>>>>>>> you can do a quick KIP to enable it by default as soon as > > >> the > > >>>>>>>>>>> isolation.level config has been > > >>>>>>>>>>> completed. But feel free to just pick whichever option is > > >>>> easiest > > >>>>>> or > > >>>>>>>>>>> quickest for you to implement) > > >>>>>>>>>>> > > >>>>>>>>>>> Hope this helps move the discussion forward, > > >>>>>>>>>>> Sophie > > >>>>>>>>>>> > > >>>>>>>>>>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford < > > >>>>>> nick.telf...@gmail.com> > > >>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>>> Hi Bruno, > > >>>>>>>>>>>> > > >>>>>>>>>>>> Agreed, I can live with that for now. > > >>>>>>>>>>>> > > >>>>>>>>>>>> In an effort to keep the scope of this KIP from > > >> expanding, I'm > > >>>>>> leaning > > >>>>>>>>>>>> towards just providing a configurable > > >>>>>> default.state.isolation.level > > >>>>>>>> and > > >>>>>>>>>>>> removing IsolationLevel from the StateStoreContext. This > > >>>> would be > > >>>>>>>>>>>> compatible with adding support for query-time > > >> IsolationLevels > > >>>> in > > >>>>>> the > > >>>>>>>>>>>> future, whilst providing a way for users to select an > > >>>> isolation > > >>>>>> level > > >>>>>>>>>> now. > > >>>>>>>>>>>> > > >>>>>>>>>>>> The big problem with this, however, is that if a user > > >> selects > > >>>>>>>>>>>> processing.mode > > >>>>>>>>>>>> = "exactly-once(-v2|-beta)", and > > >>>> default.state.isolation.level = > > >>>>>>>>>>>> "READ_UNCOMMITTED", we need to guarantee that the data > > >> isn't > > >>>>>> written > > >>>>>>>> to > > >>>>>>>>>>>> disk until commit() is called, but we also need to permit > > >> IQ > > >>>>>> threads > > >>>>>>>> to > > >>>>>>>>>>>> read from the ongoing transaction. > > >>>>>>>>>>>> > > >>>>>>>>>>>> A simple solution would be to (temporarily) forbid this > > >>>>>> combination of > > >>>>>>>>>>>> configuration, and have default.state.isolation.level > > >>>>>> automatically > > >>>>>>>>>> switch > > >>>>>>>>>>>> to READ_COMMITTED when processing.mode is anything other > > >> than > > >>>>>>>>>>>> at-least-once. Do you think this would be acceptable? > > >>>>>>>>>>>> > > >>>>>>>>>>>> In a later KIP, we can add support for query-time > > >> isolation > > >>>>>> levels and > > >>>>>>>>>>>> solve this particular problem there, which would relax > > >> this > > >>>>>>>> restriction. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Regards, > > >>>>>>>>>>>> Nick > > >>>>>>>>>>>> > > >>>>>>>>>>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna < > > >>>> cado...@apache.org> > > >>>>>>>>>> wrote: > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Why do we need to add READ_COMMITTED to > > >>>> InMemoryKeyValueStore? I > > >>>>>>>> think > > >>>>>>>>>>>>> it is perfectly valid to say InMemoryKeyValueStore do not > > >>>> support > > >>>>>>>>>>>>> READ_COMMITTED for now, since READ_UNCOMMITTED is the > > >>>> de-facto > > >>>>>>>> default > > >>>>>>>>>>>>> at the moment. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Best, > > >>>>>>>>>>>>> Bruno > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On 9/18/23 7:12 PM, Nick Telford wrote: > > >>>>>>>>>>>>>> Oh! 
One other concern I haven't mentioned: if we make > > >>>>>>>> IsolationLevel a > > >>>>>>>>>>>>>> query-time constraint, then we need to add support for > > >>>>>>>> READ_COMMITTED > > >>>>>>>>>>>> to > > >>>>>>>>>>>>>> InMemoryKeyValueStore too, which will require some > > >> changes > > >>>> to > > >>>>>> the > > >>>>>>>>>>>>>> implementation. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford < > > >>>>>> nick.telf...@gmail.com > > >>>>>>>>> > > >>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Hi everyone, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I agree that having IsolationLevel be determined at > > >>>> query-time > > >>>>>> is > > >>>>>>>> the > > >>>>>>>>>>>>>>> ideal design, but there are a few sticking points: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 1. > > >>>>>>>>>>>>>>> There needs to be some way to communicate the > > >>>> IsolationLevel > > >>>>>> down > > >>>>>>>> to > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>> RocksDBStore itself, so that the query can respect it. > > >>>> Since > > >>>>>> stores > > >>>>>>>>>>>> are > > >>>>>>>>>>>>>>> "layered" in functionality (i.e. ChangeLoggingStore, > > >>>>>> MeteredStore, > > >>>>>>>>>>>>> etc.), > > >>>>>>>>>>>>>>> we need some way to deliver that information to the > > >> bottom > > >>>>>> layer. > > >>>>>>>> For > > >>>>>>>>>>>>> IQv2, > > >>>>>>>>>>>>>>> we can use the existing State#query() method, but IQv1 > > >> has > > >>>> no > > >>>>>> way > > >>>>>>>> to > > >>>>>>>>>>>> do > > >>>>>>>>>>>>>>> this. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> A simple approach, which would potentially open up > > >> other > > >>>>>> options, > > >>>>>>>>>>>> would > > >>>>>>>>>>>>> be > > >>>>>>>>>>>>>>> to add something like: ReadOnlyKeyValueStore<K, V> > > >>>>>>>>>>>>>>> readOnlyView(IsolationLevel isolationLevel) to > > >>>>>>>> ReadOnlyKeyValueStore > > >>>>>>>>>>>>> (and > > >>>>>>>>>>>>>>> similar to ReadOnlyWindowStore, ReadOnlySessionStore, > > >>>> etc.). > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 2. > > >>>>>>>>>>>>>>> As mentioned above, RocksDB WriteBatches are not > > >>>> thread-safe, > > >>>>>> which > > >>>>>>>>>>>>> causes > > >>>>>>>>>>>>>>> a problem if we want to provide READ_UNCOMMITTED > > >>>> Iterators. I > > >>>>>> also > > >>>>>>>>>>>> had a > > >>>>>>>>>>>>>>> look at RocksDB Transactions[1], but they solve a very > > >>>>>> different > > >>>>>>>>>>>>> problem, > > >>>>>>>>>>>>>>> and have the same thread-safety issue. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> One possible approach that I mentioned is chaining > > >>>>>> WriteBatches: > > >>>>>>>>>> every > > >>>>>>>>>>>>>>> time a new Interactive Query is received (i.e. > > >>>> readOnlyView, > > >>>>>> see > > >>>>>>>>>>>> above, > > >>>>>>>>>>>>>>> is called) we "freeze" the existing WriteBatch, and > > >> start a > > >>>>>> new one > > >>>>>>>>>>>> for > > >>>>>>>>>>>>> new > > >>>>>>>>>>>>>>> writes. The Interactive Query queries the "chain" of > > >>>> previous > > >>>>>>>>>>>>> WriteBatches > > >>>>>>>>>>>>>>> + the underlying database; while the StreamThread > > >> starts > > >>>>>> writing to > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>> *new* WriteBatch. On-commit, the StreamThread would > > >> write > > >>>> *all* > > >>>>>>>>>>>>>>> WriteBatches in the chain to the database (that have > > >> not > > >>>> yet > > >>>>>> been > > >>>>>>>>>>>>> written). 
> > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> WriteBatches would be closed/freed only when they have > > >> been > > >>>>>> both > > >>>>>>>>>>>>>>> committed, and all open Interactive Queries on them > > >> have > > >>>> been > > >>>>>>>> closed. > > >>>>>>>>>>>>> This > > >>>>>>>>>>>>>>> would require some reference counting. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Obviously a drawback of this approach is the potential > > >> for > > >>>>>>>> increased > > >>>>>>>>>>>>>>> memory usage: if an Interactive Query is long-lived, > > >> for > > >>>>>> example by > > >>>>>>>>>>>>> doing a > > >>>>>>>>>>>>>>> full scan over a large database, or even just pausing > > >> in > > >>>> the > > >>>>>> middle > > >>>>>>>>>> of > > >>>>>>>>>>>>> an > > >>>>>>>>>>>>>>> iteration, then the existing chain of WriteBatches > > >> could be > > >>>>>> kept > > >>>>>>>>>>>> around > > >>>>>>>>>>>>> for > > >>>>>>>>>>>>>>> a long time, potentially forever. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> A. > > >>>>>>>>>>>>>>> Going off on a tangent, it looks like in addition to > > >>>> supporting > > >>>>>>>>>>>>>>> READ_COMMITTED queries, we could go further and support > > >>>>>>>>>>>> REPEATABLE_READ > > >>>>>>>>>>>>>>> queries (i.e. where subsequent reads to the same key > > >> in the > > >>>>>> same > > >>>>>>>>>>>>>>> Interactive Query are guaranteed to yield the same > > >> value) > > >>>> by > > >>>>>> making > > >>>>>>>>>>>> use > > >>>>>>>>>>>>> of > > >>>>>>>>>>>>>>> RocksDB Snapshots[2]. These are fairly lightweight, so > > >> the > > >>>>>>>>>> performance > > >>>>>>>>>>>>>>> impact is likely to be negligible, but they do require > > >>>> that the > > >>>>>>>>>>>>> Interactive > > >>>>>>>>>>>>>>> Query session can be explicitly closed. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> This could be achieved if we made the above > > >> readOnlyView > > >>>>>> interface > > >>>>>>>>>>>> look > > >>>>>>>>>>>>>>> more like: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> interface ReadOnlyKeyValueView<K, V> extends > > >>>>>>>>>>>> ReadOnlyKeyValueStore<K, > > >>>>>>>>>>>>>>> V>, AutoCloseable {} > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> interface ReadOnlyKeyValueStore<K, V> { > > >>>>>>>>>>>>>>> ... > > >>>>>>>>>>>>>>> ReadOnlyKeyValueView<K, V> > > >>>> readOnlyView(IsolationLevel > > >>>>>>>>>>>>> isolationLevel); > > >>>>>>>>>>>>>>> } > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> But this would be a breaking change, as existing IQv1 > > >>>> queries > > >>>>>> are > > >>>>>>>>>>>>>>> guaranteed to never call store.close(), and therefore > > >> these > > >>>>>> would > > >>>>>>>>>> leak > > >>>>>>>>>>>>>>> memory under REPEATABLE_READ. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> B. > > >>>>>>>>>>>>>>> One thing that's notable: MyRocks states that they > > >> support > > >>>>>>>>>>>>> READ_COMMITTED > > >>>>>>>>>>>>>>> and REPEATABLE_READ, but they make no mention of > > >>>>>>>>>>>> READ_UNCOMMITTED[3][4]. > > >>>>>>>>>>>>>>> This could be because doing so is technically > > >>>>>> difficult/impossible > > >>>>>>>>>>>> using > > >>>>>>>>>>>>>>> the primitives available in RocksDB. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Lucas, to address your points: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> U1. > > >>>>>>>>>>>>>>> It's only "SHOULD" to permit alternative (i.e. > > >> non-RocksDB) > > >>>>>>>>>>>>>>> implementations of StateStore that do not support > > >> atomic > > >>>>>> writes. 
> > >>>>>>>>>>>>> Obviously > > >>>>>>>>>>>>>>> in those cases, the guarantees Kafka Streams > > >>>> provides/expects > > >>>>>> would > > >>>>>>>>>> be > > >>>>>>>>>>>>>>> relaxed. Do you think we should require all > > >>>> implementations to > > >>>>>>>>>> support > > >>>>>>>>>>>>>>> atomic writes? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> U2. > > >>>>>>>>>>>>>>> Stores can support multiple IsolationLevels. As we've > > >>>> discussed > > >>>>>>>>>> above, > > >>>>>>>>>>>>> the > > >>>>>>>>>>>>>>> ideal scenario would be to specify the IsolationLevel > > >> at > > >>>>>>>> query-time. > > >>>>>>>>>>>>>>> Failing that, I think the second-best approach is to > > >>>> define the > > >>>>>>>>>>>>>>> IsolationLevel for *all* queries based on the > > >>>> processing.mode, > > >>>>>>>> which > > >>>>>>>>>>>> is > > >>>>>>>>>>>>>>> what the default StateStoreContext#isolationLevel() > > >>>> achieves. > > >>>>>> Would > > >>>>>>>>>>>> you > > >>>>>>>>>>>>>>> prefer an alternative? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> While the existing implementation is equivalent to > > >>>>>>>> READ_UNCOMMITTED, > > >>>>>>>>>>>>> this > > >>>>>>>>>>>>>>> can yield unexpected results/errors under EOS, if a > > >>>>>> transaction is > > >>>>>>>>>>>>> rolled > > >>>>>>>>>>>>>>> back. While this would be a change in behaviour for > > >> users, > > >>>> it > > >>>>>> would > > >>>>>>>>>>>> look > > >>>>>>>>>>>>>>> more like a bug fix than a breaking change. That said, > > >> we > > >>>>>> *could* > > >>>>>>>>>> make > > >>>>>>>>>>>>> it > > >>>>>>>>>>>>>>> configurable, and default to the existing behaviour > > >>>>>>>>>> (READ_UNCOMMITTED) > > >>>>>>>>>>>>>>> instead of inferring it from the processing.mode? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> N1, N2. > > >>>>>>>>>>>>>>> These were only primitives to avoid boxing costs, but > > >> since > > >>>>>> this is > > >>>>>>>>>>>> not > > >>>>>>>>>>>>> a > > >>>>>>>>>>>>>>> performance sensitive area, it should be fine to > > >> change if > > >>>>>> that's > > >>>>>>>>>>>>> desirable. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> N3. > > >>>>>>>>>>>>>>> It's because the store "manages its own offsets", which > > >>>>>> includes > > >>>>>>>> both > > >>>>>>>>>>>>>>> committing the offset, *and providing it* via > > >>>>>> getCommittedOffset(). > > >>>>>>>>>>>>>>> Personally, I think "managesOffsets" conveys this best, > > >>>> but I > > >>>>>> don't > > >>>>>>>>>>>> mind > > >>>>>>>>>>>>>>> changing it if the nomenclature is unclear. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Sorry for the massive emails/essays! > > >>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>> Nick > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 1: > > >> https://github.com/facebook/rocksdb/wiki/Transactions > > >>>>>>>>>>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot > > >>>>>>>>>>>>>>> 3: > > >>>>>>>> > > >> https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation > > >>>>>>>>>>>>>>> 4: > > >>>> https://mariadb.com/kb/en/myrocks-transactional-isolation/ > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy > > >>>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Hi Nick, > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> since I last read it in April, the KIP has become much > > >>>>>> cleaner and > > >>>>>>>>>>>>>>>> easier to read. Great work! > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> It feels to me the last big open point is whether we > > >> can > > >>>>>> implement > > >>>>>>>>>>>>>>>> isolation level as a query parameter. 
I understand > > that > > there > > are > > >>>>>>>>>>>>>>>> implementation concerns, but as Colt says, it would > > be a > > >>>> great > > >>>>>>>>>>>>>>>> addition, and would also simplify the migration path > > for > > >>>> this > > >>>>>>>>>> change. > > >>>>>>>>>>>>>>>> Is the implementation problem you mentioned caused by > > the > > >>>>>>>> WriteBatch > > >>>>>>>>>>>>>>>> not having a notion of a snapshot, as the underlying > > DB > > >>>>>> iterator > > >>>>>>>>>>>> does? > > >>>>>>>>>>>>>>>> In that case, I am not sure a chain of WriteBatches > > as you > > >>>>>> propose > > >>>>>>>>>>>>>>>> would fully solve the problem, but maybe I didn't dig > > >>>> enough > > >>>>>> into > > >>>>>>>>>> the > > >>>>>>>>>>>>>>>> details to fully understand it. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> If it's not possible to implement it now, would it be > > an > > >>>>>> option to > > >>>>>>>>>>>>>>>> make sure in this KIP that we do not fully close the > > door > > >>>> on > > >>>>>>>>>>>> per-query > > >>>>>>>>>>>>>>>> isolation levels in the interface, as it may be > > possible > > >>>> to > > >>>>>>>>>> implement > > >>>>>>>>>>>>>>>> the missing primitives in RocksDB or Speedb in the > > future? > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Understanding: > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to > > be > > >>>>>> persisted > > >>>>>>>>>>>>>>>> atomically with the records? > > >>>>>>>>>>>>>>>> * U2) Don't understand the default implementation of > > >>>>>>>>>>>> `isolationLevel`. > > >>>>>>>>>>>>>>>> The isolation level should be a property of the > > underlying > > >>>>>> store, > > >>>>>>>>>> and > > >>>>>>>>>>>>>>>> not be defined by the default config? Existing stores > > >>>> probably > > >>>>>>>> don't > > >>>>>>>>>>>>>>>> guarantee READ_COMMITTED, so the default should be to > > >>>> return > > >>>>>>>>>>>>>>>> READ_UNCOMMITTED. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Nits: > > >>>>>>>>>>>>>>>> * N1) Could `getCommittedOffset` use an `OptionalLong` > > >>>> return > > >>>>>> type, > > >>>>>>>>>> to > > >>>>>>>>>>>>>>>> avoid the `null`? > > >>>>>>>>>>>>>>>> * N2) Could `approximateNumUncommittedBytes` use an > > >>>>>> `OptionalLong` > > >>>>>>>>>>>>>>>> return type, to avoid the `-1`? > > >>>>>>>>>>>>>>>> * N3) I don't understand why `managesOffsets` uses the > > >>>>>> 'manage' > > >>>>>>>>>> verb, > > >>>>>>>>>>>>>>>> whereas all other methods use the "commits" verb. I'd > > >>>> suggest > > >>>>>>>>>>>>>>>> `commitsOffsets`. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Either way, it feels this KIP is very close to the > > finish > > >>>>>> line, > > >>>>>>>> I'm > > >>>>>>>>>>>>>>>> looking forward to seeing this in production! > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Cheers, > > >>>>>>>>>>>>>>>> Lucas > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy < > > >>>>>> c...@littlehorse.io > > >>>>>>>>> > > >>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Making IsolationLevel a query-time constraint, > > rather > > >>>> than > > >>>>>>>> linking > > >>>>>>>>>>>> it > > >>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> the processing.guarantee. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> As I understand it, would this allow even a user of > > EOS > > >>>> to > > >>>>>>>> control > > >>>>>>>>>>>>>>>> whether > > >>>>>>>>>>>>>>>>> to read committed or uncommitted records? 
If so, I am > > >>>> highly > > >>>>>> in > > >>>>>>>>>>>> favor > > >>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>> this. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I know that I was one of the early people to point > > out > > >>>> the > > >>>>>>>> current > > >>>>>>>>>>>>>>>>> shortcoming that IQ reads uncommitted records, but > > just > > >>>> this > > >>>>>>>>>>>> morning I > > >>>>>>>>>>>>>>>>> realized a pattern we use which means that (for > > certain > > >>>>>> queries) > > >>>>>>>>>> our > > >>>>>>>>>>>>>>>> system > > >>>>>>>>>>>>>>>>> needs to be able to read uncommitted records, which > > is > > >>>> the > > >>>>>>>> current > > >>>>>>>>>>>>>>>> behavior > > >>>>>>>>>>>>>>>>> of Kafka Streams in EOS.*** > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> If IsolationLevel being a query-time decision allows > > for > > >>>>>> this, > > >>>>>>>> then > > >>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>> would be amazing. I would also vote that the default > > >>>> behavior > > >>>>>>>>>> should > > >>>>>>>>>>>>> be > > >>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>> reading uncommitted records, because it is totally > > >>>> possible > > >>>>>> for a > > >>>>>>>>>>>>> valid > > >>>>>>>>>>>>>>>>> application to depend on that behavior, and breaking > > it > > >>>> in a > > >>>>>>>> minor > > >>>>>>>>>>>>>>>> release > > >>>>>>>>>>>>>>>>> might be a bit strong. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> *** (Note, for the curious reader....) Our > > use-case/query > > >>>>>> pattern > > >>>>>>>>>>>> is a > > >>>>>>>>>>>>>>>> bit > > >>>>>>>>>>>>>>>>> complex, but reading "uncommitted" records is > > actually > > >>>> safe > > >>>>>> in > > >>>>>>>> our > > >>>>>>>>>>>>> case > > >>>>>>>>>>>>>>>>> because processing is deterministic. Additionally, IQ > > >>>> being > > >>>>>> able > > >>>>>>>> to > > >>>>>>>>>>>>> read > > >>>>>>>>>>>>>>>>> uncommitted records is crucial to enable "read your > > own > > >>>>>> writes" > > >>>>>>>> on > > >>>>>>>>>>>> our > > >>>>>>>>>>>>>>>> API: > > >>>>>>>>>>>>>>>>> Due to the deterministic processing, we send an > > "ack" to > > >>>> the > > >>>>>>>> client > > >>>>>>>>>>>>> who > > >>>>>>>>>>>>>>>>> makes the request as soon as the processor processes > > the > > >>>>>> result. > > >>>>>>>> If > > >>>>>>>>>>>>> they > > >>>>>>>>>>>>>>>>> can't read uncommitted records, they may receive a > > "201 - > > >>>>>>>> Created" > > >>>>>>>>>>>>>>>>> response, immediately followed by a "404 - Not Found" > > >>>> when > > >>>>>> doing > > >>>>>>>> a > > >>>>>>>>>>>>>>>> lookup > > >>>>>>>>>>>>>>>>> for the object they just created. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>> Colt McNealy > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev* > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford < > > >>>>>>>>>>>> nick.telf...@gmail.com> > > >>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Addendum: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> I think we would also face the same problem with the > > >>>>>> approach > > >>>>>>>> John > > >>>>>>>>>>>>>>>> outlined > > >>>>>>>>>>>>>>>>>> earlier (using the record cache as a transaction > > buffer > > >>>> and > > >>>>>>>>>>>> flushing > > >>>>>>>>>>>>>>>> it > > >>>>>>>>>>>>>>>>>> straight to SST files). 
This is because the record > > >> cache > > >>>>>> (the > > >>>>>>>>>>>>>>>> ThreadCache > > >>>>>>>>>>>>>>>>>> class) is not thread-safe, so every commit would > > >>>> invalidate > > >>>>>> open > > >>>>>>>>>> IQ > > >>>>>>>>>>>>>>>>>> Iterators in the same way that RocksDB WriteBatches > > >> do. > > >>>>>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>>>>> Nick > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford < > > >>>>>>>>>>>> nick.telf...@gmail.com> > > >>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Hi Bruno, > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> I've updated the KIP based on our conversation. The > > >>>> only > > >>>>>> things > > >>>>>>>>>>>>>>>> I've not > > >>>>>>>>>>>>>>>>>>> yet done are: > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> 1. Using transactions under ALOS and EOS. > > >>>>>>>>>>>>>>>>>>> 2. Making IsolationLevel a query-time constraint, > > >>>> rather > > >>>>>> than > > >>>>>>>>>>>>>>>> linking it > > >>>>>>>>>>>>>>>>>>> to the processing.guarantee. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> There's a wrinkle that makes this a challenge: > > >>>> Interactive > > >>>>>>>>>> Queries > > >>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>> open an Iterator, when using transactions and > > >>>>>> READ_UNCOMMITTED. > > >>>>>>>>>>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries > > >>>> need > > >>>>>> to be > > >>>>>>>>>>>> able > > >>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>> read records from the currently uncommitted > > >> transaction > > >>>>>> buffer > > >>>>>>>>>>>>>>>>>>> (WriteBatch). This includes for Iterators, which > > >> should > > >>>>>> iterate > > >>>>>>>>>>>>>>>> both the > > >>>>>>>>>>>>>>>>>>> transaction buffer and underlying database (using > > >>>>>>>>>>>>>>>>>>> WriteBatch#iteratorWithBase()). > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> The issue is that when the StreamThread commits, it > > >>>> writes > > >>>>>> the > > >>>>>>>>>>>>>>>> current > > >>>>>>>>>>>>>>>>>>> WriteBatch to RocksDB *and then clears the > > >> WriteBatch*. > > >>>>>>>> Clearing > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>> WriteBatch while an Interactive Query holds an open > > >>>>>> Iterator on > > >>>>>>>>>> it > > >>>>>>>>>>>>>>>> will > > >>>>>>>>>>>>>>>>>>> invalidate the Iterator. Worse, it turns out that > > >>>> Iterators > > >>>>>>>> over > > >>>>>>>>>> a > > >>>>>>>>>>>>>>>>>>> WriteBatch become invalidated not just when the > > >>>> WriteBatch > > >>>>>> is > > >>>>>>>>>>>>>>>> cleared, > > >>>>>>>>>>>>>>>>>> but > > >>>>>>>>>>>>>>>>>>> also when the Iterators' current key receives a new > > >>>> write. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Now that I'm writing this, I remember that this is > > >> the > > >>>>>> major > > >>>>>>>>>>>> reason > > >>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>> switched the original design from having a > > >> query-time > > >>>>>>>>>>>>>>>> IsolationLevel to > > >>>>>>>>>>>>>>>>>>> having the IsolationLevel linked to the > > >>>> transactionality > > >>>>>> of the > > >>>>>>>>>>>>>>>> stores > > >>>>>>>>>>>>>>>>>>> themselves. 
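[For reference, iterating "transaction buffer + database" looks roughly like this in RocksJava, where the method is called newIteratorWithBase on WriteBatchWithIndex. A sketch only: `db` and `batch` are assumed to be an open RocksDB and the current WriteBatchWithIndex.]

```java
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksIterator;

try (final ReadOptions readOptions = new ReadOptions();
     final RocksIterator base = db.newIterator(readOptions);
     final RocksIterator merged = batch.newIteratorWithBase(base)) {
    for (merged.seekToFirst(); merged.isValid(); merged.next()) {
        // `merged` overlays the uncommitted writes in `batch` on the DB.
        // This is the iterator that is invalidated when the StreamThread
        // clears the batch, or writes to the iterator's current key,
        // as described above.
        final byte[] key = merged.key();
        final byte[] value = merged.value();
    }
}
```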
> > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> It *might* be possible to resolve this, by having a > > >>>>>> "chain" of > > >>>>>>>>>>>>>>>>>>> WriteBatches, with the StreamThread switching to a > > >> new > > >>>>>>>> WriteBatch > > >>>>>>>>>>>>>>>>>> whenever > > >>>>>>>>>>>>>>>>>>> a new Interactive Query attempts to read from the > > >>>>>> database, but > > >>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>> could > > >>>>>>>>>>>>>>>>>>> cause some performance problems/memory pressure > > >> when > > >>>>>> subjected > > >>>>>>>> to > > >>>>>>>>>>>> a > > >>>>>>>>>>>>>>>> high > > >>>>>>>>>>>>>>>>>>> Interactive Query load. It would also reduce the > > >>>>>> efficiency of > > >>>>>>>>>>>>>>>>>> WriteBatches > > >>>>>>>>>>>>>>>>>>> on-commit, as we'd have to write N WriteBatches, > > >> where > > >>>> N > > >>>>>> is the > > >>>>>>>>>>>>>>>> number of > > >>>>>>>>>>>>>>>>>>> Interactive Queries since the last commit. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> I realise this is getting into the weeds of the > > >>>>>> implementation, > > >>>>>>>>>>>> and > > >>>>>>>>>>>>>>>> you'd > > >>>>>>>>>>>>>>>>>>> rather we focus on the API for now, but I think > > >> it's > > >>>>>> important > > >>>>>>>> to > > >>>>>>>>>>>>>>>>>> consider > > >>>>>>>>>>>>>>>>>>> how to implement the desired API, in case we come > > >> up > > >>>> with > > >>>>>> an > > >>>>>>>> API > > >>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>> cannot be implemented efficiently, or even at all! > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Thoughts? > > >>>>>>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>>>>>> Nick > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna < > > >>>>>>>> cado...@apache.org > > >>>>>>>>>>> > > >>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Hi Nick, > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 6. > > >>>>>>>>>>>>>>>>>>>> Of course, you are right! My bad! > > >>>>>>>>>>>>>>>>>>>> Wiping out the state in the downgrading case is > > >> fine. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 3a. > > >>>>>>>>>>>>>>>>>>>> Focus on the public facing changes for the KIP. We > > >>>> will > > >>>>>> manage > > >>>>>>>>>> to > > >>>>>>>>>>>>>>>> get > > >>>>>>>>>>>>>>>>>>>> the internals right. Regarding state stores that > > >> do > > >>>> not > > >>>>>>>> support > > >>>>>>>>>>>>>>>>>>>> READ_COMMITTED, they should throw an error stating > > >>>> that > > >>>>>> they > > >>>>>>>> do > > >>>>>>>>>>>> not > > >>>>>>>>>>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state > > >>>> stores > > >>>>>>>>>>>>>>>> immediately. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> 3b. > > >>>>>>>>>>>>>>>>>>>> I am in favor of using transactions also for ALOS. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>>>>>>>> Bruno > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote: > > >>>>>>>>>>>>>>>>>>>>> Hi Bruno, > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Thanks for getting back to me! > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> 2. > > >>>>>>>>>>>>>>>>>>>>> The fact that implementations can always track > > >>>> estimated > > >>>>>>>> memory > > >>>>>>>>>>>>>>>> usage > > >>>>>>>>>>>>>>>>>> in > > >>>>>>>>>>>>>>>>>>>>> the wrapper is a good point. 
I can remove -1 as > > >> an > > >>>>>> option, > > >>>>>>>> and > > >>>>>>>>>>>>>>>> I'll > > >>>>>>>>>>>>>>>>>>>> clarify > > >>>>>>>>>>>>>>>>>>>>> the JavaDoc that 0 is not just for > > >> non-transactional > > >>>>>> stores, > > >>>>>>>>>>>>>>>> which is > > >>>>>>>>>>>>>>>>>>>>> currently misleading. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> 6. > > >>>>>>>>>>>>>>>>>>>>> The problem with catching the exception in the > > >>>> downgrade > > >>>>>>>>>> process > > >>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>> would require new code in the Kafka version being > > >>>>>> downgraded > > >>>>>>>>>> to. > > >>>>>>>>>>>>>>>> Since > > >>>>>>>>>>>>>>>>>>>>> users could conceivably downgrade to almost *any* > > >>>> older > > >>>>>>>> version > > >>>>>>>>>>>>>>>> of > > >>>>>>>>>>>>>>>>>> Kafka > > >>>>>>>>>>>>>>>>>>>>> Streams, I'm not sure how we could add that code? > > >>>>>>>>>>>>>>>>>>>>> The only way I can think of doing it would be to > > >>>> provide > > >>>>>> a > > >>>>>>>>>>>>>>>> dedicated > > >>>>>>>>>>>>>>>>>>>>> downgrade tool, that goes through every local > > >> store > > >>>> and > > >>>>>>>> removes > > >>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> offsets column families. But that seems like an > > >>>>>> unnecessary > > >>>>>>>>>>>>>>>> amount of > > >>>>>>>>>>>>>>>>>>>> extra > > >>>>>>>>>>>>>>>>>>>>> code to maintain just to handle a somewhat niche > > >>>>>> situation, > > >>>>>>>>>> when > > >>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> alternative (automatically wipe and restore > > >> stores) > > >>>>>> should be > > >>>>>>>>>>>>>>>>>>>> acceptable. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've > > >>>> requested. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> 3a. > > >>>>>>>>>>>>>>>>>>>>> I agree that IsolationLevel makes more sense at > > >>>>>> query-time, > > >>>>>>>> and > > >>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>>>>> actually > > >>>>>>>>>>>>>>>>>>>>> initially attempted to place the IsolationLevel > > >> at > > >>>>>>>> query-time, > > >>>>>>>>>>>>>>>> but I > > >>>>>>>>>>>>>>>>>> ran > > >>>>>>>>>>>>>>>>>>>>> into some problems: > > >>>>>>>>>>>>>>>>>>>>> - The key issue is that, under ALOS we're not > > >> staging > > >>>>>> writes > > >>>>>>>> in > > >>>>>>>>>>>>>>>>>>>>> transactions, so can't perform writes at the > > >>>>>> READ_COMMITTED > > >>>>>>>>>>>>>>>> isolation > > >>>>>>>>>>>>>>>>>>>>> level. However, this may be addressed if we > > >> decide to > > >>>>>>>> *always* > > >>>>>>>>>>>>>>>> use > > >>>>>>>>>>>>>>>>>>>>> transactions as discussed under 3b. > > >>>>>>>>>>>>>>>>>>>>> - IQv1 and IQv2 have quite different > > >>>> implementations. I > > >>>>>>>>>> remember > > >>>>>>>>>>>>>>>>>> having > > >>>>>>>>>>>>>>>>>>>>> some difficulty understanding the IQv1 internals, > > >>>> which > > >>>>>> made > > >>>>>>>> it > > >>>>>>>>>>>>>>>>>>>> difficult > > >>>>>>>>>>>>>>>>>>>>> to determine what needed to be changed. However, > > >> I > > >>>>>> *think* > > >>>>>>>> this > > >>>>>>>>>>>>>>>> can be > > >>>>>>>>>>>>>>>>>>>>> addressed for both implementations by wrapping > > >> the > > >>>>>>>> RocksDBStore > > >>>>>>>>>>>>>>>> in an > > >>>>>>>>>>>>>>>>>>>>> IsolationLevel-dependent wrapper, that overrides > > >> read > > >>>>>> methods > > >>>>>>>>>>>>>>>> (get, > > >>>>>>>>>>>>>>>>>>>> etc.) > > >>>>>>>>>>>>>>>>>>>>> to either read directly from the database or > > >> from the > > >>>>>> ongoing > > >>>>>>>>>>>>>>>>>>>> transaction. 
> > >>>>>>>>>>>>>>>>>>>>> But IQv1 might still be difficult. > > >>>>>>>>>>>>>>>>>>>>> - If IsolationLevel becomes a query constraint, > > then > > >>>> all > > >>>>>>>> other > > >>>>>>>>>>>>>>>>>>>> StateStores > > >>>>>>>>>>>>>>>>>>>>> will need to respect it, including the in-memory > > >>>> stores. > > >>>>>> This > > >>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>>>>> require > > >>>>>>>>>>>>>>>>>>>>> us to adapt in-memory stores to stage their > > writes so > > >>>>>> they > > >>>>>>>> can > > >>>>>>>>>>>> be > > >>>>>>>>>>>>>>>>>>>> isolated > > >>>>>>>>>>>>>>>>>>>>> from READ_COMMITTED queries. It would also > > become an > > >>>>>>>> important > > >>>>>>>>>>>>>>>>>>>>> consideration for third-party stores on upgrade, > > as > > >>>>>> without > > >>>>>>>>>>>>>>>> changes, > > >>>>>>>>>>>>>>>>>>>> they > > >>>>>>>>>>>>>>>>>>>>> would not support READ_COMMITTED queries > > correctly. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Ultimately, I may need some help making the > > necessary > > >>>>>> change > > >>>>>>>> to > > >>>>>>>>>>>>>>>> IQv1 > > >>>>>>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>>>>>> support this, but I don't think it's > > fundamentally > > >>>>>>>> impossible, > > >>>>>>>>>>>>>>>> if we > > >>>>>>>>>>>>>>>>>>>> want > > >>>>>>>>>>>>>>>>>>>>> to pursue this route. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> 3b. > > >>>>>>>>>>>>>>>>>>>>> The main reason I chose to keep ALOS > > un-transactional > > >>>>>> was to > > >>>>>>>>>>>>>>>> minimize > > >>>>>>>>>>>>>>>>>>>>> behavioural change for most users (I believe most > > >>>> Streams > > >>>>>>>> users > > >>>>>>>>>>>>>>>> use > > >>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> default configuration, which is ALOS). That said, > > >>>> it's > > >>>>>> clear > > >>>>>>>>>>>>>>>> that if > > >>>>>>>>>>>>>>>>>>>> ALOS > > >>>>>>>>>>>>>>>>>>>>> also used transactional stores, the only change > > in > > >>>>>> behaviour > > >>>>>>>>>>>>>>>> would be > > >>>>>>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>> it would become *more correct*, which could be > > >>>>>> considered a > > >>>>>>>>>> "bug > > >>>>>>>>>>>>>>>> fix" > > >>>>>>>>>>>>>>>>>> by > > >>>>>>>>>>>>>>>>>>>>> users, rather than a change they need to handle. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> I believe that performance using transactions > > (aka. > > >>>>>> RocksDB > > >>>>>>>>>>>>>>>>>>>> WriteBatches) > > >>>>>>>>>>>>>>>>>>>>> should actually be *better* than the un-batched > > >>>>>> write-path > > >>>>>>>> that > > >>>>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>>>>>> currently used[1]. The only "performance" > > >>>> consideration > > >>>>>> will > > >>>>>>>> be > > >>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>> increased memory usage that transactions require. > > >>>> Given > > >>>>>> the > > >>>>>>>>>>>>>>>>>> mitigations > > >>>>>>>>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>>>>>> this memory that we have in place, I would expect > > >>>> that > > >>>>>> this > > >>>>>>>> is > > >>>>>>>>>>>>>>>> not a > > >>>>>>>>>>>>>>>>>>>>> problem for most users. > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> If we're happy to do so, we can make ALOS also > > use > > >>>>>>>>>> transactions. 
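[For illustration, the atomic commit these transactional stores perform might look roughly like the sketch below in RocksJava. The offsets column family and the key/value encodings are assumed placeholders, not the KIP's actual code.]

```java
import java.nio.charset.StandardCharsets;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Uncommitted writes are buffered in `batch`; on Streams commit, the
// buffered records and the changelog offset land in RocksDB atomically,
// so the store can never diverge from its recorded committed offset.
static void commitBatch(final RocksDB db,
                        final WriteBatch batch,
                        final ColumnFamilyHandle offsetsColumnFamily,
                        final String topicPartition,
                        final long committedOffset) throws RocksDBException {
    batch.put(offsetsColumnFamily,
              topicPartition.getBytes(StandardCharsets.UTF_8),
              Long.toString(committedOffset).getBytes(StandardCharsets.UTF_8));
    try (final WriteOptions writeOptions = new WriteOptions()) {
        db.write(writeOptions, batch); // records + offset in one atomic write
    }
    batch.clear();
}
```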
Regards,
Nick

Link 1:
https://github.com/adamretter/rocksjava-write-methods-benchmark#results

On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for the updates and sorry for the delay on my side!

1.
Making the default implementation for flush() a no-op sounds good to me.

2.
I think what was bugging me here is that a third-party state store needs to implement the state store interface. That means they need to implement a wrapper around the actual state store, as we do for RocksDB with RocksDBStore. So, a third-party state store can always estimate the uncommitted bytes, if it wants, because the wrapper can record the added bytes (see the sketch below).
One case I can think of where returning -1 makes sense is when Streams does not need to estimate the size of the write batch and trigger extraordinary commits, because the third-party state store takes care of memory. But in that case the method could also just return 0. Even that case would be better solved with a method that returns whether the state store manages the memory used for uncommitted bytes itself or not.
Said that, I am fine with keeping the -1 return value; I was just wondering when and if it will be used.

Regarding returning 0 for transactional state stores when the batch is empty, I was just wondering because you explicitly stated

"or {@code 0} if this StateStore does not support transactions."

So it seemed to me returning 0 could only happen for non-transactional state stores.
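A minimal sketch of what I mean by the wrapper recording the added bytes (all names here are illustrative):

    // Tracks the bytes staged since the last commit; the store wrapper calls
    // onPut for every write and onCommit when the batch is written.
    class UncommittedBytesTracker {
        private long uncommittedBytes = 0L;

        void onPut(final byte[] key, final byte[] value) {
            uncommittedBytes += key.length + (value == null ? 0 : value.length);
        }

        void onCommit() {
            uncommittedBytes = 0L; // nothing pending any more
        }

        long approximateNumUncommittedBytes() {
            return uncommittedBytes;
        }
    }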
3.

a) What do you think if we move the isolation level to IQ (v1 and v2)? In the end this is the only component that really needs to specify the isolation level. It is similar to the Kafka consumer that can choose with what isolation level to read the input topic.
For IQv1 the isolation level should go into StoreQueryParameters. For IQv2, I would add it to the Query interface (a hypothetical sketch of both follows below).

b) Point a) raises the question what should happen during at-least-once processing when the state store does not use transactions? John in the past proposed to also use transactions on state stores for at-least-once. I like that idea, because it avoids aggregating the same records over and over again in the case of a failure. We had a case in the past where a Streams application in at-least-once mode was failing continuously, for some reason I do not remember, before committing the offsets. After each failover, the app aggregated the same records again and again. Of course the aggregate increased to very wrong values just because of the failovers. With transactions on the state stores we could have avoided this. The app would have output the same aggregate multiple times (i.e., after each failover) but at least the value of the aggregate would not depend on the number of failovers. Outputting the same aggregate multiple times would be incorrect under exactly-once but it is OK for at-least-once.
Whether it makes sense to add a config to turn transactions on state stores on and off under at-least-once, or to just use transactions in any case, is a question we should also discuss in this KIP. It depends a bit on the performance trade-off. Maybe to be safe, I would add a config.
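To make a) concrete, a purely hypothetical sketch of where the isolation level could live; neither addition exists today and the names are only illustrative:

    import org.apache.kafka.common.IsolationLevel;

    // IQv2: the Query interface could expose the level requested for the query.
    interface IsolationLevelAwareQuery {
        IsolationLevel isolationLevel();
    }

    // IQv1: StoreQueryParameters could gain a fluent setter along the lines of
    //     storeQueryParameters.withIsolationLevel(IsolationLevel.READ_COMMITTED)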
4.
Your points are all valid. I tend to say to keep the metrics around flush() until we remove flush() completely from the interface. Calls to flush() might still exist, since existing processors might still call flush() explicitly, as you mentioned in 1). For sure, we need to document how the metrics change due to the transactions in the upgrade notes.

5.
I see. Then you should describe how the .position files are handled in a dedicated section of the KIP, or incorporate the description in the "Atomic Checkpointing" section, instead of only mentioning it in the "Compatibility, Deprecation, and Migration Plan".

6.
Describing upgrading and downgrading in the KIP is a good idea. Regarding downgrading, I think you could also catch the exception and do what is needed to downgrade, e.g., drop the column family. See here for an example:

https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75

It is a bit brittle, but it works.
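Something along these lines (a sketch of the linked approach; names are illustrative and the recovery logic is elided):

    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    class DowngradeAwareOpener {
        static RocksDB openStore(final DBOptions options, final String path,
                                 final List<ColumnFamilyDescriptor> expected,
                                 final List<ColumnFamilyHandle> handles) {
            try {
                return RocksDB.open(options, path, expected, handles);
            } catch (final RocksDBException e) {
                // The on-disk layout differs from what we expect, e.g. the
                // offsets column family is unknown to this version: adapt the
                // descriptors and retry, or let Streams wipe and restore.
                throw new IllegalStateException(e);
            }
        }
    }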
Best,
Bruno

On 8/24/23 12:18 PM, Nick Telford wrote:

Hi Bruno,

Thanks for taking the time to review the KIP. I'm back from leave now and intend to move this forwards as quickly as I can.

Addressing your points:

1.
Because flush() is part of the StateStore API, it's exposed to custom Processors, which might be making calls to flush(). This was actually the case in a few integration tests.
To maintain as much compatibility as possible, I'd prefer not to make this an UnsupportedOperationException, as it will cause previously working Processors to start throwing exceptions at runtime.
I agree that it doesn't make sense for it to proxy commit(), though, as that would cause it to violate the "StateStores commit only when the Task commits" rule.
Instead, I think we should make this a no-op. That way, existing user Processors will continue to work as before, without the violation of store consistency that would be caused by premature flush/commit of StateStore data to disk.
What do you think?
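A minimal sketch of what that no-op default could look like (the interface name is mine; commit(Map) is the method this KIP proposes):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    interface TransactionalStateStoreSketch {
        // proposed by KIP-892: atomically persist all staged writes
        void commit(Map<TopicPartition, Long> changelogOffsets);

        @Deprecated
        default void flush() {
            // no-op: data is now persisted when the Task commits, so legacy
            // flush() calls from custom Processors are ignored rather than
            // throwing or forcing an early commit
        }
    }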
2.
As stated in the JavaDoc, when a StateStore implementation is transactional, but is unable to estimate the uncommitted memory usage, the method will return -1. The intention here is to permit third-party implementations that may not be able to estimate memory usage.

Yes, it will be 0 when nothing has been written to the store yet. I thought that was implied by "This method will return an approximation of the memory would be freed by the next call to {@link #commit(Map)}" and "@return The approximate size of all records awaiting {@link #commit(Map)}"; however, I can add it explicitly to the JavaDoc if you think this is unclear?
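For what it's worth, one way the JavaDoc could spell it out (the wording below is mine, not the KIP's current text):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    interface UncommittedBytesContract {
        /**
         * @return the approximate number of bytes awaiting {@link #commit(Map)}:
         *         {@code 0} if this store is not transactional, or if it is
         *         transactional but nothing has been written since the last
         *         commit; {@code -1} if it is transactional but cannot
         *         estimate its uncommitted memory usage.
         */
        long approximateNumUncommittedBytes();

        void commit(Map<TopicPartition, Long> changelogOffsets);
    }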
3.
I realise this is probably the most contentious point in my design, and I'm open to changing it if I'm unable to convince you of the benefits. Nevertheless, here's my argument:
The Interactive Query (IQ) API(s) are directly provided StateStores to query, and it may be important for users to programmatically know which mode the StateStore is operating under. If we simply provide an "eosEnabled" boolean (as used throughout the internal streams engine), or similar, then users will need to understand the operation and consequences of each available processing mode and how it pertains to their StateStore.

Interactive Query users aren't the only people that care about the processing.mode/IsolationLevel of a StateStore: implementers of custom StateStores also need to understand the behaviour expected of their implementation. KIP-892 introduces some assumptions into the Streams engine about how StateStores operate under each processing mode, and it's important that custom implementations adhere to those assumptions in order to maintain the consistency guarantees.

IsolationLevels provide a high-level contract on the behaviour of the StateStore: a user knows that under READ_COMMITTED, they will see writes only after the Task has committed, and under READ_UNCOMMITTED they will see writes immediately. No understanding of the details of each processing.mode is required, either for IQ users or StateStore implementers.

An argument can be made that these contractual guarantees can simply be documented for the processing.mode (i.e. that exactly-once and exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves like READ_UNCOMMITTED), but there are several small issues with this I'd prefer to avoid:

- Where would we document these contracts, in a way that is difficult for users/implementers to miss/ignore?
- It's not clear to users that the processing mode is communicating an expectation of read isolation, unless they read the documentation. Users rarely consult documentation unless they feel they need to, so it's likely this detail would get missed by many users.
- It tightly couples processing modes to read isolation. Adding new processing modes, or changing the read isolation of existing processing modes, would be difficult/impossible.

Ultimately, the cost of introducing IsolationLevels is just a single method, since we re-use the existing IsolationLevel enum from Kafka. This gives us a clear place to document the contractual guarantees expected of/provided by StateStores, that is accessible both by the StateStore itself, and by IQ users.
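For example, a custom store implementer could consume the contract like this (a sketch; only IsolationLevel is an existing type, everything else is illustrative):

    import org.apache.kafka.common.IsolationLevel;

    class CustomStoreBehaviour {
        private final IsolationLevel isolationLevel;

        CustomStoreBehaviour(final IsolationLevel isolationLevel) {
            this.isolationLevel = isolationLevel;
        }

        boolean stageWritesInTransaction() {
            // READ_COMMITTED: writes must stay invisible to readers until the
            // Task commits; READ_UNCOMMITTED: write straight through.
            return isolationLevel == IsolationLevel.READ_COMMITTED;
        }
    }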
(Writing this I've just realised that the StateStore and IQ APIs actually don't provide access to the StateStoreContext that IQ users would have direct access to... Perhaps StateStore should expose isolationLevel() itself too?)

4.
Yeah, I'm not comfortable renaming the metrics in-place either, as it's a backwards incompatible change. My concern is that, if we leave the existing "flush" metrics in place, they will be confusing to users. Right now, "flush" metrics record explicit flushes to disk, but under KIP-892, even a commit() will not explicitly flush data to disk - RocksDB will decide when to flush memtables to disk itself.

If we keep the existing "flush" metrics, we'd have two options, which both seem pretty bad to me:

1. Have them record calls to commit(), which would be misleading, as data is no longer explicitly "flushed" to disk by this call.
2. Have them record nothing at all, which is equivalent to removing the metrics, except that users will see the metric still exists and so assume that the metric is correct, and that there's a problem with their system when there isn't.

I agree that removing them is also a bad solution, and I'd like some guidance on the best path forward here.
5.
Position files are updated on every write to a StateStore. Since our writes are now buffered until commit(), we can't update the Position file until commit() has been called, otherwise it would be inconsistent with the data in the event of a rollback. Consequently, we need to manage these offsets the same way we manage the checkpoint offsets, and ensure they're only written on commit() (see the sketch below).

6.
Agreed, although I'm not exactly sure yet what tests to write. How explicit do we need to be here in the KIP?

As for upgrade/downgrade: upgrade is designed to be seamless, and we should definitely add some tests around that. Downgrade, it transpires, isn't currently possible, as the extra column family for offset storage is incompatible with the pre-KIP-892 implementation: when you open a RocksDB database, you must open all available column families or receive an error. What currently happens on downgrade is that it attempts to open the store, throws an error about the offsets column family not being opened, which triggers a wipe and rebuild of the Task. Given that downgrades should be uncommon, I think this is acceptable behaviour, as the end-state is consistent, even if it results in an undesirable state restore.

Should I document the upgrade/downgrade behaviour explicitly in the KIP?
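A sketch of what I mean, using the existing Position type (the staging class and its method names are mine):

    import org.apache.kafka.streams.query.Position;

    // Position updates accumulate in memory and are only persisted together
    // with the checkpoint offsets when the store commits, so a rollback can
    // never leave the .position file ahead of the data.
    class PositionStaging {
        private Position uncommitted = Position.emptyPosition();

        void onWrite(final String topic, final int partition, final long offset) {
            uncommitted = uncommitted.withComponent(topic, partition, offset);
        }

        Position onCommit() {
            final Position toPersist = uncommitted;
            uncommitted = Position.emptyPosition();
            return toPersist; // written alongside the checkpoint offsets
        }
    }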
--

Regards,
Nick

On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick!

Thanks for the updates!

1.
Why does StateStore#flush() default to StateStore#commit(Collections.emptyMap())?
Since calls to flush() will not exist anymore after this KIP is released, I would rather throw an unsupported operation exception by default.

2.
When would a state store return -1 from StateStore#approximateNumUncommittedBytes() while being transactional?

Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if the state store is transactional but nothing has been written to the state store yet?

3.
Sorry for bringing this up again. Does this KIP really need to introduce StateStoreContext#isolationLevel()? StateStoreContext already has appConfigs(), which basically exposes the same information, i.e., whether EOS is enabled or not (see the sketch below).
In one of your previous e-mails you wrote:

"My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required."

While I understand the intent, I doubt that it decreases the coupling of a StateStore interface and the Streams engine. READ_COMMITTED only applies to IQ but not to reads by processors. Thus, implementers need to understand how Streams accesses the state stores.

I would like to hear what others think about this.
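For example, something like this already works today (a sketch, not from the KIP):

    import java.util.Map;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.StateStoreContext;

    class ProcessingModes {
        // Derive the processing mode from the store's own context.
        static boolean eosEnabled(final StateStoreContext context) {
            final Map<String, Object> configs = context.appConfigs();
            final String guarantee =
                (String) configs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
            return guarantee != null && guarantee.startsWith("exactly_once");
        }
    }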
4.
Great exposing new metrics for transactional state stores! However, I would prefer to add new metrics and deprecate (in the docs) the old ones. You can find examples of deprecated metrics here:
https://kafka.apache.org/documentation/#selector_monitoring

5.
Why does the KIP mention position files? I do not think they are related to transactions or flushes.

6.
I think we will also need to adapt/add integration tests besides unit tests. Additionally, we probably need integration or system tests to verify that upgrades and downgrades between transactional and non-transactional state stores work as expected.

Best,
Bruno

On 7/21/23 10:34 PM, Nick Telford wrote:

One more thing: I noted John's suggestion in the KIP, under "Rejected Alternatives". I still think it's an idea worth pursuing, but I believe that it's out of the scope of this KIP, because it solves a different set of problems to this KIP, and the scope of this one has already grown quite large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've updated the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores) with the latest changes; mostly bringing back "Atomic Checkpointing" (for what feels like the 10th time!). I think the one thing missing is some changes to metrics (notably the store "flush" metrics will need to be renamed to "commit").
The reason I brought back Atomic Checkpointing was to decouple store flush from store commit. This is important, because with Transactional StateStores, we now need to call "flush" on *every* Task commit, and not just when the StateStore is closing, otherwise our transaction buffer will never be written and persisted, instead growing unbounded! I experimented with some simple solutions, like forcing a store flush whenever the transaction buffer was likely to exceed its configured size, but this was brittle: it prevented the transaction buffer from being configured to be unbounded, and it still would have required explicit flushes of RocksDB, yielding sub-optimal performance and memory utilization.

I deemed Atomic Checkpointing to be the "right" way to resolve this problem. By ensuring that the changelog offsets that correspond to the most recently written records are always atomically written to the StateStore (by writing them to the same transaction buffer), we can avoid forcibly flushing the RocksDB memtables to disk, letting RocksDB flush them only when necessary, without losing any of our consistency guarantees. See the updated KIP for more info, and the sketch below.

I have fully implemented these changes, although I'm still not entirely happy with the implementation for segmented StateStores, so I plan to refactor that. Despite that, all tests pass.
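The core of the idea, sketched against the RocksDB API (the column family handles and all other names are illustrative):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    class AtomicCheckpointSketch {
        // The changelog offset rides in the same write batch as the records
        // it describes, so RocksDB persists both atomically and memtable
        // flushing can be left entirely to RocksDB.
        static void commitAtomically(final RocksDB db, final WriteOptions writeOptions,
                                     final ColumnFamilyHandle dataCf,
                                     final ColumnFamilyHandle offsetsCf,
                                     final byte[] key, final byte[] value,
                                     final byte[] partition, final byte[] offset)
                throws RocksDBException {
            try (final WriteBatch batch = new WriteBatch()) {
                batch.put(dataCf, key, value);
                batch.put(offsetsCf, partition, offset);
                db.write(writeOptions, batch); // both or neither become durable
            }
        }
    }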
If you'd like to try out or review this highly experimental and incomplete branch, it's available here: https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built against Kafka 3.5.0, so that I had a stable base to build and test it on, and to enable easy apples-to-apples comparisons in a live environment. I plan to rebase it against trunk once it's nearer completion and has been proven on our main application.

I would really appreciate help in reviewing and testing:
- Segmented (Versioned, Session and Window) stores
- Global stores

I do not currently use either of these, so my primary test environment doesn't cover these areas.

I'm going on Parental Leave starting next week for a few weeks, so will not have time to move this forward until late August. That said, your feedback is welcome and appreciated, I just won't be able to respond as quickly as usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno,

Yes, that's correct, although the impact on IQ is not something I had considered.

"What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals?"
I'm not quite sure I follow. Are you suggesting that we add an additional config for the max number of commit intervals between checkpoints? That way, we would checkpoint *either* when the transaction buffers are nearly full, *OR* whenever a certain number of commit intervals have elapsed, whichever comes first?

That certainly seems reasonable, although this re-ignites an earlier debate about whether a config should be measured in "number of commit intervals", instead of just an absolute time.

FWIW, I realised that this issue is the reason I was pursuing the Atomic Checkpoints, as it de-couples memtable flush from checkpointing, which enables us to just checkpoint on every commit without any performance impact. Atomic Checkpointing is definitely the "best" solution, but I'm not sure if this is enough to bring it back into this KIP.

I'm currently working on moving all the transactional logic directly into RocksDBStore itself, which does away with the StateStore#newTransaction method, and reduces the number of new classes introduced, significantly reducing the complexity. If it works, and the complexity is drastically reduced, I may try bringing back Atomic Checkpoints into this KIP.

Regards,
Nick
On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for the insights! Very interesting!

As far as I understand, you want to atomically update the state store from the transaction buffer, flush the memtable of a state store and write the checkpoint not after the commit time elapsed but after the transaction buffer reached a size that would lead to exceeding statestore.transaction.buffer.max.bytes before the next commit interval ends.
That means, the Kafka transaction would commit every commit interval, but the state store would only be atomically updated roughly every statestore.transaction.buffer.max.bytes of data. Also, IQ would then only see new data roughly every statestore.transaction.buffer.max.bytes.
After a failure the state store needs to restore up to statestore.transaction.buffer.max.bytes.

Is this correct?

What about atomically updating the state store from the transaction buffer every commit interval and writing the checkpoint (thus, flushing the memtable) every configured amount of data and/or number of commit intervals? In such a way, we would have the same delay for records appearing in output topics and IQ, because both would appear when the Kafka transaction is committed. However, after a failure the state store still needs to restore up to statestore.transaction.buffer.max.bytes, and it might restore data that is already in the state store, because the checkpoint lags behind the last stable offset (i.e. the last committed offset) of the changelog topics. Restoring data that is already in the state store is idempotent, so EOS should not be violated.
This solution needs at least one new config to specify when a checkpoint should be written.
A small correction to your previous e-mail that does not change anything you said: under ALOS the default commit interval is 30 seconds, not five seconds.

Best,
Bruno

On 01.07.23 12:37, Nick Telford wrote:

Hi everyone,

I've begun performance testing my branch on our staging environment, putting it through its paces in our non-trivial application. I'm already observing the same increased flush rate that we saw the last time we attempted to use a version of this KIP, but this time, I think I know why.

Pre-KIP-892, StreamTask#postCommit, which is called at the end of the Task commit process, has the following behaviour:

- Under ALOS: checkpoint the state stores. This includes flushing memtables in RocksDB. This is acceptable because the default commit.interval.ms is 5 seconds, so forcibly flushing memtables every 5 seconds is acceptable for most applications.
- Under EOS: checkpointing is not done, *unless* it's being forced, due to e.g. the Task closing or being revoked. This means that under normal processing conditions, the state stores will not be checkpointed, and will not have memtables flushed at all, unless RocksDB decides to flush them on its own. Checkpointing stores and force-flushing their memtables is only done when a Task is being closed.

Under EOS, KIP-892 needs to checkpoint stores on at least *some* normal Task commits, in order to write the RocksDB transaction buffers to the state stores, and to ensure the offsets are synced to disk to prevent restores from getting out of hand. Consequently, my current implementation calls maybeCheckpoint on *every* Task commit, which is far too frequent. This causes checkpoints every 10,000 records, which is a change in flush behaviour, potentially causing performance problems for some applications.

I'm looking into possible solutions, and I'm currently leaning towards using the statestore.transaction.buffer.max.bytes configuration to checkpoint Tasks once we are likely to exceed it.
This would complement the existing "early Task commit" functionality that this configuration provides, in the following way:

- Currently, we use statestore.transaction.buffer.max.bytes to force an early Task commit if processing more records would cause our state store transactions to exceed the memory assigned to them.
- New functionality: when a Task *does* commit, we will not checkpoint the stores (and hence flush the transaction buffers) unless we expect to cross the statestore.transaction.buffer.max.bytes threshold before the next commit (a rough sketch of this condition follows below).

I'm also open to suggestions.
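Roughly this (all names are mine):

    class CheckpointHeuristicSketch {
        // Checkpoint on commit only if the transaction buffer is likely to
        // cross statestore.transaction.buffer.max.bytes before the next
        // commit; otherwise defer and let the buffer keep accumulating.
        static boolean shouldCheckpoint(final long uncommittedBytes,
                                        final long estimatedBytesPerCommitInterval,
                                        final long maxBytes) {
            return uncommittedBytes + estimatedBytesPerCommitInterval >= maxBytes;
        }
    }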
On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno!

3.
By "less predictable for users", I meant in terms of understanding the performance profile under various circumstances. The more complex the solution, the more difficult it would be for users to understand the performance they see. For example, spilling records to disk when the transaction buffer reaches a threshold would, I expect, reduce write throughput. This reduction in write throughput could be unexpected, and potentially difficult to diagnose/understand for users.

At the moment, I think the "early commit" concept is relatively straightforward; it's easy to document, and conceptually fairly obvious to users. We could probably add a metric to make it easier to understand when it happens though.

3. (the second one)
The IsolationLevel is *essentially* an indirect way of telling StateStores whether they should be transactional. READ_COMMITTED essentially requires transactions, because it dictates that two threads calling `newTransaction()` should not see writes from the other transaction until they have been committed. With READ_UNCOMMITTED, all bets are off, and stores can allow threads to observe written records at any time, which is essentially "no transactions". That said, StateStores are free to implement these guarantees however they can, which is a bit more relaxed than dictating "you must use transactions". For example, with RocksDB we would implement these as READ_COMMITTED == WBWI-based "transactions", READ_UNCOMMITTED == direct writes to the database. But with other storage engines, it might be preferable to *always* use transactions, even when unnecessary; or there may be storage engines that don't provide transactions, but the isolation guarantees can be met using a different technique.
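A minimal sketch of how a RocksDB-backed store might branch on the isolation level, using the RocksDB Java API but otherwise hypothetical wiring (the actual stores are considerably more involved):

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    // Sketch only: READ_COMMITTED buffers writes in a WriteBatchWithIndex
    // until commit; READ_UNCOMMITTED writes straight to the database.
    class IsolationAwareStoreSketch {
        enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

        private final RocksDB db;
        private final IsolationLevel isolationLevel;
        private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);

        IsolationAwareStoreSketch(final RocksDB db, final IsolationLevel level) {
            this.db = db;
            this.isolationLevel = level;
        }

        void put(final byte[] key, final byte[] value) throws RocksDBException {
            if (isolationLevel == IsolationLevel.READ_COMMITTED) {
                batch.put(key, value); // invisible to other threads until commit
            } else {
                db.put(key, value);    // immediately visible: "no transactions"
            }
        }

        void commit() throws RocksDBException {
            if (isolationLevel == IsolationLevel.READ_COMMITTED) {
                try (WriteOptions opts = new WriteOptions()) {
                    db.write(opts, batch); // atomically apply buffered writes
                }
                batch.clear();
            }
        }
    }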
My idea was to try to keep the StateStore interface as loosely coupled from the Streams engine as possible, to give implementers more freedom, and reduce the amount of internal knowledge required.

That said, I understand that "IsolationLevel" might not be the right abstraction, and we can always make it much more explicit if required, e.g. boolean transactional()

7-8.
I can make these changes either later today or tomorrow.

Small update:
I've rebased my branch on trunk and fixed a bunch of issues that needed addressing. Currently, all the tests pass, which is promising, but it will need to undergo some performance testing. I haven't (yet) worked on removing the `newTransaction()` stuff, but I would expect that, behaviourally, it should make no difference. The branch is available at https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is interested in taking an early look.

Regards,
Nick
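For reference, the explicit alternative Nick mentions could be as small as this (a sketch, not the KIP's actual interface):

    // Sketch of the more explicit alternative: rather than inferring
    // transactionality from the IsolationLevel, the store declares it.
    public interface TransactionalStateStoreSketch {
        // True if the store buffers uncommitted writes and applies them
        // atomically on commit. Streams could use this, e.g., to decide
        // whether a checkpoint file may be written under EOS.
        boolean transactional();
    }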
On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.

2.
I agree with John here that it actually is public API. My question is how this usage pattern affects normal processing.

3.
My concern is that checking for the size of the transaction buffer and maybe triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean with "less predictable for users", though.

I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!

Some comments on your reply to my e-mail on June 20th:

3.
Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not?

With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS, if you do not have a way to know whether the state store is transactional. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.

7.
My point was about not only considering the bytes in memory in the config statestore.uncommitted.max.bytes, but also bytes that might be spilled on disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make too much sense anymore. I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure. For instance, how does a transactional state store recover after a failure, or what happens with the transaction buffer, etc. (that is what I meant by "fail-over" in point 9.)

Best,
Bruno
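Bruno's checkpoint-file point could reduce to a check along these lines (hypothetical names; the real decision would live in the Streams task lifecycle):

    // Hypothetical sketch of the checkpoint decision described above. Under
    // EOS, a checkpoint for a non-transactional store could point at state
    // containing uncommitted writes, so it may only be written on close.
    final class CheckpointDecisionSketch {
        private CheckpointDecisionSketch() {}

        static boolean mayWriteCheckpoint(final boolean eosEnabled,
                                          final boolean storeIsTransactional,
                                          final boolean taskClosing) {
            if (!eosEnabled) {
                return true; // ALOS: checkpointing during normal processing is safe
            }
            // EOS: transactional stores may checkpoint during normal
            // processing; non-transactional stores only when the Task closes.
            return storeIsTransactional || taskClosing;
        }
    }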
While it's good to discuss > > >> ideas > > >>>> for > > >>>>>>>> future > > >>>>>>>>>>>>>>>>>>>>>>>> improvements, I > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> think > > >>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's important to narrow the scope > > >> down > > >>>> to a > > >>>>>>>> design > > >>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>> achieves > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> most > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pressing objectives (constant sized > > >>>>>> restorations > > >>>>>>>>>>>>>>>> during > > >>>>>>>>>>>>>>>>>> dirty > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> close/unexpected errors). Any design > > >> that > > >>>>>> this KIP > > >>>>>>>>>>>>>>>> produces > > >>>>>>>>>>>>>>>>>>>> can > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ultimately > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be changed in the future, especially > > >> if > > >>>> the > > >>>>>> bulk > > >>>>>>>> of > > >>>>>>>>>>>>>>>> it is > > >>>>>>>>>>>>>>>>>>>>>> internal > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviour. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm going to spend some time next week > > >>>> trying > > >>>>>> to > > >>>>>>>>>>>>>>>> re-work > > >>>>>>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> original > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex design to remove > > >> the > > >>>>>>>>>>>>>>>> newTransaction() > > >>>>>>>>>>>>>>>>>>>>>> method, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> such > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's just an implementation detail of > > >>>>>>>> RocksDBStore. > > >>>>>>>>>>>>>>>> That > > >>>>>>>>>>>>>>>>>>>> way, if > > >>>>>>>>>>>>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> want to > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replace WBWI with something in the > > >> future, > > >>>>>> like > > >>>>>>>> the > > >>>>>>>>>>>>>>>> SST > > >>>>>>>>>>>>>>>>>> file > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> management > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> outlined by John, then we can do so > > >> with > > >>>>>> little/no > > >>>>>>>>>>>> API > > >>>>>>>>>>>>>>>>>>>> changes. > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>> > > >>>>>> > > >>>> > > >> > > > >