Oh! One other concern I haven't mentioned: if we make IsolationLevel a
query-time constraint, then we need to add support for READ_COMMITTED to
InMemoryKeyValueStore too, which will require some changes to the
implementation.
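
To make the scale of that change concrete, here is a minimal sketch of how an
in-memory store could stage writes so that READ_COMMITTED readers only see
committed data. It is an illustration only: StagedInMemoryStore, Isolation,
commit() and rollback() are hypothetical names, not the real
InMemoryKeyValueStore or Kafka's IsolationLevel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Kafka's IsolationLevel enum, to keep the sketch self-contained.
enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

// Hypothetical store; NOT the real InMemoryKeyValueStore.
final class StagedInMemoryStore {
    private final Map<String, String> committed = new HashMap<>();
    // Writes since the last commit; Optional.empty() marks a staged delete.
    private final Map<String, Optional<String>> staged = new HashMap<>();

    // A null value is treated as a delete.
    synchronized void put(String key, String value) {
        staged.put(key, Optional.ofNullable(value));
    }

    synchronized String get(String key, Isolation level) {
        // READ_UNCOMMITTED consults the staged writes first.
        if (level == Isolation.READ_UNCOMMITTED && staged.containsKey(key)) {
            return staged.get(key).orElse(null);
        }
        // READ_COMMITTED (and unstaged keys) only ever see committed data.
        return committed.get(key);
    }

    // Apply all staged writes to the committed map, then clear the buffer.
    synchronized void commit() {
        staged.forEach((k, v) -> {
            if (v.isPresent()) {
                committed.put(k, v.get());
            } else {
                committed.remove(k);
            }
        });
        staged.clear();
    }

    // Discard staged writes, e.g. when a transaction is aborted.
    synchronized void rollback() {
        staged.clear();
    }
}
```

Range scans would need the same merge logic applied to iterators, which is
where most of the real work would be.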

On Mon, 18 Sept 2023 at 17:24, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> I agree that having IsolationLevel be determined at query-time is the
> ideal design, but there are a few sticking points:
>
> 1.
> There needs to be some way to communicate the IsolationLevel down to the
> RocksDBStore itself, so that the query can respect it. Since stores are
> "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.),
> we need some way to deliver that information to the bottom layer. For IQv2,
> we can use the existing StateStore#query() method, but IQv1 has no way to do
> this.
>
> A simple approach, which would potentially open up other options, would be
> to add something like: ReadOnlyKeyValueStore<K, V>
> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and
> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).
>
> 2.
> As mentioned above, RocksDB WriteBatches are not thread-safe, which causes
> a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a
> look at RocksDB Transactions[1], but they solve a very different problem,
> and have the same thread-safety issue.
>
> One possible approach that I mentioned is chaining WriteBatches: every
> time a new Interactive Query is received (i.e. readOnlyView, see above,
> is called) we "freeze" the existing WriteBatch, and start a new one for new
> writes. The Interactive Query queries the "chain" of previous WriteBatches
> + the underlying database; while the StreamThread starts writing to the
> *new* WriteBatch. On-commit, the StreamThread would write *all*
> WriteBatches in the chain to the database (that have not yet been written).
>
> WriteBatches would be closed/freed only when they have been both
> committed, and all open Interactive Queries on them have been closed. This
> would require some reference counting.
>
> Obviously a drawback of this approach is the potential for increased
> memory usage: if an Interactive Query is long-lived, for example by doing a
> full scan over a large database, or even just pausing in the middle of an
> iteration, then the existing chain of WriteBatches could be kept around for
> a long time, potentially forever.
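
The chaining idea above could be sketched roughly as follows. This is a toy
model under stated assumptions, not RocksDB code: Batch stands in for a
WriteBatch, a HashMap stands in for the database, and BatchChain, openQuery()
and closeQuery() are hypothetical names. It shows the freeze-on-query step and
the reference counting, and ignores deletes and iterators.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a RocksDB WriteBatch: a plain map of staged writes.
final class Batch {
    final Map<String, String> writes = new HashMap<>();
    int readers = 0;           // open Interactive Queries that reference this batch
    boolean committed = false; // true once written to the database
    boolean freed = false;     // true once the batch has been released
}

final class BatchChain {
    private final Map<String, String> db = new HashMap<>(); // stands in for the database
    private final List<Batch> chain = new ArrayList<>();    // oldest first; last is active

    BatchChain() {
        chain.add(new Batch());
    }

    // StreamThread: always writes to the newest batch.
    synchronized void put(String key, String value) {
        chain.get(chain.size() - 1).writes.put(key, value);
    }

    // New query: freeze the current chain and start a new batch for later writes.
    synchronized List<Batch> openQuery() {
        List<Batch> frozen = new ArrayList<>(chain);
        for (Batch b : frozen) {
            b.readers++;
        }
        chain.add(new Batch());
        return frozen;
    }

    // Query read: newest frozen batch first, then fall back to the database.
    synchronized String get(List<Batch> frozen, String key) {
        for (int i = frozen.size() - 1; i >= 0; i--) {
            String v = frozen.get(i).writes.get(key);
            if (v != null) {
                return v;
            }
        }
        return db.get(key);
    }

    synchronized void closeQuery(List<Batch> frozen) {
        for (Batch b : frozen) {
            b.readers--;
            freeIfUnused(b);
        }
    }

    // Commit: write every batch in the chain, oldest first, then start afresh.
    synchronized void commit() {
        for (Batch b : chain) {
            db.putAll(b.writes);
            b.committed = true;
            freeIfUnused(b);
        }
        chain.clear();
        chain.add(new Batch());
    }

    // A batch is released only when it is committed AND has no open readers.
    private void freeIfUnused(Batch b) {
        if (b.committed && b.readers == 0 && !b.freed) {
            b.writes.clear();
            b.freed = true;
        }
    }

    synchronized String committedValue(String key) {
        return db.get(key);
    }
}
```

Note how a batch that is still referenced by a query survives commit(), which
is exactly the memory-retention drawback described above.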
>
> --
>
> A.
> Going off on a tangent, it looks like in addition to supporting
> READ_COMMITTED queries, we could go further and support REPEATABLE_READ
> queries (i.e. where subsequent reads to the same key in the same
> Interactive Query are guaranteed to yield the same value) by making use of
> RocksDB Snapshots[2]. These are fairly lightweight, so the performance
> impact is likely to be negligible, but they do require that the Interactive
> Query session can be explicitly closed.
>
> This could be achieved if we made the above readOnlyView interface look
> more like:
>
> interface ReadOnlyKeyValueView<K, V> extends ReadOnlyKeyValueStore<K,
> V>, AutoCloseable {}
>
> interface ReadOnlyKeyValueStore<K, V> {
>     ...
>     ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);
> }
>
> But this would be a breaking change, as existing IQv1 queries are
> guaranteed to never call store.close(), and therefore these would leak
> memory under REPEATABLE_READ.
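
A rough illustration of why the explicit close matters, using simplified,
hypothetical types (ReadOnlyView and SnapshotStore are not the proposed
interfaces, and copying a map merely stands in for taking a RocksDB Snapshot,
which is far cheaper):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified version of the view interface discussed above.
interface ReadOnlyView extends AutoCloseable {
    String get(String key);

    @Override
    void close(); // narrowed: no checked exception
}

final class SnapshotStore {
    private final TreeMap<String, String> data = new TreeMap<>();
    private int openViews = 0; // resources held until each view is closed

    void put(String key, String value) {
        data.put(key, value);
    }

    int openViews() {
        return openViews;
    }

    // Copying the map stands in for acquiring a snapshot.
    ReadOnlyView readOnlyView() {
        final Map<String, String> snapshot = new TreeMap<>(data);
        openViews++;
        return new ReadOnlyView() {
            private boolean closed = false;

            @Override
            public String get(String key) {
                return snapshot.get(key);
            }

            @Override
            public void close() {
                if (!closed) {
                    closed = true;
                    openViews--; // releases the "snapshot"
                }
            }
        };
    }
}
```

With try-with-resources, e.g. try (ReadOnlyView v = store.readOnlyView()) { ... },
repeated reads of the same key return the same value and the view is released
on exit. A caller that never calls close(), as existing IQv1 code would not,
leaves openViews above zero indefinitely.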
>
> B.
> One thing that's notable: MyRocks states that they support READ_COMMITTED
> and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4].
> This could be because doing so is technically difficult/impossible using
> the primitives available in RocksDB.
>
> --
>
> Lucas, to address your points:
>
> U1.
> It's only "SHOULD" to permit alternative (i.e. non-RocksDB)
> implementations of StateStore that do not support atomic writes. Obviously
> in those cases, the guarantees Kafka Streams provides/expects would be
> relaxed. Do you think we should require all implementations to support
> atomic writes?
>
> U2.
> Stores can support multiple IsolationLevels. As we've discussed above, the
> ideal scenario would be to specify the IsolationLevel at query-time.
> Failing that, I think the second-best approach is to define the
> IsolationLevel for *all* queries based on the processing.guarantee, which is
> what the default StateStoreContext#isolationLevel() achieves. Would you
> prefer an alternative?
>
> While the existing implementation is equivalent to READ_UNCOMMITTED, this
> can yield unexpected results/errors under EOS, if a transaction is rolled
> back. While this would be a change in behaviour for users, it would look
> more like a bug fix than a breaking change. That said, we *could* make it
> configurable, and default to the existing behaviour (READ_UNCOMMITTED)
> instead of inferring it from the processing.guarantee?
>
> N1, N2.
> These were only primitives to avoid boxing costs, but since this is not a
> performance sensitive area, it should be fine to change if that's desirable.
>
> N3.
> It's because the store "manages its own offsets", which includes both
> committing the offset, *and providing it* via getCommittedOffset().
> Personally, I think "managesOffsets" conveys this best, but I don't mind
> changing it if the nomenclature is unclear.
>
> Sorry for the massive emails/essays!
> --
> Nick
>
> 1: https://github.com/facebook/rocksdb/wiki/Transactions
> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> 3: https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/
>
> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> <lbruts...@confluent.io.invalid> wrote:
>
>> Hi Nick,
>>
>> since I last read it in April, the KIP has become much cleaner and
>> easier to read. Great work!
>>
>> It feels to me the last big open point is whether we can implement
>> isolation level as a query parameter. I understand that there are
>> implementation concerns, but as Colt says, it would be a great
>> addition, and would also simplify the migration path for this change.
>> Is the implementation problem you mentioned caused by the WriteBatch
>> not having a notion of a snapshot, as the underlying DB iterator does?
>> In that case, I am not sure a chain of WriteBatches as you propose
>> would fully solve the problem, but maybe I didn't dig enough into the
>> details to fully understand it.
>>
>> If it's not possible to implement it now, would it be an option to
>> make sure in this KIP that we do not fully close the door on per-query
>> isolation levels in the interface, as it may be possible to implement
>> the missing primitives in RocksDB or Speedb in the future?
>>
>> Understanding:
>>
>> * U1) Why is it only "SHOULD" for changelogOffsets to be persisted
>> atomically with the records?
>> * U2) Don't understand the default implementation of `isolationLevel`.
>> The isolation level should be a property of the underlying store, and
>> not be defined by the default config? Existing stores probably don't
>> guarantee READ_COMMITTED, so the default should be to return
>> READ_UNCOMMITTED.
>>
>> Nits:
>> * N1) Could `getCommittedOffset` use an `OptionalLong` return type, to
>> avoid the `null`?
>> * N2) Could `approximateNumUncommittedBytes` use an `OptionalLong`
>> return type, to avoid the `-1`?
>> * N3) I don't understand why `managesOffsets` uses the 'manage' verb,
>> whereas all other methods use the "commits" verb. I'd suggest
>> `commitsOffsets`.
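
For reference, the difference N1 and N2 are pointing at might look like this.
OffsetExample and both methods are hypothetical helpers for illustration, not
the KIP's API:

```java
import java.util.OptionalLong;

// Hypothetical helpers contrasting the two return styles.
final class OffsetExample {
    // Sentinel style: callers must remember that null means "no offset".
    static Long committedOffsetNullable(boolean known) {
        return known ? Long.valueOf(42L) : null;
    }

    // OptionalLong style: absence is explicit in the type, and the
    // long value itself is not boxed.
    static OptionalLong committedOffset(boolean known) {
        return known ? OptionalLong.of(42L) : OptionalLong.empty();
    }
}
```

A caller can then write committedOffset(known).orElse(0L) or check
isPresent(), instead of testing for null or -1.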
>>
>> Either way, it feels this KIP is very close to the finish line, I'm
>> looking forward to seeing this in production!
>>
>> Cheers,
>> Lucas
>>
>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <c...@littlehorse.io> wrote:
>> >
>> > > Making IsolationLevel a query-time constraint, rather than linking it
>> > > to the processing.guarantee.
>> >
>> > As I understand it, would this allow even a user of EOS to control
>> > whether they read committed or uncommitted records? If so, I am highly
>> > in favor of this.
>> >
>> > I know that I was one of the early people to point out the current
>> > shortcoming that IQ reads uncommitted records, but just this morning I
>> > realized a pattern we use which means that (for certain queries) our
>> > system needs to be able to read uncommitted records, which is the
>> > current behavior of Kafka Streams in EOS.***
>> >
>> > If IsolationLevel being a query-time decision allows for this, then that
>> > would be amazing. I would also vote that the default behavior should be
>> > to read uncommitted records, because it is totally possible for a valid
>> > application to depend on that behavior, and breaking it in a minor
>> > release might be a bit strong.
>> >
>> > *** (Note for the curious reader: our use-case/query pattern is a bit
>> > complex, but reading "uncommitted" records is actually safe in our case
>> > because processing is deterministic. Additionally, IQ being able to read
>> > uncommitted records is crucial to enable "read your own writes" on our
>> > API: due to the deterministic processing, we send an "ack" to the client
>> > who makes the request as soon as the processor processes the result. If
>> > they can't read uncommitted records, they may receive a "201 - Created"
>> > response, immediately followed by a "404 - Not Found" when doing a
>> > lookup for the object they just created.)
>> >
>> > Thanks,
>> > Colt McNealy
>> >
>> > *Founder, LittleHorse.dev*
>> >
>> >
>> > On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <nick.telf...@gmail.com>
>> > wrote:
>> >
>> > > Addendum:
>> > >
>> > > I think we would also face the same problem with the approach John
>> > > outlined earlier (using the record cache as a transaction buffer and
>> > > flushing it straight to SST files). This is because the record cache
>> > > (the ThreadCache class) is not thread-safe, so every commit would
>> > > invalidate open IQ Iterators in the same way that RocksDB WriteBatches
>> > > do.
>> > > --
>> > > Nick
>> > >
>> > > On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Bruno,
>> > > >
>> > > > I've updated the KIP based on our conversation. The only things I've
>> > > > not yet done are:
>> > > >
>> > > > 1. Using transactions under ALOS and EOS.
>> > > > 2. Making IsolationLevel a query-time constraint, rather than linking
>> > > > it to the processing.guarantee.
>> > > >
>> > > > There's a wrinkle that makes this a challenge: Interactive Queries
>> > > > that open an Iterator, when using transactions and READ_UNCOMMITTED.
>> > > > The problem is that under READ_UNCOMMITTED, queries need to be able
>> > > > to read records from the currently uncommitted transaction buffer
>> > > > (WriteBatch). This includes Iterators, which should iterate both the
>> > > > transaction buffer and underlying database (using
>> > > > WriteBatch#iteratorWithBase()).
>> > > >
>> > > > The issue is that when the StreamThread commits, it writes the
>> > > > current WriteBatch to RocksDB *and then clears the WriteBatch*.
>> > > > Clearing the WriteBatch while an Interactive Query holds an open
>> > > > Iterator on it will invalidate the Iterator. Worse, it turns out that
>> > > > Iterators over a WriteBatch become invalidated not just when the
>> > > > WriteBatch is cleared, but also when the Iterator's current key
>> > > > receives a new write.
>> > > >
>> > > > Now that I'm writing this, I remember that this is the major reason
>> > > > that I switched the original design from having a query-time
>> > > > IsolationLevel to having the IsolationLevel linked to the
>> > > > transactionality of the stores themselves.
>> > > >
>> > > > It *might* be possible to resolve this, by having a "chain" of
>> > > > WriteBatches, with the StreamThread switching to a new WriteBatch
>> > > > whenever a new Interactive Query attempts to read from the database,
>> > > > but that could cause some performance problems/memory pressure when
>> > > > subjected to a high Interactive Query load. It would also reduce the
>> > > > efficiency of WriteBatches on-commit, as we'd have to write N
>> > > > WriteBatches, where N is the number of Interactive Queries since the
>> > > > last commit.
>> > > >
>> > > > I realise this is getting into the weeds of the implementation, and
>> > > > you'd rather we focus on the API for now, but I think it's important
>> > > > to consider how to implement the desired API, in case we come up with
>> > > > an API that cannot be implemented efficiently, or even at all!
>> > > >
>> > > > Thoughts?
>> > > > --
>> > > > Nick
>> > > >
>> > > > On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org>
>> wrote:
>> > > >
>> > > >> Hi Nick,
>> > > >>
>> > > >> 6.
>> > > >> Of course, you are right! My bad!
>> > > >> Wiping out the state in the downgrading case is fine.
>> > > >>
>> > > >>
>> > > >> 3a.
>> > > >> Focus on the public facing changes for the KIP. We will manage to
>> > > >> get the internals right. Regarding state stores that do not support
>> > > >> READ_COMMITTED, they should throw an error stating that they do not
>> > > >> support READ_COMMITTED. No need to adapt all state stores
>> > > >> immediately.
>> > > >>
>> > > >> 3b.
>> > > >> I am in favor of using transactions also for ALOS.
>> > > >>
>> > > >>
>> > > >> Best,
>> > > >> Bruno
>> > > >>
>> > > >> On 9/13/23 11:57 AM, Nick Telford wrote:
>> > > >> > Hi Bruno,
>> > > >> >
>> > > >> > Thanks for getting back to me!
>> > > >> >
>> > > >> > 2.
>> > > >> > The fact that implementations can always track estimated memory
>> usage
>> > > in
>> > > >> > the wrapper is a good point. I can remove -1 as an option, and
>> I'll
>> > > >> clarify
>> > > >> > the JavaDoc that 0 is not just for non-transactional stores,
>> which is
>> > > >> > currently misleading.
>> > > >> >
>> > > >> > 6.
>> > > >> > The problem with catching the exception in the downgrade process
>> is
>> > > that
>> > > >> > would require new code in the Kafka version being downgraded to.
>> Since
>> > > >> > users could conceivably downgrade to almost *any* older version
>> of
>> > > Kafka
>> > > >> > Streams, I'm not sure how we could add that code?
>> > > >> > The only way I can think of doing it would be to provide a
>> dedicated
>> > > >> > downgrade tool, that goes through every local store and removes
>> the
>> > > >> > offsets column families. But that seems like an unnecessary
>> amount of
>> > > >> extra
>> > > >> > code to maintain just to handle a somewhat niche situation, when
>> the
>> > > >> > alternative (automatically wipe and restore stores) should be
>> > > >> acceptable.
>> > > >> >
>> > > >> > 1, 4, 5: Agreed. I'll make the changes you've requested.
>> > > >> >
>> > > >> > 3a.
>> > > >> > I agree that IsolationLevel makes more sense at query-time, and I
>> > > >> actually
>> > > >> > initially attempted to place the IsolationLevel at query-time,
>> but I
>> > > ran
>> > > >> > into some problems:
>> > > >> > - The key issue is that, under ALOS we're not staging writes in
>> > > >> > transactions, so can't perform writes at the READ_COMMITTED
>> isolation
>> > > >> > level. However, this may be addressed if we decide to *always*
>> use
>> > > >> > transactions as discussed under 3b.
>> > > >> > - IQv1 and IQv2 have quite different implementations. I remember
>> > > having
>> > > >> > some difficulty understanding the IQv1 internals, which made it
>> > > >> difficult
>> > > >> > to determine what needed to be changed. However, I *think* this
>> can be
>> > > >> > addressed for both implementations by wrapping the RocksDBStore
>> in an
>> > > >> > IsolationLevel-dependent wrapper, that overrides read methods
>> (get,
>> > > >> etc.)
>> > > >> > to either read directly from the database or from the ongoing
>> > > >> transaction.
>> > > >> > But IQv1 might still be difficult.
>> > > >> > - If IsolationLevel becomes a query constraint, then all other
>> > > >> > StateStores will need to respect it, including the in-memory
>> > > >> > stores. This would require us to adapt in-memory stores to stage
>> > > >> > their writes so they can be isolated from READ_COMMITTED queries.
>> > > >> > It would also become an important consideration for third-party
>> > > >> > stores on upgrade, as without changes, they would not support
>> > > >> > READ_COMMITTED queries correctly.
>> > > >> >
>> > > >> > Ultimately, I may need some help making the necessary change to
>> IQv1
>> > > to
>> > > >> > support this, but I don't think it's fundamentally impossible,
>> if we
>> > > >> want
>> > > >> > to pursue this route.
>> > > >> >
>> > > >> > 3b.
>> > > >> > The main reason I chose to keep ALOS un-transactional was to
>> minimize
>> > > >> > behavioural change for most users (I believe most Streams users
>> use
>> > > the
>> > > >> > default configuration, which is ALOS). That said, it's clear
>> that if
>> > > >> ALOS
>> > > >> > also used transactional stores, the only change in behaviour
>> would be
>> > > >> that
>> > > >> > it would become *more correct*, which could be considered a "bug
>> fix"
>> > > by
>> > > >> > users, rather than a change they need to handle.
>> > > >> >
>> > > >> > I believe that performance using transactions (aka. RocksDB
>> > > >> WriteBatches)
>> > > >> > should actually be *better* than the un-batched write-path that
>> is
>> > > >> > currently used[1]. The only "performance" consideration will be
>> the
>> > > >> > increased memory usage that transactions require. Given the
>> > > mitigations
>> > > >> for
>> > > >> > this memory that we have in place, I would expect that this is
>> not a
>> > > >> > problem for most users.
>> > > >> >
>> > > >> > If we're happy to do so, we can make ALOS also use transactions.
>> > > >> >
>> > > >> > Regards,
>> > > >> > Nick
>> > > >> >
>> > > >> > Link 1:
>> > > >> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> > > >> >
>> > > >> > On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org
>> >
>> > > >> wrote:
>> > > >> >
>> > > >> >> Hi Nick,
>> > > >> >>
>> > > >> >> Thanks for the updates and sorry for the delay on my side!
>> > > >> >>
>> > > >> >>
>> > > >> >> 1.
>> > > >> >> Making the default implementation for flush() a no-op sounds
>> good to
>> > > >> me.
>> > > >> >>
>> > > >> >>
>> > > >> >> 2.
>> > > >> >> I think what was bugging me here is that a third-party state
>> store
>> > > >> needs
>> > > >> >> to implement the state store interface. That means they need to
>> > > >> >> implement a wrapper around the actual state store as we do for
>> > > RocksDB
>> > > >> >> with RocksDBStore. So, a third-party state store can always
>> estimate
>> > > >> the
>> > > >> >> uncommitted bytes, if it wants, because the wrapper can record
>> the
>> > > >> added
>> > > >> >> bytes.
>> > > >> >> One case I can think of where returning -1 makes sense is when
>> > > >> >> Streams does not need to estimate the size of the write batch and
>> > > >> >> trigger extraordinary commits, because the third-party state store
>> > > >> >> takes care of memory. But in that case the method could also just
>> > > >> >> return 0. Even that case would be better solved with a method that
>> > > >> >> returns whether or not the state store itself manages the memory
>> > > >> >> used for uncommitted bytes.
>> > > >> >> That said, I am fine with keeping the -1 return value; I was just
>> > > >> >> wondering when and if it will be used.
>> > > >> >>
>> > > >> >> Regarding returning 0 for transactional state stores when the
>> batch
>> > > is
>> > > >> >> empty, I was just wondering because you explicitly stated
>> > > >> >>
>> > > >> >> "or {@code 0} if this StateStore does not support transactions."
>> > > >> >>
>> > > >> >> So it seemed to me returning 0 could only happen for
>> > > non-transactional
>> > > >> >> state stores.
>> > > >> >>
>> > > >> >>
>> > > >> >> 3.
>> > > >> >>
>> > > >> >> a) What do you think if we move the isolation level to IQ (v1
>> and
>> > > v2)?
>> > > >> >> In the end this is the only component that really needs to
>> specify
>> > > the
>> > > >> >> isolation level. It is similar to the Kafka consumer that can
>> choose
>> > > >> >> with what isolation level to read the input topic.
>> > > >> >> For IQv1 the isolation level should go into
>> StoreQueryParameters. For
>> > > >> >> IQv2, I would add it to the Query interface.
>> > > >> >>
>> > > >> >> b) Point a) raises the question of what should happen during
>> > > >> >> at-least-once processing when the state store does not use
>> > > >> >> transactions. John in the past proposed to also use transactions
>> > > >> >> on state stores for at-least-once. I like that idea, because it
>> > > >> >> avoids aggregating the same records over and over again in the
>> > > >> >> case of a failure. We had a case in the past where a Streams
>> > > >> >> application in at-least-once mode was failing continuously, for
>> > > >> >> reasons I do not remember, before committing the offsets. After
>> > > >> >> each failover, the app aggregated the same records again and
>> > > >> >> again. Of course the aggregate increased to very wrong values just
>> > > >> >> because of the failover. With transactions on the state stores we
>> > > >> >> could have avoided this. The app would have output the same
>> > > >> >> aggregate multiple times (i.e., after each failover) but at least
>> > > >> >> the value of the aggregate would not depend on the number of
>> > > >> >> failovers. Outputting the same aggregate multiple times would be
>> > > >> >> incorrect under exactly-once but it is OK for at-least-once.
>> > > >> >> Whether it makes sense to add a config to turn transactions on
>> > > >> >> state stores on and off under at-least-once, or to just use
>> > > >> >> transactions in any case, is a question we should also discuss in
>> > > >> >> this KIP. It depends a bit on the performance trade-off. Maybe to
>> > > >> >> be safe, I would add a config.
>> > > >> >>
>> > > >> >>
>> > > >> >> 4.
>> > > >> >> Your points are all valid. I tend to say to keep the metrics
>> around
>> > > >> >> flush() until we remove flush() completely from the interface.
>> Calls
>> > > to
>> > > >> >> flush() might still exist since existing processors might still
>> call
>> > > >> >> flush() explicitly as you mentioned in 1). For sure, we need to
>> > > >> document
>> > > >> >> how the metrics change due to the transactions in the upgrade
>> notes.
>> > > >> >>
>> > > >> >>
>> > > >> >> 5.
>> > > >> >> I see. Then you should describe how the .position files are
>> handled
>> > > in
>> > > >> >> a dedicated section of the KIP or incorporate the description
>> in the
>> > > >> >> "Atomic Checkpointing" section instead of only mentioning it in
>> the
>> > > >> >> "Compatibility, Deprecation, and Migration Plan".
>> > > >> >>
>> > > >> >>
>> > > >> >> 6.
>> > > >> >> Describing upgrading and downgrading in the KIP is a good idea.
>> > > >> >> Regarding downgrading, I think you could also catch the
>> exception and
>> > > >> do
>> > > >> >> what is needed to downgrade, e.g., drop the column family. See
>> here
>> > > for
>> > > >> >> an example:
>> > > >> >>
>> > > >> >> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
>> > > >> >>
>> > > >> >> It is a bit brittle, but it works.
>> > > >> >>
>> > > >> >>
>> > > >> >> Best,
>> > > >> >> Bruno
>> > > >> >>
>> > > >> >>
>> > > >> >> On 8/24/23 12:18 PM, Nick Telford wrote:
>> > > >> >>> Hi Bruno,
>> > > >> >>>
>> > > >> >>> Thanks for taking the time to review the KIP. I'm back from
>> leave
>> > > now
>> > > >> and
>> > > >> >>> intend to move this forwards as quickly as I can.
>> > > >> >>>
>> > > >> >>> Addressing your points:
>> > > >> >>>
>> > > >> >>> 1.
>> > > >> >>> Because flush() is part of the StateStore API, it's exposed to
>> > > custom
>> > > >> >>> Processors, which might be making calls to flush(). This was
>> > > actually
>> > > >> the
>> > > >> >>> case in a few integration tests.
>> > > >> >>> To maintain as much compatibility as possible, I'd prefer not
>> to
>> > > make
>> > > >> >> this
>> > > >> >>> an UnsupportedOperationException, as it will cause previously
>> > > working
>> > > >> >>> Processors to start throwing exceptions at runtime.
>> > > >> >>> I agree that it doesn't make sense for it to proxy commit(),
>> though,
>> > > >> as
>> > > >> >>> that would cause it to violate the "StateStores commit only
>> when the
>> > > >> Task
>> > > >> >>> commits" rule.
>> > > >> >>> Instead, I think we should make this a no-op. That way,
>> existing
>> > > user
>> > > >> >>> Processors will continue to work as-before, without violation
>> of
>> > > store
>> > > >> >>> consistency that would be caused by premature flush/commit of
>> > > >> StateStore
>> > > >> >>> data to disk.
>> > > >> >>> What do you think?
>> > > >> >>>
>> > > >> >>> 2.
>> > > >> >>> As stated in the JavaDoc, when a StateStore implementation is
>> > > >> >>> transactional, but is unable to estimate the uncommitted memory
>> > > usage,
>> > > >> >> the
>> > > >> >>> method will return -1.
>> > > >> >>> The intention here is to permit third-party implementations
>> that may
>> > > >> not
>> > > >> >> be
>> > > >> >>> able to estimate memory usage.
>> > > >> >>>
>> > > >> >>> Yes, it will be 0 when nothing has been written to the store
>> yet. I
>> > > >> >> thought
>> > > >> >>> that was implied by "This method will return an approximation
>> of the
>> > > >> >> memory
>> > > >> >>> would be freed by the next call to {@link #commit(Map)}" and
>> > > "@return
>> > > >> The
>> > > >> >>> approximate size of all records awaiting {@link #commit(Map)}",
>> > > >> however,
>> > > >> >> I
>> > > >> >>> can add it explicitly to the JavaDoc if you think this is
>> unclear?
>> > > >> >>>
>> > > >> >>> 3.
>> > > >> >>> I realise this is probably the most contentious point in my
>> design,
>> > > >> and
>> > > >> >> I'm
>> > > >> >>> open to changing it if I'm unable to convince you of the
>> benefits.
>> > > >> >>> Nevertheless, here's my argument:
>> > > >> >>> The Interactive Query (IQ) API(s) are directly provided
>> StateStores
>> > > to
>> > > >> >>> query, and it may be important for users to programmatically
>> know
>> > > >> which
>> > > >> >>> mode the StateStore is operating under. If we simply provide an
>> > > >> >>> "eosEnabled" boolean (as used throughout the internal streams
>> > > >> engine), or
>> > > >> >>> similar, then users will need to understand the operation and
>> > > >> >> consequences
>> > > >> >>> of each available processing mode and how it pertains to their
>> > > >> >> StateStore.
>> > > >> >>>
>> > > >> >>> Interactive Query users aren't the only people that care about the
>> > > >> >>> processing.guarantee/IsolationLevel of a StateStore: implementers
>> > > >> >>> of custom StateStores also need to understand the behaviour
>> > > >> >>> expected of their implementation. KIP-892 introduces some
>> > > >> >>> assumptions into the Streams Engine about how StateStores operate
>> > > >> >>> under each processing mode, and it's important that custom
>> > > >> >>> implementations adhere to those assumptions in order to maintain
>> > > >> >>> the consistency guarantees.
>> > > >> >>>
>> > > >> >>> IsolationLevels provide a high-level contract on the behaviour of
>> > > >> >>> the StateStore: a user knows that under READ_COMMITTED, they will
>> > > >> >>> see writes only after the Task has committed, and under
>> > > >> >>> READ_UNCOMMITTED they will see writes immediately. No
>> > > >> >>> understanding of the details of each processing.guarantee is
>> > > >> >>> required, either for IQ users or StateStore implementers.
>> > > >> >>>
>> > > >> >>> An argument can be made that these contractual guarantees can
>> > > >> >>> simply be documented for the processing.guarantee (i.e. that
>> > > >> >>> exactly-once and exactly-once-v2 behave like READ_COMMITTED and
>> > > >> >>> at-least-once behaves like READ_UNCOMMITTED), but there are
>> > > >> >>> several small issues with this I'd prefer to avoid:
>> > > >> >>>
>> > > >> >>>      - Where would we document these contracts, in a way that is
>> > > >> >>>      difficult for users/implementers to miss/ignore?
>> > > >> >>>      - It's not clear to users that the processing mode is
>> > > >> >>>      communicating an expectation of read isolation, unless they
>> > > >> >>>      read the documentation. Users rarely consult documentation
>> > > >> >>>      unless they feel they need to, so it's likely this detail
>> > > >> >>>      would get missed by many users.
>> > > >> >>>      - It tightly couples processing modes to read isolation.
>> > > >> >>>      Adding new processing modes, or changing the read isolation
>> > > >> >>>      of existing processing modes would be difficult/impossible.
>> > > >> >>>
>> > > >> >>> Ultimately, the cost of introducing IsolationLevels is just a
>> single
>> > > >> >>> method, since we re-use the existing IsolationLevel enum from
>> Kafka.
>> > > >> This
>> > > >> >>> gives us a clear place to document the contractual guarantees
>> > > expected
>> > > >> >>> of/provided by StateStores, that is accessible both by the
>> > > StateStore
>> > > >> >>> itself, and by IQ users.
>> > > >> >>>
>> > > >> >>> (Writing this I've just realised that the StateStore and IQ
>> APIs
>> > > >> actually
>> > > >> >>> don't provide access to StateStoreContext that IQ users would
>> have
>> > > >> direct
>> > > >> >>> access to... Perhaps StateStore should expose isolationLevel()
>> > > itself
>> > > >> >> too?)
>> > > >> >>>
>> > > >> >>> 4.
>> > > >> >>> Yeah, I'm not comfortable renaming the metrics in-place
>> either, as
>> > > >> it's a
>> > > >> >>> backwards incompatible change. My concern is that, if we leave
>> the
>> > > >> >> existing
>> > > >> >>> "flush" metrics in place, they will be confusing to users.
>> Right
>> > > now,
>> > > >> >>> "flush" metrics record explicit flushes to disk, but under
>> KIP-892,
>> > > >> even
>> > > >> >> a
>> > > >> >>> commit() will not explicitly flush data to disk - RocksDB will
>> > > decide
>> > > >> on
>> > > >> >>> when to flush memtables to disk itself.
>> > > >> >>>
>> > > >> >>> If we keep the existing "flush" metrics, we'd have two options,
>> > > which
>> > > >> >> both
>> > > >> >>> seem pretty bad to me:
>> > > >> >>>
>> > > >> >>>      1. Have them record calls to commit(), which would be
>> > > >> misleading, as
>> > > >> >>>      data is no longer explicitly "flushed" to disk by this
>> call.
>> > > >> >>>      2. Have them record nothing at all, which is equivalent to
>> > > >> removing
>> > > >> >> the
>> > > >> >>>      metrics, except that users will see the metric still
>> exists and
>> > > >> so
>> > > >> >> assume
>> > > >> >>>      that the metric is correct, and that there's a problem
>> with
>> > > their
>> > > >> >> system
>> > > >> >>>      when there isn't.
>> > > >> >>>
>> > > >> >>> I agree that removing them is also a bad solution, and I'd
>> like some
>> > > >> >>> guidance on the best path forward here.
>> > > >> >>>
>> > > >> >>> 5.
>> > > >> >>> Position files are updated on every write to a StateStore.
>> Since our
>> > > >> >> writes
>> > > >> >>> are now buffered until commit(), we can't update the Position
>> file
>> > > >> until
>> > > >> >>> commit() has been called, otherwise it would be inconsistent
>> with
>> > > the
>> > > >> >> data
>> > > >> >>> in the event of a rollback. Consequently, we need to manage
>> these
>> > > >> offsets
>> > > >> >>> the same way we manage the checkpoint offsets, and ensure
>> they're
>> > > only
>> > > >> >>> written on commit().
>> > > >> >>>
>> > > >> >>> 6.
>> > > >> >>> Agreed, although I'm not exactly sure yet what tests to write.
>> How
>> > > >> >> explicit
>> > > >> >>> do we need to be here in the KIP?
>> > > >> >>>
>> > > >> >>> As for upgrade/downgrade: upgrade is designed to be seamless,
>> and we
>> > > >> >> should
>> > > >> >>> definitely add some tests around that. Downgrade, it
>> transpires,
>> > > isn't
>> > > >> >>> currently possible, as the extra column family for offset
>> storage is
>> > > >> >>> incompatible with the pre-KIP-892 implementation: when you
>> open a
>> > > >> RocksDB
>> > > >> >>> database, you must open all available column families or
>> receive an
>> > > >> >> error.
>> > > >> >>> What currently happens on downgrade is that it attempts to
>> open the
>> > > >> >> store,
>> > > >> >>> throws an error about the offsets column family not being
>> opened,
>> > > >> which
>> > > >> >>> triggers a wipe and rebuild of the Task. Given that downgrades
>> > > should
>> > > >> be
>> > > >> >>> uncommon, I think this is acceptable behaviour, as the
>> end-state is
>> > > >> >>> consistent, even if it results in an undesirable state restore.
>> > > >> >>>
>> > > >> >>> Should I document the upgrade/downgrade behaviour explicitly
>> in the
>> > > >> KIP?
>> > > >> >>>
>> > > >> >>> --
>> > > >> >>>
>> > > >> >>> Regards,
>> > > >> >>> Nick
>> > > >> >>>
>> > > >> >>>
>> > > >> >>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
>> cado...@apache.org>
>> > > >> wrote:
>> > > >> >>>
>> > > >> >>>> Hi Nick!
>> > > >> >>>>
>> > > >> >>>> Thanks for the updates!
>> > > >> >>>>
>> > > >> >>>> 1.
>> > > >> >>>> Why does StateStore#flush() default to
>> > > >> >>>> StateStore#commit(Collections.emptyMap())?
>> > > >> >>>> Since calls to flush() will not exist anymore after this KIP
>> is
>> > > >> >>>> released, I would rather throw an unsupported operation
>> exception
>> > > by
>> > > >> >>>> default.
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> 2.
>> > > >> >>>> When would a state store return -1 from
>> > > >> >>>> StateStore#approximateNumUncommittedBytes() while being
>> > > >> transactional?
>> > > >> >>>>
>> > > >> >>>> Wouldn't StateStore#approximateNumUncommittedBytes() also
>> return 0
>> > > if
>> > > >> >>>> the state store is transactional but nothing has been written
>> to
>> > > the
>> > > >> >>>> state store yet?
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> 3.
>> > > >> >>>> Sorry for bringing this up again. Does this KIP really need to
>> > > >> introduce
>> > > >> >>>> StateStoreContext#isolationLevel()? StateStoreContext has
>> already
>> > > >> >>>> appConfigs() which basically exposes the same information,
>> i.e., if
>> > > >> EOS
>> > > >> >>>> is enabled or not.
>> > > >> >>>> In one of your previous e-mails you wrote:
>> > > >> >>>>
>> > > >> >>>> "My idea was to try to keep the StateStore interface as
>> loosely
>> > > >> coupled
>> > > >> >>>> from the Streams engine as possible, to give implementers more
>> > > >> freedom,
>> > > >> >>>> and reduce the amount of internal knowledge required."
>> > > >> >>>>
>> > > >> >>>> While I understand the intent, I doubt that it decreases the
>> > > >> coupling of
>> > > >> >>>> a StateStore interface and the Streams engine. READ_COMMITTED
>> only
>> > > >> >>>> applies to IQ but not to reads by processors. Thus,
>> implementers
>> > > >> need to
>> > > >> >>>> understand how Streams accesses the state stores.
>> > > >> >>>>
>> > > >> >>>> I would like to hear what others think about this.
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> 4.
>> > > >> >>>> Great to see new metrics exposed for transactional state stores!
>> > > However, I
>> > > >> >>>> would prefer to add new metrics and deprecate (in the docs)
>> the old
>> > > >> >>>> ones. You can find examples of deprecated metrics here:
>> > > >> >>>> https://kafka.apache.org/documentation/#selector_monitoring
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> 5.
>> > > >> >>>> Why does the KIP mention position files? I do not think they
>> are
>> > > >> related
>> > > >> >>>> to transactions or flushes.
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> 6.
>> > > >> >>>> I think we will also need to adapt/add integration tests
>> besides
>> > > unit
>> > > >> >>>> tests. Additionally, we probably need integration or system
>> tests
>> > > to
>> > > >> >>>> verify that upgrades and downgrades between transactional and
>> > > >> >>>> non-transactional state stores work as expected.
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> Best,
>> > > >> >>>> Bruno
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>>> On 7/21/23 10:34 PM, Nick Telford wrote:
>> > > >> >>>>> One more thing: I noted John's suggestion in the KIP, under
>> > > >> "Rejected
>> > > >> >>>>> Alternatives". I still think it's an idea worth pursuing,
>> but I
>> > > >> believe
>> > > >> >>>>> that it's out of the scope of this KIP, because it solves a
>> > > >> different
>> > > >> >> set
>> > > >> >>>>> of problems to this KIP, and the scope of this one has
>> already
>> > > grown
>> > > >> >>>> quite
>> > > >> >>>>> large!
>> > > >> >>>>>
>> > > >> >>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <
>> > > nick.telf...@gmail.com>
>> > > >> >>>> wrote:
>> > > >> >>>>>
>> > > >> >>>>>> Hi everyone,
>> > > >> >>>>>>
>> > > >> >>>>>> I've updated the KIP (
>> > > >> >>>>>>
>> > > >> >>>>
>> > > >> >>
>> > > >>
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> > > >> >>>> )
>> > > >> >>>>>> with the latest changes; mostly bringing back "Atomic
>> > > >> Checkpointing"
>> > > >> >>>> (for
>> > > >> >>>>>> what feels like the 10th time!). I think the one thing
>> missing is
>> > > >> some
>> > > >> >>>>>> changes to metrics (notably the store "flush" metrics will
>> need
>> > > to
>> > > >> be
>> > > >> >>>>>> renamed to "commit").
>> > > >> >>>>>>
>> > > >> >>>>>> The reason I brought back Atomic Checkpointing was to
>> decouple
>> > > >> store
>> > > >> >>>> flush
>> > > >> >>>>>> from store commit. This is important, because with
>> Transactional
>> > > >> >>>>>> StateStores, we now need to call "flush" on *every* Task
>> commit,
>> > > >> and
>> > > >> >> not
>> > > >> >>>>>> just when the StateStore is closing, otherwise our
>> transaction
>> > > >> buffer
>> > > >> >>>> will
>> > > >> >>>>>> never be written and persisted, instead growing unbounded! I
>> > > >> >>>> experimented
>> > > >> >>>>>> with some simple solutions, like forcing a store flush
>> whenever
>> > > the
>> > > >> >>>>>> transaction buffer was likely to exceed its configured
>> size, but
>> > > >> this
>> > > >> >>>> was
>> > > >> >>>>>> brittle: it prevented the transaction buffer from being
>> > > configured
>> > > >> to
>> > > >> >> be
>> > > >> >>>>>> unbounded, and it still would have required explicit
>> flushes of
>> > > >> >> RocksDB,
>> > > >> >>>>>> yielding sub-optimal performance and memory utilization.
>> > > >> >>>>>>
>> > > >> >>>>>> I deemed Atomic Checkpointing to be the "right" way to
>> resolve
>> > > this
>> > > >> >>>>>> problem. By ensuring that the changelog offsets that
>> correspond
>> > > to
>> > > >> the
>> > > >> >>>> most
>> > > >> >>>>>> recently written records are always atomically written to
>> the
>> > > >> >> StateStore
>> > > >> >>>>>> (by writing them to the same transaction buffer), we can
>> avoid
>> > > >> >> forcibly
>> > > >> >>>>>> flushing the RocksDB memtables to disk, letting RocksDB
>> flush
>> > > them
>> > > >> >> only
>> > > >> >>>>>> when necessary, without losing any of our consistency
>> guarantees.
>> > > >> See
>> > > >> >>>> the
>> > > >> >>>>>> updated KIP for more info.
>> > > >> >>>>>>
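A minimal sketch of the Atomic Checkpointing idea described above, using a toy in-memory store rather than RocksDB. The class ToyTransactionalStore and all of its methods are hypothetical and are not taken from Kafka Streams or the KIP-892 branch. It only illustrates that records and changelog offsets staged in the same buffer become visible together on commit, or not at all.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy store for illustration only; NOT the KIP-892 implementation.
// It is not thread-safe and does not model deletes.
final class ToyTransactionalStore {
    private final Map<String, String> committedData = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();
    // The single "transaction buffer": records and offsets are staged side by side.
    private final Map<String, String> pendingData = new HashMap<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    // Stage a record together with the changelog offset it corresponds to.
    void put(String key, String value, String changelogPartition, long offset) {
        pendingData.put(key, value);
        pendingOffsets.put(changelogPartition, offset);
    }

    // Apply records and offsets in one step, so the stored offsets always
    // describe exactly the records that have been applied.
    void commit() {
        committedData.putAll(pendingData);
        committedOffsets.putAll(pendingOffsets);
        pendingData.clear();
        pendingOffsets.clear();
    }

    // Discard staged records and staged offsets together.
    void rollback() {
        pendingData.clear();
        pendingOffsets.clear();
    }

    String committedValue(String key) {
        return committedData.get(key);
    }

    Long committedOffset(String changelogPartition) {
        return committedOffsets.get(changelogPartition);
    }

    public static void main(String[] args) {
        ToyTransactionalStore store = new ToyTransactionalStore();
        store.put("k", "v1", "changelog-0", 10L);
        store.commit();
        System.out.println(store.committedValue("k") + " @ " + store.committedOffset("changelog-0"));
    }
}
```

In a real store the "apply in one step" part would be provided by the storage engine's atomic batch write; the maps here only stand in for that guarantee.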
>> > > >> >>>>>> I have fully implemented these changes, although I'm still
>> not
>> > > >> >> entirely
>> > > >> >>>>>> happy with the implementation for segmented StateStores, so
>> I
>> > > plan
>> > > >> to
>> > > >> >>>>>> refactor that. Despite that, all tests pass. If you'd like
>> to try
>> > > >> out
>> > > >> >> or
>> > > >> >>>>>> review this highly experimental and incomplete branch, it's
>> > > >> available
>> > > >> >>>> here:
>> > > >> >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0.
>> Note:
>> > > >> it's
>> > > >> >>>> built
>> > > >> >>>>>> against Kafka 3.5.0 so that I had a stable base to build
>> and test
>> > > >> it
>> > > >> >> on,
>> > > >> >>>>>> and to enable easy apples-to-apples comparisons in a live
>> > > >> >> environment. I
>> > > >> >>>>>> plan to rebase it against trunk once it's nearer completion
>> and
>> > > has
>> > > >> >> been
>> > > >> >>>>>> proven on our main application.
>> > > >> >>>>>>
>> > > >> >>>>>> I would really appreciate help in reviewing and testing:
>> > > >> >>>>>> - Segmented (Versioned, Session and Window) stores
>> > > >> >>>>>> - Global stores
>> > > >> >>>>>>
>> > > >> >>>>>> I do not currently use either of these, so my primary
>> test
>> > > >> >>>> environment
>> > > >> >>>>>> doesn't test these areas.
>> > > >> >>>>>>
>> > > >> >>>>>> I'm going on Parental Leave starting next week for a few
>> weeks,
>> > > so
>> > > >> >> will
>> > > >> >>>>>> not have time to move this forward until late August. That
>> said,
>> > > >> your
>> > > >> >>>>>> feedback is welcome and appreciated, I just won't be able to
>> > > >> respond
>> > > >> >> as
>> > > >> >>>>>> quickly as usual.
>> > > >> >>>>>>
>> > > >> >>>>>> Regards,
>> > > >> >>>>>> Nick
>> > > >> >>>>>>
>> > > >> >>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <
>> > > nick.telf...@gmail.com>
>> > > >> >>>> wrote:
>> > > >> >>>>>>
>> > > >> >>>>>>> Hi Bruno
>> > > >> >>>>>>>
>> > > >> >>>>>>> Yes, that's correct, although the impact on IQ is not
>> something
>> > > I
>> > > >> had
>> > > >> >>>>>>> considered.
>> > > >> >>>>>>>
>> > > >> >>>>>>> What about atomically updating the state store from the
>> > > >> transaction
>> > > >> >>>>>>>> buffer every commit interval and writing the checkpoint
>> (thus,
>> > > >> >>>> flushing
>> > > >> >>>>>>>> the memtable) every configured amount of data and/or
>> number of
>> > > >> >> commit
>> > > >> >>>>>>>> intervals?
>> > > >> >>>>>>>>
>> > > >> >>>>>>>
>> > > >> >>>>>>> I'm not quite sure I follow. Are you suggesting that we
>> add an
>> > > >> >>>> additional
>> > > >> >>>>>>> config for the max number of commit intervals between
>> > > checkpoints?
>> > > >> >> That
>> > > >> >>>>>>> way, we would checkpoint *either* when the transaction
>> buffers
>> > > are
>> > > >> >>>> nearly
>> > > >> >>>>>>> full, *OR* whenever a certain number of commit intervals
>> have
>> > > >> >> elapsed,
>> > > >> >>>>>>> whichever comes first?
>> > > >> >>>>>>>
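A rough sketch of the "whichever comes first" policy asked about above. The class CheckpointTrigger and the maxCommitsBetweenCheckpoints setting are hypothetical; no such config exists in the thread beyond the suggestion itself, and maxBufferBytes only stands in for statestore.transaction.buffer.max.bytes.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class CheckpointTrigger {
    private final long maxBufferBytes;              // stands in for statestore.transaction.buffer.max.bytes
    private final int maxCommitsBetweenCheckpoints; // assumed additional config
    private int commitsSinceCheckpoint = 0;

    CheckpointTrigger(long maxBufferBytes, int maxCommitsBetweenCheckpoints) {
        this.maxBufferBytes = maxBufferBytes;
        this.maxCommitsBetweenCheckpoints = maxCommitsBetweenCheckpoints;
    }

    // Called on every Task commit; returns true when a checkpoint should be written.
    boolean onCommit(long uncommittedBytes) {
        commitsSinceCheckpoint++;
        boolean bufferFull = uncommittedBytes >= maxBufferBytes;
        boolean tooManyCommits = commitsSinceCheckpoint >= maxCommitsBetweenCheckpoints;
        if (bufferFull || tooManyCommits) {
            commitsSinceCheckpoint = 0; // either condition resets the commit counter
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointTrigger trigger = new CheckpointTrigger(100L, 3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("commit " + i + " checkpoint=" + trigger.onCommit(10L));
        }
    }
}
```

Counting commits rather than measuring elapsed time is exactly the design question raised in the next paragraph; swapping the counter for a timestamp comparison would give the absolute-time variant.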
>> > > >> >>>>>>> That certainly seems reasonable, although this re-ignites
>> an
>> > > >> earlier
>> > > >> >>>>>>> debate about whether a config should be measured in
>> "number of
>> > > >> commit
>> > > >> >>>>>>> intervals", instead of just an absolute time.
>> > > >> >>>>>>>
>> > > >> >>>>>>> FWIW, I realised that this issue is the reason I was
>> pursuing
>> > > the
>> > > >> >>>> Atomic
>> > > >> >>>>>>> Checkpoints, as it de-couples memtable flush from
>> checkpointing,
>> > > >> >> which
>> > > >> >>>>>>> enables us to just checkpoint on every commit without any
>> > > >> performance
>> > > >> >>>>>>> impact. Atomic Checkpointing is definitely the "best"
>> solution,
>> > > >> but
>> > > >> >>>> I'm not
>> > > >> >>>>>>> sure if this is enough to bring it back into this KIP.
>> > > >> >>>>>>>
>> > > >> >>>>>>> I'm currently working on moving all the transactional logic
>> > > >> directly
>> > > >> >>>> into
>> > > >> >>>>>>> RocksDBStore itself, which does away with the
>> > > >> >> StateStore#newTransaction
>> > > >> >>>>>>> method, and reduces the number of new classes introduced,
>> > > >> >> significantly
>> > > >> >>>>>>> reducing the complexity. If it works, and the complexity is
>> > > >> >> drastically
>> > > >> >>>>>>> reduced, I may try bringing back Atomic Checkpoints into
>> this
>> > > KIP.
>> > > >> >>>>>>>
>> > > >> >>>>>>> Regards,
>> > > >> >>>>>>> Nick
>> > > >> >>>>>>>
>> > > >> >>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <
>> cado...@apache.org>
>> > > >> >> wrote:
>> > > >> >>>>>>>
>> > > >> >>>>>>>> Hi Nick,
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> Thanks for the insights! Very interesting!
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> As far as I understand, you want to atomically update the
>> state
>> > > >> >> store
>> > > >> >>>>>>>> from the transaction buffer, flush the memtable of a state
>> > > store
>> > > >> and
>> > > >> >>>>>>>> write the checkpoint not after the commit time elapsed but
>> > > after
>> > > >> the
>> > > >> >>>>>>>> transaction buffer reached a size that would lead to
>> exceeding
>> > > >> >>>>>>>> statestore.transaction.buffer.max.bytes before the next
>> commit
>> > > >> >>>> interval
>> > > >> >>>>>>>> ends.
>> > > >> >>>>>>>> That means, the Kafka transaction would commit every
>> commit
>> > > >> interval
>> > > >> >>>> but
>> > > >> >>>>>>>> the state store will only be atomically updated roughly
>> every
>> > > >> >>>>>>>> statestore.transaction.buffer.max.bytes of data. Also IQ
>> would
>> > > >> then
>> > > >> >>>> only
>> > > >> >>>>>>>> see new data roughly every
>> > > >> statestore.transaction.buffer.max.bytes.
>> > > >> >>>>>>>> After a failure the state store needs to restore up to
>> > > >> >>>>>>>> statestore.transaction.buffer.max.bytes.
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> Is this correct?
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> What about atomically updating the state store from the
>> > > >> transaction
>> > > >> >>>>>>>> buffer every commit interval and writing the checkpoint
>> (thus,
>> > > >> >>>> flushing
>> > > >> >>>>>>>> the memtable) every configured amount of data and/or
>> number of
>> > > >> >> commit
>> > > >> >>>>>>>> intervals? In such a way, we would have the same delay for
>> > > >> records
>> > > >> >>>>>>>> appearing in output topics and IQ because both would
>> appear
>> > > when
>> > > >> the
>> > > >> >>>>>>>> Kafka transaction is committed. However, after a failure
>> the
>> > > >> state
>> > > >> >>>> store
>> > > >> >>>>>>>> still needs to restore up to
>> > > >> statestore.transaction.buffer.max.bytes
>> > > >> >>>> and
>> > > >> >>>>>>>> it might restore data that is already in the state store
>> > > because
>> > > >> the
>> > > >> >>>>>>>> checkpoint lags behind the last stable offset (i.e. the
>> last
>> > > >> >> committed
>> > > >> >>>>>>>> offset) of the changelog topics. Restoring data that is
>> already
>> > > >> in
>> > > >> >> the
>> > > >> >>>>>>>> state store is idempotent, so eos should not be violated.
>> > > >> >>>>>>>> This solution needs at least one new config to specify
>> when a
>> > > >> >>>> checkpoint
>> > > >> >>>>>>>> should be written.
>> > > >> >>>>>>>>
>> > > >> >>>>>>>>
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> A small correction to your previous e-mail that does not
>> change
>> > > >> >>>> anything
>> > > >> >>>>>>>> you said: Under alos the default commit interval is 30
>> seconds,
>> > > >> not
>> > > >> >>>> five
>> > > >> >>>>>>>> seconds.
>> > > >> >>>>>>>>
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> Best,
>> > > >> >>>>>>>> Bruno
>> > > >> >>>>>>>>
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
>> > > >> >>>>>>>>> Hi everyone,
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I've begun performance testing my branch on our staging
>> > > >> >> environment,
>> > > >> >>>>>>>>> putting it through its paces in our non-trivial
>> application.
>> > > I'm
>> > > >> >>>>>>>> already
>> > > >> >>>>>>>>> observing the same increased flush rate that we saw the
>> last
>> > > >> time
>> > > >> >> we
>> > > >> >>>>>>>>> attempted to use a version of this KIP, but this time, I
>> > > think I
>> > > >> >> know
>> > > >> >>>>>>>> why.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at
>> the end
>> > > >> of
>> > > >> >> the
>> > > >> >>>>>>>> Task
>> > > >> >>>>>>>>> commit process, has the following behaviour:
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>        - Under ALOS: checkpoint the state stores. This
>> > > includes
>> > > >> >>>>>>>>>        flushing memtables in RocksDB. This is acceptable
>> > > >> because the
>> > > >> >>>>>>>> default
>> > > >> >>>>>>>>>        commit.interval.ms is 5 seconds, so forcibly
>> flushing
>> > > >> >> memtables
>> > > >> >>>>>>>> every 5
>> > > >> >>>>>>>>>        seconds is acceptable for most applications.
>> > > >> >>>>>>>>>        - Under EOS: checkpointing is not done, *unless*
>> it's
>> > > >> being
>> > > >> >>>>>>>> forced, due
>> > > >> >>>>>>>>>        to e.g. the Task closing or being revoked. This
>> means
>> > > >> that
>> > > >> >> under
>> > > >> >>>>>>>> normal
>> > > >> >>>>>>>>>        processing conditions, the state stores will not
>> be
>> > > >> >>>> checkpointed,
>> > > >> >>>>>>>> and will
>> > > >> >>>>>>>>>        not have memtables flushed at all, unless RocksDB
>> > > >> decides to
>> > > >> >>>>>>>> flush them on
>> > > >> >>>>>>>>>        its own. Checkpointing stores and force-flushing
>> their
>> > > >> >> memtables
>> > > >> >>>>>>>> is only
>> > > >> >>>>>>>>>        done when a Task is being closed.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least
>> > > *some*
>> > > >> >>>> normal
>> > > >> >>>>>>>>> Task commits, in order to write the RocksDB transaction
>> > > buffers
>> > > >> to
>> > > >> >>>> the
>> > > >> >>>>>>>>> state stores, and to ensure the offsets are synced to
>> disk to
>> > > >> >> prevent
>> > > >> >>>>>>>>> restores from getting out of hand. Consequently, my
>> current
>> > > >> >>>>>>>> implementation
>> > > >> >>>>>>>>> calls maybeCheckpoint on *every* Task commit, which is
>> far too
>> > > >> >>>>>>>> frequent.
>> > > >> >>>>>>>>> This causes checkpoints every 10,000 records, which is a
>> > > change
>> > > >> in
>> > > >> >>>>>>>> flush
>> > > >> >>>>>>>>> behaviour, potentially causing performance problems for
>> some
>> > > >> >>>>>>>> applications.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I'm looking into possible solutions, and I'm currently
>> leaning
>> > > >> >>>> towards
>> > > >> >>>>>>>>> using the statestore.transaction.buffer.max.bytes
>> > > configuration
>> > > >> to
>> > > >> >>>>>>>>> checkpoint Tasks once we are likely to exceed it. This
>> would
>> > > >> >>>>>>>> complement the
>> > > >> >>>>>>>>> existing "early Task commit" functionality that this
>> > > >> configuration
>> > > >> >>>>>>>>> provides, in the following way:
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>        - Currently, we use
>> > > >> statestore.transaction.buffer.max.bytes
>> > > >> >> to
>> > > >> >>>>>>>> force an
>> > > >> >>>>>>>>>        early Task commit if processing more records would
>> > > cause
>> > > >> our
>> > > >> >>>> state
>> > > >> >>>>>>>> store
>> > > >> >>>>>>>>>        transactions to exceed the memory assigned to
>> them.
>> > > >> >>>>>>>>>        - New functionality: when a Task *does* commit,
>> we will
>> > > >> not
>> > > >> >>>>>>>> checkpoint
>> > > >> >>>>>>>>>        the stores (and hence flush the transaction
>> buffers)
>> > > >> unless
>> > > >> >> we
>> > > >> >>>>>>>> expect to
>> > > >> >>>>>>>>>        cross the statestore.transaction.buffer.max.bytes
>> > > >> threshold
>> > > >> >>>> before
>> > > >> >>>>>>>> the next
>> > > >> >>>>>>>>>        commit
>> > > >> >>>>>>>>>
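The two uses of statestore.transaction.buffer.max.bytes listed above could be sketched as follows. CheckpointPolicy, its method names, and the way the expected growth is passed in are all assumptions made for illustration; how the expected bytes before the next commit would actually be estimated is not specified in the thread.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class CheckpointPolicy {
    private final long maxBufferBytes; // stands in for statestore.transaction.buffer.max.bytes

    CheckpointPolicy(long maxBufferBytes) {
        this.maxBufferBytes = maxBufferBytes;
    }

    // Existing behaviour: force an early Task commit if processing more
    // records would push the uncommitted bytes past the configured limit.
    boolean shouldCommitEarly(long uncommittedBytes, long nextRecordBytes) {
        return uncommittedBytes + nextRecordBytes > maxBufferBytes;
    }

    // Proposed behaviour: when a Task does commit, checkpoint only if the
    // buffer is expected to cross the limit before the next commit.
    boolean shouldCheckpointOnCommit(long uncommittedBytes, long expectedBytesBeforeNextCommit) {
        return uncommittedBytes + expectedBytesBeforeNextCommit > maxBufferBytes;
    }

    public static void main(String[] args) {
        CheckpointPolicy policy = new CheckpointPolicy(1_000L);
        System.out.println("early commit: " + policy.shouldCommitEarly(990L, 20L));
        System.out.println("checkpoint:   " + policy.shouldCheckpointOnCommit(400L, 300L));
    }
}
```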
>> > > >> >>>>>>>>> I'm also open to suggestions.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> Regards,
>> > > >> >>>>>>>>> Nick
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
>> > > >> nick.telf...@gmail.com
>> > > >> >>>
>> > > >> >>>>>>>> wrote:
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>> Hi Bruno!
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 3.
>> > > >> >>>>>>>>>> By "less predictable for users", I meant in terms of
>> > > >> understanding
>> > > >> >>>> the
>> > > >> >>>>>>>>>> performance profile under various circumstances. The
>> more
>> > > >> complex
>> > > >> >>>> the
>> > > >> >>>>>>>>>> solution, the more difficult it would be for users to
>> > > >> understand
>> > > >> >> the
>> > > >> >>>>>>>>>> performance they see. For example, spilling records to
>> disk
>> > > >> when
>> > > >> >> the
>> > > >> >>>>>>>>>> transaction buffer reaches a threshold would, I expect,
>> > > reduce
>> > > >> >> write
>> > > >> >>>>>>>>>> throughput. This reduction in write throughput could be
>> > > >> >> unexpected,
>> > > >> >>>>>>>> and
>> > > >> >>>>>>>>>> potentially difficult to diagnose/understand for users.
>> > > >> >>>>>>>>>> At the moment, I think the "early commit" concept is
>> > > relatively
>> > > >> >>>>>>>>>> straightforward; it's easy to document, and conceptually
>> > > fairly
>> > > >> >>>>>>>> obvious to
>> > > >> >>>>>>>>>> users. We could probably add a metric to make it easier
>> to
>> > > >> >>>> understand
>> > > >> >>>>>>>> when
>> > > >> >>>>>>>>>> it happens though.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 3. (the second one)
>> > > >> >>>>>>>>>> The IsolationLevel is *essentially* an indirect way of
>> > > telling
>> > > >> >>>>>>>> StateStores
>> > > >> >>>>>>>>>> whether they should be transactional. READ_COMMITTED
>> > > >> essentially
>> > > >> >>>>>>>> requires
>> > > >> >>>>>>>>>> transactions, because it dictates that two threads
>> calling
>> > > >> >>>>>>>>>> `newTransaction()` should not see writes from the other
>> > > >> >> transaction
>> > > >> >>>>>>>> until
>> > > >> >>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all
>> bets are
>> > > >> off,
>> > > >> >>>> and
>> > > >> >>>>>>>>>> stores can allow threads to observe written records at
>> any
>> > > >> time,
>> > > >> >>>>>>>> which is
>> > > >> >>>>>>>>>> essentially "no transactions". That said, StateStores
>> are
>> > > free
>> > > >> to
>> > > >> >>>>>>>> implement
>> > > >> >>>>>>>>>> these guarantees however they can, which is a bit more
>> > > relaxed
>> > > >> >> than
>> > > >> >>>>>>>>>> dictating "you must use transactions". For example, with
>> > > >> RocksDB
>> > > >> >> we
>> > > >> >>>>>>>> would
>> > > >> >>>>>>>>>> implement these as READ_COMMITTED == WBWI-based
>> > > "transactions",
>> > > >> >>>>>>>>>> READ_UNCOMMITTED == direct writes to the database. But
>> with
>> > > >> other
>> > > >> >>>>>>>> storage
>> > > >> >>>>>>>>>> engines, it might be preferable to *always* use
>> transactions,
>> > > >> even
>> > > >> >>>>>>>> when
>> > > >> >>>>>>>>>> unnecessary; or there may be storage engines that don't
>> > > provide
>> > > >> >>>>>>>>>> transactions, but the isolation guarantees can be met
>> using a
>> > > >> >>>>>>>> different
>> > > >> >>>>>>>>>> technique.
>> > > >> >>>>>>>>>> My idea was to try to keep the StateStore interface as
>> > > loosely
>> > > >> >>>> coupled
>> > > >> >>>>>>>>>> from the Streams engine as possible, to give
>> implementers
>> > > more
>> > > >> >>>>>>>> freedom, and
>> > > >> >>>>>>>>>> reduce the amount of internal knowledge required.
>> > > >> >>>>>>>>>> That said, I understand that "IsolationLevel" might not
>> be
>> > > the
>> > > >> >> right
>> > > >> >>>>>>>>>> abstraction, and we can always make it much more
>> explicit if
>> > > >> >>>>>>>> required, e.g.
>> > > >> >>>>>>>>>> boolean transactional()
>> > > >> >>>>>>>>>>
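To make the READ_COMMITTED / READ_UNCOMMITTED distinction above concrete, here is a toy sketch. ToyIsolationLevel and ToyIsolatedStore are hypothetical names chosen to avoid implying any real Kafka or RocksDB API; a plain map stands in for the WBWI-style buffer, and the sketch is neither thread-safe nor aware of deletes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an isolation level setting.
enum ToyIsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

// Hypothetical toy store for illustration only.
final class ToyIsolatedStore {
    private final ToyIsolationLevel level;
    private final Map<String, String> database = new HashMap<>();
    private final Map<String, String> buffer = new HashMap<>(); // plays the role of the write batch

    ToyIsolatedStore(ToyIsolationLevel level) {
        this.level = level;
    }

    // READ_COMMITTED stages writes in the buffer; READ_UNCOMMITTED writes directly.
    void put(String key, String value) {
        if (level == ToyIsolationLevel.READ_COMMITTED) {
            buffer.put(key, value);
        } else {
            database.put(key, value);
        }
    }

    // Read by the writer itself: always sees its own staged writes.
    String get(String key) {
        String staged = buffer.get(key);
        return staged != null ? staged : database.get(key);
    }

    // Read on behalf of another thread (e.g. an Interactive Query):
    // only sees what has reached the database.
    String query(String key) {
        return database.get(key);
    }

    void commit() {
        database.putAll(buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        ToyIsolatedStore store = new ToyIsolatedStore(ToyIsolationLevel.READ_COMMITTED);
        store.put("k", "v");
        System.out.println("writer sees: " + store.get("k") + ", query sees: " + store.query("k"));
        store.commit();
        System.out.println("after commit, query sees: " + store.query("k"));
    }
}
```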
>> > > >> >>>>>>>>>> 7-8.
>> > > >> >>>>>>>>>> I can make these changes either later today or tomorrow.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Small update:
>> > > >> >>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of
>> issues
>> > > >> that
>> > > >> >>>>>>>> needed
>> > > >> >>>>>>>>>> addressing. Currently, all the tests pass, which is
>> > > promising,
>> > > >> but
>> > > >> >>>> it
>> > > >> >>>>>>>> will
>> > > >> >>>>>>>>>> need to undergo some performance testing. I haven't
>> (yet)
>> > > >> worked
>> > > >> >> on
>> > > >> >>>>>>>>>> removing the `newTransaction()` stuff, but I would
>> expect
>> > > that,
>> > > >> >>>>>>>>>> behaviourally, it should make no difference. The branch
>> is
>> > > >> >> available
>> > > >> >>>>>>>> at
>> > > >> >>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if
>> > > anyone
>> > > >> is
>> > > >> >>>>>>>>>> interested in taking an early look.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Regards,
>> > > >> >>>>>>>>>> Nick
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
>> > > >> cado...@apache.org>
>> > > >> >>>>>>>> wrote:
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>> Hi Nick,
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> 1.
>> > > >> >>>>>>>>>>> Yeah, I agree with you. That was actually also my
>> point. I
>> > > >> >>>> understood
>> > > >> >>>>>>>>>>> that John was proposing the ingestion path as a way to
>> avoid
>> > > >> the
>> > > >> >>>>>>>> early
>> > > >> >>>>>>>>>>> commits. Probably, I misinterpreted the intent.
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> 2.
>> > > >> >>>>>>>>>>> I agree with John here, that actually it is public
>> API. My
>> > > >> >> question
>> > > >> >>>>>>>> is
>> > > >> >>>>>>>>>>> how this usage pattern affects normal processing.
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> 3.
>> > > >> >>>>>>>>>>> My concern is that checking for the size of the
>> transaction
>> > > >> >> buffer
>> > > >> >>>>>>>> and
>> > > >> >>>>>>>>>>> maybe triggering an early commit affects the whole
>> > > processing
>> > > >> of
>> > > >> >>>>>>>> Kafka
>> > > >> >>>>>>>>>>> Streams. The transactionality of a state store is not
>> > > >> confined to
>> > > >> >>>> the
>> > > >> >>>>>>>>>>> state store itself, but spills over and changes the
>> behavior
>> > > >> of
>> > > >> >>>> other
>> > > >> >>>>>>>>>>> parts of the system. I agree with you that it is a
>> decent
>> > > >> >>>>>>>> compromise. I
>> > > >> >>>>>>>>>>> just wanted to analyse the downsides and list the
>> options to
>> > > >> >>>> overcome
>> > > >> >>>>>>>>>>> them. I also agree with you that all options seem quite
>> > > heavy
>> > > >> >>>>>>>> compared
>> > > >> >>>>>>>>>>> with your KIP. I do not understand what you mean by
>> "less
>> > > >> >>>>>>>> predictable
>> > > >> >>>>>>>>>>> for users", though.
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> I found the discussions about the alternatives really
>> > > >> >> interesting.
>> > > >> >>>>>>>> But I
>> > > >> >>>>>>>>>>> also think that your plan sounds good and we should
>> continue
>> > > >> with
>> > > >> >>>> it!
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> Some comments on your reply to my e-mail on June 20th:
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> 3.
>> > > >> >>>>>>>>>>> Ah, now, I understand the reasoning behind putting
>> isolation
>> > > >> >> level
>> > > >> >>>> in
>> > > >> >>>>>>>>>>> the state store context. Thanks! Should that also be a
>> way
>> > > to
>> > > >> >> give
>> > > >> >>>>>>>> the
>> > > >> >>>>>>>>>>> state store the opportunity to decide whether to
>> turn on
>> > > >> >>>>>>>>>>> transactions or not?
>> > > >> >>>>>>>>>>> With my comment, I was more concerned about how you
>> know
>> > > >> if a
>> > > >> >>>>>>>>>>> checkpoint file needs to be written under EOS, if you
>> do not
>> > > >> >> have a
>> > > >> >>>>>>>> way
>> > > >> >>>>>>>>>>> to know if the state store is transactional or not. If
>> a
>> > > state
>> > > >> >>>> store
>> > > >> >>>>>>>> is
>> > > >> >>>>>>>>>>> transactional, the checkpoint file can be written
>> during
>> > > >> normal
>> > > >> >>>>>>>>>>> processing under EOS. If the state store is not
>> > > transactional,
>> > > >> >> the
>> > > >> >>>>>>>>>>> checkpoint file must not be written under EOS.
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> 7.
>> > > >> >>>>>>>>>>> My point was about not only considering the bytes in
>> memory
>> > > in
>> > > >> >>>> config
>> > > >> >>>>>>>>>>> statestore.uncommitted.max.bytes, but also bytes that
>> might
>> > > be
>> > > >> >>>>>>>> spilled
>> > > >> >>>>>>>>>>> on disk. Basically, I was wondering whether you should
>> > > remove
>> > > >> the
>> > > >> >>>>>>>>>>> "memory" in "Maximum number of memory bytes to be used
>> to
>> > > >> >>>>>>>>>>> buffer uncommitted state-store records." My thinking
>> was
>> > > that
>> > > >> >> even
>> > > >> >>>>>>>> if a
>> > > >> >>>>>>>>>>> state store spills uncommitted bytes to disk, limiting
>> the
>> > > >> >> overall
>> > > >> >>>>>>>> bytes
>> > > >> >>>>>>>>>>> might make sense. Thinking about it again and
>> considering
>> > > the
>> > > >> >>>> recent
>> > > >> >>>>>>>>>>> discussions, it does not make too much sense anymore.
>> > > >> >>>>>>>>>>> I like the name
>> statestore.transaction.buffer.max.bytes that
>> > > >> you
>> > > >> >>>>>>>> proposed.
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> 8.
>> > > >> >>>>>>>>>>> A high-level description (without implementation
>> details) of
>> > > >> how
>> > > >> >>>>>>>> Kafka
>> > > >> >>>>>>>>>>> Streams will manage the commit of changelog
>> transactions,
>> > > >> state
>> > > >> >>>> store
>> > > >> >>>>>>>>>>> transactions and checkpointing would be great. Would be
>> > > great
>> > > >> if
>> > > >> >>>> you
>> > > >> >>>>>>>>>>> could also add some sentences about the behavior in
>> case of
>> > > a
>> > > >> >>>>>>>> failure.
>> > > >> >>>>>>>>>>> For instance how does a transactional state store
>> recover
>> > > >> after a
>> > > >> >>>>>>>>>>> failure or what happens with the transaction buffer,
>> etc.
>> > > >> (that
>> > > >> >> is
>> > > >> >>>>>>>> what
>> > > >> >>>>>>>>>>> I meant by "fail-over" in point 9.)
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> Best,
>> > > >> >>>>>>>>>>> Bruno
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
>> > > >> >>>>>>>>>>>> Hi Bruno,
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> 1.
>> > > >> >>>>>>>>>>>> Isn't this exactly the same issue that
>> WriteBatchWithIndex
>> > > >> >>>>>>>> transactions
>> > > >> >>>>>>>>>>>> have, whereby exceeding (or likely to exceed)
>> configured
>> > > >> memory
>> > > >> >>>>>>>> needs to
>> > > >> >>>>>>>>>>>> trigger an early commit?
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> 2.
>> > > >> >>>>>>>>>>>> This is one of my big concerns. Ultimately, any
>> approach
>> > > >> based
>> > > >> >> on
>> > > >> >>>>>>>>>>> cracking
>> > > >> >>>>>>>>>>>> open RocksDB internals and using it in ways it's not
>> really
>> > > >> >>>> designed
>> > > >> >>>>>>>>>>> for is
>> > > >> >>>>>>>>>>>> likely to have some unforeseen performance or
>> consistency
>> > > >> issues.
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> 3.
>> > > >> >>>>>>>>>>>> What's your motivation for removing these early
>> commits?
>> > > >> While
>> > > >> >> not
>> > > >> >>>>>>>>>>> ideal, I
>> > > >> >>>>>>>>>>>> think they're a decent compromise to ensure
>> consistency
>> > > >> whilst
>> > > >> >>>>>>>>>>> maintaining
>> > > >> >>>>>>>>>>>> good and predictable performance.
>> > > >> >>>>>>>>>>>> All 3 of your suggested ideas seem *very*
>> complicated, and
>> > > >> might
>> > > >> >>>>>>>>>>> actually
>> > > >> >>>>>>>>>>>> make behaviour less predictable for users as a
>> consequence.
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> I'm a bit concerned that the scope of this KIP is
>> growing a
>> > > >> bit
>> > > >> >>>> out
>> > > >> >>>>>>>> of
>> > > >> >>>>>>>>>>>> control. While it's good to discuss ideas for future
>> > > >> >>>> improvements, I
>> > > >> >>>>>>>>>>> think
>> > > >> >>>>>>>>>>>> it's important to narrow the scope down to a design
>> that
>> > > >> >> achieves
>> > > >> >>>>>>>> the
>> > > >> >>>>>>>>>>> most
>> > > >> >>>>>>>>>>>> pressing objectives (constant sized restorations
>> during
>> > > dirty
>> > > >> >>>>>>>>>>>> close/unexpected errors). Any design that this KIP
>> produces
>> > > >> can
>> > > >> >>>>>>>>>>> ultimately
>> > > >> >>>>>>>>>>>> be changed in the future, especially if the bulk of
>> it is
>> > > >> >> internal
>> > > >> >>>>>>>>>>>> behaviour.
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> I'm going to spend some time next week trying to
>> re-work
>> > > the
>> > > >> >>>>>>>> original
>> > > >> >>>>>>>>>>>> WriteBatchWithIndex design to remove the
>> newTransaction()
>> > > >> >> method,
>> > > >> >>>>>>>> such
>> > > >> >>>>>>>>>>> that
>> > > >> >>>>>>>>>>>> it's just an implementation detail of RocksDBStore.
>> That
>> > > >> way, if
>> > > >> >>>> we
>> > > >> >>>>>>>>>>> want to
>> > > >> >>>>>>>>>>>> replace WBWI with something in the future, like the
>> SST
>> > > file
>> > > >> >>>>>>>> management
>> > > >> >>>>>>>>>>>> outlined by John, then we can do so with little/no API
>> > > >> changes.
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> Regards,
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>> Nick
>> > > >> >>>>>>>>>>>>
>> > > >> >>>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>
>> > > >> >>>>>>>
>> > > >> >>>>>
>> > > >> >>>>
>> > > >> >>>
>> > > >> >>
>> > > >> >
>> > > >>
>> > > >
>> > >
>>
>
