Hi Nick,

I agree that the better behavior would be to retry `commitTransaction`
until we risk exceeding `max.poll.interval.ms`.

However, if it's handled as a `TaskCorruptedException` at the moment,
I would do the same in this KIP, and leave exception handling
improvements to future work. This KIP is already improving the
situation a lot by not wiping the state store.
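
To make the retry idea a bit more concrete, here is a minimal sketch of
"retry until the next attempt could overrun the deadline". It is not the
Streams implementation: `Committer`, `CommitTimeoutException`,
`commitWithDeadline` and its parameters are hypothetical stand-ins for
Producer#commitTransaction, its timeout error, and a deadline derived from
`max.poll.interval.ms`.

```java
import java.util.function.LongSupplier;

final class CommitRetrySketch {
    /** Hypothetical stand-in for the timeout raised by Producer#commitTransaction. */
    static final class CommitTimeoutException extends RuntimeException {}

    /** Hypothetical stand-in for the single producer call we care about. */
    interface Committer {
        void commitTransaction();
    }

    /**
     * Retries the commit until it succeeds, or until one more attempt of up to
     * maxAttemptMs could pass deadlineMs. Returns true on success and false if
     * the caller should give up (e.g. fall back to the existing error handling).
     */
    static boolean commitWithDeadline(Committer committer, LongSupplier clockMs,
                                      long deadlineMs, long maxAttemptMs) {
        while (true) {
            try {
                committer.commitTransaction();
                return true;
            } catch (CommitTimeoutException e) {
                // Give up when another attempt might overrun the deadline.
                if (clockMs.getAsLong() + maxAttemptMs > deadlineMs) {
                    return false;
                }
            }
        }
    }
}
```

Returning false rather than throwing leaves the choice of what to do next
(for example, the current `TaskCorruptedException` path) to the caller.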

Cheers,
Lucas

On Tue, Oct 17, 2023 at 3:51 PM Nick Telford <nick.telf...@gmail.com> wrote:
>
> Hi Lucas,
>
> Yeah, this is pretty much the direction I'm thinking of going in now. You
> make an interesting point about committing on-error under
> ALOS/READ_COMMITTED, although I haven't had a chance to think through the
> implications yet.
>
> Something that I ran into earlier this week is an issue with the new
> handling of TimeoutException. Without TX stores, TimeoutException under EOS
> throws a TaskCorruptedException, which wipes the stores. However, with TX
> stores, TimeoutException is now just bubbled up and dealt with as it is
> under ALOS. The problem arises when the Producer#commitTransaction call
> times out: Streams attempts to ignore the error and continue producing,
> which causes the next call to Producer#send to throw
> "IllegalStateException: Cannot attempt operation `send` because the
> previous call to `commitTransaction` timed out and must be retried".
>
> I'm not sure what we should do here: retrying the commitTransaction seems
> logical, but what if it times out again? Where do we draw the line and
> shut down the instance?
>
> Regards,
> Nick
>
> On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy <lbruts...@confluent.io.invalid>
> wrote:
>
> > Hi all,
> >
> > I liked your suggestion of allowing EOS with READ_UNCOMMITTED while
> > still wiping the state on error, and I'd vote for this solution
> > when introducing `default.state.isolation.level`. This way, we'd have
> > the lowest-risk roll-out of this feature (no behavior change without
> > reconfiguration), with the possibility of switching to the most sane /
> > battle-tested default settings in 4.0. Essentially, we'd have a
> > feature flag, but we'd call it `default.state.isolation.level` and
> > wouldn't have to deprecate it later.
> >
> > So the possible configurations would then be this:
> >
> > 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> > reads from DB.
> > 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> > WriteBatch/DB. Flush on error (see note below).
> > 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> > reads from DB. Wipe state on error.
> > 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> > WriteBatch/DB.
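
As a rough illustration of the four combinations listed above, here is a
minimal Java sketch that encodes them as a lookup. Every name in it
(`IsolationMatrixSketch`, `Guarantee`, `Isolation`, `Behavior`,
`behaviorFor`) is hypothetical and not part of Kafka Streams; it only
restates the list.

```java
final class IsolationMatrixSketch {
    enum Guarantee { ALOS, EOS }
    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    /** Hypothetical summary of how a store behaves for one combination. */
    static final class Behavior {
        final boolean usesWriteBatch; // false means writes go direct-to-DB
        final boolean wipeOnError;
        final boolean flushOnError;

        Behavior(boolean usesWriteBatch, boolean wipeOnError, boolean flushOnError) {
            this.usesWriteBatch = usesWriteBatch;
            this.wipeOnError = wipeOnError;
            this.flushOnError = flushOnError;
        }
    }

    static Behavior behaviorFor(Guarantee guarantee, Isolation isolation) {
        boolean readCommitted = isolation == Isolation.READ_COMMITTED;
        boolean eos = guarantee == Guarantee.EOS;
        return new Behavior(
            readCommitted,           // cases 2 and 4 buffer writes in a WriteBatch
            eos && !readCommitted,   // case 3: wipe state on error
            !eos && readCommitted);  // case 2: flush the WriteBatch on error
    }
}
```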
> >
> > I believe the feature is important enough that we will see good
> > adoption even without changing the default. In 4.0, once we have seen
> > it adopted and battle-tested, we can make READ_COMMITTED the
> > default for EOS, or even make READ_COMMITTED the default everywhere,
> > depending on our experiences. And we could add a clever implementation
> > of READ_UNCOMMITTED with WriteBatches later.
> >
> > The only smell here is that `default.state.isolation.level` wouldn't
> > be purely an IQ setting: it would also (slightly) change the
> > behavior of the processing. That seems unavoidable as long as we
> > haven't solved READ_UNCOMMITTED IQ with WriteBatches.
> >
> > Minor: As for Bruno's point 4, I think if we are concerned about this
> > behavior (we don't necessarily have to be, because it doesn't violate
> > ALOS guarantees as far as I can see), we could make
> > ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing
> > the WriteBatch on error (obviously, only if we have a chance to do
> > that).
> >
> > Cheers,
> > Lucas
> >
> > On Mon, Oct 16, 2023 at 12:19 PM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> > >
> > > Hi Guozhang,
> > >
> > > The KIP as it stands introduces a new configuration,
> > > default.state.isolation.level, which is independent of processing.mode.
> > > It's intended that this new configuration be used to configure a global IQ
> > > isolation level in the short term, with a future KIP introducing the
> > > capability to change the isolation level on a per-query basis, falling back
> > > to the "default" defined by this config. That's why I called it "default",
> > > for future-proofing.
> > >
> > > However, it currently includes the caveat that READ_UNCOMMITTED is not
> > > available under EOS. I think this is the coupling you are alluding to?
> > >
> > > This isn't intended to be a restriction of the API, but is currently a
> > > technical limitation. However, after discussing with some users about
> > > use-cases that would require READ_UNCOMMITTED under EOS, I'm inclined to
> > > remove that clause and put in the necessary work to make that combination
> > > possible now.
> > >
> > > I currently see two possible approaches:
> > >
> > >    1. Disable TX StateStores internally when the IsolationLevel is
> > >    READ_UNCOMMITTED and the processing.mode is EOS. This is more difficult
> > >    than it sounds, as there are many assumptions being made throughout the
> > >    internals about the guarantees StateStores provide. It would definitely
> > >    add a lot of extra "if (read_uncommitted && eos)" branches, complicating
> > >    maintenance and testing.
> > >    2. Invest the time *now* to make READ_UNCOMMITTED of EOS StateStores
> > >    possible. I have some ideas on how this could be achieved, but they would
> > >    need testing and could introduce some additional issues. The benefit of
> > >    this approach is that it would make query-time IsolationLevels much
> > >    simpler to implement in the future.
> > >
> > > Unfortunately, both will require considerable work that will further delay
> > > this KIP, which was the reason I placed the restriction in the KIP in the
> > > first place.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <guozhang.wang...@gmail.com>
> > > wrote:
> > >
> > > > Hello Nick,
> > > >
> > > > First of all, thanks a lot for the great effort you've put in driving
> > > > this KIP! I really like it coming through finally, as many people in
> > > > the community have raised this. At the same time I honestly feel a bit
> > > > ashamed for not putting enough of my time supporting it and pushing it
> > > > through the finish line (you raised this KIP almost a year ago).
> > > >
> > > > I briefly passed through the DISCUSS thread so far, and I'm not sure
> > > > I've digested all the bullet points 100 percent. But with the goal of
> > > > helping take it over the finish line in mind, I'd like to share some
> > > > thoughts off the top of my head, only on point #4 above, which I feel
> > > > may be the main hurdle for the current KIP to reach a consensus now.
> > > >
> > > > The general question I asked myself is whether we want to couple "IQ
> > > > reading mode" with "processing mode". While technically I tend to
> > > > agree with you that it feels like a bug if a single user chose
> > > > "EOS" for processing mode while choosing "read uncommitted" for IQ
> > > > reading mode, at the same time I wonder whether there could be two
> > > > different persons (or even two teams), one using the Streams API to
> > > > build the app and the other using the IQ API to query the
> > > > running state of the app. I know this is less of a technical concern
> > > > and more of a design one, but if that could ever be the case, I
> > > > wonder whether the person using the IQ API might know about the risks
> > > > of read uncommitted but still choose it in favor of
> > > > performance, no matter whether the underlying stream processing mode
> > > > configured by the other person is EOS or not. In that regard, I'm
> > > > leaning towards a "leave the door open, and close it later if we
> > > > find it's a bad idea" approach, with a configuration that we can
> > > > potentially deprecate, rather than "shut the door, clean for everyone".
> > > > More specifically, I'd allow the processing mode / IQ read mode to be
> > > > decoupled, and if we find that there are no such cases as I speculated
> > > > above, or people start complaining a lot, we can still enforce
> > > > coupling them.
> > > >
> > > > Again, just my 2c here. Thanks again for the great patience and
> > > > diligence on this KIP.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <nick.telf...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Bruno,
> > > > >
> > > > > 4.
> > > > > I'll hold off on making that change until we have a consensus as to what
> > > > > configuration to use to control all of this, as it'll be affected by the
> > > > > decision on EOS isolation levels.
> > > > >
> > > > > 5.
> > > > > Done. I've chosen "committedOffsets".
> > > > >
> > > > > Regards,
> > > > > Nick
> > > > >
> > > > > On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Nick,
> > > > > >
> > > > > > 1.
> > > > > > Yeah, you are probably right that it does not make too much sense.
> > > > > > Thanks for the clarification!
> > > > > >
> > > > > >
> > > > > > 4.
> > > > > > Yes, sorry for the back and forth, but I think for the sake of the KIP
> > > > > > it is better to leave the ALOS behavior as it is for now due to the
> > > > > > possible issues you would run into. Maybe we can find a solution in the
> > > > > > future. Now the question returns to whether we really need
> > > > > > default.state.isolation.level. Maybe the config could be the feature
> > > > > > flag Sophie requested.
> > > > > >
> > > > > >
> > > > > > 5.
> > > > > > There is a guideline in Kafka not to use the get prefix for getters (at
> > > > > > least in the public API). Thus, could you please rename
> > > > > >
> > > > > > getCommittedOffset(TopicPartition partition) ->
> > > > > > committedOffsetFor(TopicPartition partition)
> > > > > >
> > > > > > You can also propose an alternative to committedOffsetFor().
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > >
> > > > > > On 10/13/23 3:21 PM, Nick Telford wrote:
> > > > > > > Hi Bruno,
> > > > > > >
> > > > > > > Thanks for getting back to me.
> > > > > > >
> > > > > > > 1.
> > > > > > > I think this should be possible. Are you thinking of the situation where
> > > > > > > a user may downgrade to a previous version of Kafka Streams? In that case,
> > > > > > > sadly, the RocksDBStore would get wiped by the older version of Kafka
> > > > > > > Streams anyway, because that version wouldn't understand the extra column
> > > > > > > family (that holds offsets), so the missing Position file would
> > > > > > > automatically get rebuilt when the store is rebuilt from the changelog.
> > > > > > > Are there other situations than downgrade where a transactional store
> > > > > > > could be replaced by a non-transactional one? I can't think of any.
> > > > > > >
> > > > > > > 2.
> > > > > > > Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to
> > > > > > > be fleshed out. I'll work on that. How much detail do you need?
> > > > > > >
> > > > > > > 3.
> > > > > > > See my previous email discussing this.
> > > > > > >
> > > > > > > 4.
> > > > > > > Hmm, this is an interesting point. Are you suggesting that under ALOS
> > > > > > > READ_COMMITTED should not be supported?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <cado...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Nick,
> > > > > > >>
> > > > > > >> I think the KIP is converging!
> > > > > > >>
> > > > > > >>
> > > > > > >> 1.
> > > > > > > >> I am wondering whether it makes sense to write the position file during
> > > > > > > >> close as we do for the checkpoint file, so that in case the state store
> > > > > > > >> is replaced with a non-transactional state store the non-transactional
> > > > > > > >> state store finds the position file. I think this is not strictly
> > > > > > > >> needed, but would be a nice behavior instead of just deleting the
> > > > > > > >> position file.
> > > > > > >>
> > > > > > >>
> > > > > > >> 2.
> > > > > > > >> The test plan does not mention integration tests. Do you not need to
> > > > > > > >> extend existing ones and add new ones? Also, for upgrading and
> > > > > > > >> downgrading you might need integration and/or system tests.
> > > > > > >>
> > > > > > >>
> > > > > > >> 3.
> > > > > > > >> I think Sophie made a point. Although, IQ reading from uncommitted data
> > > > > > > >> under EOS might be considered a bug by some people. Thus, your KIP would
> > > > > > > >> fix a bug rather than changing the intended behavior. However, I also
> > > > > > > >> see that a feature flag would help users that rely on this buggy
> > > > > > > >> behavior (at least until AK 4.0).
> > > > > > >>
> > > > > > >>
> > > > > > >> 4.
> > > > > > > >> This is related to the previous point. I assume that the difference
> > > > > > > >> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the
> > > > > > > >> former you enable transactions on the state store and in the latter you
> > > > > > > >> disable them. If my assumption is correct, I think that is an issue.
> > > > > > > >> Let's assume under ALOS Streams fails over a couple of times more or
> > > > > > > >> less at the same step in processing after value 3 is added to an
> > > > > > > >> aggregation but the offset of the corresponding input record was not
> > > > > > > >> committed. With transactions disabled, the aggregation value would
> > > > > > > >> increase by 3 for each failover. With transactions enabled, value 3
> > > > > > > >> would only be added to the aggregation once when the offset of the input
> > > > > > > >> record is committed and the transaction finally completes. So the
> > > > > > > >> content of the state store would change depending on the configuration
> > > > > > > >> for IQ. IMO, the content of the state store should be independent from
> > > > > > > >> IQ. Given this issue, I propose to not use transactions with ALOS at
> > > > > > > >> all. I was a big proponent of using transactions with ALOS, but I
> > > > > > > >> realized that transactions with ALOS is not as easy as enabling
> > > > > > > >> transactions on state stores. Another aspect that is problematic is that
> > > > > > > >> the changelog topic which actually replicates the state store is not
> > > > > > > >> transactional under ALOS. Thus, it might happen that the state store and
> > > > > > > >> the changelog differ in their content. All of this is maybe solvable
> > > > > > > >> somehow, but for the sake of this KIP, I would leave it for the future.
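
The failover scenario in point 4 can be worked through with a toy model.
This is only a sketch under simplifying assumptions: the store is a single
integer, every attempt adds 3, and a crash before commit discards buffered
(transactional) writes but not direct writes. `FailoverAggregationSketch`
and `aggregateAfter` are hypothetical names.

```java
final class FailoverAggregationSketch {
    /**
     * Returns the aggregate held in the store after `failedAttempts` crashes
     * (each before the input offset is committed) followed by one attempt
     * that commits successfully.
     */
    static int aggregateAfter(int failedAttempts, boolean transactional) {
        int store = 0; // durable contents of the state store
        for (int attempt = 0; attempt <= failedAttempts; attempt++) {
            boolean crashesBeforeCommit = attempt < failedAttempts;
            if (transactional) {
                int pending = store + 3; // buffered write, not yet in the store
                if (!crashesBeforeCommit) {
                    store = pending;     // applied only when the commit completes
                }
            } else {
                store += 3;              // written directly, survives the crash
            }
        }
        return store;
    }
}
```

With two failovers, the non-transactional store ends at 9 while the
transactional one ends at 3, which is the divergence described above.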
> > > > > > >>
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Bruno
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
> > > > > > > >>> Hey Nick! First of all thanks for taking up this awesome feature, I'm
> > > > > > > >>> sure every single Kafka Streams user and dev would agree that it is
> > > > > > > >>> sorely needed.
> > > > > > >>>
> > > > > > > >>> I've just been catching up on the KIP and surrounding discussion, so
> > > > > > > >>> please forgive me for any misunderstandings or misinterpretations of
> > > > > > > >>> the current plan and don't hesitate to correct me.
> > > > > > >>>
> > > > > > > >>> Before I jump in, I just want to say that having seen this drag on for
> > > > > > > >>> so long, my singular goal in responding is to help this KIP past a
> > > > > > > >>> perceived impasse so we can finally move on to voting and implementing
> > > > > > > >>> it. Long discussions are to be expected for major features like this
> > > > > > > >>> but it's completely on us as the Streams devs to make sure there is an
> > > > > > > >>> end in sight for any ongoing discussion.
> > > > > > >>>
> > > > > > > >>> With that said, it's my understanding that the KIP as currently
> > > > > > > >>> proposed is just not tenable for Kafka Streams, and would prevent some
> > > > > > > >>> EOS users from upgrading to the version it first appears in. Given that
> > > > > > > >>> we can't predict or guarantee whether any of the followup KIPs would be
> > > > > > > >>> completed in the same release cycle as this one, we need to make sure
> > > > > > > >>> that the feature is either compatible with all current users or else
> > > > > > > >>> feature-flagged so that they may opt in/out.
> > > > > > >>>
> > > > > > >>> Therefore, IIUC we need to have either (or both) of these as
> > > > > > >>> fully-implemented config options:
> > > > > > >>> 1. default.state.isolation.level
> > > > > > >>> 2. enable.transactional.state.stores
> > > > > > >>>
> > > > > > > >>> This way EOS users for whom read_committed semantics are not viable can
> > > > > > > >>> still upgrade, and either use the isolation.level config to leverage
> > > > > > > >>> the new txn state stores without sacrificing their application
> > > > > > > >>> semantics, or else simply keep the transactional state stores disabled
> > > > > > > >>> until we are able to fully implement the isolation level configuration
> > > > > > > >>> at either an application or query level.
> > > > > > >>>
> > > > > > > >>> Frankly you are the expert here and know much more about the tradeoffs
> > > > > > > >>> in both semantics and effort level of implementing one of these configs
> > > > > > > >>> vs the other. In my opinion, either option would be fine and I would
> > > > > > > >>> leave the decision of which one to include in this KIP completely up to
> > > > > > > >>> you. I just don't see a way for the KIP to proceed without some
> > > > > > > >>> variation of the above that would allow EOS users to opt-out of
> > > > > > > >>> read_committed.
> > > > > > >>>
> > > > > > > >>> (If it's all the same to you, I would recommend always including a
> > > > > > > >>> feature flag in large structural changes like this. No matter how much
> > > > > > > >>> I trust someone or myself to implement a feature, you just never know
> > > > > > > >>> what kind of bugs might slip in, especially with the very first
> > > > > > > >>> iteration that gets released. So personally, my choice would be to add
> > > > > > > >>> the feature flag and leave it off by default. If all goes well you can
> > > > > > > >>> do a quick KIP to enable it by default as soon as the isolation.level
> > > > > > > >>> config has been completed. But feel free to just pick whichever option
> > > > > > > >>> is easiest or quickest for you to implement)
> > > > > > >>>
> > > > > > >>> Hope this helps move the discussion forward,
> > > > > > >>> Sophie
> > > > > > >>>
> > > > > > > >>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <nick.telf...@gmail.com>
> > > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>>> Hi Bruno,
> > > > > > >>>>
> > > > > > >>>> Agreed, I can live with that for now.
> > > > > > >>>>
> > > > > > > >>>> In an effort to keep the scope of this KIP from expanding, I'm leaning
> > > > > > > >>>> towards just providing a configurable default.state.isolation.level and
> > > > > > > >>>> removing IsolationLevel from the StateStoreContext. This would be
> > > > > > > >>>> compatible with adding support for query-time IsolationLevels in the
> > > > > > > >>>> future, whilst providing a way for users to select an isolation level
> > > > > > > >>>> now.
> > > > > > >>>>
> > > > > > > >>>> The big problem with this, however, is that if a user selects
> > > > > > > >>>> processing.mode = "exactly-once(-v2|-beta)", and
> > > > > > > >>>> default.state.isolation.level = "READ_UNCOMMITTED", we need to
> > > > > > > >>>> guarantee that the data isn't written to disk until commit() is called,
> > > > > > > >>>> but we also need to permit IQ threads to read from the ongoing
> > > > > > > >>>> transaction.
> > > > > > >>>>
> > > > > > > >>>> A simple solution would be to (temporarily) forbid this combination of
> > > > > > > >>>> configuration, and have default.state.isolation.level automatically
> > > > > > > >>>> switch to READ_COMMITTED when processing.mode is anything other than
> > > > > > > >>>> at-least-once. Do you think this would be acceptable?
> > > > > > >>>>
> > > > > > > >>>> In a later KIP, we can add support for query-time isolation levels and
> > > > > > > >>>> solve this particular problem there, which would relax this
> > > > > > > >>>> restriction.
> > > > > > >>>>
> > > > > > >>>> Regards,
> > > > > > >>>> Nick
> > > > > > >>>>
> > > > > > > >>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org>
> > > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > > >>>>> Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think
> > > > > > > >>>>> it is perfectly valid to say InMemoryKeyValueStore does not support
> > > > > > > >>>>> READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default
> > > > > > > >>>>> at the moment.
> > > > > > >>>>>
> > > > > > >>>>> Best,
> > > > > > >>>>> Bruno
> > > > > > >>>>>
> > > > > > >>>>> On 9/18/23 7:12 PM, Nick Telford wrote:
> > > > > > > >>>>>> Oh! One other concern I haven't mentioned: if we make IsolationLevel a
> > > > > > > >>>>>> query-time constraint, then we need to add support for READ_COMMITTED
> > > > > > > >>>>>> to InMemoryKeyValueStore too, which will require some changes to the
> > > > > > > >>>>>> implementation.
> > > > > > >>>>>>
> > > > > > > >>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com>
> > > > > > > >>>>>> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi everyone,
> > > > > > >>>>>>>
> > > > > > > >>>>>>> I agree that having IsolationLevel be determined at query-time is the
> > > > > > > >>>>>>> ideal design, but there are a few sticking points:
> > > > > > >>>>>>>
> > > > > > >>>>>>> 1.
> > > > > > > >>>>>>> There needs to be some way to communicate the IsolationLevel down to
> > > > > > > >>>>>>> the RocksDBStore itself, so that the query can respect it. Since stores
> > > > > > > >>>>>>> are "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore,
> > > > > > > >>>>>>> etc.), we need some way to deliver that information to the bottom
> > > > > > > >>>>>>> layer. For IQv2, we can use the existing StateStore#query() method, but
> > > > > > > >>>>>>> IQv1 has no way to do this.
> > > > > > >>>>>>>
> > > > > > > >>>>>>> A simple approach, which would potentially open up other options, would
> > > > > > > >>>>>>> be to add something like: ReadOnlyKeyValueStore<K, V>
> > > > > > > >>>>>>> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore
> > > > > > > >>>>>>> (and similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).
> > > > > > >>>>>>>
> > > > > > >>>>>>> 2.
> > > > > > > >>>>>>> As mentioned above, RocksDB WriteBatches are not thread-safe, which
> > > > > > > >>>>>>> causes a problem if we want to provide READ_UNCOMMITTED Iterators. I
> > > > > > > >>>>>>> also had a look at RocksDB Transactions[1], but they solve a very
> > > > > > > >>>>>>> different problem, and have the same thread-safety issue.
> > > > > > >>>>>>>
> > > > > > > >>>>>>> One possible approach that I mentioned is chaining WriteBatches: every
> > > > > > > >>>>>>> time a new Interactive Query is received (i.e. readOnlyView, see above,
> > > > > > > >>>>>>> is called) we "freeze" the existing WriteBatch, and start a new one for
> > > > > > > >>>>>>> new writes. The Interactive Query queries the "chain" of previous
> > > > > > > >>>>>>> WriteBatches + the underlying database; while the StreamThread starts
> > > > > > > >>>>>>> writing to the *new* WriteBatch. On-commit, the StreamThread would
> > > > > > > >>>>>>> write *all* WriteBatches in the chain to the database (that have not
> > > > > > > >>>>>>> yet been written).
> > > > > > >>>>>>>
> > > > > > > >>>>>>> WriteBatches would be closed/freed only when they have been both
> > > > > > > >>>>>>> committed, and all open Interactive Queries on them have been closed.
> > > > > > > >>>>>>> This would require some reference counting.
> > > > > > >>>>>>>
> > > > > > > >>>>>>> Obviously a drawback of this approach is the potential for increased
> > > > > > > >>>>>>> memory usage: if an Interactive Query is long-lived, for example by
> > > > > > > >>>>>>> doing a full scan over a large database, or even just pausing in the
> > > > > > > >>>>>>> middle of an iteration, then the existing chain of WriteBatches could
> > > > > > > >>>>>>> be kept around for a long time, potentially forever.
> > > > > > >>>>>>>
> > > > > > >>>>>>> --
> > > > > > >>>>>>>
> > > > > > >>>>>>> A.
> > > > > > > >>>>>>> Going off on a tangent, it looks like in addition to supporting
> > > > > > > >>>>>>> READ_COMMITTED queries, we could go further and support REPEATABLE_READ
> > > > > > > >>>>>>> queries (i.e. where subsequent reads to the same key in the same
> > > > > > > >>>>>>> Interactive Query are guaranteed to yield the same value) by making use
> > > > > > > >>>>>>> of RocksDB Snapshots[2]. These are fairly lightweight, so the
> > > > > > > >>>>>>> performance impact is likely to be negligible, but they do require that
> > > > > > > >>>>>>> the Interactive Query session can be explicitly closed.
> > > > > > >>>>>>>
> > > > > > > >>>>>>> This could be achieved if we made the above readOnlyView interface look
> > > > > > > >>>>>>> more like:
> > > > > > >>>>>>>
> > > > > > > >>>>>>> interface ReadOnlyKeyValueView<K, V> extends
> > > > > > > >>>>>>> ReadOnlyKeyValueStore<K, V>, AutoCloseable {}
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> interface ReadOnlyKeyValueStore<K, V> {
> > > > > > > >>>>>>>        ...
> > > > > > > >>>>>>>        ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);
> > > > > > > >>>>>>> }
> > > > > > >>>>>>>
> > > > > > > >>>>>>> But this would be a breaking change, as existing IQv1 queries are
> > > > > > > >>>>>>> guaranteed to never call store.close(), and therefore these would leak
> > > > > > > >>>>>>> memory under REPEATABLE_READ.
> > > > > > >>>>>>>
> > > > > > >>>>>>> B.
> > > > > > > >>>>>>> One thing that's notable: MyRocks states that they support
> > > > > > > >>>>>>> READ_COMMITTED and REPEATABLE_READ, but they make no mention of
> > > > > > > >>>>>>> READ_UNCOMMITTED[3][4]. This could be because doing so is technically
> > > > > > > >>>>>>> difficult/impossible using the primitives available in RocksDB.
> > > > > > >>>>>>>
> > > > > > >>>>>>> --
> > > > > > >>>>>>>
> > > > > > >>>>>>> Lucas, to address your points:
> > > > > > >>>>>>>
> > > > > > >>>>>>> U1.
> > > > > > > >>>>>>> It's only "SHOULD" to permit alternative (i.e. non-RocksDB)
> > > > > > > >>>>>>> implementations of StateStore that do not support atomic writes.
> > > > > > > >>>>>>> Obviously in those cases, the guarantees Kafka Streams provides/expects
> > > > > > > >>>>>>> would be relaxed. Do you think we should require all implementations to
> > > > > > > >>>>>>> support atomic writes?
> > > > > > >>>>>>>
> > > > > > >>>>>>> U2.
> > > > > > > >>>>>>> Stores can support multiple IsolationLevels. As we've discussed above,
> > > > > > > >>>>>>> the ideal scenario would be to specify the IsolationLevel at
> > > > > > > >>>>>>> query-time. Failing that, I think the second-best approach is to define
> > > > > > > >>>>>>> the IsolationLevel for *all* queries based on the processing.mode,
> > > > > > > >>>>>>> which is what the default StateStoreContext#isolationLevel() achieves.
> > > > > > > >>>>>>> Would you prefer an alternative?
> > > > > > >>>>>>>
> > > > > > > >>>>>>> While the existing implementation is equivalent to READ_UNCOMMITTED,
> > > > > > > >>>>>>> this can yield unexpected results/errors under EOS, if a transaction is
> > > > > > > >>>>>>> rolled back. While this would be a change in behaviour for users, it
> > > > > > > >>>>>>> would look more like a bug fix than a breaking change. That said, we
> > > > > > > >>>>>>> *could* make it configurable, and default to the existing behaviour
> > > > > > > >>>>>>> (READ_UNCOMMITTED) instead of inferring it from the processing.mode?
> > > > > > >>>>>>>
> > > > > > >>>>>>> N1, N2.
> > > > > > > >>>>>>> These were only primitives to avoid boxing costs, but since this is not
> > > > > > > >>>>>>> a performance sensitive area, it should be fine to change if that's
> > > > > > > >>>>>>> desirable.
> > > > > > >>>>>>>
> > > > > > >>>>>>> N3.
> > > > > > > >>>>>>> It's because the store "manages its own offsets", which includes both
> > > > > > > >>>>>>> committing the offset, *and providing it* via getCommittedOffset().
> > > > > > > >>>>>>> Personally, I think "managesOffsets" conveys this best, but I don't
> > > > > > > >>>>>>> mind changing it if the nomenclature is unclear.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Sorry for the massive emails/essays!
> > > > > > >>>>>>> --
> > > > > > >>>>>>> Nick
> > > > > > >>>>>>>
> > > > > > >>>>>>> 1: https://github.com/facebook/rocksdb/wiki/Transactions
> > > > > > >>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> > > > > > >>>>>>> 3:
> > > > > > https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> > > > > > >>>>>>> 4:
> > https://mariadb.com/kb/en/myrocks-transactional-isolation/
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> > > > > > >>>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi Nick,
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>> since I last read it in April, the KIP has become much cleaner and
> > > > > > > >>>>>>>> easier to read. Great work!
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> It feels to me the last big open point is whether we can
> > > > implement
> > > > > > >>>>>>>> isolation level as a query parameter. I understand that
> > there
> > > > are
> > > > > > >>>>>>>> implementation concerns, but as Colt says, it would be a
> > great
> > > > > > >>>>>>>> addition, and would also simplify the migration path for
> > this
> > > > > > >> change.
> > > > > > >>>>>>>> Is the implementation problem you mentioned caused by the
> > > > > > WriteBatch
> > > > > > >>>>>>>> not having a notion of a snapshot, as the underlying DB
> > > > iterator
> > > > > > >>>> does?
> > > > > > >>>>>>>> In that case, I am not sure a chain of WriteBatches as you
> > > > propose
> > > > > > >>>>>>>> would fully solve the problem, but maybe I didn't dig
> > enough
> > > > into
> > > > > > >> the
> > > > > > >>>>>>>> details to fully understand it.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> If it's not possible to implement it now, would it be an
> > > > option to
> > > > > > >>>>>>>> make sure in this KIP that we do not fully close the door
> > on
> > > > > > >>>> per-query
> > > > > > >>>>>>>> isolation levels in the interface, as it may be possible
> > to
> > > > > > >> implement
> > > > > > > >>>>>>>> the missing primitives in RocksDB or Speedb in the future?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Understanding:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to be
> > > > persisted
> > > > > > >>>>>>>> atomically with the records?
> > > > > > >>>>>>>> * U2) Don't understand the default implementation of
> > > > > > >>>> `isolationLevel`.
> > > > > > >>>>>>>> The isolation level should be a property of the underlying
> > > > store,
> > > > > > >> and
> > > > > > >>>>>>>> not be defined by the default config? Existing stores
> > probably
> > > > > > don't
> > > > > > >>>>>>>> guarantee READ_COMMITTED, so the default should be to
> > return
> > > > > > >>>>>>>> READ_UNCOMMITTED.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Nits:
> > > > > > > >>>>>>>> * N1) Could `getCommittedOffset` use an `OptionalLong`
> > return
> > > > type,
> > > > > > >> to
> > > > > > >>>>>>>> avoid the `null`?
> > > > > > > >>>>>>>> * N2) Could `approximateNumUncommittedBytes` use an
> > > > `OptionalLong`
> > > > > > >>>>>>>> return type, to avoid the `-1`?
> > > > > > >>>>>>>> * N3) I don't understand why `managesOffsets` uses the
> > > > 'manage'
> > > > > > >> verb,
> > > > > > >>>>>>>> whereas all other methods use the "commits" verb. I'd
> > suggest
> > > > > > >>>>>>>> `commitsOffsets`.
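
To make nits N1 and N2 concrete, here is a minimal sketch of what `OptionalLong` return types could look like. Only the two method names come from this thread; the `OffsetAwareStore` interface and the `InMemoryStore` class are hypothetical names made up for illustration and are not the KIP's actual API.

```java
import java.util.OptionalLong;

// Illustration only: the type names below are hypothetical, not the KIP's API.
final class OptionalOffsetSketch {

    interface OffsetAwareStore {
        // Empty until an offset has been committed (replaces a null return).
        OptionalLong getCommittedOffset();

        // Empty if the store cannot estimate its uncommitted bytes (replaces -1).
        OptionalLong approximateNumUncommittedBytes();
    }

    static final class InMemoryStore implements OffsetAwareStore {
        private Long committedOffset = null; // null until the first commit
        private long uncommittedBytes = 0L;

        void write(final byte[] value) {
            uncommittedBytes += value.length;
        }

        void commit(final long offset) {
            committedOffset = offset;
            uncommittedBytes = 0L;
        }

        @Override
        public OptionalLong getCommittedOffset() {
            return committedOffset == null
                ? OptionalLong.empty()
                : OptionalLong.of(committedOffset);
        }

        @Override
        public OptionalLong approximateNumUncommittedBytes() {
            return OptionalLong.of(uncommittedBytes);
        }
    }

    public static void main(final String[] args) {
        final InMemoryStore store = new InMemoryStore();
        System.out.println(store.getCommittedOffset().isPresent());
        store.write(new byte[] {1, 2, 3});
        store.commit(42L);
        System.out.println(store.getCommittedOffset().getAsLong());
    }
}
```

Callers then have to handle the "no value" case explicitly instead of remembering a sentinel. The cost is one small wrapper object per call.
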
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Either way, it feels this KIP is very close to the finish
> > > > line,
> > > > > > I'm
> > > > > > >>>>>>>> looking forward to seeing this in production!
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Cheers,
> > > > > > >>>>>>>> Lucas
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <
> > > > c...@littlehorse.io
> > > > > > >
> > > > > > >>>>> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Making IsolationLevel a query-time constraint, rather
> > than
> > > > > > linking
> > > > > > >>>> it
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>> the processing.guarantee.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> As I understand it, would this allow even a user of EOS
> > to
> > > > > > control
> > > > > > >>>>>>>> whether
> > > > > > >>>>>>>>> reading committed or uncommitted records? If so, I am
> > highly
> > > > in
> > > > > > >>>> favor
> > > > > > >>>>> of
> > > > > > >>>>>>>>> this.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> I know that I was one of the early people to point out
> > the
> > > > > > current
> > > > > > >>>>>>>>> shortcoming that IQ reads uncommitted records, but just
> > this
> > > > > > >>>> morning I
> > > > > > >>>>>>>>> realized a pattern we use which means that (for certain
> > > > queries)
> > > > > > >> our
> > > > > > >>>>>>>> system
> > > > > > >>>>>>>>> needs to be able to read uncommitted records, which is
> > the
> > > > > > current
> > > > > > >>>>>>>> behavior
> > > > > > >>>>>>>>> of Kafka Streams in EOS.***
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> If IsolationLevel being a query-time decision allows for
> > > > this,
> > > > > > then
> > > > > > >>>>> that
> > > > > > >>>>>>>>> would be amazing. I would also vote that the default
> > behavior
> > > > > > >> should
> > > > > > >>>>> be
> > > > > > >>>>>>>> for
> > > > > > >>>>>>>>> reading uncommitted records, because it is totally
> > possible
> > > > for a
> > > > > > >>>>> valid
> > > > > > >>>>>>>>> application to depend on that behavior, and breaking it
> > in a
> > > > > > minor
> > > > > > >>>>>>>> release
> > > > > > >>>>>>>>> might be a bit strong.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> *** (Note, for the curious reader....) Our use-case/query
> > > > pattern
> > > > > > >>>> is a
> > > > > > >>>>>>>> bit
> > > > > > >>>>>>>>> complex, but reading "uncommitted" records is actually
> > safe
> > > > in
> > > > > > our
> > > > > > >>>>> case
> > > > > > >>>>>>>>> because processing is deterministic. Additionally, IQ
> > being
> > > > able
> > > > > > to
> > > > > > >>>>> read
> > > > > > >>>>>>>>> uncommitted records is crucial to enable "read your own
> > > > writes"
> > > > > > on
> > > > > > >>>> our
> > > > > > >>>>>>>> API:
> > > > > > >>>>>>>>> Due to the deterministic processing, we send an "ack" to
> > the
> > > > > > client
> > > > > > >>>>> who
> > > > > > >>>>>>>>> makes the request as soon as the processor processes the
> > > > result.
> > > > > > If
> > > > > > >>>>> they
> > > > > > >>>>>>>>> can't read uncommitted records, they may receive a "201 -
> > > > > > Created"
> > > > > > >>>>>>>>> response, immediately followed by a "404 - Not Found"
> > when
> > > > doing
> > > > > > a
> > > > > > >>>>>>>> lookup
> > > > > > >>>>>>>>> for the object they just created).
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks,
> > > > > > >>>>>>>>> Colt McNealy
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> *Founder, LittleHorse.dev*
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <
> > > > > > >>>> nick.telf...@gmail.com>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Addendum:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I think we would also face the same problem with the
> > > > approach
> > > > > > John
> > > > > > >>>>>>>> outlined
> > > > > > >>>>>>>>>> earlier (using the record cache as a transaction buffer
> > and
> > > > > > >>>> flushing
> > > > > > >>>>>>>> it
> > > > > > >>>>>>>>>> straight to SST files). This is because the record cache
> > > > (the
> > > > > > >>>>>>>> ThreadCache
> > > > > > >>>>>>>>>> class) is not thread-safe, so every commit would
> > invalidate
> > > > open
> > > > > > >> IQ
> > > > > > >>>>>>>>>> Iterators in the same way that RocksDB WriteBatches do.
> > > > > > >>>>>>>>>> --
> > > > > > >>>>>>>>>> Nick
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <
> > > > > > >>>> nick.telf...@gmail.com>
> > > > > > >>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> Hi Bruno,
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> I've updated the KIP based on our conversation. The
> > only
> > > > things
> > > > > > >>>>>>>> I've not
> > > > > > >>>>>>>>>>> yet done are:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> 1. Using transactions under ALOS and EOS.
> > > > > > >>>>>>>>>>> 2. Making IsolationLevel a query-time constraint,
> > rather
> > > > than
> > > > > > >>>>>>>> linking it
> > > > > > >>>>>>>>>>> to the processing.guarantee.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> There's a wrinkle that makes this a challenge:
> > Interactive
> > > > > > >> Queries
> > > > > > >>>>>>>> that
> > > > > > >>>>>>>>>>> open an Iterator, when using transactions and
> > > > READ_UNCOMMITTED.
> > > > > > >>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries
> > need
> > > > to be
> > > > > > >>>> able
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>> read records from the currently uncommitted transaction
> > > > buffer
> > > > > > >>>>>>>>>>> (WriteBatch). This includes for Iterators, which should
> > > > iterate
> > > > > > >>>>>>>> both the
> > > > > > >>>>>>>>>>> transaction buffer and underlying database (using
> > > > > > >>>>>>>>>>> WriteBatch#iteratorWithBase()).
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> The issue is that when the StreamThread commits, it
> > writes
> > > > the
> > > > > > >>>>>>>> current
> > > > > > >>>>>>>>>>> WriteBatch to RocksDB *and then clears the WriteBatch*.
> > > > > > Clearing
> > > > > > >>>> the
> > > > > > >>>>>>>>>>> WriteBatch while an Interactive Query holds an open
> > > > Iterator on
> > > > > > >> it
> > > > > > >>>>>>>> will
> > > > > > >>>>>>>>>>> invalidate the Iterator. Worse, it turns out that
> > Iterators
> > > > > > over
> > > > > > >> a
> > > > > > >>>>>>>>>>> WriteBatch become invalidated not just when the
> > WriteBatch
> > > > is
> > > > > > >>>>>>>> cleared,
> > > > > > >>>>>>>>>> but
> > > > > > >>>>>>>>>>> also when the Iterators' current key receives a new
> > write.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Now that I'm writing this, I remember that this is the
> > > > major
> > > > > > >>>> reason
> > > > > > >>>>>>>> that
> > > > > > >>>>>>>>>> I
> > > > > > >>>>>>>>>>> switched the original design from having a query-time
> > > > > > >>>>>>>> IsolationLevel to
> > > > > > >>>>>>>>>>> having the IsolationLevel linked to the
> > transactionality
> > > > of the
> > > > > > >>>>>>>> stores
> > > > > > >>>>>>>>>>> themselves.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> It *might* be possible to resolve this, by having a
> > > > "chain" of
> > > > > > >>>>>>>>>>> WriteBatches, with the StreamThread switching to a new
> > > > > > WriteBatch
> > > > > > >>>>>>>>>> whenever
> > > > > > >>>>>>>>>>> a new Interactive Query attempts to read from the
> > > > database, but
> > > > > > >>>> that
> > > > > > >>>>>>>>>> could
> > > > > > >>>>>>>>>>> cause some performance problems/memory pressure when
> > > > subjected
> > > > > > to
> > > > > > >>>> a
> > > > > > >>>>>>>> high
> > > > > > >>>>>>>>>>> Interactive Query load. It would also reduce the
> > > > efficiency of
> > > > > > >>>>>>>>>> WriteBatches
> > > > > > >>>>>>>>>>> on-commit, as we'd have to write N WriteBatches, where
> > N
> > > > is the
> > > > > > >>>>>>>> number of
> > > > > > >>>>>>>>>>> Interactive Queries since the last commit.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> I realise this is getting into the weeds of the
> > > > implementation,
> > > > > > >>>> and
> > > > > > >>>>>>>> you'd
> > > > > > >>>>>>>>>>> rather we focus on the API for now, but I think it's
> > > > important
> > > > > > to
> > > > > > >>>>>>>>>> consider
> > > > > > >>>>>>>>>>> how to implement the desired API, in case we come up
> > with
> > > > an
> > > > > > API
> > > > > > >>>>>>>> that
> > > > > > >>>>>>>>>>> cannot be implemented efficiently, or even at all!
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thoughts?
> > > > > > >>>>>>>>>>> --
> > > > > > >>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <
> > > > > > cado...@apache.org
> > > > > > >>>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Hi Nick,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 6.
> > > > > > >>>>>>>>>>>> Of course, you are right! My bad!
> > > > > > >>>>>>>>>>>> Wiping out the state in the downgrading case is fine.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 3a.
> > > > > > >>>>>>>>>>>> Focus on the public facing changes for the KIP. We
> > will
> > > > manage
> > > > > > >> to
> > > > > > >>>>>>>> get
> > > > > > >>>>>>>>>>>> the internals right. Regarding state stores that do
> > not
> > > > > > support
> > > > > > >>>>>>>>>>>> READ_COMMITTED, they should throw an error stating
> > that
> > > > they
> > > > > > do
> > > > > > >>>> not
> > > > > > >>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state
> > stores
> > > > > > >>>>>>>> immediately.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 3b.
> > > > > > >>>>>>>>>>>> I am in favor of using transactions also for ALOS.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>> Bruno
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote:
> > > > > > >>>>>>>>>>>>> Hi Bruno,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thanks for getting back to me!
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>>>> The fact that implementations can always track
> > estimated
> > > > > > memory
> > > > > > >>>>>>>> usage
> > > > > > >>>>>>>>>> in
> > > > > > >>>>>>>>>>>>> the wrapper is a good point. I can remove -1 as an
> > > > option,
> > > > > > and
> > > > > > >>>>>>>> I'll
> > > > > > >>>>>>>>>>>> clarify
> > > > > > >>>>>>>>>>>>> the JavaDoc that 0 is not just for non-transactional
> > > > stores,
> > > > > > >>>>>>>> which is
> > > > > > >>>>>>>>>>>>> currently misleading.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 6.
> > > > > > >>>>>>>>>>>>> The problem with catching the exception in the
> > downgrade
> > > > > > >> process
> > > > > > >>>>>>>> is
> > > > > > > >>>>>>>>>> that it
> > > > > > >>>>>>>>>>>>> would require new code in the Kafka version being
> > > > downgraded
> > > > > > >> to.
> > > > > > >>>>>>>> Since
> > > > > > >>>>>>>>>>>>> users could conceivably downgrade to almost *any*
> > older
> > > > > > version
> > > > > > >>>>>>>> of
> > > > > > >>>>>>>>>> Kafka
> > > > > > >>>>>>>>>>>>> Streams, I'm not sure how we could add that code?
> > > > > > >>>>>>>>>>>>> The only way I can think of doing it would be to
> > provide
> > > > a
> > > > > > >>>>>>>> dedicated
> > > > > > >>>>>>>>>>>>> downgrade tool, that goes through every local store
> > and
> > > > > > removes
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> offsets column families. But that seems like an
> > > > unnecessary
> > > > > > >>>>>>>> amount of
> > > > > > >>>>>>>>>>>> extra
> > > > > > >>>>>>>>>>>>> code to maintain just to handle a somewhat niche
> > > > situation,
> > > > > > >> when
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> alternative (automatically wipe and restore stores)
> > > > should be
> > > > > > >>>>>>>>>>>> acceptable.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've
> > requested.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 3a.
> > > > > > >>>>>>>>>>>>> I agree that IsolationLevel makes more sense at
> > > > query-time,
> > > > > > and
> > > > > > >>>> I
> > > > > > >>>>>>>>>>>> actually
> > > > > > >>>>>>>>>>>>> initially attempted to place the IsolationLevel at
> > > > > > query-time,
> > > > > > >>>>>>>> but I
> > > > > > >>>>>>>>>> ran
> > > > > > >>>>>>>>>>>>> into some problems:
> > > > > > >>>>>>>>>>>>> - The key issue is that, under ALOS we're not staging
> > > > writes
> > > > > > in
> > > > > > >>>>>>>>>>>>> transactions, so can't perform writes at the
> > > > READ_COMMITTED
> > > > > > >>>>>>>> isolation
> > > > > > >>>>>>>>>>>>> level. However, this may be addressed if we decide to
> > > > > > *always*
> > > > > > >>>>>>>> use
> > > > > > >>>>>>>>>>>>> transactions as discussed under 3b.
> > > > > > >>>>>>>>>>>>> - IQv1 and IQv2 have quite different
> > implementations. I
> > > > > > >> remember
> > > > > > >>>>>>>>>> having
> > > > > > >>>>>>>>>>>>> some difficulty understanding the IQv1 internals,
> > which
> > > > made
> > > > > > it
> > > > > > >>>>>>>>>>>> difficult
> > > > > > >>>>>>>>>>>>> to determine what needed to be changed. However, I
> > > > *think*
> > > > > > this
> > > > > > >>>>>>>> can be
> > > > > > >>>>>>>>>>>>> addressed for both implementations by wrapping the
> > > > > > RocksDBStore
> > > > > > >>>>>>>> in an
> > > > > > >>>>>>>>>>>>> IsolationLevel-dependent wrapper, that overrides read
> > > > methods
> > > > > > >>>>>>>> (get,
> > > > > > >>>>>>>>>>>> etc.)
> > > > > > >>>>>>>>>>>>> to either read directly from the database or from the
> > > > ongoing
> > > > > > >>>>>>>>>>>> transaction.
> > > > > > >>>>>>>>>>>>> But IQv1 might still be difficult.
> > > > > > >>>>>>>>>>>>> - If IsolationLevel becomes a query constraint, then
> > all
> > > > > > other
> > > > > > >>>>>>>>>>>> StateStores
> > > > > > >>>>>>>>>>>>> will need to respect it, including the in-memory
> > stores.
> > > > This
> > > > > > >>>>>>>> would
> > > > > > >>>>>>>>>>>> require
> > > > > > >>>>>>>>>>>>> us to adapt in-memory stores to stage their writes so
> > > > they
> > > > > > can
> > > > > > >>>> be
> > > > > > >>>>>>>>>>>> isolated
> > > > > > > >>>>>>>>>>>>> from READ_COMMITTED queries. It would also become an
> > > > > > important
> > > > > > >>>>>>>>>>>>> consideration for third-party stores on upgrade, as
> > > > without
> > > > > > >>>>>>>> changes,
> > > > > > >>>>>>>>>>>> they
> > > > > > >>>>>>>>>>>>> would not support READ_COMMITTED queries correctly.
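
The wrapper idea in the second bullet above (reads go either to the database or to the ongoing transaction) can be sketched roughly as follows. This is a toy model under stated assumptions: plain maps stand in for RocksDB and its WriteBatch, and the class `IsolationReadSketch` and its methods are hypothetical names, not Kafka Streams or RocksDB APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model: HashMaps stand in for the database and the transaction buffer.
final class IsolationReadSketch {

    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    // Stands in for data already written to the database.
    private final Map<String, String> committed = new HashMap<>();
    // Stands in for staged writes; an empty Optional marks a staged delete.
    private final Map<String, Optional<String>> pending = new HashMap<>();

    void put(final String key, final String value) {
        pending.put(key, Optional.of(value));
    }

    void delete(final String key) {
        pending.put(key, Optional.empty());
    }

    // Applies all staged writes to the committed data, then clears the buffer.
    void commit() {
        pending.forEach((key, value) -> {
            if (value.isPresent()) {
                committed.put(key, value.get());
            } else {
                committed.remove(key);
            }
        });
        pending.clear();
    }

    // READ_UNCOMMITTED consults the staged writes first; READ_COMMITTED never does.
    Optional<String> get(final String key, final IsolationLevel level) {
        if (level == IsolationLevel.READ_UNCOMMITTED && pending.containsKey(key)) {
            return pending.get(key);
        }
        return Optional.ofNullable(committed.get(key));
    }

    public static void main(final String[] args) {
        final IsolationReadSketch store = new IsolationReadSketch();
        store.put("k", "v1");
        System.out.println(store.get("k", IsolationLevel.READ_COMMITTED));
        System.out.println(store.get("k", IsolationLevel.READ_UNCOMMITTED));
    }
}
```

Point lookups are the easy part. The sketch deliberately leaves out range iterators, which are where the invalidation problems discussed elsewhere in this thread arise.
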
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Ultimately, I may need some help making the necessary
> > > > change
> > > > > > to
> > > > > > >>>>>>>> IQv1
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>>> support this, but I don't think it's fundamentally
> > > > > > impossible,
> > > > > > >>>>>>>> if we
> > > > > > >>>>>>>>>>>> want
> > > > > > >>>>>>>>>>>>> to pursue this route.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 3b.
> > > > > > >>>>>>>>>>>>> The main reason I chose to keep ALOS un-transactional
> > > > was to
> > > > > > >>>>>>>> minimize
> > > > > > >>>>>>>>>>>>> behavioural change for most users (I believe most
> > Streams
> > > > > > users
> > > > > > >>>>>>>> use
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>> default configuration, which is ALOS). That said,
> > it's
> > > > clear
> > > > > > >>>>>>>> that if
> > > > > > >>>>>>>>>>>> ALOS
> > > > > > >>>>>>>>>>>>> also used transactional stores, the only change in
> > > > behaviour
> > > > > > >>>>>>>> would be
> > > > > > >>>>>>>>>>>> that
> > > > > > >>>>>>>>>>>>> it would become *more correct*, which could be
> > > > considered a
> > > > > > >> "bug
> > > > > > >>>>>>>> fix"
> > > > > > >>>>>>>>>> by
> > > > > > >>>>>>>>>>>>> users, rather than a change they need to handle.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> I believe that performance using transactions (aka.
> > > > RocksDB
> > > > > > >>>>>>>>>>>> WriteBatches)
> > > > > > >>>>>>>>>>>>> should actually be *better* than the un-batched
> > > > write-path
> > > > > > that
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>>>>>>> currently used[1]. The only "performance"
> > consideration
> > > > will
> > > > > > be
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> increased memory usage that transactions require.
> > Given
> > > > the
> > > > > > >>>>>>>>>> mitigations
> > > > > > >>>>>>>>>>>> for
> > > > > > >>>>>>>>>>>>> this memory that we have in place, I would expect
> > that
> > > > this
> > > > > > is
> > > > > > >>>>>>>> not a
> > > > > > >>>>>>>>>>>>> problem for most users.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> If we're happy to do so, we can make ALOS also use
> > > > > > >> transactions.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Link 1:
> > > > > > > >>>>>>>>>>>>> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <
> > > > > > >>>> cado...@apache.org
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Hi Nick,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Thanks for the updates and sorry for the delay on my
> > > > side!
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 1.
> > > > > > >>>>>>>>>>>>>> Making the default implementation for flush() a
> > no-op
> > > > sounds
> > > > > > >>>>>>>> good to
> > > > > > >>>>>>>>>>>> me.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>>>>> I think what was bugging me here is that a
> > third-party
> > > > state
> > > > > > >>>>>>>> store
> > > > > > >>>>>>>>>>>> needs
> > > > > > >>>>>>>>>>>>>> to implement the state store interface. That means
> > they
> > > > need
> > > > > > >> to
> > > > > > >>>>>>>>>>>>>> implement a wrapper around the actual state store
> > as we
> > > > do
> > > > > > for
> > > > > > >>>>>>>>>> RocksDB
> > > > > > >>>>>>>>>>>>>> with RocksDBStore. So, a third-party state store can
> > > > always
> > > > > > >>>>>>>> estimate
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> uncommitted bytes, if it wants, because the wrapper
> > can
> > > > > > record
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>> added
> > > > > > >>>>>>>>>>>>>> bytes.
> > > > > > >>>>>>>>>>>>>> One case I can think of where returning -1 makes
> > sense
> > > > is
> > > > > > when
> > > > > > >>>>>>>>>> Streams
> > > > > > >>>>>>>>>>>>>> does not need to estimate the size of the write
> > batch
> > > > and
> > > > > > >>>>>>>> trigger
> > > > > > >>>>>>>>>>>>>> extraordinary commits, because the third-party state
> > > > store
> > > > > > >>>>>>>> takes care
> > > > > > >>>>>>>>>>>> of
> > > > > > >>>>>>>>>>>>>> memory. But in that case the method could also just
> > > > return
> > > > > > 0.
> > > > > > >>>>>>>> Even
> > > > > > >>>>>>>>>> that
> > > > > > >>>>>>>>>>>>>> case would be better solved with a method that
> > returns
> > > > > > whether
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>> state
> > > > > > >>>>>>>>>>>>>> store manages itself the memory used for uncommitted
> > > > bytes
> > > > > > or
> > > > > > >>>>>>>> not.
> > > > > > > >>>>>>>>>>>>>> That said, I am fine with keeping the -1 return
> > value,
> > > > I was
> > > > > > >>>>>>>> just
> > > > > > >>>>>>>>>>>>>> wondering when and if it will be used.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Regarding returning 0 for transactional state stores
> > > > when
> > > > > > the
> > > > > > >>>>>>>> batch
> > > > > > >>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>> empty, I was just wondering because you explicitly
> > > > stated
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> "or {@code 0} if this StateStore does not support
> > > > > > >>>> transactions."
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> So it seemed to me returning 0 could only happen for
> > > > > > >>>>>>>>>> non-transactional
> > > > > > >>>>>>>>>>>>>> state stores.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> a) What do you think if we move the isolation level
> > to
> > > > IQ
> > > > > > (v1
> > > > > > >>>>>>>> and
> > > > > > >>>>>>>>>> v2)?
> > > > > > >>>>>>>>>>>>>> In the end this is the only component that really
> > needs
> > > > to
> > > > > > >>>>>>>> specify
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> isolation level. It is similar to the Kafka consumer
> > > > that
> > > > > > can
> > > > > > >>>>>>>> choose
> > > > > > >>>>>>>>>>>>>> with what isolation level to read the input topic.
> > > > > > >>>>>>>>>>>>>> For IQv1 the isolation level should go into
> > > > > > >>>>>>>> StoreQueryParameters. For
> > > > > > >>>>>>>>>>>>>> IQv2, I would add it to the Query interface.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> b) Point a) raises the question what should happen
> > > > during
> > > > > > >>>>>>>>>> at-least-once
> > > > > > >>>>>>>>>>>>>> processing when the state store does not use
> > > > transactions?
> > > > > > >> John
> > > > > > >>>>>>>> in
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> past proposed to also use transactions on state
> > stores
> > > > for
> > > > > > >>>>>>>>>>>>>> at-least-once. I like that idea, because it avoids
> > > > > > aggregating
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>> same
> > > > > > >>>>>>>>>>>>>> records over and over again in the case of a
> > failure. We
> > > > > > had a
> > > > > > >>>>>>>> case
> > > > > > >>>>>>>>>> in
> > > > > > > >>>>>>>>>>>>>> the past where a Streams application in
> > at-least-once
> > > > mode
> > > > > > >> was
> > > > > > >>>>>>>>>> failing
> > > > > > >>>>>>>>>>>>>> continuously for some reasons I do not remember
> > before
> > > > > > >>>>>>>> committing the
> > > > > > >>>>>>>>>>>>>> offsets. After each failover, the app aggregated
> > again
> > > > and
> > > > > > >>>>>>>> again the
> > > > > > >>>>>>>>>>>>>> same records. Of course the aggregate increased to
> > very
> > > > > > wrong
> > > > > > >>>>>>>> values
> > > > > > >>>>>>>>>>>>>> just because of the failover. With transactions on
> > the
> > > > state
> > > > > > >>>>>>>> stores
> > > > > > >>>>>>>>>> we
> > > > > > >>>>>>>>>>>>>> could have avoided this. The app would have output
> > the
> > > > same
> > > > > > >>>>>>>> aggregate
> > > > > > >>>>>>>>>>>>>> multiple times (i.e., after each failover) but at
> > least
> > > > the
> > > > > > >>>>>>>> value of
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> aggregate would not depend on the number of
> > failovers.
> > > > > > >>>>>>>> Outputting the
> > > > > > >>>>>>>>>>>>>> same aggregate multiple times would be incorrect
> > under
> > > > > > >>>>>>>> exactly-once
> > > > > > >>>>>>>>>> but
> > > > > > >>>>>>>>>>>>>> it is OK for at-least-once.
> > > > > > > >>>>>>>>>>>>>> Whether it makes sense to add a config to turn on and off
> > > > > > >>>>>>>> transactions on
> > > > > > >>>>>>>>>>>>>> state stores under at-least-once or just use
> > > > transactions in
> > > > > > >>>>>>>> any case
> > > > > > >>>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>> a question we should also discuss in this KIP. It
> > > > depends a
> > > > > > >> bit
> > > > > > >>>>>>>> on
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> performance trade-off. Maybe to be safe, I would
> > add a
> > > > > > config.
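
The failover scenario described in b) can be reproduced with a small simulation. This is a hypothetical sketch and not Kafka Streams code: `aggregate` sums a list of records, "crashes" a given number of times just before the offsets are committed, and replays the same records after each crash. The class and parameter names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of at-least-once replay; not Kafka Streams code.
final class FailoverAggregationSketch {

    // Sums `records`, crashing `failures` times just before the offset commit.
    // Without transactions, each write lands in the store immediately and
    // survives the crash. With transactions, writes are staged and only
    // applied when the commit succeeds.
    static long aggregate(final List<Long> records,
                          final int failures,
                          final boolean transactional) {
        long store = 0L;
        for (int attempt = 0; attempt <= failures; attempt++) {
            long buffer = 0L; // staged writes for this attempt
            for (final long record : records) {
                if (transactional) {
                    buffer += record;
                } else {
                    store += record;
                }
            }
            final boolean crashedBeforeCommit = attempt < failures;
            if (transactional && !crashedBeforeCommit) {
                store += buffer; // commit staged writes together with the offsets
            }
            // After a crash the offsets were not committed, so the loop replays
            // the same records on the next attempt.
        }
        return store;
    }

    public static void main(final String[] args) {
        final List<Long> records = Arrays.asList(1L, 2L, 3L);
        System.out.println(aggregate(records, 2, false));
        System.out.println(aggregate(records, 2, true));
    }
}
```

In the non-transactional case the final sum grows with the number of failovers. In the transactional case it depends only on the input records, which is the property argued for above.
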
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 4.
> > > > > > >>>>>>>>>>>>>> Your points are all valid. I tend to say to keep the
> > > > metrics
> > > > > > >>>>>>>> around
> > > > > > >>>>>>>>>>>>>> flush() until we remove flush() completely from the
> > > > > > interface.
> > > > > > >>>>>>>> Calls
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>> flush() might still exist since existing processors
> > > > might
> > > > > > >> still
> > > > > > >>>>>>>> call
> > > > > > >>>>>>>>>>>>>> flush() explicitly as you mentioned in 1). For
> > sure, we
> > > > need
> > > > > > >> to
> > > > > > >>>>>>>>>>>> document
> > > > > > >>>>>>>>>>>>>> how the metrics change due to the transactions in
> > the
> > > > > > upgrade
> > > > > > >>>>>>>> notes.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 5.
> > > > > > >>>>>>>>>>>>>> I see. Then you should describe how the .position
> > files
> > > > are
> > > > > > >>>>>>>> handled
> > > > > > >>>>>>>>>> in
> > > > > > >>>>>>>>>>>>>> a dedicated section of the KIP or incorporate the
> > > > > > description
> > > > > > >>>>>>>> in the
> > > > > > >>>>>>>>>>>>>> "Atomic Checkpointing" section instead of only
> > > > mentioning it
> > > > > > >> in
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>>> "Compatibility, Deprecation, and Migration Plan".
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 6.
> > > > > > >>>>>>>>>>>>>> Describing upgrading and downgrading in the KIP is a
> > > > good
> > > > > > >> idea.
> > > > > > >>>>>>>>>>>>>> Regarding downgrading, I think you could also catch
> > the
> > > > > > >>>>>>>> exception and
> > > > > > >>>>>>>>>>>> do
> > > > > > >>>>>>>>>>>>>> what is needed to downgrade, e.g., drop the column
> > > > family.
> > > > > > See
> > > > > > >>>>>>>> here
> > > > > > >>>>>>>>>> for
> > > > > > >>>>>>>>>>>>>> an example:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > >
> > https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> It is a bit brittle, but it works.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>> Bruno
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote:
> > > > > > >>>>>>>>>>>>>>> Hi Bruno,
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Thanks for taking the time to review the KIP. I'm
> > back
> > > > from
> > > > > > >>>>>>>> leave
> > > > > > >>>>>>>>>> now
> > > > > > >>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>> intend to move this forwards as quickly as I can.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Addressing your points:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 1.
> > > > > > >>>>>>>>>>>>>>> Because flush() is part of the StateStore API, it's
> > > > exposed
> > > > > > >> to
> > > > > > >>>>>>>>>> custom
> > > > > > >>>>>>>>>>>>>>> Processors, which might be making calls to flush().
> > > > This
> > > > > > was
> > > > > > >>>>>>>>>> actually
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> case in a few integration tests.
> > > > > > >>>>>>>>>>>>>>> To maintain as much compatibility as possible, I'd
> > > > prefer
> > > > > > not
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>> make
> > > > > > >>>>>>>>>>>>>> this
> > > > > > >>>>>>>>>>>>>>> an UnsupportedOperationException, as it will cause
> > > > > > previously
> > > > > > >>>>>>>>>> working
> > > > > > >>>>>>>>>>>>>>> Processors to start throwing exceptions at runtime.
> > > > > > >>>>>>>>>>>>>>> I agree that it doesn't make sense for it to proxy
> > > > > > commit(),
> > > > > > >>>>>>>> though,
> > > > > > >>>>>>>>>>>> as
> > > > > > >>>>>>>>>>>>>>> that would cause it to violate the "StateStores
> > commit
> > > > only
> > > > > > >>>>>>>> when the
> > > > > > >>>>>>>>>>>> Task
> > > > > > >>>>>>>>>>>>>>> commits" rule.
> > > > > > >>>>>>>>>>>>>>> Instead, I think we should make this a no-op. That
> > way,
> > > > > > >>>>>>>> existing
> > > > > > >>>>>>>>>> user
> > > > > > >>>>>>>>>>>>>>> Processors will continue to work as-before, without
> > > > > > violation
> > > > > > >>>>>>>> of
> > > > > > >>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>> consistency that would be caused by premature
> > > > flush/commit
> > > > > > of
> > > > > > >>>>>>>>>>>> StateStore
> > > > > > >>>>>>>>>>>>>>> data to disk.
> > > > > > >>>>>>>>>>>>>>> What do you think?
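
A minimal sketch of the "flush() as a no-op" option from point 1, assuming a simplified store interface. `SketchStore` and `BufferedStore` are hypothetical stand-ins and not the real `StateStore` API. The point is only that an explicit `flush()` from a legacy Processor neither throws nor persists anything ahead of `commit()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types; not the real StateStore interface.
final class FlushNoOpSketch {

    interface SketchStore {
        void put(String key, String value);

        void commit();

        // No-op by default: existing callers keep working, and nothing is
        // written out before the next commit().
        default void flush() { }
    }

    static final class BufferedStore implements SketchStore {
        private final Map<String, String> buffer = new HashMap<>();
        private final Map<String, String> persisted = new HashMap<>();

        @Override
        public void put(final String key, final String value) {
            buffer.put(key, value);
        }

        @Override
        public void commit() {
            persisted.putAll(buffer);
            buffer.clear();
        }

        int persistedSize() {
            return persisted.size();
        }
    }

    public static void main(final String[] args) {
        final BufferedStore store = new BufferedStore();
        store.put("k", "v");
        store.flush(); // accepted, but does not persist anything
        System.out.println(store.persistedSize());
        store.commit();
        System.out.println(store.persistedSize());
    }
}
```
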
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>>>>>> As stated in the JavaDoc, when a StateStore
> > > > implementation
> > > > > > is
> > > > > > >>>>>>>>>>>>>>> transactional, but is unable to estimate the
> > > > uncommitted
> > > > > > >>>> memory
> > > > > > >>>>>>>>>> usage,
> > > > > > >>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> method will return -1.
> > > > > > >>>>>>>>>>>>>>> The intention here is to permit third-party
> > > > implementations
> > > > > > >>>>>>>> that may
> > > > > > >>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>> able to estimate memory usage.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Yes, it will be 0 when nothing has been written to
> > the
> > > > > > store
> > > > > > >>>>>>>> yet. I
> > > > > > >>>>>>>>>>>>>> thought
> > > > > > >>>>>>>>>>>>>>> that was implied by "This method will return an
> > > > > > approximation
> > > > > > >>>>>>>> of the
> > > > > > >>>>>>>>>>>>>> memory
> > > > > > >>>>>>>>>>>>>>> would be freed by the next call to {@link
> > > > #commit(Map)}"
> > > > > > and
> > > > > > >>>>>>>>>> "@return
> > > > > > >>>>>>>>>>>> The
> > > > > > >>>>>>>>>>>>>>> approximate size of all records awaiting {@link
> > > > > > >>>> #commit(Map)}",
> > > > > > >>>>>>>>>>>> however,
> > > > > > >>>>>>>>>>>>>> I
> > > > > > >>>>>>>>>>>>>>> can add it explicitly to the JavaDoc if you think
> > this
> > > > is
> > > > > > >>>>>>>> unclear?
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>> I realise this is probably the most contentious
> > point
> > > > in my
> > > > > > >>>>>>>> design,
> > > > > > >>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>> I'm
> > > > > > >>>>>>>>>>>>>>> open to changing it if I'm unable to convince you
> > of
> > > > the
> > > > > > >>>>>>>> benefits.
> > > > > > >>>>>>>>>>>>>>> Nevertheless, here's my argument:
> > > > > > >>>>>>>>>>>>>>> The Interactive Query (IQ) API(s) are directly
> > provided
> > > > > > >>>>>>>> StateStores
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>> query, and it may be important for users to
> > > > > > programmatically
> > > > > > >>>>>>>> know
> > > > > > >>>>>>>>>>>> which
> > > > > > >>>>>>>>>>>>>>> mode the StateStore is operating under. If we
> > simply
> > > > > > provide
> > > > > > >>>> an
> > > > > > >>>>>>>>>>>>>>> "eosEnabled" boolean (as used throughout the
> > internal
> > > > > > streams
> > > > > > >>>>>>>>>>>> engine), or
> > > > > > >>>>>>>>>>>>>>> similar, then users will need to understand the
> > > > operation
> > > > > > and
> > > > > > >>>>>>>>>>>>>> consequences
> > > > > > >>>>>>>>>>>>>>> of each available processing mode and how it
> > pertains
> > > > to
> > > > > > >> their
> > > > > > >>>>>>>>>>>>>> StateStore.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Interactive Query users aren't the only people that
> > > > care
> > > > > > >> about
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> processing.mode/IsolationLevel of a StateStore:
> > > > > > implementers
> > > > > > >>>> of
> > > > > > >>>>>>>>>> custom
> > > > > > >>>>>>>>>>>>>>> StateStores also need to understand the behaviour
> > > > expected
> > > > > > of
> > > > > > >>>>>>>> their
> > > > > > >>>>>>>>>>>>>>> implementation. KIP-892 introduces some assumptions
> > > > into
> > > > > > the
> > > > > > >>>>>>>> Streams
> > > > > > >>>>>>>>>>>>>> Engine
> > > > > > >>>>>>>>>>>>>>> about how StateStores operate under each processing
> > > > mode,
> > > > > > and
> > > > > > >>>>>>>> it's
> > > > > > >>>>>>>>>>>>>>> important that custom implementations adhere to
> > those
> > > > > > >>>>>>>> assumptions in
> > > > > > >>>>>>>>>>>>>> order
> > > > > > >>>>>>>>>>>>>>> to maintain the consistency guarantees.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> IsolationLevels provide a high-level contract on
> > the
> > > > > > >> behaviour
> > > > > > >>>>>>>> of
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> StateStore: a user knows that under READ_COMMITTED,
> > > > they
> > > > > > will
> > > > > > >>>>>>>> see
> > > > > > >>>>>>>>>>>> writes
> > > > > > >>>>>>>>>>>>>>> only after the Task has committed, and under
> > > > > > READ_UNCOMMITTED
> > > > > > >>>>>>>> they
> > > > > > >>>>>>>>>>>> will
> > > > > > >>>>>>>>>>>>>> see
> > > > > > >>>>>>>>>>>>>>> writes immediately. No understanding of the
> > details of
> > > > each
> > > > > > >>>>>>>>>>>>>> processing.mode
> > > > > > >>>>>>>>>>>>>>> is required, either for IQ users or StateStore
> > > > > > implementers.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> An argument can be made that these contractual
> > > > guarantees
> > > > > > can
> > > > > > >>>>>>>> simply
> > > > > > >>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>> documented for the processing.mode (i.e. that
> > > > exactly-once
> > > > > > >> and
> > > > > > >>>>>>>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and
> > > > > > at-least-once
> > > > > > >>>>>>>> behaves
> > > > > > >>>>>>>>>>>> like
> > > > > > >>>>>>>>>>>>>>> READ_UNCOMMITTED), but there are several small
> > issues
> > > > with
> > > > > > >>>>>>>> this I'd
> > > > > > >>>>>>>>>>>>>> prefer
> > > > > > >>>>>>>>>>>>>>> to avoid:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>         - Where would we document these contracts,
> > in
> > > > a way
> > > > > > >> that
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>>>>>> difficult
> > > > > > >>>>>>>>>>>>>>>         for users/implementers to miss/ignore?
> > > > > > >>>>>>>>>>>>>>>         - It's not clear to users that the
> > processing
> > > > mode
> > > > > > is
> > > > > > >>>>>>>>>>>> communicating
> > > > > > >>>>>>>>>>>>>>>         an expectation of read isolation, unless
> > they
> > > > read
> > > > > > the
> > > > > > >>>>>>>>>>>>>> documentation. Users
> > > > > > >>>>>>>>>>>>>>>         rarely consult documentation unless they
> > feel
> > > > they
> > > > > > >> need
> > > > > > >>>>>>>> to, so
> > > > > > >>>>>>>>>>>> it's
> > > > > > >>>>>>>>>>>>>> likely
> > > > > > >>>>>>>>>>>>>>>         this detail would get missed by many users.
> > > > > > >>>>>>>>>>>>>>>         - It tightly couples processing modes to
> > read
> > > > > > >> isolation.
> > > > > > >>>>>>>> Adding
> > > > > > >>>>>>>>>>>> new
> > > > > > >>>>>>>>>>>>>>>         processing modes, or changing the read
> > > > isolation of
> > > > > > >>>>>>>> existing
> > > > > > >>>>>>>>>>>>>> processing
> > > > > > >>>>>>>>>>>>>>>         modes would be difficult/impossible.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Ultimately, the cost of introducing
> > IsolationLevels is
> > > > > > just a
> > > > > > >>>>>>>> single
> > > > > > >>>>>>>>>>>>>>> method, since we re-use the existing IsolationLevel
> > > > enum
> > > > > > from
> > > > > > >>>>>>>> Kafka.
> > > > > > >>>>>>>>>>>> This
> > > > > > >>>>>>>>>>>>>>> gives us a clear place to document the contractual
> > > > > > guarantees
> > > > > > >>>>>>>>>> expected
> > > > > > >>>>>>>>>>>>>>> of/provided by StateStores, that is accessible
> > both by
> > > > the
> > > > > > >>>>>>>>>> StateStore
> > > > > > >>>>>>>>>>>>>>> itself, and by IQ users.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> (Writing this I've just realised that the
> > StateStore
> > > > and IQ
> > > > > > >>>>>>>> APIs
> > > > > > >>>>>>>>>>>> actually
> > > > > > >>>>>>>>>>>>>>> don't provide access to StateStoreContext that IQ
> > users
> > > > > > would
> > > > > > >>>>>>>> have
> > > > > > >>>>>>>>>>>> direct
> > > > > > >>>>>>>>>>>>>>> access to... Perhaps StateStore should expose
> > > > > > >> isolationLevel()
> > > > > > >>>>>>>>>> itself
> > > > > > >>>>>>>>>>>>>> too?)
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 4.
> > > > > > >>>>>>>>>>>>>>> Yeah, I'm not comfortable renaming the metrics
> > in-place
> > > > > > >>>>>>>> either, as
> > > > > > >>>>>>>>>>>> it's a
> > > > > > >>>>>>>>>>>>>>> backwards incompatible change. My concern is that,
> > if
> > > > we
> > > > > > >> leave
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>>> existing
> > > > > > >>>>>>>>>>>>>>> "flush" metrics in place, they will be confusing to
> > > > users.
> > > > > > >>>>>>>> Right
> > > > > > >>>>>>>>>> now,
> > > > > > >>>>>>>>>>>>>>> "flush" metrics record explicit flushes to disk,
> > but
> > > > under
> > > > > > >>>>>>>> KIP-892,
> > > > > > >>>>>>>>>>>> even
> > > > > > >>>>>>>>>>>>>> a
> > > > > > >>>>>>>>>>>>>>> commit() will not explicitly flush data to disk -
> > > > RocksDB
> > > > > > >> will
> > > > > > >>>>>>>>>> decide
> > > > > > >>>>>>>>>>>> on
> > > > > > >>>>>>>>>>>>>>> when to flush memtables to disk itself.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> If we keep the existing "flush" metrics, we'd have
> > two
> > > > > > >>>> options,
> > > > > > >>>>>>>>>> which
> > > > > > >>>>>>>>>>>>>> both
> > > > > > >>>>>>>>>>>>>>> seem pretty bad to me:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>         1. Have them record calls to commit(),
> > which
> > > > would
> > > > > > be
> > > > > > >>>>>>>>>>>> misleading, as
> > > > > > >>>>>>>>>>>>>>>         data is no longer explicitly "flushed" to
> > disk
> > > > by
> > > > > > this
> > > > > > >>>>>>>> call.
> > > > > > >>>>>>>>>>>>>>>         2. Have them record nothing at all, which
> > is
> > > > > > >> equivalent
> > > > > > >>>> to
> > > > > > >>>>>>>>>>>> removing
> > > > > > >>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>         metrics, except that users will see the
> > metric
> > > > > > still
> > > > > > >>>>>>>> exists and
> > > > > > >>>>>>>>>>>> so
> > > > > > >>>>>>>>>>>>>> assume
> > > > > > >>>>>>>>>>>>>>>         that the metric is correct, and that
> > there's a
> > > > > > problem
> > > > > > >>>>>>>> with
> > > > > > >>>>>>>>>> their
> > > > > > >>>>>>>>>>>>>> system
> > > > > > >>>>>>>>>>>>>>>         when there isn't.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I agree that removing them is also a bad solution,
> > and
> > > > I'd
> > > > > > >>>>>>>> like some
> > > > > > >>>>>>>>>>>>>>> guidance on the best path forward here.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 5.
> > > > > > >>>>>>>>>>>>>>> Position files are updated on every write to a
> > > > StateStore.
> > > > > > >>>>>>>> Since our
> > > > > > >>>>>>>>>>>>>> writes
> > > > > > >>>>>>>>>>>>>>> are now buffered until commit(), we can't update
> > the
> > > > > > Position
> > > > > > >>>>>>>> file
> > > > > > >>>>>>>>>>>> until
> > > > > > >>>>>>>>>>>>>>> commit() has been called, otherwise it would be
> > > > > > inconsistent
> > > > > > >>>>>>>> with
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> data
> > > > > > >>>>>>>>>>>>>>> in the event of a rollback. Consequently, we need
> > to
> > > > manage
> > > > > > >>>>>>>> these
> > > > > > >>>>>>>>>>>> offsets
> > > > > > >>>>>>>>>>>>>>> the same way we manage the checkpoint offsets, and
> > > > ensure
> > > > > > >>>>>>>> they're
> > > > > > >>>>>>>>>> only
> > > > > > >>>>>>>>>>>>>>> written on commit().
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 6.
> > > > > > >>>>>>>>>>>>>>> Agreed, although I'm not exactly sure yet what
> > tests to
> > > > > > >> write.
> > > > > > >>>>>>>> How
> > > > > > >>>>>>>>>>>>>> explicit
> > > > > > >>>>>>>>>>>>>>> do we need to be here in the KIP?
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> As for upgrade/downgrade: upgrade is designed to be
> > > > > > seamless,
> > > > > > >>>>>>>> and we
> > > > > > >>>>>>>>>>>>>> should
> > > > > > >>>>>>>>>>>>>>> definitely add some tests around that. Downgrade,
> > it
> > > > > > >>>>>>>> transpires,
> > > > > > >>>>>>>>>> isn't
> > > > > > >>>>>>>>>>>>>>> currently possible, as the extra column family for
> > > > offset
> > > > > > >>>>>>>> storage is
> > > > > > >>>>>>>>>>>>>>> incompatible with the pre-KIP-892 implementation:
> > when
> > > > you
> > > > > > >>>>>>>> open a
> > > > > > >>>>>>>>>>>> RocksDB
> > > > > > >>>>>>>>>>>>>>> database, you must open all available column
> > families
> > > > or
> > > > > > >>>>>>>> receive an
> > > > > > >>>>>>>>>>>>>> error.
> > > > > > >>>>>>>>>>>>>>> What currently happens on downgrade is that it
> > > > attempts to
> > > > > > >>>>>>>> open the
> > > > > > >>>>>>>>>>>>>> store,
> > > > > > >>>>>>>>>>>>>>> throws an error about the offsets column family not
> > > > being
> > > > > > >>>>>>>> opened,
> > > > > > >>>>>>>>>>>> which
> > > > > > >>>>>>>>>>>>>>> triggers a wipe and rebuild of the Task. Given that
> > > > > > >> downgrades
> > > > > > >>>>>>>>>> should
> > > > > > >>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>> uncommon, I think this is acceptable behaviour, as
> > the
> > > > > > >>>>>>>> end-state is
> > > > > > >>>>>>>>>>>>>>> consistent, even if it results in an undesirable
> > state
> > > > > > >>>> restore.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Should I document the upgrade/downgrade behaviour
> > > > > > explicitly
> > > > > > >>>>>>>> in the
> > > > > > >>>>>>>>>>>> KIP?
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> --
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
> > > > > > >>>>>>>> cado...@apache.org>
> > > > > > >>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Hi Nick!
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Thanks for the updates!
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> 1.
> > > > > > >>>>>>>>>>>>>>>> Why does StateStore#flush() default to
> > > > > > >>>>>>>>>>>>>>>> StateStore#commit(Collections.emptyMap())?
> > > > > > >>>>>>>>>>>>>>>> Since calls to flush() will not exist anymore
> > after
> > > > this
> > > > > > KIP
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>>>>>>>>>> released, I would rather throw an unsupported
> > > > operation
> > > > > > >>>>>>>> exception
> > > > > > >>>>>>>>>> by
> > > > > > >>>>>>>>>>>>>>>> default.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>>>>>>> When would a state store return -1 from
> > > > > > >>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() while
> > > > being
> > > > > > >>>>>>>>>>>> transactional?
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Wouldn't
> > StateStore#approximateNumUncommittedBytes()
> > > > also
> > > > > > >>>>>>>> return 0
> > > > > > >>>>>>>>>> if
> > > > > > >>>>>>>>>>>>>>>> the state store is transactional but nothing has
> > been
> > > > > > >> written
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>> state store yet?
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>>> Sorry for bringing this up again. Does this KIP
> > really
> > > > > > need
> > > > > > >>>> to
> > > > > > >>>>>>>>>>>> introduce
> > > > > > >>>>>>>>>>>>>>>> StateStoreContext#isolationLevel()?
> > StateStoreContext
> > > > has
> > > > > > >>>>>>>> already
> > > > > > >>>>>>>>>>>>>>>> appConfigs() which basically exposes the same
> > > > information,
> > > > > > >>>>>>>> i.e., if
> > > > > > >>>>>>>>>>>> EOS
> > > > > > >>>>>>>>>>>>>>>> is enabled or not.
> > > > > > >>>>>>>>>>>>>>>> In one of your previous e-mails you wrote:
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> "My idea was to try to keep the StateStore
> > interface
> > > > as
> > > > > > >>>>>>>> loosely
> > > > > > >>>>>>>>>>>> coupled
> > > > > > >>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> > > > implementers
> > > > > > >>>> more
> > > > > > >>>>>>>>>>>> freedom,
> > > > > > >>>>>>>>>>>>>>>> and reduce the amount of internal knowledge
> > required."
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> While I understand the intent, I doubt that it
> > > > decreases
> > > > > > the
> > > > > > >>>>>>>>>>>> coupling of
> > > > > > >>>>>>>>>>>>>>>> a StateStore interface and the Streams engine.
> > > > > > >> READ_COMMITTED
> > > > > > >>>>>>>> only
> > > > > > >>>>>>>>>>>>>>>> applies to IQ but not to reads by processors.
> > Thus,
> > > > > > >>>>>>>> implementers
> > > > > > >>>>>>>>>>>> need to
> > > > > > >>>>>>>>>>>>>>>> understand how Streams accesses the state stores.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> I would like to hear what others think about this.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> 4.
> > > > > > >>>>>>>>>>>>>>>> Great exposing new metrics for transactional state
> > > > stores!
> > > > > > >>>>>>>>>> However, I
> > > > > > >>>>>>>>>>>>>>>> would prefer to add new metrics and deprecate (in
> > the
> > > > > > docs)
> > > > > > >>>>>>>> the old
> > > > > > >>>>>>>>>>>>>>>> ones. You can find examples of deprecated metrics
> > > > here:
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > https://kafka.apache.org/documentation/#selector_monitoring
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> 5.
> > > > > > >>>>>>>>>>>>>>>> Why does the KIP mention position files? I do not
> > > > think
> > > > > > they
> > > > > > >>>>>>>> are
> > > > > > >>>>>>>>>>>> related
> > > > > > >>>>>>>>>>>>>>>> to transactions or flushes.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> 6.
> > > > > > >>>>>>>>>>>>>>>> I think we will also need to adapt/add integration
> > > > tests
> > > > > > >>>>>>>> besides
> > > > > > >>>>>>>>>> unit
> > > > > > >>>>>>>>>>>>>>>> tests. Additionally, we probably need integration
> > or
> > > > > > system
> > > > > > >>>>>>>> tests
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>> verify that upgrades and downgrades between
> > > > transactional
> > > > > > >> and
> > > > > > >>>>>>>>>>>>>>>> non-transactional state stores work as expected.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>>> Bruno
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote:
> > > > > > >>>>>>>>>>>>>>>>> One more thing: I noted John's suggestion in the
> > KIP,
> > > > > > under
> > > > > > >>>>>>>>>>>> "Rejected
> > > > > > >>>>>>>>>>>>>>>>> Alternatives". I still think it's an idea worth
> > > > pursuing,
> > > > > > >>>>>>>> but I
> > > > > > >>>>>>>>>>>> believe
> > > > > > >>>>>>>>>>>>>>>>> that it's out of the scope of this KIP, because
> > it
> > > > > > solves a
> > > > > > >>>>>>>>>>>> different
> > > > > > >>>>>>>>>>>>>> set
> > > > > > >>>>>>>>>>>>>>>>> of problems to this KIP, and the scope of this
> > one
> > > > has
> > > > > > >>>>>>>> already
> > > > > > >>>>>>>>>> grown
> > > > > > >>>>>>>>>>>>>>>> quite
> > > > > > >>>>>>>>>>>>>>>>> large!
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <
> > > > > > >>>>>>>>>> nick.telf...@gmail.com>
> > > > > > >>>>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> Hi everyone,
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> I've updated the KIP (
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > > > > > >>>>>>>>>>>>>>>> )
> > > > > > >>>>>>>>>>>>>>>>>> with the latest changes; mostly bringing back
> > > > "Atomic
> > > > > > >>>>>>>>>>>> Checkpointing"
> > > > > > >>>>>>>>>>>>>>>> (for
> > > > > > >>>>>>>>>>>>>>>>>> what feels like the 10th time!). I think the one
> > > > thing
> > > > > > >>>>>>>> missing is
> > > > > > >>>>>>>>>>>> some
> > > > > > >>>>>>>>>>>>>>>>>> changes to metrics (notably the store "flush"
> > > > metrics
> > > > > > will
> > > > > > >>>>>>>> need
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>>>>> renamed to "commit").
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> The reason I brought back Atomic Checkpointing
> > was
> > > > to
> > > > > > >>>>>>>> decouple
> > > > > > >>>>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>>> flush
> > > > > > >>>>>>>>>>>>>>>>>> from store commit. This is important, because
> > with
> > > > > > >>>>>>>> Transactional
> > > > > > >>>>>>>>>>>>>>>>>> StateStores, we now need to call "flush" on
> > *every*
> > > > Task
> > > > > > >>>>>>>> commit,
> > > > > > >>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>>>>>> just when the StateStore is closing, otherwise
> > our
> > > > > > >>>>>>>> transaction
> > > > > > >>>>>>>>>>>> buffer
> > > > > > >>>>>>>>>>>>>>>> will
> > > > > > >>>>>>>>>>>>>>>>>> never be written and persisted, instead growing
> > > > > > unbounded!
> > > > > > >>>> I
> > > > > > >>>>>>>>>>>>>>>> experimented
> > > > > > >>>>>>>>>>>>>>>>>> with some simple solutions, like forcing a store
> > > > flush
> > > > > > >>>>>>>> whenever
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>> transaction buffer was likely to exceed its
> > > > configured
> > > > > > >>>>>>>> size, but
> > > > > > >>>>>>>>>>>> this
> > > > > > >>>>>>>>>>>>>>>> was
> > > > > > >>>>>>>>>>>>>>>>>> brittle: it prevented the transaction buffer
> > from
> > > > being
> > > > > > >>>>>>>>>> configured
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>>>>> unbounded, and it still would have required
> > explicit
> > > > > > >>>>>>>> flushes of
> > > > > > >>>>>>>>>>>>>> RocksDB,
> > > > > > >>>>>>>>>>>>>>>>>> yielding sub-optimal performance and memory
> > > > utilization.
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> I deemed Atomic Checkpointing to be the "right"
> > way
> > > > to
> > > > > > >>>>>>>> resolve
> > > > > > >>>>>>>>>> this
> > > > > > >>>>>>>>>>>>>>>>>> problem. By ensuring that the changelog offsets
> > that
> > > > > > >>>>>>>> correspond
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>> most
> > > > > > >>>>>>>>>>>>>>>>>> recently written records are always atomically
> > > > written
> > > > > > to
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>>> StateStore
> > > > > > >>>>>>>>>>>>>>>>>> (by writing them to the same transaction
> > buffer),
> > > > we can
> > > > > > >>>>>>>> avoid
> > > > > > >>>>>>>>>>>>>> forcibly
> > > > > > >>>>>>>>>>>>>>>>>> flushing the RocksDB memtables to disk, letting
> > > > RocksDB
> > > > > > >>>>>>>> flush
> > > > > > >>>>>>>>>> them
> > > > > > >>>>>>>>>>>>>> only
> > > > > > >>>>>>>>>>>>>>>>>> when necessary, without losing any of our
> > > > consistency
> > > > > > >>>>>>>> guarantees.
> > > > > > >>>>>>>>>>>> See
> > > > > > >>>>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>> updated KIP for more info.
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> I have fully implemented these changes,
> > although I'm
> > > > > > still
> > > > > > >>>>>>>> not
> > > > > > >>>>>>>>>>>>>> entirely
> > > > > > >>>>>>>>>>>>>>>>>> happy with the implementation for segmented
> > > > StateStores,
> > > > > > >> so
> > > > > > >>>>>>>> I
> > > > > > >>>>>>>>>> plan
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>>>> refactor that. Despite that, all tests pass. If
> > > > you'd
> > > > > > like
> > > > > > >>>>>>>> to try
> > > > > > >>>>>>>>>>>> out
> > > > > > >>>>>>>>>>>>>> or
> > > > > > >>>>>>>>>>>>>>>>>> review this highly experimental and incomplete
> > > > branch,
> > > > > > >> it's
> > > > > > >>>>>>>>>>>> available
> > > > > > >>>>>>>>>>>>>>>> here:
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0
> > > > > > .
> > > > > > >>>>>>>> Note:
> > > > > > >>>>>>>>>>>> it's
> > > > > > >>>>>>>>>>>>>>>> built
> > > > > > >>>>>>>>>>>>>>>>>> against Kafka 3.5.0 so that I had a stable base
> > to
> > > > build
> > > > > > >>>>>>>> and test
> > > > > > >>>>>>>>>>>> it
> > > > > > >>>>>>>>>>>>>> on,
> > > > > > >>>>>>>>>>>>>>>>>> and to enable easy apples-to-apples comparisons
> > in a
> > > > > > live
> > > > > > >>>>>>>>>>>>>> environment. I
> > > > > > >>>>>>>>>>>>>>>>>> plan to rebase it against trunk once it's nearer
> > > > > > >> completion
> > > > > > >>>>>>>> and
> > > > > > >>>>>>>>>> has
> > > > > > >>>>>>>>>>>>>> been
> > > > > > >>>>>>>>>>>>>>>>>> proven on our main application.
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> I would really appreciate help in reviewing and
> > > > testing:
> > > > > > >>>>>>>>>>>>>>>>>> - Segmented (Versioned, Session and Window)
> > stores
> > > > > > >>>>>>>>>>>>>>>>>> - Global stores
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>>> I do not currently use either of these, so my
> > > > primary
> > > > > > >>>>>>>> test
> > > > > > >>>>>>>>>>>>>>>> environment
> > > > > > >>>>>>>>>>>>>>>>>> doesn't test these areas.
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> I'm going on Parental Leave starting next week
> > for
> > > > a few
> > > > > > >>>>>>>> weeks,
> > > > > > >>>>>>>>>> so
> > > > > > >>>>>>>>>>>>>> will
> > > > > > >>>>>>>>>>>>>>>>>> not have time to move this forward until late
> > > > August.
> > > > > > That
> > > > > > >>>>>>>> said,
> > > > > > >>>>>>>>>>>> your
> > > > > > >>>>>>>>>>>>>>>>>> feedback is welcome and appreciated, I just
> > won't be
> > > > > > able
> > > > > > >>>> to
> > > > > > >>>>>>>>>>>> respond
> > > > > > >>>>>>>>>>>>>> as
> > > > > > >>>>>>>>>>>>>>>>>> quickly as usual.
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <
> > > > > > >>>>>>>>>> nick.telf...@gmail.com>
> > > > > > >>>>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> Hi Bruno
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> Yes, that's correct, although the impact on IQ
> > is
> > > > not
> > > > > > >>>>>>>> something
> > > > > > >>>>>>>>>> I
> > > > > > >>>>>>>>>>>> had
> > > > > > >>>>>>>>>>>>>>>>>>> considered.
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> What about atomically updating the state store
> > > > from the
> > > > > > >>>>>>>>>>>> transaction
> > > > > > >>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing the
> > > > > > checkpoint
> > > > > > >>>>>>>> (thus,
> > > > > > >>>>>>>>>>>>>>>> flushing
> > > > > > >>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of data
> > > > and/or
> > > > > > >>>>>>>> number of
> > > > > > >>>>>>>>>>>>>> commit
> > > > > > >>>>>>>>>>>>>>>>>>>> intervals?
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> I'm not quite sure I follow. Are you suggesting
> > > > that we
> > > > > > >>>>>>>> add an
> > > > > > >>>>>>>>>>>>>>>> additional
> > > > > > >>>>>>>>>>>>>>>>>>> config for the max number of commit intervals
> > > > between
> > > > > > >>>>>>>>>> checkpoints?
> > > > > > >>>>>>>>>>>>>> That
> > > > > > >>>>>>>>>>>>>>>>>>> way, we would checkpoint *either* when the
> > > > transaction
> > > > > > >>>>>>>> buffers
> > > > > > >>>>>>>>>> are
> > > > > > >>>>>>>>>>>>>>>> nearly
> > > > > > >>>>>>>>>>>>>>>>>>> full, *OR* whenever a certain number of commit
> > > > > > intervals
> > > > > > >>>>>>>> have
> > > > > > >>>>>>>>>>>>>> elapsed,
> > > > > > >>>>>>>>>>>>>>>>>>> whichever comes first?
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> That certainly seems reasonable, although this
> > > > > > re-ignites
> > > > > > >>>>>>>> an
> > > > > > >>>>>>>>>>>> earlier
> > > > > > >>>>>>>>>>>>>>>>>>> debate about whether a config should be
> > measured in
> > > > > > >>>>>>>> "number of
> > > > > > >>>>>>>>>>>> commit
> > > > > > >>>>>>>>>>>>>>>>>>> intervals", instead of just an absolute time.
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> FWIW, I realised that this issue is the reason
> > I
> > > > was
> > > > > > >>>>>>>> pursuing
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>> Atomic
> > > > > > >>>>>>>>>>>>>>>>>>> Checkpoints, as it de-couples memtable flush
> > from
> > > > > > >>>>>>>> checkpointing,
> > > > > > >>>>>>>>>>>>>> which
> > > > > > >>>>>>>>>>>>>>>>>>> enables us to just checkpoint on every commit
> > > > without
> > > > > > any
> > > > > > >>>>>>>>>>>> performance
> > > > > > >>>>>>>>>>>>>>>>>>> impact. Atomic Checkpointing is definitely the
> > > > "best"
> > > > > > >>>>>>>> solution,
> > > > > > >>>>>>>>>>>> but
> > > > > > >>>>>>>>>>>>>>>> I'm not
> > > > > > >>>>>>>>>>>>>>>>>>> sure if this is enough to bring it back into
> > this
> > > > KIP.
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> I'm currently working on moving all the
> > > > transactional
> > > > > > >>>> logic
> > > > > > >>>>>>>>>>>> directly
> > > > > > >>>>>>>>>>>>>>>> into
> > > > > > >>>>>>>>>>>>>>>>>>> RocksDBStore itself, which does away with the
> > > > > > >>>>>>>>>>>>>> StateStore#newTransaction
> > > > > > >>>>>>>>>>>>>>>>>>> method, and reduces the number of new classes
> > > > > > introduced,
> > > > > > >>>>>>>>>>>>>> significantly
> > > > > > >>>>>>>>>>>>>>>>>>> reducing the complexity. If it works, and the
> > > > > > complexity
> > > > > > >>>> is
> > > > > > >>>>>>>>>>>>>> drastically
> > > > > > >>>>>>>>>>>>>>>>>>> reduced, I may try bringing back Atomic
> > Checkpoints
> > > > > > into
> > > > > > >>>>>>>> this
> > > > > > >>>>>>>>>> KIP.
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <
> > > > > > >>>>>>>> cado...@apache.org>
> > > > > > >>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> Hi Nick,
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> Thanks for the insights! Very interesting!
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> As far as I understand, you want to atomically
> > > > update
> > > > > > >> the
> > > > > > >>>>>>>> state
> > > > > > >>>>>>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>>>>>>> from the transaction buffer, flush the
> > memtable
> > > > of a
> > > > > > >>>> state
> > > > > > >>>>>>>>>> store
> > > > > > >>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>>>>>>> write the checkpoint not after the commit time
> > > > elapsed
> > > > > > >>>> but
> > > > > > >>>>>>>>>> after
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>> transaction buffer reached a size that would
> > lead
> > > > to
> > > > > > >>>>>>>> exceeding
> > > > > > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > before the
> > > > > > next
> > > > > > >>>>>>>> commit
> > > > > > >>>>>>>>>>>>>>>> interval
> > > > > > >>>>>>>>>>>>>>>>>>>> ends.
> > > > > > >>>>>>>>>>>>>>>>>>>> That means, the Kafka transaction would commit
> > > > every
> > > > > > >>>>>>>> commit
> > > > > > >>>>>>>>>>>> interval
> > > > > > >>>>>>>>>>>>>>>> but
> > > > > > >>>>>>>>>>>>>>>>>>>> the state store will only be atomically
> > updated
> > > > > > roughly
> > > > > > >>>>>>>> every
> > > > > > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes of
> > data.
> > > > Also
> > > > > > IQ
> > > > > > >>>>>>>> would
> > > > > > >>>>>>>>>>>> then
> > > > > > >>>>>>>>>>>>>>>> only
> > > > > > >>>>>>>>>>>>>>>>>>>> see new data roughly every
> > > > > > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > > > > > >>>>>>>>>>>>>>>>>>>> After a failure the state store needs to
> > restore
> > > > up to
> > > > > > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> Is this correct?
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> What about atomically updating the state store
> > > > from
> > > > > > the
> > > > > > >>>>>>>>>>>> transaction
> > > > > > >>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing the
> > > > > > checkpoint
> > > > > > >>>>>>>> (thus,
> > > > > > >>>>>>>>>>>>>>>> flushing
> > > > > > >>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of data
> > > > and/or
> > > > > > >>>>>>>> number of
> > > > > > >>>>>>>>>>>>>> commit
> > > > > > >>>>>>>>>>>>>>>>>>>> intervals? In such a way, we would have the
> > same
> > > > delay
> > > > > > >>>> for
> > > > > > >>>>>>>>>>>> records
> > > > > > >>>>>>>>>>>>>>>>>>>> appearing in output topics and IQ because both
> > > > would
> > > > > > >>>>>>>> appear
> > > > > > >>>>>>>>>> when
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>> Kafka transaction is committed. However,
> > after a
> > > > > > failure
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>> state
> > > > > > >>>>>>>>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>>>>>>> still needs to restore up to
> > > > > > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > > > > > >>>>>>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>>>>>>> it might restore data that is already in the
> > state
> > > > > > store
> > > > > > >>>>>>>>>> because
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>> checkpoint lags behind the last stable offset
> > > > (i.e.
> > > > > > the
> > > > > > >>>>>>>> last
> > > > > > >>>>>>>>>>>>>> committed
> > > > > > >>>>>>>>>>>>>>>>>>>> offset) of the changelog topics. Restoring
> > data
> > > > that
> > > > > > is
> > > > > > >>>>>>>> already
> > > > > > >>>>>>>>>>>> in
> > > > > > >>>>>>>>>>>>>> the
> > > > > > > >>>>>>>>>>>>>>>>>>>> state store is idempotent, so eos should not
> > > > be violated.
> > > > > > >>>>>>>>>>>>>>>>>>>> This solution needs at least one new config to
> > > > specify
> > > > > > >>>>>>>> when a
> > > > > > >>>>>>>>>>>>>>>> checkpoint
> > > > > > >>>>>>>>>>>>>>>>>>>> should be written.
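
A rough sketch of the alternative described above, as a toy Python model rather than Kafka Streams code. It assumes the transaction buffer is applied to the store on every commit, and that a checkpoint is written only once a configured amount of data or number of commit intervals has accumulated. The names `CheckpointPolicy`, `max_bytes` and `max_commits` are hypothetical stand-ins for the new config that would be needed; they are not existing settings.

```python
from dataclasses import dataclass


@dataclass
class CheckpointPolicy:
    max_bytes: int    # hypothetical config: bytes allowed between checkpoints
    max_commits: int  # hypothetical config: commit intervals allowed between checkpoints
    bytes_since_checkpoint: int = 0
    commits_since_checkpoint: int = 0

    def on_commit(self, committed_bytes: int) -> bool:
        """Call once per commit interval, after the buffer was applied to the store.

        Returns True when a checkpoint (and hence a memtable flush) is due.
        """
        self.bytes_since_checkpoint += committed_bytes
        self.commits_since_checkpoint += 1
        due = (
            self.bytes_since_checkpoint >= self.max_bytes
            or self.commits_since_checkpoint >= self.max_commits
        )
        if due:
            # A checkpoint resets both counters.
            self.bytes_since_checkpoint = 0
            self.commits_since_checkpoint = 0
        return due
```

In this model, records become visible to IQ on every commit, while the checkpoint may lag behind by up to `max_bytes` of data or `max_commits` commit intervals, which bounds how much has to be restored after a failure.
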
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> A small correction to your previous e-mail
> > that
> > > > does
> > > > > > not
> > > > > > >>>>>>>> change
> > > > > > >>>>>>>>>>>>>>>> anything
> > > > > > >>>>>>>>>>>>>>>>>>>> you said: Under alos the default commit
> > interval
> > > > is 30
> > > > > > >>>>>>>> seconds,
> > > > > > >>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>>>> five
> > > > > > >>>>>>>>>>>>>>>>>>>> seconds.
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>>>>>>> Bruno
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
> > > > > > >>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> I've begun performance testing my branch on
> > our
> > > > > > staging
> > > > > > >>>>>>>>>>>>>> environment,
> > > > > > >>>>>>>>>>>>>>>>>>>>> putting it through its paces in our
> > non-trivial
> > > > > > >>>>>>>> application.
> > > > > > >>>>>>>>>> I'm
> > > > > > >>>>>>>>>>>>>>>>>>>> already
> > > > > > >>>>>>>>>>>>>>>>>>>>> observing the same increased flush rate that
> > we
> > > > saw
> > > > > > the
> > > > > > >>>>>>>> last
> > > > > > >>>>>>>>>>>> time
> > > > > > >>>>>>>>>>>>>> we
> > > > > > >>>>>>>>>>>>>>>>>>>>> attempted to use a version of this KIP, but
> > this
> > > > > > time,
> > > > > > >> I
> > > > > > >>>>>>>>>> think I
> > > > > > >>>>>>>>>>>>>> know
> > > > > > >>>>>>>>>>>>>>>>>>>> why.
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is
> > > > called
> > > > > > at
> > > > > > >>>>>>>> the end
> > > > > > >>>>>>>>>>>> of
> > > > > > >>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>> Task
> > > > > > >>>>>>>>>>>>>>>>>>>>> commit process, has the following behaviour:
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>           - Under ALOS: checkpoint the state
> > > > stores.
> > > > > > >> This
> > > > > > >>>>>>>>>> includes
> > > > > > >>>>>>>>>>>>>>>>>>>>>           flushing memtables in RocksDB.
> > This is
> > > > > > >>>> acceptable
> > > > > > >>>>>>>>>>>> because the
> > > > > > >>>>>>>>>>>>>>>>>>>> default
> > > > > > >>>>>>>>>>>>>>>>>>>>>           commit.interval.ms is 5 seconds,
> > so
> > > > > > forcibly
> > > > > > >>>>>>>> flushing
> > > > > > >>>>>>>>>>>>>> memtables
> > > > > > >>>>>>>>>>>>>>>>>>>> every 5
> > > > > > >>>>>>>>>>>>>>>>>>>>>           seconds is acceptable for most
> > > > > > applications.
> > > > > > >>>>>>>>>>>>>>>>>>>>>           - Under EOS: checkpointing is not
> > done,
> > > > > > >> *unless*
> > > > > > >>>>>>>> it's
> > > > > > >>>>>>>>>>>> being
> > > > > > >>>>>>>>>>>>>>>>>>>> forced, due
> > > > > > >>>>>>>>>>>>>>>>>>>>>           to e.g. the Task closing or being
> > > > revoked.
> > > > > > >> This
> > > > > > >>>>>>>> means
> > > > > > >>>>>>>>>>>> that
> > > > > > >>>>>>>>>>>>>> under
> > > > > > >>>>>>>>>>>>>>>>>>>> normal
> > > > > > >>>>>>>>>>>>>>>>>>>>>           processing conditions, the state
> > stores
> > > > > > will
> > > > > > >> not
> > > > > > >>>>>>>> be
> > > > > > >>>>>>>>>>>>>>>> checkpointed,
> > > > > > >>>>>>>>>>>>>>>>>>>> and will
> > > > > > > >>>>>>>>>>>>>>>>>>>>>           not have memtables flushed at all,
> > > > unless
> > > > > > >>>> RocksDB
> > > > > > >>>>>>>>>>>> decides to
> > > > > > >>>>>>>>>>>>>>>>>>>> flush them on
> > > > > > >>>>>>>>>>>>>>>>>>>>>           its own. Checkpointing stores and
> > > > > > >> force-flushing
> > > > > > >>>>>>>> their
> > > > > > >>>>>>>>>>>>>> memtables
> > > > > > >>>>>>>>>>>>>>>>>>>> is only
> > > > > > >>>>>>>>>>>>>>>>>>>>>           done when a Task is being closed.
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint
> > stores on
> > > > at
> > > > > > >>>> least
> > > > > > >>>>>>>>>> *some*
> > > > > > >>>>>>>>>>>>>>>> normal
> > > > > > >>>>>>>>>>>>>>>>>>>>> Task commits, in order to write the RocksDB
> > > > > > transaction
> > > > > > >>>>>>>>>> buffers
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>> state stores, and to ensure the offsets are
> > > > synced to
> > > > > > >>>>>>>> disk to
> > > > > > >>>>>>>>>>>>>> prevent
> > > > > > >>>>>>>>>>>>>>>>>>>>> restores from getting out of hand.
> > Consequently,
> > > > my
> > > > > > >>>>>>>> current
> > > > > > >>>>>>>>>>>>>>>>>>>> implementation
> > > > > > >>>>>>>>>>>>>>>>>>>>> calls maybeCheckpoint on *every* Task commit,
> > > > which
> > > > > > is
> > > > > > >>>>>>>> far too
> > > > > > >>>>>>>>>>>>>>>>>>>> frequent.
> > > > > > >>>>>>>>>>>>>>>>>>>>> This causes checkpoints every 10,000 records,
> > > > which
> > > > > > is
> > > > > > >> a
> > > > > > >>>>>>>>>> change
> > > > > > >>>>>>>>>>>> in
> > > > > > >>>>>>>>>>>>>>>>>>>> flush
> > > > > > >>>>>>>>>>>>>>>>>>>>> behaviour, potentially causing performance
> > > > problems
> > > > > > for
> > > > > > >>>>>>>> some
> > > > > > >>>>>>>>>>>>>>>>>>>> applications.
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> I'm looking into possible solutions, and I'm
> > > > > > currently
> > > > > > >>>>>>>> leaning
> > > > > > >>>>>>>>>>>>>>>> towards
> > > > > > >>>>>>>>>>>>>>>>>>>>> using the
> > statestore.transaction.buffer.max.bytes
> > > > > > >>>>>>>>>> configuration
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>>>>>>> checkpoint Tasks once we are likely to
> > exceed it.
> > > > > > This
> > > > > > >>>>>>>> would
> > > > > > >>>>>>>>>>>>>>>>>>>> complement the
> > > > > > >>>>>>>>>>>>>>>>>>>>> existing "early Task commit" functionality
> > that
> > > > this
> > > > > > >>>>>>>>>>>> configuration
> > > > > > >>>>>>>>>>>>>>>>>>>>> provides, in the following way:
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>           - Currently, we use
> > > > > > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > > > > > >>>>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>>>>>> force an
> > > > > > >>>>>>>>>>>>>>>>>>>>>           early Task commit if processing
> > more
> > > > > > records
> > > > > > >>>> would
> > > > > > >>>>>>>>>> cause
> > > > > > >>>>>>>>>>>> our
> > > > > > >>>>>>>>>>>>>>>> state
> > > > > > >>>>>>>>>>>>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>>>>>>>>           transactions to exceed the memory
> > > > assigned
> > > > > > to
> > > > > > >>>>>>>> them.
> > > > > > >>>>>>>>>>>>>>>>>>>>>           - New functionality: when a Task
> > *does*
> > > > > > >> commit,
> > > > > > >>>>>>>> we will
> > > > > > >>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>>>>>>>> checkpoint
> > > > > > >>>>>>>>>>>>>>>>>>>>>           the stores (and hence flush the
> > > > transaction
> > > > > > >>>>>>>> buffers)
> > > > > > >>>>>>>>>>>> unless
> > > > > > >>>>>>>>>>>>>> we
> > > > > > >>>>>>>>>>>>>>>>>>>> expect to
> > > > > > >>>>>>>>>>>>>>>>>>>>>           cross the
> > > > > > >>>> statestore.transaction.buffer.max.bytes
> > > > > > >>>>>>>>>>>> threshold
> > > > > > >>>>>>>>>>>>>>>> before
> > > > > > >>>>>>>>>>>>>>>>>>>> the next
> > > > > > >>>>>>>>>>>>>>>>>>>>>           commit
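
The rule in the two bullets above can be sketched as a toy Python model (not Kafka Streams code). It assumes the bytes written during the most recent commit interval are a reasonable estimate for the next one, and that reaching the limit counts as crossing it; both are assumptions made for this illustration, and the function names are hypothetical.

```python
from typing import List


def should_checkpoint(buffered_bytes: int, bytes_last_interval: int, max_bytes: int) -> bool:
    """Checkpoint on this commit only if the transaction buffer is expected
    to reach max_bytes before the next commit."""
    projected = buffered_bytes + bytes_last_interval
    return projected >= max_bytes


def checkpoint_commits(bytes_per_interval: List[int], max_bytes: int) -> List[int]:
    """Return the indices of the commits at which a checkpoint would be written."""
    buffered = 0
    checkpoints = []
    for i, written in enumerate(bytes_per_interval):
        buffered += written  # the buffer keeps growing across un-checkpointed commits
        if should_checkpoint(buffered, written, max_bytes):
            checkpoints.append(i)
            buffered = 0  # checkpointing flushes the transaction buffer
    return checkpoints
```

With a steady 30 bytes per interval and a 100-byte limit, this model checkpoints only on every third commit instead of on every one.
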
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> I'm also open to suggestions.
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> > > > > > >>>>>>>>>>>> nick.telf...@gmail.com
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> Hi Bruno!
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>>>>>>>>> By "less predictable for users", I meant in
> > > > terms of
> > > > > > >>>>>>>>>>>> understanding
> > > > > > >>>>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>> performance profile under various
> > > > circumstances. The
> > > > > > >>>>>>>> more
> > > > > > >>>>>>>>>>>> complex
> > > > > > >>>>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>> solution, the more difficult it would be for
> > > > users
> > > > > > to
> > > > > > >>>>>>>>>>>> understand
> > > > > > >>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>> performance they see. For example, spilling
> > > > records
> > > > > > to
> > > > > > >>>>>>>> disk
> > > > > > >>>>>>>>>>>> when
> > > > > > >>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>> transaction buffer reaches a threshold
> > would, I
> > > > > > >> expect,
> > > > > > >>>>>>>>>> reduce
> > > > > > >>>>>>>>>>>>>> write
> > > > > > >>>>>>>>>>>>>>>>>>>>>> throughput. This reduction in write
> > throughput
> > > > could
> > > > > > >> be
> > > > > > >>>>>>>>>>>>>> unexpected,
> > > > > > >>>>>>>>>>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>>>>>>>>> potentially difficult to
> > diagnose/understand for
> > > > > > >> users.
> > > > > > >>>>>>>>>>>>>>>>>>>>>> At the moment, I think the "early commit"
> > > > concept is
> > > > > > >>>>>>>>>> relatively
> > > > > > >>>>>>>>>>>>>>>>>>>>>> straightforward; it's easy to document, and
> > > > > > >>>> conceptually
> > > > > > >>>>>>>>>> fairly
> > > > > > >>>>>>>>>>>>>>>>>>>> obvious to
> > > > > > >>>>>>>>>>>>>>>>>>>>>> users. We could probably add a metric to
> > make it
> > > > > > >> easier
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>> understand
> > > > > > >>>>>>>>>>>>>>>>>>>> when
> > > > > > >>>>>>>>>>>>>>>>>>>>>> it happens though.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> 3. (the second one)
> > > > > > >>>>>>>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an
> > indirect
> > > > way
> > > > > > of
> > > > > > >>>>>>>>>> telling
> > > > > > >>>>>>>>>>>>>>>>>>>> StateStores
> > > > > > >>>>>>>>>>>>>>>>>>>>>> whether they should be transactional.
> > > > READ_COMMITTED
> > > > > > >>>>>>>>>>>> essentially
> > > > > > >>>>>>>>>>>>>>>>>>>> requires
> > > > > > >>>>>>>>>>>>>>>>>>>>>> transactions, because it dictates that two
> > > > threads
> > > > > > >>>>>>>> calling
> > > > > > >>>>>>>>>>>>>>>>>>>>>> `newTransaction()` should not see writes
> > from
> > > > the
> > > > > > >> other
> > > > > > >>>>>>>>>>>>>> transaction
> > > > > > >>>>>>>>>>>>>>>>>>>> until
> > > > > > >>>>>>>>>>>>>>>>>>>>>> they have been committed. With
> > > > READ_UNCOMMITTED, all
> > > > > > >>>>>>>> bets are
> > > > > > >>>>>>>>>>>> off,
> > > > > > >>>>>>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>>>>>>>>> stores can allow threads to observe written
> > > > records
> > > > > > at
> > > > > > >>>>>>>> any
> > > > > > >>>>>>>>>>>> time,
> > > > > > >>>>>>>>>>>>>>>>>>>> which is
> > > > > > >>>>>>>>>>>>>>>>>>>>>> essentially "no transactions". That said,
> > > > > > StateStores
> > > > > > >>>>>>>> are
> > > > > > >>>>>>>>>> free
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>>>>>> implement
> > > > > > >>>>>>>>>>>>>>>>>>>>>> these guarantees however they can, which is
> > a
> > > > bit
> > > > > > more
> > > > > > >>>>>>>>>> relaxed
> > > > > > >>>>>>>>>>>>>> than
> > > > > > >>>>>>>>>>>>>>>>>>>>>> dictating "you must use transactions". For
> > > > example,
> > > > > > >>>> with
> > > > > > >>>>>>>>>>>> RocksDB
> > > > > > >>>>>>>>>>>>>> we
> > > > > > >>>>>>>>>>>>>>>>>>>> would
> > > > > > >>>>>>>>>>>>>>>>>>>>>> implement these as READ_COMMITTED ==
> > WBWI-based
> > > > > > >>>>>>>>>> "transactions",
> > > > > > >>>>>>>>>>>>>>>>>>>>>> READ_UNCOMMITTED == direct writes to the
> > > > database.
> > > > > > But
> > > > > > >>>>>>>> with
> > > > > > >>>>>>>>>>>> other
> > > > > > >>>>>>>>>>>>>>>>>>>> storage
> > > > > > >>>>>>>>>>>>>>>>>>>>>> engines, it might be preferable to *always*
> > use
> > > > > > >>>>>>>> transactions,
> > > > > > >>>>>>>>>>>> even
> > > > > > >>>>>>>>>>>>>>>>>>>> when
> > > > > > >>>>>>>>>>>>>>>>>>>>>> unnecessary; or there may be storage engines
> > > > that
> > > > > > >> don't
> > > > > > >>>>>>>>>> provide
> > > > > > >>>>>>>>>>>>>>>>>>>>>> transactions, but the isolation guarantees
> > can
> > > > be
> > > > > > met
> > > > > > >>>>>>>> using a
> > > > > > >>>>>>>>>>>>>>>>>>>> different
> > > > > > >>>>>>>>>>>>>>>>>>>>>> technique.
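
To make the mapping above concrete, here is a toy Python model (not the Kafka Streams or RocksDB API). `ToyStore` and its methods are hypothetical: under READ_COMMITTED, writes are staged in a buffer that stands in for a WBWI-style write batch and only become visible on commit; under READ_UNCOMMITTED, writes go straight to the visible data.

```python
from enum import Enum


class IsolationLevel(Enum):
    READ_COMMITTED = "read_committed"
    READ_UNCOMMITTED = "read_uncommitted"


class ToyStore:
    def __init__(self, isolation_level: IsolationLevel):
        self.isolation_level = isolation_level
        self._visible = {}  # what other threads (e.g. interactive queries) can read
        self._buffer = {}   # stand-in for a WBWI-style write batch

    def put(self, key, value):
        if self.isolation_level is IsolationLevel.READ_COMMITTED:
            self._buffer[key] = value   # staged until commit
        else:
            self._visible[key] = value  # direct write, visible immediately

    def commit(self):
        # Publish all staged writes and empty the buffer.
        self._visible.update(self._buffer)
        self._buffer.clear()

    def read_from_other_thread(self, key):
        """Model a reader that is not the writing thread."""
        return self._visible.get(key)
```

A store backed by a different engine could meet the same visibility guarantees in some other way; the point of the sketch is only that the isolation level fixes what readers may observe, not how the store achieves it.
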
> > > > > > >>>>>>>>>>>>>>>>>>>>>> My idea was to try to keep the StateStore
> > > > interface
> > > > > > as
> > > > > > >>>>>>>>>> loosely
> > > > > > >>>>>>>>>>>>>>>> coupled
> > > > > > >>>>>>>>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> > > > > > >>>>>>>> implementers
> > > > > > >>>>>>>>>> more
> > > > > > >>>>>>>>>>>>>>>>>>>> freedom, and
> > > > > > >>>>>>>>>>>>>>>>>>>>>> reduce the amount of internal knowledge
> > > > required.
> > > > > > >>>>>>>>>>>>>>>>>>>>>> That said, I understand that
> > "IsolationLevel"
> > > > might
> > > > > > >> not
> > > > > > >>>>>>>> be
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> right
> > > > > > >>>>>>>>>>>>>>>>>>>>>> abstraction, and we can always make it much
> > more
> > > > > > >>>>>>>> explicit if
> > > > > > >>>>>>>>>>>>>>>>>>>> required, e.g.
> > > > > > >>>>>>>>>>>>>>>>>>>>>> boolean transactional()
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> 7-8.
> > > > > > >>>>>>>>>>>>>>>>>>>>>> I can make these changes either later today
> > or
> > > > > > >>>> tomorrow.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> Small update:
> > > > > > >>>>>>>>>>>>>>>>>>>>>> I've rebased my branch on trunk and fixed a
> > > > bunch of
> > > > > > >>>>>>>> issues
> > > > > > >>>>>>>>>>>> that
> > > > > > >>>>>>>>>>>>>>>>>>>> needed
> > > > > > >>>>>>>>>>>>>>>>>>>>>> addressing. Currently, all the tests pass,
> > > > which is
> > > > > > >>>>>>>>>> promising,
> > > > > > >>>>>>>>>>>> but
> > > > > > >>>>>>>>>>>>>>>> it
> > > > > > >>>>>>>>>>>>>>>>>>>> will
> > > > > > >>>>>>>>>>>>>>>>>>>>>> need to undergo some performance testing. I
> > > > haven't
> > > > > > >>>>>>>> (yet)
> > > > > > >>>>>>>>>>>> worked
> > > > > > >>>>>>>>>>>>>> on
> > > > > > >>>>>>>>>>>>>>>>>>>>>> removing the `newTransaction()` stuff, but I
> > > > would
> > > > > > >>>>>>>> expect
> > > > > > >>>>>>>>>> that,
> > > > > > >>>>>>>>>>>>>>>>>>>>>> behaviourally, it should make no
> > difference. The
> > > > > > >> branch
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>>>>>>>> available
> > > > > > >>>>>>>>>>>>>>>>>>>> at
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > https://github.com/nicktelford/kafka/tree/KIP-892-c
> > > > > > >> if
> > > > > > >>>>>>>>>> anyone
> > > > > > >>>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>>>>>>>>>> interested in taking an early look.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna
> > <
> > > > > > >>>>>>>>>>>> cado...@apache.org>
> > > > > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> 1.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Yeah, I agree with you. That was actually
> > also
> > > > my
> > > > > > >>>>>>>> point. I
> > > > > > >>>>>>>>>>>>>>>> understood
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> that John was proposing the ingestion path
> > as
> > > > a way
> > > > > > >> to
> > > > > > >>>>>>>> avoid
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>> early
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> commits. Probably, I misinterpreted the
> > intent.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> I agree with John here, that actually it is
> > > > public
> > > > > > >>>>>>>> API. My
> > > > > > >>>>>>>>>>>>>> question
> > > > > > >>>>>>>>>>>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> how this usage pattern affects normal
> > > > processing.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> My concern is that checking for the size
> > of the
> > > > > > >>>>>>>> transaction
> > > > > > >>>>>>>>>>>>>> buffer
> > > > > > >>>>>>>>>>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> maybe triggering an early commit affects
> > the
> > > > whole
> > > > > > >>>>>>>>>> processing
> > > > > > >>>>>>>>>>>> of
> > > > > > >>>>>>>>>>>>>>>>>>>> Kafka
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Streams. The transactionality of a state
> > store
> > > > is
> > > > > > not
> > > > > > >>>>>>>>>>>> confined to
> > > > > > >>>>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> state store itself, but spills over and
> > > > changes the
> > > > > > >>>>>>>> behavior
> > > > > > >>>>>>>>>>>> of
> > > > > > >>>>>>>>>>>>>>>> other
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> parts of the system. I agree with you that
> > it
> > > > is a
> > > > > > >>>>>>>> decent
> > > > > > >>>>>>>>>>>>>>>>>>>> compromise. I
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> just wanted to analyse the downsides and
> > list
> > > > the
> > > > > > >>>>>>>> options to
> > > > > > >>>>>>>>>>>>>>>> overcome
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> them. I also agree with you that all
> > options
> > > > seem
> > > > > > >>>> quite
> > > > > > >>>>>>>>>> heavy
> > > > > > >>>>>>>>>>>>>>>>>>>> compared
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> with your KIP. I do not understand what you
> > > > mean
> > > > > > with
> > > > > > >>>>>>>> "less
> > > > > > >>>>>>>>>>>>>>>>>>>> predictable
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> for users", though.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> I found the discussions about the
> > alternatives
> > > > > > really
> > > > > > >>>>>>>>>>>>>> interesting.
> > > > > > >>>>>>>>>>>>>>>>>>>> But I
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> also think that your plan sounds good and
> > we
> > > > should
> > > > > > >>>>>>>> continue
> > > > > > >>>>>>>>>>>> with
> > > > > > >>>>>>>>>>>>>>>> it!
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Some comments on your reply to my e-mail on
> > > > June
> > > > > > >> 20th:
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Ah, now, I understand the reasoning behind
> > > > putting
> > > > > > >>>>>>>> isolation
> > > > > > >>>>>>>>>>>>>> level
> > > > > > >>>>>>>>>>>>>>>> in
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> the state store context. Thanks! Should
> > that
> > > > also
> > > > > > be
> > > > > > >> a
> > > > > > >>>>>>>> way
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>> give
> > > > > > >>>>>>>>>>>>>>>>>>>> the
> > > > > > > >>>>>>>>>>>>>>>>>>>>>>> state store the opportunity to decide
> > > > whether
> > > > > > to
> > > > > > >>>>>>>> turn on
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> transactions or not?
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> With my comment, I was more concerned about
> > > > how
> > > > > > >> you
> > > > > > >>>>>>>> know
> > > > > > >>>>>>>>>>>> if a
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file needs to be written under
> > EOS,
> > > > if
> > > > > > you
> > > > > > >>>>>>>> do not
> > > > > > >>>>>>>>>>>>>> have a
> > > > > > >>>>>>>>>>>>>>>>>>>> way
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> to know if the state store is
> > transactional or
> > > > not.
> > > > > > >> If
> > > > > > >>>>>>>> a
> > > > > > >>>>>>>>>> state
> > > > > > >>>>>>>>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> transactional, the checkpoint file can be
> > > > written
> > > > > > >>>>>>>> during
> > > > > > >>>>>>>>>>>> normal
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> processing under EOS. If the state store
> > is not
> > > > > > >>>>>>>>>> transactional,
> > > > > > >>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file must not be written under
> > EOS.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> 7.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> My point was about not only considering the
> > > > bytes
> > > > > > in
> > > > > > >>>>>>>> memory
> > > > > > >>>>>>>>>> in
> > > > > > >>>>>>>>>>>>>>>> config
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> statestore.uncommitted.max.bytes, but also
> > > > bytes
> > > > > > that
> > > > > > >>>>>>>> might
> > > > > > >>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>>>>>>> spilled
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> on disk. Basically, I was wondering
> > whether you
> > > > > > >> should
> > > > > > >>>>>>>>>> remove
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> "memory" in "Maximum number of memory
> > bytes to
> > > > be
> > > > > > >> used
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> buffer uncommitted state-store records." My
> > > > > > thinking
> > > > > > >>>>>>>> was
> > > > > > >>>>>>>>>> that
> > > > > > >>>>>>>>>>>>>> even
> > > > > > >>>>>>>>>>>>>>>>>>>> if a
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> state store spills uncommitted bytes to
> > disk,
> > > > > > >> limiting
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>>> overall
> > > > > > >>>>>>>>>>>>>>>>>>>> bytes
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> might make sense. Thinking about it again
> > and
> > > > > > >>>>>>>> considering
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>> recent
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> discussions, it does not make too much
> > sense
> > > > > > anymore.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> I like the name
> > > > > > >>>>>>>> statestore.transaction.buffer.max.bytes that
> > > > > > >>>>>>>>>>>> you
> > > > > > >>>>>>>>>>>>>>>>>>>> proposed.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> 8.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> A high-level description (without
> > > > implementation
> > > > > > >>>>>>>> details) of
> > > > > > >>>>>>>>>>>> how
> > > > > > >>>>>>>>>>>>>>>>>>>> Kafka
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Streams will manage the commit of changelog
> > > > > > >>>>>>>> transactions,
> > > > > > >>>>>>>>>>>> state
> > > > > > >>>>>>>>>>>>>>>> store
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> transactions and checkpointing would be
> > great.
> > > > > > Would
> > > > > > >>>> be
> > > > > > >>>>>>>>>> great
> > > > > > >>>>>>>>>>>> if
> > > > > > >>>>>>>>>>>>>>>> you
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> could also add some sentences about the
> > > > behavior in
> > > > > > >>>>>>>> case of
> > > > > > >>>>>>>>>> a
> > > > > > >>>>>>>>>>>>>>>>>>>> failure.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> For instance how does a transactional state
> > > > store
> > > > > > >>>>>>>> recover
> > > > > > >>>>>>>>>>>> after a
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> failure or what happens with the
> > transaction
> > > > > > buffer,
> > > > > > >>>>>>>> etc.
> > > > > > >>>>>>>>>>>> (that
> > > > > > >>>>>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>>>>>>>>> what
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> I meant by "fail-over" in point 9.)
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> Bruno
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> 1.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> Isn't this exactly the same issue that
> > > > > > >>>>>>>> WriteBatchWithIndex
> > > > > > >>>>>>>>>>>>>>>>>>>> transactions
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> have, whereby exceeding (or likely to
> > exceed)
> > > > > > >>>>>>>> configured
> > > > > > >>>>>>>>>>>> memory
> > > > > > >>>>>>>>>>>>>>>>>>>> needs to
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> trigger an early commit?
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> This is one of my big concerns.
> > Ultimately,
> > > > any
> > > > > > >>>>>>>> approach
> > > > > > >>>>>>>>>>>> based
> > > > > > >>>>>>>>>>>>>> on
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> cracking
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> open RocksDB internals and using it in
> > ways
> > > > it's
> > > > > > not
> > > > > > >>>>>>>> really
> > > > > > >>>>>>>>>>>>>>>> designed
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> for is
> > > > > > > >>>>>>>>>>>>>>>>>>>>>>>> likely to have some unforeseen performance
> > or
> > > > > > >>>>>>>> consistency
> > > > > > >>>>>>>>>>>> issues.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> 3.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> What's your motivation for removing these
> > > > early
> > > > > > >>>>>>>> commits?
> > > > > > >>>>>>>>>>>> While
> > > > > > >>>>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> ideal, I
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> think they're a decent compromise to
> > ensure
> > > > > > >>>>>>>> consistency
> > > > > > >>>>>>>>>>>> whilst
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> maintaining
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> good and predictable performance.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> All 3 of your suggested ideas seem *very*
> > > > > > >>>>>>>> complicated, and
> > > > > > >>>>>>>>>>>> might
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> actually
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> make behaviour less predictable for users
> > as a
> > > > > > >>>>>>>> consequence.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> I'm a bit concerned that the scope of this
> > > > KIP is
> > > > > > >>>>>>>> growing a
> > > > > > >>>>>>>>>>>> bit
> > > > > > >>>>>>>>>>>>>>>> out
> > > > > > >>>>>>>>>>>>>>>>>>>> of
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> control. While it's good to discuss ideas
> > for
> > > > > > future
> > > > > > >>>>>>>>>>>>>>>> improvements, I
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> think
> >
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> it's important to narrow the scope down
> > to a
> > > > > > design
> > > > > > >>>>>>>> that
> > > > > > >>>>>>>>>>>>>> achieves
> > > > > > >>>>>>>>>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> most
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> pressing objectives (constant sized
> > > > restorations
> > > > > > >>>>>>>> during
> > > > > > >>>>>>>>>> dirty
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> close/unexpected errors). Any design that
> > > > this KIP
> > > > > > >>>>>>>> produces
> > > > > > >>>>>>>>>>>> can
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> ultimately
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> be changed in the future, especially if
> > the
> > > > bulk
> > > > > > of
> > > > > > >>>>>>>> it is
> > > > > > >>>>>>>>>>>>>> internal
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> behaviour.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> I'm going to spend some time next week
> > trying
> > > > to
> > > > > > >>>>>>>> re-work
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>>>>>>> original
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex design to remove the
> > > > > > >>>>>>>> newTransaction()
> > > > > > >>>>>>>>>>>>>> method,
> > > > > > >>>>>>>>>>>>>>>>>>>> such
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> that
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> it's just an implementation detail of
> > > > > > RocksDBStore.
> > > > > > >>>>>>>> That
> > > > > > >>>>>>>>>>>> way, if
> > > > > > >>>>>>>>>>>>>>>> we
> > > > > > >>>>>>>>>>>>>>>>>>>>>>> want to
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> replace WBWI with something in the future,
> > > > like
> > > > > > the
> > > > > > >>>>>>>> SST
> > > > > > >>>>>>>>>> file
> > > > > > >>>>>>>>>>>>>>>>>>>> management
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> outlined by John, then we can do so with
> > > > little/no
> > > > > > >>>> API
> > > > > > >>>>>>>>>>>> changes.
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>> Nick
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > > >
> >
