Hi Bruno,

Agreed, I can live with that for now.

In an effort to keep the scope of this KIP from expanding, I'm leaning
towards just providing a configurable default.state.isolation.level and
removing IsolationLevel from the StateStoreContext. This would be
compatible with adding support for query-time IsolationLevels in the
future, whilst providing a way for users to select an isolation level now.

The big problem with this, however, is that if a user selects processing.mode
= "exactly-once(-v2|-beta)", and default.state.isolation.level =
"READ_UNCOMMITTED", we need to guarantee that the data isn't written to
disk until commit() is called, but we also need to permit IQ threads to
read from the ongoing transaction.

A simple solution would be to (temporarily) forbid this combination of
configuration, and have default.state.isolation.level automatically switch
to READ_COMMITTED when processing.mode is anything other than
at-least-once. Do you think this would be acceptable?

In a later KIP, we can add support for query-time isolation levels and
solve this particular problem there, which would relax this restriction.

Regards,
Nick

On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org> wrote:

> Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think
> it is perfectly valid to say InMemoryKeyValueStore do not support
> READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default
> at the moment.
>
> Best,
> Bruno
>
> On 9/18/23 7:12 PM, Nick Telford wrote:
> > Oh! One other concern I haven't mentioned: if we make IsolationLevel a
> > query-time constraint, then we need to add support for READ_COMMITTED to
> > InMemoryKeyValueStore too, which will require some changes to the
> > implementation.
> >
> > On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com>
> wrote:
> >
> >> Hi everyone,
> >>
> >> I agree that having IsolationLevel be determined at query-time is the
> >> ideal design, but there are a few sticking points:
> >>
> >> 1.
> >> There needs to be some way to communicate the IsolationLevel down to the
> >> RocksDBStore itself, so that the query can respect it. Since stores are
> >> "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore,
> etc.),
> >> we need some way to deliver that information to the bottom layer. For
> IQv2,
> >> we can use the existing State#query() method, but IQv1 has no way to do
> >> this.
> >>
> >> A simple approach, which would potentially open up other options, would
> be
> >> to add something like: ReadOnlyKeyValueStore<K, V>
> >> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore
> (and
> >> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).
> >>
> >> 2.
> >> As mentioned above, RocksDB WriteBatches are not thread-safe, which
> causes
> >> a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a
> >> look at RocksDB Transactions[1], but they solve a very different
> problem,
> >> and have the same thread-safety issue.
> >>
> >> One possible approach that I mentioned is chaining WriteBatches: every
> >> time a new Interactive Query is received (i.e. readOnlyView, see above,
> >> is called) we "freeze" the existing WriteBatch, and start a new one for
> new
> >> writes. The Interactive Query queries the "chain" of previous
> WriteBatches
> >> + the underlying database; while the StreamThread starts writing to the
> >> *new* WriteBatch. On-commit, the StreamThread would write *all*
> >> WriteBatches in the chain to the database (that have not yet been
> written).
> >>
> >> WriteBatches would be closed/freed only when they have been both
> >> committed, and all open Interactive Queries on them have been closed.
> This
> >> would require some reference counting.
> >>
> >> Obviously a drawback of this approach is the potential for increased
> >> memory usage: if an Interactive Query is long-lived, for example by
> doing a
> >> full scan over a large database, or even just pausing in the middle of
> an
> >> iteration, then the existing chain of WriteBatches could be kept around
> for
> >> a long time, potentially forever.
> >>
> >> --
> >>
> >> A.
> >> Going off on a tangent, it looks like in addition to supporting
> >> READ_COMMITTED queries, we could go further and support REPEATABLE_READ
> >> queries (i.e. where subsequent reads to the same key in the same
> >> Interactive Query are guaranteed to yield the same value) by making use
> of
> >> RocksDB Snapshots[2]. These are fairly lightweight, so the performance
> >> impact is likely to be negligible, but they do require that the
> Interactive
> >> Query session can be explicitly closed.
> >>
> >> This could be achieved if we made the above readOnlyView interface look
> >> more like:
> >>
> >> interface ReadOnlyKeyValueView<K, V> implements ReadOnlyKeyValueStore<K,
> >> V>, AutoCloseable {}
> >>
> >> interface ReadOnlyKeyValueStore<K, V> {
> >>      ...
> >>      ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel
> isolationLevel);
> >> }
> >>
> >> But this would be a breaking change, as existing IQv1 queries are
> >> guaranteed to never call store.close(), and therefore these would leak
> >> memory under REPEATABLE_READ.
> >>
> >> B.
> >> One thing that's notable: MyRocks states that they support
> READ_COMMITTED
> >> and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4].
> >> This could be because doing so is technically difficult/impossible using
> >> the primitives available in RocksDB.
> >>
> >> --
> >>
> >> Lucas, to address your points:
> >>
> >> U1.
> >> It's only "SHOULD" to permit alternative (i.e. non-RocksDB)
> >> implementations of StateStore that do not support atomic writes.
> Obviously
> >> in those cases, the guarantees Kafka Streams provides/expects would be
> >> relaxed. Do you think we should require all implementations to support
> >> atomic writes?
> >>
> >> U2.
> >> Stores can support multiple IsolationLevels. As we've discussed above,
> the
> >> ideal scenario would be to specify the IsolationLevel at query-time.
> >> Failing that, I think the second-best approach is to define the
> >> IsolationLevel for *all* queries based on the processing.mode, which is
> >> what the default StateStoreContext#isolationLevel() achieves. Would you
> >> prefer an alternative?
> >>
> >> While the existing implementation is equivalent to READ_UNCOMMITTED,
> this
> >> can yield unexpected results/errors under EOS, if a transaction is
> rolled
> >> back. While this would be a change in behaviour for users, it would look
> >> more like a bug fix than a breaking change. That said, we *could* make
> it
> >> configurable, and default to the existing behaviour (READ_UNCOMMITTED)
> >> instead of inferring it from the processing.mode?
> >>
> >> N1, N2.
> >> These were only primitives to avoid boxing costs, but since this is not
> a
> >> performance sensitive area, it should be fine to change if that's
> desirable.
> >>
> >> N3.
> >> It's because the store "manages its own offsets", which includes both
> >> committing the offset, *and providing it* via getCommittedOffset().
> >> Personally, I think "managesOffsets" conveys this best, but I don't mind
> >> changing it if the nomenclature is unclear.
> >>
> >> Sorry for the massive emails/essays!
> >> --
> >> Nick
> >>
> >> 1: https://github.com/facebook/rocksdb/wiki/Transactions
> >> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> >> 3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> >> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/
> >>
> >> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> >> <lbruts...@confluent.io.invalid> wrote:
> >>
> >>> Hi Nick,
> >>>
> >>> since I last read it in April, the KIP has become much cleaner and
> >>> easier to read. Great work!
> >>>
> >>> It feels to me the last big open point is whether we can implement
> >>> isolation level as a query parameter. I understand that there are
> >>> implementation concerns, but as Colt says, it would be a great
> >>> addition, and would also simplify the migration path for this change.
> >>> Is the implementation problem you mentioned caused by the WriteBatch
> >>> not having a notion of a snapshot, as the underlying DB iterator does?
> >>> In that case, I am not sure a chain of WriteBatches as you propose
> >>> would fully solve the problem, but maybe I didn't dig enough into the
> >>> details to fully understand it.
> >>>
> >>> If it's not possible to implement it now, would it be an option to
> >>> make sure in this KIP that we do not fully close the door on per-query
> >>> isolation levels in the interface, as it may be possible to implement
> >>> the missing primitives in RocksDB or Speedb in the future.
> >>>
> >>> Understanding:
> >>>
> >>> * U1) Why is it only "SHOULD" for changelogOffsets to be persisted
> >>> atomically with the records?
> >>> * U2) Don't understand the default implementation of `isolationLevel`.
> >>> The isolation level should be a property of the underlying store, and
> >>> not be defined by the default config? Existing stores probably don't
> >>> guarantee READ_COMMITTED, so the default should be to return
> >>> READ_UNCOMMITTED.
> >>>
> >>> Nits:
> >>> * N1) Could `getComittedOffset` use an `OptionalLong` return type, to
> >>> avoid the `null`?
> >>> * N2) Could `apporixmateNumUncomittedBytes` use an `OptionalLong`
> >>> return type, to avoid the `-1`?
> >>> * N3) I don't understand why `managesOffsets` uses the 'manage' verb,
> >>> whereas all other methods use the "commits" verb. I'd suggest
> >>> `commitsOffsets`.
> >>>
> >>> Either way, it feels this KIP is very close to the finish line, I'm
> >>> looking forward to seeing this in production!
> >>>
> >>> Cheers,
> >>> Lucas
> >>>
> >>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <c...@littlehorse.io>
> wrote:
> >>>>
> >>>>> Making IsolationLevel a query-time constraint, rather than linking it
> >>> to
> >>>> the processing.guarantee.
> >>>>
> >>>> As I understand it, would this allow even a user of EOS to control
> >>> whether
> >>>> reading committed or uncommitted records? If so, I am highly in favor
> of
> >>>> this.
> >>>>
> >>>> I know that I was one of the early people to point out the current
> >>>> shortcoming that IQ reads uncommitted records, but just this morning I
> >>>> realized a pattern we use which means that (for certain queries) our
> >>> system
> >>>> needs to be able to read uncommitted records, which is the current
> >>> behavior
> >>>> of Kafka Streams in EOS.***
> >>>>
> >>>> If IsolationLevel being a query-time decision allows for this, then
> that
> >>>> would be amazing. I would also vote that the default behavior should
> be
> >>> for
> >>>> reading uncommitted records, because it is totally possible for a
> valid
> >>>> application to depend on that behavior, and breaking it in a minor
> >>> release
> >>>> might be a bit strong.
> >>>>
> >>>> *** (Note, for the curious reader....) Our use-case/query pattern is a
> >>> bit
> >>>> complex, but reading "uncommitted" records is actually safe in our
> case
> >>>> because processing is deterministic. Additionally, IQ being able to
> read
> >>>> uncommitted records is crucial to enable "read your own writes" on our
> >>> API:
> >>>> Due to the deterministic processing, we send an "ack" to the client
> who
> >>>> makes the request as soon as the processor processes the result. If
> they
> >>>> can't read uncommitted records, they may receive a "201 - Created"
> >>>> response, immediately followed by a "404 - Not Found" when doing a
> >>> lookup
> >>>> for the object they just created).
> >>>>
> >>>> Thanks,
> >>>> Colt McNealy
> >>>>
> >>>> *Founder, LittleHorse.dev*
> >>>>
> >>>>
> >>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <nick.telf...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Addendum:
> >>>>>
> >>>>> I think we would also face the same problem with the approach John
> >>> outlined
> >>>>> earlier (using the record cache as a transaction buffer and flushing
> >>> it
> >>>>> straight to SST files). This is because the record cache (the
> >>> ThreadCache
> >>>>> class) is not thread-safe, so every commit would invalidate open IQ
> >>>>> Iterators in the same way that RocksDB WriteBatches do.
> >>>>> --
> >>>>> Nick
> >>>>>
> >>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Bruno,
> >>>>>>
> >>>>>> I've updated the KIP based on our conversation. The only things
> >>> I've not
> >>>>>> yet done are:
> >>>>>>
> >>>>>> 1. Using transactions under ALOS and EOS.
> >>>>>> 2. Making IsolationLevel a query-time constraint, rather than
> >>> linking it
> >>>>>> to the processing.guarantee.
> >>>>>>
> >>>>>> There's a wrinkle that makes this a challenge: Interactive Queries
> >>> that
> >>>>>> open an Iterator, when using transactions and READ_UNCOMMITTED.
> >>>>>> The problem is that under READ_UNCOMMITTED, queries need to be able
> >>> to
> >>>>>> read records from the currently uncommitted transaction buffer
> >>>>>> (WriteBatch). This includes for Iterators, which should iterate
> >>> both the
> >>>>>> transaction buffer and underlying database (using
> >>>>>> WriteBatch#iteratorWithBase()).
> >>>>>>
> >>>>>> The issue is that when the StreamThread commits, it writes the
> >>> current
> >>>>>> WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the
> >>>>>> WriteBatch while an Interactive Query holds an open Iterator on it
> >>> will
> >>>>>> invalidate the Iterator. Worse, it turns out that Iterators over a
> >>>>>> WriteBatch become invalidated not just when the WriteBatch is
> >>> cleared,
> >>>>> but
> >>>>>> also when the Iterators' current key receives a new write.
> >>>>>>
> >>>>>> Now that I'm writing this, I remember that this is the major reason
> >>> that
> >>>>> I
> >>>>>> switched the original design from having a query-time
> >>> IsolationLevel to
> >>>>>> having the IsolationLevel linked to the transactionality of the
> >>> stores
> >>>>>> themselves.
> >>>>>>
> >>>>>> It *might* be possible to resolve this, by having a "chain" of
> >>>>>> WriteBatches, with the StreamThread switching to a new WriteBatch
> >>>>> whenever
> >>>>>> a new Interactive Query attempts to read from the database, but that
> >>>>> could
> >>>>>> cause some performance problems/memory pressure when subjected to a
> >>> high
> >>>>>> Interactive Query load. It would also reduce the efficiency of
> >>>>> WriteBatches
> >>>>>> on-commit, as we'd have to write N WriteBatches, where N is the
> >>> number of
> >>>>>> Interactive Queries since the last commit.
> >>>>>>
> >>>>>> I realise this is getting into the weeds of the implementation, and
> >>> you'd
> >>>>>> rather we focus on the API for now, but I think it's important to
> >>>>> consider
> >>>>>> how to implement the desired API, in case we come up with an API
> >>> that
> >>>>>> cannot be implemented efficiently, or even at all!
> >>>>>>
> >>>>>> Thoughts?
> >>>>>> --
> >>>>>> Nick
> >>>>>>
> >>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org>
> >>> wrote:
> >>>>>>
> >>>>>>> Hi Nick,
> >>>>>>>
> >>>>>>> 6.
> >>>>>>> Of course, you are right! My bad!
> >>>>>>> Wiping out the state in the downgrading case is fine.
> >>>>>>>
> >>>>>>>
> >>>>>>> 3a.
> >>>>>>> Focus on the public facing changes for the KIP. We will manage to
> >>> get
> >>>>>>> the internals right. Regarding state stores that do not support
> >>>>>>> READ_COMMITTED, they should throw an error stating that they do not
> >>>>>>> support READ_COMMITTED. No need to adapt all state stores
> >>> immediately.
> >>>>>>>
> >>>>>>> 3b.
> >>>>>>> I am in favor of using transactions also for ALOS.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote:
> >>>>>>>> Hi Bruno,
> >>>>>>>>
> >>>>>>>> Thanks for getting back to me!
> >>>>>>>>
> >>>>>>>> 2.
> >>>>>>>> The fact that implementations can always track estimated memory
> >>> usage
> >>>>> in
> >>>>>>>> the wrapper is a good point. I can remove -1 as an option, and
> >>> I'll
> >>>>>>> clarify
> >>>>>>>> the JavaDoc that 0 is not just for non-transactional stores,
> >>> which is
> >>>>>>>> currently misleading.
> >>>>>>>>
> >>>>>>>> 6.
> >>>>>>>> The problem with catching the exception in the downgrade process
> >>> is
> >>>>> that
> >>>>>>>> would require new code in the Kafka version being downgraded to.
> >>> Since
> >>>>>>>> users could conceivably downgrade to almost *any* older version
> >>> of
> >>>>> Kafka
> >>>>>>>> Streams, I'm not sure how we could add that code?
> >>>>>>>> The only way I can think of doing it would be to provide a
> >>> dedicated
> >>>>>>>> downgrade tool, that goes through every local store and removes
> >>> the
> >>>>>>>> offsets column families. But that seems like an unnecessary
> >>> amount of
> >>>>>>> extra
> >>>>>>>> code to maintain just to handle a somewhat niche situation, when
> >>> the
> >>>>>>>> alternative (automatically wipe and restore stores) should be
> >>>>>>> acceptable.
> >>>>>>>>
> >>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've requested.
> >>>>>>>>
> >>>>>>>> 3a.
> >>>>>>>> I agree that IsolationLevel makes more sense at query-time, and I
> >>>>>>> actually
> >>>>>>>> initially attempted to place the IsolationLevel at query-time,
> >>> but I
> >>>>> ran
> >>>>>>>> into some problems:
> >>>>>>>> - The key issue is that, under ALOS we're not staging writes in
> >>>>>>>> transactions, so can't perform writes at the READ_COMMITTED
> >>> isolation
> >>>>>>>> level. However, this may be addressed if we decide to *always*
> >>> use
> >>>>>>>> transactions as discussed under 3b.
> >>>>>>>> - IQv1 and IQv2 have quite different implementations. I remember
> >>>>> having
> >>>>>>>> some difficulty understanding the IQv1 internals, which made it
> >>>>>>> difficult
> >>>>>>>> to determine what needed to be changed. However, I *think* this
> >>> can be
> >>>>>>>> addressed for both implementations by wrapping the RocksDBStore
> >>> in an
> >>>>>>>> IsolationLevel-dependent wrapper, that overrides read methods
> >>> (get,
> >>>>>>> etc.)
> >>>>>>>> to either read directly from the database or from the ongoing
> >>>>>>> transaction.
> >>>>>>>> But IQv1 might still be difficult.
> >>>>>>>> - If IsolationLevel becomes a query constraint, then all other
> >>>>>>> StateStores
> >>>>>>>> will need to respect it, including the in-memory stores. This
> >>> would
> >>>>>>> require
> >>>>>>>> us to adapt in-memory stores to stage their writes so they can be
> >>>>>>> isolated
> >>>>>>>> from READ_COMMITTTED queries. It would also become an important
> >>>>>>>> consideration for third-party stores on upgrade, as without
> >>> changes,
> >>>>>>> they
> >>>>>>>> would not support READ_COMMITTED queries correctly.
> >>>>>>>>
> >>>>>>>> Ultimately, I may need some help making the necessary change to
> >>> IQv1
> >>>>> to
> >>>>>>>> support this, but I don't think it's fundamentally impossible,
> >>> if we
> >>>>>>> want
> >>>>>>>> to pursue this route.
> >>>>>>>>
> >>>>>>>> 3b.
> >>>>>>>> The main reason I chose to keep ALOS un-transactional was to
> >>> minimize
> >>>>>>>> behavioural change for most users (I believe most Streams users
> >>> use
> >>>>> the
> >>>>>>>> default configuration, which is ALOS). That said, it's clear
> >>> that if
> >>>>>>> ALOS
> >>>>>>>> also used transactional stores, the only change in behaviour
> >>> would be
> >>>>>>> that
> >>>>>>>> it would become *more correct*, which could be considered a "bug
> >>> fix"
> >>>>> by
> >>>>>>>> users, rather than a change they need to handle.
> >>>>>>>>
> >>>>>>>> I believe that performance using transactions (aka. RocksDB
> >>>>>>> WriteBatches)
> >>>>>>>> should actually be *better* than the un-batched write-path that
> >>> is
> >>>>>>>> currently used[1]. The only "performance" consideration will be
> >>> the
> >>>>>>>> increased memory usage that transactions require. Given the
> >>>>> mitigations
> >>>>>>> for
> >>>>>>>> this memory that we have in place, I would expect that this is
> >>> not a
> >>>>>>>> problem for most users.
> >>>>>>>>
> >>>>>>>> If we're happy to do so, we can make ALOS also use transactions.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Nick
> >>>>>>>>
> >>>>>>>> Link 1:
> >>>>>>>>
> >>>>>
> >>>
> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>
> >>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org
> >>>>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Nick,
> >>>>>>>>>
> >>>>>>>>> Thanks for the updates and sorry for the delay on my side!
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> Making the default implementation for flush() a no-op sounds
> >>> good to
> >>>>>>> me.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2.
> >>>>>>>>> I think what was bugging me here is that a third-party state
> >>> store
> >>>>>>> needs
> >>>>>>>>> to implement the state store interface. That means they need to
> >>>>>>>>> implement a wrapper around the actual state store as we do for
> >>>>> RocksDB
> >>>>>>>>> with RocksDBStore. So, a third-party state store can always
> >>> estimate
> >>>>>>> the
> >>>>>>>>> uncommitted bytes, if it wants, because the wrapper can record
> >>> the
> >>>>>>> added
> >>>>>>>>> bytes.
> >>>>>>>>> One case I can think of where returning -1 makes sense is when
> >>>>> Streams
> >>>>>>>>> does not need to estimate the size of the write batch and
> >>> trigger
> >>>>>>>>> extraordinary commits, because the third-party state store
> >>> takes care
> >>>>>>> of
> >>>>>>>>> memory. But in that case the method could also just return 0.
> >>> Even
> >>>>> that
> >>>>>>>>> case would be better solved with a method that returns whether
> >>> the
> >>>>>>> state
> >>>>>>>>> store manages itself the memory used for uncommitted bytes or
> >>> not.
> >>>>>>>>> Said that, I am fine with keeping the -1 return value, I was
> >>> just
> >>>>>>>>> wondering when and if it will be used.
> >>>>>>>>>
> >>>>>>>>> Regarding returning 0 for transactional state stores when the
> >>> batch
> >>>>> is
> >>>>>>>>> empty, I was just wondering because you explicitly stated
> >>>>>>>>>
> >>>>>>>>> "or {@code 0} if this StateStore does not support transactions."
> >>>>>>>>>
> >>>>>>>>> So it seemed to me returning 0 could only happen for
> >>>>> non-transactional
> >>>>>>>>> state stores.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 3.
> >>>>>>>>>
> >>>>>>>>> a) What do you think if we move the isolation level to IQ (v1
> >>> and
> >>>>> v2)?
> >>>>>>>>> In the end this is the only component that really needs to
> >>> specify
> >>>>> the
> >>>>>>>>> isolation level. It is similar to the Kafka consumer that can
> >>> choose
> >>>>>>>>> with what isolation level to read the input topic.
> >>>>>>>>> For IQv1 the isolation level should go into
> >>> StoreQueryParameters. For
> >>>>>>>>> IQv2, I would add it to the Query interface.
> >>>>>>>>>
> >>>>>>>>> b) Point a) raises the question what should happen during
> >>>>> at-least-once
> >>>>>>>>> processing when the state store does not use transactions? John
> >>> in
> >>>>> the
> >>>>>>>>> past proposed to also use transactions on state stores for
> >>>>>>>>> at-least-once. I like that idea, because it avoids aggregating
> >>> the
> >>>>> same
> >>>>>>>>> records over and over again in the case of a failure. We had a
> >>> case
> >>>>> in
> >>>>>>>>> the past where a Streams applications in at-least-once mode was
> >>>>> failing
> >>>>>>>>> continuously for some reasons I do not remember before
> >>> committing the
> >>>>>>>>> offsets. After each failover, the app aggregated again and
> >>> again the
> >>>>>>>>> same records. Of course the aggregate increased to very wrong
> >>> values
> >>>>>>>>> just because of the failover. With transactions on the state
> >>> stores
> >>>>> we
> >>>>>>>>> could have avoided this. The app would have output the same
> >>> aggregate
> >>>>>>>>> multiple times (i.e., after each failover) but at least the
> >>> value of
> >>>>>>> the
> >>>>>>>>> aggregate would not depend on the number of failovers.
> >>> Outputting the
> >>>>>>>>> same aggregate multiple times would be incorrect under
> >>> exactly-once
> >>>>> but
> >>>>>>>>> it is OK for at-least-once.
> >>>>>>>>> If it makes sense to add a config to turn on and off
> >>> transactions on
> >>>>>>>>> state stores under at-least-once or just use transactions in
> >>> any case
> >>>>>>> is
> >>>>>>>>> a question we should also discuss in this KIP. It depends a bit
> >>> on
> >>>>> the
> >>>>>>>>> performance trade-off. Maybe to be safe, I would add a config.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 4.
> >>>>>>>>> Your points are all valid. I tend to say to keep the metrics
> >>> around
> >>>>>>>>> flush() until we remove flush() completely from the interface.
> >>> Calls
> >>>>> to
> >>>>>>>>> flush() might still exist since existing processors might still
> >>> call
> >>>>>>>>> flush() explicitly as you mentioned in 1). For sure, we need to
> >>>>>>> document
> >>>>>>>>> how the metrics change due to the transactions in the upgrade
> >>> notes.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 5.
> >>>>>>>>> I see. Then you should describe how the .position files are
> >>> handled
> >>>>> in
> >>>>>>>>> a dedicated section of the KIP or incorporate the description
> >>> in the
> >>>>>>>>> "Atomic Checkpointing" section instead of only mentioning it in
> >>> the
> >>>>>>>>> "Compatibility, Deprecation, and Migration Plan".
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 6.
> >>>>>>>>> Describing upgrading and downgrading in the KIP is a good idea.
> >>>>>>>>> Regarding downgrading, I think you could also catch the
> >>> exception and
> >>>>>>> do
> >>>>>>>>> what is needed to downgrade, e.g., drop the column family. See
> >>> here
> >>>>> for
> >>>>>>>>> an example:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
> >>>>>>>>>
> >>>>>>>>> It is a bit brittle, but it works.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote:
> >>>>>>>>>> Hi Bruno,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for taking the time to review the KIP. I'm back from
> >>> leave
> >>>>> now
> >>>>>>> and
> >>>>>>>>>> intend to move this forwards as quickly as I can.
> >>>>>>>>>>
> >>>>>>>>>> Addressing your points:
> >>>>>>>>>>
> >>>>>>>>>> 1.
> >>>>>>>>>> Because flush() is part of the StateStore API, it's exposed to
> >>>>> custom
> >>>>>>>>>> Processors, which might be making calls to flush(). This was
> >>>>> actually
> >>>>>>> the
> >>>>>>>>>> case in a few integration tests.
> >>>>>>>>>> To maintain as much compatibility as possible, I'd prefer not
> >>> to
> >>>>> make
> >>>>>>>>> this
> >>>>>>>>>> an UnsupportedOperationException, as it will cause previously
> >>>>> working
> >>>>>>>>>> Processors to start throwing exceptions at runtime.
> >>>>>>>>>> I agree that it doesn't make sense for it to proxy commit(),
> >>> though,
> >>>>>>> as
> >>>>>>>>>> that would cause it to violate the "StateStores commit only
> >>> when the
> >>>>>>> Task
> >>>>>>>>>> commits" rule.
> >>>>>>>>>> Instead, I think we should make this a no-op. That way,
> >>> existing
> >>>>> user
> >>>>>>>>>> Processors will continue to work as-before, without violation
> >>> of
> >>>>> store
> >>>>>>>>>> consistency that would be caused by premature flush/commit of
> >>>>>>> StateStore
> >>>>>>>>>> data to disk.
> >>>>>>>>>> What do you think?
> >>>>>>>>>>
> >>>>>>>>>> 2.
> >>>>>>>>>> As stated in the JavaDoc, when a StateStore implementation is
> >>>>>>>>>> transactional, but is unable to estimate the uncommitted memory
> >>>>> usage,
> >>>>>>>>> the
> >>>>>>>>>> method will return -1.
> >>>>>>>>>> The intention here is to permit third-party implementations
> >>> that may
> >>>>>>> not
> >>>>>>>>> be
> >>>>>>>>>> able to estimate memory usage.
> >>>>>>>>>>
> >>>>>>>>>> Yes, it will be 0 when nothing has been written to the store
> >>> yet. I
> >>>>>>>>> thought
> >>>>>>>>>> that was implied by "This method will return an approximation
> >>> of the
> >>>>>>>>> memory
> >>>>>>>>>> would be freed by the next call to {@link #commit(Map)}" and
> >>>>> "@return
> >>>>>>> The
> >>>>>>>>>> approximate size of all records awaiting {@link #commit(Map)}",
> >>>>>>> however,
> >>>>>>>>> I
> >>>>>>>>>> can add it explicitly to the JavaDoc if you think this is
> >>> unclear?
> >>>>>>>>>>
> >>>>>>>>>> 3.
> >>>>>>>>>> I realise this is probably the most contentious point in my
> >>> design,
> >>>>>>> and
> >>>>>>>>> I'm
> >>>>>>>>>> open to changing it if I'm unable to convince you of the
> >>> benefits.
> >>>>>>>>>> Nevertheless, here's my argument:
> >>>>>>>>>> The Interactive Query (IQ) API(s) are directly provided
> >>> StateStores
> >>>>> to
> >>>>>>>>>> query, and it may be important for users to programmatically
> >>> know
> >>>>>>> which
> >>>>>>>>>> mode the StateStore is operating under. If we simply provide an
> >>>>>>>>>> "eosEnabled" boolean (as used throughout the internal streams
> >>>>>>> engine), or
> >>>>>>>>>> similar, then users will need to understand the operation and
> >>>>>>>>> consequences
> >>>>>>>>>> of each available processing mode and how it pertains to their
> >>>>>>>>> StateStore.
> >>>>>>>>>>
> >>>>>>>>>> Interactive Query users aren't the only people that care about
> >>> the
> >>>>>>>>>> processing.mode/IsolationLevel of a StateStore: implementers of
> >>>>> custom
> >>>>>>>>>> StateStores also need to understand the behaviour expected of
> >>> their
> >>>>>>>>>> implementation. KIP-892 introduces some assumptions into the
> >>> Streams
> >>>>>>>>> Engine
> >>>>>>>>>> about how StateStores operate under each processing mode, and
> >>> it's
> >>>>>>>>>> important that custom implementations adhere to those
> >>> assumptions in
> >>>>>>>>> order
> >>>>>>>>>> to maintain the consistency guarantees.
> >>>>>>>>>>
> >>>>>>>>>> IsolationLevels provide a high-level contract on the behaviour
> >>> of
> >>>>> the
> >>>>>>>>>> StateStore: a user knows that under READ_COMMITTED, they will
> >>> see
> >>>>>>> writes
> >>>>>>>>>> only after the Task has committed, and under READ_UNCOMMITTED
> >>> they
> >>>>>>> will
> >>>>>>>>> see
> >>>>>>>>>> writes immediately. No understanding of the details of each
> >>>>>>>>> processing.mode
> >>>>>>>>>> is required, either for IQ users or StateStore implementers.
> >>>>>>>>>>
> >>>>>>>>>> An argument can be made that these contractual guarantees can
> >>> simply
> >>>>>>> be
> >>>>>>>>>> documented for the processing.mode (i.e. that exactly-once and
> >>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and at-least-once
> >>> behaves
> >>>>>>> like
> >>>>>>>>>> READ_UNCOMMITTED), but there are several small issues with
> >>> this I'd
> >>>>>>>>> prefer
> >>>>>>>>>> to avoid:
> >>>>>>>>>>
> >>>>>>>>>>       - Where would we document these contracts, in a way that
> >>> is
> >>>>>>> difficult
> >>>>>>>>>>       for users/implementers to miss/ignore?
> >>>>>>>>>>       - It's not clear to users that the processing mode is
> >>>>>>> communicating
> >>>>>>>>>>       an expectation of read isolation, unless they read the
> >>>>>>>>> documentation. Users
> >>>>>>>>>>       rarely consult documentation unless they feel they need
> >>> to, so
> >>>>>>> it's
> >>>>>>>>> likely
> >>>>>>>>>>       this detail would get missed by many users.
> >>>>>>>>>>       - It tightly couples processing modes to read isolation.
> >>> Adding
> >>>>>>> new
> >>>>>>>>>>       processing modes, or changing the read isolation of
> >>> existing
> >>>>>>>>> processing
> >>>>>>>>>>       modes would be difficult/impossible.
> >>>>>>>>>>
> >>>>>>>>>> Ultimately, the cost of introducing IsolationLevels is just a
> >>> single
> >>>>>>>>>> method, since we re-use the existing IsolationLevel enum from
> >>> Kafka.
> >>>>>>> This
> >>>>>>>>>> gives us a clear place to document the contractual guarantees
> >>>>> expected
> >>>>>>>>>> of/provided by StateStores, that is accessible both by the
> >>>>> StateStore
> >>>>>>>>>> itself, and by IQ users.
> >>>>>>>>>>
> >>>>>>>>>> (Writing this I've just realised that the StateStore and IQ
> >>> APIs
> >>>>>>> actually
> >>>>>>>>>> don't provide access to StateStoreContext that IQ users would
> >>> have
> >>>>>>> direct
> >>>>>>>>>> access to... Perhaps StateStore should expose isolationLevel()
> >>>>> itself
> >>>>>>>>> too?)
> >>>>>>>>>>
> >>>>>>>>>> 4.
> >>>>>>>>>> Yeah, I'm not comfortable renaming the metrics in-place
> >>> either, as
> >>>>>>> it's a
> >>>>>>>>>> backwards incompatible change. My concern is that, if we leave
> >>> the
> >>>>>>>>> existing
> >>>>>>>>>> "flush" metrics in place, they will be confusing to users.
> >>> Right
> >>>>> now,
> >>>>>>>>>> "flush" metrics record explicit flushes to disk, but under
> >>> KIP-892,
> >>>>>>> even
> >>>>>>>>> a
> >>>>>>>>>> commit() will not explicitly flush data to disk - RocksDB will
> >>>>> decide
> >>>>>>> on
> >>>>>>>>>> when to flush memtables to disk itself.
> >>>>>>>>>>
> >>>>>>>>>> If we keep the existing "flush" metrics, we'd have two options,
> >>>>> which
> >>>>>>>>> both
> >>>>>>>>>> seem pretty bad to me:
> >>>>>>>>>>
> >>>>>>>>>>       1. Have them record calls to commit(), which would be
> >>>>>>> misleading, as
> >>>>>>>>>>       data is no longer explicitly "flushed" to disk by this
> >>> call.
> >>>>>>>>>>       2. Have them record nothing at all, which is equivalent to
> >>>>>>> removing
> >>>>>>>>> the
> >>>>>>>>>>       metrics, except that users will see the metric still
> >>> exists and
> >>>>>>> so
> >>>>>>>>> assume
> >>>>>>>>>>       that the metric is correct, and that there's a problem
> >>> with
> >>>>> their
> >>>>>>>>> system
> >>>>>>>>>>       when there isn't.
> >>>>>>>>>>
> >>>>>>>>>> I agree that removing them is also a bad solution, and I'd
> >>> like some
> >>>>>>>>>> guidance on the best path forward here.
> >>>>>>>>>>
> >>>>>>>>>> 5.
> >>>>>>>>>> Position files are updated on every write to a StateStore.
> >>> Since our
> >>>>>>>>> writes
> >>>>>>>>>> are now buffered until commit(), we can't update the Position
> >>> file
> >>>>>>> until
> >>>>>>>>>> commit() has been called, otherwise it would be inconsistent
> >>> with
> >>>>> the
> >>>>>>>>> data
> >>>>>>>>>> in the event of a rollback. Consequently, we need to manage
> >>> these
> >>>>>>> offsets
> >>>>>>>>>> the same way we manage the checkpoint offsets, and ensure
> >>> they're
> >>>>> only
> >>>>>>>>>> written on commit().
> >>>>>>>>>>
> >>>>>>>>>> 6.
> >>>>>>>>>> Agreed, although I'm not exactly sure yet what tests to write.
> >>> How
> >>>>>>>>> explicit
> >>>>>>>>>> do we need to be here in the KIP?
> >>>>>>>>>>
> >>>>>>>>>> As for upgrade/downgrade: upgrade is designed to be seamless,
> >>> and we
> >>>>>>>>> should
> >>>>>>>>>> definitely add some tests around that. Downgrade, it
> >>> transpires,
> >>>>> isn't
> >>>>>>>>>> currently possible, as the extra column family for offset
> >>> storage is
> >>>>>>>>>> incompatible with the pre-KIP-892 implementation: when you
> >>> open a
> >>>>>>> RocksDB
> >>>>>>>>>> database, you must open all available column families or
> >>> receive an
> >>>>>>>>> error.
> >>>>>>>>>> What currently happens on downgrade is that it attempts to
> >>> open the
> >>>>>>>>> store,
> >>>>>>>>>> throws an error about the offsets column family not being
> >>> opened,
> >>>>>>> which
> >>>>>>>>>> triggers a wipe and rebuild of the Task. Given that downgrades
> >>>>> should
> >>>>>>> be
> >>>>>>>>>> uncommon, I think this is acceptable behaviour, as the
> >>> end-state is
> >>>>>>>>>> consistent, even if it results in an undesirable state restore.
> >>>>>>>>>>
> >>>>>>>>>> Should I document the upgrade/downgrade behaviour explicitly
> >>> in the
> >>>>>>> KIP?
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Nick
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
> >>> cado...@apache.org>
> >>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Nick!
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the updates!
> >>>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> Why does StateStore#flush() default to
> >>>>>>>>>>> StateStore#commit(Collections.emptyMap())?
> >>>>>>>>>>> Since calls to flush() will not exist anymore after this KIP
> >>> is
> >>>>>>>>>>> released, I would rather throw an unsupported operation
> >>> exception
> >>>>> by
> >>>>>>>>>>> default.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> When would a state store return -1 from
> >>>>>>>>>>> StateStore#approximateNumUncommittedBytes() while being
> >>>>>>> transactional?
> >>>>>>>>>>>
> >>>>>>>>>>> Wouldn't StateStore#approximateNumUncommittedBytes() also
> >>> return 0
> >>>>> if
> >>>>>>>>>>> the state store is transactional but nothing has been written
> >>> to
> >>>>> the
> >>>>>>>>>>> state store yet?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 3.
> >>>>>>>>>>> Sorry for bringing this up again. Does this KIP really need to
> >>>>>>> introduce
> >>>>>>>>>>> StateStoreContext#isolationLevel()? StateStoreContext has
> >>> already
> >>>>>>>>>>> appConfigs() which basically exposes the same information,
> >>> i.e., if
> >>>>>>> EOS
> >>>>>>>>>>> is enabled or not.
> >>>>>>>>>>> In one of your previous e-mails you wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> "My idea was to try to keep the StateStore interface as
> >>> loosely
> >>>>>>> coupled
> >>>>>>>>>>> from the Streams engine as possible, to give implementers more
> >>>>>>> freedom,
> >>>>>>>>>>> and reduce the amount of internal knowledge required."
> >>>>>>>>>>>
> >>>>>>>>>>> While I understand the intent, I doubt that it decreases the
> >>>>>>> coupling of
> >>>>>>>>>>> a StateStore interface and the Streams engine. READ_COMMITTED
> >>> only
> >>>>>>>>>>> applies to IQ but not to reads by processors. Thus,
> >>> implementers
> >>>>>>> need to
> >>>>>>>>>>> understand how Streams accesses the state stores.
> >>>>>>>>>>>
> >>>>>>>>>>> I would like to hear what others think about this.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 4.
> >>>>>>>>>>> Great exposing new metrics for transactional state stores!
> >>>>> However, I
> >>>>>>>>>>> would prefer to add new metrics and deprecate (in the docs)
> >>> the old
> >>>>>>>>>>> ones. You can find examples of deprecated metrics here:
> >>>>>>>>>>> https://kafka.apache.org/documentation/#selector_monitoring
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 5.
> >>>>>>>>>>> Why does the KIP mention position files? I do not think they
> >>> are
> >>>>>>> related
> >>>>>>>>>>> to transactions or flushes.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 6.
> >>>>>>>>>>> I think we will also need to adapt/add integration tests
> >>> besides
> >>>>> unit
> >>>>>>>>>>> tests. Additionally, we probably need integration or system
> >>> tests
> >>>>> to
> >>>>>>>>>>> verify that upgrades and downgrades between transactional and
> >>>>>>>>>>> non-transactional state stores work as expected.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote:
> >>>>>>>>>>>> One more thing: I noted John's suggestion in the KIP, under
> >>>>>>> "Rejected
> >>>>>>>>>>>> Alternatives". I still think it's an idea worth pursuing,
> >>> but I
> >>>>>>> believe
> >>>>>>>>>>>> that it's out of the scope of this KIP, because it solves a
> >>>>>>> different
> >>>>>>>>> set
> >>>>>>>>>>>> of problems to this KIP, and the scope of this one has
> >>> already
> >>>>> grown
> >>>>>>>>>>> quite
> >>>>>>>>>>>> large!
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <
> >>>>> nick.telf...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I've updated the KIP (
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>> )
> >>>>>>>>>>>>> with the latest changes; mostly bringing back "Atomic
> >>>>>>> Checkpointing"
> >>>>>>>>>>> (for
> >>>>>>>>>>>>> what feels like the 10th time!). I think the one thing
> >>> missing is
> >>>>>>> some
> >>>>>>>>>>>>> changes to metrics (notably the store "flush" metrics will
> >>> need
> >>>>> to
> >>>>>>> be
> >>>>>>>>>>>>> renamed to "commit").
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The reason I brought back Atomic Checkpointing was to
> >>> decouple
> >>>>>>> store
> >>>>>>>>>>> flush
> >>>>>>>>>>>>> from store commit. This is important, because with
> >>> Transactional
> >>>>>>>>>>>>> StateStores, we now need to call "flush" on *every* Task
> >>> commit,
> >>>>>>> and
> >>>>>>>>> not
> >>>>>>>>>>>>> just when the StateStore is closing, otherwise our
> >>> transaction
> >>>>>>> buffer
> >>>>>>>>>>> will
> >>>>>>>>>>>>> never be written and persisted, instead growing unbounded! I
> >>>>>>>>>>> experimented
> >>>>>>>>>>>>> with some simple solutions, like forcing a store flush
> >>> whenever
> >>>>> the
> >>>>>>>>>>>>> transaction buffer was likely to exceed its configured
> >>> size, but
> >>>>>>> this
> >>>>>>>>>>> was
> >>>>>>>>>>>>> brittle: it prevented the transaction buffer from being
> >>>>> configured
> >>>>>>> to
> >>>>>>>>> be
> >>>>>>>>>>>>> unbounded, and it still would have required explicit
> >>> flushes of
> >>>>>>>>> RocksDB,
> >>>>>>>>>>>>> yielding sub-optimal performance and memory utilization.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I deemed Atomic Checkpointing to be the "right" way to
> >>> resolve
> >>>>> this
> >>>>>>>>>>>>> problem. By ensuring that the changelog offsets that
> >>> correspond
> >>>>> to
> >>>>>>> the
> >>>>>>>>>>> most
> >>>>>>>>>>>>> recently written records are always atomically written to
> >>> the
> >>>>>>>>> StateStore
> >>>>>>>>>>>>> (by writing them to the same transaction buffer), we can
> >>> avoid
> >>>>>>>>> forcibly
> >>>>>>>>>>>>> flushing the RocksDB memtables to disk, letting RocksDB
> >>> flush
> >>>>> them
> >>>>>>>>> only
> >>>>>>>>>>>>> when necessary, without losing any of our consistency
> >>> guarantees.
> >>>>>>> See
> >>>>>>>>>>> the
> >>>>>>>>>>>>> updated KIP for more info.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have fully implemented these changes, although I'm still
> >>> not
> >>>>>>>>> entirely
> >>>>>>>>>>>>> happy with the implementation for segmented StateStores, so
> >>> I
> >>>>> plan
> >>>>>>> to
> >>>>>>>>>>>>> refactor that. Despite that, all tests pass. If you'd like
> >>> to try
> >>>>>>> out
> >>>>>>>>> or
> >>>>>>>>>>>>> review this highly experimental and incomplete branch, it's
> >>>>>>> available
> >>>>>>>>>>> here:
> >>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0.
> >>> Note:
> >>>>>>> it's
> >>>>>>>>>>> built
> >>>>>>>>>>>>> against Kafka 3.5.0 so that I had a stable base to build
> >>> and test
> >>>>>>> it
> >>>>>>>>> on,
> >>>>>>>>>>>>> and to enable easy apples-to-apples comparisons in a live
> >>>>>>>>> environment. I
> >>>>>>>>>>>>> plan to rebase it against trunk once it's nearer completion
> >>> and
> >>>>> has
> >>>>>>>>> been
> >>>>>>>>>>>>> proven on our main application.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I would really appreciate help in reviewing and testing:
> >>>>>>>>>>>>> - Segmented (Versioned, Session and Window) stores
> >>>>>>>>>>>>> - Global stores
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As I do not currently use either of these, so my primary
> >>> test
> >>>>>>>>>>> environment
> >>>>>>>>>>>>> doesn't test these areas.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I'm going on Parental Leave starting next week for a few
> >>> weeks,
> >>>>> so
> >>>>>>>>> will
> >>>>>>>>>>>>> not have time to move this forward until late August. That
> >>> said,
> >>>>>>> your
> >>>>>>>>>>>>> feedback is welcome and appreciated, I just won't be able to
> >>>>>>> respond
> >>>>>>>>> as
> >>>>>>>>>>>>> quickly as usual.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <
> >>>>> nick.telf...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Yes, that's correct, although the impact on IQ is not
> >>> something
> >>>>> I
> >>>>>>> had
> >>>>>>>>>>>>>> considered.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What about atomically updating the state store from the
> >>>>>>> transaction
> >>>>>>>>>>>>>>> buffer every commit interval and writing the checkpoint
> >>> (thus,
> >>>>>>>>>>> flushing
> >>>>>>>>>>>>>>> the memtable) every configured amount of data and/or
> >>> number of
> >>>>>>>>> commit
> >>>>>>>>>>>>>>> intervals?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm not quite sure I follow. Are you suggesting that we
> >>> add an
> >>>>>>>>>>> additional
> >>>>>>>>>>>>>> config for the max number of commit intervals between
> >>>>> checkpoints?
> >>>>>>>>> That
> >>>>>>>>>>>>>> way, we would checkpoint *either* when the transaction
> >>> buffers
> >>>>> are
> >>>>>>>>>>> nearly
> >>>>>>>>>>>>>> full, *OR* whenever a certain number of commit intervals
> >>> have
> >>>>>>>>> elapsed,
> >>>>>>>>>>>>>> whichever comes first?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> That certainly seems reasonable, although this re-ignites
> >>> an
> >>>>>>> earlier
> >>>>>>>>>>>>>> debate about whether a config should be measured in
> >>> "number of
> >>>>>>> commit
> >>>>>>>>>>>>>> intervals", instead of just an absolute time.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> FWIW, I realised that this issue is the reason I was
> >>> pursuing
> >>>>> the
> >>>>>>>>>>> Atomic
> >>>>>>>>>>>>>> Checkpoints, as it de-couples memtable flush from
> >>> checkpointing,
> >>>>>>>>> which
> >>>>>>>>>>>>>> enables us to just checkpoint on every commit without any
> >>>>>>> performance
> >>>>>>>>>>>>>> impact. Atomic Checkpointing is definitely the "best"
> >>> solution,
> >>>>>>> but
> >>>>>>>>>>> I'm not
> >>>>>>>>>>>>>> sure if this is enough to bring it back into this KIP.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm currently working on moving all the transactional logic
> >>>>>>> directly
> >>>>>>>>>>> into
> >>>>>>>>>>>>>> RocksDBStore itself, which does away with the
> >>>>>>>>> StateStore#newTransaction
> >>>>>>>>>>>>>> method, and reduces the number of new classes introduced,
> >>>>>>>>> significantly
> >>>>>>>>>>>>>> reducing the complexity. If it works, and the complexity is
> >>>>>>>>> drastically
> >>>>>>>>>>>>>> reduced, I may try bringing back Atomic Checkpoints into
> >>> this
> >>>>> KIP.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <
> >>> cado...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the insights! Very interesting!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As far as I understand, you want to atomically update the
> >>> state
> >>>>>>>>> store
> >>>>>>>>>>>>>>> from the transaction buffer, flush the memtable of a state
> >>>>> store
> >>>>>>> and
> >>>>>>>>>>>>>>> write the checkpoint not after the commit time elapsed but
> >>>>> after
> >>>>>>> the
> >>>>>>>>>>>>>>> transaction buffer reached a size that would lead to
> >>> exceeding
> >>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes before the next
> >>> commit
> >>>>>>>>>>> interval
> >>>>>>>>>>>>>>> ends.
> >>>>>>>>>>>>>>> That means, the Kafka transaction would commit every
> >>> commit
> >>>>>>> interval
> >>>>>>>>>>> but
> >>>>>>>>>>>>>>> the state store will only be atomically updated roughly
> >>> every
> >>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes of data. Also IQ
> >>> would
> >>>>>>> then
> >>>>>>>>>>> only
> >>>>>>>>>>>>>>> see new data roughly every
> >>>>>>> statestore.transaction.buffer.max.bytes.
> >>>>>>>>>>>>>>> After a failure the state store needs to restore up to
> >>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Is this correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What about atomically updating the state store from the
> >>>>>>> transaction
> >>>>>>>>>>>>>>> buffer every commit interval and writing the checkpoint
> >>> (thus,
> >>>>>>>>>>> flushing
> >>>>>>>>>>>>>>> the memtable) every configured amount of data and/or
> >>> number of
> >>>>>>>>> commit
> >>>>>>>>>>>>>>> intervals? In such a way, we would have the same delay for
> >>>>>>> records
> >>>>>>>>>>>>>>> appearing in output topics and IQ because both would
> >>> appear
> >>>>> when
> >>>>>>> the
> >>>>>>>>>>>>>>> Kafka transaction is committed. However, after a failure
> >>> the
> >>>>>>> state
> >>>>>>>>>>> store
> >>>>>>>>>>>>>>> still needs to restore up to
> >>>>>>> statestore.transaction.buffer.max.bytes
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>> it might restore data that is already in the state store
> >>>>> because
> >>>>>>> the
> >>>>>>>>>>>>>>> checkpoint lags behind the last stable offset (i.e. the
> >>> last
> >>>>>>>>> committed
> >>>>>>>>>>>>>>> offset) of the changelog topics. Restoring data that is
> >>> already
> >>>>>>> in
> >>>>>>>>> the
> >>>>>>>>>>>>>>> state store is idempotent, so eos should not violated.
> >>>>>>>>>>>>>>> This solution needs at least one new config to specify
> >>> when a
> >>>>>>>>>>> checkpoint
> >>>>>>>>>>>>>>> should be written.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> A small correction to your previous e-mail that does not
> >>> change
> >>>>>>>>>>> anything
> >>>>>>>>>>>>>>> you said: Under alos the default commit interval is 30
> >>> seconds,
> >>>>>>> not
> >>>>>>>>>>> five
> >>>>>>>>>>>>>>> seconds.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
> >>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I've begun performance testing my branch on our staging
> >>>>>>>>> environment,
> >>>>>>>>>>>>>>>> putting it through its paces in our non-trivial
> >>> application.
> >>>>> I'm
> >>>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>> observing the same increased flush rate that we saw the
> >>> last
> >>>>>>> time
> >>>>>>>>> we
> >>>>>>>>>>>>>>>> attempted to use a version of this KIP, but this time, I
> >>>>> think I
> >>>>>>>>> know
> >>>>>>>>>>>>>>> why.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at
> >>> the end
> >>>>>>> of
> >>>>>>>>> the
> >>>>>>>>>>>>>>> Task
> >>>>>>>>>>>>>>>> commit process, has the following behaviour:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>         - Under ALOS: checkpoint the state stores. This
> >>>>> includes
> >>>>>>>>>>>>>>>>         flushing memtables in RocksDB. This is acceptable
> >>>>>>> because the
> >>>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>         commit.interval.ms is 5 seconds, so forcibly
> >>> flushing
> >>>>>>>>> memtables
> >>>>>>>>>>>>>>> every 5
> >>>>>>>>>>>>>>>>         seconds is acceptable for most applications.
> >>>>>>>>>>>>>>>>         - Under EOS: checkpointing is not done, *unless*
> >>> it's
> >>>>>>> being
> >>>>>>>>>>>>>>> forced, due
> >>>>>>>>>>>>>>>>         to e.g. the Task closing or being revoked. This
> >>> means
> >>>>>>> that
> >>>>>>>>> under
> >>>>>>>>>>>>>>> normal
> >>>>>>>>>>>>>>>>         processing conditions, the state stores will not
> >>> be
> >>>>>>>>>>> checkpointed,
> >>>>>>>>>>>>>>> and will
> >>>>>>>>>>>>>>>>         not have memtables flushed at all , unless RocksDB
> >>>>>>> decides to
> >>>>>>>>>>>>>>> flush them on
> >>>>>>>>>>>>>>>>         its own. Checkpointing stores and force-flushing
> >>> their
> >>>>>>>>> memtables
> >>>>>>>>>>>>>>> is only
> >>>>>>>>>>>>>>>>         done when a Task is being closed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least
> >>>>> *some*
> >>>>>>>>>>> normal
> >>>>>>>>>>>>>>>> Task commits, in order to write the RocksDB transaction
> >>>>> buffers
> >>>>>>> to
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> state stores, and to ensure the offsets are synced to
> >>> disk to
> >>>>>>>>> prevent
> >>>>>>>>>>>>>>>> restores from getting out of hand. Consequently, my
> >>> current
> >>>>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>> calls maybeCheckpoint on *every* Task commit, which is
> >>> far too
> >>>>>>>>>>>>>>> frequent.
> >>>>>>>>>>>>>>>> This causes checkpoints every 10,000 records, which is a
> >>>>> change
> >>>>>>> in
> >>>>>>>>>>>>>>> flush
> >>>>>>>>>>>>>>>> behaviour, potentially causing performance problems for
> >>> some
> >>>>>>>>>>>>>>> applications.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm looking into possible solutions, and I'm currently
> >>> leaning
> >>>>>>>>>>> towards
> >>>>>>>>>>>>>>>> using the statestore.transaction.buffer.max.bytes
> >>>>> configuration
> >>>>>>> to
> >>>>>>>>>>>>>>>> checkpoint Tasks once we are likely to exceed it. This
> >>> would
> >>>>>>>>>>>>>>> complement the
> >>>>>>>>>>>>>>>> existing "early Task commit" functionality that this
> >>>>>>> configuration
> >>>>>>>>>>>>>>>> provides, in the following way:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>         - Currently, we use
> >>>>>>> statestore.transaction.buffer.max.bytes
> >>>>>>>>> to
> >>>>>>>>>>>>>>> force an
> >>>>>>>>>>>>>>>>         early Task commit if processing more records would
> >>>>> cause
> >>>>>>> our
> >>>>>>>>>>> state
> >>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>         transactions to exceed the memory assigned to
> >>> them.
> >>>>>>>>>>>>>>>>         - New functionality: when a Task *does* commit,
> >>> we will
> >>>>>>> not
> >>>>>>>>>>>>>>> checkpoint
> >>>>>>>>>>>>>>>>         the stores (and hence flush the transaction
> >>> buffers)
> >>>>>>> unless
> >>>>>>>>> we
> >>>>>>>>>>>>>>> expect to
> >>>>>>>>>>>>>>>>         cross the statestore.transaction.buffer.max.bytes
> >>>>>>> threshold
> >>>>>>>>>>> before
> >>>>>>>>>>>>>>> the next
> >>>>>>>>>>>>>>>>         commit
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm also open to suggestions.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> >>>>>>> nick.telf...@gmail.com
> >>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Bruno!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>> By "less predictable for users", I meant in terms of
> >>>>>>> understanding
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> performance profile under various circumstances. The
> >>> more
> >>>>>>> complex
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> solution, the more difficult it would be for users to
> >>>>>>> understand
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> performance they see. For example, spilling records to
> >>> disk
> >>>>>>> when
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> transaction buffer reaches a threshold would, I expect,
> >>>>> reduce
> >>>>>>>>> write
> >>>>>>>>>>>>>>>>> throughput. This reduction in write throughput could be
> >>>>>>>>> unexpected,
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> potentially difficult to diagnose/understand for users.
> >>>>>>>>>>>>>>>>> At the moment, I think the "early commit" concept is
> >>>>> relatively
> >>>>>>>>>>>>>>>>> straightforward; it's easy to document, and conceptually
> >>>>> fairly
> >>>>>>>>>>>>>>> obvious to
> >>>>>>>>>>>>>>>>> users. We could probably add a metric to make it easier
> >>> to
> >>>>>>>>>>> understand
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> it happens though.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. (the second one)
> >>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an indirect way of
> >>>>> telling
> >>>>>>>>>>>>>>> StateStores
> >>>>>>>>>>>>>>>>> whether they should be transactional. READ_COMMITTED
> >>>>>>> essentially
> >>>>>>>>>>>>>>> requires
> >>>>>>>>>>>>>>>>> transactions, because it dictates that two threads
> >>> calling
> >>>>>>>>>>>>>>>>> `newTransaction()` should not see writes from the other
> >>>>>>>>> transaction
> >>>>>>>>>>>>>>> until
> >>>>>>>>>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all
> >>> bets are
> >>>>>>> off,
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> stores can allow threads to observe written records at
> >>> any
> >>>>>>> time,
> >>>>>>>>>>>>>>> which is
> >>>>>>>>>>>>>>>>> essentially "no transactions". That said, StateStores
> >>> are
> >>>>> free
> >>>>>>> to
> >>>>>>>>>>>>>>> implement
> >>>>>>>>>>>>>>>>> these guarantees however they can, which is a bit more
> >>>>> relaxed
> >>>>>>>>> than
> >>>>>>>>>>>>>>>>> dictating "you must use transactions". For example, with
> >>>>>>> RocksDB
> >>>>>>>>> we
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> implement these as READ_COMMITTED == WBWI-based
> >>>>> "transactions",
> >>>>>>>>>>>>>>>>> READ_UNCOMMITTED == direct writes to the database. But
> >>> with
> >>>>>>> other
> >>>>>>>>>>>>>>> storage
> >>>>>>>>>>>>>>>>> engines, it might be preferable to *always* use
> >>> transactions,
> >>>>>>> even
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> unnecessary; or there may be storage engines that don't
> >>>>> provide
> >>>>>>>>>>>>>>>>> transactions, but the isolation guarantees can be met
> >>> using a
> >>>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>>>> technique.
> >>>>>>>>>>>>>>>>> My idea was to try to keep the StateStore interface as
> >>>>> loosely
> >>>>>>>>>>> coupled
> >>>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> >>> implementers
> >>>>> more
> >>>>>>>>>>>>>>> freedom, and
> >>>>>>>>>>>>>>>>> reduce the amount of internal knowledge required.
> >>>>>>>>>>>>>>>>> That said, I understand that "IsolationLevel" might not
> >>> be
> >>>>> the
> >>>>>>>>> right
> >>>>>>>>>>>>>>>>> abstraction, and we can always make it much more
> >>> explicit if
> >>>>>>>>>>>>>>> required, e.g.
> >>>>>>>>>>>>>>>>> boolean transactional()
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 7-8.
> >>>>>>>>>>>>>>>>> I can make these changes either later today or tomorrow.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Small update:
> >>>>>>>>>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of
> >>> issues
> >>>>>>> that
> >>>>>>>>>>>>>>> needed
> >>>>>>>>>>>>>>>>> addressing. Currently, all the tests pass, which is
> >>>>> promising,
> >>>>>>> but
> >>>>>>>>>>> it
> >>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>> need to undergo some performance testing. I haven't
> >>> (yet)
> >>>>>>> worked
> >>>>>>>>> on
> >>>>>>>>>>>>>>>>> removing the `newTransaction()` stuff, but I would
> >>> expect
> >>>>> that,
> >>>>>>>>>>>>>>>>> behaviourally, it should make no difference. The branch
> >>> is
> >>>>>>>>> available
> >>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if
> >>>>> anyone
> >>>>>>> is
> >>>>>>>>>>>>>>>>> interested in taking an early look.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
> >>>>>>> cado...@apache.org>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>> Yeah, I agree with you. That was actually also my
> >>> point. I
> >>>>>>>>>>> understood
> >>>>>>>>>>>>>>>>>> that John was proposing the ingestion path as a way to
> >>> avoid
> >>>>>>> the
> >>>>>>>>>>>>>>> early
> >>>>>>>>>>>>>>>>>> commits. Probably, I misinterpreted the intent.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>> I agree with John here, that actually it is public
> >>> API. My
> >>>>>>>>> question
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> how this usage pattern affects normal processing.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>> My concern is that checking for the size of the
> >>> transaction
> >>>>>>>>> buffer
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> maybe triggering an early commit affects the whole
> >>>>> processing
> >>>>>>> of
> >>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>> Streams. The transactionality of a state store is not
> >>>>>>> confined to
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> state store itself, but spills over and changes the
> >>> behavior
> >>>>>>> of
> >>>>>>>>>>> other
> >>>>>>>>>>>>>>>>>> parts of the system. I agree with you that it is a
> >>> decent
> >>>>>>>>>>>>>>> compromise. I
> >>>>>>>>>>>>>>>>>> just wanted to analyse the downsides and list the
> >>> options to
> >>>>>>>>>>> overcome
> >>>>>>>>>>>>>>>>>> them. I also agree with you that all options seem quite
> >>>>> heavy
> >>>>>>>>>>>>>>> compared
> >>>>>>>>>>>>>>>>>> with your KIP. I do not understand what you mean with
> >>> "less
> >>>>>>>>>>>>>>> predictable
> >>>>>>>>>>>>>>>>>> for users", though.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I found the discussions about the alternatives really
> >>>>>>>>> interesting.
> >>>>>>>>>>>>>>> But I
> >>>>>>>>>>>>>>>>>> also think that your plan sounds good and we should
> >>> continue
> >>>>>>> with
> >>>>>>>>>>> it!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Some comments on your reply to my e-mail on June 20th:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>> Ah, now, I understand the reasoning behind putting
> >>> isolation
> >>>>>>>>> level
> >>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> the state store context. Thanks! Should that also be a
> >>> way
> >>>>> to
> >>>>>>>>> give
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> the state store the opportunity to decide whether to
> >>> turn on
> >>>>>>>>>>>>>>>>>> transactions or not?
> >>>>>>>>>>>>>>>>>> With my comment, I was more concerned about how do you
> >>> know
> >>>>>>> if a
> >>>>>>>>>>>>>>>>>> checkpoint file needs to be written under EOS, if you
> >>> do not
> >>>>>>>>> have a
> >>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>> to know if the state store is transactional or not. If
> >>> a
> >>>>> state
> >>>>>>>>>>> store
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> transactional, the checkpoint file can be written
> >>> during
> >>>>>>> normal
> >>>>>>>>>>>>>>>>>> processing under EOS. If the state store is not
> >>>>> transactional,
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>> checkpoint file must not be written under EOS.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 7.
> >>>>>>>>>>>>>>>>>> My point was about not only considering the bytes in
> >>> memory
> >>>>> in
> >>>>>>>>>>> config
> >>>>>>>>>>>>>>>>>> statestore.uncommitted.max.bytes, but also bytes that
> >>> might
> >>>>> be
> >>>>>>>>>>>>>>> spilled
> >>>>>>>>>>>>>>>>>> on disk. Basically, I was wondering whether you should
> >>>>> remove
> >>>>>>> the
> >>>>>>>>>>>>>>>>>> "memory" in "Maximum number of memory bytes to be used
> >>> to
> >>>>>>>>>>>>>>>>>> buffer uncommitted state-store records." My thinking
> >>> was
> >>>>> that
> >>>>>>>>> even
> >>>>>>>>>>>>>>> if a
> >>>>>>>>>>>>>>>>>> state store spills uncommitted bytes to disk, limiting
> >>> the
> >>>>>>>>> overall
> >>>>>>>>>>>>>>> bytes
> >>>>>>>>>>>>>>>>>> might make sense. Thinking about it again and
> >>> considering
> >>>>> the
> >>>>>>>>>>> recent
> >>>>>>>>>>>>>>>>>> discussions, it does not make too much sense anymore.
> >>>>>>>>>>>>>>>>>> I like the name
> >>> statestore.transaction.buffer.max.bytes that
> >>>>>>> you
> >>>>>>>>>>>>>>> proposed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 8.
> >>>>>>>>>>>>>>>>>> A high-level description (without implementation
> >>> details) of
> >>>>>>> how
> >>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>> Streams will manage the commit of changelog
> >>> transactions,
> >>>>>>> state
> >>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>> transactions and checkpointing would be great. Would be
> >>>>> great
> >>>>>>> if
> >>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> could also add some sentences about the behavior in
> >>> case of
> >>>>> a
> >>>>>>>>>>>>>>> failure.
> >>>>>>>>>>>>>>>>>> For instance how does a transactional state store
> >>> recover
> >>>>>>> after a
> >>>>>>>>>>>>>>>>>> failure or what happens with the transaction buffer,
> >>> etc.
> >>>>>>> (that
> >>>>>>>>> is
> >>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>> I meant by "fail-over" in point 9.)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>> Isn't this exactly the same issue that
> >>> WriteBatchWithIndex
> >>>>>>>>>>>>>>> transactions
> >>>>>>>>>>>>>>>>>>> have, whereby exceeding (or likely to exceed)
> >>> configured
> >>>>>>> memory
> >>>>>>>>>>>>>>> needs to
> >>>>>>>>>>>>>>>>>>> trigger an early commit?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>> This is one of my big concerns. Ultimately, any
> >>> approach
> >>>>>>> based
> >>>>>>>>> on
> >>>>>>>>>>>>>>>>>> cracking
> >>>>>>>>>>>>>>>>>>> open RocksDB internals and using it in ways it's not
> >>> really
> >>>>>>>>>>> designed
> >>>>>>>>>>>>>>>>>> for is
> >>>>>>>>>>>>>>>>>>> likely to have some unforseen performance or
> >>> consistency
> >>>>>>> issues.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>> What's your motivation for removing these early
> >>> commits?
> >>>>>>> While
> >>>>>>>>> not
> >>>>>>>>>>>>>>>>>> ideal, I
> >>>>>>>>>>>>>>>>>>> think they're a decent compromise to ensure
> >>> consistency
> >>>>>>> whilst
> >>>>>>>>>>>>>>>>>> maintaining
> >>>>>>>>>>>>>>>>>>> good and predictable performance.
> >>>>>>>>>>>>>>>>>>> All 3 of your suggested ideas seem *very*
> >>> complicated, and
> >>>>>>> might
> >>>>>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>> make behaviour less predictable for users as a
> >>> consequence.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I'm a bit concerned that the scope of this KIP is
> >>> growing a
> >>>>>>> bit
> >>>>>>>>>>> out
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>> control. While it's good to discuss ideas for future
> >>>>>>>>>>> improvements, I
> >>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>> it's important to narrow the scope down to a design
> >>> that
> >>>>>>>>> achieves
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> most
> >>>>>>>>>>>>>>>>>>> pressing objectives (constant sized restorations
> >>> during
> >>>>> dirty
> >>>>>>>>>>>>>>>>>>> close/unexpected errors). Any design that this KIP
> >>> produces
> >>>>>>> can
> >>>>>>>>>>>>>>>>>> ultimately
> >>>>>>>>>>>>>>>>>>> be changed in the future, especially if the bulk of
> >>> it is
> >>>>>>>>> internal
> >>>>>>>>>>>>>>>>>>> behaviour.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I'm going to spend some time next week trying to
> >>> re-work
> >>>>> the
> >>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>> WriteBatchWithIndex design to remove the
> >>> newTransaction()
> >>>>>>>>> method,
> >>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> it's just an implementation detail of RocksDBStore.
> >>> That
> >>>>>>> way, if
> >>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> want to
> >>>>>>>>>>>>>>>>>>> replace WBWI with something in the future, like the
> >>> SST
> >>>>> file
> >>>>>>>>>>>>>>> management
> >>>>>>>>>>>>>>>>>>> outlined by John, then we can do so with little/no API
> >>>>>>> changes.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> >
>

Reply via email to