Guozhang—I agree, I am in favor of moving forward with the KIP now that the
Transactional State Stores will be behind a feature flag.

Nick—I just did a bit more light testing of your branch `KIP-892-3.5.0`
with your most recent changes. I couldn't detect a performance difference
versus trunk (in the past there was a slight degradation of performance on
the restoration path, but that has been fixed). I don't believe that your
branch has the state updater thread enabled, so I didn't test that path too
heavily.

As expected, however, our internal correctness tests failed due to the IQ
read-your-own-writes issue we discussed previously. The community as a
whole would vastly benefit from this KIP getting over the finish line in
3.7.0, and so long as it is behind a feature flag so that we at LittleHorse
can still guarantee RYOW for our users, I think it's purely a win for the
community. Until we can figure out how to get read_committed, we will just
be smart with standbys + rebalances etc. (:

Thanks Nick! This improvement is long overdue for the streams community.

Colt McNealy

*Founder, LittleHorse.dev*


On Sun, Oct 29, 2023 at 11:30 AM Guozhang Wang <guozhang.wang...@gmail.com>
wrote:

> I'd agree with you guys that as long as we are in agreement about the
> configuration semantics, that would be a big win to move forward for
> this KIP. As for the TaskCorruptedException handling like wiping state
> stores, we can discuss that in the PR rather than in the KIP.
>
> Just to clarify, I'm onboard with the latest proposal, and probably we
> can move on for voting on this KIP now?
>
> Guozhang
>
> On Thu, Oct 19, 2023 at 5:33 AM Bruno Cadonna <cado...@apache.org> wrote:
> >
> > Hi Nick,
> >
> > What you and Lucas wrote about the different configurations of ALOS/EOS
> > and READ_COMMITTED/READ_UNCOMMITTED makes sense to me. My earlier
> > concerns about changelogs diverging from the content of the local state
> > stores turned out to not apply. So I think, we can move on with those
> > configurations.
> >
> > Regarding the TaskCorruptedException and wiping out the state stores
> > under EOS, couldn't we abort the transaction on the state store and
> > close the task dirty? If the Kafka transaction was indeed committed, the
> > store would restore the missing part from the changelog topic. If the
> > Kafka transaction was not committed, changelog topic and state store are
> > in-sync.
> >
> > In any case, IMO those are implementation details that we do not need to
> > discuss and solve in the KIP discussion. We can solve them on the PR.
> > The important thing is that the processing guarantees hold.
> >
> > Best,
> > Bruno
> >
> > On 10/18/23 3:56 PM, Nick Telford wrote:
> > > Hi Lucas,
> > >
> > > TaskCorruptedException is how Streams signals that the Task state needs
> > > to be wiped, so we can't retain that exception without also wiping state
> > > on timeouts.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy <lbruts...@confluent.io.invalid>
> > > wrote:
> > >
> > >> Hi Nick,
> > >>
> > >> I think indeed the better behavior would be to retry commitTransaction
> > >> until we risk running out of time to meet `max.poll.interval.ms`.
> > >>
> > >> However, if it's handled as a `TaskCorruptedException` at the moment,
> > >> I would do the same in this KIP, and leave exception handling
> > >> improvements to future work. This KIP is already improving the
> > >> situation a lot by not wiping the state store.
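
For illustration, the retry-until-deadline idea above could look roughly like the sketch below. It is not Streams code: `Committer`, `CommitTimeout`, and `commitWithDeadline` are hypothetical stand-ins for the Producer#commitTransaction call and its timeout, and the deadline is assumed to be derived from `max.poll.interval.ms` minus some safety margin.

```java
import java.util.function.LongSupplier;

final class BoundedCommitRetry {
    /** Hypothetical stand-in for a commit timeout; not Kafka's TimeoutException. */
    static final class CommitTimeout extends RuntimeException {
        CommitTimeout(String message) { super(message); }
    }

    /** Hypothetical stand-in for the Producer#commitTransaction call. */
    interface Committer {
        void commit();
    }

    /**
     * Retries commit() after each timeout until it succeeds or the deadline
     * passes. Returns the number of attempts made; rethrows the last timeout
     * once the deadline has been reached.
     */
    static int commitWithDeadline(Committer committer, long deadlineMs, LongSupplier clockMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                committer.commit();
                return attempts;
            } catch (CommitTimeout e) {
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw e; // out of time: let the caller decide how to fail
                }
            }
        }
    }
}
```

What happens after the final rethrow (TaskCorruptedException, shutdown, or something else) is exactly the open question in this part of the thread, so the sketch leaves it to the caller.
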
> > >>
> > >> Cheers,
> > >> Lucas
> > >>
> > >> On Tue, Oct 17, 2023 at 3:51 PM Nick Telford <nick.telf...@gmail.com>
> > >> wrote:
> > >>>
> > >>> Hi Lucas,
> > >>>
> > >>> Yeah, this is pretty much the direction I'm thinking of going in now.
> > >>> You make an interesting point about committing on-error under
> > >>> ALOS/READ_COMMITTED, although I haven't had a chance to think through
> > >>> the implications yet.
> > >>>
> > >>> Something that I ran into earlier this week is an issue with the new
> > >>> handling of TimeoutException. Without TX stores, TimeoutException under
> > >>> EOS throws a TaskCorruptedException, which wipes the stores. However,
> > >>> with TX stores, TimeoutException is now just bubbled up and dealt with
> > >>> as it is under ALOS. The problem arises when the
> > >>> Producer#commitTransaction call times out: Streams attempts to ignore
> > >>> the error and continue producing, which causes the next call to
> > >>> Producer#send to throw "IllegalStateException: Cannot attempt operation
> > >>> `send` because the previous call to `commitTransaction` timed out and
> > >>> must be retried".
> > >>>
> > >>> I'm not sure what we should do here: retrying the commitTransaction
> > >>> seems logical, but what if it times out again? Where do we draw the
> > >>> line and shut down the instance?
> > >>>
> > >>> Regards,
> > >>> Nick
> > >>>
> > >>> On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy <lbruts...@confluent.io.invalid>
> > >>> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED
> > >>>> while still wiping the state on error, and I'd vote for this solution
> > >>>> when introducing `default.state.isolation.level`. This way, we'd have
> > >>>> the most low-risk roll-out of this feature (no behavior change without
> > >>>> reconfiguration), with the possibility of switching to the most sane /
> > >>>> battle-tested default settings in 4.0. Essentially, we'd have a
> > >>>> feature flag but call it `default.state.isolation.level` and not
> > >>>> have to deprecate it later.
> > >>>>
> > >>>> So the possible configurations would then be this:
> > >>>>
> > >>>> 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> > >>>>    reads from DB.
> > >>>> 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> > >>>>    WriteBatch/DB. Flush on error (see note below).
> > >>>> 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> > >>>>    reads from DB. Wipe state on error.
> > >>>> 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> > >>>>    WriteBatch/DB.
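
As a reading aid, the four combinations listed above can be written down as a small lookup. This is only a sketch of the list as stated in this message: `IsolationMatrix`, `Behavior`, and `behaviorFor` are hypothetical names, not Kafka Streams types, and the on-error entry is marked "not stated" where the list does not say anything.

```java
final class IsolationMatrix {
    enum ProcessingMode { ALOS, EOS }
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    /** Hypothetical summary of one row of the list; not a Kafka Streams type. */
    static final class Behavior {
        final String processingWrites;
        final String iqReads;
        final String onError;

        Behavior(String processingWrites, String iqReads, String onError) {
            this.processingWrites = processingWrites;
            this.iqReads = iqReads;
            this.onError = onError;
        }
    }

    static Behavior behaviorFor(ProcessingMode mode, IsolationLevel level) {
        // READ_COMMITTED buffers writes in a WriteBatch; READ_UNCOMMITTED writes straight to the DB.
        boolean buffered = level == IsolationLevel.READ_COMMITTED;
        String writes = buffered ? "WriteBatch" : "direct-to-DB";
        String reads = buffered ? "WriteBatch/DB" : "DB";

        String onError;
        if (mode == ProcessingMode.ALOS && buffered) {
            onError = "flush";            // combination 2
        } else if (mode == ProcessingMode.EOS && !buffered) {
            onError = "wipe state";       // combination 3
        } else {
            onError = "not stated";       // combinations 1 and 4
        }
        return new Behavior(writes, reads, onError);
    }
}
```

The lookup makes the "smell" mentioned below visible: the isolation level alone decides the write path, so it is not purely an IQ setting.
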
> > >>>>
> > >>>> I believe the feature is important enough that we will see good
> > >>>> adoption even without changing the default. In 4.0, once we have seen
> > >>>> it adopted and battle-tested, we can make READ_COMMITTED the default
> > >>>> for EOS, or even make READ_COMMITTED always the default, depending
> > >>>> on our experiences. And we could add a clever implementation of
> > >>>> READ_UNCOMMITTED with WriteBatches later.
> > >>>>
> > >>>> The only smell here is that `default.state.isolation.level` wouldn't
> > >>>> be purely an IQ setting, but it would also (slightly) change the
> > >>>> behavior of the processing, but that seems unavoidable as long as we
> > >>>> haven't solved READ_UNCOMMITTED IQ with WriteBatches.
> > >>>>
> > >>>> Minor: As for Bruno's point 4, I think if we are concerned about this
> > >>>> behavior (we don't necessarily have to be, because it doesn't violate
> > >>>> ALOS guarantees as far as I can see), we could make
> > >>>> ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing
> > >>>> the WriteBatch on error (obviously, only if we have a chance to do
> > >>>> that).
> > >>>>
> > >>>> Cheers,
> > >>>> Lucas
> > >>>>
> > >>>> On Mon, Oct 16, 2023 at 12:19 PM Nick Telford <nick.telf...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>> Hi Guozhang,
> > >>>>>
> > >>>>> The KIP as it stands introduces a new configuration,
> > >>>>> default.state.isolation.level, which is independent of
> > >>>>> processing.mode. It's intended that this new configuration be used to
> > >>>>> configure a global IQ isolation level in the short term, with a
> > >>>>> future KIP introducing the capability to change the isolation level
> > >>>>> on a per-query basis, falling back to the "default" defined by this
> > >>>>> config. That's why I called it "default", for future-proofing.
> > >>>>>
> > >>>>> However, it currently includes the caveat that READ_UNCOMMITTED is
> > >>>>> not available under EOS. I think this is the coupling you are
> > >>>>> alluding to?
> > >>>>>
> > >>>>> This isn't intended to be a restriction of the API, but is currently
> > >>>>> a technical limitation. However, after discussing with some users
> > >>>>> about use-cases that would require READ_UNCOMMITTED under EOS, I'm
> > >>>>> inclined to remove that clause and put in the necessary work to make
> > >>>>> that combination possible now.
> > >>>>>
> > >>>>> I currently see two possible approaches:
> > >>>>>
> > >>>>>     1. Disable TX StateStores internally when the IsolationLevel is
> > >>>>>     READ_UNCOMMITTED and the processing.mode is EOS. This is more
> > >>>>>     difficult than it sounds, as there are many assumptions being
> > >>>>>     made throughout the internals about the guarantees StateStores
> > >>>>>     provide. It would definitely add a lot of extra
> > >>>>>     "if (read_uncommitted && eos)" branches, complicating
> > >>>>>     maintenance and testing.
> > >>>>>     2. Invest the time *now* to make READ_UNCOMMITTED of EOS
> > >>>>>     StateStores possible. I have some ideas on how this could be
> > >>>>>     achieved, but they would need testing and could introduce some
> > >>>>>     additional issues. The benefit of this approach is that it would
> > >>>>>     make query-time IsolationLevels much simpler to implement in the
> > >>>>>     future.
> > >>>>>
> > >>>>> Unfortunately, both will require considerable work that will further
> > >>>>> delay this KIP, which was the reason I placed the restriction in the
> > >>>>> KIP in the first place.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Nick
> > >>>>>
> > >>>>> On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <guozhang.wang...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hello Nick,
> > >>>>>>
> > >>>>>> First of all, thanks a lot for the great effort you've put into
> > >>>>>> driving this KIP! I'm really glad to see it finally coming through,
> > >>>>>> as many people in the community have raised this. At the same time
> > >>>>>> I honestly feel a bit ashamed for not putting enough of my time into
> > >>>>>> supporting it and pushing it over the finish line (you raised this
> > >>>>>> KIP almost a year ago).
> > >>>>>>
> > >>>>>> I have only briefly read through the DISCUSS thread so far, and I'm
> > >>>>>> not sure I've digested 100 percent of the bullet points. But with
> > >>>>>> the goal of helping take it over the finish line, I'd like to share
> > >>>>>> some thoughts off the top of my head, only on point #4 above, which
> > >>>>>> I feel may be the main hurdle to reaching a consensus on the
> > >>>>>> current KIP.
> > >>>>>>
> > >>>>>> The general question I asked myself is whether we want to couple "IQ
> > >>>>>> reading mode" with "processing mode". Technically I tend to agree
> > >>>>>> with you that it feels like a bug if a single user chooses "EOS"
> > >>>>>> for processing mode while choosing "read uncommitted" for IQ
> > >>>>>> reading mode. At the same time, I'm wondering whether there could
> > >>>>>> be two different people (or even two teams), one using the Streams
> > >>>>>> API to build the app and the other using the IQ API to query the
> > >>>>>> running state of the app. I know this is less of a technical
> > >>>>>> question and more of a design one, but if that could ever be the
> > >>>>>> case, the person using the IQ API may know the risks of read
> > >>>>>> uncommitted and still choose it for the sake of performance,
> > >>>>>> regardless of whether the underlying stream processing mode
> > >>>>>> configured by the other person is EOS or not. In that regard, I'm
> > >>>>>> leaning towards a "leave the door open, and close it later if we
> > >>>>>> find it's a bad idea" approach, with a configuration that we can
> > >>>>>> potentially deprecate, rather than "shut the door, clean for
> > >>>>>> everyone". More specifically, I'd allow the processing mode and the
> > >>>>>> IQ read mode to be decoupled, and if we find that there are no such
> > >>>>>> cases as I speculated above, or people start complaining a lot, we
> > >>>>>> can still enforce coupling them.
> > >>>>>>
> > >>>>>> Again, just my 2c here. Thanks again for the great patience and
> > >>>>>> diligence on this KIP.
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <nick.telf...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi Bruno,
> > >>>>>>>
> > >>>>>>> 4.
> > >>>>>>> I'll hold off on making that change until we have a consensus as to
> > >>>>>>> what configuration to use to control all of this, as it'll be
> > >>>>>>> affected by the decision on EOS isolation levels.
> > >>>>>>>
> > >>>>>>> 5.
> > >>>>>>> Done. I've chosen "committedOffsets".
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Nick
> > >>>>>>>
> > >>>>>>> On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Nick,
> > >>>>>>>>
> > >>>>>>>> 1.
> > >>>>>>>> Yeah, you are probably right that it does not make too much sense.
> > >>>>>>>> Thanks for the clarification!
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 4.
> > >>>>>>>> Yes, sorry for the back and forth, but I think for the sake of the
> > >>>>>>>> KIP it is better to leave the ALOS behavior as it is for now due to
> > >>>>>>>> the possible issues you would run into. Maybe we can find a
> > >>>>>>>> solution in the future. Now the question returns to whether we
> > >>>>>>>> really need default.state.isolation.level. Maybe the config could
> > >>>>>>>> be the feature flag Sophie requested.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 5.
> > >>>>>>>> There is a guideline in Kafka not to use the get prefix for getters
> > >>>>>>>> (at least in the public API). Thus, could you please rename
> > >>>>>>>>
> > >>>>>>>> getCommittedOffset(TopicPartition partition) ->
> > >>>>>>>> committedOffsetFor(TopicPartition partition)
> > >>>>>>>>
> > >>>>>>>> You can also propose an alternative to committedOffsetFor().
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Bruno
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 10/13/23 3:21 PM, Nick Telford wrote:
> > >>>>>>>>> Hi Bruno,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for getting back to me.
> > >>>>>>>>>
> > >>>>>>>>> 1.
> > >>>>>>>>> I think this should be possible. Are you thinking of the
> > >>>> situation
> > >>>>>> where
> > >>>>>>>> a
> > >>>>>>>>> user may downgrade to a previous version of Kafka Streams? In
> > >>>> that
> > >>>>>> case,
> > >>>>>>>>> sadly, the RocksDBStore would get wiped by the older version
> > >> of
> > >>>> Kafka
> > >>>>>>>>> Streams anyway, because that version wouldn't understand the
> > >>>> extra
> > >>>>>> column
> > >>>>>>>>> family (that holds offsets), so the missing Position file
> > >> would
> > >>>>>>>>> automatically get rebuilt when the store is rebuilt from the
> > >>>>>> changelog.
> > >>>>>>>>> Are there other situations than downgrade where a
> > >> transactional
> > >>>> store
> > >>>>>>>> could
> > >>>>>>>>> be replaced by a non-transactional one? I can't think of any.
> > >>>>>>>>>
> > >>>>>>>>> 2.
> > >>>>>>>>> Ahh yes, the Test Plan - my Kryptonite! This section
> > >> definitely
> > >>>>>> needs to
> > >>>>>>>> be
> > >>>>>>>>> fleshed out. I'll work on that. How much detail do you need?
> > >>>>>>>>>
> > >>>>>>>>> 3.
> > >>>>>>>>> See my previous email discussing this.
> > >>>>>>>>>
> > >>>>>>>>> 4.
> > >>>>>>>>> Hmm, this is an interesting point. Are you suggesting that
> > >> under
> > >>>> ALOS
> > >>>>>>>>> READ_COMMITTED should not be supported?
> > >>>>>>>>>
> > >>>>>>>>> Regards,
> > >>>>>>>>> Nick
> > >>>>>>>>>
> > >>>>>>>>> On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <
> > >> cado...@apache.org>
> > >>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Nick,
> > >>>>>>>>>>
> > >>>>>>>>>> I think the KIP is converging!
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 1.
> > >>>>>>>>>> I am wondering whether it makes sense to write the position
> > >> file
> > >>>>>> during
> > >>>>>>>>>> close as we do for the checkpoint file, so that in case the
> > >>>> state
> > >>>>>> store
> > >>>>>>>>>> is replaced with a non-transactional state store the
> > >>>>>> non-transactional
> > >>>>>>>>>> state store finds the position file. I think, this is not
> > >>>> strictly
> > >>>>>>>>>> needed, but would be a nice behavior instead of just
> > >> deleting
> > >>>> the
> > >>>>>>>>>> position file.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 2.
> > >>>>>>>>>> The test plan does not mention integration tests. Do you not need
> > >>>>>>>>>> to extend existing ones and add new ones? Also, for upgrading and
> > >>>>>>>>>> downgrading you might need integration and/or system tests.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 3.
> > >>>>>>>>>> I think Sophie made a point. Although, IQ reading from
> > >>>> uncommitted
> > >>>>>> data
> > >>>>>>>>>> under EOS might be considered a bug by some people. Thus,
> > >> your
> > >>>> KIP
> > >>>>>> would
> > >>>>>>>>>> fix a bug rather than changing the intended behavior.
> > >> However, I
> > >>>>>> also
> > >>>>>>>>>> see that a feature flag would help users that rely on this
> > >> buggy
> > >>>>>>>>>> behavior (at least until AK 4.0).
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 4.
> > >>>>>>>>>> This is related to the previous point. I assume that the difference
> > >>>>>>>>>> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the
> > >>>>>>>>>> former you enable transactions on the state store and in the latter
> > >>>>>>>>>> you disable them. If my assumption is correct, I think that is an
> > >>>>>>>>>> issue. Let's assume under ALOS Streams fails over a couple of times
> > >>>>>>>>>> more or less at the same step in processing, after value 3 is added
> > >>>>>>>>>> to an aggregation but before the offset of the corresponding input
> > >>>>>>>>>> record is committed. With transactions disabled, the aggregation
> > >>>>>>>>>> value would increase by 3 for each failover. With transactions
> > >>>>>>>>>> enabled, value 3 would only be added to the aggregation once, when
> > >>>>>>>>>> the offset of the input record is committed and the transaction
> > >>>>>>>>>> finally completes. So the content of the state store would change
> > >>>>>>>>>> depending on the configuration for IQ. IMO, the content of the state
> > >>>>>>>>>> store should be independent from IQ. Given this issue, I propose to
> > >>>>>>>>>> not use transactions with ALOS at all. I was a big proponent of
> > >>>>>>>>>> using transactions with ALOS, but I realized that transactions with
> > >>>>>>>>>> ALOS is not as easy as enabling transactions on state stores.
> > >>>>>>>>>> Another aspect that is problematic is that the changelog topic,
> > >>>>>>>>>> which actually replicates the state store, is not transactional
> > >>>>>>>>>> under ALOS. Thus, it might happen that the state store and the
> > >>>>>>>>>> changelog differ in their content. All of this is maybe solvable
> > >>>>>>>>>> somehow, but for the sake of this KIP, I would leave it for the
> > >>>>>>>>>> future.
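
The failover arithmetic in point 4 can be worked through with a toy model. This is a sketch under the assumptions stated in that paragraph only (the store survives each crash, the input record is reprocessed on every attempt, and a transactional write is applied only on commit); `AlosFailoverDemo` and `aggregateAfter` are hypothetical names.

```java
final class AlosFailoverDemo {
    /**
     * Returns the aggregate after `failures` crashes at the same step,
     * followed by one successful attempt that commits.
     */
    static int aggregateAfter(int failures, boolean transactional) {
        int store = 0; // durable store content, assumed to survive each crash
        for (int attempt = 0; attempt <= failures; attempt++) {
            int value = 3; // the same input record is reprocessed on every attempt
            boolean crashesBeforeCommit = attempt < failures;
            if (transactional) {
                // The write is buffered and only reaches the store on commit.
                if (!crashesBeforeCommit) {
                    store += value;
                }
            } else {
                // The write hits the store immediately and survives the crash.
                store += value;
            }
        }
        return store;
    }
}
```

With two failovers, `aggregateAfter(2, false)` gives 9 while `aggregateAfter(2, true)` gives 3, which is the divergence in store content that the paragraph objects to.
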
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Bruno
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
> > >>>>>>>>>>> Hey Nick! First of all thanks for taking up this awesome
> > >>>> feature,
> > >>>>>> I'm
> > >>>>>>>>>> sure
> > >>>>>>>>>>> every single
> > >>>>>>>>>>> Kafka Streams user and dev would agree that it is sorely
> > >>>> needed.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I've just been catching up on the KIP and surrounding
> > >>>> discussion,
> > >>>>>> so
> > >>>>>>>>>> please
> > >>>>>>>>>>> forgive me
> > >>>>>>>>>>> for any misunderstandings or misinterpretations of the
> > >> current
> > >>>>>> plan and
> > >>>>>>>>>>> don't hesitate to
> > >>>>>>>>>>> correct me.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Before I jump in, I just want to say that having seen this
> > >>>> drag on
> > >>>>>> for
> > >>>>>>>> so
> > >>>>>>>>>>> long, my singular
> > >>>>>>>>>>> goal in responding is to help this KIP past a perceived
> > >>>> impasse so
> > >>>>>> we
> > >>>>>>>> can
> > >>>>>>>>>>> finally move on
> > >>>>>>>>>>> to voting and implementing it. Long discussions are to be
> > >>>> expected
> > >>>>>> for
> > >>>>>>>>>>> major features like
> > >>>>>>>>>>> this but it's completely on us as the Streams devs to make
> > >> sure
> > >>>>>> there
> > >>>>>>>> is
> > >>>>>>>>>> an
> > >>>>>>>>>>> end in sight
> > >>>>>>>>>>> for any ongoing discussion.
> > >>>>>>>>>>>
> > >>>>>>>>>>> With that said, it's my understanding that the KIP as
> > >> currently
> > >>>>>>>> proposed
> > >>>>>>>>>> is
> > >>>>>>>>>>> just not tenable
> > >>>>>>>>>>> for Kafka Streams, and would prevent some EOS users from
> > >>>> upgrading
> > >>>>>> to
> > >>>>>>>> the
> > >>>>>>>>>>> version it
> > >>>>>>>>>>> first appears in. Given that we can't predict or guarantee
> > >>>> whether
> > >>>>>> any
> > >>>>>>>> of
> > >>>>>>>>>>> the followup KIPs
> > >>>>>>>>>>> would be completed in the same release cycle as this one,
> > >> we
> > >>>> need
> > >>>>>> to
> > >>>>>>>> make
> > >>>>>>>>>>> sure that the
> > >>>>>>>>>>> feature is either compatible with all current users or else
> > >>>>>>>>>> feature-flagged
> > >>>>>>>>>>> so that they may
> > >>>>>>>>>>> opt in/out.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Therefore, IIUC we need to have either (or both) of these
> > >> as
> > >>>>>>>>>>> fully-implemented config options:
> > >>>>>>>>>>> 1. default.state.isolation.level
> > >>>>>>>>>>> 2. enable.transactional.state.stores
> > >>>>>>>>>>>
> > >>>>>>>>>>> This way EOS users for whom read_committed semantics are
> > >> not
> > >>>>>> viable can
> > >>>>>>>>>>> still upgrade,
> > >>>>>>>>>>> and either use the isolation.level config to leverage the
> > >> new
> > >>>> txn
> > >>>>>> state
> > >>>>>>>>>>> stores without sacrificing
> > >>>>>>>>>>> their application semantics, or else simply keep the
> > >>>> transactional
> > >>>>>>>> state
> > >>>>>>>>>>> stores disabled until we
> > >>>>>>>>>>> are able to fully implement the isolation level
> > >> configuration
> > >>>> at
> > >>>>>> either
> > >>>>>>>>>> an
> > >>>>>>>>>>> application or query level.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Frankly you are the expert here and know much more about
> > >> the
> > >>>>>> tradeoffs
> > >>>>>>>> in
> > >>>>>>>>>>> both semantics and
> > >>>>>>>>>>> effort level of implementing one of these configs vs the
> > >>>> other. In
> > >>>>>> my
> > >>>>>>>>>>> opinion, either option would
> > >>>>>>>>>>> be fine and I would leave the decision of which one to
> > >> include
> > >>>> in
> > >>>>>> this
> > >>>>>>>>>> KIP
> > >>>>>>>>>>> completely up to you.
> > >>>>>>>>>>> I just don't see a way for the KIP to proceed without some
> > >>>>>> variation of
> > >>>>>>>>>> the
> > >>>>>>>>>>> above that would allow
> > >>>>>>>>>>> EOS users to opt-out of read_committed.
> > >>>>>>>>>>>
> > >>>>>>>>>>> (If it's all the same to you, I would recommend always
> > >>>> including a
> > >>>>>>>>>> feature
> > >>>>>>>>>>> flag in large structural
> > >>>>>>>>>>> changes like this. No matter how much I trust someone or
> > >>>> myself to
> > >>>>>>>>>>> implement a feature, you just
> > >>>>>>>>>>> never know what kind of bugs might slip in, especially
> > >> with the
> > >>>>>> very
> > >>>>>>>>>> first
> > >>>>>>>>>>> iteration that gets released.
> > >>>>>>>>>>> So personally, my choice would be to add the feature flag
> > >> and
> > >>>>>> leave it
> > >>>>>>>>>> off
> > >>>>>>>>>>> by default. If all goes well
> > >>>>>>>>>>> you can do a quick KIP to enable it by default as soon as
> > >> the
> > >>>>>>>>>>> isolation.level config has been
> > >>>>>>>>>>> completed. But feel free to just pick whichever option is
> > >>>> easiest
> > >>>>>> or
> > >>>>>>>>>>> quickest for you to implement)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hope this helps move the discussion forward,
> > >>>>>>>>>>> Sophie
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <
> > >>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Agreed, I can live with that for now.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In an effort to keep the scope of this KIP from
> > >> expanding, I'm
> > >>>>>> leaning
> > >>>>>>>>>>>> towards just providing a configurable
> > >>>>>> default.state.isolation.level
> > >>>>>>>> and
> > >>>>>>>>>>>> removing IsolationLevel from the StateStoreContext. This
> > >>>> would be
> > >>>>>>>>>>>> compatible with adding support for query-time
> > >> IsolationLevels
> > >>>> in
> > >>>>>> the
> > >>>>>>>>>>>> future, whilst providing a way for users to select an
> > >>>> isolation
> > >>>>>> level
> > >>>>>>>>>> now.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The big problem with this, however, is that if a user
> > >> selects
> > >>>>>>>>>>>> processing.mode
> > >>>>>>>>>>>> = "exactly-once(-v2|-beta)", and
> > >>>> default.state.isolation.level =
> > >>>>>>>>>>>> "READ_UNCOMMITTED", we need to guarantee that the data
> > >> isn't
> > >>>>>> written
> > >>>>>>>> to
> > >>>>>>>>>>>> disk until commit() is called, but we also need to permit
> > >> IQ
> > >>>>>> threads
> > >>>>>>>> to
> > >>>>>>>>>>>> read from the ongoing transaction.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> A simple solution would be to (temporarily) forbid this
> > >>>>>> combination of
> > >>>>>>>>>>>> configuration, and have default.state.isolation.level
> > >>>>>> automatically
> > >>>>>>>>>> switch
> > >>>>>>>>>>>> to READ_COMMITTED when processing.mode is anything other
> > >> than
> > >>>>>>>>>>>> at-least-once. Do you think this would be acceptable?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In a later KIP, we can add support for query-time
> > >> isolation
> > >>>>>> levels and
> > >>>>>>>>>>>> solve this particular problem there, which would relax
> > >> this
> > >>>>>>>> restriction.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Regards,
> > >>>>>>>>>>>> Nick
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <
> > >>>> cado...@apache.org>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I
> > >>>>>>>>>>>>> think it is perfectly valid to say InMemoryKeyValueStore does not
> > >>>>>>>>>>>>> support READ_COMMITTED for now, since READ_UNCOMMITTED is the
> > >>>>>>>>>>>>> de-facto default at the moment.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 9/18/23 7:12 PM, Nick Telford wrote:
> > >>>>>>>>>>>>>> Oh! One other concern I haven't mentioned: if we make
> > >>>>>>>> IsolationLevel a
> > >>>>>>>>>>>>>> query-time constraint, then we need to add support for
> > >>>>>>>> READ_COMMITTED
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>> InMemoryKeyValueStore too, which will require some
> > >> changes
> > >>>> to
> > >>>>>> the
> > >>>>>>>>>>>>>> implementation.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford <
> > >>>>>> nick.telf...@gmail.com
> > >>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I agree that having IsolationLevel be determined at
> > >>>> query-time
> > >>>>>> is
> > >>>>>>>> the
> > >>>>>>>>>>>>>>> ideal design, but there are a few sticking points:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>> There needs to be some way to communicate the
> > >>>> IsolationLevel
> > >>>>>> down
> > >>>>>>>> to
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> RocksDBStore itself, so that the query can respect it.
> > >>>> Since
> > >>>>>> stores
> > >>>>>>>>>>>> are
> > >>>>>>>>>>>>>>> "layered" in functionality (i.e. ChangeLoggingStore,
> > >>>>>> MeteredStore,
> > >>>>>>>>>>>>> etc.),
> > >>>>>>>>>>>>>>> we need some way to deliver that information to the
> > >> bottom
> > >>>>>> layer.
> > >>>>>>>> For
> > >>>>>>>>>>>>> IQv2,
> > >>>>>>>>>>>>>>> we can use the existing State#query() method, but IQv1
> > >> has
> > >>>> no
> > >>>>>> way
> > >>>>>>>> to
> > >>>>>>>>>>>> do
> > >>>>>>>>>>>>>>> this.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> A simple approach, which would potentially open up
> > >> other
> > >>>>>> options,
> > >>>>>>>>>>>> would
> > >>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>> to add something like: ReadOnlyKeyValueStore<K, V>
> > >>>>>>>>>>>>>>> readOnlyView(IsolationLevel isolationLevel) to
> > >>>>>>>> ReadOnlyKeyValueStore
> > >>>>>>>>>>>>> (and
> > >>>>>>>>>>>>>>> similar to ReadOnlyWindowStore, ReadOnlySessionStore,
> > >>>> etc.).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>> As mentioned above, RocksDB WriteBatches are not
> > >>>> thread-safe,
> > >>>>>> which
> > >>>>>>>>>>>>> causes
> > >>>>>>>>>>>>>>> a problem if we want to provide READ_UNCOMMITTED
> > >>>> Iterators. I
> > >>>>>> also
> > >>>>>>>>>>>> had a
> > >>>>>>>>>>>>>>> look at RocksDB Transactions[1], but they solve a very
> > >>>>>> different
> > >>>>>>>>>>>>> problem,
> > >>>>>>>>>>>>>>> and have the same thread-safety issue.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> One possible approach that I mentioned is chaining WriteBatches:
> > >>>>>>>>>>>>>>> every time a new Interactive Query is received (i.e.
> > >>>>>>>>>>>>>>> readOnlyView, see above, is called) we "freeze" the existing
> > >>>>>>>>>>>>>>> WriteBatch, and start a new one for new writes. The Interactive
> > >>>>>>>>>>>>>>> Query queries the "chain" of previous WriteBatches + the
> > >>>>>>>>>>>>>>> underlying database; while the StreamThread starts writing to
> > >>>>>>>>>>>>>>> the *new* WriteBatch. On-commit, the StreamThread would write
> > >>>>>>>>>>>>>>> *all* WriteBatches in the chain to the database (that have not
> > >>>>>>>>>>>>>>> yet been written).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> WriteBatches would be closed/freed only when they have been both
> > >>>>>>>>>>>>>>> committed, and all open Interactive Queries on them have been
> > >>>>>>>>>>>>>>> closed. This would require some reference counting.
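
A rough in-memory sketch of the chaining and reference-counting idea described above, for illustration only. Plain maps stand in for RocksDB WriteBatches and the database, coarse `synchronized` blocks stand in for whatever thread-safety a real implementation would need, and `ChainedBatchStore`, `Batch`, `View`, and `committedGet` are hypothetical names rather than anything proposed in the KIP.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChainedBatchStore {
    /** Stand-in for a WriteBatch: buffered writes plus bookkeeping. */
    static final class Batch {
        final Map<String, String> writes = new HashMap<>();
        int openViews = 0;
        boolean committed = false;
        boolean freed = false;

        void maybeFree() {
            // Free only once committed AND no open query still references it.
            if (committed && openViews == 0) {
                writes.clear();
                freed = true;
            }
        }
    }

    private final Map<String, String> db = new HashMap<>(); // stand-in for the database
    private final List<Batch> chain = new ArrayList<>();    // uncommitted batches, oldest first

    ChainedBatchStore() { chain.add(new Batch()); }

    synchronized void put(String key, String value) {
        chain.get(chain.size() - 1).writes.put(key, value);
    }

    /** "Freezes" the current chain for the query and starts a new batch for writes. */
    synchronized View readOnlyView() {
        List<Batch> frozen = new ArrayList<>(chain);
        for (Batch b : frozen) b.openViews++;
        chain.add(new Batch());
        return new View(frozen);
    }

    /** Writes every batch in the chain to the database, oldest first. */
    synchronized void commit() {
        for (Batch b : chain) {
            db.putAll(b.writes);
            b.committed = true;
            b.maybeFree();
        }
        chain.clear();
        chain.add(new Batch());
    }

    synchronized String committedGet(String key) { return db.get(key); }

    final class View implements AutoCloseable {
        final List<Batch> frozen;
        private boolean closed = false;

        View(List<Batch> frozen) { this.frozen = frozen; }

        /** Reads the frozen chain newest-first, then falls back to the database. */
        String get(String key) {
            synchronized (ChainedBatchStore.this) {
                for (int i = frozen.size() - 1; i >= 0; i--) {
                    String v = frozen.get(i).writes.get(key);
                    if (v != null) return v;
                }
                return db.get(key);
            }
        }

        @Override
        public void close() {
            synchronized (ChainedBatchStore.this) {
                if (closed) return;
                closed = true;
                for (Batch b : frozen) {
                    b.openViews--;
                    b.maybeFree();
                }
            }
        }
    }
}
```

A view that is never closed keeps its frozen batches alive indefinitely, which is the memory-usage drawback raised in the next paragraph.
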
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Obviously a drawback of this approach is the potential
> > >> for
> > >>>>>>>> increased
> > >>>>>>>>>>>>>>> memory usage: if an Interactive Query is long-lived,
> > >> for
> > >>>>>> example by
> > >>>>>>>>>>>>> doing a
> > >>>>>>>>>>>>>>> full scan over a large database, or even just pausing
> > >> in
> > >>>> the
> > >>>>>> middle
> > >>>>>>>>>> of
> > >>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>> iteration, then the existing chain of WriteBatches
> > >> could be
> > >>>>>> kept
> > >>>>>>>>>>>> around
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>> a long time, potentially forever.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> A.
> > >>>>>>>>>>>>>>> Going off on a tangent, it looks like in addition to
> > >>>> supporting
> > >>>>>>>>>>>>>>> READ_COMMITTED queries, we could go further and support
> > >>>>>>>>>>>> REPEATABLE_READ
> > >>>>>>>>>>>>>>> queries (i.e. where subsequent reads to the same key
> > >> in the
> > >>>>>> same
> > >>>>>>>>>>>>>>> Interactive Query are guaranteed to yield the same
> > >> value)
> > >>>> by
> > >>>>>> making
> > >>>>>>>>>>>> use
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>> RocksDB Snapshots[2]. These are fairly lightweight, so
> > >> the
> > >>>>>>>>>> performance
> > >>>>>>>>>>>>>>> impact is likely to be negligible, but they do require
> > >>>> that the
> > >>>>>>>>>>>>> Interactive
> > >>>>>>>>>>>>>>> Query session can be explicitly closed.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> This could be achieved if we made the above
> > >> readOnlyView
> > >>>>>> interface
> > >>>>>>>>>>>> look
> > >>>>>>>>>>>>>>> more like:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> interface ReadOnlyKeyValueView<K, V>
> > >>>>>>>>>>>>>>>         extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {}
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> interface ReadOnlyKeyValueStore<K, V> {
> > >>>>>>>>>>>>>>>         ...
> > >>>>>>>>>>>>>>>         ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);
> > >>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> But this would be a breaking change, as existing IQv1
> > >>>> queries
> > >>>>>> are
> > >>>>>>>>>>>>>>> guaranteed to never call store.close(), and therefore
> > >> these
> > >>>>>> would
> > >>>>>>>>>> leak
> > >>>>>>>>>>>>>>> memory under REPEATABLE_READ.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> B.
> > >>>>>>>>>>>>>>> One thing that's notable: MyRocks states that they
> > >> support
> > >>>>>>>>>>>>> READ_COMMITTED
> > >>>>>>>>>>>>>>> and REPEATABLE_READ, but they make no mention of
> > >>>>>>>>>>>> READ_UNCOMMITTED[3][4].
> > >>>>>>>>>>>>>>> This could be because doing so is technically
> > >>>>>> difficult/impossible
> > >>>>>>>>>>>> using
> > >>>>>>>>>>>>>>> the primitives available in RocksDB.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Lucas, to address your points:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> U1.
> > >>>>>>>>>>>>>>> It's only "SHOULD" to permit alternative (i.e.
> > >> non-RocksDB)
> > >>>>>>>>>>>>>>> implementations of StateStore that do not support
> > >> atomic
> > >>>>>> writes.
> > >>>>>>>>>>>>> Obviously
> > >>>>>>>>>>>>>>> in those cases, the guarantees Kafka Streams
> > >>>> provides/expects
> > >>>>>> would
> > >>>>>>>>>> be
> > >>>>>>>>>>>>>>> relaxed. Do you think we should require all
> > >>>> implementations to
> > >>>>>>>>>> support
> > >>>>>>>>>>>>>>> atomic writes?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> U2.
> > >>>>>>>>>>>>>>> Stores can support multiple IsolationLevels. As we've
> > >>>> discussed
> > >>>>>>>>>> above,
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> ideal scenario would be to specify the IsolationLevel
> > >> at
> > >>>>>>>> query-time.
> > >>>>>>>>>>>>>>> Failing that, I think the second-best approach is to
> > >>>> define the
> > >>>>>>>>>>>>>>> IsolationLevel for *all* queries based on the
> > >>>> processing.guarantee,
> > >>>>>>>> which
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>>> what the default StateStoreContext#isolationLevel()
> > >>>> achieves.
> > >>>>>> Would
> > >>>>>>>>>>>> you
> > >>>>>>>>>>>>>>> prefer an alternative?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> While the existing implementation is equivalent to
> > >>>>>>>> READ_UNCOMMITTED,
> > >>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>> can yield unexpected results/errors under EOS, if a
> > >>>>>> transaction is
> > >>>>>>>>>>>>> rolled
> > >>>>>>>>>>>>>>> back. While this would be a change in behaviour for
> > >> users,
> > >>>> it
> > >>>>>> would
> > >>>>>>>>>>>> look
> > >>>>>>>>>>>>>>> more like a bug fix than a breaking change. That said,
> > >> we
> > >>>>>> *could*
> > >>>>>>>>>> make
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>> configurable, and default to the existing behaviour
> > >>>>>>>>>> (READ_UNCOMMITTED)
> > >>>>>>>>>>>>>>> instead of inferring it from the processing.guarantee?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> N1, N2.
> > >>>>>>>>>>>>>>> These were only primitives to avoid boxing costs, but
> > >> since
> > >>>>>> this is
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>> performance sensitive area, it should be fine to
> > >> change if
> > >>>>>> that's
> > >>>>>>>>>>>>> desirable.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> N3.
> > >>>>>>>>>>>>>>> It's because the store "manages its own offsets", which
> > >>>>>> includes
> > >>>>>>>> both
> > >>>>>>>>>>>>>>> committing the offset, *and providing it* via
> > >>>>>> getCommittedOffset().
> > >>>>>>>>>>>>>>> Personally, I think "managesOffsets" conveys this best,
> > >>>> but I
> > >>>>>> don't
> > >>>>>>>>>>>> mind
> > >>>>>>>>>>>>>>> changing it if the nomenclature is unclear.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Sorry for the massive emails/essays!
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1: https://github.com/facebook/rocksdb/wiki/Transactions
> > >>>>>>>>>>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> > >>>>>>>>>>>>>>> 3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> > >>>>>>>>>>>>>>> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> > >>>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> since I last read it in April, the KIP has become much
> > >>>>>> cleaner and
> > >>>>>>>>>>>>>>>> easier to read. Great work!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> It feels to me the last big open point is whether we
> > >> can
> > >>>>>> implement
> > >>>>>>>>>>>>>>>> isolation level as a query parameter. I understand
> > >> that
> > >>>> there
> > >>>>>> are
> > >>>>>>>>>>>>>>>> implementation concerns, but as Colt says, it would
> > >> be a
> > >>>> great
> > >>>>>>>>>>>>>>>> addition, and would also simplify the migration path
> > >> for
> > >>>> this
> > >>>>>>>>>> change.
> > >>>>>>>>>>>>>>>> Is the implementation problem you mentioned caused by
> > >> the
> > >>>>>>>> WriteBatch
> > >>>>>>>>>>>>>>>> not having a notion of a snapshot, as the underlying
> > >> DB
> > >>>>>> iterator
> > >>>>>>>>>>>> does?
> > >>>>>>>>>>>>>>>> In that case, I am not sure a chain of WriteBatches
> > >> as you
> > >>>>>> propose
> > >>>>>>>>>>>>>>>> would fully solve the problem, but maybe I didn't dig
> > >>>> enough
> > >>>>>> into
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>> details to fully understand it.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> If it's not possible to implement it now, would it be
> > >> an
> > >>>>>> option to
> > >>>>>>>>>>>>>>>> make sure in this KIP that we do not fully close the
> > >> door
> > >>>> on
> > >>>>>>>>>>>> per-query
> > >>>>>>>>>>>>>>>> isolation levels in the interface, as it may be
> > >> possible
> > >>>> to
> > >>>>>>>>>> implement
> > >>>>>>>>>>>>>>>> the missing primitives in RocksDB or Speedb in the
> > >> future.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Understanding:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to
> > >> be
> > >>>>>> persisted
> > >>>>>>>>>>>>>>>> atomically with the records?
> > >>>>>>>>>>>>>>>> * U2) Don't understand the default implementation of
> > >>>>>>>>>>>> `isolationLevel`.
> > >>>>>>>>>>>>>>>> The isolation level should be a property of the
> > >> underlying
> > >>>>>> store,
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>>>> not be defined by the default config? Existing stores
> > >>>> probably
> > >>>>>>>> don't
> > >>>>>>>>>>>>>>>> guarantee READ_COMMITTED, so the default should be to
> > >>>> return
> > >>>>>>>>>>>>>>>> READ_UNCOMMITTED.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Nits:
> > >>>>>>>>>>>>>>>> * N1) Could `getCommittedOffset` use an `OptionalLong`
> > >>>> return
> > >>>>>> type,
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>> avoid the `null`?
> > >>>>>>>>>>>>>>>> * N2) Could `approximateNumUncommittedBytes` use an
> > >>>>>> `OptionalLong`
> > >>>>>>>>>>>>>>>> return type, to avoid the `-1`?
> > >>>>>>>>>>>>>>>> * N3) I don't understand why `managesOffsets` uses the
> > >>>>>> 'manage'
> > >>>>>>>>>> verb,
> > >>>>>>>>>>>>>>>> whereas all other methods use the "commits" verb. I'd
> > >>>> suggest
> > >>>>>>>>>>>>>>>> `commitsOffsets`.
> > >>>>>>>>>>>>>>>>
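To make nit N1 concrete, here is a minimal sketch of an offset accessor that returns `OptionalLong` instead of `null`. The class `OffsetTrackingStore` and its `commit` method are hypothetical and exist only for illustration; only the accessor name comes from the thread.

```java
import java.util.OptionalLong;

/** Hypothetical store facade; only getCommittedOffset() mirrors a name from the thread. */
final class OffsetTrackingStore {
    private long committedOffset; // meaningful only when hasOffset is true
    private boolean hasOffset = false;

    /** Records the changelog offset that was persisted with the last commit. */
    void commit(long offset) {
        committedOffset = offset;
        hasOffset = true;
    }

    /** Returns the last committed offset, or empty if nothing has been committed yet. */
    OptionalLong getCommittedOffset() {
        return hasOffset ? OptionalLong.of(committedOffset) : OptionalLong.empty();
    }
}
```

`OptionalLong` holds a primitive `long`, so this form avoids both the `null` sentinel and the boxing cost that a `Long` return type would bring.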
> > >>>>>>>>>>>>>>>> Either way, it feels this KIP is very close to the
> > >> finish
> > >>>>>> line,
> > >>>>>>>> I'm
> > >>>>>>>>>>>>>>>> looking forward to seeing this in production!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>> Lucas
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <
> > >>>>>> c...@littlehorse.io
> > >>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Making IsolationLevel a query-time constraint,
> > >> rather
> > >>>> than
> > >>>>>>>> linking
> > >>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> the processing.guarantee.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> As I understand it, would this allow even a user of
> > >> EOS
> > >>>> to
> > >>>>>>>> control
> > >>>>>>>>>>>>>>>> whether
> > >>>>>>>>>>>>>>>>> reading committed or uncommitted records? If so, I am
> > >>>> highly
> > >>>>>> in
> > >>>>>>>>>>>> favor
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>> this.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I know that I was one of the early people to point
> > >> out
> > >>>> the
> > >>>>>>>> current
> > >>>>>>>>>>>>>>>>> shortcoming that IQ reads uncommitted records, but
> > >> just
> > >>>> this
> > >>>>>>>>>>>> morning I
> > >>>>>>>>>>>>>>>>> realized a pattern we use which means that (for
> > >> certain
> > >>>>>> queries)
> > >>>>>>>>>> our
> > >>>>>>>>>>>>>>>> system
> > >>>>>>>>>>>>>>>>> needs to be able to read uncommitted records, which
> > >> is
> > >>>> the
> > >>>>>>>> current
> > >>>>>>>>>>>>>>>> behavior
> > >>>>>>>>>>>>>>>>> of Kafka Streams in EOS.***
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> If IsolationLevel being a query-time decision allows
> > >> for
> > >>>>>> this,
> > >>>>>>>> then
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>> would be amazing. I would also vote that the default
> > >>>> behavior
> > >>>>>>>>>> should
> > >>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>> reading uncommitted records, because it is totally
> > >>>> possible
> > >>>>>> for a
> > >>>>>>>>>>>>> valid
> > >>>>>>>>>>>>>>>>> application to depend on that behavior, and breaking
> > >> it
> > >>>> in a
> > >>>>>>>> minor
> > >>>>>>>>>>>>>>>> release
> > >>>>>>>>>>>>>>>>> might be a bit strong.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> *** (Note, for the curious reader....) Our
> > >> use-case/query
> > >>>>>> pattern
> > >>>>>>>>>>>> is a
> > >>>>>>>>>>>>>>>> bit
> > >>>>>>>>>>>>>>>>> complex, but reading "uncommitted" records is
> > >> actually
> > >>>> safe
> > >>>>>> in
> > >>>>>>>> our
> > >>>>>>>>>>>>> case
> > >>>>>>>>>>>>>>>>> because processing is deterministic. Additionally, IQ
> > >>>> being
> > >>>>>> able
> > >>>>>>>> to
> > >>>>>>>>>>>>> read
> > >>>>>>>>>>>>>>>>> uncommitted records is crucial to enable "read your
> > >> own
> > >>>>>> writes"
> > >>>>>>>> on
> > >>>>>>>>>>>> our
> > >>>>>>>>>>>>>>>> API:
> > >>>>>>>>>>>>>>>>> Due to the deterministic processing, we send an
> > >> "ack" to
> > >>>> the
> > >>>>>>>> client
> > >>>>>>>>>>>>> who
> > >>>>>>>>>>>>>>>>> makes the request as soon as the processor processes
> > >> the
> > >>>>>> result.
> > >>>>>>>> If
> > >>>>>>>>>>>>> they
> > >>>>>>>>>>>>>>>>> can't read uncommitted records, they may receive a
> > >> "201 -
> > >>>>>>>> Created"
> > >>>>>>>>>>>>>>>>> response, immediately followed by a "404 - Not Found"
> > >>>> when
> > >>>>>> doing
> > >>>>>>>> a
> > >>>>>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>> for the object they just created).
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>> Colt McNealy
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <
> > >>>>>>>>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Addendum:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I think we would also face the same problem with the
> > >>>>>> approach
> > >>>>>>>> John
> > >>>>>>>>>>>>>>>> outlined
> > >>>>>>>>>>>>>>>>>> earlier (using the record cache as a transaction
> > >> buffer
> > >>>> and
> > >>>>>>>>>>>> flushing
> > >>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>> straight to SST files). This is because the record
> > >> cache
> > >>>>>> (the
> > >>>>>>>>>>>>>>>> ThreadCache
> > >>>>>>>>>>>>>>>>>> class) is not thread-safe, so every commit would
> > >>>> invalidate
> > >>>>>> open
> > >>>>>>>>>> IQ
> > >>>>>>>>>>>>>>>>>> Iterators in the same way that RocksDB WriteBatches
> > >> do.
> > >>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <
> > >>>>>>>>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I've updated the KIP based on our conversation. The
> > >>>> only
> > >>>>>> things
> > >>>>>>>>>>>>>>>> I've not
> > >>>>>>>>>>>>>>>>>>> yet done are:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 1. Using transactions under ALOS and EOS.
> > >>>>>>>>>>>>>>>>>>> 2. Making IsolationLevel a query-time constraint,
> > >>>> rather
> > >>>>>> than
> > >>>>>>>>>>>>>>>> linking it
> > >>>>>>>>>>>>>>>>>>> to the processing.guarantee.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> There's a wrinkle that makes this a challenge:
> > >>>> Interactive
> > >>>>>>>>>> Queries
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> open an Iterator, when using transactions and
> > >>>>>> READ_UNCOMMITTED.
> > >>>>>>>>>>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries
> > >>>> need
> > >>>>>> to be
> > >>>>>>>>>>>> able
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>> read records from the currently uncommitted
> > >> transaction
> > >>>>>> buffer
> > >>>>>>>>>>>>>>>>>>> (WriteBatch). This includes for Iterators, which
> > >> should
> > >>>>>> iterate
> > >>>>>>>>>>>>>>>> both the
> > >>>>>>>>>>>>>>>>>>> transaction buffer and underlying database (using
> > >>>>>>>>>>>>>>>>>>> WriteBatch#iteratorWithBase()).
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The issue is that when the StreamThread commits, it
> > >>>> writes
> > >>>>>> the
> > >>>>>>>>>>>>>>>> current
> > >>>>>>>>>>>>>>>>>>> WriteBatch to RocksDB *and then clears the
> > >> WriteBatch*.
> > >>>>>>>> Clearing
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> WriteBatch while an Interactive Query holds an open
> > >>>>>> Iterator on
> > >>>>>>>>>> it
> > >>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>> invalidate the Iterator. Worse, it turns out that
> > >>>> Iterators
> > >>>>>>>> over
> > >>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>> WriteBatch become invalidated not just when the
> > >>>> WriteBatch
> > >>>>>> is
> > >>>>>>>>>>>>>>>> cleared,
> > >>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>> also when the Iterators' current key receives a new
> > >>>> write.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Now that I'm writing this, I remember that this is
> > >> the
> > >>>>>> major
> > >>>>>>>>>>>> reason
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>> switched the original design from having a
> > >> query-time
> > >>>>>>>>>>>>>>>> IsolationLevel to
> > >>>>>>>>>>>>>>>>>>> having the IsolationLevel linked to the
> > >>>> transactionality
> > >>>>>> of the
> > >>>>>>>>>>>>>>>> stores
> > >>>>>>>>>>>>>>>>>>> themselves.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> It *might* be possible to resolve this, by having a
> > >>>>>> "chain" of
> > >>>>>>>>>>>>>>>>>>> WriteBatches, with the StreamThread switching to a
> > >> new
> > >>>>>>>> WriteBatch
> > >>>>>>>>>>>>>>>>>> whenever
> > >>>>>>>>>>>>>>>>>>> a new Interactive Query attempts to read from the
> > >>>>>> database, but
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>> cause some performance problems/memory pressure
> > >> when
> > >>>>>> subjected
> > >>>>>>>> to
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>> high
> > >>>>>>>>>>>>>>>>>>> Interactive Query load. It would also reduce the
> > >>>>>> efficiency of
> > >>>>>>>>>>>>>>>>>> WriteBatches
> > >>>>>>>>>>>>>>>>>>> on-commit, as we'd have to write N WriteBatches,
> > >> where
> > >>>> N
> > >>>>>> is the
> > >>>>>>>>>>>>>>>> number of
> > >>>>>>>>>>>>>>>>>>> Interactive Queries since the last commit.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I realise this is getting into the weeds of the
> > >>>>>> implementation,
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>> you'd
> > >>>>>>>>>>>>>>>>>>> rather we focus on the API for now, but I think
> > >> it's
> > >>>>>> important
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>>>> consider
> > >>>>>>>>>>>>>>>>>>> how to implement the desired API, in case we come
> > >> up
> > >>>> with
> > >>>>>> an
> > >>>>>>>> API
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> cannot be implemented efficiently, or even at all!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thoughts?
> > >>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <
> > >>>>>>>> cado...@apache.org
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>>>>>>> Of course, you are right! My bad!
> > >>>>>>>>>>>>>>>>>>>> Wiping out the state in the downgrading case is
> > >> fine.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 3a.
> > >>>>>>>>>>>>>>>>>>>> Focus on the public facing changes for the KIP. We
> > >>>> will
> > >>>>>> manage
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>> get
> > >>>>>>>>>>>>>>>>>>>> the internals right. Regarding state stores that
> > >> do
> > >>>> not
> > >>>>>>>> support
> > >>>>>>>>>>>>>>>>>>>> READ_COMMITTED, they should throw an error stating
> > >>>> that
> > >>>>>> they
> > >>>>>>>> do
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state
> > >>>> stores
> > >>>>>>>>>>>>>>>> immediately.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 3b.
> > >>>>>>>>>>>>>>>>>>>> I am in favor of using transactions also for ALOS.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thanks for getting back to me!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>> The fact that implementations can always track
> > >>>> estimated
> > >>>>>>>> memory
> > >>>>>>>>>>>>>>>> usage
> > >>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> the wrapper is a good point. I can remove -1 as
> > >> an
> > >>>>>> option,
> > >>>>>>>> and
> > >>>>>>>>>>>>>>>> I'll
> > >>>>>>>>>>>>>>>>>>>> clarify
> > >>>>>>>>>>>>>>>>>>>>> the JavaDoc that 0 is not just for
> > >> non-transactional
> > >>>>>> stores,
> > >>>>>>>>>>>>>>>> which is
> > >>>>>>>>>>>>>>>>>>>>> currently misleading.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>>>>>>>> The problem with catching the exception in the
> > >>>> downgrade
> > >>>>>>>>>> process
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> would require new code in the Kafka version being
> > >>>>>> downgraded
> > >>>>>>>>>> to.
> > >>>>>>>>>>>>>>>> Since
> > >>>>>>>>>>>>>>>>>>>>> users could conceivably downgrade to almost *any*
> > >>>> older
> > >>>>>>>> version
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> Kafka
> > >>>>>>>>>>>>>>>>>>>>> Streams, I'm not sure how we could add that code?
> > >>>>>>>>>>>>>>>>>>>>> The only way I can think of doing it would be to
> > >>>> provide
> > >>>>>> a
> > >>>>>>>>>>>>>>>> dedicated
> > >>>>>>>>>>>>>>>>>>>>> downgrade tool, that goes through every local
> > >> store
> > >>>> and
> > >>>>>>>> removes
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> offsets column families. But that seems like an
> > >>>>>> unnecessary
> > >>>>>>>>>>>>>>>> amount of
> > >>>>>>>>>>>>>>>>>>>> extra
> > >>>>>>>>>>>>>>>>>>>>> code to maintain just to handle a somewhat niche
> > >>>>>> situation,
> > >>>>>>>>>> when
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> alternative (automatically wipe and restore
> > >> stores)
> > >>>>>> should be
> > >>>>>>>>>>>>>>>>>>>> acceptable.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've
> > >>>> requested.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 3a.
> > >>>>>>>>>>>>>>>>>>>>> I agree that IsolationLevel makes more sense at
> > >>>>>> query-time,
> > >>>>>>>> and
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>>>>>>> initially attempted to place the IsolationLevel
> > >> at
> > >>>>>>>> query-time,
> > >>>>>>>>>>>>>>>> but I
> > >>>>>>>>>>>>>>>>>> ran
> > >>>>>>>>>>>>>>>>>>>>> into some problems:
> > >>>>>>>>>>>>>>>>>>>>> - The key issue is that, under ALOS we're not
> > >> staging
> > >>>>>> writes
> > >>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> transactions, so can't perform writes at the
> > >>>>>> READ_COMMITTED
> > >>>>>>>>>>>>>>>> isolation
> > >>>>>>>>>>>>>>>>>>>>> level. However, this may be addressed if we
> > >> decide to
> > >>>>>>>> *always*
> > >>>>>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>>>>>> transactions as discussed under 3b.
> > >>>>>>>>>>>>>>>>>>>>> - IQv1 and IQv2 have quite different
> > >>>> implementations. I
> > >>>>>>>>>> remember
> > >>>>>>>>>>>>>>>>>> having
> > >>>>>>>>>>>>>>>>>>>>> some difficulty understanding the IQv1 internals,
> > >>>> which
> > >>>>>> made
> > >>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>> difficult
> > >>>>>>>>>>>>>>>>>>>>> to determine what needed to be changed. However,
> > >> I
> > >>>>>> *think*
> > >>>>>>>> this
> > >>>>>>>>>>>>>>>> can be
> > >>>>>>>>>>>>>>>>>>>>> addressed for both implementations by wrapping
> > >> the
> > >>>>>>>> RocksDBStore
> > >>>>>>>>>>>>>>>> in an
> > >>>>>>>>>>>>>>>>>>>>> IsolationLevel-dependent wrapper, that overrides
> > >> read
> > >>>>>> methods
> > >>>>>>>>>>>>>>>> (get,
> > >>>>>>>>>>>>>>>>>>>> etc.)
> > >>>>>>>>>>>>>>>>>>>>> to either read directly from the database or
> > >> from the
> > >>>>>> ongoing
> > >>>>>>>>>>>>>>>>>>>> transaction.
> > >>>>>>>>>>>>>>>>>>>>> But IQv1 might still be difficult.
> > >>>>>>>>>>>>>>>>>>>>> - If IsolationLevel becomes a query constraint,
> > >> then
> > >>>> all
> > >>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>> StateStores
> > >>>>>>>>>>>>>>>>>>>>> will need to respect it, including the in-memory
> > >>>> stores.
> > >>>>>> This
> > >>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>> require
> > >>>>>>>>>>>>>>>>>>>>> us to adapt in-memory stores to stage their
> > >> writes so
> > >>>>>> they
> > >>>>>>>> can
> > >>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>> isolated
> > >>>>>>>>>>>>>>>>>>>>> from READ_COMMITTED queries. It would also
> > >> become an
> > >>>>>>>> important
> > >>>>>>>>>>>>>>>>>>>>> consideration for third-party stores on upgrade,
> > >> as
> > >>>>>> without
> > >>>>>>>>>>>>>>>> changes,
> > >>>>>>>>>>>>>>>>>>>> they
> > >>>>>>>>>>>>>>>>>>>>> would not support READ_COMMITTED queries
> > >> correctly.
> > >>>>>>>>>>>>>>>>>>>>>
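The IsolationLevel-dependent wrapper mentioned in the second bullet could look something like the sketch below. It assumes a local `ReadIsolation` enum and plain maps in place of RocksDBStore and its transaction buffer; the names are hypothetical and this is not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Local, hypothetical enum so the sketch does not depend on Kafka classes. */
enum ReadIsolation { READ_COMMITTED, READ_UNCOMMITTED }

/** Read-only view whose get() depends on the isolation level chosen for the query. */
final class IsolatedReadView {
    private final Map<String, String> committed;   // stand-in for the underlying database
    private final Map<String, String> uncommitted; // stand-in for the ongoing transaction
    private final ReadIsolation level;

    IsolatedReadView(Map<String, String> committed,
                     Map<String, String> uncommitted,
                     ReadIsolation level) {
        this.committed = committed;
        this.uncommitted = uncommitted;
        this.level = level;
    }

    /** READ_UNCOMMITTED consults the transaction first; READ_COMMITTED never does. */
    String get(String key) {
        if (level == ReadIsolation.READ_UNCOMMITTED && uncommitted.containsKey(key)) {
            // A null value here models a staged delete that hides the committed value.
            return uncommitted.get(key);
        }
        return committed.get(key);
    }
}
```

Point lookups are the easy case; the iterator invalidation problem discussed elsewhere in the thread is not addressed by a wrapper of this shape.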
> > >>>>>>>>>>>>>>>>>>>>> Ultimately, I may need some help making the
> > >> necessary
> > >>>>>> change
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>> IQv1
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>> support this, but I don't think it's
> > >> fundamentally
> > >>>>>>>> impossible,
> > >>>>>>>>>>>>>>>> if we
> > >>>>>>>>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>>>>>> to pursue this route.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 3b.
> > >>>>>>>>>>>>>>>>>>>>> The main reason I chose to keep ALOS
> > >> un-transactional
> > >>>>>> was to
> > >>>>>>>>>>>>>>>> minimize
> > >>>>>>>>>>>>>>>>>>>>> behavioural change for most users (I believe most
> > >>>> Streams
> > >>>>>>>> users
> > >>>>>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> default configuration, which is ALOS). That said,
> > >>>> it's
> > >>>>>> clear
> > >>>>>>>>>>>>>>>> that if
> > >>>>>>>>>>>>>>>>>>>> ALOS
> > >>>>>>>>>>>>>>>>>>>>> also used transactional stores, the only change
> > >> in
> > >>>>>> behaviour
> > >>>>>>>>>>>>>>>> would be
> > >>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> it would become *more correct*, which could be
> > >>>>>> considered a
> > >>>>>>>>>> "bug
> > >>>>>>>>>>>>>>>> fix"
> > >>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>> users, rather than a change they need to handle.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I believe that performance using transactions
> > >> (aka.
> > >>>>>> RocksDB
> > >>>>>>>>>>>>>>>>>>>> WriteBatches)
> > >>>>>>>>>>>>>>>>>>>>> should actually be *better* than the un-batched
> > >>>>>> write-path
> > >>>>>>>> that
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>> currently used[1]. The only "performance"
> > >>>> consideration
> > >>>>>> will
> > >>>>>>>> be
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> increased memory usage that transactions require.
> > >>>> Given
> > >>>>>> the
> > >>>>>>>>>>>>>>>>>> mitigations
> > >>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>> this memory that we have in place, I would expect
> > >>>> that
> > >>>>>> this
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>> not a
> > >>>>>>>>>>>>>>>>>>>>> problem for most users.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> If we're happy to do so, we can make ALOS also
> > >> use
> > >>>>>>>>>> transactions.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Link 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <
> > >>>>>>>>>>>> cado...@apache.org
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the updates and sorry for the delay
> > >> on my
> > >>>>>> side!
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>> Making the default implementation for flush() a
> > >>>> no-op
> > >>>>>> sounds
> > >>>>>>>>>>>>>>>> good to
> > >>>>>>>>>>>>>>>>>>>> me.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>> I think what was bugging me here is that a
> > >>>> third-party
> > >>>>>> state
> > >>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>> needs
> > >>>>>>>>>>>>>>>>>>>>>> to implement the state store interface. That
> > >> means
> > >>>> they
> > >>>>>> need
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> implement a wrapper around the actual state
> > >> store
> > >>>> as we
> > >>>>>> do
> > >>>>>>>> for
> > >>>>>>>>>>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>>>>>>>>>> with RocksDBStore. So, a third-party state
> > >> store can
> > >>>>>> always
> > >>>>>>>>>>>>>>>> estimate
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> uncommitted bytes, if it wants, because the
> > >> wrapper
> > >>>> can
> > >>>>>>>> record
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> added
> > >>>>>>>>>>>>>>>>>>>>>> bytes.
> > >>>>>>>>>>>>>>>>>>>>>> One case I can think of where returning -1 makes
> > >>>> sense
> > >>>>>> is
> > >>>>>>>> when
> > >>>>>>>>>>>>>>>>>> Streams
> > >>>>>>>>>>>>>>>>>>>>>> does not need to estimate the size of the write
> > >>>> batch
> > >>>>>> and
> > >>>>>>>>>>>>>>>> trigger
> > >>>>>>>>>>>>>>>>>>>>>> extraordinary commits, because the third-party
> > >> state
> > >>>>>> store
> > >>>>>>>>>>>>>>>> takes care
> > >>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>> memory. But in that case the method could also
> > >> just
> > >>>>>> return
> > >>>>>>>> 0.
> > >>>>>>>>>>>>>>>> Even
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>> case would be better solved with a method that
> > >>>> returns
> > >>>>>>>> whether
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>>> store itself manages the memory used for
> > >> uncommitted
> > >>>>>> bytes
> > >>>>>>>> or
> > >>>>>>>>>>>>>>>> not.
> > >>>>>>>>>>>>>>>>>>>>>> That said, I am fine with keeping the -1 return
> > >>>> value,
> > >>>>>> I was
> > >>>>>>>>>>>>>>>> just
> > >>>>>>>>>>>>>>>>>>>>>> wondering when and if it will be used.
> > >>>>>>>>>>>>>>>>>>>>>>
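As a rough illustration of the point that a wrapper can always record the bytes it adds, the sketch below keeps a running estimate alongside a staging buffer. `ByteCountingBuffer` is a hypothetical class; the accounting rule (key bytes plus value bytes per write, overwrites counted again) is an assumption made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical wrapper that estimates uncommitted bytes as writes are staged. */
final class ByteCountingBuffer {
    private final Map<String, byte[]> staged = new HashMap<>();
    private long uncommittedBytes = 0L;

    /** Stages a write and adds its key and value sizes to the running estimate. */
    void put(String key, byte[] value) {
        staged.put(key, value);
        // Overwrites are counted again, mimicking an append-only write buffer.
        uncommittedBytes += key.getBytes(StandardCharsets.UTF_8).length + value.length;
    }

    /** Estimate of bytes staged since the last commit; 0 when the buffer is empty. */
    long approximateNumUncommittedBytes() {
        return uncommittedBytes;
    }

    /** Models a commit: the staged writes are handed off and the estimate resets. */
    void commit() {
        staged.clear();
        uncommittedBytes = 0L;
    }
}
```

With this shape the method returns 0 for a transactional store whose buffer happens to be empty, which is why a JavaDoc tying 0 only to non-transactional stores would be misleading.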
> > >>>>>>>>>>>>>>>>>>>>>> Regarding returning 0 for transactional state
> > >> stores
> > >>>>>> when
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>> batch
> > >>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>> empty, I was just wondering because you
> > >> explicitly
> > >>>>>> stated
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> "or {@code 0} if this StateStore does not
> > >> support
> > >>>>>>>>>>>> transactions."
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> So it seemed to me returning 0 could only
> > >> happen for
> > >>>>>>>>>>>>>>>>>> non-transactional
> > >>>>>>>>>>>>>>>>>>>>>> state stores.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> a) What do you think if we move the isolation
> > >> level
> > >>>> to
> > >>>>>> IQ
> > >>>>>>>> (v1
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> v2)?
> > >>>>>>>>>>>>>>>>>>>>>> In the end this is the only component that
> > >> really
> > >>>> needs
> > >>>>>> to
> > >>>>>>>>>>>>>>>> specify
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> isolation level. It is similar to the Kafka
> > >> consumer
> > >>>>>> that
> > >>>>>>>> can
> > >>>>>>>>>>>>>>>> choose
> > >>>>>>>>>>>>>>>>>>>>>> with what isolation level to read the input
> > >> topic.
> > >>>>>>>>>>>>>>>>>>>>>> For IQv1 the isolation level should go into
> > >>>>>>>>>>>>>>>> StoreQueryParameters. For
> > >>>>>>>>>>>>>>>>>>>>>> IQv2, I would add it to the Query interface.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> b) Point a) raises the question of what should
> > >> happen
> > >>>>>> during
> > >>>>>>>>>>>>>>>>>> at-least-once
> > >>>>>>>>>>>>>>>>>>>>>> processing when the state store does not use
> > >>>>>> transactions?
> > >>>>>>>>>> John
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> past proposed to also use transactions on state
> > >>>> stores
> > >>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>> at-least-once. I like that idea, because it
> > >> avoids
> > >>>>>>>> aggregating
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>>>>>> records over and over again in the case of a
> > >>>> failure. We
> > >>>>>>>> had a
> > >>>>>>>>>>>>>>>> case
> > >>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>> the past where a Streams application in
> > >>>> at-least-once
> > >>>>>> mode
> > >>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>> failing
> > >>>>>>>>>>>>>>>>>>>>>> continuously for some reasons I do not remember
> > >>>> before
> > >>>>>>>>>>>>>>>> committing the
> > >>>>>>>>>>>>>>>>>>>>>> offsets. After each failover, the app aggregated
> > >>>> again
> > >>>>>> and
> > >>>>>>>>>>>>>>>> again the
> > >>>>>>>>>>>>>>>>>>>>>> same records. Of course the aggregate increased
> > >> to
> > >>>> very
> > >>>>>>>> wrong
> > >>>>>>>>>>>>>>>> values
> > >>>>>>>>>>>>>>>>>>>>>> just because of the failover. With transactions
> > >> on
> > >>>> the
> > >>>>>> state
> > >>>>>>>>>>>>>>>> stores
> > >>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>> could have avoided this. The app would have
> > >> output
> > >>>> the
> > >>>>>> same
> > >>>>>>>>>>>>>>>> aggregate
> > >>>>>>>>>>>>>>>>>>>>>> multiple times (i.e., after each failover) but
> > >> at
> > >>>> least
> > >>>>>> the
> > >>>>>>>>>>>>>>>> value of
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> aggregate would not depend on the number of
> > >>>> failovers.
> > >>>>>>>>>>>>>>>> Outputting the
> > >>>>>>>>>>>>>>>>>>>>>> same aggregate multiple times would be incorrect
> > >>>> under
> > >>>>>>>>>>>>>>>> exactly-once
> > >>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>> it is OK for at-least-once.
> > >>>>>>>>>>>>>>>>>>>>>> Whether it makes sense to add a config to turn on
> > >> and off
> > >>>>>>>>>>>>>>>> transactions on
> > >>>>>>>>>>>>>>>>>>>>>> state stores under at-least-once or just use
> > >>>>>> transactions in
> > >>>>>>>>>>>>>>>> any case
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>> a question we should also discuss in this KIP.
> > >> It
> > >>>>>> depends a
> > >>>>>>>>>> bit
> > >>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> performance trade-off. Maybe to be safe, I would
> > >>>> add a
> > >>>>>>>> config.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 4.
> > >>>>>>>>>>>>>>>>>>>>>> Your points are all valid. I tend to say to
> > >> keep the
> > >>>>>> metrics
> > >>>>>>>>>>>>>>>> around
> > >>>>>>>>>>>>>>>>>>>>>> flush() until we remove flush() completely from
> > >> the
> > >>>>>>>> interface.
> > >>>>>>>>>>>>>>>> Calls
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> flush() might still exist since existing
> > >> processors
> > >>>>>> might
> > >>>>>>>>>> still
> > >>>>>>>>>>>>>>>> call
> > >>>>>>>>>>>>>>>>>>>>>> flush() explicitly as you mentioned in 1). For
> > >>>> sure, we
> > >>>>>> need
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> document
> > >>>>>>>>>>>>>>>>>>>>>> how the metrics change due to the transactions
> > >> in
> > >>>> the
> > >>>>>>>> upgrade
> > >>>>>>>>>>>>>>>> notes.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 5.
> > >>>>>>>>>>>>>>>>>>>>>> I see. Then you should describe how the
> > >> .position
> > >>>> files
> > >>>>>> are
> > >>>>>>>>>>>>>>>> handled
> > >>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>> a dedicated section of the KIP or incorporate
> > >> the
> > >>>>>>>> description
> > >>>>>>>>>>>>>>>> in the
> > >>>>>>>>>>>>>>>>>>>>>> "Atomic Checkpointing" section instead of only
> > >>>>>> mentioning it
> > >>>>>>>>>> in
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> "Compatibility, Deprecation, and Migration
> > >> Plan".
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>>>>>>>>> Describing upgrading and downgrading in the KIP
> > >> is a
> > >>>>>> good
> > >>>>>>>>>> idea.
> > >>>>>>>>>>>>>>>>>>>>>> Regarding downgrading, I think you could also
> > >> catch
> > >>>> the
> > >>>>>>>>>>>>>>>> exception and
> > >>>>>>>>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>>>>>>>> what is needed to downgrade, e.g., drop the
> > >> column
> > >>>>>> family.
> > >>>>>>>> See
> > >>>>>>>>>>>>>>>> here
> > >>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>> an example:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> It is a bit brittle, but it works.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for taking the time to review the KIP.
> > >> I'm
> > >>>> back
> > >>>>>> from
> > >>>>>>>>>>>>>>>> leave
> > >>>>>>>>>>>>>>>>>> now
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>> intend to move this forwards as quickly as I
> > >> can.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Addressing your points:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>>> Because flush() is part of the StateStore API,
> > >> it's
> > >>>>>> exposed
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> custom
> > >>>>>>>>>>>>>>>>>>>>>>> Processors, which might be making calls to
> > >> flush().
> > >>>>>> This
> > >>>>>>>> was
> > >>>>>>>>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> case in a few integration tests.
> > >>>>>>>>>>>>>>>>>>>>>>> To maintain as much compatibility as possible,
> > >> I'd
> > >>>>>> prefer
> > >>>>>>>> not
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> make
> > >>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>> an UnsupportedOperationException, as it will
> > >> cause
> > >>>>>>>> previously
> > >>>>>>>>>>>>>>>>>> working
> > >>>>>>>>>>>>>>>>>>>>>>> Processors to start throwing exceptions at
> > >> runtime.
> > >>>>>>>>>>>>>>>>>>>>>>> I agree that it doesn't make sense for it to
> > >> proxy
> > >>>>>>>> commit(),
> > >>>>>>>>>>>>>>>> though,
> > >>>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>>>> that would cause it to violate the "StateStores
> > >>>> commit
> > >>>>>> only
> > >>>>>>>>>>>>>>>> when the
> > >>>>>>>>>>>>>>>>>>>> Task
> > >>>>>>>>>>>>>>>>>>>>>>> commits" rule.
> > >>>>>>>>>>>>>>>>>>>>>>> Instead, I think we should make this a no-op.
> > >> That
> > >>>> way,
> > >>>>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>>>>> Processors will continue to work as-before,
> > >> without
> > >>>>>>>> violation
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>>>>> consistency that would be caused by premature
> > >>>>>> flush/commit
> > >>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>> StateStore
> > >>>>>>>>>>>>>>>>>>>>>>> data to disk.
> > >>>>>>>>>>>>>>>>>>>>>>> What do you think?
> > >>>>>>>>>>>>>>>>>>>>>>>
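To make the no-op proposal in point 1 concrete, here is a minimal sketch. It assumes a simplified, hypothetical store contract (`SketchStore`, `BufferedStore`); it is not the real Kafka Streams `StateStore` interface or the KIP-892 implementation. Writes sit in a buffer until `commit()`, and `flush()` is kept only so that existing callers neither fail nor force an early commit.

```java
import java.util.HashMap;
import java.util.Map;

final class FlushNoOpSketch {

    /** Hypothetical, simplified store contract; NOT the real Kafka Streams StateStore. */
    interface SketchStore {
        void put(String key, String value);

        String committedGet(String key);

        /** Called by the (hypothetical) Task when it commits. */
        void commit();

        /** Kept so existing Processors keep working: it neither throws nor commits. */
        default void flush() {
            // intentionally empty
        }
    }

    /** Buffers writes until commit(); flush() is inherited as a no-op. */
    static final class BufferedStore implements SketchStore {
        private final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> pending = new HashMap<>();

        @Override
        public void put(String key, String value) {
            pending.put(key, value);
        }

        @Override
        public String committedGet(String key) {
            return committed.get(key);
        }

        @Override
        public void commit() {
            // Only the Task-driven commit makes buffered writes visible.
            committed.putAll(pending);
            pending.clear();
        }
    }
}
```

With this shape, a Processor that still calls `flush()` sees no exception, and nothing becomes committed until `commit()` runs.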
> > >>>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>>> As stated in the JavaDoc, when a StateStore
> > >>>>>> implementation
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>> transactional, but is unable to estimate the
> > >>>>>> uncommitted
> > >>>>>>>>>>>> memory
> > >>>>>>>>>>>>>>>>>> usage,
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> method will return -1.
> > >>>>>>>>>>>>>>>>>>>>>>> The intention here is to permit third-party
> > >>>>>> implementations
> > >>>>>>>>>>>>>>>> that may
> > >>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>> able to estimate memory usage.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Yes, it will be 0 when nothing has been
> > >> written to
> > >>>> the
> > >>>>>>>> store
> > >>>>>>>>>>>>>>>> yet. I
> > >>>>>>>>>>>>>>>>>>>>>> thought
> > >>>>>>>>>>>>>>>>>>>>>>> that was implied by "This method will return an
> > >>>>>>>> approximation
> > >>>>>>>>>>>>>>>> of the
> > >>>>>>>>>>>>>>>>>>>>>> memory
> > >>>>>>>>>>>>>>>>>>>>>>> would be freed by the next call to {@link
> > >>>>>> #commit(Map)}"
> > >>>>>>>> and
> > >>>>>>>>>>>>>>>>>> "@return
> > >>>>>>>>>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>>>>>>>> approximate size of all records awaiting {@link
> > >>>>>>>>>>>> #commit(Map)}",
> > >>>>>>>>>>>>>>>>>>>> however,
> > >>>>>>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>> can add it explicitly to the JavaDoc if you
> > >> think
> > >>>> this
> > >>>>>> is
> > >>>>>>>>>>>>>>>> unclear?
> > >>>>>>>>>>>>>>>>>>>>>>>
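The return values discussed in point 2 can be illustrated with a small sketch. All names here (`CountingBuffer`, `OpaqueStore`, `UNKNOWN`) are hypothetical, and the byte count is a rough assumption (UTF-8 length of key plus value), not how any real store measures memory. It only shows the contract: 0 when nothing is awaiting commit, a positive estimate otherwise, and -1 from an implementation that cannot estimate.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class UncommittedBytesSketch {

    /** Sentinel for "transactional, but unable to estimate". */
    static final long UNKNOWN = -1L;

    /** Hypothetical transaction buffer that tracks an approximate size. */
    static final class CountingBuffer {
        private final Map<String, String> pending = new HashMap<>();
        private long bytes = 0L;

        void put(String key, String value) {
            String previous = pending.put(key, value);
            if (previous != null) {
                bytes -= size(key, previous); // overwrite: drop the old entry's share
            }
            bytes += size(key, value);
        }

        void commit() {
            pending.clear();
            bytes = 0L;
        }

        long approximateNumUncommittedBytes() {
            return bytes;
        }

        private static long size(String key, String value) {
            return key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    /** Hypothetical third-party store that cannot estimate its buffer size. */
    static final class OpaqueStore {
        long approximateNumUncommittedBytes() {
            return UNKNOWN;
        }
    }
}
```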
> > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>> I realise this is probably the most contentious
> > >>>> point
> > >>>>>> in my
> > >>>>>>>>>>>>>>>> design,
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> I'm
> > >>>>>>>>>>>>>>>>>>>>>>> open to changing it if I'm unable to convince
> > >> you
> > >>>> of
> > >>>>>> the
> > >>>>>>>>>>>>>>>> benefits.
> > >>>>>>>>>>>>>>>>>>>>>>> Nevertheless, here's my argument:
> > >>>>>>>>>>>>>>>>>>>>>>> The Interactive Query (IQ) API(s) are directly
> > >>>> provided
> > >>>>>>>>>>>>>>>> StateStores
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> query, and it may be important for users to
> > >>>>>>>> programmatically
> > >>>>>>>>>>>>>>>> know
> > >>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>> mode the StateStore is operating under. If we
> > >>>> simply
> > >>>>>>>> provide
> > >>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>> "eosEnabled" boolean (as used throughout the
> > >>>> internal
> > >>>>>>>> streams
> > >>>>>>>>>>>>>>>>>>>> engine), or
> > >>>>>>>>>>>>>>>>>>>>>>> similar, then users will need to understand the
> > >>>>>> operation
> > >>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> consequences
> > >>>>>>>>>>>>>>>>>>>>>>> of each available processing mode and how it
> > >>>> pertains
> > >>>>>> to
> > >>>>>>>>>> their
> > >>>>>>>>>>>>>>>>>>>>>> StateStore.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Interactive Query users aren't the only people
> > >> that
> > >>>>>> care
> > >>>>>>>>>> about
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> processing.mode/IsolationLevel of a StateStore:
> > >>>>>>>> implementers
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> custom
> > >>>>>>>>>>>>>>>>>>>>>>> StateStores also need to understand the
> > >> behaviour
> > >>>>>> expected
> > >>>>>>>> of
> > >>>>>>>>>>>>>>>> their
> > >>>>>>>>>>>>>>>>>>>>>>> implementation. KIP-892 introduces some
> > >> assumptions
> > >>>>>> into
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>> Streams
> > >>>>>>>>>>>>>>>>>>>>>> Engine
> > >>>>>>>>>>>>>>>>>>>>>>> about how StateStores operate under each
> > >> processing
> > >>>>>> mode,
> > >>>>>>>> and
> > >>>>>>>>>>>>>>>> it's
> > >>>>>>>>>>>>>>>>>>>>>>> important that custom implementations adhere to
> > >>>> those
> > >>>>>>>>>>>>>>>> assumptions in
> > >>>>>>>>>>>>>>>>>>>>>> order
> > >>>>>>>>>>>>>>>>>>>>>>> to maintain the consistency guarantees.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> IsolationLevels provide a high-level contract
> > >> on
> > >>>> the
> > >>>>>>>>>> behaviour
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> StateStore: a user knows that under
> > >> READ_COMMITTED,
> > >>>>>> they
> > >>>>>>>> will
> > >>>>>>>>>>>>>>>> see
> > >>>>>>>>>>>>>>>>>>>> writes
> > >>>>>>>>>>>>>>>>>>>>>>> only after the Task has committed, and under
> > >>>>>>>> READ_UNCOMMITTED
> > >>>>>>>>>>>>>>>> they
> > >>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>> see
> > >>>>>>>>>>>>>>>>>>>>>>> writes immediately. No understanding of the
> > >>>> details of
> > >>>>>> each
> > >>>>>>>>>>>>>>>>>>>>>> processing.mode
> > >>>>>>>>>>>>>>>>>>>>>>> is required, either for IQ users or StateStore
> > >>>>>>>> implementers.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> An argument can be made that these contractual
> > >>>>>> guarantees
> > >>>>>>>> can
> > >>>>>>>>>>>>>>>> simply
> > >>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>> documented for the processing.mode (i.e. that
> > >>>>>> exactly-once
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and
> > >>>>>>>> at-least-once
> > >>>>>>>>>>>>>>>> behaves
> > >>>>>>>>>>>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>> READ_UNCOMMITTED), but there are several small
> > >>>> issues
> > >>>>>> with
> > >>>>>>>>>>>>>>>> this I'd
> > >>>>>>>>>>>>>>>>>>>>>> prefer
> > >>>>>>>>>>>>>>>>>>>>>>> to avoid:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>          - Where would we document these
> > >> contracts,
> > >>>> in
> > >>>>>> a way
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> difficult
> > >>>>>>>>>>>>>>>>>>>>>>>          for users/implementers to miss/ignore?
> > >>>>>>>>>>>>>>>>>>>>>>>          - It's not clear to users that the
> > >>>> processing
> > >>>>>> mode
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> communicating
> > >>>>>>>>>>>>>>>>>>>>>>>          an expectation of read isolation,
> > >> unless
> > >>>> they
> > >>>>>> read
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> documentation. Users
> > >>>>>>>>>>>>>>>>>>>>>>>          rarely consult documentation unless
> > >> they
> > >>>> feel
> > >>>>>> they
> > >>>>>>>>>> need
> > >>>>>>>>>>>>>>>> to, so
> > >>>>>>>>>>>>>>>>>>>> it's
> > >>>>>>>>>>>>>>>>>>>>>> likely
> > >>>>>>>>>>>>>>>>>>>>>>>          this detail would get missed by many
> > >> users.
> > >>>>>>>>>>>>>>>>>>>>>>>          - It tightly couples processing modes
> > >> to
> > >>>> read
> > >>>>>>>>>> isolation.
> > >>>>>>>>>>>>>>>> Adding
> > >>>>>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>>>>>          processing modes, or changing the read
> > >>>>>> isolation of
> > >>>>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>>>>> processing
> > >>>>>>>>>>>>>>>>>>>>>>>          modes would be difficult/impossible.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Ultimately, the cost of introducing
> > >>>> IsolationLevels is
> > >>>>>>>> just a
> > >>>>>>>>>>>>>>>> single
> > >>>>>>>>>>>>>>>>>>>>>>> method, since we re-use the existing
> > >> IsolationLevel
> > >>>>>> enum
> > >>>>>>>> from
> > >>>>>>>>>>>>>>>> Kafka.
> > >>>>>>>>>>>>>>>>>>>> This
> > >>>>>>>>>>>>>>>>>>>>>>> gives us a clear place to document the
> > >> contractual
> > >>>>>>>> guarantees
> > >>>>>>>>>>>>>>>>>> expected
> > >>>>>>>>>>>>>>>>>>>>>>> of/provided by StateStores, that is accessible
> > >>>> both by
> > >>>>>> the
> > >>>>>>>>>>>>>>>>>> StateStore
> > >>>>>>>>>>>>>>>>>>>>>>> itself, and by IQ users.
> > >>>>>>>>>>>>>>>>>>>>>>>
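The contract argued for in point 3 could be sketched as follows. This is an illustration under assumptions: the `Isolation` enum is a local stand-in for Kafka's existing `IsolationLevel`, the mode strings follow the `processing.guarantee` config values, and `Store` is a hypothetical in-memory store rather than anything in the KIP-892 branch. The mapping mirrors the one stated above (exactly-once modes behave like READ_COMMITTED, at-least-once like READ_UNCOMMITTED).

```java
import java.util.HashMap;
import java.util.Map;

final class IsolationLevelSketch {

    /** Local stand-in for Kafka's IsolationLevel enum, to keep the sketch self-contained. */
    enum Isolation { READ_COMMITTED, READ_UNCOMMITTED }

    /** Derives the read isolation a store is expected to provide from the processing mode. */
    static Isolation forProcessingMode(String mode) {
        switch (mode) {
            case "exactly_once":
            case "exactly_once_v2":
                return Isolation.READ_COMMITTED;
            case "at_least_once":
                return Isolation.READ_UNCOMMITTED;
            default:
                throw new IllegalArgumentException("unknown processing mode: " + mode);
        }
    }

    /** Hypothetical store whose reads honour the requested isolation level. */
    static final class Store {
        private final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> pending = new HashMap<>();

        void put(String key, String value) {
            pending.put(key, value);
        }

        void commit() {
            committed.putAll(pending);
            pending.clear();
        }

        String get(String key, Isolation level) {
            // READ_UNCOMMITTED sees buffered writes immediately;
            // READ_COMMITTED sees them only after commit().
            if (level == Isolation.READ_UNCOMMITTED && pending.containsKey(key)) {
                return pending.get(key);
            }
            return committed.get(key);
        }
    }
}
```

A reader only needs to know the isolation level to predict what `get` returns; it does not need to know which processing mode produced it.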
> > >>>>>>>>>>>>>>>>>>>>>>> (Writing this I've just realised that the
> > >>>> StateStore
> > >>>>>> and IQ
> > >>>>>>>>>>>>>>>> APIs
> > >>>>>>>>>>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>>>>>>>>> don't provide access to StateStoreContext that
> > >> IQ
> > >>>> users
> > >>>>>>>> would
> > >>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>> direct
> > >>>>>>>>>>>>>>>>>>>>>>> access to... Perhaps StateStore should expose
> > >>>>>>>>>> isolationLevel()
> > >>>>>>>>>>>>>>>>>> itself
> > >>>>>>>>>>>>>>>>>>>>>> too?)
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 4.
> > >>>>>>>>>>>>>>>>>>>>>>> Yeah, I'm not comfortable renaming the metrics
> > >>>> in-place
> > >>>>>>>>>>>>>>>> either, as
> > >>>>>>>>>>>>>>>>>>>> it's a
> > >>>>>>>>>>>>>>>>>>>>>>> backwards incompatible change. My concern is
> > >> that,
> > >>>> if
> > >>>>>> we
> > >>>>>>>>>> leave
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>>>>>> "flush" metrics in place, they will be
> > >> confusing to
> > >>>>>> users.
> > >>>>>>>>>>>>>>>> Right
> > >>>>>>>>>>>>>>>>>> now,
> > >>>>>>>>>>>>>>>>>>>>>>> "flush" metrics record explicit flushes to
> > >> disk,
> > >>>> but
> > >>>>>> under
> > >>>>>>>>>>>>>>>> KIP-892,
> > >>>>>>>>>>>>>>>>>>>> even
> > >>>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>> commit() will not explicitly flush data to
> > >> disk -
> > >>>>>> RocksDB
> > >>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>> decide
> > >>>>>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>>> when to flush memtables to disk itself.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> If we keep the existing "flush" metrics, we'd
> > >> have
> > >>>> two
> > >>>>>>>>>>>> options,
> > >>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>> both
> > >>>>>>>>>>>>>>>>>>>>>>> seem pretty bad to me:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>          1. Have them record calls to commit(),
> > >>>> which
> > >>>>>> would
> > >>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>> misleading, as
> > >>>>>>>>>>>>>>>>>>>>>>>          data is no longer explicitly "flushed"
> > >> to
> > >>>> disk
> > >>>>>> by
> > >>>>>>>> this
> > >>>>>>>>>>>>>>>> call.
> > >>>>>>>>>>>>>>>>>>>>>>>          2. Have them record nothing at all,
> > >> which
> > >>>> is
> > >>>>>>>>>> equivalent
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> removing
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>          metrics, except that users will see the
> > >>>> metric
> > >>>>>>>> still
> > >>>>>>>>>>>>>>>> exists and
> > >>>>>>>>>>>>>>>>>>>> so
> > >>>>>>>>>>>>>>>>>>>>>> assume
> > >>>>>>>>>>>>>>>>>>>>>>>          that the metric is correct, and that
> > >>>> there's a
> > >>>>>>>> problem
> > >>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>> their
> > >>>>>>>>>>>>>>>>>>>>>> system
> > >>>>>>>>>>>>>>>>>>>>>>>          when there isn't.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I agree that removing them is also a bad
> > >> solution,
> > >>>> and
> > >>>>>> I'd
> > >>>>>>>>>>>>>>>> like some
> > >>>>>>>>>>>>>>>>>>>>>>> guidance on the best path forward here.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 5.
> > >>>>>>>>>>>>>>>>>>>>>>> Position files are updated on every write to a
> > >>>>>> StateStore.
> > >>>>>>>>>>>>>>>> Since our
> > >>>>>>>>>>>>>>>>>>>>>> writes
> > >>>>>>>>>>>>>>>>>>>>>>> are now buffered until commit(), we can't
> > >> update
> > >>>> the
> > >>>>>>>> Position
> > >>>>>>>>>>>>>>>> file
> > >>>>>>>>>>>>>>>>>>>> until
> > >>>>>>>>>>>>>>>>>>>>>>> commit() has been called, otherwise it would be
> > >>>>>>>> inconsistent
> > >>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> data
> > >>>>>>>>>>>>>>>>>>>>>>> in the event of a rollback. Consequently, we
> > >> need
> > >>>> to
> > >>>>>> manage
> > >>>>>>>>>>>>>>>> these
> > >>>>>>>>>>>>>>>>>>>> offsets
> > >>>>>>>>>>>>>>>>>>>>>>> the same way we manage the checkpoint offsets,
> > >> and
> > >>>>>> ensure
> > >>>>>>>>>>>>>>>> they're
> > >>>>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>>> written on commit().
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>>>>>>>>>> Agreed, although I'm not exactly sure yet what
> > >>>> tests to
> > >>>>>>>>>> write.
> > >>>>>>>>>>>>>>>> How
> > >>>>>>>>>>>>>>>>>>>>>> explicit
> > >>>>>>>>>>>>>>>>>>>>>>> do we need to be here in the KIP?
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> As for upgrade/downgrade: upgrade is designed
> > >> to be
> > >>>>>>>> seamless,
> > >>>>>>>>>>>>>>>> and we
> > >>>>>>>>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>>>>>>>> definitely add some tests around that.
> > >> Downgrade,
> > >>>> it
> > >>>>>>>>>>>>>>>> transpires,
> > >>>>>>>>>>>>>>>>>> isn't
> > >>>>>>>>>>>>>>>>>>>>>>> currently possible, as the extra column family
> > >> for
> > >>>>>> offset
> > >>>>>>>>>>>>>>>> storage is
> > >>>>>>>>>>>>>>>>>>>>>>> incompatible with the pre-KIP-892
> > >> implementation:
> > >>>> when
> > >>>>>> you
> > >>>>>>>>>>>>>>>> open a
> > >>>>>>>>>>>>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>>>>>>>>>>> database, you must open all available column
> > >>>> families
> > >>>>>> or
> > >>>>>>>>>>>>>>>> receive an
> > >>>>>>>>>>>>>>>>>>>>>> error.
> > >>>>>>>>>>>>>>>>>>>>>>> What currently happens on downgrade is that it
> > >>>>>> attempts to
> > >>>>>>>>>>>>>>>> open the
> > >>>>>>>>>>>>>>>>>>>>>> store,
> > >>>>>>>>>>>>>>>>>>>>>>> throws an error about the offsets column
> > >> family not
> > >>>>>> being
> > >>>>>>>>>>>>>>>> opened,
> > >>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>> triggers a wipe and rebuild of the Task. Given
> > >> that
> > >>>>>>>>>> downgrades
> > >>>>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>> uncommon, I think this is acceptable
> > >> behaviour, as
> > >>>> the
> > >>>>>>>>>>>>>>>> end-state is
> > >>>>>>>>>>>>>>>>>>>>>>> consistent, even if it results in an
> > >> undesirable
> > >>>> state
> > >>>>>>>>>>>> restore.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Should I document the upgrade/downgrade
> > >> behaviour
> > >>>>>>>> explicitly
> > >>>>>>>>>>>>>>>> in the
> > >>>>>>>>>>>>>>>>>>>> KIP?
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
> > >>>>>>>>>>>>>>>> cado...@apache.org>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Nick!
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the updates!
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>>>> Why does StateStore#flush() default to
> > >>>>>>>>>>>>>>>>>>>>>>>> StateStore#commit(Collections.emptyMap())?
> > >>>>>>>>>>>>>>>>>>>>>>>> Since calls to flush() will not exist anymore
> > >>>> after
> > >>>>>> this
> > >>>>>>>> KIP
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>> released, I would rather throw an unsupported
> > >>>>>> operation
> > >>>>>>>>>>>>>>>> exception
> > >>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>> default.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>>>> When would a state store return -1 from
> > >>>>>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes()
> > >> while
> > >>>>>> being
> > >>>>>>>>>>>>>>>>>>>> transactional?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Wouldn't
> > >>>> StateStore#approximateNumUncommittedBytes()
> > >>>>>> also
> > >>>>>>>>>>>>>>>> return 0
> > >>>>>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>>>>>>>> the state store is transactional but nothing
> > >> has
> > >>>> been
> > >>>>>>>>>> written
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> state store yet?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>>> Sorry for bringing this up again. Does this
> > >> KIP
> > >>>> really
> > >>>>>>>> need
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> introduce
> > >>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext#isolationLevel()?
> > >>>> StateStoreContext
> > >>>>>> has
> > >>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>>>>>>>> appConfigs() which basically exposes the same
> > >>>>>> information,
> > >>>>>>>>>>>>>>>> i.e., if
> > >>>>>>>>>>>>>>>>>>>> EOS
> > >>>>>>>>>>>>>>>>>>>>>>>> is enabled or not.
> > >>>>>>>>>>>>>>>>>>>>>>>> In one of your previous e-mails you wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> "My idea was to try to keep the StateStore
> > >>>> interface
> > >>>>>> as
> > >>>>>>>>>>>>>>>> loosely
> > >>>>>>>>>>>>>>>>>>>> coupled
> > >>>>>>>>>>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> > >>>>>> implementers
> > >>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>> freedom,
> > >>>>>>>>>>>>>>>>>>>>>>>> and reduce the amount of internal knowledge
> > >>>> required."
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> While I understand the intent, I doubt that it
> > >>>>>> decreases
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> coupling of
> > >>>>>>>>>>>>>>>>>>>>>>>> a StateStore interface and the Streams engine.
> > >>>>>>>>>> READ_COMMITTED
> > >>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>>>> applies to IQ but not to reads by processors.
> > >>>> Thus,
> > >>>>>>>>>>>>>>>> implementers
> > >>>>>>>>>>>>>>>>>>>> need to
> > >>>>>>>>>>>>>>>>>>>>>>>> understand how Streams accesses the state
> > >> stores.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I would like to hear what others think about
> > >> this.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 4.
> > >>>>>>>>>>>>>>>>>>>>>>>> Great exposing new metrics for transactional
> > >> state
> > >>>>>> stores!
> > >>>>>>>>>>>>>>>>>> However, I
> > >>>>>>>>>>>>>>>>>>>>>>>> would prefer to add new metrics and deprecate
> > >> (in
> > >>>> the
> > >>>>>>>> docs)
> > >>>>>>>>>>>>>>>> the old
> > >>>>>>>>>>>>>>>>>>>>>>>> ones. You can find examples of deprecated
> > >> metrics
> > >>>>>> here:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>> https://kafka.apache.org/documentation/#selector_monitoring
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 5.
> > >>>>>>>>>>>>>>>>>>>>>>>> Why does the KIP mention position files? I do
> > >> not
> > >>>>>> think
> > >>>>>>>> they
> > >>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>> related
> > >>>>>>>>>>>>>>>>>>>>>>>> to transactions or flushes.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> 6.
> > >>>>>>>>>>>>>>>>>>>>>>>> I think we will also need to adapt/add
> > >> integration
> > >>>>>> tests
> > >>>>>>>>>>>>>>>> besides
> > >>>>>>>>>>>>>>>>>> unit
> > >>>>>>>>>>>>>>>>>>>>>>>> tests. Additionally, we probably need
> > >> integration
> > >>>> or
> > >>>>>>>> system
> > >>>>>>>>>>>>>>>> tests
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> verify that upgrades and downgrades between
> > >>>>>> transactional
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>> non-transactional state stores work as
> > >> expected.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>> One more thing: I noted John's suggestion in
> > >> the
> > >>>> KIP,
> > >>>>>>>> under
> > >>>>>>>>>>>>>>>>>>>> "Rejected
> > >>>>>>>>>>>>>>>>>>>>>>>>> Alternatives". I still think it's an idea
> > >> worth
> > >>>>>> pursuing,
> > >>>>>>>>>>>>>>>> but I
> > >>>>>>>>>>>>>>>>>>>> believe
> > >>>>>>>>>>>>>>>>>>>>>>>>> that it's out of the scope of this KIP,
> > >> because
> > >>>> it
> > >>>>>>>> solves a
> > >>>>>>>>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>> set
> > >>>>>>>>>>>>>>>>>>>>>>>>> of problems to this KIP, and the scope of
> > >> this
> > >>>> one
> > >>>>>> has
> > >>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>> grown
> > >>>>>>>>>>>>>>>>>>>>>>>> quite
> > >>>>>>>>>>>>>>>>>>>>>>>>> large!
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <
> > >>>>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I've updated the KIP (
> > >>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > >>>>>>>>>>>>>>>>>>>>>>>>>> )
> > >>>>>>>>>>>>>>>>>>>>>>>>>> with the latest changes; mostly bringing
> > >> back
> > >>>>>> "Atomic
> > >>>>>>>>>>>>>>>>>>>> Checkpointing"
> > >>>>>>>>>>>>>>>>>>>>>>>> (for
> > >>>>>>>>>>>>>>>>>>>>>>>>>> what feels like the 10th time!). I think
> > >> the one
> > >>>>>> thing
> > >>>>>>>>>>>>>>>> missing is
> > >>>>>>>>>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>>>>>>>> changes to metrics (notably the store
> > >> "flush"
> > >>>>>> metrics
> > >>>>>>>> will
> > >>>>>>>>>>>>>>>> need
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>> renamed to "commit").
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> The reason I brought back Atomic
> > >> Checkpointing
> > >>>> was
> > >>>>>> to
> > >>>>>>>>>>>>>>>> decouple
> > >>>>>>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>>>>>> flush
> > >>>>>>>>>>>>>>>>>>>>>>>>>> from store commit. This is important,
> > >> because
> > >>>> with
> > >>>>>>>>>>>>>>>> Transactional
> > >>>>>>>>>>>>>>>>>>>>>>>>>> StateStores, we now need to call "flush" on
> > >>>> *every*
> > >>>>>> Task
> > >>>>>>>>>>>>>>>> commit,
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>> just when the StateStore is closing,
> > >> otherwise
> > >>>> our
> > >>>>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>> buffer
> > >>>>>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>> never be written and persisted, instead
> > >> growing
> > >>>>>>>> unbounded!
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>>> experimented
> > >>>>>>>>>>>>>>>>>>>>>>>>>> with some simple solutions, like forcing a
> > >> store
> > >>>>>> flush
> > >>>>>>>>>>>>>>>> whenever
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer was likely to exceed its
> > >>>>>> configured
> > >>>>>>>>>>>>>>>> size, but
> > >>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>>>>>>>>>> brittle: it prevented the transaction buffer
> > >>>> from
> > >>>>>> being
> > >>>>>>>>>>>>>>>>>> configured
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>> unbounded, and it still would have required
> > >>>> explicit
> > >>>>>>>>>>>>>>>> flushes of
> > >>>>>>>>>>>>>>>>>>>>>> RocksDB,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> yielding sub-optimal performance and memory
> > >>>>>> utilization.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I deemed Atomic Checkpointing to be the
> > >> "right"
> > >>>> way
> > >>>>>> to
> > >>>>>>>>>>>>>>>> resolve
> > >>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>> problem. By ensuring that the changelog
> > >> offsets
> > >>>> that
> > >>>>>>>>>>>>>>>> correspond
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> most
> > >>>>>>>>>>>>>>>>>>>>>>>>>> recently written records are always
> > >> atomically
> > >>>>>> written
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> StateStore
> > >>>>>>>>>>>>>>>>>>>>>>>>>> (by writing them to the same transaction
> > >>>> buffer),
> > >>>>>> we can
> > >>>>>>>>>>>>>>>> avoid
> > >>>>>>>>>>>>>>>>>>>>>> forcibly
> > >>>>>>>>>>>>>>>>>>>>>>>>>> flushing the RocksDB memtables to disk,
> > >> letting
> > >>>>>> RocksDB
> > >>>>>>>>>>>>>>>> flush
> > >>>>>>>>>>>>>>>>>> them
> > >>>>>>>>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>>>>>> when necessary, without losing any of our
> > >>>>>> consistency
> > >>>>>>>>>>>>>>>> guarantees.
> > >>>>>>>>>>>>>>>>>>>> See
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> updated KIP for more info.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
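The idea behind Atomic Checkpointing described above can be sketched roughly like this. It is a simplified model using in-memory maps, not the RocksDB-based implementation, and every name in it is hypothetical. The point it illustrates is that records and their changelog offsets go into the same transaction buffer, so a commit applies both together and a rollback discards both together.

```java
import java.util.HashMap;
import java.util.Map;

final class AtomicCheckpointSketch {
    private final Map<String, String> committedData = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // One logical transaction buffer holding both records and offsets.
    private final Map<String, String> pendingData = new HashMap<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    /** Buffers a record together with the changelog offset it corresponds to. */
    void put(String key, String value, String changelogPartition, long offset) {
        pendingData.put(key, value);
        pendingOffsets.put(changelogPartition, offset);
    }

    /** Applies records and offsets together, so they can never disagree. */
    void commit() {
        committedData.putAll(pendingData);
        committedOffsets.putAll(pendingOffsets);
        pendingData.clear();
        pendingOffsets.clear();
    }

    /** Discards buffered records and their offsets together. */
    void rollback() {
        pendingData.clear();
        pendingOffsets.clear();
    }

    String get(String key) {
        return committedData.get(key);
    }

    /** Returns null if no offset has been committed for the partition. */
    Long committedOffset(String changelogPartition) {
        return committedOffsets.get(changelogPartition);
    }
}
```

Because the offset is never written ahead of the data it describes, there is no need in this model to force the data to disk before recording a checkpoint.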
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I have fully implemented these changes,
> > >>>> although I'm
> > >>>>>>>> still
> > >>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>> entirely
> > >>>>>>>>>>>>>>>>>>>>>>>>>> happy with the implementation for segmented
> > >>>>>> StateStores,
> > >>>>>>>>>> so
> > >>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>> plan
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> refactor that. Despite that, all tests
> > >> pass. If
> > >>>>>> you'd
> > >>>>>>>> like
> > >>>>>>>>>>>>>>>> to try
> > >>>>>>>>>>>>>>>>>>>> out
> > >>>>>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>>>>>> review this highly experimental and
> > >> incomplete
> > >>>>>> branch,
> > >>>>>>>>>> it's
> > >>>>>>>>>>>>>>>>>>>> available
> > >>>>>>>>>>>>>>>>>>>>>>>> here:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0
> > >>>>>>>> .
> > >>>>>>>>>>>>>>>> Note:
> > >>>>>>>>>>>>>>>>>>>> it's
> > >>>>>>>>>>>>>>>>>>>>>>>> built
> > >>>>>>>>>>>>>>>>>>>>>>>>>> against Kafka 3.5.0 so that I had a stable
> > >> base
> > >>>> to
> > >>>>>> build
> > >>>>>>>>>>>>>>>> and test
> > >>>>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>>>> on,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> and to enable easy apples-to-apples
> > >> comparisons
> > >>>> in a
> > >>>>>>>> live
> > >>>>>>>>>>>>>>>>>>>>>> environment. I
> > >>>>>>>>>>>>>>>>>>>>>>>>>> plan to rebase it against trunk once it's
> > >> nearer
> > >>>>>>>>>> completion
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> has
> > >>>>>>>>>>>>>>>>>>>>>> been
> > >>>>>>>>>>>>>>>>>>>>>>>>>> proven on our main application.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I would really appreciate help in reviewing
> > >> and
> > >>>>>> testing:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> - Segmented (Versioned, Session and Window)
> > >>>> stores
> > >>>>>>>>>>>>>>>>>>>>>>>>>> - Global stores
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I do not currently use either of these,
> > >> so my
> > >>>>>> primary
> > >>>>>>>>>>>>>>>> test
> > >>>>>>>>>>>>>>>>>>>>>>>> environment
> > >>>>>>>>>>>>>>>>>>>>>>>>>> doesn't test these areas.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I'm going on Parental Leave starting next
> > >> week
> > >>>> for
> > >>>>>> a few
> > >>>>>>>>>>>>>>>> weeks,
> > >>>>>>>>>>>>>>>>>> so
> > >>>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>> not have time to move this forward until
> > >> late
> > >>>>>> August.
> > >>>>>>>> That
> > >>>>>>>>>>>>>>>> said,
> > >>>>>>>>>>>>>>>>>>>> your
> > >>>>>>>>>>>>>>>>>>>>>>>>>> feedback is welcome and appreciated, I just
> > >>>> won't be
> > >>>>>>>> able
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> respond
> > >>>>>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>>>>>>> quickly as usual.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <
> > >>>>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, that's correct, although the impact
> > >> on IQ
> > >>>> is
> > >>>>>> not
> > >>>>>>>>>>>>>>>> something
> > >>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>> had
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> considered.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> What about atomically updating the state
> > >> store
> > >>>>>> from the
> > >>>>>>>>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing
> > >> the
> > >>>>>>>> checkpoint
> > >>>>>>>>>>>>>>>> (thus,
> > >>>>>>>>>>>>>>>>>>>>>>>> flushing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of
> > >> data
> > >>>>>> and/or
> > >>>>>>>>>>>>>>>> number of
> > >>>>>>>>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> intervals?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I'm not quite sure I follow. Are you
> > >> suggesting
> > >>>>>> that we
> > >>>>>>>>>>>>>>>> add an
> > >>>>>>>>>>>>>>>>>>>>>>>> additional
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> config for the max number of commit
> > >> intervals
> > >>>>>> between
> > >>>>>>>>>>>>>>>>>> checkpoints?
> > >>>>>>>>>>>>>>>>>>>>>> That
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> way, we would checkpoint *either* when the
> > >>>>>> transaction
> > >>>>>>>>>>>>>>>> buffers
> > >>>>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>>>>>> nearly
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> full, *OR* whenever a certain number of
> > >> commit
> > >>>>>>>> intervals
> > >>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>> elapsed,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> whichever comes first?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
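A minimal sketch of the "whichever comes first" trigger asked about above, assuming a byte threshold and a maximum number of commit intervals between checkpoints. The class name, both parameters, and the use of a simple `>=` comparison for "nearly full" are hypothetical and are not taken from the KIP or from Kafka Streams.

```java
// Hypothetical illustration only; not part of Kafka Streams or the KIP-892 branch.
final class HybridCheckpointTrigger {
    private final long maxBufferBytes;               // assumed byte threshold for the transaction buffers
    private final int maxCommitsBetweenCheckpoints;  // assumed limit on commit intervals between checkpoints
    private int commitsSinceCheckpoint = 0;

    HybridCheckpointTrigger(long maxBufferBytes, int maxCommitsBetweenCheckpoints) {
        this.maxBufferBytes = maxBufferBytes;
        this.maxCommitsBetweenCheckpoints = maxCommitsBetweenCheckpoints;
    }

    // Called once per Task commit; returns true when a checkpoint should be written.
    boolean onCommit(long bufferedBytes) {
        commitsSinceCheckpoint++;
        boolean due = bufferedBytes >= maxBufferBytes
                || commitsSinceCheckpoint >= maxCommitsBetweenCheckpoints;
        if (due) {
            // A checkpoint resets the commit counter.
            commitsSinceCheckpoint = 0;
        }
        return due;
    }
}
```

With a limit of three commits, the third commit checkpoints even when the buffers are nearly empty, and a single oversized commit checkpoints immediately.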
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> That certainly seems reasonable, although
> > >> this
> > >>>>>>>> re-ignites
> > >>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>> earlier
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> debate about whether a config should be
> > >>>> measured in
> > >>>>>>>>>>>>>>>> "number of
> > >>>>>>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> intervals", instead of just an absolute
> > >> time.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> FWIW, I realised that this issue is the
> > >> reason
> > >>>> I
> > >>>>>> was
> > >>>>>>>>>>>>>>>> pursuing
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> Atomic
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Checkpoints, as it de-couples memtable
> > >> flush
> > >>>> from
> > >>>>>>>>>>>>>>>> checkpointing,
> > >>>>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> enables us to just checkpoint on every
> > >> commit
> > >>>>>> without
> > >>>>>>>> any
> > >>>>>>>>>>>>>>>>>>>> performance
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> impact. Atomic Checkpointing is definitely
> > >> the
> > >>>>>> "best"
> > >>>>>>>>>>>>>>>> solution,
> > >>>>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>>>> I'm not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> sure if this is enough to bring it back
> > >> into
> > >>>> this
> > >>>>>> KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I'm currently working on moving all the
> > >>>>>> transactional
> > >>>>>>>>>>>> logic
> > >>>>>>>>>>>>>>>>>>>> directly
> > >>>>>>>>>>>>>>>>>>>>>>>> into
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> RocksDBStore itself, which does away with
> > >> the
> > >>>>>>>>>>>>>>>>>>>>>> StateStore#newTransaction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> method, and reduces the number of new
> > >> classes
> > >>>>>>>> introduced,
> > >>>>>>>>>>>>>>>>>>>>>> significantly
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> reducing the complexity. If it works, and
> > >> the
> > >>>>>>>> complexity
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>> drastically
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> reduced, I may try bringing back Atomic
> > >>>> Checkpoints
> > >>>>>>>> into
> > >>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>> KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna
> > >> <
> > >>>>>>>>>>>>>>>> cado...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the insights! Very interesting!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> As far as I understand, you want to
> > >> atomically
> > >>>>>> update
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> from the transaction buffer, flush the
> > >>>> memtable
> > >>>>>> of a
> > >>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> write the checkpoint not after the commit
> > >> time
> > >>>>>> elapsed
> > >>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>> after
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer reached a size that
> > >> would
> > >>>> lead
> > >>>>>> to
> > >>>>>>>>>>>>>>>> exceeding
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > >>>> before the
> > >>>>>>>> next
> > >>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>>>>>> interval
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> ends.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> That means, the Kafka transaction would
> > >> commit
> > >>>>>> every
> > >>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>> interval
> > >>>>>>>>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the state store will only be atomically
> > >>>> updated
> > >>>>>>>> roughly
> > >>>>>>>>>>>>>>>> every
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes of
> > >>>> data.
> > >>>>>> Also
> > >>>>>>>> IQ
> > >>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>> then
> > >>>>>>>>>>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> see new data roughly every
> > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> After a failure the state store needs to
> > >>>> restore
> > >>>>>> up to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Is this correct?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> What about atomically updating the state
> > >> store
> > >>>>>> from
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing
> > >> the
> > >>>>>>>> checkpoint
> > >>>>>>>>>>>>>>>> (thus,
> > >>>>>>>>>>>>>>>>>>>>>>>> flushing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of
> > >> data
> > >>>>>> and/or
> > >>>>>>>>>>>>>>>> number of
> > >>>>>>>>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> intervals? In such a way, we would have
> > >> the
> > >>>> same
> > >>>>>> delay
> > >>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>> records
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> appearing in output topics and IQ because
> > >> both
> > >>>>>> would
> > >>>>>>>>>>>>>>>> appear
> > >>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka transaction is committed. However,
> > >>>> after a
> > >>>>>>>> failure
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> still needs to restore up to
> > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > >>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> it might restore data that is already in
> > >> the
> > >>>> state
> > >>>>>>>> store
> > >>>>>>>>>>>>>>>>>> because
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint lags behind the last stable
> > >> offset
> > >>>>>> (i.e.
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>> last
> > >>>>>>>>>>>>>>>>>>>>>> committed
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> offset) of the changelog topics. Restoring
> > >>>> data
> > >>>>>> that
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> state store is idempotent, so EOS should
> > >> not be
> > >>>>>> violated.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> This solution needs at least one new
> > >> config to
> > >>>>>> specify
> > >>>>>>>>>>>>>>>> when a
> > >>>>>>>>>>>>>>>>>>>>>>>> checkpoint
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> should be written.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> A small correction to your previous e-mail
> > >>>> that
> > >>>>>> does
> > >>>>>>>> not
> > >>>>>>>>>>>>>>>> change
> > >>>>>>>>>>>>>>>>>>>>>>>> anything
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> you said: Under ALOS, the default commit
> > >>>> interval
> > >>>>>> is 30
> > >>>>>>>>>>>>>>>> seconds,
> > >>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>> five
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> seconds.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've begun performance testing my branch
> > >> on
> > >>>> our
> > >>>>>>>> staging
> > >>>>>>>>>>>>>>>>>>>>>> environment,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> putting it through its paces in our
> > >>>> non-trivial
> > >>>>>>>>>>>>>>>> application.
> > >>>>>>>>>>>>>>>>>> I'm
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> observing the same increased flush rate
> > >> that
> > >>>> we
> > >>>>>> saw
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>> last
> > >>>>>>>>>>>>>>>>>>>> time
> > >>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> attempted to use a version of this KIP,
> > >> but
> > >>>> this
> > >>>>>>>> time,
> > >>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>> think I
> > >>>>>>>>>>>>>>>>>>>>>> know
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> why.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit,
> > >> which is
> > >>>>>> called
> > >>>>>>>> at
> > >>>>>>>>>>>>>>>> the end
> > >>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Task
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> commit process, has the following
> > >> behaviour:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            - Under ALOS: checkpoint the state stores. This includes
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            flushing memtables in RocksDB. This is acceptable because the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            default commit.interval.ms is 5 seconds, so forcibly flushing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            memtables every 5 seconds is acceptable for most applications.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            - Under EOS: checkpointing is not done, *unless* it's being
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            forced, due to e.g. the Task closing or being revoked. This
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            means that under normal processing conditions, the state stores
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            will not be checkpointed, and will not have memtables flushed at
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            all, unless RocksDB decides to flush them on its own.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            Checkpointing stores and force-flushing their memtables is only
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            done when a Task is being closed.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint
> > >>>> stores on
> > >>>>>> at
> > >>>>>>>>>>>> least
> > >>>>>>>>>>>>>>>>>> *some*
> > >>>>>>>>>>>>>>>>>>>>>>>> normal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Task commits, in order to write the
> > >> RocksDB
> > >>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>> buffers
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> state stores, and to ensure the offsets
> > >> are
> > >>>>>> synced to
> > >>>>>>>>>>>>>>>> disk to
> > >>>>>>>>>>>>>>>>>>>>>> prevent
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> restores from getting out of hand.
> > >>>> Consequently,
> > >>>>>> my
> > >>>>>>>>>>>>>>>> current
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> calls maybeCheckpoint on *every* Task
> > >> commit,
> > >>>>>> which
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>> far too
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> frequent.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> This causes checkpoints every 10,000
> > >> records,
> > >>>>>> which
> > >>>>>>>> is
> > >>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>> change
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> flush
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviour, potentially causing
> > >> performance
> > >>>>>> problems
> > >>>>>>>> for
> > >>>>>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> applications.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm looking into possible solutions, and
> > >> I'm
> > >>>>>>>> currently
> > >>>>>>>>>>>>>>>> leaning
> > >>>>>>>>>>>>>>>>>>>>>>>> towards
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> using the
> > >>>> statestore.transaction.buffer.max.bytes
> > >>>>>>>>>>>>>>>>>> configuration
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint Tasks once we are likely to
> > >>>> exceed it.
> > >>>>>>>> This
> > >>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> complement the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> existing "early Task commit"
> > >> functionality
> > >>>> that
> > >>>>>> this
> > >>>>>>>>>>>>>>>>>>>> configuration
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> provides, in the following way:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            - Currently, we use statestore.transaction.buffer.max.bytes to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            force an early Task commit if processing more records would
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            cause our state store transactions to exceed the memory assigned
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            to them.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            - New functionality: when a Task *does* commit, we will not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            checkpoint the stores (and hence flush the transaction buffers)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            unless we expect to cross the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            statestore.transaction.buffer.max.bytes threshold before the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            next commit
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
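The second bullet above can be sketched as a single predicate, assuming Streams can estimate how many bytes the transaction buffers will grow by before the next commit. The class, the method, and the growth estimate are hypothetical names for illustration; only the configuration name comes from the thread.

```java
// Hypothetical illustration only; not the implementation on the KIP-892 branch.
final class CheckpointPolicy {
    // Stands in for statestore.transaction.buffer.max.bytes.
    private final long maxBufferBytes;

    CheckpointPolicy(long maxBufferBytes) {
        this.maxBufferBytes = maxBufferBytes;
    }

    // uncommittedBytes: bytes currently held in the transaction buffers.
    // expectedGrowthBytes: assumed estimate of bytes added before the next commit.
    // Returns true when the buffers are expected to cross the threshold before
    // the next commit, so this commit should also checkpoint.
    boolean shouldCheckpoint(long uncommittedBytes, long expectedGrowthBytes) {
        return uncommittedBytes + expectedGrowthBytes > maxBufferBytes;
    }
}
```

Under this sketch most commits skip the checkpoint, so memtables are only force-flushed when the buffers are close to the configured limit.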
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm also open to suggestions.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick
> > >> Telford <
> > >>>>>>>>>>>>>>>>>>>> nick.telf...@gmail.com
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> By "less predictable for users", I
> > >> meant in
> > >>>>>> terms of
> > >>>>>>>>>>>>>>>>>>>> understanding
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> performance profile under various
> > >>>>>> circumstances. The
> > >>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>> complex
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution, the more difficult it would
> > >> be for
> > >>>>>> users
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> understand
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> performance they see. For example,
> > >> spilling
> > >>>>>> records
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>> disk
> > >>>>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction buffer reaches a threshold
> > >>>> would, I
> > >>>>>>>>>> expect,
> > >>>>>>>>>>>>>>>>>> reduce
> > >>>>>>>>>>>>>>>>>>>>>> write
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> throughput. This reduction in write
> > >>>> throughput
> > >>>>>> could
> > >>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>> unexpected,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> potentially difficult to
> > >>>> diagnose/understand for
> > >>>>>>>>>> users.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At the moment, I think the "early
> > >> commit"
> > >>>>>> concept is
> > >>>>>>>>>>>>>>>>>> relatively
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> straightforward; it's easy to document,
> > >> and
> > >>>>>>>>>>>> conceptually
> > >>>>>>>>>>>>>>>>>> fairly
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> obvious to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users. We could probably add a metric to
> > >>>> make it
> > >>>>>>>>>> easier
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> understand
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it happens though.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. (the second one)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an
> > >>>> indirect
> > >>>>>> way
> > >>>>>>>> of
> > >>>>>>>>>>>>>>>>>> telling
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> StateStores
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> whether they should be transactional.
> > >>>>>> READ_COMMITTED
> > >>>>>>>>>>>>>>>>>>>> essentially
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> requires
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transactions, because it dictates that
> > >> two
> > >>>>>> threads
> > >>>>>>>>>>>>>>>> calling
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `newTransaction()` should not see writes
> > >>>> from
> > >>>>>> the
> > >>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> until
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> they have been committed. With
> > >>>>>> READ_UNCOMMITTED, all
> > >>>>>>>>>>>>>>>> bets are
> > >>>>>>>>>>>>>>>>>>>> off,
> > >>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stores can allow threads to observe
> > >> written
> > >>>>>> records
> > >>>>>>>> at
> > >>>>>>>>>>>>>>>> any
> > >>>>>>>>>>>>>>>>>>>> time,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> which is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> essentially "no transactions". That
> > >> said,
> > >>>>>>>> StateStores
> > >>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>> free
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> implement
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> these guarantees however they can,
> > >> which is
> > >>>> a
> > >>>>>> bit
> > >>>>>>>> more
> > >>>>>>>>>>>>>>>>>> relaxed
> > >>>>>>>>>>>>>>>>>>>>>> than
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dictating "you must use transactions".
> > >> For
> > >>>>>> example,
> > >>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>> RocksDB
> > >>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implement these as READ_COMMITTED ==
> > >>>> WBWI-based
> > >>>>>>>>>>>>>>>>>> "transactions",
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> READ_UNCOMMITTED == direct writes to the
> > >>>>>> database.
> > >>>>>>>> But
> > >>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> storage
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> engines, it might be preferable to
> > >> *always*
> > >>>> use
> > >>>>>>>>>>>>>>>> transactions,
> > >>>>>>>>>>>>>>>>>>>> even
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> unnecessary; or there may be storage
> > >> engines
> > >>>>>> that
> > >>>>>>>>>> don't
> > >>>>>>>>>>>>>>>>>> provide
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transactions, but the isolation
> > >> guarantees
> > >>>> can
> > >>>>>> be
> > >>>>>>>> met
> > >>>>>>>>>>>>>>>> using a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> technique.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My idea was to try to keep the
> > >> StateStore
> > >>>>>> interface
> > >>>>>>>> as
> > >>>>>>>>>>>>>>>>>> loosely
> > >>>>>>>>>>>>>>>>>>>>>>>> coupled
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from the Streams engine as possible, to
> > >> give
> > >>>>>>>>>>>>>>>> implementers
> > >>>>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> freedom, and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reduce the amount of internal knowledge
> > >>>>>> required.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> That said, I understand that
> > >>>> "IsolationLevel"
> > >>>>>> might
> > >>>>>>>>>> not
> > >>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> right
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> abstraction, and we can always make it
> > >> much
> > >>>> more
> > >>>>>>>>>>>>>>>> explicit if
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> required, e.g.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> boolean transactional()
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
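A minimal sketch of the mapping described above, using an in-memory map in place of RocksDB: under READ_COMMITTED, writes are staged in a buffer until commit (standing in for the WBWI-based "transactions"), and under READ_UNCOMMITTED they go straight to the store. The enum and class here are hypothetical and self-contained; they are not the Kafka `IsolationLevel` type or the `StateStore` interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the isolation level passed to a store.
enum SketchIsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

// Hypothetical in-memory store; illustration only.
final class SketchStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> buffer = new HashMap<>();
    private final SketchIsolationLevel level;

    SketchStore(SketchIsolationLevel level) {
        this.level = level;
    }

    void put(String key, String value) {
        if (level == SketchIsolationLevel.READ_COMMITTED) {
            buffer.put(key, value);      // staged until commit()
        } else {
            committed.put(key, value);   // direct write, visible immediately
        }
    }

    // What a different thread (for example an interactive query) would observe.
    String readFromOtherThread(String key) {
        return committed.get(key);
    }

    // Makes all staged writes visible and clears the buffer.
    void commit() {
        committed.putAll(buffer);
        buffer.clear();
    }
}
```

A store backed by a different engine could satisfy the same two guarantees by other means, which is the looser coupling the message argues for.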
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 7-8.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I can make these changes either later
> > >> today
> > >>>> or
> > >>>>>>>>>>>> tomorrow.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Small update:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've rebased my branch on trunk and
> > >> fixed a
> > >>>>>> bunch of
> > >>>>>>>>>>>>>>>> issues
> > >>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> needed
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> addressing. Currently, all the tests
> > >> pass,
> > >>>>>> which is
> > >>>>>>>>>>>>>>>>>> promising,
> > >>>>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> need to undergo some performance
> > >> testing. I
> > >>>>>> haven't
> > >>>>>>>>>>>>>>>> (yet)
> > >>>>>>>>>>>>>>>>>>>> worked
> > >>>>>>>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> removing the `newTransaction()` stuff,
> > >> but I
> > >>>>>> would
> > >>>>>>>>>>>>>>>> expect
> > >>>>>>>>>>>>>>>>>> that,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviourally, it should make no
> > >>>> difference. The
> > >>>>>>>>>> branch
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>> available
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c
> > >>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>> anyone
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interested in taking an early look.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno
> > >> Cadonna
> > >>>> <
> > >>>>>>>>>>>>>>>>>>>> cado...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah, I agree with you. That was
> > >> actually
> > >>>> also
> > >>>>>> my
> > >>>>>>>>>>>>>>>> point. I
> > >>>>>>>>>>>>>>>>>>>>>>>> understood
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that John was proposing the ingestion
> > >> path
> > >>>> as
> > >>>>>> a way
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>> avoid
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> early
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> commits. Probably, I misinterpreted the
> > >>>> intent.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree with John here, that actually
> > >> it is
> > >>>>>> public
> > >>>>>>>>>>>>>>>> API. My
> > >>>>>>>>>>>>>>>>>>>>>> question
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how this usage pattern affects normal
> > >>>>>> processing.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My concern is that checking for the
> > >> size
> > >>>> of the
> > >>>>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>>>>>>>> buffer
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> maybe triggering an early commit
> > >> affects
> > >>>> the
> > >>>>>> whole
> > >>>>>>>>>>>>>>>>>> processing
> > >>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Streams. The transactionality of a
> > >> state
> > >>>> store
> > >>>>>> is
> > >>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>> confined to
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state store itself, but spills over and
> > >>>>>> changes the
> > >>>>>>>>>>>>>>>> behavior
> > >>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parts of the system. I agree with you
> > >> that
> > >>>> it
> > >>>>>> is a
> > >>>>>>>>>>>>>>>> decent
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> compromise. I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> just wanted to analyse the downsides
> > >> and
> > >>>> list
> > >>>>>> the
> > >>>>>>>>>>>>>>>> options to
> > >>>>>>>>>>>>>>>>>>>>>>>> overcome
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> them. I also agree with you that all
> > >>>> options
> > >>>>>> seem
> > >>>>>>>>>>>> quite
> > >>>>>>>>>>>>>>>>>> heavy
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> compared
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with your KIP. I do not understand
> > >> what you
> > >>>>>> mean
> > >>>>>>> by
> > >>>>>>>>>>>>>>>> "less
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> predictable
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for users", though.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I found the discussions about the
> > >>>> alternatives
> > >>>>>>>> really
> > >>>>>>>>>>>>>>>>>>>>>> interesting.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> But I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also think that your plan sounds good
> > >> and
> > >>>> we
> > >>>>>> should
> > >>>>>>>>>>>>>>>> continue
> > >>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>> it!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Some comments on your reply to my
> > >> e-mail on
> > >>>>>> June
> > >>>>>>>>>> 20th:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ah, now, I understand the reasoning
> > >> behind
> > >>>>>> putting
> > >>>>>>>>>>>>>>>> isolation
> > >>>>>>>>>>>>>>>>>>>>>> level
> > >>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the state store context. Thanks! Should
> > >>>> that
> > >>>>>> also
> > >>>>>>>> be
> > >>>>>>>>>> a
> > >>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> give
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the state store the opportunity to
> > >> decide
> > >>>>>> whether
> > >>>>>>>> to
> > >>>>>>>>>>>>>>>> turn on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transactions or not?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> With my comment, I was more concerned
> > >> about
> > >>>>> how
> > >>>>>>>>>> you
> > >>>>>>>>>>>>>>>> know
> > >>>>>>>>>>>>>>>>>>>> if a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint file needs to be written
> > >> under
> > >>>> EOS,
> > >>>>>> if
> > >>>>>>>> you
> > >>>>>>>>>>>>>>>> do not
> > >>>>>>>>>>>>>>>>>>>>>> have a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to know if the state store is
> > >>>> transactional or
> > >>>>>> not.
> > >>>>>>>>>> If
> > >>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transactional, the checkpoint file can
> > >> be
> > >>>>>> written
> > >>>>>>>>>>>>>>>> during
> > >>>>>>>>>>>>>>>>>>>> normal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processing under EOS. If the state
> > >> store
> > >>>> is not
> > >>>>>>>>>>>>>>>>>> transactional,
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint file must not be written
> > >> under
> > >>>> EOS.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 7.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My point was about not only
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> considering the bytes in memory in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> config
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.uncommitted.max.bytes, but
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also bytes that might be spilled on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> disk. Basically, I was wondering
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> whether you should remove the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "memory" in "Maximum number of memory
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> bytes to be used to buffer uncommitted
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state-store records." My thinking was
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that even if a state store spills
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> uncommitted bytes to disk, limiting
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the overall bytes might make sense.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thinking about it again and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> considering the recent discussions, it
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> does not make too much sense anymore.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I like the name
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that you proposed.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 8.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A high-level description (without
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation details) of how Kafka
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Streams will manage the commit of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> changelog transactions, state store
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transactions and checkpointing would
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be great. Would be great if you could
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also add some sentences about the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behavior in case of a failure. For
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> instance how does a transactional
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state store recover after a failure or
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> what happens with the transaction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer, etc. (that is what I meant by
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "fail-over" in point 9.)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bruno
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Isn't this exactly the same issue that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex transactions have,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> whereby exceeding (or likely to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> exceed) configured memory needs to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger an early commit?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is one of my big concerns.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ultimately, any approach based on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cracking open RocksDB internals and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> using it in ways it's not really
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> designed for is likely to have some
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> unforeseen performance or consistency
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What's your motivation for removing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> these early commits? While not ideal,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think they're a decent compromise to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ensure consistency whilst maintaining
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> good and predictable performance.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> All 3 of your suggested ideas seem
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *very* complicated, and might actually
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> make behaviour less predictable for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users as a consequence.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm a bit concerned that the scope of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this KIP is growing a bit out of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> control. While it's good to discuss
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ideas for future improvements, I think
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's important to narrow the scope
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> down to a design that achieves the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> most pressing objectives (constant
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sized restorations during dirty
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> close/unexpected errors). Any design
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that this KIP produces can ultimately
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be changed in the future, especially
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if the bulk of it is internal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviour.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm going to spend some time next week
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trying to re-work the original
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex design to remove
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the newTransaction() method, such that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's just an implementation detail of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> RocksDBStore. That way, if we want to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replace WBWI with something in the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> future, like the SST file management
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> outlined by John, then we can do so
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with little/no API changes.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
